Group
Extension

Test2-Harness/lib/Test2/Harness/IPC/Util.pm

package Test2::Harness::IPC::Util;
use strict;
use warnings;

our $VERSION = '2.000006'; # TRIAL

use Carp qw/croak confess longmess/;
use Errno qw/ESRCH EINTR/;
use Config qw/%Config/;
use Time::HiRes qw/time sleep/;
use Scalar::Util qw/blessed/;
use Data::Dumper qw/Dumper/;

use POSIX();
use IO::Select();

use Test2::Harness::Util::JSON qw/encode_pretty_json encode_json encode_json_file/;
use Test2::Harness::Util qw/mod2file parse_exit/;

use Importer Importer => 'import';

our @EXPORT_OK = qw{
    USE_P_GROUPS
    swap_io
    pid_is_running
    check_pipe
    ipc_warn
    ipc_connect
    ipc_loop

    inflate

    set_procname

    start_process
    start_collected_process
};

# Define USE_P_GROUPS at compile time as a constant sub: 1 when this perl's
# %Config has a true 'd_setpgrp' entry, 0 otherwise.
BEGIN {
    *USE_P_GROUPS = $Config{'d_setpgrp'}
        ? sub() { 1 }
        : sub() { 0 };
}

# set_procname(%params)
#
# Build a process name and assign it to $0.
#
#   prefix - Name prefix. Defaults to $ENV{T2_HARNESS_PROC_PREFIX}, then to
#            'Test2-Harness'.
#   set    - String or arrayref of name parts used instead of the current $0.
#   append - String or arrayref of name parts added after the base name.
#
# All parts are joined with '-'. The prefix is only prepended when the joined
# name does not already begin with "<prefix>-". Returns the result of the
# assignment to $0.
sub set_procname {
    my %params = @_;

    my $prefix = $params{prefix} // $ENV{T2_HARNESS_PROC_PREFIX} // 'Test2-Harness';

    # Both list-like params may be given as a plain scalar or as an arrayref.
    my ($set, $append) = map { ref($_) ? $_ : [$_] } ($params{set} // [], $params{append} // []);

    my @base = @$set ? @$set : ($0);
    my $name = join('-', @base, @$append);

    $name = "${prefix}-${name}" if $name !~ m/^\Q$prefix\E-/;

    $0 = $name;
}

# inflate($ref, $fallback_class)
#
# Turn a plain data reference into an object, in place.
#
# Returns nothing when $ref is false, and returns $ref untouched when it is
# already blessed. Otherwise the class is taken from $ref->{class}, falling
# back to $fallback_class; croaks when neither is defined. The class file is
# loaded via mod2file() + require, and the result of $class->new($ref) is
# written back into the caller's first argument (through $_[0]) as well as
# returned.
sub inflate {
    my ($data, $default_class) = @_;

    return unless $data;
    return $data if blessed($data);

    my $class = $data->{class} // $default_class;
    croak "No class to inflate, provide a 'class' key or as second argument to inflate()"
        unless defined $class;

    require(mod2file($class));

    # Assigning to $_[0] replaces the caller's variable with the new object.
    $_[0] = $class->new($data);
}

# pid_is_running($pid)
#
# Probe a process with signal 0.
#
# Returns:
#    1  - kill(0, $pid) succeeded: the process exists and we may signal it.
#    0  - kill failed with ESRCH: no such process.
#   -1  - kill failed for any other reason (treated as "running, not ours").
#
# Confesses when $pid is false. $! is localized so the caller's errno is left
# untouched.
sub pid_is_running {
    my ($pid) = @_;

    $pid or confess "A pid is required";

    local $!;

    my $signalled = kill(0, $pid);
    return 1 if $signalled;

    return $! == ESRCH ? 0 : -1;
}

# swap_io($fh, $to, $die, $mode)
#
# Re-open an existing filehandle onto a new target while verifying that it
# keeps its original file descriptor number.
#
#   $fh   - The handle to re-open, or an arrayref of [$orig_fd, $handle] when
#           the expected descriptor number is supplied by the caller.
#   $to   - The new target. A reference is opened with a dup mode ('>&' or
#           '<&'); anything else is opened as a path ('>' or '<').
#   $die  - Optional error callback, called with a message string. The default
#           dies with the message plus caller and file/line details.
#   $mode - Optional explicit open mode. When omitted the direction is chosen
#           from the original fd: fd 0 is opened for reading, any other fd for
#           writing.
#
# Returns nothing on success. If the re-opened handle ends up on a different
# fd, $die is invoked and its result is returned.
sub swap_io {
    my ($fh, $to, $die, $mode) = @_;

    $die ||= sub {
        my @here  = caller;
        my @above = caller(1);
        die("$_[0] at $here[1] line $here[2] ($above[1] line $above[2], ${ \__FILE__ } line ${ \__LINE__ }).\n");
    };

    # Either unpack the caller-provided fd, or ask the handle for it.
    my $orig_fd;
    ($orig_fd, $fh) = ref($fh) eq 'ARRAY' ? @$fh : (fileno($fh), $fh);

    $die->("Could not get original fd ($fh)") unless defined $orig_fd;

    my $to_is_ref = ref($to) ? 1 : 0;
    $mode //= ($orig_fd ? '>' : '<') . ($to_is_ref ? '&' : '');

    unless (open($fh, $mode, $to)) {
        # Only build the message on failure; path targets are named in it.
        my $where = $to_is_ref ? '' : " to '$to'";
        $die->("Could not redirect output${where}: $!");
    }

    return if fileno($fh) == $orig_fd;

    $die->("New handle does not have the desired fd!");
}

# start_collected_process(%params)
#
# Launch a new perl that loads Test2::Harness::Collector and runs its
# collect() method, handing it the remaining %params via encode_json_file().
#
#   post_fork - Optional callback, removed from %params and passed through to
#               start_process().
#   setsid    - When true, this function waits for the launched process and
#               returns nothing; it dies if that process exits non-zero, was
#               signalled, or waitpid() does not return its pid.
#               NOTE(review): presumably the collector detaches itself in this
#               mode, so the pid would not be useful to callers - confirm.
#
# Without 'setsid' the pid returned by start_process() is returned.
sub start_collected_process {
    my %params = @_;

    my $post_fork = delete $params{post_fork};

    # Pass along every existing @INC directory exactly once, in order, so the
    # new perl uses the same libs.
    my %seen;
    my @inc_flags = map { "-I$_" } grep { -d $_ && !$seen{$_}++ } @INC;

    my @cmd = (
        $^X,                                                      # Call current perl
        @inc_flags,                                               # Use the dev libs specified
        '-mTest2::Harness::Collector',                            # Load Collector
        '-e', 'exit(Test2::Harness::Collector->collect($ARGV[0]))',    # Run it.
        encode_json_file(\%params),                               # json data for what to do
    );

    my $pid = start_process(\@cmd, $post_fork);

    return $pid unless $params{setsid};

    local $? = 0;
    my $reaped = waitpid($pid, 0);
    my $status = $?;

    return if !$status && $reaped == $pid;

    my $x = parse_exit($status);
    die "Collector failed ($reaped vs $pid) (Exit code: $x->{err}, Signal: $x->{sig})";
}

# start_process($cmd, $post_fork)
#
# Fork, then exec @$cmd in the child.
#
#   $cmd       - Arrayref holding the command and its arguments. Must be
#                non-empty and contain no undefined values (confesses
#                otherwise).
#   $post_fork - Optional callback run in the child before exec.
#
# In the parent this returns the child's pid, or dies if fork() fails.
#
# The child never returns from this function: it either becomes the new
# program via exec, or reports the failure on STDERR and leaves through
# POSIX::_exit(255). That also covers an exception thrown by $post_fork, which
# would otherwise unwind into the child's copy of the caller's stack and let
# the child carry on running the parent's code (eval handlers, END blocks,
# destructors).
sub start_process {
    my ($cmd, $post_fork) = @_;

    confess "cmd is required, and must be populated" unless $cmd && @$cmd;
    confess "cmd may not contain undefined values: " . Dumper($cmd) if grep { !defined($_)} @$cmd;

    my $pid = fork // die "Could not fork: $!";
    return $pid if $pid;

    # Everything below runs in the child only.
    if ($post_fork) {
        unless (eval { $post_fork->(); 1 }) {
            my $err = $@ || 'unknown error';
            print STDERR longmess("post_fork callback failed: $err\n");
            POSIX::_exit(255);
        }
    }

    # Silence the warnings perl emits around a failing exec; the failure is
    # reported explicitly below.
    no warnings;

    # The eval traps exceptions thrown by exec itself.
    eval { exec(@$cmd) };

    # Still here, so exec failed. Grab both error variables before anything
    # else can overwrite them.
    my ($errno, $err) = ("$!", $@);
    print STDERR longmess("Failed to exec " . join(' ', @$cmd) . " . ($errno) $err\n");

    # _exit so the child skips END blocks and destructors inherited from the
    # parent.
    POSIX::_exit(255);
}

# check_pipe($pipe, $file)
#
# Report whether a pipe looks usable. Returns 1 when every check passes and 0
# as soon as one fails.
#
#   $pipe - Either a filehandle, or a blessed object that isa 'Atomic::Pipe',
#           in which case the handles returned by its rh() and wh() methods
#           are checked (false ones are skipped).
#   $file - Optional path. When given it must exist and pass the -p file test.
#
# Each handle must report opened() and pass the -p file test. A false $pipe,
# or an Atomic::Pipe with no handles at all, yields 0.
sub check_pipe {
    my ($pipe, $file) = @_;

    if ($file) {
        return 0 unless (-e $file) && (-p $file);
    }

    return 0 unless $pipe;

    my @handles;
    if (blessed($pipe) && $pipe->isa('Atomic::Pipe')) {
        # Ask for both ends, keep whichever ones exist.
        @handles = grep { $_ } map { my $side = $_; scalar $pipe->$side } qw/rh wh/;
    }
    else {
        @handles = ($pipe);
    }

    return 0 unless @handles;

    for my $handle (@handles) {
        return 0 unless $handle->opened && -p $handle;
    }

    return 1;
}

# ipc_connect($ipc_data)
#
# Build a Test2::Harness::IPC::Protocol instance for $ipc_data->{protocol} and
# call its connect() method with the elements of $ipc_data->{connect}.
#
# Returns the list ($ipc, $connection), or nothing when $ipc_data is false.
# The protocol module is loaded lazily, only when there is something to
# connect to.
sub ipc_connect {
    my ($ipc_data) = @_;

    $ipc_data or return;

    require Test2::Harness::IPC::Protocol;

    my $protocol   = Test2::Harness::IPC::Protocol->new(protocol => $ipc_data->{protocol});
    my $connection = $protocol->connect(@{$ipc_data->{connect}});

    return ($protocol, $connection);
}

# ipc_loop(%params)
#
# Event loop that watches a set of IPC objects and/or raw handle sets,
# dispatches whatever becomes readable to callbacks, and keeps going until
# there is nothing left to watch or the caller asks it to stop.
#
# Parameters (at least one of 'ipcs' or 'handles' is required, else croaks):
#   ipcs            - Arrayref of IPC objects. The loop calls these methods on
#                     them: handles_for_select, active, health_check,
#                     have_requests, have_messages, get_message, get_request
#                     and send_response.
#   handles         - Arrayref of sets shaped [$handle, $callback, %options].
#                     Options read here are 'eof' and 'ready' (both coderefs).
#   wait_time       - Select timeout used when idle; a number or a coderef
#                     returning one. Defaults to 0.2.
#   iteration_start - Optional coderef run at the top of every iteration.
#   iteration_end   - Optional coderef run at the bottom of every iteration.
#                     A true return from either counts as "did work".
#   end_check       - Optional coderef called as (did_work => $count) at the
#                     end of every iteration; a true return ends the loop.
#   handle_request  - Callback for each request; its return value is sent back
#                     as the response. Default: emit an ipc_warn().
#   handle_message  - Callback for each message. Default: emit an ipc_warn().
#   signals         - Coderef called with 'INT' or 'TERM' when that signal
#   quiet_signals     arrives. 'signals' wins when both are given; a true
#                     'quiet_signals' also suppresses the SIGINT notice.
#   sigchild        - Handler installed for SIGCHLD while the loop runs.
#   debug           - When true, prints diagnostics to the currently selected
#                     output handle and sleeps 0.2s per iteration.
#
# NOTE(review): the croak message below mentions 'apipes', but no such
# parameter is read in this function - confirm whether the message is stale.
sub ipc_loop {
    my %params = @_;

    # Where we were called from; used in default-handler warnings and debug output.
    my $caller = [caller];
    my $trace = "$caller->[1] line $caller->[2]";

    my $ipcs    = $params{ipcs};
    my $handles = $params{handles};

    croak "'ipcs' or 'handles' or 'apipes' required" unless $ipcs || $handles;

    $ipcs    //= [];
    $handles //= [];

    my $wait_time = $params{wait_time} // 0.2;

    my $iteration_start = $params{iteration_start};
    my $iteration_end   = $params{iteration_end};
    my $end_check       = $params{end_check};

    my $handle_request = $params{handle_request} // sub { ipc_warn(request => $_[0], error => "Got a request, loop does not handle requests at $trace.\n") };
    my $handle_message = $params{handle_message} // sub { ipc_warn(message => $_[0], error => "Got a message, loop does not handle messages at $trace.\n") };

    my $debug = $params{debug};

    # Signal handlers are only built when the caller supplied a signal callback.
    my ($int, $term);
    if (my $signal = $params{signals} // $params{quiet_signals}) {
        # Shared by both handlers below.
        my $sig_cnt = 0;
        $int = sub {
            $signal->('INT');
            $sig_cnt++;

            # Repeated signals escalate from the callback to an exception.
            if ($sig_cnt >= 5) {
                die "$0: Got $sig_cnt signals, shutting down more forcefully...\n";
            }

            # NOTE(review): "Cought" in the message below looks like a typo for "Caught".
            unless ($params{quiet_signals}) {
                print "\n";
                warn "$0: Cought SIGINT, shutting down... (press control+c " . (5 - $sig_cnt) ." more time(s) to be more forceful)\n";
            }
        };

        # SIGTERM notifies the callback and then always dies.
        $term = sub {
            $signal->('TERM');
            $sig_cnt++;

            die "$0: Got SIGTERM, shutting down...\n";
        };
    }

    local $SIG{TERM} = $term if $term;
    local $SIG{INT}  = $int  if $int;

    # $ios is the select set; $ipc_map resolves an entry in @ready back to its
    # owner (an IPC object or a handle set). Owners are keyed both by their
    # handle(s) and by themselves, because @ready below can hold either kind.
    my $ipc_map;
    my $ios;
    my $reset_ios = sub {
        $ios = IO::Select->new();

        $ipc_map = {};

        for my $set (@$handles) {
            my ($h) = @$set;
            $ios->add($h);
            $ipc_map->{$h} = $set;
            $ipc_map->{$set} = $set;
        }

        for my $ipc (@$ipcs) {
            for my $h ($ipc->handles_for_select) {
                $ios->add($h);
                $ipc_map->{$h}   = $ipc;
                $ipc_map->{$ipc} = $ipc;
            }
        }
    };
    $reset_ios->();

    # This is used to interrupt a select below. The var and ++ are to prevent the sub from being optimized away
    my $sigchild = 0;
    local $SIG{CHLD} = $params{sigchild} // sub { $sigchild++ };

    # $last_ipc_count starts non-zero so the loop cannot end before the first
    # health check. $last_health_check starts at 0, so the first iteration
    # always runs one. $did_work starts true, so the first select does not block.
    my $last_ipc_count = 1;
    my $last_health_check = 0;
    my $did_work = 1;

    # Timeout for the next select: zero when there is already something to do,
    # otherwise the configured wait time (calling it if it is a coderef).
    my $wait_time_is_code = ref($wait_time) eq 'CODE' ? 1 : 0;
    my $get_wait_time = sub {
        my $ready = shift;
        return 0 if $did_work || @$ready;
        return $wait_time unless $wait_time_is_code;
        return $wait_time->();
    };

    IPC_LOOP: while (1) {
        sleep(0.2) if $debug;
        print "$$ $0 - LOOP ($caller->[1] line $caller->[2]): " . sprintf('%-02.4f', time) . "\n" if $debug;

        $did_work++ if $iteration_start && $iteration_start->();

        # Health check, run when more than 4 seconds have passed since the last
        # one: recount how many sources are still alive.
        if (time - $last_health_check > 4) {
            print "Health Check\n" if $debug;
            $last_ipc_count = 0;

            for my $ipc (@$ipcs) {
                next unless $ipc->active;
                $ipc->health_check;
                # Only counted if still active after its own health check.
                $last_ipc_count++ if $ipc->active;
            }

            for my $set (@$handles) {
                # This %params is the set's options and shadows the function's
                # %params inside this loop body.
                my ($h, $cb, %params) = @$set;
                my $eof = $params{eof} // sub { eof($h) };
                if ($eof->()) {
                    print "EOF\n" if $debug;
                }
                else {
                    $last_ipc_count++;
                }
            }

            $last_health_check = time;
            print "New IPC Count: $last_ipc_count\n" if $debug;
        }

        # Some handles may already have messages read, which means can_read()
        # might skip these.
        my @ready;
        push @ready => grep { $_->have_requests || $_->have_messages } @$ipcs;
        push @ready => grep { my ($h, $cb, %p) = @{$_}; $p{ready} ? $p{ready}->() : 0 } @$handles;

        # Select, retrying only after an unexpected error.
        while (1) {
            $! = 0;

            # Add any handles that have things to read.
            push @ready => $ios->can_read($get_wait_time->(\@ready));
            last if @ready || $! == 0;

            # If the system call was interrupted it could mean a child process
            # exited, or similar. Just break the loop so we can advance.
            last if $! == EINTR;

            # Any other error: report it and rebuild the select set from
            # scratch before trying again.
            warn((0 + $!) . ": $!");

            $reset_ios->();
            last unless keys %$ipc_map;
        }

        print "Ready: " . @ready . "\n" if $debug;

        # From here on $did_work counts the work done in this iteration.
        $did_work = 0;

        # An owner can appear in @ready more than once (several handles, or
        # both as object and as handle); process each one once per iteration.
        my %seen;
        for my $h (@ready) {
            my $ipc = $ipc_map->{$h} or next;
            next if $seen{$ipc}++;

            # Handle sets are plain arrayrefs: just run their callback.
            if(ref($ipc) eq 'ARRAY') {
                $did_work++ if $ipc->[1]->();
                next;
            }

            # Drain all pending messages, then all pending requests.
            while (my $msg = $ipc->get_message) {
                $did_work++;
                $handle_message->($msg);
            }

            while (my $req = $ipc->get_request) {
                $did_work++;

                print "Request ($trace):  " . encode_pretty_json($req) . "\n" if $debug;
                my $res = $handle_request->($req);

                next if $req->do_not_respond;

                print "Response ($trace): " . encode_pretty_json($res) . "\n" if $debug;
                # A failed send is reported but does not abort the loop.
                eval { $ipc->send_response($req, $res); 1 } or ipc_warn(ipc_class => ref($ipc), error => $@, request => $req, response => $res);
            }
        }

        $did_work++ if $iteration_end && $iteration_end->();
        print "Did Work: $did_work\n" if $debug;

        # No IPC means nothing to do
        print "IPC Map: " . (keys %$ipc_map) . "\n" if $debug;
        last unless keys %$ipc_map;
        print "IPC Count: $last_ipc_count\n" if $debug;
        # The last health check found nothing alive.
        last unless $last_ipc_count;

        if ($end_check && $end_check->(did_work => $did_work)) {
            print "End Check TRUE\n" if $debug;
            last;
        }
    }
}

# ipc_warn(%params) / ipc_warn(\%params)
#
# Emit a single warning with a banner, the caller's file and line, and one
# delimited section per field in %params.
#
# Fields named 'error', 'request' and 'response' are listed first (when
# present), followed by the remaining keys. A key ending in '_json' is shown
# under its base name and only used when the base key itself is undefined.
# Reference values are rendered with encode_pretty_json(); an undefined field
# is shown as '<UNDEFINED>'.
sub ipc_warn {
    # A single argument is treated as a hashref (or ignored when undefined).
    my %params = @_ == 1 ? (defined($_[0]) ? %{$_[0]} : ()) : @_;

    my @caller = caller;

    my (%seen, @sections);
    my @leading = grep { exists $params{$_} } qw/error request response/;

    for my $key (@leading, keys %params) {
        (my $field = $key) =~ s/_json$//;
        next if $seen{$field}++;

        my $value = $params{$field} // $params{"${field}_json"} // '<UNDEFINED>';
        if (ref $value) {
            eval { $value = encode_pretty_json($value) } or warn $@;
        }
        chomp($value);

        my $title = ucfirst($field);
        push @sections => "    ==== Start $title ====\n" . $value . "\n    ==== End $title ====\n";
    }

    my $fields = join('', @sections);

    my $header = <<"    EOT1";

*******************************************************************************
!!                     Unable to handle IPC transaction                      !!
*******************************************************************************
File: $caller[1]
Line: $caller[2]
    EOT1

    my $footer = <<"    EOT2";
*******************************************************************************

    EOT2

    warn $header . $fields . $footer;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

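Test2::Harness::IPC::Util - Utility functions for spawning processes, swapping filehandles, checking pipes, and running an IPC event loop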

=head1 DESCRIPTION

=head1 SYNOPSIS

=head1 EXPORTS

=over 4

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
L<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://dev.perl.org/licenses/>

=cut


=pod

=cut POD NEEDS AUDIT



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.