import { take, takeUntil } from 'rxjs/operators';
import {
    ClientOptions,
    MqttClient,
    OutgoingPacket,
} from 'src/app/shared/mqtt/mqtt';
import {
    Datasource,
    DatasourceConfig,
    DatasourceTopicOption,
} from '../datasource';
import { DatasourceType } from '../datasource-type';

/**
 * Publish-only MQTT datasource.
 *
 * Opens an {@link MqttClient} over `ws`/`wss` using the datasource settings,
 * exposes the configured topics as publish topic options and sends text or
 * binary payloads through {@link DatasourceMQTTPublish.send}.
 */
export class DatasourceMQTTPublish extends Datasource {
    /** Client of the current connection; removed again by `disconnect()`. */
    client: MqttClient;

    constructor(config: DatasourceConfig) {
        super(DatasourceType.MQTT_PUBLISHER, config);
    }

    /**
     * Connects to the broker and rebuilds `topicOptions` from the
     * space-separated `settings.topics` string.
     *
     * Empty segments (unset/empty setting, leading, trailing or repeated
     * spaces) are skipped so that no option with an empty topic name is
     * produced.
     */
    async connect() {
        await this.connectMqtt();
        this.topicOptions = (this.settings.topics ?? '')
            .split(' ')
            // ''.split(' ') yields [''], which would become a bogus option.
            .filter((topic: string) => topic.length > 0)
            .map(
                (topic): DatasourceTopicOption => ({
                    direction: 'publish',
                    name: topic,
                    group: 'datasource.settings.publish',
                    topicSegments: [topic],
                }),
            );
    }

    /**
     * Runs the base class disconnect first, then closes and drops the
     * MQTT client if one exists.
     */
    async disconnect() {
        // NOTE(review): presumably this is what makes `disconnect$` emit and
        // thereby cancels the pending reconnect subscription - confirm in Datasource.
        await super.disconnect();
        if (this.client) {
            this.client.disconnect();
            delete this.client;
        }
    }

    /**
     * Creates a fresh client from the settings and connects it with the
     * configured credentials. A previously created client is disconnected
     * first.
     *
     * The first abnormal disconnect of the new client triggers another call
     * of this method, unless `disconnect$` emitted before.
     */
    async connectMqtt() {
        const options: ClientOptions = {
            protocol: this.settings.use_ssl ? 'wss' : 'ws',
            hostname: this.settings.server,
            port: this.settings.port,
        };
        if (this.client) {
            this.client.disconnect();
        }
        this.client = new MqttClient(options);
        this.client.abnormalDisconnect$
            .pipe(takeUntil(this.disconnect$), take(1))
            .subscribe(() => {
                // Nobody awaits the reconnect attempt, so a rejection has to
                // be handled here instead of surfacing as an unhandled one.
                void this.connectMqtt().catch((error: unknown) => {
                    console.error('MQTT publisher reconnect failed', error);
                });
            });

        await this.client.connect({
            username: this.settings.username,
            password: this.settings.password,
        });
    }

    /**
     * Publishes `message` on `topic`, prefixed with `settings.topicPrefix`.
     *
     * @param topic Topic name without the configured prefix.
     * @param message Sent as the packet's `payload` when it is a
     *     `Uint8Array`, otherwise as its `text`.
     * @returns Whatever the client's `publish` call resolves to.
     * @throws Error if there is no client, i.e. the datasource is not
     *     connected.
     */
    async send(topic: string, message: Uint8Array | string) {
        if (!this.client) {
            throw new Error(
                'Cannot publish: MQTT datasource is not connected',
            );
        }
        const packet: OutgoingPacket = {
            // A missing prefix must not end up as the literal "undefined".
            topic: (this.settings.topicPrefix ?? '') + topic,
        };
        if (message instanceof Uint8Array) {
            packet.payload = message;
        } else {
            packet.text = message;
        }
        return await this.client.publish(packet);
    }
}
