Group
Extension

Net-Statsd-Server/lib/Net/Statsd/Server/Backend/Elasticsearch.pm

#
# Flush stats to ElasticSearch (http://www.elasticsearch.org/)
#
# To enable this backend, include 'elasticsearch' in the backends
# configuration array:
#
#   backends: ['./backends/elasticsearch'] 
#  (if the config file is in the statsd folder)
#
# A sample configuration can be found in exampleElasticConfig.js
#
# This backend supports the following config options:
#
#   host:          hostname or IP of ElasticSearch server
#   port:          port of Elastic Search Server
#   path:          http path of Elastic Search Server (default: '/')
#   indexPrefix:   Prefix of the dynamic index to be created (default: 'statsd')
#   indexType:     The dociment type of the saved stat (default: 'stat')
#

package Net::Statsd::Server::Backend::Elasticsearch;

use 5.008;
use strict;
use warnings;
use base qw(Net::Statsd::Server::Backend);

our $VERSION = '0.01';

use HTTP::Request  ();
use LWP::UserAgent ();

my $debug;
my $flushInterval;
my $elasticFilter;
my $elasticHost;
my $elasticPort;
my $elasticPath;
my $elasticIndex;
my $elasticCountType;
my $elasticTimerType;
my $elasticStats = {};
my $statsdIndex;

sub bulk_insert {
  my ($self, $listCounters, $listTimers, $listTimerData) = @_;

  my @utc_date = gmtime();
  my $utc_date = sprintf("%04d.%02d.%02d", $utc_date[5] + 1900, $utc_date[4] + 1, $utc_date[3]);

  $statsdIndex = $elasticIndex . '-' . $utc_date;
  my $payload = '';

  my $key = 0;
  for (@{ $listCounters }) {
    $payload .= '{"index":{"_index":"' . $statsdIndex . '","_type":"' . $elasticCountType . '"}}' . "\n"; 
    $payload .= '{';
    my $innerPayload = '';
    for my $statKey (keys %{ $listCounters->[$key] }) {
      if ($innerPayload) { $innerPayload .= ',' }
      $innerPayload .= '"' . $statKey . '":"' . $listCounters->[$key]->{$statKey} . '"';
    }
    $payload .= $innerPayload . '}' . "\n";
    $key++;
  }

  $key = 0;
  for (@{ $listTimers }) {
    $payload .= '{"index":{"_index":"' . $statsdIndex . '","_type":"' . $elasticTimerType . '"}}' . "\n";
    $payload .= '{';
    my $innerPayload = '';
    for my $statKey (keys %{ $listTimers->[$key] }) {
      if ($innerPayload) { $innerPayload .= ',' }
      $innerPayload .= '"' . $statKey . '":"' . $listTimers->[$key]->{$statKey} . '"';
    }
    $payload .= $innerPayload . '}' . "\n";
    $key++;
  }

  $key = 0;
  for (@{ $listTimerData }) {
    $payload .= '{"index":{"_index":"' . $statsdIndex . '","_type":"' . $elasticTimerType . '_stats"}}' . "\n";
    $payload += '{';
    my $innerPayload = '';
    for my $statKey (keys %{ $listTimerData->[$key] }) {
      if ($innerPayload) { $innerPayload .= ','; }
      $innerPayload .= '"' . $statKey . '":"' . $listTimerData->[$key]->{$statKey} . '"';
    }
    $payload .= $innerPayload . '}' . "\n";
    $key++;
  }

  return $payload;
}

sub post_stats {
  my ($self, $payload) = @_;

  my $optionsPost = {
    host => $elasticHost,
    port => $elasticPort,
    path => $elasticPath . $statsdIndex . '/_bulk',
    method => 'POST',
    headers => {
      'Content-Type'   => 'application/json',
      'Content-Length' => length($payload),
    }
  };

  my $req = HTTP::Request->new();
  $req->method($optionsPost->{method});
  $req->uri("http://" . $optionsPost->{host} . ":" . ($optionsPost->{port} || 80) . $optionsPost->{path});
  while (my ($name, $value) = each %{ $optionsPost->{headers} }) {
    $req->header($name => $value);
  }
  $req->content($payload);
  warn "-------------- Sending to Elasticsearch:\n$payload\n\n";
  my $lwp = LWP::UserAgent->new();
  $lwp->agent("Net::Statsd::Server::Backend::ElasticSearch/$VERSION");
  return $lwp->request($req);
}

