Group
Extension

Catmandu-Store-DBI/lib/Catmandu/Store/DBI.pm

package Catmandu::Store::DBI;

use namespace::clean;
use Catmandu::Sane;
use DBI;
use Moo;

our $VERSION = "0.04";

with 'Catmandu::Store';

has data_source => (
    is       => 'ro',
    required => 1,
    trigger  => sub { $_[0] =~ /^DBI:/i ? $_[0] : "DBI:$_[0]" },
);

has username => ( is => 'ro', default => sub { '' } );
has password => ( is => 'ro', default => sub { '' } );

has dbh => (
    is       => 'ro',
    init_arg => undef,
    lazy     => 1,
    builder  => '_build_dbh',
);

# Only mysql seems to need auto_reconnect for now
sub _build_dbh {
    my $self = $_[0];
    my $opts = {
        AutoCommit           => 1,
        RaiseError           => 1,
        mysql_auto_reconnect => 1,
    };
    DBI->connect($self->data_source, $self->username, $self->password, $opts);
}

sub transaction {
    my ($self, $sub) = @_;

    if ($self->{_tx}) {
        return $sub->();
    }

    my $dbh = $self->dbh;
    my @res;

    eval {
        $self->{_tx} = 1;
        $dbh->begin_work;
        @res = $sub->();
        $dbh->commit;
        $self->{_tx} = 0;
        1;
    } or do {
        my $err = $@;
        eval { $dbh->rollback };
        $self->{_tx} = 0;
        die $err;
    };

    @res;
}

sub DEMOLISH {
    $_[0]->dbh->disconnect if $_[0]->dbh;
}

package Catmandu::Store::DBI::Bag;

use Catmandu::Sane;
use Moo;
use Catmandu::Iterator;
use Catmandu::Util qw(require_package);

with 'Catmandu::Bag';
with 'Catmandu::Serializer';

has _sql_get    => (is => 'ro', lazy => 1, builder => '_build_sql_get');
has _sql_delete => (is => 'ro', lazy => 1, builder => '_build_sql_delete');
has _sql_delete_all =>
  (is => 'ro', lazy => 1, builder => '_build_sql_delete_all');
has _sql_generator =>
  (is => 'ro', lazy => 1, builder => '_build_sql_generator');
has _sql_count => (is => 'ro', lazy => 1, builder => '_build_sql_count');
has _add       => (is => 'ro', lazy => 1, builder => '_build_add');

sub BUILD {
    $_[0]->_build_create;
}

sub _build_sql_get {
    my $name = $_[0]->name;
    "select data from $name where id=?";
}

sub _build_sql_delete {
    my $name = $_[0]->name;
    "delete from $name where id=?";
}

sub _build_sql_delete_all {
    my $name = $_[0]->name;
    "delete from $name";
}

sub _build_sql_generator {
    my $name = $_[0]->name;
    "select data from $name";
}

sub _build_sql_count {
    my $name = $_[0]->name;
    "select count(*) from $name";
}

sub _build_add_sqlite {
    my $self = $_[0];
    my $name = $self->name;
    my $sql  = "insert or replace into $name(id,data) values(?,?)";
    sub {
        my $dbh = $self->store->dbh;
        my $sth = $dbh->prepare_cached($sql)
          or Catmandu::Error->throw($dbh->errstr);
        $sth->execute($_[0], $_[1]) or Catmandu::Error->throw($sth->errstr);
        $sth->finish;
    };
}

sub _build_add_mysql {
    my $self = $_[0];
    my $name = $self->name;
    my $sql = "insert into $name(id,data) values(?,?) on duplicate key update data=values(data)";
    sub {
        my $dbh = $self->store->dbh;
        my $sth = $dbh->prepare_cached($sql)
            or Catmandu::Error->throw($dbh->errstr);
        $sth->execute($_[0], $_[1])
            or Catmandu::Error->throw($sth->errstr);
        $sth->finish;
    };
}

sub _build_add_postgres {
    my ($self)     = @_;
    my $pg         = require_package('DBD::Pg');
    my $name       = $self->name;
    my $sql_update = "update $name set data=? where id=?";
    # see http://stackoverflow.com/questions/15840922/where-not-exists-in-postgresql-gives-syntax-error
    my $sql_insert = "insert into $name select ?,? where not exists (select 1 from $name where id=?)";

    sub {
        my $dbh = $self->store->dbh;
        my $sth = $dbh->prepare_cached($sql_update)
            or Catmandu::Error->throw($dbh->errstr);

        # special quoting for bytea in postgres:
        # https://rt.cpan.org/Public/Bug/Display.html?id=13180
        # http://www.nntp.perl.org/group/perl.dbi.users/2005/01/msg25370.html
        $sth->bind_param(1,$_[1], {pg_type => $pg->PG_BYTEA});
        $sth->bind_param(2,$_[0]);

        $sth->execute
            or Catmandu::Error->throw($sth->errstr);

        unless ($sth->rows) {
            $sth->finish;
            $sth = $dbh->prepare_cached($sql_insert)
              or Catmandu::Error->throw($dbh->errstr);
            $sth->execute( $_[0], $_[1], $_[0] )
              or Catmandu::Error->throw($sth->errstr);
            $sth->finish;
        }
    };
}

