Group
Extension

Queue-Gearman/lib/Queue/Gearman.pm

package Queue::Gearman;
use 5.008001;
use strict;
use warnings;

our $VERSION = "0.01";

use Queue::Gearman::Pool;
use Queue::Gearman::Message qw/:headers :msgtypes/;
use Queue::Gearman::Task;
use Queue::Gearman::Taskset;
use Queue::Gearman::Job;
use Queue::Gearman::Util qw/dumper/;
use Scalar::Util qw/weaken/;
use List::Util qw/shuffle/;
use Digest::MD5 qw/md5_hex/;
use Time::HiRes;

use constant DEFAULT_TIMEOUT            => 1;
use constant DEFAULT_DEQUEUE_TIMEOUT    => 1;
use constant DEFAULT_WAIT_TIMEOUT       => 1;
use constant DEFAULT_INACTIVITY_TIMEOUT => 10;

use Class::Accessor::Lite ro => [qw/
    servers
    prefix
    timeout
    dequeue_timeout
    wait_timeout
    inactivity_timeout
    serialize_method
    deserialize_method
/];

my %ENQUEUE_HEADER = (
    background => +{
        priority => +{
            high   => HEADER_REQ_SUBMIT_JOB_HIGH_BG,
            normal => HEADER_REQ_SUBMIT_JOB_BG,
            low    => HEADER_REQ_SUBMIT_JOB_LOW_BG,
        },
    },
    foreground => +{
        priority => +{
            high   => HEADER_REQ_SUBMIT_JOB_HIGH,
            normal => HEADER_REQ_SUBMIT_JOB,
            low    => HEADER_REQ_SUBMIT_JOB_LOW,
        },
    },
);

sub _noop     {}
sub _identity { $_[0] }

sub new {
    my $class = shift;
    my $self  = bless +{
        timeout            => DEFAULT_TIMEOUT,
        dequeue_timeout    => DEFAULT_DEQUEUE_TIMEOUT,
        wait_timeout       => DEFAULT_WAIT_TIMEOUT,
        inactivity_timeout => DEFAULT_INACTIVITY_TIMEOUT,
        serialize_method   => \&_identity,
        deserialize_method => \&_identity,
        ability_map        => {},
        @_
    } => $class;
    return $self;
}

sub taskset {
    my $self = shift;
    return $self->{taskset} ||= Queue::Gearman::Taskset->new(
        wait_timeout       => $self->wait_timeout,
        serialize_method   => $self->serialize_method,
        deserialize_method => $self->deserialize_method,
    );
}

sub client_id {
    my $self = shift;
    return $self->{client_id} ||= md5_hex(rand() . $$ . {} . time);
}

sub _pool {
    my ($self, $role) = @_;
    return $self->{pool}->{$role} if exists $self->{pool}->{$role};
    $self->{pool}->{$role} = Queue::Gearman::Pool->new(
        servers            => $self->servers,
        timeout            => $self->timeout,
        inactivity_timeout => $self->inactivity_timeout,
        on_connect_do      => $self->_on_connect_do($role),
    );
    return $self->{pool}->{$role};
}

sub _on_connect_do {
    my ($self, $role) = @_;
    if ($role eq 'worker') {
        weaken($self);
        return sub {
            my $socket = shift;
            $socket->send(HEADER_REQ_PRE_SLEEP);
            $socket->send(HEADER_REQ_SET_CLIENT_ID, $self->client_id);

            for my $args (values %{ $self->{ability_map} }) {
                my @msg = $self->_make_can_do_msg(@$args);
                $socket->send(@msg);
            }
        };
    }
    return \&_noop;
}

sub _encode_func {
    my ($self, $func) = @_;
    my $prefix = $self->prefix or return $func;
    return "$prefix\t$func";
}

sub _decode_func {
    my ($self, $func) = @_;
    my $prefix = $self->prefix or return $func;
    $func =~ s/^\Q$prefix\t//;
    return $func;
}

sub _serialize {
    my ($self, $arg) = @_;
    return scalar $self->serialize_method->($arg);
}

sub _deserialize {
    my ($self, $arg) = @_;
    return scalar $self->deserialize_method->($arg);
}

sub can_do {
    my $self = shift;
    my ($func) = @_;

    my @msg = $self->_make_can_do_msg(@_);
    $_->send(@msg) for $self->_pool('worker')->all;

    $self->{ability_map}->{$func} = [@_];
}

sub cant_do {
    my ($self, $func) = @_;

    delete $self->{ability_map}->{$func};

    my @msg = $self->_make_cant_do_msg($func);
    $_->send(@msg) for $self->_pool('worker')->all;
}

sub _make_can_do_msg {
    my $self = shift;
    my $func = shift;
    return @_ == 1 ? (HEADER_REQ_CAN_DO_TIMEOUT,  $self->_encode_func($func), @_)
                   : (HEADER_REQ_CAN_DO,          $self->_encode_func($func));
}

