Group
Extension

tntcompat/lib/TntCompat/Server.pm

use utf8;
use strict;
use warnings;

package TntCompat::Server;

use Coro;
use Coro::AnyEvent;
use AnyEvent::Socket;
use TntCompat::Config;
use TntCompat::Debug;
use Coro::Handle;
use Errno qw(EAGAIN EINTR EINPROGRESS);
use TntCompat::Proto;
use TntCompat::Msgpack;
use MIME::Base64;
use Encode qw(decode encode);
use Data::Dumper;
use Carp;
use File::Spec::Functions 'catfile';

use TntCompat::Cat::SnapMsgpack;
use TntCompat::Cat::Snap;
use TntCompat::Cat::Xlog;
use JSON::XS;
use File::Basename 'basename';
use feature 'state';


sub strhex($) {
    my ($str) = @_;
    $str =~ s/./sprintf '.%02x', ord $&/ges;
    return $str;
}

sub new {
    my ($class, $cfgfile) = @_;
    my $cfg;
    if (ref $cfgfile) {
        $cfg = $cfgfile;
    } else {
        $cfg = TntCompat::Config->new($cfgfile) unless ref $cfgfile;
    }
    $cfg->_set_defaults;


    my $self = bless {
        cfg         => $cfg,
        lsn         => {},
        wbuf        => {},
        enable_wbuf => {},
    } => ref($class) || $class;

    $self->{skip} = { map {( $_ => 1 )} @{ $self->{cfg}->get('skip_spaces') }};


    my $server;
    DEBUGF 'Create server at %s:%s',
        $cfg->get('host') || '', $cfg->get('port') || '';
    $self->{guard} =
        tcp_server $cfg->get('host'), $cfg->get('port'), $self->on_connect;

    unless ($self->{guard}) {
        die $!;
    }
    

    return $self;
}


sub is_space_skip {
    my ($self, $space) = @_;
    return 0 unless exists $self->{skip}{$space};
    return 1;
}


sub host            { $_[0]->{cfg}->get('host') }
sub port            { $_[0]->{cfg}->get('port') }
sub wal_dir         { $_[0]->{cfg}->get('wal_dir') }
sub snap_dir        { $_[0]->{cfg}->get('snap_dir') }
sub bootstrap       { $_[0]->{cfg}->get('bootstrap_data') }
sub server_uuid     { $_[0]->{cfg}->get('server_uuid') }
sub cluster_uuid    { $_[0]->{cfg}->get('cluster_uuid') }


sub lsn {
    my ($self, $fd) = @_;
    $fd ||= 0;
    $fd = fileno $fd if ref $fd;
    $self->{lsn}{$fd} = 0 unless $self->{lsn}{$fd};
    $self->{lsn}{$fd}++;
    return $self->{lsn}{$fd};
}

sub clean_lsn {
    my ($self, $fd) = @_;
    $fd ||= 0;
    $fd = fileno $fd if ref $fd;
    $self->{lsn}{$fd} = 0;
}

sub schema      {
    my ($self, $sno) = @_;
    my $schema = $self->{cfg}->get('schema');
    return $schema unless @_ > 1;
    croak "Unknown space number: $sno" unless exists $schema->{$sno};
    return $schema->{$sno};
}

sub run {
    Coro::schedule;
}

sub on_connect {
    my ($self) = @_;
    sub {
        my ($fh, $host, $port) = @_;
        binmode $fh => ':raw';
        DEBUGF 'Connection from %s:%s', $host // '', $port // '';
        async { $self->downlink(Coro::Handle->new_from_fh($fh)); }
    }
}


sub read_request {
    my ($self, $fh) = @_;

    my $r = '';


    while($fh->readable) {
        my $len = 5;
        if (length $r) {

            my ($dlen, $doff) = msgunpack $r;
            if (defined $doff) {
                $len = $doff  + $dlen; # total len
                $len = $len - length $r;
            } else {
                $len = 5 - length $r;
                if ($len <= 0) {
                    DEBUGF 'Broken request length';
                    return;
                }
            }
        }

        my $rd = sysread $fh, $r, $len, length $r;
        if (defined $rd) {
            next unless $rd == $len;

            my ($dlen, $doff) = msgunpack $r;
            next unless $doff  + $dlen == length $r;
            return TntCompat::Proto::parse_response $r;
        } else {
            DEBUGF 'Error reading socket: %s', decode utf8 => $!;
            return;
        }
    }
}


