Group
Extension

Fluent-AgentLite/lib/Fluent/AgentLite.pm

package Fluent::AgentLite;

use strict;
use warnings;
use English;
use Carp;

use POSIX qw(:errno_h);

use Time::HiRes;

use Time::Piece;
use Log::Minimal;

use IO::Socket::INET;
use Data::MessagePack;

our $VERSION = '1.0';

use constant READ_WAIT => 0.1; # 0.1sec

use constant SOCKET_TIMEOUT => 5; # 5sec

use constant CONNECTION_KEEPALIVE_INFINITY => 0;
use constant CONNECTION_KEEPALIVE_TIME => 1800; # 30min
use constant CONNECTION_KEEPALIVE_MIN => 120; # min 2min
use constant CONNECTION_KEEPALIVE_MARGIN_MAX => 30; # max 30sec

use constant RECONNECT_WAIT_MIN => 0.5;  # 0.5sec
use constant RECONNECT_WAIT_MAX => 3600; # 60min
use constant RECONNECT_WAIT_INCR_RATE => 1.5;

use constant SEND_RETRY_MAX => 4;

sub connection_keepalive_time {
    my ($keepalive_time) = @_;
    $keepalive_time + int(CONNECTION_KEEPALIVE_MARGIN_MAX * 2 * rand()) - CONNECTION_KEEPALIVE_MARGIN_MAX;
}

sub new {
    my $this = shift;
    my ($tag, $primary_servers, $secondary_servers, $configuration) = @_;
    my $self = {
        tag => $tag,
        servers => {
            primary => $primary_servers,
            secondary => $secondary_servers,
        },
        buffer_size => $configuration->{buffer_size},
        ping_message => $configuration->{ping_message},
        drain_log_tag => $configuration->{drain_log_tag},
        keepalive_time => $configuration->{keepalive_time},
        output_format => $configuration->{output_format},
    };

    if (defined $self->{output_format} and $self->{output_format} eq 'json') {
        *pack = *pack_json;
        *pack_drainlog = *pack_drainlog_json;
    }

    srand (time ^ $PID ^ unpack("%L*", `ps axww | gzip`));

    bless $self, $this;
}

sub execute {
    my $self = shift;
    my $args = shift;

    my $fieldname = $args->{fieldname};
    my $tailfd = $args->{tailfd};

    my $check_terminated = ($args->{checker} || {})->{term} || sub { 0 };
    my $check_reconnect = ($args->{checker} || {})->{reconnect} || sub { 0 };

    my $packer = Data::MessagePack->new();

    my $reconnect_wait = RECONNECT_WAIT_MIN;

    my $last_ping_message = time;
    if ($self->{ping_message}) {
        $last_ping_message = time - $self->{ping_message}->{interval} * 2;
    }
    my $keepalive_time = CONNECTION_KEEPALIVE_TIME;
    if (defined $self->{keepalive_time}) {
        $keepalive_time = $self->{keepalive_time};
        if ($keepalive_time < CONNECTION_KEEPALIVE_MIN and $keepalive_time != CONNECTION_KEEPALIVE_INFINITY) {
            warnf 'Keepalive time setting is too short. Set minimum value %s', CONNECTION_KEEPALIVE_MIN;
            $keepalive_time = CONNECTION_KEEPALIVE_MIN;
        }
    }

    my $pending_packed;
    my $continuous_line;
    my $disconnected_primary = 0;
    my $expiration_enable = $keepalive_time != CONNECTION_KEEPALIVE_INFINITY;

    while(not $check_terminated->()) {
        # at here, connection initialized (after retry wait if required)

        # connect to servers
        my $primary = $self->choose($self->{servers}->{primary});
        my $secondary;

        my $sock = $self->connect($primary) unless $disconnected_primary;
        if (not $sock and $self->{servers}->{secondary}) {
            $secondary = $self->choose($self->{servers}->{secondary});
            $sock = $self->connect($self->choose($self->{servers}->{secondary}));
        }
        $disconnected_primary = 0;
        unless ($sock) {
            # failed to connect both of primary / secondary
            warnf 'failed to connect servers, primary: %s, secondary: %s', $primary, ($secondary || 'none');
            warnf 'waiting %s seconds to reconnect', $reconnect_wait;

            Time::HiRes::sleep($reconnect_wait);
            $reconnect_wait *= RECONNECT_WAIT_INCR_RATE;
            $reconnect_wait = RECONNECT_WAIT_MAX if $reconnect_wait > RECONNECT_WAIT_MAX;
            next;
        }

        # succeed to connect. set keepalive disconnect time
        my $connecting = $secondary || $primary;

        my $expired = time + connection_keepalive_time($keepalive_time) if $expiration_enable;
        $reconnect_wait = RECONNECT_WAIT_MIN;

        while(not $check_reconnect->()) {
            # connection keepalive expired
            if ($expiration_enable and time > $expired) {
                infof "connection keepalive expired.";
                last;
            }

            # ping message (if enabled)
            my $ping_packed = undef;
            if ($self->{ping_message} and time >= $last_ping_message + $self->{ping_message}->{interval}) {
                $ping_packed = $self->pack_ping_message($packer, $self->{ping_message}->{tag}, $self->{ping_message}->{data});
                $last_ping_message = time;
            }

            # drain (sysread)
            my $lines = 0;
            if (not $pending_packed) {
                my $buffered_lines;
                ($buffered_lines, $continuous_line, $lines) = $self->drain($tailfd, $continuous_line);

                if ($buffered_lines) {
                    $pending_packed = $self->pack($packer, $fieldname, $buffered_lines);
                    if ($self->{drain_log_tag}) {
                        $pending_packed .= $self->pack_drainlog($packer, $self->{drain_log_tag}, $lines);
                    }
                }
                if ($ping_packed) {
                    $pending_packed ||= '';
                    $pending_packed .= $ping_packed;
                }
                unless ($pending_packed) {
                    Time::HiRes::sleep READ_WAIT;
                    next;
                }
            }
            # send
            my $written = $self->send($sock, $pending_packed);
            unless ($written) { # failed to write (socket error).
                $disconnected_primary = 1 unless $secondary;
                last;
            }

            $pending_packed = undef;
        }
        if ($check_reconnect->()) {
            infof "SIGHUP (or SIGTERM) received";
            $disconnected_primary = 0;
            $check_reconnect->(1); # clear SIGHUP signal
        }
        infof "disconnecting to current server";
        if ($sock) {
            $sock->close;
            $sock = undef;
        }
        infof "disconnected.";
    }
    if ($check_terminated->()) {
        warnf "SIGTERM received";
    }
    infof "process exit";
}

