import {Inject, Injectable, OnDestroy} from '@angular/core';
import {BehaviorSubject, interval, Observable, Observer, Subject, SubscriptionLike} from 'rxjs';
import {WebSocketSubject, WebSocketSubjectConfig} from 'rxjs/webSocket';
import {distinctUntilChanged, filter, map, share, takeWhile} from 'rxjs/operators';

import {IWsMessage, WebSocketConfig} from './websocket.interfaces';
import { config } from './websocket.config';

// TODO: status or testStatus. Разобраться что лучше использовать.
@Injectable()
export class WebsocketService implements OnDestroy {

  // объект конфигурации WebSocketSubject
  private readonly config: WebSocketSubjectConfig<IWsMessage<any>>;

  private websocketSub: SubscriptionLike;
  private statusSub: SubscriptionLike;

// Observable для реконнекта по interval
  private reconnection$: Observable<number>;
  private websocket$: WebSocketSubject<IWsMessage<any>>;

// сообщает, когда происходит коннект и реконнект
  private connection$: Observer<boolean>;

// вспомогательный Observable для работы с подписками на сообщения
  private wsMessages$: Subject<IWsMessage<any>>;

// пауза между попытками реконнекта в милисекундах
  private reconnectInterval: number;

// количество попыток реконнекта
  private readonly reconnectAttempts: number;

// синхронный вспомогатель для статуса соединения
  private isConnected: boolean;

// статус соединения
  public status: Observable<boolean>;

  public testStatus: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(false);

  constructor(@Inject(config) private wsConfig: WebSocketConfig) {
    this.wsMessages$ = new Subject<IWsMessage<any>>();

    // смотрим конфиг, если пусто, задаем умолчания для реконнекта
    this.reconnectInterval = wsConfig.reconnectInterval || 5000;
    this.reconnectAttempts = wsConfig.reconnectAttempts || 10;

    // при сворачивании коннекта меняем статус connection$ и глушим websocket$
    this.config = {
      url: wsConfig.url,
      closeObserver: {
        next: (event: CloseEvent) => {
          this.websocket$ = null;
          this.connection$.next(false);
          this.testStatus.next(false);
        }
      },
      // при коннекте меняем статус connection$
      openObserver: {
        next: (event: Event) => {
          console.log('WebSocket connected!');
          this.connection$.next(true);
          this.testStatus.next(true);
        }
      }
    };

    // connection status
    this.status = new Observable<boolean>((observer) => {
      this.connection$ = observer;
    }).pipe(share(), distinctUntilChanged());

    // запускаем реконнект при отсутствии соединения
    this.statusSub = this.status
      .subscribe((isConnected) => {
        this.isConnected = isConnected;

        if (!this.reconnection$ && typeof (isConnected) === 'boolean' && !isConnected) {
          this.reconnect();
        }
      });

    // говорим, что что-то пошло не так
    this.websocketSub = this.wsMessages$.subscribe(
      null, (error: ErrorEvent) => console.error('WebSocket error!', error)
    );

    // коннектимся
    this.connect();
  }

  ngOnDestroy(): void {
  }

  private connect(): void {
    this.websocket$ = new WebSocketSubject(this.config); // создаем подключение
    // если есть сообщения, шлем их в дальше, если нет, ожидаем
    // реконнектимся, если получили ошибку
    this.websocket$.subscribe(
      (message) => this.wsMessages$.next(message),
      (error: Event) => {
        if (!this.websocket$) {
          // run reconnect if errors
          this.reconnect();
        }
      });
  }

  private reconnect(): void {
    // Создаем interval со значением из reconnectInterval
    this.reconnection$ = interval(this.reconnectInterval)
      .pipe(takeWhile((v, index) => index < this.reconnectAttempts && !this.websocket$));

    // Пытаемся подключиться пока не подключимся, либо не упремся в ограничение попыток подключения
    this.reconnection$.subscribe(
      () => this.connect(),
      null,
      () => {
        // Subject complete if reconnect attempts ending
        this.reconnection$ = null;

        if (!this.websocket$) {
          this.wsMessages$.complete();
          this.connection$.complete();
        }
      });
  }

  public on<T>(event: string): Observable<T> {
    if (event) {
      return this.wsMessages$.pipe(
        filter((message: IWsMessage<T>) => message.event === event),
        map((message: IWsMessage<T>) => message.data)
      );
    }
  }

  public send(event: string, data: any = {}): void {
    if (event && this.isConnected) {
      const message = { event, data };
      // console.log(`Send message: ${JSON.stringify(message)}`);
      this.websocket$.next(message as any);
    } else {
      console.error(`Send error! Event: ${event}, Data: ${data}`);
    }
  }

  public getStatus() {
    return this.status;
  }
  public getTestStatus() {
    return this.testStatus;
  }
}
