Group
Extension

AnyEvent-Fork-RPC/RPC.pm

=head1 NAME

AnyEvent::Fork::RPC - simple RPC extension for AnyEvent::Fork

=head1 SYNOPSIS

   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyModule")
      ->AnyEvent::Fork::RPC::run (
         "MyModule::server",
      );

   use AnyEvent;

   my $cv = AE::cv;

   $rpc->(1, 2, 3, sub {
      print "MyModule::server returned @_\n";
      $cv->send;
   });

   $cv->recv;

=head1 DESCRIPTION

This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
to call a function in the child process and receive its return values (up
to 4GB serialised).

It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.

It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.

=head1 EXAMPLES

=head2 Example 1: Synchronous Backend

Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.

First the parent process:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
         on_event   => sub { warn "$_[0] requests handled\n" },
         on_destroy => $done,
      );

   for my $id (1..6) {
      $rpc->(rmdir => "/tmp/somepath/$id", sub {
         $_[0]
            or warn "/tmp/somepath/$id: $_[1]\n";
      });
   }

   undef $rpc;

   $done->recv;

The parent creates the process, queues a few rmdir's. It then forgets
about the C<$rpc> object, so that the child exits after it has handled the
requests, and then it waits till the requests have been handled.

The child is implemented using a separate module, C<MyWorker>, shown here:

   package MyWorker;

   my $count;

   sub run {
      my ($cmd, $path) = @_;

      AnyEvent::Fork::RPC::event ($count)
         unless ++$count % 3;

      my $status = $cmd eq "rmdir"  ? rmdir  $path
                 : $cmd eq "unlink" ? unlink $path
                 : die "fatal error, illegal command '$cmd'";

      $status or (0, "$!")
   }

   1

The C<run> function first sends a "progress" event every three calls, and
then executes C<rmdir> or C<unlink>, depending on the first parameter (or
dies with a fatal error - obviously, you must never let this happen :).

Eventually it returns the status value true if the command was successful,
or the status value 0 and the stringified error message.

On my system, running the first code fragment with the given
F<MyWorker.pm> in the current directory yields:

   /tmp/somepath/1: No such file or directory
   /tmp/somepath/2: No such file or directory
   3  requests handled
   /tmp/somepath/3: No such file or directory
   /tmp/somepath/4: No such file or directory
   /tmp/somepath/5: No such file or directory
   6  requests handled
   /tmp/somepath/6: No such file or directory

Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.

Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:

      ->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")

Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.

And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.

=head3 Example 1a: the same with the asynchronous backend

This example only shows what needs to be changed to use the async backend
instead. Doing this is not very useful, the purpose of this example is
to show the minimum amount of change that is required to go from the
synchronous to the asynchronous backend.

To use the async backend in the previous example, you need to add the
C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:

      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
         async      => 1,
         ...

And since the function call protocol is now changed, you need to adopt
C<MyWorker::run> to the async API.

