Group
Extension

ETL-Yertl/lib/ETL/Yertl/Adapter/graphite.pm

package ETL::Yertl::Adapter::graphite;
our $VERSION = '0.044';
# ABSTRACT: Adapter to read/write from Graphite time series database

#pod =head1 SYNOPSIS
#pod
#pod     my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8086' );
#pod     my @points = $db->read_ts( { metric => 'db.cpu_load.1m' } );
#pod     $db->write_ts( { metric => 'db.cpu_load.1m', value => 1.23 } );
#pod
#pod =head1 DESCRIPTION
#pod
#pod This class allows Yertl to read and write time series from L<the
#pod Graphite time series system|https://graphiteapp.org>. Yertl can write to
#pod Carbon using the "plaintext" protocol, and read from Graphite using its
#pod HTTP API.
#pod
#pod This adapter is used by the L<yts> command.
#pod
#pod =head2 Metric Name Format
#pod
#pod Graphite metrics are paths separated by C<.>, which is the native format
#pod that Yertl supports.
#pod
#pod     yts graphite://localhost foo.bar.baz
#pod     yts graphite://localhost foo.bar
#pod
#pod =head1 SEE ALSO
#pod
#pod L<ETL::Yertl>, L<yts>,
#pod L<Write to Carbon|http://graphite.readthedocs.io/en/latest/feeding-carbon.html>
#pod L<Read from Graphite|http://graphite.readthedocs.io/en/latest/render_api.html>
#pod
#pod =cut

use ETL::Yertl;
use Net::Async::HTTP;
use URI;
use JSON::MaybeXS qw( decode_json );
use List::Util qw( first );
use IO::Async::Loop;
use Time::Piece ();
use Scalar::Util qw( looks_like_number );

#pod =method new
#pod
#pod     my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost' );
#pod     my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8080' );
#pod
#pod Construct a new Graphite adapter for the database on the given host and port.
#pod Port is optional and defaults to C<2003> (the Carbon plaintext port) for writing
#pod and C<8080> (the default Graphite HTTP API port) for reading.
#pod
#pod =cut

sub new {
    my $class = shift;

    my %args;
    if ( @_ == 1 ) {
        if ( $_[0] =~ m{://([^:]+)(?::([^/]+))?} ) {
            ($args{host}, my $port ) = ( $1, $2 );
            if ( $port ) {
                @args{qw( http_port write_port )} = ( $port ) x 2;
            }
        }
    }
    else {
        %args = @_;
    }

    die "Host is required" unless $args{host};

    $args{write_port} ||= 2003; # "plaintext" port
    $args{http_port} ||= 8080; # http port

    return bless \%args, $class;
}

sub _loop {
    my ( $self ) = @_;
    return $self->{_loop} ||= IO::Async::Loop->new;
}

sub write_client {
    my ( $self ) = @_;
    return $self->{write_client} ||= $self->_loop->connect(
        socktype => 'stream',
        host => $self->{host},
        service => $self->{write_port},
    )->get;
}

sub http_client {
    my ( $self ) = @_;
    return $self->{http_client} ||= do {
        my $http = Net::Async::HTTP->new;
        $self->_loop->add( $http );
        $http;
    };
}

#pod =method read_ts
#pod
#pod     my @points = $db->read_ts( $query );
#pod
#pod Read a time series from the database. C<$query> is a hash reference
#pod with the following keys:
#pod
#pod =over
#pod
#pod =item metric
#pod
#pod The time series to read. For Graphite, this is a  path separated by dots
#pod (C<.>).
#pod
#pod =item start
#pod
#pod An ISO8601 date/time for the start of the series points to return,
#pod inclusive.
#pod
#pod =item end
#pod
#pod An ISO8601 date/time for the end of the series points to return,
#pod inclusive.
#pod
#pod =item tags
#pod
#pod B<NOTE>: Graphite does not support per-value tags. Using this field will
#pod cause a fatal error.
#pod
#pod =back
#pod
#pod =cut

sub read_ts {
    my ( $self, $query ) = @_;
    die "Tags are not supported by Graphite" if $query->{tags} && keys %{ $query->{tags} };
    my $metric = $query->{ metric };

    my %form = (
        target => $metric,
        format => 'json',
        noNullPoints => 'true',
    );

    if ( $query->{ start } ) {
        $form{ from } = _format_graphite_dt( $query->{start} );
    }
    if ( $query->{ end } ) {
        $form{ until } = _format_graphite_dt( $query->{end} );
    }

    my $url = URI->new( sprintf 'http://%s:%s/render', $self->{host}, $self->{http_port} );
    $url->query_form( %form );

    #; say "Fetching $url";
    my $res = $self->http_client->GET( $url )->get;

    #; say $res->decoded_content;
    if ( $res->is_error ) {
        die sprintf "Error fetching metric '%s': " . $res->decoded_content . "\n", $metric;
    }

    my $result = decode_json( $res->decoded_content );
    my @points;
    for my $series ( @{ $result } ) {
        for my $point ( @{ $series->{datapoints} } ) {
            push @points, {
                metric => $series->{target},
                timestamp => Time::Piece->gmtime( $point->[1] )->datetime,
                value => $point->[0],
            };
        }
    }

    return @points;
}

#pod =method write_ts
#pod
#pod     $db->write_ts( @points );
#pod
#pod Write time series points to the database. C<@points> is an array
#pod of hashrefs with the following keys:
#pod
#pod =over
#pod
#pod =item metric
#pod
#pod The metric to write. For Graphite, this is a path separated by dots
#pod (C<.>).
#pod
#pod =item timestamp
#pod
#pod An ISO8601 timestamp or UNIX epoch time. Optional. Defaults to the
#pod current time.
#pod
#pod =item value
#pod
#pod The metric value.
#pod
#pod =item tags
#pod
#pod B<NOTE>: Graphite does not support per-value tags. Using this field will
#pod cause a fatal error.
#pod
#pod =back
#pod
#pod =cut

sub write_ts {
    my ( $self, @points ) = @_;
    my $sock = $self->write_client;
    for my $point ( @points ) {
        die "Tags are not supported by Graphite" if $point->{tags} && keys %{ $point->{tags} };
        $point->{timestamp} ||= time;
        $point->{timestamp} =~ s/[.]\d+Z?$//; # We do not support nanoseconds
        if ( !looks_like_number( $point->{timestamp} ) ) {
            $point->{timestamp} = Time::Piece->strptime( $point->{timestamp}, '%Y-%m-%dT%H:%M:%S' )->epoch;
        }
        $sock->write(
            join( " ", $point->{metric}, $point->{value}, $point->{timestamp}, )
            . "\n",
        );
    }
    return;
}

#=sub _format_graphite_dt
#
#   my $graphite_dt = _format_graphite_dt( $iso_dt );
#
# Graphite supports two date/time formats: YYYYMMDD and, bizarrely,
# HH:MM_YYYYMMDD
sub _format_graphite_dt {
    my ( $iso ) = @_;
    if ( looks_like_number( $iso ) ) {
        my $t = Time::Piece->new( $iso );
        return sprintf "%s:%s_%s%s%s", $t->hour, $t->min, $t->year, $t->mon, $t->mday;
    }
    if ( $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})$/ ) {
        return join "", $1, $2, $3;
    }
    $iso =~ /^(\d{4})-?(\d{2})-?(\d{2})[T ]?(\d{2}):?(\d{2})/;
    return "$4:$5_$1$2$3";
}

