Group
Extension

Net-Async-AMQP/lib/Net/Async/AMQP/Channel.pm

package Net::Async::AMQP::Channel;
$Net::Async::AMQP::Channel::VERSION = '2.000';
use strict;
use warnings;

use parent qw(IO::Async::Notifier);

=head1 NAME

Net::Async::AMQP::Channel - represents a single channel in an MQ connection

=head1 VERSION

version 2.000

=head1 SYNOPSIS

 use IO::Async::Loop;
 use Net::Async::AMQP;
 my $loop = IO::Async::Loop->new;
 $loop->add(my $amqp = Net::Async::AMQP->new);
 $amqp->connect(
   host => 'localhost',
   username => 'guest',
   password => 'guest',
 )->then(sub {
  shift->open_channel->publish(
   type => 'application/json'
  )
 });

=head1 DESCRIPTION

Each Net::Async::AMQP::Channel instance represents a virtual channel for
communicating with the MQ server.

Channels are layered over the TCP protocol and most of the common AMQP frames
operate at channel level - typically you'd connect to the server, open one
channel for one-shot requests such as binding/declaring/publishing, and a further
channel for every consumer.

Since any error typically results in a closed channel, it's not recommended to
have multiple consumers on the same channel if there's any chance the Basic.Consume
request will fail.

=cut

use Future;
use curry::weak;
use Class::ISA ();
use Variable::Disposition qw(retain_future);
use Data::Dumper;
use Scalar::Util qw(weaken);

use Net::Async::AMQP;
use Net::Async::AMQP::Utils;

use overload
	'""' => sub { shift->as_string },
	'0+' => sub { 0 + shift->id },
	bool => sub { 1 },
	fallback => 1;

=head1 METHODS

=cut

sub configure {
	my ($self, %args) = @_;
	for(grep exists $args{$_}, qw(amqp)) {
		Scalar::Util::weaken($self->{$_} = delete $args{$_})
	}
	for(grep exists $args{$_}, qw(future id)) {
		$self->{$_} = delete $args{$_};
	}
	$self->SUPER::configure(%args);
}

=head2 confirm_mode

Switches confirmation mode on for this channel.
In confirm mode, all messages must be ACKed
explicitly after delivery.

Note that this is an irreversible operation - once
confirm mode has been enabled on a channel, closing that
channel and reopening is the only way to turn off confirm
mode again.

Returns a L<Future> which will resolve with this
channel once complete.

 $ch->confirm_mode ==> $ch

=cut

sub confirm_mode {
	my $self = shift;
	my %args = @_;
	$self->debug_printf("Enabling confirm mode");
	die "already requested confirm_mode for this channel" if $self->{confirm_mode};

	my $f = $self->loop->new_future->set_label("set confirm_mode on channel " . $self->id);
	$self->{delivery_tag} = 0;
	$self->{confirm_mode} = $f;
	my $nowait = $self->nowait_from_args(%args);
	my $frame = Net::AMQP::Frame::Method->new(
		method_frame => Net::AMQP::Protocol::Confirm::Select->new(
			nowait => $nowait,
		)
	);

	# No-wait mode means we don't expect the SelectOk frame back
	if($nowait) {
		$f->done
	} else {
		$self->closure_protection($f);
	}
	$self->send_frame($frame);
	return $f->transform(done => sub { $self });
}

=head2 nowait_from_args

If we have a C<wait> argument, then return the inverse of that.

Otherwise, return zero.

=cut

sub nowait_from_args {
	my ($self, %args) = @_;
	return 0 unless exists $args{wait};
	return $args{wait} ? 0 : 1;
}

=head2 exchange_declare

Declares a new exchange.

Returns a L<Future> which will resolve with this
channel once complete.

 $ch->exchange_declare(
  exchange   => 'some_exchange',
  type       => 'fanout',
  autodelete => 1,
 ) ==> $ch

=cut