First, you need to accept the extra initial C<$done> callback:

   sub run {
      my ($done, $cmd, $path) = @_;

And since a response is now generated when C<$done> is called, as opposed
to when the function returns, we need to call the C<$done> function with
the status:

      $done->($status or (0, "$!"));

A few remarks are in order. First, it's quite pointless to use the async
backend for this example - but it I<is> possible. Second, you can call
C<$done> before or after returning from the function. Third, having both
returned from the function and having called the C<$done> callback, the
child process may exit at any time, so you should call C<$done> only when
you really I<are> done.

=head2 Example 2: Asynchronous Backend

This example implements multiple count-downs in the child, using
L<AnyEvent> timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order then the
requests.

It also shows how to embed the actual child code into a C<__DATA__>
section, so it doesn't need any external files at all.

And when your parent process is often busy, and you have stricter timing
requirements, then running timers in a child process suddenly doesn't look
so silly anymore.

Without further ado, here is the code:

   use AnyEvent;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;

   my $done = AE::cv;

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("AnyEvent::Fork::RPC::Async")
      ->eval (do { local $/; <DATA> })
      ->AnyEvent::Fork::RPC::run ("run",
         async      => 1,
         on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
         on_event   => sub { print $_[0] },
         on_destroy => $done,
      );

   for my $count (3, 2, 1) {
      $rpc->($count, sub {
         warn "job $count finished\n";
      });
   }

   undef $rpc;

   $done->recv;

   __DATA__

   # this ends up in main, as we don't use a package declaration

   use AnyEvent;

   sub run {
      my ($done, $count) = @_;

      my $n;

      AnyEvent::Fork::RPC::event "starting to count up to $count\n";

      my $w; $w = AE::timer 1, 1, sub {
         ++$n;

         AnyEvent::Fork::RPC::event "count $n of $count\n";

         if ($n == $count) {
            undef $w;
            $done->();
         }
      };
   }

The parent part (the one before the C<__DATA__> section) isn't very
different from the earlier examples. It sets async mode, preloads
the backend module (so the C<AnyEvent::Fork::RPC::event> function is
declared), uses a slightly different C<on_event> handler (which we use
simply for logging purposes) and then, instead of loading a module with
the actual worker code, it C<eval>'s the code from the data section in the
child process.

It then starts three countdowns, from 3 to 1 seconds downwards, destroys
the rpc object so the example finishes eventually, and then just waits for
the stuff to trickle in.

The worker code uses the event function to log some progress messages, but
mostly just creates a recurring one-second timer.

The timer callback increments a counter, logs a message, and eventually,
when the count has been reached, calls the finish callback.

On my system, this results in the following output. Since all timers fire
at roughly the same time, the actual order isn't guaranteed, but the order
shown is very likely what you would get, too.

   starting to count up to 3
   starting to count up to 2
   starting to count up to 1
   count 1 of 3
   count 1 of 2
   count 1 of 1
   job 1 finished
   count 2 of 2
   job 2 finished
   count 2 of 3
   count 3 of 3
   job 3 finished

While the overall ordering isn't guaranteed, the async backend still
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.

And unless your system is I<very> busy, it should clearly show that the
job started last will finish first, as it has the lowest count.

This concludes the async example. Since L<AnyEvent::Fork> does not
actually fork, you are free to use about any module in the child, not just
L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.

=head2 Example 3: Asynchronous backend with Coro

With L<Coro> you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
parent process are passed to it, and any return values are returned to the
parent process, e.g.:

   package My::Arith;

   sub add {
      return $_[0] + $_[1];
   }

   sub mul {
      return $_[0] * $_[1];
   }

   sub run {
      my ($done, $func, @arg) = @_;

      Coro::async_pool {
         $done->($func->(@arg));
      };
   }

The C<run> function creates a new thread for every invocation, using the
first argument as function name, and calls the C<$done> callback on it's
return values. This makes it quite natural to define the C<add> and C<mul>
functions to add or multiply two numbers and return the result.

Since this is the asynchronous backend, it's quite possible to define RPC
function that do I/O or wait for external events - their execution will
overlap as needed.

The above could be used like this:

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("My::Arith::run",
         on_error => ..., on_event => ..., on_destroy => ...,
      );

   $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
   $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;

The C<say>'s will print C<4> and C<6>.

=head2 Example 4: Forward AnyEvent::Log messages using C<on_event>

This partial example shows how to use the C<event> function to forward
L<AnyEvent::Log> messages to the parent.

For this, the parent needs to provide a suitable C<on_event>:

   ->AnyEvent::Fork::RPC::run (
      on_event => sub {
         if ($_[0] eq "ae_log") {
            my (undef, $level, $message) = @_;
            AE::log $level, $message;
         } else {
            # other event types
         }
      },
   )

In the child, as early as possible, the following code should reconfigure
L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:

   $AnyEvent::Log::LOG->log_cb (sub {
      my ($timestamp, $orig_ctx, $level, $message) = @{+shift};

      if (defined &AnyEvent::Fork::RPC::event) {
         AnyEvent::Fork::RPC::event (ae_log => $level, $message);
      } else {
         warn "[$$ before init] $message\n";
      }
   });

There is an important twist - the C<AnyEvent::Fork::RPC::event> function
is only defined when the child is fully initialised. If you redirect the
log messages in your C<init> function for example, then the C<event>
function might not yet be available. This is why the log callback checks
whether the function is there using C<defined>, and only then uses it to
log the message.

=head1 PARENT PROCESS USAGE

This module exports nothing, and only implements a single function:

=over 4

=cut

package AnyEvent::Fork::RPC;

use common::sense;

use Errno ();
use Guard ();

use AnyEvent;

our $VERSION = '2.0';

=item my $rpc = AnyEvent::Fork::RPC::run $fork, $function, [key => value...]

The traditional way to call it. But it is way cooler to call it in the
following way:

=item my $rpc = $fork->AnyEvent::Fork::RPC::run ($function, [key => value...])

This C<run> function/method can be used in place of the
L<AnyEvent::Fork::run> method. Just like that method, it takes over
the L<AnyEvent::Fork> process, but instead of calling the specified
C<$function> directly, it runs a server that accepts RPC calls and handles
responses.

It returns a function reference that can be used to call the function in
the child process, handling serialisation and data transfers.

The following key/value pairs are allowed. It is recommended to have at
least an C<on_error> or C<on_event> handler set.

=over 4

=item on_error => $cb->($msg)

Called on (fatal) errors, with a descriptive (hopefully) message. If
this callback is not provided, but C<on_event> is, then the C<on_event>
callback is called with the first argument being the string C<error>,
followed by the error message.

If neither handler is provided, then the error is reported with loglevel
C<error> via C<AE::log>.

=item on_event => $cb->(...)

Called for every call to the C<AnyEvent::Fork::RPC::event> function in the
child, with the arguments of that function passed to the callback.

Also called on errors when no C<on_error> handler is provided.

=item on_destroy => $cb->()

Called when the C<$rpc> object has been destroyed and all requests have
been successfully handled. This is useful when you queue some requests and
want the child to go away after it has handled them. The problem is that
the parent must not exit either until all requests have been handled, and
this can be accomplished by waiting for this callback.

=item init => $function (default: none)

When specified (by name), this function is called in the child as the very
first thing when taking over the process, with all the arguments normally
passed to the C<AnyEvent::Fork::run> function, except the communications
socket.

It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.

It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It can
not, however, create events.

=item done => $function (default: C<CORE::exit>)

The function to call when the asynchronous backend detects an end of file
condition when reading from the communications socket I<and> there are no
outstanding requests. It is ignored by the synchronous backend.

By overriding this you can prolong the life of a RPC process after e.g.
the parent has exited by running the event loop in the provided function
(or simply calling it, for example, when your child process uses L<EV> you
could provide L<EV::run> as C<done> function).

Of course, in that case you are responsible for exiting at the appropriate
time and not returning from

=item async => $boolean (default: C<0>)

The default server used in the child does all I/O blockingly, and only
allows a single RPC call to execute concurrently.

Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
does not support recursion in the event loop however, blocking condvar
calls will fail).

