HPC-Runner-Command/lib/HPC/Runner/Command/execute_job/Utils/Log.pm
package HPC::Runner::Command::execute_job::Utils::Log;
use MooseX::App::Role;
use namespace::autoclean;
use MooseX::Types::Path::Tiny qw/Path Paths AbsPath AbsFile/;
use IPC::Open3;
use IPC::Cmd qw[can_run];
use IO::Select;
use Symbol;
use Try::Tiny;
use Path::Tiny;
use File::Path qw(make_path remove_tree);
use File::Slurp;
with 'HPC::Runner::Command::Utils::Log';
with 'HPC::Runner::Command::execute_job::Utils::MemProfile';
##Command Log
# Logger for the command that is currently running. start_command_log
# assigns it the object returned by init_log, and log_cmd_messages calls
# level methods (info, error, ...) on it.
has command_log => ( is => 'rw' );
#TODO This should be changed to execute_jobs Logging
# Per-task bookkeeping hash. _log_commands stores the #TASK tags/deps array
# ref under the command PID, and name_log stores the log identifier under
# the zero-padded counter.
# NOTE: there is also a task_tags ArrayRef used for JobDeps.
has task_tags => (
    is      => 'rw',
    isa     => 'HashRef',
    traits  => ['Hash'],
    default => sub { return {} },
    handles => {
        delete_task_tag  => 'delete',
        get_task_tag     => 'get',
        has_no_task_tags => 'is_empty',
        num_task_tags    => 'count',
        set_task_tag     => 'set',
        task_tag_pairs   => 'kv',
    },
);
=head3 table_data
Per-command data for the process table. Each time the table is updated, the values are stored here; the hash is cleared when a new command starts.
=cut
# Column values for the current command's process-table row
# (start_time, exitcode, duration, ...). _log_commands clears it before
# each command and log_table fills in the exit-side columns.
has table_data => (
    is      => 'rw',
    isa     => 'HashRef',
    traits  => ['Hash'],
    default => sub { return {} },
    handles => {
        clear_table_data  => 'clear',
        delete_table_data => 'delete',
        get_table_data    => 'get',
        has_no_table_data => 'is_empty',
        num_table_data    => 'count',
        set_table_data    => 'set',
        table_data_pairs  => 'kv',
    },
);
#TODO Move this to App/execute_job/Log ... something to mark that this logs the
#individual processes that are executed
=head3 _log_commands
Run a command and log it. The command's stdout and stderr are read with IO::Select, so output from both streams is captured as it arrives and neither pipe can fill up and stall the command.
This approach was taken almost entirely from the following PerlMonks discussion:
http://www.perlmonks.org/?node_id=151886
You can use the script at the top of that discussion to test the runner. Download it, make it executable, and put lines like these in the infile:
perl command.pl 1
perl command.pl 2
#so on and so forth
=cut
# Run the current command via log_job and record its outcome.
#
# Resets table_data with the start time, runs the command, attaches any
# #TASK tags/deps to the command PID, logs the exit code and elapsed time,
# appends the process-table row (log_table) and updates the JSON task.
#
# Callers also pass a pid argument; it is not used here.
# Returns the exit code from log_job, or nothing if log_job did not
# return both a pid and an exit code.
sub _log_commands {
    my ($self) = @_;

    my $dt1 = DateTime->now( time_zone => 'local' );
    $self->task_start_time($dt1);

    # Start a fresh table row for this command.
    $self->clear_table_data;
    $self->set_table_data( start_time    => "$dt1" );
    $self->set_table_data( start_time_dt => $dt1 );

    my ( $cmdpid, $exitcode ) = $self->log_job;
    return unless defined $cmdpid;
    return unless defined $exitcode;

    #TODO Make table data its own class and return it
    $self->set_table_data( cmdpid => $cmdpid );

    # Attach the #TASK tags/deps parsed from the command to its PID.
    my $meta = $self->pop_note_meta;
    $self->set_task_tag( $cmdpid => $meta ) if $meta;

    $self->log_cmd_messages( "info",
        "Finishing job " . $self->counter . " with ExitCode $exitcode",
        $cmdpid );

    my $dt2    = DateTime->now( time_zone => 'local' );
    my $format = DateTime::Format::Duration->new(
        pattern => ' %e days, %H hours, %M minutes, %S seconds' );

    # Format once: the same string goes to the log and the process table.
    my $elapsed = $format->format_duration( $dt2 - $dt1 );

    $self->log_cmd_messages( "info", "Total execution time " . $elapsed,
        $cmdpid );
    $self->log_table( $cmdpid, $exitcode, $elapsed );
    $self->update_json_task;

    return $exitcode;
}
=head3 name_log
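Set the log file name for the current command. The name is the base name from set_logfile (by default built from the date/time and the job name), followed by -CMD_NNN-ID.md, where NNN is the zero-padded counter and ID is the identifier passed in.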
=cut
#TODO move to execute_jobs
# Build the per-command log file name and remember it in task_tags.
#
# $cmdpid is the identifier chosen by start_command_log (for example
# "PID_1234" or "_SID_<id>_PID_1234"). The log file becomes
# set_logfile . "-CMD_<counter>-<identifier>.md", with the counter
# zero-padded to three digits. Returns the value of set_task_tag.
sub name_log {
    my ( $self, $cmdpid ) = @_;

    my $count = $self->counter;
    $self->logfile( $self->set_logfile );

    my $padded = sprintf( "%03d", $count );    # 7 -> "007"
    $self->append_logfile( '-CMD_' . $padded . "-$cmdpid.md" );

    $self->set_task_tag( $padded => $cmdpid );
}
#TODO move to execute_jobs
# Record the exit-side data for one command in table_data and append a
# row to the process table file.
#
# Params: $cmdpid, $exitcode (as returned by log_job) and $duration
# (an already formatted string).
# The row is |version|schedulerid|jobname|tags|pid|exitcode|duration| when
# the object has job_scheduler_id and jobname, otherwise
# |pid|exitcode|duration|.
# A failed write is reported with a warning instead of aborting the caller.
sub log_table {
    my $self     = shift;
    my $cmdpid   = shift;
    my $exitcode = shift;
    my $duration = shift;

    my $dt1 = DateTime->now( time_zone => 'local' );
    $self->set_table_data( exit_time => "$dt1" );
    $self->set_table_data( exitcode  => $exitcode );
    $self->set_table_data( duration  => $duration );
    $self->set_table_data( task_id   => $self->counter );

    my $version   = $self->version || "0.0";
    my $task_tags = "";

    if ( $self->can('task_tags') ) {
        my $aref = $self->get_task_tag($cmdpid) || [];
        $task_tags = join( ", ", @{$aref} ) || "";
        $self->set_table_data( task_tags => $task_tags );
    }

    if ( $self->can('version') && $self->has_version ) {
        $version = $self->version;
        $self->set_table_data( version => $version );
    }

    my $text = '';
    if ( $self->can('job_scheduler_id') && $self->can('jobname') ) {
        my $schedulerid = $self->job_scheduler_id || '';
        my $jobname     = $self->jobname          || '';
        $text =
"|$version|$schedulerid|$jobname|$task_tags|$cmdpid|$exitcode|$duration|\n";
        $self->set_table_data( schedulerid => $schedulerid );
        $self->set_table_data( jobname     => $jobname );
    }
    else {
        $text = "|$cmdpid|$exitcode|$duration|\n";
    }

    # write_file croaks on error by default, which made the warning below
    # unreachable and aborted the caller. err_mode => 'quiet' makes it
    # return false instead, so a failed write is only reported.
    write_file( $self->process_table, { append => 1, err_mode => 'quiet' },
        $text )
      || $self->app_log->warn("Unable to write to the process table! $!");
}
#TODO move to execute_jobs
# Send $message to the command log at $level, where $level is a logger
# method name such as "info" or "error".
# When show_processid is on and a $cmdpid is given, the text is prefixed
# with "PID: <pid>\t". An empty message or level is ignored.
# Returns whatever the logger method returns.
sub log_cmd_messages {
    my ( $self, $level, $message, $cmdpid ) = @_;

    return if !$message || !$level;

    my $text =
      ( $self->show_processid && $cmdpid )
      ? "PID: $cmdpid\t$message"
      : $message;

    return $self->command_log->$level($text);
}
#TODO move to execute_jobs
# Run $self->cmd with IPC::Open3 and stream its output into the command log:
# stdout chunks are logged at "info" and stderr chunks at "error".
#
# Returns ( $cmdpid, $exitcode ). $exitcode is the raw wait status ($?), so
# it is non-zero on failure. If open3 fails, $cmdpid is 0 and $exitcode is
# $?, or -1 when $? carries no status (the same convention system() uses).
sub log_job {
    my $self = shift;

    my ( $infh, $outfh, $exitcode );

    # open3 puts the child's stderr on the stdout handle unless the error
    # handle is a real glob, so create one explicitly.
    my $errfh = gensym();

    my $cmdpid;
    eval { $cmdpid = open3( $infh, $outfh, $errfh, $self->cmd ); };
    if ($@) {
        # Keep the error: the logging calls below may reset $@.
        my $stderr = $@;

        # Never report a failed start as success. With a zero status we
        # would otherwise go on to read dead handles and waitpid on pid 0.
        $exitcode = $? || -1;
        $self->app_log->fatal( "Error running job "
              . $self->counter
              . " with ExitCode $exitcode" );
        $self->app_log->warn("There was an error running the command $stderr\n");
        $cmdpid = 0;
    }
    else {
        # Nothing is ever written to the command's stdin. Close it so a
        # command that reads stdin gets EOF instead of blocking forever.
        close $infh;
    }

    # Start Command Log
    $self->start_command_log($cmdpid);
    $self->create_json_task($cmdpid);

    # A non-zero status here means open3 could not start the command.
    return ( $cmdpid, $exitcode ) if $exitcode;

    ## Rolling back cmd_stats for now
    # $self->get_cmd_stats($cmdpid);

    # Drain both pipes as data arrives so neither one can fill up and
    # stall the command while we are waiting on the other.
    my $sel = IO::Select->new( $outfh, $errfh );
    while ( my @ready = $sel->can_read ) {
        foreach my $fh (@ready) {
            my $line;
            my $len = sysread $fh, $line, 4096;
            next unless defined $len;

            if ( $len == 0 ) {
                # EOF on this stream.
                $sel->remove($fh);
                close($fh);
            }
            elsif ( $fh == $outfh ) {
                $self->log_cmd_messages( "info", $line, $cmdpid );
            }
            elsif ( $fh == $errfh ) {
                $self->log_cmd_messages( "error", $line, $cmdpid );
            }
            else {
                $self->log_cmd_messages( 'fatal', "Shouldn't be here!\n" );
            }
        }
    }

    # Block until the command exits. Flags must be 0, not WNOHANG, so that
    # $? holds the command's real wait status.
    waitpid( $cmdpid, 0 );
    $exitcode = $? unless $exitcode;
    return ( $cmdpid, $exitcode );
}
=head3 start_command_log
Name and initialize the command log, then write a header describing the command (job scheduler ID, command PID, hostname, and task or array ID), followed by the command text itself.
=cut
# Name and open the per-command log, then write two info messages:
# a header (scheduler ID, PID, hostname, task/array ID) and the command
# text. Returns whatever the final log_cmd_messages call returns.
sub start_command_log {
    my ( $self, $cmdpid ) = @_;

    # Only a multi-node run with a scheduler ID puts the SID in the log
    # name; every other case uses just the PID.
    my $log_id =
      ( !$self->single_node && $self->job_scheduler_id )
      ? "_SID_" . $self->job_scheduler_id . "_PID_" . $cmdpid
      : "PID_" . $cmdpid;
    $self->name_log($log_id);
    $self->command_log( $self->init_log );

    # Array jobs report their task_id; otherwise fall back to the counter.
    my $log_array_msg =
      $self->can('task_id')
      ? "\nArray ID:\t" . $self->task_id . "\n"
      : "\nTask ID:\t" . $self->counter . "\n";

    my $header = "Starting Job:\n" . "================================================";
    $header .= "\nJobID:\t" . $self->job_scheduler_id;
    $header .= " \nCmdPID:\t" . $cmdpid;
    $header .= "\nHostname:\t" . $self->hostname;
    $header .= "\nJob Scheduler ID:\t" . $self->job_scheduler_id;
    $header .= "$log_array_msg\n";
    $self->log_cmd_messages( "info", $header, $cmdpid );

    #TODO counter is not terribly applicable with task ids
    my $command_msg = "Starting execution: " . $self->counter;
    $command_msg .= "\n\nCOMMAND:\n\n" . $self->cmd . "\n\n";
    $self->log_cmd_messages( "info", $command_msg, $cmdpid );
}
# Parse "#TASK key=value" lines out of the current command text.
#
# "#TASK tags=a,b" and "#TASK deps=c,d" values are split on commas and
# collected. Any other key is passed to the method of the same name (when
# the object has one), and a debug message reports the line as an invalid
# #TASK attribute.
# Returns an array ref of the collected tags/deps, or nothing when the
# command text is empty.
sub pop_note_meta {
    my $self  = shift;
    my $lines = $self->cmd;
    return unless $lines;

    my @ts;
    foreach my $line ( split( "\n", $lines ) ) {
        next unless $line;
        next unless $line =~ m/^#TASK/;

        my ( $t1, $t2 ) = $line =~ m/TASK (\w+)=(.+)$/;
        next unless $t1;

        if ( $t1 eq "tags" || $t1 eq "deps" ) {
            push @ts, split( ",", $t2 );
            next;
        }

        # The key comes from the command file. Only call it if such a
        # method exists: an unknown key used to die with "Can't locate
        # object method", although it is meant to be a non-fatal
        # invalid-attribute case.
        $self->$t1($t2) if $self->can($t1);
        $self->log_main_messages( 'debug',
                "Command:\n\t"
              . $self->cmd
              . "\nHas invalid #TASK attribute. Should be #TASK tags=thing1,thing2 or #TASK deps=thing1,thing2"
        );
    }
    return \@ts;
}
1;