Group
Extension

Net-RabbitMQ-Client/Client.pm

package Net::RabbitMQ::Client;

use utf8;
use strict;
use vars qw($AUTOLOAD $VERSION $ABSTRACT @ISA @EXPORT);

BEGIN {
	$VERSION = 0.4;
	$ABSTRACT = "RabbitMQ client (XS for librabbitmq)";
	
	@ISA = qw(Exporter DynaLoader);
	@EXPORT = qw(
		AMQP_STATUS_OK AMQP_STATUS_NO_MEMORY AMQP_STATUS_BAD_AMQP_DATA AMQP_STATUS_UNKNOWN_CLASS
		AMQP_STATUS_UNKNOWN_METHOD AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION
		AMQP_STATUS_CONNECTION_CLOSED AMQP_STATUS_BAD_URL AMQP_STATUS_SOCKET_ERROR AMQP_STATUS_INVALID_PARAMETER
		AMQP_STATUS_TABLE_TOO_BIG AMQP_STATUS_WRONG_METHOD AMQP_STATUS_TIMEOUT AMQP_STATUS_TIMER_FAILURE
		AMQP_STATUS_HEARTBEAT_TIMEOUT AMQP_STATUS_UNEXPECTED_STATE AMQP_STATUS_SOCKET_CLOSED AMQP_STATUS_SOCKET_INUSE
		AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD _AMQP_STATUS_NEXT_VALUE AMQP_STATUS_TCP_ERROR
		AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR _AMQP_STATUS_TCP_NEXT_VALUE AMQP_STATUS_SSL_ERROR
		AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED AMQP_STATUS_SSL_PEER_VERIFY_FAILED
		
		AMQP_DELIVERY_NONPERSISTENT AMQP_DELIVERY_PERSISTENT
		
		AMQP_SASL_METHOD_UNDEFINED AMQP_SASL_METHOD_PLAIN AMQP_SASL_METHOD_EXTERNAL
		
		AMQP_RESPONSE_NONE AMQP_RESPONSE_NORMAL AMQP_RESPONSE_LIBRARY_EXCEPTION AMQP_RESPONSE_SERVER_EXCEPTION
		
		AMQP_FIELD_KIND_BOOLEAN AMQP_FIELD_KIND_I8 AMQP_FIELD_KIND_U8 AMQP_FIELD_KIND_I16 AMQP_FIELD_KIND_U16
		AMQP_FIELD_KIND_I32 AMQP_FIELD_KIND_U32 AMQP_FIELD_KIND_I64 AMQP_FIELD_KIND_U64 AMQP_FIELD_KIND_F32
		AMQP_FIELD_KIND_F64 AMQP_FIELD_KIND_DECIMAL AMQP_FIELD_KIND_UTF8 AMQP_FIELD_KIND_ARRAY AMQP_FIELD_KIND_TIMESTAMP
		AMQP_FIELD_KIND_TABLE AMQP_FIELD_KIND_VOID AMQP_FIELD_KIND_BYTES
		
		AMQP_PROTOCOL_VERSION_MAJOR AMQP_PROTOCOL_VERSION_MINOR AMQP_PROTOCOL_VERSION_REVISION AMQP_PROTOCOL_PORT
		AMQP_FRAME_METHOD AMQP_FRAME_HEADER AMQP_FRAME_BODY AMQP_FRAME_HEARTBEAT AMQP_FRAME_MIN_SIZE AMQP_FRAME_END
		AMQP_REPLY_SUCCESS AMQP_CONTENT_TOO_LARGE AMQP_NO_ROUTE AMQP_NO_CONSUMERS AMQP_ACCESS_REFUSED AMQP_NOT_FOUND
		AMQP_RESOURCE_LOCKED AMQP_PRECONDITION_FAILED AMQP_CONNECTION_FORCED AMQP_INVALID_PATH AMQP_FRAME_ERROR
		AMQP_SYNTAX_ERROR AMQP_COMMAND_INVALID AMQP_CHANNEL_ERROR AMQP_UNEXPECTED_FRAME AMQP_RESOURCE_ERROR AMQP_NOT_ALLOWED
		AMQP_NOT_IMPLEMENTED AMQP_INTERNAL_ERROR
		
		AMQP_BASIC_CLASS AMQP_BASIC_CONTENT_TYPE_FLAG AMQP_BASIC_CONTENT_ENCODING_FLAG AMQP_BASIC_HEADERS_FLAG
		AMQP_BASIC_DELIVERY_MODE_FLAG AMQP_BASIC_PRIORITY_FLAG AMQP_BASIC_CORRELATION_ID_FLAG AMQP_BASIC_REPLY_TO_FLAG
		AMQP_BASIC_EXPIRATION_FLAG AMQP_BASIC_MESSAGE_ID_FLAG AMQP_BASIC_TIMESTAMP_FLAG AMQP_BASIC_TYPE_FLAG AMQP_BASIC_USER_ID_FLAG
		AMQP_BASIC_APP_ID_FLAG AMQP_BASIC_CLUSTER_ID_FLAG
	);
};

