import {RPCRequest} from "~/core/rpc/rpc-request";
import {ToastService} from "~/core/toast.service";
import {ClientOpCode, WebSocketClientMessage, WebSocketServerMessage} from "./web-socket-message";
import {WebSocketObservable} from "./web-socket-observable";
import {Md5} from "ts-md5";
import {BehaviorSubject, interval, Observable, ReplaySubject, Subject, Subscriber, takeUntil} from "rxjs";
import {WebSocketKeyedObservable} from "~/core/websocket/web-socket-keyed-observable";
import {cloneDeep} from "~/core/util/object-util";
import {getWSTargetHost} from "~/core/rpc/rpc-target";
import {APP_VERSION} from "~/app.version";

/**
 * Derives the client-side key that identifies a subscription request.
 *
 * The request is serialized with JSON.stringify and the resulting text is MD5-hashed, so two requests that
 * serialize to the same JSON text get the same key. That lets separate observables (e.g. two pages watching
 * the same data) share one websocket subscription instead of each opening a duplicate.
 *
 * @param request the subscription request to derive the key from
 * @returns the MD5 hash of the serialized request
 */
export function generateSubscriptionKey(request: RPCRequest<any>) {
  const serializedRequest = JSON.stringify(request);
  return Md5.hashStr(serializedRequest);
}

/** Client-side bookkeeping for one server-side data subscription that may be shared by several observers. */
interface WebSocketSubscription {
  /** Just keeping count of the number of observers watching this subscription. Once it returns back to 0, then we can actually notify the server to unsubscribe from the particular endpoint. */
  count: number;

  /** In the event that the underlying web-socket is closed/broken unexpectedly, the client observables are still assuming the subscription is active. The websocket connector
   * should be trying to re-establish the connection, and once it succeeds, it'll look to see if there are disconnected subscriptions and re-create those subscriptions.
   * When the subscription is re-created, the tracking rid is simply replaced with the new one, and messages will start flowing again.
   *
   * This should only ever be "true" when the "rid" is set to null. */
  disconnected: boolean;

  /** During the subscription request stage, this flag indicates that the subscription request has actually been sent and we're now waiting for the response/error before
   * setting this flag back to false. When true, this flag is used to protect against duplicate attempts to establish the same subscription in the event of unknown race conditions
   * with websocket re-connections. */
  subscribing: boolean;

  /** The underlying websocket request ID that was used to create the data subscription, and therefore be used to map future messages back to this websocket subscription.
   * It is null while the subscription is not (or no longer) connected to the server. */
  rid: number | null;

  /** The actual subscription request sent to the server. This is stored permanently because it is needed to restore the original subscription if the socket is ever closed/re-created.
   * It is null for subscriptions that cannot be re-created (they are registered without a request). */
  request: RPCRequest<any> | null;

  /** A client-side hash of the request, so that duplicate requests can be combined into one websocket subscription. See generateSubscriptionKey. */
  subscriptionKey: string;

  /** When a new observable is added and begins watching an existing subscription, we need to feed it some initial data, so we keep the
   * most recent message in memory so we can re-send it if any new observables are added. */
  lastData: any;
}

/**
 * Owns the single shared WebSocket used for RPC calls and data subscriptions.
 *
 * Responsibilities visible in this class:
 *  - lazily opening the socket the first time a message has to be sent;
 *  - correlating server replies with the promise of the request that caused them (via the numeric "rid");
 *  - de-duplicating subscriptions by a hashed subscription key and re-broadcasting their messages;
 *  - re-creating subscriptions after the socket was lost (checked every 5 seconds by the heartbeat timer).
 *
 * Promise rejections produced here use plain string reasons ("Not connected", "Error connecting",
 * "Unable to connect", "Connection closed").
 */
export class WebSocketConnector {
  private url: string = '';

  private websocket: WebSocket | null = null;
  private requestCount: number = 0;

  /** Incremented each time a socket is torn down by disconnect(). Asynchronous callbacks that started under an
   * older value know that their connection is gone and that the subscription state was already reset. */
  private connectionEpoch: number = 0;

