Group
Extension

AnyEvent-MP/MP/Intro.pod

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the L<AnyEvent::MP>
module family, which allows processes to transparently pass messages to
itself and to other processes on the same or a different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

    write_log => 1251555874, "action was successful.\n"
    123, ["a", "b", "c"], { foo => "bar" }

When using L<AnyEvent::MP> it is customary to use a descriptive string as
first element of a message that indicates the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.

Supposedly you want to send some kind of ping message with your current
time to somewhere, this is how such a message might look like (in Perl
syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.


  |- PID: 7066 -|                  |- PID: 8321 -|
  |             |                  |             |
  | Node ID: A  |                  | Node ID: B  |
  |             |                  |             |
  |   Port ABC =|= <----\ /-----> =|= Port FOO   |
  |             |        X         |             |
  |   Port DEF =|= <----/ \-----> =|= Port BAR   |
  |             |                  |             |
  |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders, or even as message tags for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful to solve some real world problems.

=head2 Passing Your First Message

For starters, let's have a look at the messaging API. The following
example is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print: C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>: First there is the C<port> function which creates a
I<port> and will return it's I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and install
handlers to receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> that specified as the first
argument (it only works for "local" ports, i.e. ports created on the same
node). The next argument, in this example C<test>, specifies a I<tag> to
match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.

Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test> it will call the callback with the first argument being the
number C<123>.

The callback is a typical AnyEvent idiom: the callback just passes
that number on to the I<condition variable> C<$end_cv> which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult the
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to setup a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to setup the same I<shared secret> for both of them, so they can
prove their trustworthyness to each other.

The easiest way is to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>) and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>), causing connections to not just be securely authenticated, but
also to be encrypted and protected against tinkering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

Thats is all for now, you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help troubleshooting any issues, there are two environment variables
that you can set. The first, C<AE_VERBOSE> sets the logging level of
L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<4>, which
means nothing much is printed. You can increase it to C<8> or C<9> to get
more verbose output. This is example output when starting a node (somewhat
abridged to get shorter lines):

   2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
   2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
   2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain.
   2012-03-22 01:41:43.66 info  AE::MP::Kernel: rain is up.

A lot of info, but at least you can see that it does something. To only
get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
your environment.

The other environment variable that can be useful is
C<AE_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. For example, F<aemp
restart rijk> might output these message exchanges:

   SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
   SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
   SND rain <- [null,"g_find","rijk"]
   RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
   RCV rijk -> ["b",""]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Lets split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

Now, that wasn't too bad, was it? OK, let's go through the new functions
that have been used.

=head3 C<configure> and Joining and Maintaining the Network

First let's have a look at C<configure>:

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to configure
the node to become a "networked node". Configuring a node means naming
the node and binding some TCP listeners so that other nodes can contact
it. The choice on whether a process becomes a networked node or not must
be done before doing anything else with AnyEvent::MP.

Additionally, to actually link all nodes in a network together, you should
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.

All of this info (and more) can be passed to the C<configure> function -
later we will see how we can do all this without even passing anything to
C<configure>!

Back to the function call in the program: the first parameter, C<nodeid>,
specified the node ID (in this case C<eg_receiver/%u> - the default is to
use the node name of the current host plus C</%u>, which gives the node a
name with a random suffix to make it unique, but for this example we want
the node to have a bit more personality, and name it C<eg_receiver> with a
random suffix.

Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).

The reason to bind on a TCP port is not just that other nodes can connect
to us: if no binds are specified, the node will still bind on a dynamic
port on all local addresses - but in this case we won't know the port, and
cannot tell other nodes to connect to it as seed node.

Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of one or few (for
redundancy) other nodes, some of which should be up. Two nodes can set
each other as seeds without any issues. You could even specify all nodes
as seeds for all nodes, for total redundancy. But the common case is to
have some more or less central, stable servers running seed services for
other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph.

A node tries to keep connections open to all of it's seed nodes at all
times, while other connections are made on demand only.

The simplest way to do that would be for all nodes to use the same seed
nodes: seed nodes would seed each other, and all other nodes would connect
to the seed nodes.

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting
it, so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and information about services in the network are
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.

We will see how this is done later, in the sender program.

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.

What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.

To illustrate this with Perl syntax, assume the database was stored in
C<%DB>, then the C<db_set> function more or less would do this:

   $DB{eg_receivers}{$port} = undef;

So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
assignment.

The family namespace is shared by all nodes in a network, so the names
should be reasonably unique, for example, they could start with the name
of your module, or the name of the program, using your port name or node
name as subkey.

The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $a, $c, $d) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;

It's even less code. The C<configure> serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.

Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.

Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, and the list of keys as
second argument.

The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the sender. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.

You can experiment by having multiple receivers - you have to change the
"binds" parameter in the receiver to the seeds used in the sender to start
up additional receivers, but then you can start as many as you like. If
you specify proper IP addresses for the seeds, you can even run them on
different computers.

Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).