bootstrap Net::RabbitMQ::Client $VERSION;

use DynaLoader ();
use Exporter ();

my $STATUSES = {
	0  => undef,
	10 => "Can't open socket",
	20 => "Can't login on server",
	30 => "Can't open chanel",
	40 => "Can't declare exchange",
	50 => "Cant' declare queue",
	51 => "Can't bind queue",
	60 => "Can't basic consume",
	70 => "Can't publish data"
};

sub _destroy_and_status {
	my ($self, $status) = @_;
	
	$self->sm_destroy();
	
	$status;
}

sub sm_new {
	my ($class) = shift;
	my $config = {
		host => '', port => 5672, channel => 1,
		login => '', password => '',
		exchange => undef, exchange_type => undef, exchange_declare => 0,
		queue => undef, routingkey => '', queue_declare => 0,
		@_
	};
	
	my $rmq    = $class->create();
	my $conn   = $rmq->new_connection();
	my $socket = $rmq->tcp_socket_new($conn);
	
	my $self = bless {rmq => $rmq, conn => $conn, socket => $socket, config => $config}, $class;
	
	my $status = $rmq->socket_open($socket, $config->{host}, $config->{port});
	return $self->_destroy_and_status(10) if $status;
	
	$status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN(), $config->{login}, $config->{password});
	return $self->_destroy_and_status(20) if $status != AMQP_RESPONSE_NORMAL();
	
	$status = $rmq->channel_open($conn, $config->{channel});
	return $self->_destroy_and_status(30) if $status != AMQP_RESPONSE_NORMAL();
	
	if (defined $config->{exchange} && $config->{exchange_declare} &&
	    $config->{exchange} ne '' && defined $config->{exchange_type})
	{
		$status = $rmq->exchange_declare($conn, $config->{channel}, $config->{exchange}, $config->{exchange_type}, 0, 1, 0, 0);
		return $self->_destroy_and_status(40) if $status != AMQP_RESPONSE_NORMAL();
	}
	
	if (defined $config->{queue} && $config->{queue} ne '')
	{
		if ($config->{queue_declare}) {
			$rmq->queue_declare($conn, $config->{channel}, $config->{queue}, 0, 1, 0, 0);
			return $self->_destroy_and_status(50) if $status != AMQP_RESPONSE_NORMAL();
		}
		
		if (exists $config->{exchange} && $config->{exchange} ne '') {
			$rmq->queue_bind($conn, $config->{channel}, $config->{queue}, $config->{exchange}, $config->{routingkey}, 0);
			return $self->_destroy_and_status(51) if $status != AMQP_RESPONSE_NORMAL();
		}
	}
	
	$self;
}

sub sm_destroy {
	my ($self) = @_;
	
	my $rmq = $self->{rmq};
	my $config = $self->{config};
	
	if (ref $rmq && ref $self->{conn} && ref $config eq "HASH") {
		$rmq->channel_close($self->{conn}, $config->{channel}, AMQP_REPLY_SUCCESS());
		$rmq->connection_close($self->{conn}, AMQP_REPLY_SUCCESS());
		$rmq->destroy_connection($self->{conn});
	}
}

