import { Injectable, Output } from '@angular/core';
import { WebsocketService } from '../websocket/websocket.service';
import { Subject, BehaviorSubject } from 'rxjs';
import { map, count } from 'rxjs/operators';
import { RealTimeUpdate } from '../custom-view-widget/custom-view-widget.component';
import { Inflate } from 'pako';
import { TextDecoder } from 'text-encoding';
import { AccountService } from './account.service';

export class ArcPlusPid
{
  site: string;
  block: string;
  channel: number;
  pid: string;
}

class Subscription
{
  widget: RealTimeUpdate;
  site: string;
  block: string; // Analog/Status/Command/Site
  channel: number;
  pid: string;

  toString(): string
  {
    return this.site + ":" + this.block + ":" + this.channel + ":" + this.pid;
  }
}

class SubscribeOperation
{
  op: string; // operation
  arcPlusPid: ArcPlusPid;
}

class PingOperation
{
  op: string; // operation
  count: number;
}

class UnsubscribeOperation
{
  op: string; // operation
  arcPlusPid: ArcPlusPid;
}

class Update
{
  site: string;
  block: string;
  pid: string; // Analog/Status/Command/Site
  channel: number;
  value: string; // stringified value
}

@Injectable({
  providedIn: 'root'
})
export class DataSubscriptionService
{
  private dataConnection: Subject<any>;
  private subscriptions: Set<Subscription> = new Set();

  /**
   * Returns "unconnected", "opened", "closed", "error".
   */
  @Output() WebSocketEvents: BehaviorSubject<string> = new BehaviorSubject<string>("unconnected");

  /**
   * There is a circuilar dependency bug where a component should be able to have a 
   * DataSubscriptionService and a AccountService,
   * but Angular detects this as a circular dependency.  The workaround is to use just the DataSubscriptionService
   * and use this method to get the AccountService.
   */
  public get AccountService()
  {
    return this.accountService;
  }

  private timerHandles = [];

  ///////////////////////////////////////////////////////////////////////////
  // constructor
  //
  // IN: WebSocketService: 
  ///////////////////////////////////////////////////////////////////////////
  constructor(
    private wss: WebsocketService,
    private accountService: AccountService,
  ) 
  {
    // pass all the web socket state changes up the chain.
    wss.StateChanged.subscribe((str) =>
    {
      // console.log("Web Socket Change: " + str);

      this.WebSocketEvents.next(str);

      // if the web socket opened, resubscribe to all the data
      if (str == "opened")
      {
        this.SubscribeUpdate();
      }
    });

    // when we get a new username, close the previous web socket and start a new one.
    accountService.user.subscribe((user) =>
    {
      if (user != null)
      {
        this.wss.close();

        if (this.firstTime)
        {
          this.firstTime = false;
          this.ConnectToWebSocket();
        }
      }
    });

    // every 30 seconds, resubscribe to the data
    this.timerHandles.push(window.setInterval(() => this.SubscribeUpdate(), 30000));

    // every 5 seconds check that the ws is opened
    this.timerHandles.push(window.setInterval(() => this.checkWebSocket(), 10000));

    // every 10 seconds ping the web socket server
    this.timerHandles.push(window.setInterval(() => this.sendWebSocketPing(), 10000));
  }

  /**
   * Stop all the timers
   */
  public stop()
  {
    this.timerHandles.forEach((t) =>
    {
      clearInterval(t);
    });
  }

  private pingCount = 1;
  private pongFlag = true; // assume the first ping is OK.

  /**
   * Look for a pong from the web socket.
   * If no pong, restart the web socket.
   * Send a ping to the web socket.
   */
  sendWebSocketPing()
  {
    let msg: PingOperation =
    {
      "op": "ping",
      "count": this.pingCount,
    };

    // if the pong flag is still off, the web socket is down.
    if (!this.pongFlag)
    {
      // restart web socket
      this.wss.close();
      // pretend this cycle succeeded so the web socket has time to connect
      this.pongFlag = true;
    }
    else
    {
      // clear the pong flag.  It is set in the receive update method.
      this.pongFlag = false;

      // convert the Subscription to a string and send it to the server.
      this.dataConnection.next(msg);

      this.pingCount++;
    }
  }

  /**
   * Check that the web socket service has an open web socket.
   */
  checkWebSocket(): void
  {
    // if the web socket is not connecting and not opened...
    if (!this.wss.IsWebSocketOk())
    {
      this.WebSocketEvents.next("error");

      // get a copy of the data connection
      const tempDataConnection = this.dataConnection;
      // clear the real reference
      this.dataConnection = null;
      // complete the true data connection.  This should call ConnectToWebSocket
      tempDataConnection.complete();

      // if it didn't call ConnectToWebSocket...
      if (this.dataConnection == null)
      {
        this.ConnectToWebSocket();
      }
    }
    // if the web socket is opened...
    else if (this.wss.IsWebSocketOpen())
    {
      this.WebSocketEvents.next("opened");
    }
    // the connecting state will timeout and we will catch the next state
  }

  private firstTime = true;

