You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

714 lines
21 KiB

  1. const os = require('os');
  2. const throng = require('throng');
  3. const dotenv = require('dotenv');
  4. const express = require('express');
  5. const http = require('http');
  6. const redis = require('redis');
  7. const pg = require('pg');
  8. const log = require('npmlog');
  9. const url = require('url');
  10. const { WebSocketServer } = require('@clusterws/cws');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  // NODE_ENV must come from the real process environment: it selects which
  // dotenv file is loaded just below.
  const env = process.env.NODE_ENV || 'development';

  // Load the dotenv file before deriving any other setting, so that values
  // defined only in that file are visible to the constants that follow.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // When true, authentication is required for every stream, public ones included.
  const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  // Convert a database URL (e.g. postgres://user:pass@host:5432/name?ssl=true)
  // into a connection config object. Only the parts present in the URL are set;
  // an empty or missing URL yields an empty object.
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // Split on the first colon only: everything after it belongs to the
      // password, which may itself contain colons.
      const [user, ...passwordParts] = params.auth.split(':');
      config.user = user;
      // The key is always assigned when credentials are present (undefined
      // when the URL carries a user name only), as before.
      config.password = passwordParts.length > 0 ? passwordParts.join(':') : undefined;
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      config.port = params.port;
    }

    if (params.pathname) {
      // "/name" -> "name"
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  // Create a Redis client from the default settings and an optional URL:
  //   - no URL:       connect with defaultConfig as-is
  //   - unix://path:  connect to the socket path, with defaultConfig as options
  //   - anything else: connect with defaultConfig plus `url`
  const redisUrlToClient = (defaultConfig, redisUrl) => {
    if (!redisUrl) {
      return redis.createClient(defaultConfig);
    }

    if (redisUrl.startsWith('unix://')) {
      return redis.createClient(redisUrl.slice(7), defaultConfig);
    }

    // Merge into a fresh object so the caller's defaults, which are reused
    // for more than one client, are not modified.
    return redis.createClient(Object.assign({}, defaultConfig, {
      url: redisUrl,
    }));
  };
  // Number of workers: STREAMING_CLUSTER_NUM when it is a non-zero number,
  // otherwise 1 in development and (CPU count - 1, at least 1) elsewhere.
  const numWorkers = (() => {
    const requested = +process.env.STREAMING_CLUSTER_NUM;

    if (requested) {
      return requested;
    }

    if (env === 'development') {
      return 1;
    }

    return Math.max(os.cpus().length - 1, 1);
  })();
  // Runs once in the master process: warns when PORT is being used to carry a
  // non-numeric value while SOCKET is unset, then logs the worker count.
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;
    const portCarriesNonNumber = !SOCKET && PORT && isNaN(+PORT);

    if (portCarriesNonNumber) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.info(`Starting streaming API server master with ${numWorkers} workers`);
  };
  62. const startWorker = (workerId) => {
  log.info(`Starting worker ${workerId}`);

  // PostgreSQL pool settings per environment; DB_* variables take precedence
  // over the listed fallbacks.
  const developmentPg = {
    user: process.env.DB_USER || pg.defaults.user,
    password: process.env.DB_PASS || pg.defaults.password,
    database: process.env.DB_NAME || 'mastodon_development',
    host: process.env.DB_HOST || pg.defaults.host,
    port: process.env.DB_PORT || pg.defaults.port,
    max: 10,
  };

  const productionPg = {
    user: process.env.DB_USER || 'mastodon',
    password: process.env.DB_PASS || '',
    database: process.env.DB_NAME || 'mastodon_production',
    host: process.env.DB_HOST || 'localhost',
    port: process.env.DB_PORT || 5432,
    max: 10,
  };

  const pgConfigs = {
    development: developmentPg,
    production: productionPg,
  };

  // Any DB_SSLMODE value other than empty or 'disable' switches SSL on.
  const sslMode = process.env.DB_SSLMODE;

  if (sslMode && sslMode !== 'disable') {
    Object.keys(pgConfigs).forEach((name) => {
      pgConfigs[name].ssl = true;
    });
  }
  const app = express();

  // The Express application setting is named 'trust proxy'; under any other
  // name the value is stored but never consulted by Express.
  app.set('trust proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');

  // Settings taken from DATABASE_URL override the per-environment defaults.
  const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  const server = http.createServer(app);
  // Redis connection settings. When a namespace is configured it is passed to
  // the client and also used as a "<namespace>:" prefix for channel/key names.
  const redisNamespace = process.env.REDIS_NAMESPACE || null;
  const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';

  const redisParams = {
    host: process.env.REDIS_HOST || '127.0.0.1',
    port: process.env.REDIS_PORT || 6379,
    db: process.env.REDIS_DB || 0,
    password: process.env.REDIS_PASSWORD,
  };

  if (redisNamespace) {
    redisParams.namespace = redisNamespace;
  }

  // One client receives subscription messages, the other issues commands.
  const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);

  // channel name -> array of listener callbacks
  const subs = {};
  // Fan each incoming Redis message out to the listeners of its channel.
  redisSubscribeClient.on('message', (channel, message) => {
    const listeners = subs[channel];

    log.silly(`New message on channel ${channel}`);

    if (listeners) {
      listeners.forEach((notify) => {
        notify(message);
      });
    }
  });
  // Mark `channel` as subscribed in Redis right away and again every 6
  // minutes; each write expires after three intervals. Returns a function
  // that stops the periodic refresh.
  const subscriptionHeartbeat = (channel) => {
    const intervalSeconds = 6 * 60;

    const refresh = () => {
      redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', intervalSeconds * 3);
    };

    refresh();
    const timer = setInterval(refresh, intervalSeconds * 1000);

    return () => {
      clearInterval(timer);
    };
  };
  // Register `callback` for `channel`. The Redis subscription is issued only
  // when the channel has no listeners yet.
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];

    if (listeners.length === 0) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel);
    }

    listeners.push(callback);
  };
  // Remove `callback` from the listeners of `channel`. When the last listener
  // goes away the Redis subscription is dropped and the channel's entry is
  // removed from `subs`, so the map does not keep one empty array per channel
  // ever seen. Calling this for a channel with no entry is a no-op; that
  // happens when a socket reports both 'error' and 'close'.
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const listeners = subs[channel];

    if (!listeners) {
      return;
    }

    const remaining = listeners.filter(item => item !== callback);

    if (remaining.length > 0) {
      subs[channel] = remaining;
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);
    redisSubscribeClient.unsubscribe(channel);
    delete subs[channel];
  };
  // Middleware: attach the CORS response headers to every response.
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  // Middleware: give the request a fresh v4 UUID and echo it back in the
  // X-Request-Id response header.
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  // Middleware: copy the connection's remote address onto the request.
  const setRemoteAddress = (req, res, next) => {
    const { remoteAddress } = req.connection;

    req.remoteAddress = remoteAddress;
    next();
  };
  // Look up a non-revoked OAuth access token and, on success, store the
  // owner's account id, chosen languages and notification permission on `req`.
  //
  // token         - access token string
  // allowedScopes - array of scopes, at least one of which the token must
  //                 have; an empty array disables the scope check
  // req           - request object to annotate
  // next          - called with no argument on success, or with an Error
  //                 (statusCode 401 for unknown tokens and insufficient
  //                 scopes; the database error otherwise)
  const accountFromToken = (token, allowedScopes, req, next) => {
    pgPool.connect((err, client, done) => {
      if (err) {
        next(err);
        return;
      }

      client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
        // Release the client before doing anything else with the result.
        done();

        if (err) {
          next(err);
          return;
        }

        if (result.rows.length === 0) {
          err = new Error('Invalid access token');
          err.statusCode = 401;

          next(err);
          return;
        }

        const scopes = result.rows[0].scopes.split(' ');

        // `allowedScopes` is an array, so its length (arrays have no `size`)
        // decides whether a scope restriction applies.
        if (allowedScopes.length > 0 && !scopes.some(scope => allowedScopes.includes(scope))) {
          err = new Error('Access token does not cover required scopes');
          err.statusCode = 401;

          next(err);
          return;
        }

        req.accountId = result.rows[0].account_id;
        req.chosenLanguages = result.rows[0].chosen_languages;
        req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));

        next();
      });
    });
  };
  // Pull an access token out of the request and hand it to accountFromToken.
  // The Authorization header wins (with a leading "Bearer " removed); otherwise
  // the `access_token` query parameter, then the Sec-WebSocket-Protocol header.
  // Without any token, `next` receives a 401 error when `required` is true and
  // is called with no argument otherwise.
  const accountFromRequest = (req, next, required = true, allowedScopes = ['read']) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization || accessToken) {
      const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;

      accountFromToken(token, allowedScopes, req, next);
      return;
    }

    if (!required) {
      next();
      return;
    }

    const missingToken = new Error('Missing access token');
    missingToken.statusCode = 401;

    next(missingToken);
  };
  // Stream names that may be opened without a token (unless
  // alwaysRequireAuth is set).
  const PUBLIC_STREAMS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'hashtag',
    'hashtag:local',
  ];

  // verifyClient hook for the WebSocket server: authenticates the upgrade
  // request and answers through `cb` (accepted, or rejected with 401).
  const wsVerifyClient = (info, cb) => {
    const { stream } = url.parse(info.req.url, true).query;
    const authRequired = alwaysRequireAuth || !PUBLIC_STREAMS.includes(stream);

    // With authentication required the token needs 'read' or the scope
    // specific to the requested stream.
    const allowedScopes = authRequired
      ? ['read', stream === 'user:notification' ? 'read:notifications' : 'read:statuses']
      : [];

    const onAuthenticated = (err) => {
      if (err) {
        log.error(info.req.requestId, err.toString());
        cb(false, 401, 'Unauthorized');
        return;
      }

      cb(true, undefined, undefined);
    };

    accountFromRequest(info.req, onAuthenticated, authRequired, allowedScopes);
  };
  // HTTP paths that may be streamed without a token (unless
  // alwaysRequireAuth is set).
  const PUBLIC_ENDPOINTS = [
    '/api/v1/streaming/public',
    '/api/v1/streaming/public/local',
    '/api/v1/streaming/hashtag',
    '/api/v1/streaming/hashtag/local',
  ];

  // Middleware: authenticate every request except OPTIONS, which is passed
  // straight through.
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const authRequired = alwaysRequireAuth || !PUBLIC_ENDPOINTS.includes(req.path);
    const isNotificationStream = req.path === '/api/v1/streaming/user/notification';

    // With authentication required the token needs 'read' or the scope
    // specific to the requested endpoint.
    const allowedScopes = authRequired
      ? ['read', isNotificationStream ? 'read:notifications' : 'read:statuses']
      : [];

    accountFromRequest(req, next, authRequired, allowedScopes);
  };
  // Error-handling middleware (keeps four parameters so Express treats it as
  // one). Logs the error and answers with JSON: errors carrying a statusCode
  // are reported as-is, anything else becomes a generic 500.
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    // Once the response has started its status line can no longer be
    // written, so hand the error on instead of calling writeHead().
    if (res.headersSent) {
      next(err);
      return;
    }

    res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
  };
  // Build "$n, $n+1, ..." SQL placeholders, one per element of `arr`,
  // numbered from `shift + 1`.
  const placeholders = (arr, shift = 0) => {
    const numbered = arr.map((_item, index) => {
      const position = index + 1 + shift;
      return `$${position}`;
    });

    return numbered.join(', ');
  };
  // Check that list `id` exists and belongs to the account on `req`.
  // `next` receives true when it does and false in every other case,
  // including database errors.
  const authorizeListAccess = (id, req, next) => {
    pgPool.connect((connectError, client, release) => {
      if (connectError) {
        next(false);
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (queryError, result) => {
        release();

        const authorized = !queryError
          && result.rows.length > 0
          && result.rows[0].account_id === req.accountId;

        next(authorized);
      });
    });
  };
  // Subscribe to channel `id` and forward its messages to `output`.
  //
  // id                 - channel name, without the Redis prefix
  // req                - request carrying accountId, chosenLanguages,
  //                      allowNotifications, requestId and remoteAddress
  // output             - function (event, encodedPayload) that writes to the client
  // attachCloseHandler - function (channel, listener) that arranges for the
  //                      listener to be unsubscribed when the client goes away
  // needsFiltering     - when true, 'update' events are checked against the
  //                      account's languages, blocks, mutes and domain blocks
  // notificationOnly   - when true, only 'notification' events are forwarded
  const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
    const accountId = req.accountId || req.remoteAddress;
    const streamType = notificationOnly ? ' (notification)' : '';

    log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);

    const listener = message => {
      let parsed;

      // This runs for every message on the channel; an exception here would
      // escape into the Redis message handler, so bad input is logged and dropped.
      try {
        parsed = JSON.parse(message);
      } catch (err) {
        log.error(req.requestId, `Error parsing message from ${id}: ${err}`);
        return;
      }

      if (parsed === null || typeof parsed !== 'object') {
        log.error(req.requestId, `Ignoring unexpected message from ${id}`);
        return;
      }

      const { event, payload, queued_at } = parsed;

      const transmit = () => {
        const now = new Date().getTime();
        const delta = now - queued_at;
        const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

        log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
        output(event, encodedPayload);
      };

      if (notificationOnly && event !== 'notification') {
        return;
      }

      if (event === 'notification' && !req.allowNotifications) {
        return;
      }

      // Only messages that may require filtering are statuses, since notifications
      // are already personalized and deletes do not matter
      if (!needsFiltering || event !== 'update') {
        transmit();
        return;
      }

      const unpackedPayload = payload;
      const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
      const accountDomain = unpackedPayload.account.acct.split('@')[1];

      if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
        log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit();
        return;
      }

      pgPool.connect((err, client, done) => {
        if (err) {
          log.error(err);
          return;
        }

        // $1 is the viewer, $2 the author; the author and mentioned accounts
        // follow from $3 onwards.
        const queries = [
          client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        Promise.all(queries).then(values => {
          done();

          // Any matching row means the status must not be delivered.
          if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
            return;
          }

          transmit();
        }).catch(err => {
          done();
          log.error(err);
        });
      });
    };

    subscribe(`${redisPrefix}${id}`, listener);
    attachCloseHandler(`${redisPrefix}${id}`, listener);
  };
  // Setup stream output to HTTP: sends the event-stream headers and an initial
  // comment line, keeps the connection alive with a comment every 15 seconds
  // until the request closes, and returns the writer used for events.
  const streamToHttp = (req, res) => {
    const accountId = req.accountId || req.remoteAddress;

    const headers = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];

    headers.forEach(([name, value]) => {
      res.setHeader(name, value);
    });

    res.write(':)\n');

    const keepAlive = setInterval(() => res.write(':thump\n'), 15000);

    req.on('close', () => {
      log.verbose(req.requestId, `Ending stream for ${accountId}`);
      clearInterval(keepAlive);
    });

    const writeEvent = (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };

    return writeEvent;
  };
  // Setup stream end for HTTP: returns a function that, given a channel and
  // its listener, unsubscribes the listener when the request closes and then
  // runs the optional closeHandler.
  const streamHttpEnd = (req, closeHandler = false) => {
    return (id, listener) => {
      const cleanUp = () => {
        unsubscribe(id, listener);

        if (closeHandler) {
          closeHandler();
        }
      };

      req.on('close', cleanUp);
    };
  };
  // Setup stream output to WebSockets: returns a writer that sends each event
  // as a JSON message, or logs an error when the socket is not open.
  const streamToWs = (req, ws) => {
    return (event, payload) => {
      const socketIsOpen = ws.readyState === ws.OPEN;

      if (socketIsOpen) {
        ws.send(JSON.stringify({ event, payload }));
      } else {
        log.error(req.requestId, 'Tried writing to closed socket');
      }
    };
  };
  // Setup stream end for WebSockets: returns a function that, given a channel
  // and its listener, runs the same clean-up on both the socket's 'close' and
  // 'error' events (log, unsubscribe, then the optional closeHandler).
  const streamWsEnd = (req, ws, closeHandler = false) => {
    return (id, listener) => {
      const accountId = req.accountId || req.remoteAddress;

      const endStream = () => {
        log.verbose(req.requestId, `Ending stream for ${accountId}`);
        unsubscribe(id, listener);

        if (closeHandler) {
          closeHandler();
        }
      };

      ['close', 'error'].forEach((eventName) => {
        ws.on(eventName, () => {
          endStream();
        });
      });
    };
  };
  // Answer with a JSON 404.
  const httpNotFound = (res) => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  // Middleware applied to every request, in this order.
  [setRequestId, setRemoteAddress, allowCrossDomain].forEach((middleware) => {
    app.use(middleware);
  });

  // Health check: registered ahead of the authentication middleware.
  app.get('/api/v1/streaming/health', (req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    res.end('OK');
  });

  // Everything registered from here on goes through authentication first.
  app.use(authenticationMiddleware);
  app.use(errorMiddleware);
  // True when the request asks for media-only statuses.
  const wantsOnlyMedia = req => ['1', 'true'].includes(req.query.only_media);

  // Home timeline of the authenticated account, with a subscription heartbeat.
  app.get('/api/v1/streaming/user', (req, res) => {
    const channel = `timeline:${req.accountId}`;
    const output = streamToHttp(req, res);
    const stopHeartbeat = subscriptionHeartbeat(channel);

    streamFrom(channel, req, output, streamHttpEnd(req, stopHeartbeat));
  });

  // Same channel as above, restricted to notification events.
  app.get('/api/v1/streaming/user/notification', (req, res) => {
    const output = streamToHttp(req, res);

    streamFrom(`timeline:${req.accountId}`, req, output, streamHttpEnd(req), false, true);
  });

  // Public timeline, filtered per account.
  app.get('/api/v1/streaming/public', (req, res) => {
    const channel = wantsOnlyMedia(req) ? 'timeline:public:media' : 'timeline:public';
    const output = streamToHttp(req, res);

    streamFrom(channel, req, output, streamHttpEnd(req), true);
  });

  // Local public timeline, filtered per account.
  app.get('/api/v1/streaming/public/local', (req, res) => {
    const channel = wantsOnlyMedia(req) ? 'timeline:public:local:media' : 'timeline:public:local';
    const output = streamToHttp(req, res);

    streamFrom(channel, req, output, streamHttpEnd(req), true);
  });

  // Direct messages of the authenticated account, with a subscription heartbeat.
  app.get('/api/v1/streaming/direct', (req, res) => {
    const channel = `timeline:direct:${req.accountId}`;
    const output = streamToHttp(req, res);
    const stopHeartbeat = subscriptionHeartbeat(channel);

    streamFrom(channel, req, output, streamHttpEnd(req, stopHeartbeat), true);
  });
  // Build the handler shared by the two hashtag routes; `channelSuffix` is
  // appended to the channel name ('' or ':local').
  const hashtagStreamHandler = channelSuffix => (req, res) => {
    const { tag } = req.query;

    // The parsed query can hold a non-string here (for instance an array when
    // the parameter is repeated), which has no toLowerCase(); treat anything
    // but a non-empty string as "not found".
    if (typeof tag !== 'string' || tag.length === 0) {
      httpNotFound(res);
      return;
    }

    streamFrom(`timeline:hashtag:${tag.toLowerCase()}${channelSuffix}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  };

  app.get('/api/v1/streaming/hashtag', hashtagStreamHandler(''));
  app.get('/api/v1/streaming/hashtag/local', hashtagStreamHandler(':local'));
  // List timeline: only streamed when the list belongs to the authenticated
  // account; otherwise the request gets a 404.
  app.get('/api/v1/streaming/list', (req, res) => {
    const { list: listId } = req.query;

    authorizeListAccess(listId, req, (authorized) => {
      if (authorized) {
        const channel = `timeline:list:${listId}`;
        const output = streamToHttp(req, res);
        const stopHeartbeat = subscriptionHeartbeat(channel);

        streamFrom(channel, req, output, streamHttpEnd(req, stopHeartbeat));
        return;
      }

      httpNotFound(res);
    });
  });
  const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });

  // Route each accepted WebSocket connection to the stream named by the
  // `stream` query parameter. Unknown streams and invalid parameters close
  // the socket.
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);
    const { stream, tag, list: listId } = location.query;

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    let channel;

    switch (stream) {
    case 'user':
      channel = `timeline:${req.accountId}`;
      streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
      break;
    case 'user:notification':
      streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
      break;
    case 'public':
      streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
      break;
    case 'public:local':
      streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
      break;
    case 'public:media':
      streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
      break;
    case 'public:local:media':
      streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
      break;
    case 'direct':
      channel = `timeline:direct:${req.accountId}`;
      streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
      break;
    case 'hashtag':
    case 'hashtag:local': {
      // A repeated `tag` parameter is parsed into an array, which has no
      // toLowerCase(); an exception thrown here would not be caught by
      // anything in this handler, so only a non-empty string is accepted.
      if (typeof tag !== 'string' || tag.length === 0) {
        ws.close();
        return;
      }

      const suffix = stream === 'hashtag:local' ? ':local' : '';

      streamFrom(`timeline:hashtag:${tag.toLowerCase()}${suffix}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
      break;
    }
    case 'list':
      authorizeListAccess(listId, req, authorized => {
        if (!authorized) {
          ws.close();
          return;
        }

        const listChannel = `timeline:list:${listId}`;

        streamFrom(listChannel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(listChannel)));
      });
      break;
    default:
      ws.close();
    }
  });
  wss.startAutoPing(30000);

  attachServerWithConfig(server, address => {
    log.info(`Worker ${workerId} now listening on ${address}`);
  });

  // Runs on the 'exit' event, i.e. for every exit path. It only logs: calling
  // process.exit() from inside an 'exit' listener would replace the exit
  // status chosen by whoever started the shutdown.
  const onExit = () => {
    log.info(`Worker ${workerId} exiting, bye bye`);
  };

  // Termination signals: stop accepting connections and exit cleanly.
  const onSignal = () => {
    server.close();
    process.exit(0);
  };

  // Uncaught exception: log it and exit with a failure status.
  const onError = (err) => {
    log.error(err);
    server.close();
    process.exit(1);
  };

  process.on('SIGINT', onSignal);
  process.on('SIGTERM', onSignal);
  process.on('exit', onExit);
  process.on('uncaughtException', onError);
  532. };
  // Start `server` listening according to the environment:
  //   - SOCKET, or a non-numeric PORT: listen on that path; when `onSuccess`
  //     is given, chmod the path to 0o666 and report server.address()
  //   - otherwise: listen on PORT (default 4000) at BIND (default 127.0.0.1);
  //     when `onSuccess` is given, report "address:port"
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const listenOnPath = SOCKET || PORT && isNaN(+PORT);

    if (listenOnPath) {
      server.listen(SOCKET || PORT, () => {
        if (!onSuccess) {
          return;
        }

        fs.chmodSync(server.address(), 0o666);
        onSuccess(server.address());
      });

      return;
    }

    server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
      if (!onSuccess) {
        return;
      }

      const bound = server.address();
      onSuccess(`${bound.address}:${bound.port}`);
    });
  };
  // Probe the configured port or socket with a throwaway server. `onSuccess`
  // receives the error when listening fails, and is called with no argument
  // once the probe has listened and closed again.
  const onPortAvailable = onSuccess => {
    const probe = http.createServer();

    const reportFailure = (err) => {
      onSuccess(err);
    };

    const releaseAndReport = () => {
      probe.once('close', () => onSuccess());
      probe.close();
    };

    probe.once('error', reportFailure);
    probe.once('listening', releaseAndReport);

    attachServerWithConfig(probe);
  };
  // Entry point: start the worker cluster only when the port or socket can
  // be bound.
  onPortAvailable((err) => {
    if (!err) {
      throng({
        workers: numWorkers,
        lifetime: Infinity,
        start: startWorker,
        master: startMaster,
      });
      return;
    }

    log.error('Could not start server, the port or socket is in use');
  });