Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F101895728
AxiosTransformStream.js
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Fri, Feb 14, 20:58
Size
4 KB
Mime Type
text/x-c++
Expires
Sun, Feb 16, 20:58 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
24239787
Attached To
rOACCT Open Access Compliance Check Tool (OACCT)
AxiosTransformStream.js
View Options
'use strict'
;
import
stream
from
'stream'
;
import
utils
from
'../utils.js'
;
import
throttle
from
'./throttle.js'
;
import
speedometer
from
'./speedometer.js'
;
const
kInternals
=
Symbol
(
'internals'
);
class
AxiosTransformStream
extends
stream
.
Transform
{
constructor
(
options
)
{
options
=
utils
.
toFlatObject
(
options
,
{
maxRate
:
0
,
chunkSize
:
64
*
1024
,
minChunkSize
:
100
,
timeWindow
:
500
,
ticksRate
:
2
,
samplesCount
:
15
},
null
,
(
prop
,
source
)
=>
{
return
!
utils
.
isUndefined
(
source
[
prop
]);
});
super
({
readableHighWaterMark
:
options
.
chunkSize
});
const
self
=
this
;
const
internals
=
this
[
kInternals
]
=
{
length
:
options
.
length
,
timeWindow
:
options
.
timeWindow
,
ticksRate
:
options
.
ticksRate
,
chunkSize
:
options
.
chunkSize
,
maxRate
:
options
.
maxRate
,
minChunkSize
:
options
.
minChunkSize
,
bytesSeen
:
0
,
isCaptured
:
false
,
notifiedBytesLoaded
:
0
,
ts
:
Date
.
now
(),
bytes
:
0
,
onReadCallback
:
null
};
const
_speedometer
=
speedometer
(
internals
.
ticksRate
*
options
.
samplesCount
,
internals
.
timeWindow
);
this
.
on
(
'newListener'
,
event
=>
{
if
(
event
===
'progress'
)
{
if
(
!
internals
.
isCaptured
)
{
internals
.
isCaptured
=
true
;
}
}
});
let
bytesNotified
=
0
;
internals
.
updateProgress
=
throttle
(
function
throttledHandler
()
{
const
totalBytes
=
internals
.
length
;
const
bytesTransferred
=
internals
.
bytesSeen
;
const
progressBytes
=
bytesTransferred
-
bytesNotified
;
if
(
!
progressBytes
||
self
.
destroyed
)
return
;
const
rate
=
_speedometer
(
progressBytes
);
bytesNotified
=
bytesTransferred
;
process
.
nextTick
(()
=>
{
self
.
emit
(
'progress'
,
{
'loaded'
:
bytesTransferred
,
'total'
:
totalBytes
,
'progress'
:
totalBytes
?
(
bytesTransferred
/
totalBytes
)
:
undefined
,
'bytes'
:
progressBytes
,
'rate'
:
rate
?
rate
:
undefined
,
'estimated'
:
rate
&&
totalBytes
&&
bytesTransferred
<=
totalBytes
?
(
totalBytes
-
bytesTransferred
)
/
rate
:
undefined
});
});
},
internals
.
ticksRate
);
const
onFinish
=
()
=>
{
internals
.
updateProgress
(
true
);
};
this
.
once
(
'end'
,
onFinish
);
this
.
once
(
'error'
,
onFinish
);
}
_read
(
size
)
{
const
internals
=
this
[
kInternals
];
if
(
internals
.
onReadCallback
)
{
internals
.
onReadCallback
();
}
return
super
.
_read
(
size
);
}
_transform
(
chunk
,
encoding
,
callback
)
{
const
self
=
this
;
const
internals
=
this
[
kInternals
];
const
maxRate
=
internals
.
maxRate
;
const
readableHighWaterMark
=
this
.
readableHighWaterMark
;
const
timeWindow
=
internals
.
timeWindow
;
const
divider
=
1000
/
timeWindow
;
const
bytesThreshold
=
(
maxRate
/
divider
);
const
minChunkSize
=
internals
.
minChunkSize
!==
false
?
Math
.
max
(
internals
.
minChunkSize
,
bytesThreshold
*
0.01
)
:
0
;
function
pushChunk
(
_chunk
,
_callback
)
{
const
bytes
=
Buffer
.
byteLength
(
_chunk
);
internals
.
bytesSeen
+=
bytes
;
internals
.
bytes
+=
bytes
;
if
(
internals
.
isCaptured
)
{
internals
.
updateProgress
();
}
if
(
self
.
push
(
_chunk
))
{
process
.
nextTick
(
_callback
);
}
else
{
internals
.
onReadCallback
=
()
=>
{
internals
.
onReadCallback
=
null
;
process
.
nextTick
(
_callback
);
};
}
}
const
transformChunk
=
(
_chunk
,
_callback
)
=>
{
const
chunkSize
=
Buffer
.
byteLength
(
_chunk
);
let
chunkRemainder
=
null
;
let
maxChunkSize
=
readableHighWaterMark
;
let
bytesLeft
;
let
passed
=
0
;
if
(
maxRate
)
{
const
now
=
Date
.
now
();
if
(
!
internals
.
ts
||
(
passed
=
(
now
-
internals
.
ts
))
>=
timeWindow
)
{
internals
.
ts
=
now
;
bytesLeft
=
bytesThreshold
-
internals
.
bytes
;
internals
.
bytes
=
bytesLeft
<
0
?
-
bytesLeft
:
0
;
passed
=
0
;
}
bytesLeft
=
bytesThreshold
-
internals
.
bytes
;
}
if
(
maxRate
)
{
if
(
bytesLeft
<=
0
)
{
// next time window
return
setTimeout
(()
=>
{
_callback
(
null
,
_chunk
);
},
timeWindow
-
passed
);
}
if
(
bytesLeft
<
maxChunkSize
)
{
maxChunkSize
=
bytesLeft
;
}
}
if
(
maxChunkSize
&&
chunkSize
>
maxChunkSize
&&
(
chunkSize
-
maxChunkSize
)
>
minChunkSize
)
{
chunkRemainder
=
_chunk
.
subarray
(
maxChunkSize
);
_chunk
=
_chunk
.
subarray
(
0
,
maxChunkSize
);
}
pushChunk
(
_chunk
,
chunkRemainder
?
()
=>
{
process
.
nextTick
(
_callback
,
null
,
chunkRemainder
);
}
:
_callback
);
};
transformChunk
(
chunk
,
function
transformNextChunk
(
err
,
_chunk
)
{
if
(
err
)
{
return
callback
(
err
);
}
if
(
_chunk
)
{
transformChunk
(
_chunk
,
transformNextChunk
);
}
else
{
callback
(
null
);
}
});
}
setLength
(
length
)
{
this
[
kInternals
].
length
=
+
length
;
return
this
;
}
}
export
default
AxiosTransformStream
;
Event Timeline
Log In to Comment