import md5 from 'md5';
import _ from 'lodash';
import { Observable, of, from, EMPTY } from 'rxjs';
import {
  map,
  first,
  toArray,
  groupBy,
  tap,
  flatMap,
  mergeMap
} from 'rxjs/operators';
import loadWithFsCache from './cache-firestore';
import loadWithGcsCache from './cache-gcs';
import loadWithFilesystemJsonCache from './cache-filesystem-json';
import logger from '~/app/logger';

// Simple snake case for easy namespace join. not very DRY but for easier IDE hints
export enum CacheNamespace {
  MICROLINK_API = 'microlink-api',
  GOOGLE_SYNTAX = 'google-syntax',
  GOOGLE_ANNOTATE = 'google-annotate',
  GOOGLE_TRANSLATE = 'google-translate',
  PUBSUB_MESSAGE_KEY = 'pubsub-message-key',
  MEDIA_METADATA = 'media-metadata',
  YOUTUBE_API = 'youtube-api',
  CONTENTFUL = 'contentful',
  LISTENNOTES = 'listennotes'
}

// some strategies are node base we should support dynamic require
// circular deps
export const CACHE_STRATEGIES_ISO = {
  FIRESTORE: loadWithFsCache,
  GCS: loadWithGcsCache,
  FILESYSTEM_JSON: loadWithFilesystemJsonCache
};

// in case we want object key in future https://github.com/puleos/object-hash
export const getCacheKeyHash = (cacheKey: string) => md5(cacheKey);

/**
 * Since we coupled loadFn & ids,
 * There might be case ids itself isn't unqiue & namespace isn't enough
 *  Optional getCacheKeyWithId(), e.g. can help to capture the params at loadFn
 */
export type loadCacheStrategy = (
  namespace: string,
  keys: string[],
  loadFn: (keys: string[]) => Observable<Record<string, any>>
) => any;

export const loadWithCacheBulk = (
  namespace: CacheNamespace,
  _ids: string[],
  loadFn: (ids: string[]) => Observable<Record<string, any>>,
  getCacheKeyWithId?: (id: string) => string,
  cacheStrategy: loadCacheStrategy = CACHE_STRATEGIES_ISO[
    process.env.WQ_CACHE_STORE || 'FIRESTORE'
  ]
): Observable<Record<string, any>> => {
  const ids = _ids.filter(Boolean);

  if (_.isEmpty(ids)) {
    logger.debug('No ids', ids);

    return of({});
  }
  // TODO Simplify the key-id logic
  // Limitation: We can't control the backpressure of loadFn given unknown cached entries and ids are not observable

  // @ts-ignore
  const cacheKeys = ids
    .map(getCacheKeyWithId || _.identity)
    .map(getCacheKeyHash);

  const idByCacheKey = _.zipObject(cacheKeys, ids);
  const cacheKeyById = _.zipObject(ids, cacheKeys);
  const loadFnWithCacheKeys = (cacheKeys: string[]) =>
    loadFn(cacheKeys.map((cacheKey) => idByCacheKey[cacheKey])).pipe(
      map((resultById) => _.mapKeys(resultById, (v, id) => cacheKeyById[id]))
    );

  return cacheStrategy(namespace, cacheKeys, loadFnWithCacheKeys).pipe(
    map((resultByKey) => _.mapKeys(resultByKey, (v, k) => idByCacheKey[k]))
  );
};

export const loadWithCache = (
  namespace: CacheNamespace,
  id: string,
  loadFn: (params: any) => Observable<any>,
  getCacheKeyWithId?: (id: string) => string,
  strategy
): Observable<any> =>
  loadWithCacheBulk(
    namespace,
    [id],
    (params) => loadFn(params).pipe(map((value) => _.fromPairs([[id, value]]))),
    getCacheKeyWithId,
    strategy
  ).pipe(
    map((resultById) => _.first(_.values(resultById))),
    first()
  );

/**
 * Generic Cache supporting cache as Map / query-based observable (e.g. firestore)
 * It's hard to ensure the likely external apis loadFn gives stable order,
 * i.e. returns null for missing /failed
 * We always return map
 *
 * Client could use original keys to pick values in order
 * current limitations = we are unable to tell if result is originally hit/miss
 */
export function loadWithCustomCacheBulk<T>(
  keys: string[],
  loadFn: (keys: string[]) => Observable<Record<string, T>>,
  checkIsCacheHit: (keys: string[]) => Observable<Record<string, boolean>>,
  loadCacheFn: (keys: string[]) => Observable<Record<string, T>>,
  mergeCache: (resultById: Record<string, T>) => Observable<Record<string, T>>
) {
  // TODO fix pipe not triggered
  if (keys.length === 0) {
    return of({});
  }

  // TODO work for video but not words. to investigate
  return checkIsCacheHit(keys).pipe(
    flatMap((isCacheHitById) => _.toPairs(isCacheHitById)),
    groupBy(([, isCacheHit]) => isCacheHit),
    mergeMap((group) =>
      group.pipe(
        toArray(),
        flatMap((pairs) => {
          const keys = _.flatten(pairs.map(([key]) => key));
          logger.debug('loadWithCustomCacheBulk', pairs, keys);
          if (group.key === true) {
            logger.debug('cache hit keys', keys);

            return loadCacheFn(keys);
          }
          logger.debug('cache miss keys', keys);

          return loadFn(keys).pipe(
            tap((resultByKey) => logger.debug('fetch result', resultByKey)),
            flatMap((resultByKey) =>
              of(resultByKey).pipe(
                flatMap((resultByKey) => {
                  if (mergeCache) {
                    logger.trace(
                      'mergeCache',
                      _.keys(resultByKey).length,
                      resultByKey
                    );

                    return mergeCache(resultByKey);
                  }

                  // do not return empty which will discard results
                  return of({});
                }),
                map(() => resultByKey)
              )
            )
          );
        })
      )
    ),
    toArray(),
    map((results) => {
      logger.trace('cache results', results);

      // @ts-ignore
      return _.merge(...results);
    }),
    first()
  );
}