sub sm_publish {
	my ($self, $data) = (shift, shift);
	my $args = {
		content_type  => "text/plain",
		delivery_mode => AMQP_DELIVERY_PERSISTENT(),
		_flags        => AMQP_BASIC_CONTENT_TYPE_FLAG()|AMQP_BASIC_DELIVERY_MODE_FLAG(),
		priority      => undef
		@_
	};
	
	my $rmq     = $self->{rmq};
	my $conn    = $self->{conn};
	my $config  = $self->{config};
	my $channel = $config->{channel};
	
	my $props = $rmq->type_create_basic_properties();
        $rmq->set_prop__flags($props, $args->{"_flags"});
        $rmq->set_prop_content_type($props, $args->{content_type});
        $rmq->set_prop_delivery_mode($props, $args->{delivery_mode});
	$rmq->set_prop_priority($props, $args->{priority}) if defined $args->{priority};
	
        my $status = $rmq->basic_publish($conn, $channel, $config->{exchange}, $config->{routingkey}, 0, 0, $props, $data);
	
        if($status != AMQP_STATUS_OK()) {
		$status = 70;
        }
	
        $rmq->type_destroy_basic_properties($props);
	
	$status;
}

sub sm_get_messages {
	my ($self, $callback) = @_;
	
	my $rmq     = $self->{rmq};
	my $conn    = $self->{conn};
	my $config  = $self->{config};
	my $channel = $config->{channel};
	
	my $status = $rmq->basic_consume($conn, $channel, $config->{queue}, undef, 0, 0, 0);
	return 60 if $status != AMQP_RESPONSE_NORMAL();
	
	my $envelope = $rmq->type_create_envelope();
	
	while (1) {
		$rmq->maybe_release_buffers($conn);
		
		my $status = $rmq->consume_message($conn, $envelope, 0, 0);
		next if $status != AMQP_RESPONSE_NORMAL();
		
		if($callback->($self, $rmq->envelope_get_message_body($envelope))) {
			$rmq->basic_ack($conn, $channel, $rmq->envelope_get_delivery_tag($envelope), 0);
		}
		
		$rmq->destroy_envelope($envelope);
	}
	
	$rmq->type_destroy_envelope($envelope);
	
	0;
}

sub sm_get_message {
	my ($self) = shift;
	
	my $rmq     = $self->{rmq};
	my $conn    = $self->{conn};
	my $config  = $self->{config};
	my $channel = $config->{channel};
	
	my $status = $rmq->basic_consume($conn, $channel, $config->{queue}, undef, 0, 0, 0);
	return $_[0] = 60 if $status != AMQP_RESPONSE_NORMAL() && exists $_[0];
	
	my $envelope = $rmq->type_create_envelope();
	my $message;
	
	$rmq->maybe_release_buffers($conn);
	
	$status = $rmq->consume_message($conn, $envelope, 0, 0);
	
	if($status == AMQP_RESPONSE_NORMAL()) {
		$message = $self, $rmq->envelope_get_message_body($envelope);
	}
	
	$rmq->basic_ack($conn, $channel, $rmq->envelope_get_delivery_tag($envelope), 0);
	$rmq->destroy_envelope($envelope);
	
	$rmq->type_destroy_envelope($envelope);
	
	$message;
}

sub sm_get_rabbitmq   {$_[0]->{rmq}}
sub sm_get_connection {$_[0]->{conn}}
sub sm_get_socket     {$_[0]->{socket}}
sub sm_get_config     {$_[0]->{config}}

sub sm_get_error_desc {$STATUSES->{$_[0]}}

1;


__END__

=head1 NAME

Net::RabbitMQ::Client - RabbitMQ client (XS for librabbitmq)

=head1 SYNOPSIS


Simple API:

	use utf8;
	use strict;
	
	use Net::RabbitMQ::Client;
	
	produce();
	consume();
	
	sub produce {
		my $simple = Net::RabbitMQ::Client->sm_new(
			host => "best.host.for.rabbitmq.net",
			login => "login", password => "password",
			exchange => "test_ex", exchange_type => "direct", exchange_declare => 1,
			queue => "test_queue", queue_declare => 1
		);
		die sm_get_error_desc($simple) unless ref $simple;
		
		my $sm_status = $simple->sm_publish('{"say": "hello"}', content_type => "application/json");
		die sm_get_error_desc($sm_status) if $sm_status;
		
		$simple->sm_destroy();
	}
	
	sub consume {
		my $simple = Net::RabbitMQ::Client->sm_new(
			host => "best.host.for.rabbitmq.net",
			login => "login", password => "password",
			queue => "test_queue"
		);
		die sm_get_error_desc($simple) unless ref $simple;
		
		my $sm_status = $simple->sm_get_messages(sub {
			my ($self, $message) = @_;
			
			print $message, "\n";
			
			1; # it is important to return 1 (send ask) or 0
		});
		die sm_get_error_desc($sm_status) if $sm_status;
		
		$simple->sm_destroy();
	}


