Group
Extension

SQS-Worker/lib/SQS/Worker.pm

package SQS::Worker;
use Paws;
use Moose::Role;
use Data::Dumper;
use SQS::Consumers::Default;
use SQS::Consumers::DeleteAlways;
use SQS::Consumers::DeleteAndFork;

our $VERSION = '0.06';

requires 'process_message';

has queue_url => (is => 'ro', isa => 'Str', required => 1);
has region => (is => 'ro', isa => 'Str', required => 1);

has sqs => (is => 'ro', isa => 'Paws::SQS', lazy => 1, default => sub {
    my $self = shift;
    Paws->service('SQS', region => $self->region);
});

has log => (is => 'ro', required => 1);

has on_failure => (is => 'ro', isa => 'CodeRef', default => sub {
    return sub {
        my ($self, $message) = @_;
        $self->log->error("Error processing message " . $message->ReceiptHandle);
        $self->log->debug("Message Dump " . Dumper($message));
    }
});

has processor => (is => 'ro', lazy => 1, default => sub {
    my $self = shift;
    return SQS::Consumers::Default->new;
});

sub fetch_message {
    my $self = shift;
    $self->processor->fetch_message($self);
}

sub run {
    my $self = shift;
    while (1) {
        $self->fetch_message;
    }
}

sub delete_message {
    my ($self, $message) = @_;
    $self->sqs->DeleteMessage(
        QueueUrl      => $self->queue_url,
        ReceiptHandle => $message->ReceiptHandle,
    );
}

sub receive_message {
    my $self = shift;
    my $message_pack = $self->sqs->ReceiveMessage(
        WaitTimeSeconds => 20,
        QueueUrl => $self->queue_url,
        MaxNumberOfMessages => 1
    );
    return $message_pack;
}

1;

=head1 NAME

SQS::Worker - A light framework for processing messages from SQS queues

=head1 DESCRIPTION

SQS::Worker is a light framework that allows you to just code asyncronous tasks
that consume messages from an SQS Queue. The framework takes care of launching the 
necessary processes (workers), and executes your code on incoming messages, so you
can focus on writing the important part (behavior)

Also, since you're surely going to be deserializing the messages that come from the
queue, SQS::Worker provides you with ways to easily consume JSON messages, for example.

It comes in the form of a Moose role that is to be composed into the end user code 
that wants to receive and process messages from an SQS queue. 

The worker runs uninterrumped, fetching messages from it's configured queue, 
one at a time and then executing the process_message of the worker class.

The worker consumer can compose further funcionality by consuming more roles 
from the SQS::Worker namespace.

=head1 USAGE

Simple usage

	package YourWorker;

	use Moose;
	with 'SQS::Worker';

	sub process_message {
		my ($self,$message) = @_;

    # $message is a Paws::SQS::Message
		# do something with the message 
	}

Composing automatic json decoding to perl data structure

	package YourWorker;
  use Moose;
	with 'SQS::Worker', 'SQS::Worker::DecodeJson';

	sub process_message {
		my ($self, $data) = @_;
		
		# Do something with the data, already parsed into a structure
		my $name = $data->{name};

    # You get a logger attached to the worker so you can log stuff
    $c->log->info("I processed a message for $name");
	}

=head1 Bundled roles

L<SQS::Worker::DecodeJson> decodes the message body in json format and passes 

L<SQS::Worker::DecodeStorable> decodes the message body in Perl storable format

L<SQS::Worker::Multiplex> dispatches to different methods via a dispatch table

L<SQS::Worker::SNS> decodes a message sent from SNS and inflates it to a C<SNS::Notfication>

=head1 Creating your own processing module

Create a Moose role that wraps functionality around the method C<process_message>

  package PrefixTheMessage;
    use Moose::Role;

    around process_message => sub {
      my ($orig, $self, $message) = @_;
      return 'prefixed ' . $message->Body;
    };

  1;

And then use it inside your consumers

  package YourWorker;
  
	use Moose;
	with 'SQS::Worker', 'PrefixTheMessage';
  
	sub process_message {
		my ($self, $message) = @_;
    # surprise! $message is prefixed!
  }
  
  1;

=head1 Composing roles

The worker roles can be composed (if it makes sense), so your worker could implement

  with 'SQS::Worker', 'SQS::Worker::DecodeJson', 'SQS::Worker::Multiplex';

to decode a message in json format that will then dispatch the json to the multiplex worker

=head1 Error handling

Any exception thrown from process_message will be treated as a failed message. Different
message processors treat failed messages in different ways:

=head1 Message processors

L<SQS::Consumers::Default> Messages processed before deleting them from the queue. If a message fails, 
it will be treated by SQS as an unprocessed message, and will reappear in the queue to be processed
again by SQS (or delivered to a dead letter queue after N redeliveries if your SQS queue is configured 
appropiately

L<SQS::Consumers::DeleteAlways> Message deleted, then processed. If a message fails it will
not be reprocessed ever

=head1 Running the worker

Running the worker can be done via the C<spawn_worker> command that comes bundled with the 
distribution

  spawn_worker --worker YourWorker --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path

You can also control if the message should be deleted upon reception (before the message is actually processed) with

 spawn_worker --worker YourClass --queue_url sqs_endpoint_url --region aws_sqs_region --log_conf log4perl_config_file_path --consumer DeleteAlways

or you can create an instance of your object and invoke run:

  my $worker_instance = YourWorker->new(
    queue_url => $args->queue_url,
    region    => $args->region,
    log => Log::Log4perl->get_logger('async'),
    processor => $args->_consumer,
  );
  $worker_instance->run

=head1 Credentials

SQS::Worker uses the same credential system as L<Paws> to authenticate to SQS: so, in a nutshell, it
will work if you:

=over

=item *

have the credentials in the home of the user launching the script, in the ~/.aws/credentials file.

=item *

assign an IAM role to the EC2 instance that is running the code (if deploying the code inside an EC2 instance)

=item *

set environment variables: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

=back

=head1 SEE ALSO
 
L<Paws>
 
=head1 COPYRIGHT and LICENSE
 
Copyright (c) 2016 by CAPSiDE
 
This code is distributed under the Apache 2 License. The full text of the license can be found in the LICENSE file included with this module.
 
=head1 AUTHORS

Jose Luis Martinez, Albert Hilazo, Pau Cervera and Loic Prieto

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.