Group
Extension

POE-Component-ElasticSearch-Indexer/bin/file-to-elasticsearch.pl

#!perl
# PODNAME: file-to-elasticsearch.pl
# ABSTRACT: A simple utility to tail a file and index each line as a document in ElasticSearch
use strict;
use warnings;

use Getopt::Long::Descriptive qw(describe_options);
use Hash::Merge::Simple qw(merge);
use JSON::MaybeXS qw(decode_json encode_json);
use Log::Log4perl qw(:easy);
use Module::Load qw(load);
use Module::Loaded qw(is_loaded);
use Pod::Usage;
use Ref::Util qw(is_arrayref is_hashref);
use YAML ();

sub POE::Kernel::ASSERT_DEFAULT { 1 }
use POE qw(
    Component::ElasticSearch::Indexer
    Wheel::FollowTail
);

my %DEFAULT = (
    config => '/etc/file-to-elasticsearch.yaml',
    stats_interval => 60,
);

my ($opt,$usage) = describe_options('%c %o',
    ['config|c=s', "Config file, default: $DEFAULT{config}",
        { default => $DEFAULT{config}, callbacks => { "must be a readable file" => sub { -r $_[0] } } }
    ],
    ['log4perl-config|L=s', "Log4perl Configuration to use, defaults to STDERR",
        { callbacks => { "must be a readable file" => sub { -r $_[0] } } }
    ],
    ['stats-interval|s=i', "Seconds between displaying statistics, default: $DEFAULT{stats_interval}",
        { default => $DEFAULT{stats_interval} },
    ],
    ['debug',       "Enable most verbose output" ],
    [],
    ['help',       "Display this help.", { shortcircuit => 1 }],
    ['manual|pod', "Display the user manaul.", { shortcircuit => 1 }],
);

if( $opt->help ) {
    print $usage->text;
    exit 0;
}
pod2usage( -verbose => 2, -exitval => 0 ) if $opt->manual;

my $config = YAML::LoadFile( $opt->config );

# Initialize Logging
my $level = $opt->debug ? 'TRACE' : 'DEBUG';
my $loggingConfig = $opt->log4perl_config || \qq{
    log4perl.logger = $level, Screen
    log4perl.appender.Screen = Log::Log4perl::Appender::ScreenColoredLevels
    log4perl.appender.Screen.layout   = PatternLayout
    log4perl.appender.Screen.layout.ConversionPattern = %d [%P] %p - %m%n
};
Log::Log4perl->init($loggingConfig);

my $main = POE::Session->create(
    inline_states => {
       _start => \&main_start,
       _stop  => \&main_stop,
       _child => \&main_child,
       stats  => \&main_stats,

       got_new_line     => \&got_new_line,
       get_error        => \&got_error,
       log4perl_refresh => \&log4perl_refresh,
    },
    heap => {
        config => $config,
        stats  => {},
    },
);

POE::Kernel->run();
exit 0;

sub main_start {
    my ($kernel,$heap) = @_[KERNEL,HEAP];

    my $config = $heap->{config};
    my %defaults = (
        interval => 5,
        index    => 'files-%Y.%m.%d',
        type     => 'log',
    );

    my $files = 0;
    foreach my $tail ( @{ $config->{tail} } ) {
        if( -r $tail->{file} ) {
            my $wheel = POE::Wheel::FollowTail->new(
                Filename     => $tail->{file},
                InputEvent   => 'got_new_line',
                ErrorEvent   => 'got_error',
                PollInterval => $tail->{interval} || $defaults{interval},
            );
            $heap->{wheels}{$wheel->ID} = $wheel;
            $heap->{instructions}{$wheel->ID} = {
                %defaults,
                %{ $tail },
            };
            $files++;
            DEBUG(sprintf "Wheel %d tailing %s", $wheel->ID, $tail->{file});
        }
    }

    die sprintf("No files found to tail in %s", $opt->config) unless $files > 0;

    my $es = $config->{elasticsearch} || {};
    $heap->{elasticsearch} = POE::Component::ElasticSearch::Indexer->spawn(
        Alias         => 'es',
        Protocol      => $es->{protocol} || 'http',
        Servers       => $es->{servers} || [qw( localhost:9200 )],
        Timeout       => $es->{timeout} || 5,
        FlushInterval => $es->{flush_interval} || 10,
        FlushSize     => $es->{flush_size} || 100,
        LoggingConfig => $loggingConfig,
        StatsInterval => $opt->stats_interval,
        StatsHandler  => sub {
            my ($stats) = @_;
            foreach my $k (keys %{ $stats }) {
                $heap->{stats}{$k} ||= 0;
                $heap->{stats}{$k}++;
            }
        },
        exists $es->{index} ? ( DefaultIndex => $es->{index} ) : (),
        exists $es->{type}  ? ( DefaultType  => $es->{type}  ) : (),
    );

    # Watch the Log4perl Config
    $kernel->delay( log4perl_refresh => 60 ) if $opt->log4perl_config;
    $kernel->delay( stats => $opt->stats_interval );

    INFO("Started $0 watching $files files.");
}

