Group
Extension

Padre-Plugin-Swarm/lib/Padre/Plugin/Swarm/Transport/Global.pm

package Padre::Plugin::Swarm::Transport::Global;
use strict;
use warnings;
use Carp 'confess';
use Padre::Logger;
use Data::Dumper;
use base qw( Object::Event );
use Padre::Swarm::Message;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Scalar::Util 'blessed';
use JSON;

our $VERSION = '0.2';

sub new {
    my $class = shift;
    my $self = $class->SUPER::new(@_);
    $self->reg_cb( 'start_session' => \&start_session );
    return $self;
}

sub enable {
    my  $self = shift;
    my $g = tcp_connect $self->{host} , $self->{port},
        sub { $self->event( 'start_session', shift) };
    $self->{g} = $g;
}

sub start_session {
    my ($self,$fh) = @_;
    unless ($fh) {
        $self->event('disconnect','Connection failed ' . $!);
        return;   
    }
    my $h = AnyEvent::Handle->new(
        fh => $fh,
        json => $self->_marshal,
        on_eof => sub { $self->event('disconnect', shift ) },
    );
    
    
    # now we register our own disconnect handler for teardown;
    $self->reg_cb('disconnect', \&disconnect );
    
    $self->{h} = $h;
    $h->push_write( json => { trustme=>$$.rand() } );
    $h->push_read( json => sub { $self->event( 'see_auth' , @_ ) } );
    $self->reg_cb( 'see_auth' , \&see_auth );
    
}


sub disconnect {
    my $self = shift;

    if ($self->{h}) {
        $self->{h}->destroy;
        delete $self->{h};
    }
    delete $self->{chirp};
    
    $self->unreg_me;
    
}


sub see_auth {
    my $self = shift;
    my $handle = shift;
    my $message = shift;
    $self->unreg_cb('start_session');
    $self->{h} = $handle;
    $self->{token} = $message->{token};
    if ( $message->{session} eq 'authorized' ) {
        $self->{h}->on_read( sub {
                shift->push_read( json => sub { $self->event('recv',$_[1]) } );
            }
        );
        $self->event('connect'=>$self->{token} );
        # this is hideous but works for me
        # timer pushes some data to the socket every so often to convince
        # firewalls "I really DO want this connection - OK!"
        my $chirp = AnyEvent->timer(
            after => 60,
            interval => 300,
            cb => sub { $self->send( {type=>'noop'}) }
        );
        $self->{chirp} = $chirp;
    }
    else {
        $self->{h}->destroy;
        delete $self->{h};      
        $self->event('disconnect','Authorization failed');
        
    }
}

sub send {
    my $self = shift;
    my $message = shift;
    if ( threads::shared::is_shared( $message ) ) {
        TRACE( "SEND A SHARED REFERENCE ?!?!?! - " . Dumper $message );
        confess "$message , is a shared value";
    }
    $message->{token} = $self->{token};
    $self->{h}->push_write( json => $message );
    # implement our own loopback ?
    # nasty but fake what the deserializing marshal _would_ do.
    unless ( blessed $message ) {
        bless $message, 'Padre::Swarm::Message';
    }
    $self->event('recv', $message );
    
}

sub _marshal {
    JSON->new
        ->allow_blessed
        ->convert_blessed
        ->utf8
        ->filter_json_object(\&synthetic_class );
}


sub synthetic_class {
    my $var = shift ;
    if ( exists $var->{__origin_class} ) {
        my $stub = $var->{__origin_class};
        my $msg_class = 'Padre::Swarm::Message::' . $stub;
        my $instance = bless $var , $msg_class;
        return $instance;
    } else {
        return bless $var , 'Padre::Swarm::Message';
    }
};


1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.