Group
Extension

DBIx-QuickORM/master/DBIx/QuickORM/Source.pm

package DBIx::QuickORM::Source;
use strict;
use warnings;

our $VERSION = '0.000005';

use Carp qw/croak confess/;
use List::Util qw/min zip/;
use Scalar::Util qw/blessed weaken refaddr/;
use DBIx::QuickORM::Util qw/parse_hash_arg mod2file/;

use Scope::Guard;
use DBIx::QuickORM::Row;
use DBIx::QuickORM::Select;
use DBIx::QuickORM::GlobalLookup;

use DBIx::QuickORM::Util::HashBase qw{
    <schema
    <table
    +table_name
    <orm
    +row_class
    <locator
    <created
};

use Role::Tiny::With qw/with/;
with 'DBIx::QuickORM::Role::SelectLike';
with 'DBIx::QuickORM::Role::HasORM';

sub source  { $_[0] }
sub count   { shift->select()->count(@_) }
sub update  { shift->select()->update(@_) }
sub shotgun { shift->select()->shotgun(@_) }

sub table_name { $_[0]->{+TABLE_NAME} //= $_[0]->{+TABLE}->name }

sub relations { die "Fixme" }

sub init {
    my $self = shift;

    my $table = $self->{+TABLE} or croak "The 'table' attribute must be provided";
    croak "The 'table' attribute must be an instance of 'DBIx::QuickORM::Table'" unless $table->isa('DBIx::QuickORM::Table');

    my $schema = $self->{+SCHEMA} or croak "The 'schema' attribute must be provided";
    croak "The 'schema' attribute must be an instance of 'DBIx::QuickORM::Schema'" unless $schema->isa('DBIx::QuickORM::Schema');

    weaken($self->{+ORM});

    $self->{+LOCATOR} = DBIx::QuickORM::GlobalLookup->register($self, weak => 1);

    return $self;
}

sub set_row_class {
    my $self = shift;
    my ($class) = @_;

    require(mod2file($class));

    return $self->{+ROW_CLASS} = $class;
}

sub row_class {
    my $self = shift;
    return $self->{+ROW_CLASS} if $self->{+ROW_CLASS};

    my $class = $self->{+TABLE}->row_class // $self->orm->row_class or return $self->{+ROW_CLASS} = 'DBIx::QuickORM::Row';

    require(mod2file($class));

    return $self->{+ROW_CLASS} = $class;
}

sub clone {
    my $self   = shift;
    my %params = @_;
    my $class  = $params{source_class} // blessed($self);

    return $class->new(
        %$self,
        %params,
    );
}

sub update_or_insert {
    my $self = shift;
    my $row_data = $self->parse_hash_arg(@_);

    my $cache = $self->cache;

    if (my $cached = $cache->find_row($self, $row_data)) {
        $cached->update($row_data);
        return $cached;
    }

    # Can we turn this to an upsert?
    my $row = $self->txn_do(sub {
        if (my $row = $self->find($row_data)) {
            $row->update($row_data);
            return $row;
        }

        return $self->insert($row_data);
    });

    return $cache->add_source_row($self, $row);
    return $row;
}

sub find_or_insert {
    my $self = shift;
    my $row_data = $self->parse_hash_arg(@_);

    my $cache = $self->cache;

    if (my $cached = $cache->find_row($self, $row_data)) {
        return $cached;
    }

    my $row = $self->txn_do(sub { $self->find($row_data) // $self->insert($row_data) });

    return $cache->add_source_row($self, $row);
    return $row;
}

sub _parse_find_and_fetch_args {
    my $self = shift;

    return {@_} unless @_ == 1;
    if (ref($_[0]) eq 'HASH') {
        return $_[0] if $_[0]->{where};
        return { where => $_[0] };
    }

    my $pk = $self->{+TABLE}->primary_key;
    croak "Cannot pass in a single value for find() or fetch() when table has no primary key"         unless $pk && @$pk;
    croak "Cannot pass in a single value for find() or fetch() when table has a compound primary key" unless @$pk == 1;
    return {where => {$pk->[0] => $_[0]}};
}

sub aggregate {
    my $self = shift;
    $self->select()->aggregate(@_);
}

sub any {
    my $self = shift;
    $self->select(@_)->first();
}

sub async {
    my $self = shift;
    $self->select(@_)->async();
}

sub aside {
    my $self = shift;
    $self->select(@_)->aside();
}

sub forked {
    my $self = shift;
    $self->select(@_)->forked();
}

sub first {
    my $self = shift;
    $self->select(@_)->first();
}

sub last {
    my $self = shift;
    $self->select(@_)->last();
}

sub all {
    my $self = shift;
    $self->select(@_)->all();
}

sub select {
    my $self = shift;

    # {where}
    # {where}, order
    # where => ..., order => ..., ...
    # {where => { ... }, order => ..., ...}
    my $params;
    if (!@_) {
        $params = {where => {}};
    }
    elsif (ref($_[0]) eq 'HASH') {
        $params = $self->_parse_find_and_fetch_args(shift(@_));
        $params->{order_by} = shift(@_) if @_ == 1;
    }

    $params = {%{$params // {}}, @_} if @_;

    return DBIx::QuickORM::Select->new(source => $self, %$params);
}

sub busy { $_[0]->connection->async_started }

sub count_select {
    my $self = shift;
    my ($params) = @_;

    my $where = $params->{where};

    my $table = $self->{+TABLE};
    my $con = $self->connection;

    confess "This database connection is currently engaged in an async query" if $con->async_started;

    my $source = $table->sqla_source;
    $source = \"$source AS me" unless ref($source);
    my ($stmt, $bind) = $self->build_select_sql($source, ['count(*)'], $where);

    my $sth = $con->dbh->prepare($stmt);
    $sth->execute(@$bind);

    my ($count) = $sth->fetchrow_array;

    $count //= 0;

    if (my $limit = $params->{limit}) {
        $count = min($count, $limit);
    }

    return $count;
}

sub do_select {
    my $self = shift;
    my ($params, %extra) = @_;

    my $aside  = $extra{aside};
    my $async  = $extra{async};
    my $forked = $extra{forked};
    my $where  = $params->{where};
    my $order  = $params->{order_by};

    my $con = $self->connection;

    confess "This database connection is currently engaged in an async query" if $con->async_started;
    confess "Async is not supported by this database engine" if $async && !$con->supports_async;

    my ($source, $cols, $relmap) = $self->_source_and_cols($params->{prefetch});
    my ($stmt, $bind) = $self->build_select_sql($source, $cols, $where, $order ? $order : ());

    if (my $limit = $params->{limit}) {
        $stmt .= " LIMIT ?";
        push @$bind => $limit;
    }

    my $sth = $con->dbh->prepare($stmt, $async ? ($con->async_query_arg) : ());
    $sth->execute(@$bind);

    my $guard;
    if ($aside) {
        $con->add_side_connection;
        $guard = Scope::Guard->new(sub { $con->pop_side_connection });
    }

    return {
        sth    => $sth,
        cols   => $cols,
        relmap => $relmap,
        guard  => $guard,
    } if $forked;

    my $fetch = sub {
        my $source = shift // $self;

        my @out;
        while (my $data = $sth->fetchrow_arrayref) {
            my $row = {};
            @{$row}{@$cols} = @$data;
            $source->expand_relations($row, $relmap);
            push @out => $source->_expand_row($row);
        }

        $con->async_stop() if $async;
        $con->pop_side_connection;

        return \@out;
    };

    return $fetch->() unless $async;

    my $started = {
        fetch  => $fetch,
        sth    => $sth,
        guard  => $guard,
        ready  => sub { $con->async_ready($sth) },
        result => sub { $con->async_result($sth) },
        cancel => sub { $con->async_cancel($sth) },
    };

    $con->async_start($async);

    return $started;
}

sub find {
    my $self  = shift;
    my $params = $self->_parse_find_and_fetch_args(@_);
    my $where = $params->{where};

    # See if there is a cached copy with the data we have
    my $cached = $self->cache->find_row($self, $where);
    return $cached if $cached;

    my $data = $self->fetch($params) or return;

    return $self->_expand_row($data);
}

# Get hashref data for one object (no cache)
sub fetch {
    my $self  = shift;
    my $params = $self->_parse_find_and_fetch_args(@_);
    my $where = $params->{where};

    my $con = $self->connection;

    my ($source, $cols, $relmap) = $self->_source_and_cols($params->{prefetch});
    my ($stmt, $bind) = $self->build_select_sql($source, $cols, $where);
    my $sth = $con->dbh->prepare($stmt);
    $sth->execute(@$bind);

    my $data = $sth->fetchrow_arrayref or return undef;
    my $extra = $sth->fetchrow_arrayref;
    croak "Multiple rows returned for fetch/find operation" if $extra;

    my $row = {};

    if ($relmap) {
        @{$row}{@$cols} = @$data;
        $self->expand_relations($row, $relmap);
    }
    else {
        s/^me\.// for @$cols;
        @{$row}{@$cols} = @$data;
    }


    return $row;
}

sub insert_row {
    my $self = shift;
    my ($row) = @_;

    croak "Row already exists in the database" if $row->stored;

    my $row_data = $row->dirty;

    my $data = $self->_insert($row_data);

    $row->refresh($data, $self->transaction, insert => 1);

    return $self->cache->add_source_row($self, $row);
}

sub insert {
    my $self = shift;

    my $row_data = $self->parse_hash_arg(@_);

    my $data = $self->_insert($row_data);
    my $row  = $self->row_class->new(stored => $data, source => $self);

    return $self->cache->add_source_row($self, $row);
}

sub _insert {
    my $self = shift;
    my ($row_data) = @_;

    my $con   = $self->connection;
    my $ret   = $con->db->insert_returning_supported;
    my $table = $self->{+TABLE};
    my $tname = $table->name;

    $row_data = $self->deflate_row_data($row_data);

    my ($stmt, @bind) = $con->sqla->insert($tname, $row_data, $ret ? {returning => [$table->column_names]} : ());

    my $dbh = $con->dbh;
    my $sth = $dbh->prepare($stmt);
    $sth->execute(@bind);

    my $data;
    if ($ret) {
        $data = $sth->fetchrow_hashref;
    }
    else {
        my $pk_fields = $self->{+TABLE}->primary_key;

        my $where;
        if (@$pk_fields > 1) {
            $where = {map { my $v = $row_data->{$_} or croak "Auto-generated compound primary keys are not supported for databses that do not support 'returning' functionality"; ($_ => $v) } @$pk_fields};
        }
        else {
            my $kv = $dbh->last_insert_id(undef, undef, $tname);
            $where = {$pk_fields->[0] => $kv};
        }

        my ($stmt, $bind) = $self->build_select_sql($table->sqla_source, $table->sqla_columns, $where);
        my $sth = $dbh->prepare($stmt);
        $sth->execute(@$bind);
        $data = $sth->fetchrow_hashref;
    }

    return $data;
}

sub build_select_sql {
    my $self = shift;
    my ($stmt, $bind, $bind_names) = $self->connection->sqla->select(@_);

    die "Internal error: Length mistmatch between bind elements and bind names (" . @$bind . " vs " . @$bind_names . ")"
        unless @$bind == @$bind_names;

    my @new_bind = map { $self->deflate_column_data(@{$_}) } zip($bind_names, $bind);

    return ($stmt, \@new_bind);
}

sub deflate_row_data {
    my $self = shift;
    my $row_data = $self->parse_hash_arg(@_);

    my $new_data = {map {($_ => $self->deflate_column_data($_, $row_data->{$_}))} keys %$row_data};

    return $new_data;
}

sub deflate_column_data {
    my $self = shift;
    my ($col, $val) = @_;

    my $table = $self->{+TABLE};
    my $tname = $table->name;
    $col =~ s/^\S+\.// if index($col, '.');
    my $def = $table->column($col) or confess "Table '$tname' does not have a column '$col'";

    if (my $conf = $def->{conflate}) {
        return $conf->qorm_deflate(quote_bin => $self->connection, source => $self, column => $def, value => $val, type => $self->connection->column_type($tname, $col));
    }

    if (blessed($val) && $val->can('qorm_deflate')) {
        return $val->qorm_deflate(quote_bin => $self->connection, source => $self, column => $def, value => $val, type => $self->connection->column_type($tname, $col));
    }

    return $val;
}

sub vivify {
    my $self = shift;
    my $row_data = $self->parse_hash_arg(@_);
    return $self->row_class->new(dirty => $row_data, source => $self);
}

sub DESTROY {
    my $self = shift;

    if (my $orm = $self->orm) {
        if (my $cache = $orm->cache) {
            $cache->remove_source($self);
        }
    }

    return;
}

sub _expand_row {
    my $self = shift;
    my ($data) = @_;

    my %relations;

    for my $key (keys %$data) {
        my $row = $data->{$key} or next;
        next unless ref($row) eq 'HASH';

        my $rel = $self->{+TABLE}->relation($key);
        my $source = $self->{+ORM}->source($rel->table);

        $relations{$key} = $source->_expand_row(delete $data->{$key});
    }

    my $cache = $self->cache;

    if (my $cached = $cache->find_row($self, $data)) {
        $cached->refresh($data);
        $cached->_update_fetched_relations(\%relations);
        return $cached;
    }

    return $cache->add_source_row(
        $self,
        $self->row_class->new(stored => $data, source => $self, fetched_relations => \%relations),
    );
}

sub _source_and_cols {
    my $self = shift;
    my ($prefetch) = @_;

    my $table = $self->{+TABLE};
    my $prefetch_sets = $table->prefetch_relations($prefetch);

    unless (@$prefetch_sets) {
        my $source = $table->sqla_source;
        my $cols = $table->sqla_columns;

        return ($source, $cols) if ref($source);

        return (\"$source AS me", [map { "me.$_" } @$cols]);
    }

    my $source = $table->sqla_source . " AS me";
    my @cols   = map { "me.$_" } @{$table->sqla_columns};

    my %relmap;
    my @todo = map {[@{$_}, 'me'] } @$prefetch_sets;

    while (my $item = shift @todo) {
        my ($as, $rel, @path) = @$item;
        my ($from) = @path;

        confess "Multiple relations requested the alias (AS) '$as', please specify an alternate name for one"
            if $relmap{$as};

        my $ftable = $rel->table;

        $relmap{$as} = [@path, $as];

        my $ts = $self->orm->source($ftable);
        my $t2 = $ts->table;

        my $s2 = $t2->sqla_source;
        my $c2 = $t2->sqla_columns;

        $source .= " JOIN $s2 AS $as";
        $source .= " ON(" . $rel->on_sql($from, $as) . ")";

        push @cols => map { "${as}.$_" } @$c2;
        push @todo => map { [@{$_}, @path] } @{$t2->prefetch_relations};
    }

    return (\$source, \@cols, \%relmap);
}

sub expand_relations {
    my $self = shift;
    my ($data, $relmap) = @_;

    $relmap //= {};

    for my $key (keys %$data) {
        next unless $key =~ m/^(.+)\.(.+)$/;
        my ($rel, $col) = ($1, $2);
        if ($rel eq 'me') {
            $data->{$col} = delete $data->{$key};
            next;
        }

        my $path = $relmap->{$rel} or die "No relmap path for '$key'";

        my $p = $data;
        for my $pt (@$path) {
            next if $pt eq 'me';
            $p = $p->{$pt} //= {};
        }

        $p->{$col} = delete $data->{$key};
    }

    return $data;
}

# See the Cpanel::JSON::XS docs
sub FREEZE {
    my $self = shift;
    my ($serializer) = @_;
    return ($self->TO_JSON);
}

sub THAW {
    my $class = shift;
    my ($serializer, $data) = @_;

    return $class->FROM_JSON($data);
}

sub FROM_JSON {
    my $class = shift;
    my ($data) = @_;

    return $data if $data->{error};

    if (my $locator = $data->{locator}) {
        if (my $found = DBIx::QuickORM::GlobalLookup->lookup(@$locator)) {
            my $check = $found->TO_JSON;

            return {error => "Source content mismatch between origin and local copy (Source objects can only be serialized between parent and child processes, and only using source objects established prior to forking)"}
                if grep { $_ eq 'locator' || $data->{$_} ne $check->{$_} } keys %$check;

            return $found;
        }
    }

    return { error => "Could not find the source in this processes memory" }
}

sub TO_JSON {
    my $self = shift;

    my $orm   = $self->{+ORM};
    my $table = $self->{+TABLE};

    # We may be able to relax this later and send the temp table spec, but not
    # going to write that until it is needed.
    return { error => "Cannot serialize, references a temporary table that may not exist when deserialized" }
        if $table->is_temp;

    return {
        orm_name     => $orm->name          // '',
        table_name   => $self->table_name   // '',
        row_class    => $self->{+ROW_CLASS} // '',
        source_class => blessed($self),
        locator      => $self->{+LOCATOR},
    };
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.