import { eventChannel } from "redux-saga";
import { all, put, takeEvery, fork, call } from "redux-saga/effects";
import Feed, { EventType } from "@dxfeed/api";
import moment from "moment";
import { wsUrl } from "../actions/helpers";
import isEqual from "lodash/isEqual";

// Action Types
// Dispatched to (re)start listening for a set of symbols; handled by the
// listener saga in this module.
export const DXFEED_SOCKET_LISTEN = "DX/LISTEN/TOP";
// Dispatched by the listener saga with the latest socket data snapshot; the
// reducer in this module stores its payload.
export const DXFEED_SOCKET_SUCCESS = "DX/SUB/TOP/SUCCESS";
// Dispatched by the listener saga when setting up the subscription throws.
export const DXFEED_SOCKET_FAILURE = "DX/SUB/TOP/FAILURE";

// Set to true by listenSocket after the first successful connect() so later
// listen actions reuse the same feed instead of reconnecting.
let connected = false;
// New York reference times used by listenSocket to decide when to throttle.
// They are parsed once, when this module is evaluated.
// NOTE(review): `moment.tz` is supplied by moment-timezone, which is not
// imported in this file — presumably it is loaded elsewhere; confirm.
// NOTE(review): a time-only parse presumably takes its calendar date from the
// moment of parsing, so any full-instant comparison (isBefore/isAfter) made on
// a later day would be against that earlier date — confirm.
const marketOpenTime = moment.tz("09:30:00", "HH:mm:ss", "America/New_York");
// 09:45 — despite the name, this is 15 minutes after marketOpenTime.
const hourPastOpen = moment.tz("09:45:00", "HH:mm:ss", "America/New_York");
// 15:45 — despite the name, this is 15 minutes before marketAfterHoursTime.
const hourBeforeClose = moment.tz("15:45:00", "HH:mm:ss", "America/New_York");
const marketAfterHoursTime = moment.tz(
  "16:00:00",
  "HH:mm:ss",
  "America/New_York",
);
// When true, the feed callbacks forward only every 10th (options) or 15th
// (candles) event. Recomputed by listenSocket on every listen action.
let extraThrottle = false;

// Action Creators
/**
 * Builds the action that asks the listener saga to subscribe to `symbols`.
 *
 * @param {string[]} symbols - Symbols to listen to.
 * @param {*} [options=false] - Passed through to the saga unchanged.
 * @param {boolean} [disconnect=false] - Passed through to the saga unchanged.
 * @returns {{type: string, symbols: string[], options: *, disconnect: boolean}}
 */
export const dxFeedSocketListen = (
  symbols,
  options = false,
  disconnect = false,
) => {
  const type = DXFEED_SOCKET_LISTEN;
  return { type, symbols, options, disconnect };
};

/**
 * Builds the action carrying a socket data snapshot for the reducer.
 *
 * @param {*} data - Snapshot payload.
 * @param {boolean} [unsub=false] - Copied onto the action as-is.
 * @returns {{type: string, data: *, unsub: boolean}}
 */
export const dxFeedSocketSuccess = (data, unsub = false) => {
  const type = DXFEED_SOCKET_SUCCESS;
  return { type, data, unsub };
};

/**
 * Builds the action reporting a listener failure.
 *
 * @param {*} error - The caught error.
 * @returns {{type: string, error: *}}
 */
export const dxFeedSocketFailure = (error) => {
  const type = DXFEED_SOCKET_FAILURE;
  return { type, error };
};

// Feed instance created (and replaced) by connect().
let feed;
// Unsubscribe callback returned by the most recent feed.subscribe() call in
// createSocketChannel.
let unsub;
// Feed returned by connect(); listenSocket passes it to every channel it
// creates.
let socket;
// Running count of feed events seen by the subscription callbacks; used to
// sample events when throttling.
let i = 0;

