Group
Extension

AnyMQ-ZeroMQ/lib/AnyMQ/Trait/ZeroMQ.pm

package AnyMQ::Trait::ZeroMQ;

use Any::Moose 'Role';

use AnyEvent::ZeroMQ;
use AnyEvent::ZeroMQ::Publish;
use AnyEvent::ZeroMQ::Subscribe;
use AnyMQ::Topic::Trait::ZeroMQ;
use Carp qw/croak/;
use JSON;

has 'publish_address'   => ( is => 'rw', isa => 'Str' );
has 'subscribe_address' => ( is => 'rw', isa => 'Str' );

has '_zmq_sub' => ( is => 'rw', lazy_build => 1, isa => 'AnyEvent::ZeroMQ::Subscribe' );
has '_zmq_pub' => ( is => 'rw', lazy_build => 1, isa => 'AnyEvent::ZeroMQ::Publish' );
has '_zmq_context' => ( is => 'rw', lazy_build => 1, isa => 'ZeroMQ::Raw::Context' );
has '_zmq_json' => ( is => 'rw', lazy_build => 1, isa => 'JSON' );

# topic => [ callbacks ]
has 'subscriptions' => (
    traits     => ['Hash'],
    is         => 'ro',
    isa        => 'HashRef[ArrayRef[CodeRef]]',
    default    => sub { {} },
    handles    => {
        subscription_topics => 'keys',
    },
);        

sub _build__zmq_json {
    my ($self) = @_;
    return JSON->new->utf8;
}

sub _build__zmq_context {
    my ($self) = @_;

    my $c = ZeroMQ::Raw::Context->new( threads => 10 );
    return $c;
}

sub _build__zmq_sub {
    my ($self) = @_;

    my $address = $self->subscribe_address
        or croak 'subscribe_address must be defined to publish messages';

    my $sub = AnyEvent::ZeroMQ::Subscribe->new(
        context => $self->_zmq_context,
        connect => $address,
    );

    $sub->on_read(sub { $self->read_event(@_) });

    return $sub;
}

sub _build__zmq_pub {
    my ($self) = @_;

    my $address = $self->publish_address
        or croak 'publish_address must be defined to publish messages';

    my $pub = AnyEvent::ZeroMQ::Publish->new(
        context => $self->_zmq_context,
        connect => $address,
    );

    return $pub;
}

# called when we read some data
sub read_event {
    my ($self, $subscription, $json) = @_;

    my $event = $self->_zmq_json->decode($json);

    unless ($event) {
        warn "Got invalid JSON: $json";
        return;
    }

    my $topic = $event->{type};
    unless ($topic) {
        warn "Got event with no topic type\n";
    }

    # call event handler callbacks
    my $cbs = $self->subscriptions->{$topic};

    unless ($cbs && @$cbs) {
        #warn "Got event $topic but no callbacks found\n";
        return;
    }
    
    foreach my $cb (@$cbs) {
        $cb->($event);
    }
}

# calls $cb when we receive a $topic event
# returns ref that can be passed to unsubscribe()
sub subscribe {
    my ($self, $topic, $cb) = @_;

    # make sure subscriber bus exists
    $self->_zmq_sub;
    
    # undef or '' means "all topics"
    $topic ||= '';

    $self->subscriptions->{$topic} ||= [];
    my $cbs = $self->subscriptions->{$topic};

    push @$cbs, $cb;

    $self->update_topic_subscriptions;

    # for now just use $cb as ref, should generate some unique
    # id in the future
    my $ref = $cb;
    return $ref;
}

# this controls what events we are subscribed to in ZMQ
sub update_topic_subscriptions {
    my ($self) = @_;

    my @topics = $self->subscription_topics;
    
    # update list of topics we are subscribed to
    # FIXME: use trait::topics
    # $self->_zmq_sub->topics(\@topics);
}

sub unsubscribe {
    my ($self, $topic, $ref) = @_;

    $self->subscriptions->{$topic} ||= [];
    my $cbs = $self->subscriptions->{$topic};
    $cbs = [ grep { $_ != $ref } @$cbs ];

    $self->update_topic_subscriptions;
}

sub new_topic {
    my ($self, $opt) = @_;
    
    $opt = { name => $opt } unless ref $opt;

    return AnyMQ::Topic->new_with_traits(
        traits => [ 'ZeroMQ' ],
        %$opt,
        bus => $self,
    );
}

sub DEMOLISH {}; after 'DEMOLISH' => sub {
    my $self = shift;
    my ($igd) = @_;

    return if $igd;

    # cleanup
};

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.