sub _make_cant_do_msg {
    my ($self, $func) = @_;
    return (HEADER_REQ_CANT_DO, $self->_encode_func($func));
}

sub reset_abilities {
    my $self = shift;
    for my $socket ($self->_pool('worker')->all) {
        $socket->send(HEADER_REQ_RESET_ABILITIES);
    }
    %{$self->{ability_map}} = ();
}

sub enqueue { shift->enqueue_background(@_) }

sub enqueue_background {
    my $self = shift;
    my $opt  = exists $_[3] ? $_[3] : +{};

    my $unique   = $opt->{unique}   || '';
    my $priority = $opt->{priority} || 'normal';
    my $header   = exists $ENQUEUE_HEADER{background}{$priority} ? $ENQUEUE_HEADER{background}{$priority}
                                                                 : HEADER_REQ_SUBMIT_JOB_BG;
    return $self->_enqueue($header, 1, $unique, @_);
}

sub enqueue_forground {
    my $self = shift;
    my $opt  = exists $_[3] ? $_[3] : +{};

    my $unique   = $opt->{unique}   || '';
    my $priority = $opt->{priority} || 'normal';
    my $header   = exists $ENQUEUE_HEADER{foreground}{$priority} ? $ENQUEUE_HEADER{foreground}{$priority}
                                                                 : HEADER_REQ_SUBMIT_JOB;
    return $self->_enqueue($header, 0, $unique, @_);
}

sub _enqueue {
    my ($self, $header, $is_background, $unique, $func, $arg) = @_;

    my $socket = $self->_pool('client')->pick();
    my $res = $socket->send($header, $self->_encode_func($func), $unique, $self->_serialize($arg))
           && $socket->recv();
    return unless defined $res;

    if ($res->{msgtype} eq MSGTYPE_RES_JOB_CREATED) {
        my ($handle) = @{ $res->{args} };
        return Queue::Gearman::Task->new(
            func          => $func,
            handle        => $handle,
            arg           => $arg,
            taskset       => $self->taskset,
            socket        => $socket,
            is_background => $is_background,
        );
    }

    die "Unexpected res: ", dumper($res);
}

sub dequeue {
    my ($self, $timeout) = @_;
    $timeout ||= $self->dequeue_timeout();

    my $timeout_at = Time::HiRes::time + $timeout;

    while (Time::HiRes::time < $timeout_at) {
        for my $socket (shuffle $self->_pool('worker')->all) {
            $socket->send(HEADER_REQ_GRAB_JOB);

        TRY_RECV:
            my $res = $socket->recv();
            next unless defined $res;

            if ($res->{msgtype} eq MSGTYPE_RES_JOB_ASSIGN) {
                my ($handle, $func, $arg) = @{ $res->{args} };
                return Queue::Gearman::Job->new(
                    func               => $self->_decode_func($func),
                    handle             => $handle,
                    arg                => $self->_deserialize($arg),
                    socket             => $socket,
                    serialize_method   => $self->serialize_method,
                    deserialize_method => $self->deserialize_method,
                );
            }
            elsif ($res->{msgtype} eq MSGTYPE_RES_NO_JOB) {
                $socket->send(HEADER_REQ_PRE_SLEEP);
                next;
            }
            elsif ($res->{msgtype} eq MSGTYPE_RES_NOOP) {
                goto TRY_RECV; ## retry to recv
            }

            die "Unexpected res: ", dumper($res);
        }
    }

    return;
}

1;
__END__

=encoding utf-8

=head1 NAME

Queue::Gearman - Queue like low-level interface for Gearman.

=head1 SYNOPSIS

    use Queue::Gearman;
    use JSON;

    sub add {
        my $args = shift;
        return $args->{left} + $args->{rigth};
    }

    my $queue = Queue::Gearman->new(
        servers            => ['127.0.0.1:6667'],
        serialize_method   => \&JSON::encode_json,
        deserialize_method => \&JSON::decode_json,
    );
    $queue->can_do('add');

    my $task = $queue->enqueue_forground(add => { left => 1, rigth => 2 })
        or die 'failure';
    $queue->enqueue_background(add => { left => 2, rigth => 1 })
        or die 'failure';

    my $job = $queue->dequeue();
    if ($job && $job->func eq 'add') {
        my $res = eval { add($job->arg) };
        if (my $e = $@) {
            $job->fail($e);
        }
        else {
            $job->complete($res);
        }
    }

    $task->wait();
    print $task->result, "\n"; ## => 3

=head1 DESCRIPTION

Queue::Gearman is ...

=head1 LICENSE

Copyright (C) karupanerura.

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=head1 AUTHOR

karupanerura E<lt>karupa@cpan.orgE<gt>

=cut



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.