Group
Extension

Mojo-Run/lib/Mojo/Run.pm

package Mojo::Run;

use Mojo::Base -base;

use bytes;
use Carp;
use Errno;
use Socket;
use Time::HiRes qw(time gettimeofday);
use Scalar::Util qw(blessed);
use Storable qw(thaw nfreeze);
use POSIX ":sys_wait_h";
use Mojo::Log;
use Mojo::IOLoop;
use Data::Dumper;
use Mojo::Reactor;
# Counters used by spawn() and complete(): the number of tracked children
# and its cap (spawn() only enforces the cap when max_forks is above zero).
has [qw/num_forks max_forks/] => 0;

# Lazily built collaborators; either one may be replaced by the caller.
has log    => sub { Mojo::Log->new };
has ioloop => sub { Mojo::IOLoop->new };

# Plain accessors declared without a default value.
has [qw/reactor error is_child/];

our $VERSION = '0.3';

# Process-wide instance handed out by singleton().
my $_obj;

BEGIN {
	# Both helpers are installed at compile time with an empty prototype, so
	# code further down in this file can call them as bare words.

	# Returns (read_handle, write_handle) created by pipe(), or nothing if
	# pipe() fails.
	*portable_pipe = sub () {
		my ($reader, $writer);
		return unless pipe $reader, $writer;

		return ($reader, $writer);
	};

	# Returns two connected AF_UNIX stream sockets with autoflush enabled,
	# or nothing if socketpair() fails.
	*portable_socketpair = sub () {
		my ($left, $right);
		return unless socketpair $left, $right, Socket::AF_UNIX(), Socket::SOCK_STREAM(), Socket::PF_UNSPEC();

		$_->autoflush(1) for $left, $right;

		return ($left, $right);
	};
}

# Constructor. Always yields the instance provided by singleton(); the
# invocant and any further arguments are ignored.
sub new {
	return __PACKAGE__->singleton;
}

# Returns the shared object stored in $_obj, building it through
# _constructor() on first use.
sub singleton {
	$_obj = __PACKAGE__->_constructor unless defined $_obj;

	return $_obj;
}

# Builds a new object through the parent class constructor and installs the
# process-wide SIGCHLD handler, which forwards to _sig_chld() with this
# object as its first argument.
sub _constructor {
	my ($invocant) = @_;

	# Accept either a class name or an existing instance.
	my $class = ref $invocant ? ref $invocant : $invocant;

	my $self = $class->SUPER::new;
	bless $self, $class;

	# Reap children as soon as they exit.
	$SIG{CHLD} = sub { _sig_chld($self, @_) };

	return $self;
}

# Gets or sets the level of the attached logger.
#
# $level - optional; when defined it is passed to the logger's level().
#
# Returns the logger's level after the optional update.
sub log_level {
	my ($self, $level) = @_;

	my $logger = $self->log;
	$logger->level($level) if defined $level;

	return $logger->level;
}

