You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1026 lines
28 KiB

  1. // @ts-check
  2. const os = require('os');
  3. const throng = require('throng');
  4. const dotenv = require('dotenv');
  5. const express = require('express');
  6. const http = require('http');
  7. const redis = require('redis');
  8. const pg = require('pg');
  9. const log = require('npmlog');
  10. const url = require('url');
  11. const { WebSocketServer } = require('@clusterws/cws');
  12. const uuid = require('uuid');
  13. const fs = require('fs');
  const env = process.env.NODE_ENV || 'development';

  // Load the dotenv file *before* deriving any setting from process.env, so
  // that values defined only in `.env` / `.env.production` are honoured too.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // When any of these modes is enabled, every streaming connection must
  // present a valid access token, including the otherwise public channels.
  const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Converts a database connection URL into a configuration object for `pg`.
   *
   * Only the parts present in the URL are set on the result, so it can be
   * merged over a set of defaults. `?ssl=true` or `?ssl=1` enables SSL.
   *
   * @param {string} dbUrl Connection URL, e.g. `postgres://user:pass@host:5432/db`
   * @return {Object.<string, any>} Possibly empty configuration object
   */
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // Split on the first colon only: the password itself may contain
      // colons, and everything after the first one belongs to it.
      const separator = params.auth.indexOf(':');

      if (separator === -1) {
        config.user = params.auth;
        config.password = undefined;
      } else {
        config.user = params.auth.slice(0, separator);
        config.password = params.auth.slice(separator + 1);
      }
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      config.port = params.port;
    }

    if (params.pathname) {
      // The pathname is "/<database>", so the name is the second segment
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  /**
   * Creates a Redis client from a set of default options and an optional URL.
   *
   * - Without a URL, the defaults are used as-is.
   * - A `unix://` URL is treated as a socket path (the scheme is stripped).
   * - Any other URL is passed as the `url` option on top of the defaults.
   *
   * The `defaultConfig` object is never modified, so the same object can be
   * safely reused to create several clients.
   *
   * @param {Object.<string, any>} defaultConfig
   * @param {string} redisUrl
   */
  const redisUrlToClient = (defaultConfig, redisUrl) => {
    if (!redisUrl) {
      return redis.createClient(defaultConfig);
    }

    if (redisUrl.startsWith('unix://')) {
      return redis.createClient(redisUrl.slice(7), defaultConfig);
    }

    // Copy instead of Object.assign()-ing onto the caller's object
    return redis.createClient({ ...defaultConfig, url: redisUrl });
  };
  // An explicit STREAMING_CLUSTER_NUM wins; otherwise use a single worker in
  // development and one worker per CPU minus one (at least one) elsewhere.
  const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM) || (
    env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)
  );
  /**
   * Runs once in the master process: warns about the legacy way of passing a
   * UNIX socket path through PORT and announces the number of workers.
   */
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;
    const portLooksLikeSocketPath = Boolean(PORT) && isNaN(+PORT);

    if (!SOCKET && portLooksLikeSocketPath) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.info(`Starting streaming API server master with ${numWorkers} workers`);
  };
  71. const startWorker = (workerId) => {
  72. log.info(`Starting worker ${workerId}`);
  // PostgreSQL pool settings per environment. Development falls back to the
  // defaults of the `pg` module, production to Mastodon's conventional values.
  const pgConfigs = {
    development: {
      host: process.env.DB_HOST || pg.defaults.host,
      port: process.env.DB_PORT || pg.defaults.port,
      database: process.env.DB_NAME || 'mastodon_development',
      user: process.env.DB_USER || pg.defaults.user,
      password: process.env.DB_PASS || pg.defaults.password,
      max: 10,
    },

    production: {
      host: process.env.DB_HOST || 'localhost',
      port: process.env.DB_PORT || 5432,
      database: process.env.DB_NAME || 'mastodon_production',
      user: process.env.DB_USER || 'mastodon',
      password: process.env.DB_PASS || '',
      max: 10,
    },
  };

  // Any non-empty DB_SSLMODE other than "disable" turns SSL on everywhere
  if (process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
    for (const environmentConfig of [pgConfigs.development, pgConfigs.production]) {
      environmentConfig.ssl = true;
    }
  }
  const app = express();

  // The Express setting is called "trust proxy"; the previous key
  // ("trusted proxy") is not a setting Express knows about, so the
  // configured proxy list was silently ignored.
  app.set('trust proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');

  // Merge into a fresh object: this tolerates a NODE_ENV without a preset
  // (pgConfigs[env] is undefined then) and leaves the presets untouched.
  // Settings from DATABASE_URL take precedence over the DB_* variables.
  const pgPool = new pg.Pool(Object.assign({}, pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  const server = http.createServer(app);
  const redisNamespace = process.env.REDIS_NAMESPACE || null;

  const redisParams = {
    host: process.env.REDIS_HOST || '127.0.0.1',
    port: process.env.REDIS_PORT || 6379,
    db: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD || undefined,
  };

  if (redisNamespace) {
    redisParams.namespace = redisNamespace;
  }

  // Prefix prepended to every channel and key name used by this process
  const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

  // A connection in subscriber mode cannot issue regular commands, hence
  // one client for pub/sub and a second one for everything else.
  const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  /**
   * Listeners registered per Redis channel name.
   *
   * @type {Object.<string, Array.<function(string): void>>}
   */
  const subs = {};

  // Fan every incoming Redis message out to the listeners of its channel
  redisSubscribeClient.on('message', (channel, message) => {
    log.silly(`New message on channel ${channel}`);

    const listeners = subs[channel];

    if (listeners) {
      listeners.forEach(notify => notify(message));
    }
  });
  /**
   * Marks the given channels as subscribed in Redis and keeps refreshing the
   * marker every six minutes; each marker expires after three intervals.
   *
   * @param {string[]} channels
   * @return {function(): void} Stops the periodic refresh
   */
  const subscriptionHeartbeat = channels => {
    const intervalSeconds = 6 * 60;

    const markSubscribed = () => {
      for (const channel of channels) {
        redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', intervalSeconds * 3);
      }
    };

    markSubscribed();

    const timer = setInterval(markSubscribed, intervalSeconds * 1000);

    return () => {
      clearInterval(timer);
    };
  };
  /**
   * Registers a listener for a Redis channel. The Redis subscription itself
   * is only issued when the channel gets its first listener.
   *
   * @param {string} channel
   * @param {function(string): void} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];

    if (listeners.length === 0) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel);
    }

    listeners.push(callback);
  };
  /**
   * Removes a listener from a Redis channel. Once the last listener is gone,
   * the Redis subscription is dropped and the channel entry is deleted.
   *
   * @param {string} channel
   * @param {function(string): void} callback
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const registered = subs[channel];

    if (!registered) {
      return;
    }

    const remaining = registered.filter(item => item !== callback);

    if (remaining.length !== 0) {
      subs[channel] = remaining;
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);
    redisSubscribeClient.unsubscribe(channel);
    delete subs[channel];
  };
  // Values that are treated as "false" when passed as a request parameter
  const FALSE_VALUES = [
    false,
    0,
    "0",
    "f",
    "F",
    "false",
    "FALSE",
    "off",
    "OFF"
  ];

  /**
   * Interprets a request parameter as a boolean flag: falsy values and the
   * entries of FALSE_VALUES are false, everything else is true.
   *
   * Always returns a real boolean, never the falsy input itself.
   *
   * @param {any} value
   * @return {boolean}
   */
  const isTruthy = value =>
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Middleware that adds permissive CORS headers to every response.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Middleware that tags the request with a fresh UUID and echoes it back in
   * the `X-Request-Id` response header.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  /**
   * Middleware that copies the address of the underlying connection onto
   * the request as `remoteAddress`.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    const { remoteAddress } = req.connection;

    req.remoteAddress = remoteAddress;

    next();
  };
  /**
   * Looks up a non-revoked OAuth access token and, when found, stores the
   * owner's details on the request: `scopes`, `accountId`, `chosenLanguages`,
   * `allowNotifications` and `deviceId`.
   *
   * Rejects with the database error, or with a 401 "Invalid access token"
   * error when no matching token exists.
   *
   * @param {string} token
   * @param {any} req
   * @return {Promise.<void>}
   */
  const accountFromToken = (token, req) => new Promise((resolve, reject) => {
    pgPool.connect((connectError, client, release) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (queryError, result) => {
        // Hand the connection back to the pool before doing anything else
        release();

        if (queryError) {
          reject(queryError);
          return;
        }

        if (result.rows.length === 0) {
          const invalidToken = new Error('Invalid access token');
          invalidToken.status = 401;
          reject(invalidToken);
          return;
        }

        const row = result.rows[0];

        req.scopes = row.scopes.split(' ');
        req.accountId = row.account_id;
        req.chosenLanguages = row.chosen_languages;
        // Notifications need either the broad `read` scope or the specific one
        req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope));
        req.deviceId = row.device_id;

        resolve();
      });
    });
  });
  /**
   * Authenticates a request from its access token. The token is taken from
   * the `Authorization: Bearer` header, or else from the `access_token` query
   * parameter, or else from the `Sec-WebSocket-Protocol` header.
   *
   * Without any token, the promise rejects with a 401 error when `required`
   * is true and resolves without touching the request otherwise.
   *
   * @param {any} req
   * @param {boolean=} required
   * @return {Promise.<void>}
   */
  const accountFromRequest = (req, required = true) => new Promise((resolve, reject) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization) {
      resolve(accountFromToken(authorization.replace(/^Bearer /, ''), req));
      return;
    }

    if (accessToken) {
      resolve(accountFromToken(accessToken, req));
      return;
    }

    if (!required) {
      resolve();
      return;
    }

    const missingToken = new Error('Missing access token');
    missingToken.status = 401;
    reject(missingToken);
  });
  /**
   * Maps the path of an HTTP streaming request to its channel name. For the
   * public timelines, a truthy `only_media` query parameter selects the
   * `:media` variant. Unknown paths yield `undefined`.
   *
   * @param {any} req
   * @return {string}
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const mediaSuffix = isTruthy(query.only_media) ? ':media' : '';

    const channelsByPath = new Map([
      ['/api/v1/streaming/user', 'user'],
      ['/api/v1/streaming/user/notification', 'user:notification'],
      ['/api/v1/streaming/public', `public${mediaSuffix}`],
      ['/api/v1/streaming/public/local', `public:local${mediaSuffix}`],
      ['/api/v1/streaming/public/remote', `public:remote${mediaSuffix}`],
      ['/api/v1/streaming/hashtag', 'hashtag'],
      ['/api/v1/streaming/hashtag/local', 'hashtag:local'],
      ['/api/v1/streaming/direct', 'direct'],
      ['/api/v1/streaming/list', 'list'],
    ]);

    return channelsByPath.get(path);
  };
  // Channels that can be streamed without any OAuth scope
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'hashtag',
    'hashtag:local',
  ];

  /**
   * Verifies that the request may stream the given channel.
   *
   * Public channels need no scope. Every other channel needs `read`, or
   * `read:notifications` for the notification stream, or `read:statuses`
   * for the remaining ones. Requests that carry no scopes at all (for
   * example unauthenticated ones) are rejected like any other request with
   * insufficient scopes.
   *
   * @param {any} req
   * @param {string} channelName
   * @return {Promise.<void>} Rejects with a 401 error when access is denied
   */
  const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    // When accessing public channels, no scopes are needed
    if (PUBLIC_CHANNELS.includes(channelName)) {
      resolve();
      return;
    }

    // The `read` scope has the highest priority, if the token has it
    // then it can access all streams
    const requiredScopes = ['read'];

    // When accessing specifically the notifications stream,
    // we need a read:notifications, while in all other cases,
    // we can allow access with read:statuses. Mind that the
    // user stream will not contain notifications unless
    // the token has either read or read:notifications scope
    // as well, this is handled separately.
    if (channelName === 'user:notification') {
      requiredScopes.push('read:notifications');
    } else {
      requiredScopes.push('read:statuses');
    }

    // `req.scopes` is only set once a token has been resolved; treat its
    // absence as "no scopes" so the client gets a 401 instead of a TypeError
    // that would surface as an unexpected 500.
    const grantedScopes = Array.isArray(req.scopes) ? req.scopes : [];

    if (requiredScopes.some(requiredScope => grantedScopes.includes(requiredScope))) {
      resolve();
      return;
    }

    const err = new Error('Access token does not cover required scopes');
    err.status = 401;

    reject(err);
  });
  /**
   * Decides whether a WebSocket handshake is accepted.
   *
   * Only authentication is checked here, and a token is only mandatory when
   * the environment demands it. OAuth scopes are deliberately not checked at
   * this point; that happens when the client subscribes to a specific stream.
   *
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req, alwaysRequireAuth).then(accept).catch(refuse);
  };
  /**
   * Middleware that authenticates the request and checks that its token
   * covers the channel addressed by the request path. OPTIONS requests pass
   * through unchecked; failures are forwarded to the error handler.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const verifyScopes = () => checkScopes(req, channelNameFromPath(req));

    const proceed = () => {
      next();
    };

    const fail = err => {
      next(err);
    };

    accountFromRequest(req, alwaysRequireAuth).then(verifyScopes).then(proceed).catch(fail);
  };
  /**
   * Error handler: logs the error and answers with a JSON body. Errors that
   * carry a `status` are reported verbatim with that status code; anything
   * else becomes a generic 500. If the response has already started, the
   * error is handed on to the next handler instead.
   *
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    if (res.headersSent) {
      return next(err);
    }

    const hasStatus = Boolean(err.status);
    const statusCode = hasStatus ? err.status : 500;
    const message = hasStatus ? err.toString() : 'An unexpected error occurred';

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: message }));
  };
  /**
   * Builds a comma-separated list of numbered SQL placeholders, one per
   * array element: `$1, $2, ...`, with the numbering offset by `shift`.
   *
   * @param {array} arr
   * @param {number=} shift
   * @return {string}
   */
  const placeholders = (arr, shift = 0) => {
    const numbered = arr.map((_item, index) => `$${index + 1 + shift}`);

    return numbered.join(', ');
  };
  /**
   * Resolves when the list exists and belongs to the account of the request.
   * Rejects (without a reason) in every other case, including database errors.
   *
   * @param {string} listId
   * @param {any} req
   * @return {Promise.<void>}
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const ownerId = req.accountId;

    pgPool.connect((connectError, client, release) => {
      if (connectError) {
        reject();
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (queryError, result) => {
        release();

        const ownsList = !queryError && result.rows.length !== 0 && result.rows[0].account_id === ownerId;

        if (ownsList) {
          resolve();
        } else {
          reject();
        }
      });
    });
  });
  /**
   * Subscribes to the given channels and forwards their events to `output`.
   *
   * Events are dropped when the stream is notification-only and the event is
   * something else, or when a notification arrives for a token that may not
   * read notifications. When `needsFiltering` is set, `update` events are
   * additionally filtered by the account's chosen languages and, for
   * logged-in accounts, by its blocks, mutes and domain blocks.
   *
   * Messages that are not valid JSON objects are logged and skipped rather
   * than being allowed to throw out of the Redis message handler.
   *
   * @param {string[]} ids Channel names, without the Redis prefix
   * @param {any} req
   * @param {function(string, string): void} output
   * @param {function(string[], function(string): void): void} attachCloseHandler
   * @param {boolean=} needsFiltering
   * @param {boolean=} notificationOnly
   * @return {function(string): void} The listener registered on every channel
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
    const accountId = req.accountId || req.remoteAddress;
    const streamType = notificationOnly ? ' (notification)' : '';

    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);

    const listener = message => {
      let parsedMessage;

      try {
        parsedMessage = JSON.parse(message);
      } catch (err) {
        log.error(req.requestId, `Error parsing message from Redis: ${err}`);
        return;
      }

      if (parsedMessage === null || typeof parsedMessage !== 'object') {
        log.error(req.requestId, 'Unexpected non-object message from Redis');
        return;
      }

      const { event, payload, queued_at } = parsedMessage;

      const transmit = () => {
        const now = new Date().getTime();
        const delta = now - queued_at;
        const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

        log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
        output(event, encodedPayload);
      };

      if (notificationOnly && event !== 'notification') {
        return;
      }

      if (event === 'notification' && !req.allowNotifications) {
        return;
      }

      // Only messages that may require filtering are statuses, since notifications
      // are already personalized and deletes do not matter
      if (!needsFiltering || event !== 'update') {
        transmit();
        return;
      }

      const unpackedPayload = payload;
      const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
      // Local accounts have no "@domain" part, so this is undefined for them
      const accountDomain = unpackedPayload.account.acct.split('@')[1];

      if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
        log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit();
        return;
      }

      pgPool.connect((err, client, done) => {
        if (err) {
          log.error(err);
          return;
        }

        // $1 is the receiving account, $2 the author, $3 onwards the author
        // plus everyone mentioned in the status
        const queries = [
          client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        Promise.all(queries).then(values => {
          done();

          // Any matching row means the status must not reach this account
          if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
            return;
          }

          transmit();
        }).catch(err => {
          done();
          log.error(err);
        });
      });
    };

    ids.forEach(id => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (attachCloseHandler) {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }

    return listener;
  };
  /**
   * Turns an HTTP response into a server-sent-events stream: sends the
   * headers and an initial comment, then a keep-alive comment every 15
   * seconds until the request closes.
   *
   * @param {any} req
   * @param {any} res
   * @return {function(string, string): void} Writes one event to the stream
   */
  const streamToHttp = (req, res) => {
    const accountId = req.accountId || req.remoteAddress;

    const streamHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];

    streamHeaders.forEach(([name, value]) => {
      res.setHeader(name, value);
    });

    res.write(':)\n');

    const keepAlive = setInterval(() => res.write(':thump\n'), 15000);

    req.on('close', () => {
      log.verbose(req.requestId, `Ending stream for ${accountId}`);
      clearInterval(keepAlive);
    });

    const writeEvent = (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };

    return writeEvent;
  };
  /**
   * Builds the close handler installer for an HTTP stream: once the request
   * closes, the listener is removed from every channel and the optional
   * `closeHandler` is invoked.
   *
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @return {function(string[], function(string): void)}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    const cleanUp = () => {
      for (const id of ids) {
        unsubscribe(id, listener);
      }

      if (closeHandler) {
        closeHandler();
      }
    };

    req.on('close', cleanUp);
  };
  /**
   * Builds the output function of a WebSocket stream. Each event is sent as
   * one JSON message tagged with the stream name; events arriving after the
   * socket has left the open state are logged and dropped.
   *
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @return {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => (event, payload) => {
    const socketIsOpen = ws.readyState === ws.OPEN;

    if (socketIsOpen) {
      const frame = { stream: streamName, event, payload };
      ws.send(JSON.stringify(frame));
    } else {
      log.error(req.requestId, 'Tried writing to closed socket');
    }
  };
  /**
   * Answers with a JSON-encoded 404.
   *
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  // Middleware that runs for every request, in this order
  [setRequestId, setRemoteAddress, allowCrossDomain].forEach(middleware => {
    app.use(middleware);
  });

  // The health check is registered ahead of authentication on purpose
  app.get('/api/v1/streaming/health', (req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('OK');
  });

  app.use(authenticationMiddleware);
  app.use(errorMiddleware);

  // Every other streaming path is served as a server-sent-events stream
  app.get('/api/v1/streaming/*', (req, res) => {
    const channelName = channelNameFromPath(req);

    const startStreaming = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

      streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly);
    };

    const refuse = err => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelName, req.query).then(startStreaming).catch(refuse);
  });

  const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
  /**
   * @typedef StreamParams
   * @property {string} [tag]
   * @property {string} [list]
   * @property {string} [only_media]
   */

  /**
   * Resolves a channel name (plus its parameters) to the channel ids to
   * subscribe to and the options the stream should run with.
   *
   * Rejects with a plain string when the channel is unknown, when a hashtag
   * stream has no tag, or when the account may not read the requested list.
   *
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>}
   */
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
    const describe = (channelIds, needsFiltering, notificationOnly = false) => ({
      channelIds,
      options: { needsFiltering, notificationOnly },
    });

    // The public timelines map one-to-one onto "timeline:<name>" and are the
    // only ones, together with hashtags, that need per-account filtering.
    const publicTimelines = [
      'public',
      'public:local',
      'public:remote',
      'public:media',
      'public:local:media',
      'public:remote:media',
    ];

    if (publicTimelines.includes(name)) {
      resolve(describe([`timeline:${name}`], true));
      return;
    }

    if (name === 'user') {
      const channelIds = [`timeline:${req.accountId}`];

      // Tokens bound to a device additionally get the device-specific channel
      if (req.deviceId) {
        channelIds.push(`timeline:${req.accountId}:${req.deviceId}`);
      }

      resolve(describe(channelIds, false));
      return;
    }

    if (name === 'user:notification') {
      resolve(describe([`timeline:${req.accountId}`], false, true));
      return;
    }

    if (name === 'direct') {
      resolve(describe([`timeline:direct:${req.accountId}`], false));
      return;
    }

    if (name === 'hashtag' || name === 'hashtag:local') {
      if (!params.tag || params.tag.length === 0) {
        reject('No tag for stream provided');
        return;
      }

      const localSuffix = name === 'hashtag:local' ? ':local' : '';

      resolve(describe([`timeline:hashtag:${params.tag.toLowerCase()}${localSuffix}`], true));
      return;
    }

    if (name === 'list') {
      authorizeListAccess(params.list, req).then(() => {
        resolve(describe([`timeline:list:${params.list}`], false));
      }).catch(() => {
        reject('Not authorized to stream this list');
      });
      return;
    }

    reject('Unknown stream type');
  });
  /**
   * Builds the stream name reported to WebSocket clients: the channel name,
   * followed by the list id or the tag for the channels that take one.
   *
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  /**
   * @typedef WebSocketSession
   * @property {any} socket
   * @property {any} request
   * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
   */

  /**
   * Subscribes a WebSocket session to a channel after checking its scopes.
   * Subscriptions are keyed by the joined channel ids, so asking for a
   * channel the session already streams is a no-op. Failures are reported
   * to the client as an `error` message.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
    checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(({ channelIds, options }) => {
        const subscriptionKey = channelIds.join(';');

        if (subscriptions[subscriptionKey]) {
          return;
        }

        const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
        const stopHeartbeat = subscriptionHeartbeat(channelIds);
        const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly);

        subscriptions[subscriptionKey] = { listener, stopHeartbeat };
      })
      .catch(err => {
        log.verbose(request.requestId, 'Subscription error:', err.toString());
        socket.send(JSON.stringify({ error: err.toString() }));
      });
  /**
   * Ends a WebSocket session's subscription to a channel: removes its
   * listener from every channel id, stops the heartbeat and forgets the
   * subscription. Unknown subscriptions are ignored; failures are reported
   * to the client as an `error` message.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) =>
    channelNameToIds(request, channelName, params)
      .then(({ channelIds }) => {
        log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

        const subscriptionKey = channelIds.join(';');
        const subscription = subscriptions[subscriptionKey];

        if (!subscription) {
          return;
        }

        const { listener, stopHeartbeat } = subscription;

        for (const channelId of channelIds) {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        }

        stopHeartbeat();

        delete subscriptions[subscriptionKey];
      })
      .catch(err => {
        log.verbose(request.requestId, 'Unsubscription error:', err);
        socket.send(JSON.stringify({ error: err.toString() }));
      });
  /**
   * Returns the first element when given an array, and the value itself
   * otherwise. Used for parameters that may be repeated.
   *
   * @param {string|string[]} arrayOrString
   * @return {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Sets up a session for every accepted WebSocket connection: clients
  // subscribe and unsubscribe through JSON messages, and optionally through
  // a `stream` query parameter on the connection URL.
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // Releases everything the session holds: its Redis listeners and the
    // heartbeats that keep its channels marked as subscribed.
    const onEnd = () => {
      const keys = Object.keys(session.subscriptions);

      keys.forEach(channelIds => {
        const { listener, stopHeartbeat } = session.subscriptions[channelIds];

        // Subscriptions are keyed by their channel ids joined with ";"
        channelIds.split(';').forEach(channelId => {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        });

        stopHeartbeat();
      });
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', data => {
      // The payload is client-controlled: a malformed frame must never be
      // able to throw out of this handler and take the worker down.
      let json;

      try {
        json = JSON.parse(data);
      } catch (err) {
        log.verbose(req.requestId, `Ignoring malformed message: ${err}`);
        return;
      }

      if (json === null || typeof json !== 'object') {
        log.verbose(req.requestId, 'Ignoring message that is not a JSON object');
        return;
      }

      const { type, stream, ...params } = json;

      if (type === 'subscribe') {
        subscribeWebsocketToChannel(session, firstParam(stream), params);
      } else if (type === 'unsubscribe') {
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
      } else {
        // Unknown action type
      }
    });

    if (location.query.stream) {
      subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
    }
  });
  // Ping connected clients every 30 seconds
  wss.startAutoPing(30000);

  attachServerWithConfig(server, address => {
    log.info(`Worker ${workerId} now listening on ${address}`);
  });

  // Stops accepting connections and terminates the worker
  const onExit = () => {
    log.info(`Worker ${workerId} exiting, bye bye`);
    server.close();
    process.exit(0);
  };

  // Same as onExit, but logs the error that brought the worker down
  const onError = (err) => {
    log.error(err);
    server.close();
    process.exit(0);
  };

  ['SIGINT', 'SIGTERM', 'exit'].forEach(eventName => {
    process.on(eventName, onExit);
  });

  process.on('uncaughtException', onError);
  802. };
  /**
   * Starts listening according to the environment: on the UNIX socket named
   * by SOCKET (or by a non-numeric PORT), or else on PORT (default 4000) at
   * the BIND address (default 127.0.0.1).
   *
   * When `onSuccess` is given, it receives the address the server listens
   * on; for UNIX sockets the socket file is made world-accessible first.
   *
   * @param {any} server
   * @param {function(string): void} [onSuccess]
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const listensOnSocket = SOCKET || (PORT && isNaN(+PORT));

    if (!listensOnSocket) {
      server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
        if (onSuccess) {
          onSuccess(`${server.address().address}:${server.address().port}`);
        }
      });

      return;
    }

    server.listen(SOCKET || PORT, () => {
      if (onSuccess) {
        fs.chmodSync(server.address(), 0o666);
        onSuccess(server.address());
      }
    });
  };
  /**
   * Probes whether the configured port or socket can be bound by briefly
   * starting a throw-away server on it. The callback receives the error when
   * binding fails and no argument once the probe server has closed again.
   *
   * @param {function(Error=): void} onSuccess
   */
  const onPortAvailable = onSuccess => {
    const probe = http.createServer();

    const reportFailure = err => {
      onSuccess(err);
    };

    const closeProbe = () => {
      probe.once('close', () => onSuccess());
      probe.close();
    };

    probe.once('error', reportFailure);
    probe.once('listening', closeProbe);

    attachServerWithConfig(probe);
  };
  // Entry point: only spawn the cluster once the port or socket is known to
  // be free.
  onPortAvailable(err => {
    if (!err) {
      throng({
        workers: numWorkers,
        lifetime: Infinity,
        start: startWorker,
        master: startMaster,
      });

      return;
    }

    log.error('Could not start server, the port or socket is in use');
  });