Group
Extension

AnyEvent-RabbitMQ-RPC/lib/AnyEvent/RabbitMQ/RPC.pm

package AnyEvent::RabbitMQ::RPC;

use strict;
use warnings;

use AnyEvent::RabbitMQ;
use Try::Tiny;

our $VERSION = '0.5';

# Construct an RPC object and open a channel for it.
#
# Named arguments:
#   connection - an existing AnyEvent::RabbitMQ object to reuse; when it
#                is absent a new connection object is created and every
#                argument is forwarded to its connect() method
#   serialize  - "YAML", "JSON" or "Storable"; any other value leaves
#                payloads untouched
#   on_success - called with the new object once the channel is open
#   on_failure - called with a message when connecting or opening the
#                channel fails (default: warn)
#
# With on_success this returns immediately (nothing useful); without it
# the call blocks until the channel is open and returns the object, or
# undef if setup failed.
sub new {
    my $class = shift;
    my %args = @_;

    my $self = bless {}, $class;

    my $cv = AE::cv;
    my $success = $args{on_success} || $cv;

    # Report the failure to the caller's handler (or warn), and then
    # always wake the condvar.  In blocking mode nothing else would ever
    # send on it, so a caller-supplied on_failure without on_success
    # used to leave new() waiting forever.  When on_success was given,
    # nobody waits on the condvar and the extra send is harmless.
    my $report = $args{on_failure} || sub { warn "@_" };
    my $failure = sub {
        $report->(@_);
        $cv->send(undef);
    };

    $self->{connection} = $args{connection};

    # Opens the channel; used directly for a supplied connection, or as
    # the on_success handler of a freshly made one.
    my $channel = sub {
        $self->connection->open_channel(
            on_success => sub {
                $self->{channel} = shift;
                $self->{channel}->qos;
                $success->($self);
            },
            on_failure => sub {
                $failure->("Channel failed: @_");
            }
        );
    };
    if ($self->connection) {
        $channel->();
    } else {
        AnyEvent::RabbitMQ->load_xml_spec;
        $self->{connection} = AnyEvent::RabbitMQ->new(timeout => 1, verbose => 0);
        # Our own callbacks come last so they override any on_success /
        # on_failure carried in %args.
        $self->connection->connect(
            %args,
            on_success => $channel,
            on_failure => sub {
                $failure->("Connect failed: @_");
            }
        );
    }

    # Pick the (un)serializer pair.  The array-ref wrapping lets the
    # JSON and Storable back-ends carry plain scalars as well.
    $args{serialize} ||= '';
    if ($args{serialize} eq "YAML") {
        require YAML::Any;
        $self->{serialize}   = \&YAML::Any::Dump;
        $self->{unserialize} = \&YAML::Any::Load;
    } elsif ($args{serialize} eq "JSON") {
        require JSON::Any;
        JSON::Any->import;
        my $json = JSON::Any->new;
        $self->{serialize}   = sub { $json->objToJson( [@_] ) };
        $self->{unserialize} = sub { (@{ $json->jsonToObj(@_) })[0] };
    } elsif ($args{serialize} eq "Storable") {
        require Storable;
        $self->{serialize}   = sub { Storable::nfreeze( [@_] )};
        $self->{unserialize} = sub { (@{ Storable::thaw(@_) })[0] };
    }

    # If they have a callback waiting for them, bail now
    return if $args{on_success};

    # Otherwise, block on having set up the channel (or on its failure)
    return $cv->recv;
}

# Read-only accessor for the connection object stored by new().
sub connection {
    my ($self) = @_;
    return $self->{connection};
}

# Read-only accessor for the channel stored once new() has opened it.
sub channel {
    my ($self) = @_;
    return $self->{channel};
}

# Make sure the durable job queue named by the "queue" argument exists,
# then invoke on_success (with no arguments).  All remaining arguments,
# on_failure included, are handed on to declare_queue.
sub rpc_queue {
    my ($self, %args) = @_;

    my $name  = $args{queue};
    my $ready = $args{on_success};

    # These queues are durable, so one successful declaration per
    # process is enough; later calls skip straight to the callback.
    if ($self->{queues}{$name}) {
        return $ready->();
    }

    my $declared = sub {
        $self->{queues}{$name}++;
        $ready->();
    };

    # Caller arguments override the defaults, but never our on_success
    # wrapper, which has to record the queue as known.
    return $self->channel->declare_queue(
        no_ack     => 0,
        durable    => 1,
        exclusive  => 0,
        %args,
        on_success => $declared,
    );
}