# Forks a child process to run a command and wires its output into the
# event loop.
#
# Options (%opt), as accepted by _getRunStruct():
#   cmd          - string, array reference or code reference to run
#   param        - passed as second argument to a code reference cmd
#   exec_timeout - seconds before the child is killed (0 = no timeout)
#   stdout_cb, stderr_cb, exit_cb - callbacks, see watch() and complete()
#
# When the invocant is not a Mojo::Run object, the call is forwarded to the
# instance returned by new().
#
# Returns the child's pid in the parent, or 0 on failure with the reason
# stored in $self->error. Never returns in the child.
sub spawn {
	my ($self, %opt) = @_;

	unless (defined $self && blessed($self) && $self->isa(__PACKAGE__)) {
		my $obj = __PACKAGE__->new;
		return $obj->spawn(%opt);
	}

	$self->error('');

	if ($self->max_forks > 0 && $self->num_forks >= $self->max_forks) {
		$self->error("Unable to spawn another subprocess: "
			."Limit of " . $self->max_forks . " concurrently spawned process(es) is reached."
		);
		return 0;
	}

	# normalize and validate run parameters...
	my $proc = $self->_getRunStruct(\%opt);
	return 0 unless $self->_validateRunStruct($proc);

	$self->log->debug("Spawning command "
		."timeout: "
		.($proc->{exec_timeout} > 0 ? sprintf("%-.3f seconds", $proc->{exec_timeout}) : "none")
		." : [$proc->{cmd}]"
	);

	# One socket pair per stream. The *_p ends are used by the child, the
	# *_c ends are kept and watched by the parent.
	my ($stdout_p, $stdout_c) = portable_socketpair();
	my ($stderr_p, $stderr_c) = portable_socketpair();
	my ($stdres_p, $stdres_c) = portable_socketpair();

	unless ($stdout_p && $stderr_p && $stdres_p) {
		$self->error("Unable to create socket pair for subprocess: $!");
		return 0;
	}

	$proc->{time_started} = time;
	$proc->{running     } = 1;
	$proc->{hdr_stdout  } = $stdout_c;
	$proc->{hdr_stderr  } = $stderr_c;
	$proc->{hdr_stdres  } = $stdres_c;

	my $pid = fork;

	# fork returns undef on failure; without this check the parent would
	# fall into the child branch below, redirect its own stdio and exit.
	unless (defined $pid) {
		$self->error("Unable to fork subprocess: $!");
		return 0;
	}

	if ($pid) {
		# parent

		# Register the process first so that cleanup() can find it as soon
		# as possible.
		# NOTE(review): a child that exits before this assignment would be
		# reported by cleanup() as an untracked pid - confirm whether
		# SIGCHLD should be blocked around fork.
		$proc->{pid} = $pid;
		$self->{_data}->{$pid} = $proc;

		$self->num_forks($self->num_forks + 1);

		$self->log->debug("Subprocess spawned as pid $pid.");

		# exec timeout
		if (defined $proc->{exec_timeout} && $proc->{exec_timeout} > 0) {
			$self->log->debug(
				"[process $pid]: Setting execution timeout to " .
				sprintf("%-.3f seconds.", $proc->{exec_timeout})
			);
			my $timer = $self->ioloop->timer(
				$proc->{exec_timeout},
				sub { _timeout_cb($self, $pid) }
			);

			# save timer so cleanup() and _timeout_cb() can remove it
			$proc->{id_timeout} = $timer;
		}

		# the child's ends are not needed in the parent
		close $stdout_p;
		close $stderr_p;
		close $stdres_p;

		$self->watch('stdout', $pid);
		$self->watch('stderr', $pid);
		$self->watch('stdres', $pid);
	} else {
		# child

		# DESTROY checks this flag and leaves the parent's children alone.
		$self->is_child(1);

		# the parent's ends are not needed in the child
		close $stdout_c;
		close $stderr_c;
		close $stdres_c;

		# Stdio should not be tied.
		if (tied *STDOUT) {
			carp "Cannot redirect into tied STDOUT.  Untying it";
			untie *STDOUT;
		}
		if (tied *STDERR) {
			carp "Cannot redirect into tied STDERR.  Untying it";
			untie *STDERR;
		}

		# Redirect STDOUT
		open STDOUT, ">&" . fileno($stdout_p)
			or croak "can't redirect stdout in child pid $$: $!";
		# Redirect STDERR
		open STDERR, ">&" . fileno($stderr_p)
			or croak "can't redirect stderr in child pid $$: $!";

		# unbuffered output on both streams; STDOUT stays selected
		select STDERR; $| = 1;
		select STDOUT; $| = 1;

		if (ref $proc->{cmd} eq 'CODE') {
			my @rv = eval { $proc->{cmd}->($$, $proc->{param}); };

			if ($@) {
				carp "exec of coderef failed: $@\n";
				exit 255;
			}

			# Return values travel back serialized on the result socket;
			# complete() thaws them for exit_cb.
			print $stdres_p nfreeze(\ @rv);

		} else {
			exec(ref $proc->{cmd} eq 'ARRAY' ? @{ $proc->{cmd} } : $proc->{cmd}) or do {
				carp "exec failed: $!";
				exit 255;
			};
		}

		close $stdout_p;
		close $stderr_p;
		close $stdres_p;

		# NOTE(review): a code reference that ran successfully still ends
		# the child with status 1, which is what exit_cb will see as
		# exit_status. Left unchanged because callers may depend on it.
		exit 1;
	}

	return $pid;
}

# Starts the attached event loop and passes its return value through.
sub start {
	my ($self) = @_;

	return $self->ioloop->start;
}

