In this section, we use the Msg library to implement a Remote Procedure Call module, RPC.pm. The idea of RPC is to transparently invoke a subroutine in another process space and have it behave exactly as if it had been invoked in its own process. The following are the features we take for granted while calling ordinary subroutines, which the RPC module takes into account:
The caller waits until the called procedure finishes. The RPC module gets this blocking behavior by queuing the request with Msg's send_later and then running event_loop until the response arrives.
A Perl subroutine can take any number of parameters of any type (including references to objects, complex data structures, and subroutines). The RPC module uses the FreezeThaw module described in Chapter 10, Persistence, for marshalling parameters: all parameters are flattened and encoded into a single string (frozen) and recovered on the other side (thawed). This means that all data structures sent by reference are copied in their entirety so that the receiving subroutine on the other side can get a reference to an object (as it would if it were in the same process). FreezeThaw - and hence RPC - does not account for code references, because there is no way (in Perl) to decode a code reference and get the subroutine text (because it could be compiled into machine code). We could create a dummy subroutine on the remote side and have it make a nested RPC invocation back to the real code reference, but the current implementation does not have this feature (though it doesn't preclude it).
A subroutine can use wantarray to find out whether the caller is specifying a list or scalar context. The fact that the subroutine is being called from a remote process should not be an issue. The RPC module arranges the necessary transparency. Another example of context (not to be confused with the Perl meaning of the word) is the caller's package. When you say foo(), you mean foo() in the current package.
A subroutine can invoke die, and expect its caller to trap it. The receiving side of RPC invokes the target subroutine inside an eval, and if it dies, it sends a message back to the calling process, which in turn invokes a die in its own space with the error received.
Subroutine A can call subroutine B, which in turn can call A - they are said to be mutually recursive. RPC allows this because of its ability to process incoming messages while it is blocked on a send.
Traditional RPC systems are subject to deadlocks if two peer processes decide to call each other at the same time, because as we saw in Chapter 12, they are too impolite to listen to what the other is saying. Not so with RPC. In fact, it can dispatch incoming messages on all file descriptors while, from the caller's perspective, it is still blocked.
Typical RPC systems generate client and server stub code, but RPC does not need to - a tribute to the dynamic aspects of Perl.
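The mutual recursion and "dispatch while blocked" points above can be pictured with a toy, single-process simulation. This is only a sketch: the names @queue, %rcvd, pump, and the peers A and B are hypothetical, and an in-memory queue stands in for the sockets that Msg would manage.

```perl
use strict;
use warnings;

my @queue;          # Pending messages: [$dest, $from, $dir, $id, ...]
my %rcvd;           # Responses that have arrived, keyed by message id
my $next_id = 0;
my @log;            # Order in which subroutines were entered

# Subroutines offered by each (simulated) peer
my %subs = (
    A => { one => sub { push @log, 'one'; return 'from one' } },
    B => { two => sub { push @log, 'two';
                        return 'two got: ' . rpc('B', 'A', 'one') } },
);

# Deliver exactly one queued message, as one round of an event loop would
sub pump {
    my $msg = shift @queue or die "Nothing left to deliver";
    my ($dest, $from, $dir, $id, @rest) = @$msg;
    if ($dir eq '>') {                       # Request: run it, queue reply
        my ($name, @args) = @rest;
        my $result = $subs{$dest}{$name}->(@args);
        push @queue, [$from, $dest, '<', $id, $result];
    } else {                                 # Response: park it by id
        $rcvd{$id} = $rest[0];
    }
}

# "Blocking" call: keep dispatching messages until our own reply shows up
sub rpc {
    my ($from, $to, $name, @args) = @_;
    my $id = ++$next_id;
    push @queue, [$to, $from, '>', $id, $name, @args];
    pump() until exists $rcvd{$id};
    return delete $rcvd{$id};
}

my $answer = rpc('A', 'B', 'two');   # A calls two on B, which calls one on A
```

While the outer call is waiting for its reply, the nested request for one is dispatched by the same loop, which is why the call chain completes instead of deadlocking.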
Let us take a look at a sample use of the RPC module. The client is shown first:
    # Client stuff
    use RPC;
    my $conn = RPC->connect($host, $port);
    my $answer = $conn->rpc('ask_sheep', "Ba ba black sheep, have you any wool ?");
    print "$answer\n";
The client sets up an RPC connection, given a host and port. A subroutine that is normally invoked as
$answer = ask_sheep ($question);
is invoked by using RPC as follows:
$answer = $conn->rpc ("ask_sheep", $question);
The client code knows it is making an RPC call. Making this transparent (as typical RPC systems do) is quite simple, really. Using eval, we can dynamically create a dummy client stub called ask_sheep on the caller's side and have it make the call to rpc().
The called subroutine, however, does not know whether it has been invoked locally or from a remote process (unless of course, it uses caller() to find out).
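The dummy client stub mentioned above could be generated along the following lines. This is a sketch under stated assumptions: FakeConn is a hypothetical stand-in for an RPC connection that merely records the call, make_stub is an invented helper, and a closure assigned to a typeglob is used in place of a string eval.

```perl
use strict;
use warnings;

{
    # Hypothetical stand-in for an RPC connection object
    package FakeConn;
    sub new { return bless { calls => [] }, shift }
    sub rpc {
        my ($conn, $subname, @args) = @_;
        push @{ $conn->{calls} }, [$subname, @args];   # Record, don't send
        return "reply to $subname";
    }
}

# Install a subroutine called $name in the caller's package; the stub
# simply forwards its arguments to $conn->rpc
sub make_stub {
    my ($conn, $name) = @_;
    my $pkg = caller;                 # Package that asked for the stub
    no strict 'refs';                 # We assign through a symbolic name
    *{"${pkg}::$name"} = sub { $conn->rpc("${pkg}::$name", @_) };
    return;
}

my $conn = FakeConn->new;
make_stub($conn, 'ask_sheep');

# From here on, the call looks exactly like a local one
my $answer = ask_sheep("Ba ba black sheep, have you any wool ?");
```

Because the call to rpc is the last expression in the stub, it runs in whatever context the stub itself was called in, so a wantarray check inside rpc still reflects the original caller's context.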
The remote process (call it the RPC server) provides the required subroutines and invokes new_server and event_loop to accept incoming RPC calls; ask_sheep will get called at the right time. Simple!
    # Server stuff
    use RPC;
    RPC->new_server($host, $port);
    RPC->event_loop();

    sub ask_sheep {  # Sample subroutine to be invoked from client
        print "Question: @_\n";
        return "No";
    }
Now, let us look at an example of using RPC between peer processes. Process 1 (identified by $host1, $port1) calls subroutine two on Process 2 ($host2, $port2), which in turn calls subroutine one back on Process 1.
Process 1 looks like this:
    sub one {
        print "One called\n";
    }
    RPC->new_server($host1, $port1);           # Listen for calls from peers
    my $conn2 = RPC->connect($host2, $port2);  # Outgoing link to Process 2
    $conn2->rpc ("two");
Process 2 looks like this:
    sub two {
        print "Two called\n";
    }
    RPC->new_server($host2, $port2);           # Listen for calls from peers
    my $conn1 = RPC->connect($host1, $port1);  # Outgoing link to Process 1
    $conn1->rpc ("one");
Each process calls new_server to establish a listening port. Since the rpc call listens to incoming messages while it is still sending stuff out, neither process needs to call event_loop explicitly. A process that intends to hang around for a while should, of course, do so.
The RPC implementation is surprisingly small, thanks to the Msg and FreezeThaw modules. It inherits from Msg to provide the same connection and event loop abstractions.
Let us examine the calling side first:
    package RPC;
    use Msg;
    use strict;
    use Carp;
    @RPC::ISA = qw(Msg);
    use FreezeThaw qw(freeze thaw);

    sub connect {
        my ($pkg, $host, $port) = @_;
        my $conn = $pkg->SUPER::connect($host,$port, \&_incoming_msg);
        return $conn;
    }
connect simply calls Msg's connect, with _incoming_msg as the subroutine to notify on all incoming messages (including responses to subroutine calls and end-of-file indications). It leaves it to Msg's connect to create a connection object and bless it under RPC's auspices. Both Msg and RPC have been written so that they can be inherited by another module; the package name is not hardcoded.
    my $g_msg_id = 0;
    my $send_err = 0;
    sub handle_send_err {
        $send_err = $!;
    }
handle_send_err overrides Msg::handle_send_err and stores any errors encountered while sending a message. This error code is checked in rpc, as shown next. The error handling in both RPC and Msg is definitely not up to snuff and needs a considerable amount of work before it can be reliably used in a production application.
    sub rpc {
        my $conn = shift;
        my $subname = shift;

        # Qualify the name with the caller's package, as a local call would
        $subname = (caller() . '::' . $subname) unless $subname =~ /:/;
        my $gimme = wantarray ? 'a' : 's';  # Array or scalar
        my $msg_id = ++$g_msg_id;
        # $subname must travel with the request; _incoming_msg shifts it
        # off right after $gimme
        my $serialized_msg = freeze ('>', $msg_id, $gimme, $subname, @_);
        # Send and Receive
        $conn->send_later ($serialized_msg);
        do {
            Msg->event_loop(1); # Dispatch other messages until we
                                # get a response
        } until (exists $conn->{rcvd}->{$msg_id} || $send_err);
        if ($send_err) {
            die "RPC Error: $send_err";
        }
        # Dequeue message.
        my $rl_retargs = delete $conn->{rcvd}->{$msg_id}; # ref to list
        if (ref($rl_retargs->[0]) eq 'RPC::Error') {
            die ${$rl_retargs->[0]};
        }
        wantarray ? @$rl_retargs : $rl_retargs->[0];
    }
rpc uses the FreezeThaw module's freeze method to bundle the following pieces of information into one big string:
Name of the remote subroutine. The caller's module is prepended to the subroutine name if it is not fully qualified, which is the behavior expected of a normal subroutine.
wantarray indicator ($gimme): "s" for scalar, "a" for array.
Request or response indicator. ">" indicates request, and "<" indicates response. When the receiver gets a message, it should know whether it is a response to an outgoing message or an incoming request that it is expected to evaluate.
A message identifier. This is to identify the response corresponding to this request.
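The first few fields of a request can be assembled in isolation, which makes the roles of caller and wantarray easier to see. The following is a minimal sketch; build_request and $next_id are hypothetical names, and the fields are returned as an array reference rather than being frozen and sent.

```perl
use strict;
use warnings;

my $next_id = 0;    # Hypothetical counter, playing the role of $g_msg_id

# Return a reference to the list of request fields:
# direction, message id, context indicator, subroutine name, arguments
sub build_request {
    my ($subname, @args) = @_;
    # In scalar context, caller() yields the calling package's name
    $subname = caller() . '::' . $subname unless $subname =~ /:/;
    my $gimme = wantarray ? 'a' : 's';
    return ['>', ++$next_id, $gimme, $subname, @args];
}

my ($list_req, $scalar_req);
{
    package Farm;
    # List assignment, unqualified name: context 'a', package prepended
    ($list_req) = main::build_request('ask_sheep', 'any wool?');
    # Scalar assignment, qualified name: context 's', name left alone
    $scalar_req = main::build_request('Zoo::ask_lion');
}
```

Here the first request names Farm::ask_sheep with an 'a' indicator, while the second keeps Zoo::ask_lion as given and carries an 's' indicator.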
The freeze method accounts for cyclic data structures and objects and returns one ASCII string, which means that we don't have to worry about the size of native integers or doubles or their memory layout (byte order). send_later is used because it triggers nonblocking I/O where available. The message is really sent only when event_loop is called, because it determines when the socket is writable. At the same time, event_loop tracks other incoming messages and dispatches them. The count of 1 forces the event loop to return right after dispatching one round of messages, so we can retain control. When the response comes from the remote host, event_loop calls _incoming_msg, which decodes it and hangs the return arguments on the connection object. Read on.
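To see what "flattened and recovered" means for a cyclic structure, here is a small round trip. Note the substitution: this sketch uses the core Storable module (nfreeze and thaw) as a stand-in for FreezeThaw, so the wire format is binary rather than ASCII, and the message must be wrapped in an array reference because Storable serializes a single reference.

```perl
use strict;
use warnings;
use Storable qw(nfreeze thaw);   # Stand-in for FreezeThaw in this sketch

# A structure that refers to itself
my %sheep = (name => 'Dolly', wool => [3, 'bags']);
$sheep{self} = \%sheep;

# Sender: bundle the request fields and the parameter into one string
my $wire = nfreeze(['>', 1, 's', 'main::ask_sheep', \%sheep]);

# Receiver: recover the fields; $copy is a fresh structure, not an alias
my ($dir, $id, $gimme, $subname, $copy) = @{ thaw($wire) };

$copy->{name} = 'Shaun';         # Changing the copy leaves %sheep alone
```

The cycle survives the trip ($copy->{self} points back at $copy), but the receiver's changes never reach the sender's original, which is the copy-in behavior described earlier for parameters passed by reference.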
Let us now take a look at the receiving side:
    sub new_server {
        my ($pkg, $my_host, $my_port) = @_;
        $pkg->SUPER::new_server($my_host, $my_port,
                                sub {$pkg->_login(@_)});
    }

    sub _login {
        \&_incoming_msg;
    }
new_server, like connect, is a simple wrapper over its Msg counterpart. All incoming connections are unconditionally accepted by default, and messages are directed towards the subroutine _incoming_msg, shown next. Calling the _login procedure indirectly via $pkg gives you the opportunity to subclass RPC and supply your own _login procedure and refuse the connection if needed.
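A subclass that screens incoming connections might look roughly like this. The sketch is self-contained and makes two assumptions that are not confirmed by the text above: that the login procedure receives the connection, peer host, and peer port after the package name, and that returning a false value causes the connection to be refused. RPCSketch and PickyRPC are hypothetical packages; RPCSketch stands in for RPC so the example runs on its own.

```perl
use strict;
use warnings;

{
    # Minimal stand-in for the RPC package
    package RPCSketch;
    sub _incoming_msg { }                    # Placeholder message handler
    sub _login { return \&_incoming_msg }    # Default: accept everyone
}

{
    # Subclass that accepts connections only from hosts on an allow list
    package PickyRPC;
    our @ISA = ('RPCSketch');
    my %allowed = map { $_ => 1 } qw(127.0.0.1);

    sub _login {
        my ($pkg, $conn, $host, $port) = @_;     # Assumed argument list
        return unless $allowed{$host};           # Assumed: false refuses
        return $pkg->SUPER::_login($conn, $host, $port);
    }
}

# Simulate two login attempts (no real connection object is needed here)
my $ok  = PickyRPC->_login(undef, '127.0.0.1', 4000);
my $bad = PickyRPC->_login(undef, '10.9.8.7', 4000);
```

Because new_server invokes _login through $pkg, a server started as PickyRPC->new_server(...) would reach the overriding method without any change to RPC itself.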
    sub _incoming_msg {
        my ($conn, $msg, $err) = @_;
        return if ($err);   # Need better error handling.
        return unless defined($msg);
        my ($dir, $id, @args) = thaw ($msg);
        my ($result, @results);
        if ($dir eq '>') {
            # New request message
            my $gimme = shift @args;
            my $sub_name = shift @args;
            eval {
                no strict 'refs';  # Because we call the subroutine using
                                   # a symbolic reference
                if ($gimme eq 'a') {  # Want an array back
                    @results = &{$sub_name} (@args);
                } else {
                    $result = &{$sub_name} (@args);
                }
            };
            if ($@) {
                $msg = bless \$@, "RPC::Error";
                $msg = freeze('<', $id, $msg);
            } elsif ($gimme eq 'a') {
                $msg = freeze('<', $id, @results);
            } else {
                $msg = freeze('<', $id, $result);
            }
            $conn->send_later($msg);
        } else {
            # Response to a message we had sent out earlier
            $conn->{rcvd}->{$id} = \@args;
        }
    }
_incoming_msg is the counterpart to the rpc method. It unpacks the message sent by rpc and checks the direction (whether it is a request or a response). If it is a request, it calls the required subroutine using a symbolic reference. Notice that depending on the wantarray indication, it provides a scalar or vector result parameter. If eval reports an error, the $@ variable is stamped with an RPC::Error module tag and shipped back to the calling process (which invokes die).
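The dispatch-and-propagate cycle just described can be tried without any networking. In this sketch, dispatch and unpack_reply are hypothetical helpers that mimic the request branch of _incoming_msg and the tail end of rpc; the reply fields are passed directly as a list instead of being frozen and sent, and ask_sheep and grumpy are sample subroutines.

```perl
use strict;
use warnings;

sub ask_sheep { return wantarray ? ('yes', 'sir') : 'No' }
sub grumpy    { die "No wool today\n" }

# Receiving side: call the named subroutine in the requested context and
# return the reply fields (direction, id, then results or an error object)
sub dispatch {
    my ($id, $gimme, $sub_name, @args) = @_;
    my @results;
    eval {
        no strict 'refs';                       # Symbolic reference call
        if ($gimme eq 'a') { @results    = &{$sub_name}(@args) }
        else               { $results[0] = &{$sub_name}(@args) }
        1;
    } or do {
        my $err = $@;                           # Copy before $@ changes
        return ('<', $id, bless(\$err, 'RPC::Error'));
    };
    return ('<', $id, @results);
}

# Calling side: rethrow a remote error, otherwise hand back the results
sub unpack_reply {
    my ($dir, $id, @ret) = @_;
    die ${ $ret[0] } if ref($ret[0]) eq 'RPC::Error';
    return wantarray ? @ret : $ret[0];
}

my $scalar = unpack_reply(dispatch(1, 's', 'main::ask_sheep'));
my @list   = unpack_reply(dispatch(2, 'a', 'main::ask_sheep'));
my $lived  = eval { unpack_reply(dispatch(3, 's', 'main::grumpy')); 1 };
my $error  = $@;
```

The third call never returns normally: the die inside grumpy is caught on the "remote" side, travels back as an RPC::Error object, and is raised again in the caller, where an ordinary eval traps it.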