Group
Extension

Test2-Harness/blib/lib/Test2/Harness/IPC/Protocol/AtomicPipe.pm

package Test2::Harness::IPC::Protocol::AtomicPipe;
use strict;
use warnings;

our $VERSION = '2.000004';

use Test2::Harness::IPC::Protocol::AtomicPipe::Connection;

use Atomic::Pipe;

use Carp qw/croak confess/;
use Errno qw/EINTR/;
use POSIX qw/mkfifo/;
use Scalar::Util qw/blessed/;
use Test2::Harness::Util qw/mod2file/;
use Test2::Harness::IPC::Util qw/check_pipe ipc_warn pid_is_running/;
use Test2::Harness::Util::JSON qw/decode_json/;

use Test2::Harness::Instance::Message;
use Test2::Harness::Instance::Request;
use Test2::Harness::Instance::Response;

use parent 'Test2::Harness::IPC::Protocol';
use Test2::Harness::Util::HashBase qw{
    <active
    <read_file
    <read_pipe

    <messages
    <requests

    <peer_pid
    <my_pid

    +connections

    <wait_time
};

sub callback {
    my $self = shift;

    croak "Inactive pipe" unless $self->{+ACTIVE};

    return {
        protocol => $self->protocol,
        connect  => [$self->{+READ_FILE}, undef],
    };
}

sub get_address {
    my $class = shift;
    my ($file) = @_;
    return $file;
}

sub init {
    my $self = shift;

    $self->SUPER::init();

    $self->{+WAIT_TIME} //= 0.2;

    $self->{+ACTIVE} = 0;
}

