Group
Extension

Test2-Harness/blib/lib/Test2/Harness/Collector.pm

package Test2::Harness::Collector;
use strict;
use warnings;

use IO::Select;
use File::Spec;
use Atomic::Pipe;

use Carp qw/croak/;
use POSIX ":sys_wait_h";
use File::Temp qw/tempfile/;
use File::Path qw/remove_tree/;
use Time::HiRes qw/time sleep/;

use Test2::Harness::Collector::Auditor::Job;
use Test2::Harness::Collector::IOParser::Stream;

use Test2::Harness::Util qw/mod2file parse_exit open_file chmod_tmp/;
use Test2::Harness::Util::JSON qw/decode_json encode_json decode_json_file/;
use Test2::Harness::IPC::Util qw/pid_is_running swap_io start_process ipc_connect ipc_loop inflate set_procname/;

# Decide once, at compile time, whether Linux::Inotify2 can be loaded, and
# publish the answer as the constant sub USE_INOTIFY (1 when it loaded, 0 when
# it did not).
BEGIN {
    my $have_inotify = eval { require Linux::Inotify2; 1 };
    *USE_INOTIFY = $have_inotify ? sub() { 1 } : sub() { 0 };
}

our $VERSION = '2.000004';

use Test2::Harness::Util::HashBase qw{
    merge_outputs

    <encoding
    <handles
    <peeks

    <end_callback

    <step
    <parser
    <auditor
    <output
    <output_cb

    root_pid

    <run
    <job
    <test_settings
    <command

    <workdir

    <start_times

    <run_id
    <job_id
    <job_try
    <skip

    +clean
    +buffer

    <tempdir

    <interactive
    <always_flush
};

# Constructor hook: validates required attributes, normalises 'output' into a
# single callback (OUTPUT_CB), fills in defaults, and creates the pipes used to
# capture the child's STDOUT/STDERR.
#
# 'output' may be:
#   * a CODE ref     - called with the list of events
#   * a GLOB ref     - each event is printed to it as one JSON line
#   * an Atomic::Pipe - each event is written as one JSON message
# Anything else croaks with "Unknown output type".
sub init {
    my $self = shift;

    croak "'parser' is a required attribute"
        unless $self->{+PARSER};

    croak "'output' is a required attribute"
        unless $self->{+OUTPUT};

    my $output = $self->{+OUTPUT};
    my $ref    = ref($output);

    if ($ref eq 'CODE') {
        $self->{+OUTPUT_CB} = $output;
    }
    elsif ($ref eq 'GLOB') {
        my $fh = $output;
        $self->{+OUTPUT_CB} = sub { print $fh encode_json($_) . "\n" for @_ };
    }
    # The eval guards the method call: calling ->isa on an unblessed reference
    # (for example a HASH or ARRAY ref) throws, which would hide the intended
    # "Unknown output type" error below.
    elsif (eval { $output->isa('Atomic::Pipe') }) {
        my $wp = $output;
        $self->{+OUTPUT_CB} = sub { $wp->write_message(encode_json($_)) for @_ };
    }
    else {
        croak "Unknown output type: $output ($ref)";
    }

    # CPU times at construction; later subtracted from the times at exit.
    $self->{+START_TIMES} //= [times()];

    $self->{+RUN_ID}        //= 0;
    $self->{+JOB_ID}        //= 0;
    $self->{+JOB_TRY}       //= 0;
    $self->{+MERGE_OUTPUTS} //= 0;

    # When outputs are merged, STDERR shares the very same pipe pair as STDOUT.
    my ($out_r, $out_w) = Atomic::Pipe->pair(mixed_data_mode => 1);
    my ($err_r, $err_w) = $self->{+MERGE_OUTPUTS} ? ($out_r, $out_w) : Atomic::Pipe->pair(mixed_data_mode => 1);

    $self->{+HANDLES} = {
        out_r => $out_r,
        out_w => $out_w,
        err_r => $err_r,
        err_w => $err_w,
    };
}

# NOTE(review): $warned is not referenced anywhere in the visible source.
my $warned = 0;

