From dab8fc458420b5773a47ba232aabe089d0ff45b5 Mon Sep 17 00:00:00 2001 From: Clworld Date: Sun, 4 Jun 2017 03:50:53 +0900 Subject: [PATCH] Execute PushUpdateWorker only for accounts who uses StreamingAPI just now. (#3278) * Add redis key "subscribed:timeline:#{account.id}" to indicate active streaming API listeners exists. * Add endpoint for notification only stream. * Run PushUpdateWorker only for users uses Streaming API now. * Move close hander streamTo(Http/Ws) -> stream(Http/Ws)End (Deal with #3370) * Add stream type for stream start log message. --- app/lib/feed_manager.rb | 6 ++++- streaming/index.js | 52 ++++++++++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index c2d3a2e2cd..86928fa36d 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - PushUpdateWorker.perform_async(account.id, status.id) + PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id) end def trim(type, account_id) @@ -43,6 +43,10 @@ class FeedManager redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}") end + def push_update_required?(timeline_type, account_id) + timeline_type != :home || redis.get("subscribed:timeline:#{account_id}").present? + end + def merge_into_timeline(from_account, into_account) timeline_key = key(:home, into_account.id) query = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4) diff --git a/streaming/index.js b/streaming/index.js index 0411ae8efe..d77ca63ff5 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -110,11 +110,12 @@ const startWorker = (workerId) => { const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; + const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL); const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); const subs = {}; - redisClient.on('pmessage', (_, channel, message) => { + redisSubscribeClient.on('pmessage', (_, channel, message) => { const callbacks = subs[channel]; log.silly(`New message on channel ${channel}`); @@ -126,7 +127,19 @@ const startWorker = (workerId) => { callbacks.forEach(callback => callback(message)); }); - redisClient.psubscribe(`${redisPrefix}timeline:*`); + redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`); + + const subscriptionHeartbeat = (channel) => { + const interval = 6*60; + const tellSubscribed = () => { + redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3); + }; + tellSubscribed(); + const heartbeat = setInterval(tellSubscribed, interval*1000); + return () => { + clearInterval(heartbeat); + }; + }; const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -231,8 +244,9 @@ const startWorker = (workerId) => { const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); - const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { - log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`); + const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { + const streamType = notificationOnly ? ' (notification)' : ''; + log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`); const listener = message => { const { event, payload, queued_at } = JSON.parse(message); @@ -245,6 +259,10 @@ const startWorker = (workerId) => { output(event, payload); }; + if (notificationOnly && event !== 'notification') { + return; + } + // Only messages that may require filtering are statuses, since notifications // are already personalized and deletes do not matter if (needsFiltering && event === 'update') { @@ -313,9 +331,12 @@ const startWorker = (workerId) => { }; // Setup stream end for HTTP - const streamHttpEnd = req => (id, listener) => { + const streamHttpEnd = (req, closeHandler = false) => (id, listener) => { req.on('close', () => { unsubscribe(id, listener); + if (closeHandler) { + closeHandler(); + } }); }; @@ -330,15 +351,21 @@ const startWorker = (workerId) => { }; // Setup stream end for WebSockets - const streamWsEnd = (req, ws) => (id, listener) => { + const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => { ws.on('close', () => { log.verbose(req.requestId, `Ending stream for ${req.accountId}`); unsubscribe(id, listener); + if (closeHandler) { + closeHandler(); + } }); ws.on('error', e => { log.verbose(req.requestId, `Ending stream for ${req.accountId}`); unsubscribe(id, listener); + if (closeHandler) { + closeHandler(); + } }); }; @@ -348,7 +375,12 @@ const startWorker = (workerId) => { app.use(errorMiddleware); app.get('/api/v1/streaming/user', (req, res) => { - streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)); + const channel = `timeline:${req.accountId}`; + streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel))); + }); + + app.get('/api/v1/streaming/user/notification', (req, res) => { + streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true); }); app.get('/api/v1/streaming/public', (req, res) => { @@ -382,7 +414,11 @@ const startWorker = (workerId) => { switch(location.query.stream) { case 'user': - streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws)); + const channel = `timeline:${req.accountId}`; + streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); + break; + case 'user:notification': + streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true); break; case 'public': streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);