import {Injectable, OnDestroy, Inject} from '@angular/core';

import {WebSocketSubject, WebSocketSubjectConfig} from 'rxjs/webSocket';

import {
  Subject,
  interval,
  ReplaySubject,
  timer,
  Observable,
  race,
  merge,
  EMPTY,
  combineLatest,
  BehaviorSubject,
} from 'rxjs';
import {
  takeUntil,
  map,
  first,
  catchError,
  timeout,
  switchMap,
  take,
  mergeMap,
  filter,
  retry,
  tap,
  concatMap,
  auditTime,
  withLatestFrom,
} from 'rxjs/operators';

import {
  APIMessages as APIM,
  PL2,
  UtilsPL2 as U,
  KeyUtilsPL2 as KU,
} from '@common/utils/dist/index.js';

import {environment} from '../../environments/environment';

import {ServiceCommsWrapperService} from '@services/service-comms-wrapper.service';

import {UserStore} from '@stores/user.store';
import {APIStateStore} from '@stores/api-state.store';

import {
  WebsocketMessageProcessor,
  WSMessageProcessorOutput,
} from './websocket-message-processor/websocket-message-processor';
import {WSMessagePublisherService} from './ws-message-publisher.service';

/**
 * Connection states published on the websocket service's status stream.
 *
 * Members are numeric; the values are spelled out so that they do not shift
 * if a member is inserted later.
 */
enum SocketStatus {
  /** The socket's open observer has fired. */
  Opened = 0,
  /** The socket has been closed, locally or by the remote end. */
  Closed = 1,
  /** Handled like `Closed` where the status is inspected; not emitted in the code visible here. */
  Stopped = 2,
  /** Initial state, before any connection attempt has reported back. */
  Unknown = 3,
}