sub main_stop {
    $poe_kernel->post( es => 'shutdown' );
    FATAL("Shutting down $0");
}

sub main_child {
    my ($kernel,$heap,$reason,$child) = @_[KERNEL,HEAP,ARG0,ARG1];
    INFO(sprintf "Child [%d] %s event.", $child->ID, $reason);
}

sub main_stats {
    my ($kernel,$heap) = @_[KERNEL,HEAP];

    # Reschedule
    $kernel->delay( stats => $opt->stats_interval );

    # Collect our stats
    my $stats = delete $heap->{stats};
    $heap->{stats} = {};

    # Display them
    my $message = keys %{ $stats } ? join(", ", map {"$_=$stats->{$_}"} sort keys %{ $stats })
                : "Nothing to report.";
    INFO("STATS - $message");
}

sub got_error {
    my ($kernel,$heap,$op,$errnum,$errstr,$wheel_id) = @_[KERNEL,HEAP,ARG0..ARG3];

    ERROR("Wheel $wheel_id during $op got $errnum : $errstr");

    # Remove the Wheel from the polling
    if( exists $heap->{wheels}{$wheel_id} ) {
        delete $heap->{wheels}{$wheel_id};
        $heap->{stats}{wheel_error} ||= 0;
        $heap->{stats}{wheel_error}++;
    }

    # Close the ElasticSearch session if this is the last wheel
    if( !keys %{ $heap->{wheels} } ) {
        $kernel->post( es => 'shutdown' );
    }
}

sub log4perl_refresh {
    my $kernel = $_[KERNEL];
    TRACE("Rescanning Log4perl configuration at" . $opt->log4perl_config);
    # Reschedule
    $kernel->delay( log4perl_refresh => 60 );
    # Rescan the Log4perl configuration
    Log::Log4perl::Config->watcher->force_next_check();
    return;
}