# Declare a non-durable, exclusive queue for replies.  on_success
# receives the queue name read from the declaration's reply frame;
# on_failure is handed to declare_queue unchanged.
sub reply_queue {
    my ($self, %args) = @_;

    my $announce = sub {
        my $declared = shift;
        $args{on_success}->( $declared->method_frame->queue );
    };

    return $self->channel->declare_queue(
        no_ack     => 1,
        durable    => 0,
        exclusive  => 1,
        on_success => $announce,
        on_failure => $args{on_failure},
    );
}

# Register a worker for the job queue "name".
#
# Named arguments:
#   name       - queue to consume from
#   run        - subroutine called with the (unserialized) job argument
#   on_failure - called with a message when setup, (un)serialization or
#                the job itself fails (default: warn)
#
# A job is acknowledged only after "run" returned and any requested
# reply has been published; every failure path returns before the ack.
sub register {
    my $self = shift;
    my %args = (
        name => undef,
        run  => sub {},
        on_failure => sub { warn "Failure: @_" },
        @_
    );

    my $on_failure = $args{on_failure};

    # Handler for each delivered job
    my $handle_job = sub {
        my $frame   = shift;
        my $payload = $frame->{body}->payload;

        # Decode the argument, if a serializer was configured
        if ($self->{unserialize}) {
            my $decoded = try {
                $payload = $self->{unserialize}->($payload);
                1;
            } catch {
                $on_failure->("Unserialization failed: $_");
                return;
            };
            return unless $decoded;
        }

        # Run the job itself
        my $result;
        my $ran = try {
            $result = $args{run}->( $payload );
            1;
        } catch {
            $on_failure->("Call died: $_");
            return;
        };
        return unless $ran;

        # Publish the result, but only when the caller named a reply queue
        my $reply_to = $frame->{header}->reply_to;
        if ($reply_to) {
            if ($self->{serialize}) {
                my $encoded = try {
                    $result = $self->{serialize}->($result);
                    1;
                } catch {
                    $on_failure->("Serialization failed: $_");
                    return;
                };
                return unless $encoded;
            }

            # False results are sent as the true-valued string "0E0"
            $result = "0E0" unless $result;
            $self->channel->publish(
                exchange => '',
                routing_key => $reply_to,
                body => $result,
            );
        }

        # Only now is the task marked as complete
        $self->channel->ack;
    };

    # Make sure the queue exists, then start consuming from it
    $self->rpc_queue(
        queue      => $args{name},
        on_success => sub {
            $self->channel->consume(
                queue      => $args{name},
                no_ack     => 0,
                on_consume => $handle_job,
                on_failure => $on_failure,
            );
        },
        on_failure => $on_failure,
    );
}

