You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

265 lines
6.7 KiB

  // @ts-check
  import WebSocketClient from '@gamestdio/websocket';
  /**
   * The WebSocket connection shared by all subscriptions. It stays `undefined`
   * until `connectStream` creates it for a `ws`-prefixed streaming URL.
   * @type {WebSocketClient | undefined}
   */
  let sharedConnection;

  /**
   * One consumer of the shared connection: the channel it listens to, the
   * parameters sent with the subscribe/unsubscribe messages, and its callbacks.
   * @typedef Subscription
   * @property {string} channelName
   * @property {Object.<string, string>} params
   * @property {function(): void} onConnect
   * @property {function(StreamEvent): void} onReceive
   * @property {function(): void} onDisconnect
   */

  /**
   * A message handed to a subscription's `onReceive` callback.
   * @typedef StreamEvent
   * @property {string} event
   * @property {object} payload
   */

  /**
   * Every subscription currently registered on the shared connection.
   * @type {Array.<Subscription>}
   */
  const subscriptions = [];

  /**
   * Number of active subscriptions per channel key, where the key is the
   * string produced by `channelNameWithInlineParams`.
   * @type {Object.<string, number>}
   */
  const subscriptionCounters = {};
  /**
   * Appends a subscription to the module-level `subscriptions` list.
   * @param {Subscription} subscription
   */
  const addSubscription = subscription => {
    subscriptions.push(subscription);
  };
  /**
   * Removes a subscription from the module-level `subscriptions` list.
   * The lookup is by object identity; an unknown subscription is ignored.
   * @param {Subscription} subscription
   */
  const removeSubscription = subscription => {
    const index = subscriptions.indexOf(subscription);

    if (index !== -1) {
      subscriptions.splice(index, 1);
    }
  };
  /**
   * Marks a subscription as active on the shared connection.
   *
   * A `subscribe` message is sent only when the channel key's counter is zero,
   * i.e. for the first active subscription on that key. The counter is then
   * incremented and the subscription's `onConnect` callback is invoked.
   *
   * The connection's ready state is not checked here; callers in this file
   * invoke it from the `connected` callback or after checking for `OPEN`.
   *
   * @param {Subscription} subscription
   */
  const subscribe = ({ channelName, params, onConnect }) => {
    const key = channelNameWithInlineParams(channelName, params);

    subscriptionCounters[key] = subscriptionCounters[key] || 0;

    if (subscriptionCounters[key] === 0) {
      sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params }));
    }

    subscriptionCounters[key] += 1;
    onConnect();
  };
  /**
   * Marks a subscription as inactive on the shared connection.
   *
   * An `unsubscribe` message is sent only when this is the last active
   * subscription for the channel key and the shared connection is open.
   * The counter is then decremented and `onDisconnect` is invoked.
   *
   * @param {Subscription} subscription
   */
  const unsubscribe = ({ channelName, params, onDisconnect }) => {
    const key = channelNameWithInlineParams(channelName, params);

    // A missing or zero counter is treated as 1, so the decrement below
    // never takes the counter below zero.
    subscriptionCounters[key] = subscriptionCounters[key] || 1;

    if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) {
      sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params }));
    }

    subscriptionCounters[key] -= 1;
    onDisconnect();
  };
  /**
   * Callbacks installed on the shared WebSocket connection. They fan
   * connection-level events out to the registered subscriptions.
   */
  const sharedCallbacks = {
    /** Activates every registered subscription once the socket is open. */
    connected () {
      subscriptions.forEach(subscription => subscribe(subscription));
    },

    /**
     * Dispatches an incoming message to the subscriptions whose channel
     * (and, for hashtag and list channels, identifier) matches `data.stream`.
     * @param {{ stream?: Array.<string> }} data parsed message
     */
    received (data) {
      const { stream } = data;

      // A message without a stream descriptor cannot be routed; ignore it
      // instead of throwing while indexing into `undefined`.
      if (!Array.isArray(stream) || stream.length === 0) {
        return;
      }

      const [streamChannelName, streamIdentifier] = stream;

      subscriptions.filter(({ channelName, params }) => {
        if (channelName !== streamChannelName) {
          return false;
        }

        // A single-element stream matches on channel name alone.
        if (stream.length === 1) {
          return true;
        }

        if (['hashtag', 'hashtag:local'].includes(channelName)) {
          return params.tag === streamIdentifier;
        }

        if (channelName === 'list') {
          return params.list === streamIdentifier;
        }

        return false;
      }).forEach(subscription => {
        subscription.onReceive(data);
      });
    },

    /** Deactivates every registered subscription when the socket closes. */
    disconnected () {
      subscriptions.forEach(subscription => unsubscribe(subscription));
    },

    /** Intentionally empty. */
    reconnected () {
    },
  };
  /**
   * Builds a single string from a channel name and its parameters, e.g.
   * `hashtag&tag=cats`. With no parameters the channel name is returned as is.
   * Values are inserted verbatim (no URL encoding).
   *
   * @param {string} channelName
   * @param {Object.<string, string>} params
   * @return {string}
   */
  const channelNameWithInlineParams = (channelName, params) => {
    const keys = Object.keys(params);

    if (keys.length === 0) {
      return channelName;
    }

    return `${channelName}&${keys.map(key => `${key}=${params[key]}`).join('&')}`;
  };
  /**
   * Creates a thunk that connects a consumer to a streaming channel.
   *
   * When the configured streaming URL starts with `ws`, the consumer is
   * registered as a subscription on the shared WebSocket connection, which is
   * created on first use. Otherwise a dedicated connection is created for the
   * channel through `createConnection`.
   *
   * @param {string} channelName
   * @param {Object.<string, string>} params
   * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
   * @return {function(Function, Function): function(): void} thunk taking
   *   `(dispatch, getState)` and returning a function that tears the stream down
   */
  export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
    const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
    const accessToken = getState().getIn(['meta', 'access_token']);
    const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);

    // If we cannot use a websockets connection, we must fall back
    // to using individual connections for each channel
    if (!streamingAPIBaseURL.startsWith('ws')) {
      const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
        connected () {
          onConnect();
        },

        received (data) {
          onReceive(data);
        },

        disconnected () {
          onDisconnect();
        },

        reconnected () {
          onConnect();
        },
      });

      return () => {
        connection.close();
      };
    }

    const subscription = {
      channelName,
      params,
      onConnect,
      onReceive,
      onDisconnect,
    };

    addSubscription(subscription);

    // If a connection is open, we can execute the subscription right now. Otherwise,
    // because we have already registered it, it will be executed on connect
    if (!sharedConnection) {
      sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks));
    } else if (sharedConnection.readyState === WebSocketClient.OPEN) {
      subscribe(subscription);
    }

    return () => {
      removeSubscription(subscription);
      unsubscribe(subscription);
    };
  };
  /**
   * Event types for which `createConnection` registers a listener on an
   * EventSource connection.
   */
  const KNOWN_EVENT_TYPES = [
    'update',
    'delete',
    'notification',
    'conversation',
    'filters_changed',
    'encrypted_message',
    'announcement',
    'announcement.delete',
    'announcement.reaction',
  ];
  /**
   * Converts an EventSource message into a `StreamEvent` and passes it to
   * `received`. The event's `type` becomes `event` and its `data` is forwarded
   * unchanged as `payload`.
   *
   * @param {MessageEvent} e
   * @param {function(StreamEvent): void} received
   */
  const handleEventSourceMessage = (e, received) => {
    received({
      event: e.type,
      payload: e.data,
    });
  };
  /**
   * Opens a streaming connection.
   *
   * `channelName` may carry inline parameters in the form produced by
   * `channelNameWithInlineParams` (`name&key=value&...`). For a `ws`-prefixed
   * base URL a WebSocket client is created, with the access token passed as
   * the second constructor argument and the inline parameters as the query
   * string. Otherwise an EventSource is created for the channel's HTTP path,
   * with the access token appended as a query parameter.
   *
   * @param {string} streamingAPIBaseURL
   * @param {string} accessToken
   * @param {string} channelName
   * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
   * @return {WebSocketClient | EventSource}
   */
  const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
    // The first `&`-separated segment is the channel; the rest are `key=value` pairs.
    const [baseChannelName, ...params] = channelName.split('&');

    if (streamingAPIBaseURL.startsWith('ws')) {
      const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);

      ws.onopen = connected;
      ws.onmessage = e => received(JSON.parse(e.data));
      ws.onclose = disconnected;
      ws.onreconnect = reconnected;

      return ws;
    }

    const mediaSuffix = '/media';
    let path = baseChannelName.replace(/:/g, '/');

    // Colons have already been turned into slashes, so the media variant must
    // be detected by its `/media` suffix. It is requested from the parent
    // channel's path with `only_media=true` instead.
    if (path.endsWith(mediaSuffix)) {
      path = path.slice(0, -mediaSuffix.length);
      params.push('only_media=true');
    }

    params.push(`access_token=${accessToken}`);

    const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${path}?${params.join('&')}`);

    es.onopen = () => {
      connected();
    };

    KNOWN_EVENT_TYPES.forEach(type => {
      es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received));
    });

    es.onerror = /** @type {function(): void} */ (disconnected);

    return es;
  };