Group
Extension

Data-AnyXfer/lib/Data/AnyXfer/Elastic/Import/SpawnTask.pm

package Data::AnyXfer::Elastic::Import::SpawnTask;

use Carp;
use Moo;
use MooX::Types::MooseLike::Base qw(:all);




use Data::UUID ();
use IPC::Run3  ();
use Search::Elasticsearch;
use List::MoreUtils qw( uniq );

use Data::AnyXfer ();
use Data::AnyXfer::JSON qw( encode_json );
use Data::AnyXfer::Elastic::IndexInfo ();
use Data::AnyXfer::Elastic::Logger    ();
use Data::AnyXfer::Elastic::Importer  ();
use Data::AnyXfer::Elastic::Role::IndexInfo;
use Data::AnyXfer::Elastic::Import::DataFile;

use Data::AnyXfer::Elastic::Import::SpawnTask::Process ();

use constant PROCESS_CLASS =>
    qw(Data::AnyXfer::Elastic::Import::SpawnTask::Process);


=head1 NAME

Data::AnyXfer::Elastic::Import::SpawnTask - Play an import
in a separate background process

=head1 SYNOPSIS

    use Path::Class                               ();
    use Data::AnyXfer::Elastic                    ();
    use Data::AnyXfer::Elastic::Import::DataFile  ();
    use Data::AnyXfer::Elastic::Import::SpawnTask ();

    my $datafile = Data::AnyXfer::Elastic::Import::DataFile->new(
        file => Path::Class::file( 'my_data.datafile' )
    );

    # generate a task per client
    my @process_list;
    my $elastic = Data::AnyXfer::Elastic->new;
    foreach ($elastic->all_clients_for('public_data')) {
        my @args = (
            datafile             => $datafile,
            client               => $_,
            verbose              => 1,
            delete_before_create => 1,
        );
        push @process_list,
            Data::AnyXfer::Elastic::Import::SpawnTask->run(@args);
    }

    # wait for the tasks to complete
    foreach (@process_list) { $_->wait() }

=head1 DESCRIPTION

This module allows us to play a datafile using
L<Data::AnyXfer::Elastic::Importer>, asynchronously, using a background
process.

The background process will be spun up as a new process. You will be returned a
process object which you can inspect to track progress.

=head1 ATTRIBUTES

For internal use only.

=cut

has argsfile => (
    is       => 'ro',
    isa      => Str,
    required => 1,
);

has datafile => (
    is       => 'ro',
    isa      => InstanceOf['Data::AnyXfer::Elastic::Import::DataFile'],
    required => 1,
);

has index_info => (
    is       => 'ro',
    does     => ConsumerOf['Data::AnyXfer::Elastic::Role::IndexInfo'],
    required => 1,
);

has client => (
    is       => 'ro',
    does     => InstanceOf['Search::Elasticsearch::Role::Client::Direct'],
    required => 1,
);

has verbose => (
    is      => 'ro',
    isa     => Bool,
    default => 0,
);

has delete_before_create => (
    is      => 'ro',
    isa     => Bool,
    default => 0,
);

has bulk_max_count => (
    is      => 'ro',
    isa     => Int,
    default => Data::AnyXfer::Elastic::Importer::DEFAULT_BULK_MAX_COUNT,
);


=head1 METHODS

=cut

# INSTANCE CREATION (FROM WITHIN TASK)

sub BUILDARGS {
    my ( $class, %args ) = @_;

    my $file = Path::Class::file( $args{argsfile} );
    croak "Argument 'file' does not exist at: $file" unless -f $file;

    my $data = Data::AnyXfer::JSON::decode_json(
        $file->slurp( iomode => '<:encoding(UTF-8)' ) );

    # re-enable test flags if we are in testmode
    Data::AnyXfer->test(1) if $data->{testmode};

    return {
        argsfile => "$file",
        datafile => $class->inflate_datafile_dir(
            $data->{datafile_dir}, $data->{datafile_label}
        ),
        index_info => $class->inflate_index_info( $data->{index_info} ),
        client     => $class->inflate_client_info( $data->{client_info} ),
        delete_before_create => $data->{delete_before_create},
        bulk_max_count       => $data->{bulk_max_count},
        verbose              => $data->{verbose},
    };
}

sub BUILD {
    my $self = $_[0];

    # RUN THE IMPORT ON CREATION
    $self->do_import;
}


# TASK SPAWNING

=head2 run

    my $process = Data::AnyXfer::Elastic::Import::SpawnTask->run(
        datafile             => $datafile,
        client               => $_,
        verbose              => 1,
        delete_before_create => 1,
        debug                => 1,
    );
    do_something(); # do something in the meantime
    $process->wait(); # wait for the import to complete

