import {WebSocketConnector} from "~/core/websocket/web-socket-connector";
import {Observable, Subscription} from "rxjs";

/**
 * Observable view of one keyed subscription carried over a shared
 * {@link WebSocketConnector}.
 *
 * Each time a consumer subscribes, this observable listens to
 * `websocket.observeSubscriptions()` and relays only the messages whose
 * `subscriptionKey` equals the key passed to the constructor:
 *
 * - `messageType === "next"`: `message.data` is emitted as a value.
 * - `messageType === "error"`: `message.data` is raised as an error.
 * - Any other message type is ignored.
 *
 * An error or completion of the underlying message stream itself is forwarded
 * to the consumer, so a subscriber is never left waiting on a stream that can
 * no longer deliver anything.
 *
 * On teardown (consumer unsubscribes, or the observable errors/completes) the
 * inner subscription is released and
 * `websocket.removeActiveObservableByKey(subscriptionKey)` is called.
 *
 * @typeParam T - Type of the values emitted for this key. The payload is
 *   passed through as-is; no runtime validation is performed here.
 */
export class WebSocketKeyedObservable<T> extends Observable<T> {
  /**
   * @param websocket - Connector whose `observeSubscriptions()` stream is
   *   filtered, and which is notified on teardown.
   * @param subscriptionKey - Key selecting which messages belong to this
   *   observable.
   */
  constructor(
    private readonly websocket: WebSocketConnector,
    private readonly subscriptionKey: string,
  ) {
    super((subscriber) => {
      // Use the constructor parameters captured by this closure rather than
      // `this`: the callback is created before `super()` has returned, and the
      // closure values are the same ones the teardown below uses.
      const messageSubscription: Subscription = websocket.observeSubscriptions().subscribe({
        next: (message) => {
          // Messages addressed to other keys share the same stream; skip them.
          if (message.subscriptionKey !== subscriptionKey) return;

          if (message.messageType === "next") subscriber.next(message.data);
          else if (message.messageType === "error") subscriber.error(message.data);
        },
        // Propagate termination of the shared stream so consumers do not hang
        // and the teardown below gets a chance to run.
        error: (err: unknown) => subscriber.error(err),
        complete: () => subscriber.complete(),
      });

      return () => {
        messageSubscription.unsubscribe();
        websocket.removeActiveObservableByKey(subscriptionKey);
      };
    });
  }
}
