import {Observable, Subscription} from "rxjs";
import {generateSubscriptionKey, WebSocketConnector} from "./web-socket-connector";
import {RPCRequest} from "~/core/rpc/rpc-request";
import {cloneDeep} from "~/core/util/object-util";

export class WebSocketObservable<T> extends Observable<T> {
  constructor(
    private websocket: WebSocketConnector,
    private request: RPCRequest<any>
  ) {
    super((subscriber) => {
      // each time a new subscribe is called
      request = cloneDeep(request);
      const subscriptionKey = generateSubscriptionKey(request);
      websocket.addActiveObservable(request, subscriber);

      const messageSubscription: Subscription = this.websocket.observeSubscriptions().subscribe((message) => {
        if (message.subscriptionKey === subscriptionKey) {
          if (message.messageType === "next") subscriber.next(cloneDeep(message.data));
          else if (message.messageType === "error") subscriber.error(cloneDeep(message.data));
        }
      });

      return () => {
        messageSubscription.unsubscribe();
        websocket.removeActiveObservable(request);
      };
    });
  }
}