Additional experiments you could try include using C<AE_MP_TRACE=1> to see
which messages are exchanged, or starting the sender before the receiver
and see how long it then takes to find the receiver.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works reasonably well. In the real world, however, the
person configuring your application to run on a specific network (the end
user or network administrator) is often different to the person coding the
application.

Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host, for multiple machines you would replace the C<*>
by the IP address or hostname of the node running the seed), by changing
the global settings shared between all profiles:

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

  use AnyEvent::MP;

  # create a port, it always dies
  my $port = port { die "oops" };

  # monitor it
  mon $port, sub {
     warn "$port was killed (with reason @_)";
  };

  # now send it some message, causing it to die:
  snd $port;

  AnyEvent->condvar->recv;

It first creates a port whose only action is to throw an exception,
and the monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die>, to indicate it
did throw an I<exception> as opposed to, say, a network error, and the
exception message itself.

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

  use AnyEvent::MP;

  my $port = port { die "oops" };

  snd $port;

  mon $port, sub {
     warn "$port was killed (with reason @_)";
  };

  AnyEvent->condvar->recv;

This time we will get something else:

   2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
   anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)

The first line is an error message that is printed when a port dies that
isn't being monitored, because that is normally a bug. When later a C<mon>
is attempted, it is immediately killed, because the port is already
gone. The kill reason is now C<no_such_port> with some descriptive (we
hope) error message.

As you probably suspect from these examples, the kill reason is usually
some identifier as first argument and a human-readable error message as
second argument - all kill reasons by AnyEvent::MP itself follow this
pattern. But the kill reason can be anything: it is simply a list of
values you can choose yourself. It can even be nothing (an empty list) -
this is called a "normal" kill.

Apart from die'ing, you can kill ports manually using the C<kil>
function. Using the C<kil> function will be treated like an error when a
non-empty reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a I<normal> kill without any reason arguments:

   kil $port;

By now you probably wonder what this "normal" kill business is: A common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes" - and
the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done it's job properly.

=head3 Port Context

Code runs in the so-called "port context". That means C<$SELF> contains
its own port ID and exceptions that the code throws will be caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

  my $port = port {
     after 1, sub { die "oops" };
  };

If you try this out, you would find it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works mostly like perl's built-in C<sub>:

  my $port = port {
     after 1, psub { die "oops" };
  };

C<psub> remembers the port context and returns a code reference. When the
code reference is invoked, it will run the code block within the context
that it was created in, so exception handling once more works as expected.

There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

  peval $port, sub {
     # die'ing here will kil $port
  };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it, it could be crashed, but also still running without knowing we lost
the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port:  After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will be
no "holes" in the message sequence: all messages sent are delivered in
order, without any of them missing in between, and when some were lost,
you I<will> be notified of that, so you can take recovery action.

And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.

=head3 Supervising

OK, so how is this crashing-everything-stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them
more stable, but to make them more resilient against actual errors
and crashes. And this is not done by crashing I<everything>, but by
crashing everything except a I<supervisor> that then cleans up and sgtarts
everything again.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly. That is,
it supervises a service by starting and restarting it, as necessary.

To show how to do all this we will create a simple chat server that can
handle many chat clients. Both server and clients can be killed and
restarted, and even crash, to some extent, without disturbing the chat
functionality.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run seed> node):

   use common::sense;
   use AnyEvent::MP;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

   db_set eg_chat_server => $server;

   warn "server ready.\n";

   AnyEvent->condvar->recv;

Looks like a lot, but it is actually quite simple: after your usual
preamble (this time we use common sense), we define a helper function that
sends some message to every registered chat client:

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

The clients are stored in the hash C<%client>. Then we define a server
port and install two receivers on it, C<join>, which is sent by clients
to join the chat, and C<privmsg>, that clients use to send actual chat
messages.

C<join> is most complicated. It expects the client port and the nickname
to be passed in the message, and registers the client in C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

The C<privmsg> callback simply broadcasts the message to all clients:

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

And finally, the server registers itself in the server group, so that
clients can find it:

   db_set eg_chat_server => $server;

Well, well... and where is this supervisor stuff? Well... we cheated,
it's not there. To not overcomplicate the example, we only put it into
the..... CLIENT!

=head3 The Client, and a Supervisor!

