Synchronous Communication
Module Event from the thread library implements the
communication of assorted values between two processes through
particular ``communication channels''. The effective communication of
the value is synchronized through send and receive events.
This model of communication synchronized by events allows the transfer
through typed channels of the values of the language, including closures,
objects, and events.
It is described in [Rep99].
Synchronization using Communication Events
The primitive communication events are:
-
send c v sends a value v on the channel c;
- receive c receives a value on the channel c
So as to implement the physical action with which they are associated,
two events should be synchronized. For this purpose, we introduce an operation
of synchronization (sync) on events. The sending and
receiving of a value are not effective unless the two communicating
processes are in phase. If a single process wishes to synchronize
itself, the operation gets blocked, waiting for the second process to perform
its synchronization. This implies that a sender wishing to synchronize
the sending of a value (sync
(send
c
v)) can find itself
blocked waiting for a synchronization from a receiver
(sync
(receive
c)).
Transmitted Values
The communication channels through which the exchanged values
travel are typed: Nothing prevents us from creating multiple channels
for communicating each type of value. As this communication takes
place between Objective CAML threads, any value of the language can be sent
on a channel of the same type. This is useful for closures, objects,
and also events, for a ``relayed'' synchronization request.
Module Event
The values encapsulated in communication events travel through
communication channels of the abstract data type 'a channel.
The creation function for channels is:
# Event.new_channel
;;
- : unit -> 'a Event.channel = <fun>
Send and receive events are created by a function call:
# Event.send
;;
- : 'a Event.channel -> 'a -> unit Event.event = <fun>
# Event.receive
;;
- : 'a Event.channel -> 'a Event.event = <fun>
We can consider the functions send and receive as
constructors of the abstract type 'a event. The event
constructed by send does not preserve the information about the type
of the value to transmit (type unit Event.event). On the
other hand, the receive event takes account of it to recover the value
during a synchronization. These functions are non-blocking in the
sense that the transmission of a value does not take place until the
time of the synchronization of two processes by the function:
# Event.sync
;;
- : 'a Event.event -> 'a = <fun>
This function may be blocking for the sender and the receiver.
There is a non-blocking version:
# Event.poll
;;
- : 'a Event.event -> 'a option = <fun>
This function verifies that another process is waiting for synchronization.
If this is the case, it performs the transmissions, and returns the
value Some v, if v is the value associated with the
event, and None otherwise. The received message, extracted
by the function sync, can be the result of a more or less
complicated process, triggering other exchanges of messages.
Example of synchronization.
We define three threads. The first, t1, sends a chain of
characters on channel c (function g) shared by
all the processes. The two others t2 and t3 wait
for a value on the same channel. Here are the functions executed by
the different processes:
# let
c
=
Event.new_channel
();;
val c : '_a Event.channel = <abstr>
# let
f
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("-------- before -------"
^
ids)
;
print_newline()
;
let
e
=
Event.receive
c
in
print_string
("-------- during -------"
^
ids)
;
print_newline()
;
let
v
=
Event.sync
e
in
print_string
(v
^
" "
^
ids
^
" "
)
;
print_string
("-------- after -------"
^
ids)
;
print_newline()
;;
val f : unit -> unit = <fun>
# let
g
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("Start of "
^
ids
^
"\n"
);
let
e2
=
Event.send
c
"hello"
in
Event.sync
e2
;
print_string
("End of "
^
ids)
;
print_newline
()
;;
val g : unit -> unit = <fun>
The three processes are created and executed:
# let
t1,
t2,
t3
=
Thread.create
f
(),
Thread.create
f
(),
Thread.create
g
();;
val t1 : Thread.t = <abstr>
val t2 : Thread.t = <abstr>
val t3 : Thread.t = <abstr>
# Thread.delay
1
.
0
;;
Start of 5
-------- before -------6
-------- during -------6
hello 6 -------- after -------6
-------- before -------7
-------- during -------7
End of 5
- : unit = <unknown constructor>
The transmission may block. The trace of
t1 is displayed after the synchronization traces of
t2 and t3. Only one of the two processes t1 or
t2 is really terminated, as the following calls show:
# Thread.kill
t1;;
- : unit = ()
# Thread.kill
t2;;
Uncaught exception: Failure("Thread.kill: killed thread")