sub exchange_declare {
	my $self = shift;
	my %args = @_;
	die "No exchange specified" unless exists $args{exchange};
	die "No exchange type specified" unless exists $args{type};

	$args{exchange} //= '';
	$self->debug_printf("Declaring exchange [%s]", $args{exchange});

	my $f = $self->loop->new_future->set_label("declare exchange [" . $args{exchange} . "]");
	my $frame = Net::AMQP::Frame::Method->new(
		method_frame => Net::AMQP::Protocol::Exchange::Declare->new(
			exchange    => Net::AMQP::Value::String->new($args{exchange}),
			type        => Net::AMQP::Value::String->new($args{type}),
			passive     => $args{passive} || 0,
			durable     => $args{durable} || 0,
			auto_delete => $args{auto_delete} || 0,
			internal    => $args{internal} || 0,
			ticket      => 0,
			nowait      => 0,
		)
	);
	$self->push_pending(
		'Exchange::DeclareOk' => [ $f, $self ]
	);
	$self->closure_protection($f);
	$self->send_frame($frame);
	return $f;
}

=head2 exchange_bind

Binds an exchange to another exchange. This is a RabbitMQ-specific extension.

=cut

sub exchange_bind {
    my $self = shift;
    my %args = @_;
    die "No source exchange specified" unless exists $args{source};
    die "No destination exchange specified" unless exists $args{destination};

	$self->debug_printf("Binding exchange [%s] to [%s] with rkey [%s]", $args{source}, $args{destination}, $args{routing_key});

	my $f = $self->loop->new_future->set_label("bind exchange [" . $args{source} . "] to [" . $args{destination} . "]" . (exists $args{routing_key} ? (" rkey [" . $args{routing_key} . "]") : ""));
	my $frame = Net::AMQP::Frame::Method->new(
		method_frame => Net::AMQP::Protocol::Exchange::Bind->new(
			source      => Net::AMQP::Value::String->new($args{source}),
			destination => Net::AMQP::Value::String->new($args{destination}),
			(exists($args{routing_key}) ? ('routing_key' => Net::AMQP::Value::String->new($args{routing_key})) : ()),
			nowait      => 0,
		)
	);
	$self->push_pending(
		'Exchange::BindOk' => [ $f, $self ]
	);
	$self->closure_protection($f);
	$self->send_frame($frame);
	return $f;
}

=head2 queue_declare

Returns a L<Future> which will resolve with the new L<Net::Async::AMQP::Queue> instance,
the number of messages in the queue, and the number of consumers.

 $ch->queue_declare(
  queue      => 'some_queue',
 ) ==> ($q, $message_count, $consumer_count)

=cut

sub queue_declare {
	my $self = shift;
	my %args = @_;
	die "No queue specified" unless defined $args{queue};

	$self->future->then(sub {
		my $f = $self->loop->new_future->set_label("declare queue [" . $args{queue} . "]");
		my $ready = $self->loop->new_future->set_label("queue readiness for [" . $args{queue} . "]");
		my $q = Net::Async::AMQP::Queue->new(
			amqp    => $self->amqp,
			future  => $ready,
		);
		$self->debug_printf("Declaring queue [%s]", $args{queue});
		my $frame = Net::AMQP::Frame::Method->new(
			method_frame => Net::AMQP::Protocol::Queue::Declare->new(
				queue       => Net::AMQP::Value::String->new($args{queue}),
				passive     => $args{passive} || 0,
				durable     => $args{durable} || 0,
				exclusive   => $args{exclusive} || 0,
				auto_delete => $args{auto_delete} || 0,
				no_ack      => $args{no_ack} || 0,
				($args{arguments}
				? (arguments   => $args{arguments})
				: ()
				),
				ticket      => 0,
				nowait      => 0,
			)
		);
		$self->push_pending(
			'Queue::DeclareOk' => sub {
				my ($amqp, $frame) = @_;
				my $method_frame = $frame->method_frame;
				$q->queue_name($method_frame->queue);
				my $messages = $method_frame->message_count;
				my $consumer_count = $method_frame->consumer_count;
				$ready->done() unless $ready->is_ready;
				$f->done($q, $messages, $consumer_count) unless $f->is_ready;
			}
		);
		$self->closure_protection($f);
		$self->send_frame($frame);
		$f;
	})
}

sub next_dtag { ++shift->{delivery_tag} }

=head2 publish

Publishes a message on this channel.

Returns a L<Future> which will resolve with the
channel instance once the server has confirmed publishing is complete.

 $ch->publish(
  exchange => 'some_exchange',
  routing_key => 'some.rkey.here',
  type => 'some_type',
 ) ==> $ch

