Group
Extension

DBIx-QuickORM/lib/DBIx/QuickORM/STH/Fork.pm

package DBIx::QuickORM::STH::Fork;
use strict;
use warnings;

our $VERSION = '0.000019';

use Role::Tiny::With qw/with/;
with 'DBIx::QuickORM::Role::STH';
with 'DBIx::QuickORM::Role::Async';

use Carp qw/croak/;
use Time::HiRes qw/sleep/;
use Cpanel::JSON::XS qw/decode_json/;

use IO::Select;

use DBIx::QuickORM::Util::HashBase qw{
    <connection
    <source

    only_one

    +dialect
    +ready
    <got_result
    <done
    <pid
    <pipe
    <ios
};

sub cancel_supported { 1 }

sub dialect { $_[0]->{+DIALECT} //= $_[0]->{+CONNECTION}->dialect }
sub clear   { $_[0]->{+CONNECTION}->clear_fork($_[0]) }

sub init {
    my $self = shift;

    croak "'pid' is a required attribute"         unless $self->{+PID};
    croak "'pipe' is a required attribute"        unless $self->{+PIPE};
    croak "'connection' is a required attribute"  unless $self->{+CONNECTION};
    croak "'source' is a required attribute" unless $self->{+SOURCE};
}

sub ready {
    my $self = shift;
    return 1 if $self->{+READY};

    my $ios = $self->{+IOS} //= IO::Select->new($self->{+PIPE});
    return unless $ios->can_read(0);

    return $self->{+READY} = 1;
}

sub result {
    my $self = shift;
    return $self->{+GOT_RESULT} if $self->{+GOT_RESULT};

    $self->wait unless $self->{+READY};

    my $pipe = $self->{+PIPE};
    my $line = <$pipe>;
    my $data = decode_json($line);

    unless ($data && exists $data->{result}) {
        chomp($line);
        croak "Got invalid data from pipe: $line";
    }

    return $self->{+GOT_RESULT} = $data->{result};
}

sub cancel {
    my $self = shift;

    return if $self->{+DONE};

    close(delete $self->{+PIPE}) if $self->{+PIPE};

    kill('TERM', $self->{+PID}) or die "Could not kill pid $self->{+PID}: $!\n";

    $self->clear;
    $self->{+DONE} = 1;
}

sub next {
    my $self = shift;
    my $row = $self->_next;

    if ($self->{+ONLY_ONE}) {
        croak "Expected only 1 row, but got more than one" if $self->_next;
        $self->set_done;
    }

    return $row;
}

sub _next {
    my $self = shift;

    return if $self->{+DONE};

    $self->result unless $self->{+GOT_RESULT};

    my $pipe = $self->{+PIPE};
    my $line = <$pipe>;
    if (defined $line) {
        my $row = decode_json($line);
        return $row if $row;
    }

    $self->set_done;

    return;
}

sub set_done {
    my $self = shift;

    return if $self->{+DONE};

    close(delete $self->{+PIPE}) if $self->{+PIPE};
    $self->clear;
    $self->{+DONE} = 1;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.