Group
Extension

HTTP-ClickHouse/lib/HTTP/ClickHouse.pm

package HTTP::ClickHouse;

use 5.010000;
use strict;
use warnings FATAL => 'all';
use Carp;

=head1 NAME

HTTP::ClickHouse - Perl driver for ClickHouse

=cut

use Data::Dumper;
use Net::HTTP::NB;
use IO::Select;
use Storable qw(nfreeze thaw);
use URI;
use URI::QueryParam;

our @ISA = qw(HTTP::ClickHouse::Base);
use HTTP::ClickHouse::Base;

use constant READ_BUFFER_LENGTH => 4096;

=head1 VERSION

Version 0.061

=cut

our $VERSION = '0.061';
our $AUTOLOAD;

=head1 SYNOPSIS

HTTP::ClickHouse - Perl interface to ClickHouse Database via HTTP.

=head1 EXAMPLE

    use HTTP::ClickHouse;
 
    my $chdb =  HTTP::ClickHouse->new(
        host     => '127.0.0.1', 
        user     => 'Harry',
        password => 'Alohomora',
    );

    $chdb->do("create table test (id UInt8, f1 String, f2 String) engine = Memory");

    $chdb->do("INSERT INTO my_table (id, field_1, field_2) VALUES", 
        [1, "Gryffindor", "a546825467 1861834657416875469"],
        [2, "Hufflepuff", "a18202568975170758 46717657846"],
        [3, "Ravenclaw", "a678 2527258545746575410210547"],
        [4, "Slytherin", "a1068267496717456 878134788953"]
    );

    my $rows = $chdb->selectall_array("SELECT count(*) FROM my_table");  
    unless (@$rows) { $rows->[0]->[0] = 0; } # the query returns an empty string instead of 0
    print $rows->[0]->[0]."\n"; 


    if ($chdb->select_array("SELECT id, field_1, field_2 FROM my_table")) {
        my $rows = $chdb->fetch_array();
        foreach my $row (@$rows) {
            # Do something with your row
            foreach my $col (@$row) {
                # Do something
                print $col."\t";
            }
            print "\n";
        }
    }

    $rows = $chdb->selectall_hash("SELECT count(*) as count FROM my_table");
    foreach my $row (@$rows) {
        foreach my $key (keys %{$row}){
            # Do something
            print $key." = ".$row->{$key}."\t";
        }
        print "\n";
    }
 
    ...

    disconnect $chdb;

=head1 DESCRIPTION

This module implements HTTP driver for Clickhouse OLAP database 

=head1 SUBROUTINES/METHODS

=head2 new

Create a new connection object with auto reconnect socket if disconnected.

    my $chdb =  HTTP::ClickHouse->new(
        host     => '127.0.0.1', 
        user     => 'Harry',
        password => 'Alohomora'
    );

options:

    host       => 'hogwards.mag',              # optional, default value '127.0.0.1'
    port       => 8123,                        # optional, default value 8123
    user       => 'Harry',                     # optional, default value 'default'
    password   => 'Alohomora',                 # optional
    database   => 'database_name',             # optional, default name "default"         
    nb_timeout => 10                           # optional, default value 25 second
    keep_alive => 1                            # optional, default 1 (1 or 0)
    debug      => 1                            # optional, default 0

=cut

sub new {
    my $class = shift;
    my $self = { @_ };
    $self = bless $self, $class;
    $self->_init();
    $self->_connect();
    return $self;
}

sub _connect {
    my $self = shift;

    my $_uri = URI->new("/");
    $_uri->query_param('user' => $self->{user}) if $self->{user};
    $_uri->query_param('password' => $self->{password}) if $self->{password};
    $_uri->query_param('database' => $self->{database});
    $self->{_uri} = nfreeze($_uri);

    $self->{socket} = Net::HTTP::NB->new(
        Host        => $self->{host},
        PeerPort    => $self->{port},
        HTTPVersion => '1.1',
        KeepAlive   => $self->{keep_alive}
    ) or carp "Error. Can't connect to ClickHouse host: $!";
}

sub uri {
    my $self = shift;
    return thaw($self->{_uri});
}

sub _status {
    my $self = shift;
    my $select = IO::Select->new($self->{socket});
    my ($code, $mess, %h);

    my $_status = eval {
        READHEADER: {
            die "Get header timeout" unless $select->can_read($self->{nb_timeout});
            ($code, $mess, %h) = $self->{socket}->read_response_headers;
            redo READHEADER unless $code;
        }
    };

    $_status = $code if ($code);
    if ($@) {
        carp($@) if $self->{debug};
        $_status = 500; 
    }
    return $_status;
}

sub _read {
    my $self = shift;
    my $remainder = '';
    my @_response;
    READBODY: {
        my $_bufer;
        my $l = $self->{socket}->read_entity_body($_bufer, READ_BUFFER_LENGTH);
        $_bufer = $remainder . $_bufer;
        $remainder = '';
        last unless $l;
        if ($_bufer =~ s!([^\n]+\z)!!) {
            $remainder = $1;
        }
        push @_response, split (/\n/, $_bufer);
        redo READBODY;
    }
    return $self->body2array(@_response);
}

sub _query {
    my $self = shift;
    my $method = shift;
    my $query = shift;
    my $data = shift;

    delete $self->{response};

    my $t = $self->_ping();
    if ($t == 0) { $self->_connect(); carp('Reconnect socket') if $self->{debug}; }

    my $uri = $self->uri();
    $uri->query_param('query' => $query);

    my @qparam;
    push @qparam, $method => $uri->as_string();
    push @qparam, $data if ($method eq 'POST');

    $self->{socket}->write_request(@qparam);

    my $_status = $self->_status();
    my $body = $self->_read();  # By default, data is returned in TabSeparated format.
    $self->{response} = $body;

    if ($_status ne '200') {
        carp(join(" ".$body)) if $self->{debug};
        carp(Dumper($body)) if $self->{debug};
        return 0;
    }
    return 1;
}

