import { ofType, StateObservable } from 'redux-observable';
import { firstValueFrom, from, merge, Observable, of } from 'rxjs';
import {
  catchError,
  filter,
  first,
  mergeMap,
  startWith,
  takeWhile,
  throttleTime
} from 'rxjs/operators';
import { ISreDependencies } from 'sre/epics/sre-epics.model';
import {
  LanguageFrameSession as LF,
  SreSessionType
} from '@lexialearning/lobo-common/main-model/sre';
import { AppState } from 'services';
import {
  SreConfigAction,
  SreSelector,
  SreSessionAction,
  SreSessionActionCancel,
  SreSessionActionPrime,
  SreSessionActionPrimeFailure,
  SreSessionActionPrimeSuccess,
  SreSessionActionType
} from 'sre';
import { SreError, SreErrorType } from '@lexialearning/sre';
import { ILogger, LoggingLevel } from '@lexialearning/main-model';
import { LexiaError } from '@lexialearning/utils';
import { LoboLogItemCategory } from 'logging';

/**
 * Dependencies consumed by `primeSreSessionEpic`: everything declared by
 * `ISreDependencies`, plus a logger.
 */
export interface IPrimeSreSessionEpicDependencies extends ISreDependencies {
  // Used to log a primed session that was interrupted before activation
  logger: ILogger;
}
/**
 * Epic that creates a primed SRE session in response to a Prime action and
 * handles any errors the session hits before it is activated.
 *
 * Incoming Prime actions are throttled with a 250ms window. If the current
 * session is flagged as canceling, priming waits until that flag clears.
 * Once `deps.sre.prime` resolves, a prime-success action is emitted, merged
 * with whatever actions `handleSessionErrors$` produces for the session's
 * result promise.
 */
export function primeSreSessionEpic(
  action$: Observable<SreSessionActionPrime>,
  state$: StateObservable<AppState>,
  deps: IPrimeSreSessionEpicDependencies
): Observable<
  | SreSessionActionPrimeSuccess
  | SreSessionActionPrimeFailure
  | SreSessionActionCancel
  | void
> {
  // Primes a single session and resolves to the stream of actions it yields
  const primeSession = async (primeAction: SreSessionActionPrime) => {
    if (SreSelector.getIsCanceling(state$.value)) {
      // A cancellation is in flight - wait for it to finish before priming
      const cancelFinished$ = state$.pipe(
        startWith(state$.value),
        first(state => !SreSelector.getIsCanceling(state))
      );
      await firstValueFrom(cancelFinished$);
    }

    const { resultPromise } = await deps.sre.prime(
      primeAction.payload as LF.IConfig
    );

    const primeSuccess$ = of(SreSessionAction.prime.success());
    const sessionError$ = handleSessionErrors$(resultPromise, state$, deps);

    return merge(primeSuccess$, sessionError$);
  };

  return action$.pipe(
    ofType(SreSessionActionType.Prime),
    throttleTime(250),
    mergeMap(primeAction => primeSession(primeAction)),
    // Flatten each session's action stream into the epic output
    mergeMap(sessionAction$ => sessionAction$)
  );
}
primeSreSessionEpic.displayName = 'primeSreSessionEpic';

/**
 * Watch a primed session's result promise and turn any error it rejects
 * with into the actions (if any) that should be dispatched in response.
 *
 * A resolved result is only let past `takeWhile` while the store's session
 * type is still `PrimedLanguageFrame`, and is then dropped by the final
 * filter unless it carries a truthy `type`.
 *
 * Note: `takeWhile` evaluates its predicate for emitted values only, so a
 * rejection of `resultPromise` reaches `catchError` regardless of the
 * current session type.
 */
function handleSessionErrors$(
  resultPromise: Promise<any>,
  state$: StateObservable<AppState>,
  deps: IPrimeSreSessionEpicDependencies
): Observable<void | SreSessionActionCancel> {
  const isStillPrimed = () =>
    SreSelector.getSessionType(state$.value) ===
    SreSessionType.PrimedLanguageFrame;

  const sessionResult$ = from(resultPromise);

  return sessionResult$.pipe(
    takeWhile(isStillPrimed),
    // eslint-disable-next-line rxjs/no-implicit-any-catch
    catchError((err: LexiaError) => handleSessionErrors(err, deps)),
    // anything without a `type` is not an action and must not be dispatched
    filter(emitted => !!emitted?.type)
  );
}

/**
 * Map an error raised by a primed session to the actions to dispatch.
 *
 * The SRE error type is read from `error.context.sreErr` when that value is
 * an `SreError` instance:
 * - interrupted before activation: logged at Info level, nothing dispatched
 * - microphone blocked (at session start or during a session): cancel the
 *   session and flag the mic as blocked
 * - anything else: the original error is rethrown
 *
 * @param error the error the session's result promise rejected with
 * @param deps epic dependencies; only `logger` is used here
 * @returns an observable of the actions to dispatch (possibly none)
 * @throws the given `error` when it is not one of the handled types
 */
function handleSessionErrors(
  error: LexiaError,
  deps: IPrimeSreSessionEpicDependencies
): Observable<any> {
  const sreError: SreError | undefined = (error.context as any)?.sreErr;
  const errorType = sreError instanceof SreError ? sreError.type : undefined;

  switch (errorType) {
    case SreErrorType.SreSessionInterruptedBeforeActivation:
      void deps.logger.log({
        category: LoboLogItemCategory.SessionInterruptedBeforeActivation,
        loggingLevel: LoggingLevel.Info,
        payload: {
          error: sreError as SreError,
          sessionType: SreSessionType.PrimedLanguageFrame
        },
        summary: 'Prime session was cancelled before it was activated'
      });

      // Nothing to dispatch - this error already means the session was canceled
      return from([]);

    // mic blocked when beginning a session
    case SreErrorType.EmscriptenMediastreamNotAllowedError:
    // mic blocked during an active session
    case SreErrorType.ListentaskNoMicrophone:
      return from([
        SreSessionAction.cancel.request(),
        SreConfigAction.setMicBlocked(true)
      ]);

    default:
      // Not something this epic knows how to recover from
      throw error;
  }
}
