# Message-Passing-Output-ElasticSearch/lib/Message/Passing/Output/ElasticSearch.pm
package Message::Passing::Output::ElasticSearch;
use Moose;
use ElasticSearch;
use AnyEvent;
use Scalar::Util qw/ weaken /;
use MooseX::Types::Moose qw/ ArrayRef Str Bool /;
use Scalar::Util qw/ weaken /;
use Try::Tiny qw/ try catch /;
use aliased 'DateTime' => 'DT';
use MooseX::Types::ISO8601 qw/ ISO8601DateTimeStr /;
use MooseX::Types::DateTime qw/ DateTime /;
use JSON qw/ encode_json /;
use namespace::autoclean;
our $VERSION = '0.003';
$VERSION = eval $VERSION;
with 'Message::Passing::Role::Output';
# Addresses (host:port strings) of the ElasticSearch servers to index
# into.  Must be supplied by the caller; passed straight through to the
# ElasticSearch client constructor.
has elasticsearch_servers => (
    is       => 'ro',
    isa      => ArrayRef[Str],
    required => 1,
);
# Lazily-built ElasticSearch client.  Uses the non-blocking AnyEvent
# HTTP transport ("aehttp") so bulk_index/optimize_index return
# condvar-like handles we can attach callbacks to.
has _es => (
    is      => 'ro',
    isa     => 'ElasticSearch',
    lazy    => 1,
    default => sub {
        my $self = shift;
        ElasticSearch->new(
            transport => "aehttp",
            servers   => $self->elasticsearch_servers,
            timeout   => 30,
            # trace_calls => 1,
        );
    },
);
# Buffer of documents awaiting a bulk_index call.  _flush swaps the
# whole arrayref out via the clearer; being lazy, the attribute then
# rebuilds itself as a fresh empty arrayref on next access.
has queue => (
    is       => 'ro',
    isa      => ArrayRef,
    lazy     => 1,
    init_arg => undef,
    default  => sub { [] },
    clearer  => '_clear_queue',
);
# Set (hash used as a set) of index names we have written to since the
# last optimize pass; consume() marks an index dirty each time it
# queues a document for it.
has _indexes => (
    is      => 'ro',
    isa     => 'HashRef',
    lazy    => 1,
    default => sub { +{} },
    clearer => '_clear_indexes',
);
# Emit progress diagnostics (via warn) from the flush and optimize
# callbacks.  Defaults to true when STDIN is a terminal, i.e. when the
# process is being run interactively rather than as a daemon.
has verbose => (
    is      => 'ro',
    # Use the Bool type constant imported from MooseX::Types::Moose,
    # consistent with the _am_flushing/_needs_optimize attributes
    # (the original used the string 'Bool' here).
    isa     => Bool,
    default => sub {
        -t STDIN
    },
);
# Consume one message hashref: derive its timestamp, wrap it in a
# logstash-style document, and queue it for bulk indexing into the
# day-partitioned index "logstash-YYYY.MM.DD".  Flushes immediately if
# the queue exceeds 1000 documents; otherwise _flush_timer drains it.
sub consume {
    my ($self, $data) = @_;
    return unless $data;

    # Determine the event timestamp: prefer an 'epochtime' field, then
    # an ISO8601 'date' field, finally falling back to "now".
    my $date;
    if (my $epochtime = delete($data->{epochtime})) {
        $date = DT->from_epoch(epoch => $epochtime);
        delete($data->{date});    # discard any redundant date field
    }
    elsif (my $try_date = delete($data->{date})) {
        if (is_ISO8601DateTimeStr($try_date)) {
            $date = to_DateTime($try_date);
        }
    }
    $date ||= DT->from_epoch(epoch => time());

    my $type = $data->{__CLASS__} || 'unknown';
    my $index_name = 'logstash-' . $date->year . '.' . sprintf("%02d", $date->month) . '.' . sprintf("%02d", $date->day);

    # Sanity check for corrupt/mis-parsed timestamps.  NOTE: the
    # original tested $index_name !~ /logstash-2012/, which wrongly
    # rejected every message dated 2013 or later; the die message shows
    # the intent was to catch dates *before* 2012 (when this code was
    # written), so test the year directly.
    if ($date->year < 2012) {
        use Data::Dumper;
        die "BUG - We generated a message from before 2012 - Payload " . Dumper($data);
    }

    # Mark this index dirty so the periodic optimize pass knows about it.
    $self->_indexes->{$index_name} = 1;

    # Build the logstash v1-style document.  Note delete() mutates
    # $data, which is also stored as '@fields' — so message/hostname/
    # uuid are removed from the fields before indexing.
    my $to_queue = {
        type => $type,
        index => $index_name,
        data => {
            '@timestamp' => to_ISO8601DateTimeStr($date),
            '@tags' => [],
            '@type' => $type,
            '@source_host' => delete($data->{hostname}) || 'none',
            '@message' => exists($data->{message}) ? delete($data->{message}) : encode_json($data),
            '@fields' => $data,
        },
        # Use the message's uuid as the document id when present, so
        # redelivered messages overwrite rather than duplicate.
        exists($data->{uuid}) ? ( id => delete($data->{uuid}) ) : (),
    };
    push(@{$self->queue}, $to_queue);

    # Flush eagerly once the backlog is large; the 1s timer handles the
    # normal case.
    if (scalar(@{$self->queue}) > 1000) {
        $self->_flush;
    }
}
# Guard flag: true while a bulk_index or optimize_index request is in
# flight, preventing re-entrant flushes; cleared in the async callback.
has _am_flushing => (
    is      => 'rw',
    isa     => Bool,
    default => 0,
);
# Repeating one-second AnyEvent timer that drives _flush.  Deliberately
# NOT lazy: the default runs at construction time, so the timer starts
# as soon as the output object is built.  $self is weakened so the
# timer's closure does not keep the object alive forever.
has _flush_timer => (
    is      => 'ro',
    default => sub {
        my $self = shift;
        weaken($self);
        AnyEvent->timer(
            after    => 1,
            interval => 1,
            cb       => sub { $self->_flush },
        );
    },
);
# Set hourly by _optimize_timer; when true, the next _flush tick runs
# _do_optimize instead of a bulk index.
has _needs_optimize => (
    is      => 'rw',
    isa     => Bool,
    default => 0,
);
# Hourly AnyEvent timer that flags the dirty indexes for optimization.
# Like _flush_timer, it is intentionally non-lazy (starts at object
# construction) and holds only a weakened $self.
has _optimize_timer => (
    is      => 'ro',
    default => sub {
        my $self = shift;
        weaken($self);
        # FIXME!!! This is over-aggressive, you only need to do indexes
        # when you've finished writing them.
        my $time = 60 * 60; # Every hour
        AnyEvent->timer(
            after    => $time,
            interval => $time,
            cb       => sub { $self->_needs_optimize(1) },
        );
    },
);
# Optimize one dirty index.  NOTE: the original sorted all dirty index
# names, cleared the whole set, then optimized only $indexes[0] —
# silently dropping every other dirty index; it also issued
# optimize_index(index => undef) when the set was empty and never
# cleared _needs_optimize in that case, wedging _flush.  This version
# optimizes one index per call, removes only that index from the dirty
# set, and clears _needs_optimize once the set is empty, so successive
# _flush ticks work through the remaining indexes.
sub _do_optimize {
    my $self = shift;
    weaken($self);
    my @indexes = sort keys( %{ $self->_indexes } );
    unless (@indexes) {
        # Nothing to do; clear the flag so _flush can resume indexing.
        $self->_needs_optimize(0);
        return;
    }
    my $index = $indexes[0];
    delete $self->_indexes->{$index};    # only the index we are optimizing
    $self->_am_flushing(1);
    $self->_es->optimize_index(
        index => $index,
        wait_for_merge => 1,
        max_num_segments => 2,
    )->cb(sub {
        warn("Did optimize of " . $index . "\n") if $self->verbose;
        $self->_am_flushing(0);
        # Only stand down once every dirty index has been handled;
        # otherwise the next tick picks up the next one.
        $self->_needs_optimize(0) unless keys %{ $self->_indexes };
    });
}
# Drain the queue into ElasticSearch with a single async bulk_index
# call.  Invoked every second by _flush_timer and directly from
# consume() when the queue grows past 1000 entries.  Statement order
# matters: the queue is swapped out and _am_flushing raised *before*
# the async call, so consume() can keep appending while the request is
# in flight and no second flush can start until the callback clears
# the flag.
sub _flush {
my $self = shift;
weaken($self);
# A bulk index or optimize is already in flight.
return if $self->_am_flushing;
# Hourly optimize takes priority over indexing this tick.
if ($self->_needs_optimize) {
return $self->_do_optimize;
}
my $queue = $self->queue;
return unless scalar @$queue;
# Swap out the queued docs; the lazy attribute rebuilds an empty list.
$self->_clear_queue;
$self->_am_flushing(1);
my $res = $self->_es->bulk_index(
docs => $queue,
consistency => 'quorum',
);
$res->cb(sub {
my $res = shift;
my @indexes = sort keys( %{ $self->_indexes } );
warn("Indexed " . scalar(@$queue) . " " . join(", ", @indexes) . "\n") if $self->verbose;
$self->_am_flushing(0);
# Warn about the first failed document only, then stop scanning.
# NOTE(review): failed docs are not re-queued, so they are dropped;
# Dumper here relies on the compile-time 'use Data::Dumper' inside
# consume() — consider a file-level import.
foreach my $result (@{ $res->{results} }) {
if (!$result->{index}->{ok} && !$result->{create}->{ok}) {
warn "Indexing failure: " . Dumper($result) . "\n";
last;
}
}
});
}
1;
=head1 NAME

Message::Passing::Output::ElasticSearch - output logstash messages into ElasticSearch.

=head1 DESCRIPTION

Batches consumed messages and bulk-indexes them into day-partitioned
C<logstash-YYYY.MM.DD> indexes, flushing once a second (or when more
than 1000 messages are queued) and periodically optimizing the indexes
that have been written to.

=head1 METHODS

=head2 consume ($msg)

Consumes a message, queuing it for consumption by ElasticSearch

=head1 SEE ALSO

=over

=item L<Message::Passing>

=item L<http://logstash.net>

=back

=head1 AUTHOR

Tomas (t0m) Doran <bobtfish@bobtfish.net>

=head1 COPYRIGHT

=head1 LICENSE

XXX - TODO