sub send_response {
    my ($self, $fh, $resp) = @_;

    my $no = fileno $fh;
    if ($self->{enable_wbuf}{$no}) {

        $self->{wbuf}{$no} //= '';
        if (length $self->{wbuf}{$no} < 50_000) {
            $self->{wbuf}{$no} .= $resp;
            return 1;
        }
    }

    $resp = delete($self->{wbuf}{$no}) . $resp if defined $self->{wbuf}{$no};

    while($fh->writable and length $resp) {
        my $rc = syswrite $fh, $resp;
        unless (defined $rc) {
            DEBUGF 'Can not send response: %s', decode utf8 => $!;
            last;
        }

        return 1 if $rc == length $resp;
        substr $resp, 0, $rc, '';
    }
    return 0;
}


sub enable_wbuf {
    my ($self, $fh) = @_;
    my $no = fileno $fh;

    $self->{enable_wbuf}{$no} = 1;
}

sub disable_wbuf {
    my ($self, $fh) = @_;
    my $no = fileno $fh;
    delete $self->{enable_wbuf}{$no};
}


sub ping {
    my ($self, $fh, $request) = @_;
    # pong
    my $pkt = TntCompat::Proto::make_response($request->{SYNC}, 0);
    if ($self->send_response($fh, $pkt)) {
        DEBUGF 'Client <- pong';
        return 1;
    }
    return 0;
}

sub auth {
    my ($self, $fh, $request, $salt) = @_;

    my ($retval, $code, $message) = (-1, 0x2000_0000 | 42, 'Auth error');


    unless (ref($request->{TUPLE}) eq 'ARRAY') {
        $code = 0x2000_0000 | 22;
        $message = 'Tuple/Key must be MsgPack array';
        goto ERROR;
    }

    unless ($request->{TUPLE}[0] and $request->{TUPLE}[0] eq 'chap-sha1') {
        $code = 0x2000_0000 | 42;
        $message = 'Unknown chap-type: ' . $request->{TUPLE}[0] // 'undef';
        goto ERROR;
    }

    unless($request->{TUPLE}[1] and length $request->{TUPLE}[1] == 20) {
        $code = 0x2000_0000 | 42;
        $message = 'Wrong length of password hash';
        if ($request->{TUPLE}[1]) {
            $message .= ': ' . length $request->{TUPLE}[1];
        }
        goto ERROR;
    }

    unless (defined $request->{USER_NAME}) {
        $code = 0x2000_0000 | 42;
        $message = 'No username in request';
        goto ERROR;
    }

    unless ($self->{cfg}->get('user') eq $request->{USER_NAME}) {
        $code = 0x2000_0000 | 42;
        $message = 'Wrong username in request';
        goto ERROR;
    }

    my $chk = TntCompat::Proto::check_auth(
        $request->{TUPLE}[1], $salt, $self->{cfg}->get('password'));

    unless ($chk) {
        $code = 0x2000_0000 | 42;
        $message = sprintf 'Wrong password for user %s', $request->{USER_NAME};
        goto ERROR;
    }

    $code = 0;
    $message = [];
    $retval = 1;

    ERROR:
        DEBUGF 'Can not auth client: %s', $message if $code;
        my $pkt = TntCompat::Proto::make_response(
                            $request->{SYNC}, $code, $message);
        return 0 unless $self->send_response($fh, $pkt);
        return $retval;
}


sub _join_bootstrap {
    DEBUGF 'Send bootstrap';
    my ($self, $fh, $request) = @_;
    my $bootstrap = TntCompat::Cat::SnapMsgpack->new;

    my $send = 1;
    $bootstrap->on_row(sub {
        my ($lsn, $type, $space, $row) = @_;

        # skip _cluster space
        if ($space == TntCompat::Proto::SC_CLUSTER_ID) {
            return;
        }

        # skip 'cluster' record in _schema
        if ($space == TntCompat::Proto::SC_SCHEMA_ID) {
            return if $row->[0] and $row->[0] eq 'cluster';
        }

        my $pkt = TntCompat::Proto::insert
            $request->{SYNC},
            $self->lsn($fh),
            $space, $row, { server_id => 0 };

        unless ($self->send_response($fh, $pkt)) {
            DEBUGF "Can't send bootstrap row for space %s", $space;
            $send = 0;
            return;
        }
        DEBUGF 'Sent bootstrap row space: %s', $space;
    });

    $bootstrap->data($self->bootstrap);
    return 0 unless $send;



    my $pkt = TntCompat::Proto::insert
                    $request->{SYNC},
                    $self->lsn($fh),
                    TntCompat::Proto::SC_SCHEMA_ID,
                    [ 'cluster' => $self->cluster_uuid ],
                    { server_id => 0 };
    return 0 unless $self->send_response($fh, $pkt);

    $pkt = TntCompat::Proto::insert
                    $request->{SYNC},
                    $self->lsn($fh),
                    TntCompat::Proto::SC_CLUSTER_ID,
                    [ 1, $self->server_uuid ],
                    { server_id => 0 };
    return 0 unless $self->send_response($fh, $pkt);

    return 1;
}


