Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F101008648
sender.js
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Feb 4, 18:43
Size
12 KB
Mime Type
text/x-c++
Expires
Thu, Feb 6, 18:43 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
24072735
Attached To
rOACCT Open Access Compliance Check Tool (OACCT)
sender.js
View Options
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */
'use strict'
;
const
net
=
require
(
'net'
);
const
tls
=
require
(
'tls'
);
const
{
randomFillSync
}
=
require
(
'crypto'
);
const
PerMessageDeflate
=
require
(
'./permessage-deflate'
);
const
{
EMPTY_BUFFER
}
=
require
(
'./constants'
);
const
{
isValidStatusCode
}
=
require
(
'./validation'
);
const
{
mask
:
applyMask
,
toBuffer
}
=
require
(
'./buffer-util'
);
const
kByteLength
=
Symbol
(
'kByteLength'
);
const
maskBuffer
=
Buffer
.
alloc
(
4
);
/**
* HyBi Sender implementation.
*/
class
Sender
{
/**
* Creates a Sender instance.
*
* @param {(net.Socket|tls.Socket)} socket The connection socket
* @param {Object} [extensions] An object containing the negotiated extensions
* @param {Function} [generateMask] The function used to generate the masking
* key
*/
constructor
(
socket
,
extensions
,
generateMask
)
{
this
.
_extensions
=
extensions
||
{};
if
(
generateMask
)
{
this
.
_generateMask
=
generateMask
;
this
.
_maskBuffer
=
Buffer
.
alloc
(
4
);
}
this
.
_socket
=
socket
;
this
.
_firstFragment
=
true
;
this
.
_compress
=
false
;
this
.
_bufferedBytes
=
0
;
this
.
_deflating
=
false
;
this
.
_queue
=
[];
}
/**
* Frames a piece of data according to the HyBi WebSocket protocol.
*
* @param {(Buffer|String)} data The data to frame
* @param {Object} options Options object
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
* `data`
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @return {(Buffer|String)[]} The framed data
* @public
*/
static
frame
(
data
,
options
)
{
let
mask
;
let
merge
=
false
;
let
offset
=
2
;
let
skipMasking
=
false
;
if
(
options
.
mask
)
{
mask
=
options
.
maskBuffer
||
maskBuffer
;
if
(
options
.
generateMask
)
{
options
.
generateMask
(
mask
);
}
else
{
randomFillSync
(
mask
,
0
,
4
);
}
skipMasking
=
(
mask
[
0
]
|
mask
[
1
]
|
mask
[
2
]
|
mask
[
3
])
===
0
;
offset
=
6
;
}
let
dataLength
;
if
(
typeof
data
===
'string'
)
{
if
(
(
!
options
.
mask
||
skipMasking
)
&&
options
[
kByteLength
]
!==
undefined
)
{
dataLength
=
options
[
kByteLength
];
}
else
{
data
=
Buffer
.
from
(
data
);
dataLength
=
data
.
length
;
}
}
else
{
dataLength
=
data
.
length
;
merge
=
options
.
mask
&&
options
.
readOnly
&&
!
skipMasking
;
}
let
payloadLength
=
dataLength
;
if
(
dataLength
>=
65536
)
{
offset
+=
8
;
payloadLength
=
127
;
}
else
if
(
dataLength
>
125
)
{
offset
+=
2
;
payloadLength
=
126
;
}
const
target
=
Buffer
.
allocUnsafe
(
merge
?
dataLength
+
offset
:
offset
);
target
[
0
]
=
options
.
fin
?
options
.
opcode
|
0x80
:
options
.
opcode
;
if
(
options
.
rsv1
)
target
[
0
]
|=
0x40
;
target
[
1
]
=
payloadLength
;
if
(
payloadLength
===
126
)
{
target
.
writeUInt16BE
(
dataLength
,
2
);
}
else
if
(
payloadLength
===
127
)
{
target
[
2
]
=
target
[
3
]
=
0
;
target
.
writeUIntBE
(
dataLength
,
4
,
6
);
}
if
(
!
options
.
mask
)
return
[
target
,
data
];
target
[
1
]
|=
0x80
;
target
[
offset
-
4
]
=
mask
[
0
];
target
[
offset
-
3
]
=
mask
[
1
];
target
[
offset
-
2
]
=
mask
[
2
];
target
[
offset
-
1
]
=
mask
[
3
];
if
(
skipMasking
)
return
[
target
,
data
];
if
(
merge
)
{
applyMask
(
data
,
mask
,
target
,
offset
,
dataLength
);
return
[
target
];
}
applyMask
(
data
,
mask
,
data
,
0
,
dataLength
);
return
[
target
,
data
];
}
/**
* Sends a close message to the other peer.
*
* @param {Number} [code] The status code component of the body
* @param {(String|Buffer)} [data] The message component of the body
* @param {Boolean} [mask=false] Specifies whether or not to mask the message
* @param {Function} [cb] Callback
* @public
*/
close
(
code
,
data
,
mask
,
cb
)
{
let
buf
;
if
(
code
===
undefined
)
{
buf
=
EMPTY_BUFFER
;
}
else
if
(
typeof
code
!==
'number'
||
!
isValidStatusCode
(
code
))
{
throw
new
TypeError
(
'First argument must be a valid error code number'
);
}
else
if
(
data
===
undefined
||
!
data
.
length
)
{
buf
=
Buffer
.
allocUnsafe
(
2
);
buf
.
writeUInt16BE
(
code
,
0
);
}
else
{
const
length
=
Buffer
.
byteLength
(
data
);
if
(
length
>
123
)
{
throw
new
RangeError
(
'The message must not be greater than 123 bytes'
);
}
buf
=
Buffer
.
allocUnsafe
(
2
+
length
);
buf
.
writeUInt16BE
(
code
,
0
);
if
(
typeof
data
===
'string'
)
{
buf
.
write
(
data
,
2
);
}
else
{
buf
.
set
(
data
,
2
);
}
}
const
options
=
{
[
kByteLength
]
:
buf
.
length
,
fin
:
true
,
generateMask
:
this
.
_generateMask
,
mask
,
maskBuffer
:
this
.
_maskBuffer
,
opcode
:
0x08
,
readOnly
:
false
,
rsv1
:
false
};
if
(
this
.
_deflating
)
{
this
.
enqueue
([
this
.
dispatch
,
buf
,
false
,
options
,
cb
]);
}
else
{
this
.
sendFrame
(
Sender
.
frame
(
buf
,
options
),
cb
);
}
}
/**
* Sends a ping message to the other peer.
*
* @param {*} data The message to send
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Function} [cb] Callback
* @public
*/
ping
(
data
,
mask
,
cb
)
{
let
byteLength
;
let
readOnly
;
if
(
typeof
data
===
'string'
)
{
byteLength
=
Buffer
.
byteLength
(
data
);
readOnly
=
false
;
}
else
{
data
=
toBuffer
(
data
);
byteLength
=
data
.
length
;
readOnly
=
toBuffer
.
readOnly
;
}
if
(
byteLength
>
125
)
{
throw
new
RangeError
(
'The data size must not be greater than 125 bytes'
);
}
const
options
=
{
[
kByteLength
]
:
byteLength
,
fin
:
true
,
generateMask
:
this
.
_generateMask
,
mask
,
maskBuffer
:
this
.
_maskBuffer
,
opcode
:
0x09
,
readOnly
,
rsv1
:
false
};
if
(
this
.
_deflating
)
{
this
.
enqueue
([
this
.
dispatch
,
data
,
false
,
options
,
cb
]);
}
else
{
this
.
sendFrame
(
Sender
.
frame
(
data
,
options
),
cb
);
}
}
/**
* Sends a pong message to the other peer.
*
* @param {*} data The message to send
* @param {Boolean} [mask=false] Specifies whether or not to mask `data`
* @param {Function} [cb] Callback
* @public
*/
pong
(
data
,
mask
,
cb
)
{
let
byteLength
;
let
readOnly
;
if
(
typeof
data
===
'string'
)
{
byteLength
=
Buffer
.
byteLength
(
data
);
readOnly
=
false
;
}
else
{
data
=
toBuffer
(
data
);
byteLength
=
data
.
length
;
readOnly
=
toBuffer
.
readOnly
;
}
if
(
byteLength
>
125
)
{
throw
new
RangeError
(
'The data size must not be greater than 125 bytes'
);
}
const
options
=
{
[
kByteLength
]
:
byteLength
,
fin
:
true
,
generateMask
:
this
.
_generateMask
,
mask
,
maskBuffer
:
this
.
_maskBuffer
,
opcode
:
0x0a
,
readOnly
,
rsv1
:
false
};
if
(
this
.
_deflating
)
{
this
.
enqueue
([
this
.
dispatch
,
data
,
false
,
options
,
cb
]);
}
else
{
this
.
sendFrame
(
Sender
.
frame
(
data
,
options
),
cb
);
}
}
/**
* Sends a data message to the other peer.
*
* @param {*} data The message to send
* @param {Object} options Options object
* @param {Boolean} [options.binary=false] Specifies whether `data` is binary
* or text
* @param {Boolean} [options.compress=false] Specifies whether or not to
* compress `data`
* @param {Boolean} [options.fin=false] Specifies whether the fragment is the
* last one
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
* `data`
* @param {Function} [cb] Callback
* @public
*/
send
(
data
,
options
,
cb
)
{
const
perMessageDeflate
=
this
.
_extensions
[
PerMessageDeflate
.
extensionName
];
let
opcode
=
options
.
binary
?
2
:
1
;
let
rsv1
=
options
.
compress
;
let
byteLength
;
let
readOnly
;
if
(
typeof
data
===
'string'
)
{
byteLength
=
Buffer
.
byteLength
(
data
);
readOnly
=
false
;
}
else
{
data
=
toBuffer
(
data
);
byteLength
=
data
.
length
;
readOnly
=
toBuffer
.
readOnly
;
}
if
(
this
.
_firstFragment
)
{
this
.
_firstFragment
=
false
;
if
(
rsv1
&&
perMessageDeflate
&&
perMessageDeflate
.
params
[
perMessageDeflate
.
_isServer
?
'server_no_context_takeover'
:
'client_no_context_takeover'
]
)
{
rsv1
=
byteLength
>=
perMessageDeflate
.
_threshold
;
}
this
.
_compress
=
rsv1
;
}
else
{
rsv1
=
false
;
opcode
=
0
;
}
if
(
options
.
fin
)
this
.
_firstFragment
=
true
;
if
(
perMessageDeflate
)
{
const
opts
=
{
[
kByteLength
]
:
byteLength
,
fin
:
options
.
fin
,
generateMask
:
this
.
_generateMask
,
mask
:
options
.
mask
,
maskBuffer
:
this
.
_maskBuffer
,
opcode
,
readOnly
,
rsv1
};
if
(
this
.
_deflating
)
{
this
.
enqueue
([
this
.
dispatch
,
data
,
this
.
_compress
,
opts
,
cb
]);
}
else
{
this
.
dispatch
(
data
,
this
.
_compress
,
opts
,
cb
);
}
}
else
{
this
.
sendFrame
(
Sender
.
frame
(
data
,
{
[
kByteLength
]
:
byteLength
,
fin
:
options
.
fin
,
generateMask
:
this
.
_generateMask
,
mask
:
options
.
mask
,
maskBuffer
:
this
.
_maskBuffer
,
opcode
,
readOnly
,
rsv1
:
false
}),
cb
);
}
}
/**
* Dispatches a message.
*
* @param {(Buffer|String)} data The message to send
* @param {Boolean} [compress=false] Specifies whether or not to compress
* `data`
* @param {Object} options Options object
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
* `data`
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @param {Function} [cb] Callback
* @private
*/
dispatch
(
data
,
compress
,
options
,
cb
)
{
if
(
!
compress
)
{
this
.
sendFrame
(
Sender
.
frame
(
data
,
options
),
cb
);
return
;
}
const
perMessageDeflate
=
this
.
_extensions
[
PerMessageDeflate
.
extensionName
];
this
.
_bufferedBytes
+=
options
[
kByteLength
];
this
.
_deflating
=
true
;
perMessageDeflate
.
compress
(
data
,
options
.
fin
,
(
_
,
buf
)
=>
{
if
(
this
.
_socket
.
destroyed
)
{
const
err
=
new
Error
(
'The socket was closed while data was being compressed'
);
if
(
typeof
cb
===
'function'
)
cb
(
err
);
for
(
let
i
=
0
;
i
<
this
.
_queue
.
length
;
i
++
)
{
const
params
=
this
.
_queue
[
i
];
const
callback
=
params
[
params
.
length
-
1
];
if
(
typeof
callback
===
'function'
)
callback
(
err
);
}
return
;
}
this
.
_bufferedBytes
-=
options
[
kByteLength
];
this
.
_deflating
=
false
;
options
.
readOnly
=
false
;
this
.
sendFrame
(
Sender
.
frame
(
buf
,
options
),
cb
);
this
.
dequeue
();
});
}
/**
* Executes queued send operations.
*
* @private
*/
dequeue
()
{
while
(
!
this
.
_deflating
&&
this
.
_queue
.
length
)
{
const
params
=
this
.
_queue
.
shift
();
this
.
_bufferedBytes
-=
params
[
3
][
kByteLength
];
Reflect
.
apply
(
params
[
0
],
this
,
params
.
slice
(
1
));
}
}
/**
* Enqueues a send operation.
*
* @param {Array} params Send operation parameters.
* @private
*/
enqueue
(
params
)
{
this
.
_bufferedBytes
+=
params
[
3
][
kByteLength
];
this
.
_queue
.
push
(
params
);
}
/**
* Sends a frame.
*
* @param {Buffer[]} list The frame to send
* @param {Function} [cb] Callback
* @private
*/
sendFrame
(
list
,
cb
)
{
if
(
list
.
length
===
2
)
{
this
.
_socket
.
cork
();
this
.
_socket
.
write
(
list
[
0
]);
this
.
_socket
.
write
(
list
[
1
],
cb
);
this
.
_socket
.
uncork
();
}
else
{
this
.
_socket
.
write
(
list
[
0
],
cb
);
}
}
}
module
.
exports
=
Sender
;
Event Timeline
Log In to Comment