=head2 select_array & fetch_array

First step - select data from the table (readonly). It returns 1 if query completed without errors or 0.
Don't set FORMAT in query. TabSeparated is used by default in the HTTP interface.

Second step - fetch data.
It returns a reference to an array containing a reference to an array for each row of data fetched.

    if ($chdb->select_array("SELECT id, field_1, field_2 FROM my_table")) {
        my $rows = $chdb->fetch_array();
        foreach my $row (@$rows) {
            # Do something with your row
            foreach my $col (@$row) {
                # Do something
                print $col."\t";
            }
            print "\n";
        }
    }

=cut

sub select_array {
    my $self = shift;
    my $query = shift;
    $query .= ' FORMAT TabSeparated';
    return $self->_query('GET', $query); # When using the GET method, 'readonly' is set.
}

sub fetch_array {
    my $self = shift;
    my $_responce = $self->{response};
#    unless (@$_responce) { $_responce->[0]->[0] = 0; } # the query returns an empty string instead of 0
    return $_responce;
}

=head2 selectall_array

Fetch data from the table (readonly). 
It returns a reference to an array containing a reference to an array for each row of data fetched.

    my $rows = $chdb->selectall_array("SELECT count(*) FROM my_table");  
    unless (@$rows) { $rows->[0]->[0] = 0; } # the query returns an empty string instead of 0
    print $rows->[0]->[0]."\n"; 

=cut

sub selectall_array {
    my $self = shift;
    my $query = shift;
    $self->select_array($query);
    return $self->fetch_array;
}

=head2 select_hash & fetch_hash

First step - select data from the table (readonly). It returns 1 if query completed without errors or 0.
Don't set FORMAT in query.

Second step - fetch data.
It returns a reference to an array containing a reference to an array for each row of data fetched.

    if ($chdb->select_hash("SELECT id, field_1, field_2 FROM my_table")) {
        my $rows = $chdb->fetch_hash();
        foreach my $row (@$rows) {
            # Do something with your row
            foreach my $key (sort(keys %{$row})){
                # Do something
                print $key." = ".$row->{$key}."\t";
            }
            print "\n";
        }
    }

=cut

sub select_hash {
    my $self = shift;
    my $query = shift;
    $query .= ' FORMAT TabSeparatedWithNames';
    return $self->_query('GET', $query); # When using the GET method, 'readonly' is set.
}

sub fetch_hash {
    my $self = shift;
    return $self->array2hash(@{$self->{response}});
}

=head2 selectall_hash

Fetch data from the table (readonly). 
It returns a reference to an array containing a reference to an hash for each row of data fetched.

    my $rows = $chdb->selectall_hash("SELECT id, field_1, field_2 FROM my_table");
    foreach my $row (@$rows) {
        # Do something with your row
        foreach my $key (sort(keys %{$row})){
            # Do something
            print $key." = ".$row->{$key}."\t";
        }
        print "\n";
    }

=cut

sub selectall_hash {
    my $self = shift;
    my $query = shift;
    $self->select_hash($query);
    return $self->fetch_hash;
}

=head2 do

Universal method for any queries inside the database, which modify data (insert data, create, alter, detach or drop table or partition).
It returns 1 if query completed without errors or 0.

    # drop
    $chdb->do("drop table test if exist");

    # create
    $chdb->do("create table test (id UInt8, f1 String, f2 String) engine = Memory");

    # insert
    $chdb->do("INSERT INTO my_table (id, field_1, field_2) VALUES", 
        [1, "Gryffindor", "a546825467 1861834657416875469"],
        [2, "Hufflepuff", "a18202568975170758 46717657846"],
        [3, "Ravenclaw", "a678 2527258545746575410210547"],
        [4, "Slytherin", "a1068267496717456 878134788953"]
    );

=cut

sub do {
    my $self = shift;
    my $query = shift;
    my $data = $self->data_prepare(@_);
    return $self->_query('POST', $query, $data);
}

sub _ping {
    my $self = shift;
    $self->{socket}->write_request(GET => '/ping');
    my $_status = $self->_status();   
    if ($_status == 200) {
        my $body = $self->_read();
        if ($body->[0]->[0] eq 'Ok.') {
            return 1;
        }
    }
    return 0;
}

=head2 disconnect

Disconnects http socket from the socket handle. Disconnect typically occures only used before exiting the program. 

    disconnect $chdb;

    # or

    $chdb->disconnect;

=cut

sub disconnect {
    my $self = shift;
    $self->{socket}->keep_alive(0) if ($self->{socket});
    $self->_ping();
}


=head1 SEE ALSO

=over 4

=item * ClickHouse official documentation

L<https://clickhouse.yandex/reference_en.html>

=back

=head1 AUTHOR

Maxim Motylkov

=head1 TODO

The closest plans are

=over 4

=item * Add json data format.

=back

=head1 MODIFICATION HISTORY

See the Changes file.

=head1 COPYRIGHT AND LICENSE

   Copyright 2016 Maxim Motylkov

   This module is free software; you can redistribute it and/or modify it under the same terms as Perl itself. 

   THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION,
   THE IMPLIED WARRANTIES OF MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.

=cut

sub HTTP::ClickHouse::AUTOLOAD {
    croak "No such method: $AUTOLOAD";
}

sub DESTROY {
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.