Group
Extension

Test2-Harness-UI/lib/Test2/Harness/UI/Sync.pm

package Test2::Harness::UI::Sync;
use strict;
use warnings;

use DBI;
use Scope::Guard;
use Carp qw/croak/;
use Test2::Harness::Util::JSON qw/encode_json decode_json/;
use Test2::Harness::UI::UUID qw/uuid_inflate gen_uuid/;

our $VERSION = '0.000144';

use Test2::Harness::UI::Util::HashBase;

# Compare two run lists and report which run ids each side lacks.
#
# Each argument may be an arrayref of run ids (used as-is) or anything
# accepted by get_runs(). Returns a hashref with 'missing_in_a' and
# 'missing_in_b', each an arrayref that keeps the order of the list the ids
# were found in.
sub run_delta {
    my ($self, $dbh_a, $dbh_b) = @_;

    my ($runs_a, $runs_b);

    if   (ref($dbh_a) eq 'ARRAY') { $runs_a = $dbh_a }
    else                          { $runs_a = $self->get_runs($dbh_a) }

    if   (ref($dbh_b) eq 'ARRAY') { $runs_b = $dbh_b }
    else                          { $runs_b = $self->get_runs($dbh_b) }

    # Lookup tables for membership checks.
    my (%seen_a, %seen_b);
    $seen_a{$_} = 1 for @$runs_a;
    $seen_b{$_} = 1 for @$runs_b;

    my @only_in_b = grep { !$seen_a{$_} } @$runs_b;
    my @only_in_a = grep { !$seen_b{$_} } @$runs_a;

    return {
        missing_in_a => \@only_in_b,
        missing_in_b => \@only_in_a,
    };
}

# Copy the listed runs from one database handle to another.
#
# Required params: from_dbh, to_dbh, run_ids (croaks if any is missing/false).
# Optional params: name, skip, cache, debug, from_uuid_format, to_uuid_format.
#
# The work is split across two processes joined by a pipe: the parent dumps
# data from from_dbh with write_sync(), the forked child loads it into to_dbh
# with read_sync(). Dies with "Loader exited badly" if wait_on() reports a
# problem with the child. Returns nothing on success.
sub sync {
    my $self = shift;
    my %params = @_;

    my $from_dbh = $params{from_dbh} or croak "Need a DBH to pull data (from_dbh)";
    my $to_dbh   = $params{to_dbh}   or croak "Need a DBH to push data TO (to_dbh)";
    my $run_ids  = $params{run_ids}  or croak "Need a list of run_id's to sync";

    # 'name' is only used to label the loader in wait_on() warnings.
    my $name  = $params{name}  // "$from_dbh -> $to_dbh";
    my $skip  = $params{skip}  // {};
    my $cache = $params{cache} // {};
    my $debug = $params{debug} // 0;

    my $from_uuidf = $params{from_uuid_format} // 'binary';
    my $to_uuidf   = $params{to_uuid_format}   // 'binary';

    # Pipe carrying the jsonl stream from the dumper (parent) to the loader (child).
    my ($rh, $wh);
    pipe($rh, $wh) or die "Could not open pipe: $!";
    $wh->autoflush(1);

    # fork returns undef on failure, 0 in the child, and the child pid in the parent.
    # NOTE(review): both database handles exist in both processes after the
    # fork - confirm handle destruction in one process cannot disturb the
    # connection still used by the other.
    my $pid = fork // die "Could not fork: $!";
    unless ($pid) {
        # Child: only reads. Closing the write end here means the reader sees
        # EOF once the parent closes its copy.
        close($wh);

        # If this scope is left before dismiss() is reached (for example
        # read_sync() throws), the guard warns and forces exit code 255 so the
        # child never continues running the parent's code.
        my $guard = Scope::Guard->new(sub {
            warn "Scope Leak";
            exit 255;
        });

        $self->read_sync(
            dbh     => $to_dbh,
            run_ids => $run_ids,
            rh      => $rh,
            cache   => $cache,
            debug   => $debug,
            uuidf   => $to_uuidf,
        );

        $guard->dismiss();

        exit 0;
    }

    # Parent: only writes.
    close($rh);
    $self->write_sync(
        dbh     => $from_dbh,
        run_ids => $run_ids,
        wh      => $wh,
        skip    => $skip,
        debug   => $debug,
        uuidf   => $from_uuidf,
    );
    # Closing the write end signals end-of-input to the loader.
    close($wh);

    # wait_on() returns a true value when the child could not be reaped or
    # exited with a non-zero status.
    die "Loader exited badly" if $self->wait_on($pid => "[Loader] $name");

    return;
}

