Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F100979176
permessage-deflate.js
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Tue, Feb 4, 11:59
Size
13 KB
Mime Type
text/x-c++
Expires
Thu, Feb 6, 11:59 (1 d, 21 h)
Engine
blob
Format
Raw Data
Handle
24066101
Attached To
rOACCT Open Access Compliance Check Tool (OACCT)
permessage-deflate.js
View Options
'use strict'
;
const
zlib
=
require
(
'zlib'
);
const
bufferUtil
=
require
(
'./buffer-util'
);
const
Limiter
=
require
(
'./limiter'
);
const
{
kStatusCode
}
=
require
(
'./constants'
);
const
FastBuffer
=
Buffer
[
Symbol
.
species
];
const
TRAILER
=
Buffer
.
from
([
0x00
,
0x00
,
0xff
,
0xff
]);
const
kPerMessageDeflate
=
Symbol
(
'permessage-deflate'
);
const
kTotalLength
=
Symbol
(
'total-length'
);
const
kCallback
=
Symbol
(
'callback'
);
const
kBuffers
=
Symbol
(
'buffers'
);
const
kError
=
Symbol
(
'error'
);
//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
let
zlibLimiter
;
/**
* permessage-deflate implementation.
*/
class
PerMessageDeflate
{
/**
* Creates a PerMessageDeflate instance.
*
* @param {Object} [options] Configuration options
* @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
* for, or request, a custom client window size
* @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
* acknowledge disabling of client context takeover
* @param {Number} [options.concurrencyLimit=10] The number of concurrent
* calls to zlib
* @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
* use of a custom server window size
* @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
* disabling of server context takeover
* @param {Number} [options.threshold=1024] Size (in bytes) below which
* messages should not be compressed if context takeover is disabled
* @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
* deflate
* @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
* inflate
* @param {Boolean} [isServer=false] Create the instance in either server or
* client mode
* @param {Number} [maxPayload=0] The maximum allowed message length
*/
constructor
(
options
,
isServer
,
maxPayload
)
{
this
.
_maxPayload
=
maxPayload
|
0
;
this
.
_options
=
options
||
{};
this
.
_threshold
=
this
.
_options
.
threshold
!==
undefined
?
this
.
_options
.
threshold
:
1024
;
this
.
_isServer
=
!!
isServer
;
this
.
_deflate
=
null
;
this
.
_inflate
=
null
;
this
.
params
=
null
;
if
(
!
zlibLimiter
)
{
const
concurrency
=
this
.
_options
.
concurrencyLimit
!==
undefined
?
this
.
_options
.
concurrencyLimit
:
10
;
zlibLimiter
=
new
Limiter
(
concurrency
);
}
}
/**
* @type {String}
*/
static
get
extensionName
()
{
return
'permessage-deflate'
;
}
/**
* Create an extension negotiation offer.
*
* @return {Object} Extension parameters
* @public
*/
offer
()
{
const
params
=
{};
if
(
this
.
_options
.
serverNoContextTakeover
)
{
params
.
server_no_context_takeover
=
true
;
}
if
(
this
.
_options
.
clientNoContextTakeover
)
{
params
.
client_no_context_takeover
=
true
;
}
if
(
this
.
_options
.
serverMaxWindowBits
)
{
params
.
server_max_window_bits
=
this
.
_options
.
serverMaxWindowBits
;
}
if
(
this
.
_options
.
clientMaxWindowBits
)
{
params
.
client_max_window_bits
=
this
.
_options
.
clientMaxWindowBits
;
}
else
if
(
this
.
_options
.
clientMaxWindowBits
==
null
)
{
params
.
client_max_window_bits
=
true
;
}
return
params
;
}
/**
* Accept an extension negotiation offer/response.
*
* @param {Array} configurations The extension negotiation offers/reponse
* @return {Object} Accepted configuration
* @public
*/
accept
(
configurations
)
{
configurations
=
this
.
normalizeParams
(
configurations
);
this
.
params
=
this
.
_isServer
?
this
.
acceptAsServer
(
configurations
)
:
this
.
acceptAsClient
(
configurations
);
return
this
.
params
;
}
/**
* Releases all resources used by the extension.
*
* @public
*/
cleanup
()
{
if
(
this
.
_inflate
)
{
this
.
_inflate
.
close
();
this
.
_inflate
=
null
;
}
if
(
this
.
_deflate
)
{
const
callback
=
this
.
_deflate
[
kCallback
];
this
.
_deflate
.
close
();
this
.
_deflate
=
null
;
if
(
callback
)
{
callback
(
new
Error
(
'The deflate stream was closed while data was being processed'
)
);
}
}
}
/**
* Accept an extension negotiation offer.
*
* @param {Array} offers The extension negotiation offers
* @return {Object} Accepted configuration
* @private
*/
acceptAsServer
(
offers
)
{
const
opts
=
this
.
_options
;
const
accepted
=
offers
.
find
((
params
)
=>
{
if
(
(
opts
.
serverNoContextTakeover
===
false
&&
params
.
server_no_context_takeover
)
||
(
params
.
server_max_window_bits
&&
(
opts
.
serverMaxWindowBits
===
false
||
(
typeof
opts
.
serverMaxWindowBits
===
'number'
&&
opts
.
serverMaxWindowBits
>
params
.
server_max_window_bits
)))
||
(
typeof
opts
.
clientMaxWindowBits
===
'number'
&&
!
params
.
client_max_window_bits
)
)
{
return
false
;
}
return
true
;
});
if
(
!
accepted
)
{
throw
new
Error
(
'None of the extension offers can be accepted'
);
}
if
(
opts
.
serverNoContextTakeover
)
{
accepted
.
server_no_context_takeover
=
true
;
}
if
(
opts
.
clientNoContextTakeover
)
{
accepted
.
client_no_context_takeover
=
true
;
}
if
(
typeof
opts
.
serverMaxWindowBits
===
'number'
)
{
accepted
.
server_max_window_bits
=
opts
.
serverMaxWindowBits
;
}
if
(
typeof
opts
.
clientMaxWindowBits
===
'number'
)
{
accepted
.
client_max_window_bits
=
opts
.
clientMaxWindowBits
;
}
else
if
(
accepted
.
client_max_window_bits
===
true
||
opts
.
clientMaxWindowBits
===
false
)
{
delete
accepted
.
client_max_window_bits
;
}
return
accepted
;
}
/**
* Accept the extension negotiation response.
*
* @param {Array} response The extension negotiation response
* @return {Object} Accepted configuration
* @private
*/
acceptAsClient
(
response
)
{
const
params
=
response
[
0
];
if
(
this
.
_options
.
clientNoContextTakeover
===
false
&&
params
.
client_no_context_takeover
)
{
throw
new
Error
(
'Unexpected parameter "client_no_context_takeover"'
);
}
if
(
!
params
.
client_max_window_bits
)
{
if
(
typeof
this
.
_options
.
clientMaxWindowBits
===
'number'
)
{
params
.
client_max_window_bits
=
this
.
_options
.
clientMaxWindowBits
;
}
}
else
if
(
this
.
_options
.
clientMaxWindowBits
===
false
||
(
typeof
this
.
_options
.
clientMaxWindowBits
===
'number'
&&
params
.
client_max_window_bits
>
this
.
_options
.
clientMaxWindowBits
)
)
{
throw
new
Error
(
'Unexpected or invalid parameter "client_max_window_bits"'
);
}
return
params
;
}
/**
* Normalize parameters.
*
* @param {Array} configurations The extension negotiation offers/reponse
* @return {Array} The offers/response with normalized parameters
* @private
*/
normalizeParams
(
configurations
)
{
configurations
.
forEach
((
params
)
=>
{
Object
.
keys
(
params
).
forEach
((
key
)
=>
{
let
value
=
params
[
key
];
if
(
value
.
length
>
1
)
{
throw
new
Error
(
`
Parameter
"${key}"
must
have
only
a
single
value
`
);
}
value
=
value
[
0
];
if
(
key
===
'client_max_window_bits'
)
{
if
(
value
!==
true
)
{
const
num
=
+
value
;
if
(
!
Number
.
isInteger
(
num
)
||
num
<
8
||
num
>
15
)
{
throw
new
TypeError
(
`
Invalid
value
for
parameter
"${key}"
:
$
{
value
}
`
);
}
value
=
num
;
}
else
if
(
!
this
.
_isServer
)
{
throw
new
TypeError
(
`
Invalid
value
for
parameter
"${key}"
:
$
{
value
}
`
);
}
}
else
if
(
key
===
'server_max_window_bits'
)
{
const
num
=
+
value
;
if
(
!
Number
.
isInteger
(
num
)
||
num
<
8
||
num
>
15
)
{
throw
new
TypeError
(
`
Invalid
value
for
parameter
"${key}"
:
$
{
value
}
`
);
}
value
=
num
;
}
else
if
(
key
===
'client_no_context_takeover'
||
key
===
'server_no_context_takeover'
)
{
if
(
value
!==
true
)
{
throw
new
TypeError
(
`
Invalid
value
for
parameter
"${key}"
:
$
{
value
}
`
);
}
}
else
{
throw
new
Error
(
`
Unknown
parameter
"${key}"
`
);
}
params
[
key
]
=
value
;
});
});
return
configurations
;
}
/**
* Decompress data. Concurrency limited.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
decompress
(
data
,
fin
,
callback
)
{
zlibLimiter
.
add
((
done
)
=>
{
this
.
_decompress
(
data
,
fin
,
(
err
,
result
)
=>
{
done
();
callback
(
err
,
result
);
});
});
}
/**
* Compress data. Concurrency limited.
*
* @param {(Buffer|String)} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @public
*/
compress
(
data
,
fin
,
callback
)
{
zlibLimiter
.
add
((
done
)
=>
{
this
.
_compress
(
data
,
fin
,
(
err
,
result
)
=>
{
done
();
callback
(
err
,
result
);
});
});
}
/**
* Decompress data.
*
* @param {Buffer} data Compressed data
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @private
*/
_decompress
(
data
,
fin
,
callback
)
{
const
endpoint
=
this
.
_isServer
?
'client'
:
'server'
;
if
(
!
this
.
_inflate
)
{
const
key
=
`
$
{
endpoint
}
_max_window_bits
`
;
const
windowBits
=
typeof
this
.
params
[
key
]
!==
'number'
?
zlib
.
Z_DEFAULT_WINDOWBITS
:
this
.
params
[
key
];
this
.
_inflate
=
zlib
.
createInflateRaw
({
...
this
.
_options
.
zlibInflateOptions
,
windowBits
});
this
.
_inflate
[
kPerMessageDeflate
]
=
this
;
this
.
_inflate
[
kTotalLength
]
=
0
;
this
.
_inflate
[
kBuffers
]
=
[];
this
.
_inflate
.
on
(
'error'
,
inflateOnError
);
this
.
_inflate
.
on
(
'data'
,
inflateOnData
);
}
this
.
_inflate
[
kCallback
]
=
callback
;
this
.
_inflate
.
write
(
data
);
if
(
fin
)
this
.
_inflate
.
write
(
TRAILER
);
this
.
_inflate
.
flush
(()
=>
{
const
err
=
this
.
_inflate
[
kError
];
if
(
err
)
{
this
.
_inflate
.
close
();
this
.
_inflate
=
null
;
callback
(
err
);
return
;
}
const
data
=
bufferUtil
.
concat
(
this
.
_inflate
[
kBuffers
],
this
.
_inflate
[
kTotalLength
]
);
if
(
this
.
_inflate
.
_readableState
.
endEmitted
)
{
this
.
_inflate
.
close
();
this
.
_inflate
=
null
;
}
else
{
this
.
_inflate
[
kTotalLength
]
=
0
;
this
.
_inflate
[
kBuffers
]
=
[];
if
(
fin
&&
this
.
params
[
`
$
{
endpoint
}
_no_context_takeover
`
])
{
this
.
_inflate
.
reset
();
}
}
callback
(
null
,
data
);
});
}
/**
* Compress data.
*
* @param {(Buffer|String)} data Data to compress
* @param {Boolean} fin Specifies whether or not this is the last fragment
* @param {Function} callback Callback
* @private
*/
_compress
(
data
,
fin
,
callback
)
{
const
endpoint
=
this
.
_isServer
?
'server'
:
'client'
;
if
(
!
this
.
_deflate
)
{
const
key
=
`
$
{
endpoint
}
_max_window_bits
`
;
const
windowBits
=
typeof
this
.
params
[
key
]
!==
'number'
?
zlib
.
Z_DEFAULT_WINDOWBITS
:
this
.
params
[
key
];
this
.
_deflate
=
zlib
.
createDeflateRaw
({
...
this
.
_options
.
zlibDeflateOptions
,
windowBits
});
this
.
_deflate
[
kTotalLength
]
=
0
;
this
.
_deflate
[
kBuffers
]
=
[];
this
.
_deflate
.
on
(
'data'
,
deflateOnData
);
}
this
.
_deflate
[
kCallback
]
=
callback
;
this
.
_deflate
.
write
(
data
);
this
.
_deflate
.
flush
(
zlib
.
Z_SYNC_FLUSH
,
()
=>
{
if
(
!
this
.
_deflate
)
{
//
// The deflate stream was closed while data was being processed.
//
return
;
}
let
data
=
bufferUtil
.
concat
(
this
.
_deflate
[
kBuffers
],
this
.
_deflate
[
kTotalLength
]
);
if
(
fin
)
{
data
=
new
FastBuffer
(
data
.
buffer
,
data
.
byteOffset
,
data
.
length
-
4
);
}
//
// Ensure that the callback will not be called again in
// `PerMessageDeflate#cleanup()`.
//
this
.
_deflate
[
kCallback
]
=
null
;
this
.
_deflate
[
kTotalLength
]
=
0
;
this
.
_deflate
[
kBuffers
]
=
[];
if
(
fin
&&
this
.
params
[
`
$
{
endpoint
}
_no_context_takeover
`
])
{
this
.
_deflate
.
reset
();
}
callback
(
null
,
data
);
});
}
}
module
.
exports
=
PerMessageDeflate
;
/**
* The listener of the `zlib.DeflateRaw` stream `'data'` event.
*
* @param {Buffer} chunk A chunk of data
* @private
*/
function
deflateOnData
(
chunk
)
{
this
[
kBuffers
].
push
(
chunk
);
this
[
kTotalLength
]
+=
chunk
.
length
;
}
/**
* The listener of the `zlib.InflateRaw` stream `'data'` event.
*
* @param {Buffer} chunk A chunk of data
* @private
*/
function
inflateOnData
(
chunk
)
{
this
[
kTotalLength
]
+=
chunk
.
length
;
if
(
this
[
kPerMessageDeflate
].
_maxPayload
<
1
||
this
[
kTotalLength
]
<=
this
[
kPerMessageDeflate
].
_maxPayload
)
{
this
[
kBuffers
].
push
(
chunk
);
return
;
}
this
[
kError
]
=
new
RangeError
(
'Max payload size exceeded'
);
this
[
kError
].
code
=
'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
;
this
[
kError
][
kStatusCode
]
=
1009
;
this
.
removeListener
(
'data'
,
inflateOnData
);
this
.
reset
();
}
/**
* The listener of the `zlib.InflateRaw` stream `'error'` event.
*
* @param {Error} err The emitted error
* @private
*/
function
inflateOnError
(
err
)
{
//
// There is no need to call `Zlib#close()` as the handle is automatically
// closed when an error is emitted.
//
this
[
kPerMessageDeflate
].
_inflate
=
null
;
err
[
kStatusCode
]
=
1007
;
this
[
kCallback
](
err
);
}
Event Timeline
Log In to Comment