Group
Extension

AnyMQ-RawSocket/lib/AnyMQ/Trait/RawSocket.pm

package AnyMQ::Trait::RawSocket;

use Any::Moose 'Role';

use AnyEvent::Socket;
use AnyEvent::Handle;
use Carp qw/croak/;

# Address to listen on; parsed with parse_hostport when the server is built.
has address => (
    is  => 'rw',
    isa => 'Str',
);

# When true, handle_event also publishes named events to the '*' topic.
has allow_wildcard_subscription => (
    is  => 'rw',
    isa => 'Bool',
);

# Listening server, created on first access by _build_server_socket.
has server_socket => (
    lazy_build => 1,
    is         => 'rw',
);

# Handles of the client connections accepted by the server.
has connections => (
    is      => 'rw',
    isa     => 'ArrayRef[AnyEvent::Handle]',
    traits  => [qw(Array)],
    default => sub { return [] },
    handles => {
        add_connection  => 'push',
        all_connections => 'elements',
    },
);

# Empty BUILD stub, wrapped by the modifier below.
sub BUILD { }

after BUILD => sub {
    my $self = shift;

    # Reading the lazily built attribute runs its builder, which starts
    # the listener.
    $self->server_socket;
};

# Builder for the lazy 'server_socket' attribute: starts a TCP listener on
# the configured address and wires every accepted connection into the bus.
#
# The 'address' attribute is parsed with parse_hostport. Each accepted
# client is wrapped in an AnyEvent::Handle that is recorded through
# add_connection, and every JSON value read from it is handed to
# handle_event. When a handle reports an error the message is logged and
# the handle is removed from the connection list and destroyed.
#
# Returns whatever tcp_server returns (stored in 'server_socket').
# Croaks when 'address' is undefined or parse_hostport cannot parse it.
#
# NOTE(review): the accept callback closes over $self while $self stores the
# value returned here - this looks like a reference cycle; confirm whether
# $self should be weakened.
sub _build_server_socket {
    my ($self) = @_;

    # Do not hand an undefined address to parse_hostport; treat it the same
    # way as an address that fails to parse.
    my $listen_on = $self->address;
    my ($address, $port) = defined $listen_on ? parse_hostport($listen_on) : ();
    croak 'address must be defined (e.g. "host:port") to subscribe to messages'
        unless defined $address;

    my $server = tcp_server($address, $port, sub {
        my ($fh) = @_;

        # Declared before construction so the callbacks can refer to the
        # handle they belong to.
        my $h;
        $h = AnyEvent::Handle->new(
            fh      => $fh,
            on_read => sub {
                # Queue a JSON read; each decoded value becomes one event.
                $h->push_read(json => sub {
                    my (undef, $evt) = @_;

                    $self->handle_event($evt);
                });
            },
            on_error => sub {
                my (undef, undef, $msg) = @_;
                AE::log(error => $msg);
                $self->remove_connection($h);
                $h->destroy;
                # Drop the closure's own reference to the handle.
                undef $h;
            },
        );

        $self->add_connection($h);
    });

    warn "Now listening on $address, port $port\n";

    return $server;
}

# Publish a decoded event on the bus.
#
# $evt is expected to be a hash reference (or hash-based object). The topic
# name is the first true value among its Type, type, name and topic keys,
# falling back to '*'. An event published to a named topic is additionally
# published to the '*' topic when allow_wildcard_subscription is true.
#
# Anything that is not hash-based (for example a decoded JSON array) carries
# no topic information and cannot be dereferenced as a hash, so it is
# skipped with a warning instead of dying inside the caller.
sub handle_event {
    my ($self, $evt) = @_;

    # reftype sees through blessing and returns undef for non-references.
    require Scalar::Util;
    my $shape = Scalar::Util::reftype($evt);
    unless (defined $shape && $shape eq 'HASH') {
        warn "Ignoring event that is not a hash\n";
        return;
    }

    # find event topic
    my $topic_name = $evt->{Type} || $evt->{type} || $evt->{name} || $evt->{topic} || '*';

    $self->topic($topic_name)->publish($evt);

    if ($topic_name ne '*') {
        # publish to '*' topic too
        $self->topic('*')->publish($evt)
            if $self->allow_wildcard_subscription;
    }
}

# Create a topic attached to this bus with the RawSocket topic trait.
#
# $opt is either a hash reference of constructor arguments or a plain
# topic name, which is treated as { name => $opt }.
sub new_topic {
    my ($self, $opt) = @_;

    # A non-reference argument is the name of the topic.
    my $args = ref $opt ? $opt : { name => $opt };

    # The trait and the bus always come last in the argument list.
    return AnyMQ::Topic->new_with_traits(
        %$args,
        traits => ['RawSocket'],
        bus    => $self,
    );
}

# Replace the connection list with a copy that no longer contains $h.
# Handles are compared with numeric equality on the references.
sub remove_connection {
    my ($self, $h) = @_;

    my @remaining;
    for my $conn ($self->all_connections) {
        next if $conn == $h;
        push @remaining, $conn;
    }

    return $self->connections(\@remaining);
}

# Empty DEMOLISH stub, wrapped by the modifier below.
sub DEMOLISH { }

after DEMOLISH => sub {
    # The second argument is presumably the in-global-destruction flag -
    # TODO confirm against the object system's documentation.
    my ($self, $in_global_destruction) = @_;

    return if $in_global_destruction;

    # cleanup
};

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find any problems, please report them on GitHub.