  /** In order to route messages to the correct observers (WebSocketObservables), the individual Observables watch ALL messages
   * coming from the websocket and when they see the correct subscriptionKey, they can then relay the message to their own observers.
   * So this subscriptionDistributor is really just broadcasting all of the incoming websocket messages to be further relayed to
   * individual observers.
   *
   * NOTE(review): this ReplaySubject is created without a buffer size, which per the RxJS documentation means it keeps every
   * message and replays all of them to each new subscriber. Confirm whether consumers rely on that replay before bounding it. */
  private subscriptionDistributor: ReplaySubject<{subscriptionKey: string, messageType: "next"|"error", data: any}> = new ReplaySubject<any>();

  /** Each time a websocket request is sent, it may expect to receive a corresponding server message
   * acknowledgement/response -- in such case, the message will include an "rid". When an "rid" is part of the
   * client message, a promise is returned, and that promise will only "resolve", once the server has
   * sent the corresponding response message using the same "rid" as its data.rid. At that point, the
   * resolve/reject from the waiting promise is called, and the completed request is removed from the activeRequests
   * map. If the socket is torn down first, every request still waiting here is rejected. */
  private activeRequests: Map<number, {resolve: (value: any) => void, reject: (reason?: any) => void}> = new Map();

  /** Since we use a hash subscriptionKey to prevent duplicate subscriptions from being sent to the server, we need a way
   * to do a lookup to find if there is an existing WebSocketSubscription using that subscriptionKey. This map tracks those
   * subscriptions using the subscriptionKey so we can add new observables under the same subscription. */
  private activeSubscriptionsBySubscriptionKey: Map<string, WebSocketSubscription> = new Map();

  /** The websocket subscription messages are identified by their numeric request id (rid). The rid is then used to
   * lookup the WebSocketSubscription initially established for that rid. Using the WebSocketSubscription, the message is
   * re-broadcast via the subscriptionDistributor using the hashed subscriptionKey. */
  private activeSubscriptionsByRid: Map<number, WebSocketSubscription> = new Map();