sub got_new_line {
    my ($kernel,$heap,$line,$wheel_id) = @_[KERNEL,HEAP,ARG0,ARG1];

    $heap->{stats}{received} ||= 0;
    $heap->{stats}{received}++;

    my $instr = $heap->{instructions}{$wheel_id};

    my $doc;
    if( $instr->{decode} ) {
        my $decoders = is_arrayref($instr->{decode}) ? $instr->{decode} : [ $instr->{decode} ];
        foreach my $decoder ( @{ $decoders } ) {
            if( $decoder eq 'json' ) {
                my $start = index('{', $line);
                my $blob  = $start > 0 ? substr($line,$start) : $line;
                my $new;
                eval {
                    $new = decode_json($blob);
                    1;
                } or do {
                    my $err = $@;
                    TRACE("Bad JSON, error: $err\n$blob");
                    next;
                };
                $doc = merge( $doc, $new );
            }
            elsif( $decoder eq 'syslog' ) {
                unless( is_loaded('Parse::Syslog::Line') ) {
                    eval {
                        load "Parse::Syslog::Line";
                        1;
                    } or do {
                        my $err = $@;
                        die "To use the 'syslog' decoder, please install Parse::Syslog::Line: $err";
                    };
                    no warnings qw(once);
                    $Parse::Syslog::Line::PruneRaw = 1;
                }
                # If we make it here, we're ready to parse
                $doc = Parse::Syslog::Line::parse_syslog_line($line);
            }
        }
    }

    # Extractors
    if( my $extracters = $instr->{extract} ) {
        foreach my $extract( @{ $extracters } ) {
            # Only process items with a "by"
            if( $extract->{by} ) {
                my $from = $extract->{from} ? ( is_hashref($doc) && exists $doc->{$extract->{from}} ? $doc->{$extract->{from}} : undef )
                         : $line;
                next unless $from;
                if( $extract->{when} ) {
                    next unless $from =~ /$extract->{when}/;
                }
                if( $extract->{by} eq 'split' ) {
                    next unless $extract->{split_on};
                    my @parts = split /$extract->{split_on}/, $from;
                    if( my $keys = $extract->{split_parts} ) {
                        # Name parts
                        for( my $i = 0; $i < @parts; $i++ ) {
                            next unless $keys->[$i] and length $keys->[$i] and $parts[$i];
                            next if lc $keys->[$i] eq 'null' or lc $keys->[$i] eq 'undef';
                            if( my $into = $extract->{into} ) {
                                # Make sure we have a hash reference
                                $doc ||=  {};
                                $doc->{$into} = {} unless is_hashref($doc->{$into});
                                $doc->{$into}{$keys->[$i]} = $parts[$i];
                            }
                            else {
                                $doc ||=  {};
                                $doc->{$keys->[$i]} = $parts[$i];
                            }
                        }
                    }
                    else {
                        # This is an array, so it's simple
                        my $target = $extract->{into} ? $extract->{into} : $extract->{from};
                        $doc->{$target} = @parts > 1  ? [ grep { defined and length } @parts ] : $parts[0];
                    }
                }
                elsif( $extract->{by} eq 'regex' ) {
                    # Skip unless it's valid
                    next unless $extract->{regex} and $extract->{regex_parts};

                    if( my @parts = ($from =~ /$extract->{regex}/) ) {
                        # Name parts
                        my $keys = $extract->{regex_parts};
                        for( my $i = 0; $i < @parts; $i++ ) {
                            next unless $keys->[$i] and length $keys->[$i] and $parts[$i];
                            next if lc $keys->[$i] eq 'null' or lc $keys->[$i] eq 'undef';
                            if( my $into = $extract->{into} ) {
                                # Make sure we have a hash reference
                                $doc ||=  {};
                                $doc->{$into} = {} unless is_hashref($doc->{$into});
                                $doc->{$into}{$keys->[$i]} = $parts[$i];
                            }
                            else {
                                $doc ||=  {};
                                $doc->{$keys->[$i]} = $parts[$i];
                            }
                        }
                    }
                }
            }
        }
    }

    # Skip if the document isn't put together yet
    return unless $doc;

    $heap->{stats}{docs} ||= 0;
    $heap->{stats}{docs}++;

    # Store Line in _raw now
    $doc->{_raw}  = $line;
    $doc->{_path} = $instr->{file};

    # Mutators
    if( my $mutate = $instr->{mutate} ) {
        # Copy
        if( my $copy = $mutate->{copy} ) {
            foreach my $k ( keys %{ $copy } ) {
                my $destinations = is_arrayref($copy->{$k}) ? $copy->{$k} : [ $copy->{$k} ];
                foreach my $dst ( @{ $destinations } ) {
                    $doc->{$dst} = $doc->{$k};
                }
            }
        }
        # Rename Keys
        if( my $rename = $mutate->{rename} ) {
            foreach my $k ( keys %{ $rename } ) {
                next unless exists $doc->{$k};
                $doc->{$rename->{$k}} = delete $doc->{$k};
            }
        }
        # Remove unwanted keys
        if( $mutate->{remove} ) {
            foreach my $k ( @{ $mutate->{remove} } ) {
                delete $doc->{$k} if exists $doc->{$k};
            }
        }
        # Append
        if( my $append = $mutate->{append} ) {
            foreach my $k ( keys %{ $append } ) {
                $doc ||=  {};
                $doc->{$k} = $append->{$k};
            }
        }
        # Prune empty or undefined keys
        if( $mutate->{prune} ) {
            foreach my $k (keys %{ $doc }) {
                delete $doc->{$k} unless defined $doc->{$k} and length $doc->{$k};
            }
        }
    }

    foreach my $meta (qw(index type)) {
        $doc->{"_$meta"} = $instr->{$meta} if exists $instr->{$meta};
    }

    if( $opt->debug ) {
        TRACE("Indexing: " . encode_json($doc));
    }

    # Send the document to ElasticSearch
    $kernel->post( es => queue => $doc );
}