sub _join_schema {
    DEBUGF 'Send schema';
    my ($self, $fh, $request) = @_;

    for my $sno (keys %{ $self->schema }) {
        my $space = $self->schema($sno);

        my $pkt = TntCompat::Proto::insert($request->{LSN}, $self->lsn($fh),
            TntCompat::Proto::SC_SPACE_ID, [
                $sno,                                   # space_id
                1,                                      # uid
                $space->{name} // 'space_' . $sno,      # space_name
                'memtx',                                # engine
                0,                                      # fields_count
                '',                                     # options
            ],
            { server_id => 0 }
        );

        return 0 unless $self->send_response($fh, $pkt);
        DEBUGF 'Space record about space %s was sent', $sno;

        for (my $ino = 0; $ino < @{ $space->{indexes} }; $ino++) {
            my $idx = $space->{indexes}[$ino];


            my $type = lc($idx->{type} || 'tree');
            $type = 'num' if $type eq 'num64';

            my $unique = $idx->{unique} ? 1 : 0;
            $unique = 1 if $ino == 0;
            my $name = $idx->{name} // 'idx_' . $ino;
            $name = 'pk' if $ino == 0 and !defined $idx->{name};

            my $ituple = [
                $sno,
                $ino,
                $name,
                $type,
                $unique,
                int(@{ $idx->{fields} } / 2),
                @{ $idx->{fields} }
            ];

            $pkt = TntCompat::Proto::insert
                $request->{LSN}, $self->lsn($fh),
                TntCompat::Proto::SC_INDEX_ID,
                $ituple,
                { server_id => 0 };
            return 0 unless $self->send_response($fh, $pkt);
            DEBUGF 'Index record about space[%s].index[%s] was sent',
                $sno, $ino;
        }
    }
    return 1;
}


sub _join_snapshot {
    DEBUGF 'Send snapshot';

    my ($self, $fh, $request) = @_;
    my $last_snap = [ sort glob catfile $self->snap_dir, '*.snap' ]->[-1];

    unless ($last_snap) {
        DEBUGF 'No one snapshot in %s', $self->snap_dir;
        return;
    }

    my $reader = TntCompat::Cat::Snap->new;


    my $last_lsn = $last_snap;
    for ($last_lsn) {
        s{.*/}{};
        s/^0+//;
        s/\.snap$//;
    }

    DEBUGF 'Send snapshot %s to client (lsn: %s)', $last_snap, $last_lsn;

    open my $fhb, '<:raw', $last_snap
        or die sprintf "Can't open snapshot file %s\n", $last_snap;

    my $send = 1;

    my $space_debug = -1;
    $reader->on_row(sub {
        my ($lsn, $type, $space, $row) = @_;


        # skip some spaces
        return if $self->is_space_skip($space);

        $row = $self->convert_row($space, $row);

        $lsn = $self->lsn($fh);

        my $pkt = TntCompat::Proto::insert
            $request->{SYNC}, $lsn, $space, $row, { server_id => 0 };

        DEBUGF 'Sending space[%s]...', $space unless $space == $space_debug;
        $space_debug = $space;

#         DEBUGF 'Sending snapshot row space: %s (lsn: %s)...', $space, $lsn;
        unless ($self->send_response($fh, $pkt)) {

            DEBUGF "Can't send snapshot row for space %s", $space;
            $send = undef;
            return;
        }
        DEBUGF 'Sent lsn = %s snapshot row', $lsn
            if $lsn % 10000 == 0;
    });

    while($send and sysread $fhb, my $data, 4096) {
        next unless length $data;
        $reader->data($data);
    }

    DEBUGF 'Snapshot was send fully';
    return 0 unless defined $send;
    return $last_lsn;
}