  private nextAppVersion: Subject<string> = new Subject<string>();
  private connectedObservable: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);

  /**
   * @param document the document handed to getWSTargetHost to work out the websocket host
   * @param toast toast service (not used by the code in this class)
   */
  constructor(
    private readonly document: Document,
    private readonly toast: ToastService,
  ) {
    this.resetUrl();
    this.setupHeartbeat();
  }

  /** Emits the version string reported by the server whenever it differs from APP_VERSION. */
  observeNextAppVersion(): Observable<string> {
    return this.nextAppVersion.asObservable();
  }

  /** Rebuilds the websocket URL from the target host and, when present, the "subsessionid" kept in session storage. */
  private resetUrl() {
    // use the injected document rather than the global one so the constructor argument is actually honoured
    this.url = getWSTargetHost(this.document) + '/ws';
    const subsessionId = window.sessionStorage.getItem("subsessionid");
    if (subsessionId) {
      // the id becomes a query parameter value, so it has to be URI-encoded
      this.url += '?subsessionid=' + encodeURIComponent(subsessionId);
    }
  }

  /** Starts the 5 second timer that checks whether lost subscriptions need to be re-established. */
  private setupHeartbeat() {
    interval(5000).subscribe(() => {
      // TODO if the websocket is connected, perform the heartbeat to make sure the connection hasn't died in a way we can't detect
      this.reconnectSubscriptions();
    });
  }

  /** If there is no socket but subscriptions are still registered, tries to re-establish the connection and resurrect them. */
  private reconnectSubscriptions() {
    const socketMissing = !this.websocket && !this.connectedObservable.value;
    if (socketMissing && this.activeSubscriptionsBySubscriptionKey.size > 0) {
      this.connectAllSubscriptions();
    }
  }

  /**
   * Makes sure a socket exists and is open.
   *
   * @returns a promise that resolves once the socket is open and rejects (with a string reason) if it could not be opened
   */
  private connect(): Promise<void> {
    if (!this.websocket) {
      return this.openSocket();
    }
    if (this.connectedObservable.value) {
      // the connection is already established
      return Promise.resolve();
    }

    // A socket exists but has not opened yet: wait for connectedObservable to report the outcome. "true" means the
    // connection was established; "false" while this.websocket is null means the socket was torn down (disconnect()
    // clears this.websocket before emitting). The immediate replay of the current "false" value is ignored because
    // this.websocket is still set at that point.
    const unsubscribe = new Subject<void>();
    return new Promise((resolve, reject) => {
      this.connectedObservable.pipe(takeUntil(unsubscribe)).subscribe({
        next: (connected) => {
          if (connected) {
            resolve();
            unsubscribe.next();
          } else if (!this.websocket) {
            reject("Not connected");
            unsubscribe.next();
          }
        },
        error: (error) => {
          reject(error);
        },
      });
    });
  }

  /**
   * Creates a new socket, registers it as the current one and wires up its event handlers.
   *
   * Every handler first checks that the socket it belongs to is still the current one, so a late event from a socket
   * that was already replaced cannot disturb its successor.
   *
   * @returns a promise settled exactly once: resolved on "open", rejected on "error"/"close" before "open"
   */
  private openSocket(): Promise<void> {
    return new Promise((resolve, reject) => {
      // if the constructor throws (e.g. bad URL) the promise rejects and this.websocket stays null
      const socket = new WebSocket(this.url);
      this.websocket = socket;

      let settled = false;
      const isCurrent = () => this.websocket === socket;

      socket.addEventListener('open', () => {
        if (!isCurrent()) return;
        if (!this.connectedObservable.value) {
          this.connectedObservable.next(true);
        }
        if (!settled) {
          settled = true;
          resolve();
        }
      });

      socket.addEventListener('error', (event) => {
        console.log(event);
        if (!settled) {
          settled = true;
          reject("Error connecting");
        }
        if (isCurrent()) this.disconnect(false);
      });

      socket.addEventListener('close', () => {
        if (!settled) {
          settled = true;
          reject("Unable to connect");
        }
        // disconnect() also puts the open subscriptions into their reset state so the heartbeat can re-create them
        if (isCurrent()) this.disconnect(false);
      });

      socket.addEventListener('message', (event) => {
        if (!isCurrent()) return;
        let data: WebSocketServerMessage<any>;
        try {
          data = JSON.parse(event.data) as WebSocketServerMessage<any>;
        } catch (error) {
          console.error("Ignoring websocket message that is not valid JSON", error);
          return;
        }
        if (data === null || typeof data !== 'object') return;
        this.handleServerMessage(data);
      });
    });
  }

  /** Dispatches one parsed server message according to its op code. */
  private handleServerMessage(data: WebSocketServerMessage<any>): void {
    switch (data.op) {
      case "version":
        if (APP_VERSION && data.m && APP_VERSION !== data.m) {
          // the backend has changed, so need to refresh
          this.nextAppVersion.next(data.m);
        }
        // TODO
        break;
      case "authorized":
        // nothing to do here, the connection is good
        break;
      case "unauthorized":
        // TODO log an error? redirect to login screen?
        break;
      case "invalid_op":
        // TODO log an error?
        break;
      case "disconnect":
        // nothing really to do, but know that the connection is being closed by the server
        if (this.websocket) {
          this.disconnect(true);
        }
        break;
      case "response": {
        // we just need to pass this message along to the response promise
        this.resolveRequest(data.rid, data.m);
        // a response also ends any subscription that was registered under the same rid
        const finished = this.activeSubscriptionsByRid.get(data.rid);
        if (finished) {
          this.activeSubscriptionsBySubscriptionKey.delete(finished.subscriptionKey);
          this.activeSubscriptionsByRid.delete(data.rid);
        }
      } break;
      case "subscribed":
        // A "request_subscribe" will receive a "subscribed" response, which will give us the subscription ID, so we need to connect the response here
        this.resolveRequest(data.rid, data.m);
        break;
      case "subscription_data":
        // send the data packet to everything observing the incoming messages so it can be routed to the correct subscription observable
        this.distribute(data.rid, "next", data.m);
        break;
      case "subscription_error":
        // send the data packet to everything observing the incoming messages (as an error) so it can be routed to the correct subscription observable
        this.distribute(data.rid, "error", data.m);
        break;
    }
  }

  /** Resolves and forgets the pending request registered under the rid, or logs when none is waiting. */
  private resolveRequest(rid: number, payload: any): void {
    const pending = this.activeRequests.get(rid);
    if (!pending) {
      console.log("NO RESOLVER");
      return;
    }
    this.activeRequests.delete(rid);
    pending.resolve(payload);
  }

  /** Re-broadcasts a subscription message under the subscription key registered for the rid (dropped when the rid is unknown). */
  private distribute(rid: number, messageType: "next"|"error", payload: any): void {
    const subscription = this.activeSubscriptionsByRid.get(rid);
    if (subscription) {
      this.subscriptionDistributor.next({ subscriptionKey: subscription.subscriptionKey, messageType, data: payload });
    }
  }

  /**
   * Tears down the current socket, if there is one.
   *
   * this.websocket is cleared BEFORE connectedObservable emits "false", because callers waiting inside connect()
   * use "false while there is no socket" as their signal that the connection failed. All registered subscriptions
   * are put into their disconnected state and all requests still waiting for a reply are rejected.
   *
   * @param closeSubscriptions currently has no effect (see TODO)
   * @returns true when a socket was closed, false when there was none
   */
  private disconnect(closeSubscriptions: boolean = true) {
    if (closeSubscriptions) {
      // TODO this is an unrecoverable disconnect, so we just need to close everything
    }
    const socket = this.websocket;
    if (!socket) return false;

    this.websocket = null;
    this.connectionEpoch++;
    this.markSubscriptionsDisconnected();
    try {
      this.connectedObservable.next(false);
      this.rejectPendingRequests("Connection closed");
    } finally {
      socket.close();
    }
    return true;
  }

  /** Puts every registered subscription into the reset state: not subscribing, disconnected and without an rid. */
  private markSubscriptionsDisconnected(): void {
    this.activeSubscriptionsBySubscriptionKey.forEach(subscription => {
      subscription.subscribing = false;
      subscription.disconnected = true;
      subscription.rid = null;
    });
    // every rid was issued for the connection that just went away
    this.activeSubscriptionsByRid.clear();
  }

  /** Rejects every request that is still waiting for a server reply and empties the map. */
  private rejectPendingRequests(reason: string): void {
    const pending = Array.from(this.activeRequests.values());
    this.activeRequests.clear();
    pending.forEach(request => request.reject(reason));
  }

  /** Drops the current socket, recomputes the URL and immediately tries to re-create the registered subscriptions. */
  resetSocket() {
    this.disconnect(false);
    this.resetUrl();
    this.reconnectSubscriptions();
  }


  /** Stream of every subscription message, tagged with the subscription key it belongs to. */
  observeSubscriptions(): Observable<{ subscriptionKey: string, messageType: "next"|"error", data: any }> {
    return this.subscriptionDistributor.asObservable();
  }

  /**
   * Registers a subscription for an rid that is expected to be used by an upcoming request, without asking the server
   * to subscribe. No request is stored, so this subscription is never re-created after a reconnect.
   *
   * @param request only used to derive the subscription key
   * @param rid the request id whose messages should be routed to the returned observable
   * @returns an observable keyed by the new subscription key
   * @throws Error when the derived key is already registered
   */
  addAssumedSubscriptionObservable<T>(request: RPCRequest<any>, rid: number): WebSocketKeyedObservable<T> {
    // Do not reuse for similar requests (shouldn't happen, but still...)
    const subscriptionKey = generateSubscriptionKey(request)+rid;
    if (this.activeSubscriptionsBySubscriptionKey.has(subscriptionKey)) {
      throw new Error("Invariant exception: tried to make a unique subscription key but failed");
    }
    const subscription: WebSocketSubscription = {
      count: 1,
      disconnected: false,
      subscribing: false,
      rid: rid,
      lastData: null,
      subscriptionKey: subscriptionKey,
      request: null // we don't store the request because this isn't a re-creatable request
    };
    this.activeSubscriptionsBySubscriptionKey.set(subscriptionKey, subscription);
    this.activeSubscriptionsByRid.set(rid, subscription);
    return new WebSocketKeyedObservable<T>(this, subscriptionKey);
  }

  /**
   * Registers one more observer for the subscription identified by the request, creating (and connecting) the
   * subscription when it does not exist yet.
   *
   * @param request the subscription request; a deep copy is stored so the subscription can be re-created later
   * @param subscriber receives the remembered last message when joining an existing subscription
   */
  addActiveObservable(request: RPCRequest<any>, subscriber: Subscriber<any>) {
    // should re-use any existing subscription object rather than opening a new one for the same data stream
    const subscriptionKey = generateSubscriptionKey(request);
    const existing = this.activeSubscriptionsBySubscriptionKey.get(subscriptionKey);
    if (existing) {
      existing.count++;
      // make sure to emit the last message of the existing subscription onto the observable (if there is one)
      // NOTE(review): nothing in this class ever assigns lastData after creation, so this branch appears to be dormant — confirm
      if (existing.lastData && subscriber) {
        subscriber.next(cloneDeep(existing.lastData));
      }
      return;
    }

    const created: WebSocketSubscription = {
      count: 1,
      disconnected: true,
      subscribing: false,
      rid: null,
      lastData: null,
      subscriptionKey: subscriptionKey,
      request: cloneDeep(request)
    };
    this.activeSubscriptionsBySubscriptionKey.set(created.subscriptionKey, created);
    // we've just set up the subscription relay, but we haven't actually connected the server-side subscription, so we do that next
    this.connectAllSubscriptions();
  }

  /** Subscriptions can be reserved even if the websocket is not established, or temporarily down, so this iterates
   * over our reserved subscriptions to make sure they're all connected. */
  private connectAllSubscriptions(): void {
    this.activeSubscriptionsBySubscriptionKey.forEach(subscription => {
      this.performRequestSubscription(subscription);
    });
  }

  /**
   * Sends the "request_subscribe" for a subscription that is disconnected and not already being subscribed.
   * Subscriptions without a stored request cannot be re-created and are skipped.
   */
  private performRequestSubscription(subscription: WebSocketSubscription): void {
    if (subscription.subscribing || !subscription.disconnected) return;
    if (!subscription.request) return;

    // remember which connection this attempt belongs to: if the socket is torn down in the meantime, disconnect()
    // has already reset the subscription (and a newer attempt may be running), so a late callback must not touch it
    const epoch = this.connectionEpoch;
    subscription.subscribing = true;
    this.webSocketMessage("request_subscribe", subscription.request).then((rid) => {
      if (epoch !== this.connectionEpoch) return;
      subscription.subscribing = false;
      subscription.disconnected = false;
      subscription.rid = rid;
      this.activeSubscriptionsByRid.set(rid, subscription);
      // the last observer may have gone away while the request was in flight
      this.checkUnsubscribeAfterReady(subscription);
    }, () => {
      if (epoch !== this.connectionEpoch) return;
      // TODO what should we do if we actually fail to setup the subscription?
      subscription.subscribing = false;
    });
  }

  /** Removes one observer from the subscription identified by the request. See removeActiveObservableByKey. */
  removeActiveObservable(request: RPCRequest<any>) {
    this.removeActiveObservableByKey(generateSubscriptionKey(request));
  }

  /**
   * Removes one observer from the subscription registered under the key and unsubscribes from the server once
   * no observer is left.
   *
   * @throws Error when no subscription is registered under the key, or when it was released more often than it was added
   */
  removeActiveObservableByKey(subscriptionKey: string) {
    const subscription = this.activeSubscriptionsBySubscriptionKey.get(subscriptionKey);
    if (!subscription) {
      throw new Error("Logic error. There is no active subscription found: " + subscriptionKey);
    }
    subscription.count--;
    if (subscription.count < 0) throw new Error("Logic error. Subscription closed too many times.");
    this.checkUnsubscribeAfterReady(subscription);
  }

  /**
   * Sends a single "request" message.
   *
   * @returns a promise resolved with the payload of the server reply, or rejected when the connection fails or is closed first
   */
  rpcCall<I, O>(namespace: string, method: string, input: I): Promise<O|null> {
    const request: RPCRequest<I> = {
      namespace: namespace,
      method: method,
      data: input
    };
    return this.webSocketMessage("request", request);
  }

  /**
   * Sends a "request_with_updates" message.
   *
   * @returns a pair: an observable routed to messages carrying the request's rid, and the promise for the final reply
   */
  rpcCallWithUpdates<I,U,O>(namespace: string, method: string, input: I): [Observable<U|null>, Promise<O|null>] {
    const request: RPCRequest<I> = {
      namespace: namespace,
      method: method,
      data: input
    };
    // the "this.requestCount + 1" takes advantage of the expectation that the subsequent webSocketMessage will increment the requestCount and send the message.
    // So, we're basically setting up a "pre-subscription" to watch for data coming back on that future request ID.
    const updates = this.addAssumedSubscriptionObservable<U>(request, this.requestCount+1);
    const response = this.webSocketMessage("request_with_updates", request);
    return [updates, response];
  }

  /** Builds the observable for a data subscription; nothing is sent by this method itself. */
  rpcSubscribe<I, O>(namespace: string, method: string, input: I): WebSocketObservable<O> {
    const request: RPCRequest<I> = {
      namespace: namespace,
      method: method,
      data: input,
      trace: false
    };
    return new WebSocketObservable<O>(this, request);
  }

  /**
   * Wraps the payload in a client message with a freshly allocated rid and sends it.
   *
   * @returns a promise resolved with the payload of the server reply for that rid
   */
  webSocketMessage<I extends RPCRequest<any>|any, O>(op: ClientOpCode, request: I): Promise<O|any> {
    const rid = ++this.requestCount;
    const message: WebSocketClientMessage<I> = {
      op: op,
      rid: rid,
      m: request
    };
    return this.sendWebSocketMessage(message);
  }

  /**
   * Connects if necessary and sends the message. Messages with an rid register their resolve/reject under that rid
   * and settle when the reply arrives (or the socket is torn down); messages without an rid resolve with null right
   * after sending.
   */
  private sendWebSocketMessage<O>(message: WebSocketClientMessage<any>): Promise<O|any> {
    return new Promise((resolve, reject) => {
      this.connect().then(() => {
        const socket = this.websocket;
        if (!socket) {
          // the socket went away between connecting and sending; fail instead of waiting for a reply that cannot come
          reject("Not connected");
          return;
        }
        const payload = JSON.stringify(message);
        if (!message.rid) {
          socket.send(payload);
          resolve(null);
          return;
        }
        this.activeRequests.set(message.rid, { resolve, reject });
        try {
          socket.send(payload);
        } catch (error) {
          // nothing was sent, so no reply will ever arrive for this rid
          this.activeRequests.delete(message.rid);
          throw error;
        }
      }).catch((error) => {
        reject(error);
      });
    });
  }

  /** Once a subscription has no observers left, forgets it and, when it has an rid, tells the server to unsubscribe. */
  private checkUnsubscribeAfterReady(subscription: WebSocketSubscription) {
    if (subscription.count !== 0) return;

    // only remove if the current active subscription is the same one being unsubscribed
    if (this.activeSubscriptionsBySubscriptionKey.get(subscription.subscriptionKey) === subscription) {
      this.activeSubscriptionsBySubscriptionKey.delete(subscription.subscriptionKey);
    }
    if (subscription.rid) {
      this.activeSubscriptionsByRid.delete(subscription.rid);
      this.webSocketMessage("unsubscribe", subscription.rid).then(() => {
        // do nothing
      }, () => {
        // The unsubscribe could not be delivered (connection failed or closed). There is nothing to retry from here;
        // handling the rejection keeps it from surfacing as an unhandled promise rejection.
      });
    }
  }
}