# Registers a read watcher on one of a managed process' output handles.
#
# $io  - stream name: 'stdout', 'stderr' or 'stdres' (case-insensitive)
# $pid - pid of a process known to get_proc()
#
# Each readable event reads up to 65536 bytes. The chunk is handed to the
# process' "<io>_cb" callback when one is defined, otherwise appended to
# its "buf_<io>" buffer. EOF or a read error other than EINTR drops the
# handle via drop_handle().
#
# Returns nothing after logging an error when the arguments are unusable;
# otherwise returns the result of the reactor's watch() call.
sub watch {
	my ($self, $io, $pid) = @_;
	$io = lc($io || '');

	my $proc = $self->get_proc($pid);

	unless ($proc) {
		$self->log->error('Cant start IO watcher off NULL process');
		return;
	}
	unless (grep { $io eq $_ } qw/stdout stderr stdres/) {
		$self->log->error("[process $proc->{pid}]: IO ($io) is unsupported");
		return;
	}
	unless ($proc->{"hdr_$io"}) {
		$self->log->error("[process $proc->{pid}]: IO handler ($io) is EMPTY");
		return;
	}

	# file descriptor number, used in log messages only
	my $id = fileno $proc->{"hdr_$io"};

	return $self->ioloop->reactor->io($proc->{"hdr_$io"}, sub {
		my $chunk = undef;
		my $len   = sysread $proc->{"hdr_$io"}, $chunk, 65536;

		# interrupted read: wait for the next readable event
		return if !defined $len && $! == Errno::EINTR();

		# any other error (undef) or EOF (0): stop watching this handle
		if (!$len) {
			$self->drop_handle($pid, $io);
			return;
		}

		if (defined $proc->{"$io\_cb"}) {
			$self->log->debug("[process $proc->{pid}]: (handle: $id) Invoking ".uc($io)." callback.");

			eval { $proc->{"$io\_cb"}->($proc->{pid}, $chunk) };

			if ($@) {
				$self->log->error("[process $proc->{pid}]: (handle: $id) Exception in $io\_cb: $@");
			}
		} else {
			# append to buffer
			$self->log->debug("[process $proc->{pid}]: (handle: $id) Appending $len bytes to ".uc($io)." buffer.");
			$proc->{"buf_$io"} .= $chunk;
		}
	})->watch($proc->{"hdr_$io"}, 1, 0);
}

# Stops watching one output handle of a managed process and releases it.
#
# $pid - pid of a process known to get_proc()
# $io  - stream name (lower-cased here), e.g. 'stdout'
#
# Returns nothing when the pid is unknown or the handle is already gone;
# otherwise returns whatever complete() returns, which fires the exit
# callback once the process has stopped and no handle is left.
sub drop_handle {
	my ($self, $pid, $io) = @_;
	$io = lc($io || '');

	my $proc = $self->get_proc($pid);
	return unless $proc;

	# The handle slot is empty in this branch, so name the stream instead
	# of interpolating the missing handle.
	unless ($proc->{"hdr_$io"}) {
		$self->log->debug("[process $pid]: Got HUP for unmanaged handle ($io); ignoring.");
		return;
	}

	$self->ioloop->remove( $proc->{"hdr_$io"} );
	undef $proc->{"hdr_$io"};

	$self->log->debug("[process $pid]: ".uc($io)." closed.");

	$self->complete($pid);
}

# Looks up the bookkeeping structure of a managed process.
#
# $pid - process id used as key into $self->{_data}
#
# Returns the process hash reference, or undef after storing a message in
# $self->error when $pid is undefined or not managed.
sub get_proc {
	my ($self, $pid) = @_;

	# $pid may be undef here; interpolating it must stay silent.
	no warnings;
	my $prefix = "[process $pid]: Unable to get process data structure: ";

	if (!defined $pid) {
		$self->error($prefix . "Undefined pid.");
		return undef;
	}

	my $entry = $self->{_data}->{$pid};
	return $entry if defined $entry;

	$self->error($prefix . "Non-managed process pid: $pid");
	return undef;
}

