14.5 Threaded Program Architecture
A threaded program should always arrange for a single thread to deal with any given object or subsystem that is external to the program (such as a file, a database, a GUI, or a network connection). Having multiple threads that deal with the same external object can often cause unpredictable problems.
Whenever your threaded program must deal with some external object, devote a thread to such dealings, using a Queue object from which the external-interfacing thread gets work requests that other threads post. The external-interfacing thread can return results by putting them on one or more other Queue objects. The following example shows how to package this architecture into a general, reusable class, assuming that each unit of work on the external subsystem can be represented by a callable object:
import threading, Queue
class ExternalInterfacing(threading.Thread):
    def __init__(self, externalCallable, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.externalCallable = externalCallable
        self.workRequestQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.start()
    def request(self, *args, **kwds):
        "called by other threads as externalCallable would be"
        self.workRequestQueue.put((args, kwds))
        return self.resultQueue.get()
    def run(self):
        while 1:
            args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(self.externalCallable(*args, **kwds))
Once an ExternalInterfacing object ei is instantiated, all other threads may call ei.request just as they would call someExternalCallable without such a mechanism (with or without arguments, as appropriate). The advantage of the ExternalInterfacing mechanism is that all calls upon someExternalCallable are now serialized: they are performed by just one thread (the thread object bound to ei) in some defined sequential order, without overlap, race conditions (hard-to-debug errors that depend on which thread happens to get there first), or other anomalies that might otherwise result.
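The serialization effect can be seen in a small runnable sketch. This is a hedged adaptation, not the book's exact listing: it shims the queue module's Python 2/3 names, and the "external subsystem" is just a list recording which thread performed each call (an assumption for demonstration). Note that request, as coded, pairs requests with results by queue order, so this demo issues all requests from one thread to keep the pairing unambiguous:

```python
try:
    import queue                 # Python 3 module name
except ImportError:
    import Queue as queue        # Python 2 module name
import threading

class ExternalInterfacing(threading.Thread):
    def __init__(self, externalCallable, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.externalCallable = externalCallable
        self.workRequestQueue = queue.Queue()
        self.resultQueue = queue.Queue()
        self.start()
    def request(self, *args, **kwds):
        "called by other threads as externalCallable would be"
        self.workRequestQueue.put((args, kwds))
        return self.resultQueue.get()
    def run(self):
        while True:
            args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(self.externalCallable(*args, **kwds))

calls = []                       # stands in for the external subsystem
def touchExternal(x):
    # record which thread actually touched the "external" object
    calls.append((x, threading.current_thread().name))
    return x * 2

ei = ExternalInterfacing(touchExternal)
results = [ei.request(i) for i in range(5)]
print(results)                   # every call ran on ei's thread, in order
```

Inspecting calls afterward shows a single thread name throughout, and it is not the main thread's: all touches of the external object went through ei.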
If several callables need to be serialized together, you can pass the callable as part of the work request, rather than passing it at the initialization of class ExternalInterfacing, for greater generality. The following example shows this more general approach:
import threading, Queue
class Serializer(threading.Thread):
    def __init__(self, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.workRequestQueue = Queue.Queue()
        self.resultQueue = Queue.Queue()
        self.start()
    def apply(self, callable, *args, **kwds):
        "called by other threads as callable would be"
        self.workRequestQueue.put((callable, args, kwds))
        return self.resultQueue.get()
    def run(self):
        while 1:
            callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(callable(*args, **kwds))
Once a Serializer object ser has been instantiated, other threads may call ser.apply(someExternalCallable) just as they would call someExternalCallable without such a mechanism (with or without further arguments, as appropriate). The Serializer mechanism has the same advantages as ExternalInterfacing, except that all calls to the same or different callables wrapped by a single ser instance are now serialized.
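A runnable sketch of the Serializer variant follows, again with the Python 2/3 queue-module shim; funneling two different built-in callables (pow and len, chosen arbitrarily for the demo) through one instance shows the added generality:

```python
try:
    import queue                 # Python 3 module name
except ImportError:
    import Queue as queue        # Python 2 module name
import threading

class Serializer(threading.Thread):
    def __init__(self, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workRequestQueue = queue.Queue()
        self.resultQueue = queue.Queue()
        self.start()
    def apply(self, callable, *args, **kwds):
        "called by other threads as callable would be"
        self.workRequestQueue.put((callable, args, kwds))
        return self.resultQueue.get()
    def run(self):
        while True:
            callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put(callable(*args, **kwds))

ser = Serializer()
print(ser.apply(pow, 2, 10))     # 1024, computed on ser's thread
print(ser.apply(len, 'spam'))    # 4, on the same thread
```

Any mix of callables passed to this one ser instance executes on its single dedicated thread, one request at a time.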
The user interface of the whole program is an external subsystem, and thus should be dealt with by a single thread, specifically the main thread of the program (this is mandatory for some user interface toolkits, and advisable even when not mandatory). A Serializer thread is therefore inappropriate. Rather, the program's main thread should deal only with user interface issues, and farm out actual work to worker threads that accept work requests on a Queue object and return results on another. A set of worker threads is also known as a thread pool. As shown in the following example, all worker threads should share a single queue of requests and a single queue of results, since the main thread will be the only one posting work requests and harvesting results:
import threading
class Worker(threading.Thread):
    requestID = 0
    def __init__(self, requestsQueue, resultsQueue, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self.start()
    def performWork(self, callable, *args, **kwds):
        "called by the main thread as callable would be, but w/o return"
        Worker.requestID += 1
        self.workRequestQueue.put((Worker.requestID, callable, args, kwds))
        return Worker.requestID
    def run(self):
        while 1:
            requestID, callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put((requestID, callable(*args, **kwds)))
The main thread creates the two queues, then instantiates worker
threads as follows:
import Queue
requestsQueue = Queue.Queue()
resultsQueue = Queue.Queue()
for i in range(numberOfWorkers):
    worker = Worker(requestsQueue, resultsQueue)
Now, whenever the main thread needs to farm out work (execute some callable object that may take substantial time to produce results), it calls worker.performWork(callable), much as it would call callable without such a mechanism (with or without further arguments, as appropriate). However, performWork does not return the result of the call. Instead, the main thread gets an id that identifies the work request. If the main thread needs the result, it can keep track of that id, since the request's result will be tagged with that id when it appears. The advantage of this mechanism is that the main thread does not block waiting for the callable's lengthy execution to complete, but rather becomes ready again at once and can immediately return to its main business of dealing with the user interface.
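The non-blocking behavior can be sketched in a runnable form. This is a hedged adaptation of the Worker class above (Python 2/3 queue-module shim; slowSquare and its 0.2-second delay are illustrative assumptions, and the blocking harvest at the end is for the demo only, since a real main thread would poll instead):

```python
try:
    import queue                 # Python 3 module name
except ImportError:
    import Queue as queue        # Python 2 module name
import threading, time

class Worker(threading.Thread):
    requestID = 0
    def __init__(self, requestsQueue, resultsQueue, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.daemon = True
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self.start()
    def performWork(self, callable, *args, **kwds):
        "called by the main thread as callable would be, but w/o return"
        Worker.requestID += 1
        self.workRequestQueue.put((Worker.requestID, callable, args, kwds))
        return Worker.requestID
    def run(self):
        while True:
            requestID, callable, args, kwds = self.workRequestQueue.get()
            self.resultQueue.put((requestID, callable(*args, **kwds)))

requestsQueue = queue.Queue()
resultsQueue = queue.Queue()
worker = Worker(requestsQueue, resultsQueue)

def slowSquare(x):
    time.sleep(0.2)              # simulate lengthy work
    return x * x

start = time.time()
reqID = worker.performWork(slowSquare, 7)
submitDelay = time.time() - start    # performWork returned at once
gotID, result = resultsQueue.get()   # blocking harvest, demo only
print(reqID, gotID, result)
```

performWork returns in a fraction of the time slowSquare takes, and the result that later lands on resultsQueue carries the matching id.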
The main thread must arrange to check the resultsQueue, since the result of each work request eventually appears there, tagged with the request's id, when the worker thread that took that request from the queue finishes computing the result. How the main thread arranges to check for both user interface events and the results coming back from worker threads onto the results queue depends on what user interface toolkit is used or, if the user interface is text-based, on the platform on which the program runs.
A widely applicable general strategy is for the main thread to poll (i.e., check the state of the results queue periodically). On most Unix-like platforms, function alarm of module signal allows polling. The Tkinter GUI toolkit supplies method after, usable for polling. Some toolkits and platforms afford more effective strategies, letting a worker thread alert the main thread when it places a result on the results queue, but there is no generally available, cross-platform, cross-toolkit way to arrange this. Therefore, the following artificial example ignores user interface events and just simulates work by evaluating random expressions, with random delays, on several worker threads, thus completing the previous example:
import random, time
def makeWork():
    return "%d %s %d" % (random.randrange(2, 10),
        random.choice(('+', '-', '*', '/', '%', '**')),
        random.randrange(2, 10))
def slowEvaluate(expressionString):
    time.sleep(random.randrange(1, 5))
    return eval(expressionString)
workRequests = {}
def showResults():
    while 1:
        try: id, results = resultsQueue.get_nowait()
        except Queue.Empty: return
        print 'Result %d: %s -> %s' % (id, workRequests[id], results)
        del workRequests[id]
for i in range(10):
    expressionString = makeWork()
    id = worker.performWork(slowEvaluate, expressionString)
    workRequests[id] = expressionString
    print 'Submitted request %d: %s' % (id, expressionString)
    time.sleep(1)
    showResults()
while workRequests:
    time.sleep(1)
    showResults()