1;

__END__

=pod

=head1 NAME

ETL::Yertl::Adapter::graphite - Adapter to read/write from Graphite time series database

=head1 VERSION

version 0.044

=head1 SYNOPSIS

    my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8086' );
    my @points = $db->read_ts( { metric => 'db.cpu_load.1m' } );
    $db->write_ts( { metric => 'db.cpu_load.1m', value => 1.23 } );

=head1 DESCRIPTION

This class allows Yertl to read and write time series from L<the
Graphite time series system|https://graphiteapp.org>. Yertl can write to
Carbon using the "plaintext" protocol, and read from Graphite using its
HTTP API.

This adapter is used by the L<yts> command.

=head2 Metric Name Format

Graphite metrics are paths separated by C<.>, which is the native format
that Yertl supports.

    yts graphite://localhost foo.bar.baz
    yts graphite://localhost foo.bar

=head1 METHODS

=head2 new

    my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost' );
    my $db = ETL::Yertl::Adapter::graphite->new( 'graphite://localhost:8080' );

Construct a new Graphite adapter for the database on the given host and port.
Port is optional and defaults to C<2003> (the Carbon plaintext port) for writing
and C<8080> (the default Graphite HTTP API port) for reading.

=head2 read_ts

    my @points = $db->read_ts( $query );

Read a time series from the database. C<$query> is a hash reference
with the following keys:

=over

=item metric

The time series to read. For Graphite, this is a  path separated by dots
(C<.>).

=item start

An ISO8601 date/time for the start of the series points to return,
inclusive.

=item end

An ISO8601 date/time for the end of the series points to return,
inclusive.

=item tags

B<NOTE>: Graphite does not support per-value tags. Using this field will
cause a fatal error.

=back

=head2 write_ts

    $db->write_ts( @points );

Write time series points to the database. C<@points> is an array
of hashrefs with the following keys:

=over

=item metric

The metric to write. For Graphite, this is a path separated by dots
(C<.>).

=item timestamp

An ISO8601 timestamp or UNIX epoch time. Optional. Defaults to the
current time.

=item value

The metric value.

=item tags

B<NOTE>: Graphite does not support per-value tags. Using this field will
cause a fatal error.

=back

=head1 SEE ALSO

L<ETL::Yertl>, L<yts>,
L<Write to Carbon|http://graphite.readthedocs.io/en/latest/feeding-carbon.html>
L<Read from Graphite|http://graphite.readthedocs.io/en/latest/render_api.html>

=head1 AUTHOR

Doug Bell <preaction@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2018 by Doug Bell.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.