# Records the exit of a managed process and tries to complete it.
#
# $pid      - pid of the process that stopped
# $exit_val - exit status
# $signum   - number of the signal that ended the process, if any
# $core     - true when a core dump was produced
#
# Returns 0 when the pid is not managed (a warning is logged) or when
# cleanup already ran for it; otherwise returns what complete() returns.
sub cleanup {
	my ($self, $pid, $exit_val, $signum, $core) = @_;

	my $proc = $self->get_proc($pid);
	unless (defined $proc) {
		# any of the arguments may be undef here
		no warnings;
		$self->log->warn("Untracked process pid $pid exited with exit status $exit_val by signal $signum, core: $core.");
		return 0;
	}

	# run at most once per process
	return 0 if $proc->{cleanup};

	$proc->{cleanup} = 1;

	$self->log->debug("[process $pid]: Got SIGCHLD, "
		. "exited with exit status: $exit_val by signal $signum"
		. (($core) ? " with core dump." : ".")
	);

	# the execution timeout is no longer needed
	if (defined $proc->{id_timeout}) {
		$self->ioloop->remove($proc->{id_timeout});
		$proc->{id_timeout} = undef;
	}

	# After a timeout kill the handles are dropped right away instead of
	# waiting for EOF. The process still counts as running at this point,
	# so the complete() calls made by drop_handle() return early.
	if ($proc->{hard_kill}) {
		for (qw/stderr stdout stdres/) {
			$self->drop_handle($pid, $_) if $proc->{"hdr_$_"};
		}
	}

	$proc->{exit_status} = $exit_val;
	$proc->{exit_core  } = $core;
	$proc->{exit_signal} = $signum;

	# command timings...
	my $te = time;
	$proc->{time_stopped      } = $te;
	$proc->{time_duration_exec} = $te - $proc->{time_started};

	# this process is no longer running
	$proc->{running} = 0;

	$self->complete($pid);
}

# Finishes a managed process: fires its exit callback, forgets the process
# and decrements the fork counter.
#
# $pid   - pid of a process known to get_proc()
# $force - when true, complete even if the process is still marked as
#          running or still has open output handles
#
# Returns 0 when the pid is not managed or (without $force) the process is
# not finished yet; otherwise returns the result of the num_forks update.
# Croaks when the serialized result of a code reference command cannot be
# de-serialized.
sub complete {
	my ($self, $pid, $force) = @_;

	my $proc = $self->get_proc($pid);

	# Unknown or already completed pid: do nothing. Without this guard a
	# repeated call would decrement num_forks a second time.
	return 0 unless $proc;

	return 0 if !$force
		&& (
			$proc->{running}
			|| defined $proc->{hdr_stdout}
			|| defined $proc->{hdr_stdres}
			|| defined $proc->{hdr_stderr}
		);

	if (%$proc) {
		$self->log->debug("[process $pid]: All streams closed, process execution complete.");

		$proc->{time_duration_total} = time - $proc->{time_started};

		# fire exit callback!
		if (defined $proc->{exit_cb} && ref $proc->{exit_cb} eq 'CODE') {
			# spawn() sends a code reference's return values through the
			# result stream as frozen data
			my $result = eval { $proc->{buf_stdres} ? thaw($proc->{buf_stdres}) : undef};

			if ($@) {
				croak "Error de-serializing subprocess data: $@";
			}

			# prepare callback structure
			my $cb_d = {
				cmd => ref $proc->{cmd} eq 'CODE'  ? 'CODE'                     :
					   ref $proc->{cmd} eq 'ARRAY' ? join(' ', @{$proc->{cmd}}) :
					   $proc->{cmd}
				,
				param               => $proc->{param},
				exit_status         => $proc->{exit_status},
				exit_signal         => $proc->{exit_signal},
				exit_core           => $proc->{exit_core},
				stdout              => $proc->{buf_stdout},
				# buffered output followed by notes added by this module
				stderr              => ($proc->{buf_stderr} ? $proc->{buf_stderr} : '').($proc->{stderr} ? $proc->{stderr} : ''),
				result              => $result,
				time_started        => $proc->{time_started},
				time_stopped        => $proc->{time_stopped},
				time_duration_exec  => $proc->{time_duration_exec},
				time_duration_total => $proc->{time_duration_total},
			};

			# safely invoke callback
			$self->log->debug("[process $pid]: invoking exit_cb callback.");
			eval { $proc->{exit_cb}->($pid, $cb_d); };

			$self->log->error("[process $pid]: Error running exit_cb: $@") if $@;
		} else {
			$self->log->error("[process $pid]: No exit_cb callback!");
		}
	}

	delete $self->{_data}->{$pid};
	$self->num_forks($self->num_forks - 1);
}

