From c73a1fb5377556884ffab97fe95e98b92a027ebe Mon Sep 17 00:00:00 2001 From: masarakki Date: Thu, 16 Nov 2017 00:04:15 +0900 Subject: [PATCH] reusable-streaming (#5709) --- app/javascript/mastodon/actions/streaming.js | 57 +++--------------- app/javascript/mastodon/stream.js | 61 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/app/javascript/mastodon/actions/streaming.js b/app/javascript/mastodon/actions/streaming.js index 7802694a3c..dcce048cab 100644 --- a/app/javascript/mastodon/actions/streaming.js +++ b/app/javascript/mastodon/actions/streaming.js @@ -1,4 +1,4 @@ -import createStream from '../stream'; +import { connectStream } from '../stream'; import { updateTimeline, deleteFromTimelines, @@ -12,42 +12,19 @@ import { getLocale } from '../locales'; const { messages } = getLocale(); export function connectTimelineStream (timelineId, path, pollingRefresh = null) { - return (dispatch, getState) => { - const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); - const accessToken = getState().getIn(['meta', 'access_token']); - const locale = getState().getIn(['meta', 'locale']); - let polling = null; - - const setupPolling = () => { - polling = setInterval(() => { - pollingRefresh(dispatch); - }, 20000); - }; - - const clearPolling = () => { - if (polling) { - clearInterval(polling); - polling = null; - } - }; - - const subscription = createStream(streamingAPIBaseURL, accessToken, path, { - connected () { - if (pollingRefresh) { - clearPolling(); - } + return connectStream (path, pollingRefresh, (dispatch, getState) => { + const locale = getState().getIn(['meta', 'locale']); + return { + onConnect() { dispatch(connectTimeline(timelineId)); }, - disconnected () { - if (pollingRefresh) { - setupPolling(); - } + onDisconnect() { dispatch(disconnectTimeline(timelineId)); }, - received (data) { + onReceive (data) { switch(data.event) { case 'update': dispatch(updateTimeline(timelineId, JSON.parse(data.payload))); @@ -60,26 +37,8 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null) break; } }, - - reconnected () { - if (pollingRefresh) { - clearPolling(); - pollingRefresh(dispatch); - } - dispatch(connectTimeline(timelineId)); - }, - - }); - - const disconnect = () => { - if (subscription) { - subscription.close(); - } - clearPolling(); }; - - return disconnect; - }; + }); } function refreshHomeTimelineAndNotification (dispatch) { diff --git a/app/javascript/mastodon/stream.js b/app/javascript/mastodon/stream.js index 4b36082b2e..36c68ffc5b 100644 --- a/app/javascript/mastodon/stream.js +++ b/app/javascript/mastodon/stream.js @@ -1,5 +1,66 @@ import WebSocketClient from 'websocket.js'; +export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) { + return (dispatch, getState) => { + const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); + const accessToken = getState().getIn(['meta', 'access_token']); + const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState); + let polling = null; + + const setupPolling = () => { + polling = setInterval(() => { + pollingRefresh(dispatch); + }, 20000); + }; + + const clearPolling = () => { + if (polling) { + clearInterval(polling); + polling = null; + } + }; + + const subscription = getStream(streamingAPIBaseURL, accessToken, path, { + connected () { + if (pollingRefresh) { + clearPolling(); + } + onConnect(); + }, + + disconnected () { + if (pollingRefresh) { + setupPolling(); + } + onDisconnect(); + }, + + received (data) { + onReceive(data); + }, + + reconnected () { + if (pollingRefresh) { + clearPolling(); + pollingRefresh(dispatch); + } + onConnect(); + }, + + }); + + const disconnect = () => { + if (subscription) { + subscription.close(); + } + clearPolling(); + }; + + return disconnect; + }; +} + + export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`);