@ -48,14 +48,25 @@ initializeLogLevel(process.env, environment);
* @ property { number } permissions
* /
/ * *
* @ typedef { http . IncomingMessage & ResolvedAccount & {
* path : string
* query : Record < string , unknown >
* remoteAddress ? : string
* cachedFilters : unknown
* scopes : string [ ]
* necessaryScopes : string [ ]
* } } Request
* /
/ * *
* Attempts to safely parse a string as JSON , used when both receiving a message
* from redis and when receiving a message from a client over a websocket
* connection , this is why it accepts a ` req ` argument .
* @ param { string } json
* @ param { any ? } req
* @ returns { Object . < string , any > | null }
* @ param { Request ? } req
* @ returns { Object . < string , unknown > | null }
* /
const parseJSON = ( json , req ) => {
try {
@ -172,6 +183,7 @@ const startServer = async () => {
let resolvedAccount ;
try {
// @ts-expect-error
resolvedAccount = await accountFromRequest ( request ) ;
} catch ( err ) {
// Unfortunately for using the on('upgrade') setup, we need to manually
@ -222,7 +234,7 @@ const startServer = async () => {
} ) ;
/ * *
* @ type { Object . < string , Array . < function ( Object < string , any > ) : void >> }
* @ type { Object . < string , Array . < function ( Object < string , unknown > ) : void >> }
* /
const subs = { } ;
@ -340,7 +352,7 @@ const startServer = async () => {
} ;
/ * *
* @ param { http. IncomingMessage & ResolvedAccoun t} req
* @ param { Reques t} req
* @ param { string [ ] } necessaryScopes
* @ returns { boolean }
* /
@ -349,7 +361,7 @@ const startServer = async () => {
/ * *
* @ param { string } token
* @ param { any } req
* @ param { Request } req
* @ returns { Promise < ResolvedAccount > }
* /
const accountFromToken = async ( token , req ) => {
@ -374,13 +386,13 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } req
* @ param { Request } req
* @ returns { Promise < ResolvedAccount > }
* /
const accountFromRequest = ( req ) => new Promise ( ( resolve , reject ) => {
const authorization = req . headers . authorization ;
const location = url. parse ( req . url , true ) ;
const accessToken = location . query . access _token || req . headers [ 'sec-websocket-protocol' ] ;
const location = req. url ? url. parse ( req . url , true ) : undefined ;
const accessToken = location ? . query . access _token || req . headers [ 'sec-websocket-protocol' ] ;
if ( ! authorization && ! accessToken ) {
reject ( new AuthenticationError ( 'Missing access token' ) ) ;
@ -389,11 +401,12 @@ const startServer = async () => {
const token = authorization ? authorization . replace ( /^Bearer / , '' ) : accessToken ;
// @ts-expect-error
resolve ( accountFromToken ( token , req ) ) ;
} ) ;
/ * *
* @ param { any } req
* @ param { Request } req
* @ returns { string | undefined }
* /
const channelNameFromPath = req => {
@ -425,7 +438,7 @@ const startServer = async () => {
} ;
/ * *
* @ param { http. IncomingMessage & ResolvedAccoun t} req
* @ param { Reques t} req
* @ param { import ( 'pino' ) . Logger } logger
* @ param { string | undefined } channelName
* @ returns { Promise . < void > }
@ -463,7 +476,7 @@ const startServer = async () => {
* /
/ * *
* @ param { any } req
* @ param { Request } req
* @ param { SystemMessageHandlers } eventHandlers
* @ returns { SubscriptionListener }
* /
@ -488,7 +501,7 @@ const startServer = async () => {
} ;
/ * *
* @ param { http. IncomingMessage & ResolvedAccoun t} req
* @ param { Reques t} req
* @ param { http . OutgoingMessage } res
* /
const subscribeHttpToSystemChannel = ( req , res ) => {
@ -515,8 +528,8 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } req
* @ param { any } res
* @ param { Request } req
* @ param { http. ServerResponse } res
* @ param { function ( Error = ) : void } next
* /
const authenticationMiddleware = ( req , res , next ) => {
@ -545,8 +558,8 @@ const startServer = async () => {
/ * *
* @ param { Error } err
* @ param { any } req
* @ param { any } res
* @ param { Request } req
* @ param { http. ServerResponse } res
* @ param { function ( Error = ) : void } next
* /
const errorMiddleware = ( err , req , res , next ) => {
@ -564,16 +577,15 @@ const startServer = async () => {
} ;
/ * *
* @ param { any [ ] } arr
* @ param { string [ ] } arr
* @ param { number = } shift
* @ returns { string }
* /
// @ts-ignore
const placeholders = ( arr , shift = 0 ) => arr . map ( ( _ , i ) => ` $ ${ i + 1 + shift } ` ) . join ( ', ' ) ;
/ * *
* @ param { string } listId
* @ param { any } req
* @ param { Request } req
* @ returns { Promise . < void > }
* /
const authorizeListAccess = async ( listId , req ) => {
@ -623,7 +635,7 @@ const startServer = async () => {
/ * *
* @ param { string [ ] } channelIds
* @ param { http. IncomingMessage & ResolvedAccoun t} req
* @ param { Reques t} req
* @ param { import ( 'pino' ) . Logger } log
* @ param { function ( string , string ) : void } output
* @ param { undefined | function ( string [ ] , SubscriptionListener ) : void } attachCloseHandler
@ -675,6 +687,7 @@ const startServer = async () => {
// The channels that need filtering are determined in the function
// `channelNameToIds` defined below:
if ( ! needsFiltering || ( event !== 'update' && event !== 'status.update' ) ) {
// @ts-expect-error
transmit ( event , payload ) ;
return ;
}
@ -689,7 +702,9 @@ const startServer = async () => {
}
// Filter based on language:
// @ts-expect-error
if ( Array . isArray ( req . chosenLanguages ) && req . chosenLanguages . indexOf ( payload . language ) === - 1 ) {
// @ts-expect-error
log . debug ( ` Message ${ payload . id } filtered by language ( ${ payload . language } ) ` ) ;
return ;
}
@ -701,8 +716,9 @@ const startServer = async () => {
}
// Filter based on domain blocks, blocks, mutes, or custom filters:
// @ts- ignore
// @ts- expect-error
const targetAccountIds = [ payload . account . id ] . concat ( payload . mentions . map ( item => item . id ) ) ;
// @ts-expect-error
const accountDomain = payload . account . acct . split ( '@' ) [ 1 ] ;
// TODO: Move this logic out of the message handling loop
@ -713,7 +729,7 @@ const startServer = async () => {
}
const queries = [
// @ts- ignore
// @ts- expect-error
client . query ( ` SELECT 1
FROM blocks
WHERE ( account _id = $1 AND target _account _id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) )
@ -722,17 +738,19 @@ const startServer = async () => {
SELECT 1
FROM mutes
WHERE account _id = $1
AND target _account _id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) ` , [req.accountId, payload.account.id].concat(targetAccountIds)),
AND target _account _id IN ( $ { placeholders ( targetAccountIds , 2 ) } ) ` , [req.accountId, payload.
// @ts-expect-error
account . id ] . concat ( targetAccountIds ) ) ,
] ;
if ( accountDomain ) {
// @ts- ignore
// @ts- expect-error
queries . push ( client . query ( 'SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2' , [ req . accountId , accountDomain ] ) ) ;
}
// @ts- ignore
// @ts- expect-error
if ( ! payload . filtered && ! req . cachedFilters ) {
// @ts- ignore
// @ts- expect-error
queries . push ( client . query ( 'SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())' , [ req . accountId ] ) ) ;
}
@ -741,6 +759,7 @@ const startServer = async () => {
// Handling blocks & mutes and domain blocks: If one of those applies,
// then we don't transmit the payload of the event to the client
// @ts-expect-error
if ( values [ 0 ] . rows . length > 0 || ( accountDomain && values [ 1 ] . rows . length > 0 ) ) {
return ;
}
@ -757,9 +776,9 @@ const startServer = async () => {
// TODO: Move this logic out of the message handling lifecycle
// @ts-ignore
if ( ! req . cachedFilters ) {
// @ts-expect-error
const filterRows = values [ accountDomain ? 2 : 1 ] . rows ;
// @ts-ignore
req . cachedFilters = filterRows . reduce ( ( cache , filter ) => {
if ( cache [ filter . id ] ) {
cache [ filter . id ] . keywords . push ( [ filter . keyword , filter . whole _word ] ) ;
@ -789,9 +808,9 @@ const startServer = async () => {
// needs to be done in a separate loop as the database returns one
// filterRow per keyword, so we need all the keywords before
// constructing the regular expression
// @ts- ignore
// @ts- expect-error
Object . keys ( req . cachedFilters ) . forEach ( ( key ) => {
// @ts- ignore
// @ts- expect-error
req . cachedFilters [ key ] . regexp = new RegExp ( req . cachedFilters [ key ] . keywords . map ( ( [ keyword , whole _word ] ) => {
let expr = keyword . replace ( /[.*+?^${}()|[\]\\]/g , '\\$&' ) ;
@ -812,16 +831,14 @@ const startServer = async () => {
// Apply cachedFilters against the payload, constructing a
// `filter_results` array of FilterResult entities
// @ts-ignore
if ( req . cachedFilters ) {
const status = payload ;
// TODO: Calculate searchableContent in Ruby on Rails:
// @ts- ignore
// @ts- expect-error
const searchableContent = ( [ status . spoiler _text || '' , status . content ] . concat ( ( status . poll && status . poll . options ) ? status . poll . options . map ( option => option . title ) : [ ] ) ) . concat ( status . media _attachments . map ( att => att . description ) ) . join ( '\n\n' ) . replace ( /<br\s*\/?>/g , '\n' ) . replace ( /<\/p><p>/g , '\n\n' ) ;
const searchableTextContent = JSDOM . fragment ( searchableContent ) . textContent ;
const now = new Date ( ) ;
// @ts-ignore
const filter _results = Object . values ( req . cachedFilters ) . reduce ( ( results , cachedFilter ) => {
// Check the filter hasn't expired before applying:
if ( cachedFilter . expires _at !== null && cachedFilter . expires _at < now ) {
@ -881,8 +898,8 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } req
* @ param { any } res
* @ param { Request } req
* @ param { http. ServerResponse } res
* @ returns { function ( string , string ) : void }
* /
const streamToHttp = ( req , res ) => {
@ -924,7 +941,7 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } req
* @ param { Request } req
* @ param { function ( ) : void } [ closeHandler ]
* @ returns { function ( string [ ] , SubscriptionListener ) : void }
* /
@ -974,10 +991,13 @@ const startServer = async () => {
app . use ( api ) ;
// @ts-expect-error
api . use ( authenticationMiddleware ) ;
// @ts-expect-error
api . use ( errorMiddleware ) ;
api . get ( '/api/v1/streaming/*' , ( req , res ) => {
// @ts-expect-error
const channelName = channelNameFromPath ( req ) ;
// FIXME: In theory we'd never actually reach here due to
@ -988,8 +1008,11 @@ const startServer = async () => {
return ;
}
// @ts-expect-error
channelNameToIds ( req , channelName , req . query ) . then ( ( { channelIds , options } ) => {
// @ts-expect-error
const onSend = streamToHttp ( req , res ) ;
// @ts-expect-error
const onEnd = streamHttpEnd ( req , subscriptionHeartbeat ( channelIds ) ) ;
// @ts-ignore
@ -1012,7 +1035,7 @@ const startServer = async () => {
* /
/ * *
* @ param { any } req
* @ param { Request } req
* @ returns { string [ ] }
* /
const channelsForUserStream = req => {
@ -1026,7 +1049,7 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } req
* @ param { Request } req
* @ param { string } name
* @ param { StreamParams } params
* @ returns { Promise . < { channelIds : string [ ] , options : { needsFiltering : boolean , filterLocal ? : boolean , filterRemote ? : boolean } } > }
@ -1145,7 +1168,7 @@ const startServer = async () => {
/ * *
* @ typedef WebSocketSession
* @ property { import ( 'ws' ) . WebSocket & { isAlive : boolean } } websocket
* @ property { http. IncomingMessage & ResolvedAccoun t} request
* @ property { Reques t} request
* @ property { import ( 'pino' ) . Logger } logger
* @ property { Object . < string , { channelName : string , listener : SubscriptionListener , stopHeartbeat : function ( ) : void } > } subscriptions
* /
@ -1271,7 +1294,7 @@ const startServer = async () => {
/ * *
* @ param { import ( 'ws' ) . WebSocket & { isAlive : boolean } } ws
* @ param { http. IncomingMessage & ResolvedAccoun t} req
* @ param { Reques t} req
* @ param { import ( 'pino' ) . Logger } log
* /
function onConnection ( ws , req , log ) {
@ -1338,9 +1361,19 @@ const startServer = async () => {
const { type , stream , ... params } = json ;
if ( type === 'subscribe' ) {
subscribeWebsocketToChannel ( session , firstParam ( stream ) , params ) ;
subscribeWebsocketToChannel (
session ,
// @ts-expect-error
firstParam ( stream ) ,
params
) ;
} else if ( type === 'unsubscribe' ) {
unsubscribeWebsocketFromChannel ( session , firstParam ( stream ) , params ) ;
unsubscribeWebsocketFromChannel (
session ,
// @ts-expect-error
firstParam ( stream ) ,
params
) ;
} else {
// Unknown action type
}
@ -1360,13 +1393,13 @@ const startServer = async () => {
setInterval ( ( ) => {
wss . clients . forEach ( ws => {
// @ts- ignore
// @ts- expect-error
if ( ws . isAlive === false ) {
ws . terminate ( ) ;
return ;
}
// @ts- ignore
// @ts- expect-error
ws . isAlive = false ;
ws . ping ( '' , false ) ;
} ) ;
@ -1396,14 +1429,16 @@ const startServer = async () => {
} ;
/ * *
* @ param { any } server
* @ param { http. Server } server
* @ param { function ( string ) : void } [ onSuccess ]
* /
const attachServerWithConfig = ( server , onSuccess ) => {
if ( process . env . SOCKET ) {
server . listen ( process . env . SOCKET , ( ) => {
if ( onSuccess ) {
// @ts-expect-error
fs . chmodSync ( server . address ( ) , 0o666 ) ;
// @ts-expect-error
onSuccess ( server . address ( ) ) ;
}
} ) ;
@ -1418,6 +1453,7 @@ const attachServerWithConfig = (server, onSuccess) => {
server . listen ( port , bind , ( ) => {
if ( onSuccess ) {
// @ts-expect-error
onSuccess ( ` ${ server . address ( ) . address } : ${ server . address ( ) . port } ` ) ;
}
} ) ;