Mojo-RabbitMQ-Client/lib/Mojo/RabbitMQ/Client.pm
package Mojo::RabbitMQ::Client;
use Mojo::Base 'Mojo::EventEmitter';
use Carp qw(croak confess);
use Mojo::URL;
use Mojo::Home;
use Mojo::IOLoop;
use Mojo::Parameters;
use Mojo::Promise;
use Mojo::Util qw(url_unescape dumper);
use List::Util qw(none);
use Scalar::Util qw(blessed weaken);
use File::Basename 'dirname';
use File::ShareDir qw(dist_file);
use Net::AMQP;
use Net::AMQP::Common qw(:all);
use Mojo::RabbitMQ::Client::Channel;
use Mojo::RabbitMQ::Client::LocalQueue;
require Mojo::RabbitMQ::Client::Consumer;
require Mojo::RabbitMQ::Client::Publisher;
# Distribution version
our $VERSION = "0.3.1";
# Debug switch taken from the MOJO_RABBITMQ_DEBUG environment variable
# (0 when unset); it guards the diagnostic warn() calls in this file
use constant DEBUG => $ENV{MOJO_RABBITMQ_DEBUG} // 0;
# Flag set to 1 once Connection::OpenOk has been received
has is_open => 0;
# Connection URL; _uri_handler converts a plain string to a Mojo::URL object
has url => undef;
# The connection settings below are lazily derived from "url" by _uri_handler
has tls => sub { shift->_uri_handler('tls') };
has user => sub { shift->_uri_handler('user') };
has pass => sub { shift->_uri_handler('pass') };
has host => sub { shift->_uri_handler('host') };
has port => sub { shift->_uri_handler('port') };
has vhost => sub { shift->_uri_handler('vhost') };
# Falls back to an empty Mojo::Parameters object when no URL is set
has params => sub { shift->_uri_handler('params') // Mojo::Parameters->new };
# Taken from the MOJO_CONNECT_TIMEOUT environment variable, otherwise 10
has connect_timeout => sub { $ENV{MOJO_CONNECT_TIMEOUT} // 10 };
# Heartbeat value offered to the server in Connection::TuneOk
has heartbeat_timeout => 60;
has heartbeat_received => 0; # Time the last heartbeat was received
has heartbeat_sent => 0; # Time the last heartbeat was sent
# Event loop used for the connection, the Mojo::IOLoop singleton by default
has ioloop => sub { Mojo::IOLoop->singleton };
# Replaced with the server's frame_max while handling Connection::Tune
has max_buffer_size => 16384;
# Upper bound enforced by add_channel; 0 disables the check
has max_channels => 0;
# Local queue receiving the frames that arrive on channel 0
has queue => sub { Mojo::RabbitMQ::Client::LocalQueue->new };
# Registered channel objects, keyed by channel id
has channels => sub { {} };
# Id returned by the IOLoop for the current connection
has stream_id => undef;
# Start connecting to the server and negotiate AMQP once the socket is up.
# Returns the IOLoop id of the connection, which is also stored in stream_id.
sub connect {
  my ($self) = @_;

  # Every connection starts with an empty read buffer
  $self->{buffer} = '';

  # The callback closes over $stream_id, which is only assigned after
  # _connect has returned
  my $stream_id;
  my $on_connect = sub { $self->_connected($stream_id) };
  $stream_id = $self->_connect($on_connect);

  $self->stream_id($stream_id);
  return $stream_id;
}
# Promise-returning variant of connect(). The promise resolves with the client
# once the handshake callback is invoked without an error and is rejected with
# the error passed to that callback otherwise.
sub connect_p {
  my $self = shift;

  # Start from an empty read buffer, exactly like connect() does, so bytes
  # left over from a previous connection are never parsed as new frames
  $self->{buffer} = '';

  my $promise = Mojo::Promise->new;
  my $id;

  # The closures below must not keep the client alive
  weaken $self;
  my $handler = sub {
    my ($err) = @_;
    return $promise->reject($err) if defined $err;
    return $promise->resolve($self);
  };

  # $id is assigned after _connect returns; the callback reads it later
  $id = $self->_connect(sub { $self->_connected($id, $handler) });
  $self->stream_id($id);
  return $promise;
}
# Static shortcut that builds a Mojo::RabbitMQ::Client::Consumer from the
# given arguments; croaks when invoked on an instance.
sub consumer {
  my $class = shift;
  ref $class and croak "consumer is a static method";
  return Mojo::RabbitMQ::Client::Consumer->new(@_);
}
# Static shortcut that builds a Mojo::RabbitMQ::Client::Publisher from the
# given arguments; croaks when invoked on an instance.
sub publisher {
  my $class = shift;
  ref $class and croak "publisher is a static method";
  return Mojo::RabbitMQ::Client::Publisher->new(@_);
}
# Proxy to the "param" method of the params object, passing all arguments
# through. Returns undef when no params object is available.
sub param {
  my ($self, @args) = @_;

  my $params = $self->params;
  return undef unless defined $params;

  return $params->param(@args);
}
# Register a channel with this client. A channel without an id gets the
# lowest free id in 1 .. 65535. Failures are reported as "error" events on
# the channel. The channel's client reference is weakened after registration.
# Returns the channel on success, or the result of the channel's emit call
# on failure.
sub add_channel {
  my ($self, $channel) = @_;

  my $id         = $channel->id;
  my $registered = $self->channels;

  # An explicitly requested id must not be taken already
  if ($id && $registered->{$id}) {
    my $message = 'Channel with id: ' . $id . ' already defined';
    return $channel->emit(error => $message);
  }

  # Enforce the optional limit, a limit of 0 disables the check
  my $limit = $self->max_channels;
  return $channel->emit(error => 'Maximum number of channels reached')
    if $limit > 0 && keys(%$registered) >= $limit;

  # Auto-assign the lowest unused id
  unless ($id) {
    my $last      = 2**16 - 1;
    my $candidate = 1;
    $candidate++ while $candidate <= $last && defined $registered->{$candidate};
    return $channel->emit(error => 'Ran out of channel ids')
      if $candidate > $last;
    $id = $candidate;
  }

  $registered->{$id} = $channel->id($id)->client($self);

  # The channel must not keep its client alive
  weaken $channel->{client};

  return $channel;
}
# Create a new channel, open it on this client and return a promise for it.
# The promise resolves with the arguments of the channel's "open" event and is
# rejected with the arguments of its "error" event, whichever fires first.
sub acquire_channel_p {
  my $self = shift;

  my $promise = Mojo::Promise->new;
  my $channel = Mojo::RabbitMQ::Client::Channel->new();

  # The handlers use their own reference and drop it once the promise has
  # been settled. The returned promise stays intact even when open_channel
  # fails synchronously, and events emitted later never dereference undef.
  my $pending = $promise;
  $channel->catch(
    sub {
      return unless $pending;
      $pending->reject(@_);
      undef $pending;
    }
  );
  $channel->on(close => sub { warn "Channel closed" });
  $channel->on(
    open => sub {
      return unless $pending;
      $pending->resolve(@_);
      undef $pending;
    }
  );

  $self->open_channel($channel);
  return $promise;
}
# Register a channel with this client and open it.
# Returns the client when the channel open was started. When the connection is
# not open, or the channel could not be registered, the failure is reported
# through the channel's "error" event and the channel is returned instead.
sub open_channel {
  my ($self, $channel) = @_;

  return $channel->emit(error => 'Client connection not opened')
    unless $self->is_open;

  $self->add_channel($channel);

  # add_channel signals failures through the channel's "error" event but
  # still returns the channel, so verify the registration before opening a
  # channel this client does not know about
  my $id         = $channel->id;
  my $registered = $id ? $self->channels->{$id} : undef;
  return $channel
    unless defined $registered
    && Scalar::Util::refaddr($registered) == Scalar::Util::refaddr($channel);

  $channel->open;
  return $self;
}
# Remove the channel registered under the given id.
# Returns the removed channel object, or undef when no channel had that id.
sub delete_channel {
  my ($self, $id) = @_;

  # The id has to be a variable here: a bare "shift" inside a hash subscript
  # is taken as the literal key "shift" and would never match a channel id
  return delete $self->channels->{$id};
}
# Close the AMQP connection by sending Connection::Close. Once
# Connection::CloseOk arrives the "close" event is emitted and the stream is
# closed; when the expected reply is not received the stream is closed
# without emitting "close". In both cases the client is marked as not open,
# matching what is done for a close initiated by the server.
sub close {
  my $self = shift;

  # The callbacks must not keep the client alive
  weaken $self;
  return $self->_write_expect(
    'Connection::Close'   => {},
    'Connection::CloseOk' => sub {
      return unless $self;
      warn "-- Connection::CloseOk\n" if DEBUG;
      $self->is_open(0);
      $self->emit('close');
      $self->_close;
    },
    sub {
      return unless $self;
      $self->is_open(0);
      $self->_close;
    }
  );
}
# Internal shortcut for the event loop stored in the "ioloop" attribute.
sub _loop {
  my $self = shift;
  return $self->ioloop;
}
# Re-emit a connection level error as an "error" event on the client.
# The stream id passed as first argument is accepted but not used.
sub _error {
  my $self = shift;
  my (undef, $message) = @_;
  $self->emit(error => $message);
}
# Derive the connection attributes from the "url" attribute and return the
# value of the requested one.
#
# Takes the name of the attribute to return. Returns undef when no URL has
# been set. Otherwise every attribute listed in %defaults (and, when present
# in the query string, heartbeat_timeout, connect_timeout and max_channels)
# is assigned on the client.
sub _uri_handler {
  my ($self, $attr) = @_;

  return undef unless defined $self->url;

  # Upgrade a plain string to a Mojo::URL object
  $self->url(Mojo::URL->new($self->url))
    unless blessed $self->url && $self->url->isa('Mojo::URL');
  my $url = $self->url;

  # Set some defaults
  my %defaults = (
    tls    => 0,
    user   => undef,
    pass   => undef,
    host   => 'localhost',
    port   => 5672,
    vhost  => '/',
    params => undef
  );

  # Check secure scheme in url, with fallback support for the "rabbitmq"
  # scheme name. A URL without any scheme is treated as not secure.
  $defaults{tls} = 1 if ($url->scheme // '') =~ /^(amqp|rabbitmq)s$/;
  $defaults{port} = 5671 if $defaults{tls};

  # Get host & port
  $defaults{host} = $url->host if defined $url->host && $url->host ne '';
  $defaults{port} = $url->port if defined $url->port;

  # Get user & password. Only the first colon separates them, so a password
  # is kept intact even when it contains colons itself.
  my $userinfo = $url->userinfo;
  if (defined $userinfo) {
    my ($user, $pass) = split /:/, $userinfo, 2;
    $defaults{user} = $user;
    $defaults{pass} = $pass;
  }

  # Virtual host is the path without its leading slash; a bare "/" is kept
  my $vhost = url_unescape $url->path;
  $vhost =~ s|^/(.+)$|$1|;
  $defaults{vhost} = $vhost if defined $vhost && $vhost ne '';

  # Query params
  my $params = $defaults{params} = $url->query;

  # Handle common aliases to internal names
  my %aliases = (
    cacertfile           => 'ca',
    certfile             => 'cert',
    keyfile              => 'key',
    fail_if_no_peer_cert => 'verify',
    connection_timeout   => 'timeout'
  );
  $params->param($aliases{$_}, $params->param($_))
    foreach grep { defined $params->param($_) } keys %aliases;

  # Some query parameters are translated to attribute values
  my %attributes = (
    heartbeat_timeout => 'heartbeat',
    connect_timeout   => 'timeout',
    max_channels      => 'channel_max'
  );
  $self->$_($params->param($attributes{$_}))
    foreach grep { defined $params->param($attributes{$_}) } keys %attributes;

  # Set all; this replaces whatever these attributes held before
  $self->$_($defaults{$_}) foreach keys %defaults;

  return $self->$attr;
}
# Gracefully close the stream of the current connection.
# Does nothing when no connection was ever started or when the event loop no
# longer knows the stream (for example because the peer already closed it).
sub _close {
  my $self = shift;

  my $id = $self->stream_id;
  return unless defined $id;

  my $stream = $self->_loop->stream($id);
  return unless defined $stream;

  $stream->close_gracefully;
}
# Handler for the "close" event of the connection stream: marks the client as
# not open, emits "disconnect" and removes the stream from the event loop.
# The third argument is accepted but not used.
sub _handle {
  my ($self, $id, $close) = @_;

  # Without the stream no AMQP connection exists anymore; clear the flag
  # before notifying subscribers so they observe a consistent state
  $self->is_open(0);

  $self->emit('disconnect');
  $self->_loop->remove($id);
}
# Handler for the "read" event of the connection stream: appends the received
# bytes to the read buffer and parses whatever frames it now contains.
# The stream id passed as second argument is not used.
sub _read {
  my ($self, undef, $chunk) = @_;

  warn "<- @{[dumper $chunk]}" if DEBUG;

  $self->{buffer} .= $chunk;
  $self->_parse_frames;

  return;
}
# Parse all frames currently in the read buffer and dispatch each of them:
# heartbeats update heartbeat_received, a Connection::Close from the server is
# acknowledged and reported as "disconnect", other channel 0 frames go to the
# client queue and everything else is handed to the matching channel.
sub _parse_frames {
  my $self = shift;

FRAME:
  for my $frame (Net::AMQP->parse_raw_frames(\$self->{buffer})) {

    # Heartbeat, just remember when it arrived
    if ($frame->isa('Net::AMQP::Frame::Heartbeat')) {
      $self->heartbeat_received(time());
      next FRAME;
    }

    # Server initiated close, confirm it and notify subscribers
    if ( $frame->isa('Net::AMQP::Frame::Method')
      && $frame->method_frame->isa('Net::AMQP::Protocol::Connection::Close'))
    {
      $self->is_open(0);
      $self->_write_frame(Net::AMQP::Protocol::Connection::CloseOk->new());
      my $reason = $frame->method_frame->{reply_text};
      $self->emit(disconnect => "Server side disconnection: " . $reason);
      next FRAME;
    }

    # Connection level frames are consumed through the client queue
    if ($frame->channel == 0) {
      $self->queue->push($frame);
      next FRAME;
    }

    # Everything else belongs to a channel
    my $channel = $self->channels->{$frame->channel};
    unless (defined $channel) {
      my $message
        = "Unknown channel id received: " . ($frame->channel // '(undef)');
      $self->emit(error => $message, $frame);
      next FRAME;
    }
    $channel->_push_queue_or_consume($frame);
  }
}
# Open the TCP (optionally TLS) connection through the event loop.
#
# Takes a callback that is invoked without arguments once the socket is
# connected and the stream handlers are installed. Connection errors are
# reported through _error and do not invoke the callback.
# Returns the id handed out by the event loop's client call.
sub _connect {
  my ($self, $cb) = @_;

  # Options
  # Parse according to (https://www.rabbitmq.com/uri-spec.html)
  my $options = {
    address  => $self->host,
    port     => $self->port,
    timeout  => $self->connect_timeout,
    tls      => $self->tls,
    tls_ca   => scalar $self->param('ca'),
    tls_cert => scalar $self->param('cert'),
    tls_key  => scalar $self->param('key')
  };
  my $verify = $self->param('verify');
  $options->{tls_verify} = hex $verify if defined $verify;

  # Connect; none of the closures below may keep the client alive, so each of
  # them has to cope with the client being gone already
  weaken $self;
  my $id;
  return $id = $self->_loop->client(
    $options => sub {
      my ($loop, $err, $stream) = @_;

      # Client was destroyed while connecting
      return unless $self;

      # Connection error
      return $self->_error($id, $err) if $err;

      # Connection established
      $self->emit(connect => $stream);
      $stream->on(
        timeout => sub { $self && $self->_error($id, 'Inactivity timeout') });
      $stream->on(close => sub { $self && $self->_handle($id, 1) });
      $stream->on(error => sub { $self && $self->_error($id, pop) });
      $stream->on(read  => sub { $self && $self->_read($id, pop) });

      $cb->();
    }
  );
}
# Start the AMQP handshake on a freshly connected stream: disable the
# inactivity timeout, load the protocol spec if needed, send the protocol
# header and answer Connection::Start with Connection::StartOk before handing
# over to _tune.
#
# Takes the stream id and an optional callback. On a handshake failure an
# "error" event is emitted and the callback, when given, is invoked with the
# failure reason, so callers waiting on it (such as connect_p) are not left
# pending forever.
sub _connected {
  my ($self, $id, $cb) = @_;

  # Inactivity timeout
  $self->_loop->stream($id)->timeout(0);

  # Detect that xml spec was already loaded
  my $loaded = eval { Net::AMQP::Protocol::Connection::StartOk->new; 1 };
  unless ($loaded) {    # Load AMQP specs
    my $file = "amqp0-9-1.stripped.extended.xml";

    # Original spec is in "fixed_amqp0-8.xml"
    my $share = dist_file('Mojo-RabbitMQ-Client', $file);
    Net::AMQP::Protocol->load_xml_spec($share);
  }

  $self->_write($id => Net::AMQP::Protocol->header);

  weaken $self;

  # Report a failed handshake: emit the event first (same order as the
  # Connection::Open failure path in _tune), then notify the callback
  my $fail = sub {
    my ($message, $reason) = @_;
    $self->emit(error => $message);
    $cb->($reason) if defined $cb;
    return $self;
  };

  $self->_expect(
    'Connection::Start' => sub {
      my $frame = shift;

      # Pick the first client mechanism the server offers; an explicit
      # "auth_mechanism" parameter replaces the default candidates
      my @server_mechanisms = split /\s/, $frame->method_frame->mechanisms;
      my $param_mechanism   = $self->param('auth_mechanism') // '';
      my @client_mechanisms = ('AMQPLAIN', 'EXTERNAL');
      @client_mechanisms = ($param_mechanism) if ($param_mechanism);
      warn "-- Server mechanisms: @server_mechanisms\n" if DEBUG;
      warn "-- Client mechanisms: @client_mechanisms\n" if DEBUG;

      my $mechanism;
      for my $cand (@client_mechanisms) {
        if (grep { $_ eq $cand } @server_mechanisms) {
          $mechanism = $cand;
          last;
        }
      }
      unless ($mechanism) {
        my $message = 'No authentication mechanism could be negotiated';
        return $fail->($message, $message);
      }

      my @locales = split /\s/, $frame->method_frame->locales;
      if (none { $_ eq 'en_US' } @locales) {
        my $message = 'en_US is not found in locales';
        return $fail->($message, $message);
      }

      $self->{_server_properties} = $frame->method_frame->server_properties;

      warn "-- Connection::Start {product: " . $self->{_server_properties}->{product} . ", version: " . $self->{_server_properties}->{version} . "}\n" if DEBUG;
      $self->_write_frame(
        Net::AMQP::Protocol::Connection::StartOk->new(
          client_properties => {
            platform    => 'Perl',
            product     => __PACKAGE__,
            information => 'https://github.com/inway/mojo-rabbitmq-client',
            version     => __PACKAGE__->VERSION,
          },
          mechanism => $mechanism,
          response  => {LOGIN => $self->user, PASSWORD => $self->pass},
          locale    => 'en_US',
        ),
      );

      $self->_tune($id, $cb);
    },
    sub {
      my $err = shift;
      $fail->('Unable to start connection: ' . $err, $err);
    }
  );
}
# Second handshake step: answer Connection::Tune with Connection::TuneOk,
# start the heartbeat timer and send Connection::Open.
#
# Takes the stream id and an optional callback. The callback is invoked
# without arguments after Connection::OpenOk and with the failure reason when
# tuning or opening the connection fails.
sub _tune {
  my ($self, $id, $cb) = @_;

  weaken $self;
  $self->_expect(
    'Connection::Tune' => sub {
      my $frame = shift;

      my $method_frame = $frame->method_frame;
      $self->max_buffer_size($method_frame->frame_max);

      # An explicit 0 requests no heartbeats at all; the value proposed by
      # the server is only used when the client has no preference
      my $requested = $self->heartbeat_timeout;
      my $heartbeat
        = defined $requested && length $requested
        ? $requested
        : $method_frame->heartbeat;

      warn "-- Connection::Tune {frame_max: " . $method_frame->frame_max . ", heartbeat: " . $method_frame->heartbeat . "}\n" if DEBUG;
      # Confirm
      $self->_write_frame(
        Net::AMQP::Protocol::Connection::TuneOk->new(
          channel_max => $method_frame->channel_max,
          frame_max   => $method_frame->frame_max,
          heartbeat   => $heartbeat,
        ),
      );

      # A timer left over from a previous connection of this client would
      # otherwise keep running next to the new one
      if (my $stale_tid = delete $self->{heartbeat_tid}) {
        $self->_loop->remove($stale_tid);
      }

      # According to https://www.rabbitmq.com/amqp-0-9-1-errata.html
      # The client should start sending heartbeats after receiving a Connection.Tune
      # method, and start monitoring heartbeats after sending Connection.Open.
      # -and-
      # Heartbeat frames are sent about every timeout / 2 seconds. After two missed
      # heartbeats, the peer is considered to be unreachable.
      # NOTE(review): time() has one second resolution, so the strict ">"
      # below may skip every other tick - confirm this is intended
      $self->{heartbeat_tid} = $self->_loop->recurring(
        $heartbeat / 2 => sub {
          return unless time() - $self->heartbeat_sent > $heartbeat / 2;
          $self->_write_frame(Net::AMQP::Frame::Heartbeat->new());
          $self->heartbeat_sent(time());
        }
      ) if $heartbeat;

      $self->_write_expect(
        'Connection::Open' =>
          {virtual_host => $self->vhost, capabilities => '', insist => 1,},
        'Connection::OpenOk' => sub {
          warn "-- Connection::OpenOk\n" if DEBUG;

          $self->is_open(1);
          $self->emit('open');
          $cb->() if defined $cb;
        },
        sub {
          my $err = shift;
          $self->emit(error => 'Unable to open connection: ' . $err);
          $cb->($err) if defined $cb;
        }
      );
    },
    sub {
      # Report the failure to the callback as well, like the
      # Connection::Open failure path above does
      my $err = shift;
      $self->emit(error => 'Unable to tune connection: ' . $err);
      $cb->($err) if defined $cb;
    }
  );
}
# Send a method frame and register the callbacks for its reply.
#
# Takes the method name (relative to Net::AMQP::Protocol), a hash reference
# with the method arguments, the expected reply name(s), a success callback, a
# failure callback and an optional channel id (0 when omitted).
# Returns whatever _expect returns.
sub _write_expect {
  my ($self, $method, $args, $exp, $cb, $failure_cb, $channel_id) = @_;

  my $class = 'Net::AMQP::Protocol::' . $method;
  $channel_id ||= 0;

  # Wrap the protocol method into a method frame and send it
  my $payload = $class->new(%$args);
  my $frame   = Net::AMQP::Frame::Method->new(method_frame => $payload);
  $self->_write_frame($frame, $channel_id);

  return $self->_expect($exp, $cb, $failure_cb, $channel_id);
}
# Wait for the next frame on a queue and check that it is one of the expected
# methods.
#
# Takes the expected method name or an array reference of names (relative to
# Net::AMQP::Protocol), a success callback receiving the frame, a failure
# callback receiving a message and an optional channel id. Channel 0 (the
# default) uses the client queue, any other id the queue of that channel.
# Returns nothing when the channel is unknown, otherwise the result of the
# queue's get call.
sub _expect {
  my ($self, $exp, $cb, $failure_cb, $channel_id) = @_;

  my @expected = ref($exp) eq 'ARRAY' ? @$exp : ($exp);
  $channel_id ||= 0;

  # Select the queue the reply will arrive on
  my $queue;
  if ($channel_id) {
    if (defined(my $channel = $self->channels->{$channel_id})) {
      $queue = $channel->queue;
    }
    else {
      $failure_cb->(
        "Unknown channel id received: " . ($channel_id // '(undef)'));
    }
  }
  else {
    $queue = $self->queue;
  }
  return unless $queue;

  $queue->get(
    sub {
      my $frame = shift;

      return $failure_cb->("Received data is not method frame")
        unless $frame->isa("Net::AMQP::Frame::Method");

      my $method_frame = $frame->method_frame;
      return $failure_cb->("Method is not "
          . join(', ', @expected)
          . ". It's "
          . ref($method_frame))
        if none { $method_frame->isa("Net::AMQP::Protocol::" . $_) } @expected;

      return $cb->($frame);
    }
  );
}
# Serialize a frame and write it to the stream of the current connection.
#
# Takes a frame (or a protocol method object, which is wrapped into a frame
# first), an optional channel id (0 when undefined) and an optional callback
# that is passed on to _write. Returns whatever _write returns.
sub _write_frame {
  my ($self, $out, $channel, $cb) = @_;

  my $id = $self->stream_id;

  # Bare protocol methods have to be wrapped into a frame first
  $out = $out->frame_wrap if $out->isa('Net::AMQP::Protocol::Base');
  $out->channel($channel // 0);

  my $raw = $out->to_raw_frame;
  return $self->_write($id, $raw, $cb);
}
# Write raw bytes to the stream with the given id. The data is downgraded from
# Perl's internal UTF-8 representation before it is written. Nothing is
# written when the event loop has no stream with that id.
sub _write {
  my ($self, $id, $frame, $cb) = @_;

  warn "-> @{[dumper $frame]}" if DEBUG;

  utf8::downgrade($frame);

  my $stream = $self->_loop->stream($id);
  $stream->write($frame => $cb) if defined $stream;
}
# Destructor: stop the heartbeat timer so it does not outlive the client.
sub DESTROY {
  my $self = shift;

  # Without an event loop there is nothing to clean up
  my $loop = $self->ioloop;
  return unless $loop;

  if (my $tid = $self->{heartbeat_tid}) {
    $loop->remove($tid);
  }
}
1;
=encoding utf8
=head1 NAME
Mojo::RabbitMQ::Client - Mojo::IOLoop based RabbitMQ client
=head1 SYNOPSIS
use Mojo::RabbitMQ::Client;
# Supply URL according to (https://www.rabbitmq.com/uri-spec.html)
my $client = Mojo::RabbitMQ::Client->new(
url => 'amqp://guest:guest@127.0.0.1:5672/');
# Catch all client related errors
$client->catch(sub { warn "Some error caught in client"; });
# When connection is in Open state, open new channel
$client->on(
open => sub {
my ($client) = @_;
# Create a new channel with auto-assigned id
my $channel = Mojo::RabbitMQ::Client::Channel->new();
$channel->catch(sub { warn "Error on channel received"; });
$channel->on(
open => sub {
my ($channel) = @_;
$channel->qos(prefetch_count => 1)->deliver;
# Publish some example message to test_queue
my $publish = $channel->publish(
exchange => 'test',
routing_key => 'test_queue',
body => 'Test message',
mandatory => 0,
immediate => 0,
header => {}
);
# Deliver this message to server
$publish->deliver;
# Start consuming messages from test_queue
my $consumer = $channel->consume(queue => 'test_queue');
$consumer->on(message => sub { say "Got a message" });
$consumer->deliver;
}
);
$channel->on(close => sub { warn "Channel closed" });
$client->open_channel($channel);
}
);
# Start connection
$client->connect();
# Start Mojo::IOLoop if not running already
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 CONSUMER
use Mojo::RabbitMQ::Client;
my $consumer = Mojo::RabbitMQ::Client->consumer(
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&queue=mojo',
defaults => {
qos => {prefetch_count => 1},
queue => {durable => 1},
consumer => {no_ack => 0},
}
);
$consumer->catch(sub { die "Some error caught in Consumer" } );
$consumer->on('success' => sub { say "Consumer ready" });
$consumer->on(
'message' => sub {
my ($consumer, $message) = @_;
$consumer->channel->ack($message)->deliver;
}
);
$consumer->start();
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 PUBLISHER
use Mojo::RabbitMQ::Client;
my $publisher = Mojo::RabbitMQ::Client->publisher(
url => 'amqp://guest:guest@127.0.0.1:5672/?exchange=mojo&routing_key=mojo'
);
$publisher->publish('plain text');
$publisher->publish(
{encode => { to => 'json'}},
routing_key => 'mojo_mq'
)->then(sub {
say "Message published";
})->catch(sub {
die "Publishing failed"
})->wait;
=head1 DESCRIPTION
L<Mojo::RabbitMQ::Client> is a rewrite of L<AnyEvent::RabbitMQ> to work on top of L<Mojo::IOLoop>.
=head1 EVENTS
L<Mojo::RabbitMQ::Client> inherits all events from L<Mojo::EventEmitter> and can emit the
following new ones.
=head2 connect
$client->on(connect => sub {
my ($client, $stream) = @_;
...
});
Emitted when TCP/IP connection with RabbitMQ server is established.
=head2 open
$client->on(open => sub {
my ($client) = @_;
...
});
Emitted when AMQP protocol Connection.Open-Ok method is received.
=head2 close
$client->on(close => sub {
my ($client) = @_;
...
});
Emitted on reception of Connection.Close-Ok method.
=head2 disconnect
$client->on(disconnect => sub {
my ($client) = @_;
...
});
Emitted when TCP/IP connection gets disconnected.
=head1 ATTRIBUTES
L<Mojo::RabbitMQ::Client> has following attributes.
=head2 tls
my $tls = $client->tls;
$client = $client->tls(1)
Force secure connection. Default is disabled (C<0>).
=head2 user
my $user = $client->user;
$client = $client->user('guest')
Sets username for authorization, by default it's not defined.
=head2 pass
my $pass = $client->pass;
$client = $client->pass('secret')
Sets user password for authorization, by default it's not defined.
=head2 host
my $host = $client->host;
$client = $client->host('localhost')
Hostname or IP address of RabbitMQ server. Defaults to C<localhost>.
=head2 port
my $port = $client->port;
$client = $client->port(1234)
Port on which RabbitMQ server listens for new connections.
Defaults to C<5672>, which is standard RabbitMQ server listen port.
=head2 vhost
my $vhost = $client->vhost;
$client = $client->vhost('/')
RabbitMQ virtual host to use. Default is C</>.
=head2 params
my $params = $client->params;
$client = $client->params(Mojo::Parameters->new('verify=1'))
Sets additional parameters for connection. Defaults to the query parameters of L</url>, or to an empty L<Mojo::Parameters> object when no URL is set.
For list of supported parameters see L</"SUPPORTED QUERY PARAMETERS">.
=head2 url
my $url = $client->url;
$client = $client->url('amqp://...');
Sets all connection parameters in one string, according to specification from
L<https://www.rabbitmq.com/uri-spec.html>.
amqp_URI = "amqp[s]://" amqp_authority [ "/" vhost ] [ "?" query ]
amqp_authority = [ amqp_userinfo "@" ] host [ ":" port ]
amqp_userinfo = username [ ":" password ]
username = *( unreserved / pct-encoded / sub-delims )
password = *( unreserved / pct-encoded / sub-delims )
vhost = segment
=head2 heartbeat_timeout
my $timeout = $client->heartbeat_timeout;
$client = $client->heartbeat_timeout(180);
Heartbeats are used to monitor peer reachability in AMQP.
Default value is C<60> seconds, if set to C<0> no heartbeats will be sent.
=head2 connect_timeout
my $timeout = $client->connect_timeout;
$client = $client->connect_timeout(5);
Connection timeout used by L<Mojo::IOLoop::Client>.
Defaults to environment variable C<MOJO_CONNECT_TIMEOUT> or C<10> seconds
if nothing else is set.
=head2 max_channels
my $max_channels = $client->max_channels;
$client = $client->max_channels(10);
Maximum number of channels allowed to be active. Defaults to C<0> which
means no implicit limit.
When you try to call C<add_channel> over limit an C<error> will be
emitted on channel saying that: I<Maximum number of channels reached>.
=head1 STATIC METHODS
=head2 consumer
my $client = Mojo::RabbitMQ::Client->consumer(...)
Shortcut for creating L<Mojo::RabbitMQ::Client::Consumer>.
=head2 publisher
my $client = Mojo::RabbitMQ::Client->publisher(...)
Shortcut for creating L<Mojo::RabbitMQ::Client::Publisher>.
=head1 METHODS
L<Mojo::RabbitMQ::Client> inherits all methods from L<Mojo::EventEmitter> and implements
the following new ones.
=head2 connect
$client->connect();
Tries to connect to RabbitMQ server and negotiate AMQP protocol.
=head2 close
$client->close();
=head2 param
my $param = $client->param('name');
$client = $client->param(name => 'value');
=head2 add_channel
my $channel = Mojo::RabbitMQ::Client::Channel->new();
...
$channel = $client->add_channel($channel);
$channel->open;
=head2 open_channel
my $channel = Mojo::RabbitMQ::Client::Channel->new();
...
$client->open_channel($channel);
=head2 delete_channel
my $removed = $client->delete_channel($channel->id);
=head1 SUPPORTED QUERY PARAMETERS
There's no formal specification, nevertheless a list of common parameters
recognized by officially supported RabbitMQ clients is maintained here:
L<https://www.rabbitmq.com/uri-query-parameters.html>.
Some shortcuts are also supported, you'll find them in parenthesis.
Aliases are less significant, so when both are specified only primary
value will be used.
=head2 cacertfile (I<ca>)
Path to Certificate Authority file for TLS.
=head2 certfile (I<cert>)
Path to the client certificate file for TLS.
=head2 keyfile (I<key>)
Path to the client certificate private key file for TLS.
=head2 fail_if_no_peer_cert (I<verify>)
TLS verification mode, defaults to 0x01 on the client-side if a certificate
authority file has been provided, or 0x00 otherwise.
=head2 auth_mechanism
Sets the AMQP authentication mechanism. Defaults to AMQPLAIN. AMQPLAIN and
EXTERNAL are supported; EXTERNAL will only work if L<Mojo::RabbitMQ::Client> does not need
to do anything beyond passing along a username and password if specified.
=head2 heartbeat
Sets requested heartbeat timeout, just like C<heartbeat_timeout> attribute.
=head2 connection_timeout (I<timeout>)
Sets connection timeout - see L</connect_timeout> attribute.
=head2 channel_max
Sets maximum number of channels - see L</max_channels> attribute.
=head1 SEE ALSO
L<Mojo::RabbitMQ::Client::Channel>, L<Mojo::RabbitMQ::Client::Consumer>, L<Mojo::RabbitMQ::Client::Publisher>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2015-2019, Sebastian Podjasek and others
Based on L<AnyEvent::RabbitMQ> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >>
This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.
Contains AMQP specification (F<shared/amqp0-9-1.stripped.extended.xml>) licensed under BSD-style license.
=cut