Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F121634060
websocket.js
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Sat, Jul 12, 16:29
Size
33 KB
Mime Type
text/x-c++
Expires
Mon, Jul 14, 16:29 (2 d)
Engine
blob
Format
Raw Data
Handle
27363542
Attached To
rOACCT Open Access Compliance Check Tool (OACCT)
websocket.js
View Options
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */
'use strict'
;
const
EventEmitter
=
require
(
'events'
);
const
https
=
require
(
'https'
);
const
http
=
require
(
'http'
);
const
net
=
require
(
'net'
);
const
tls
=
require
(
'tls'
);
const
{
randomBytes
,
createHash
}
=
require
(
'crypto'
);
const
{
Readable
}
=
require
(
'stream'
);
const
{
URL
}
=
require
(
'url'
);
const
PerMessageDeflate
=
require
(
'./permessage-deflate'
);
const
Receiver
=
require
(
'./receiver'
);
const
Sender
=
require
(
'./sender'
);
const
{
BINARY_TYPES
,
EMPTY_BUFFER
,
GUID
,
kForOnEventAttribute
,
kListener
,
kStatusCode
,
kWebSocket
,
NOOP
}
=
require
(
'./constants'
);
const
{
EventTarget
:
{
addEventListener
,
removeEventListener
}
}
=
require
(
'./event-target'
);
const
{
format
,
parse
}
=
require
(
'./extension'
);
const
{
toBuffer
}
=
require
(
'./buffer-util'
);
const
closeTimeout
=
30
*
1000
;
const
kAborted
=
Symbol
(
'kAborted'
);
const
protocolVersions
=
[
8
,
13
];
const
readyStates
=
[
'CONNECTING'
,
'OPEN'
,
'CLOSING'
,
'CLOSED'
];
const
subprotocolRegex
=
/^[!#$%&'*+\-.0-9A-Z^_`|a-z~]+$/
;
/**
* Class representing a WebSocket.
*
* @extends EventEmitter
*/
class
WebSocket
extends
EventEmitter
{
/**
* Create a new `WebSocket`.
*
* @param {(String|URL)} address The URL to which to connect
* @param {(String|String[])} [protocols] The subprotocols
* @param {Object} [options] Connection options
*/
constructor
(
address
,
protocols
,
options
)
{
super
();
this
.
_binaryType
=
BINARY_TYPES
[
0
];
this
.
_closeCode
=
1006
;
this
.
_closeFrameReceived
=
false
;
this
.
_closeFrameSent
=
false
;
this
.
_closeMessage
=
EMPTY_BUFFER
;
this
.
_closeTimer
=
null
;
this
.
_extensions
=
{};
this
.
_paused
=
false
;
this
.
_protocol
=
''
;
this
.
_readyState
=
WebSocket
.
CONNECTING
;
this
.
_receiver
=
null
;
this
.
_sender
=
null
;
this
.
_socket
=
null
;
if
(
address
!==
null
)
{
this
.
_bufferedAmount
=
0
;
this
.
_isServer
=
false
;
this
.
_redirects
=
0
;
if
(
protocols
===
undefined
)
{
protocols
=
[];
}
else
if
(
!
Array
.
isArray
(
protocols
))
{
if
(
typeof
protocols
===
'object'
&&
protocols
!==
null
)
{
options
=
protocols
;
protocols
=
[];
}
else
{
protocols
=
[
protocols
];
}
}
initAsClient
(
this
,
address
,
protocols
,
options
);
}
else
{
this
.
_isServer
=
true
;
}
}
/**
* This deviates from the WHATWG interface since ws doesn't support the
* required default "blob" type (instead we define a custom "nodebuffer"
* type).
*
* @type {String}
*/
get
binaryType
()
{
return
this
.
_binaryType
;
}
set
binaryType
(
type
)
{
if
(
!
BINARY_TYPES
.
includes
(
type
))
return
;
this
.
_binaryType
=
type
;
//
// Allow to change `binaryType` on the fly.
//
if
(
this
.
_receiver
)
this
.
_receiver
.
_binaryType
=
type
;
}
/**
* @type {Number}
*/
get
bufferedAmount
()
{
if
(
!
this
.
_socket
)
return
this
.
_bufferedAmount
;
return
this
.
_socket
.
_writableState
.
length
+
this
.
_sender
.
_bufferedBytes
;
}
/**
* @type {String}
*/
get
extensions
()
{
return
Object
.
keys
(
this
.
_extensions
).
join
();
}
/**
* @type {Boolean}
*/
get
isPaused
()
{
return
this
.
_paused
;
}
/**
* @type {Function}
*/
/* istanbul ignore next */
get
onclose
()
{
return
null
;
}
/**
* @type {Function}
*/
/* istanbul ignore next */
get
onerror
()
{
return
null
;
}
/**
* @type {Function}
*/
/* istanbul ignore next */
get
onopen
()
{
return
null
;
}
/**
* @type {Function}
*/
/* istanbul ignore next */
get
onmessage
()
{
return
null
;
}
/**
* @type {String}
*/
get
protocol
()
{
return
this
.
_protocol
;
}
/**
* @type {Number}
*/
get
readyState
()
{
return
this
.
_readyState
;
}
/**
* @type {String}
*/
get
url
()
{
return
this
.
_url
;
}
/**
* Set up the socket and the internal resources.
*
* @param {(net.Socket|tls.Socket)} socket The network socket between the
* server and client
* @param {Buffer} head The first packet of the upgraded stream
* @param {Object} options Options object
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Number} [options.maxPayload=0] The maximum allowed message size
* @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
* not to skip UTF-8 validation for text and close messages
* @private
*/
setSocket
(
socket
,
head
,
options
)
{
const
receiver
=
new
Receiver
({
binaryType
:
this
.
binaryType
,
extensions
:
this
.
_extensions
,
isServer
:
this
.
_isServer
,
maxPayload
:
options
.
maxPayload
,
skipUTF8Validation
:
options
.
skipUTF8Validation
});
this
.
_sender
=
new
Sender
(
socket
,
this
.
_extensions
,
options
.
generateMask
);
this
.
_receiver
=
receiver
;
this
.
_socket
=
socket
;
receiver
[
kWebSocket
]
=
this
;
socket
[
kWebSocket
]
=
this
;
receiver
.
on
(
'conclude'
,
receiverOnConclude
);
receiver
.
on
(
'drain'
,
receiverOnDrain
);
receiver
.
on
(
'error'
,
receiverOnError
);
receiver
.
on
(
'message'
,
receiverOnMessage
);
receiver
.
on
(
'ping'
,
receiverOnPing
);
receiver
.
on
(
'pong'
,
receiverOnPong
);
socket
.
setTimeout
(
0
);
socket
.
setNoDelay
();
if
(
head
.
length
>
0
)
socket
.
unshift
(
head
);
socket
.
on
(
'close'
,
socketOnClose
);
socket
.
on
(
'data'
,
socketOnData
);
socket
.
on
(
'end'
,
socketOnEnd
);
socket
.
on
(
'error'
,
socketOnError
);
this
.
_readyState
=
WebSocket
.
OPEN
;
this
.
emit
(
'open'
);
}
/**
* Emit the `'close'` event.
*
* @private
*/
emitClose
()
{
if
(
!
this
.
_socket
)
{
this
.
_readyState
=
WebSocket
.
CLOSED
;
this
.
emit
(
'close'
,
this
.
_closeCode
,
this
.
_closeMessage
);
return
;
}
if
(
this
.
_extensions
[
PerMessageDeflate
.
extensionName
])
{
this
.
_extensions
[
PerMessageDeflate
.
extensionName
].
cleanup
();
}
this
.
_receiver
.
removeAllListeners
();
this
.
_readyState
=
WebSocket
.
CLOSED
;
this
.
emit
(
'close'
,
this
.
_closeCode
,
this
.
_closeMessage
);
}
/**
* Start a closing handshake.
*
* +----------+ +-----------+ +----------+
* - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
* | +----------+ +-----------+ +----------+ |
* +----------+ +-----------+ |
* CLOSING |ws.close()|<--|close frame|<--+-----+ CLOSING
* +----------+ +-----------+ |
* | | | +---+ |
* +------------------------+-->|fin| - - - -
* | +---+ | +---+
* - - - - -|fin|<---------------------+
* +---+
*
* @param {Number} [code] Status code explaining why the connection is closing
* @param {(String|Buffer)} [data] The reason why the connection is
* closing
* @public
*/
close
(
code
,
data
)
{
if
(
this
.
readyState
===
WebSocket
.
CLOSED
)
return
;
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
)
{
const
msg
=
'WebSocket was closed before the connection was established'
;
abortHandshake
(
this
,
this
.
_req
,
msg
);
return
;
}
if
(
this
.
readyState
===
WebSocket
.
CLOSING
)
{
if
(
this
.
_closeFrameSent
&&
(
this
.
_closeFrameReceived
||
this
.
_receiver
.
_writableState
.
errorEmitted
)
)
{
this
.
_socket
.
end
();
}
return
;
}
this
.
_readyState
=
WebSocket
.
CLOSING
;
this
.
_sender
.
close
(
code
,
data
,
!
this
.
_isServer
,
(
err
)
=>
{
//
// This error is handled by the `'error'` listener on the socket. We only
// want to know if the close frame has been sent here.
//
if
(
err
)
return
;
this
.
_closeFrameSent
=
true
;
if
(
this
.
_closeFrameReceived
||
this
.
_receiver
.
_writableState
.
errorEmitted
)
{
this
.
_socket
.
end
();
}
});
//
// Specify a timeout for the closing handshake to complete.
//
this
.
_closeTimer
=
setTimeout
(
this
.
_socket
.
destroy
.
bind
(
this
.
_socket
),
closeTimeout
);
}
/**
* Pause the socket.
*
* @public
*/
pause
()
{
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
||
this
.
readyState
===
WebSocket
.
CLOSED
)
{
return
;
}
this
.
_paused
=
true
;
this
.
_socket
.
pause
();
}
/**
* Send a ping.
*
* @param {*} [data] The data to send
* @param {Boolean} [mask] Indicates whether or not to mask `data`
* @param {Function} [cb] Callback which is executed when the ping is sent
* @public
*/
ping
(
data
,
mask
,
cb
)
{
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
)
{
throw
new
Error
(
'WebSocket is not open: readyState 0 (CONNECTING)'
);
}
if
(
typeof
data
===
'function'
)
{
cb
=
data
;
data
=
mask
=
undefined
;
}
else
if
(
typeof
mask
===
'function'
)
{
cb
=
mask
;
mask
=
undefined
;
}
if
(
typeof
data
===
'number'
)
data
=
data
.
toString
();
if
(
this
.
readyState
!==
WebSocket
.
OPEN
)
{
sendAfterClose
(
this
,
data
,
cb
);
return
;
}
if
(
mask
===
undefined
)
mask
=
!
this
.
_isServer
;
this
.
_sender
.
ping
(
data
||
EMPTY_BUFFER
,
mask
,
cb
);
}
/**
* Send a pong.
*
* @param {*} [data] The data to send
* @param {Boolean} [mask] Indicates whether or not to mask `data`
* @param {Function} [cb] Callback which is executed when the pong is sent
* @public
*/
pong
(
data
,
mask
,
cb
)
{
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
)
{
throw
new
Error
(
'WebSocket is not open: readyState 0 (CONNECTING)'
);
}
if
(
typeof
data
===
'function'
)
{
cb
=
data
;
data
=
mask
=
undefined
;
}
else
if
(
typeof
mask
===
'function'
)
{
cb
=
mask
;
mask
=
undefined
;
}
if
(
typeof
data
===
'number'
)
data
=
data
.
toString
();
if
(
this
.
readyState
!==
WebSocket
.
OPEN
)
{
sendAfterClose
(
this
,
data
,
cb
);
return
;
}
if
(
mask
===
undefined
)
mask
=
!
this
.
_isServer
;
this
.
_sender
.
pong
(
data
||
EMPTY_BUFFER
,
mask
,
cb
);
}
/**
* Resume the socket.
*
* @public
*/
resume
()
{
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
||
this
.
readyState
===
WebSocket
.
CLOSED
)
{
return
;
}
this
.
_paused
=
false
;
if
(
!
this
.
_receiver
.
_writableState
.
needDrain
)
this
.
_socket
.
resume
();
}
/**
* Send a data message.
*
* @param {*} data The message to send
* @param {Object} [options] Options object
* @param {Boolean} [options.binary] Specifies whether `data` is binary or
* text
* @param {Boolean} [options.compress] Specifies whether or not to compress
* `data`
* @param {Boolean} [options.fin=true] Specifies whether the fragment is the
* last one
* @param {Boolean} [options.mask] Specifies whether or not to mask `data`
* @param {Function} [cb] Callback which is executed when data is written out
* @public
*/
send
(
data
,
options
,
cb
)
{
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
)
{
throw
new
Error
(
'WebSocket is not open: readyState 0 (CONNECTING)'
);
}
if
(
typeof
options
===
'function'
)
{
cb
=
options
;
options
=
{};
}
if
(
typeof
data
===
'number'
)
data
=
data
.
toString
();
if
(
this
.
readyState
!==
WebSocket
.
OPEN
)
{
sendAfterClose
(
this
,
data
,
cb
);
return
;
}
const
opts
=
{
binary
:
typeof
data
!==
'string'
,
mask
:
!
this
.
_isServer
,
compress
:
true
,
fin
:
true
,
...
options
};
if
(
!
this
.
_extensions
[
PerMessageDeflate
.
extensionName
])
{
opts
.
compress
=
false
;
}
this
.
_sender
.
send
(
data
||
EMPTY_BUFFER
,
opts
,
cb
);
}
/**
* Forcibly close the connection.
*
* @public
*/
terminate
()
{
if
(
this
.
readyState
===
WebSocket
.
CLOSED
)
return
;
if
(
this
.
readyState
===
WebSocket
.
CONNECTING
)
{
const
msg
=
'WebSocket was closed before the connection was established'
;
abortHandshake
(
this
,
this
.
_req
,
msg
);
return
;
}
if
(
this
.
_socket
)
{
this
.
_readyState
=
WebSocket
.
CLOSING
;
this
.
_socket
.
destroy
();
}
}
}
/**
* @constant {Number} CONNECTING
* @memberof WebSocket
*/
Object
.
defineProperty
(
WebSocket
,
'CONNECTING'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CONNECTING'
)
});
/**
* @constant {Number} CONNECTING
* @memberof WebSocket.prototype
*/
Object
.
defineProperty
(
WebSocket
.
prototype
,
'CONNECTING'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CONNECTING'
)
});
/**
* @constant {Number} OPEN
* @memberof WebSocket
*/
Object
.
defineProperty
(
WebSocket
,
'OPEN'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'OPEN'
)
});
/**
* @constant {Number} OPEN
* @memberof WebSocket.prototype
*/
Object
.
defineProperty
(
WebSocket
.
prototype
,
'OPEN'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'OPEN'
)
});
/**
* @constant {Number} CLOSING
* @memberof WebSocket
*/
Object
.
defineProperty
(
WebSocket
,
'CLOSING'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CLOSING'
)
});
/**
* @constant {Number} CLOSING
* @memberof WebSocket.prototype
*/
Object
.
defineProperty
(
WebSocket
.
prototype
,
'CLOSING'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CLOSING'
)
});
/**
* @constant {Number} CLOSED
* @memberof WebSocket
*/
Object
.
defineProperty
(
WebSocket
,
'CLOSED'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CLOSED'
)
});
/**
* @constant {Number} CLOSED
* @memberof WebSocket.prototype
*/
Object
.
defineProperty
(
WebSocket
.
prototype
,
'CLOSED'
,
{
enumerable
:
true
,
value
:
readyStates
.
indexOf
(
'CLOSED'
)
});
[
'binaryType'
,
'bufferedAmount'
,
'extensions'
,
'isPaused'
,
'protocol'
,
'readyState'
,
'url'
].
forEach
((
property
)
=>
{
Object
.
defineProperty
(
WebSocket
.
prototype
,
property
,
{
enumerable
:
true
});
});
//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
[
'open'
,
'error'
,
'close'
,
'message'
].
forEach
((
method
)
=>
{
Object
.
defineProperty
(
WebSocket
.
prototype
,
`
on$
{
method
}
`
,
{
enumerable
:
true
,
get
()
{
for
(
const
listener
of
this
.
listeners
(
method
))
{
if
(
listener
[
kForOnEventAttribute
])
return
listener
[
kListener
];
}
return
null
;
},
set
(
handler
)
{
for
(
const
listener
of
this
.
listeners
(
method
))
{
if
(
listener
[
kForOnEventAttribute
])
{
this
.
removeListener
(
method
,
listener
);
break
;
}
}
if
(
typeof
handler
!==
'function'
)
return
;
this
.
addEventListener
(
method
,
handler
,
{
[
kForOnEventAttribute
]
:
true
});
}
});
});
WebSocket
.
prototype
.
addEventListener
=
addEventListener
;
WebSocket
.
prototype
.
removeEventListener
=
removeEventListener
;
module
.
exports
=
WebSocket
;
/**
* Initialize a WebSocket client.
*
* @param {WebSocket} websocket The client to initialize
* @param {(String|URL)} address The URL to which to connect
* @param {Array} protocols The subprotocols
* @param {Object} [options] Connection options
* @param {Boolean} [options.followRedirects=false] Whether or not to follow
* redirects
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the
* handshake request
* @param {Number} [options.maxPayload=104857600] The maximum allowed message
* size
* @param {Number} [options.maxRedirects=10] The maximum number of redirects
* allowed
* @param {String} [options.origin] Value of the `Origin` or
* `Sec-WebSocket-Origin` header
* @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable
* permessage-deflate
* @param {Number} [options.protocolVersion=13] Value of the
* `Sec-WebSocket-Version` header
* @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
* not to skip UTF-8 validation for text and close messages
* @private
*/
function
initAsClient
(
websocket
,
address
,
protocols
,
options
)
{
const
opts
=
{
protocolVersion
:
protocolVersions
[
1
],
maxPayload
:
100
*
1024
*
1024
,
skipUTF8Validation
:
false
,
perMessageDeflate
:
true
,
followRedirects
:
false
,
maxRedirects
:
10
,
...
options
,
createConnection
:
undefined
,
socketPath
:
undefined
,
hostname
:
undefined
,
protocol
:
undefined
,
timeout
:
undefined
,
method
:
'GET'
,
host
:
undefined
,
path
:
undefined
,
port
:
undefined
};
if
(
!
protocolVersions
.
includes
(
opts
.
protocolVersion
))
{
throw
new
RangeError
(
`
Unsupported
protocol
version
:
$
{
opts
.
protocolVersion
}
`
+
`
(
supported
versions
:
$
{
protocolVersions
.
join
(
', '
)})
`
);
}
let
parsedUrl
;
if
(
address
instanceof
URL
)
{
parsedUrl
=
address
;
websocket
.
_url
=
address
.
href
;
}
else
{
try
{
parsedUrl
=
new
URL
(
address
);
}
catch
(
e
)
{
throw
new
SyntaxError
(
`
Invalid
URL
:
$
{
address
}
`
);
}
websocket
.
_url
=
address
;
}
const
isSecure
=
parsedUrl
.
protocol
===
'wss:'
;
const
isIpcUrl
=
parsedUrl
.
protocol
===
'ws+unix:'
;
let
invalidUrlMessage
;
if
(
parsedUrl
.
protocol
!==
'ws:'
&&
!
isSecure
&&
!
isIpcUrl
)
{
invalidUrlMessage
=
'The URL\'s protocol must be one of "ws:", "wss:", or "ws+unix:"'
;
}
else
if
(
isIpcUrl
&&
!
parsedUrl
.
pathname
)
{
invalidUrlMessage
=
"The URL's pathname is empty"
;
}
else
if
(
parsedUrl
.
hash
)
{
invalidUrlMessage
=
'The URL contains a fragment identifier'
;
}
if
(
invalidUrlMessage
)
{
const
err
=
new
SyntaxError
(
invalidUrlMessage
);
if
(
websocket
.
_redirects
===
0
)
{
throw
err
;
}
else
{
emitErrorAndClose
(
websocket
,
err
);
return
;
}
}
const
defaultPort
=
isSecure
?
443
:
80
;
const
key
=
randomBytes
(
16
).
toString
(
'base64'
);
const
request
=
isSecure
?
https
.
request
:
http
.
request
;
const
protocolSet
=
new
Set
();
let
perMessageDeflate
;
opts
.
createConnection
=
isSecure
?
tlsConnect
:
netConnect
;
opts
.
defaultPort
=
opts
.
defaultPort
||
defaultPort
;
opts
.
port
=
parsedUrl
.
port
||
defaultPort
;
opts
.
host
=
parsedUrl
.
hostname
.
startsWith
(
'['
)
?
parsedUrl
.
hostname
.
slice
(
1
,
-
1
)
:
parsedUrl
.
hostname
;
opts
.
headers
=
{
...
opts
.
headers
,
'Sec-WebSocket-Version'
:
opts
.
protocolVersion
,
'Sec-WebSocket-Key'
:
key
,
Connection
:
'Upgrade'
,
Upgrade
:
'websocket'
};
opts
.
path
=
parsedUrl
.
pathname
+
parsedUrl
.
search
;
opts
.
timeout
=
opts
.
handshakeTimeout
;
if
(
opts
.
perMessageDeflate
)
{
perMessageDeflate
=
new
PerMessageDeflate
(
opts
.
perMessageDeflate
!==
true
?
opts
.
perMessageDeflate
:
{},
false
,
opts
.
maxPayload
);
opts
.
headers
[
'Sec-WebSocket-Extensions'
]
=
format
({
[
PerMessageDeflate
.
extensionName
]
:
perMessageDeflate
.
offer
()
});
}
if
(
protocols
.
length
)
{
for
(
const
protocol
of
protocols
)
{
if
(
typeof
protocol
!==
'string'
||
!
subprotocolRegex
.
test
(
protocol
)
||
protocolSet
.
has
(
protocol
)
)
{
throw
new
SyntaxError
(
'An invalid or duplicated subprotocol was specified'
);
}
protocolSet
.
add
(
protocol
);
}
opts
.
headers
[
'Sec-WebSocket-Protocol'
]
=
protocols
.
join
(
','
);
}
if
(
opts
.
origin
)
{
if
(
opts
.
protocolVersion
<
13
)
{
opts
.
headers
[
'Sec-WebSocket-Origin'
]
=
opts
.
origin
;
}
else
{
opts
.
headers
.
Origin
=
opts
.
origin
;
}
}
if
(
parsedUrl
.
username
||
parsedUrl
.
password
)
{
opts
.
auth
=
`
$
{
parsedUrl
.
username
}
:
$
{
parsedUrl
.
password
}
`
;
}
if
(
isIpcUrl
)
{
const
parts
=
opts
.
path
.
split
(
':'
);
opts
.
socketPath
=
parts
[
0
];
opts
.
path
=
parts
[
1
];
}
let
req
;
if
(
opts
.
followRedirects
)
{
if
(
websocket
.
_redirects
===
0
)
{
websocket
.
_originalIpc
=
isIpcUrl
;
websocket
.
_originalSecure
=
isSecure
;
websocket
.
_originalHostOrSocketPath
=
isIpcUrl
?
opts
.
socketPath
:
parsedUrl
.
host
;
const
headers
=
options
&&
options
.
headers
;
//
// Shallow copy the user provided options so that headers can be changed
// without mutating the original object.
//
options
=
{
...
options
,
headers
:
{}
};
if
(
headers
)
{
for
(
const
[
key
,
value
]
of
Object
.
entries
(
headers
))
{
options
.
headers
[
key
.
toLowerCase
()]
=
value
;
}
}
}
else
if
(
websocket
.
listenerCount
(
'redirect'
)
===
0
)
{
const
isSameHost
=
isIpcUrl
?
websocket
.
_originalIpc
?
opts
.
socketPath
===
websocket
.
_originalHostOrSocketPath
:
false
:
websocket
.
_originalIpc
?
false
:
parsedUrl
.
host
===
websocket
.
_originalHostOrSocketPath
;
if
(
!
isSameHost
||
(
websocket
.
_originalSecure
&&
!
isSecure
))
{
//
// Match curl 7.77.0 behavior and drop the following headers. These
// headers are also dropped when following a redirect to a subdomain.
//
delete
opts
.
headers
.
authorization
;
delete
opts
.
headers
.
cookie
;
if
(
!
isSameHost
)
delete
opts
.
headers
.
host
;
opts
.
auth
=
undefined
;
}
}
//
// Match curl 7.77.0 behavior and make the first `Authorization` header win.
// If the `Authorization` header is set, then there is nothing to do as it
// will take precedence.
//
if
(
opts
.
auth
&&
!
options
.
headers
.
authorization
)
{
options
.
headers
.
authorization
=
'Basic '
+
Buffer
.
from
(
opts
.
auth
).
toString
(
'base64'
);
}
req
=
websocket
.
_req
=
request
(
opts
);
if
(
websocket
.
_redirects
)
{
//
// Unlike what is done for the `'upgrade'` event, no early exit is
// triggered here if the user calls `websocket.close()` or
// `websocket.terminate()` from a listener of the `'redirect'` event. This
// is because the user can also call `request.destroy()` with an error
// before calling `websocket.close()` or `websocket.terminate()` and this
// would result in an error being emitted on the `request` object with no
// `'error'` event listeners attached.
//
websocket
.
emit
(
'redirect'
,
websocket
.
url
,
req
);
}
}
else
{
req
=
websocket
.
_req
=
request
(
opts
);
}
if
(
opts
.
timeout
)
{
req
.
on
(
'timeout'
,
()
=>
{
abortHandshake
(
websocket
,
req
,
'Opening handshake has timed out'
);
});
}
req
.
on
(
'error'
,
(
err
)
=>
{
if
(
req
===
null
||
req
[
kAborted
])
return
;
req
=
websocket
.
_req
=
null
;
emitErrorAndClose
(
websocket
,
err
);
});
req
.
on
(
'response'
,
(
res
)
=>
{
const
location
=
res
.
headers
.
location
;
const
statusCode
=
res
.
statusCode
;
if
(
location
&&
opts
.
followRedirects
&&
statusCode
>=
300
&&
statusCode
<
400
)
{
if
(
++
websocket
.
_redirects
>
opts
.
maxRedirects
)
{
abortHandshake
(
websocket
,
req
,
'Maximum redirects exceeded'
);
return
;
}
req
.
abort
();
let
addr
;
try
{
addr
=
new
URL
(
location
,
address
);
}
catch
(
e
)
{
const
err
=
new
SyntaxError
(
`
Invalid
URL
:
$
{
location
}
`
);
emitErrorAndClose
(
websocket
,
err
);
return
;
}
initAsClient
(
websocket
,
addr
,
protocols
,
options
);
}
else
if
(
!
websocket
.
emit
(
'unexpected-response'
,
req
,
res
))
{
abortHandshake
(
websocket
,
req
,
`
Unexpected
server
response
:
$
{
res
.
statusCode
}
`
);
}
});
req
.
on
(
'upgrade'
,
(
res
,
socket
,
head
)
=>
{
websocket
.
emit
(
'upgrade'
,
res
);
//
// The user may have closed the connection from a listener of the
// `'upgrade'` event.
//
if
(
websocket
.
readyState
!==
WebSocket
.
CONNECTING
)
return
;
req
=
websocket
.
_req
=
null
;
if
(
res
.
headers
.
upgrade
.
toLowerCase
()
!==
'websocket'
)
{
abortHandshake
(
websocket
,
socket
,
'Invalid Upgrade header'
);
return
;
}
const
digest
=
createHash
(
'sha1'
)
.
update
(
key
+
GUID
)
.
digest
(
'base64'
);
if
(
res
.
headers
[
'sec-websocket-accept'
]
!==
digest
)
{
abortHandshake
(
websocket
,
socket
,
'Invalid Sec-WebSocket-Accept header'
);
return
;
}
const
serverProt
=
res
.
headers
[
'sec-websocket-protocol'
];
let
protError
;
if
(
serverProt
!==
undefined
)
{
if
(
!
protocolSet
.
size
)
{
protError
=
'Server sent a subprotocol but none was requested'
;
}
else
if
(
!
protocolSet
.
has
(
serverProt
))
{
protError
=
'Server sent an invalid subprotocol'
;
}
}
else
if
(
protocolSet
.
size
)
{
protError
=
'Server sent no subprotocol'
;
}
if
(
protError
)
{
abortHandshake
(
websocket
,
socket
,
protError
);
return
;
}
if
(
serverProt
)
websocket
.
_protocol
=
serverProt
;
const
secWebSocketExtensions
=
res
.
headers
[
'sec-websocket-extensions'
];
if
(
secWebSocketExtensions
!==
undefined
)
{
if
(
!
perMessageDeflate
)
{
const
message
=
'Server sent a Sec-WebSocket-Extensions header but no extension '
+
'was requested'
;
abortHandshake
(
websocket
,
socket
,
message
);
return
;
}
let
extensions
;
try
{
extensions
=
parse
(
secWebSocketExtensions
);
}
catch
(
err
)
{
const
message
=
'Invalid Sec-WebSocket-Extensions header'
;
abortHandshake
(
websocket
,
socket
,
message
);
return
;
}
const
extensionNames
=
Object
.
keys
(
extensions
);
if
(
extensionNames
.
length
!==
1
||
extensionNames
[
0
]
!==
PerMessageDeflate
.
extensionName
)
{
const
message
=
'Server indicated an extension that was not requested'
;
abortHandshake
(
websocket
,
socket
,
message
);
return
;
}
try
{
perMessageDeflate
.
accept
(
extensions
[
PerMessageDeflate
.
extensionName
]);
}
catch
(
err
)
{
const
message
=
'Invalid Sec-WebSocket-Extensions header'
;
abortHandshake
(
websocket
,
socket
,
message
);
return
;
}
websocket
.
_extensions
[
PerMessageDeflate
.
extensionName
]
=
perMessageDeflate
;
}
websocket
.
setSocket
(
socket
,
head
,
{
generateMask
:
opts
.
generateMask
,
maxPayload
:
opts
.
maxPayload
,
skipUTF8Validation
:
opts
.
skipUTF8Validation
});
});
req
.
end
();
}
/**
* Emit the `'error'` and `'close'` events.
*
* @param {WebSocket} websocket The WebSocket instance
* @param {Error} The error to emit
* @private
*/
function
emitErrorAndClose
(
websocket
,
err
)
{
websocket
.
_readyState
=
WebSocket
.
CLOSING
;
websocket
.
emit
(
'error'
,
err
);
websocket
.
emitClose
();
}
/**
* Create a `net.Socket` and initiate a connection.
*
* @param {Object} options Connection options
* @return {net.Socket} The newly created socket used to start the connection
* @private
*/
function
netConnect
(
options
)
{
options
.
path
=
options
.
socketPath
;
return
net
.
connect
(
options
);
}
/**
* Create a `tls.TLSSocket` and initiate a connection.
*
* @param {Object} options Connection options
* @return {tls.TLSSocket} The newly created socket used to start the connection
* @private
*/
function
tlsConnect
(
options
)
{
options
.
path
=
undefined
;
if
(
!
options
.
servername
&&
options
.
servername
!==
''
)
{
options
.
servername
=
net
.
isIP
(
options
.
host
)
?
''
:
options
.
host
;
}
return
tls
.
connect
(
options
);
}
/**
* Abort the handshake and emit an error.
*
* @param {WebSocket} websocket The WebSocket instance
* @param {(http.ClientRequest|net.Socket|tls.Socket)} stream The request to
* abort or the socket to destroy
* @param {String} message The error message
* @private
*/
function
abortHandshake
(
websocket
,
stream
,
message
)
{
websocket
.
_readyState
=
WebSocket
.
CLOSING
;
const
err
=
new
Error
(
message
);
Error
.
captureStackTrace
(
err
,
abortHandshake
);
if
(
stream
.
setHeader
)
{
stream
[
kAborted
]
=
true
;
stream
.
abort
();
if
(
stream
.
socket
&&
!
stream
.
socket
.
destroyed
)
{
//
// On Node.js >= 14.3.0 `request.abort()` does not destroy the socket if
// called after the request completed. See
// https://github.com/websockets/ws/issues/1869.
//
stream
.
socket
.
destroy
();
}
process
.
nextTick
(
emitErrorAndClose
,
websocket
,
err
);
}
else
{
stream
.
destroy
(
err
);
stream
.
once
(
'error'
,
websocket
.
emit
.
bind
(
websocket
,
'error'
));
stream
.
once
(
'close'
,
websocket
.
emitClose
.
bind
(
websocket
));
}
}
/**
* Handle cases where the `ping()`, `pong()`, or `send()` methods are called
* when the `readyState` attribute is `CLOSING` or `CLOSED`.
*
* @param {WebSocket} websocket The WebSocket instance
* @param {*} [data] The data to send
* @param {Function} [cb] Callback
* @private
*/
function
sendAfterClose
(
websocket
,
data
,
cb
)
{
if
(
data
)
{
const
length
=
toBuffer
(
data
).
length
;
//
// The `_bufferedAmount` property is used only when the peer is a client and
// the opening handshake fails. Under these circumstances, in fact, the
// `setSocket()` method is not called, so the `_socket` and `_sender`
// properties are set to `null`.
//
if
(
websocket
.
_socket
)
websocket
.
_sender
.
_bufferedBytes
+=
length
;
else
websocket
.
_bufferedAmount
+=
length
;
}
if
(
cb
)
{
const
err
=
new
Error
(
`
WebSocket
is
not
open
:
readyState
$
{
websocket
.
readyState
}
`
+
`
(
$
{
readyStates
[
websocket
.
readyState
]})
`
);
process
.
nextTick
(
cb
,
err
);
}
}
/**
* The listener of the `Receiver` `'conclude'` event.
*
* @param {Number} code The status code
* @param {Buffer} reason The reason for closing
* @private
*/
function
receiverOnConclude
(
code
,
reason
)
{
const
websocket
=
this
[
kWebSocket
];
websocket
.
_closeFrameReceived
=
true
;
websocket
.
_closeMessage
=
reason
;
websocket
.
_closeCode
=
code
;
if
(
websocket
.
_socket
[
kWebSocket
]
===
undefined
)
return
;
websocket
.
_socket
.
removeListener
(
'data'
,
socketOnData
);
process
.
nextTick
(
resume
,
websocket
.
_socket
);
if
(
code
===
1005
)
websocket
.
close
();
else
websocket
.
close
(
code
,
reason
);
}
/**
* The listener of the `Receiver` `'drain'` event.
*
* @private
*/
function
receiverOnDrain
()
{
const
websocket
=
this
[
kWebSocket
];
if
(
!
websocket
.
isPaused
)
websocket
.
_socket
.
resume
();
}
/**
* The listener of the `Receiver` `'error'` event.
*
* @param {(RangeError|Error)} err The emitted error
* @private
*/
function
receiverOnError
(
err
)
{
const
websocket
=
this
[
kWebSocket
];
if
(
websocket
.
_socket
[
kWebSocket
]
!==
undefined
)
{
websocket
.
_socket
.
removeListener
(
'data'
,
socketOnData
);
//
// On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See
// https://github.com/websockets/ws/issues/1940.
//
process
.
nextTick
(
resume
,
websocket
.
_socket
);
websocket
.
close
(
err
[
kStatusCode
]);
}
websocket
.
emit
(
'error'
,
err
);
}
/**
* The listener of the `Receiver` `'finish'` event.
*
* @private
*/
function
receiverOnFinish
()
{
this
[
kWebSocket
].
emitClose
();
}
/**
* The listener of the `Receiver` `'message'` event.
*
* @param {Buffer|ArrayBuffer|Buffer[])} data The message
* @param {Boolean} isBinary Specifies whether the message is binary or not
* @private
*/
function
receiverOnMessage
(
data
,
isBinary
)
{
this
[
kWebSocket
].
emit
(
'message'
,
data
,
isBinary
);
}
/**
* The listener of the `Receiver` `'ping'` event.
*
* @param {Buffer} data The data included in the ping frame
* @private
*/
function
receiverOnPing
(
data
)
{
const
websocket
=
this
[
kWebSocket
];
websocket
.
pong
(
data
,
!
websocket
.
_isServer
,
NOOP
);
websocket
.
emit
(
'ping'
,
data
);
}
/**
* The listener of the `Receiver` `'pong'` event.
*
* @param {Buffer} data The data included in the pong frame
* @private
*/
function
receiverOnPong
(
data
)
{
this
[
kWebSocket
].
emit
(
'pong'
,
data
);
}
/**
* Resume a readable stream
*
* @param {Readable} stream The readable stream
* @private
*/
function
resume
(
stream
)
{
stream
.
resume
();
}
/**
* The listener of the `net.Socket` `'close'` event.
*
* @private
*/
function
socketOnClose
()
{
const
websocket
=
this
[
kWebSocket
];
this
.
removeListener
(
'close'
,
socketOnClose
);
this
.
removeListener
(
'data'
,
socketOnData
);
this
.
removeListener
(
'end'
,
socketOnEnd
);
websocket
.
_readyState
=
WebSocket
.
CLOSING
;
let
chunk
;
//
// The close frame might not have been received or the `'end'` event emitted,
// for example, if the socket was destroyed due to an error. Ensure that the
// `receiver` stream is closed after writing any remaining buffered data to
// it. If the readable side of the socket is in flowing mode then there is no
// buffered data as everything has been already written and `readable.read()`
// will return `null`. If instead, the socket is paused, any possible buffered
// data will be read as a single chunk.
//
if
(
!
this
.
_readableState
.
endEmitted
&&
!
websocket
.
_closeFrameReceived
&&
!
websocket
.
_receiver
.
_writableState
.
errorEmitted
&&
(
chunk
=
websocket
.
_socket
.
read
())
!==
null
)
{
websocket
.
_receiver
.
write
(
chunk
);
}
websocket
.
_receiver
.
end
();
this
[
kWebSocket
]
=
undefined
;
clearTimeout
(
websocket
.
_closeTimer
);
if
(
websocket
.
_receiver
.
_writableState
.
finished
||
websocket
.
_receiver
.
_writableState
.
errorEmitted
)
{
websocket
.
emitClose
();
}
else
{
websocket
.
_receiver
.
on
(
'error'
,
receiverOnFinish
);
websocket
.
_receiver
.
on
(
'finish'
,
receiverOnFinish
);
}
}
/**
* The listener of the `net.Socket` `'data'` event.
*
* @param {Buffer} chunk A chunk of data
* @private
*/
function
socketOnData
(
chunk
)
{
if
(
!
this
[
kWebSocket
].
_receiver
.
write
(
chunk
))
{
this
.
pause
();
}
}
/**
* The listener of the `net.Socket` `'end'` event.
*
* @private
*/
function
socketOnEnd
()
{
const
websocket
=
this
[
kWebSocket
];
websocket
.
_readyState
=
WebSocket
.
CLOSING
;
websocket
.
_receiver
.
end
();
this
.
end
();
}
/**
* The listener of the `net.Socket` `'error'` event.
*
* @private
*/
function
socketOnError
()
{
const
websocket
=
this
[
kWebSocket
];
this
.
removeListener
(
'error'
,
socketOnError
);
this
.
on
(
'error'
,
NOOP
);
if
(
websocket
)
{
websocket
.
_readyState
=
WebSocket
.
CLOSING
;
this
.
destroy
();
}
}
Event Timeline
Log In to Comment