Group
Extension

RPC-Lite/lib/RPC/Lite/Session.pm

package RPC::Lite::Session;

use strict;

use RPC::Lite;
use RPC::Lite::MessageQuantizer;

=pod

=head1 NAME

RPC::Lite::Session - Manages a client session.  Used internally.

=head1 SYNOPSIS

  use RPC::Lite::Session;

  my $session = RPC::Lite::Session->new( $uniqueClientId, $transport, $sessionManager, $extraInfoHash );

  my $request = $session->GetRequest();

=head1 DESCRIPTION

RPC::Lite::Session implements a session for managing clients which are connected
to an RPC::Lite::Server.  Sessions handle receiving requests and sending responses
to clients.  Sessions store information about the connection so that
multiple transports and serializers can be supported by a single server.

=over 12

=cut

sub Id               { $_[0]->{id}               = $_[1] if @_ > 1; $_[0]->{id} }
sub StartTime        { $_[0]->{starttime}        = $_[1] if @_ > 1; $_[0]->{starttime} }
sub SessionManager   { $_[0]->{sessionmanager}   = $_[1] if @_ > 1; $_[0]->{sessionmanager} }
sub SerializerType   { $_[0]->{serializertype}   = $_[1] if @_ > 1; $_[0]->{serializertype} }
sub Transport        { $_[0]->{transport}        = $_[1] if @_ > 1; $_[0]->{transport} }
sub Established      { $_[0]->{established}      = $_[1] if @_ > 1; $_[0]->{established} }
sub Disconnected     { $_[0]->{disconnected}     = $_[1] if @_ > 1; $_[0]->{disconnected} }
sub MessageQuantizer { $_[0]->{messagequantizer} = $_[1] if @_ > 1; $_[0]->{messagequantizer} }
sub Stream           { $_[0]->{stream}           = $_[1] if @_ > 1; $_[0]->{stream} }
sub MessageQueue     { $_[0]->{messagequeue}     = $_[1] if @_ > 1; $_[0]->{messagequeue} }

sub new
{
  my $class          = shift;
  my $id             = shift;
  my $transport      = shift;
  my $sessionManager = shift;
  my $extraInfo      = shift;

  my $self = {};
  bless $self, $class;

  $self->StartTime( $extraInfo->{StartTime} || time() );

  $self->Id( $id );
  $self->Transport( $transport );
  $self->SessionManager( $sessionManager );
  $self->SerializerType( undef );
  $self->Established( 0 );
  $self->Disconnected( 0 );
  $self->MessageQuantizer( RPC::Lite::MessageQuantizer->new() );
  $self->Stream( '' );
  $self->MessageQueue( [] );

  return $self;
}

sub Pump
{
  my $self = shift;
  
  if ( !$self->Established() )
  {
    $self->__EstablishSession();
    return;
  }
  
  # read any new data off the transport stream
  my $newData = $self->Transport->ReadData();

  if ( !defined( $newData ) )
  {
    $self->Disconnected( 1 );
    return;
  }
  
  $self->Stream( $self->Stream() . $newData );

  # now that we've read more data, try to process the stream again
  $self->__ProcessStream();
}

=pod

=item GetRequest()

Returns an RPC::Lite::Request object or undef if there is an error (such
as the client being disconnected).

=cut

sub GetRequest
{
  my $self = shift;

  return undef if $self->Disconnected();
  return undef if !$self->Established();

  my $message = shift @{ $self->MessageQueue };

  return defined( $message ) ? $self->SessionManager->Serializers->{ $self->SerializerType() }->Deserialize( $message ) : undef;
}

sub __EstablishSession
{
  my $self = shift;
  
  # should be the first message, and we'll assume it should all come over the wire

  my $data = $self->Transport->ReadData();

  if ( !defined( $data ) )
  {
    $self->Disconnected( 1 );
    return;
  }
  
  return undef if !length( $data );

  $self->Stream( $self->Stream() . $data );

  # it should come back as our one message here
  $self->__ProcessStream();

  # if we didn't get it in our queue, disconnect
  if ( !@{ $self->MessageQueue() } )
  {
    $self->Disconnected( 1 );
    return undef;
  }
  
  my $message = shift @{ $self->MessageQueue() };

  # handshake string examples:
  #
  #  RPC-Lite 1.0 / JSON 1.1
  #  RPC-Lite 2.2 / XML 3.2
  if ( $message !~ /^RPC-Lite (.*?) \/ (.*?) (.*?)$/ )
  {
    $self->Disconnected( 1 );
    return undef;
  }
  
  my $rpcLiteVersion = $1;
  my $serializerType = $2;
  my $serializerVersion = $3;

  # FIXME return some kind of error to the client about why it's being dropped?
  if (    !RPC::Lite::VersionSupported( $rpcLiteVersion )
       or !$self->SessionManager->__InitializeSerializer( $serializerType, $serializerVersion ) )
  {
    $self->Disconnected( 1 );
    return;
  }

  $self->SerializerType( $serializerType );
  $self->Established( 1 );

  return 1;
}

sub __ProcessStream
{
  my $self = shift;
  
  return undef if ( !length( $self->Stream ) );

  my $quantized = $self->MessageQuantizer->Quantize( $self->Stream );
  
  push( @{$self->MessageQueue}, @{ $quantized->{messages} } );
  
  $self->Stream( $quantized->{remainder} );

  return scalar( @{$self->MessageQueue} );
}

=pod

=item Write( $data )

Serializes (using this particular client's serializer preference) and writes
the data referenced by $data to the client.

=cut

sub Write
{
  my $self = shift;
  my $data = shift;

  my $serializedContent = $self->SessionManager->Serializers->{ $self->SerializerType() }->Serialize( $data );
  my $packedMessage = $self->MessageQuantizer->Pack( $serializedContent );
  return $self->Transport->WriteData( $packedMessage );
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.