import { map } from 'rxjs/operators';
import { Observable, Subject, Subscriber, Subscription, fromEvent } from 'rxjs';
import { WorkerInput, WorkerOutput } from './models';

/**
 * Signature of a transformation function that converts a value of type T into a value of type R.
 */
type formatterFunction<T, R> = (d: T) => R;

/**
 * Listens to a web worker and republishes what it emits through a Subject:
 * - every `message` event is unwrapped to its `data` payload (optionally
 *   passed through the formatter given to the constructor) and emitted as a value;
 * - every `error` or `messageerror` event is pushed as an error notification.
 */
class WorkerRx<T> {

    private _result$ = new Subject<T>();

    /** Subject carrying the worker results; callers may also push `error`/`complete` on it. */
    public get result$() {
        return this._result$;
    }

    /**
     * @param _worker Worker instance whose events are observed
     * @param _workerMessageDataFormat - OPTIONAL - Transformation applied to each message payload before it is emitted
     */
    constructor(
        private _worker: Worker,
        private _workerMessageDataFormat?: formatterFunction<any, T>
    ) {
        const output$ = this._result$;

        // Payloads of the worker messages, formatted when a formatter function was supplied.
        const payload$ = fromEvent<MessageEvent<T>>(this._worker, 'message').pipe(
            map((event) => typeof this._workerMessageDataFormat === 'function'
                ? this._workerMessageDataFormat(event.data)
                : event.data),
        );
        payload$.subscribe({
            next: (payload) => output$.next(payload),
            error: (failure) => output$.error(failure),
            complete: () => output$.complete(),
        });

        // Both failure channels of the worker end up as an error notification on the subject.
        const failWith = (event: MessageEvent<T>) => output$.error(event);
        fromEvent<MessageEvent<T>>(this._worker, 'error').subscribe(failWith);
        fromEvent<MessageEvent<T>>(this._worker, 'messageerror').subscribe(failWith);
    }

}

// Registries shared by every subscription created through `workerMap`.

// Source observable -> subscribers currently attached to it.
const destinationsBySource = new WeakMap<Observable<any>, Set<Subscriber<any>>>();
// Message id -> subscriber that must receive the worker answers tagged with this id.
const destinationById = new Map<string, Subscriber<any>>();
// Subscriber -> message id attributed to it (reverse lookup of `destinationById`).
const idByDestination = new WeakMap<Subscriber<any>, string>();
// Worker -> source observables that still have at least one registered subscriber.
const activeSourceByWorker = new WeakMap<Worker, Set<Observable<any>>>();

// Worker -> (live subscriber -> output formatter registered by that subscriber).
// Lets the shared `onmessage` / `onerror` handlers of a worker serve every subscriber,
// not only the one that subscribed last.
const outputFormatsByWorker = new WeakMap<Worker, Map<Subscriber<any>, formatterFunction<any, any>>>();

/**
 * Custom RxJS operator that executes a computation asynchronously through a web worker (multithread).
 *
 * Each value of the source is converted with `dataFormatter`, tagged with a `messageId` that is
 * unique to the subscription and posted to the worker. Every message coming back from the worker is
 * routed to the subscription whose `messageId` it carries, after removing `messageId` and applying
 * `workerMessageDataFormat` to the remaining fields.
 *
 * Notes:
 * - an error raised by the source, by `dataFormatter`, by `workerMessageDataFormat` or by
 *   `postMessage` is delivered to the subscriber as an error notification;
 * - an `error` event of the worker is delivered to every live subscriber of that worker;
 * - the resulting observable does not complete when the source completes: answers of the worker
 *   may still be pending, so it stays open until it is unsubscribed or errors;
 * - the worker is terminated once no active source remains registered for it.
 *
 * @param worker Web worker instance
 * @param dataFormatter - OPTIONAL - A transformation function to adapt the data coming from the previous operation (type T) to the type expected at the entry of the worker function (type U)
 * @param workerMessageDataFormat - OPTIONAL - A transformation function to adapt the data coming from the worker (type any) to the type expected after this operator (type R)
 * @returns A function that maps the source observable to the observable of worker answers
 */