/**
 * Root-scoped service that owns the application's websocket connection.
 *
 * What this class does:
 *  - opens the socket when a user appears in `UserStore` and closes it when
 *    the user goes away;
 *  - periodically sends a `ping` request and marks the API as offline when
 *    no pong is reported within `_ackTimeout`;
 *  - from the same periodic loop, retries the connection when the socket is
 *    not open while a user is present;
 *  - hands every incoming message to the injected `wsMessageProcessors`;
 *  - publishes outgoing requests and completes the caller's observable once
 *    the matching acknowledgement id has been received.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService implements OnDestroy {
  /* TODO this is being used by components to react to
   * websocket connection states. Would a store be better
   * now that we use it for more than ping/pong?
   */
  message$ = new Subject<APIM.WebSocketMessage>();

  /** Current socket, or `null`/`undefined` when there is none. */
  private _socket$: WebSocketSubject<any>;
  private _socketStatus$ = new BehaviorSubject<SocketStatus>(
    SocketStatus.Unknown,
  );

  private _destroyed$ = new ReplaySubject<boolean>();

  private _ponged$ = new Subject<void>();

  /** Milliseconds to wait for a pong or for a request acknowledgement. */
  private readonly _ackTimeout = 60_000;
  /** Milliseconds used for both the heartbeat interval and its audit window. */
  private readonly _heartbeatPeriod = 30_000;

  /*
   * Acknowledged request ids. The replay window is limited to `_ackTimeout`:
   * a `_send` call gives up after that long, so older ids can never be
   * matched, and keeping them would grow the buffer for the whole session
   * and replay every past id to each new `_send` subscriber.
   */
  private _acknowledged$ = new ReplaySubject<string>(
    Number.POSITIVE_INFINITY,
    this._ackTimeout,
  );

  private _clientId = null;

  constructor(
    private serviceCommsWrapperService: ServiceCommsWrapperService,
    private userStore: UserStore,
    private apiStateStore: APIStateStore,
    @Inject('wsMessageProcessors')
    private wsMessageProcessors: Array<WebsocketMessageProcessor>,
    @Inject('webSocketFactory')
    private webSocketFactory: <T>(
      config: WebSocketSubjectConfig<T>,
    ) => WebSocketSubject<T>,
    private wsMessagePublisherService: WSMessagePublisherService,
    @Inject('SESSIONSTORAGE') private sessionStorage: any,
  ) {
    this._watchUser();
    this._startHeartbeat();
    this._announceReconnections();
  }

  /**
   * Closes the socket and completes the internal streams.
   * `disconnect()` runs first, so the `Closed` status is still delivered to
   * subscriptions that end on `_destroyed$`.
   */
  ngOnDestroy() {
    this.disconnect();
    this._destroyed$.next(true);
    this._destroyed$.complete();
    this._socketStatus$.complete();
    this._acknowledged$.complete();
  }

  /**
   * Stores the current client id in the injected session storage, keyed by
   * the current user's id. Does nothing when either value is empty.
   */
  saveToStorage(): void {
    const uId = this.userStore.state()?.userId;
    if (!U.isEmpty(this._clientId) && !U.isEmpty(uId)) {
      this.sessionStorage.setItem(uId, this._clientId);
    }
  }

  /**
   * Sets the client id from, in order of preference: the value stored for
   * `uId`, the client id already held, or a freshly generated id.
   *
   * @param uId user id used as the storage key
   */
  loadStorage(uId: string): void {
    this._clientId =
      this.sessionStorage.getItem(uId) ?? this._clientId ?? U.generateId();
  }

  /** Starts a connection attempt; does nothing if a socket already exists or no user is set. */
  connect() {
    this._connect();
  }

  /** Closes the current socket, if any, and publishes the `Closed` status. */
  disconnect() {
    console.log('Disconnecting from websocket');
    this._close();
  }

  /**
   * Sends an event request over the socket.
   *
   * @returns an observable that emits once the request has been acknowledged
   */
  sendEvent(
    request: APIM.EntryEventRequest | APIM.AiEnvelope | APIM.UserEnvelope,
  ): Observable<void> {
    return this._send(request);
  }

  /**
   * Builds a read query for the `master` index from a primary key string and
   * sends it.
   *
   * @param pK primary key string, parsed with `KU.pKFromString`
   * @param opts optional query modifiers; falsy `single`/`watchCtx` and a
   *   missing `eSK` are passed as `null` to `U.excludeEmptyProperties`.
   *   NOTE(review): `opts.ro` is accepted but not used here.
   * @returns an observable that emits once the query has been acknowledged
   */
  fetch(
    pK: string,
    opts: {
      single?: boolean;
      uEPK?: string;
      ro?: boolean;
      watchCtx?: boolean;
      eSK?: {[key: string]: any};
    } = {},
  ): Observable<void> {
    // Parse the key once instead of once per field.
    const parsedKey = KU.pKFromString(pK);
    const queryStringParameters: any = U.excludeEmptyProperties({
      mId: parsedKey.mId,
      sK: parsedKey.sK,
      idx: 'master',
      single: opts.single || null,
      uEPK: opts.uEPK,
      watchCtx: opts.watchCtx || null,
      eSK: opts.eSK ? JSON.stringify(opts.eSK) : null,
    });

    return this.query(queryStringParameters);
  }

  /**
   * Runs `fetch` for every key with the same options and merges the results.
   *
   * @returns an observable that emits once per acknowledged fetch
   */
  fetchAll(pKs: string[], opts: any): Observable<void> {
    return merge(...pKs.map((pK) => this.fetch(pK, {...opts})));
  }

  /**
   * Sends a single read action carrying the given index query.
   *
   * @returns an observable that emits once the query has been acknowledged
   */
  query(query: APIM.IndexQuery): Observable<void> {
    return this._send({payload: [{act: PL2.EntryAction.Read, q: query}]});
  }

  /**
   * Connects when a user appears and disconnects when the user goes away.
   * Only changes of `userId` are acted upon; status changes alone are
   * filtered out.
   */
  private _watchUser(): void {
    let previousUser: PL2.User = null;
    combineLatest([this.userStore.state$, this._socketStatus$])
      .pipe(
        filter(([userObject]) => {
          const hasChanged = previousUser?.userId !== userObject?.userId;
          previousUser = userObject;
          return hasChanged;
        }),
        tap(([userObject, status]) =>
          console.debug('user, status', userObject, status),
        ),
        takeUntil(this._destroyed$),
      )
      .subscribe(([userObject, status]) => {
        switch (status) {
          case SocketStatus.Closed:
          case SocketStatus.Stopped:
          case SocketStatus.Unknown:
            if (!U.isEmpty(userObject)) {
              this.connect();
            }
            break;
          case SocketStatus.Opened:
            if (U.isEmpty(userObject)) {
              // NOTE(review): the client id is only reset on this path, i.e.
              // when the socket is open at the moment the user disappears.
              // Confirm that reusing it after a closed-socket sign-out is intended.
              this._clientId = null;
              console.info('clientId', this._clientId);
              this.disconnect();
            }
            break;
        }
      });
  }

  /**
   * Periodic loop: pings while the socket is open, otherwise retries the
   * connection if a user is present.
   */
  private _startHeartbeat(): void {
    combineLatest([interval(this._heartbeatPeriod), this._socketStatus$])
      .pipe(auditTime(this._heartbeatPeriod), takeUntil(this._destroyed$))
      .subscribe(([, status]) => {
        if (status === SocketStatus.Opened) {
          this._socket$.next({
            action: 'sendmessage',
            data: {request: 'ping'},
          });
          // Flag the API as offline unless a pong arrives first. The timer
          // also stops on destroy so it cannot fire after teardown.
          timer(this._ackTimeout)
            .pipe(takeUntil(merge(this._ponged$, this._destroyed$)))
            .subscribe(() => this.apiStateStore.offline());
        } else if (!U.isEmpty(this.userStore.state())) {
          console.log('Rescuing the websocket connection');
          this._connect();
        }
      });
  }

  /**
   * Emits a `reconnection` message on `message$` when the status changes to
   * `Opened` from a different, non-empty previous status.
   */
  private _announceReconnections(): void {
    let previousStatus: SocketStatus;
    this._socketStatus$
      .pipe(takeUntil(this._destroyed$))
      .subscribe((status) => {
        // NOTE(review): `previousStatus` is a numeric enum value; confirm how
        // `U.isEmpty` treats numbers, since that decides whether this fires.
        if (
          status === SocketStatus.Opened &&
          !U.isEmpty(previousStatus) &&
          previousStatus !== SocketStatus.Opened
        ) {
          this.message$.next({target: 'reconnection'});
        }
        previousStatus = status;
      });
  }

  /**
   * Resolves the current session and opens a socket authenticated with its
   * id token. Does nothing when a socket already exists or no user id is set.
   * Any failure in the chain is logged and signs the user out.
   */
  private _connect(): void {
    if (this._hasLiveSocket()) {
      return;
    }

    const user = this.userStore.state();
    if (U.isEmpty(user?.userId)) {
      console.info("Can't connect without a userId");
      return;
    }

    if (U.isEmpty(this._clientId)) {
      this.loadStorage(user.userId);
    }

    console.log('Connecting to websocket', this._clientId);
    this.serviceCommsWrapperService
      .currentSession()
      .then((session) => {
        // Another attempt may have created a socket while the session was
        // being resolved; a second one would orphan the first.
        if (this._hasLiveSocket()) {
          return;
        }
        this._openSocket(session.getIdToken().getJwtToken());
      })
      .catch((err) => {
        console.warn(err);
        this.userStore.signOut();
      });
  }

  /**
   * Creates the socket through the injected factory, stores it as the
   * current socket and subscribes to its incoming messages.
   *
   * @param protocol value passed as the websocket sub-protocol (the id token)
   */
  private _openSocket(protocol: WebSocketSubjectConfig<any>['protocol']): void {
    const socket = this.webSocketFactory<any>({
      url: `${environment.websocketAPIGatewayUrl}?clid=${this._clientId}`,
      protocol,
      closeObserver: {
        next: () => {
          console.log('closeObserver called');
          this._closeIfCurrent(socket);
        },
      },
      openObserver: {
        next: () => {
          console.log('open');
          this._socketStatus$.next(SocketStatus.Opened);
        },
      },
    });
    this._socket$ = socket;

    socket
      .pipe(
        takeUntil(this._destroyed$),
        mergeMap((message: APIM.WebSocketMessage) =>
          this._processIncoming(message),
        ),
      )
      .subscribe({
        next: (_output: WSMessageProcessorOutput) => {
          // console.debug('ok'); nothing to do yet with output
        },
        error: (err) => {
          console.warn('Unexpected Error:');
          console.warn(err);
          this._closeIfCurrent(socket);
        },
        complete: () => {
          // Only logs; the actual reconnection is done by the heartbeat loop.
          console.log(
            'Calling reconnect after complete called (probably on the server side)',
          );
        },
      });
  }

  /**
   * Offers one incoming message to every registered processor and keeps the
   * output of whichever emits first. A processor that errors is logged and
   * treated as empty so that it cannot break the socket stream.
   */
  private _processIncoming(message: APIM.WebSocketMessage) {
    return race(
      ...this.wsMessageProcessors.map((mp) =>
        mp
          .processMessage({
            msg: message,
            wsAPI: {
              pong: () => this._ponged$.next(),
              acknowledge: (id) => this._acknowledge(id),
              query: (q) => this.query(q),
              error: (msg) => this.message$.next(msg),
            },
          })
          .pipe(
            catchError((err) => {
              console.warn('(Ignored)Websocket Error processing message:');
              console.warn(err);
              return EMPTY;
            }),
          ),
      ),
    );
  }

  /**
   * Closes the connection only if `socket` is still the current one, so that
   * a late close or error from a replaced socket cannot tear down its
   * successor.
   */
  private _closeIfCurrent(socket: WebSocketSubject<any>): void {
    if (this._socket$ === socket) {
      this._close();
    }
  }

  /**
   * Errors the current socket with code 1000 if it is live, forgets it and
   * publishes the `Closed` status.
   */
  private _close() {
    if (this._hasLiveSocket()) {
      this._socket$.error({code: 1000, reason: 'Websocket Error'});
    }
    this._socket$ = null;
    this._socketStatus$.next(SocketStatus.Closed);
  }

  /** True when a socket is held and it is neither closed nor stopped. */
  private _hasLiveSocket(): boolean {
    return !!this._socket$ && !this._socket$.closed && !this._socket$.isStopped;
  }

  /** Records that the request with the given id has been acknowledged. */
  private _acknowledge(id: string): void {
    console.debug('acknowledge', id);
    this._acknowledged$.next(id);
  }

  /**
   * Waits for the socket to be open, publishes the request, sends the
   * published data over the socket and waits for the matching
   * acknowledgement.
   *
   * The connection wait allows 5 s per attempt and is retried 3 times. The
   * whole sequence must emit within `_ackTimeout`. On any error the API is
   * flagged as offline and the error is rethrown to the subscriber.
   *
   * NOTE(review): `publish(...)` is called, and `userStore.state().userId`
   * is read, as soon as this method is called, not on subscription. A
   * missing user therefore throws synchronously here; confirm callers
   * expect that.
   *
   * @param request payload, serialised with `JSON.stringify` before publishing
   * @param envelope optional fields spread over the outgoing socket message
   * @returns an observable that emits once the request has been acknowledged
   */
  private _send(
    request: any,
    envelope?: Partial<APIM.WebSocketMsgEnvelope>,
  ): Observable<void> {
    const waitForConnection$ = this._socketStatus$.pipe(
      takeUntil(this._destroyed$),
      first((status) => status === SocketStatus.Opened),
      timeout(5000),
      retry(3),
    );

    const eventId = U.generateId();
    console.debug('eventId', eventId);
    const publish$ = this.wsMessagePublisherService
      .publish(JSON.stringify(request), this.userStore.state().userId)
      .pipe(
        tap((data) => {
          console.debug('published', data);
          this._socket$.next({
            action: 'default',
            data: data,
            id: eventId,
            user: this.userStore.state(), // TODO serialize?
            cId: this._clientId,
            ...envelope,
          });
        }),
      );

    const acknowledged$ = this._acknowledged$.pipe(
      tap((v) => console.debug('received, expected', v, eventId)),
      first((v) => v === eventId),
    );

    return waitForConnection$.pipe(
      switchMap(() => publish$),
      mergeMap(() => acknowledged$),
      map(() => void 0),
      timeout(this._ackTimeout),
      catchError((err) => {
        this.apiStateStore.offline();
        throw err;
      }),
    );
  }
}
