Group
Extension

InfluxDB/lib/InfluxDB.pm

package InfluxDB;

use strict;
use warnings;
use 5.010_000;

our $VERSION = '1.005';

use Class::Accessor::Lite (
    new => 0,
    ro  => [qw(host port username database ssl json)],
);

use Carp;
use Data::Validator;
use Mouse::Util::TypeConstraints;
use Furl;
use URI::Escape;
use JSON 2;

enum 'TimePrecision' => qw(s m u);

subtype 'JSONBool' => as 'ScalarRef';
coerce 'JSONBool'
    => from 'Bool' => via { $_ ? \1 : \0 }
    => from 'Object' => via { JSON::is_bool($_) ? ($_ == 1 ? \1 : \0) : \0 }
;

sub new {
    state $rule = Data::Validator->new(
        host     => { isa => 'Str' },
        port     => { isa => 'Int', default => 8086 },
        username => { isa => 'Str', optional => 1 },
        password => { isa => 'Str', optional => 1 },
        database => { isa => 'Str' },
        ssl      => { isa => 'Bool', default => 0 },

        timeout  => { isa => 'Int', default => 120 },
        debug    => { isa => 'Bool', optional => 1 },
    )->with('NoRestricted')->with('Method');
    # Mouse::Util::apply_all_role leaks memory when takes 2 or more extensions.
    # so apply for each extension.
    my($class, $args) = $rule->validate(@_);

    if (delete $args->{debug}) {
        $ENV{IX_DEBUG} = 1;
    }

    my $self = bless {
        ua     => Furl->new(
            agent   => join('/', __PACKAGE__, $VERSION),
            timeout => $args->{timeout},
        ),
        json   => JSON->new,
        status => {},
        %$args
    }, $class;

    return $self;
}

sub debugf {
    return unless $ENV{IX_DEBUG};
    print STDERR "[DEBUG] ",sprintf(shift @_, @_),"\n";
}

sub status {
    my($self, $res) = @_;

    if ($res) {
        $self->{status} = {
            code        => $res->code,
            message     => $res->message,
            status_line => $res->status_line,
            content     => $res->content,
        };
        debugf("content: %s", $res->content);
    }

    return $self->{status};
}

sub errstr {
    my($self) = @_;
    my $errstr = "";

    if (substr($self->{status}{code}, 0, 2) ne "20") {
        $errstr = join("\n",
                       $self->{status}{status_line},
                       $self->{status}{content},
                   );
    }

    return $errstr;
}

sub as_hash {
    my(undef, $result) = @_;
    my $h;

    for my $r (@{ $result }) {
        my $series  = $r->{name};
        my @columns = @{ $r->{columns} };

        for my $p (@{ $r->{points} }) {
            my %ph;
            @ph{ @columns } = @$p;
            push @{ $h->{$series} }, \%ph;
        }
    }

    return $h;
}