Base API:

	use utf8;
	use strict;
	
	use Net::RabbitMQ::Client;
	
	produce();
	consume();
	
	sub produce {
		my $rmq = Net::RabbitMQ::Client->create();
		
		my $exchange = "test";
		my $routingkey = "";
		my $messagebody = "lalala";
		
		my $channel = 1;
		
		my $conn = $rmq->new_connection();
		my $socket = $rmq->tcp_socket_new($conn);
		
		my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
		die "Can't create socket" if $status;
		
		$status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
		die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
		
		$status = $rmq->channel_open($conn, $channel);
		die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
	    
		$rmq->queue_bind($conn, 1, "test_q", $exchange, $routingkey, 0);
		die "Can't bind queue" if $status != AMQP_RESPONSE_NORMAL;
		
		my $props = $rmq->type_create_basic_properties();
		$rmq->set_prop__flags($props, AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG);
		$rmq->set_prop_content_type($props, "text/plain");
		$rmq->set_prop_delivery_mode($props, AMQP_DELIVERY_PERSISTENT);
		
		$status = $rmq->basic_publish($conn, $channel, $exchange, $routingkey, 0, 0, $props, $messagebody);
		
		if($status != AMQP_STATUS_OK) {
		    print "Can't send message\n";
		}
		
		$rmq->type_destroy_basic_properties($props);
		
		$rmq->channel_close($conn, 1, AMQP_REPLY_SUCCESS);
		$rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
		$rmq->destroy_connection($conn);
	}
	
	sub consume {
		my $rmq = Net::RabbitMQ::Client->create();
		
		my $exchange = "test";
		my $routingkey = "";
		my $messagebody = "lalala";
		
		my $channel = 1;
		
		my $conn = $rmq->new_connection();
		my $socket = $rmq->tcp_socket_new($conn);
		
		my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
		die "Can't create socket" if $status;
		
		$status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
		die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
		
		$status = $rmq->channel_open($conn, $channel);
		die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
		
		$status = $rmq->basic_consume($conn, 1, "test_q", undef, 0, 1, 0);
		die "Consuming" if $status != AMQP_RESPONSE_NORMAL;
		
		my $envelope = $rmq->type_create_envelope();
		
		while (1)
		{
			$rmq->maybe_release_buffers($conn);
			
			$status = $rmq->consume_message($conn, $envelope, 0, 0);
			last if $status != AMQP_RESPONSE_NORMAL;
			
			print "New message: \n", $rmq->envelope_get_message_body($envelope), "\n";
			
			$rmq->destroy_envelope($envelope);
		}
		
		$rmq->type_destroy_envelope($envelope);
		
		$rmq->channel_close($conn, 1, AMQP_REPLY_SUCCESS);
		$rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
		$rmq->destroy_connection($conn);
	}


=head1 DESCRIPTION

This is binding for RabbitMQ-C library.

Please, before install this module make RabbitMQ-C library.

See https://github.com/alanxz/rabbitmq-c

https://github.com/lexborisov/perl-net-rabbitmq-client


=head1 METHODS

=head2 Simple API

=head3 sm_new

	my $simple = Net::RabbitMQ::Client->sm_new(
		host => '',              # default: 
		port => 5672,            # default: 5672
		channel => 1,            # default: 1
		login => '',             # default: 
		password => '',          # default: 
		exchange => undef,       # default: undef
		exchange_type => undef,  # optional, if exchange_declare == 1; types: topic, fanout, direct, headers; default: undef
		exchange_declare => 0,   # declare exchange or not; 1 or 0; default: 0
		queue => undef,          # default: undef
		routingkey => '',        # default: 
		queue_declare => 0,      # declare queue or not; 1 or 0; default: 0
	);

Return: a Simple object if successful, otherwise an error occurred


=head3 sm_publish

	my $sm_status = $simple->sm_publish($text,
		# default args
		content_type  => "text/plain",
		delivery_mode => AMQP_DELIVERY_PERSISTENT,
		_flags        => AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG
	);

Return: 0 if successful, otherwise an error occurred


=head3 sm_get_messages

