You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

763 lines
22 KiB

  1. const os = require('os');
  2. const throng = require('throng');
  3. const dotenv = require('dotenv');
  4. const express = require('express');
  5. const http = require('http');
  6. const redis = require('redis');
  7. const pg = require('pg');
  8. const log = require('npmlog');
  9. const url = require('url');
  10. const { WebSocketServer } = require('@clusterws/cws');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const env = process.env.NODE_ENV || 'development';
  14. const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
  15. dotenv.config({
  16. path: env === 'production' ? '.env.production' : '.env',
  17. });
  18. log.level = process.env.LOG_LEVEL || 'verbose';
  19. const dbUrlToConfig = (dbUrl) => {
  20. if (!dbUrl) {
  21. return {};
  22. }
  23. const params = url.parse(dbUrl, true);
  24. const config = {};
  25. if (params.auth) {
  26. [config.user, config.password] = params.auth.split(':');
  27. }
  28. if (params.hostname) {
  29. config.host = params.hostname;
  30. }
  31. if (params.port) {
  32. config.port = params.port;
  33. }
  34. if (params.pathname) {
  35. config.database = params.pathname.split('/')[1];
  36. }
  37. const ssl = params.query && params.query.ssl;
  38. if (ssl && ssl === 'true' || ssl === '1') {
  39. config.ssl = true;
  40. }
  41. return config;
  42. };
  43. const redisUrlToClient = (defaultConfig, redisUrl) => {
  44. const config = defaultConfig;
  45. if (!redisUrl) {
  46. return redis.createClient(config);
  47. }
  48. if (redisUrl.startsWith('unix://')) {
  49. return redis.createClient(redisUrl.slice(7), config);
  50. }
  51. return redis.createClient(Object.assign(config, {
  52. url: redisUrl,
  53. }));
  54. };
  55. const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  56. const startMaster = () => {
  57. if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
  58. log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  59. }
  60. log.info(`Starting streaming API server master with ${numWorkers} workers`);
  61. };
  62. const startWorker = (workerId) => {
  63. log.info(`Starting worker ${workerId}`);
  64. const pgConfigs = {
  65. development: {
  66. user: process.env.DB_USER || pg.defaults.user,
  67. password: process.env.DB_PASS || pg.defaults.password,
  68. database: process.env.DB_NAME || 'mastodon_development',
  69. host: process.env.DB_HOST || pg.defaults.host,
  70. port: process.env.DB_PORT || pg.defaults.port,
  71. max: 10,
  72. },
  73. production: {
  74. user: process.env.DB_USER || 'mastodon',
  75. password: process.env.DB_PASS || '',
  76. database: process.env.DB_NAME || 'mastodon_production',
  77. host: process.env.DB_HOST || 'localhost',
  78. port: process.env.DB_PORT || 5432,
  79. max: 10,
  80. },
  81. };
  82. if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
  83. pgConfigs.development.ssl = true;
  84. pgConfigs.production.ssl = true;
  85. }
  86. const app = express();
  87. app.set('trusted proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');
  88. const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  89. const server = http.createServer(app);
  90. const redisNamespace = process.env.REDIS_NAMESPACE || null;
  91. const redisParams = {
  92. host: process.env.REDIS_HOST || '127.0.0.1',
  93. port: process.env.REDIS_PORT || 6379,
  94. db: process.env.REDIS_DB || 0,
  95. password: process.env.REDIS_PASSWORD || undefined,
  96. };
  97. if (redisNamespace) {
  98. redisParams.namespace = redisNamespace;
  99. }
  100. const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
  101. const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  102. const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  103. const subs = {};
  104. redisSubscribeClient.on('message', (channel, message) => {
  105. const callbacks = subs[channel];
  106. log.silly(`New message on channel ${channel}`);
  107. if (!callbacks) {
  108. return;
  109. }
  110. callbacks.forEach(callback => callback(message));
  111. });
  112. const subscriptionHeartbeat = channels => {
  113. if (!Array.isArray(channels)) {
  114. channels = [channels];
  115. }
  116. const interval = 6 * 60;
  117. const tellSubscribed = () => {
  118. channels.forEach(channel => redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3));
  119. };
  120. tellSubscribed();
  121. const heartbeat = setInterval(tellSubscribed, interval * 1000);
  122. return () => {
  123. clearInterval(heartbeat);
  124. };
  125. };
  126. const subscribe = (channel, callback) => {
  127. log.silly(`Adding listener for ${channel}`);
  128. subs[channel] = subs[channel] || [];
  129. if (subs[channel].length === 0) {
  130. log.verbose(`Subscribe ${channel}`);
  131. redisSubscribeClient.subscribe(channel);
  132. }
  133. subs[channel].push(callback);
  134. };
  135. const unsubscribe = (channel, callback) => {
  136. log.silly(`Removing listener for ${channel}`);
  137. subs[channel] = subs[channel].filter(item => item !== callback);
  138. if (subs[channel].length === 0) {
  139. log.verbose(`Unsubscribe ${channel}`);
  140. redisSubscribeClient.unsubscribe(channel);
  141. }
  142. };
  143. const allowCrossDomain = (req, res, next) => {
  144. res.header('Access-Control-Allow-Origin', '*');
  145. res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
  146. res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
  147. next();
  148. };
  149. const setRequestId = (req, res, next) => {
  150. req.requestId = uuid.v4();
  151. res.header('X-Request-Id', req.requestId);
  152. next();
  153. };
  154. const setRemoteAddress = (req, res, next) => {
  155. req.remoteAddress = req.connection.remoteAddress;
  156. next();
  157. };
  158. const accountFromToken = (token, allowedScopes, req, next) => {
  159. pgPool.connect((err, client, done) => {
  160. if (err) {
  161. next(err);
  162. return;
  163. }
  164. client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
  165. done();
  166. if (err) {
  167. next(err);
  168. return;
  169. }
  170. if (result.rows.length === 0) {
  171. err = new Error('Invalid access token');
  172. err.statusCode = 401;
  173. next(err);
  174. return;
  175. }
  176. const scopes = result.rows[0].scopes.split(' ');
  177. if (allowedScopes.size > 0 && !scopes.some(scope => allowedScopes.includes(scope))) {
  178. err = new Error('Access token does not cover required scopes');
  179. err.statusCode = 401;
  180. next(err);
  181. return;
  182. }
  183. req.accountId = result.rows[0].account_id;
  184. req.chosenLanguages = result.rows[0].chosen_languages;
  185. req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
  186. req.deviceId = result.rows[0].device_id;
  187. next();
  188. });
  189. });
  190. };
  191. const accountFromRequest = (req, next, required = true, allowedScopes = ['read']) => {
  192. const authorization = req.headers.authorization;
  193. const location = url.parse(req.url, true);
  194. const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
  195. if (!authorization && !accessToken) {
  196. if (required) {
  197. const err = new Error('Missing access token');
  198. err.statusCode = 401;
  199. next(err);
  200. return;
  201. } else {
  202. next();
  203. return;
  204. }
  205. }
  206. const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
  207. accountFromToken(token, allowedScopes, req, next);
  208. };
  209. const PUBLIC_STREAMS = [
  210. 'public',
  211. 'public:media',
  212. 'public:local',
  213. 'public:local:media',
  214. 'public:remote',
  215. 'public:remote:media',
  216. 'hashtag',
  217. 'hashtag:local',
  218. ];
  219. const wsVerifyClient = (info, cb) => {
  220. const location = url.parse(info.req.url, true);
  221. const authRequired = alwaysRequireAuth || !PUBLIC_STREAMS.some(stream => stream === location.query.stream);
  222. const allowedScopes = [];
  223. if (authRequired) {
  224. allowedScopes.push('read');
  225. if (location.query.stream === 'user:notification') {
  226. allowedScopes.push('read:notifications');
  227. } else {
  228. allowedScopes.push('read:statuses');
  229. }
  230. }
  231. accountFromRequest(info.req, err => {
  232. if (!err) {
  233. cb(true, undefined, undefined);
  234. } else {
  235. log.error(info.req.requestId, err.toString());
  236. cb(false, 401, 'Unauthorized');
  237. }
  238. }, authRequired, allowedScopes);
  239. };
  240. const PUBLIC_ENDPOINTS = [
  241. '/api/v1/streaming/public',
  242. '/api/v1/streaming/public/local',
  243. '/api/v1/streaming/public/remote',
  244. '/api/v1/streaming/hashtag',
  245. '/api/v1/streaming/hashtag/local',
  246. ];
  247. const authenticationMiddleware = (req, res, next) => {
  248. if (req.method === 'OPTIONS') {
  249. next();
  250. return;
  251. }
  252. const authRequired = alwaysRequireAuth || !PUBLIC_ENDPOINTS.some(endpoint => endpoint === req.path);
  253. const allowedScopes = [];
  254. if (authRequired) {
  255. allowedScopes.push('read');
  256. if (req.path === '/api/v1/streaming/user/notification') {
  257. allowedScopes.push('read:notifications');
  258. } else {
  259. allowedScopes.push('read:statuses');
  260. }
  261. }
  262. accountFromRequest(req, next, authRequired, allowedScopes);
  263. };
  264. const errorMiddleware = (err, req, res, {}) => {
  265. log.error(req.requestId, err.toString());
  266. res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
  267. res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
  268. };
  269. const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  270. const authorizeListAccess = (id, req, next) => {
  271. pgPool.connect((err, client, done) => {
  272. if (err) {
  273. next(false);
  274. return;
  275. }
  276. client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
  277. done();
  278. if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
  279. next(false);
  280. return;
  281. }
  282. next(true);
  283. });
  284. });
  285. };
  286. const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
  287. const accountId = req.accountId || req.remoteAddress;
  288. const streamType = notificationOnly ? ' (notification)' : '';
  289. if (!Array.isArray(ids)) {
  290. ids = [ids];
  291. }
  292. log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`);
  293. const listener = message => {
  294. const { event, payload, queued_at } = JSON.parse(message);
  295. const transmit = () => {
  296. const now = new Date().getTime();
  297. const delta = now - queued_at;
  298. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  299. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
  300. output(event, encodedPayload);
  301. };
  302. if (notificationOnly && event !== 'notification') {
  303. return;
  304. }
  305. if (event === 'notification' && !req.allowNotifications) {
  306. return;
  307. }
  308. // Only messages that may require filtering are statuses, since notifications
  309. // are already personalized and deletes do not matter
  310. if (!needsFiltering || event !== 'update') {
  311. transmit();
  312. return;
  313. }
  314. const unpackedPayload = payload;
  315. const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
  316. const accountDomain = unpackedPayload.account.acct.split('@')[1];
  317. if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
  318. log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
  319. return;
  320. }
  321. // When the account is not logged in, it is not necessary to confirm the block or mute
  322. if (!req.accountId) {
  323. transmit();
  324. return;
  325. }
  326. pgPool.connect((err, client, done) => {
  327. if (err) {
  328. log.error(err);
  329. return;
  330. }
  331. const queries = [
  332. client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
  333. ];
  334. if (accountDomain) {
  335. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  336. }
  337. Promise.all(queries).then(values => {
  338. done();
  339. if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
  340. return;
  341. }
  342. transmit();
  343. }).catch(err => {
  344. done();
  345. log.error(err);
  346. });
  347. });
  348. };
  349. ids.forEach(id => {
  350. subscribe(`${redisPrefix}${id}`, listener);
  351. });
  352. attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
  353. };
  354. // Setup stream output to HTTP
  355. const streamToHttp = (req, res) => {
  356. const accountId = req.accountId || req.remoteAddress;
  357. res.setHeader('Content-Type', 'text/event-stream');
  358. res.setHeader('Cache-Control', 'no-store');
  359. res.setHeader('Transfer-Encoding', 'chunked');
  360. res.write(':)\n');
  361. const heartbeat = setInterval(() => res.write(':thump\n'), 15000);
  362. req.on('close', () => {
  363. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  364. clearInterval(heartbeat);
  365. });
  366. return (event, payload) => {
  367. res.write(`event: ${event}\n`);
  368. res.write(`data: ${payload}\n\n`);
  369. };
  370. };
  371. // Setup stream end for HTTP
  372. const streamHttpEnd = (req, closeHandler = false) => (ids, listener) => {
  373. if (!Array.isArray(ids)) {
  374. ids = [ids];
  375. }
  376. req.on('close', () => {
  377. ids.forEach(id => {
  378. unsubscribe(id, listener);
  379. });
  380. if (closeHandler) {
  381. closeHandler();
  382. }
  383. });
  384. };
  385. // Setup stream output to WebSockets
  386. const streamToWs = (req, ws) => (event, payload) => {
  387. if (ws.readyState !== ws.OPEN) {
  388. log.error(req.requestId, 'Tried writing to closed socket');
  389. return;
  390. }
  391. ws.send(JSON.stringify({ event, payload }));
  392. };
  393. // Setup stream end for WebSockets
  394. const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
  395. const accountId = req.accountId || req.remoteAddress;
  396. ws.on('close', () => {
  397. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  398. unsubscribe(id, listener);
  399. if (closeHandler) {
  400. closeHandler();
  401. }
  402. });
  403. ws.on('error', () => {
  404. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  405. unsubscribe(id, listener);
  406. if (closeHandler) {
  407. closeHandler();
  408. }
  409. });
  410. };
  411. const httpNotFound = res => {
  412. res.writeHead(404, { 'Content-Type': 'application/json' });
  413. res.end(JSON.stringify({ error: 'Not found' }));
  414. };
  415. app.use(setRequestId);
  416. app.use(setRemoteAddress);
  417. app.use(allowCrossDomain);
  418. app.get('/api/v1/streaming/health', (req, res) => {
  419. res.writeHead(200, { 'Content-Type': 'text/plain' });
  420. res.end('OK');
  421. });
  422. app.use(authenticationMiddleware);
  423. app.use(errorMiddleware);
  424. app.get('/api/v1/streaming/user', (req, res) => {
  425. const channels = [`timeline:${req.accountId}`];
  426. if (req.deviceId) {
  427. channels.push(`timeline:${req.accountId}:${req.deviceId}`);
  428. }
  429. streamFrom(channels, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channels)));
  430. });
  431. app.get('/api/v1/streaming/user/notification', (req, res) => {
  432. streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
  433. });
  434. app.get('/api/v1/streaming/public', (req, res) => {
  435. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  436. const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
  437. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  438. });
  439. app.get('/api/v1/streaming/public/local', (req, res) => {
  440. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  441. const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
  442. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  443. });
  444. app.get('/api/v1/streaming/public/remote', (req, res) => {
  445. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  446. const channel = onlyMedia ? 'timeline:public:remote:media' : 'timeline:public:remote';
  447. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  448. });
  449. app.get('/api/v1/streaming/direct', (req, res) => {
  450. const channel = `timeline:direct:${req.accountId}`;
  451. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
  452. });
  453. app.get('/api/v1/streaming/hashtag', (req, res) => {
  454. const { tag } = req.query;
  455. if (!tag || tag.length === 0) {
  456. httpNotFound(res);
  457. return;
  458. }
  459. streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  460. });
  461. app.get('/api/v1/streaming/hashtag/local', (req, res) => {
  462. const { tag } = req.query;
  463. if (!tag || tag.length === 0) {
  464. httpNotFound(res);
  465. return;
  466. }
  467. streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  468. });
  469. app.get('/api/v1/streaming/list', (req, res) => {
  470. const listId = req.query.list;
  471. authorizeListAccess(listId, req, authorized => {
  472. if (!authorized) {
  473. httpNotFound(res);
  474. return;
  475. }
  476. const channel = `timeline:list:${listId}`;
  477. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  478. });
  479. });
  480. const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
  481. wss.on('connection', (ws, req) => {
  482. const location = url.parse(req.url, true);
  483. req.requestId = uuid.v4();
  484. req.remoteAddress = ws._socket.remoteAddress;
  485. let channel;
  486. switch(location.query.stream) {
  487. case 'user':
  488. channel = [`timeline:${req.accountId}`];
  489. if (req.deviceId) {
  490. channel.push(`timeline:${req.accountId}:${req.deviceId}`);
  491. }
  492. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  493. break;
  494. case 'user:notification':
  495. streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
  496. break;
  497. case 'public':
  498. streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  499. break;
  500. case 'public:local':
  501. streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  502. break;
  503. case 'public:remote':
  504. streamFrom('timeline:public:remote', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  505. break;
  506. case 'public:media':
  507. streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  508. break;
  509. case 'public:local:media':
  510. streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  511. break;
  512. case 'public:remote:media':
  513. streamFrom('timeline:public:remote:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  514. break;
  515. case 'direct':
  516. channel = `timeline:direct:${req.accountId}`;
  517. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
  518. break;
  519. case 'hashtag':
  520. if (!location.query.tag || location.query.tag.length === 0) {
  521. ws.close();
  522. return;
  523. }
  524. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  525. break;
  526. case 'hashtag:local':
  527. if (!location.query.tag || location.query.tag.length === 0) {
  528. ws.close();
  529. return;
  530. }
  531. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  532. break;
  533. case 'list':
  534. const listId = location.query.list;
  535. authorizeListAccess(listId, req, authorized => {
  536. if (!authorized) {
  537. ws.close();
  538. return;
  539. }
  540. channel = `timeline:list:${listId}`;
  541. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  542. });
  543. break;
  544. default:
  545. ws.close();
  546. }
  547. });
  548. wss.startAutoPing(30000);
  549. attachServerWithConfig(server, address => {
  550. log.info(`Worker ${workerId} now listening on ${address}`);
  551. });
  552. const onExit = () => {
  553. log.info(`Worker ${workerId} exiting, bye bye`);
  554. server.close();
  555. process.exit(0);
  556. };
  557. const onError = (err) => {
  558. log.error(err);
  559. server.close();
  560. process.exit(0);
  561. };
  562. process.on('SIGINT', onExit);
  563. process.on('SIGTERM', onExit);
  564. process.on('exit', onExit);
  565. process.on('uncaughtException', onError);
  566. };
  567. const attachServerWithConfig = (server, onSuccess) => {
  568. if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
  569. server.listen(process.env.SOCKET || process.env.PORT, () => {
  570. if (onSuccess) {
  571. fs.chmodSync(server.address(), 0o666);
  572. onSuccess(server.address());
  573. }
  574. });
  575. } else {
  576. server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => {
  577. if (onSuccess) {
  578. onSuccess(`${server.address().address}:${server.address().port}`);
  579. }
  580. });
  581. }
  582. };
  583. const onPortAvailable = onSuccess => {
  584. const testServer = http.createServer();
  585. testServer.once('error', err => {
  586. onSuccess(err);
  587. });
  588. testServer.once('listening', () => {
  589. testServer.once('close', () => onSuccess());
  590. testServer.close();
  591. });
  592. attachServerWithConfig(testServer);
  593. };
  594. onPortAvailable(err => {
  595. if (err) {
  596. log.error('Could not start server, the port or socket is in use');
  597. return;
  598. }
  599. throng({
  600. workers: numWorkers,
  601. lifetime: Infinity,
  602. start: startWorker,
  603. master: startMaster,
  604. });
  605. });