Group
Extension

NetSDS-Queue/lib/NetSDS/App/QueueProcessor.pm

#===============================================================================
#
#         FILE:  QueueProcessor.pm
#
#  DESCRIPTION:  NetSDS queue processing application
#
#        NOTES:  ---
#       AUTHOR:  Michael Bochkaryov (Rattler), <misha@rattler.kiev.ua>
#      COMPANY:  Net.Style
#      CREATED:  10.08.2009 20:57:57 EEST
#===============================================================================

=head1 NAME

NetSDS::App::QueueProcessor - queue processing server framework

=head1 SYNOPSIS

	-----------------------------
	# Configuration file

	# Queue server IP and port
	queue_server = "127.0.0.1:22201"

	# Pulling queue name
	queue_name = "myq"

	# Processing bandwidth (messages per second)
	bandwidth = 2

	# Timeout on idle loops
	idle_timeout = 3
	-----------------------------

	QProc->run(conf_file => './qproc.conf');

	1;

	package QProc;

	use Data::Dumper;
	use base 'NetSDS::App::QueueProcessor';

	# Message processing logic
	sub process {

		my ( $self, $msg ) = @_;

		# Just dump message structure
		print Dumper($msg);

	}

	1;

=head1 DESCRIPTION

C<NetSDS::App::QueueProcessor> module implements a framework for applications
that process messages arriving from a MemcacheQ queue server.

=cut

package NetSDS::App::QueueProcessor;

use 5.8.0;
use strict;
use warnings;

use Time::HiRes qw(sleep time);    # high resolution timer
use NetSDS::Queue;                 # MemcacheQ API
use base 'NetSDS::App';

use version; our $VERSION = '0.032';

#===============================================================================
#

=head1 CLASS API

=over

=item B<new([...])> - class constructor

=cut

#-----------------------------------------------------------------------
sub new {

	# Class constructor: all the work is delegated to the parent class.
	#   %params - named constructor arguments, forwarded to the parent as is
	# Returns whatever the parent constructor returned.
	my $class  = shift;
	my %params = @_;

	# Parent constructor is invoked in scalar context
	my $object = $class->SUPER::new(%params);
	return $object;

}


#***********************************************************************

=item B<initialize()> - application initialization

Internal method implementing common startup actions.

=cut 

#-----------------------------------------------------------------------

sub initialize {

	# Prepare the processor for running: run the parent class initialization,
	# create the queue client and derive timing settings from configuration.
	#
	# Configuration keys read through $self->conf:
	#   queue_server - passed to NetSDS::Queue->new() as "server"
	#   queue_name   - name of the queue messages are pulled from
	#   bandwidth    - messages per second (default: 1)
	#   idle_timeout - seconds to sleep when the queue is empty (default: 5)
	#
	# Returns the object itself.
	my ( $self, @params ) = @_;

	$self->SUPER::initialize(@params);

	# Initialize queue server
	$self->{server} = NetSDS::Queue->new(
		server => $self->conf->{queue_server},
	);
	$self->{queue_name} = $self->conf->{queue_name};

	# Optional timing parameters. Missing or empty values are mapped to 0
	# before numeric coercion, so an absent key does not trigger
	# "uninitialized value" / "isn't numeric" warnings.
	my $bandwidth    = $self->conf->{'bandwidth'};
	my $idle_timeout = $self->conf->{'idle_timeout'};
	$bandwidth    = ( defined($bandwidth)    && length($bandwidth) )    ? $bandwidth + 0    : 0;
	$idle_timeout = ( defined($idle_timeout) && length($idle_timeout) ) ? $idle_timeout + 0 : 0;

	# Set time for each iteration: bandwidth is given in messages/sec.
	# Missing, zero or negative bandwidth falls back to 1 message/sec.
	$self->{'sleep_period'} = ( $bandwidth > 0 ) ? ( 1 / $bandwidth ) : 1;

	# Missing, zero or negative idle timeout falls back to 5 seconds;
	# a negative value would make Time::HiRes::sleep() croak in main_loop().
	$self->{'idle_timeout'} = ( $idle_timeout > 0 ) ? $idle_timeout : 5;

	return $self;

} ## end sub initialize


#***********************************************************************

=item B<main_loop()> - main processing loop

Internal method for application logic.

=cut 

#-----------------------------------------------------------------------

sub main_loop {

	# Run the processing loop.
	#
	# Calls start(), then repeatedly drains the queue named by
	# $self->{queue_name}: every pulled message is handed to process() and
	# the loop is throttled to one message per $self->{sleep_period} seconds.
	# When the queue yields nothing, the loop sleeps $self->{idle_timeout}
	# seconds and tries again. The loop ends when $self->{to_finalize} is
	# set, or after a single pass unless $self->{infinite} is true.
	#
	# Returns the result of stop().
	my ($self) = @_;

	$self->start();

	# Main processing loop itself
	while ( !$self->{to_finalize} ) {

		# Call production code; a false value from pull() ends the pass
		while ( my $res = $self->{server}->pull( $self->{queue_name} ) ) {

			# Iteration start timestamp, in seconds
			my $start_time = time;

			$self->process($res);
			last if ( $self->{to_finalize} );

			# If iteration finished fast - sleep for the rest of the period.
			# The remainder is computed once: reading the clock separately
			# for the check and for the sleep argument could produce a
			# negative argument, which Time::HiRes::sleep() rejects.
			my $remaining = $start_time + $self->{'sleep_period'} - time;
			sleep($remaining) if ( $remaining > 0 );

		}

		# Single pass mode: finish once the queue has been drained
		unless ( $self->{infinite} ) {
			$self->{to_finalize} = 1;
		}

		# Do not delay shutdown with an idle sleep
		last if ( $self->{to_finalize} );

		# Sleep if no messages in queue
		sleep( $self->{'idle_timeout'} ) if ( $self->{'idle_timeout'} > 0 );

	} ## end while ( !$self->{to_finalize...

	return $self->stop();

} ## end sub main_loop

#***********************************************************************

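=item B<process($msg)> - process one queue message

Hook called by C<main_loop()> for every message pulled from the queue. The default implementation does nothing; subclasses override it to implement message handling.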

=cut

#-----------------------------------------------------------------------

sub process {

	# Default message handler: accepts the message and does nothing with it.
	# main_loop() calls this method once for every message pulled from the
	# queue; subclasses override it to do the real work (see SYNOPSIS).
	#   $msg - message as returned by the queue object's pull() method
	my ( $self, $msg ) = @_;

}

1;

__END__

=back

=head1 EXAMPLES

See C<samples/app_qproc.pl> application.

=head1 SEE ALSO

L<NetSDS::Queue>

L<NetSDS::App>

=head1 TODO

None

=head1 AUTHOR

Michael Bochkaryov <misha@rattler.kiev.ua>

=head1 LICENSE

Copyright (C) 2008-2009 Michael Bochkaryov

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

=cut




Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.