Group
Extension

Database-Async/lib/Database/Async/ORM.pm

package Database::Async::ORM;

use strict;
use warnings;

our $VERSION = '0.019'; # VERSION

=head1 NAME

Database::Async::ORM - provides object-relational features for L<Database::Async>

=head1 SYNOPSIS

 use 5.020;
 use IO::Async::Loop;
 use Database::Async::ORM;
 my $loop = IO::Async::Loop->new;
 $loop->add(
  my $orm = Database::Async::ORM->new
 );

 # Load schemata directly from the database
 $orm->load_from($db)
  ->then(sub {
   say 'We have the following tables:';
   $orm->tables
       ->map('name')
       ->say
       ->completed
  })->get;

 # Load schemata from a hashref (e.g. pulled
 # from a YAML/JSON/XML file or API)
 $orm->load_from({ ... })
  ->then(sub {
   $orm->apply_to($db)
  })->then(sub {
   say 'We have the following tables:';
   $orm->tables
       ->map('name')
       ->say
       ->completed
  })->get;

=cut

use Future;
use Future::AsyncAwait;
use Syntax::Keyword::Try;
use Path::Tiny;
use List::Util qw(sum0);
use Scalar::Util qw(blessed);

use Database::Async::ORM::Schema;
use Database::Async::ORM::Type;
use Database::Async::ORM::Table;
use Database::Async::ORM::Field;
use Database::Async::ORM::Constraint;
use Database::Async::ORM::Extension;

use Log::Any qw($log);

# Constructor. Starts from empty 'schema' and 'extension' lists and then
# applies any key/value pairs supplied by the caller on top, so callers can
# override either default or add further attributes.
sub new {
    my ($class, @args) = @_;
    my $self = {
        schema    => [],
        extension => [],
        @args,
    };
    return bless $self, $class;
}

# Appends a schema object to this ORM's schema list.
# Returns the result of the push, i.e. the new number of schemata.
sub add_schema {
    my ($self, $schema) = @_;
    return push @{ $self->{schema} }, $schema;
}

# Appends an extension object to this ORM's extension list.
# Returns the result of the push, i.e. the new number of extensions.
sub add_extension {
    my ($self, $extension) = @_;
    return push @{ $self->{extension} }, $extension;
}

# Returns every registered schema as a list (or the count in scalar context).
sub schemata {
    my ($self) = @_;
    return @{ $self->{schema} };
}

# Returns the registered schemata as a list (or the count in scalar
# context); reads the same underlying list as ->schemata.
sub schema_list {
    my ($self) = @_;
    return @{ $self->{schema} };
}

# Returns every registered extension as a list (or the count in scalar
# context).
sub extension_list {
    my ($self) = @_;
    return @{ $self->{extension} };
}

# Looks up a registered schema by its name.
# Returns the first schema whose ->name equals $name; dies with a message
# listing the known schema names when there is no match.
sub schema_by_name {
    my ($self, $name) = @_;
    my @matches = grep { $_->name eq $name } $self->schemata;
    unless(@matches) {
        my @known = map { $_->name } $self->schemata;
        die 'cannot find schema ' . $name . ', have these instead: ' . join(',', @known);
    }
    return $matches[0];
}

# Returns the hashref of raw schema definitions held by this instance,
# creating (and storing) an empty one on first access.
sub schema_definitions {
    my ($self) = @_;
    $self->{schema_definitions} = {} unless defined $self->{schema_definitions};
    return $self->{schema_definitions};
}

# Currently hardcoded to PostgreSQL, eventually we should be able to query
# the engine for this information.
sub ddl_for {
    # Any arguments (callers in this file pass the database handle) are
    # currently ignored: the PostgreSQL DDL helper is always returned.
    # The helper class is loaded on demand rather than at compile time.
    require Database::Async::Engine::PostgreSQL::DDL;

    return Database::Async::Engine::PostgreSQL::DDL->new;
}

