@ -63,20 +63,29 @@ const dbUrlToConfig = (dbUrl) => {
* @ param { Object . < string , any > } defaultConfig
* @ param { string } redisUrl
* /
const redisUrlToClient = ( defaultConfig , redisUrl ) => {
const redisUrlToClient = async ( defaultConfig , redisUrl ) => {
const config = defaultConfig ;
let client ;
if ( ! redisUrl ) {
return redis . createClient ( config ) ;
client = redis . createClient ( config ) ;
} else if ( redisUrl . startsWith ( 'unix://' ) ) {
client = redis . createClient ( Object . assign ( config , {
socket : {
path : redisUrl . slice ( 7 ) ,
} ,
} ) ) ;
} else {
client = redis . createClient ( Object . assign ( config , {
url : redisUrl ,
} ) ) ;
}
if ( redisUrl . startsWith ( 'unix://' ) ) {
return redis . createClient ( redisUrl . slice ( 7 ) , config ) ;
}
client . on ( 'error' , ( err ) => log . error ( 'Redis Client Error!' , err ) ) ;
await client . connect ( ) ;
return redis . createClient ( Object . assign ( config , {
url : redisUrl ,
} ) ) ;
return client ;
} ;
const numWorkers = + process . env . STREAMING_CLUSTER_NUM || ( env === 'development' ? 1 : Math . max ( os . cpus ( ) . length - 1 , 1 ) ) ;
@ -102,7 +111,7 @@ const startMaster = () => {
log . warn ( ` Starting streaming API server master with ${ numWorkers } workers ` ) ;
} ;
const startWorker = ( workerId ) => {
const startWorker = async ( workerId ) => {
log . warn ( ` Starting worker ${ workerId } ` ) ;
const pgConfigs = {
@ -127,7 +136,7 @@ const startWorker = (workerId) => {
if ( ! ! process . env . DB_SSLMODE && process . env . DB_SSLMODE !== 'disable' ) {
pgConfigs . development . ssl = true ;
pgConfigs . production . ssl = true ;
pgConfigs . production . ssl = true ;
}
const app = express ( ) ;
@ -139,9 +148,11 @@ const startWorker = (workerId) => {
const redisNamespace = process . env . REDIS_NAMESPACE || null ;
const redisParams = {
host : process . env . REDIS_HOST || '127.0.0.1' ,
port : process . env . REDIS_PORT || 6379 ,
db : process . env . REDIS_DB || 0 ,
socket : {
host : process . env . REDIS_HOST || '127.0.0.1' ,
port : process . env . REDIS_PORT || 6379 ,
} ,
database : process . env . REDIS_DB || 0 ,
password : process . env . REDIS_PASSWORD || undefined ,
} ;
@ -151,25 +162,8 @@ const startWorker = (workerId) => {
const redisPrefix = redisNamespace ? ` ${ redisNamespace } : ` : '' ;
const redisSubscribeClient = redisUrlToClient ( redisParams , process . env . REDIS_URL ) ;
const redisClient = redisUrlToClient ( redisParams , process . env . REDIS_URL ) ;
/ * *
* @ type { Object . < string , Array . < function ( string ) : void >> }
* /
const subs = { } ;
redisSubscribeClient . on ( 'message' , ( channel , message ) => {
const callbacks = subs [ channel ] ;
log . silly ( ` New message on channel ${ channel } ` ) ;
if ( ! callbacks ) {
return ;
}
callbacks . forEach ( callback => callback ( message ) ) ;
} ) ;
const redisSubscribeClient = await redisUrlToClient ( redisParams , process . env . REDIS_URL ) ;
const redisClient = await redisUrlToClient ( redisParams , process . env . REDIS_URL ) ;
/ * *
* @ param { string [ ] } channels
@ -197,34 +191,16 @@ const startWorker = (workerId) => {
* /
const subscribe = ( channel , callback ) => {
log . silly ( ` Adding listener for ${ channel } ` ) ;
subs [ channel ] = subs [ channel ] || [ ] ;
if ( subs [ channel ] . length === 0 ) {
log . verbose ( ` Subscribe ${ channel } ` ) ;
redisSubscribeClient . subscribe ( channel ) ;
}
subs [ channel ] . push ( callback ) ;
redisSubscribeClient . subscribe ( channel , callback ) ;
} ;
/ * *
* @ param { string } channel
* @ param { function ( string ) : void } callback
* /
const unsubscribe = ( channel , callback ) => {
log . silly ( ` Removing listener for ${ channel } ` ) ;
if ( ! subs [ channel ] ) {
return ;
}
const unsubscribe = ( channel ) => {
subs [ channel ] = subs [ channel ] . filter ( item => item !== callback ) ;
if ( subs [ channel ] . length === 0 ) {
log . verbose ( ` Unsubscribe ${ channel } ` ) ;
redisSubscribeClient . unsubscribe ( channel ) ;
delete subs [ channel ] ;
}
redisSubscribeClient . unsubscribe ( channel ) ;
} ;
const FALSE_VALUES = [
@ -366,7 +342,7 @@ const startWorker = (workerId) => {
const onlyMedia = isTruthy ( query . only_media ) ;
const allowLocalOnly = isTruthy ( query . allow_local_only ) ;
switch ( path ) {
switch ( path ) {
case '/api/v1/streaming/user' :
return 'user' ;
case '/api/v1/streaming/user/notification' :
@ -497,7 +473,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener ( req , {
onKill ( ) {
onKill ( ) {
res . end ( ) ;
} ,
@ -549,7 +525,7 @@ const startWorker = (workerId) => {
} ;
/ * *
* @ param { array }
* @ param { array } arr
* @ param { number = } shift
* @ return { string }
* /
@ -592,7 +568,7 @@ const startWorker = (workerId) => {
* @ return { function ( string ) : void }
* /
const streamFrom = ( ids , req , output , attachCloseHandler , needsFiltering = false , allowLocalOnly = false ) => {
const accountId = req . accountId || req . remoteAddress ;
const accountId = req . accountId || req . remoteAddress ;
log . verbose ( req . requestId , ` Starting stream from ${ ids . join ( ', ' ) } for ${ accountId } ` ) ;
@ -604,8 +580,8 @@ const startWorker = (workerId) => {
const { event , payload , queued_at } = json ;
const transmit = ( ) => {
const now = new Date ( ) . getTime ( ) ;
const delta = now - queued_at ;
const now = new Date ( ) . getTime ( ) ;
const delta = now - queued_at ;
const encodedPayload = typeof payload === 'object' ? JSON . stringify ( payload ) : payload ;
log . silly ( req . requestId , ` Transmitting for ${ accountId } : ${ event } ${ encodedPayload } Delay: ${ delta } ms ` ) ;
@ -625,9 +601,9 @@ const startWorker = (workerId) => {
return ;
}
const unpackedPayload = payload ;
const unpackedPayload = payload ;
const targetAccountIds = [ unpackedPayload . account . id ] . concat ( unpackedPayload . mentions . map ( item => item . id ) ) ;
const accountDomain = unpackedPayload . account . acct . split ( '@' ) [ 1 ] ;
const accountDomain = unpackedPayload . account . acct . split ( '@' ) [ 1 ] ;
if ( Array . isArray ( req . chosenLanguages ) && unpackedPayload . language !== null && req . chosenLanguages . indexOf ( unpackedPayload . language ) === - 1 ) {
log . silly ( req . requestId , ` Message ${ unpackedPayload . id } filtered by language ( ${ unpackedPayload . language } ) ` ) ;
@ -647,7 +623,15 @@ const startWorker = (workerId) => {
}
const queries = [
client . query ( ` SELECT 1 FROM blocks WHERE (account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } )) OR (account_id = $ 2 AND target_account_id = $ 1) UNION SELECT 1 FROM mutes WHERE account_id = $ 1 AND target_account_id IN ( ${ placeholders ( targetAccountIds , 2 ) } ) ` , [ req . accountId , unpackedPayload . account . id ] . concat ( targetAccountIds ) ) ,
client . query ( ` SELECT 1
FROM blocks
WHERE ( account_id = $1 AND target_account_id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) )
OR ( account_id = $2 AND target_account_id = $1 )
UNION
SELECT 1
FROM mutes
WHERE account_id = $1
AND target_account_id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) ` , [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
] ;
if ( accountDomain ) {
@ -710,12 +694,12 @@ const startWorker = (workerId) => {
/ * *
* @ param { any } req
* @ param { function ( ) : void } [ closeHandler ]
* @ return { function ( string [ ] , function ( string ): void ) }
* @ return { function ( string [ ] ) : void }
* /
const streamHttpEnd = ( req , closeHandler = undefined ) => ( ids , listener ) => {
const streamHttpEnd = ( req , closeHandler = undefined ) => ( ids ) => {
req . on ( 'close' , ( ) => {
ids . forEach ( id => {
unsubscribe ( id , listener );
unsubscribe ( id ) ;
} ) ;
if ( closeHandler ) {
@ -762,7 +746,7 @@ const startWorker = (workerId) => {
app . get ( '/api/v1/streaming/*' , ( req , res ) => {
channelNameToIds ( req , channelNameFromPath ( req ) , req . query ) . then ( ( { channelIds , options } ) => {
const onSend = streamToHttp ( req , res ) ;
const onEnd = streamHttpEnd ( req , subscriptionHeartbeat ( channelIds ) ) ;
const onEnd = streamHttpEnd ( req , subscriptionHeartbeat ( channelIds ) ) ;
streamFrom ( channelIds , req , onSend , onEnd , options . needsFiltering , options . allowLocalOnly ) ;
} ) . catch ( err => {
@ -805,7 +789,7 @@ const startWorker = (workerId) => {
* @ return { Promise . < { channelIds : string [ ] , options : { needsFiltering : boolean } } > }
* /
const channelNameToIds = ( req , name , params ) => new Promise ( ( resolve , reject ) => {
switch ( name ) {
switch ( name ) {
case 'user' :
resolve ( {
channelIds : channelsForUserStream ( req ) ,
@ -949,15 +933,17 @@ const startWorker = (workerId) => {
* @ param { StreamParams } params
* /
const subscribeWebsocketToChannel = ( { socket , request , subscriptions } , channelName , params ) =>
checkScopes ( request , channelName ) . then ( ( ) => channelNameToIds ( request , channelName , params ) ) . then ( ( { channelIds , options } ) => {
checkScopes ( request , channelName ) . then ( ( ) => channelNameToIds ( request , channelName , params ) ) . then ( ( {
channelIds ,
options ,
} ) => {
if ( subscriptions [ channelIds . join ( ';' ) ] ) {
return ;
}
const onSend = streamToWs ( request , socket , streamNameFromChannelName ( channelName , params ) ) ;
const onSend = streamToWs ( request , socket , streamNameFromChannelName ( channelName , params ) ) ;
const stopHeartbeat = subscriptionHeartbeat ( channelIds ) ;
const listener = streamFrom ( channelIds , request , onSend , undefined , options . needsFiltering , options . allowLocalOnly ) ;
const listener = streamFrom ( channelIds , request , onSend , undefined , options . needsFiltering , options . allowLocalOnly ) ;
subscriptions [ channelIds . join ( ';' ) ] = {
listener ,
@ -1005,7 +991,7 @@ const startWorker = (workerId) => {
const listener = createSystemMessageListener ( request , {
onKill ( ) {
onKill ( ) {
socket . close ( ) ;
} ,
@ -1015,7 +1001,8 @@ const startWorker = (workerId) => {
subscriptions [ systemChannelId ] = {
listener ,
stopHeartbeat : ( ) => { } ,
stopHeartbeat : ( ) => {
} ,
} ;
} ;
@ -1034,7 +1021,7 @@ const startWorker = (workerId) => {
wss . on ( 'connection' , ( ws , req ) => {
const location = url . parse ( req . url , true ) ;
req . requestId = uuid . v4 ( ) ;
req . requestId = uuid . v4 ( ) ;
req . remoteAddress = ws . _socket . remoteAddress ;
ws . isAlive = true ;