Again, here is the client, including supervisor, which makes it a bit
longer:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift || "anonymous";

   configure;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon;

         print "\rconnecting...\n";

         $client = port { print "\r  \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         $server = (keys %{ $_[0] })[0];                                      

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
        if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

The first thing the client does is to store the nick name (which is
expected as the only command line argument) in C<$nick>, for further
usage.

The next relevant thing is... finally... the supervisor:

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

This monitors the C<eg_chat_server> database family. It waits until a
chat server becomes available. When that happens, it "connects" to it
by creating a client port that receives and prints chat messages, and
monitoring it:

      $client = port { print "\r  \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

If the client port dies (for whatever reason), the "supervisor" will start
looking for a server again - the semantics of C<db_mon> ensure that it
will immediately find it if there is a server port.

After this, everything is ready: the client will send a C<join> message
with its local port to the server, and start monitoring it:

      $server = (keys %{ $_[0] })[0];

      snd $server, join => $client, $nick;
      mon $server, $client;
   }

This second monitor will ensure that, when the server port crashes or goes
away (e.g. due to network problems), the client port will be killed as
well. This tells the user that the client was disconnected, and will then
start to connect the server again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handle the
actual terminal input, sending it to the server.

Now... the "supervisor" in this example is a bit of a cheat - it doesn't
really clean up much (because the cleanup done by AnyEvent::MP suffices),
and there isn't much of a restarting action either - if the server isn't
there because it crashed, well, it isn't there.

In the real world, one would often add a timeout that would trigger when
the server couldn't be found within some time limit, and then complain,
or even try to start a new server. Or the supervisor would have to do
some real cleanups, such as rolling back database transactions when the
database thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully future versions of AnyEvent::MP will
offer some predefined supervisors, for now you will have to code it on
your own.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
who of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional object oriented
programming, it can make sense to exchange port IDs more freely (for
example, to monitor one port from another), because it is cheap to send
port IDs over the network, and AnyEvent::MP blurs the distinction between
local and remote ports.

Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and
the client should not try to send any messages unless a server is actually
connected.

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": They can run on almost
any node and even talk to copies of themselves on other nodes in case they
are distributed. The L<AnyEvent::MP::Global> service for example monitors
nodes joining the network and sometimes even starts itself on other nodes.

One good way to design such services is to put them into a module and
create "virtual connections" to other nodes. We call this the "bridge
head" method, because you start by I<creating a remote port> (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let us redesign the chat server and
client using this design method.

As usual, we start with the full program - here is the server:

   use common::sense;
   use AnyEvent::MP;

   configure;

   db_set eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, psub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different then the previous example, except that
this time, we register the node port in the database and not a port we
created - the clients only want to know which node the server should
be running on, and there can only be one such server (or service) per
node. In fact, the clients could also use some kind of election mechanism,
to find the node with lowest node ID, or lowest load, or something like
that.

The much more interesting difference to the previous server is that
indeed no server port is created - the server consists only of code,
and "does" nothing by itself. All it "does" is to define a function
named C<client_connect>, which expects a client port and a nick name as
arguments. It then monitors the client port and binds a receive callback
on C<$SELF>, which expects messages that in turn are broadcast to all
clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action.

The last line, the C<rcv $SELF>, is a good hint that something interesting
is going on. And indeed, when looking at the client code, you can see a
new function, C<spawn>:
#todo#

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r  \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
        if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
the server I<port on node>:

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;

And of course the first thing after creating it is monitoring it.

Phew, let's go through this in slow motion: the C<spawn> function creates
a new port on a remote node and returns its port ID. After creating
the port it calls a function on the remote node, passing any remaining
arguments to it, and - most importantly - executes the function within
the context of the new port, so it can be manipulated by referring to
C<$SELF>. The init function can reside in a module (actually it normally
I<should> reside in a module) - AnyEvent::MP will automatically load the
module if the function isn't defined.

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in return monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep in mind, after
creating a port using C<spawn>, monitor it on the local node, and monitor
"the other side" from the remote node, and all will be cleaned up just
fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.

Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configure nodes. For example, if I wanted to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

There are plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible to C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate how programing with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file =>, $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.

The server only writes a single file at a time, other requests will stay
in the queue until the current file has been finished.

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   }; 

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, it will be C<kil> I<normally>, i.e.
without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a
C<write_file> message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to grep, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.

In this case we are interested in C<data> messages (C<< $_[0] eq "data"
>>), whose first element is the source port (C<< $_[1] eq $port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.

The lone C<5> argument at the end is a timeout - when no matching message
is received within C<5> seconds, we assume an error and C<die>.

When an empty C<data> message is received we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

And that's basically it. Note that every port thread should have some
kind of supervisor. In our case, the supervisor simply prints any error
message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   }; 

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   }; 

The messages are sent without any flow control or acknowledgement (feel
free to improve). Also, the source port does not actually need to be a
port - any unique ID will do - but port identifiers happen to be a simple
source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
every threaded port should have a "fetch next message unconditionally"
somewhere, to avoid filling up the queue.

Finally, it is also possible to use more switch-like C<get_conds>:

  get_cond {
     $_[0] eq "msg1" and return sub {
        my (undef, @msg1_data) = @_;
        ...;
     };

     $_[0] eq "msg2" and return sub {
        my (undef, @msg2_data) = @_;
        ...;
     };

     die "unexpected message $_[0] received";
  };

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial, in fact, it's pretty complicated. We hope AEMP makes
it simpler and will be useful to create exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

  Robin Redeker <elmex@ta-sa.org>
  Marc Lehmann <schmorp@schmorp.de>



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.