Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F101440518
websocket-server.js
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, Feb 10, 12:24
Size
15 KB
Mime Type
text/x-c++
Expires
Wed, Feb 12, 12:24 (2 d)
Engine
blob
Format
Raw Data
Handle
24147329
Attached To
rOACCT Open Access Compliance Check Tool (OACCT)
websocket-server.js
View Options
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls|https$" }] */
'use strict'
;
const
EventEmitter
=
require
(
'events'
);
const
http
=
require
(
'http'
);
const
https
=
require
(
'https'
);
const
net
=
require
(
'net'
);
const
tls
=
require
(
'tls'
);
const
{
createHash
}
=
require
(
'crypto'
);
const
extension
=
require
(
'./extension'
);
const
PerMessageDeflate
=
require
(
'./permessage-deflate'
);
const
subprotocol
=
require
(
'./subprotocol'
);
const
WebSocket
=
require
(
'./websocket'
);
const
{
GUID
,
kWebSocket
}
=
require
(
'./constants'
);
//
// A valid `Sec-WebSocket-Key` header value: 22 base64 characters followed by
// the `==` padding.
//
const keyRegex = /^[+/0-9A-Za-z]{22}==$/;

//
// Lifecycle states stored in `WebSocketServer#_state`. They are ordered so
// that any state greater than `RUNNING` means the server is shutting down or
// has already shut down.
//
const RUNNING = 0;
const CLOSING = 1;
const CLOSED = 2;
/**
 * Class representing a WebSocket server.
 *
 * The server either owns an internally created HTTP server (`port` option),
 * attaches to a user supplied HTTP/S server (`server` option), or runs
 * detached (`noServer` option), in which case the user is responsible for
 * calling `handleUpgrade()`.
 *
 * @extends EventEmitter
 */
class WebSocketServer extends EventEmitter {
  /**
   * Create a `WebSocketServer` instance.
   *
   * @param {Object} options Configuration options
   * @param {Number} [options.backlog=511] The maximum length of the queue of
   *     pending connections
   * @param {Boolean} [options.clientTracking=true] Specifies whether or not to
   *     track clients
   * @param {Function} [options.handleProtocols] A hook to handle protocols
   * @param {String} [options.host] The hostname where to bind the server
   * @param {Number} [options.maxPayload=104857600] The maximum allowed message
   *     size
   * @param {Boolean} [options.noServer=false] Enable no server mode
   * @param {String} [options.path] Accept only connections matching this path
   * @param {(Boolean|Object)} [options.perMessageDeflate=false] Enable/disable
   *     permessage-deflate
   * @param {Number} [options.port] The port where to bind the server
   * @param {(http.Server|https.Server)} [options.server] A pre-created HTTP/S
   *     server to use
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   * @param {Function} [options.verifyClient] A hook to reject connections
   * @param {Function} [options.WebSocket=WebSocket] Specifies the `WebSocket`
   *     class to use. It must be the `WebSocket` class or class that extends it
   * @param {Function} [callback] A listener for the `listening` event
   * @throws {TypeError} If not exactly one of the `port`, `server`, or
   *     `noServer` options is specified
   */
  constructor(options, callback) {
    super();

    options = {
      maxPayload: 100 * 1024 * 1024,
      skipUTF8Validation: false,
      perMessageDeflate: false,
      handleProtocols: null,
      clientTracking: true,
      verifyClient: null,
      noServer: false,
      backlog: null, // use default (511 as implemented in net.js)
      server: null,
      host: null,
      path: null,
      port: null,
      WebSocket,
      ...options
    };

    if (
      (options.port == null && !options.server && !options.noServer) ||
      (options.port != null && (options.server || options.noServer)) ||
      (options.server && options.noServer)
    ) {
      throw new TypeError(
        'One and only one of the "port", "server", or "noServer" options ' +
          'must be specified'
      );
    }

    if (options.port != null) {
      //
      // Plain HTTP requests to the internally created server are answered
      // with `426 Upgrade Required`.
      //
      this._server = http.createServer((req, res) => {
        const body = http.STATUS_CODES[426];

        res.writeHead(426, {
          'Content-Length': body.length,
          'Content-Type': 'text/plain'
        });
        res.end(body);
      });
      this._server.listen(
        options.port,
        options.host,
        options.backlog,
        callback
      );
    } else if (options.server) {
      this._server = options.server;
    }

    if (this._server) {
      const emitConnection = this.emit.bind(this, 'connection');

      //
      // Keep the remover so that `close()` can detach from the HTTP/S server.
      //
      this._removeListeners = addListeners(this._server, {
        listening: this.emit.bind(this, 'listening'),
        error: this.emit.bind(this, 'error'),
        upgrade: (req, socket, head) => {
          this.handleUpgrade(req, socket, head, emitConnection);
        }
      });
    }

    if (options.perMessageDeflate === true) options.perMessageDeflate = {};
    if (options.clientTracking) {
      this.clients = new Set();
      this._shouldEmitClose = false;
    }

    this.options = options;
    this._state = RUNNING;
  }

