import {Injectable, OnDestroy} from '@angular/core';
import {IUserMessageSearch, UserMessageService} from '../user-message.service';
import {UserMessageEntry} from '../../model/UserMessage/UserMessageEntry';
import {BehaviorSubject, EMPTY, forkJoin, Observable, of, Subject, Subscription} from 'rxjs';
import {HttpClient} from '@angular/common/http';
import {environment} from '../../../../environments/environment';
import {Parser} from '../../model/utm/parser/OperationParser';
import {SearchResult} from '../../model/SearchResult';
import {map, share} from 'rxjs/operators';
import {RxStomp} from '@stomp/rx-stomp';
import {StompService} from '../stomp.service';
import * as _ from 'lodash';
import {DirectMessageSubmission, ITransportDirectMessageSubmissionResponse} from '../../model/DirectMessageSubmission';
import {IMessage} from "@stomp/stompjs/esm6";

/**
 * {@link UserMessageService} implementation backed by the operator REST API
 * (via `HttpClient`) and a STOMP topic subscription for pushed messages.
 *
 * Pushed STOMP frames are parsed and re-emitted through {@link watchMessages};
 * the unread counter is refreshed on demand through
 * {@link refreshUnreadMessageCount} and exposed through
 * {@link watchUnreadMessageCount}.
 */
@Injectable({
  providedIn: 'root'
})
export class RestUserMessageService extends UserMessageService implements OnDestroy {
  private baseGetMessagesUrl = `${environment.baseUrl}/operator/nmessages`;
  private baseMessagesUrl = `${environment.baseUrl}/operator/messages/`;
  private baseGetUnreadMessageCountUrl = `${environment.baseUrl}/operator/messages/unread`;
  private baseReadMessagesUrl = `${environment.baseUrl}/operator/messages/read`;
  // private unreadMessageCountWatchObservables: Map<number, Observable<number>> = new Map<number, Observable<number>>();

  /** Emits batches of parsed messages (pushed over STOMP or injected via emitMessages). */
  private messageSubject: Subject<UserMessageEntry[]> = new Subject<UserMessageEntry[]>();
  /** Holds the most recently fetched unread count; starts at 0. */
  private unreadMessageCountSubject: BehaviorSubject<number> = new BehaviorSubject<number>(0);
  /** Subscription to the stream of STOMP clients provided by StompService. */
  private stompSub: Subscription;
  /** In-flight unread-count request, if any. */
  private refreshUnreadMessageSub: Subscription | null = null;
  /** Subscription to the message topic on the current STOMP client, if any. */
  private messageSubscription: Subscription | null = null;
  /** Subscription that parses raw frames and forwards them to messageSubject. */
  private rawMessageSub: Subscription;
  /** The STOMP client the current topic subscription belongs to. */
  private lastClient: RxStomp | null = null;
  /** Raw STOMP frames, decoupled from any particular client so it outlives reconnects. */
  private rawMessageSubject: Subject<IMessage> = new Subject<IMessage>();

  constructor(private stompService: StompService, private http: HttpClient) {
    super();

    this.rawMessageSub = this.rawMessageSubject.subscribe((message: IMessage) => {
      let entry: UserMessageEntry;
      try {
        entry = Parser.parseUserMessageEntry(JSON.parse(message.body));
      } catch (err) {
        // A single malformed frame must not tear down the pipeline for all later messages.
        console.error('Failed to parse incoming user message; skipping it', err);
        return;
      }
      // Emitted outside the try so that errors raised by downstream subscribers
      // are not misreported as parse failures.
      this.messageSubject.next([entry]);
    });

    this.stompSub = this.stompService.getStompClient().subscribe((rxStomp: RxStomp | null) => {
      if (!rxStomp) {
        this.stopWatchingTopic();
        this.lastClient = null;
        return;
      }

      const needsSubscription = rxStomp !== this.lastClient
        || !this.messageSubscription
        || this.messageSubscription.closed;

      if (needsSubscription) {
        // Drop the subscription on the previous client first; otherwise it leaks
        // and may keep delivering (duplicate) messages.
        this.stopWatchingTopic();
        // Only `next` is forwarded: handing the subject itself to subscribe() would
        // also forward complete/error and permanently end rawMessageSubject.
        this.messageSubscription = rxStomp.watch(`/exchange/user_messages_topic/#`).subscribe({
          next: (message: IMessage) => this.rawMessageSubject.next(message),
          error: (err: unknown) => console.error('User message topic subscription failed', err)
        });
      }

      this.lastClient = rxStomp;
    });
  }

