import { Injectable } from '@angular/core';
import * as Rx from 'rxjs';

@Injectable({
    providedIn: 'root',
})
export class WebsocketService
{
    public StateChanged: Rx.Subject<string> = new Rx.Subject<string>();

    private ws: WebSocket;

    constructor()
    {
        //   console.log("WebsocketService constructor");
    }

    public IsWebSocketOk(): boolean
    {
        if (!this.ws)
        {
            return false;
        }

        return this.ws.readyState == WebSocket.CONNECTING ||
            this.ws.readyState == WebSocket.OPEN
    }
    public IsWebSocketOpen(): boolean
    {
        if (!this.ws)
        {
            return false;
        }

        return this.ws.readyState == WebSocket.OPEN
    }

    // there is 1 web socket connection and it is shared with all components
    private subject: Rx.Subject<MessageEvent>;

    public connect(webSocket: WebSocket): Rx.Subject<MessageEvent>
    {
        // if we are not created...
        if (!this.subject)
        {
            // create the subject
            this.subject = this.create(webSocket);

            //console.log("Successfully connected: " + url);
        }
        return this.subject;
    }

    public close()
    {
        if (this.ws)
        {
            this.ws.close();
            this.ws = null;
        }
        this.subject = null;
    }

    private create(webSocket: WebSocket): Rx.Subject<MessageEvent>
    {
        this.ws = webSocket;
        this.ws.binaryType = "arraybuffer";

        this.ws.onopen = () =>
        {
            this.StateChanged.next("opened");
            // console.log("Successfully connected: " + this.ws.url);
        }

        // create an observable object
        let observable = Rx.Observable.create(
            // the observable's subscribe function connects the websocket onmessage to the observer's next function
            (obs: Rx.Observer<MessageEvent>) =>
            {
                this.ws.onmessage = obs.next.bind(obs);
                this.ws.onerror = obs.error.bind(obs);
                this.ws.onclose = obs.complete.bind(obs);

                return this.ws.close.bind(this.ws); // return the 'unsubscribe' function to close the websocket
            });

        // the observer sends the next item by converting the data to JSON and sending it to the websocket.
        const observer = {
            next: (data: Object) =>
            {
                if (this.ws.readyState === WebSocket.OPEN)
                {
                    this.ws.send(JSON.stringify(data));
                }
            },
            error: (err) =>
            {
                console.log("observer error");
            },
            complete: () =>
            {
                console.log("observer complete");
            },
        };
        return Rx.Subject.create(observer, observable);
    }
}
