@ -294,7 +294,7 @@ const startWorker = (workerId) => {
return ;
return ;
}
}
client . query ( 'SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1' , [ token ] , ( err , result ) => {
client . query ( 'SELECT oauth_access_tokens.id, oauth_access_tokens. resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1' , [ token ] , ( err , result ) => {
done ( ) ;
done ( ) ;
if ( err ) {
if ( err ) {
@ -310,6 +310,7 @@ const startWorker = (workerId) => {
return ;
return ;
}
}
req . accessTokenId = result . rows [ 0 ] . id ;
req . scopes = result . rows [ 0 ] . scopes . split ( ' ' ) ;
req . scopes = result . rows [ 0 ] . scopes . split ( ' ' ) ;
req . accountId = result . rows [ 0 ] . account_id ;
req . accountId = result . rows [ 0 ] . account_id ;
req . chosenLanguages = result . rows [ 0 ] . chosen_languages ;
req . chosenLanguages = result . rows [ 0 ] . chosen_languages ;
@ -450,6 +451,55 @@ const startWorker = (workerId) => {
} ) ;
} ) ;
} ;
} ;
/ * *
* @ typedef SystemMessageHandlers
* @ property { function ( ) : void } onKill
* /
/ * *
* @ param { any } req
* @ param { SystemMessageHandlers } eventHandlers
* @ return { function ( string ) : void }
* /
const createSystemMessageListener = ( req , eventHandlers ) => {
return message => {
const json = parseJSON ( message ) ;
if ( ! json ) return ;
const { event } = json ;
log . silly ( req . requestId , ` System message for ${ req . accountId } : ${ event } ` ) ;
if ( event === 'kill' ) {
log . verbose ( req . requestId , ` Closing connection for ${ req . accountId } due to expired access token ` ) ;
eventHandlers . onKill ( ) ;
}
}
} ;
/ * *
* @ param { any } req
* @ param { any } res
* /
const subscribeHttpToSystemChannel = ( req , res ) => {
const systemChannelId = ` timeline:access_token: ${ req . accessTokenId } ` ;
const listener = createSystemMessageListener ( req , {
onKill ( ) {
res . end ( ) ;
} ,
} ) ;
res . on ( 'close' , ( ) => {
unsubscribe ( ` ${ redisPrefix } ${ systemChannelId } ` , listener ) ;
} ) ;
subscribe ( ` ${ redisPrefix } ${ systemChannelId } ` , listener ) ;
} ;
/ * *
/ * *
* @ param { any } req
* @ param { any } req
* @ param { any } res
* @ param { any } res
@ -462,6 +512,8 @@ const startWorker = (workerId) => {
}
}
accountFromRequest ( req , alwaysRequireAuth ) . then ( ( ) => checkScopes ( req , channelNameFromPath ( req ) ) ) . then ( ( ) => {
accountFromRequest ( req , alwaysRequireAuth ) . then ( ( ) => checkScopes ( req , channelNameFromPath ( req ) ) ) . then ( ( ) => {
subscribeHttpToSystemChannel ( req , res ) ;
} ) . then ( ( ) => {
next ( ) ;
next ( ) ;
} ) . catch ( err => {
} ) . catch ( err => {
next ( err ) ;
next ( err ) ;
@ -536,7 +588,9 @@ const startWorker = (workerId) => {
const listener = message => {
const listener = message => {
const json = parseJSON ( message ) ;
const json = parseJSON ( message ) ;
if ( ! json ) return ;
if ( ! json ) return ;
const { event , payload , queued_at } = json ;
const { event , payload , queued_at } = json ;
const transmit = ( ) => {
const transmit = ( ) => {
@ -902,6 +956,28 @@ const startWorker = (workerId) => {
socket . send ( JSON . stringify ( { error : err . toString ( ) } ) ) ;
socket . send ( JSON . stringify ( { error : err . toString ( ) } ) ) ;
} ) ;
} ) ;
/ * *
* @ param { WebSocketSession } session
* /
const subscribeWebsocketToSystemChannel = ( { socket , request , subscriptions } ) => {
const systemChannelId = ` timeline:access_token: ${ request . accessTokenId } ` ;
const listener = createSystemMessageListener ( request , {
onKill ( ) {
socket . close ( ) ;
} ,
} ) ;
subscribe ( ` ${ redisPrefix } ${ systemChannelId } ` , listener ) ;
subscriptions [ systemChannelId ] = {
listener ,
stopHeartbeat : ( ) => { } ,
} ;
} ;
/ * *
/ * *
* @ param { string | string [ ] } arrayOrString
* @ param { string | string [ ] } arrayOrString
* @ return { string }
* @ return { string }
@ -948,7 +1024,9 @@ const startWorker = (workerId) => {
ws . on ( 'message' , data => {
ws . on ( 'message' , data => {
const json = parseJSON ( data ) ;
const json = parseJSON ( data ) ;
if ( ! json ) return ;
if ( ! json ) return ;
const { type , stream , ... params } = json ;
const { type , stream , ... params } = json ;
if ( type === 'subscribe' ) {
if ( type === 'subscribe' ) {
@ -960,6 +1038,8 @@ const startWorker = (workerId) => {
}
}
} ) ;
} ) ;
subscribeWebsocketToSystemChannel ( session ) ;
if ( location . query . stream ) {
if ( location . query . stream ) {
subscribeWebsocketToChannel ( session , firstParam ( location . query . stream ) , location . query ) ;
subscribeWebsocketToChannel ( session , firstParam ( location . query . stream ) , location . query ) ;
}
}