The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.

If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.

If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.

=item serialiser => $string (default: C<$AnyEvent::Fork::RPC::STRING_SERIALISER>)

All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.

By default, only octet strings can be passed between the processes,
which is reasonably fast and efficient and requires no extra modules
(the C<AnyEvent::Fork::RPC> distribution does not provide these extra
serialiser modules).

For more complicated use cases, you can provide your own freeze and thaw
functions, by specifying a string with perl source code. It's supposed to
return two code references when evaluated: the first receives a list of
perl values and must return an octet string. The second receives the octet
string and must return the original list of values.

If you need an external module for serialisation, then you can either
pre-load it into your L<AnyEvent::Fork> process, or you can add a C<use>
or C<require> statement into the serialiser string. Or both.

Here are some examples - all of them are also available as global
variables that make them easier to use.

=over 4

=item C<$AnyEvent::Fork::RPC::STRING_SERIALISER> - octet strings only

This serialiser (currently the default) concatenates length-prefixes octet
strings, and is the default. That means you can only pass (and return)
strings containing character codes 0-255.

The main advantages of this serialiser are the high speed and that it
doesn't need another module. The main disadvantage is that you are very
limited in what you can pass - only octet strings.

Implementation:

   (
      sub { pack   "(w/a*)*", @_ },
      sub { unpack "(w/a*)*", shift }
   )

