Group
Extension

Message-Passing-Collectd/lib/Collectd/Plugin/Read/Message/Passing.pm

package Collectd::Plugin::Read::Message::Passing;
use strict;
use warnings;
use Collectd ();
use JSON;
use Module::Runtime qw/ require_module /;
use String::RewritePrefix ();
use Try::Tiny;
use Message::Passing::Output::Callback;
use AnyEvent;
# Install tid() at compile time: it reports the current thread id when the
# threads module can be loaded, and a constant 0 otherwise.
BEGIN {
    my $threads_loaded = eval { require threads };
    *tid = $threads_loaded
        ? sub { threads->tid }
        : sub { 0 };
}
use namespace::clean;

our $INPUT;
our %CONFIG;

# Collapse a value list. An array ref holding more than one element is
# returned unchanged; otherwise its first element is returned (undef when
# the array is empty).
sub _clean_value {
    my ($values) = @_;
    return $values if @$values > 1;
    return $values->[0];
}

# Convert one collectd config item (a hash with 'key', 'values' and
# 'children') into a single-pair hash ref: { key => value }.
#
# - A leaf item (no children) maps its key to _clean_value() of its values,
#   i.e. a scalar for one value or an array ref for several.
# - An item with children maps its key to ONE hash ref holding every child's
#   flattened key/value pair. Callers dereference these as hashes
#   (e.g. %{ $CONFIG{inputoptions} }), so the children must not be returned
#   as a list of single-key hashes. If two children share a key, the later
#   one wins.
sub _flatten_item {
    my ($item) = @_;
    my @children = @{ $item->{children} || [] };

    my $val;
    if (@children) {
        my %merged;
        for my $child (@children) {
            my $flat = _flatten_item($child);
            @merged{ keys %$flat } = values %$flat;
        }
        $val = \%merged;
    }
    else {
        $val = _clean_value($item->{values} || []);
    }

    return +{ $item->{key} => $val };
}

# collectd config callback: receives the plugin's config block as a hash ref
# and folds each of its children into %CONFIG.
#
# Keys already present in %CONFIG are kept, so the first occurrence of a key
# wins. Returns 1 explicitly: a sub that ends in a loop has an unspecified
# return value, which is not a reliable success indicator for a callback.
sub config {
    my ($root) = @_;
    for my $item (@{ $root->{children} || [] }) {
        my $flat = _flatten_item($item);
        for my $key (keys %$flat) {
            $CONFIG{$key} = $flat->{$key} unless exists $CONFIG{$key};
        }
    }
    return 1;
}

# Numeric codes keyed by data-source type name.
# NOTE(review): presumably mirrors collectd's DS_TYPE_* constants - confirm.
my %_TYPE_LOOKUP = (
    COUNTER => 0,
    GAUGE   => 1,
);

# Callback invoked for every decoded message coming out of the
# Message::Passing chain. Converts the message into a collectd value list
# and dispatches it.
#
# $message is expected to be a hash ref with:
#   values          - array ref of hashes, each carrying a 'value'
#   plugin          - plugin name
#   type            - type name passed straight through to collectd
#   plugin_instance - optional; only forwarded when true
#
# Both the incoming message and the resulting value list are logged
# (JSON-encoded) at LOG_WARNING level.
# Returns whatever Collectd::plugin_dispatch_values returns.
sub _do_message_passing_read {
    my ($message) = @_;
    Collectd::plugin_log(Collectd::LOG_WARNING, "Got message from Message::Passing " . JSON::encode_json($message));

    # Build the value list once. Indexing into values->[0] here would
    # autovivify an element in an empty list, so only iterate over it.
    my $vl = {
        values => [ map { $_->{value} } @{ $message->{values} || [] } ],
        plugin => $message->{plugin},
        type   => $message->{type},
        $message->{plugin_instance} ? (plugin_instance => $message->{plugin_instance}) : (),
    };

    Collectd::plugin_log(Collectd::LOG_WARNING, "Got message for collectd " . JSON::encode_json($vl));
    return Collectd::plugin_dispatch_values($vl);
}

# ["load",[{"min":0,"max":100,"name":"shortterm","type":1},{"min":0,"max":100,"name":"midterm","type":1},{"min":0,"max":100,"name":"longterm","type":1}],{"plugin":"load","time":1341655869.22588,"type":"load","values":[0.41,0.13,0.08],"interval":10,"host":"ldn-dev-tdoran.youdevise.com"}]
# "transport.tx.size",[{"min":0,"max":0,"name":"transport.tx.size","type":0}],{"plugin":"ElasticSearch","time":1341655799.77979,"type":"transport.tx.size","values":[9725948078],"interval":10,"host":"ldn-dev-tdoran.youdevise.com"}
# Return the Message::Passing input object, building the whole chain
# (input -> decoder -> callback output) on first use and caching it in
# $INPUT. If construction throws, a warning is logged, $INPUT is left
# undefined and the (undefined) value is returned.
sub _input {
    return $INPUT if $INPUT;

    try {
        my $callback_output = Message::Passing::Output::Callback->new(
            cb => \&_do_message_passing_read,
        );

        my $decoder_class = $CONFIG{decoderclass};
        my $decoder = $decoder_class->new(
            %{ $CONFIG{decoderoptions} },
            output_to => $callback_output,
        );

        my $input_class = $CONFIG{inputclass};
        $INPUT = $input_class->new(
            %{ $CONFIG{inputoptions} },
            output_to => $decoder,
        );
    }
    catch {
        Collectd::plugin_log(Collectd::LOG_WARNING, "Got exception building inputs: $_ - DISABLING thread id " . tid());
        undef $INPUT;
    };

    return $INPUT;
}

# collectd init callback: validates %CONFIG and loads the configured classes.
#
# - inputclass is mandatory. A leading '+' means "use as-is"; otherwise
#   'Message::Passing::Input::' is prepended.
# - decoderclass defaults to the JSON decoder. A leading '+' means "use
#   as-is"; otherwise 'Message::Passing::Filter::Decoder::' is prepended.
# - inputoptions / decoderoptions default to empty hashes.
# - readtimeslice defaults to 0.25 when not configured.
#
# Returns 1 on success. Returns 0 after logging a warning when inputclass
# is missing or either class fails to load.
sub init {
    if (!$CONFIG{inputclass}) {
        Collectd::plugin_log(Collectd::LOG_WARNING, "No inputclass config for Message::Passing plugin - disabling PID $$ TID " . tid());
        return 0;
    }
    $CONFIG{inputclass} = String::RewritePrefix->rewrite(
        { '' => 'Message::Passing::Input::', '+' => '' },
        $CONFIG{inputclass}
    );
    if (!eval { require_module($CONFIG{inputclass}) }) {
        # Report the class actually attempted; the key is lower-case.
        Collectd::plugin_log(Collectd::LOG_WARNING, "Could not load inputclass=" . $CONFIG{inputclass} . " error: $@");
        return 0;
    }
    $CONFIG{decoderclass} ||= '+Message::Passing::Filter::Decoder::JSON';
    $CONFIG{decoderclass} = String::RewritePrefix->rewrite(
        { '' => 'Message::Passing::Filter::Decoder::', '+' => '' },
        $CONFIG{decoderclass}
    );
    if (!eval { require_module($CONFIG{decoderclass}) }) {
        Collectd::plugin_log(Collectd::LOG_WARNING, "Could not load decoderclass=" . $CONFIG{decoderclass} . " error: $@");
        return 0;
    }
    $CONFIG{inputoptions} ||= {};
    $CONFIG{decoderoptions} ||= {};
    # Keep a configured time slice; 0.25 seconds is only the default.
    $CONFIG{readtimeslice} ||= 0.25;
    return 1;
}

# collectd read callback. Makes sure the input chain exists, then runs the
# AnyEvent loop for $CONFIG{readtimeslice} seconds by waiting on a condvar
# that a one-shot timer fires. Always returns 1.
sub read {
    _input();

    my $slice_over = AnyEvent->condvar;
    {
        # The timer watcher only needs to live until the condvar fires;
        # it is released when this scope ends.
        my $slice_timer = AnyEvent->timer(
            after => $CONFIG{readtimeslice},
            cb    => sub { $slice_over->send },
        );
        $slice_over->recv;
    }

    return 1;
}

# Register this plugin's init, config and read callbacks with collectd,
# in that order, under the plugin name 'Read::Message::Passing'.
for my $registration (
    [ Collectd::TYPE_INIT, 'Collectd::Plugin::Read::Message::Passing::init' ],
    [ Collectd::TYPE_CONFIG, 'Collectd::Plugin::Read::Message::Passing::config' ],
    [ Collectd::TYPE_READ, 'Collectd::Plugin::Read::Message::Passing::read' ],
) {
    my ($callback_type, $callback_name) = @$registration;
    Collectd::plugin_register($callback_type, 'Read::Message::Passing', $callback_name);
}

1;

=head1 NAME

Collectd::Plugin::Read::Message::Passing - Read collectd metrics via Message::Passing

=head1 SYNOPSIS

    # Only tested with 1 read thread!
    ReadThreads   1
    # You MUST setup types.db for all types you emit!
    TypesDB       "/usr/share/collectd/types.db"
    TypesDB       "/usr/local/share/collectd/types_local.db"
    <LoadPlugin perl>
        Globals true
    </LoadPlugin>
    <Plugin perl>
        BaseName "Collectd::Plugin"
        LoadPlugin "Read::Message::Passing"
        <Plugin "Read::Message::Passing">
            # MANDATORY - You MUST configure an input class
            inputclass "ZeroMQ"
            <inputoptions>
                socket_bind "tcp://192.168.0.1:5552"
            </inputoptions>
            # OPTIONAL - Defaults to JSON
            #decoderclass "JSON"
            #<decoderoptions>
            #</decoderoptions>
        </Plugin>
    </Plugin>

    Will consume metrics like this:

    {
        "plugin":"ElasticSearch",
        "time":1341656031.18621,
        "values":[
            {
                "value":0,
                "min":0,
                "name":"indices.get.time",
                "max":0,
                "type":0
            }
        ],
        "type":"indices.get.time",
        "interval":10,
        "host":"t0m.local"
    }

    or, for multi-value metrics:

    {
        "plugin":"load",
        "time":1341655869.22588,
        "type":"load",
        "values":[
            {
                "value":0.41,
                "min":0,"max":100,"name":"shortterm","type":1
            },
            {
                "value":0.13,
                "min":0,
                "max":100,
                "name": "midterm",
                "type":1
            },
            {
                "value":0.08,
                "min":0,
                "max":100,
                "name":"longterm",
                "type":1
            }
        ],
        "interval":10,
        "host":"t0m.local"
    }

=head1 DESCRIPTION

A collectd plugin to consume metrics from L<Message::Passing> into L<collectd|http://collectd.org/>.

B<WARNING:> This plugin is pre-alpha, and is only tested with 1 collectd thread and the ZeroMQ Input.

B<NOTE:> You B<MUST> have registered any types you ingest in a C<types.db> for collectd. Failure to do this can result in segfaults!

=head1 PACKAGE VARIABLES

=head2 %CONFIG

A hash containing the following:

=head3 inputclass

The name of the class which will act as the Message::Passing input. Will be used as-is if prefixed with C<+>,
otherwise C<Message::Passing::Input::> will be prepended. Required.

=head3 inputoptions

The hash of options for the input class. Not required, but almost certainly needed.

=head3 decoderclass

The name of the class which will act as the Message::Passing decoder. Will be used as-is if prefixed with C<+>,
otherwise C<Message::Passing::Filter::Decoder::> will be prepended. Optional, defaults to L<JSON|Message::Passing::Filter::Decoder::JSON>.

=head3 decoderoptions

The hash of options for the decoder class.

=head3 readtimeslice

The amount of time to block in Message::Passing's read loop. Defaults to 0.25 seconds, which may
not be enough if you are consuming a lot of metrics.

=head1 FUNCTIONS

=head2 config

Called first with configuration in the config file, munges it into the format expected
and places it into the C<%CONFIG> hash.

=head2 init

Validates the config and loads the configured input and decoder classes. The C<$INPUT> chain itself is
built lazily, on the first call to C<read>.

=head2 read

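Builds C<$INPUT> if necessary, then blocks for C<readtimeslice> seconds so that the event loop can
deliver any metrics received by C<$INPUT> to collectd.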

=head1 BUGS

Blocking collectd for a fixed time to allow the AnyEvent loop to run is a horrible horrible way
of reading.

=head1 AUTHOR, COPYRIGHT & LICENSE

See L<Message::Passing::Collectd>.

=cut



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.