Group
Extension

DBIx-QuickORM/master/DBIx/QuickORM/Connection.pm

package DBIx::QuickORM::Connection;
use strict;
use warnings;

our $VERSION = '0.000005';

use Carp qw/confess croak/;

require DBIx::QuickORM::SQLAbstract;

use DBIx::QuickORM::Util::HashBase qw{
    <db
    <schema
    <orm
    +dbh
    <pid
    <transaction
    <column_type_cache
    <sqla
    +async
    +side
    <created
};

##############
# DB Proxies #
##############

# Schema-introspection proxies. Each one fetches the handle via $self->dbh and
# forwards it, followed by the caller's arguments, to the same-named method on
# the DB object, returning whatever that method returns.

sub tables {
    my $self = shift;
    return $self->{+DB}->tables($self->dbh, @_);
}

sub table {
    my $self = shift;
    return $self->{+DB}->table($self->dbh, @_);
}

sub db_keys {
    my $self = shift;
    return $self->{+DB}->db_keys($self->dbh, @_);
}

sub db_version {
    my $self = shift;
    return $self->{+DB}->db_version($self->dbh, @_);
}

sub indexes {
    my $self = shift;
    return $self->{+DB}->indexes($self->dbh, @_);
}

# The two column lookups also hand over this connection's column type cache
# as the second argument.

sub column_type {
    my $self = shift;
    return $self->{+DB}->column_type($self->dbh, $self->{+COLUMN_TYPE_CACHE}, @_);
}

sub columns {
    my $self = shift;
    return $self->{+DB}->columns($self->dbh, $self->{+COLUMN_TYPE_CACHE}, @_);
}

# Temporary view/table proxies: forward the handle from $self->dbh plus the
# caller's arguments to the same-named method on the DB object.

sub create_temp_view {
    my $self = shift;
    return $self->{+DB}->create_temp_view($self->dbh, @_);
}

sub create_temp_table {
    my $self = shift;
    return $self->{+DB}->create_temp_table($self->dbh, @_);
}

sub temp_table_supported {
    my $self = shift;
    return $self->{+DB}->temp_table_supported($self->dbh, @_);
}

sub temp_view_supported {
    my $self = shift;
    return $self->{+DB}->temp_view_supported($self->dbh, @_);
}

# Forward to the DB object's load_schema_sql, passing the handle from
# $self->dbh followed by the caller's arguments.
sub load_schema_sql {
    my $self = shift;
    return $self->{+DB}->load_schema_sql($self->dbh, @_);
}

# Feature-support proxies: each asks the DB object's same-named method,
# passing the handle from $self->dbh followed by the caller's arguments.

sub supports_uuid {
    my $self = shift;
    return $self->{+DB}->supports_uuid($self->dbh, @_);
}

sub supports_json {
    my $self = shift;
    return $self->{+DB}->supports_json($self->dbh, @_);
}

sub supports_datetime {
    my $self = shift;
    return $self->{+DB}->supports_datetime($self->dbh, @_);
}

# Ping the handle this connection currently holds by delegating to the DB
# object. Reads the stored handle directly (not via $self->dbh), so it never
# triggers a connect; returns 0 when no handle is held.
sub ping {
    my ($self) = @_;

    my $handle = $self->{+DBH};
    return 0 unless $handle;

    return $self->{+DB}->ping($handle);
}

##############
# DB Proxies #
##############

#################
# INIT and MISC #
#################

# Post-construction setup: requires a DB object and fills in defaults for any
# attribute that was not supplied (owning PID, SQL abstraction object, column
# type cache, transaction flag). Croaks if no DB object is present.
sub init {
    my ($self) = @_;

    $self->{+DB} or croak "A database is required";

    $self->{+PID}  = $$ unless defined $self->{+PID};
    $self->{+SQLA} = DBIx::QuickORM::SQLAbstract->new() unless defined $self->{+SQLA};

    $self->{+COLUMN_TYPE_CACHE} = {} unless defined $self->{+COLUMN_TYPE_CACHE};

    # Deliberately the final expression: its value is what the sub returns.
    $self->{+TRANSACTION} //= 0;
}