=item C<$AnyEvent::Fork::RPC::CBOR_XS_SERIALISER> - uses L<CBOR::XS>

This serialiser creates CBOR::XS arrays - you have to make sure the
L<CBOR::XS> module is installed for this serialiser to work. It can be
beneficial for sharing when you preload the L<CBOR::XS> module in a template
process.

L<CBOR::XS> is about as fast as the octet string serialiser, but supports
complex data structures (similar to JSON) and is faster than any of the
other serialisers. If you have the L<CBOR::XS> module available, it's the
best choice.

The encoder enables C<allow_sharing> (so this serialisation method can
encode cyclic and self-referencing data structures).

Implementation:

   use CBOR::XS ();
   (
      sub {    CBOR::XS::encode_cbor_sharing \@_ },
      sub { @{ CBOR::XS::decode_cbor shift } }
   )

=item C<$AnyEvent::Fork::RPC::JSON_SERIALISER> - uses L<JSON::XS> or L<JSON>

This serialiser creates JSON arrays - you have to make sure the L<JSON>
module is installed for this serialiser to work. It can be beneficial for
sharing when you preload the L<JSON> module in a template process.

L<JSON> (with L<JSON::XS> installed) is slower than the octet string
serialiser, but usually much faster than L<Storable>, unless big chunks of
binary data need to be transferred.

Implementation:

   use JSON ();
   (
      sub {    JSON::encode_json \@_ },
      sub { @{ JSON::decode_json shift } }
   )

=item C<$AnyEvent::Fork::RPC::STORABLE_SERIALISER> - L<Storable>

This serialiser uses L<Storable>, which means it has high chance of
serialising just about anything you throw at it, at the cost of having
very high overhead per operation. It also comes with perl. It should be
used when you need to serialise complex data structures.

Implementation:

   use Storable ();
   (
      sub {    Storable::freeze \@_ },
      sub { @{ Storable::thaw shift } }
   )

=item C<$AnyEvent::Fork::RPC::NSTORABLE_SERIALISER> - portable Storable

This serialiser also uses L<Storable>, but uses it's "network" format
to serialise data, which makes it possible to talk to different
perl binaries (for example, when talking to a process created with
L<AnyEvent::Fork::Remote>).

Implementation:

   use Storable ();
   (
      sub {    Storable::nfreeze \@_ },
      sub { @{ Storable::thaw shift } }
   )

=back

=item buflen => $bytes (default: C<512 - 16>)

The starting size of the read buffer for request and response data.

C<AnyEvent::Fork::RPC> ensures that the buffer for reeading request and
response data is large enough for at leats aingle request or response, and
will dynamically enlarge the buffer if needed.

While this ensures that memory is not overly wasted, it typically leads
to having to do one syscall per request, which can be inefficient in some
cases. In such cases, it can be beneficient to increase the buffer size to
hold more than one request.

=item buflen_req => $bytes (default: same as C<buflen>)

Overrides C<buflen> for request data (as read by the forked process).

=item buflen_res => $bytes (default: same as C<buflen>)

Overrides C<buflen> for response data (replies read by the parent process).

=back

See the examples section earlier in this document for some actual
examples.

=cut