# SIGCHLD handler body: reaps every child that has already exited, without
# blocking, and passes its exit status, signal number and core-dump flag
# to cleanup().
sub _sig_chld {
	my ($self) = @_;

	my $reaped = 0;
	while (1) {
		my $pid = waitpid(-1, POSIX::WNOHANG());
		last unless $pid > 0;

		# split the wait status before anything else can change $?
		my $status = $?;
		$reaped++;

		# do process cleanup
		$self->cleanup($pid, $status >> 8, $status & 127, $status & 128);
	}

	$self->log->debug("SIGCHLD handler cleaned up after $reaped process(es).")
	  if $reaped > 0;
}

# Builds the bookkeeping structure for a new process.
#
# $opt - hash reference of user options; only keys that exist in the
#        default structure are copied, everything else is ignored.
#
# Returns a fresh hash reference holding the defaults overlaid with the
# accepted options.
sub _getRunStruct {
	my ($self, $opt) = @_;

	my %struct = (
		pid          => 0,
		cmd          => undef,
		param        => undef,
		error        => undef,
		exec_timeout => 0,

		# callbacks
		stdout_cb    => undef,
		stderr_cb    => undef,
		exit_cb      => undef,

		# per-stream output buffers
		buf_stdout   => '',
		buf_stderr   => '',
		buf_stdres   => '',

		# per-stream handles, filled in by spawn()
		hdr_stdout   => undef,
		hdr_stderr   => undef,
		hdr_stdres   => undef,
	);

	# apply user defined vars...
	for my $key (keys %$opt) {
		next unless exists $struct{$key};
		$struct{$key} = $opt->{$key};
	}

	return \%struct;
}

# Validates a structure built by _getRunStruct().
#
# $s - process structure (hash reference). Its exec_timeout is converted
#      to a number in place.
#
# Returns 1 when the structure is usable. Otherwise stores a message in
# $self->error and returns nothing.
sub _validateRunStruct {
	my ($self, $s) = @_;

	# command?
	unless (defined $s->{cmd}) {
		$self->error('Undefined command.');
		return;
	}

	# check command...
	my $cmd_ref = ref $s->{cmd};
	if ($cmd_ref eq '') {
		if (length $s->{cmd} == 0) {
			$self->error('Zero-length command.');
			return;
		}
	}
	elsif ($cmd_ref ne 'CODE' && $cmd_ref ne 'ARRAY') {
		$self->error('Command can be pure scalar, arrayref or coderef.');
		return;
	}

	# callbacks...
	my @callbacks = (
		[stdout_cb => 'STDOUT'],
		[stderr_cb => 'STDERR'],
		[exit_cb   => 'Process exit_cb'],
	);
	for my $entry (@callbacks) {
		my ($key, $label) = @$entry;
		next unless defined $s->{$key};
		next if ref $s->{$key} eq 'CODE';

		$self->error("$label callback defined, but is not code reference.");
		return;
	}

	# exec timeout: force a numeric value, silently treating junk as 0
	{ no warnings; $s->{exec_timeout} += 0; }

	return 1;
}

# Timer callback for a process' execution timeout: kills the process with
# SIGKILL and runs cleanup() for it.
#
# $pid - pid of a process known to get_proc()
#
# Returns 0 when the pid is not managed or the process cannot be signalled
# any more, 1 after the kill attempt and cleanup.
sub _timeout_cb {
	my ($self, $pid) = @_;

	my $proc = $self->get_proc($pid);
	return 0 unless $proc;

	# drop timer (can't hurt...)
	if (defined $proc->{id_timeout}) {
		$self->ioloop->remove($proc->{id_timeout});
		$proc->{id_timeout} = undef;
	}

	# Is the process still alive? The builtin is qualified because this
	# package defines its own kill() method.
	return 0 unless CORE::kill(0, $pid);

	$self->log->debug("[process $pid]: Execution timeout ("
		.sprintf("%-.3f seconds).", $proc->{exec_timeout})
		." Killing process.");

	# complete() appends this note to the stderr text given to exit_cb
	$proc->{stderr} .= ";Execution timeout.";

	# terminate the process with SIGKILL
	unless (CORE::kill(9, $pid)) {
		$self->log->warn("[process $pid]: Unable to kill process: $!");
	}

	# makes cleanup() drop the output handles instead of waiting for EOF
	$proc->{hard_kill} = 1;
	$self->cleanup($pid, 0, 9, 0);

	return 1;
}

