Group
Extension

Test2-Harness/lib/Test2/Harness/Instance.pm

package Test2::Harness::Instance;
use strict;
use warnings;

our $VERSION = '2.000006'; # TRIAL

use IO::Select;

use Carp qw/croak/;
use Time::HiRes qw/sleep time/;

use Test2::Harness::Run;

use Test2::Harness::Util qw/mod2file/;
use Test2::Harness::IPC::Util qw/ipc_warn ipc_loop/;
use Test2::Harness::Util::JSON qw/encode_pretty_json/;

use Test2::Harness::Instance::Message;
use Test2::Harness::Instance::Request;
use Test2::Harness::Instance::Response;

use Test2::Harness::Util::HashBase qw{
    +started
    +stop
    <ipc
    <runner
    <scheduler
    <resources
    <plugins

    <log_file

    <terminated
    <cleaned_up

    <wait_time
};

sub stop { $_[0]->{+STOP} = 1 }

sub init {
    my $self = shift;

    $self->{+STARTED} = time;

    $self->{+WAIT_TIME} //= 0.2;

    croak "'log_file' is a required attribute" unless $self->{+LOG_FILE};

    my $scheduler = $self->{+SCHEDULER} or croak "'scheduler' is a required attribute";
    my $runner = $self->{+RUNNER} or croak "'runner' is a required attribute";
    $scheduler->set_runner($runner) unless $scheduler->runner;

    croak "'ipc' is a required attribute" unless $self->{+IPC};

    $self->{+IPC} = [$self->{+IPC}] unless ref($self->{+IPC}) eq 'ARRAY';
}

sub run {
    my $self = shift;
    my ($cb) = @_;

    my $runner    = $self->runner;
    my $scheduler = $self->scheduler;
    $scheduler->start($self->{+IPC});

    my $plugins   = $self->plugins   // [];
    my $resources = $self->resources // [];

    my %args = (instance => $self, scheduler => $scheduler, runner => $runner);
    $_->instance_setup(%args) for @$plugins;

    for my $res (@$resources) {
        $res->setup(%args);
        $res->spawn_process(%args) if $res->spawns_process;
    }

    ipc_loop(
        ipcs      => $self->{+IPC},
        wait_time => $self->{+WAIT_TIME},

        iteration_start => sub {
            $_->tick(type => 'instance') for @$plugins, @$resources;
            return $cb->(@_) if $cb;
            return;
        },

        end_check => sub {
            return 1 if $self->{+TERMINATED};
            return 1 if $runner->terminated;
            return 1 if $scheduler->terminated;
            return 0;
        },

        signals        => sub { $self->terminate("SIG" . $_[0]) },
        iteration_end  => sub { $self->{+SCHEDULER}->advance() ? 1 : 0 },
        handle_request => sub { $self->handle_request(@_) },

        handle_message => sub { },                                   # Intentionally do nothing.
    );

    $_->teardown(%args)          for reverse @$resources;
    $_->instance_teardown(%args) for reverse @$plugins;

    $runner->terminate();
    $scheduler->terminate();

    $_->cleanup(%args)           for @$resources;
    $_->instance_finalize(%args) for @$plugins;

    return;
}

sub handle_request {
    my $self = shift;
    my ($req) = @_;

    my $res;
    unless (eval { $res = $self->_handle_request($req); 1 }) {
        chomp(my $err = $@);

        return Test2::Harness::Instance::Response->new(
            api => {
                success   => 0,
                error     => "There was an exception processing the request",
                exception => $err,
            },
            response_id => $req->request_id,
            response => undef,
        );
    }

    return Test2::Harness::Instance::Response->new(
        api => { success   => 1 },
        response_id => $req->request_id,
        response => $res,
    );
}

sub _handle_request {
    my $self = shift;
    my ($req) = @_;

    my $api_call = $req->api_call;
    my $args = $req->args;

    my $sub = $self->get_api_sub($api_call);

    my $res = $self->$sub($req, $self->parse_request_args($args));

    return $res;
}

sub parse_request_args {
    my $self = shift;
    my ($args) = @_;

    return unless $args;
    my $ref = ref($args);

    return ($args) unless $ref;
    return @$args if $ref eq 'ARRAY';
    return %$args if $ref eq 'HASH';

    die "Invalid API argument format: ($ref) $args.";
}

sub get_api_sub {
    my $self = shift;
    my ($api_call) = @_;

    # In the future we can add the ability for plugins to register new api
    # calls. For now just look for api_XXX methods.
    my $meth = "api_${api_call}";
    return $self->can($meth) // die "'$api_call' is not a valid api call";
}