sub _join_cluster {
    DEBUGF 'Send cluster id';
    my ($self, $fh, $request) = @_;

    my $pkt = TntCompat::Proto::insert(
        $request->{SYNC},
        $self->lsn($fh),
        TntCompat::Proto::SC_CLUSTER_ID,
        [ 2, $request->{SERVER_UUID} ],
        { server_id => 0 }
    );
    return $self->send_response($fh, $pkt);
}


sub _join_vclock {
    my ($self, $fh, $request, $lsn) = @_;

    my $vclock = {
        1   => $lsn,
        2   => 0
    };

    my $pkt = TntCompat::Proto::vclock(
        $request->{SYNC}, $vclock, { server_id => 0 });
    return $self->send_response($fh, $pkt);
}

sub convert_row {
    my ($self, $space, $row) = @_;

    my $schema = $self->schema;
    my @res = @$row;

    return \@res unless exists $schema->{$space};
    
    for (my $fno = 0; $fno < @res; $fno++) {
        if (exists $schema->{$space}{fields}{$fno}) {
            my $ftype = $schema->{$space}{fields}{$fno}{type}
                || $schema->{$space}{default_field_type} || 'STR';

            if ('NUM' eq uc $ftype) {
                $res[ $fno ] = unpack 'L', $res[ $fno ];
            } elsif ('MONEY' eq uc $ftype) {
                $res[ $fno ] = unpack 'L', $res[ $fno ];
                $res[ $fno ] /= 100;
            } elsif ('NUM64' eq uc $ftype) {
                if (length $res[ $fno ] == 4) {
                    $res[ $fno ] = unpack 'L', $res[ $fno ];
                } else {
                    $res[ $fno ] = unpack 'Q', $res[ $fno ];
                }
            } elsif ('JSON' eq uc $ftype) {

                # TODO: decode str
                $res[ $fno ] = JSON::XS->new->decode($res[ $fno ] );

            } elsif ('STR' eq uc $ftype) {
                # force all unknown items as strings
                $res[ $fno ] = string $res[ $fno ];
            }
        }
    }

    return \@res;
}


sub convert_pk {
    my ($self, $space, $row) = @_;
    my @res = @$row;
    my $schema = $self->schema;

    return \@res unless exists $schema->{$space};
    return \@res unless exists $schema->{$space}{indexes};
    return \@res unless my $idx = $schema->{$space}{indexes}[0];
    return \@res unless my $fields = $idx->{fields};


    croak "Inconsistent primary key" unless @$fields  == @res * 2;

    for (my ($i, $fno) = (0, 0); $i < @$fields; $i += 2, $fno++) {
        
        my $type = $fields->[$i + 1]
                || $schema->{$space}{default_field_type} || 'STR';

        if ('NUM' eq uc $type) {
            $res[ $fno ] = unpack 'L', $res[ $fno ];
        } elsif ('MONEY' eq uc $type) {
            $res[ $fno ] = unpack 'L', $res[ $fno ];
            $res[ $fno ] /= 100;
        } elsif ('NUM64' eq uc $type) {
            if (length $res[ $fno ] == 4) {
                $res[ $fno ] = unpack 'L', $res[ $fno ];
            } else {
                $res[ $fno ] = unpack 'Q', $res[ $fno ];
            }
        } else {
            $res[ $fno ] = string $res[ $fno ];
        }
    }

    return \@res;
}