sub _build_add_generic {
    my $self       = $_[0];
    my $name       = $self->name;
    my $sql_update = "update $name set data=? where id=?";
    my $sql_insert = "insert into $name values(?,?) where not exists (select 1 from $name where id=?)";
    sub {
        my $dbh = $self->store->dbh;
        my $sth = $dbh->prepare_cached($sql_update)
          or Catmandu::Error->throw( $dbh->errstr );
        $sth->execute($_[1], $_[0]) or Catmandu::Error->throw($sth->errstr);
        unless ($sth->rows) {
            $sth->finish;
            $sth = $dbh->prepare_cached($sql_insert)
              or Catmandu::Error->throw($dbh->errstr);
            $sth->execute( $_[0], $_[1], $_[0] )
              or Catmandu::Error->throw($sth->errstr);
            $sth->finish;
        }
    };
}

sub _build_create {
    my $self = $_[0];
    my $driver_name = $self->store->dbh->{Driver}{Name} // "";
    if ($driver_name =~ /pg/i) { return $self->_build_create_postgres }
    $self->_build_create_generic;
}

sub _build_create_postgres {
    my $self = $_[0];
    my $name = $self->name;
    my $dbh  = $self->store->dbh;
    # requires al least Postgres 9.1
    # TODO get rid of this annoying warning:
    # 'NOTICE:  relation "$name" already exists, skipping'
    my $sql = "create table if not exists $name(id varchar(255) not null primary key, data bytea not null)";
    $dbh->do($sql) or Catmandu::Error->throw($dbh->errstr);
}

sub _build_create_generic {
    my $self = $_[0];
    my $name = $self->name;
    my $dbh  = $self->store->dbh;
    my $sql = "create table if not exists $name(id varchar(255) not null primary key, data longblob not null)";
    $dbh->do($sql) or Catmandu::Error->throw($dbh->errstr);
}

sub _build_add {
    my $self = $_[0];
    my $driver_name = $self->store->dbh->{Driver}{Name} // "";
    if ($driver_name =~ /sqlite/i) { return $self->_build_add_sqlite; }
    if ($driver_name =~ /mysql/i)  { return $self->_build_add_mysql; }
    if ($driver_name =~ /pg/i)     { return $self->_build_add_postgres; }
    return $self->_build_add_generic;
}

sub get {
    my ($self, $id) = @_;
    my $dbh = $self->store->dbh;
    my $sth = $dbh->prepare_cached($self->_sql_get)
        or Catmandu::Error->throw($dbh->errstr);
    $sth->execute($id) or Catmandu::Error->throw($sth->errstr);
    my $data;
    if (my $row = $sth->fetchrow_arrayref) {
        $data = $self->deserialize($row->[0]);
    }
    $sth->finish;
    $data;
}

sub add {
    my ($self, $data) = @_;
    $self->_add->($data->{_id}, $self->serialize($data));
}

sub delete_all {
    my ($self) = @_;
    my $dbh    = $self->store->dbh;
    my $sth    = $dbh->prepare_cached($self->_sql_delete_all)
        or Catmandu::Error->throw($dbh->errstr);
    $sth->execute or Catmandu::Error->throw($sth->errstr);
    $sth->finish;
}

sub delete {
    my ($self, $id) = @_;
    my $dbh = $self->store->dbh;
    my $sth = $dbh->prepare_cached($self->_sql_delete)
      or Catmandu::Error->throw($dbh->errstr);
    $sth->execute($id) or Catmandu::Error->throw($sth->errstr);
    $sth->finish;
}

sub generator {
    my ($self) = @_;
    my $dbh = $self->store->dbh;
    sub {
        state $sth;
        state $row;
        unless ($sth) {
            $sth = $dbh->prepare($self->_sql_generator)
                or Catmandu::Error->throw($dbh->errstr);
            $sth->execute;
        }
        if ( $row = $sth->fetchrow_arrayref ) {
            return $self->deserialize($row->[0]);
        }
        $sth->finish;
        return;
    };
}

sub count {
    my ($self) = @_;
    my $dbh    = $self->store->dbh;
    my $sth    = $dbh->prepare_cached($self->_sql_count)
      or Catmandu::Error->throw($dbh->errstr);
    $sth->execute or Catmandu::Error->throw($sth->errstr);
    my ($n) = $sth->fetchrow_array;
    $sth->finish;
    $n;
}

# mysql:     select * from <bag> limit <offset>,<limit>
# postgres:  select * from <bag> limit <limit> offset <offset>
# sqlite:    select * from <bag> limit <offset>,<limit>
#            select * from <bag> limit <limit> offset <offset>

