package ZeroMQ::PubSub;
use Moose;
use ZMQ::LibZMQ2;
use JSON;
use namespace::autoclean;
with 'MooseX::Callbacks';
has 'context' => (
is => 'rw',
isa => 'ZMQ::LibZMQ2::Context',
lazy_bu
e Moose;
extends 'ZeroMQ::PubSub';
use ZMQ::LibZMQ2;
use ZMQ::Constants ':all';
use JSON qw/encode_json decode_json/;
use Clone qw/clone/;
use Carp qw/croak/;
# socket to listen for client events
#
from JSON, or undef if failure.
=cut
sub recv {
my ($self) = @_;
my $msg = zmq_recv($self->publish_sock);
my $json_str = zmq_msg_data($msg);
my $json = eval { decode_json($json_str)
};
unless ($json) {
warn "Got invalid event: failed to parse JSON: $@";
return;
}
return $json;
}
=head2 broadcast($event)
Sends $event to all connected subscribers.
e Moose;
extends 'ZeroMQ::PubSub';
use ZMQ::LibZMQ2;
use ZMQ::Constants ':all';
use JSON qw/encode_json decode_json/;
use Carp qw/croak/;
use List::Util qw/shuffle/;
# should only be used internally
= zmq_recv($self->subscribe_sock);
my $msg_str = zmq_msg_data($msg_raw);
my $msg = decode_json($msg_str);
$self->dispatch_event($msg);
}
after 'subscribe' => sub {
my ($self, $evt, $c
re we're connected
$self->connect_publish_sock;
my $json_str = encode_json($msg);
my $res = zmq_send($self->publish_sock, $json_str);
$self->print_debug("Published $evt, res=$res");