# Block until the child process $pid exits and report how it went.
#
# $desc is a label used as the prefix of any warning.
#
# Returns 0 when the child exited cleanly. Otherwise returns a true value:
# the raw wait status ($?) of the child, or, when waitpid() did not return
# the expected pid, the wait status if it is true and 1 if it is not. A
# warning is emitted in every non-clean case.
sub wait_on {
    my $self = shift;
    my ($pid, $desc) = @_;

    my $check = waitpid($pid, 0);

    # Capture $? immediately; anything that runs later may overwrite it.
    my $exit = $?;

    if ($check != $pid) {
        warn "$desc waitpid failed: $check (exit: $exit)";
        return $exit || 1;
    }

    return 0 unless $exit;

    warn "$desc exited badly: $exit\n";
    return $exit;
}

# Return an arrayref of run ids from either a jsonl dump or a database.
#
# The argument is treated as a jsonl file when its string form ends in
# '.jsonl' and that path is an existing plain file; anything else is handed
# to _get_dbh_runs() as a database handle.
sub get_runs {
    my ($self, $dbh_or_file) = @_;

    if ($dbh_or_file =~ m/\.jsonl$/ && -f $dbh_or_file) {
        return $self->_get_jsonl_runs($dbh_or_file);
    }

    return $self->_get_dbh_runs($dbh_or_file);
}

# Collect the run ids found in a jsonl dump file.
#
# Croaks if the file cannot be opened. Returns an arrayref of run ids in the
# order they appear in the file.
sub _get_jsonl_runs {
    my ($self, $file) = @_;

    open(my $fh, '<', $file) or croak "Could not open file '$file' for reading: $!";

    my @run_ids;
    while (my $line = <$fh>) {
        # Only lines containing a {"run":{...}} record with a "run_id" string
        # are of interest; every other line is ignored.
        if ($line =~ m/\{"run":\{.*"run_id":"([^"]+)"/) {
            push @run_ids => $1;
        }
    }

    close($fh);

    return \@run_ids;
}

# Fetch the ids of all finished runs from a database handle.
#
# Runs whose status is 'pending', 'running' or 'broken' are excluded by the
# query. Returns an arrayref of run ids in their string form, ordered by the
# 'added' column (oldest first).
sub _get_dbh_runs {
    my ($self, $dbh) = @_;

    my $sql = <<"    EOT";
        SELECT run_id
        FROM   runs
        WHERE  status NOT IN ('pending', 'running', 'broken')
        ORDER  BY added ASC
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute() or die "MySQL Error: " . $dbh->errstr;

    my @run_ids;
    while (my $row = $sth->fetchrow_arrayref()) {
        # Normalise whatever the database stores to the string form.
        my $run_id = uuid_inflate($row->[0])->string;
        push @run_ids => $run_id;
    }

    return \@run_ids;
}

# Names of the tables that can be synced, in the order they are dumped.
sub table_list {
    return qw/runs run_fields jobs job_fields events binaries reporting coverage/;
}