sub api_preload_blacklist {
    my $self = shift;
    my ($req, @mods) = @_;

    return $self->runner->blacklist(@mods);
}

sub api_log_file { shift->{+LOG_FILE} }

sub api_ping { "pong" }

sub api_pid { $$ }

sub api_reload {
    my $self = shift;
    my ($req) = @_;

    return $self->runner->reload;
}

sub api_overall_status {
    my $self = shift;
    my ($req) = @_;

    return [
        $self->runner->overall_status,
        $self->scheduler->overall_status,
    ];
}

sub api_process_list {
    my $self = shift;
    my ($req) = @_;

    my @list = $self->process_list();

    return \@list;
}

sub process_list {
    my $self = shift;

    return (
        {pid => $$, type => 'instance', name => 'instance', stamp => $self->{+STARTED}},
        $self->runner->process_list,
        $self->scheduler->process_list,
    );
}

sub api_resources {
    my $self = shift;

    my @out;

    for my $resource (@{$self->{+RESOURCES} // []}) {
        my $status_data = $resource->status_data or next;
        push @out => [ref($resource), $status_data],
    }

    return \@out;
}

sub api_spawn {
    my $self = shift;
    my ($req, %spawn) = @_;

    my $runner = $self->runner;
    die "Runner is not capable of spawning.\n" unless $runner->can('spawn');

    return $runner->spawn(\%spawn);
}

sub api_kill {
    my $self = shift;

    $self->scheduler->kill();
    $self->runner->kill();
    $self->terminate('KILL');

    return 1;
}

sub api_abort {
    my $self = shift;

    $self->scheduler->abort();
    $self->runner->abort();

    return 1;
}

sub api_stop {
    my $self = shift;

    $self->scheduler->stop();
    $self->runner->stop();
    $self->stop();

    return 1;
}

sub api_terminate {
    my $self = shift;

    return $self->terminate('API');
}

sub api_set_stage_data {
    my $self = shift;
    my ($req, %stage_data) = @_;

    $self->runner->set_stages(\%stage_data);
}

sub api_set_stage_up {
    my $self = shift;
    my ($req, %params) = @_;

    my $stage = $params{stage} // die "'stage' is required";
    my $pid   = $params{pid} // die "'pid' is required";

    $self->runner->set_stage_up($stage, $pid, $req->connection);
}

sub api_set_stage_down {
    my $self = shift;
    my ($req, %params) = @_;

    my $stage = $params{stage} // die "'stage' is required";
    my $pid   = $params{pid} // die "'pid' is required";
    my $err   = $params{error};

    $self->runner->set_stage_down($stage, $pid, $err);
}

sub api_queue_run {
    my $self = shift;
    my ($req, %run_data) = @_;

    die "This runner has been terminated" if $self->{+TERMINATED};
    die "This runner has been stopped"    if $self->{+STOP};

    my $run = Test2::Harness::Run->new(%run_data);

    my $agg_ipc = $run->aggregator_ipc;

    my $ipc;
    for my $i (@{$self->{+IPC}}) {
        next unless $i->protocol eq $agg_ipc->{protocol};
        $ipc = $i;
        last;
    }

    if ($ipc) {
        $run->set_ipc($ipc);
    }
    else {
        $ipc = $run->ipc;
        push @{$self->{+IPC}} => $ipc;
    }

    $run->set_instance_ipc($ipc->callback);

    my $con = $run->connect;

    $self->scheduler->queue_run($run);

    return $run->run_id;
}

sub api_job_update {
    my $self = shift;
    my ($req, %update) = @_;

    $self->runner->job_update(\%update);
    $self->scheduler->job_update(\%update);

    return 1;
}

sub DESTROY {
    my $self = shift;
    $self->cleanup;
}

sub cleanup {
    my $self = shift;

    return if $self->{+CLEANED_UP}++;

    $self->terminate('CLEANUP');

    eval { $_->terminate() } for @{$self->{+IPC} // []};
}

sub terminate {
    my $self = shift;
    my ($reason) = @_;

    $reason ||= 1;

    unless ($self->{+TERMINATED}) {
        $self->{+TERMINATED} ||= 1;
        $self->scheduler->terminate($self->{+TERMINATED}) if $self->scheduler;
        $self->runner->terminate($self->{+TERMINATED})    if $self->runner;
    }

    return $self->{+TERMINATED};
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::Instance - FIXME

=head1 DESCRIPTION

=head1 SYNOPSIS

=head1 EXPORTS

=over 4

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
L<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://dev.perl.org/licenses/>

=cut


=pod

=cut POD NEEDS AUDIT



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.