Group
Extension

Padre-Plugin-Swarm/lib/Padre/Plugin/Swarm/Transport/Local.pm

package Padre::Plugin::Swarm::Transport::Local;
use strict;
use warnings;
use Carp 'confess';
use Padre::Logger;
use Data::Dumper;
use base qw( Object::Event );
use AnyEvent::Handle;
use IO::Socket::Multicast;
use Padre::Swarm::Message;
use JSON;
our $VERSION = '0.2';

=pod

=head1 NAME

Padre::Plugin::Swarm::Transport::Local - Multicast swarm message bus

=head1 DESCRIPTION

=head1 SYNOPSIS

    my $t = Padre::Plugin::Swarm::Transport::Local->new();
    $t->reg_cb('connect' , sub { printf "Transport %s connected", shift } );
    $t->reg_cb('recv', \&incoming_message );
    $t->reg_cb('disconnect', sub { warn "Disconnected" } );
    
    $t->enable;

=cut 

sub new {
    my $class = shift;
    my $self = $class->SUPER::new(@_);
    $self->{token} = $$.rand();
    return $self;
}

sub enable {
    my  $self = shift;
    
    my $m = IO::Socket::Multicast->new(
                LocalPort => 12000,
                ReuseAddr => 1,
    ) or die $!;
    
    $m->mcast_add('239.255.255.1'); #should have the interface
    $m->mcast_loopback( 1 );
    
    $self->{m} = $m;
    $self->{io} = AnyEvent->io(
        fh => $m,
        poll => 'r',
        cb => sub {
             $self->event('readable') 
        }
    );
    $self->reg_cb( 'readable' , \&readable );
    $self->reg_cb('disconnect', \&disconnect );
    $self->event('connect',$self->{token} );
    
    return;
}

sub send {
    my $self = shift;
    my $message = shift;
    
    if ( threads::shared::is_shared( $message ) ) {
        TRACE( "SEND A SHARED REFERENCE ?!?!?! - " . Dumper $message );
        confess "$message , is a shared value";    
    }    

    $message->{token} = $self->{token};
    my $data = eval { $self->_marshal->encode($message) };
    if ($data) {
        $self->{m}->mcast_send(
            $data, '239.255.255.1:12000'
        );
    }
}

sub readable {
    my $self = shift;
    my $data;
    unless ( $self->{m} ) {
        TRACE( 'Multicast handle has gone away!' );
        return;
    }
    $self->{m}->recv($data,65535);
    my $message = eval{ $self->_marshal->decode($data) };
    if ( $message ) {
        $self->event('recv', $message);
    }
    
}


sub disconnect {
    my $self = shift;
    if ( $self->{io} ) {
        delete $self->{io};
        my $m = delete $self->{m};
        $m->mcast_drop('239.255.255.1');
    }
    $self->unreg_me;
}

sub _marshal {
    JSON->new
        ->allow_blessed
        ->convert_blessed
        ->utf8
        ->filter_json_object(\&synthetic_class );
}


sub synthetic_class {
    my $var = shift ;
    if ( exists $var->{__origin_class} ) {
        my $stub = $var->{__origin_class};
        my $msg_class = 'Padre::Swarm::Message::' . $stub;
        my $instance = bless $var , $msg_class;
        return $instance;
    } else {
        return bless $var , 'Padre::Swarm::Message';
    }
};


1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.