Cothreads (or “cooperative threads”) are an approach to concurrent
programming where there is only one true thread of processing, but apparently
concurrent processes (or cothreads) can cooperatively share the processor.
Control is passed from one cothread to another when the current cothread
explicitly suspends control, ultimately via a call to a cothread library
routine. This means that between such suspending calls control will not be
interrupted. This has two advantages:
-
No locking between threads is required. This is a very helpful benefit of
using cothreads, as getting locking between multiple threads right can be
suprisingly difficult.
-
The thread of processing is frequently more predictable: between two
suspension points there is no possibility of unexpected background activity!
On the other hand, there is one disadvantage which needs to be kept in mind:
To use the cothread library the following overall structure should be
followed in the top level application:
# First the version of cothread library must be specified
from pkg_resources import require
require('cothread==1.6') # or just require('cothread')
# Import the cothread library in each module that uses it.
import cothread
# Enable Qt processing, hang onto application instance if needed.
qtapp = cothread.iqt()
# Do the real work of the module, including spawning any background tasks.
...
# Finally allow all background tasks to run to completion.
cothread.WaitForQuit()
-
require(cothread==1.6). This statement is required by the way Diamond
Controls modules are managed: it is necessary to specify precisely which
version of a module is to be used. This means that API changes can be made in
future releases without breaking existing code.
Alternatively require(cothread) can be used to request the most recent
installed version.
Note that the require statements should only occur once in each application:
it is an easy mistake to place them at the head of each Python module.
-
If Qt is to be used (for any graphical user interface) then the cothread
library needs to be informed: this is done by calling iqt() before any work
is done with Qt. This call ensures that Qt processing will occur while the
cothread scheduler is idle, and effectively turns Qt into another cothread.
The Qt application instance is created by this call and returned.
-
Finally the main cothread (the thread of control used to start and run the
program) must not exit until the program has finished. If all the desired
activity is in background tasks (spawned cothreads, camonitor processing or
other background activity) then the simplest thing is to call WaitForQuit()
before exiting: this will wait until the Quit function is called, or
control-C is pressed somewhere, or the last Qt window is closed.
Cothread Suspension Points
When using cothreads only certain function calls will cause control to be
yielded to another cothread, or in other words, will cause suspension of the
calling cothread — we call such a routine a “suspension point”.
Understanding suspension points is important for effective use of cothreads:
between suspension points no other cothread will run, and the current cothread
has exclusive control of the process (except for any “real” threads that
might be running). Once a suspension point is reached any other cothread can
run, in fact typically all other ready cothreads will run to their own
suspension points before control is returned to the suspended cothread.
The following are suspension points in the core cothread library:
|
Sleep, SleepUntil
|
Note that if a delay of 0 is specified to Sleep or
if the deadline for SleepUntil has passed the caller will not be suspended.
|
|
Yield
|
This suspends the caller until all other active cothreads have run
to their own suspension points.
|
|
event.Wait
|
On a Spawn, Event or EventQueue object the Wait
method will only suspend the caller if the event object is not yet ready and
any specified timeout has not yet expired.
|
The coselect module adds the following suspension points:
|
select, poll, poll_list
|
All of these routines suspend the caller until
at least one file descriptor in a monitored list is ready. The caller will be
suspended even if the ready condition is already met unless the timeout has
already expired.
|
In the catools module the following routines can cause suspension (note
that camonitor is the only routine guaranteed not to suspend):
|
caget
|
This always suspends the caller until the requested
channel access data has arrived (or a timeout occurs), the only exception
being if an expired timeout is specified.
|
|
caput
|
This routine will normally cause the caller to suspend. To avoid
suspension, only put to one PV, use wait=False, and ensure that the channel
is already connected — this will be the case if it has already been
successfully used in any catools method.
|
Order of Execution
It is possible to be fairly precise about the order in which certain processes
will occur.
-
Cothreads started by Spawn will initially be processed in the order in
which they were created with no other cothreads intervening.
-
Cothreads waiting on an event will be woken strictly in the sequence in
which waiting takes place, just so long as no timeout occurs. Cothreads woken
by timeouts generally execute after all all processing is complete.
This ordering of processing together with the fact that cothreads are only
suspended when control needs to be lost means that certain guarantees about
ordering of processing can be made, in particular see catools.caput.
Callbacks and Timers
Callbacks and timers are also provided through the cothread library, and it's
important to understand how they interact with other routines.
-
Timers
-
Timers are created by the Timer function documented below. The callback
that is invoked as part of the timer is a fresh cothread, spawned when the
timer is created. This means that the timer callback function can run for as
long as desired without interfering with other timer callbacks (so long as it
suspends regularly, of course!)
Note however that a timer will not retrigger itself until its current callback
routine completes.
-
Callbacks from camonitor
-
The callback routines called in response to camonitor are all invoked on a
single cothread. This means that extended processing within a single callback
will prevent any other callbacks from being processed. To avoid this either
spawn a new cothread to perform further process, or communicate with an
existing separate cothread.
-
Other callbacks
-
Other callbacks will depend on the library generating them, but it's safest to
treat them as “blocking” in the sense described above.
The following functions define the basic cothread interface provided by this
module.
-
Spawn(function, arguments, raise_on_wait=False, …)
-
A new cooperative thread, or cothread, is created as a call to
function(arguments) where arguments can be any list of values and keyword
arguments (except for the special raise_on_wait flag). This routine is not
a suspension point.
This is the fundamental building block of the cothreading library. It is
quite cheap to spawn fresh cothreads, and so this routine can be used freely.
It is possible to wait for the completion of a spawned cothread by calling its
Wait method:
-
Wait(timeout=None)
-
This blocks until the spawned cothread completes, either by returning from its
function call, or by raising an exception. If the cothread was created with
raise_on_wait set to True then any exception raised by the cothread will
be re-raised when Wait is called.
-
Sleep(delay), SleepUntil(time)
-
The calling task is suspended until the given time. Sleep(delay)
suspends the task for at least delay seconds, SleepUntil(time)
suspends until the specified time has passed (time is defined as the
value returned by time.time()).
-
Yield(timeout=0)
-
Yield() suspends control so that all other potentially busy tasks can
run. Control is not returned to the calling task until all other
active tasks have been processed, or the timeout has expired.
Communication between cothreads is provided by Event and EventQueue
objects. An Event can hold at most one value (or signal), while an
EventQueue can hold a list of unbounded length.
-
Event(auto_reset=True)
-
Event objects are initially created unsignalled. The auto_reset flag
determines whether the signalled state of the event object is persistent, and
determines how many cothreads are woken when Signal is called on an event.
The bool state of an event object is True iff it is signalled.
The following methods define the behaviour of this object.
-
Wait(timeout=None)
-
The calling cothread will be suspended until a signal is written to the
Event by a call to Signal(), at which point the value passed to Signal()
is returned. If a timeout occurs (a timeout of None specifies no timeout)
this is signalled by raising the exception Timedout.
If auto_reset was specified as True then the signal is consumed, and
subsequent calls to Wait will block until further Signal calls occur.
-
Signal(value=None)
-
The event object is marked as signalled and the value passed is recorded to be
returned by a call to Wait. If one or more cothreads are waiting for a
signal then at least one will be woken with the new value (if auto_reset is
True then only one will be woken, otherwise all will be).
Note that this routine does not suspend the caller, even if another cothread
is woken: it will not process until later.
-
SignalException(exception)
-
This is similar in effect to Signal, but the effect on cothreads calling
Wait is that they will receive the given exception.
-
Reset()
-
Resets the signal and erases its value. Also erases any exception written to
the event.
-
EventQueue()
-
The EventQueue is designed to support the communication of a stream of
values between two cothreads. Calling len() on an event queue returns the
number of entries currently in its queue. An event queue can also be consumed
as an iterator, see code example below.
The following methods are supported:
-
Wait(timeout=None)
-
Returns the next object from the queue, blocking if necessary. If a timeout
occurs then Timedout is raised. If the queue has been closed then
StopIteration is raised.
If the queue is non empty when Wait() is called control will not be
suspended.
-
Signal(value)
-
Adds the given value to the queue, waking up a waiting cothread if one is
waiting. This routine does not suspend the caller.
Example code using iteration over an EventQueue.
def consumer(e):
for x in e:
print 'consumed', x
eq = EventQueue()
Spawn(consumer, eq)
for i in range(10):
eq.Signal(i)
Sleep(1)
-
ThreadedEventQueue()
-
The ThreadedEventQueue behaves like an EventQueue, but is designed to be
used to communicate between a Python thread outside of the cothread library
and a cothread. Communication can occur in either direction: an outside
thread can call Signal on a threaded event queue while a cothread calls
Wait, or vice versa.
If a thread calls Wait it will block until a cothread (or another thread)
calls Signal. If this is undesirable then the field .wait_descriptor can
be waited on using the standard select or poll functions. Note that this
file handle must only be used for waiting, and must not be read!
-
Timer(timeout, callback, retrigger=False)
-
This triggers a call to callback after a delay of at least timeout
seconds. If retrigger is True then after callback completes the timer
will be reenabled and the cycle will repeat, otherwise only one call will
occur.
The timer can be cancelled at any time before it has triggered by calling
.cancel() on the timer object created by calling Timer().
-
WaitForAll(event_list, timeout=None)
-
This routine waits for all events in event_list to become ready: this is
done by simply iterating through all the events in turn, waiting for them to
complete. If timeout expires then an exception is raised.
-
Quit(), WaitForQuit(catch_interrupt=True)
-
The routine WaitForQuit blocks until Quit is called, or until interrupted
by an exception. By default (if catch_interrupt=True is set) the keyboard
interrupt is silently caught, but still causes WaitForQuit to exit.
This is designed to be used as the final blocking call at the end of the main
program so that other event loops can run.
-
iqt(poll_interval=0.05, use_timer=False)
-
If Qt is to be used then this routine must be called during initialisation to
enable the Qt event loop. The Qt application instance is returned.
The normal Qt event look hook does not work correctly with modal dialogs
(because they run their own message loops) — typically either a modal window
will be closed immediately, or else will block the interpreter. To work
around this, a Qt timer can be used for polling by setting use_timer to
True in this call. Note that this is highly experimental, and in particular
modal dialogs do not work properly with cothread when using Qt4.
Coselect Functions
To enable cothreaded access to sockets and other external event generating
sources the cothread.coselect library provides coperative implementations of
select and poll from the Python library select module. This module
provides the following functions.
-
select(iwtd, owtd, ewtd, timeout=None)
-
Cooperative select function, interface compatible with the Python library
select.select function (though the exceptions raised are different).
-
poll()
-
Cooperative poll object, interface compatible with the Python library
select.poll object.
-
poll_list(event_list, timeout=None)
-
Simpler function for waiting for one or more events to occur. This function
is used to implement the more compatible select and poll interfaces.
The event_list parameter is a list of pairs, each consisting of a waitable
descriptor and an event mask (generated by oring together POLL…
constants). This routine will cooperatively block until any descriptor
signals a selected event (or any event from HUP, ERR, NVAL) or until
the timeout (in seconds) occurs.