You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

720 lines
21 KiB

  1. const os = require('os');
  2. const throng = require('throng');
  3. const dotenv = require('dotenv');
  4. const express = require('express');
  5. const http = require('http');
  6. const redis = require('redis');
  7. const pg = require('pg');
  8. const log = require('npmlog');
  9. const url = require('url');
  10. const { WebSocketServer } = require('@clusterws/cws');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const env = process.env.NODE_ENV || 'development';
  14. const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
  15. dotenv.config({
  16. path: env === 'production' ? '.env.production' : '.env',
  17. });
  18. log.level = process.env.LOG_LEVEL || 'verbose';
  19. const dbUrlToConfig = (dbUrl) => {
  20. if (!dbUrl) {
  21. return {};
  22. }
  23. const params = url.parse(dbUrl, true);
  24. const config = {};
  25. if (params.auth) {
  26. [config.user, config.password] = params.auth.split(':');
  27. }
  28. if (params.hostname) {
  29. config.host = params.hostname;
  30. }
  31. if (params.port) {
  32. config.port = params.port;
  33. }
  34. if (params.pathname) {
  35. config.database = params.pathname.split('/')[1];
  36. }
  37. const ssl = params.query && params.query.ssl;
  38. if (ssl && ssl === 'true' || ssl === '1') {
  39. config.ssl = true;
  40. }
  41. return config;
  42. };
  43. const redisUrlToClient = (defaultConfig, redisUrl) => {
  44. const config = defaultConfig;
  45. if (!redisUrl) {
  46. return redis.createClient(config);
  47. }
  48. if (redisUrl.startsWith('unix://')) {
  49. return redis.createClient(redisUrl.slice(7), config);
  50. }
  51. return redis.createClient(Object.assign(config, {
  52. url: redisUrl,
  53. }));
  54. };
  55. const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  56. const startMaster = () => {
  57. if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
  58. log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  59. }
  60. log.info(`Starting streaming API server master with ${numWorkers} workers`);
  61. };
  62. const startWorker = (workerId) => {
  63. log.info(`Starting worker ${workerId}`);
  64. const pgConfigs = {
  65. development: {
  66. user: process.env.DB_USER || pg.defaults.user,
  67. password: process.env.DB_PASS || pg.defaults.password,
  68. database: process.env.DB_NAME || 'mastodon_development',
  69. host: process.env.DB_HOST || pg.defaults.host,
  70. port: process.env.DB_PORT || pg.defaults.port,
  71. max: 10,
  72. },
  73. production: {
  74. user: process.env.DB_USER || 'mastodon',
  75. password: process.env.DB_PASS || '',
  76. database: process.env.DB_NAME || 'mastodon_production',
  77. host: process.env.DB_HOST || 'localhost',
  78. port: process.env.DB_PORT || 5432,
  79. max: 10,
  80. },
  81. };
  82. if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
  83. pgConfigs.development.ssl = true;
  84. pgConfigs.production.ssl = true;
  85. }
  86. const app = express();
  87. app.set('trusted proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');
  88. const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  89. const server = http.createServer(app);
  90. const redisNamespace = process.env.REDIS_NAMESPACE || null;
  91. const redisParams = {
  92. host: process.env.REDIS_HOST || '127.0.0.1',
  93. port: process.env.REDIS_PORT || 6379,
  94. db: process.env.REDIS_DB || 0,
  95. password: process.env.REDIS_PASSWORD,
  96. };
  97. if (redisNamespace) {
  98. redisParams.namespace = redisNamespace;
  99. }
  100. const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
  101. const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  102. const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  103. const subs = {};
  104. redisSubscribeClient.on('message', (channel, message) => {
  105. const callbacks = subs[channel];
  106. log.silly(`New message on channel ${channel}`);
  107. if (!callbacks) {
  108. return;
  109. }
  110. callbacks.forEach(callback => callback(message));
  111. });
  112. const subscriptionHeartbeat = (channel) => {
  113. const interval = 6*60;
  114. const tellSubscribed = () => {
  115. redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
  116. };
  117. tellSubscribed();
  118. const heartbeat = setInterval(tellSubscribed, interval*1000);
  119. return () => {
  120. clearInterval(heartbeat);
  121. };
  122. };
  123. const subscribe = (channel, callback) => {
  124. log.silly(`Adding listener for ${channel}`);
  125. subs[channel] = subs[channel] || [];
  126. if (subs[channel].length === 0) {
  127. log.verbose(`Subscribe ${channel}`);
  128. redisSubscribeClient.subscribe(channel);
  129. }
  130. subs[channel].push(callback);
  131. };
  132. const unsubscribe = (channel, callback) => {
  133. log.silly(`Removing listener for ${channel}`);
  134. subs[channel] = subs[channel].filter(item => item !== callback);
  135. if (subs[channel].length === 0) {
  136. log.verbose(`Unsubscribe ${channel}`);
  137. redisSubscribeClient.unsubscribe(channel);
  138. }
  139. };
  140. const allowCrossDomain = (req, res, next) => {
  141. res.header('Access-Control-Allow-Origin', '*');
  142. res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
  143. res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
  144. next();
  145. };
  146. const setRequestId = (req, res, next) => {
  147. req.requestId = uuid.v4();
  148. res.header('X-Request-Id', req.requestId);
  149. next();
  150. };
  151. const setRemoteAddress = (req, res, next) => {
  152. req.remoteAddress = req.connection.remoteAddress;
  153. next();
  154. };
  155. const accountFromToken = (token, allowedScopes, req, next) => {
  156. pgPool.connect((err, client, done) => {
  157. if (err) {
  158. next(err);
  159. return;
  160. }
  161. client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
  162. done();
  163. if (err) {
  164. next(err);
  165. return;
  166. }
  167. if (result.rows.length === 0) {
  168. err = new Error('Invalid access token');
  169. err.statusCode = 401;
  170. next(err);
  171. return;
  172. }
  173. const scopes = result.rows[0].scopes.split(' ');
  174. if (allowedScopes.size > 0 && !scopes.some(scope => allowedScopes.includes(scope))) {
  175. err = new Error('Access token does not cover required scopes');
  176. err.statusCode = 401;
  177. next(err);
  178. return;
  179. }
  180. req.accountId = result.rows[0].account_id;
  181. req.chosenLanguages = result.rows[0].chosen_languages;
  182. req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
  183. next();
  184. });
  185. });
  186. };
  187. const accountFromRequest = (req, next, required = true, allowedScopes = ['read']) => {
  188. const authorization = req.headers.authorization;
  189. const location = url.parse(req.url, true);
  190. const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
  191. if (!authorization && !accessToken) {
  192. if (required) {
  193. const err = new Error('Missing access token');
  194. err.statusCode = 401;
  195. next(err);
  196. return;
  197. } else {
  198. next();
  199. return;
  200. }
  201. }
  202. const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
  203. accountFromToken(token, allowedScopes, req, next);
  204. };
  205. const PUBLIC_STREAMS = [
  206. 'public',
  207. 'public:media',
  208. 'public:local',
  209. 'public:local:media',
  210. 'hashtag',
  211. 'hashtag:local',
  212. ];
  213. const wsVerifyClient = (info, cb) => {
  214. const location = url.parse(info.req.url, true);
  215. const authRequired = alwaysRequireAuth || !PUBLIC_STREAMS.some(stream => stream === location.query.stream);
  216. const allowedScopes = [];
  217. if (authRequired) {
  218. allowedScopes.push('read');
  219. if (location.query.stream === 'user:notification') {
  220. allowedScopes.push('read:notifications');
  221. } else {
  222. allowedScopes.push('read:statuses');
  223. }
  224. }
  225. accountFromRequest(info.req, err => {
  226. if (!err) {
  227. cb(true, undefined, undefined);
  228. } else {
  229. log.error(info.req.requestId, err.toString());
  230. cb(false, 401, 'Unauthorized');
  231. }
  232. }, authRequired, allowedScopes);
  233. };
  234. const PUBLIC_ENDPOINTS = [
  235. '/api/v1/streaming/public',
  236. '/api/v1/streaming/public/local',
  237. '/api/v1/streaming/hashtag',
  238. '/api/v1/streaming/hashtag/local',
  239. ];
  240. const authenticationMiddleware = (req, res, next) => {
  241. if (req.method === 'OPTIONS') {
  242. next();
  243. return;
  244. }
  245. const authRequired = alwaysRequireAuth || !PUBLIC_ENDPOINTS.some(endpoint => endpoint === req.path);
  246. const allowedScopes = [];
  247. if (authRequired) {
  248. allowedScopes.push('read');
  249. if (req.path === '/api/v1/streaming/user/notification') {
  250. allowedScopes.push('read:notifications');
  251. } else {
  252. allowedScopes.push('read:statuses');
  253. }
  254. }
  255. accountFromRequest(req, next, authRequired, allowedScopes);
  256. };
  257. const errorMiddleware = (err, req, res, {}) => {
  258. log.error(req.requestId, err.toString());
  259. res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
  260. res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
  261. };
  262. const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  263. const authorizeListAccess = (id, req, next) => {
  264. pgPool.connect((err, client, done) => {
  265. if (err) {
  266. next(false);
  267. return;
  268. }
  269. client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
  270. done();
  271. if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
  272. next(false);
  273. return;
  274. }
  275. next(true);
  276. });
  277. });
  278. };
  279. const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
  280. const accountId = req.accountId || req.remoteAddress;
  281. const streamType = notificationOnly ? ' (notification)' : '';
  282. log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
  283. const listener = message => {
  284. const { event, payload, queued_at } = JSON.parse(message);
  285. const transmit = () => {
  286. const now = new Date().getTime();
  287. const delta = now - queued_at;
  288. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  289. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
  290. output(event, encodedPayload);
  291. };
  292. if (notificationOnly && event !== 'notification') {
  293. return;
  294. }
  295. if (event === 'notification' && !req.allowNotifications) {
  296. return;
  297. }
  298. // Only send local-only statuses to logged-in users
  299. if (event === 'update' && payload.local_only && !req.accountId) {
  300. log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
  301. return;
  302. }
  303. // Only messages that may require filtering are statuses, since notifications
  304. // are already personalized and deletes do not matter
  305. if (!needsFiltering || event !== 'update') {
  306. transmit();
  307. return;
  308. }
  309. const unpackedPayload = payload;
  310. const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
  311. const accountDomain = unpackedPayload.account.acct.split('@')[1];
  312. if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
  313. log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
  314. return;
  315. }
  316. // When the account is not logged in, it is not necessary to confirm the block or mute
  317. if (!req.accountId) {
  318. transmit();
  319. return;
  320. }
  321. pgPool.connect((err, client, done) => {
  322. if (err) {
  323. log.error(err);
  324. return;
  325. }
  326. const queries = [
  327. client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
  328. ];
  329. if (accountDomain) {
  330. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  331. }
  332. Promise.all(queries).then(values => {
  333. done();
  334. if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
  335. return;
  336. }
  337. transmit();
  338. }).catch(err => {
  339. done();
  340. log.error(err);
  341. });
  342. });
  343. };
  344. subscribe(`${redisPrefix}${id}`, listener);
  345. attachCloseHandler(`${redisPrefix}${id}`, listener);
  346. };
  347. // Setup stream output to HTTP
  348. const streamToHttp = (req, res) => {
  349. const accountId = req.accountId || req.remoteAddress;
  350. res.setHeader('Content-Type', 'text/event-stream');
  351. res.setHeader('Cache-Control', 'no-store');
  352. res.setHeader('Transfer-Encoding', 'chunked');
  353. res.write(':)\n');
  354. const heartbeat = setInterval(() => res.write(':thump\n'), 15000);
  355. req.on('close', () => {
  356. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  357. clearInterval(heartbeat);
  358. });
  359. return (event, payload) => {
  360. res.write(`event: ${event}\n`);
  361. res.write(`data: ${payload}\n\n`);
  362. };
  363. };
  364. // Setup stream end for HTTP
  365. const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
  366. req.on('close', () => {
  367. unsubscribe(id, listener);
  368. if (closeHandler) {
  369. closeHandler();
  370. }
  371. });
  372. };
  373. // Setup stream output to WebSockets
  374. const streamToWs = (req, ws) => (event, payload) => {
  375. if (ws.readyState !== ws.OPEN) {
  376. log.error(req.requestId, 'Tried writing to closed socket');
  377. return;
  378. }
  379. ws.send(JSON.stringify({ event, payload }));
  380. };
  381. // Setup stream end for WebSockets
  382. const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
  383. const accountId = req.accountId || req.remoteAddress;
  384. ws.on('close', () => {
  385. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  386. unsubscribe(id, listener);
  387. if (closeHandler) {
  388. closeHandler();
  389. }
  390. });
  391. ws.on('error', () => {
  392. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  393. unsubscribe(id, listener);
  394. if (closeHandler) {
  395. closeHandler();
  396. }
  397. });
  398. };
  399. const httpNotFound = res => {
  400. res.writeHead(404, { 'Content-Type': 'application/json' });
  401. res.end(JSON.stringify({ error: 'Not found' }));
  402. };
  403. app.use(setRequestId);
  404. app.use(setRemoteAddress);
  405. app.use(allowCrossDomain);
  406. app.get('/api/v1/streaming/health', (req, res) => {
  407. res.writeHead(200, { 'Content-Type': 'text/plain' });
  408. res.end('OK');
  409. });
  410. app.use(authenticationMiddleware);
  411. app.use(errorMiddleware);
  412. app.get('/api/v1/streaming/user', (req, res) => {
  413. const channel = `timeline:${req.accountId}`;
  414. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  415. });
  416. app.get('/api/v1/streaming/user/notification', (req, res) => {
  417. streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
  418. });
  419. app.get('/api/v1/streaming/public', (req, res) => {
  420. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  421. const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
  422. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  423. });
  424. app.get('/api/v1/streaming/public/local', (req, res) => {
  425. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  426. const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
  427. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  428. });
  429. app.get('/api/v1/streaming/direct', (req, res) => {
  430. const channel = `timeline:direct:${req.accountId}`;
  431. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
  432. });
  433. app.get('/api/v1/streaming/hashtag', (req, res) => {
  434. const { tag } = req.query;
  435. if (!tag || tag.length === 0) {
  436. httpNotFound(res);
  437. return;
  438. }
  439. streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  440. });
  441. app.get('/api/v1/streaming/hashtag/local', (req, res) => {
  442. const { tag } = req.query;
  443. if (!tag || tag.length === 0) {
  444. httpNotFound(res);
  445. return;
  446. }
  447. streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  448. });
  449. app.get('/api/v1/streaming/list', (req, res) => {
  450. const listId = req.query.list;
  451. authorizeListAccess(listId, req, authorized => {
  452. if (!authorized) {
  453. httpNotFound(res);
  454. return;
  455. }
  456. const channel = `timeline:list:${listId}`;
  457. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  458. });
  459. });
  460. const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
  461. wss.on('connection', (ws, req) => {
  462. const location = url.parse(req.url, true);
  463. req.requestId = uuid.v4();
  464. req.remoteAddress = ws._socket.remoteAddress;
  465. let channel;
  466. switch(location.query.stream) {
  467. case 'user':
  468. channel = `timeline:${req.accountId}`;
  469. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  470. break;
  471. case 'user:notification':
  472. streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
  473. break;
  474. case 'public':
  475. streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  476. break;
  477. case 'public:local':
  478. streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  479. break;
  480. case 'public:media':
  481. streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  482. break;
  483. case 'public:local:media':
  484. streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  485. break;
  486. case 'direct':
  487. channel = `timeline:direct:${req.accountId}`;
  488. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
  489. break;
  490. case 'hashtag':
  491. if (!location.query.tag || location.query.tag.length === 0) {
  492. ws.close();
  493. return;
  494. }
  495. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  496. break;
  497. case 'hashtag:local':
  498. if (!location.query.tag || location.query.tag.length === 0) {
  499. ws.close();
  500. return;
  501. }
  502. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  503. break;
  504. case 'list':
  505. const listId = location.query.list;
  506. authorizeListAccess(listId, req, authorized => {
  507. if (!authorized) {
  508. ws.close();
  509. return;
  510. }
  511. channel = `timeline:list:${listId}`;
  512. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  513. });
  514. break;
  515. default:
  516. ws.close();
  517. }
  518. });
  519. wss.startAutoPing(30000);
  520. attachServerWithConfig(server, address => {
  521. log.info(`Worker ${workerId} now listening on ${address}`);
  522. });
  523. const onExit = () => {
  524. log.info(`Worker ${workerId} exiting, bye bye`);
  525. server.close();
  526. process.exit(0);
  527. };
  528. const onError = (err) => {
  529. log.error(err);
  530. server.close();
  531. process.exit(0);
  532. };
  533. process.on('SIGINT', onExit);
  534. process.on('SIGTERM', onExit);
  535. process.on('exit', onExit);
  536. process.on('uncaughtException', onError);
  537. };
  538. const attachServerWithConfig = (server, onSuccess) => {
  539. if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
  540. server.listen(process.env.SOCKET || process.env.PORT, () => {
  541. if (onSuccess) {
  542. fs.chmodSync(server.address(), 0o666);
  543. onSuccess(server.address());
  544. }
  545. });
  546. } else {
  547. server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => {
  548. if (onSuccess) {
  549. onSuccess(`${server.address().address}:${server.address().port}`);
  550. }
  551. });
  552. }
  553. };
  554. const onPortAvailable = onSuccess => {
  555. const testServer = http.createServer();
  556. testServer.once('error', err => {
  557. onSuccess(err);
  558. });
  559. testServer.once('listening', () => {
  560. testServer.once('close', () => onSuccess());
  561. testServer.close();
  562. });
  563. attachServerWithConfig(testServer);
  564. };
  565. onPortAvailable(err => {
  566. if (err) {
  567. log.error('Could not start server, the port or socket is in use');
  568. return;
  569. }
  570. throng({
  571. workers: numWorkers,
  572. lifetime: Infinity,
  573. start: startWorker,
  574. master: startMaster,
  575. });
  576. });