export function workerMap<T, R, U>(
    worker: Worker,
    dataFormatter?: formatterFunction<T, U>,
    workerMessageDataFormat: formatterFunction<any, R> = v => v,
) {

    // Identity fallback; held in a const so the narrowed type is still known inside the closures below.
    const formatInput: formatterFunction<T, U> = typeof dataFormatter === 'function'
        ? dataFormatter
        : (v) => v as unknown as U;

    return (source$: Observable<T>) => new Observable<R>(destination$ => {
        let subscription: Subscription | undefined;

        // Register this subscription in the shared registries.
        const activeSources = activeSourceByWorker.get(worker) ?? new Set<Observable<any>>();
        activeSources.add(source$);
        activeSourceByWorker.set(worker, activeSources);

        const destinations = destinationsBySource.get(source$) ?? new Set<Subscriber<any>>();
        destinations.add(destination$);
        destinationsBySource.set(source$, destinations);

        const outputFormats = outputFormatsByWorker.get(worker) ?? new Map<Subscriber<any>, formatterFunction<any, any>>();
        outputFormats.set(destination$, workerMessageDataFormat);
        outputFormatsByWorker.set(worker, outputFormats);

        const id = (Math.random() * Date.now()).toString();
        idByDestination.set(destination$, id);
        destinationById.set(id, destination$);

        try {
            // The handlers only depend on the registries, so re-installing them on every
            // subscription is harmless and keeps earlier subscribers working.
            worker.onmessage = (ev: MessageEvent<WorkerOutput<any>>) => {
                const { messageId, ...data } = ev.data as WorkerOutput<any>;
                const target = destinationById.get(messageId);
                const format = target ? outputFormatsByWorker.get(worker)?.get(target) : undefined;
                if (!target || !format) {
                    return;
                }
                let formatted: any;
                try {
                    formatted = format(data);
                } catch (error) {
                    target.error(error);
                    return;
                }
                target.next(formatted);
            };
            worker.onerror = (ev: ErrorEvent) => {
                // Iterate over a copy: erroring a subscriber runs its teardown, which edits the map.
                const targets = Array.from(outputFormatsByWorker.get(worker)?.keys() ?? []);
                targets.forEach(target => target.error(ev));
            };

            subscription = source$.pipe(
                map(v => formatInput(v)),
            ).subscribe({
                next: input => {
                    const message: WorkerInput<U>['data'] = {
                        messageId: id,
                        ...input,
                    };
                    try {
                        worker.postMessage(message);
                    } catch (error) {
                        // e.g. the payload cannot be cloned
                        destination$.error(error);
                    }
                },
                // Without this handler, source / formatter errors would never reach the subscriber.
                error: error => destination$.error(error),
            });
        } catch (error) {
            destination$.error(error);
        }

        return () => {
            subscription?.unsubscribe();

            // Always release the registry entries, even if the source subscription was never created.
            destinationById.delete(id);
            idByDestination.delete(destination$);
            outputFormatsByWorker.get(worker)?.delete(destination$);

            const remaining = destinationsBySource.get(source$);
            remaining?.delete(destination$);
            if (!remaining || remaining.size === 0) {
                destinationsBySource.delete(source$);
                activeSourceByWorker.get(worker)?.delete(source$);
            }

            if (worker && !activeSourceByWorker.get(worker)?.size) {
                worker.terminate();
            }
        };
    });
}

/**
 * Experimental - RxJS operator that runs `cb` inside a web worker created at runtime.
 *
 * The source text of `cb` is embedded in a generated worker script that answers each incoming
 * message with `cb(data)`. A dedicated worker is started for every subscription to the resulting
 * observable: each source value is posted to it and each answer is emitted downstream.
 *
 * The resulting observable completes once the source has completed AND every posted value has
 * been answered, so in-flight results are not lost. The worker is terminated and its script URL
 * released when the subscription ends (completion, error or unsubscribe).
 *
 * @param cb Only pure function: it is serialised with `toString()`, so it cannot rely on `this`
 * or on anything from its enclosing scope
 * @returns A function that maps the source observable to the observable of worker results
 * @throws Error when the source text of `cb` contains a `this` reference
 */
export function workerMapFunction<T, R>(cb: (data: any) => void) {

    if (cb.toString().match(/\W*this\W/gm)) {
        throw new Error('workerMapFunction operator support only pure function (no this reference)');
    }

    // The replace turns a method-shorthand source (`name(a, b) {`) into a function expression.
    const workerFunctionBlob = new Blob([
        `const cb = ${cb.toString().replace(/^(\w*)(\([\w,\s]*\)\s?{)/, 'function$2')};
    addEventListener('message', ({data}) => {
      postMessage(cb(data));
    });`
    ], {
        type: 'text/javascript',
    });

    return (source$: Observable<T>): Observable<R> => new Observable<R>(destination$ => {
        const scriptUrl = URL.createObjectURL(workerFunctionBlob);
        const worker = new Worker(scriptUrl);
        const workerRx = new WorkerRx<R>(worker);

        // The generated script posts exactly one answer per message it receives, so counting
        // the unanswered values tells when it is safe to complete.
        let inFlight = 0;
        let sourceCompleted = false;

        const resultSubscription = workerRx.result$.subscribe({
            next: (value) => {
                inFlight--;
                destination$.next(value);
                if (sourceCompleted && inFlight === 0) {
                    destination$.complete();
                }
            },
            error: (error) => destination$.error(error),
        });

        const sourceSubscription = source$.subscribe({
            next: (value) => {
                inFlight++;
                try {
                    worker.postMessage(value);
                } catch (error) {
                    // e.g. the value cannot be cloned
                    destination$.error(error);
                }
            },
            error: (error) => destination$.error(error),
            complete: () => {
                sourceCompleted = true;
                if (inFlight === 0) {
                    destination$.complete();
                }
            },
        });

        return () => {
            sourceSubscription.unsubscribe();
            resultSubscription.unsubscribe();
            worker.terminate();
            URL.revokeObjectURL(scriptUrl);
        };
    });

}