# Report whether the currently held handle is usable from this process.
#
# Returns 0 when no handle is held or when the current PID differs from the
# PID recorded on the connection; otherwise 1 if the liveness check succeeds
# and 0 if it fails. A failure that raised an exception is reported via warn.
sub active {
    my $self = shift;

    my $dbh = $self->{+DBH} or return 0;
    return 0 unless $$ == $self->{+PID};

    # Keep the evals below from clobbering the caller's $@.
    local $@;

    # Cannot use ping if there is an async in progress, so instead of a ping we
    # ask it if the async is ready, if that does not die we return true.
    if (my $async = $self->{+ASYNC}) {
        # Have to break some encapsulation here to avoid infinite recursion
        return 1 if $async->{$async->READY};
        return 1 if eval { $async->{$async->READY} = 1 if $self->{+DB}->async_ready($dbh, $async->sth); 1 };

        # The eval block ends in a true value, so this point is only reached
        # when it died.
        warn $@;
        return 0;
    }

    return 1 if eval { $self->ping // 0 };

    # A ping that merely returns false leaves $@ empty; only warn when there
    # is an actual exception to report, otherwise warn would emit its generic
    # "Warning: something's wrong" message.
    warn $@ if $@;
    return 0;
}

# List the conditions that make dropping the connection unsafe.
#
# Params (key/value): ignore_transaction, ignore_async - when true, the
# corresponding condition is not checked.
# Returns a list containing "transaction" and/or "async query"; empty when
# nothing blocks a disconnect.
sub _disconnect_issues {
    my ($self, %params) = @_;

    my @fatal;

    if (!$params{ignore_transaction} && $self->in_transaction) {
        push @fatal, "transaction";
    }

    if (!$params{ignore_async} && $self->{+ASYNC}) {
        push @fatal, "async query";
    }

    return @fatal;
}

# Return a usable database handle, connecting or reconnecting as needed.
#
# Params (key/value, optional): ignore_transaction, ignore_async - passed on
# to the fork and disconnect handling.
#
# Behaviour:
#   - Called from a process other than the one recorded in PID: runs the
#     post-fork handling and returns the handle it established.
#   - Held handle passes the active() check: returns it.
#   - Held handle fails the active() check: dies if a transaction or async
#     query was in progress (unless ignored), otherwise disconnects, warns and
#     connects again.
#   - No handle held: connects.
sub dbh {
    my $self = shift;
    my %params = @_;

    unless ($$ == $self->{+PID}) {
        # _post_fork returns the result of reconnect(), which is the
        # connection object itself, not a handle. Return the handle that the
        # reconnect stored instead, so callers always get a dbh.
        $self->_post_fork(%params);
        return $self->{+DBH};
    }

    return $self->{+DBH} if $self->{+DBH} && $self->active;

    if ($self->{+DBH}) {
        my @fatal = $self->_disconnect_issues(%params);
        die "Lost database connection during " . join(' and ', @fatal) if @fatal;

        # The fatal conditions were checked just above, so skip them here.
        $self->disconnect(%params, ignore_transaction => 1, ignore_async => 1);

        warn "Lost database connection, reconnecting...\n";
    }

    return $self->{+DBH} = $self->db->connect(dbh_only => 1);
}

# Handle use of this connection from a process other than the one that
# created it: confess if a transaction is in progress (unless
# ignore_transaction is set), otherwise reconnect with the transaction and
# async checks bypassed. Returns whatever reconnect() returns.
sub _post_fork {
    my ($self, %params) = @_;

    # in_transaction is consulted first, exactly as before, regardless of the
    # ignore_transaction flag.
    my $blocked = $self->in_transaction && !$params{ignore_transaction};
    confess "Forked while inside a transaction" if $blocked;

    return $self->reconnect(%params, ignore_transaction => 1, ignore_async => 1);
}

# Drop the current database handle and reset per-connection state.
#
# Params (key/value, optional):
#   ignore_transaction - do not croak when a transaction is in progress
#   ignore_async       - do not croak when an async query is pending
#
# Croaks on an open transaction or pending async query unless told to ignore
# them, and when the handle's own disconnect fails. Returns $self.
sub disconnect {
    my $self   = shift;
    my %params = @_;

    croak "Attempt to disconnect inside a transaction"
        if !$params{ignore_transaction} && $self->in_transaction;
    croak "Attempt to disconnect with a pending async query"
        if !$params{ignore_async} && $self->{+ASYNC};

    delete $self->{+ASYNC};

    # Reset to 0 rather than deleting, matching what init, commit_txn and
    # rollback_txn store for "no transaction".
    $self->{+TRANSACTION} = 0;

    my $dbh = delete $self->{+DBH};

    if ($self->{+PID} == $$) {
        if ($dbh) {
            $dbh->disconnect or croak $dbh->errstr;
        }
    }
    else {
        # We are in a forked child holding a handle inherited from the parent.
        # Mark it so that destroying our copy does not shut down the
        # connection the parent is still using (see DBI's InactiveDestroy).
        $dbh->{InactiveDestroy} = 1 if $dbh;

        # Adopt the connection object for this process.
        $self->{+PID} = $$;
        delete $self->{+SIDE};
    }

    return $self;
}

# Disconnect and then immediately obtain a handle again, passing the same
# key/value params to both steps. Returns $self.
sub reconnect {
    my ($self, %params) = @_;

    $self->disconnect(%params);
    $self->dbh(%params);

    return $self;
}

# Load the schema builder on demand and have it generate a schema from this
# connection. Returns the builder's result.
sub generate_schema {
    my ($self) = @_;

    require DBIx::QuickORM::Util::SchemaBuilder;
    my $builder = 'DBIx::QuickORM::Util::SchemaBuilder';

    return $builder->generate_schema($self);
}

# Look up the named table with details enabled, then have the schema builder
# (loaded on demand) generate a table definition from it.
sub generate_table_schema {
    my ($self, $name) = @_;

    # The table lookup happens before the builder is loaded.
    my $table_info = $self->table($name, details => 1);

    require DBIx::QuickORM::Util::SchemaBuilder;
    my $builder = 'DBIx::QuickORM::Util::SchemaBuilder';

    return $builder->generate_table($self, $table_info);
}

#################
# INIT and MISC #
#################

#################
# Async / Aside #
#################

# Async proxies. Each forwards the handle from $self->dbh to the same-named
# method on the DB object.

sub supports_async {
    my $self = shift;
    return $self->{+DB}->supports_async($self->dbh, @_);
}

sub async_query_arg {
    my $self = shift;
    return $self->{+DB}->async_query_arg($self->dbh, @_);
}

# For the three below, caller-supplied arguments are passed through when
# given; with no arguments, the sth of the currently tracked async object is
# used instead. The handle is resolved before the fallback is evaluated.

sub async_ready {
    my $self = shift;
    return $self->{+DB}->async_ready(
        $self->dbh,
        (@_ ? @_ : $self->{+ASYNC}->sth),
    );
}

sub async_result {
    my $self = shift;
    return $self->{+DB}->async_result(
        $self->dbh,
        (@_ ? @_ : $self->{+ASYNC}->sth),
    );
}

sub async_cancel {
    my $self = shift;
    return $self->{+DB}->async_cancel(
        $self->dbh,
        (@_ ? @_ : $self->{+ASYNC}->sth),
    );
}

# Record $async as the connection's in-flight async query. Croaks if one is
# already recorded. Returns the stored value.
sub async_start {
    my ($self, $async) = @_;

    croak "Already engaged in an async query" if $self->{+ASYNC};

    return $self->{+ASYNC} = $async;
}

# Clear the in-flight async query, but only when $async is the very one
# currently recorded on the connection (compared with ==). Does nothing when
# $async is false, nothing is recorded, or they differ.
sub async_stop {
    my ($self, $async) = @_;

    my $current = $self->{+ASYNC};

    return unless $async && $current;
    return unless $async == $current;

    return delete $self->{+ASYNC};
}

# True (1) when an async query is recorded on this connection, else 0.
sub async_started {
    my ($self) = @_;
    return $self->{+ASYNC} ? 1 : 0;
}

# True (1) while an async query is recorded on this connection, else 0.
sub busy {
    my ($self) = @_;
    return $self->{+ASYNC} ? 1 : 0;
}

# Side-connection bookkeeping: a simple counter stored on the connection.

# Increment the counter; returns its value from before the increment.
sub add_side_connection {
    my ($self) = @_;
    return $self->{+SIDE}++;
}

# Decrement the counter; returns its value from before the decrement.
sub pop_side_connection {
    my ($self) = @_;
    return $self->{+SIDE}--;
}

# Return the current counter value (false when none are registered).
sub has_side_connection {
    my ($self) = @_;
    return $self->{+SIDE};
}

#################
# Async / Aside #
#################

################
# Transactions #
################

# Savepoint proxies: forward the handle from $self->dbh plus the caller's
# arguments to the same-named method on the DB object.

sub commit_savepoint {
    my $self = shift;
    return $self->{+DB}->commit_savepoint($self->dbh, @_);
}

sub rollback_savepoint {
    my $self = shift;
    return $self->{+DB}->rollback_savepoint($self->dbh, @_);
}

# Create a savepoint inside the transaction this connection controls.
#
# Croaks when the handle is in a transaction not started through this object,
# when no transaction is open, when an async query is recorded, or when side
# connections are registered. Remaining arguments go to the DB object's
# create_savepoint along with the handle.
sub create_savepoint {
    my $self = shift;

    my $txn_state = $self->in_transaction;

    if ($txn_state < 0) {
        croak 'Connection is already inside a transaction, but it is not controlled by DBIx::QuickORM';
    }

    if (!$txn_state) {
        croak 'Connection is not inside a transaction, cannot use create_savepoint outside of one';
    }

    if ($self->{+ASYNC}) {
        croak "Cannot start a transaction while an async query is running";
    }

    if ($self->{+SIDE}) {
        croak 'Cannot start a transaction while side connections are active (use $sel->ignore_transactions() to bypass)';
    }

    return $self->{+DB}->create_savepoint($self->dbh, @_);
}

# End the controlled transaction via the DB object, then clear the
# connection's transaction flag. Both return the flag's new value (0).

sub commit_txn {
    my $self = shift;

    $self->{+DB}->commit_txn($self->dbh, @_);

    return $self->{+TRANSACTION} = 0;
}

sub rollback_txn {
    my $self = shift;

    $self->{+DB}->rollback_txn($self->dbh, @_);

    return $self->{+TRANSACTION} = 0;
}

# Begin a transaction controlled by this connection.
#
# Croaks when the handle is already in a transaction (whether started here or
# not), when an async query is recorded, or when side connections are
# registered. On success the DB object starts the transaction and the
# connection's transaction flag is set to 1, which is also the return value.
sub start_txn {
    my $self = shift;

    my $txn_state = $self->in_transaction;

    if ($txn_state < 0) {
        croak 'Connection is already inside a transaction, but it is not controlled by DBIx::QuickORM';
    }

    if ($txn_state) {
        croak 'Already inside a transaction, create_savepoint() should be used instead';
    }

    if ($self->{+ASYNC}) {
        croak "Cannot start a transaction while an async query is running";
    }

    if ($self->{+SIDE}) {
        croak 'Cannot start a transaction while side connections are active (use $sel->ignore_transactions() to bypass)';
    }

    $self->{+DB}->start_txn($self->dbh, @_);

    return $self->{+TRANSACTION} = 1;
}

# Report the transaction state of this connection.
#
# Returns:
#    1  a transaction started through this object is open
#   -1  the held handle is in a transaction this object did not start
#    0  no transaction (including when no handle is held)
sub in_transaction {
    my $self = shift;

    return 1 if $self->{+TRANSACTION};
    return 0 unless $self->{+DBH};
    return -1 if $self->in_external_transaction;

    # Return 0 explicitly. Falling off the end would hand back whatever the
    # condition above evaluated to, and callers compare this result
    # numerically (e.g. "$in_txn < 0").
    return 0;
}

# Ask the DB object whether the held handle is currently inside a
# transaction. Returns 1 or 0; returns 0 without asking when no handle is
# held.
sub in_external_transaction {
    my ($self) = @_;

    my $handle = $self->{+DBH};
    return 0 unless $handle;

    return 1 if $self->{+DB}->in_txn($handle);
    return 0;
}

################
# Transactions #
################

1;

__END__


1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.