Group
Extension

Net-WAMP/lib/Net/WAMP/RawSocket.pm

package Net::WAMP::RawSocket;

=encoding utf-8

=head1 NAME

Net::WAMP::RawSocket

=head1 SYNOPSIS

Client:

    my $rs = Net::WAMP::RawSocket::Client->new(

        #required
        io => IO::Framed::ReadWrite->new( $inet ),

        #optional
        max_pings           => 10,
        max_receive_length  => 2**23,    #default
    );

    #msgpack is also accepted
    $rs->send_handshake(
        serialization => 'json',    #default
    );

    $rs->verify_handshake();

    $rs->send_message('This is a message.);

    my $msg_txt = $rs->get_next_message();

Server:

    my $rs = Net::WAMP::RawSocket::Server->new(

        #required
        io => IO::Framed::ReadWrite->new( $inet ),

        #optional
        max_pings           => 10,
        max_receive_length  => 2**23,    #default
    );

    $rs->receive_and_answer_handshake();

    $rs->send_message('This is a message.);

    my $msg_txt = $rs->get_next_message();

=head1 DESCRIPTION

This module implements WAMP’s
L<RawSocket|http://wamp-proto.org/static/rfc/draft-oberstet-hybi-crossbar-wamp.html#rfc.section.14.5.3.1>
protocol. It’s a simpler—and hopefully faster—protocol for speaking to a WAMP server
when you have a raw TCP connection as opposed to a web browser.

Note that one of RawSocket’s limitations is a
hard upper limit (16 MiB) on message size: if you want to
send or receive
single messages of over 16 MiB, you’ll need some other transport mechanism
besides RawSocket.

=head1 GENERAL METHODS

=head2 I<CLASS>->new( %OPTS )

Instantiates the relevant class. %OPTS are:

=over

=item * C<max_receive_length> As per the protocol specification, this must be
a power of 2 from 512 (2**9) to 16,777,216 (2**24), inclusive.

=item * C<max_pings> The number of pings to allow unanswered before we
give up on the connection.

=back

=head2 I<OBJ>->send_message( MSG_STRING )

Sends a regular message.

=head2 I<OBJ>->get_next_message()

Returns the next message string, or undef if no message is available.
This will also (silently) consume any PONG messages that may arrive.

=head2 I<OBJ>->check_heartbeat()

Run this when your read timeout expires to send a PING message.

=head2 I<OBJ>->get_serialization()

C<json> or C<msgpack>.

=head2 I<OBJ>->get_max_send_length()

The maximum bytes that the connection’s peer is willing to receive in
a single RawSocket frame.

=cut

use strict;
use warnings;

use Net::WAMP::RawSocket::Constants ();
use Net::WAMP::RawSocket::PingStore ();

use Net::WAMP::RawSocket::Message::Regular ();
use Net::WAMP::RawSocket::Message::Ping ();
use Net::WAMP::RawSocket::Message::Pong ();

use constant {
    MSG_TYPE_REGULAR => 0,
    MSG_TYPE_PING => 1,
    MSG_TYPE_PONG => 2,

    DEFAULT_MAX_PINGS => 10,
};

use constant REQUIRED_CONSTRUCTOR_OPTS => ('io');

use constant OTHER_CONSTRUCTOR_OPTS => (
    'max_pings',
    'max_receive_length',
);

sub new {
    my ($class, %opts) = @_;

    my @missing = grep { !exists $opts{$_} } $class->REQUIRED_CONSTRUCTOR_OPTS();
    die "Need [@missing]!" if @missing;

    my $self = {
        _max_pings => $class->DEFAULT_MAX_PINGS,
        _max_receive_length => Net::WAMP::RawSocket::Constants::MAX_MESSAGE_LENGTH(),

        (
            map { exists($opts{$_}) ? ("_$_" => $opts{$_}) : () }
            $class->REQUIRED_CONSTRUCTOR_OPTS(),
            $class->OTHER_CONSTRUCTOR_OPTS(),
        ),

        _ping_store => Net::WAMP::RawSocket::PingStore->new(),
    };

    $self->{'_max_receive_code'} = Net::WAMP::RawSocket::Constants::get_max_length_code( $self->{'_max_receive_length'} );

    return bless $self, $class;
}

sub send_message {
    my ($self) = @_;

    die 'Handshake not completed!' if !$self->{'_handshake_ok'};

    if (length($_[1]) > $self->{'_max_send_length'}) {
        die "Too long!";    #XXX
    }

    $self->_send_frame(MSG_TYPE_REGULAR, @_[ 1 .. $#_ ]);

    return;
}

my ($msg_type_code, $msg_class_cr, $msg_size, $len1, $len2, $msg_body_r);

sub get_next_message {
    my ($self) = @_;

  MESSAGE: {

        #i.e., we were in the middle of reading:
        if ($self->{'_msg_size'}) {
            ($msg_class_cr, $msg_type_code, $msg_size) = @{$self}{ '_msg_class_cr', '_msg_type_code', '_msg_size' };
        }
        else {
            my $hdr = $self->_read_header() or return undef;

            ( $msg_type_code, $len1, $len2 ) = unpack 'CCn', $hdr;

            $msg_size = ($len1 << 16) | $len2;  #“|” is much faster than +

            if ($msg_size > $self->{'_max_receive_length'}) {
                die Net::WAMP::X->create('RawSocket::ReceivedTooBig');  #XXX
            }
        }

        $msg_body_r = \$self->{'_io'}->read($msg_size);

        #print STDERR "received-rs/$$ $$msg_body_r\n";

        if ($msg_type_code == MSG_TYPE_REGULAR()) {

            #Partial reads should (?) be very rare, so not
            #bothering to optimize for now.
            if (!$msg_size || length $$msg_body_r) {
                $self->{'_msg_size'} = 0;

                #It’s a bit less-than-tidy to commingle the “endpoint”
                #behavior (e.g., ping/pong) with the message parsing,
                #but it can be refactored later if that’s an issue.

                return bless $msg_body_r, 'Net::WAMP::RawSocket::Message::Regular';
            }
            else {
                @{$self}{ '_msg_class_cr', '_msg_type_code', '_msg_size' } = (
                    $msg_class_cr,
                    $msg_type_code,
                    $msg_size,
                );
            }

            return undef;
        }
        elsif ($msg_type_code == MSG_TYPE_PING()) {
            $self->_send_frame(MSG_TYPE_PONG, $$msg_body_r);
            redo MESSAGE;
        }
        elsif ($msg_type_code == MSG_TYPE_PONG()) {
            $self->{'_ping_store'}->remove($$msg_body_r);
            redo MESSAGE;
        }
        else {
            die "Huh?? Unknown message type: [$msg_type_code]";
        }
    }

    #Empty message …
    return bless \do { my $v = q<> }, 'Net::WAMP::RawSocket::Message::Regular';
}

sub check_heartbeat {
    my ($self) = @_;

    my $ping_counter = $self->{'_ping_store'}->get_count();
    if ( $ping_counter == $self->{'_max_pings'} ) {
        $self->shutdown();
        return 0;
    }

    my $text = $self->{'_ping_store'}->add();

    $self->_send_frame(MSG_TYPE_PING, $text);

    return 1;
}

sub get_serialization {
    return $_[0]->{'_serialization'};
}

sub get_max_send_length {
    return $_[0]->{'_max_send_length'};
}

sub get_max_receive_length {
    return $_[0]->{'_max_receive_length'};
}

#----------------------------------------------------------------------

sub _set_handshake_done {
    $_[0]->{'_handshake_ok'} = 1;

    return;
}

sub _send_frame {
    my ($self, $type_num) = @_;

    #print STDERR "rs-write/$$: $_[2]\n";

    _prefix_header( $type_num, $_[2] );

    $self->_send_bytes( @_[ 2 .. $#_ ] );

    return;
}

sub _read_header {
    my ($self) = @_;

    return $self->{'_io'}->read(
        Net::WAMP::RawSocket::Constants::HEADER_LENGTH(),
    );
}

sub _send_bytes {
    my ($self) = @_;    #bytes, callback

    $self->{'_io'}->write(@_[1 .. $#_]);

    return;
}

sub _get_and_unpack_handshake_header {
    my ($self) = @_;

    my $recv_hdr = $self->_read_header();

    if ($recv_hdr) {
        my ($octet1, $octet2, $reserved) = unpack 'CCa2', $recv_hdr;

        if ($octet1 ne Net::WAMP::RawSocket::Constants::MAGIC_FIRST_OCTET()) {
            die sprintf("Invalid first octet (header = %v.02x)!", $recv_hdr);
        }

        if ($reserved ne "\0\0") {
            die sprintf("Unsupported feature (reserved = %v.02x)", $reserved);
        }

        $self->{'_max_send_length'} = Net::WAMP::RawSocket::Constants::get_max_length_value($octet2 >> 4);

        my $recv_serializer_code = ($octet2 & 0xf);
        my $ser_name = Net::WAMP::RawSocket::Constants::get_serialization_name($recv_serializer_code);

        return( $octet2, $ser_name, $recv_serializer_code );
    }

    return undef;
}

#----------------------------------------------------------------------

#In the interest of saving memory, this alters the passed-in string.
#TODO: Deduplicate
sub _prefix_header {
    my ($type_code) = @_;    #bytes is the third parameter

    substr(
        $_[1],
        0, 0,
        pack(
            'CCn',
            $type_code,
            (length($_[1]) >> 16),
            (length($_[1]) & 0xffff),
        ),
    );

    return;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.