import { map, take, takeUntil } from 'rxjs/operators';
import { ClientOptions, MqttClient } from 'src/app/shared/mqtt/mqtt';
import {
    Datasource,
    DatasourceConfig,
    DatasourceTopicOption,
} from '../datasource';
import { DatasourceType } from '../datasource-type';

/**
 * Splits a space-separated topic list into its individual topics.
 * Empty entries (produced by leading, trailing or repeated spaces, or by a
 * missing setting) are dropped.
 */
function splitTopicList(topicList: string): string[] {
    return (topicList || '').split(' ').filter((topic) => topic !== '');
}

/**
 * Removes `prefix` from `topic` only when the topic actually starts with it.
 * An occurrence of the prefix text further inside the topic is left alone.
 */
function stripTopicPrefix(topic: string, prefix: string): string {
    return prefix && topic.startsWith(prefix)
        ? topic.slice(prefix.length)
        : topic;
}

/**
 * Datasource that subscribes to a configured list of MQTT topics over
 * WebSocket and forwards every received message to `data$`.
 *
 * Settings read by this class: `server`, `port`, `use_ssl`, `username`,
 * `password`, `topicPrefix` and `topics` (a space-separated list).
 */
export class DatasourceMQTTSubscribe extends Datasource {
    /** Currently active client; removed again by `disconnect()`. */
    client: MqttClient;

    /** Subscriptions created for the topics observed on the current client. */
    private topicSubscriptions: { unsubscribe(): void }[] = [];

    /** Fully qualified (prefixed) topics to subscribe to. */
    get topics(): string[] {
        const prefix = this.settings.topicPrefix || '';
        // Blank entries are removed before prefixing; otherwise they would
        // turn into a subscription on the bare prefix.
        return splitTopicList(this.settings.topics).map(
            (topic) => `${prefix}${topic}`,
        );
    }

    constructor(config: DatasourceConfig) {
        super(DatasourceType.MQTT_SUBSCRIBER, config);
    }

    /**
     * Connects to the broker, publishes the selectable topic options and
     * starts forwarding messages of all configured topics to `data$`.
     */
    async connect(): Promise<void> {
        await this.connectMqtt();
        const prefix = this.settings.topicPrefix || '';
        this.topicOptions = splitTopicList(this.settings.topics)
            // The catch-all wildcard is not offered as a topic option.
            .filter((topic) => topic !== '#')
            .map(
                (topic): DatasourceTopicOption => ({
                    direction: 'subscribe',
                    name: topic,
                    group: 'datasource.settings.subscribe',
                    topicSegments: [prefix + topic],
                }),
            );
        this.subscribeTopics();
    }

    /** Stops the datasource and drops the client, if there is one. */
    disconnect() {
        super.disconnect();
        if (this.client) {
            this.client.disconnect();
            delete this.client;
        }
    }

    /**
     * Creates a fresh client (disconnecting a previous one first) and connects
     * it with the configured credentials. On the first abnormal disconnect of
     * that client a reconnect is attempted, unless `disconnect$` fired before.
     */
    async connectMqtt(): Promise<void> {
        const options: ClientOptions = {
            protocol: this.settings.use_ssl ? 'wss' : 'ws',
            hostname: this.settings.server,
            port: this.settings.port,
        };
        if (this.client) {
            this.client.disconnect();
        }
        const client = new MqttClient(options);
        this.client = client;
        client.abnormalDisconnect$
            .pipe(takeUntil(this.disconnect$), take(1))
            .subscribe(() => {
                void this.handleAbnormalDisconnect();
            });
        await client.connect({
            username: this.settings.username,
            password: this.settings.password,
        });
    }

    /**
     * Reconnects with a new client and re-creates the topic subscriptions,
     * which were bound to the previous client instance. Failures are logged
     * rather than surfacing as an unhandled promise rejection.
     */
    private async handleAbnormalDisconnect(): Promise<void> {
        try {
            await this.connectMqtt();
            // `disconnect()` may have run while the connection was being
            // established; in that case there is nothing to subscribe on.
            if (this.client) {
                this.subscribeTopics();
            }
        } catch (error) {
            console.error('MQTT reconnect failed', error);
        }
    }

    /**
     * Observes every configured topic on the current client and pushes each
     * message to `data$` as `{ [topicWithoutPrefix]: value }`. The payload is
     * decoded as text and parsed as JSON when possible, otherwise passed on
     * as the raw string. Subscriptions from an earlier call are released.
     */
    private subscribeTopics(): void {
        this.topicSubscriptions.forEach((subscription) =>
            subscription.unsubscribe(),
        );
        const prefix = this.settings.topicPrefix || '';
        this.topicSubscriptions = this.topics.map((subscribedTopic) =>
            this.client
                .observe(subscribedTopic)
                .pipe(
                    takeUntil(this.disconnect$),
                    map(({ topic, payload }) => {
                        const key = stripTopicPrefix(topic, prefix);
                        const text = new TextDecoder().decode(payload);
                        try {
                            return { [key]: JSON.parse(text) };
                        } catch (e) {
                            // Not JSON: forward the plain text instead.
                            return { [key]: text };
                        }
                    }),
                )
                .subscribe((data) => this.data$.next(data)),
        );
    }
}
