POE-Component-Client-Stomp-Utils/lib/POE/Component/Client/Stomp/Utils.pm
package POE::Component::Client::Stomp::Utils;
use 5.008;
use strict;
use warnings;
use Net::Stomp::Frame;
our $VERSION = '0.02';
# -------------------------------------------------------------------
sub new {
my $proto = shift;
my $self = {};
my $class = ref($proto) || $proto;
$self->{transaction_id} = 0;
$self->{session_id} = 0;
$self->{message_id} = 0;
bless($self, $class);
return $self;
}
# -------------------------------------------------------------------
# Mutators
# -----------------------------------------------------------
sub transaction_id {
my ($self, $a) = @_;
$self->{transaction_id} = $a if (defined $a);
return $self->{transaction_id};
}
sub session_id {
my ($self, $a) = @_;
$self->{session_id} = $a if (defined $a);
return $self->{session_id};
}
sub message_id {
my ($self, $a) = @_;
$self->{message_id} = $a if (defined $a);
return $self->{message_id};
}
# -----------------------------------------------------------
# Methods
# -----------------------------------------------------------
sub connect {
my ($self, $params) = @_;
my $body = '';
my $command = 'CONNECT';
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub subscribe {
my ($self, $params) = @_;
my $body = '';
my $command = 'SUBSCRIBE';
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub unsubscribe {
my ($self, $params) = @_;
my $body = '';
my $command = 'UNSUBSCRIBE';
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub begin {
my ($self, $params) = @_;
my $body = '';
my $command = 'BEGIN';
$self->transaction_id($params->{transaction});
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub commit {
my ($self, $params) = @_;
my $body = '';
my $command = 'COMMIT';
$params->{transaction} = $self->transaction_id;
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub ack {
my ($self, $params) = @_;
my $body = '';
my $command = 'ACK';
$params->{transaction} = $self->transaction_id
if ($self->{transaction_id} > 0);
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub abort {
my ($self, $params) = @_;
my $body = '';
my $command = 'ABORT';
$params->{transaction} = $self->transaction_id;
$self->{transaction_id} = 0;
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub disconnect {
my ($self, $params) = @_;
my $body = '';
my $command = 'DISCONNECT';
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
sub send {
my ($self, $params) = @_;
my $body = $params->{data};
my $command = 'SEND';
delete $params->{data};
$params->{'content-length'} = length($body);
return(Net::Stomp::Frame->new({command => $command,
headers => $params,
body => $body}));
}
1;
__END__
# Below is stub documentation for your module. You'd better edit it!
=head1 NAME
POE::Component::Client::Stomp::Utils - A set of utility routines for POE clients
that wish to use a Message Queue server that understands the Stomp protocol.
=head1 SYNOPSIS
This module uses Net::Stomp::Frame to create frames for usage within POE based
programs that wish to communicate to Message Queue servers.
=head1 DESCRIPTION
Your program could use this module in the following fashion:
use POE;
use POE::Component::Client::Stomp;
use POE::Component::Client::Stomp::Utils;
my $stomp = POE::Component::Client::Stomp::Utils->new();
my $frame = $stomp->connect({login => 'test', passcode => 'test'});
$heap->{server}->put($frame);
The above examples creates a "CONNENCT" frame and sends it to the server. If
the connection suceeds, the server will send back a "CONNECTION" frame. The
handling of that frame is left up to your input handlers.
A Stomp frame consists of the following:
COMMAND\012
HEADERS\012
BODY\000
Each command may have one or more headers. Some of those headers are
optional. For the most part, the parameters passed to these methods are
literal translations of what the protocol needs for each of the frame
commands. This was done, because the protocol is described as being in flux,
with a 1.0 version being available "real soon now".
So, NO ERROR checking is done. How the server will handle protocol errors is
highly dependent on the servers implementation. For example, the server that
I have been testing against, quietly dies. You have been warned...
No serialization of data is done within these routines. You will need to
decide what serialiazarion is needed and perform that serialization before
calling these methods. I have found the JSON is a light, and efficent
serialization method. And just about every other lanaguage has a JSON
implementation readily avaiable.
Some terminology issues. I am using the term "channel" to describe the
communications pathway to named resources. The documentation for
some of the vaious Message Queue servers, are using terms such "queue",
"topic" and other nouns to describe the same thing.
=head1 METHODS
=over 4
=item new
This method initializes the base object. It also creates internal storage for
the session ID, the message ID and the transaction ID. If a transaction ID has
been set, it will be automatically passed to those methods that require one.
=over 4
=item Example
$stomp = POE::Component::Client::Stomp::Utils->new();
=back
=item connect
This method creates a "connect" frame. This frame is used to initiate a
session with a Stomp server.
=over 4
=item Example
$frame = $stomp->connect({login => 'test', passcode => 'test'});
$heap->{server}->put($frame);
=back
=item disconnect
This method creates a "disconnect' frame. This framse is used to signal the
server that you no longer wish to communicate with it.
=over 4
=item Example
$frame = $stomp->disconnect();
$heap->{server}->put($frame);
=back
=item subscribe
This method create a "subscribe" frame. This frame is used to notify
the server which channels you want to listen too. The naming of channels is
left up to the server implementation. When a message is available on requested
channels, it will be sent your program.
=over 4 Example
$frame = $stomp->subscribe({destination => '/queue/test',
ack => 'client'});
$heap->{server}->put($frame);
=back
=item unsubscribe
This method creates an "unsubscribe" frame. This frame is used to notify the
server that you don't want to listen on that channel anymore. Subsequently
any messages left on that channel will no longer be sent to your program.
=over 4 Example
$frame = $stomp->unsubcribe({destination => 'test'});
$heap->{server}->put($frame);
=back
=item begin
This method creates a "begin" frame. This frame signals the server that a
transaction is beginning. A transaction is either ended by a "commit" frame
or an "abort" frame. Any other frame that is sent must have a transaction id
associated with them. This is handled internally. The transaction id can be
anything that makes sense to you.
=over 4
=item Example
$frame = $stomp->begin({transaction => '1234'});
$heap->{server}->put($frame);
$frame = $stomp->send({destination => 'test',
data => 'this is my message'});
$heap->{server}->put($frame);
$frame = $stomp->commit();
$heap->{server}->put($frame);
=back
=item commit
This method creates a "commit" frame. This frame signals the end of a
transaction. See the above example on usage.
=item abort
This method creates an "abort" frame. This frame is used to signal the server
that the current transaction is to be aborted.
=over 4
=item Example
$frame = $stomp->abort();
$heap->{server}->put($frame);
=back
=item send
This method create a "send" frame. This frame is the basis of communication
over your channel to the server.
=over 4
=item Example
$frame = $stomp->send({destination => 'test',
data => 'this is my packet'});
$heap->{server}->put($frame);
or
$message = objToJson($data);
$frame = $stomp->send({destination => 'test',
data => $message,
receipt => 'abcd'});
$heap->{server}->put($frame);
sub input_handler {
my ($heap, $frame) = @_[HEAP, ARG0 ];
if (($frame->command eq 'RECEIPT') and
($frame->headers->{'receipt-id'} eq 'abcd')) {
print "Message was seccessfully sent!\n";
}
}
=back
=item ack
This method creates an "ack" frame. This frame is used to tell the server that
the message was successfully received. It requires a message id.
=over 4
=item Example
$frame = $stomp->ack({'message-id' => '1234'});
$heap->{server}->put($frame);
or
sub input_handler {
my ($heap, $frame) = @_[HEAP, ARG0];
if ($frame->command eq 'MESSAGE') {
$stomp->message_id($frame->headers->{'message-id'});
$poe_kernel->post('test' => srv_message => $frame);
}
}
sub srv_message {
my ($heap, $frame) = @_[HEAP, ARG0];
my $frame = $stomp->ack({'message-id' => $stomp->message_id});
my $data = jsonToObj($frame->body);
handle_data($data);
$heap->{server}->put($frame);
}
=back
=head1 MUTATORS
=item transaction_id
This mutator will set/get the current transaction id. This transaction id will
be used automatically in other methods that require this parameter when
a transaction is in progress.
=over 4
=item Example
$trans_id = $stomp->transaction_id;
$stomp->transaction_id('1234');
=back
=item message_id
This mutator will set/get the current message id.
=over 4
=item Example
$msg_id = $stomp->message_id;
$stomp->message_id('1234');
=back
=item session_id
This mutator will get/set the session_id. The session id is set once upon
initial connection to the server.
=head1 EXPORT
None by default.
=head1 SEE ALSO
Net::Stomp
Net::Stomp::Frame
POE::Component::Client::Stomp
http://stomp.codehaus.org/Protocol
=head1 AUTHOR
Kevin L. Esteb, E<lt>kesteb@wsipc.orgE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright (C) 2007 by Kevin L. Esteb
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.8 or,
at your option, any later version of Perl 5 you may have available.
=cut