Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91103766
PhutilChannel.php
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Nov 7, 22:48
Size
10 KB
Mime Type
text/x-php
Expires
Sat, Nov 9, 22:48 (2 d)
Engine
blob
Format
Raw Data
Handle
22197287
Attached To
rPHU libphutil
PhutilChannel.php
View Options
<?php
/**
* Wrapper arounds streams, pipes, and other things that have basic read/write
* I/O characteristics.
*
* Channels include buffering, so you can do fire-and-forget writes and reads
* without worrying about network/pipe buffers. Use @{method:read} and
* @{method:write} to read and write.
*
* Channels are nonblocking and provide a select()-oriented interface so you
* can reasonably write server-like and daemon-like things with them. Use
* @{method:waitForAny} to select channels.
*
* Channel operations other than @{method:update} generally operate on buffers.
* Writes and reads affect buffers, while @{method:update} flushes output
* buffers and fills input buffers.
*
* Channels are either "open" or "closed". You can detect that a channel has
* closed by calling @{method:isOpen} or examining the return value of
* @{method:update}.
*
* NOTE: Channels are new (as of June 2012) and subject to interface changes.
*
* @task io Reading and Writing
* @task wait Waiting for Activity
* @task update Responding to Activity
* @task impl Channel Implementation
*
* @group channel
*/
abstract
class
PhutilChannel
{
private
$ibuf
=
''
;
private
$obuf
;
private
$name
;
private
$readBufferSize
;
public
function
__construct
()
{
$this
->
obuf
=
new
PhutilRope
();
}
/* -( Reading and Writing )------------------------------------------------ */
/**
* Read from the channel. A channel defines the format of data that is read
* from it, so this method may return strings, objects, or anything else.
*
* The default implementation returns bytes.
*
* @return wild Data from the channel, normally bytes.
*
* @task io
*/
public
function
read
()
{
$result
=
$this
->
ibuf
;
$this
->
ibuf
=
''
;
return
$result
;
}
/**
* Write to the channel. A channel defines what data format it accepts,
* so this method may take strings, objects, or anything else.
*
* The default implementation accepts bytes.
*
* @param wild Data to write to the channel, normally bytes.
* @return this
*
* @task io
*/
public
function
write
(
$bytes
)
{
if
(!
is_scalar
(
$bytes
))
{
throw
new
Exception
(
'PhutilChannel->write() may only write strings!'
);
}
$this
->
obuf
->
append
(
$bytes
);
return
$this
;
}
/* -( Waiting for Activity )----------------------------------------------- */
/**
* Wait for any activity on a list of channels. Convenience wrapper around
* @{method:waitForActivity}.
*
* @param list<PhutilChannel> A list of channels to wait for.
* @param dict Options, see above.
* @return void
*
* @task wait
*/
public
static
function
waitForAny
(
array
$channels
,
array
$options
=
array
())
{
return
self
::
waitForActivity
(
$channels
,
$channels
,
$options
);
}
/**
* Wait (using select()) for channels to become ready for reads or writes.
* This method blocks until some channel is ready to be updated.
*
* It does not provide a way to determine which channels are ready to be
* updated. The expectation is that you'll just update every channel. This
* might change eventually.
*
* Available options are:
*
* - 'read' (list<stream>) Additional streams to select for read.
* - 'write' (list<stream>) Additional streams to select for write.
* - 'except' (list<stream>) Additional streams to select for except.
* - 'timeout' (float) Select timeout, defaults to 1.
*
* NOTE: Extra streams must be //streams//, not //sockets//, because this
* method uses `stream_select()`, not `socket_select()`.
*
* @param list<PhutilChannel> List of channels to wait for reads on.
* @param list<PhutilChannel> List of channels to wait for writes on.
* @return void
*
* @task wait
*/
public
static
function
waitForActivity
(
array
$reads
,
array
$writes
,
array
$options
=
array
())
{
assert_instances_of
(
$reads
,
'PhutilChannel'
);
assert_instances_of
(
$writes
,
'PhutilChannel'
);
$read
=
idx
(
$options
,
'read'
,
array
());
$write
=
idx
(
$options
,
'write'
,
array
());
$except
=
idx
(
$options
,
'except'
,
array
());
$wait
=
idx
(
$options
,
'timeout'
,
1
);
// TODO: It would be nice to just be able to categorically reject these as
// unselectable.
foreach
(
array
(
$reads
,
$writes
)
as
$channels
)
{
foreach
(
$channels
as
$channel
)
{
$r_sockets
=
$channel
->
getReadSockets
();
$w_sockets
=
$channel
->
getWriteSockets
();
// If any channel has no read sockets and no write sockets, assume it
// isn't selectable and return immediately (effectively degrading to a
// busy wait).
if
(!
$r_sockets
&&
!
$w_sockets
)
{
return
false
;
}
}
}
foreach
(
$reads
as
$channel
)
{
// If any of the read channels have data in read buffers, return
// immediately. If we don't, we risk running select() on a bunch of
// sockets which won't become readable because the data the application
// expects is already in a read buffer.
if
(!
$channel
->
isReadBufferEmpty
())
{
return
;
}
$r_sockets
=
$channel
->
getReadSockets
();
foreach
(
$r_sockets
as
$socket
)
{
$read
[]
=
$socket
;
$except
[]
=
$socket
;
}
}
foreach
(
$writes
as
$channel
)
{
if
(
$channel
->
isWriteBufferEmpty
())
{
// If the channel's write buffer is empty, don't select the write
// sockets, since they're writable immediately.
$w_sockets
=
array
();
}
else
{
$w_sockets
=
$channel
->
getWriteSockets
();
}
foreach
(
$w_sockets
as
$socket
)
{
$write
[]
=
$socket
;
$except
[]
=
$socket
;
}
}
if
(!
$read
&&
!
$write
&&
!
$except
)
{
return
false
;
}
$wait_sec
=
(
int
)
$wait
;
$wait_usec
=
1000000
*
(
$wait
-
$wait_sec
);
@
stream_select
(
$read
,
$write
,
$except
,
$wait_sec
,
$wait_usec
);
}
/* -( Responding to Activity )--------------------------------------------- */
/**
* Updates the channel, filling input buffers and flushing output buffers.
* Returns false if the channel has closed.
*
* @return bool True if the channel is still open.
*
* @task update
*/
public
function
update
()
{
$maximum_read
=
PHP_INT_MAX
;
if
(
$this
->
readBufferSize
!==
null
)
{
$maximum_read
=
(
$this
->
readBufferSize
-
strlen
(
$this
->
ibuf
));
}
while
(
$maximum_read
>
0
)
{
$in
=
$this
->
readBytes
(
$maximum_read
);
if
(!
strlen
(
$in
))
{
// Reading is blocked for now.
break
;
}
$this
->
ibuf
.=
$in
;
$maximum_read
-=
strlen
(
$in
);
}
while
(
$this
->
obuf
->
getByteLength
())
{
$len
=
$this
->
writeBytes
(
$this
->
obuf
->
getAnyPrefix
());
if
(!
$len
)
{
// Writing is blocked for now.
break
;
}
$this
->
obuf
->
removeBytesFromHead
(
$len
);
}
return
$this
->
isOpen
();
}
/* -( Channel Implementation )--------------------------------------------- */
/**
* Set a channel name. This is primarily intended to allow you to debug
* channel code more easily, by naming channels something meaningful.
*
* @param string Channel name.
* @return this
*
* @task impl
*/
public
function
setName
(
$name
)
{
$this
->
name
=
$name
;
return
$this
;
}
/**
* Get the channel name, as set by @{method:setName}.
*
* @return string Name of the channel.
*
* @task impl
*/
public
function
getName
()
{
return
coalesce
(
$this
->
name
,
get_class
(
$this
));
}
/**
* Test if the channel is open: active, can be read from and written to, etc.
*
* @return bool True if the channel is open.
*
* @task impl
*/
abstract
public
function
isOpen
();
/**
* Close the channel for writing.
*
* @return void
* @task impl
*/
abstract
public
function
closeWriteChannel
();
/**
* Test if the channel is open for reading.
*
* @return bool True if the channel is open for reading.
*
* @task impl
*/
public
function
isOpenForReading
()
{
return
$this
->
isOpen
();
}
/**
* Test if the channel is open for writing.
*
* @return bool True if the channel is open for writing.
*
* @task impl
*/
public
function
isOpenForWriting
()
{
return
$this
->
isOpen
();
}
/**
* Read from the channel's underlying I/O.
*
* @param int Maximum number of bytes to read.
* @return string Bytes, if available.
*
* @task impl
*/
abstract
protected
function
readBytes
(
$length
);
/**
* Write to the channel's underlying I/O.
*
* @param string Bytes to write.
* @return int Number of bytes written.
*
* @task impl
*/
abstract
protected
function
writeBytes
(
$bytes
);
/**
* Get sockets to select for reading.
*
* @return list<stream> Read sockets.
*
* @task impl
*/
protected
function
getReadSockets
()
{
return
array
();
}
/**
* Get sockets to select for writing.
*
* @return list<stream> Write sockets.
*
* @task impl
*/
protected
function
getWriteSockets
()
{
return
array
();
}
/**
* Set the maximum size of the channel's read buffer. Reads will artificially
* block once the buffer reaches this size until the in-process buffer is
* consumed.
*
* @param int|null Maximum read buffer size, or `null` for a limitless buffer.
* @return this
* @task impl
*/
public
function
setReadBufferSize
(
$size
)
{
$this
->
readBufferSize
=
$size
;
return
$this
;
}
/**
* Test state of the read buffer.
*
* @return bool True if the read buffer is empty.
*
* @task impl
*/
public
function
isReadBufferEmpty
()
{
return
(
strlen
(
$this
->
ibuf
)
==
0
);
}
/**
* Test state of the write buffer.
*
* @return bool True if the write buffer is empty.
*
* @task impl
*/
public
function
isWriteBufferEmpty
()
{
return
!
$this
->
getWriteBufferSize
();
}
/**
* Get the number of bytes we're currently waiting to write.
*
* @return int Number of waiting bytes.
*
* @task impl
*/
public
function
getWriteBufferSize
()
{
return
$this
->
obuf
->
getByteLength
();
}
/**
* Wait for any buffered writes to complete. This is a blocking call. When
* the call returns, the write buffer will be empty.
*
* @task impl
*/
public
function
flush
()
{
while
(!
$this
->
isWriteBufferEmpty
())
{
self
::
waitForAny
(
array
(
$this
));
if
(!
$this
->
update
())
{
throw
new
Exception
(
'Channel closed while flushing output!'
);
}
}
return
$this
;
}
}
Event Timeline
Log In to Comment