# Dump the requested runs as jsonl, one JSON object per line.
#
# Params:
#   dbh     - source database handle (required)
#   run_ids - arrayref of run ids to dump (required)
#   wh      - writable filehandle receiving the jsonl stream (required)
#   uuidf   - name of the UUID method used to format each run id for the
#             source database queries, defaults to 'binary'
#   skip    - hashref of (table => bool); true entries are not dumped
#   debug   - when true, print a progress line to STDOUT for each run
#
# For every run, each non-skipped table from table_list() is rendered through
# the matching render_<table> method and every item it returns is written as
# its own line. Croaks on missing required params and dies if a line cannot
# be written. Returns nothing.
sub write_sync {
    my $self = shift;
    my %params = @_;

    my $dbh     = $params{dbh}     or croak "'dbh' is required";
    my $run_ids = $params{run_ids} or croak "'run_ids' must be an arrayref of run ids";
    my $wh      = $params{wh}      or croak "'wh' is required and must be a writable filehandle";
    my $uuidf   = $params{uuidf} // 'binary';
    my $skip    = $params{skip}  // {};
    my $debug   = $params{debug} // 0;

    # Renderer method names for every table that is not being skipped.
    my @to_dump = map { "render_${_}" } grep { !$skip->{$_} } $self->table_list;

    $wh->autoflush(1);

    STDOUT->autoflush(1);

    my $total   = @$run_ids;
    my $counter = 0;
    for my $run_id (@$run_ids) {
        my $run_uuid = uuid_inflate($run_id)->$uuidf;

        for my $meth (@to_dump) {
            for my $item ($self->$meth($dbh, $run_uuid, $skip)) {
                # A failed write would silently drop data from the dump, so
                # treat it as fatal rather than ignoring print's return value.
                print {$wh} encode_json($item), "\n"
                    or die "Could not write to output handle: $!";
            }
        }

        $counter++;
        print "  Dumped run $counter/$total: $run_id\n" if $debug;
    }

    return;
}

# Load a jsonl stream (as produced by write_sync) into a database.
#
# Params:
#   dbh     - destination database handle (required)
#   run_ids - arrayref of run ids that should be imported (required)
#   rh      - readable filehandle providing the jsonl stream (required)
#   uuidf   - UUID format passed to the import_* methods, defaults to 'binary'
#   cache   - hashref passed to the import_* methods, defaults to {}
#   debug   - when true, print a status line to STDOUT for each imported run
#             and warn with the reason when an item fails to import
#
# Every line must decode to a hash with exactly one key naming the record
# type; the value is handed to the matching import_<type> method. Each run is
# committed as its own transaction when the next 'run' record (or the end of
# input) is reached. If any item of a run fails to import, the transaction is
# rolled back and the rest of that run is ignored. Runs not listed in run_ids
# are skipped entirely.
#
# The handle is switched to AutoCommit off and is disconnected when the
# stream ends. Croaks on missing required params, dies on malformed records.
# Returns nothing.
sub read_sync {
    my $self = shift;
    my %params = @_;

    my $dbh     = $params{dbh}     or croak "'dbh' is required";
    my $run_ids = $params{run_ids} or croak "'run_ids' must be an arrayref of run ids";
    my $rh      = $params{rh}      or croak "'rh' is required and must be a readable filehandle";
    my $uuidf   = $params{uuidf} // 'binary';
    my $cache   = $params{cache} // {};
    my $debug   = $params{debug} // 0;

    $dbh->{AutoCommit} = 0;

    my %include = map {($_ => 1)} @$run_ids;
    my $total   = @$run_ids;
    my $counter = 0;
    my $last_run_id;
    my $broken;

    # Report the outcome of the run that has just been finished. This is
    # called both when the next run starts and after the input ends, so the
    # final run is reported as well.
    my $report = sub {
        return unless $debug && $last_run_id;

        if ($broken) {
            print "  BROKEN run $counter/$total: $last_run_id\n";
        }
        else {
            print "Imported run $counter/$total: $last_run_id\n";
        }
    };

    while (my $line = <$rh>) {
        my $data = decode_json($line);

        my ($type, @bad) = keys %$data;
        die "Invalid data!" if @bad;

        if ($type eq 'run') {
            # A new run begins: finalize whatever the previous run inserted.
            $dbh->commit();
            $dbh->{AutoCommit} = 0;

            $report->();

            $broken = undef;
            my $new_run_id = $data->{$type}->{run_id};

            # Not one of the requested runs: clear the current run so all of
            # its following records are skipped too.
            if ($new_run_id && !$include{$new_run_id}) {
                $last_run_id = undef;
                next;
            }

            $last_run_id = $new_run_id;
            $counter++;
        }

        next if $broken;
        next unless $last_run_id;

        my $method = "import_$type";

        next if eval {
            $self->$method(dbh => $dbh, item => $data->{$type}, uuidf => $uuidf, cache => $cache);
            1;
        };

        # Save the error before anything else can overwrite $@.
        my $err = $@;

        $dbh->rollback();
        $broken = $last_run_id;

        warn "Failed to import '$type' for run $last_run_id: $err" if $debug;
    }

    $dbh->commit();
    $report->();
    $dbh->disconnect();

    return;
}