sub convert_ops {
    my ($self, $space, $oplist) = @_;
    my @res = @$oplist;
    my $schema = $self->schema;
    return \@res unless exists $schema->{$space};

    for (@res) {
        my $op  = $_->[0];
        my $fno = $_->[1];
        
        my $type = $schema->{$space}{fields}{$fno}{type} ||
            $schema->{$space}{default_field_type} || 'STR';

        # numbers (by opcopde)
        if ($op =~ /[+&|^]/) {
            if (length $_->[2] == 4) {
                $_->[2] = unpack 'L', $_->[2];
            } elsif(length $_->[2] == 8) {
                $_->[2] = unpack 'Q', $_->[2];
            } else {
                DEBUGF "Corrupted update in xlog (opcode: %s, space: %s)",
                    $op, $space;
            }
            $_->[2] /= 100 if 'MONEY' eq uc $type;
        
        } elsif ($op eq '=' or $op eq '!') {
            if ('NUM' eq uc $type) {
                $_->[2] = unpack 'L', $_->[2];
            } elsif ('MONEY' eq uc $type) {
                $_->[2] = unpack 'L', $_->[2];
                $_->[2] /= 100;
            } elsif ('NUM64' eq uc $type) {
                if (length $_->[2] == 4) {
                    $_->[2] = unpack 'L', $_->[2];
                } elsif (length $_->[2] == 8) {
                    $_->[2] = unpack 'Q', $_->[2];
                } else {
                    $_->[2] = string $_->[2];
                }
            } elsif ('JSON' eq uc $type) {

                # TODO: decode str
                $_->[2] = JSON::XS->new->decode($_->[2] );

            } else {
                $_->[2] = string $_->[2];
            }
        } elsif ($op eq '#') {
            $_->[2] = 1;
        } elsif ($op eq ':') {
            $_->[4] = string $_->[4];
        } else {
            DEBUGF "Unknown update opcode: `%s' (space: %s)", $op, $space;
        }
    }
    return \@res;
}


sub join_snapshot {
    my ($self, $fh, $request) = @_;

    eval {
        $self->enable_wbuf($fh);
        return unless $self->_join_bootstrap($fh, $request);
        return unless $self->_join_schema($fh, $request);
        my $lsn = $self->_join_snapshot($fh, $request);
        return unless $lsn;
        return unless $self->_join_cluster($fh, $request);

        $self->disable_wbuf($fh);
        return unless $self->_join_vclock($fh, $request, $lsn);
        DEBUGF 'Finished join';
    };
    if ($@) {
        DEBUGF 'Error while join: %s', $@;
        my $pkt = TntCompat::Proto::make_response($request->{SYNC},
            0x2000_0000 | 42,
            'error: ' . $@
        );
        unless ($self->send_response($fh, $pkt)) {
            DEBUGF "Can't send response to client";
        }
    }
}


sub subscribe {
    my ($self, $fh, $request) = @_;

    if ($request->{CLUSTER_UUID} ne $self->cluster_uuid) {
        my $pkt = TntCompat::Proto::make_response(
                            $request->{SYNC}, 0x2000_0000 | 63,
            sprintf "Cluster id of the replica %s doesn't ".
                "match cluster id of the master %s",
                $request->{CLUSTER_UUID} // 'undef',
                $self->cluster_uuid
        );

        $self->send_response($fh, $pkt);
        return 0;
    }


    unless (exists $request->{VCLOCK} and exists $request->{VCLOCK}{1}) {
        my $pkt = TntCompat::Proto::make_response(
                            $request->{SYNC}, 0x2000_0000 | 63,
                            "Invalid vclock"
        );

        $self->send_response($fh, $pkt);
        return 0
    }

    my $lsn = $request->{VCLOCK}{1};
    DEBUGF 'Client subscribe since LSN=%s', $lsn;

    my ($file, $prev_file);
    while(1) {
        $file = $self->_wait_xlog($lsn, $prev_file);
        next unless $file;
        $prev_file = $file;
        
        DEBUGF 'Send %s xlog file to client', $file;



        my $f;
        unless (open $f, '<:raw', $file) {
            DEBUGF "Can't open file %s: %s", $file, decode utf8 => $!;
            next;
        }
        
        my $send = 1;
        my $xlog = new TntCompat::Cat::Xlog;
        $xlog->on_row(sub {
            my ($rlsn, $type, $space, $row, @args) = @_;
            return unless $send;

            # SKIP old lsns
            return if $rlsn <= $lsn;


#             DEBUGF 'Send record for space %s.%s (lsn=%s)', $space, $type, $rlsn;
            if ($rlsn > $lsn + 1) {
                DEBUGF 'LSN hole from lsn %s to lsn %s', $lsn, $rlsn;
                $send = 0;
                return;
            }
            $lsn = $rlsn;


            my $pkt;

            if ($type eq 'insert') {
                $pkt = TntCompat::Proto::insert
                    $request->{SYNC},
                    $lsn,
                    $space,
                    $self->convert_row($space, $row),
                    { server_id => 1 }
                ;
            } elsif ($type eq 'replace') {
                $pkt = TntCompat::Proto::replace
                    $request->{SYNC},
                    $lsn,
                    $space,
                    $self->convert_row($space, $row),
                    { server_id => 1 }
                ;
            } elsif ($type eq 'update') {
                $pkt = TntCompat::Proto::update
                    $request->{SYNC},
                    $lsn,
                    $space,
                    $self->convert_pk($space, $row),
                    $self->convert_ops($space, $args[0]),
                    { server_id => 1 }
                ;
            } elsif ($type eq 'delete') {
                $pkt = TntCompat::Proto::del
                    $request->{SYNC},
                    $lsn,
                    $space,
                    $self->convert_pk($space, $row),
                    { server_id => 1 }
                ;
            }

            unless (defined $pkt) {
                croak "Can't convert $type command";
            }

            if ($self->is_space_skip($space)) {
                DEBUGF 'Skip space %s.%s record', $space, $type;
            } else {
                unless ($self->send_response($fh, $pkt)) {
                    DEBUGF "Can't send xlog row for space %s, lsn: %s",
                        $space, $lsn;
                    $send = 0;
                    return;
                }
                DEBUGF 'Sent xlog row space: %s, lsn: %s', $space, $lsn
                    if $lsn % 10000 == 0;
            }
        });

        READ_PROCESS: while($send) {

            while($send) {
                my $rd = sysread $f, my $data, 4096;
                unless (defined $rd) {
                    DEBUGF "Can't read file %s: %s", $file, decode utf8 => $!;
                    last READ_PROCESS;
                }

                last unless length $data;
                $xlog->data($data);
            }

            return 0 unless $send;

            my @xlogs = sort glob catfile $self->wal_dir, '*.xlog';
            for (@xlogs) {
                next if $_ le $file;
                last READ_PROCESS;
            }
            Coro::AnyEvent::sleep .5;
        }
    }
}