  /**
   * Returns the bound address, the address family name, and port of the server
   * as reported by the operating system if listening on an IP socket.
   * If the server is listening on a pipe or UNIX domain socket, the name is
   * returned as a string.
   *
   * @return {(Object|String|null)} The address of the server, or `null` if
   *     there is no underlying HTTP/S server (for example after `close()`)
   * @throws {Error} If the server is operating in "noServer" mode
   * @public
   */
  address() {
    if (this.options.noServer) {
      throw new Error('The server is operating in "noServer" mode');
    }

    if (!this._server) return null;
    return this._server.address();
  }

  /**
   * Stop the server from accepting new connections and emit the `'close'` event
   * when all existing connections are closed.
   *
   * @param {Function} [cb] A one-time listener for the `'close'` event. If the
   *     server is already closed it is called with an `Error`
   * @public
   */
  close(cb) {
    if (this._state === CLOSED) {
      if (cb) {
        this.once('close', () => {
          cb(new Error('The server is not running'));
        });
      }

      process.nextTick(emitClose, this);
      return;
    }

    if (cb) this.once('close', cb);

    //
    // A close is already in progress; the listener added above is called when
    // it completes.
    //
    if (this._state === CLOSING) return;
    this._state = CLOSING;

    if (this.options.noServer || this.options.server) {
      if (this._server) {
        this._removeListeners();
        this._removeListeners = this._server = null;
      }

      if (this.clients) {
        if (!this.clients.size) {
          process.nextTick(emitClose, this);
        } else {
          //
          // Defer the `'close'` event until the last tracked client is gone.
          // See the `'close'` listener added in `completeUpgrade()`.
          //
          this._shouldEmitClose = true;
        }
      } else {
        process.nextTick(emitClose, this);
      }
    } else {
      const server = this._server;

      this._removeListeners();
      this._removeListeners = this._server = null;

      //
      // The HTTP/S server was created internally. Close it, and rely on its
      // `'close'` event.
      //
      server.close(() => {
        emitClose(this);
      });
    }
  }

  /**
   * See if a given request should be handled by this server instance.
   *
   * @param {http.IncomingMessage} req Request object to inspect
   * @return {Boolean} `true` if the request is valid, else `false`
   * @public
   */
  shouldHandle(req) {
    if (this.options.path) {
      //
      // Compare only the part of the URL that precedes the query string.
      //
      const index = req.url.indexOf('?');
      const pathname = index !== -1 ? req.url.slice(0, index) : req.url;

      if (pathname !== this.options.path) return false;
    }

    return true;
  }

  /**
   * Handle a HTTP Upgrade request.
   *
   * The request is rejected if the method is not `GET`, if the `Upgrade`
   * header is missing or is not `websocket`, if the `Sec-WebSocket-Key` or
   * `Sec-WebSocket-Version` headers are missing or invalid, if
   * `shouldHandle()` returns `false`, if the `Sec-WebSocket-Protocol` or
   * `Sec-WebSocket-Extensions` headers cannot be processed, or if the
   * `verifyClient` hook refuses the client.
   *
   * @param {http.IncomingMessage} req The request object
   * @param {(net.Socket|tls.Socket)} socket The network socket between the
   *     server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @public
   */
  handleUpgrade(req, socket, head, cb) {
    socket.on('error', socketOnError);

    const key = req.headers['sec-websocket-key'];
    const upgrade = req.headers.upgrade;
    const version = +req.headers['sec-websocket-version'];

    if (req.method !== 'GET') {
      const message = 'Invalid HTTP method';
      abortHandshakeOrEmitwsClientError(this, req, socket, 405, message);
      return;
    }

    //
    // The `Upgrade` header may be absent, for example when this method is
    // called directly by the user or when the header was dropped because the
    // request exceeded `server.maxHeadersCount`. Reject the handshake instead
    // of throwing on `undefined.toLowerCase()`.
    //
    if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
      const message = 'Invalid Upgrade header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (!key || !keyRegex.test(key)) {
      const message = 'Missing or invalid Sec-WebSocket-Key header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (version !== 8 && version !== 13) {
      const message = 'Missing or invalid Sec-WebSocket-Version header';
      abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
      return;
    }