__END__

=pod

=encoding UTF-8

=head1 NAME

file-to-elasticsearch.pl - A simple utility to tail a file and index each line as a document in ElasticSearch

=head1 VERSION

version 0.015

=head1 SYNOPSIS

To see available options, run:

    file-to-elasticsearch.pl --help

Create a config file and run the utility:

    file-to-elasticsearch.pl --config config.yaml --log4perl logging.conf --debug

This will run a single threaded POE instance that will tail the log files
you've requested, performing the requested transformations and sending them to
the elasticsearch cluster and index you've specified.

=head1 CONFIGURATION

=head1 Configuration

=head2 ElasticSearch Settings

The C<elasticsearch> section of the config controls the settings passed to the
L<POE::Component::ElasticSearch::Indexer>.

    ---
    elasticsearch:
      servers: [ "localhost:9200" ]
      flush_interval: 30
      flush_size: 1_000
      index: logstash-%Y.%m.%d
      type: log

The settings available are:

=over 4

=item B<servers>

An array of servers used to send bulk data to ElasticSearch.  The default is
just localhost on port 9200.

=item B<flush_interval>

Every C<flush_interval> seconds, the queued documents are send to the Bulk API
of the cluster.

=item B<flush_size>

If this many documents is received, regardless of the time since the last
flush, force a flush of the queued documents to the Bulk API.

=item B<index>

A C<strftime> compatible string to use as the C<DefaultIndex> parameter if a
file doesn't pass one along.

=item B<type>

Mostly useless as Elastic is abandoning "types", but this will be set as the
C<DefaultType> for documents being indexed.

=back

=head2 Tail Section

The C<files> section contains the list of files to tail and the rules to use to
index them.

    ---
    tail:
      - file: '/var/log/osquery/result.log'
        index: "osquery-result-%Y.%m.%d"
        decode: json
        extract:
          - by: split
            from: name
            when: '^pack'
            into: 'pack'
            split_on: '/'
            split_parts: [ null, "name", "report" ]
        mutate:
          prune: true
          remove: [ "calendarTime", "epoch", "counter", "_raw" ]
          rename:
            unixTime: _epoch

Each element is a hash containing the following information.

=over 4

=item B<file>

B<Required>: The path to the file on the filesystem.

=item B<decode>

This may be a single element, or an array, containing one or more of the implemented decoders.

=over 4

=item json

Decode the discovered JSON in the document to a hash reference.  This finds the
first occurrence of an C<{> in the string and assumes everything to the end of
the string is JSON.

Decoding is done by L<JSON::MaybeXS>.

=item syslog

Parses each line as a standard UNIX syslog message.  Parsing is provided via
L<Parse::Syslog::Line> which isn't a hard requirement of the this package, but
will be loaded if available.

=back

=item B<index>

A C<strftime> compatible string to use as the index to put documents created
from this file.  If not specified, the defaults from the ElasticSearch section
will be used, and failing that, the default as specified in
L<POE::Component::ElasticSearch::Index>.

=item B<type>

The type to use for documents sourced from this file.

=item B<extract>

Extraction of fields from the document by one of the supported methods.

=over 4

=item by

Can be 'split' or 'regex'.

B<split> supports:

=over 4

=item split_on

Regex or string to split the string on.

=item split_parts

Name for each part of the split, C<undef> positions in the split string will be discarded.

=back

B<regex> supports:

=over 4

=item regex

The regex to use to extract, using capture groups to designate:

=item regex_parts

Name for reach captured group, C<undef> positions in the list will be discarded.

=back

=item from

Name of the field to apply the extraction to.

=item when

Limits applying the extraction to values matching the regex.

=item into

Top level namespace for the collected keys to wind up inside of, ie:

    extract:
      - by: split
        from: name
        when: '^pack'
        into: 'pack'
        split_on: '/'
        split_parts: [ null, "name", "report"  ]

Will look at the field B<name> and when it matches C<^pack> it will split the
name on C</> and index the second element to C<name> and the third to C<report>, so:

    name: pack/os/cpu_info

Becomes:

    pack:
      name: os
      report: cpu_info

=back

=back

=head1 AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2018 by Brad Lhotsky.

This is free software, licensed under:

  The (three-clause) BSD License

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.