# Cached lookup of the id for a shared record (user, project, test file, ...).
#
# Arguments: ($cache, $dbh, $uuidf, $table, $field, $via_field, $via_value).
# Returns undef when $via_value is false. Otherwise returns the id stored in
# $cache under {$table}{$via_field}{$via_value}{$field}, asking
# _get_or_create_id() for it (and remembering the answer) when no defined
# value is cached yet.
sub get_or_create_id {
    my ($self, @args) = @_;
    my ($cache, undef, undef, $table, $field, $via_field, $via_value) = @args;

    return undef if !$via_value;

    my $ids = $cache->{$table}{$via_field}{$via_value} //= {};

    unless (defined $ids->{$field}) {
        $ids->{$field} = $self->_get_or_create_id(@args);
    }

    return $ids->{$field};
}

# Uncached worker for get_or_create_id().
#
# Arguments: ($cache, $dbh, $uuidf, $table, $field, $via_field, $via_value).
# Looks for a row in $table where $via_field equals $via_value and returns
# the string form of its $field. When no such row exists a new UUID is
# generated, a row holding it and $via_value is inserted, and the string form
# of the new UUID is returned. Dies if the lookup query fails.
sub _get_or_create_id {
    my $self = shift;
    my ($cache, $dbh, $uuidf, $table, $field, $via_field, $via_value) = @_;

    my $sql = "SELECT $field FROM $table WHERE $via_field = ?";

    my $sth = $dbh->prepare($sql);
    $sth->execute($via_value) or die "MySQL Error: " . $dbh->errstr;

    # Decide by actually fetching a row: the row count reported for a SELECT
    # is not reliable until all rows have been fetched.
    if (my $row = $sth->fetchrow_hashref()) {
        # Only the first row is needed; release the rest of the result set.
        $sth->finish;
        return uuid_inflate($row->{$field})->string;
    }

    my $uuid = gen_uuid();
    $self->insert($dbh, $uuidf, $table, {$field => $uuid, $via_field => $via_value});
    return $uuid->string;
}

# Insert one row into a table.
#
# Arguments: ($dbh, $uuidf, $table, $data) where $data is a hashref of
# (column => value). The known id columns in $data are first converted in
# place to the $uuidf format. Values are bound through placeholders. Dies if
# the statement fails to execute.
sub insert {
    my ($self, $dbh, $uuidf, $table, $data) = @_;

    _fix_uuids($uuidf => $data);

    # Columns and their values, kept in matching order.
    my @fields = keys %$data;
    my @vars   = @{$data}{@fields};

    my $columns      = join(',', @fields);
    my $placeholders = join(',', ('?') x @vars);
    my $sql          = "INSERT INTO $table($columns) VALUES($placeholders)";

    my $sth = $dbh->prepare($sql);
    $sth->execute(@vars) or die "Insert failed: " . $dbh->errstr;
}

# Convert UUIDs to their string form.
#
# Accepts a single UUID or a data structure (see _fix_uuids). May be called
# as a plain function, stringify_uuids($thing), or as a method,
# $sync->stringify_uuids($thing), which is the form shown in the POD.
sub stringify_uuids {
    # Drop the invocant when called as a method. An instance or class name of
    # this package is never valid input data, so this cannot be ambiguous.
    shift @_ if @_ && UNIVERSAL::isa($_[0], __PACKAGE__);

    my ($in) = @_;
    return _fix_uuids(string => $in);
}

