// @flow
import { buffers, channel } from 'redux-saga'
import { call, flush, fork, take } from 'redux-saga/effects'
import AppMonitoring from 'utilities/monitoring'

// Options accepted by createQueue.
type CreateQueueProps = {
  // Number of worker loops forked to consume the queue. Each worker handles
  // one request at a time, so set this to 1 when the order of events must be
  // maintained. createQueue falls back to 1 when the value is undefined.
  concurrent: number,
}

/*
 * Creates a queue with specified number of workers.
 *
 * Returns the queue and a generator function to start the worker threads.
 *
 * Example usage:
 *
 * function * initQueue () {
 *   const { queue, workers } = createQueue({ concurrent: 1 })
 *   yield fork(workers) // start the workers
 *
 *   yield queue.put({
 *     handler: jobFn,
 *     payload: {}
 *   }) // enqueue a job
 * }
 *
 * function * jobFn (payload) { ... }
 */
export function * createQueue ({ concurrent = 1 }: CreateQueueProps) : Generator<any, any, any> {
  // create a channel to queue incoming requests
  const buffer = buffers.fixed() // defaults to a fixed buffer with maximum of 10 items
  const queue = yield call(channel, buffer)

  function * workers () {
    // create N worker 'threads'
    for (var i = 0; i < concurrent; i++) {
      yield fork(handleRequest, queue)
    }
  }

  function * handleRequest (channel) {
    while (true) {
      const request = yield take(channel)
      const { handler, payload, errorHandler } = request

      try {
        yield handler(payload)
      } catch (e) {
        const unprocessedRequests = yield flush(channel)

        if (errorHandler) {
          yield errorHandler([request].concat(unprocessedRequests))
        } else {
          AppMonitoring.noticeError(e)
          throw e // re-raise error
        }
      }
    }
  }

  return {
    buffer,
    queue,
    workers
  }
}

export function * enqueueJob (channel: ?Object, job: Object) : Generator<any, any, any> {
  const { queue } = channel || {}
  if (!queue) throw new Error('Error enqueuing job: queue is undefined')
  yield queue.put(job)
}
