You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

717 lines
21 KiB

  1. const os = require('os');
  2. const throng = require('throng');
  3. const dotenv = require('dotenv');
  4. const express = require('express');
  5. const http = require('http');
  6. const redis = require('redis');
  7. const pg = require('pg');
  8. const log = require('npmlog');
  9. const url = require('url');
  10. const { WebSocketServer } = require('@clusterws/cws');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const env = process.env.NODE_ENV || 'development';
  14. const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
  15. dotenv.config({
  16. path: env === 'production' ? '.env.production' : '.env',
  17. });
  18. log.level = process.env.LOG_LEVEL || 'verbose';
  19. const dbUrlToConfig = (dbUrl) => {
  20. if (!dbUrl) {
  21. return {};
  22. }
  23. const params = url.parse(dbUrl, true);
  24. const config = {};
  25. if (params.auth) {
  26. [config.user, config.password] = params.auth.split(':');
  27. }
  28. if (params.hostname) {
  29. config.host = params.hostname;
  30. }
  31. if (params.port) {
  32. config.port = params.port;
  33. }
  34. if (params.pathname) {
  35. config.database = params.pathname.split('/')[1];
  36. }
  37. const ssl = params.query && params.query.ssl;
  38. if (ssl && ssl === 'true' || ssl === '1') {
  39. config.ssl = true;
  40. }
  41. return config;
  42. };
  43. const redisUrlToClient = (defaultConfig, redisUrl) => {
  44. const config = defaultConfig;
  45. if (!redisUrl) {
  46. return redis.createClient(config);
  47. }
  48. if (redisUrl.startsWith('unix://')) {
  49. return redis.createClient(redisUrl.slice(7), config);
  50. }
  51. return redis.createClient(Object.assign(config, {
  52. url: redisUrl,
  53. }));
  54. };
  55. const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  56. const startMaster = () => {
  57. if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
  58. log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  59. }
  60. log.info(`Starting streaming API server master with ${numWorkers} workers`);
  61. };
  62. const startWorker = (workerId) => {
  63. log.info(`Starting worker ${workerId}`);
  64. const pgConfigs = {
  65. development: {
  66. user: process.env.DB_USER || pg.defaults.user,
  67. password: process.env.DB_PASS || pg.defaults.password,
  68. database: process.env.DB_NAME || 'mastodon_development',
  69. host: process.env.DB_HOST || pg.defaults.host,
  70. port: process.env.DB_PORT || pg.defaults.port,
  71. max: 10,
  72. },
  73. production: {
  74. user: process.env.DB_USER || 'mastodon',
  75. password: process.env.DB_PASS || '',
  76. database: process.env.DB_NAME || 'mastodon_production',
  77. host: process.env.DB_HOST || 'localhost',
  78. port: process.env.DB_PORT || 5432,
  79. max: 10,
  80. },
  81. };
  82. if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
  83. pgConfigs.development.ssl = true;
  84. pgConfigs.production.ssl = true;
  85. }
  86. const app = express();
  87. app.set('trusted proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');
  88. const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  89. const server = http.createServer(app);
  90. const redisNamespace = process.env.REDIS_NAMESPACE || null;
  91. const redisParams = {
  92. host: process.env.REDIS_HOST || '127.0.0.1',
  93. port: process.env.REDIS_PORT || 6379,
  94. db: process.env.REDIS_DB || 0,
  95. password: process.env.REDIS_PASSWORD,
  96. };
  97. if (redisNamespace) {
  98. redisParams.namespace = redisNamespace;
  99. }
  100. const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
  101. const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  102. const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  103. const subs = {};
  104. redisSubscribeClient.on('message', (channel, message) => {
  105. const callbacks = subs[channel];
  106. log.silly(`New message on channel ${channel}`);
  107. if (!callbacks) {
  108. return;
  109. }
  110. callbacks.forEach(callback => callback(message));
  111. });
  112. const subscriptionHeartbeat = (channel) => {
  113. const interval = 6*60;
  114. const tellSubscribed = () => {
  115. redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
  116. };
  117. tellSubscribed();
  118. const heartbeat = setInterval(tellSubscribed, interval*1000);
  119. return () => {
  120. clearInterval(heartbeat);
  121. };
  122. };
  123. const subscribe = (channel, callback) => {
  124. log.silly(`Adding listener for ${channel}`);
  125. subs[channel] = subs[channel] || [];
  126. if (subs[channel].length === 0) {
  127. log.verbose(`Subscribe ${channel}`);
  128. redisSubscribeClient.subscribe(channel);
  129. }
  130. subs[channel].push(callback);
  131. };
  132. const unsubscribe = (channel, callback) => {
  133. log.silly(`Removing listener for ${channel}`);
  134. subs[channel] = subs[channel].filter(item => item !== callback);
  135. if (subs[channel].length === 0) {
  136. log.verbose(`Unsubscribe ${channel}`);
  137. redisSubscribeClient.unsubscribe(channel);
  138. }
  139. };
  140. const allowCrossDomain = (req, res, next) => {
  141. res.header('Access-Control-Allow-Origin', '*');
  142. res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
  143. res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
  144. next();
  145. };
  146. const setRequestId = (req, res, next) => {
  147. req.requestId = uuid.v4();
  148. res.header('X-Request-Id', req.requestId);
  149. next();
  150. };
  151. const setRemoteAddress = (req, res, next) => {
  152. req.remoteAddress = req.connection.remoteAddress;
  153. next();
  154. };
  155. const accountFromToken = (token, allowedScopes, req, next) => {
  156. pgPool.connect((err, client, done) => {
  157. if (err) {
  158. next(err);
  159. return;
  160. }
  161. client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
  162. done();
  163. if (err) {
  164. next(err);
  165. return;
  166. }
  167. if (result.rows.length === 0) {
  168. err = new Error('Invalid access token');
  169. err.statusCode = 401;
  170. next(err);
  171. return;
  172. }
  173. const scopes = result.rows[0].scopes.split(' ');
  174. if (allowedScopes.size > 0 && !scopes.some(scope => allowedScopes.includes(scope))) {
  175. err = new Error('Access token does not cover required scopes');
  176. err.statusCode = 401;
  177. next(err);
  178. return;
  179. }
  180. req.accountId = result.rows[0].account_id;
  181. req.chosenLanguages = result.rows[0].chosen_languages;
  182. req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
  183. next();
  184. });
  185. });
  186. };
  187. const accountFromRequest = (req, next, required = true, allowedScopes = ['read']) => {
  188. const authorization = req.headers.authorization;
  189. const location = url.parse(req.url, true);
  190. const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
  191. if (!authorization && !accessToken) {
  192. if (required) {
  193. const err = new Error('Missing access token');
  194. err.statusCode = 401;
  195. next(err);
  196. return;
  197. } else {
  198. next();
  199. return;
  200. }
  201. }
  202. const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
  203. accountFromToken(token, allowedScopes, req, next);
  204. };
  205. const PUBLIC_STREAMS = [
  206. 'public',
  207. 'public:media',
  208. 'public:local',
  209. 'public:local:media',
  210. 'hashtag',
  211. 'hashtag:local',
  212. ];
  213. const wsVerifyClient = (info, cb) => {
  214. const location = url.parse(info.req.url, true);
  215. const authRequired = alwaysRequireAuth || !PUBLIC_STREAMS.some(stream => stream === location.query.stream);
  216. const allowedScopes = [];
  217. if (authRequired) {
  218. allowedScopes.push('read');
  219. if (location.query.stream === 'user:notification') {
  220. allowedScopes.push('read:notifications');
  221. } else {
  222. allowedScopes.push('read:statuses');
  223. }
  224. }
  225. accountFromRequest(info.req, err => {
  226. if (!err) {
  227. cb(true, undefined, undefined);
  228. } else {
  229. log.error(info.req.requestId, err.toString());
  230. cb(false, 401, 'Unauthorized');
  231. }
  232. }, authRequired, allowedScopes);
  233. };
  234. const PUBLIC_ENDPOINTS = [
  235. '/api/v1/streaming/public',
  236. '/api/v1/streaming/public/local',
  237. '/api/v1/streaming/hashtag',
  238. '/api/v1/streaming/hashtag/local',
  239. ];
  240. const authenticationMiddleware = (req, res, next) => {
  241. if (req.method === 'OPTIONS') {
  242. next();
  243. return;
  244. }
  245. const authRequired = alwaysRequireAuth || !PUBLIC_ENDPOINTS.some(endpoint => endpoint === req.path);
  246. const allowedScopes = [];
  247. if (authRequired) {
  248. allowedScopes.push('read');
  249. if (req.path === '/api/v1/streaming/user/notification') {
  250. allowedScopes.push('read:notifications');
  251. } else {
  252. allowedScopes.push('read:statuses');
  253. }
  254. }
  255. accountFromRequest(req, next, authRequired, allowedScopes);
  256. };
  257. const errorMiddleware = (err, req, res, {}) => {
  258. log.error(req.requestId, err.toString());
  259. res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
  260. res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
  261. };
  262. const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  263. const authorizeListAccess = (id, req, next) => {
  264. pgPool.connect((err, client, done) => {
  265. if (err) {
  266. next(false);
  267. return;
  268. }
  269. client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
  270. done();
  271. if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
  272. next(false);
  273. return;
  274. }
  275. next(true);
  276. });
  277. });
  278. };
  279. const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
  280. const accountId = req.accountId || req.remoteAddress;
  281. const streamType = notificationOnly ? ' (notification)' : '';
  282. log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
  283. const listener = message => {
  284. const { event, payload, queued_at } = JSON.parse(message);
  285. const transmit = () => {
  286. const now = new Date().getTime();
  287. const delta = now - queued_at;
  288. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  289. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
  290. output(event, encodedPayload);
  291. };
  292. if (notificationOnly && event !== 'notification') {
  293. return;
  294. }
  295. if (event === 'notification' && !req.allowNotifications) {
  296. return;
  297. }
  298. // Only send local-only statuses to logged-in users
  299. if (event === 'update' && payload.local_only && !req.accountId) {
  300. log.silly(req.requestId, `Message ${payload.id} filtered because it was local-only`);
  301. return;
  302. }
  303. // Only messages that may require filtering are statuses, since notifications
  304. // are already personalized and deletes do not matter
  305. if (!needsFiltering || event !== 'update') {
  306. transmit();
  307. return;
  308. }
  309. const unpackedPayload = payload;
  310. const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
  311. const accountDomain = unpackedPayload.account.acct.split('@')[1];
  312. if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
  313. log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
  314. return;
  315. }
  316. // When the account is not logged in, it is not necessary to confirm the block or mute
  317. if (!req.accountId) {
  318. transmit();
  319. return;
  320. }
  321. pgPool.connect((err, client, done) => {
  322. if (err) {
  323. log.error(err);
  324. return;
  325. }
  326. const queries = [
  327. client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
  328. ];
  329. if (accountDomain) {
  330. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  331. }
  332. Promise.all(queries).then(values => {
  333. done();
  334. if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
  335. return;
  336. }
  337. transmit();
  338. }).catch(err => {
  339. done();
  340. log.error(err);
  341. });
  342. });
  343. };
  344. subscribe(`${redisPrefix}${id}`, listener);
  345. attachCloseHandler(`${redisPrefix}${id}`, listener);
  346. };
  347. // Setup stream output to HTTP
  348. const streamToHttp = (req, res) => {
  349. const accountId = req.accountId || req.remoteAddress;
  350. res.setHeader('Content-Type', 'text/event-stream');
  351. res.setHeader('Transfer-Encoding', 'chunked');
  352. const heartbeat = setInterval(() => res.write(':thump\n'), 15000);
  353. req.on('close', () => {
  354. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  355. clearInterval(heartbeat);
  356. });
  357. return (event, payload) => {
  358. res.write(`event: ${event}\n`);
  359. res.write(`data: ${payload}\n\n`);
  360. };
  361. };
  362. // Setup stream end for HTTP
  363. const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
  364. req.on('close', () => {
  365. unsubscribe(id, listener);
  366. if (closeHandler) {
  367. closeHandler();
  368. }
  369. });
  370. };
  371. // Setup stream output to WebSockets
  372. const streamToWs = (req, ws) => (event, payload) => {
  373. if (ws.readyState !== ws.OPEN) {
  374. log.error(req.requestId, 'Tried writing to closed socket');
  375. return;
  376. }
  377. ws.send(JSON.stringify({ event, payload }));
  378. };
  379. // Setup stream end for WebSockets
  380. const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
  381. const accountId = req.accountId || req.remoteAddress;
  382. ws.on('close', () => {
  383. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  384. unsubscribe(id, listener);
  385. if (closeHandler) {
  386. closeHandler();
  387. }
  388. });
  389. ws.on('error', () => {
  390. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  391. unsubscribe(id, listener);
  392. if (closeHandler) {
  393. closeHandler();
  394. }
  395. });
  396. };
  397. const httpNotFound = res => {
  398. res.writeHead(404, { 'Content-Type': 'application/json' });
  399. res.end(JSON.stringify({ error: 'Not found' }));
  400. };
  401. app.use(setRequestId);
  402. app.use(setRemoteAddress);
  403. app.use(allowCrossDomain);
  404. app.get('/api/v1/streaming/health', (req, res) => {
  405. res.writeHead(200, { 'Content-Type': 'text/plain' });
  406. res.end('OK');
  407. });
  408. app.use(authenticationMiddleware);
  409. app.use(errorMiddleware);
  410. app.get('/api/v1/streaming/user', (req, res) => {
  411. const channel = `timeline:${req.accountId}`;
  412. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  413. });
  414. app.get('/api/v1/streaming/user/notification', (req, res) => {
  415. streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
  416. });
  417. app.get('/api/v1/streaming/public', (req, res) => {
  418. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  419. const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
  420. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  421. });
  422. app.get('/api/v1/streaming/public/local', (req, res) => {
  423. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  424. const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
  425. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  426. });
  427. app.get('/api/v1/streaming/direct', (req, res) => {
  428. const channel = `timeline:direct:${req.accountId}`;
  429. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
  430. });
  431. app.get('/api/v1/streaming/hashtag', (req, res) => {
  432. const { tag } = req.query;
  433. if (!tag || tag.length === 0) {
  434. httpNotFound(res);
  435. return;
  436. }
  437. streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  438. });
  439. app.get('/api/v1/streaming/hashtag/local', (req, res) => {
  440. const { tag } = req.query;
  441. if (!tag || tag.length === 0) {
  442. httpNotFound(res);
  443. return;
  444. }
  445. streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  446. });
  447. app.get('/api/v1/streaming/list', (req, res) => {
  448. const listId = req.query.list;
  449. authorizeListAccess(listId, req, authorized => {
  450. if (!authorized) {
  451. httpNotFound(res);
  452. return;
  453. }
  454. const channel = `timeline:list:${listId}`;
  455. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  456. });
  457. });
  458. const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
  459. wss.on('connection', (ws, req) => {
  460. const location = url.parse(req.url, true);
  461. req.requestId = uuid.v4();
  462. req.remoteAddress = ws._socket.remoteAddress;
  463. let channel;
  464. switch(location.query.stream) {
  465. case 'user':
  466. channel = `timeline:${req.accountId}`;
  467. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  468. break;
  469. case 'user:notification':
  470. streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
  471. break;
  472. case 'public':
  473. streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  474. break;
  475. case 'public:local':
  476. streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  477. break;
  478. case 'public:media':
  479. streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  480. break;
  481. case 'public:local:media':
  482. streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  483. break;
  484. case 'direct':
  485. channel = `timeline:direct:${req.accountId}`;
  486. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
  487. break;
  488. case 'hashtag':
  489. if (!location.query.tag || location.query.tag.length === 0) {
  490. ws.close();
  491. return;
  492. }
  493. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  494. break;
  495. case 'hashtag:local':
  496. if (!location.query.tag || location.query.tag.length === 0) {
  497. ws.close();
  498. return;
  499. }
  500. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  501. break;
  502. case 'list':
  503. const listId = location.query.list;
  504. authorizeListAccess(listId, req, authorized => {
  505. if (!authorized) {
  506. ws.close();
  507. return;
  508. }
  509. channel = `timeline:list:${listId}`;
  510. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  511. });
  512. break;
  513. default:
  514. ws.close();
  515. }
  516. });
  517. wss.startAutoPing(30000);
  518. attachServerWithConfig(server, address => {
  519. log.info(`Worker ${workerId} now listening on ${address}`);
  520. });
  521. const onExit = () => {
  522. log.info(`Worker ${workerId} exiting, bye bye`);
  523. server.close();
  524. process.exit(0);
  525. };
  526. const onError = (err) => {
  527. log.error(err);
  528. server.close();
  529. process.exit(0);
  530. };
  531. process.on('SIGINT', onExit);
  532. process.on('SIGTERM', onExit);
  533. process.on('exit', onExit);
  534. process.on('uncaughtException', onError);
  535. };
  536. const attachServerWithConfig = (server, onSuccess) => {
  537. if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
  538. server.listen(process.env.SOCKET || process.env.PORT, () => {
  539. if (onSuccess) {
  540. fs.chmodSync(server.address(), 0o666);
  541. onSuccess(server.address());
  542. }
  543. });
  544. } else {
  545. server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => {
  546. if (onSuccess) {
  547. onSuccess(`${server.address().address}:${server.address().port}`);
  548. }
  549. });
  550. }
  551. };
  552. const onPortAvailable = onSuccess => {
  553. const testServer = http.createServer();
  554. testServer.once('error', err => {
  555. onSuccess(err);
  556. });
  557. testServer.once('listening', () => {
  558. testServer.once('close', () => onSuccess());
  559. testServer.close();
  560. });
  561. attachServerWithConfig(testServer);
  562. };
  563. onPortAvailable(err => {
  564. if (err) {
  565. log.error('Could not start server, the port or socket is in use');
  566. return;
  567. }
  568. throng({
  569. workers: numWorkers,
  570. lifetime: Infinity,
  571. start: startWorker,
  572. master: startMaster,
  573. });
  574. });