Loop to get messages

	my $callback = {
		my ($simple, $message) = @_;
		
		1; # it is important to return 1 (send ask) or 0
	}
	
	my $sm_status = $simple->sm_get_messages($callback);

Return: 0 if successful, otherwise an error occurred


=head3 sm_get_message

Get one message
	
	my $sm_status = 0;
	my $message = $simple->sm_get_message($sm_status);

Return: message if successful


=head3 sm_get_rabbitmq

	my $rmq = $simple->sm_get_rabbitmq();
	
Return: RabbitMQ Base object


=head3 sm_get_connection

	my $conn = $simple->sm_get_connection();
	
Return: Connection object (from Base API new_connection)


=head3 sm_get_socket

	my $socket = $simple->sm_get_socket();
	
Return: Socket object (from Base API tcp_socket_new)


=head3 sm_get_config

	my $config = $simple->sm_get_config();
	
Return: Config when creating a Simple object


=head3 sm_get_config

	my $description = $simple->sm_get_error_desc($sm_error_code);
	
Return: Error description by Simple error code


=head3 sm_destroy

Destroy a Simple object
	
	$simple->sm_destroy();
	

=head2 Base API

=head2 Connection and Authorization

=head3 create

 my $rmq = Net::RabbitMQ::Client->create();

Return: rmq


=head3 new_connection

 my $amqp_connection_state_t = $rmq->new_connection();

Return: amqp_connection_state_t


=head3 tcp_socket_new

 my $amqp_socket_t = $rmq->tcp_socket_new($conn);

Return: amqp_socket_t


=head3 socket_open

 my $status = $rmq->socket_open($socket, $host, $port);

Return: status


=head3 socket_open_noblock

 my $status = $rmq->socket_open_noblock($socket, $host, $port, $struct_timeout);

Return: status


=head3 login

 my $status = $rmq->login($conn, $vhost, $channel_max, $frame_max, $heartbeat, $sasl_method);

Return: status


=head3 channel_open

 my $status = $rmq->channel_open($conn, $channel);

Return: status


=head3 socket_get_sockfd

 my $res = $rmq->socket_get_sockfd($socket);

Return: variable


=head3 get_socket

 my $amqp_socket_t  = $rmq->get_socket($conn);

Return: amqp_socket_t 


=head3 channel_close

 my $status = $rmq->channel_close($conn, $channel, $code);

Return: status


=head3 connection_close

 my $status = $rmq->connection_close($conn, $code);

Return: status


=head3 destroy_connection

 my $status = $rmq->destroy_connection($conn);

Return: status


=head2 SSL

=head3 ssl_socket_new

 my $amqp_socket_t = $rmq->ssl_socket_new($conn);

Return: amqp_socket_t


=head3 ssl_socket_set_key

 my $status = $rmq->ssl_socket_set_key($socket, $cert, $key);

Return: status


=head3 set_initialize_ssl_library

 $rmq->set_initialize_ssl_library($do_initialize);

=head3 ssl_socket_set_cacert

 my $status = $rmq->ssl_socket_set_cacert($socket, $cacert);

Return: status


=head3 ssl_socket_set_key_buffer

 my $status = $rmq->ssl_socket_set_key_buffer($socket, $cert, $key, $n);

Return: status


=head3 ssl_socket_set_verify

 $rmq->ssl_socket_set_verify($socket, $verify);

=head2 Basic Publish/Consume

=head3 basic_publish

 my $status = $rmq->basic_publish($conn, $channel, $exchange, $routing_key, $mandatory, $immediate, $properties, $body);

Return: status


=head3 basic_consume

 my $status = $rmq->basic_consume($conn, $channel, $queue, $consumer_tag, $no_local, $no_ack, $exclusive);

Return: status


=head3 basic_get

 my $status = $rmq->basic_get($conn, $channel, $queue, $no_ack);

Return: status


=head3 basic_ack

 my $status = $rmq->basic_ack($conn, $channel, $delivery_tag, $multiple);

Return: status


=head3 basic_nack

 my $status = $rmq->basic_nack($conn, $channel, $delivery_tag, $multiple, $requeue);

Return: status


=head3 basic_reject

 my $status = $rmq->basic_reject($conn, $channel, $delivery_tag, $requeue);

Return: status


=head2 Consume