  /**
   * Tell the web socket to connect.
   * Subscribe to messages, errors, and complete.
   * Reconnect on errors and complete.
   */
  private ConnectToWebSocket() 
  {
    try
    {
      // connect to /ws.  Pipe the receive data to a Map function.  This will unpack the response data to a plain string.
      this.dataConnection = this.wss.connect(this.GetWebSocket()).pipe(map((response: MessageEvent, index: number) => 
      {
        // uncompress the incoming message, convert to a string, convert to JSON
        let pako: Inflate = new Inflate();
        pako.push(response.data);
        let data = JSON.parse(new TextDecoder("utf-8").decode(pako.result));

        return data;
      })) as Subject<any>;

      // attach a listener to receive updates
      this.dataConnection.subscribe(
        (updateArray: Update[]) => // message handler
        {
          this.UpdateSubscribers(updateArray);
        },
        (err) => // error handler
        {
          // console.log("dataConnection error: " + err);
          this.WebSocketEvents.next("error");

          this.wss.close();
          this.ConnectToWebSocket();
        },
        () => // complete handler
        {
          // console.log("dataConnection complete");
          this.WebSocketEvents.next("closed");

          this.ConnectToWebSocket();
        }
      );
    }
    catch (err)
    {
      console.log(err);
    }
  }

  private UpdateSubscribers(updateArray: Update[])
  {
    // for each incoming update...
    updateArray.forEach((update: Update) => 
    {
      //console.log("subscriptions: " + this.subscriptions.size + " " + new Date().toString());

      switch (update.block)
      {
        case "pong":
          this.pongFlag = true;
          break;

        default:
          // loop through the subscriptions...
          this.subscriptions.forEach((sub: Subscription) => 
          {
            //console.log("sub update:" + update.site + update.pid + " " + update.value);

            // if the site and PID match...
            if (sub.site == update.site &&
              sub.block == update.block &&
              sub.pid == update.pid &&
              sub.channel == update.channel)
            {
              // update the widget.
              sub.widget.Update(update.site, update.pid, update.channel, update.value);
            }
          });
          break;
      }
    });
  }

  ///////////////////////////////////////////////////////////////////////////
  // GetWebSocket
  //
  // Return the URL for the web socket.  Begin with either ws or wss.
  // The path is /ws/.
  ///////////////////////////////////////////////////////////////////////////
  private GetWebSocket(): WebSocket
  {
    let url: string;
    if (window.location.protocol == "http:")
    {
      url = "ws://";
    }
    else // otherwise upgrade to secure.
    {
      url = "wss://";
    }
    url += window.location.host + "/ws/";

    // add the JWT
    url += "?bearer=" + this.accountService.userValue.token;

    return new WebSocket(url);
  }

  ///////////////////////////////////////////////////////////////////////////
  // Add
  //
  // A set of PIDs to the subscription for a widget.
  //
  // IN: widget: a custom view widget.
  //     sites: an array of sites of the PIDs.
  //     pids: an array of PIDs to subscribe to.
  ///////////////////////////////////////////////////////////////////////////
  Add(widget: RealTimeUpdate, arcPlusPids: ArcPlusPid[])
  {
    // loop through the PIDs
    for (let data of arcPlusPids) // index through the pids.
    {
      if (data.site == "0") continue;

      // make a new subscription for each one.
      let s = new Subscription();

      s.widget = widget;
      s.site = data.site;
      s.pid = data.pid;
      s.block = data.block;
      s.channel = data.channel;

      this.subscriptions.add(s);

      // console.log("add:" + s.toString());
      // console.log("subscriptions: " + this.subscriptions.size);

      let msg: SubscribeOperation =
      {
        "op": "sub",
        "arcPlusPid":
        {
          "site": data.site,
          "block": data.block,
          "channel": data.channel,
          "pid": data.pid,
        }
      };

      //console.log("Send subscriton");

      // convert the Subscription to a string and send it to the server.
      this.dataConnection.next(msg);

      //console.log("Subscription sent: " + JSON.stringify(msg));
    }
  }

  /**
   * Every 30 seconds re-subscribe to the data so a fresh value is returned.
   */
  SubscribeUpdate()
  {
    this.subscriptions.forEach((s) =>
    {
      let msg: SubscribeOperation =
      {
        "op": "sub",
        "arcPlusPid":
        {
          "site": s.site,
          "block": s.block,
          "channel": s.channel,
          "pid": s.pid,
        }
      };

      this.dataConnection.next(msg);
    });
  }

  ///////////////////////////////////////////////////////////////////////////
  // Remove
  //
  // Remove all the subscriptions from this widget.
  ///////////////////////////////////////////////////////////////////////////
  Remove(widget: RealTimeUpdate)
  {
    //console.log("remove subscriptions: " + this.subscriptions.size);
    let removeList: Subscription[] = [];

    // find all the subscriptions to remove.
    this.subscriptions.forEach((sub: Subscription) =>
    {
      if (sub.widget == widget)
      {
        removeList.push(sub);
      }
    });

    // Remove the subscriptions
    for (let r of removeList)
    {
      this.subscriptions.delete(r);

      // console.log("removed:" + r.pid);
    }
    // console.log("del subscriptions: " + this.subscriptions.size);

    // find all the subscriptions that we removed that do not have anymore widgets.
    removeList.forEach((r) =>
    {
      let found: boolean = false;

      // see if the subscription that we removed was the last subscription for that site:pid.
      this.subscriptions.forEach((sub) => 
      {
        if (sub.site == r.site &&
          sub.pid == r.pid &&
          sub.channel == r.channel)
        {
          found = true;
        }
      });

      // if the site:pid was not found in the subscription list, then send an unsubscribe message.
      if (!found)
      {
        // console.log("unsubscribe from pid");

        let msg: UnsubscribeOperation =
        {
          "op": "unsub",
          "arcPlusPid":
          {
            "site": r.site,
            "block": r.block,
            "channel": r.channel,
            "pid": r.pid,
          }
        };

        this.dataConnection.next(msg);
      }
    });

    //console.log("# subscriptions: " + this.subscriptions.size);
  }
}