# Applies the given schema/table/type definitions to a live database.
#
# Takes the database handle followed by a list of Database::Async::ORM::Schema,
# ::Table or ::Type objects. Optional extensions are installed first on a
# best-effort basis; everything else (required extensions plus any objects
# that the database reports as missing) runs between a `begin` and `commit`.
#
# Dies (i.e. the returned Future fails) if a required extension has an invalid
# name, if an action is not one of the supported classes, or if any of the
# non-optional queries fail.
#
# NOTE(review): there is no explicit rollback when a query fails after
# `begin` - confirm whether callers are expected to discard the connection.
async sub apply_database_changes {
    my ($self, $db, @actions) = @_;

    my $ddl = $self->ddl_for($db);

    my @optional = grep { $_->is_optional } $self->extension_list;
    my @required = grep { not $_->is_optional } $self->extension_list;

    # Extension names are interpolated straight into the SQL text, so reject
    # anything outside the safe character set. Required extensions are checked
    # before any query is issued: a bad name is a hard failure, and we want it
    # to happen before a transaction has been opened.
    for my $ext (@required) {
        my ($name) = $ext->name;
        die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
    }

    # Optional extensions first, and we don't care if any fail
    for my $ext (@optional) {
        my ($name) = $ext->name;
        try {
            die 'invalid name for extension: ' . $name unless $name =~ /^[a-zA-Z0-9_-]+$/;
            await $db->query(qq{create extension if not exists "$name" cascade})->void;
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # All remaining steps are in a single transaction
    await $db->query(q{begin})->void;

    for my $ext (@required) {
        my ($name) = $ext->name;
        await $db->query(qq{create extension if not exists "$name" cascade})->void;
    }

    # For each action, ask the DDL helper for a lookup query plus the names of
    # its bind parameters, run it, and only queue the create statement when the
    # lookup returned no rows.
    my @out;
    for my $action (@actions) {
        if($action->isa('Database::Async::ORM::Table')) {
            $log->tracef('Table definition for %s', $action->name);
            my ($sql, @bind) = $ddl->table_info($action);
            # @bind holds parameter names; translate them to actual values
            my %map = (
                schema => $action->schema->name,
                table  => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my (@fields) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_table($action) unless @fields;
        } elsif($action->isa('Database::Async::ORM::Schema')) {
            $log->tracef('Create schema %s', $action->name);
            my ($sql, @bind) = $ddl->schema_info($action);
            my %map = (
                schema => $action->name,
            );
            my @data = map { $map{$_} } @bind;
            my (@schema) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_schema($action) unless @schema;
        } elsif($action->isa('Database::Async::ORM::Type')) {
            $log->tracef('Create type %s', $action->name);
            my ($sql, @bind) = $ddl->type_info($action);
            my %map = (
                schema => $action->schema->name,
                type   => $action->name
            );
            my @data = map { $map{$_} } @bind;
            my ($existing_type) = await $db->query(
                $sql => @data
            )->row_hashrefs
             ->as_list;
            push @out, $ddl->create_type($action) unless $existing_type;
        } else {
            die 'unknown thing ' . $action;
        }
    }

    # Make sure that we have no empty queries in the list... should not be necessary,
    # perhaps this should just bail out instead.
    @out = grep { length } @out;

    $log->debugf('Applying %d pending database migrations', 0 + @out);
    for my $query (@out) {
        $log->tracef('Apply SQL: %s', $query);
        await $db->query($query)->void;
    }

    await $db->query(q{commit})->void;
    $log->debugf('Applied %d database migrations', 0 + @out);
    return;
}

# Builds the list of SQL statements that would create the given objects,
# without running anything against the database.
#
# Takes the database handle followed by a list of Database::Async::ORM::Schema,
# ::Table or ::Type objects. Unlike ->apply_database_changes, no existence
# checks are made: a create statement is emitted for every action.
#
# Returns the non-empty statements in order: optional extensions, `begin`,
# required extensions, one or more statements per action, then `commit`.
# Dies on an invalid required extension name or an unsupported action.
sub database_changes_as_sql {
    my ($self, $db, @actions) = @_;

    my $ddl = $self->ddl_for($db);

    my @statements;

    # Optional extensions go first; a bad name is logged and skipped
    for my $ext ($self->extension_list) {
        my ($name) = $ext->name;
        try {
            unless($name =~ /^[a-zA-Z0-9_-]+$/) {
                die 'invalid name for extension: ' . $name;
            }
            if($ext->is_optional) {
                push @statements, qq{create extension if not exists "$name" cascade};
            }
        } catch {
            $log->warnf('Failed to install optional extension %s, ignoring: %s', $name, $@);
        }
    }

    # Everything from here on belongs to one transaction
    push @statements, q{begin};

    my @required = grep { !$_->is_optional } $self->extension_list;
    for my $ext (@required) {
        my ($name) = $ext->name;
        unless($name =~ /^[a-zA-Z0-9_-]+$/) {
            die 'invalid name for extension: ' . $name;
        }
        push @statements, qq{create extension if not exists "$name" cascade};
    }

    # Class => [ trace message, DDL method ], checked in this order
    my @handlers = (
        [ 'Database::Async::ORM::Table',  'Create table %s',  'create_table'  ],
        [ 'Database::Async::ORM::Schema', 'Create schema %s', 'create_schema' ],
        [ 'Database::Async::ORM::Type',   'Create type %s',   'create_type'   ],
    );

    ACTION:
    for my $action (@actions) {
        for my $handler (@handlers) {
            my ($class, $message, $method) = @$handler;
            next unless $action->isa($class);
            $log->tracef($message, $action->name);
            push @statements, $ddl->$method($action);
            next ACTION;
        }
        die 'unknown thing ' . $action;
    }

    push @statements, q{commit};

    # Drop any empty entries - these should not occur, but the DDL helper
    # output is not checked anywhere else.
    my @non_empty = grep { length $_ } @statements;
    return @non_empty;
}

=head2 load_from

Loads schema, tables, types and any other available objects from
a source - currently supports the following:

=over 4

=item * hashref

=item * YAML file

=item * directory of YAML files

=back

You can call this multiple times to accumulate objects from various
different sources.

Returns a L<Future> which resolves to the list of schema, type and table
objects that were created from the source.

=cut

# Loads extension, schema, type and table definitions from $source and
# registers the resulting objects on this instance.
#
# $source is either a reference (used directly as the configuration) or
# something that ->read_from can turn into one; $loader is passed through to
# ->read_from untouched.
#
# Types and tables may depend on each other, so they are resolved in repeated
# passes: anything that fails is queued again, and we give up only when a full
# pass makes no progress at all.
#
# Returns a Future resolving to every schema, type and table object created,
# in creation order. Dies if no source is given or dependencies cannot be
# resolved.
sub load_from {
    my ($self, $source, $loader) = @_;
    die 'needs a source to load from' unless defined $source;

    my $cfg = ref($source) ? $source : $self->read_from($source, $loader);
    $self->{schema_definitions} = $cfg;
    $log->tracef('Loaded config %s', $cfg);

    # Objects created so far - this is what the returned Future resolves to
    my @pending;

    for my $extension_name ($cfg->{extensions}{required}->@*) {
        my $extension = Database::Async::ORM::Extension->new(
            defined_in => $cfg->{extensions}{defined_in},
            name       => $extension_name,
            optional   => 0,
        );
        $self->add_extension($extension);
    }
    for my $extension_name ($cfg->{extensions}{optional}->@*) {
        my $extension = Database::Async::ORM::Extension->new(
            defined_in => $cfg->{extensions}{defined_in},
            name       => $extension_name,
            optional   => 1,
        );
        $self->add_extension($extension);
    }

    # Work queues for the resolution passes below. Both lists must exist up
    # front: a configuration containing types but no tables would otherwise
    # dereference an undefined value when the table queue is counted.
    my %pending = (
        type  => [],
        table => [],
    );
    for my $schema_name (sort keys $cfg->{schema}->%*) {
        $log->debugf('%s', $schema_name);
        my $schema_details = $cfg->{schema}{$schema_name};

        my $schema = Database::Async::ORM::Schema->new(
            defined_in => $schema_details->{defined_in},
            name       => $schema_name
        );
        $self->add_schema($schema);
        push @pending, $schema;

        for my $type_name (sort keys $schema_details->{types}->%*) {
            my $type_details = $schema_details->{types}{$type_name};
            push $pending{type}->@*, {
                schema  => $schema,
                name    => $type_name,
                details => $type_details,
            }
        }

        for my $table_name (sort keys $schema_details->{tables}->%*) {
            my $table_details = $schema_details->{tables}{$table_name};
            # Table fields are nullable unless the definition says otherwise
            for($table_details->{fields}->@*) {
                $_->{nullable} = 1 unless exists $_->{nullable}
            }
            push $pending{table}->@*, {
                schema  => $schema,
                name    => $table_name,
                details => $table_details,
            }
        }
    }

    # $found counts successes in the current pass, @missing the failures
    my $found = 0;
    my @missing;
    while(sum0 map { 0 + @$_ } values %pending) {
        @missing = ();
        $log->tracef('Have %d pending types to check', 0 + $pending{type}->@*);
        # splice empties the queue; failures are pushed back for the next pass
        for my $item (splice $pending{type}->@*) {
            my $type_name = $item->{name};
            my $type_details = $item->{details};
            my $schema = $item->{schema};
            try {
                $log->debugf('Add type %s as %s', $type_name, $type_details);
                my @fields;
                for my $field_details ($type_details->{fields}->@*) {
                    my $type = $field_details->{type};
                    # A hashref names a type in another schema, a plain
                    # string refers to the current one
                    if(ref $type) {
                        $type = $self->schema_by_name(
                            $type->{schema}
                        )->type_by_name(
                            $type->{name}
                        )
                    } else {
                        $type = $schema->type_by_name($type);
                    }
                    push @fields, Database::Async::ORM::Field->new(
                        type       => $type,
                        name       => $field_details->{name},
                        attributes => $field_details->{attributes},
                        nullable   => 1,
                        default    => $field_details->{default},
                    )
                }
                my $type = Database::Async::ORM::Type->new(
                    defined_in  => $type_details->{defined_in},
                    name        => $type_name,
                    schema      => $schema,
                    type        => $type_details->{type} // 'enum',
                    description => $type_details->{description},
                    values      => $type_details->{data},
                    (exists $type_details->{is} ? (basis => $type_details->{is}) : ()),
                    fields      => \@fields,
                );
                $schema->add_type($type);
                push @pending, $type;
                ++$found;
            } catch {
                $log->tracef('Failed to apply %s.%s - %s, moved to pending',
                    $schema->name,
                    $type_name,
                    $@
                );
                push @missing, {
                    schema => $schema->name,
                    name   => $type_name,
                    type   => 'type',
                    error  => $@
                };
                push $pending{type}->@*, $item;
            }
        }

        $log->tracef('Have %d pending tables to check', 0 + $pending{table}->@*);

        for my $item (splice $pending{table}->@*) {
            my $table_name = $item->{name};
            my $table_details = $item->{details};
            my $schema = $item->{schema};
            try {
                my @parents;
                if(my $parents = $item->{details}{parents}) {
                    for my $parent (@$parents) {
                        # For convenience, we allow strings for tables in the current schema
                        my $details = ref $parent ? $parent : { name => $parent };
                        $log->tracef('Parent table is %s', $details);
                        my $target_schema = $self->schema_by_name($details->{schema} // $schema->name);
                        push @parents, (
                            $target_schema->table_by_name($details->{name})
                                or die 'parent table ' . $details->{name} . ' not found in schema ' . $target_schema->name
                        );
                    }
                }
                # Foreign key targets are looked up in the table's own schema
                # and must already be registered before this table is built
                if(my $constraints = $item->{details}{constraints}) {
                    for my $fk (@$constraints) {
                        next unless $fk->{type} eq 'foreign_key';
                        die 'FK table ' . $fk->{references}{table} . ' not found' unless $schema->table_by_name($fk->{references}{table});
                    }
                }
                my $table = $self->populate_table(
                    schema  => $schema,
                    details => $table_details,
                    name    => $table_name,
                    parents => \@parents,
                );
                push @pending, $table;
                ++$found;
            } catch {
                $log->tracef('Failed to apply %s.%s - %s, moved to pending',
                    $schema->name,
                    $table_name,
                    $@
                );
                push @missing, {
                    schema => $schema->name,
                    name   => $table_name,
                    type   => 'table',
                    error  => $@
                };
                push $pending{table}->@*, $item;
            }
        }
    } continue {
        # A pass with failures and no successes means we can never finish
        if(@missing and not $found) {
            $log->error('Currently pending items:');
            s/\v+$// for map $_->{error}, @missing;
            $log->errorf('- %s.%s (%s) - %s', $_->{schema}, $_->{name}, $_->{type}, $_->{error}) for @missing;
            die 'Unable to resolve dependencies, bailing out';
        }
        $found = 0;
    }
    return Future->done(@pending);
}

=head2 METHODS - Internal

These are used by L<Database::Async::ORM> and the precise API details
may change in future.

=cut

=head2 populate_table

Populates a L<Database::Async::ORM::Table> instance.

=cut

# Builds a Database::Async::ORM::Table from its definition, attaches the
# field and constraint objects, and registers the table with its schema.
#
# Named parameters:
#   name    - table name
#   details - hashref holding the table definition
#   schema  - schema object that the table belongs to
#   parents - arrayref of parent table objects
#
# Returns the new table object.
sub populate_table {
    my ($self, %args) = @_;
    my ($name, $details, $schema) = @args{qw(name details schema)};
    $log->tracef('Add table %s as %s', $name, $details);

    my $table = Database::Async::ORM::Table->new(
        defined_in   => $details->{defined_in},
        name         => $name,
        schema       => $schema,
        table        => $details->{table} // 'enum',
        description  => $details->{description},
        values       => $details->{data},
        parents      => $args{parents},
        primary_keys => $details->{primary_keys},
    );

    for my $spec (@{ $details->{fields} }) {
        # A hashref names a type in another schema, anything else is looked
        # up in the table's own schema
        my $wanted = $spec->{type};
        my $type = ref($wanted)
            ? $self->schema_by_name($wanted->{schema})->type_by_name($wanted->{name})
            : $schema->type_by_name($wanted);

        # Only pass through the keys that the definition actually contains
        my @present = grep { exists $spec->{$_} } qw(name description nullable default attributes);
        my $field = Database::Async::ORM::Field->new(
            defined_in => $details->{defined_in},
            table      => $table,
            type       => $type,
            (map { ($_ => $spec->{$_}) } @present),
        );
        $log->tracef('Add field %s as %s with type %s', $field->name, $spec, $field->type);
        push @{ $table->{fields} }, $field;
    }

    for my $spec (@{ $details->{constraints} }) {
        my @present = grep { exists $spec->{$_} } qw(name type deferrable initially_deferred fields references);
        my $constraint = Database::Async::ORM::Constraint->new(
            defined_in => $details->{defined_in},
            table      => $table,
            (map { ($_ => $spec->{$_}) } @present),
        );
        $log->tracef('Add constraint %s as %s with type %s', $constraint->name, $spec, $constraint->type);
        push @{ $table->{constraints} }, $constraint;
    }

    $schema->add_table($table);
    return $table;
}

=head2 read_from

Reads data from a file or recursively from a base path.

=cut

# Reads configuration from a single file, or from every file found beneath a
# directory.
#
# $source is a path; $loader is an optional coderef which receives a
# Path::Tiny object and returns the decoded data. The default loader handles
# .yaml/.yml via YAML::XS and .json via JSON::MaybeXS, and dies for anything
# else.
#
# Returns the accumulated schema definitions hashref. Dies if no source is
# given or the path does not exist.
sub read_from {
    my ($self, $source, $loader) = @_;
    die 'needs a source to load from' unless defined $source;

    my $base = path($source);
    die "$source does not exist" unless $base->exists;

    $loader //= sub {
        my ($path) = @_;
        # ->basename(@suffixes) strips a matching suffix, so a result that
        # differs from the plain basename means the file has that extension
        my $name = $path->basename;
        if($name ne $path->basename(qw(.yaml .yml))) {
            require YAML::XS;
            return YAML::XS::LoadFile("$path")
        } elsif($name ne $path->basename(qw(.json))) {
            require JSON::MaybeXS;
            return JSON::MaybeXS->new->decode($path->slurp_utf8);
        } else {
            die 'Unknown file type for ' . $path;
        }
    };

    # A single file is merged in at the top level of the configuration
    return $self->load_from_file(undef, $base, $loader) unless $base->is_dir;

    # Merge in the data from all files recursively.
    # For example, schema/personal/tables/address.yml would populate
    # the {schema}->{personal}->{tables}->{address} element.
    $base->visit(sub {
        my $file = $_;
        unless($_->is_file) {
            $log->tracef('Skipping %s since it is not a file', "$_");
            return;
        }

        $self->load_from_file($base, $file, $loader);
    }, {
        recurse => 1,
        follow_symlinks => 1
    });
    return $self->schema_definitions;
}

# Loads one file via $loader and merges the result into the schema
# definitions held by this instance.
#
# $base is the directory being scanned (or undef when loading a single file),
# $file is the file to read and $loader is a coderef returning the decoded
# data. With a $base, the file's path relative to it - minus the extension -
# selects where the data goes: tables/address.yml lands in {tables}{address}.
# Without a $base the data is merged at the top level.
#
# Hash data is copied key by key over whatever is already there, and a
# `defined_in` entry records the originating file. List data is appended to
# the list at the target location.
#
# Returns the schema definitions hashref. Dies on any other data type, or on
# list data that has no path element to be stored under.
sub load_from_file {
    my ($self, $base, $file, $loader) = @_;
    my $cfg = $self->schema_definitions;

    # Strip off the base prefix so that we have something that matches our
    # desired hash data path
    my $defined_in = $base ? substr($file, 1 + length($base->stringify)) : $file;
    my $relative = $base ? $defined_in : '';

    # Also drop any file extensions
    $relative =~ s{\.[^.]+$}{};

    my @path = split qr{/}, $relative;

    $log->debugf('Pulling in configuration from %s', join '.', @path);
    my $file_data = $loader->($file);

    # We now want to recurse into our configuration data structure to the
    # appropriate level. For now, we're blindly overwriting, but ideally we
    # should merge the data structure recursively with the elements in the file.
    if(ref($file_data) eq 'ARRAY') {
        # A list needs a key to live under, and that final element has to be
        # created as an array rather than the hash used for intermediate levels
        die 'Cannot merge list data from ' . $file . ' at the top level' unless @path;
        my $key = pop @path;
        my $parent = $cfg;
        $parent = ($parent->{$_} //= {}) for @path;
        my $target = ($parent->{$key} //= []);
        push @$target, @$file_data;
    } elsif(ref($file_data) eq 'HASH') {
        my $target = $cfg;
        $target = ($target->{$_} //= {}) for @path;
        $target->{defined_in} = $defined_in;
        @{$target}{keys %$file_data} = values %$file_data;
    } else {
        die 'Unknown data type in file ' . $file . ' - ' . ref($file_data) . " (actual value $file_data)";
    }
    return $cfg;
}

1;

__END__

=head1 SEE ALSO

=head1 AUTHOR

Tom Molesworth C<< <TEAM@cpan.org> >>

=head1 LICENSE

Copyright Tom Molesworth 2011-2023. Licensed under the same terms as Perl itself.



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.