import { BackingOffTimer } from "src/nextgen/BackingOffTimer";
import { WebSocketConnection } from "src/nextgen/WebSocketConnection";
import { BackgroundTimer } from "src/util/BackgroundTimer";
import { Logger } from "src/util/Logger";
import { WrappedPromise } from "src/util/WrappedPromise";
import type { GroupTalkAPIError } from "src/lib/GroupTalkAPIError";
import type { AuthenticationModule } from "src/nextgen/AuthenticationModule";

const log = Logger.getLogger("ReconnectingWebsocket");

type Observer = {
  onConnected: () => void;
  onDisconnected: (error: GroupTalkAPIError) => void;
  onMessage: (msg: any) => void;
};

type Handshaker = {
  abortPromise?: WrappedPromise<void>;
  handshakePromise: Promise<void>;
  handshakeTimeout: number;
  onMessage?: (msg: any) => void;
};

type KeepAlive = {
  maxPingResponseTime: number;
  pingDetector: (msg: any) => boolean;
  pingInterval: number;
  pingMessageFactory: (msg?: any) => { type: string };
  pingResponseTimeout: number;
  pongDetector: (msg: any) => boolean;
  pongMessageFactory: (msg?: any) => { type: string };
};

export class ReconnectingWebsocket {
  /**
   * The authentication module to use for authenticating
   * @member {AuthenticationModule}
   */
  private readonly authenticationModule: AuthenticationModule;
  private readonly connectTask: BackingOffTimer<void>;
  /**
   * @member handshakerFactory function that takes a websocket as single parameter and returns an object with the following properties:
   *          - onMessage(msg) that will receive any incoming messages during handshake (optional)
   *          - handshakePromise which is a promise that will resolve when handshake completes
   *            or rejects if handshake fails.
   *          - handshakeTimeout time in millis until handshake should abandoned (optional)
   */
  private readonly handshakerFactory: (
    websocket: WebSocketConnection
  ) => Handshaker;
  /**
   * Websocket URI
   * @member {string}
   */
  private readonly websocketURI: string;
  private connected = false;
  private connection?: WebSocketConnection;
  private handshaker?: Handshaker;
  private keepAlive?: KeepAlive;
  private observers: Observer[] = [];
  private pingTimeout?: () => void;
  private pingTimer?: () => void;
  private started = false;
  private stopped = false;
  public constructor(
    authenticationModule: AuthenticationModule,
    websocketURI: string,
    handshakerFactory: (websocket: WebSocketConnection) => {
      handshakePromise: Promise<void>;
      handshakeTimeout: number;
      onMessage?: (msg: any) => void;
    }
  ) {
    this.authenticationModule = authenticationModule;
    this.websocketURI = websocketURI;
    this.handshakerFactory = handshakerFactory;
    this.connectTask = new BackingOffTimer<void>(() => this.connectInternal());
  }
  public closeDown(): void {
    if (!this.stopped) {
      this.stopped = true;
      if (this.connection !== undefined) {
        log.debug(`Disconnecting websocket from ${this.websocketURI}`);
        this.connection.stop();
        this.connection = undefined;
      }
      // In case we have a pending connect
      this.connectTask.cancel();
      this.cancelPing();
      this.cancelPingTimeout();
    }
  }
  // Start keep-alive on the connection. Typically called when completing the handshake.
  public startKeepAlive(
    config: Partial<KeepAlive> & { pingResponseTimeout: number }
  ): void {
    if (!this.connection) {
      log.warn("Requested startKeepAlive on a non-active connection");
      return;
    }
    this.keepAlive = {
      maxPingResponseTime: 10000,
      pingDetector: (msg) => msg.type === "Ping",
      pingInterval: 60000,
      pingMessageFactory: () => ({ type: "Ping" }),
      pongDetector: (msg) => msg.type === "Pong",
      pongMessageFactory: () => ({ type: "Pong" }),
      ...config,
    };
    this.schedulePing();
  }
  public subscribe(observer: Observer): () => void {
    if (!this.stopped && !this.started) {
      this.started = true;
      this.startConnectInternal();
    }
    this.observers.push(observer);
    if (this.connected && observer.onConnected) {
      observer.onConnected();
    }
    return () => {
      this.observers = this.observers.filter((f) => f !== observer);
    };
  }
  private cancelPing(): void {
    this.pingTimer?.();
    this.pingTimer = undefined;
  }
  private cancelPingTimeout(): void {
    this.pingTimeout?.();
    this.pingTimeout = undefined;
  }
  private cleanupKeepAliveInternal(): void {
    this.cancelPing();
    this.cancelPingTimeout();
    this.keepAlive = undefined;
  }
  // Makes a connection attempt and returns a promise that resolves if connected and rejects
  // if connect fails. No reconnects are made, but startConnectInternal() is called if
  // sockets disconnects after being connected
  private connectInternal(): Promise<void> {
    if (this.stopped) {
      return Promise.reject(new Error("Connection was stopped"));
    }
    return new Promise((resolve, reject) => {
      this.authenticationModule
        .login()
        .then((jwt) => {
          log.debug(`Connecting websocket to ${this.websocketURI}`);

          this.connection = new WebSocketConnection({
            onDisconnect: (disconnectError) => {
              log.debug(
                `Websocket to ${this.websocketURI} closed (${disconnectError})`
              );
              this.connection = undefined;
              this.cleanupKeepAliveInternal();
              if (this.handshaker) {
                // Still in handshake, reject it
                this.handshaker.abortPromise?.reject(disconnectError);
              }
              if (!this.connected) {
                // If we never did connect, nothing to inform anyone of
                // and we should not start new connection.
                return;
              }
              this.connected = false;
              this.observers
                .filter((o) => o.onDisconnected)
                .forEach((o) => {
                  o.onDisconnected(disconnectError);
                });
              // Since we were connected, we need to start a new cycle of connects
              this.startConnectInternal();
            },
            onMessage: (data) => {
              try {
                const json = JSON.parse(data);
                if (this.handshaker && this.handshaker.onMessage) {
                  log.trace(`Received websocket handshake data: ${data}`);
                  this.handshaker.onMessage(json);
                } else {
                  log.trace(`Received websocket data: ${data}`);
                  if (this.keepAlive && this.keepAlive.pingDetector(json)) {
                    log.trace("Received websocket Ping");
                    const pong = JSON.stringify(
                      this.keepAlive.pongMessageFactory(json)
                    );
                    this.connection?.sendMessage(pong);
                  } else if (
                    this.keepAlive &&
                    this.keepAlive.pongDetector(json)
                  ) {
                    log.trace("Received websocket Pong");
                    this.cancelPingTimeout();
                    this.schedulePing();
                  } else {
                    this.observers
                      .filter((o) => o.onMessage)
                      .forEach((o) => {
                        o.onMessage(json);
                      });
                  }
                }
              } catch (err) {
                log.warn(`Error while processing websocket data: ${err}`);
              }
            },
            protocol: ["bearer", jwt],
            server: this.websocketURI,
          });
          this.connection
            .start()
            .then(() => {
              log.debug(`Websocket to ${this.websocketURI} connected`);
              let handshakePromise;
              if (this.handshakerFactory && this.connection) {
                this.handshaker = this.handshakerFactory(this.connection);
                // We want to abort the handshake if we are disconnected, so keep our own promise
                this.handshaker.abortPromise = new WrappedPromise();
                const promises = [
                  this.handshaker.handshakePromise,
                  this.handshaker.abortPromise.promise,
                ];
                if (this.handshaker.handshakeTimeout > 0) {
                  promises.push(
                    new Promise((_, timeoutReject) => {
                      BackgroundTimer.setTimeout(
                        () => timeoutReject(new Error("Handshake timeout")),
                        this.handshaker!.handshakeTimeout
                      );
                    })
                  );
                }
                handshakePromise = Promise.race(promises);
              } else {
                handshakePromise = Promise.resolve();
              }
              handshakePromise
                .then(() => {
                  this.connected = true;
                  this.handshaker = undefined;
                  this.observers
                    .filter((o) => o.onConnected)
                    .forEach((o) => {
                      o.onConnected();
                    });
                  resolve();
                })
                .catch((error) => {
                  // Failure during handshake, disconnect and propagate failure
                  log.debug(
                    `Websocket connection to ${this.websocketURI} failed handshake: ${error}`
                  );
                  this.handshaker = undefined;
                  if (this.connection) {
                    this.connection.stop();
                    this.connection = undefined;
                  }
                  reject(error);
                });
            })
            .catch((error) => {
              log.debug(
                `Websocket connection to ${this.websocketURI} failed to connect: ${error}`
              );
              this.connection = undefined;
              reject(error);
            });
        })
        .catch((err: any) => {
          reject(err);
        });
    });
  }
  private schedulePing(): void {
    const { pingInterval, pingMessageFactory } = this.keepAlive!;
    log.trace(`Scheduling websocket ping in ${pingInterval}`);
    this.pingTimer = BackgroundTimer.setTimeout(() => {
      if (this.connection) {
        log.trace(`Sending websocket ping to ${this.websocketURI}`);
        this.connection.sendMessage(JSON.stringify(pingMessageFactory()));
        this.schedulePingTimeout();
      }
    }, pingInterval);
  }
  private schedulePingTimeout(): void {
    const { pingResponseTimeout } = this.keepAlive!;
    this.pingTimeout = BackgroundTimer.setTimeout(() => {
      if (this.connection) {
        log.warn("No ping response on websocket, closing");
        this.connection.stop();
      }
    }, pingResponseTimeout);
  }
  // Connect with retry if connection fails. If it succeeds no more retries will be made until
  // calling again.
  private startConnectInternal(): void {
    if (!this.stopped) {
      this.connectTask.start().catch((error) => {
        log.warn(`Connection to ${this.websocketURI} failed: ${error}`);
        this.startConnectInternal();
      });
    }
  }
}
