Gearman-JobScheduler/lib/Gearman/JobScheduler.pm
=head1 NAME
C<Gearman::JobScheduler> - Gearman job scheduler utilities.
=cut
package Gearman::JobScheduler;
$VERSION = '0.16';
use strict;
use warnings;
use Modern::Perl "2012";
use Gearman::JobScheduler::Configuration;
use Gearman::XS qw(:constants);
use Gearman::XS::Client;
# Hashref serializing / unserializing
use Data::Compare;
use Data::Dumper;
use JSON;
# serialize hashes with the same key order:
my $json = JSON->new->allow_nonref->canonical->utf8;
use Data::UUID;
use File::Path qw(make_path);
use File::Glob qw(bsd_glob);
use Digest::SHA qw(sha256_hex);
use Carp;
use Email::MIME;
use Email::Sender::Simple qw(try_to_sendmail);
use Log::Log4perl qw(:easy);
# Initialize Log::Log4perl in "easy" mode as soon as the module is loaded:
# log everything from DEBUG level up, write UTF-8, and prefix each line with
# an ISO 8601 timestamp and the process ID (%P).
Log::Log4perl->easy_init({
level => $DEBUG,
utf8=>1,
layout => "%d{ISO8601} [%P]: %m%n"
});
# Turn on autoflush for the currently selected output handle (STDOUT unless
# another handle has been select()ed), so output is written immediately
# instead of being buffered. This does not affect sockets or other handles.
$| = 1;
# Max. job ID length for GJS jobs (when GJS comes up with a job ID of its own);
# _unique_path_job_id() truncates its result to this many characters.
use constant GJS_JOB_ID_MAX_LENGTH => 256;
=head2 (static) C<job_status($function_name, $gearman_job_id[, $config])>
Get Gearman job status.
Parameters:
=over 4
=item * Gearman function name (e.g. "NinetyNineBottlesOfBeer")
=item * Gearman job ID (e.g. "H:localhost.localdomain:8")
=item * (optional) Instance of Gearman::JobScheduler::Configuration
=back
Returns hashref with the job status, e.g.:
=begin text
{
# Gearman job ID that was passed as a parameter
'gearman_job_id' => 'H:tundra.home:8',
# Whether or not the job is currently running
'running' => 1,
# Numerator and denominator of the job's progress
# (in this example, job is 1333/2000 complete)
'numerator' => 1333,
'denominator' => 2000
};
=end text
Returns undef if the job ID was not found; dies on error.
=cut
sub job_status($$;$)
{
    my ($function_name, $gearman_job_id, $config) = @_;

    # Without an explicit configuration, use the function's default one
    $config ||= Gearman::JobScheduler::_default_configuration($function_name);

    my $client = _gearman_xs_client($config);
    my ($ret, $known, $running, $numerator, $denominator) = $client->job_status($gearman_job_id);

    if ($ret != GEARMAN_SUCCESS) {
        LOGDIE("Unable to determine status for Gearman job '$gearman_job_id': " . $client->error());
    }

    # The server does not know this job handle -> no such job
    return undef if !$known;

    return {
        'gearman_job_id' => $gearman_job_id,
        'running'        => $running,
        'numerator'      => $numerator,
        'denominator'    => $denominator,
    };
}
=head2 (static) C<log_path_for_gearman_job($function_name, $gearman_job_handle[, $config])>
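Get the path of the log file that Gearman::JobScheduler writes for the job.
(Note: a warning is logged if the job is not currently running; if the job has not started yet, its log file might not exist yet, in which case C<undef> is returned.)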
Parameters:
=over 4
=item * Gearman function name (e.g. "NinetyNineBottlesOfBeer")
=item * Gearman job ID (e.g. "H:localhost.localdomain:8")
=item * (optional) Instance of Gearman::JobScheduler::Configuration
=back
Returns the path of the file to which the job's log is being (or was) written, e.g.
"/var/log/gjs/NinetyNineBottlesOfBeer/H_tundra.local_93.<SHA-256 hex digest of the function name and arguments>.log"
Returns C<undef> if no log path was found.
die()s on error.
=cut
sub log_path_for_gearman_job($$;$)
{
    my ($function_name, $gearman_job_handle, $config) = @_;

    unless ($config) {
        $config = Gearman::JobScheduler::_default_configuration($function_name);
    }

    # If the job is not running, the log path will not be available
    my $job_status = job_status($function_name, $gearman_job_handle, $config);
    if ((! $job_status) or (! $job_status->{running})) {
        WARN("Job '$gearman_job_handle' is not running; either it is finished already or hasn't started yet. "
            . "Thus, the path returned might not yet exist.");
    }

    my $gearman_job_id = _gearman_job_id_from_handle($gearman_job_handle);

    # Sanitize the ID just like run_locally() would
    $gearman_job_id = _sanitize_for_path($gearman_job_id);

    # "<log dir>/<ID>.log", or "<log dir>/<function name>.log" with unified logs
    my $log_path = _worker_log_path($function_name, $gearman_job_id, $config);
    (my $log_path_prefix = $log_path) =~ s/\.log$//;
    my $log_path_glob = $log_path_prefix . '*.log';

    # bsd_glob() (unlike glob()) does not split the pattern on whitespace, so
    # a log directory containing spaces is handled correctly.
    #
    # The wildcard alone is too permissive: for job "H:host:1" the pattern
    # "H_host_1*.log" would also match the logs of jobs "H:host:10",
    # "H:host:11", ... Keep only the paths in which the prefix is directly
    # followed by a dot, i.e. "<prefix>.log" or "<prefix>.<job hash>.log"
    # (the naming produced by _unique_path_job_id()).
    my @log_paths = grep {
        index($_, $log_path_prefix . '.') == 0
    } bsd_glob($log_path_glob);

    if (scalar @log_paths == 0) {
        INFO("Log path not found for expression: $log_path_glob");
        return undef;
    }
    if (scalar @log_paths > 1) {
        LOGDIE("Two or more logs found for expression: $log_path_glob");
    }

    return $log_paths[0];
}
# (static) Return an unique job ID that will identify a particular job with its
# arguments
#
# Parameters:
# * Gearman function name, e.g. 'NinetyNineBottlesOfBeer'
# * hashref of job arguments, e.g. "{ 'how_many_bottles' => 13 }" (may be undef)
#
# Returns: SHA-256 hex digest (64 characters) of the string
# "<function name>(<key> = <value>, ...)" with the keys sorted, e.g. the digest
# of "NinetyNineBottlesOfBeer(how_many_bottles = 2000)"; undef if the function
# name is empty.
#
# Scalar values are used as-is (so IDs of such jobs are unchanged from earlier
# versions) and undefined values become 'undef'. Reference values (nested
# hashes / arrays) are dumped with sorted keys, because plain stringification
# ("HASH(0x...)") would give equal arguments different IDs.
sub unique_job_id($$)
{
    my ($function_name, $job_args) = @_;

    unless ($function_name) {
        return undef;
    }

    # Convert to string
    my @pairs;
    if ($job_args and scalar keys %{ $job_args }) {
        foreach my $key (sort keys %{ $job_args }) {
            my $value = $job_args->{$key};
            if (! defined $value) {
                $value = 'undef';
            } elsif (ref $value) {
                # Canonical, address-free representation of nested data
                $value = Data::Dumper->new([ $value ])
                    ->Indent(0)
                    ->Terse(1)
                    ->Sortkeys(1)
                    ->Useqq(1)
                    ->Dump();
            }
            push @pairs, $key . ' = ' . $value;
        }
    }
    my $unique_id = "$function_name(" . join(', ', @pairs) . ')';

    # sha256_hex() croaks on characters above 0xFF ("Wide character"), so
    # encode such strings to UTF-8 first; all other strings are hashed
    # exactly as before.
    if ($unique_id =~ /[^\x00-\xFF]/) {
        utf8::encode($unique_id);
    }

    # Gearman limits the "unique" parameter of a task to 64 bytes (see
    # GEARMAN_MAX_UNIQUE_SIZE in
    # https://github.com/sni/gearmand/blob/master/libgearman-1.0/limits.h)
    # which is usually not enough for most Gearman functions, so we hash the
    # parameter instead
    $unique_id = sha256_hex($unique_id);

    return $unique_id;
}
# (static) Return an unique, path-safe job name which is suitable for writing
# to the filesystem (e.g. for logging)
#
# Parameters:
# * Gearman function name, e.g. 'NinetyNineBottlesOfBeer'
# * hashref of job arguments, e.g. "{ 'how_many_bottles' => 13 }"
# * (optional) Gearman job ID, e.g.:
#   * "H:tundra.home:18" (as reported by an instance of Gearman::Job), or
#   * "127.0.0.1:4730//H:tundra.home:18" (as reported by gearmand)
#
# Returns: "<ID>.<unique_job_id() digest>", truncated to GJS_JOB_ID_MAX_LENGTH
# characters and sanitized for paths, where <ID> is the Gearman job ID (host
# part stripped) or, if none was given, a freshly generated UUID, e.g.:
# * "084567C4146F11E38F00CB951DB7256D.<64 hex digits>" (job run locally), or
# * "H_tundra.home_18.<64 hex digits>" (job run by Gearman)
#
# Returns undef if the function name is empty; dies on an invalid Gearman job
# ID.
sub _unique_path_job_id($$;$)
{
    my ($function_name, $job_args, $gearman_job_id) = @_;

    unless ($function_name) {
        return undef;
    }

    my $unique_id;
    if ($gearman_job_id) {
        # If Gearman job ID was passed as a parameter, this means that the job
        # was run by Gearman (by running run_on_gearman() or enqueue_on_gearman()).
        # Thus, the job has to be logged to a location that can later be found
        # by knowing the Gearman job ID.

        # Strip the host part (if present)
        $unique_id = _gearman_job_id_from_handle($gearman_job_id);
    } else {
        # If no Gearman job ID was provided, this means that the job is being
        # run locally.
        # The job's output still has to be logged somewhere, so we generate an
        # UUID to serve in place of Gearman job ID.
        my $ug = Data::UUID->new;
        my $uuid = $ug->create_str();    # e.g. "059303A4-F3F1-11E2-9246-FB1713B42706"
        $uuid =~ s/\-//gs;               # e.g. "059303A4F3F111E29246FB1713B42706"
        $unique_id = $uuid;
    }

    # ID goes first so that, if the name has to be truncated, the job ID part
    # survives and only the digest gets cut
    my $gjs_job_id = $unique_id . '.' . unique_job_id($function_name, $job_args);
    if (length($gjs_job_id) > GJS_JOB_ID_MAX_LENGTH) {
        $gjs_job_id = substr($gjs_job_id, 0, GJS_JOB_ID_MAX_LENGTH);
    }

    # Sanitize for paths
    $gjs_job_id = _sanitize_for_path($gjs_job_id);

    return $gjs_job_id;
}
# Make a string safe to use as a file name: every character other than ASCII
# letters, digits and . - _ ( ) = , is replaced with an underscore.
sub _sanitize_for_path($)
{
    my ($unsafe) = @_;

    (my $safe = $unsafe) =~ s/[^A-Za-z0-9._()=,\-]/_/gi;

    return $safe;
}
# Create and return an instance of Gearman::XS::Client that is connected to
# the configured Gearman servers and has callbacks which log task events
# (created, data, status, complete, fail) at DEBUG level.
#
# Parameters:
# * configuration object; its gearman_servers() must return an arrayref of
#   server addresses (they are joined with ',' for add_servers())
#
# Dies if no servers are configured or if they cannot be added.
sub _gearman_xs_client($)
{
    my $config = shift;

    unless (scalar (@{$config->gearman_servers})) {
        LOGDIE("No Gearman servers are configured.");
    }

    my $client = Gearman::XS::Client->new;

    my $ret = $client->add_servers(join(',', @{$config->gearman_servers}));
    unless ($ret == GEARMAN_SUCCESS) {
        LOGDIE("Unable to add Gearman servers: " . $client->error());
    }

    $client->set_created_fn(sub {
        my $task = shift;
        DEBUG("Gearman task created: '" . $task->job_handle() . "'");
        return GEARMAN_SUCCESS;
    });

    $client->set_data_fn(sub {
        my $task = shift;
        # data() may be undef; avoid "uninitialized" warnings
        DEBUG("Data sent to Gearman task '" . $task->job_handle() . "': " . ($task->data() // ''));
        return GEARMAN_SUCCESS;
    });

    $client->set_status_fn(sub {
        my $task = shift;
        DEBUG("Status updated for Gearman task '" . $task->job_handle()
            . "': " . $task->numerator()
            . " / " . $task->denominator());
        return GEARMAN_SUCCESS;
    });

    $client->set_complete_fn(sub {
        my $task = shift;
        # "//" rather than "||" so that a result of "0" is still logged
        DEBUG("Gearman task '" . $task->job_handle()
            . "' completed with data: " . ($task->data() // ''));
        return GEARMAN_SUCCESS;
    });

    $client->set_fail_fn(sub {
        my $task = shift;
        DEBUG("Gearman task failed: '" . $task->job_handle() . "'");
        return GEARMAN_SUCCESS;
    });

    return $client;
}
# Return Gearman job ID from Gearman job handle
#
# Parameters:
# * Gearman job handle, e.g.:
#   * "H:tundra.home:18" (as reported by an instance of Gearman::Job), or
#   * "127.0.0.1:4730//H:tundra.home:18" (as reported by gearmand)
#
# Returns: Gearman job ID (e.g. "H:localhost.localdomain:8"), i.e. the handle
# with the "<server>//" prefix (if any) removed.
#
# Dies if the handle is undefined or the resulting ID is not of the form
# "H:<host>:<number>".
sub _gearman_job_id_from_handle($)
{
    my $gearman_job_handle = shift;

    unless (defined $gearman_job_handle) {
        LOGDIE("Gearman job handle is undefined.");
    }

    my $gearman_job_id;

    # Strip the host part (if present)
    if (index($gearman_job_handle, '//') != -1) {
        # "127.0.0.1:4730//H:localhost.localdomain:8"
        # Assign to the outer variable; declaring it with "my" here would
        # shadow it and leave the returned ID undefined.
        (undef, $gearman_job_id) = split('//', $gearman_job_handle, 2);
    } else {
        # "H:localhost.localdomain:8"
        $gearman_job_id = $gearman_job_handle;
    }

    # Validate
    unless (defined $gearman_job_id and $gearman_job_id =~ /^H:.+?:\d+?$/) {
        LOGDIE("Invalid Gearman job ID: " . ($gearman_job_id // 'undef'));
    }

    return $gearman_job_id;
}
# (static) Return the log file path for a function name and GJS job ID.
#
# The file lives in the function's worker log directory (created if missing)
# and is named "<sanitized function name>.log" when the function class's
# unify_logs() is true, or "<sanitized job ID>.log" otherwise.
sub _worker_log_path($$$)
{
    my ($function_name, $gearman_job_id, $config) = @_;

    my $log_dir = _init_and_return_worker_log_dir($function_name, $config);
    my $log_name = $function_name->unify_logs() ? $function_name : $gearman_job_id;

    return $log_dir . _sanitize_for_path($log_name) . '.log';
}
# (static) Initialize (create missing directories) and return a worker log
# directory path (with trailing slash):
# "<configured worker_log_dir>/<sanitized function name>/"
#
# Dies if the worker log directory is not configured or cannot be created.
sub _init_and_return_worker_log_dir($$)
{
    my ($function_name, $config) = @_;

    my $worker_log_dir = $config->worker_log_dir;
    unless ($worker_log_dir) {
        LOGDIE("Worker log directory is undefined.");
    }

    # Add a trailing slash
    $worker_log_dir =~ s!/*$!/!;

    # Append the function name
    $worker_log_dir .= _sanitize_for_path($function_name) . '/';

    unless ( -d $worker_log_dir ) {
        # Without the "error" option make_path() only warns on failure, which
        # would hand callers a path to a directory that does not exist
        make_path( $worker_log_dir, { error => \my $make_path_errors } );
        if ( $make_path_errors and @{ $make_path_errors } ) {
            my @messages = map {
                my ($path, $message) = %{ $_ };
                ($path eq '' ? '' : "$path: ") . $message;
            } @{ $make_path_errors };
            LOGDIE("Unable to create worker log directory '$worker_log_dir': " . join('; ', @messages));
        }
    }

    return $worker_log_dir;
}
# Turn a hashref into a JSON string suitable for passing to Gearman (which
# accepts only scalar arguments).
#
# Parameters:
# * hashref that the JSON module can encode, or undef
#
# Returns:
# * the canonical JSON string, or '' when the argument is undef
#
# Dies if the argument is not a hashref, if encoding fails, or if decoding the
# result does not give back an equal structure.
sub _serialize_hashref($)
{
    my ($data) = @_;

    return '' unless defined $data;

    LOGDIE("Parameter is not a hashref.") if ref($data) ne 'HASH';

    my $encoded;
    eval {
        $encoded = $json->encode($data);

        # Round-trip check: decoding must reproduce the original structure
        my $decoded = $json->decode($encoded);
        if (!Compare($data, $decoded)) {
            LOGDIE(
                "Serialized and deserialized hashrefs differ.\n"
                . "Original hashref: " . Dumper($data)
                . "Deserialized hashref: " . Dumper($decoded)
            );
        }
    };
    LOGDIE("Unable to serialize hashref with the JSON module: $@") if $@;

    return $encoded;
}
# Decode a JSON string (coming from Gearman) back into a hashref.
#
# Parameters:
# * string to decode; may be undef or empty
#
# Returns:
# * the decoded hashref, or
# * undef if the string is false (undef, '' or '0')
#
# Dies if the string is not valid JSON or does not decode to a hashref.
sub _unserialize_hashref($)
{
    my ($serialized) = @_;

    return undef if !$serialized;

    my $decoded;
    eval {
        $decoded = $json->decode($serialized);
        defined $decoded
            or LOGDIE("Unserialized hashref is undefined.");
        ref($decoded) eq 'HASH'
            or LOGDIE("Result is not a hashref.");
    };
    if ($@) {
        LOGDIE("Unable to unserialize string '$serialized' with the JSON module: $@");
    }

    return $decoded;
}
# Return the default configuration for a Gearman function (used in case a
# modified one is not provided): whatever the function class's
# configuration() method returns.
sub _default_configuration($)
{
    my ($function_name) = @_;

    DEBUG("Will use default configuration");

    # The Gearman function's Perl module is assumed to be loaded already, so
    # configuration() can be called on its class name
    return $function_name->configuration();
}
# Send a notification email to every address in the configuration's
# notifications_emails list.
#
# Parameters:
# * subject (prefixed with notifications_subject_prefix, if one is set)
# * message text (wrapped in a short greeting / signature)
# * configuration object
#
# Returns 1 on success (also when there are no recipients); returns 0 on the
# first failed delivery, in which case the remaining recipients are skipped.
sub _send_email($$$)
{
    my ( $subject, $message, $config ) = @_;

    # Treat an undefined recipient list like an empty one
    my @recipients = @{ $config->notifications_emails() // [] };
    unless (@recipients) {
        # No one to send mail to
        return 1;
    }

    my $from_email = $config->notifications_from_address;
    $subject = ($config->notifications_subject_prefix ? $config->notifications_subject_prefix . ' ' : '' ) . $subject;

    my $message_body = <<"EOF";
Hello,
$message
--
Gearman::JobScheduler
EOF

    foreach my $to_email (@recipients)
    {
        my $email = Email::MIME->create(
            header_str => [
                From => $from_email,
                To => $to_email,
                Subject => $subject,
            ],
            attributes => {
                encoding => 'quoted-printable',
                charset => 'UTF-8',
            },
            body_str => $message_body
        );

        # try_to_sendmail() only returns false and does not say why ($! is not
        # set by it), so call send() directly and report the exception
        my $sent = eval { Email::Sender::Simple->send( $email ); 1 };
        unless ( $sent )
        {
            my $error = $@ || 'unknown error';
            WARN("Unable to send email to $to_email: $error");
            return 0;
        }
    }

    return 1;
}
1;