Group
Extension

InfluxDB-Client/lib/InfluxDB/Client.pm

package InfluxDB::Client;

use 5.006;
use strict;
use warnings;

use Carp;
use IO::Socket::INET;
use JSON::MaybeXS;
use LWP::UserAgent;
use URI;

=head1 NAME

InfluxDB::Client - The lightweight InfluxDB client

=head1 VERSION

Version 0.01

=cut

our $VERSION = '0.01';

=head1 SYNOPSIS

InfluxDB::Client provides an easy way to interact with an InfluxDB server.

    use InfluxDB::Client;

    ########################## TCP ##########################
    my $client = InfluxDB::Client->new( host => 'server.address.com', port => 8086, protocol => 'tcp' ) or die "Can't instantiate client";

    # Check server connectivity
    my $result = $client->ping();
    die "No pong" unless $result;

    # You can also get the server version
    print $result->{version};

    # Read
    $result = $client->query('SELECT "severity_code" FROM "syslog" WHERE ("severity" = \'err\' AND "hostname" =~ /^(srv01|srv02)$/) AND time >= 1558878013531ms and time <= 1609886964827ms', database => 'grafana');

    # Write
    $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=42,pod=85", database => 'dbname');

    ########################## UDP ##########################
    $client = InfluxDB::Client->new( host => 'server.address.com', port => 8089, protocol => 'udp' ) or die "Can't instantiate client";

    # UDP allows only write()
    $result = $client->write("testing,host=containment,repo=cadi-libs,file=testfile statement=47,pod=89", database => 'grafana');

=head1 WHY

