import * as Mqtt from 'mqtt/dist/mqtt';
import { SubscribeProps } from '../models/mqtt';

// Augments the global `Window` type so that `window.mqttClient` type-checks.
// `subscribe` stores the current client there each time it subscribes on a connected client.
declare global {
    interface Window {
        mqttClient: Mqtt.Client;
    }
}

/** Shared MQTT client for this module. It stays unassigned until `initMqttConnection` creates it. */
export let mqttClient: Mqtt.Client;

// Topics subscribed on the connected client, each with the 'message' listeners that `subscribe` attached for it.
let activeSubspriptions: Array<{topic: string; messageListenerList: Array<Mqtt.OnMessageCallback>}> = [];
// Handlers queued by `subscribe` while the client is not connected; the 'connect' handler replays them and clears the list.
let pendingSubspriptions: Array<{topic: string; onMessageHandlerList: Array<(message: string) => void>}> = [];

/**
 * Registers `onMessageHandler` for messages published on `topic`.
 *
 * When the client is connected, the topic is subscribed with `subscribeOptions` and a
 * 'message' listener is attached that forwards the stringified payload of messages whose
 * topic is exactly `topic`. When the client is missing or not connected, the handler is
 * queued in the pending list instead.
 *
 * Note: queued entries keep only the topic and the handler, so `subscribeOptions` is not
 * retained for subscriptions requested while offline.
 */
export const subscribe = ({ topic, onMessageHandler, subscribeOptions }: SubscribeProps) => {
    if (!mqttClient?.connected) {
        // Offline: remember the handler under its topic until a connection is available.
        const queuedEntry = pendingSubspriptions.find(entry => entry.topic === topic);

        if (queuedEntry) {
            queuedEntry.onMessageHandlerList.push(onMessageHandler);
        } else {
            pendingSubspriptions.push({ topic, onMessageHandlerList: [ onMessageHandler ] });
        }

        return;
    }

    // The client emits 'message' for every topic, so each listener filters on its own topic.
    const forwardMatchingMessage = (incomingTopic: string, payload: { toString(): string }) => {
        if (incomingTopic === topic) {
            onMessageHandler(payload.toString());
        }
    };

    mqttClient.subscribe(topic, subscribeOptions);
    mqttClient.on('message', forwardMatchingMessage);

    window.mqttClient = mqttClient;

    // Track the listener so it can be detached later: extend the topic's entry when one
    // exists, otherwise create the entry.
    const activeEntry = activeSubspriptions.find(entry => entry.topic === topic);

    if (activeEntry) {
        activeEntry.messageListenerList.push(forwardMatchingMessage);
    } else {
        activeSubspriptions.push({ topic, messageListenerList: [ forwardMatchingMessage ] });
    }
};

/**
 * Stops delivering messages for `topic` and forgets every handler registered for it.
 *
 * - Listeners attached for the topic are always detached and the topic is removed from
 *   both the active and the pending registries, whether or not the client is currently
 *   connected, so stale handlers cannot fire after a reconnect.
 * - The UNSUBSCRIBE request is only issued while the client is connected.
 * - The connection is force-closed only once no active or pending subscription remains,
 *   so unsubscribing from one topic no longer tears down the others.
 * - Safe to call before `initMqttConnection`: a missing client is tolerated.
 *
 * @param topic Exact topic string that was passed to `subscribe`.
 */
export const unsubscribe = (topic: string) => {
    const activeSubscription = activeSubspriptions.find(subscription => subscription.topic === topic);

    if (activeSubscription) {
        if (mqttClient?.connected) {
            mqttClient.unsubscribe(topic);
        }

        // Remove all messageListeners related to the unsubscribed topic
        activeSubscription.messageListenerList.forEach(listener => mqttClient?.removeListener('message', listener));

        activeSubspriptions = activeSubspriptions.filter(subscription => subscription.topic !== topic);
    }

    pendingSubspriptions = pendingSubspriptions.filter(subscription => subscription.topic !== topic);

    // Closing the client affects every topic, so only do it when nothing is left to serve.
    const nothingLeft = activeSubspriptions.length === 0 && pendingSubspriptions.length === 0;

    if (mqttClient && nothingLeft) {
        mqttClient.end(true);
    }
};

// Set while a client created by `initMqttConnection` is in use; the client's 'close'
// handler clears it again. Guards against creating several clients at once.
let openingConnection = false;

/**
 * Creates the shared MQTT client and installs its event handlers.
 *
 * Calls made while a connection is already being opened (or is open) are ignored.
 * If creating the client throws, the guard flag is released before the error is
 * rethrown, so a later call can retry instead of being silently ignored.
 *
 * @param jwtToken Token sent to the broker as the MQTT username.
 * @param url Broker URL handed to `Mqtt.connect`.
 * @throws Whatever `Mqtt.connect` throws for the given URL/options.
 */
export const initMqttConnection = (jwtToken: string, url: string): void => {
    if (openingConnection) {
        return;
    }

    const connectionOptions: Mqtt.IClientOptions = {
        clean: false,
        // 'EMP_' followed by up to 8 random hex digits
        clientId: 'EMP_' + Math.random().toString(16).slice(2, 10),
        username: jwtToken,
        keepalive: 30, // seconds
    };

    openingConnection = true;

    try {
        mqttClient = Mqtt.connect(url, connectionOptions);
    } catch (error) {
        // Do not leave the guard stuck: a failed attempt must not block future ones.
        openingConnection = false;
        throw error;
    }

    setupHandlers();
};

// Wires the lifecycle events of the current `mqttClient` and registers the page-unload cleanup.
const setupHandlers = () => {
    // On every (re)connect: replay the handlers queued while offline, then empty the queue.
    const onConnect = () => {
        console.log('Mqtt: Connect');

        // Iterate over snapshots so entries added during the replay are not visited.
        for (const { topic, onMessageHandlerList } of [ ...pendingSubspriptions ]) {
            for (const onMessageHandler of [ ...onMessageHandlerList ]) {
                subscribe({ topic, onMessageHandler });
            }
        }

        pendingSubspriptions = [];
    };

    const onReconnect = async () => {
        console.log('Mqtt: Reconnecting');
    };

    // Releases the guard so `initMqttConnection` may create a client again.
    // NOTE(review): 'close' presumably also fires before an automatic reconnect attempt;
    // confirm whether a second client may then be created alongside this one.
    const onClose = async () => {
        openingConnection = false;
        console.log('Mqtt: Close');
    };

    const onDisconnect = () => {
        console.log('Mqtt: Disconnect');
    };

    const onEnd = () => {
        console.log('Mqtt: End');
    };

    mqttClient.on('connect', onConnect);
    mqttClient.on('reconnect', onReconnect);
    mqttClient.on('close', onClose);
    mqttClient.on('disconnect', onDisconnect);
    mqttClient.on('end', onEnd);

    // Best-effort cleanup of subscriptions when the page is unloaded.
    window.addEventListener('beforeunload', finalize);
};

// Shutdown hook: unsubscribes every topic currently tracked as active, then ends the client.
const finalize = () => {
    // Walk a snapshot, since `unsubscribe` replaces the active list while we iterate.
    for (const { topic } of [ ...activeSubspriptions ]) {
        unsubscribe(topic);
    }

    mqttClient.end();
};
