Group
Extension

Test2-Harness/blib/lib/Test2/Harness/IPC/Protocol/AtomicPipe/Connection.pm

package Test2::Harness::IPC::Protocol::AtomicPipe::Connection;
use strict;
use warnings;

our $VERSION = '2.000004';

use IO::Select;
use Atomic::Pipe;

use Carp qw/confess croak/;
use Errno qw/EINTR/;
use Time::HiRes qw/sleep/;
use Scalar::Util qw/weaken blessed/;

use Test2::Harness::IPC::Util qw/check_pipe ipc_warn pid_is_running/;
use Test2::Util::UUID qw/gen_uuid/;
use Test2::Harness::Util::JSON qw/encode_json/;

use Test2::Harness::Instance::Message;
use Test2::Harness::Instance::Request;
use Test2::Harness::Instance::Response;

use parent 'Test2::Harness::IPC::Connection';
use Test2::Harness::Util::HashBase qw{
    <active
    <fifo
    +pipe
    <reader
    <requests
    <peer_pid
    <deactivated
};

my %CACHE;

sub init {
    my $self = shift;

    weaken($self->{+READER});
    $self->{+PROTOCOL} //= 'Test2::Harness::IPC::Protocol::AtomicPipe';

    $self->SUPER::init();

    my $fifo = $self->{+FIFO} or croak "'fifo' is a required attribute";

    if ($CACHE{$fifo}) {
        $self->{+PIPE} = $CACHE{$fifo};
    }

    $self->{+ACTIVE} = 1;
}

sub pipe {
    my $self = shift;
    return $self->{+PIPE} if $self->{+PIPE};

    my $fifo = $self->{+FIFO} or die "No fifo?!";

    my $valid_fifo = 0;
    for (1 .. 10) {
        $valid_fifo ||= -e $fifo && -p $fifo;
        last if $valid_fifo;
        sleep 0.1;
    }

    confess "'$fifo' is not a valid fifo" unless $valid_fifo;

    my $p = Atomic::Pipe->write_fifo($fifo);
    $p->resize_or_max($p->max_size) if $p->max_size;

    $self->{+PIPE} = $p;

    $CACHE{$fifo} = $p;
    weaken($CACHE{$fifo});

    return $p;
}

sub handles_for_select { $_[0]->{+READER}->handles_for_select }
sub callback           { $_[0]->{+READER}->callback }

sub health_check {
    my $self = shift;

    return 0 unless $self->{+ACTIVE};

    my $ok = 1;
    $ok &&= check_pipe($self->pipe, $self->{+FIFO});
    $ok &&= pid_is_running($self->{+PEER_PID}) if $self->{+PEER_PID};

    return 1 if $ok;

    $self->terminate();

    return 0;
}

sub expired {
    my $self = shift;

    return 0 if $self->{+ACTIVE} && $self->health_check;
    return 1 unless keys %{$self->{+REQUESTS}};

    $self->{+DEACTIVATED} //= time;

    my $delta = time - $self->{+DEACTIVATED};

    # If we have not had a response in 60 seconds we assume the connection died.
    return 1 if $delta > 60;

    return 0;
}


sub send_message {
    my $self = shift;
    my ($msg) = @_;

    croak "Disconnected pipe" unless $self->{+ACTIVE};

    $msg = Test2::Harness::Instance::Message->new(%$msg)
        unless blessed($msg) && $msg->isa('Test2::Harness::Instance::Message');

    $msg->set_ipc_meta({return_fifo => $self->{+READER}->read_file});

    my $json = encode_json($msg);

    my $ok;
    while (1) {
        $! = 0;
        last if eval { local $SIG{PIPE} = 'IGNORE'; $self->pipe->write_message($json); 1 };
        next if $! == EINTR;
        $self->terminate();
        croak "Disconnected pipe";
    }

    return $json;
}

sub send_request {
    my $self = shift;
    my ($api_call, $args, %params) = @_;

    croak "Communication is currently one-way, (did you forget to call start() on the main pipe object?)" unless $self->{+READER} && $self->{+READER}->active;

    my $id = gen_uuid();

    my $req = Test2::Harness::Instance::Request->new(
        request_id     => $id,
        api_call       => $api_call,
        args           => $args,
        do_not_respond => $params{do_not_respond},
    );

    my $json = $self->send_message($req);

    $self->{+REQUESTS}->{$id} = undef
        unless $req->do_not_respond;

    return $params{return_request} ? $req : $id;
}

sub get_response {
    my $self = shift;
    my ($id, %params) = @_;

    croak "Invalid request id $id" unless exists $self->{+REQUESTS}->{$id};

    my $blocking = $params{blocking} //= 0;
    my $timeout  = $params{timeout};

    while (1) {
        return delete $self->{+REQUESTS}->{$id}
            if defined $self->{+REQUESTS}->{$id};

        return unless $self->{+READER} && $self->{+READER}->active;

        my $new = $self->{+READER}->_read_messages(%params);
        croak "Connection error: $!" if $new < 0;
        next if $new;

        return unless $blocking || defined($timeout);
    }
}

sub send_response {
    my $self = shift;
    my ($req, $res) = @_;

    $res = Test2::Harness::Instance::Response->new(%$req, response_id => $req->request_id)
        unless blessed($res) && $res->isa('Test2::Harness::Instance::Response');

    return $self->send_message($res);
}

sub handle_response {
    my $self = shift;
    my ($res) = @_;

    my $id = $res->response_id;

    croak "'$id' is not a valid request/response id" unless exists $self->{+REQUESTS}->{$id};
    croak "'$id' already has a response" if defined $self->{+REQUESTS}->{$id};

    $self->{+REQUESTS}->{$id} //= $res;
}

sub terminate {
    my $self = shift;

    $self->{+ACTIVE} = 0;
    $self->{+DEACTIVATED} = time;

    delete $self->{+PIPE};
    delete $self->{+READER};
}

sub TO_JSON {
    my $self = shift;

    return {
        protocol => $self->protocol,
        connect  => [$self->{+FIFO}, undef],
    };
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Test2::Harness::IPC::Protocol::AtomicPipe::Connection - FIXME

=head1 DESCRIPTION

=head1 SYNOPSIS

=head1 EXPORTS

=over 4

=back

=head1 SOURCE

The source code repository for Test2-Harness can be found at
L<http://github.com/Test-More/Test2-Harness/>.

=head1 MAINTAINERS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 AUTHORS

=over 4

=item Chad Granum E<lt>exodist@cpan.orgE<gt>

=back

=head1 COPYRIGHT

Copyright Chad Granum E<lt>exodist7@gmail.comE<gt>.

This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.

See L<http://dev.perl.org/licenses/>

=cut


=pod

=cut POD NEEDS AUDIT



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.