In its current state this module offers few additional features over InfluxDB::HTTP (from which it's derived) 

The only reasons why you would use this module are:
=over

=item *
Less dependencies (no Object::Result and its dependencies)

=item *
You want to use UDP protocol for writing (WIP)

=back

=cut

=head1 SUBROUTINES/METHODS

=head2 new ( [%options] )

Constructor.
%otions is an hash with the following keys:

=over

=item*
host - Server hostname (default: 'localhost')

=item*
port - Server port (default: 8086)

=item*
timeout - Timeout value in seconds (default: 180)

=item*
protocol - Transport protocol (default: 'tcp')

=back

=cut

sub new {
    my $class = shift;
    my %args = ( host     => 'localhost',
                 port     => 8086,
                 timeout  => 180,
                 protocol => 'tcp',
                 @_,
    );
    my ( $host, $port, $timeout, $protocol ) = map { lc } @args{ 'host', 'port', 'timeout', 'protocol' };

    my $self = { host => $host,
                 port => $port,
                 protocol => $protocol
    };

    if ( $protocol eq 'tcp' ) {
        my $ua = LWP::UserAgent->new();
        $ua->agent("InfluxDB-Client/$VERSION");
        $ua->timeout($timeout);
        $self->{lwp_user_agent} = $ua;
    } else {
        die "Unknown protocol: $protocol" unless $protocol eq "udp";

        my $socket = IO::Socket::INET->new( PeerAddr => "$host:$port",
                                            Proto    => $protocol,
                                            Blocking => 0
        ) || die("Can't open socket: $@");
        $self->{udp} = $socket;
    }

    bless $self, $class;

    return $self;
}

=head2 ping()

Check the server connectivity.

Returns an hashref which evaluates to true if the connection is ok and to false otherwise.
The hashref has the following keys:

=over

=item*
raw - The raw response from the server

=item*
error - The error message returned by the server (empty on success)

=item*
version - The InfluxDB verstion returned by the server through the 'X-Influxdb-Version' header

=back

=cut

sub ping {
    my ($self)   = @_;
    my $uri      = $self->_get_influxdb_http_api_uri('ping');
    my $response = $self->{lwp_user_agent}->head( $uri->canonical() );

    if ( !$response->is_success() ) {
        my $error = $response->message();
        return { raw     => $response,
                 error   => $error,
                 version => undef,
        };
    }

    my $version = $response->header('X-Influxdb-Version');
    return { raw     => $response,
             error   => undef,
             version => $version,
    };
}

=head2 query( $query [, %options] )

=cut

sub query {
    my $self  = shift;
    my $query = shift;
    my %args  = ( epoch => 'ns', @_ );
    my ( $database, $chunk_size, $epoch ) = @args{ 'database', 'chunk_size', 'epoch' };

    die "Missing argument 'query'" if !$query;
    die "Argument epoch '$epoch' is not one of (h,m,s,ms,u,ns)" if $epoch !~ /^(h|m|s|ms|u|ns)$/;

    if ( ref($query) eq 'ARRAY' ) {
        $query = join( ';', @$query );
    }

    my $uri = $self->_get_influxdb_http_api_uri('query');

    $uri->query_form( q => $query,
                      ( $database   ? ( db         => $database )   : () ),
                      ( $chunk_size ? ( chunk_size => $chunk_size ) : () ),
                      ( $epoch      ? ( epoch      => $epoch )      : () )
    );

    my $response = $self->{lwp_user_agent}->post( $uri->canonical() );

    chomp( my $content = $response->content() );

    my $error;
    if ( $response->is_success() ) {
        local $@;
        my $data = eval { decode_json($content) };
        $error = $@;

        if ($data) {
            $error = $data->{error};
        }

        if ( !$error ) {
            $data->{request_id} = $response->header('Request-Id');
            return { raw   => $response,
                     data  => $data,
                     error => undef,
            };
        }
    } else {
        $error = $content;
    }

    return { raw   => $response,
             data  => undef,
             error => $error,
    };
}

=head2 write

=cut

sub write {
    my $self        = shift;
    my $measurement = shift;
    my %args        = @_;
    my ( $database, $precision, $retention_policy ) = @args{ 'database', 'precision', 'retention_policy' };

    die "Missing argument 'measurement'"                                        if !$measurement;
    die "Missing argument 'database'"                                           if !$database;
    die "Argument precision '$precision' is set and not one of (h,m,s,ms,u,ns)" if $precision && $precision !~ /^(h|m|s|ms|u|ns)$/;

    if ( ref($measurement) eq 'ARRAY' ) {
        $measurement = join( "\n", @$measurement );
    }

  if ($self->{protocol} eq 'tcp') {
    my $uri = $self->_get_influxdb_http_api_uri('write');

    $uri->query_form( db => $database,
                      ( $precision        ? ( precision => $precision )        : () ),
                      ( $retention_policy ? ( rp        => $retention_policy ) : () )
    );

    my $response = $self->{lwp_user_agent}->post( $uri->canonical(), Content => $measurement );

    chomp( my $content = $response->content() );

    if ( $response->code() != 204 ) {
        local $@;
        my $data = eval { decode_json($content) };
        my $error = $@;
        $error = $data->{error} if ( !$error && $data );

        return { raw   => $response,
                 error => $error,
                 data  => 0,
        };
    }

    return { raw   => $response,
             error => undef,
             data  => 1,
    };

  } else {
  # Udp send
  my $bytes = $self->{udp}->send($measurement);
        return { raw   => undef,
                 error => undef,
                 data  => $bytes?1:0,
        };
  }
}

sub _get_influxdb_http_api_uri {
    my ( $self, $endpoint ) = @_;

    die "Missing argument 'endpoint'" if !$endpoint;

    my $uri = URI->new();

    $uri->scheme('http');
    $uri->host( $self->{host} );
    $uri->port( $self->{port} );
    $uri->path($endpoint);

    return $uri;
}

1;

=head1 AUTHOR

Arnaud (Arhuman) ASSAD, C<< <aassad at cpan.org> >>

=head1 BUGS

Please report any bugs or feature requests to C<bug-influxdb-client at rt.cpan.org>, or through
the web interface at L<https://rt.cpan.org/NoAuth/ReportBug.html?Queue=InfluxDB-Client>.  I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.

=head1 SEE ALSO

This module is derived from InfluxDB::HTTP.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc InfluxDB::Client


You can also look for information at:

=over 4

=item * RT: CPAN's request tracker (report bugs here)

L<https://rt.cpan.org/NoAuth/Bugs.html?Dist=InfluxDB-Client>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/InfluxDB-Client>

=item * CPAN Ratings

L<https://cpanratings.perl.org/d/InfluxDB-Client>

=item * Search CPAN

L<https://metacpan.org/release/InfluxDB-Client>

=back


=head1 ACKNOWLEDGEMENTS


=head1 LICENSE AND COPYRIGHT

This software is copyright (c) 2020 by Arnaud (Arhuman) ASSAD.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.


=cut

1;    # End of InfluxDB::Client


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.