  /**
   * Deletes each message by issuing one DELETE request per UUID.
   *
   * @param uuids UUIDs of the messages to delete; may be empty or nullish.
   * @returns An observable emitting `true` once all requests have completed.
   *          For an empty/nullish list it emits `true` immediately without any
   *          request (a bare `forkJoin([])` would complete without emitting).
   */
  deleteMessages(uuids: string[]): Observable<boolean> {
    if (!uuids || uuids.length === 0) {
      return of(true);
    }
    const deletions = uuids.map(id => this.http.delete(this.baseMessagesUrl + id));
    return forkJoin(deletions).pipe(map(() => true));
  }

  /**
   * Marks the given messages as read with a single POST carrying the UUID list.
   *
   * @param uuids UUIDs of the messages to mark as read.
   * @returns An observable emitting `true` when the request succeeds.
   */
  markMessagesAsRead(uuids: string[]): Observable<boolean> {
    return this.http.post(this.baseReadMessagesUrl, uuids).pipe(map(() => true));
  }

  /**
   * Fetches a page of messages matching the search criteria.
   *
   * @param messageSearch Filter; `read` and `afterTime` are sent as empty strings when unset.
   * @param limit Page size; defaults to 100 when omitted or falsy.
   * @param offset Page offset; defaults to 0 when omitted, null or negative.
   * @param fetchCount Whether to ask for the total count (sent as `fetch_count`).
   * @returns The parsed messages together with the `count` and `offset` reported by the response.
   */
  getMessages(messageSearch: IUserMessageSearch,
              limit?: number,
              offset?: number,
              fetchCount?: boolean): Observable<SearchResult<UserMessageEntry>> {
    return this.http.get(this.baseGetMessagesUrl, {
      params: {
        /* eslint-disable @typescript-eslint/naming-convention */
        read: !_.isNil(messageSearch.read) ? messageSearch.read : '',
        after_time: messageSearch.afterTime ? messageSearch.afterTime.toUTC().toISO() : '',
        limit: (limit ? limit : 100).toString(),
        offset: ((offset !== null && offset !== undefined) && offset >= 0 ? offset : 0).toString(),
        fetch_count: (!!fetchCount).toString(),
        /* eslint-enable @typescript-eslint/naming-convention */
      }
    }).pipe(map((messages: any) => new SearchResult<UserMessageEntry>(
        (messages.messages || []).map(Parser.parseUserMessageEntry), messages.count, messages.offset)
      )
    );
  }

  /**
   * @returns The stream of message batches, fed by the STOMP topic and by {@link emitMessages}.
   */
  watchMessages(): Observable<UserMessageEntry[]> {
    return this.messageSubject;
  }

  /**
   * Fetches the current unread message count from the server.
   *
   * @returns An observable emitting the `unread` field of the response.
   */
  getUnreadMessageCount(): Observable<number> {
    return this.http.get<{ unread: number; timeOfLastMessage: string }>(this.baseGetUnreadMessageCountUrl)
      .pipe(map(res => res.unread));
  }

  /**
   * @returns The unread-count stream. New subscribers immediately receive the latest
   *          known value (0 until the first refresh completes).
   */
  watchUnreadMessageCount(): Observable<number> {
    return this.unreadMessageCountSubject;
  }

