Group
Extension

Gearman-Driver/lib/Gearman/Driver/Worker.pm

package Gearman::Driver::Worker;

use base qw(MooseX::MethodAttributes::Inheritable Gearman::Driver::Worker::Base);
use Moose;

=head1 NAME

Gearman::Driver::Worker - Base class for workers

=head1 SYNOPSIS

    package My::Worker;

    use base qw(Gearman::Driver::Worker);
    use Moose;

    sub begin {
        my ( $self, $job, $workload ) = @_;
        # called before each job
    }

    sub prefix {
        # default: return ref(shift) . '::';
        return join '_', split /::/, __PACKAGE__;
    }

    sub do_something : Job : MinProcesses(2) : MaxProcesses(15) {
        my ( $self, $job, $workload ) = @_;
        # $job => Gearman::XS::Job instance
    }

    sub end {
        my ( $self, $job, $workload ) = @_;
        # called after each job
    }

    sub spread_work : Job {
        my ( $self, $job, $workload ) = @_;

        my $gc = Gearman::XS::Client->new;
        $gc->add_servers( $self->server );

        $gc->do_background( 'some_job_1' => $job->workload );
        $gc->do_background( 'some_job_2' => $job->workload );
        $gc->do_background( 'some_job_3' => $job->workload );
        $gc->do_background( 'some_job_4' => $job->workload );
        $gc->do_background( 'some_job_5' => $job->workload );
    }

    1;

=head1 ATTRIBUTES

=head2 server

L<Gearman::Driver> connects to the L<server|Gearman::Driver/server>
passed to its constructor. This value is also stored in this class.
This can be useful if a job uses L<Gearman::XS::Client> to add
another job. See 'spread_work' method in L</SYNOPSIS> above.

=head1 METHODATTRIBUTES

=head2 Job

This will register the method with gearmand.

=head2 MinProcesses

Minimum number of processes working parallel on this job/method.

=head2 MaxProcesses

Maximum number of processes working parallel on this job/method.

=head2 Encode

This will automatically look for a method C<encode> in this object
which has to be defined in the subclass. It will call the C<encode>
method passing the return value from the job method. The return
value of the C<encode> method will be returned to the Gearman
client. This is useful to serialize Perl datastructures to JSON
before sending them back to the client.

    sub do_some_job : Job : Encode : Decode {
        my ( $self, $job, $workload ) = @_;
        return { message => 'OK', status => 1 };

        # calls 'encode' and returns JSON string: {"status":1,"message":"OK"}
    }

    sub custom_encoder : Job : Encode(enc_yaml) : Decode(dec_yaml) {
        my ( $self, $job, $workload ) = @_;
        return { message => 'OK', status => 1 };

        # calls 'enc_yaml' and returns YAML string:
        # ---
        # message: OK
        # status: 1
    }

    sub encode {
        my ( $self, $result ) = @_;
        return JSON::XS::encode_json($result);
    }

    sub decode {
        my ( $self, $workload ) = @_;
        return JSON::XS::decode_json($workload);
    }

    sub enc_yaml {
        my ( $self, $result ) = @_;
        return YAML::XS::Dump($result);
    }

    sub dec_yaml {
        my ( $self, $workload ) = @_;
        return YAML::XS::Load($workload);
    }


=head2 Decode

This will automatically look for a method C<decode> in this object
which has to be defined in the subclass. It will call the C<decode>
method passing the workload value (C<< $job->workload >>). The return
value of the C<decode> method will be passed as 3rd argument to the
job method. This is useful to deserialize JSON workload to Perl
datastructures for example. If this attribute is not set,
C<< $job->workload >> and C<$workload> is the same.

Example, workload is this string: C<{"status":1,"message":"OK"}>

    sub decode {
        my ( $self, $workload ) = @_;
        return JSON::XS::decode_json($workload);
    }

    sub job1 : Job {
        my ( $self, $job, $workload ) = @_;
        # $workload eq $job->workload eq '{"status":1,"message":"OK"}'
    }

    sub job2 : Job : Decode {
        my ( $self, $job, $workload ) = @_;
        # $workload ne $job->workload
        # $job->workload eq '{"status":1,"message":"OK"}'
        # $workload = { status => 1, message => 'OK' }
    }

=head2 ProcessGroup

Forking each job method in an own process may not always be the way
to go. It's possible to run many job methods in a single process by
defining C<ProcessGroup> attribute. This process group alias will
also show up in L<Gearman::Driver::Console> instead of the single
method names. The workers process name will also be affected.

    sub process_name {
        my ( $self, $orig, $job_name ) = @_;
        return "$orig ($job_name)";
    }

    sub scale_image : Job : ProcessGroup(image_worker) {
        my ( $self, $job, $workload ) = @_;
    }

    sub convert_image : Job : ProcessGroup(image_worker) {
        my ( $self, $job, $workload ) = @_;
    }

    # $ ~/Gearman-Driver$ ps ux|grep image_worker
    # plu   2608   0.0  0.1  2466720   4200 s001  S    12:46PM   0:00.01 script/gearman_driver.pl (XxX::image_worker)

    # $ ~/Gearman-Driver$ telnet localhost 47300
    # Trying ::1...
    # telnet: connect to address ::1: Connection refused
    # Trying fe80::1...
    # telnet: connect to address fe80::1: Connection refused
    # Trying 127.0.0.1...
    # Connected to localhost.
    # Escape character is '^]'.
    # status
    # XxX::image_worker  1  1  1  1970-01-01T00:00:00  1970-01-01T00:00:00