sub switch_database {
    state $rule = Data::Validator->new(
        database => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    $self->{database} = $args->{database};

    return 1;
}

sub switch_user {
    state $rule = Data::Validator->new(
        username => { isa => 'Str' },
        password => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    $self->{username} = $args->{username};
    $self->{password} = $args->{password};

    return 1;
}

### database #############################################################
sub create_database {
    state $rule = Data::Validator->new(
        database => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db',
    );

    my $res = $self->{ua}->post($url, [], $self->json->encode({
        name => $args->{database},
    }));
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub list_database {
    my $self = shift;

    my $url = $self->_build_url(
        path => '/db',
    );

    my $res = $self->{ua}->get($url);
    $self->status($res);

    return $res->is_success ? $self->json->decode($res->content) : ();
}

sub delete_database {
    state $rule = Data::Validator->new(
        database => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/'. $args->{database},
    );

    my $res = $self->{ua}->delete($url);
    $self->status($res);

    return $res->is_success ? 1 : ();
}

### series ###############################################################
## hmmm v0.5.1 (latest version) returns empty response
## https://github.com/FGRibreau/influxdb-cli/issues/8
sub list_series {
    my $self = shift;
    return $self->query(q => "list series");
}

### points ###############################################################
sub write_points {
    state $rule = Data::Validator->new(
        data           => { isa => 'ArrayRef|HashRef' },
        time_precision => { isa => 'TimePrecision', optional => 1 },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $data = ref($args->{data}) eq 'ARRAY' ? $args->{data} : [$args->{data}];
    $data = $self->json->encode($data);
    debugf("data: %s", $data);

    my $url = $self->_build_url(
       path =>  '/db/' . $self->database . '/series',
       qs   => {
           time_precision => exists $args->{time_precision} ? $args->{time_precision} :  undef,
       },
    );

    my $res = $self->{ua}->post($url, [], $data);
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub delete_points {
    state $rule = Data::Validator->new(
        name  => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
       path =>  '/db/' . $self->database . '/series/' . $args->{name},
       qs   => {
           %$args,
       },
    );

    my $res = $self->{ua}->delete($url);
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub create_scheduled_deletes {
    croak "Not implemented in InfluxDB v0.5.1";
    # state $rule = Data::Validator->new(
    # )->with('Method');
    # my($self, $args) = $rule->validate(@_);
}

sub list_scheduled_deletes {
    croak "Not implemented in InfluxDB v0.5.1";
    # state $rule = Data::Validator->new(
    # )->with('Method');
    # my($self, $args) = $rule->validate(@_);
}

sub delete_scheduled_deletes {
    croak "Not implemented in InfluxDB v0.5.1";
    # state $rule = Data::Validator->new(
    # )->with('Method');
    # my($self, $args) = $rule->validate(@_);
}

sub query {
    state $rule = Data::Validator->new(
        q              => { isa => 'Str' },
        time_precision => { isa => 'TimePrecision', optional => 1 },
        chunked        => { isa => 'Bool', default => 0 },
        # order          => { isa => 'Str', optional => 1 }, # not implemented?
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/series',
        qs   => {
            q              => $args->{q},
            time_precision => (exists $args->{time_precision} ? $args->{time_precision} :  undef),
            chunked        => ($args->{chunked} ? 'true' : 'false'),
            order          => (exists $args->{order} ? $args->{order} :  undef),
        },
    );

    my $res = $self->{ua}->get($url);
    $self->status($res);

    if ($res->is_success) {
        my $result;
        if ($args->{chunked}) {
            $result = [$self->json->incr_parse($res->content)];
        } else {
            $result = $self->json->decode($res->content);
        }
        return $result;
    } else {
        return;
    }
}

### Continuous Queries ###################################################
sub create_continuous_query {
    state $rule = Data::Validator->new(
        q    => { isa => 'Str' },
        name => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    return $self->query(q => "$args->{q} into $args->{name}");
}

sub list_continuous_queries {
    my $self = shift;
    return $self->query(q => "list continuous queries");
}

sub drop_continuous_query {
    state $rule = Data::Validator->new(
        id => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    return $self->query(q => "drop continuous query $args->{id}");
}

### user #################################################################
sub create_database_user {
    state $rule = Data::Validator->new(
        name     => { isa => 'Str' },
        password => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/users',
    );

    my $res = $self->{ua}->post($url, [], $self->json->encode({
        name     => $args->{name},
        password => $args->{password},
    }));
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub delete_database_user {
    state $rule = Data::Validator->new(
        name => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/users/' . $args->{name},
    );

    my $res = $self->{ua}->delete($url);
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub update_database_user {
    state $rule = Data::Validator->new(
        name     => { isa => 'Str' },
        password => { isa => 'Str', optional => 1 },
        admin    => { isa => 'JSONBool', optional => 1 },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/users/' . $args->{name},
    );

    my $res = $self->{ua}->post($url, [], $self->json->encode({
        exists $args->{password} ? (password => $args->{password}) : (),
        exists $args->{admin} ? (admin => $args->{admin}) : (),
    }));
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub list_database_users {
    my $self = shift;

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/users',
    );

    my $res = $self->{ua}->get($url);
    $self->status($res);

    return $res->is_success ? $self->json->decode($res->content) : ();
}

sub show_database_user {
    state $rule = Data::Validator->new(
        name => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/db/' . $self->database . '/users/' . $args->{name},
    );

    my $res = $self->{ua}->get($url);
    $self->status($res);

    return $res->is_success ? $self->json->decode($res->content) : ();
}

sub create_cluster_admin {
    state $rule = Data::Validator->new(
        name     => { isa => 'Str' },
        password => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/cluster_admins',
    );

    my $res = $self->{ua}->post($url, [], $self->json->encode({
        name     => $args->{name},
        password => $args->{password},
    }));
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub delete_cluster_admin {
    state $rule = Data::Validator->new(
        name => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/cluster_admins/' . $args->{name},
    );

    my $res = $self->{ua}->delete($url);
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub update_cluster_admin {
    state $rule = Data::Validator->new(
        name     => { isa => 'Str' },
        password => { isa => 'Str' },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = $self->_build_url(
        path => '/cluster_admins/' . $args->{name},
    );

    my $res = $self->{ua}->post($url, [], $self->json->encode({
        exists $args->{password} ? (password => $args->{password}) : (),
    }));
    $self->status($res);

    return $res->is_success ? 1 : ();
}

sub list_cluster_admins {
    my $self = shift;

    my $url = $self->_build_url(
        path => '/cluster_admins',
    );

    my $res = $self->{ua}->get($url);
    $self->status($res);

    return $res->is_success ? $self->json->decode($res->content) : ();
}

# sub show_cluster_admin {
#     state $rule = Data::Validator->new(
#         name => { isa => 'Str' },
#     )->with('Method');
#     my($self, $args) = $rule->validate(@_);

#     my $url = $self->_build_url(
#         path => '/cluster_admins/' . $args->{name},
#     );

#     my $res = $self->{ua}->get($url);
#     $self->status($res);

#     return $res->is_success ? $self->json->decode($res->content) : ();
# }

### utils ################################################################
sub _build_url {
    state $rule = Data::Validator->new(
        path => { isa => 'Str' },
        qs   => { isa => 'HashRef', optional => 1 },
    )->with('Method');
    my($self, $args) = $rule->validate(@_);

    my $url = sprintf("%s://%s%s:%d%s",
                      ($self->ssl ? 'https' : 'http'),
                      ($self->username and $self->{password})
                          ? sprintf('%s:%s@', $self->username, $self->{password})
                          : '',
                      $self->host,
                      $self->port,
                      $args->{path},
                  );
    if (exists $args->{qs}) {
        my @qs;
        for my $k (keys %{ $args->{qs} }) {
            next unless defined $args->{qs}{$k};
            push @qs, join('=', uri_escape($k), uri_escape($args->{qs}{$k}));
        }
        $url .= '?' . join('&', @qs) if @qs;
    }

    debugf("url: %s", $url);
    return $url
}

1;

__END__

=encoding utf-8

=begin html

<a href="https://travis-ci.org/hirose31/p5-InfluxDB"><img src="https://travis-ci.org/hirose31/p5-InfluxDB.png?branch=master" alt="Build Status" /></a>
<a href="https://coveralls.io/r/hirose31/p5-InfluxDB?branch=master"><img src="https://coveralls.io/repos/hirose31/p5-InfluxDB/badge.png?branch=master" alt="Coverage Status" /></a>

=end html

=head1 NAME

InfluxDB - Client library for InfluxDB

=begin readme

=head1 INSTALLATION

To install this module, run the following commands:

    perl Build.PL
    ./Build
    ./Build test
    ./Build install

=end readme

=head1 CAUTION

    The JSON write protocol is deprecated as of InfluxDB 0.9.1. It is
    still present but it will be removed when InfluxDB 1.0 is
    released. The line protocol is the primary write protocol for
    InfluxDB 0.9.1+.

This InfluxDB module can handle only JSON protocol.

If you want to use line protocol, please use L<InfluxDB::LineProtocol> module.


=head1 SYNOPSIS

    use InfluxDB;
    
    my $ix = InfluxDB->new(
        host     => '127.0.0.1',
        port     => 8086,
        username => 'scott',
        password => 'tiger',
        database => 'test',
        # ssl => 1, # enable SSL/TLS access
        # timeout => 5, # set timeout to 5 seconds
    );
    
    $ix->write_points(
        data => {
            name    => "cpu",
            columns => [qw(sys user idle)],
            points  => [
                [20, 50, 30],
                [30, 60, 10],
            ],
        },
    ) or die "write_points: " . $ix->errstr;
    
    my $rs = $ix->query(
        q => 'select * from cpu',
        time_precision => 's',
    ) or die "query: " . $ix->errstr;
    
    # $rs is ArrayRef[HashRef]:
    # [
    #   {
    #     columns => ["time","sequence_number","idle","sys","user"],
    #     name => "cpu",
    #     points => [
    #       ["1391743908",6500001,10,30,60],
    #       ["1391743908",6490001,30,20,50],
    #     ],
    #   },
    # ]
    
    my $hrs = $ix->as_hash($rs); # or InfluxDB->as_hash($rs);
    # convert into HashRef for convenience
    # {
    #   cpu => [
    #     {
    #       idle   => 10,
    #       seqnum => 6500001,
    #       sys    => 30,
    #       time   => "1391743908",
    #       user   => 60
    #     },
    #     {
    #       idle   => 30,
    #       seqnum => 6490001,
    #       sys    => 20,
    #       time   => "1391743908",
    #       user   => 50
    #     }
    #   ]
    # }

=head1 DESCRIPTION

This module `InfluxDB` is a client library for InfluxDB E<lt>L<http://influxdb.org>E<gt>.

=head1 METHODS

=head2 Class Methods

=head3 B<new>(%args:Hash) :InfluxDB

Creates and returns a new InfluxDB client instance. Dies on errors.

%args is following:

=over 4

=item host => Str

=item port => Int (default: 8086)

=item username => Str

=item password => Str

=item database => Str

=item ssl => Bool (optional)

=item timeout => Int (default: 120)

=item debug => Bool (optional)

=back

=head2 Instance Methods

=head3 B<write_points>(%args:Hash) :Bool

Write to multiple time series names.

=over 4

=item data => ArrayRef[HashRef] | HashRef

HashRef like following:

    {
        name    => "name_of_series",
        columns => ["col1", "col2", ...],
        points  => [
            [10.0, 20.0, ...],
            [10.9, 21.3, ...],
            ...
        ],
    }

The C<time> and any other data fields which should be graphable must
be numbers, not text.  See L<JSON/simple scalars>.

=item time_precision => "s" | "m" | "u" (optional)

The precision timestamps should come back in. Valid options are s for seconds, m for milliseconds, and u for microseconds.

=back

=head3 B<delete_points>(name => Str) :Bool

Delete ALL DATA from series specified by I<name>

=head3 B<query>(%args:Hash) :ArrayRef

=over 4

=item q => Str

The InfluxDB query language, see: L<http://influxdb.org/docs/query_language/>

=item time_precision => "s" | "m" | "u" (optional)

The precision timestamps should come back in. Valid options are s for seconds, m for milliseconds, and u for microseconds.

=item chunked => Bool (default: 0)

Chunked response.

=back

=head3 B<as_hash>($result:ArrayRef[HashRef]) :HashRef

Utility instance/class method for handling result of query.

Takes result of C<query()>(ArrayRef) and convert into following HashRef.

    {
      cpu => [
        {
          idle => 10,
          seqnum => 6500001,
          sys => 30,
          time => "1391743908",
          user => 60
        },
        {
          idle => 30,
          seqnum => 6490001,
          sys => 20,
          time => "1391743908",
          user => 50
        }
      ]
    }

=head3 B<create_continuous_query>(q => Str, name => Str) :ArrayRef

Create continuous query.

    $ix->create_continuous_query(
        q    => "select mean(sys) as sys, mean(usr) as usr from cpu group by time(15m)",
        name => "cpu.15m",
    );

=head3 B<list_continuous_queries>() :ArrayRef

List continuous queries.

=head3 B<drop_continuous_query>(id => Str) :ArrayRef

Delete continuous query that has specified id.

You can get id of continuous query by list_continuous_queries().

=head3 B<switch_database>(database => Str) :Bool

Switch to another database.

=head3 B<switch_user>(username => Str, password => Str) :Bool

Change your user-context.

=head3 B<create_database>(database => Str) :Bool

Create database. Requires cluster-admin privileges.

=head3 B<list_database>() :ArrayRef[HashRef]

List database. Requires cluster-admin privileges.

    [
      {
        name => "databasename",
        replicationFactor => 1
      },
      ...
    ]

=head3 B<delete_database>(database => Str) :Bool

Delete database. Requires cluster-admin privileges.

=head3 B<list_series>() :ArrayRef[HashRef]

List series in current database

=head3 B<create_database_user>(name => Str, password => Str) :Bool

Create a database user on current database.

=head3 B<delete_database_user>(name => Str) :Bool

Delete a database user on current database.

=head3 B<update_database_user>(name => Str [,password => Str] [,admin => Bool]) :Bool

Update a database user on current database.

=head3 B<list_database_users>() :ArrayRef

List all database users on current database.

=head3 B<show_database_user>(name => Str) :HashRef

Show a database user on current database.

=head3 B<create_cluster_admin>(name => Str, password => Str) :Bool

Create a database user on current database.

=head3 B<delete_cluster_admin>(name => Str) :Bool

Delete a database user on current database.

=head3 B<update_cluster_admin>(name => Str, password => Str) :Bool

Update a database user on current database.

=head3 B<list_cluster_admins>() :ArrayRef

List all database users on current database.

=begin comment

=head3 B<show_cluster_admin>(name => Str) :HashRef

Show a database user on current database.

=end comment

=head3 B<status>() :HashRef

Returns status of previous request, as following hash:

=over 4

=item code => Int

HTTP status code.

=item message => Str

HTTP status message.

=item status_line => Str

HTTP status line (code . " " . message).

=item content => Str

Response body.

=back

=head3 B<errstr>() :Str

Returns error message if previous query was failed.

=head3 B<host>() :Str

Returns hostname of InfluxDB server.

=head3 B<port>() :Str

Returns port number of InfluxDB server.

=head3 B<username>() :Str

Returns current user name.

=head3 B<database>() :Str

Returns current database name.

=head1 ENVIRONMENT VARIABLES

=over 4

=item IX_DEBUG

Print debug messages to STDERR.

=back

=head1 AUTHOR

HIROSE Masaaki E<lt>hirose31@gmail.comE<gt>

=head1 REPOSITORY

L<https://github.com/hirose31/p5-InfluxDB>

    git clone https://github.com/hirose31/p5-InfluxDB.git

patches and collaborators are welcome.

=head1 SEE ALSO

L<http://influxdb.org>

=head1 COPYRIGHT

Copyright HIROSE Masaaki

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.