# Convert UUIDs to their binary form.
#
# Accepts a single UUID or a data structure (see _fix_uuids). May be called
# as a plain function, binarify_uuids($thing), or as a method,
# $sync->binarify_uuids($thing), which is the form shown in the POD.
sub binarify_uuids {
    # Drop the invocant when called as a method. An instance or class name of
    # this package is never valid input data, so this cannot be ambiguous.
    shift @_ if @_ && UNIVERSAL::isa($_[0], __PACKAGE__);

    my ($in) = @_;
    return _fix_uuids(binary => $in);
}

# Hash keys whose values are known to be UUIDs. Only these keys are touched
# when a hash is converted.
my @ID_FIELDS = qw{
    coverage_id
    coverage_manager_id
    event_id
    job_field_id
    job_id
    job_key
    parent_id
    project_id
    reporting_id
    run_field_id
    run_id
    source_file_id
    source_sub_id
    test_file_id
    user_id
};

# Convert UUIDs to the representation named by $method ('string' or 'binary',
# i.e. a method of the UUID object).
#
#   plain scalar - inflated with uuid_inflate() and converted; the converted
#                  value is returned
#   UUID object  - converted; the converted value is returned
#   hashref      - the values of the keys listed in @ID_FIELDS are converted
#                  in place; the same hashref is returned
#   arrayref     - every element is converted in place; the same arrayref is
#                  returned
#
# Returns nothing for a false input. Dies on any other reference type.
sub _fix_uuids {
    my ($method => $in) = @_;
    return unless $in;
    my $type = ref($in);

    if (!$type) {
        return uuid_inflate($in)->$method;
    }
    if ($type eq 'Test2::Harness::UI::UUID') {
        return $in->$method;
    }
    if ($type eq 'HASH') {
        # Cannot do all_id or _key fields, some are not uuids...
        # This is a list of safe ones
        for my $key (@ID_FIELDS) {
            next unless exists $in->{$key};
            $in->{$key} = _fix_uuids($method, $in->{$key});
        }
    }
    elsif($type eq 'ARRAY') {
        # Store the result back: hashes and arrays come back as the same
        # reference, while scalar UUIDs and UUID objects are only converted
        # through the return value.
        $_ = _fix_uuids($method, $_) for @$in;
    }
    else {
        die "Unsupported type '$type' '$in'";
    }

    return $in;
}

# Build the dump record for a single run.
#
# Arguments: ($dbh, $run_id, $skip). The run row is joined with its user and
# project so the record carries the username and project name. Its known id
# columns are converted to string form. When $skip->{coverage} is true the
# 'has_coverage' key is removed. Returns one hashref: {run => $row}.
sub render_runs {
    my ($self, $dbh, $run_id, $skip) = @_;

    my $sql = <<"    EOT";
        SELECT
            run_id, status, worker_id, error, added, duration, mode, buffer,
            passed, failed, retried, concurrency, parameters, has_coverage,
            users.username, projects.name as project_name
        FROM runs
        JOIN users    USING(user_id)
        JOIN projects USING(project_id)
        WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $run = stringify_uuids($sth->fetchrow_hashref());

    if ($skip->{coverage}) {
        delete $run->{has_coverage};
    }

    return {run => $run};
}

# Build the dump records for the run_fields rows of a run.
#
# Arguments: ($dbh, $run_id). Known id columns are converted to string form.
# Returns a list of hashrefs, one {run_field => $row} per row.
sub render_run_fields {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT run_id, run_field_id, name, data, details, raw, link
          FROM run_fields
         WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{run_field => $_} } @$rows;
}