sub connections {
    my $self = shift;
    return values %{$self->{+CONNECTIONS} // {}};
}

sub handles_for_select {
    my $self = shift;
    $self->health_check();
    return unless $self->{+ACTIVE};
    return ($self->{+READ_PIPE}->rh);
}

sub health_check {
    my $self = shift;

    for my $con (values %{$self->{+CONNECTIONS}}) {
        delete $self->{+CONNECTIONS}->{$con->fifo} if $con->expired;
    }

    my $ok = $self->{+ACTIVE};
    $ok &&= check_pipe($self->{+READ_PIPE});
    $ok &&= pid_is_running($self->{+PEER_PID}) if $self->{+PEER_PID};
    return 1 if $ok;

    $self->terminate();

    return 0;
}

sub start {
    my $self = shift;
    my ($file) = @_;

    croak "Pipe is already active" if $self->{+ACTIVE};

    croak "'file' is a required argument" unless $file;

    unless (-p $file) {
        mkfifo($file, 0700) or die "Failed to make fifo '$file': $!";
    }

    my $p = Atomic::Pipe->read_fifo($file);
    $p->blocking(0);
    $p->resize_or_max($p->max_size) if $p->max_size;

    $self->{+READ_FILE}   = $file;
    $self->{+READ_PIPE}   = $p;
    $self->{+MESSAGES}    = [];
    $self->{+REQUESTS}    = [];
    $self->{+CONNECTIONS} = {};
    $self->{+MY_PID}      = $$;
    $self->{+ACTIVE}      = 1;
}

sub connect {
    my $self = shift;
    my ($fifo, $port, %options) = @_;

    $port //= $$;

    $options{auto_start} //= 1;

    if($options{auto_start} && !$self->{+ACTIVE}) {
        my $listen = "${fifo}.${port}";
        croak "File '$listen' already exists " if -e $listen;
        $self->start($listen);
    }

    my $con = Test2::Harness::IPC::Protocol::AtomicPipe::Connection->new(fifo => $fifo, reader => $self, protocol => $self->protocol);
    $self->{+CONNECTIONS}->{$fifo} = $con;

    return $con;
}

sub send_message {
    my $self = shift;
    my ($msg) = @_;

    $msg = Test2::Harness::Instance::Message->new(%$msg)
        unless blessed($msg) && $msg->isa('Test2::Harness::Instance::Message');

    for my $con (values %{$self->{+CONNECTIONS}}) {
        next if eval { $con->send_message($msg); 1 };
        ipc_warn(error => $@);
        delete $self->{+CONNECTIONS}->{$con->fifo} if $con->expired;
    }

    return;
}

sub have_messages { 0 + @{$_[0]->{+MESSAGES}} }
sub get_message {
    my $self = shift;
    my %params = @_;

    my $blocking = $params{blocking} //= 0;
    my $timeout  = $params{timeout};

    while (1) {
        return shift(@{$self->{+MESSAGES}}) if @{$self->{+MESSAGES}};
        my $count = $self->_read_messages(%params);
        return if $count < 0;
        next if $count;
        return unless $blocking && !$timeout;
    }
}

sub have_requests { 0 + @{$_[0]->{+REQUESTS}} }
sub get_request {
    my $self = shift;
    my %params = @_;

    my $blocking = $params{blocking} //= 0;

    while (1) {
        while (1) {
            last if @{$self->{+REQUESTS}};
            my $count = $self->_read_messages(%params);
            return if $count < 0;
            next if $count;
            return unless $blocking;
        }

        my $req = shift @{$self->{+REQUESTS}};

        unless ($req->do_not_respond) {
            my $con;
            unless (eval { $con = $self->connection_from_message($req); 1 }) {
                ipc_warn(error => $@, request => $req);
                return unless $blocking;
            }

            $req->set_connection($con);
        }

        return $req;
    }
}

sub send_response {
    my $self = shift;
    my ($req, $res) = @_;

    my $con = $self->connection_from_message($req);
    return $con->send_response($req, $res);
}

sub _read_messages {
    my $self = shift;
    my %params = @_;

    confess "Called from wrong process!" unless $$ == $self->{+MY_PID};

    # Do not default timeout to wait_time, they are different thing
    my $timeout  = $params{timeout};
    my $blocking = $params{blocking} // 0;

    return $self->__read_messages() unless $blocking;

    my $ios;
    while (1) {
        return -1 unless $self->health_check;

        my $count = $self->__read_messages();
        return $count if $count;

        $ios //= IO::Select->new([$self->handles_for_select]);

        $! = 0;
        my @h = $ios->can_read($timeout // $self->{+WAIT_TIME});
        next if @h;
        next if $! == EINTR;
        return -1 if $!;
        return 0 if $timeout;
    }
}

sub __read_messages {
    my $self = shift;

    return -1 unless $self->{+ACTIVE};

    my $count = 0;

    while (1) {
        $! = 0;
        my $json = $self->{+READ_PIPE}->read_message;
        next if !$json && $! == EINTR;
        last unless $json;

        my $msg;
        unless (eval { $msg = decode_json($json); 1 }) {
            ipc_warn(error => $@, input_json => $json, input => $msg);
            next;
        }

        $count++;

        if (my $class = $msg->{class}) {
            require(mod2file($class));
            $msg = $class->new($msg);
        }
        else {
            ipc_warn(input => $msg, error => 'No class found for message');
            next;
        }

        my $ipc_meta = $msg->ipc_meta;

        if ($msg->isa('Test2::Harness::Instance::Response')) {
            my $con = $self->connection_from_meta($ipc_meta);
            $con->handle_response($msg);
        }
        elsif ($msg->isa('Test2::Harness::Instance::Request')) {
            push @{$self->{+REQUESTS}} => $msg;
        }
        else {
            if ($msg->terminate && $ipc_meta && $ipc_meta->{return_fifo}) {
                my $fifo = $ipc_meta->{return_fifo};
                delete $self->{+CONNECTIONS}->{$fifo};
            }

            push @{$self->{+MESSAGES}} => $msg;
        }
    }

    $! = 0;
    return $count;
}

sub connection_from_message {
    my $self = shift;
    my ($msg) = @_;

    my $ipc_meta = $msg->ipc_meta or confess "Message did not provide 'return_fifo'";

    return $self->connection_from_meta($ipc_meta);
}

sub connection_from_meta {
    my $self = shift;
    my ($meta) = @_;

    my $fifo = $meta->{return_fifo} or confess "Message did not provide 'return_fifo'";
    return $self->{+CONNECTIONS}->{$fifo} //= Test2::Harness::IPC::Protocol::AtomicPipe::Connection->new(fifo => $fifo, reader => $self);
}

sub refuse_new_connections {
    my $self = shift;

    return unless $$ == $self->{+MY_PID};

    unlink($self->{+READ_FILE}) if -e $self->{+READ_FILE};
}

sub terminate {
    my $self = shift;

    return if $self->{+MY_PID} && $$ != $self->{+MY_PID};

    return unless $self->{+ACTIVE};
    unlink($self->{+READ_FILE}) if -e $self->{+READ_FILE};

    $self->{+ACTIVE} = 0;
}

sub TO_JSON { shift->callback }

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::IPC::Protocol::AtomicPipe - FIXME

=head1 DESCRIPTION

=head1 SYNOPSIS

=head1 EXPORTS

=over 4

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
L<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://dev.perl.org/licenses/>

=cut


=pod

=cut POD NEEDS AUDIT



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.