Some named parameters currently accepted - note that this list is likely to
expand in future:

=over 4

=item * ack - we default to ACK mode, so set this to 0 to turn off explicit server ACK
on message routing/delivery

=item * immediate - if set, will cause a failure if the message could not be routed
immediately to a consumer

=item * mandatory - if set, will require that the message ends up in a queue (i.e. will
fail messages sent to an exchange that do not have an appropriate binding)

=item * content_type - defaults to application/binary

=item * content_encoding - defaults to undef (none)

=item * timestamp - the message timestamp, defaults to epoch time

=item * expiration - use this to set per-message expiry, see L<https://www.rabbitmq.com/ttl.html>

=item * priority - defaults to undef (none), use this to take advantage of RabbitMQ 3.5+ priority support

=item * reply_to - which queue to reply to (used for RPC, default undef)

=item * correlation_id - unique message ID (used for RPC, default undef)

=item * delivery_mode - whether to persist message (default 1, don't persist - set to 2 for persistent, see also "durable" flag for queues)

=back

=cut

sub publish {
	my $self = shift;
	my %args = @_;
	die "no exchange" unless exists $args{exchange};

	$self->future->then(sub {
		my $f = $self->loop->new_future->set_label("publish on [" . $args{exchange} . "]");
		my $dtag = $self->next_dtag;
		if($self->{confirm_mode}) {
			push @{$self->{published}}, [ $dtag => $f ];
		} else {
			$f->done;
		}

		my @frames = $self->amqp->split_payload(
			$args{payload},
			exchange         => Net::AMQP::Value::String->new($args{exchange}),
			mandatory		 => $args{mandatory} // 0,
			immediate        => $args{immediate} // 0,
			(exists $args{routing_key} ? (routing_key => Net::AMQP::Value::String->new($args{routing_key})) : ()),
			ticket           => 0,
			content_type     => $args{content_type} // 'application/binary',
			content_encoding => $args{content_encoding},
			timestamp        => $args{timestamp} // time,
			type             => Net::AMQP::Value::String->new($args{type} // ''),
			user_id          => $self->amqp->user,
            headers          => $args{headers} || { },
			delivery_mode    => $args{delivery_mode} // 1,
			priority         => $args{priority} // 1,
			correlation_id   => $args{correlation_id},
			expiration       => (
				exists $args{expiration}
				# This would seem to make more sense as a numeric value, but the spec
				# defines this as a shortstr
				? Net::AMQP::Value::String->new($args{expiration})
				: undef
			),
			message_id       => $args{message_id},
			app_id           => $args{app_id},
			cluster_id       => $args{cluster_id},
			reply_to         => $args{reply_to},
			weight           => $args{weight} // 0,
		);
		$self->closure_protection($f);
		$self->send_frame(
			$_,
		) for @frames;
		$f
	})
}

=head2 qos

Changes QOS settings on the channel. Probably most
useful for limiting the number of messages that can
be delivered to us before we have to ACK/NAK to
proceed.

Returns a L<Future> which will resolve with the
channel instance once the operation is complete.

 $ch->qos(
  prefetch_count => 5,
  prefetch_size  => 1048576,
 ) ==> $ch

=cut

sub qos {
	my $self = shift;
	my %args = @_;

	$self->future->then(sub {
		my $f = $self->loop->new_future->set_label("set qos count " . ($args{prefetch_count} // 0) . " size " . ($args{prefetch_size} // 0));
		my $channel = $self->id;
		$self->push_pending(
			'Basic::QosOk' => [ $f, $self ],
		);

		my $frame = Net::AMQP::Frame::Method->new(
			method_frame => Net::AMQP::Protocol::Basic::Qos->new(
				nowait         => 0,
				prefetch_count => $args{prefetch_count},
				prefetch_size  => $args{prefetch_size} || 0,
			)
		);
		$self->closure_protection($f);
		$self->send_frame($frame);
		$f
	});
}

=head2 ack

Acknowledge a specific delivery.

Returns a L<Future> which will resolve with the
channel instance once the operation is complete.

 $ch->ack(
  delivery_tag => 123,
 ) ==> $ch

=cut

sub ack {
	my $self = shift;
	my %args = @_;

	my $id = $self->id;
	$self->future->on_done(sub {
		my $channel = $id;
		my $frame = Net::AMQP::Frame::Method->new(
			method_frame => Net::AMQP::Protocol::Basic::Ack->new(
			   # nowait      => 0,
				delivery_tag => $args{delivery_tag},
				multiple     => $args{multiple} // 0,
			)
		);
		$self->send_frame($frame);
	});
}

=head2 nack

Negative acknowledgement for a specific delivery.

Returns a L<Future> which will resolve with the
channel instance once the operation is complete.

 $ch->nack(
  delivery_tag => 123,
 ) ==> $ch

=cut

sub nack {
	my $self = shift;
	my %args = @_;

	my $id = $self->id;
	$self->future->on_done(sub {
		my $channel = $id;
		my $frame = Net::AMQP::Frame::Method->new(
			method_frame => Net::AMQP::Protocol::Basic::Nack->new(
			   # nowait      => 0,
				delivery_tag => $args{delivery_tag},
				multiple     => $args{multiple} // 0,
				requeue      => $args{requeue} // 0,
			)
		);
		$self->send_frame($frame);
	});
}

=head2 reject

Reject a specific delivery.

Returns a L<Future> which will resolve with the
channel instance once the operation is complete.

 $ch->nack(
  delivery_tag => 123,
 ) ==> $ch

=cut

sub reject {
	my ($self, %args) = @_;

	my $id = $self->id;
	$self->future->on_done(sub {
		my $channel = $id;
		my $frame = Net::AMQP::Frame::Method->new(
			method_frame => Net::AMQP::Protocol::Basic::Reject->new(
			   # nowait      => 0,
				delivery_tag => $args{delivery_tag},
				multiple     => $args{multiple} // 0,
				requeue      => $args{requeue} // 0,
			)
		);
		$self->send_frame($frame);
	});
}

=pod

Example output:

		'method_id' => 40,
		'reply_code' => 404,
		'class_id' => 60,
		'reply_text' => 'NOT_FOUND - no exchange \'invalidchan\' in vhost \'vhost\''

=cut

=head2 on_close

Called when the channel has been closed.

=cut

sub on_close {
	my ($self, $frame) = @_;

	$self->{is_closed} = 1;
	$self->{future} = Future->fail('closed');

	# ACK the close first - we have to send a close-ok
	# before it's legal to reopen this channel ID
	retain_future(
		(
			  # If we initiated the close, then the CloseOk comes from the server
			  $self->{closing}
			? Future->done
			: $self->send_frame(
				Net::AMQP::Frame::Method->new(
					method_frame => Net::AMQP::Protocol::Channel::CloseOk->new(
					)
				)
			)
		)->on_ready(sub {
			# Any remaining consumers need to be cancelled at this point
			$self->bus->invoke_event(
				'cancel',
				ctag => $_,
			) for keys %{$self->{consumer_tags}};
			$self->{consumer_tags} = {};

			$_->fail('channel closed') for grep !$_->is_ready, map $_->[1], @{$self->{published}};
			$self->{published} = [];

			# It's important that the MQ instance knows
			# about the channel closure first before we
			# go ahead and dispatch events, since any
			# subscribed handlers might go ahead and
			# attempt to open the channel again immediately.

			$self->amqp->channel_closed($self->id);
			$self->bus->invoke_event(
				'close',
				code => $frame->reply_code,
				reason => $frame->reply_text,
			);
			Future->done
		})
	)
}

=head2 send_frame

Proxy frame sending requests to the parent
L<Net::Async::AMQP> instance.

=cut

sub send_frame {
	my $self = shift;
	$self->amqp->send_frame(
		@_,
		channel => $self->id,
	)
}

=head2 close

Ask the server to close this channel.

Returns a L<Future> which will resolve with the
channel instance once the operation is complete.

 $ch->close(
  code => 404,
  text => 'something went wrong',
 ) ==> $ch

=cut

sub close {
	my $self = shift;
	my %args = @_;
	$self->debug_printf("Close channel %d", $self->id);

	# There's a slight chance we'll get called after being
	# removed from the loop, since we wanted to close anyway then
	# don't treat that as an error
	return Future->done if $self->{closing} or !$self->loop;

	$self->{closing} = 1;
	$self->{future} = Future->fail('closing');

	my $f = $self->loop->new_future->set_label("Close channel " . $self->id);
	my $frame = Net::AMQP::Frame::Method->new(
		method_frame => Net::AMQP::Protocol::Channel::Close->new(
			reply_code  => $args{code} // 404,
			reply_text  => $args{text} // 'closing',
		)
	);
	$self->push_pending(
		'Channel::CloseOk' => [ $f, $self ],
	);
	$self->closure_protection($f);
	$self->send_frame($frame);
	return $f;
}

=head2 push_pending

=cut

sub push_pending {
	my $self = shift;
	while(@_) {
		my ($type, $code) = splice @_, 0, 2;
		push @{$self->{pending}{$type}}, $code;
	}
	return $self;
}

=head2 remove_pending

Removes a coderef from the pending event handler.

Returns C< $self >.

=cut

sub remove_pending {
	my $self = shift;
	while(@_) {
		my ($type, $code) = splice @_, 0, 2;
		# This is the same as extract_by { $_ eq $code } @{$self->{pending}{$type}};,
		# but since we'll be calling it a lot might as well do it inline:
		splice
			@{$self->{pending}{$type}},
			$_,
			1 for grep {
				$self->{pending}{$type}[$_] eq $code
			} reverse 0..$#{$self->{pending}{$type}};
	}
	return $self;
}

=head2 next_pending

Retrieves the next pending handler for the given incoming frame type (see L<Net::Async::AMQP::Utils/amqp_frame_type>),
and calls it.

Takes the following parameters:

=over 4

=item * $frame - the frame itself

=back

Returns $self.

=cut

sub next_pending {
	my ($self, $frame) = @_;

	# First part of a frame. There's more to come, so stash a new future
	# and return.
	if($frame->isa('Net::AMQP::Frame::Header')) {
		# Properties are available directly from the header frame
		my $hdr_frame = $frame->header_frame;
		$self->{incoming_message}{type} = $hdr_frame->type;
		$self->{incoming_message}{properties} = {
			map {; $_ => scalar($hdr_frame->$_) } qw(
				content_type 
				content_encoding
				delivery_mode
				priority
				correlation_id
				reply_to
				expiration
				message_id
				timestamp
				user_id
				app_id
			)
		};
		if($frame->header_frame->headers) {
			eval {
				#$self->{incoming_message}{type} = $frame->header_frame->headers->{type}
				#	if exists $frame->header_frame->headers->{type};
				# Shallow copy for local storage
				$self->{incoming_message}{headers} = { %{$frame->header_frame->headers} };
				1
			} or $self->debug_printf("Unexpected exception while doing something: %s", $@);
		} else {
			$self->{incoming_message}{headers} = {};
		}

		if($frame->body_size) {
			# Stash the size so we can do some basic validation on the payload frames
			$self->{incoming_message}{pending} = $frame->body_size;
		} else {
			# Messages may be empty - in this case we'd have no body frames at all, we're done already:
			$self->deliver_current_message;
		}

		return $self;
	}

	# Body part of an incoming message.
	if($frame->isa('Net::AMQP::Frame::Body')) {
		my $bytes = $frame->payload;
		$self->{incoming_message}{payload} .= $bytes;
		$self->{incoming_message}{pending} -= length $bytes;
		if($self->{incoming_message}{pending} > 0) {
			# We still have more to come, just return for now
			return $self;
		} elsif($self->{incoming_message}{pending} < 0) {
			$self->close(
				code => 500,
				text => -$self->{incoming_message}{pending} . ' excess payload bytes detected in delivery'
			);
			delete $self->{incoming_message};
			return $self;
		} else {
			# We have a full message now - hand it over to the event bus
			$self->deliver_current_message;
			return $self;
		}
	}

	return $self unless $frame->can('method_frame') && (my $method_frame = $frame->method_frame);
	my $type = amqp_frame_type($frame);
	if($type eq 'Basic::ConsumeOk') {
		my $ctag = $method_frame->consumer_tag;
		$self->{consumer_tags}{$ctag} = 1;
	} elsif($type eq 'Basic::Cancel' or $type eq 'Basic::CancelOk') {
		my ($ctag) = ($method_frame->consumer_tag);
		$self->debug_printf("Cancel $ctag");
		$self->bus->invoke_event(
			'cancel',
			ctag => $ctag,
		);
		# Also raise this as a "listener_stop"
		# event, for managed channels
		$self->bus->invoke_event(
			listener_stop => $ctag
		);
		delete $self->{consumer_tags}{$ctag};
	}

	# Message delivery, part 3: The "Deliver" message.
	# This is actually where we start.
	if($type eq 'Basic::Deliver') {
		$self->debug_printf("Already have incoming_message?") if exists $self->{incoming_message};
		$self->{incoming_message} = {
			ctag => $method_frame->consumer_tag,
			dtag => $method_frame->delivery_tag,
			rkey => $method_frame->routing_key,
			payload => '',
			payload_size => undef,
		};
		return $self;
	}

	if($type eq 'Channel::Close') {
		$self->debug_printf(
			"Channel was %d, calling close - code %d, text '%s', class:method %d:%d",
			$frame->channel,
			$method_frame->reply_code,
			$method_frame->reply_text,
			$method_frame->class_id,
			$method_frame->method_id,
		);
		$self->on_close(
			$method_frame
		);
		return $self;
	}

	# Confirm mode => mark pending task as done
	if($type eq 'Confirm::SelectOk') {
		$self->debug_printf("Confirm mode enabled");
		$self->{confirm_mode}->done;
		return $self;
	} elsif($type eq 'Basic::Ack') {
		shift @{$self->{pending}{'Basic::Return'} || []};
		eval {
			my @msg = $self->extract_published($method_frame->delivery_tag, $method_frame->multiple);
			$self->debug_printf("received ack for %d messages", 0 + @msg);
			$_->done for grep !$_->is_ready, map $_->[1], @msg;
			1
		} or do {
			my $err = $@;
			$self->debug_printf("error retrieving messages for ack - %s", $err);
			$self->close(
				code => 406,
				text => $err
			);
		};
		return $self;
	} elsif($type eq 'Basic::Nack') {
		shift @{$self->{pending}{'Basic::Return'} || []};
		eval {
			my @msg = $self->extract_published($method_frame->delivery_tag, $method_frame->multiple);
			$self->debug_printf("received nack for %d messages", 0 + @msg);
			$_->fail('nack') for grep !$_->is_ready, map $_->[1], @msg;
			1
		} or do {
			my $err = $@;
			$self->debug_printf("error retrieving messages for nack - %s", $err);
			$self->close(
				code => 406,
				text => $err
			);
		};
		return $self;
	} elsif($type eq 'Basic::Return') {
		# Basic::Return would always be for the first unacked message in our publish
		# queue... except when we don't have publisher confirms, in which case... uh...
		# okay in that case we'd just raise an event maybe
		if($self->{confirm_mode}) {
			$self->debug_printf("basic::return in confirm mode");
			my $f = $self->{published}[0][1];
			$f->fail(
				$method_frame->reply_text,
				code     => $method_frame->reply_code,
				exchange => $method_frame->exchange,
				rkey     => $method_frame->routing_key
			) if $f && !$f->is_ready;
		} else {
			$self->debug_printf("basic::return in normal mode");
			$self->bus->invoke_event(
				return => $method_frame->reply_text,
				code     => $method_frame->reply_code,
				exchange => $method_frame->exchange,
				rkey     => $method_frame->routing_key
			)
		}
		return $self;
	}

	if(my $next = shift @{$self->{pending}{$type} || []}) {
		# We have a registered handler for this frame type. This usually
		# means that we've sent a message and are awaiting a response.
		if(ref($next) eq 'ARRAY') {
			my ($f, @args) = @$next;
			$f->done(@args) unless $f->is_ready;
		} else {
			$next->($self, $frame, @_);
		}
		return $self;
	}

	# It's quite possible we'll see unsolicited frames back from
	# the server. We don't expect many so report them when in debug mode.
	$self->debug_printf("We had no pending handlers for [%s]", $type);
	return $self;
}

sub deliver_current_message {
	my $self = shift;
	$self->bus->invoke_event(
		message => @{$self->{incoming_message}}{qw(type payload ctag dtag rkey headers properties)},
	);
	delete $self->{incoming_message};
}

sub extract_published {
	my ($self, $dtag, $multiple) = @_;
	if($multiple) {
		my @msg;
		while(@{$self->{published}}) {
			my $msg = shift @{$self->{published}};
			# Nonzero dtag means "up to and including this message"
			die 'ack for dtag ' . $dtag . ' but our earliest message is ' . $msg->[0] if $dtag && $dtag < $msg->[0];

			# with dtag=0 that's "everything you've got". this is fundamentally
			# flawed since the server may not have even received the most recent
			# items... might work if the server *always* uses dtag=0, I guess
			push @msg, $msg;
			last if $dtag == $msg->[0];
		}
		return @msg;
	}
	# Single-ack *probably* handles things in order, but the spec does not seem
	# to mandate this - better to be safe
	for my $idx (0..$#{$self->{published}}) {
		return splice @{$self->{published}}, $idx, 1 if $self->{published}[$idx][0] == $dtag;
	}
	die 'ack for dtag ' . $dtag . ' but not found in our pending list (we had ' . @{$self->{published}} . ' pending messages)';
}

=head1 METHODS - Accessors

=cut

=head2 amqp

The parent L<Net::Async::AMQP> instance.

=cut

sub amqp { shift->{amqp} }

=head2 bus

Event bus. Used for sharing channel-specific events.

=cut

sub bus { $_[0]->{bus} ||= Mixin::Event::Dispatch::Bus->new }

=head2 future

The underlying L<Future> for this channel.

Will resolve to the L<Net::Async::Channel> instance once the channel is open.

=cut

sub future { shift->{future} }

=head2 id

This channel ID.

=cut

sub id {
	my $self = shift;
	return $self->{id} unless @_;
	$self->{id} = shift;
	$self
}

sub as_string {
	my $self = shift;
	sprintf "Channel[%d]", $self->id;
}

=head2 closed

Returns true if the channel has been closed, 1 if not (which could mean it is either not yet open,
or that it is open and has not yet been closed by either side).

=cut

sub is_closed { shift->{is_closed} }

=head2 closure_protection

Helper method for marking any outstanding requests as failed when the channel closes.

Takes a L<Future>, returns a L<Future> (probably the same one).

=cut

sub closure_protection {
	my ($self, $f) = @_;
	unless($f) {
		$self->debug_printf("Closure protection requested on channel %d for future which has already disappeared", $self->id);
		return Future->fail(closed => 'future has already been released');
	}

	# No sense in proceeding if the Future has already completed
	if($f->is_ready) {
		$self->debug_printf("Closure protection requested for future %s on channel %d which has already compelted", $f->label, $self->id);
		return $f;
	}

	my $id = $self->id;
	my @ev;
	my $bus = $self->bus;
	$bus->subscribe_to_event(
		@ev = (close => sub {
			my ($ev, %args) = @_;
			$self->debug_printf("Closure protection engaging for %s on channel %d, code %s, reason: %s", ($f ? $f->label : "(future which no longer exists)"), $id, $args{code} // '(none)', $args{reason});
			if($f) {
				$f->fail($args{reason}, 'amqp', $args{code}) unless $f->is_ready;
			} else {
				$self->debug_printf("Future has disappeared already, not marking as failed");
			}
			# We should have unsubscribed already, but do this just in case.
			splice @ev;
			eval { $ev->unsubscribe; };
		})
	);

	# Use return value from ->on_ready, since we may clear $f immediately if the future is already
	# marked as ready.
	$f->on_ready(sub {
		$self->debug_printf("Future %s on channel %d is ready, disengaging closure protection", ($f ? $f->label : "(future which no longer exists)"), $id);
		eval { $bus->unsubscribe_from_event(splice @ev); };
		undef $f;
	});
}

1;

__END__

=head1 AUTHOR

Tom Molesworth <TEAM@cpan.org>

=head1 LICENSE

Licensed under the same terms as Perl itself, with additional licensing
terms for the MQ spec to be found in C<share/amqp0-9-1.extended.xml>
('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive
license to (i) copy, display, distribute and implement the Advanced
Messaging Queue Protocol ("AMQP") Specification').


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.