import { Injectable } from '@angular/core';
import { IMqttMessage, IOnErrorEvent, MqttService } from 'ngx-mqtt';
import { Observable, Subscription, takeWhile, withLatestFrom } from 'rxjs';
import { MessagePayload } from '../../models/api/message/payload.message';
import {
  configLoaded,
  selectConfigState,
} from '../../store/selectors/config.selectors';
import { select, Store } from '@ngrx/store';

import { filter } from 'rxjs/operators';
import { ConfigState } from '../../store/reducers/config.reducers';
import { selectAuthState } from '../../store/selectors/auth.selectors';
import { AuthState } from '../../store/reducers/auth.reducers';
import { mqttIsConnected } from '../../store/selectors/mqtt.selectors';
import { MqttActions } from '../../store/actions';
import { userLoaded } from '../../store/selectors/user.selectors';
import {
  MqttEventType,
  MqttStatusType,
  MqttTopicStatusType,
} from '../../models/mqtt.model';
import { AuthService } from '../auth/auth.service';

interface Packet {
  cmd: string;
  dup: boolean;
  length: number;
  payload: Uint8Array;
  qos: number;
  retain: boolean;
  topic: string;
}

@Injectable({
  providedIn: 'root',
})
export class MessageService {
  private subscriptions: Map<string, Subscription> = null;
  private auth: AuthState = null;

  constructor(
    private mqttService: MqttService,
    private authService: AuthService,
    private store: Store
  ) {
    this.subscriptions = new Map<string, Subscription>();

    this.store
      .pipe(select(selectAuthState))
      .subscribe((authState: AuthState) => {
        this.auth = authState;
      });

    this.store
      .pipe(
        select(userLoaded),
        filter((userLoaded) => userLoaded === true),
        withLatestFrom(
          this.store.pipe(select(configLoaded)),
          this.store.pipe(select(mqttIsConnected)),
          this.store.pipe(select(selectConfigState)),
          this.store.pipe(select(selectAuthState))
        ),
        takeWhile(
          ([userLoaded, configLoaded, mqttIsConnected, config, auth]: [
            boolean,
            boolean,
            boolean,
            ConfigState,
            AuthState
          ]) => !mqttIsConnected
        )
      )
      .subscribe(
        ([userLoaded, configLoaded, mqttIsConnected, config, auth]: [
          boolean,
          boolean,
          boolean,
          ConfigState,
          AuthState
        ]) => {
          if (userLoaded && configLoaded && !mqttIsConnected) {
            this.mqttService.onConnect.subscribe(
              (event) => {
                this.store.dispatch(
                  MqttActions.event({ event: MqttEventType.ONCONNECT })
                );

                this.store.dispatch(
                  MqttActions.updateStatus({ status: MqttStatusType.CONNECTED })
                );
              },
              (err) => {
                console.log('>>> MQTT ERROR CAUGHT IN MESSAGE SERVICE >>>');
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onError.subscribe(
              (error: Error & { code: number }) => {
                if (error.code === 5) {
                  console.log('>>> RECONNECTING MQTT UNAUTH >>>');
                  this.disconnect();
                  // this.connect(auth, config);
                }
                this.store.dispatch(
                  MqttActions.event({ event: MqttEventType.ONERROR })
                );

                this.store.dispatch(
                  MqttActions.updateStatus({ status: MqttStatusType.ERROR })
                );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onMessage.subscribe(
              (packet: Packet) => {
                this.store.dispatch(
                  MqttActions.topicMessageReceived({
                    topic: packet.topic,
                  })
                );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onClose.subscribe(
              () => {
                this.store.dispatch(
                  MqttActions.event({ event: MqttEventType.ONCLOSE })
                );

                this.store.dispatch(
                  MqttActions.updateStatus({
                    status: MqttStatusType.DISCONNECTED,
                  })
                );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onEnd.subscribe(
              () => {
                this.store.dispatch(
                  MqttActions.event({ event: MqttEventType.ONEND })
                );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onOffline.subscribe(
              () => {
                this.store.dispatch(
                  MqttActions.event({ event: MqttEventType.ONOFFLINE })
                );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onPacketreceive.subscribe(
              () => {
                // this.store.dispatch(
                //   MqttActions.event({ event: MqttEventType.ONPACKETRECEIVE })
                // );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onPacketsend.subscribe(
              () => {
                // this.store.dispatch(
                //   MqttActions.event({ event: MqttEventType.ONPACKETSEND })
                // );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onReconnect.subscribe(
              () => {
                // this.store.dispatch(
                //   MqttActions.event({ event: MqttEventType.ONRECONNECT })
                // );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.mqttService.onSuback.subscribe(
              (event) => {
                this.store.dispatch(
                  MqttActions.updateTopicStatus({
                    topic: event.filter,
                    status: event.granted
                      ? MqttTopicStatusType.SUBSCRIBED
                      : MqttTopicStatusType.SUBSCRIPTION_DENIED,
                  })
                );
                // this.store.dispatch(
                //   MqttActions.event({ event: MqttEventType.ONSUBACK })
                // );
                // this.store.dispatch(
                //   MqttActions.subscribeTopic({
                //     topic: { name: event.filter, granted: event.granted },
                //   })
                // );
              },
              (err) => {
                console.log(err);
              },
              () => {}
            );

            this.connect(auth, config);
          }
        }
      );
  }

  connect(auth: AuthState, config: ConfigState) {
    try {
      this.mqttService.connect({
        username: 'Bearer',
        password: auth.jwt,
        hostname: config.hostname,
        port: config.port,
        protocol: config.development ? 'ws' : 'wss',
        path: '/ws',
        keepalive: 30,
        reconnectPeriod: 1000,
        transformWsUrl: (url: string, options, client): string => {
          if (this.auth) {
            client.options.password = this.auth.jwt;
          }
          return url;
        },
      });
    } catch (err) {
      console.log(err);
    }
  }

  disconnect() {
    try {
      this.mqttService.disconnect(true);
    } catch (err) {
      console.log(err);
    }
  }

  topic<T extends MessagePayload<any>>(
    type: {
      new (...args: any[]): T;
    },
    ...args
  ): Observable<T> {
    return new Observable<T>((subscriber) => {
      // @ts-ignore
      const payload = new type(...args);
      let subscription = this.subscriptions.get(payload.topic);

      if (subscription == null) {
        try {
          this.store.dispatch(
            MqttActions.createTopic({
              payload,
              status: MqttTopicStatusType.CREATED,
            })
          );

          subscription = this.mqttService
            .observe(payload.topic, {
              qos: 1,
            })
            .subscribe(
              (message: IMqttMessage) => {
                subscriber.next(
                  JSON.parse(
                    String.fromCharCode.apply(
                      null,
                      new Uint8Array(message.payload)
                    )
                  ) as T
                );
              },
              (error) => {},
              () => {}
            );
          this.subscriptions.set(payload.topic, subscription);
        } catch (err) {}
      }

      return subscription;
    });
  }

  unsubscribe<T extends MessagePayload<any>>(
    subscriber: Subscription,
    type: {
      new (...args: any[]): T;
    },
    ...args
  ) {
    // @ts-ignore
    const payload = new type(...args);

    if (subscriber) {
      subscriber.unsubscribe();

      const subscription = this.subscriptions.get(payload.topic);
      if (subscription != null) {
        subscription.unsubscribe();
        this.store.dispatch(
          MqttActions.updateTopicStatus({
            topic: payload.topic,
            status: MqttTopicStatusType.UNSUBSCRIBED,
          })
        );
      }
      this.subscriptions.delete(payload.topic);
    }
  }
}