    if (!this.shouldHandle(req)) {
      abortHandshake(socket, 400);
      return;
    }

    const secWebSocketProtocol = req.headers['sec-websocket-protocol'];
    let protocols = new Set();

    if (secWebSocketProtocol !== undefined) {
      try {
        protocols = subprotocol.parse(secWebSocketProtocol);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Protocol header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    const secWebSocketExtensions = req.headers['sec-websocket-extensions'];
    const extensions = {};

    if (
      this.options.perMessageDeflate &&
      secWebSocketExtensions !== undefined
    ) {
      const perMessageDeflate = new PerMessageDeflate(
        this.options.perMessageDeflate,
        true,
        this.options.maxPayload
      );

      try {
        const offers = extension.parse(secWebSocketExtensions);

        if (offers[PerMessageDeflate.extensionName]) {
          perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]);
          extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
        }
      } catch (err) {
        const message =
          'Invalid or unacceptable Sec-WebSocket-Extensions header';
        abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
        return;
      }
    }

    //
    // Optionally call external client verification handler.
    //
    if (this.options.verifyClient) {
      const info = {
        origin:
          req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`],
        secure: !!(req.socket.authorized || req.socket.encrypted),
        req
      };

      //
      // A hook declared with two parameters is asynchronous and reports its
      // verdict through the callback; otherwise its return value is used.
      //
      if (this.options.verifyClient.length === 2) {
        this.options.verifyClient(info, (verified, code, message, headers) => {
          if (!verified) {
            return abortHandshake(socket, code || 401, message, headers);
          }

          this.completeUpgrade(
            extensions,
            key,
            protocols,
            req,
            socket,
            head,
            cb
          );
        });
        return;
      }

      if (!this.options.verifyClient(info)) return abortHandshake(socket, 401);
    }

    this.completeUpgrade(extensions, key, protocols, req, socket, head, cb);
  }

  /**
   * Upgrade the connection to WebSocket.
   *
   * @param {Object} extensions The accepted extensions
   * @param {String} key The value of the `Sec-WebSocket-Key` header
   * @param {Set} protocols The subprotocols
   * @param {http.IncomingMessage} req The request object
   * @param {(net.Socket|tls.Socket)} socket The network socket between the
   *     server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Function} cb Callback
   * @throws {Error} If called more than once with the same socket
   * @private
   */
  completeUpgrade(extensions, key, protocols, req, socket, head, cb) {
    //
    // Destroy the socket if the client has already sent a FIN packet.
    //
    if (!socket.readable || !socket.writable) return socket.destroy();

    if (socket[kWebSocket]) {
      throw new Error(
        'server.handleUpgrade() was called more than once with the same ' +
          'socket, possibly due to a misconfiguration'
      );
    }

    //
    // Refuse new connections once `close()` has been called.
    //
    if (this._state > RUNNING) return abortHandshake(socket, 503);

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    const headers = [
      'HTTP/1.1 101 Switching Protocols',
      'Upgrade: websocket',
      'Connection: Upgrade',
      `Sec-WebSocket-Accept: ${digest}`
    ];

    const ws = new this.options.WebSocket(null);

    if (protocols.size) {
      //
      // Optionally call external protocol selection handler.
      //
      const protocol = this.options.handleProtocols
        ? this.options.handleProtocols(protocols, req)
        : protocols.values().next().value;

      if (protocol) {
        headers.push(`Sec-WebSocket-Protocol: ${protocol}`);
        ws._protocol = protocol;
      }
    }

    if (extensions[PerMessageDeflate.extensionName]) {
      const params = extensions[PerMessageDeflate.extensionName].params;
      const value = extension.format({
        [PerMessageDeflate.extensionName]: [params]
      });
      headers.push(`Sec-WebSocket-Extensions: ${value}`);
      ws._extensions = extensions;
    }

    //
    // Allow external modification/inspection of handshake headers.
    //
    this.emit('headers', headers, req);

    socket.write(headers.concat('\r\n').join('\r\n'));
    socket.removeListener('error', socketOnError);

    ws.setSocket(socket, head, {
      maxPayload: this.options.maxPayload,
      skipUTF8Validation: this.options.skipUTF8Validation
    });

    if (this.clients) {
      this.clients.add(ws);
      ws.on('close', () => {
        this.clients.delete(ws);

        //
        // `close()` was called while clients were still connected. Emit the
        // deferred `'close'` event now that the last one is gone.
        //
        if (this._shouldEmitClose && !this.clients.size) {
          process.nextTick(emitClose, this);
        }
      });
    }

    cb(ws, req);
  }
}
module
.
exports
=
WebSocketServer
;
/**
 * Register every <event, listener> pair of `map` on `server` and hand back a
 * function that undoes the registration.
 *
 * @param {EventEmitter} server The event emitter
 * @param {Object.<String, Function>} map The listeners to add
 * @return {Function} A function that will remove the added listeners when
 *     called
 * @private
 */
function addListeners(server, map) {
  Object.keys(map).forEach((name) => {
    server.on(name, map[name]);
  });

  return function removeListeners() {
    Object.keys(map).forEach((name) => {
      server.removeListener(name, map[name]);
    });
  };
}
/**
 * Mark a server as closed and notify its `'close'` listeners.
 *
 * @param {EventEmitter} server The event emitter
 * @private
 */
function emitClose(server) {
  // Update the state first so that listeners already observe `CLOSED`.
  server._state = CLOSED;
  server.emit('close');
}
/**
 * `'error'` listener for the socket of an upgrade request. It is invoked with
 * the socket as `this` and destroys it.
 *
 * @private
 */
function socketOnError() {
  this.destroy();
}
/**
 * Reject an upgrade request by writing a complete HTTP response to the socket
 * and destroying the socket once the response has been flushed.
 *
 * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} [message] The HTTP response body. Defaults to the standard
 *     reason phrase for `code`
 * @param {Object} [headers] Additional HTTP response headers. They take
 *     precedence over the default ones
 * @private
 */
function abortHandshake(socket, code, message, headers) {
  //
  // The socket is writable unless the user destroyed or ended it before calling
  // `server.handleUpgrade()` or in the `verifyClient` function, which is a user
  // error. Handling this does not make much sense as the worst that can happen
  // is that some of the data written by the user might be discarded due to the
  // call to `socket.end()` below, which triggers an `'error'` event that in
  // turn causes the socket to be destroyed.
  //
  const reason = http.STATUS_CODES[code];
  const body = message || reason;
  const responseHeaders = {
    Connection: 'close',
    'Content-Type': 'text/html',
    'Content-Length': Buffer.byteLength(body),
    ...headers
  };

  const headerLines = Object.keys(responseHeaders).map(
    (name) => `${name}: ${responseHeaders[name]}`
  );

  //
  // Status line, header lines, the empty line that ends the header section,
  // and finally the body.
  //
  const response = [`HTTP/1.1 ${code} ${reason}`, ...headerLines, '', body];

  socket.once('finish', socket.destroy);
  socket.end(response.join('\r\n'));
}
/**
 * Report a failed handshake. When the server has no `'wsClientError'`
 * listener the handshake is aborted via `abortHandshake()`; otherwise a
 * `'wsClientError'` event carrying an `Error`, the socket and the request is
 * emitted and the socket is left for the listener to deal with.
 *
 * @param {WebSocketServer} server The WebSocket server
 * @param {http.IncomingMessage} req The request object
 * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request
 * @param {Number} code The HTTP response status code
 * @param {String} message The HTTP response body
 * @private
 */
function abortHandshakeOrEmitwsClientError(server, req, socket, code, message) {
  if (!server.listenerCount('wsClientError')) {
    abortHandshake(socket, code, message);
    return;
  }

  const error = new Error(message);

  // Start the stack trace at the caller of this helper.
  Error.captureStackTrace(error, abortHandshakeOrEmitwsClientError);

  server.emit('wsClientError', error, socket, req);
}
Event Timeline
Log In to Comment