  /**
   * Pushes one or more messages to {@link watchMessages} subscribers without a server round trip.
   *
   * @param message A single message or an array of messages; a single message is wrapped in an array.
   */
  emitMessages(message: UserMessageEntry | UserMessageEntry[]): void {
    const messagesToEmit = Array.isArray(message) ? message : [message];
    this.messageSubject.next(messagesToEmit);
  }

  /** Releases every subscription this service holds. */
  ngOnDestroy(): void {
    this.stompSub?.unsubscribe();
    this.stopWatchingTopic();
    this.rawMessageSub?.unsubscribe();
    this.refreshUnreadMessageSub?.unsubscribe();
    this.refreshUnreadMessageSub = null;
  }

  /**
   * Re-fetches the unread count and publishes it to {@link watchUnreadMessageCount}.
   *
   * @param force When `false` (default) the call is a no-op while a previous request is
   *              still in flight; when `true` the pending request is cancelled and restarted.
   */
  refreshUnreadMessageCount(force: boolean = false): void {
    if (this.refreshUnreadMessageSub && !this.refreshUnreadMessageSub.closed && !force) {
      return;
    }
    this.refreshUnreadMessageSub?.unsubscribe();

    this.refreshUnreadMessageSub = this.getUnreadMessageCount().subscribe(count => {
      this.unreadMessageCountSubject.next(count);
    });
  }

  /**
   * Sends a direct message addressed to an operation.
   *
   * @param operationID Identifier appended to the `sendtooperation/` endpoint.
   * @param message The message payload to submit.
   * @returns The parsed submission response.
   */
  sendMessageForOperation(operationID: string, message: DirectMessageSubmission): Observable<DirectMessageSubmission> {
    return this.http.post<ITransportDirectMessageSubmissionResponse>(this.baseMessagesUrl + 'sendtooperation/' + operationID, message)
      .pipe(map(rawRes => Parser.parseUserDirectMessageResponse(rawRes)));
  }

  /**
   * Sends a direct message addressed to a user.
   *
   * @param userID Identifier appended to the `sendtouser/` endpoint.
   * @param message The message payload to submit.
   * @returns The parsed submission response.
   */
  sendMessageToUser(userID: string, message: DirectMessageSubmission): Observable<DirectMessageSubmission> {
    return this.http.post<ITransportDirectMessageSubmissionResponse>(this.baseMessagesUrl + 'sendtouser/' + userID, message)
      .pipe(map(rawRes => Parser.parseUserDirectMessageResponse(rawRes)));
  }

  /**
   * Looks up messages by entity and returns the `message.data` of the first result.
   *
   * NOTE(review): the observable errors (TypeError) when the response contains no
   * entries, and `opId` is interpolated into the query string unencoded — confirm
   * callers rely on / tolerate both before changing either.
   *
   * @param opId Entity identifier sent as the `entity` query parameter.
   */
  getConflictingEntity(opId: string): Observable<any> {
    return this.http.get<any[]>(`${environment.baseUrl}/operator/messages/byentity?entity=${opId}`)
      .pipe(map(res => res[0].message.data));
  }

  /** Unsubscribes from the message topic of the current STOMP client, if subscribed. */
  private stopWatchingTopic(): void {
    this.messageSubscription?.unsubscribe();
    this.messageSubscription = null;
  }

  // private getUnreadMessageCountWatchObservable(watchInterval: number): Observable<number> {
  //   if (!this.unreadMessageCountWatchObservables.has(watchInterval)) {
  //     const source = timer(100, watchInterval).pipe(mergeMap(() => this.getUnreadMessageCount()));
  //     const subject = new Subject<number>();
  //     const refCounted = source.pipe(multicast(subject), refCount());
  //
  //     this.unreadMessageCountWatchObservables.set(watchInterval, refCounted);
  //   }
  //
  //   return this.unreadMessageCountWatchObservables.get(watchInterval);
  // }

}