our $STRING_SERIALISER    = '(sub { pack "(w/a*)*", @_ }, sub { unpack "(w/a*)*", shift })';
our $CBOR_XS_SERIALISER   = 'use CBOR::XS (); (sub { CBOR::XS::encode_cbor_sharing \@_ }, sub { @{ CBOR::XS::decode_cbor shift } })';
our $JSON_SERIALISER      = 'use JSON     (); (sub { JSON::encode_json             \@_ }, sub { @{ JSON::decode_json     shift } })';
our $STORABLE_SERIALISER  = 'use Storable (); (sub { Storable::freeze  \@_ }, sub { @{ Storable::thaw shift } })';
our $NSTORABLE_SERIALISER = 'use Storable (); (sub { Storable::nfreeze \@_ }, sub { @{ Storable::thaw shift } })';

sub run {
   my ($self, $function, %arg) = @_;

   my $serialiser = delete $arg{serialiser} || $STRING_SERIALISER;
   my $on_event   = delete $arg{on_event};
   my $on_error   = delete $arg{on_error};
   my $on_destroy = delete $arg{on_destroy};
   
   # default for on_error is to on_event, if specified
   $on_error ||= $on_event
               ? sub { $on_event->(error => shift) }
               : sub { AE::log die => "AnyEvent::Fork::RPC: uncaught error: $_[0]." };

   # default for on_event is to raise an error
   $on_event ||= sub { $on_error->("event received, but no on_event handler") };

   my ($f, $t) = eval $serialiser; die $@ if $@;

   my (@rcb, %rcb, $fh, $shutdown, $wbuf, $ww);
   my ($rlen, $rbuf, $rw) = $arg{buflen_res} || $arg{buflen} || 512 - 16;

   my $wcb = sub {
      my $len = syswrite $fh, $wbuf;

      unless (defined $len) {
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("$!");
         }
      }

      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         undef $ww;
         $shutdown and shutdown $fh, 1;
      }
   };

   my $module = "AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync");

   $self->eval ("use $module 2 ()")
        ->send_arg (
             function   => $function,
             init       => $arg{init},
             serialiser => $serialiser,
             done       => $arg{done} || "$module\::do_exit",
             rlen       => $arg{buflen_req} || $arg{buflen} || 512 - 16,
             -10 # the above are 10 arguments
          )
        ->run ("$module\::run", sub {
      $fh = shift
         or return $on_error->("connection failed");

      my ($id, $len);
      $rw = AE::io $fh, 0, sub {
         $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
         $len = sysread $fh, $rbuf, $rlen - length $rbuf, length $rbuf;

         if ($len) {
            while (8 <= length $rbuf) {
               ($id, $len) = unpack "NN", $rbuf;
               8 + $len <= length $rbuf
                  or last;

               my @r = $t->(substr $rbuf, 8, $len);
               substr $rbuf, 0, 8 + $len, "";

               if ($id) {
                  if (@rcb) {
                     (shift @rcb)->(@r);
                  } elsif (my $cb = delete $rcb{$id}) {
                     $cb->(@r);
                  } else {
                     undef $rw; undef $ww;
                     $on_error->("unexpected data from child");
                  }
               } else {
                  $on_event->(@r);
               }
            }
         } elsif (defined $len) {
            undef $rw; undef $ww; # it ends here

            if (@rcb || %rcb) {
               $on_error->("unexpected eof");
            } else {
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;

      shutdown $fh, 1 if $fh && !$ww;
   };

   my $id;

   $arg{async}
      ? sub {
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard if 0; # keep it alive

           $wbuf .= pack "NN/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "N/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

See the examples section earlier in this document for some actual
examples.

=back

=head1 CHILD PROCESS USAGE

The following function is not available in this module. They are only
available in the namespace of this module when the child is running,
without having to load any extra modules. They are part of the child-side
API of L<AnyEvent::Fork::RPC>.

Note that these functions are typically not yet declared when code is
compiled into the child, because the backend module is only loaded when
you call C<run>, which is typically the last method you call on the fork
object.

Therefore, you either have to explicitly pre-load the right backend module
or mark calls to these functions as function calls, e.g.:

   AnyEvent::Fork::RPC::event (0 => "five");
   AnyEvent::Fork::RPC::event->(0 => "five");
   &AnyEvent::Fork::RPC::flush;

=over 4

=item AnyEvent::Fork::RPC::event (...)

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

See the examples section earlier in this document for some actual
examples.

Note: the event data, like any data send to the parent, might not be sent
immediatelly but queued for later sending, so there is no guarantee that
the event has been sent to the parent when the call returns - when you
e.g. exit directly after calling this function, the parent might never
receive the event. See the next function for a remedy.

=item $success = AnyEvent::Fork::RPC::flush ()

Synchronously wait and flush the reply data to the parent. Returns true on
success and false otherwise (i.e. when the reply data cannot be written at
all). Ignoring the success status is a common and healthy behaviour.

Only the "async" backend does something on C<flush> - the "sync" backend
is not buffering reply data and always returns true from this function.

Normally, reply data might or might not be written to the parent
immediatelly but is buffered. This can greatly improve performance and
efficiency, but sometimes can get in your way: for example. when you want
to send an error message just before exiting, or when you want to ensure
replies timely reach the parent before starting a long blocking operation.

In these cases, you can call this function to flush any outstanding reply
data to the parent. This is done blockingly, so no requests will be
handled and no event callbacks will be called.

For example, you could wrap your request function in a C<eval> block and
report the exception string back to the caller just before exiting:

   sub req {
      ...

      eval {
         ...
      };

      if ($@) {
         AnyEvent::RPC::event (throw => "$@");
         AnyEvent::RPC::flush ();
         exit;
      }

      ...
   }

=back

=head2 PROCESS EXIT

If and when the child process exits depends on the backend and
configuration. Apart from explicit exits (e.g. by calling C<exit>) or
runtime conditions (uncaught exceptions, signals etc.), the backends exit
under these conditions:

=over 4

=item Synchronous Backend

The synchronous backend is very simple: when the process waits for another
request to arrive and the writing side (usually in the parent) is closed,
it will exit normally, i.e. as if your main program reached the end of the
file.

That means that if your parent process exits, the RPC process will usually
exit as well, either because it is idle anyway, or because it executes a
request. In the latter case, you will likely get an error when the RPc
process tries to send the results to the parent (because agruably, you
shouldn't exit your parent while there are still outstanding requests).

The process is usually quiescent when it happens, so it should rarely be a
problem, and C<END> handlers can be used to clean up.

=item Asynchronous Backend

For the asynchronous backend, things are more complicated: Whenever it
listens for another request by the parent, it might detect that the socket
was closed (e.g. because the parent exited). It will sotp listening for
new requests and instead try to write out any remaining data (if any) or
simply check whether the socket can be written to. After this, the RPC
process is effectively done - no new requests are incoming, no outstanding
request data can be written back.

Since chances are high that there are event watchers that the RPC server
knows nothing about (why else would one use the async backend if not for
the ability to register watchers?), the event loop would often happily
continue.

This is why the asynchronous backend explicitly calls C<CORE::exit> when
it is done (under other circumstances, such as when there is an I/O error
and there is outstanding data to write, it will log a fatal message via
L<AnyEvent::Log>, also causing the program to exit).

You can override this by specifying a function name to call via the C<done>
parameter instead.

=back

=head1 ADVANCED TOPICS

=head2 Choosing a backend

So how do you decide which backend to use? Well, that's your problem to
solve, but here are some thoughts on the matter:

=over 4

=item Synchronous

The synchronous backend does not rely on any external modules (well,
except L<common::sense>, which works around a bug in how perl's warning
system works). This keeps the process very small, for example, on my
system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
after C<use warnings; use strict> (for people who grew up with C64s around
them this is probably shocking every single time they see it). The worker
process in the first example in this document uses 1792kB.

Since the calls are done synchronously, slow jobs will keep newer jobs
from executing.

The synchronous backend also has no overhead due to running an event loop
- reading requests is therefore very efficient, while writing responses is
less so, as every response results in a write syscall.

If the parent process is busy and a bit slow reading responses, the child
waits instead of processing further requests. This also limits the amount
of memory needed for buffering, as never more than one response has to be
buffered.

The API in the child is simple - you just have to define a function that
does something and returns something.

It's hard to use modules or code that relies on an event loop, as the
child cannot execute anything while it waits for more input.

=item Asynchronous

The asynchronous backend relies on L<AnyEvent>, which tries to be small,
but still comes at a price: On my system, the worker from example 1a uses
3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
which in turn loads a lot of other modules such as L<warnings>, L<strict>,
L<vars>, L<Exporter>...).

It batches requests and responses reasonably efficiently, doing only as
few reads and writes as needed, but needs to poll for events via the event
loop.

Responses are queued when the parent process is busy. This means the child
can continue to execute any queued requests. It also means that a child
might queue a lot of responses in memory when it generates them and the
parent process is slow accepting them.

The API is not a straightforward RPC pattern - you have to call a
"done" callback to pass return values and signal completion. Also, more
importantly, the API starts jobs as fast as possible - when 1000 jobs
are queued and the jobs are slow, they will all run concurrently. The
child must implement some queueing/limiting mechanism if this causes
problems. Alternatively, the parent could limit the amount of rpc calls
that are outstanding.

Blocking use of condvars is not supported (in the main thread, outside of
e.g. L<Coro> threads).

Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
easy.

=back

=head2 Passing file descriptors

Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
descriptor passing abilities.

The reason is that passing file descriptors is extraordinary tricky
business, and conflicts with efficient batching of messages.

There still is a method you can use: Create a
C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
the process before you pass control to C<AnyEvent::Fork::RPC::run>.

Whenever you want to pass a file descriptor, send an rpc request to the
child process (so it expects the descriptor), then send it over the other
half of the socketpair. The child should fetch the descriptor from the
half it has passed earlier.

Here is some (untested) pseudocode to that effect:

   use AnyEvent::Util;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;
   use IO::FDPass;

   my ($s1, $s2) = AnyEvent::Util::portable_socketpair;

   my $rpc = AnyEvent::Fork
      ->new
      ->send_fh ($s2)
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run"
           init => "MyWorker::init",
        );

   undef $s2; # no need to keep it around

   # pass an fd
   $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);

   IO::FDPass fileno $s1, fileno $handle_to_pass;

   $cv->recv;

The MyWorker module could look like this:

   package MyWorker;

   use IO::FDPass;

   my $s2;

   sub init {
      $s2 = $_[0];
   }

   sub run {
      if ($_[0] eq "i'll send some fd now, please expect it!") {
         my $fd = IO::FDPass::recv fileno $s2;
         ...
      }
   }

Of course, this might be blocking if you pass a lot of file descriptors,
so you might want to look into L<AnyEvent::FDpasser> which can handle the
gory details.

=head1 EXCEPTIONS

There are no provisions whatsoever for catching exceptions at this time -
in the child, exceptions might kill the process, causing calls to be lost
and the parent encountering a fatal error. In the parent, exceptions in
the result callback will not be caught and cause undefined behaviour.

=head1 SEE ALSO

L<AnyEvent::Fork>, to create the processes in the first place.

L<AnyEvent::Fork::Remote>, likewise, but helpful for remote processes.

L<AnyEvent::Fork::Pool>, to manage whole pools of processes.

=head1 AUTHOR AND CONTACT INFORMATION

 Marc Lehmann <schmorp@schmorp.de>
 http://software.schmorp.de/pkg/AnyEvent-Fork-RPC

=cut

1



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.