=head3 consume_message

 my $status = $rmq->consume_message($conn, $envelope, $struct_timeout, $flags);

Return: status


=head2 Queue

=head3 queue_declare

 my $status = $rmq->queue_declare($conn, $channel, $queue, $passive, $durable, $exclusive, $auto_delete);

Return: status


=head3 queue_bind

 my $status = $rmq->queue_bind($conn, $channel, $queue, $exchange, $routing_key);

Return: status


=head3 queue_unbind

 my $status = $rmq->queue_unbind($conn, $channel, $queue, $exchange, $routing_key);

Return: status


=head2 Exchange

=head3 exchange_declare

 my $status = $rmq->exchange_declare($conn, $channel, $exchange, $type, $passive, $durable, $auto_delete, $internal);

Return: status


=head2 Envelope

=head3 envelope_get_redelivered

 my $res = $rmq->envelope_get_redelivered($envelope);

Return: variable


=head3 envelope_get_channel

 my $res = $rmq->envelope_get_channel($envelope);

Return: variable


=head3 envelope_get_exchange

 my $res = $rmq->envelope_get_exchange($envelope);

Return: variable


=head3 envelope_get_routing_key

 my $res = $rmq->envelope_get_routing_key($envelope);

Return: variable


=head3 destroy_envelope

 $rmq->destroy_envelope($envelope);

=head3 envelope_get_consumer_tag

 my $res = $rmq->envelope_get_consumer_tag($envelope);

Return: variable


=head3 envelope_get_delivery_tag

 my $res = $rmq->envelope_get_delivery_tag($envelope);

Return: variable


=head3 envelope_get_message_body

 my $res = $rmq->envelope_get_message_body($envelope);

Return: variable


=head2 Types

=head3 type_create_envelope

 my $amqp_envelope_t = $rmq->type_create_envelope();

Return: amqp_envelope_t


=head3 type_destroy_envelope

 $rmq->type_destroy_envelope($envelope);

=head3 type_create_timeout

 my $struct_timeval = $rmq->type_create_timeout($timeout_sec);

Return: struct_timeval


=head3 type_destroy_timeout

 $rmq->type_destroy_timeout($timeout);

=head3 type_create_basic_properties

 my $amqp_basic_properties_t = $rmq->type_create_basic_properties();

Return: amqp_basic_properties_t


=head3 type_destroy_basic_properties

 my $status = $rmq->type_destroy_basic_properties($props);

Return: status


=head2 For a Basic Properties

=head3 set_prop_app_id

 $rmq->set_prop_app_id($props, $value);

=head3 set_prop_content_type

 $rmq->set_prop_content_type($props, $value);

=head3 set_prop_reply_to

 $rmq->set_prop_reply_to($props, $value);

=head3 set_prop_priority

 $rmq->set_prop_priority($props, $priority);

=head3 set_prop__flags

 $rmq->set_prop__flags($props, $flags);

=head3 set_prop_user_id

 $rmq->set_prop_user_id($props, $value);

=head3 set_prop_delivery_mode

 $rmq->set_prop_delivery_mode($props, $delivery_mode);

=head3 set_prop_message_id

 $rmq->set_prop_message_id($props, $value);

=head3 set_prop_timestamp

 $rmq->set_prop_timestamp($props, $timestamp);

=head3 set_prop_cluster_id

 $rmq->set_prop_cluster_id($props, $value);

=head3 set_prop_correlation_id

 $rmq->set_prop_correlation_id($props, $value);

=head3 set_prop_expiration

 $rmq->set_prop_expiration($props, $value);

=head3 set_prop_type

 $rmq->set_prop_type($props, $value);

=head3 set_prop_content_encoding

 $rmq->set_prop_content_encoding($props, $value);

=head2 Other

=head3 data_in_buffer

 my $amqp_boolean_t = $rmq->data_in_buffer($conn);

Return: amqp_boolean_t


=head3 maybe_release_buffers

 $rmq->maybe_release_buffers($conn);

=head3 error_string

 my $res = $rmq->error_string($error);

Return: variable


=head1 DESTROY

 undef $rmq;

Free mem and destroy object.

=head1 AUTHOR

Alexander Borisov <lex.borisov@gmail.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2015 by Alexander Borisov.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.

See librabbitmq license and COPYRIGHT https://github.com/alanxz/rabbitmq-c


=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.