sub drain {
    # if broken child process (undefined return value of $fd->sysread())
    #   if content exists, return it.
    #   else die
    my ($self,$fd, $continuous_line) = @_;
    my $readlimit = $self->{buffer_size};
    my $readsize = 0;
    my $readlines = 0;
    my @buffered_lines;

    my $chunk;
    while ($readsize < $readlimit) {
        my $bytes = $fd->sysread($chunk, $readlimit);
        if (defined $bytes and $bytes == 0) { # EOF (child process exit)
            last if $readsize > 0;
            warnf "failed to read from child process, maybe killed.";
            confess "give up to read tailing fd, see logs";
        }
        if (not defined $bytes and $! == EAGAIN) { # I/O Error (no data in fd): "Resource temporarily unavailable"
            last;
        }
        if (not defined $bytes) { # Other I/O error... what?
            warnf "I/O error with tail fd: $!";
            last;
        }

        $readsize += $bytes;
        my $terminated_line = chomp $chunk;
        my @lines = split(m!\n!, $chunk);
        if ($continuous_line) {
            $lines[0] = $continuous_line . $lines[0];
            $continuous_line = undef;
        }
        if (not $terminated_line) {
            $continuous_line = pop @lines;
        }
        if (scalar(@lines) > 0) {
            push @buffered_lines, @lines;
            $readlines += scalar(@lines);
        }
    }
    if ($readlines < 1) {
        return undef, $continuous_line, 0;
    }

    return (\@buffered_lines, $continuous_line, $readlines);
}

# MessagePack 'PackedForward' object
# see lib/fluent/plugin/in_forward.rb in fluentd
sub pack {
    my ($self,$packer,$fieldname,$lines) = @_;
    my $t = time;
    my $event_stream = join('', map { $packer->pack([$t, {$fieldname => $_}]) } @$lines);
    return $packer->pack([$self->{tag}, $event_stream]);
}

sub pack_json {
    use JSON::XS qw/encode_json/;
    my ($self,$packer,$fieldname,$lines) = @_;
    my $t = time;
    return encode_json([$self->{tag}, [map { [$t, {$fieldname => $_}] } @$lines ]]);
}

# MessagePack 'Message' object
sub pack_ping_message {
    my ($self,$packer,$ping_tag,$ping_data) = @_;
    my $t = time;
    return $packer->pack([$ping_tag, $t, {'data' => $ping_data}]);
}

# MessagePack 'Message' object
sub pack_drainlog {
    my ($self,$packer,$drain_log_tag,$drain_lines) = @_;
    my $t = time;
    return $packer->pack([$drain_log_tag, $t, {'drain' => $drain_lines}]);
}

sub pack_drainlog_json {
    use JSON::XS qw/encode_json/;
    my ($self,$packer,$drain_log_tag,$drain_lines) = @_;
    my $t = time;
    return encode_json([$drain_log_tag, $t, {'drain' => $drain_lines}]);
}

# choose a server [host, port] randomly from arg arrayref
sub choose {
    my ($self,$servers) = @_;
    my $num = scalar(@$servers);
    return $servers->[int(rand() * $num)];
}

sub connect {
    my ($self,$server) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr  => $server->[0],
        PeerPort  => $server->[1],
        Proto     => 'tcp',
        Timeout   => SOCKET_TIMEOUT,
        ReuseAddr => 1,
    );
    if ($sock) {
        infof 'connected to server: %s', $server;
    } else {
        warnf 'failed to connect to server %s : %s', $server, $!;
    }
    $sock;
}

sub send {
    my ($self,$sock,$data) = @_;
    my $length = length($data);
    my $written = 0;
    my $retry = 0;

    local $SIG{"PIPE"} = sub { die $! };

    eval {
        while ($written < $length) {
            my $wbytes = $sock->syswrite($data, $length, $written);
            if ($wbytes) {
                $written += $wbytes;
            }
            else {
                die "failed $retry times to send data: $!" if $retry > SEND_RETRY_MAX;
                $retry += 1;
            }
        }
    };
    if ($@) {
        my $error = $@;
        warnf "Cannot send data: $error";
        return undef;
    }
    $written;
}

sub close {
    my ($self,$sock) = @_;
    $sock->close if $sock;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.