# Build the dump records for the jobs of a run.
#
# Arguments: ($dbh, $run_id). Each job row is joined with its test file so
# the record carries the filename. Known id columns are converted to string
# form. Returns a list of hashrefs, one {job => $row} per row.
sub render_jobs {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT
                run_id, job_key, job_id, job_try, job_ord, is_harness_out, status, parameters, fields, name, fail, retry,
                exit_code, launch, start, ended, duration, pass_count, fail_count,
                test_files.filename
          FROM jobs
          JOIN test_files USING (test_file_id)
         WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{job => $_} } @$rows;
}

# Build the dump records for the job_fields rows belonging to a run's jobs.
#
# Arguments: ($dbh, $run_id). Known id columns are converted to string form.
# Returns a list of hashrefs, one {job_field => $row} per row.
sub render_job_fields {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT job_field_id, job_key, job_fields.name as name, data, details, raw, link
          FROM job_fields
          JOIN jobs USING(job_key)
         WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{job_field => $_} } @$rows;
}

# Build the dump records for the events belonging to a run's jobs.
#
# Arguments: ($dbh, $run_id). The query only selects events whose is_subtest
# column is TRUE. Known id columns are converted to string form. Returns a
# list of hashrefs, one {event => $row} per row.
sub render_events {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT events.*
          FROM events
          JOIN jobs USING(job_key)
         WHERE run_id = ?
           AND is_subtest = TRUE
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{event => $_} } @$rows;
}

# Build the dump records for the binaries attached to a run's events.
#
# Arguments: ($dbh, $run_id). Binaries are reached through their event and
# that event's job; the query only selects rows where is_subtest is TRUE.
# Known id columns are converted to string form. Returns a list of hashrefs,
# one {binary => $row} per row.
sub render_binaries {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT binaries.*
          FROM binaries
          JOIN events USING(event_id)
          JOIN jobs USING(job_key)
         WHERE run_id = ?
           AND is_subtest = TRUE
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{binary => $_} } @$rows;
}

# Build the dump records for the reporting rows of a run.
#
# Arguments: ($dbh, $run_id). Each row is joined with its project, user and
# test file so the record carries their names instead of their ids. Known id
# columns are converted to string form. Returns a list of hashrefs, one
# {reporting => $row} per row.
sub render_reporting {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT
                reporting_id, run_id, run_ord, job_try, subtest, duration, fail, pass, retry, abort, job_key, event_id,
                projects.name AS project_name,
                users.username AS username,
                test_files.filename AS filename
          FROM reporting
          JOIN projects   USING(project_id)
          JOIN users      USING(user_id)
          JOIN test_files USING(test_file_id)
         WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{reporting => $_} } @$rows;
}

# Build the dump records for the coverage rows of a run.
#
# Arguments: ($dbh, $run_id). Each row is joined with its test file, source
# file, source sub and coverage manager so the record carries their names
# instead of their ids. Known id columns are converted to string form.
# Returns a list of hashrefs, one {coverage => $row} per row.
sub render_coverage {
    my ($self, $dbh, $run_id) = @_;

    my $sql = <<"    EOT";
        SELECT
                coverage_id, run_id, job_key, metadata,
                test_files.filename AS test_file,
                source_files.filename AS source_file,
                source_subs.subname AS source_sub,
                coverage_manager.package AS coverage_manager
          FROM coverage
          JOIN test_files       USING(test_file_id)
          JOIN source_files     USING(source_file_id)
          JOIN source_subs      USING(source_sub_id)
          JOIN coverage_manager USING(coverage_manager_id)
         WHERE run_id = ?
    EOT

    my $sth = $dbh->prepare($sql);
    $sth->execute($run_id) or die "MySQL Error: " . $dbh->errstr;

    my $rows = stringify_uuids($sth->fetchall_arrayref({}));

    return map { +{coverage => $_} } @$rows;
}

# Insert a dumped run record into the 'runs' table.
#
# Params: dbh, uuidf, cache, item (the run hashref, modified in place).
# The 'username' and 'project_name' keys are removed from the item and
# replaced by 'user_id' and 'project_id' obtained from get_or_create_id().
sub import_run {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $cache, $run) = @params{qw/dbh uuidf cache item/};

    my $username = delete $run->{username};
    $run->{user_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'users' => 'user_id', username => $username);

    my $project_name = delete $run->{project_name};
    $run->{project_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'projects' => 'project_id', name => $project_name);

    return $self->insert($dbh, $uuidf, runs => $run);
}