/**
 * Opens a fresh dxFeed connection, replacing any existing one.
 *
 * Whatever feed is currently held in the module-level `feed` variable is
 * disconnected first, so at most one connection is tracked at a time.
 *
 * @param {boolean} disconnect - Kept for caller compatibility. An existing
 *   feed is always torn down before reconnecting, so the flag has no further
 *   effect.
 * @returns {Promise<Feed>} Resolves with the newly connected feed.
 */
const connect = async (disconnect) => {
  // Guard on the feed itself. The earlier `feed || disconnect` check called
  // `feed.disconnect()` on an undefined feed whenever a disconnect was
  // requested before any connection had been made.
  if (feed) {
    feed.disconnect();
    feed = null;
  }
  feed = new Feed();
  feed.connect(wsUrl);
  // An async function already wraps its return value in a promise.
  return feed;
};

/**
 * Creates a redux-saga event channel that forwards dxFeed events for
 * `symbols` as plain payload objects.
 *
 * A subscription is (re)made only when `symbols` is non-empty or a disconnect
 * was requested, and never when the list contains "CHAT". Each new
 * subscription first calls the previous unsubscribe callback, so only the
 * latest subscription is tracked in the module-level `unsub`.
 *
 * @param {object} feed - Connected feed exposing `subscribe(types, symbols, cb)`.
 * @param {string[]} symbols - Symbols to subscribe to.
 * @param {*} options - Truthy: subscribe to TimeAndSale events and emit
 *   `{ price, ticker, options, askPrice, bidPrice }`. Falsy: subscribe to
 *   Candle events and emit `{ price, ticker, options }` with the candle's
 *   `close` as `price`.
 * @param {boolean} disconnect - Forces a re-subscription even when `symbols`
 *   is empty, which drops the previous subscription.
 * @returns {object} The channel created by `eventChannel`.
 */
const createSocketChannel = (feed, symbols, options, disconnect) =>
  eventChannel((emit) => {
    if (!feed) {
      // NOTE(review): connect() is async and only replaces the module-level
      // feed; the `feed` parameter used below stays falsy, so subscribing
      // still throws in this case. Callers are expected to pass a feed.
      connect(false);
    }

    const shouldSubscribe =
      (symbols.length > 0 || disconnect) && !symbols.includes("CHAT");

    if (shouldSubscribe) {
      if (unsub) {
        unsub();
      }
      const eventType = options ? EventType.TimeAndSale : EventType.Candle;
      // Offset applied to the base candle interval of 4; with -3 every candle
      // is forwarded outside the extra-throttle window.
      const throttle = -3;

      unsub = feed.subscribe([eventType], symbols, (event) => {
        let payload;
        let sampleEvery;
        // `extraThrottle` is read per event because it can change between
        // events.
        if (options) {
          const { price, askPrice, bidPrice, eventSymbol } = event;
          payload = { price, ticker: eventSymbol, options, askPrice, bidPrice };
          sampleEvery = extraThrottle ? 10 : 1;
        } else {
          const { close: price, eventSymbol } = event;
          payload = { price, ticker: eventSymbol, options };
          sampleEvery = extraThrottle ? 15 : 4 + throttle;
        }
        if (i % sampleEvery === 0) {
          emit(payload);
        }
        i++;
      });
    }

    // Teardown run when the channel is closed. `unsub` may never have been
    // assigned (no subscription was made), so guard before calling it.
    return () => {
      if (unsub) {
        unsub();
      }
    };
  });

/**
 * Wall-clock offset of a moment within its own day, in milliseconds. The
 * calendar date of the moment is ignored.
 */
const msIntoDay = (time) =>
  ((time.hours() * 60 + time.minutes()) * 60 + time.seconds()) * 1000 +
  time.milliseconds();

/**
 * Worker saga for DXFEED_SOCKET_LISTEN.
 *
 * Recomputes the throttle flag, connects the feed on first use, opens an
 * event channel for the requested symbols and forwards changed prices to the
 * store as DXFEED_SOCKET_SUCCESS snapshots. Errors thrown while setting this
 * up are reported as DXFEED_SOCKET_FAILURE.
 *
 * NOTE(review): every action creates a new channel and listener; earlier ones
 * are not closed here.
 *
 * @param {{symbols: string[], options: *, disconnect: boolean}} action
 */