# Submit a job to the queue "name".
#
# Named arguments:
#   name       - queue (job name) to publish to
#   args       - job argument; a plain string unless a serializer was
#                configured in new()
#   on_reply   - called with the (unserialized) reply from the worker
#   on_sent    - called with 1 once the job has been published, or with
#                0 when it could not be submitted
#   on_failure - called with a message on any failure (default: warn);
#                when the job could not be submitted it runs after
#                on_sent->(0)
#
# Modes:
#   * on_reply given: returns 1 at once; the reply arrives via callback.
#   * no on_reply, non-void context: blocks and returns the reply, or
#     undef if a failure was reported.
#   * no on_reply, void context: fire and forget.
sub call {
    my $self = shift;

    my %args = (
        name => undef,
        args => undef,
        on_sent => undef,
        on_failure => sub { warn "RPC Failure: @_" },
        @_
    );

    my $finished;
    if (defined wantarray and not $args{on_reply}) {
        # We were called in non-void context and without a reply
        # callback; assume this is a synchronous call, and set up
        # $finished to block on the reply
        $args{on_reply} = $finished = AE::cv;
        my $fail = $args{on_failure};
        $args{on_failure} = sub {
            $fail->(@_) if $fail;
            # Wake the blocked caller so it does not wait forever
            $finished->send(undef);
        };
    }

    # Failures that keep the job from being submitted also tell on_sent
    my $sent_failure = $args{on_sent} ? sub {
        $args{on_sent}->(0);
        $args{on_failure}->(@_);
    } : $args{on_failure};

    # Publishes the job; $REPLIES is the reply queue name, or undef when
    # no reply is wanted.
    my $send = sub {
        my $REPLIES = shift;
        my $body = $args{args};
        if ($self->{serialize}) {
            my $failed;
            try {
                $body = $self->{serialize}->($body);
            } catch {
                $failed = 1;
                # The job is never published in this case, so it has to
                # be reported as a submission failure: on_sent->(0)
                # first, then on_failure.
                $sent_failure->("Serialization failed: $_");
            };
            return if $failed;
        }
        # False bodies are sent as the true-valued string "0E0"
        $body = "0E0" if not $body;
        $self->channel->publish(
            exchange    => '',
            routing_key => $args{name},
            body        => $body,
            header => {
                ($REPLIES ? (reply_to => $REPLIES) : ()),
                delivery_mode => 2, # Persistent storage
            },
        );
        $args{on_sent}->(1) if $args{on_sent};
    };

    unless ($args{on_reply}) {
        # Fire and forget
        $self->rpc_queue(
            queue      => $args{name},
            on_success => sub { $send->(undef) },
            on_failure => $sent_failure,
        );
        return;
    }

    # We need to set up an ephemeral reply queue, start consuming from
    # it, and only then publish the job
    $self->rpc_queue(
        queue      => $args{name},
        on_success => sub {
            $self->reply_queue(
                on_success => sub {
                    my $REPLIES = shift;
                    $self->channel->consume(
                        queue => $REPLIES,
                        no_ack => 1,
                        on_consume => sub {
                            my $frame = shift;
                            # We got a reply, tear down our reply queue
                            $self->channel->delete_queue(
                                queue => $REPLIES,
                            );
                            my $return = $frame->{body}->payload;
                            if ($self->{unserialize}) {
                                my $failed;
                                try {
                                    $return = $self->{unserialize}->($return);
                                } catch {
                                    # The job was already sent, so this is
                                    # a plain failure, not a send failure
                                    $args{on_failure}->("Unserialization failed: $_");
                                    $failed = 1;
                                };
                                return if $failed;
                            }
                            $args{on_reply}->($return);
                        },
                        on_success => sub { $send->($REPLIES) },
                        on_failure => $sent_failure,
                    );
                },
                on_failure => $sent_failure,
            );
        },
        on_failure => $sent_failure,
    );

    return $finished->recv if $finished;
    return 1;
}

1;

__END__

=head1 NAME

AnyEvent::RabbitMQ::RPC - RPC queues via RabbitMQ

=head1 SYNOPSIS

    use AnyEvent::RabbitMQ::RPC;

    my $rpc = AnyEvent::RabbitMQ::RPC->new(
        host   => 'localhost',
        port   => 5672,
        user   => 'guest',
        pass   => 'guest',
        vhost  => '/',
        serialize => 'Storable',
    );

    print $rpc->call(
        name => 'MethodName',
        args => { some => "data" },
    );

=head1 DESCRIPTION

C<AnyEvent::RabbitMQ::RPC> provides an AnyEvent-based reliable job queue
atop the RabbitMQ event server.  This can be used as a replacement for
similar reliable job queue/RPC client-worker models, such as
L<TheSchwartz>.

RPC classes can L</register> calls that they can handle, and/or use
L</call> to request that another client perform the work.

=head1 METHODS

=head2 new

Create a new RPC object.  Either an existing L<AnyEvent::RabbitMQ>
object can be passed using the C<connection> argument, or all of the
provided parameters will be passed through to
L<AnyEvent::RabbitMQ/connect> on a new object.  In the latter case,
common parameters include C<host>, C<port>, C<user>, C<pass>, and
C<vhost>.