# Insert a dumped run_field record into the 'run_fields' table.
#
# Params: dbh, uuidf, item (the row hashref).
sub import_run_field {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $run_field) = @params{qw/dbh uuidf item/};

    return $self->insert($dbh, $uuidf, run_fields => $run_field);
}

# Insert a dumped job record into the 'jobs' table.
#
# Params: dbh, uuidf, cache, item (the job hashref, modified in place).
# The 'filename' key is removed from the item and replaced by a
# 'test_file_id' obtained from get_or_create_id().
sub import_job {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $cache, $job) = @params{qw/dbh uuidf cache item/};

    my $filename = delete $job->{filename};
    $job->{test_file_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'test_files' => 'test_file_id', filename => $filename);

    return $self->insert($dbh, $uuidf, jobs => $job);
}

# Insert a dumped job_field record into the 'job_fields' table.
#
# Params: dbh, uuidf, item (the row hashref).
sub import_job_field {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $job_field) = @params{qw/dbh uuidf item/};

    return $self->insert($dbh, $uuidf, job_fields => $job_field);
}

# Insert a dumped event record into the 'events' table.
#
# Params: dbh, uuidf, item (the row hashref).
sub import_event {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $event) = @params{qw/dbh uuidf item/};

    return $self->insert($dbh, $uuidf, events => $event);
}

# Insert a dumped binary record into the 'binaries' table.
#
# Params: dbh, uuidf, item (the row hashref).
sub import_binary {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $binary) = @params{qw/dbh uuidf item/};

    return $self->insert($dbh, $uuidf, binaries => $binary);
}

# Insert a dumped reporting record into the 'reporting' table.
#
# Params: dbh, uuidf, cache, item (the row hashref, modified in place).
# The 'project_name', 'username' and 'filename' keys are removed from the
# item and replaced by 'project_id', 'user_id' and 'test_file_id' obtained
# from get_or_create_id().
sub import_reporting {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $cache, $reporting) = @params{qw/dbh uuidf cache item/};

    my $project_name = delete $reporting->{project_name};
    $reporting->{project_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'projects' => 'project_id', name => $project_name);

    my $username = delete $reporting->{username};
    $reporting->{user_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'users' => 'user_id', username => $username);

    my $filename = delete $reporting->{filename};
    $reporting->{test_file_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'test_files' => 'test_file_id', filename => $filename);

    return $self->insert($dbh, $uuidf, reporting => $reporting);
}

