You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

73 lines
1.8 KiB

  1. import WebSocketClient from 'websocket.js';
  /**
   * Builds a thunk that opens a streaming connection for `path` and, when a
   * `pollingRefresh` function is supplied, falls back to periodic polling
   * while the connection is reported as disconnected.
   *
   * @param {string} path - Stream identifier forwarded to `getStream`.
   * @param {?function(function): void} [pollingRefresh=null] - Called with
   *   `dispatch` every 20 seconds while disconnected, and once more on
   *   reconnect. Pass `null` to disable the polling fallback.
   * @param {function(function, function): Object} [callbacks] - Factory called
   *   with `(dispatch, getState)`; may return any of `onConnect`,
   *   `onDisconnect` and `onReceive`. Missing handlers default to no-ops.
   * @returns {function(function, function): function(): void} Thunk taking
   *   `(dispatch, getState)` and returning a `disconnect` function that closes
   *   the subscription and stops polling.
   */
  export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
    return (dispatch, getState) => {
      const pollingIntervalMs = 20000;
      const noop = () => {};

      const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
      const accessToken = getState().getIn(['meta', 'access_token']);

      // Tolerate factories that return only some handlers (or nothing at all).
      const {
        onConnect = noop,
        onDisconnect = noop,
        onReceive = noop,
      } = callbacks(dispatch, getState) || {};

      let polling = null;
      // Set once the caller invokes `disconnect`; polling must never restart after that.
      let isClosed = false;

      const setupPolling = () => {
        // `disconnected` may be reported more than once in a row (presumably on
        // each failed reconnect attempt -- confirm against websocket.js) and may
        // also arrive after an explicit disconnect. Starting a second interval
        // would orphan the first one, so keep at most one timer alive.
        if (polling !== null || isClosed) {
          return;
        }

        polling = setInterval(() => {
          pollingRefresh(dispatch);
        }, pollingIntervalMs);
      };

      const clearPolling = () => {
        if (polling !== null) {
          clearInterval(polling);
          polling = null;
        }
      };

      const subscription = getStream(streamingAPIBaseURL, accessToken, path, {
        connected() {
          if (pollingRefresh) {
            clearPolling();
          }

          onConnect();
        },

        disconnected() {
          if (pollingRefresh) {
            setupPolling();
          }

          onDisconnect();
        },

        received(data) {
          onReceive(data);
        },

        reconnected() {
          if (pollingRefresh) {
            clearPolling();
            // Refresh once immediately to pick up anything missed while offline.
            pollingRefresh(dispatch);
          }

          onConnect();
        },
      });

      const disconnect = () => {
        // Flag first: closing the socket can trigger `disconnected`, which must
        // not start a new polling timer that nobody could stop afterwards.
        isClosed = true;

        if (subscription) {
          subscription.close();
        }

        clearPolling();
      };

      return disconnect;
    };
  }
  /**
   * Creates a `WebSocketClient` for the streaming endpoint
   * `<streamingAPIBaseURL>/api/v1/streaming/` and attaches the given handlers.
   *
   * @param {string} streamingAPIBaseURL - Base URL the endpoint path is appended to.
   * @param {string} accessToken - Sent as the `access_token` query parameter.
   * @param {string} stream - Sent as the `stream` query parameter.
   * @param {Object} handlers
   * @param {function(): void} handlers.connected - Assigned to `onopen`.
   * @param {function(*): void} handlers.received - Called with the
   *   JSON-parsed `data` of every message.
   * @param {function(): void} handlers.disconnected - Assigned to `onclose`.
   * @param {function(): void} handlers.reconnected - Assigned to `onreconnect`.
   * @returns {WebSocketClient} The client instance, so callers can close it.
   */
  export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
    // Escape the token so reserved characters (`&`, `#`, `+`, spaces) cannot
    // corrupt the query string. `stream` is intentionally left verbatim:
    // encoding it would change the URL for any caller that passes extra query
    // parameters inside it -- TODO confirm against callers before encoding.
    const token = encodeURIComponent(accessToken);
    const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?access_token=${token}&stream=${stream}`);

    ws.onopen = connected;
    ws.onmessage = (e) => received(JSON.parse(e.data));
    ws.onclose = disconnected;
    ws.onreconnect = reconnected;

    return ws;
  }