It's possible to combine C<ProcessGroup> and C<MinProcesses> +
C<MaxProcesses>. But there's one small caveat: Because one single
process shares many methods, you can only set the min/max process
amount once per C<ProcessGroup>:

    sub scale_image : Job : ProcessGroup(image_worker) : MinProcesses(5) : MaxProcesses(10) {
        my ( $self, $job, $workload ) = @_;
    }

    sub convert_image : Job : ProcessGroup(image_worker) {
        my ( $self, $job, $workload ) = @_;
    }

If you do not obey this restriction, L<Gearman::Driver> will barf:

    sub scale_image : Job : ProcessGroup(image_worker) : MinProcesses(5) : MaxProcesses(10) {
        my ( $self, $job, $workload ) = @_;
    }

    sub convert_image : Job : ProcessGroup(image_worker) : MinProcesses(6) : MaxProcesses(12) {
        my ( $self, $job, $workload ) = @_;
    }

C<MinProcesses redefined in ProcessGroup(image_worker) at XxX::convert_image at lib/Gearman/Driver.pm line 850.>

=head1 METHODS

=head2 prefix

Having the same method name in two different classes would result
in a clash when registering it with gearmand. To avoid this,
all jobs are registered with the full package and method name
(e.g. C<My::Worker::some_job>). The default prefix is
C<ref(shift . '::')>, but this can be changed by overriding the
C<prefix> method in the subclass, see L</SYNOPSIS> above.

=head2 begin

This method is called before a job method is called. In this base
class this methods just does nothing, but can be overridden in a
subclass.

The parameters are the same as in the job method:

=over 4

=item * C<$self>

=item * C<$job>

=back

=head2 end

This method is called after a job method has been called. In this
base class this methods just does nothing, but can be overridden
in a subclass.

The parameters are the same as in the job method:

=over 4

=item * C<$self>

=item * C<$job>

=back

=head2 process_name

If this method is overridden in the subclass it will change the
process name after a job has been forked.

The following parameters are passed to this method:

=over 4

=item * C<$self>

=item * C<$orig> - the original process name ( C<$0> )

=item * C<$job_name> - the name of the job

=back

Example:

    sub process_name {
        my ( $self, $orig, $job_name ) = @_;
        return "$orig ($job_name)";
    }

This may look like:

    plu       2034  0.0  1.7  22392 17948 pts/2    S    21:17   0:00 gearman_driver.pl (GDExamples::Convert::convert_to_jpeg)
    plu       2035  0.0  1.7  22392 17944 pts/2    S    21:17   0:00 gearman_driver.pl (GDExamples::Convert::convert_to_gif)

=head2 override_attributes

If this method is overridden in the subclass it will change B<all>
attributes of your job methods. It must return a reference to a hash
containing valid L<attribute keys|/METHODATTRIBUTES>. E.g.:

    sub override_attributes {
        return {
            MinProcesses => 1,
            MaxProcesses => 1,
        }
    }

    sub job1 : Job : MinProcesses(10) : MaxProcesses(20) {
        my ( $self, $job, $workload ) = @_;
        # This will get MinProcesses(1) MaxProcesses(1) from override_attributes
    }

=head2 default_attributes

If this method is overridden in the subclass it can supply default
attributes which are added to all job methods. This is useful if
you want to Encode/Decode all your jobs:

    sub default_attributes {
        return {
            Encode => 'encode',
            Decode => 'decode',
        }
    }

    sub decode {
        my ( $self, $workload ) = @_;
        return JSON::XS::decode_json($workload);
    }

    sub encode {
        my ( $self, $result ) = @_;
        return JSON::XS::encode_json($result);
    }

    sub job1 : Job {
        my ( $self, $job, $workload ) = @_;
    }

=cut

no Moose;

__PACKAGE__->meta->make_immutable;

=head1 AUTHOR

See L<Gearman::Driver>.

=head1 COPYRIGHT AND LICENSE

See L<Gearman::Driver>.

=head1 SEE ALSO

=over 4

=item * L<Gearman::Driver>

=item * L<Gearman::Driver::Adaptor>

=item * L<Gearman::Driver::Console>

=item * L<Gearman::Driver::Console::Basic>

=item * L<Gearman::Driver::Console::Client>

=item * L<Gearman::Driver::Job>

=item * L<Gearman::Driver::Job::Method>

=item * L<Gearman::Driver::Loader>

=item * L<Gearman::Driver::Observer>

=item * L<Gearman::Driver::Worker::AttributeParser>

=item * L<Gearman::Driver::Worker::Base>

=back

=cut

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.