# Insert a dumped coverage record into the 'coverage' table.
#
# Params: dbh, uuidf, cache, item (the row hashref, modified in place).
# The 'test_file', 'source_file', 'source_sub' and 'coverage_manager' keys
# are removed from the item and replaced by the matching '*_id' keys obtained
# from get_or_create_id().
sub import_coverage {
    my ($self, %params) = @_;
    my ($dbh, $uuidf, $cache, $coverage) = @params{qw/dbh uuidf cache item/};

    my $test_file = delete $coverage->{test_file};
    $coverage->{test_file_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'test_files' => 'test_file_id', filename => $test_file);

    my $source_file = delete $coverage->{source_file};
    $coverage->{source_file_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'source_files' => 'source_file_id', filename => $source_file);

    my $source_sub = delete $coverage->{source_sub};
    $coverage->{source_sub_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'source_subs' => 'source_sub_id', subname => $source_sub);

    my $manager = delete $coverage->{coverage_manager};
    $coverage->{coverage_manager_id} = $self->get_or_create_id($cache, $dbh, $uuidf, 'coverage_manager' => 'coverage_manager_id', package => $manager);

    return $self->insert($dbh, $uuidf, coverage => $coverage);
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::UI::Sync - Module for dumping, loading, and syncing yathui databases

=head1 DESCRIPTION

If you need to migrate data between databases, merge databases, or sync
databases, this is the module for you.

This module will sync the essential data, while re-mapping items that may
already be present in the destination database, such as usernames, test file
names, etc, all of which may have different uuids in the new database.

Care is taken to preserve the uuids of runs, jobs, events, etc.

=head1 SYNOPSIS

    use Test2::Harness::UI::Sync;

    my $source_dbh = ...;
    my $dest_dbh   = ...;

    my $sync = Test2::Harness::UI::Sync->new();

    my $delta = $sync->run_delta($source_dbh, $dest_dbh);

    # Do the work
    $sync->sync(
        from_dbh => $source_dbh,
        to_dbh   => $dest_dbh,
        run_ids  => $delta->{missing_in_b},

        debug => 1,    # Print a notice for each dumped run_id
    );

=head1 METHODS

=over 4

=item @list = $sync->table_list()

Get a list of tables that can be synced.

=item $run_ids = $sync->get_runs($dbh)

=item $run_ids = $sync->get_runs($jsonl_file)

Get all the run_ids from a database or jsonl file.

=item $delta = $sync->run_delta($dbh_a, $dbh_b)

Get lists of run_ids that exist in only one of the two provided databases.

    {
        missing_in_a => \@run_ids_a,
        missing_in_b => \@run_ids_b,
    }

=item $sync->sync(...)

Copy data from the source database to the destination database.

    $sync->sync(
        from_dbh => $from_dbh,    # Source database
        to_dbh   => $to_dbh,      # Destination database
        run_ids  => \@run_ids,    # list of run_ids to sync

        name  => "",              # Optional name for this operation in debugging/errors
        skip  => {},              # Optional hashref of (TABLE => bool) for tables to skip
        cache => {},              # Optional uuid cache map.
        debug => 0,               # Optional, turn on for verbosity

        from_uuid_format => 'binary',    # Defaults to 'binary'; may be 'string' for older databases
        to_uuid_format   => 'binary',    # Defaults to 'binary'; may be 'string' for older databases
    );

=item $sync->write_sync(...)

Output the data to jsonl format.

    $sync->write_sync(
        dbh     => $dbh,        # Source database
        run_ids => $run_ids,    # list of run_ids to sync
        wh      => $wh,         # Where to print the jsonl data
        uuidf   => $uuidf,      # UUID format, defaults to 'binary', 'string' is also valid.
        skip    => $skip,       # Optional hashref of (TABLE => bool) for tables to skip
        debug   => 0,           # Optional, turn on for verbosity
    );

=item $sync->read_sync(...)

Read the jsonl data and insert it into the database.

    $sync->read_sync(
        dbh     => $dbh,        # Destination database
        run_ids => $run_ids,    # list of run_ids to sync
        rh      => $rh,         # Where to read the jsonl data
        uuidf   => $uuidf,      # UUID format, defaults to 'binary', 'string' is also valid.
        cache   => $cache,      # Optional uuid cache map.
        debug   => 0,           # Optional, turn on for verbosity
    );

=item $uuid = $sync->get_or_create_id($cache, $dbh, $uuidf, $table, $uuid_field, $value_field, $value)

Create or find a common link in the database (think project, user, etc).

    my $uuid = $sync->get_or_create_id(
        $cache, $dbh, 'binary',
        users    => 'user_id',
        username => 'bob',
    );

=item $sync->insert($dbh, $uuidf, $table, $data)

Insert $data as a row into $table using the $uuidf uuid format.

=item $sync->stringify_uuids($thing)

Takes a string or nested data structure and converts the UUIDs in its id fields to the string form.

=item $sync->binarify_uuids($thing)

Takes a string or nested data structure and converts the UUIDs in its id fields to the binary form.

=back

=head1 SOURCE

The source code repository for Test2-Harness-UI can be found at
F<http://github.com/Test-More/Test2-Harness-UI/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright 2023 Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See F<http://dev.perl.org/licenses/>

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.