has _sql_slice => (is => 'ro', lazy => 1, builder => '_build_sql_slice');

sub _build_sql_slice {
    my $self = $_[0];
    my $driver_name = $self->store->dbh->{Driver}{Name} // "";
    if ($driver_name =~ /sqlite/i) { return $self->_build_slice_sqlite; }
    if ($driver_name =~ /mysql/i)  { return $self->_build_slice_mysql; }
    if ($driver_name =~ /pg/i)     { return $self->_build_slice_postgres; }
    # TODO fall back on default slice implementation
    Catmandu::NotImplemented->throw("slice is only supported for mysql, postgres or sqlite");
}

sub _build_slice_sqlite {
    my $self = $_[0];
    my $name = $self->name;
    my $dbh  = $self->store->dbh;
    my $sql  = "SELECT data FROM $name LIMIT ?,?";

    sub {
        my ($start, $limit) = @_;
        my $sth = $dbh->prepare_cached($sql)
          or Catmandu::Error->throw($dbh->errstr);
        $sth->execute( $start, $limit )
          or Catmandu::Error->throw($sth->errstr);
        $sth;
    };
}

sub _build_slice_mysql {
    my $self = $_[0];
    my $name = $self->name;
    my $dbh  = $self->store->dbh;
    my $sql  = "SELECT data FROM $name LIMIT ?,?";

    sub {
        my ($start, $limit) = @_;
        my $sth = $dbh->prepare_cached($sql)
          or Catmandu::Error->throw($dbh->errstr);
        $sth->execute( $start, $limit )
          or Catmandu::Error->throw($sth->errstr);
        $sth;
    };
}

sub _build_slice_postgres {
    my $self = $_[0];
    my $name = $self->name;
    my $dbh  = $self->store->dbh;
    my $sql  = "SELECT data FROM $name LIMIT ? OFFSET ?";

    sub {
        my ($start, $limit) = @_;
        my $sth = $dbh->prepare_cached($sql)
          or Catmandu::Error->throw($dbh->errstr);
        $sth->execute($limit, $start)
          or Catmandu::Error->throw($sth->errstr);
        $sth;
    };
}

sub slice {
    my ($self, $start, $total) = @_;
    $start //= 0;
    my $dbh = $self->store->dbh;

    Catmandu::Iterator->new(
        sub {
            sub {
                if (defined $total) {
                    $total || return;
                }

                state $sth;
                state $row;
                unless ($sth) {
                    if (defined($total)) {
                        $sth = $self->_sql_slice->($start, $total);
                    }
                    else {
                        $sth = $dbh->prepare($self->_sql_generator)
                          or Catmandu::Error->throw($dbh->errstr);
                        $sth->execute;
                    }
                }
                if ($row = $sth->fetchrow_arrayref) {
                    return $self->deserialize($row->[0]);
                }
                $sth->finish;
                return;

              }
        }
    );

}

1;

=head1 NAME

Catmandu::Store::DBI - A Catmandu::Store plugin for DBI based interfaces

=head1 VERSION

Version 0.04

=head1 SYNOPSIS

    use Catmandu::Store::DBI;

    my $store = Catmandu::Store::DBI->new(
        data_source => 'DBI:mysql:database=test', # prefix "DBI:" optionl
        username => '', # optional
        password => '', # optional
    );

    my $obj1 = $store->bag->add({ name => 'Patrick' });

    printf "obj1 stored as %s\n" , $obj1->{_id};

    # Force an id in the store
    my $obj2 = $store->bag->add({ _id => 'test123' , name => 'Nicolas' });

    my $obj3 = $store->bag->get('test123');

    $store->bag->delete('test123');

    $store->bag->delete_all;

    # All bags are iterators
    $store->bag->each(sub { ... });
    $store->bag->take(10)->each(sub { ... });

The L<catmandu> command line client can be used like this:

    catmandu import JSON to DBI --data_source SQLite:mydb.sqlite < data.json

=head1 DESCRIPTION

A Catmandu::Store::DBI is a Perl package that can store data into
DBI backed databases. The database as a whole is called a 'store'
(L<Catmandu::Store>. Databases also have compartments (e.g. tables) 
called 'bags' (L<Catmandu::Bag>).

=head1 METHODS

=head2 new(data_source => $data_source)

Create a new Catmandu::Store::DBI store using a DBI $data_source. The 
prefix "DBI:" is added automatically if needed.

=head2 bag($name)

Create or retieve a bag with name $name. Returns a Catmandu::Bag.

=head1 AUTHOR

Nicolas Steenlant, C<< <nicolas.steenlant at ugent.be> >>

=head1 CONTRIBUTOR

Vitali Peil C<< <vitali.peil at uni-bielefeld.de> >>

=head1 CONTRIBUTOR

Nicolas Franck C<< <nicolas.franck at ugent.be> >>

=head1 SEE ALSO

L<Catmandu::Bag>, L<Catmandu::Searchable>, L<DBI>

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.