sub flush {
  my ($self, $timestamp, $metrics) = @_;

  my $statString = '';
  my $numStats = 0;
  my $key;
  my @counts;
  my @timers;
  my @timer_data;

  $timestamp *= 1000;

  for my $key (keys %{ $metrics->{counters} }) {
    #my @keys = split m{\.}, $key;
    if (defined $elasticFilter && $key !~ $elasticFilter) {
      next;
    }
    my $value = $metrics->{counters}->{$key};
    #push @counts, {
    #  ns  => $keys[0] || '',
    #  grp => $keys[1] || '',
    #  tgt => $keys[2] || '',
    #  act => $keys[3] || '',
    #  val => $value,
    #};
    push @counts, {
      key => $key,
      counter => $value,
      '@timestamp' => $timestamp,
    };
    $numStats++;
  }

  for my $key (keys %{ $metrics->{timers} }) {
    my @keys = split m{\.}, $key;
    my $series = $metrics->{timers}->{$key};
    if (defined $elasticFilter && $key !~ $elasticFilter) {
      next;
    }
    for my $keyTimer (keys %{ $series }) {
      my $value = $series->{$keyTimer};
      push @timers, {
        key => $key,
        timer => $value,
	'@timestamp' => $timestamp,
      };
#     push @timers, {
#	ns  => $keys[0] || '',
#	grp => $keys[1] || '',
#	tgt => $keys[2] || '',
#	act => $keys[3] || '',
#	val => $value,
#	'@timestamp' => $timestamp,
#     };
    }
  }

  for my $key (keys %{ $metrics->{timer_data} }) {
    my @keys = split m{\.}, $key;
    if (defined $elasticFilter && $key !~ $elasticFilter) {
      next;
    }
    my $value = $metrics->{timer_data}->{$key};
    $value->{'@timestamp'} = $timestamp;
    $value->{key} = $key;
    if (defined $value->{histogram}) {
      for my $keyH (keys %{ $value->{histogram} }) {
        $value->{$keyH} = $value->{histogram}->{$keyH};
      }
      delete $value->{histogram};
    }
    push @timer_data, $value;
    $numStats++;
  }

  my $es_payload = $self->bulk_insert(\@counts, \@timers, \@timer_data);

  if ($numStats > 0 && $es_payload) {
    $self->post_stats($es_payload);
    if ($debug) {
      warn "flushed ${numStats} stats to ElasticSearch\n";
    }
  }

}

#var elastic_backend_status = function graphite_status(writeCb) {
#  for (stat in elasticStats) {
#    writeCb(null, 'elastic', stat, elasticStats[stat]);
#  }
#};

sub init {
  my ($self, $startup_time, $config, $events) = @_;

  $debug = $config->{debug};
  my $configEs = $config->{elasticsearch} || {};

  $elasticHost      = $configEs->{host}          || 'localhost';
  $elasticPort      = $configEs->{port}          || 9200;
  $elasticPath      = $configEs->{path}          || '/';
  $elasticIndex     = $configEs->{indexPrefix}   || 'statsd';
  # Only sends stats that match the 'statsFilter' substring
  $elasticFilter    = $configEs->{statsFilter}   || 'vstatd',
  $elasticCountType = $configEs->{countType}     || 'counter';
  $elasticTimerType = $configEs->{timerType}     || 'timer';
  $elasticTimerType = $configEs->{timerDataType} || 'timer_data';
  $flushInterval    = $config->{flushInterval};

  $elasticStats->{last_flush} = $startup_time;
  $elasticStats->{last_exception} = $startup_time;

  #events.on('flush', flush_stats);
  #events.on('status', elastic_backend_status);

  return 1;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.