This is the primary method in this module (It should be called as a class
method).

It takes a named argument list, and returns a L<Proc::Background> instance
representing the import process. The process will die once the import is
complete, so if you need to you can call C<wait> to synchronise.

=cut

sub run {
    my ( $class, %args ) = @_;

    # ensure we have a datafile
    my $datafile = $args{datafile};
    croak 'datafile argument is required to spawn an import task'
        unless $datafile;

    # ensure we have a target client
    my $client = $args{client};
    croak 'client argument is required to spawn an import task'
        unless $client;

    # we need to pass on any connection information
    my @connections
        = map { $_->uri->as_string } @{ $client->transport->cxn_pool->cxns };

    # serialise information required for task into argsfile
    # to be picked up by the new process
    my $argsfile = $class->generate_argsfile(
        {   client_info  => \@connections,
            datafile_dir => $args{datafile_dir}
                || $datafile->storage->working_dir . '',
            datafile_label => $datafile->storage->get_destination_info . '',
            index_info     => $datafile->export_index_info->as_hash,
            delete_before_create => $args{delete_before_create},
            bulk_max_count       => $args{bulk_max_count},
            verbose              => $args{verbose},
            testmode             => Data::AnyXfer->test,
        }
    );

    # spawn process passing in the information we serialised
    my $process = $class->_run_self_with_argsfile( $argsfile, \%args );
    return $process;
}


# ARGUMENT SERIALISATION

sub _run_self_with_argsfile {
    my ( $class, $argsfile, $args ) = @_;

    my $verbose = $args->{debug};

    # build the command list
    # starts the perl interpretter and creates an instance of ourselves
    # passing in the argsfile as a string

    # make sure we use the same version of perl intepretter we're
    # currently running under

    my $command = sprintf                                          #
        q/ %1$s -M%2$s -E '%2$s->new( argsfile => q!%3$s! )' /,    #
        $^X, __PACKAGE__, $argsfile;                               #

    print "Executing in background: ${command}\n" if $verbose;

    # spawn a background process running the command
    # and return a process instance
    return $class->_exec_command($command, $args);
}


sub _exec_command {
    my ($class, $command, $args) = @_;

    # add syntax to capture pid to the end of the command
    $command .= ' > /dev/null & echo $!';

    # run the command in a new process and capture the pid
    my $pid;
    IPC::Run3::run3( [ qw/bash -c/, $command ], undef, \$pid );
    $pid = ( $pid =~ /^(\d+)/ )[0];

    # return a process instance representing the command
    # running in the background
    return PROCESS_CLASS()->new( pid => $pid );
}


sub generate_argsfile {
    my ( $class, $args ) = @_;

    # find a unique file name (accounts for clashes after shortening the UID)
    my ( $tmpdir, $file, $uuid ) = Data::AnyXfer->tmp_dir();
    do {
        $uuid = substr( Data::UUID->new()->create_str, 0, 10 );
        $file = $tmpdir->file($uuid);

    } while ( -f $tmpdir->file($uuid) );

    # serialise the arguments
    my $argsjson = encode_json($args);
    $file->spew( iomode => '>:raw', $argsjson );

    # return the file location / object
    return $file;
}


# ARGUMENT INFLATION

sub inflate_datafile_dir {
    my ( $class, $dir, $datafile_label ) = @_;

    # create datafile storage using the parent process working directory
    # XXX : use read-only mode so that the storage instance knows it is
    # safe to skip cloning the working directory
    my $storage
        = Data::AnyXfer::Elastic::Import::Storage::Directory->new(
        dir              => Path::Class::dir($dir),
        destination_info => $datafile_label,
        read_only        => 1,
        );

    return Data::AnyXfer::Elastic::Import::DataFile->new(
        storage => $storage );
}

sub inflate_index_info {
    my ( $class, $info ) = @_;
    return Data::AnyXfer::Elastic::IndexInfo->new( %{$info} );
}

sub inflate_client_info {
    my ( $class, $info ) = @_;
    return Data::AnyXfer::Elastic->default->build_client( nodes => $info );
}


# ES IMPORT

sub do_import {
    my $self = $_[0];

    my $importer = Data::AnyXfer::Elastic::Importer->new(
        # other args
        delete_before_create => $self->delete_before_create,
        bulk_max_count       => $self->bulk_max_count,
        # define custom logger to allow verbose option
        logger => Data::AnyXfer::Elastic::Logger->new(
            screen => $self->verbose,
            file   => 1,
        ),
    );

    $importer->execute(
        datafile      => $self->datafile,
        elasticsearch => $self->client
    );
}


1;

=head1 COPYRIGHT

This software is copyright (c) 2019, Anthony Lucas.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.