DR-TarantoolQueue/lib/DR/TarantoolQueue.pm
package DR::TarantoolQueue;
use utf8;
use strict;
use warnings;
use Mouse;
use Carp;
use JSON::XS;
require DR::TarantoolQueue::Task;
$Carp::Internal{ (__PACKAGE__) }++;
our $VERSION = '0.44';
use feature 'state';
=head1 NAME
DR::TarantoolQueue - client for tarantool's queue
=head1 SYNOPSIS
my $queue = DR::TarantoolQueue->new(
host => 'tarantool.host',
port => 33014,
tube => 'request_queue',
space => 11,
connect_opts => { # see perldoc DR::Tarantool
reconnect_period => 1,
reconnect_always => 1
}
);
# put empty task into queue with name 'request_queue'
my $task = $queue->put;
my $task = $queue->put(data => [ 1, 2, 3 ]);
printf "task.id = %s\n", $task->id;
=head2 DESCRIPTION
The module contains sync and async (coro) driver for tarantool queue.
=head1 ATTRIBUTES
=head2 host (ro) & port (ro)
Tarantool's parameters.
=head2 connect_opts (ro)
Additional options for L<DR::Tarantool>. HashRef.
=head2 fake_in_test (ro, default=true)
Start fake tarantool (only for msgpack) if C<($0 =~ /\.t$/)>.
For the case the driver uses the following lua code:
log.info('Fake Queue starting')
box.cfg{ listen = os.getenv('PRIMARY_PORT') }
box.schema.user.create('test', { password = 'test' })
box.schema.user.grant('test', 'read,write,execute', 'universe')
_G.queue = require('megaqueue')
queue:init()
log.info('Fake Queue started')
=head2 msgpack (ro)
If true, the driver will use L<DR::Tnt> driver (C<1.6>). Also it will use
L<tarantool-megaqueue|https://github.com/dr-co/tarantool-megaqueue> lua
module with namespace C<queue>.
=head2 coro (ro)
If B<true> (default) the driver will use L<Coro> tarantool's driver,
otherwise the driver will use sync driver.
=head2 ttl (rw)
Default B<ttl> for tasks.
=head2 ttr (rw)
Default B<ttr> for tasks.
=head2 pri (rw)
Default B<pri> for tasks.
=head2 delay (rw)
Default B<delay> for tasks.
=head2 space (rw)
Default B<space> for tasks.
=head2 tube (rw)
Default B<tube> for tasks.
=head2 defaults
Defaults for queues. B<HashRef>. Key is tube name. Value is a hash with
the following fields:
=over
=item ttl
=item ttr
=item delay
=item pri
=back
Methods L</put> (L</urgent>) use these parameters if they
are absent (otherwise it uses the same global attributes).
=cut
with 'DR::TarantoolQueue::JSE';
has host => is => 'ro', isa => 'Maybe[Str]';
has port => is => 'ro', isa => 'Maybe[Str]';
has user => is => 'ro', isa => 'Maybe[Str]';
has password => is => 'ro', isa => 'Maybe[Str]';
has coro => is => 'ro', isa => 'Bool', default => 1;
has ttl => is => 'rw', isa => 'Maybe[Num]';
has ttr => is => 'rw', isa => 'Maybe[Num]';
has pri => is => 'rw', isa => 'Maybe[Num]';
has delay => is => 'rw', isa => 'Maybe[Num]';
has space => is => 'rw', isa => 'Maybe[Str]';
has tube => is => 'rw', isa => 'Maybe[Str]';
has connect_opts => is => 'ro', isa => 'HashRef', default => sub {{}};
has defaults => is => 'ro', isa => 'HashRef', default => sub {{}};
has msgpack => is => 'ro', isa => 'Bool', default => 0;
# если $0 =~ /\.t$/ то будет запускать фейковый тарантул
has fake_in_test => is => 'ro', isa => 'Bool', default => 1;
sub _check_opts($@) {
my $h = shift;
my %can = map { ($_ => 1) } @_;
for (keys %$h) {
next if $can{$_};
croak 'unknown option: ' . $_;
}
}
sub _producer_messagepack {
my ($self, $method, $o) = @_;
state $alias = {
urgent => 'put',
};
$method = $alias->{$method} if exists $alias->{$method};
_check_opts $o, qw(space tube delay ttl ttr pri data domain);
my $tube = $o->{tube};
$tube = $self->tube unless defined $tube;
croak 'tube was not defined' unless defined $tube;
for ('ttl', 'delay', 'ttr', 'pri') {
my $n = $_;
my $res;
if (exists $o->{$n}) {
$res = $o->{$n};
} else {
if (exists $self->defaults->{ $tube }) {
if (exists $self->defaults->{ $tube }{ $n }) {
$res = $self->defaults->{ $tube }{ $n };
} else {
$res = $self->$n;
}
} else {
$res = $self->$n;
}
}
$res ||= 0;
if ($res == 0) {
delete $o->{ $n };
} else {
$o->{ $n } = $res;
}
}
my $task = $self->tnt->call_lua(
["queue:$method" => 'MegaQueue'],
$tube,
$o,
$self->jse->encode($o->{data})
);
DR::TarantoolQueue::Task->tuple_messagepack($task->[0], $self);
}
sub _producer {
my ($self, $method, $o) = @_;
goto \&_producer_messagepack if $self->msgpack;
_check_opts $o, qw(space tube delay ttl ttr pri data domain);
my $space = $o->{space};
$space = $self->space unless defined $space;
croak 'space was not defined' unless defined $space;
my $tube = $o->{tube};
$tube = $self->tube unless defined $tube;
croak 'tube was not defined' unless defined $tube;
my ($ttl, $ttr, $pri, $delay);
for ([\$ttl, 'ttl'], [\$delay, 'delay'], [\$ttr, 'ttr'], [\$pri, 'pri']) {
my $rv = $_->[0];
my $n = $_->[1];
if (exists $o->{$n}) {
$$rv = $o->{$n};
} else {
if (exists $self->defaults->{ $tube }) {
if (exists $self->defaults->{ $tube }{ $n }) {
$$rv = $self->defaults->{ $tube }{ $n };
} else {
$$rv = $self->$n;
}
} else {
$$rv = $self->$n;
}
}
$$rv ||= 0;
}
my $tuple = $self->tnt->call_lua(
"queue.$method" => [
$space,
$tube,
$delay,
$ttl,
$ttr,
$pri,
$self->jse->encode($o->{data})
]
);
return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
}
=head1 METHODS
=head2 new
my $q = DR::TarantoolQueue->new(host => 'abc.com', port => 123);
Creates new queue(s) accessor.
=cut
=head2 dig
$q->dig(task => $task);
$task->dig; # the same
$q->dig(id => $task->id);
$q->dig(id => $task->id, space => $task->space);
'Dig up' a buried task. Checks, that the task is buried.
The task status is changed to ready.
=head2 unbury
Is a synonym of L</dig>.
=head2 delete
$q->delete(task => $task);
$task->delete; # the same
$q->delete(id => $task->id);
$q->delete(id => $task->id, space => $task->space);
Delete a task from the queue (regardless of task state or status).
=head2 peek
$q->peek(task => $task);
$task->peek; # the same
$q->peek(id => $task->id);
$q->peek(id => $task->id, space => $task->space);
Return a task by task id.
=head2 statistics
my $s = $q->statistics;
my $s = $q->statistics(space => 123);
my $s = $q->statistics(space => 123, tube => 'abc');
my $s = DR::TarantoolQueue->statistics(space => 123);
my $s = DR::TarantoolQueue->statistics(space => 123, tube => 'abc');
Return queue module statistics, since server start.
The statistics is broken down by queue id.
Only queues on which there was some activity are
included in the output.
=cut
sub _statistics_msgpack {
my ($self, %o) = @_;
_check_opts \%o, qw(tube);
my $list = $self->tnt->call_lua(
["queue:stats" => 'MegaQueueStats'], $o{tube}
);
my %res = map { ($_->{tube}, $_->{counters}) } @$list;
return \%res;
}
sub statistics {
my ($self, %o) = @_;
goto \&_statistics_msgpack if $self->msgpack;
_check_opts \%o, qw(space tube);
unless (exists $o{space}) {
$o{space} = $self->space if ref $self;
}
unless (exists $o{tube}) {
$o{tube} = $self->tube if ref $self;
}
croak 'space was not defined'
if defined $o{tube} and !defined $o{space};
my $raw = $self->tnt->call_lua(
"queue.statistics" => [
defined($o{space}) ? $o{space} : (),
defined($o{tube}) ? $o{tube} : ()
]
)->raw;
return { @$raw };
}
=head2 get_meta
Task was processed (and will be deleted after the call).
my $m = $q->get_meta(task => $task);
my $m = $q->get_meta(id => $task->id);
Returns a hashref with fields:
=over
=item id
task id
=item tube
queue id
=item status
task status
=item event
time of the next important event in task life time, for example,
when ttl or ttr expires, in microseconds since start of the UNIX epoch.
=item ipri
internal value of the task priority
=item pri
task priority as set when the task was added to the queue
=item cid
consumer id, of the consumer which took the task (only if the task is taken)
=item created
time when the task was created (microseconds since start of the UNIX epoch)
=item ttl
task time to live (microseconds)
=item ttr
task time to run (microseconds)
=item cbury
how many times the task was buried
=item ctaken
how many times the task was taken
=item now
time recorded when the meta was called
=back
=cut
sub get_meta {
my ($self, %o) = @_;
_check_opts \%o, qw(task id space);
croak 'task was not defined' unless $o{task} or $o{id};
my ($id, $space, $tube);
if ($o{task}) {
($id, $space, $tube) = ($o{task}->id,
$o{task}->space, $o{task}->tube);
} else {
($id, $space, $tube) = @o{'id', 'space', 'tube'};
$space = $self->space unless defined $o{space};
croak 'space is not defined' unless defined $space;
$tube = $self->tube unless defined $tube;
}
my $fields = [
{ name => 'id', type => 'STR' },
{ name => 'tube', type => 'STR' },
{ name => 'status', type => 'STR' },
{ name => 'event', type => 'NUM64' },
{ name => 'ipri', type => 'STR', },
{ name => 'pri', type => 'STR', },
{ name => 'cid', type => 'NUM', },
{ name => 'created', type => 'NUM64', },
{ name => 'ttl', type => 'NUM64' },
{ name => 'ttr', type => 'NUM64' },
{ name => 'cbury', type => 'NUM' },
{ name => 'ctaken', type => 'NUM' },
{ name => 'now', type => 'NUM64' },
];
my $tuple = $self->tnt->call_lua(
"queue.meta" => [ $space, $id ], fields => $fields
)->raw;
return { map { ( $fields->[$_]{name}, $tuple->[$_] ) } 0 .. $#$fields };
}
=head1 Producer methods
=head2 put
$q->put;
$q->put(data => { 1 => 2 });
$q->put(space => 1, tube => 'abc',
delay => 10, ttl => 3600,
ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put(data => 'string');
Enqueue a task. Returns new L<task|DR::TarantoolQueue::Task> object.
The list of fields with task data (C<< data => ... >>) is optional.
If 'B<space>' and (or) 'B<tube>' aren't defined the method
will try to use them from L<queue|DR::TarantoolQueue/new> object.
=cut
sub put {
my ($self, %opts) = @_;
return $self->_producer(put => \%opts);
}
=head2 put_unique
$q->put_unique(data => { 1 => 2 });
$q->put_unique(space => 1, tube => 'abc',
delay => 10, ttl => 3600,
ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put_unique(data => 'string');
Enqueue an unique task. Returns new L<task|DR::TarantoolQueue::Task> object,
if it was not enqueued previously. Otherwise it will return existing task.
The list of fields with task data (C<< data => ... >>) is optional.
If 'B<space>' and (or) 'B<tube>' aren't defined the method
will try to use them from L<queue|DR::TarantoolQueue/new> object.
=cut
sub put_unique {
my ($self, %opts) = @_;
return $self->_producer(put_unique => \%opts);
}
=head2 urgent
Enqueue a task. The task will get the highest priority.
If delay is not zero, the function is equivalent to
L<put|DR::TarantoolQueue/put>.
=cut
sub urgent {
my ($self, %opts) = @_;
return $self->_producer(urgent => \%opts);
}
=head1 Consumer methods
=head2 take
my $task = $q->take;
my $task = $q->take(timeout => 0.5);
my $task = $q->take(space => 1, tube => 'requests, timeout => 20);
If there are tasks in the queue ready for execution,
take the highest-priority task. Otherwise, wait for
a ready task to appear in the queue, and, as soon as
it appears, mark it as taken and return to the consumer.
If there is a timeout, and the task doesn't appear until
the timeout expires, returns B<undef>. If timeout is not
given, waits indefinitely.
All the time while the consumer is working on a task,
it must keep the connection to the server open. If a
connection disappears while the consumer is still
working on a task, the task is put back on the ready list.
=cut
sub _take_messagepack {
my ($self, %o) = @_;
_check_opts \%o, qw(tube timeout);
$o{tube} = $self->tube unless defined $o{tube};
croak 'tube was not defined' unless defined $o{tube};
$o{timeout} ||= 0;
my $tuples = $self->tnt->call_lua(
['queue:take' => 'MegaQueue'] => $o{tube}, $o{timeout}
);
if (@$tuples and $tuples->[0]{tube} ne $o{tube}) {
warn sprintf "take(%s, timeout => %s) returned task.tube == %s\n",
$o{tube},
$o{timeout} // 'undef',
$tuples->[0]{tube} // 'undef';
}
return DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
}
sub take {
my ($self, %o) = @_;
goto \&_take_messagepack if $self->msgpack;
_check_opts \%o, qw(space tube timeout);
$o{space} = $self->space unless defined $o{space};
croak 'space was not defined' unless defined $o{space};
$o{tube} = $self->tube unless defined $o{tube};
croak 'tube was not defined' unless defined $o{tube};
$o{timeout} ||= 0;
my $tuple = $self->tnt->call_lua(
'queue.take' => [
$o{space},
$o{tube},
$o{timeout}
]
);
return DR::TarantoolQueue::Task->tuple($tuple, $o{space}, $self);
}
=head2 ack
$q->ack(task => $task);
$task->ack; # the same
$q->ack(id => $task->id);
$q->ack(space => $task->space, id => $task->id);
Confirm completion of a task. Before marking a task as
complete, this function verifies that:
=over
=item *
the task is taken
=item *
the consumer that is confirming the task is the one which took it
=back
Consumer identity is established using a session identifier.
In other words, the task must be confirmed by the same connection
which took it. If verification fails, the function returns an error.
On success, deletes the task from the queue. Throws an exception otherwise.
=head2 requeue
$q->requeue(task => $task);
$task->requeue; # the same
$q->requeue(id => $task->id);
$q->requeue(id => $task->id, space => $task->space);
Return a task to the queue, the task is not executed.
Puts the task at the end of the queue, so that it's executed
only after all existing tasks in the queue are executed.
=head2 bury
$q->bury(task => $task);
$task->bury; # the same
$q->bury(id => $task->id);
$q->bury(id => $task->id, space => $task->space);
Mark a task as B<buried>. This special status excludes the task
from the active list, until it's dug up. This function is useful
when several attempts to execute a task lead to a failure. Buried
tasks can be monitored by the queue owner, and treated specially.
=cut
sub _task_method_messagepack {
my ($self, $m, %o) = @_;
_check_opts \%o, qw(task id);
croak 'task was not defined' unless $o{task} or $o{id};
my $id;
if ($o{task}) {
$id = $o{task}->id;
} else {
$id = $o{id};
}
state $alias = { requeue => 'release' };
$m = $alias->{$m} if exists $alias->{$m};
my $tuples = $self->tnt->call_lua( [ "queue:$m" => 'MegaQueue' ] => $id );
my $task = DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
if ($m eq 'delete') {
$task->_set_status('removed');
} elsif ($m eq 'ack') {
$task->_set_status('ack(removed)');
}
$task;
}
sub _task_method {
my ($self, $m, %o) = @_;
goto \&_task_method_messagepack if $self->msgpack;
_check_opts \%o, qw(task id space);
croak 'task was not defined' unless $o{task} or $o{id};
my ($id, $space);
if ($o{task}) {
($id, $space) = ($o{task}->id, $o{task}->space);
} else {
($id, $space) = @o{'id', 'space'};
$space = $self->space unless defined $o{space};
croak 'space is not defined' unless defined $space;
}
my $tuple = $self->tnt->call_lua( "queue.$m" => [ $space, $id ] );
my $task = DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
if ($m eq 'delete') {
$task->_set_status('removed');
} elsif ($m eq 'ack') {
$task->_set_status('ack(removed)');
}
$task;
}
for my $m (qw(ack requeue bury dig unbury delete peek)) {
no strict 'refs';
next if *{ __PACKAGE__ . "::$m" }{CODE};
*{ __PACKAGE__ . "::$m" } = sub {
splice @_, 1, 0, $m;
goto \&_task_method;
}
}
=head2 release
$q->release(task => $task);
$task->release; # the same
$q->release(id => $task->id, space => $task->space);
$q->release(task => $task, delay => 10); # delay the task
$q->release(task => $task, ttl => 3600); # append task's ttl
Return a task back to the queue: the task is not executed.
Additionally, a new time to live and re-execution delay can be provided.
=cut
sub _release_messagepack {
my ($self, %o) = @_;
_check_opts \%o, qw(task id delay);
$o{delay} ||= 0;
my $id;
if ($o{task}) {
$id = $o{task}->id;
} else {
$id = $o{id};
}
my $tuples = $self->tnt->call_lua(
['queue:release' => 'MegaQueue'], $id, $o{delay});
return DR::TarantoolQueue::Task->tuple_messagepack($tuples->[0], $self);
}
sub release {
my ($self, %o) = @_;
goto \&_release_messagepack if $self->msgpack;
_check_opts \%o, qw(task id space ttl delay);
$o{delay} ||= 0;
my ($id, $space);
if ($o{task}) {
($id, $space) = ($o{task}->id, $o{task}->space);
} else {
($id, $space) = @o{'id', 'space'};
$space = $self->space unless defined $o{space};
croak 'space is not defined' unless defined $space;
}
my $tuple = $self->tnt->call_lua('queue.release' =>
[ $space, $id, $o{delay}, $o{ttl} || () ]
);
return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
}
=head2 done
$q->done(task => $task, data => { result => '123' });
$task->done(data => { result => '123' }); # the same
$q->done(id => $task->id, space => $task->space);
Mark a task as complete (done), but don't delete it. Replaces task
data with the supplied B<data>.
=cut
sub done {
my ($self, %o) = @_;
_check_opts \%o, qw(task id space data);
my ($id, $space);
if ($o{task}) {
($id, $space) = ($o{task}->id, $o{task}->space);
} else {
($id, $space) = @o{'id', 'space'};
$space = $self->space unless defined $o{space};
croak 'space is not defined' unless defined $space;
}
my $tuple = $self->tnt->call_lua('queue.done' =>
[ $space, $id, $self->jse->encode($o{data}) ]
);
return DR::TarantoolQueue::Task->tuple($tuple, $space, $self);
}
=head1 COPYRIGHT AND LICENCE
Copyright (C) 2012 by Dmitry E. Oboukhov <unera@debian.org>
Copyright (C) 2012 by Roman V. Nikolaev <rshadow@rambler.ru>
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.
=cut
with 'DR::TarantoolQueue::Tnt';
__PACKAGE__->meta->make_immutable();