sub _wait_xlog {
    my ($self, $lsn, $file) = @_;

    my $do_wait = 1;
    my @xlogs = glob catfile $self->wal_dir, '*.xlog';

CHECK:
    goto WAIT unless @xlogs;
    @xlogs = sort @xlogs;

    while(my $xlog = shift @xlogs) {
        # skip file that was already sent
        next if $file and $xlog eq $file;

        my $xlog_lsn = basename $xlog, '.xlog';
        $xlog_lsn =~ s/^0+//;
        goto WAIT unless $xlog_lsn <= $lsn + 1;
        return $xlog unless @xlogs;


        my $next_xlog = $xlogs[0];
        my $next_xlog_lsn = basename $next_xlog, '.xlog';
        $next_xlog_lsn =~ s/^0+//;

        return $xlog if $next_xlog_lsn > $lsn + 1;
    }


WAIT:
    return unless $do_wait;
    $do_wait = 0;
    Coro::AnyEvent::sleep .2;
    @xlogs = glob catfile $self->wal_dir, '*.xlog';
    goto CHECK;
}

sub downlink {
    my ($self, $fh) = @_;

    $self->clean_lsn($fh);

    my $salt = '';
    $salt .= chr int rand 256 for 1 .. 32;
    my $salthex = encode_base64($salt, '');

    my $handshake = pack 'a64a64', "Tarantool 1.6~compat\n", "$salthex\n";

    my $wrd = $fh->syswrite($handshake);
    unless ($wrd and $wrd == 128) {
        DEBUGF 'Can not write handshake %s', $!;
        close $fh;
        return;
    }

    my $authen;

    DEBUGF 'Wait requests';
    while (my $request = $self->read_request($fh)) {
        unless ($request) {
            DEBUGF 'Close connection after error';
            last;
        }


        if ($request->{CODE} eq 'PING') {
            DEBUGF 'Client -> pings';
            last unless $self->ping($fh, $request);
            next;
        }

        if ($request->{CODE} eq 'AUTH') {
            DEBUGF 'Client -> auth';
            my $authen_ok = $self->auth($fh, $request, $salt);
            last unless $authen_ok;
            $authen = 1 if $authen_ok and $authen_ok != -1;
            DEBUGF 'Client is %sauthentificated', $authen ? '' : 'not ';
            next;
        }

        if ($request->{CODE} eq 'JOIN') {
            DEBUGF 'Client -> join';
            $self->join_snapshot($fh, $request);
            last;
        }


        if ($request->{CODE} eq 'SUBSCRIBE') {
            DEBUGF 'Client -> subscribe';
            $self->subscribe($fh, $request);
            last;
        }

        DEBUGF 'Unsupported request %s', Dumper $request;
    }

    DEBUGF 'Closing connection';
    close $fh;
}

1;

# bootstrap
__DATA__



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.