# Class-method entry point: build a collector, launch the process, relay its
# output, and (optionally) exit the same way the child did.
#
# Arguments are either a key/value list, or a single scalar which is treated
# as a JSON file (name ends in .json or is an existing file; the file is
# decoded with unlink => 1) or otherwise as a JSON string.
#
# Recognised keys used here: root_pid (required), setsid, job, command, skip,
# forward_exit. All parameters are passed on to collect_job/collect_command.
#
# Returns 0 when 'forward_exit' is false. Otherwise it never returns: it either
# re-raises the child's signal on itself or exits with the child's error code.
# On any failure in launch_and_process it reports the error and exits 255.
sub collect {
    my $class = shift;
    my %params;

    if (@_ == 1) {
        my ($in) = @_;
        if ($in =~ m/\.json$/ || -f $in) {
            %params = %{decode_json_file($in, unlink => 1)};
        }
        else {
            %params = %{decode_json($in)};
        }
    }
    else {
        %params = @_;
    }

    die "No root pid" unless $params{root_pid};
    $class->setsid if $params{setsid};

    # $cb is run in the parent with the child pid once it is started, $cleanup
    # runs after processing. @ipc is only held in scope here (presumably to
    # keep the IPC objects alive -- confirm).
    my ($self, $cb, $cleanup, @ipc);
    if ($params{job}) {
        ($self, $cb, $cleanup, @ipc) = $class->collect_job(%params);
    }
    elsif ($params{command}) {
        ($self, $cb, @ipc) = $class->collect_command(%params);
    }
    else {
        die "Was not given either a 'job' or 'command' to collect";
    }

    $self->{+SKIP} = $params{skip} if $params{skip};

    # Keep a private copy of the real STDERR so warnings raised while
    # processing are printed to that cloned handle.
    open(my $stderr, '>&', \*STDERR) or die "Could not clone STDERR";
    local $SIG{__WARN__} = sub { print $stderr @_ };

    my $start_pid = $$;
    my $exit;
    my $ok = eval { $exit = $self->launch_and_process($cb); 1 } // 0;
    my $err = $@;

    # Only the process that started the work may clean up; a forked copy that
    # ends up here must not remove the shared temp dir.
    if ($cleanup && $start_pid == $$) {
        eval { $cleanup->(); 1 } or warn $@;
    }

    if (!$ok) {
        # Report through every channel available; each attempt is isolated so
        # one failing channel does not prevent the others.
        eval { $self->_die($err, no_exit => 1) };
        eval { print $stderr $err };
        eval { print STDERR "Test2 Harness Collector Error: $err" };
        exit(255);
    }

    return 0 unless $params{forward_exit};

    # $exit is the value returned by launch_and_process; the 'sig' and 'err'
    # keys are read from it below.
    if ($exit->{sig}) {
        # Drop our own handlers so the forwarded signal takes its default effect.
        delete $SIG{$_} for grep { $SIG{$_} } keys %SIG;
        kill($exit->{sig}, $$);
        sleep 1;
        exit(255); # In case signal cannot be forwarded
    }

    exit($exit->{err} // 0);
}

# Start a new session and then fork; the parent side of the fork exits
# immediately (status 0) so only the forked child continues.
# Dies if either POSIX::setsid() or fork fails.
sub setsid {
    die "Could not setsid: $!" unless POSIX::setsid();

    my $pid = fork;
    die "Could not fork: $!" unless defined $pid;

    exit(0) if $pid;
}

# Build a collector for an arbitrary command (as opposed to a harness job).
#
# Required params: root_pid, io_pipes. Optional: command, type, name, tag.
# Events are written as JSON messages to this process's STDOUT; when io_pipes
# is greater than 1, a small {"event_id": ...} marker is also written to STDERR
# for every event that has an event_id.
#
# Returns the new collector object.
sub collect_command {
    my ($class, %params) = @_;

    my $root_pid = $params{root_pid} or die "No root pid";
    my $io_pipes = $params{io_pipes} or die "IO pipes are required";

    # Wrap an existing filehandle in an Atomic::Pipe set to mixed data mode.
    my $as_pipe = sub {
        my ($fh) = @_;
        my $pipe = Atomic::Pipe->from_fh('>&=', $fh);
        $pipe->set_mixed_data_mode();
        return $pipe;
    };

    my $stdout = $as_pipe->(\*STDOUT);
    my $stderr;
    $stderr = $as_pipe->(\*STDERR) if $io_pipes > 1;

    my $handler = sub {
        foreach my $event (@_) {
            $stdout->write_message(encode_json($event));

            next unless $stderr;

            my $event_id = $event->{event_id};
            next unless $event_id;

            $stderr->write_message(qq/{"event_id":"$event_id"}/);
        }
    };

    my %ids = (job_id => 0, job_try => 0, run_id => 0);

    my $parser = Test2::Harness::Collector::IOParser->new(
        %ids,
        type => $params{type} // 'unknown',
        name => $params{name} // 'unknown',
        tag  => $params{tag}  // $params{name} // $params{type} // 'unknown',
    );

    return $class->new(
        %ids,
        parser       => $parser,
        root_pid     => $root_pid,
        output       => $handler,
        command      => $params{command},
        always_flush => 1,
    );
}

# Build a collector for a harness job.
#
# Required params: root_pid, test_settings, run, job, workdir. The first three
# object params are passed through inflate() with their expected classes.
#
# Returns a list:
#   ($collector, $launch_cb, $cleanup_cb, $inst_ipc => $inst_con, $agg_ipc => $agg_con)
# where $launch_cb->($pid) records the child pid (and reports it to the
# instance connection when there is one) and $cleanup_cb removes the per-job
# temp dir created under 'workdir'.
sub collect_job {
    my $class = shift;
    my %params = @_;

    my $root_pid = $params{root_pid} or die "No root pid";

    my $ts  = inflate($params{test_settings}, 'Test2::Harness::TestSettings') or die "No test_settings provided";
    my $run = inflate($params{run},           'Test2::Harness::Run')          or die "No run provided";
    my $job = inflate($params{job},           'Test2::Harness::Run::Job')     or die "No job provided";

    die "No workdir provided" unless $params{workdir};
    my $tempdir = File::Temp::tempdir(DIR => $params{workdir}, TEMPLATE => "XXXXXX");
    $params{tempdir} = $tempdir;
    chmod_tmp($tempdir);

    my ($inst_ipc, $inst_con) = ipc_connect($run->instance_ipc);
    my ($agg_ipc,  $agg_con)  = ipc_connect($run->aggregator_ipc);
    my $agg_use_io            = $run->aggregator_use_io;

    # Tell the instance about halts and job results. Only called when there is
    # an instance connection (see the handlers below).
    my $inst_handler = sub {
        my ($e) = @_;

        my $fd = $e->{facet_data};

        my ($halt, $result);
        $halt = $fd->{control}->{details} || 'halt' if $fd->{control} && $fd->{control}->{halt};

        if (my $end = $fd->{harness_job_end}) {
            $result = {
                fail => $end->{fail},
                retry => $end->{retry},
            };
        }

        return unless $halt || $result;

        $inst_con->send_and_get(
            job_update => {
                run_id => $run->run_id,
                job_id => $job->job_id,
                result => $result,
                halt   => $halt,
            },
        );
    };

    # Choose where events go: the aggregator connection, the run's event
    # callback, or (fallback) JSON lines on STDOUT.
    my $child_pid;
    my $handler;
    if ($agg_con) {
        $handler = sub {
            for my $e (@_) {
                unless (eval { $agg_con->send_message({event => $e}); 1 }) {
                    my $err = $@;
                    die $err unless $err =~ m/Disconnected pipe/;

                    # The aggregator went away: stop the test and bail out.
                    kill('TERM', $child_pid) if $child_pid;
                    exit(255);
                }

                $inst_handler->($e) if $inst_con;
            }
        };
    }
    elsif ($agg_use_io && $run->send_event_cb) {
        my $send_event = $run->send_event_cb;
        $handler = sub {
            for my $e (@_) {
                $send_event->($e);
                $inst_handler->($e) if $inst_con;
            }
        };
    }
    else {
        $handler = sub {
            for my $e (@_) {
                print STDOUT encode_json($e), "\n";
                $inst_handler->($e) if $inst_con;
            }
        };
    }

    # Optionally mirror every event, as JSON lines, into a file in the temp dir.
    if ($job->test_file->check_feature('collector_echo')) {
        my $tmpdir = $params{tempdir};
        my $file   = File::Spec->catfile($tmpdir, 'COLLECTOR-ECHO');
        $ENV{TEST2_HARNESS_COLLECTOR_ECHO_FILE} = $file;

        open(my $fh, '>', $file) or die "Could not open file $file: $!";

        my $old_handler = $handler;
        $handler = sub {
            print $fh encode_json($_), "\n" for @_;
            $fh->flush();
            $old_handler->(@_);
        };
    }

    my %create_params = (
        run_id  => $run->run_id,
        job_id  => $job->job_id,
        job_try => $job->try,
        file    => $job->test_file->file,
        name    => $job->test_file->relative,
    );

    my $auditor = Test2::Harness::Collector::Auditor::Job->new(%create_params);
    my $parser  = Test2::Harness::Collector::IOParser::Stream->new(%create_params, type => 'test');

    my $self = $class->new(
        %create_params,
        %params,
        parser   => $parser,
        auditor  => $auditor,
        output   => $handler,
        root_pid => $root_pid,

        # Store the inflated objects, not whatever raw data arrived in %params
        # (collect() may have decoded these from JSON); the rest of the class
        # calls methods on them.
        run           => $run,
        job           => $job,
        test_settings => $ts,
    );

    return (
        $self,
        sub {
            my $pid = shift;

            $child_pid = $pid;

            # The instance connection is optional everywhere else in this
            # sub; do not blow up at launch when there is none.
            return unless $inst_con;

            $inst_con->send_and_get(
                job_update => {
                    run_id => $run->run_id,
                    job_id => $job->job_id,
                    pid    => $pid,
                },
            );
        },
        sub {
            remove_tree($tempdir, {safe => 1, keep_root => 0}) if -d $tempdir;
        },
        $inst_ipc => $inst_con,
        $agg_ipc =>  $agg_con,
    );
}

# Event timeout taken from the test settings; returns nothing when the
# collector has no test settings.
sub event_timeout {
    my $self = shift;

    my $settings = $self->test_settings;
    return unless $settings;

    return $settings->event_timeout;
}
# Post-exit timeout taken from the test settings; returns nothing when the
# collector has no test settings.
sub post_exit_timeout {
    my $self = shift;

    my $settings = $self->test_settings;
    return unless $settings;

    return $settings->post_exit_timeout;
}

# Work out what to execute.
#
# Returns ($cmd) or ($cmd, $child_cb), where $cmd is an array ref (program and
# arguments) and $child_cb, when present, is run in the child before the
# command to change directory and set environment variables.
#
#   * skip set  - a tiny perl program that prints a "1..0 # SKIP <reason>" plan
#   * job set   - whatever the job's own launch_command() returns
#   * command   - the command given at construction
#
# Dies with "No command!" when none of the above apply.
sub launch_command {
    my $self = shift;

    # The skip reason is handed over as an argument (@ARGV) and NOT
    # interpolated into the perl source: a reason containing quotes, '$' or
    # '@' would otherwise break or change the generated program.
    return ([$^X, '-e', 'print "1..0 # SKIP $ARGV[0]\n"', $self->{+SKIP}])
        if $self->{+SKIP};

    if (my $job = $self->{+JOB}) {
        my $run = $self->{+RUN};
        my $ts  = $self->{+TEST_SETTINGS};

        my ($cmd, $env) = $job->launch_command($run, $ts);

        my $cb;
        my $dir = $ts->ch_dir;
        if ($dir || $env) {
            $cb = sub {
                # NOTE(review): the chdir result is not checked here.
                chdir($dir) if $dir;
                $ENV{$_} = $env->{$_} for keys %{$env //= {}};
            };
        }

        return ($cmd, $cb);
    }

    return ($self->{+COMMAND}) if $self->{+COMMAND};

    die "No command!";
}

# Start the child process and then process its output until it is done.
#
# $parent_cb, when given, is called in the parent with the child's pid right
# after the child has been started. Returns whatever process() returns.
sub launch_and_process {
    my ($self, $parent_cb) = @_;

    my ($command, $child_cb) = $self->launch_command;

    # Runs inside the child: wire up env/IO first, then any job-specific setup.
    my $in_child = sub {
        $self->setup_child();
        $child_cb->() if $child_cb;
    };

    my $pid = start_process($command, $in_child);

    set_procname(set => ['collector', $pid]);

    if ($parent_cb) {
        $parent_cb->($pid);
    }

    return $self->process($pid);
}

# Turn one piece of captured IO into events and hand them to the output
# callback.
#
# Canonical data (no 'peek' flag, or peek eq 'peek_end') is audited (when an
# auditor is present) and emitted as-is. Data that is only a peek is emitted
# in a reduced form: only events carrying 'info' are sent, flagged with
# harness.peek = 1 and stripped of every facet except
# harness/info/trace/hubs/from_tap.
sub _pre_event {
    my $self = shift;
    my %io = @_;

    my $peek   = $io{peek};
    my @events = $self->{+PARSER}->parse_io(\%io);

    if (!$peek || $peek eq 'peek_end') {
        if ($self->{+AUDITOR}) {
            @events = $self->{+AUDITOR}->audit(@events);
        }

        $self->{+OUTPUT_CB}->(@events);
        return;
    }

    my %keep = map { $_ => 1 } qw/harness info trace hubs from_tap/;

    foreach my $event (@events) {
        my $facets = $event->{facet_data};

        # Only info (stderr/stdout) should be peeked.
        next unless $facets->{info} && @{$facets->{info}};

        $event->{facet_data}->{harness}->{peek} = 1;

        my @drop = grep { !$keep{$_} } keys %$facets;
        delete $facets->{$_} for @drop;

        $self->{+OUTPUT_CB}->($event);
    }

    return;
}

# Emit an ERROR event (fail => 1) on the 'process' stream and then exit 255.
#
# Params (after the message):
#   event_data - hash ref of extra top-level keys for the event data
#   facets     - hash ref of extra facets merged into facet_data
#   no_exit    - when true, do not exit after emitting the event
#
# A message without a trailing newline gets " at FILE line N." of the caller
# appended.
sub _die {
    my ($self, $msg, %params) = @_;

    my @caller = caller();
    if ($msg !~ m/\n$/) {
        $msg .= " at $caller[1] line $caller[2].\n";
    }

    my $stamp = time;

    # The fixed keys come last so they win over anything supplied by the caller.
    my %facet_data = (
        %{$params{facets} // {}},
        errors => [{tag => 'ERROR', details => $msg, fail => 1}],
        trace  => {frame => \@caller, stamp => $stamp},
    );

    my %extra = %{$params{event_data} // {}};

    $self->_pre_event(
        %extra,
        stream => 'process',
        stamp  => $stamp,
        event  => {facet_data => \%facet_data},
    );

    exit(255) unless $params{no_exit};
}

# Emit a WARNING info event (debug => 1) on the 'process' stream.
#
# Params (after the message):
#   event_data - hash ref of extra top-level keys for the event data
#   facets     - hash ref of extra facets merged into facet_data
#
# A message without a trailing newline gets " at FILE line N." of the caller
# appended.
sub _warn {
    my ($self, $msg, %params) = @_;

    my @caller = caller();
    if ($msg !~ m/\n$/) {
        $msg .= " at $caller[1] line $caller[2].\n";
    }

    my $stamp = time;

    # The fixed keys come last so they win over anything supplied by the caller.
    my %facet_data = (
        %{$params{facets} // {}},
        info  => [{tag => 'WARNING', details => $msg, debug => 1}],
        trace => {frame => \@caller, stamp => $stamp},
    );

    my %extra = %{$params{event_data} // {}};

    $self->_pre_event(
        %extra,
        stream => 'process',
        stamp  => $stamp,
        event  => {facet_data => \%facet_data},
    );
}

# Prepare the child process before the command runs: environment first, then
# output redirection, then input. Returns the result of the input setup.
sub setup_child {
    my $self = shift;

    for my $step (qw/setup_child_env_vars setup_child_output/) {
        $self->$step();
    }

    return $self->setup_child_input();
}

# Runs in the child: point STDOUT/STDERR at the write ends of the collector's
# pipes, and publish those pipes (plus how many there are) for
# Test2::Harness::Collector::Child.
#
# T2_HARNESS_PIPE_COUNT is 1 when outputs are merged and 2 otherwise.
sub setup_child_output {
    my $self = shift;

    my $handles = $self->handles;

    swap_io(\*STDOUT, $handles->{out_w}->wh, sub { $self->_die(@_) });
    swap_io(\*STDERR, $handles->{err_w}->wh, sub { $self->_die(@_) });

    STDOUT->autoflush(1);
    STDERR->autoflush(1);

    select STDOUT;

    $ENV{T2_HARNESS_PIPE_COUNT} = $self->{+MERGE_OUTPUTS} ? 1 : 2;
    require Test2::Harness::Collector::Child;
    {
        no warnings 'once';
        $Test2::Harness::Collector::Child::STDOUT_APIPE = $handles->{out_w};

        # A separate STDERR pipe only exists when outputs are NOT merged (pipe
        # count 2). When merged there is a single pipe, already published as
        # the STDOUT pipe above.
        $Test2::Harness::Collector::Child::STDERR_APIPE = $handles->{err_w} unless $self->{+MERGE_OUTPUTS};
    }

    return;
}

# Runs in the child: decide what the command gets on STDIN.
#
#   * interactive run - STDIN is reopened from /proc/<interactive_pid>/fd/0
#   * no test settings - STDIN is left alone
#   * input_file set  - STDIN is swapped for that file
#   * otherwise       - the 'input' string (default "") is written to a temp
#                       file which then becomes STDIN
sub setup_child_input {
    my $self = shift;

    my $run = $self->{+RUN};
    if ($run && $run->interactive) {
        my $pid = $run->interactive_pid;

        close(STDIN);
        open(STDIN, "<", "/proc/$pid/fd/0") or die "Could not connect to STDIN from pid $pid: $!";
        return;
    }

    my $ts = $self->{+TEST_SETTINGS};
    return unless $ts;

    my $source_fh;
    if (my $in_file = $ts->input_file) {
        $source_fh = open_file($in_file, '<');
    }
    else {
        my $input = $ts->input // "";

        # Write the literal input to a temp file, then reopen it for reading.
        my ($fh, $file) = tempfile("input-$$-XXXX", TMPDIR => 1, UNLINK => 1);
        print $fh $input;
        close($fh);

        open($fh, '<', $file) or die "Could not open '$file' for reading: $!";
        $source_fh = $fh;
    }

    swap_io(\*STDIN, $source_fh, sub { $self->_die(@_) });

    return;
}

# Runs in the child: set the environment the test will see. Does nothing when
# there are no test settings.
#
# Sets the temp-dir variables to the collector's tempdir, the harness marker
# variables, job details (when there is a job), and finally the env_vars from
# the test settings, which therefore override everything set before them.
sub setup_child_env_vars {
    my $self = shift;

    my $ts = $self->{+TEST_SETTINGS};
    return unless $ts;

    delete $ENV{T2_HARNESS_PIPE_COUNT};

    for my $name (qw/TEMPDIR TEMP_DIR TMPDIR TMP_DIR/) {
        $ENV{$name} = $self->tempdir;
    }

    $ENV{T2_TRACE_STAMPS} = 1;

    $ENV{HARNESS_ACTIVE}       = 1;
    $ENV{TEST2_HARNESS_ACTIVE} = $VERSION;
    $ENV{T2_HARNESS_RUN_ID}    = $self->run_id;

    my $job = $self->{+JOB};
    if ($job) {
        $ENV{T2_HARNESS_JOB_ID}     = $job->job_id;
        $ENV{T2_HARNESS_JOB_IS_TRY} = scalar @{$job->results // []};
        $ENV{T2_HARNESS_JOB_FILE}   = $job->test_file->file;
    }

    my $env = $ts->env_vars;
    {
        # Values may legitimately be undef.
        no warnings 'uninitialized';
        foreach my $name (keys %$env) {
            $ENV{$name} = $env->{$name};
        }
    }

    return;
}

# Remove the read ends of both capture pipes from the handles hash and close
# them. Always returns 1.
sub close_parent_handles {
    my $self = shift;

    my $handles = $self->handles;

    for my $end (qw/out_r err_r/) {
        my $pipe = delete $handles->{$end};
        $pipe->close();
    }

    return 1;
}

# Parent-side driver, called once the child has been started.
#
# Closes the write ends of the capture pipes, emits the job launch/start event
# (only when there is a job), runs the main loop (_process), and finally calls
# the end callback if one is set. Errors from the main loop and from the end
# callback are combined and re-thrown. Returns what _process returned.
sub process {
    my $self = shift;
    my ($child_pid) = @_;

    for my $end (qw/out_w err_w/) {
        delete($self->handles->{$end})->close();
    }

    my $job = $self->{+JOB};
    if ($job) {
        my $file   = $job->test_file;
        my $job_id = $job->job_id;
        my $stamp  = time;

        my %harness_job = (
            %{$job->process_info},

            test_file     => $file->process_info,
            test_settings => $self->{+TEST_SETTINGS},

            # For compatibility
            file   => $file->relative,
            job_id => $job_id,
        );

        my %launch = (
            job_id => $job_id,
            stamp  => $stamp,
            retry  => $job->try,
            pid    => $child_pid,
        );

        my %start = (
            file     => $file->file,
            abs_file => $file->file,
            rel_file => $file->relative,

            stamp   => $stamp,
            job_id  => $job_id,
            details => "Launched " . $file->relative . " as job $job_id.",
        );

        $self->_pre_event(
            stream => 'process',
            stamp  => $stamp,
            event  => {
                facet_data => {
                    trace              => {frame => [__PACKAGE__, __FILE__, __LINE__], stamp => $stamp},
                    harness_job        => \%harness_job,
                    harness_job_launch => \%launch,
                    harness_job_start  => \%start,
                },
            },
        );
    }

    my $exit;
    my $ok  = eval { $exit = $self->_process($child_pid); 1 };
    my $err = $@;

    # The end callback always runs, even when the main loop failed.
    if ($self->end_callback) {
        my $cb_ok  = eval { $self->end_callback->($self); 1 };
        my $cb_err = $@;

        $err = $ok ? $cb_err : "$err\n$cb_err";
        $ok &&= $cb_ok;
    }

    die $err unless $ok;

    return $exit;
}

# Queue one captured item (a decoded event hash or a plain line) on the buffer
# for $stream, and flush when appropriate.
#
#   * An event whose control facet names an encoding switches the collector to
#     that encoding (and loads Encode).
#   * While no event id has been seen yet, everything is flushed immediately.
#   * An event is flushed (up to and including itself) once its id has been
#     seen on every pipe: once when outputs are merged, twice otherwise.
#
# Dies if an event has no event_id.
sub _add_item {
    my ($self, $stream, $val, $peek) = @_;

    if (ref($val)) {
        if (my $encoding = $val->{facet_data}->{control}->{encoding}) {
            require Encode;
            $self->{+ENCODING} = $encoding;
        }
    }

    my $buffer = $self->{+BUFFER} //= {};
    my $seen   = $buffer->{seen}  //= {};

    push @{$buffer->{$stream}}, [time, $val, $peek];

    $self->_flush() if !keys(%$seen);

    return unless ref($val);

    my $event_id = $val->{event_id} or die "Event has no ID!";

    my $needed = $self->{+MERGE_OUTPUTS} ? 1 : 2;
    my $count  = ++($seen->{$event_id});
    return if $count < $needed;

    $self->_flush(to => $event_id);
}

# Drain the stderr buffer, then the stdout buffer, sending items on through
# _pre_event.
#
# Params:
#   to  - stop draining a stream after the event with this event_id
#   age - leave items younger than this many seconds (and everything queued
#         behind them) in the buffer
#
# Event hashes that arrived on stderr are not emitted; they only serve as
# synchronisation markers.
sub _flush {
    my ($self, %params) = @_;

    my ($to, $age) = @params{qw/to age/};

    my $buffer = $self->{+BUFFER} //= {};
    $buffer->{seen} //= {};

    foreach my $stream (qw/stderr stdout/) {
        ITEM: while (my $item = shift(@{$buffer->{$stream}})) {
            my ($stamp, $val, $peek) = @$item;

            # Too fresh: put it back and leave the rest of this stream alone.
            if ($age && (time - $stamp) < $age) {
                unshift @{$buffer->{$stream}}, $item;
                last ITEM;
            }

            unless (ref($val)) {
                $self->_pre_event(stream => $stream, line => $val, stamp => $stamp, peek => $peek);
                next ITEM;
            }

            # Send the event, unless it came via STDERR in which case it should only be a hashref with an event_id
            if ($stream ne 'stderr') {
                $self->_pre_event(stream => $stream, data => $val, stamp => $stamp);
            }

            last ITEM if $to && $val->{event_id} eq $to;
        }
    }
}

# Main loop of the collector: relay the child's output until it is finished,
# make sure the child is gone, and emit the final harness_job_exit event.
#
# Takes the child pid. Returns the result of parse_exit() for the child's wait
# status (65280, i.e. exit(255), is assumed when no status could be obtained).
sub _process {
    my $self = shift;
    my ($pid) = @_;

    # Some initial signal handlers to make sure the child is killed if we die.
    my $sig_stamp;
    for my $sigtype (qw/INT TERM/) {
        my $sig = $sigtype;
        $SIG{$sig} = sub {
            $sig_stamp //= time;
            $self->_warn("$$: Got SIG${sig}, forwarding to child process $pid.\n");
            kill($sigtype, $pid);

            # If the signal arrives again more than 5 seconds after the first
            # one, stop forwarding and let it take its default effect on us.
            if (time - $sig_stamp > 5) {
                $SIG{$sig} = 'DEFAULT';
                kill($sig, $$);
            }
        };
    }

    local $SIG{PIPE} = 'IGNORE';
    $self->{+BUFFER} = {seen => {}, stderr => [], stdout => []};

    my $stdout = $self->handles->{out_r};
    my $stderr = $self->handles->{err_r};

    # Time of the most recent output (or of the child's exit); drives the
    # timeouts below.
    my $last_event = time;

    # $exit is the raw wait status, $exited the time the child was reaped.
    my ($exited, $exit);

    # Try to reap the child with the given waitpid flags.
    # Returns 1 when reaped now, 0 when still running (WNOHANG only), and -1
    # when it was already reaped or waitpid reported an error.
    my $reap = sub {
        my ($flags) = @_;

        return -1 if $exited;
        return -1 if defined $exit;

        local ($!, $?);

        my $check = waitpid($pid, $flags);
        my $code = $?;

        return -1 if $check < 0;
        return 0 if $check == 0 && $flags == WNOHANG;

        die("waitpid returned $check, expected $pid") if $check != $pid;

        $exit = $code;
        $exited = time;
        $last_event = $exited;

        return 1;
    };

    my $auditor = $self->{+AUDITOR};
    my $ev_timeout = $self->event_timeout;
    my $pe_timeout = $self->post_exit_timeout;

    # Each entry: [read handle, on-readable callback, eof => ..., name => ...].
    # A handle counts as finished when it is at EOF or has been marked broken.
    # STDERR is only watched separately when outputs are not merged.
    my $handles = [];
    my $broken = {};
    push @$handles => [$stdout->rh, sub { $last_event = time; $self->handle_event(stdout => $stdout, $broken) }, eof => sub { $broken->{$stdout} || $stdout->eof }, name => 'stdout'];
    push @$handles => [$stderr->rh, sub { $last_event = time; $self->handle_event(stderr => $stderr, $broken) }, eof => sub { $broken->{$stderr} || $stderr->eof }, name => 'stderr']
        unless $self->{+MERGE_OUTPUTS};

    # Remembers which "still open" warnings/countdown numbers were printed.
    my %timeout_warns;

    ipc_loop(
        handles   => $handles,
        sigchild  => sub { $reap->(0) },
        wait_time => sub { $sig_stamp ? 0 : 0.2 },
        signals   => sub { $sig_stamp //= time; kill($_[0], $pid) },

        iteration_start => sub {
            $self->{+STEP}->() if $self->{+STEP};
            $self->peek_event($pid, stderr => $stderr, $broken);
            $self->peek_event($pid, stdout => $stdout, $broken);
            $self->_flush(age => 0.5);
        },

        iteration_end => sub {
            my $out = 0;
            if ($self->{+INTERACTIVE} || $self->{+ALWAYS_FLUSH}) {
                $self->_flush();
            }
            else {
                # Anything that has been sitting in the buffer for more than half a second should probably get rendered
                $self->_flush(age => 0.5);
            }
            $out++ if $reap->(WNOHANG) > 0;
            return $out;
        },

        # Returns true when the loop should stop.
        end_check => sub {
            my %params = @_;
            return 1 if $sig_stamp;

            # Wait for all output
            return 0 if $params{did_work};

            if ($self->{+ROOT_PID} && !pid_is_running($self->{+ROOT_PID})) {
                $self->_warn("Yath exited, killing process.");
                kill('TERM', $pid);
                return 1;
            }

            if (defined $exited) {
                # The child is gone; keep looping until every handle is at EOF
                # (or broken), or until 10 seconds passed without output.
                for my $h (@$handles) {
                    my ($x, $y, %params) = @$h;

                    my $timeout;
                    if (my $delta = int(time - $last_event)) {
                        $timeout = 1 if $delta > 10;

                        unless ($timeout) {
                            if ($timeout_warns{main}) {
                                my $countdown = int(10 - $delta);
                                unless ($timeout_warns{$countdown}) {
                                    warn "  $countdown...\n";
                                    $timeout_warns{$countdown} = 1;
                                }
                            }
                            else {
                                warn "Testing looks complete, but a filehandle is still open (Did a plugin or renderer fork without an exec?), will timeout in 10 seconds...\n";
                                $timeout_warns{main} = 1;
                            }
                        }
                    }

                    return 0 unless $params{eof}->() || $timeout;
                }

                return 1 if !$auditor;
                return 1 if $auditor->has_plan;
                return 1 if $exit;                # A non-zero wait status ends the loop right away; no post-exit timeout wait
                return 1 unless $pe_timeout;

                my $delta = int(time - $last_event);
                if ($delta > $pe_timeout) {

                    $self->_die(
                        "Post-exit timeout after $delta seconds. This means your test exited without a issuing plan, but STDOUT remained open, possibly in a child process. At timestamp '$last_event' the output stopped and the test has since timed out.\n",
                        facets  => {harness => {timeout => {post_exit => $delta}}},
                        no_exit => 1,
                    );

                    return 1;
                }
            }

            if ($ev_timeout) {
                my $delta = int(time - $last_event);

                if ($delta > $ev_timeout) {
                    $self->_die(
                        "Event timeout after $delta seconds. This means your test stopped producing output too long and will be terminated forcefully.\n",
                        facets  => {harness => {timeout => {events => $delta}}},
                        no_exit => 1,
                    );

                    return 1;
                }
            }

            return 0;
        },
    );

    # Emit whatever is still buffered, regardless of age.
    $self->_flush();

    # If the child has not been reaped yet, ask it to stop (TERM), poll for up
    # to 10 seconds, then force it (KILL).
    local $SIG{CHLD} = 'IGNORE';
    unless (defined($exit // $exited) || $reap->(WNOHANG)) {
        $self->_die("Sending 'TERM' signal to process...\n", no_exit => 1);
        my $did_kill = kill('TERM', $pid);

        my $start = time;
        while ($did_kill) {
            my $delta = time - $start;
            if ($delta > 10) {
                $self->_die("Sending 'KILL' signal to process...\n", no_exit => 1);
                last unless kill('KILL', $pid);

                $reap->(0);
                $exit   //= 255;
                $exited //= 0;
                last;
            }

            last if $reap->(WNOHANG);

            sleep(0.2);
        }
    }

    # Difference between the times() values now and those recorded at
    # construction. Note that this empties the START_TIMES array.
    my $start_times = $self->{+START_TIMES};
    my $end_times = [times];
    my $times = [];
    while (@$start_times) {
        push @$times => shift(@$end_times) - shift(@$start_times);
    }

    # This can be undef if the test was killed by a signal that interrupts sigchild
    $exit //= 65280; # 255 << 8, the number it would have from an exit(255)

    my $ret = parse_exit($exit);

    $self->_pre_event(
        stream => 'process',
        stamp  => $exited,
        event  => {
            facet_data => {
                trace => {frame => [__PACKAGE__, __FILE__, __LINE__], stamp => $exited},

                harness_job_exit => {
                    job_id => $self->job_id,
                    exit   => $exit,
                    codes  => $ret,
                    stamp  => $exited,
                    retry  => $self->should_retry($exit),
                    times  => $times,
                },
            },
        },
    );

    return $ret;
}

# Look at the pipe for a partial (not yet newline-terminated) line, so that
# things like prompts can be shown before the line is complete.
#
# A new partial line is remembered in PEEKS (together with an inotify watcher
# on the child's STDIN when Linux::Inotify2 is usable and the /proc entry
# exists) and queued as a 'peek' item. Anything that is not a peek is handled
# as a normal item. Always returns nothing.
sub peek_event {
    my ($self, $pid, $name, $fh, $broken) = @_;

    my $previous = $self->{+PEEKS}->{$name} // ['', 0];

    my ($type, $val) = $self->get_line_burst_or_data($name, $fh, broken => $broken, peek_line => 1);
    return unless $type;

    # If we get an item and it is not a peek we need to handle it.
    unless ($type eq 'peek') {
        $self->_handle_event($name, $type, $val);
        return;
    }

    $val = $self->decode_line($val) if $self->{+ENCODING};

    # Ignore complete lines, and partial lines that were already reported.
    return if $val =~ m/[\n\r]+$/;
    return if $val eq $previous->[0];

    my $inotify;
    if (USE_INOTIFY) {
        my $stdin = "/proc/$pid/fd/0";
        if (-e $stdin) {
            $inotify = Linux::Inotify2->new();
            $inotify->blocking(0);
            $inotify->watch($stdin, Linux::Inotify2::IN_ACCESS());
        }
    }

    $self->{+PEEKS}->{$name} = [$val, $inotify];
    $self->_add_item($name => $val, 'peek');

    return;
}

# Read everything currently available from $fh and process each item.
#
# Arguments: the stream name ('stdout'/'stderr'), the pipe to read from, and
# the hash ref of broken pipes.
#
# Returns the number of items handled (0 when there was nothing to read), so
# the result works both as a count and as a "did any work" boolean.
sub handle_event {
    my $self = shift;
    my ($name, $fh, $broken) = @_;

    my $handled = 0;
    while (1) {
        my ($type, $val) = $self->get_line_burst_or_data($name, $fh, broken => $broken);
        last unless $type;

        # Numeric addition: string concatenation here would turn the counter
        # into "011..." and make it numify to a meaningless value.
        $handled += $self->_handle_event($name, $type, $val);
    }

    return $handled;
}

# Safe wrapper around $fh->get_line_burst_or_data(%params).
#
# Returns the ($type, $value) pair read from the pipe. Returns nothing when the
# pipe is already marked as broken, or when reading throws; in the latter case
# the error is warned about and recorded in the 'broken' hash (keyed by the
# pipe) so the pipe is not read again.
sub get_line_burst_or_data {
    my ($self, $name, $fh, %params) = @_;

    my $broken = $params{broken};
    return if $broken && $broken->{$fh};

    my ($type, $val);
    my $ok = eval {
        ($type, $val) = $fh->get_line_burst_or_data(%params);
        1;
    };
    return ($type, $val) if $ok;

    my $error = $@ || "An error occured";
    warn $error;

    if ($broken) {
        $broken->{$fh} = $error;
    }

    return;
}

# Queue one item read from a pipe.
#
#   * 'message' - the value is JSON; it is decoded and queued as an event
#   * 'line'    - the value is decoded (when an encoding is set), chomped and
#                 queued as a line. If a peek was pending for this stream the
#                 line is flagged 'peek_end', and the peeked prefix is removed
#                 from the line when the peek's inotify watcher reports
#                 activity.
#
# Returns 1 for both. Any other type dies with "Invalid type".
sub _handle_event {
    my ($self, $name, $type, $val) = @_;

    if ($type eq 'message') {
        $self->_add_item($name => decode_json($val));
        return 1;
    }

    unless ($type eq 'line') {
        chomp($val);
        die("Invalid type '$type': $val");
    }

    $val = $self->decode_line($val) if $self->{+ENCODING};
    chomp($val);

    my $peek = $self->{+PEEKS}->{$name};
    if ($peek) {
        my $pending = delete $self->{+PEEKS}->{$name};
        $peek = 'peek_end';

        my ($shown, $watcher) = @$pending;
        if ($watcher && $watcher->poll) {
            $val =~ s/^\Q$shown\E//;
        }
    }

    $self->_add_item($name => $val, $peek);
    return 1;
}

# Decode a raw line using the collector's current encoding.
# Returns the value unchanged when no encoding is set.
sub decode_line {
    my $self = shift;
    my ($val) = @_;

    my $encoding = $self->{+ENCODING} or return $val;

    # Encode is otherwise only loaded when an event announces an encoding; load
    # it here too so an encoding supplied at construction also works.
    require Encode;

    return Encode::decode($encoding, $val);
}

# Should the job be retried after exiting with wait status $exit?
#
# Returns 1 only when the exit status is non-zero, test settings exist, they
# allow retries, a retry count is configured, and the current try number is
# still below that count. Returns 0 otherwise.
sub should_retry {
    my ($self, $exit) = @_;

    return 0 unless $exit;

    my $settings = $self->test_settings;
    return 0 unless $settings && $settings->allow_retry && $settings->retry;

    return $self->job_try < $settings->retry ? 1 : 0;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::Collector - Launch a test job or command, capture its output, and emit harness events

=head1 DESCRIPTION

=head1 SYNOPSIS

=head1 EXPORTS

=over 4

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
L<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://dev.perl.org/licenses/>

=cut


=pod

=cut POD NEEDS AUDIT



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.