function* listenSocket(action) {
  try {
    // Compare wall-clock time of day only. The reference times are parsed
    // once at module load, so comparing full instants stops matching once the
    // date rolls over in a long-lived session.
    const nowMs = msIntoDay(moment().tz("America/New_York"));
    const isBetween = (start, end) =>
      nowMs > msIntoDay(start) && nowMs < msIntoDay(end);
    extraThrottle =
      isBetween(marketOpenTime, hourPastOpen) ||
      isBetween(hourBeforeClose, marketAfterHoursTime);

    const { symbols, options, disconnect } = action;

    if (!connected) {
      socket = yield call(connect, disconnect);
      connected = true;
    }

    const uniqueSubs = symbols.filter((symbol) => !symbol.includes("options"));
    const socketChannel = yield call(
      createSocketChannel,
      socket,
      uniqueSubs,
      options,
      disconnect,
    );
    const socketData = new Map();
    const shouldReset = disconnect || !symbols.length;
    const clearedSnapshot = () => ({ socketData: new Map(), currentSubs: [] });

    // Clear the store right away. Previously the reset was dispatched only
    // from inside the event handler, so it never ran when no events arrived,
    // which is the usual case after disconnecting or subscribing to nothing.
    if (shouldReset) {
      yield put(dxFeedSocketSuccess(clearedSnapshot()));
    }

    yield fork(function* () {
      yield takeEvery(socketChannel, function* (payload) {
        // Tickers starting with "." keep the whole payload and are compared
        // deeply; all others keep just the price.
        const keepsWholePayload = payload.ticker[0] === ".";
        const previousValue = socketData.get(payload.ticker);
        const newValue = keepsWholePayload ? payload : payload.price;
        const shouldUpdate = keepsWholePayload
          ? !isEqual(previousValue, newValue)
          : previousValue !== newValue;

        if (shouldUpdate) {
          socketData.set(payload.ticker, newValue);
          yield put(
            dxFeedSocketSuccess({
              socketData: new Map(socketData),
              currentSubs: uniqueSubs,
            }),
          );
        }

        // Kept from the original handler: while a reset is requested, every
        // incoming event is followed by a cleared snapshot.
        if (shouldReset) {
          socketData.clear();
          yield put(dxFeedSocketSuccess(clearedSnapshot()));
        }
      });
    });
  } catch (error) {
    yield put(dxFeedSocketFailure(error));
  }
}

/**
 * Watcher saga: hands every DXFEED_SOCKET_LISTEN action to listenSocket.
 */
function* listenSocketLoad() {
  const watchListenActions = takeEvery(DXFEED_SOCKET_LISTEN, listenSocket);
  yield watchListenActions;
}

// Root Saga
/**
 * Root saga for this module: forks the listen watcher, then yields `all`
 * over the value returned by that fork.
 */
export function* saga() {
  const listenerTask = yield fork(listenSocketLoad);
  const started = [listenerTask];
  yield all(started);
}

// Initial slice state: no subscriptions and no prices yet.
const INIT_STATE = {
  currentSubs: [],
  socketData: new Map(),
};

/**
 * Reducer for the socket slice.
 *
 * On DXFEED_SOCKET_SUCCESS it replaces `currentSubs` and stores a copy of the
 * incoming `socketData` map; every other action leaves the state untouched.
 *
 * @param {object} [state=INIT_STATE] - Current slice state.
 * @param {{type: string, data?: {currentSubs: string[], socketData: Map}}} action
 * @returns {object} The next slice state.
 */
const reducer = (state = INIT_STATE, action) => {
  if (action.type !== DXFEED_SOCKET_SUCCESS) {
    return state;
  }

  const { currentSubs, socketData } = action.data;
  return {
    ...state,
    currentSubs,
    socketData: new Map(socketData),
  };
};

export default reducer;