If you wish to pass complex data structures back and forth to remote
workers, a value must be passed for C<serialize>.  Both worker and
client must be configured to use the same serialization method.  The
available options are:

=over

=item YAML

Use L<YAML::Any/Dump> and L<YAML::Any/Load> to serialize and deserialize
data.

=item JSON

Use L<JSON::Any/objToJson> and L<JSON::Any/jsonToObj> to serialize and
deserialize.

=item Storable

Use L<Storable/nfreeze> and L<Storable/thaw> to serialize and
deserialize.

=back

Two callback points, C<on_success> and C<on_failure>, are provided.
C<on_success> will be passed the initialized L<AnyEvent::RabbitMQ::RPC>
object; C<on_failure> will be passed the reason for the failure.  If no
C<on_success> is provided, this call will block using an
L<AnyEvent::CondVar> until the connection is established.


=head2 register name => C<STRING>, run => C<SUBREF>

Establishes that the current process knows how to run the job named
C<STRING>, whose definition is provided by the subroutine C<SUBREF>.
The subroutine will be called whenever a job is removed from the queue;
it will be called with the argument passed to L</call>, which may be
more than a string if C<serialize> was set during L</new>.

Due to a limitation of C<AnyEvent::RabbitMQ>, false values returned by
the subroutine are transformed into the true-valued string C<0E0>.
Subroutines which fail to execute to completion (via C<die> or other
runtime execution failure) will re-insert the job into the queue for the
next worker to process.

Returning non-string values requires that both worker and client have
been created with the same (non-empty) value of C<serialize> passed to
L</new>.

A callback C<on_failure> may optionally be passed, which will be called
with an error message if suitable channels cannot be configured on the
RabbitMQ server.

=head2 call name => C<STRING>, args => C<VALUE>

Submits a job to the job queue.  The C<VALUE> provided must be a string,
unless C<serialize> was passed to L</new>.

Three callbacks exist:

=over

=item on_reply

Called when the job has been completed successfully, and will be passed
the return value of the job.  Returning non-string values requires that
both worker and client have been created with the same (non-empty) value
of C<serialize> passed to L</new>.

=item on_sent

Called once the job has been submitted, with a true value if the job was
submitted successfully.  A false value will be passed if the requisite
channels could not be configured, and the job was not submitted
successfully.

=item on_failure

Called if there was an error submitting the job, and is passed the
reason for the failure.  If C<on_sent> was also provided, this is
I<also> called after C<on_sent> is called with a false value.

=back


If no value for C<on_reply> is provided, and the C<call> function is not
in void context, an C<AnyEvent::CondVar> is used to automatically block
until a reply is received from a worker; the return value of the reply
is then returned from L</call>.


=head2 connection

Returns the L<AnyEvent::RabbitMQ> connection used by this object.

=head2 channel

Returns the L<AnyEvent::RabbitMQ::Channel> used by this object.

=head2 rpc_queue queue => C<NAME>, on_success => C<CALLBACK>

Creates the queue with the given name, used to schedule jobs.  These
queues are durable, and thus persist across program invocations and
RabbitMQ restarts.

The C<on_success> callback is called once the queue is known to exist.
The C<on_failure> may alternately be called with a reason if the queue
creation fails.

=head2 reply_queue on_success => C<CALLBACK>

Creates a temporary queue used to reply to job requests.  These queues
are anonymous and ephemeral, and are torn down after each RPC call.

The C<on_success> callback is called with the name of the queue that has
been created.  The C<on_failure> may alternately be called with a reason
if the queue creation fails.


=head1 AUTHOR

Alex Vandiver C<< <alexmv@bestpractical.com> >>

=head1 BUGS

All bugs should be reported via
L<http://rt.cpan.org/Public/Dist/Display.html?Name=AnyEvent-RabbitMQ-RPC>
or L<bug-AnyEvent-RabbitMQ-RPC@rt.cpan.org>.


=head1 LICENSE AND COPYRIGHT

This software is Copyright (c) 2012 by Best Practical Solutions

This is free software, licensed under:

  The GNU General Public License, Version 2, June 1991

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.