# Sends a signal to a managed process.
#
# $pid    - pid of a process known to get_proc()
# $signal - signal to send; defaults to 15 (SIGTERM)
#
# Returns 1 when the signal was sent. Returns 0 when the pid is not managed
# or the signal could not be delivered; in the latter case the reason is
# stored in $self->error.
sub kill {
	my ($self, $pid, $signal) = @_;
	$signal = 15 unless defined $signal;

	my $proc = $self->get_proc($pid);
	return 0 unless $proc;

	# Qualified so the builtin is never confused with this method.
	unless (CORE::kill($signal, $pid)) {
		$self->error("Unable to send signal $signal to process $pid: $!");
		return 0;
	}

	return 1;
}

# Destructor. In the parent every process still tracked is killed with
# SIGKILL, its handles are dropped and its exit callback is fired. In a
# forked child (is_child set by spawn()) the tracked processes are left
# alone. In both cases SIGCHLD is set to IGNORE afterwards.
sub DESTROY {
	my ($self) = @_;

	# perform cleanup...
	unless ($self->is_child) {
		foreach my $pid (keys %{ $self->{_data} || {} }) {
			my $proc = $self->{_data}->{$pid};

			$self->log->debug("Killing subprocess $pid with SIGKILL") if $self->log;
			# kill process (HARD!)
			$self->kill($pid, 9);

			next unless defined $self->ioloop;

			# drop fds
			$self->drop_handle($pid, $_) for grep {$proc->{"hdr_$_"}} qw/stdout stderr stdres/;

			# Fire exit callbacks (if any). Dropping the last handle of a
			# process that already stopped completes it and removes its
			# entry, so only force completion when it is still tracked.
			$self->complete($pid, 1) if exists $self->{_data}->{$pid};
		}
	}

	# disable sigchld handler
	$SIG{'CHLD'} = 'IGNORE';
}

1;

=pod
 
=head1 NAME

Mojo::Run - asynchronous external command execution for Mojo

=head1 VERSION

version 0.3

=head1 SYNOPSIS

	use Mojo::Run;
	use Mojo::Log;

	my $run = Mojo::Run->new;
	$run->max_forks(10);
	$run->log(Mojo::Log->new(
	    level => 'error',
	    path  => 'log/mojo_run.log',
	));

	$run->spawn(
	    cmd => sub {
	        my $pid   = shift;
	        my $param = shift; # {a => 1, b => 2}
	        
	        my $data = {};
	        ... do something
	        return $data;
	    },
	    param => {a => 1, b => 2},
	    exec_timeout => 120, # sec
	    stdout_cb => sub {
	        my ($pid, $chunk) = @_;
	    },
	    stderr_cb => sub {
	        my ($pid, $chunk) = @_;
	    },
	    exit_cb => sub {
	        my $pid = shift;
	        my $res = shift;
	        warn $res->{result}->[0];
	    },
	);
	$run->spawn(
	    cmd => 'ps aux',
	    exit_cb => sub {
	        my $pid = shift;
	        my $res = shift;
	    },
	);
	$run->spawn(
	    cmd => ['perl', '-v'],
	    exit_cb => sub {
	        my $pid = shift;
	        my $res = shift;
	    },
	);

	$run->start;

=head1 Result

The second argument passed to B< exit_cb > is a hash reference with the following keys:

=over
 
=item B< cmd >

=item B< param >

=item B< exit_status >

=item B< exit_signal >

=item B< exit_core >

=item B< stdout >

=item B< stderr >

=item B< result >

=item B< time_started >

=item B< time_stopped >

=item B< time_duration_exec >

=item B< time_duration_total >

=back

=head1 SOURCE REPOSITORY

L<https://github.com/likhatskiy/Mojo-Run> 

=head1 AUTHOR

Alexey Likhatskiy, <likhatskiy@gmail.com>

=head1 LICENSE AND COPYRIGHT

Copyright (C) 2012-2013 "Alexey Likhatskiy"

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.