# disbatch/lib/Disbatch.pm
package Disbatch;
$Disbatch::VERSION = '4.103';
use 5.12.0;
use warnings;
use boolean 0.25;
use Cpanel::JSON::XS;
use Data::Dumper;
use Digest::MD5;
use Encode;
use File::Slurp;
use Log::Log4perl;
use MongoDB 1.0.4;
use POSIX 'setsid';
use Safe::Isa;
use Sys::Hostname;
use Time::Moment;
use Try::Tiny::Retry;
# Fallback Log::Log4perl configuration, used when the config file defines no
# "log4perl" key or cannot be parsed (see load_config and logger below).
# Logs at DEBUG and above to /var/log/disbatchd.log and to the screen.
my $default_log4perl = {
level => 'DEBUG', # 'TRACE'
appenders => {
# File appender: plain-text log file.
filelog => {
type => 'Log::Log4perl::Appender::File',
layout => '[%p] %d %F{1} %L %C %c> %m %n',
args => { filename => '/var/log/disbatchd.log' }, # 'disbatchd.log'
},
# Screen appender: same layout, colorized by log level.
screenlog => {
type => 'Log::Log4perl::Appender::ScreenColoredLevels',
layout => '[%p] %d %F{1} %L %C %c> %m %n',
args => { },
}
}
};
# Constructor. Any key/value pairs passed in are stashed directly in the
# object. Records the local hostname as {node}, and normalizes {class}
# (default "Disbatch") to lowercase — it selects logger names and, when auth
# is configured, the MongoDB username (see mongo()).
sub new {
    my ($class, %args) = @_;
    my $self = bless \%args, $class;
    $self->{node}  = hostname;
    $self->{class} = lc($self->{class} // 'Disbatch');
    return $self;
}
# Returns (and caches in {loggers}) a Log::Log4perl logger named "<class>"
# or "<class>.<type>". On the first call before any appenders exist in
# Log4perl's global registry, applies level and appenders from
# {config}{log4perl} to the logger being returned.
sub logger {
    my ($self, $type) = @_;
    my $name = defined $type ? "$self->{class}.$type" : $self->{class};
    $self->{loggers}{$name} //= Log::Log4perl->get_logger($name);
    # Only configure once globally: if any appender is registered, skip.
    unless (keys %{Log::Log4perl->appenders}) {
        my $conf = $self->{config}{log4perl};
        $self->{loggers}{$name}->level($conf->{level});
        my $fallback_layout = "[%p] %d %F{1} %L %C %c> %m %n";
        for my $ap_name (keys %{$conf->{appenders}}) {
            my $ap_conf  = $conf->{appenders}{$ap_name};
            my $appender = Log::Log4perl::Appender->new($ap_conf->{type}, name => $ap_name, %{$ap_conf->{args}});
            $appender->layout(Log::Log4perl::Layout::PatternLayout->new($ap_conf->{layout} // $fallback_layout));
            $self->{loggers}{$name}->add_appender($appender);
        }
    }
    return $self->{loggers}{$name};
}
# Returns (and caches in {mongo}) a MongoDB::Database handle for
# {config}{database}. When {config}{auth} has entries, picks a username
# based on {class} (defaulting to 'plugin') and supplies credentials from
# the auth map.
sub mongo {
    my ($self) = @_;
    return $self->{mongo} if defined $self->{mongo};
    my %attributes = %{$self->{config}{attributes}};
    if (keys %{$self->{config}{auth}}) {
        # Map the component class to its MongoDB role account.
        my %user_for = (
            'disbatch'               => 'disbatchd',
            'disbatch::web'          => 'disbatch_web',
            'task_runner'            => 'task_runner',
            'disbatch::queuebalance' => 'queuebalance',
        );
        my $username = $user_for{$self->{class}} // 'plugin';
        $attributes{username} = $username;
        $attributes{password} = $self->{config}{auth}{$username};
        $attributes{db_name}  = $self->{config}{database};
    }
    warn "Connecting ", scalar(localtime), "\n";
    $self->{mongo} = MongoDB->connect($self->{config}{mongohost}, \%attributes)->get_database($self->{config}{database});
}
# Accessors for the MongoDB::Collection objects this module works with.
sub nodes   { my ($self) = @_; return $self->mongo->coll('nodes') }
sub queues  { my ($self) = @_; return $self->mongo->coll('queues') }
sub tasks   { my ($self) = @_; return $self->mongo->coll('tasks') }
sub balance { my ($self) = @_; return $self->mongo->coll('balance') }
# loads the config file at startup.
# anything in the config file at startup is static and cannot be changed without restarting disbatchd
# Loads and parses {config_file} (relaxed JSON) into {config}, but only if
# {config} is not already set — config is static for the process lifetime.
# Fills in defaults for optional keys, and dies (via the logger) if the file
# cannot be parsed or if 'mongohost'/'database' are missing.
sub load_config {
    my ($self) = @_;
    return if defined $self->{config};
    $self->{config} = try {
        Cpanel::JSON::XS->new->utf8->relaxed->decode(scalar read_file($self->{config_file}));
    } catch {
        # Install the fallback log4perl config so the logdie below can log.
        $self->{config}{log4perl} = $default_log4perl;
        $self->logger->logdie("Could not parse $self->{config_file}: $_");
    };
    # Ensure defaults for everything optional.
    my %default_for = (
        attributes   => {},
        auth         => {},
        gfs          => 'auto',
        quiet        => false,
        task_runner  => '/usr/bin/task_runner',
        testing      => false,
        log4perl     => $default_log4perl,
        activequeues => [],
        ignorequeues => [],
        plugins      => [],
    );
    $self->{config}{$_} //= $default_for{$_} for keys %default_for;
    # FIXME: validate config values
    unless (defined $self->{config}{mongohost} and defined $self->{config}{database}) {
        $self->logger->logdie("Both 'mongohost' and 'database' must be defined in file $self->{config_file}");
    }
    return;
}
# from Synacor::Disbatch::Backend
sub ensure_indexes {
my ($self) = @_;
my @task_indexes = (
{ keys => [node => 1, status => 1, queue => 1] },
{ keys => [node => 1, status => 1, queue => 1, _id => 1] },
{ keys => [node => 1, status => 1, queue => 1, _id => -1] },
{ keys => [queue => 1, status => 1] },
);
try { $self->tasks->indexes->create_many(@task_indexes) } catch { $self->logger->logdie("Could not ensure_indexes: $_") };
try {
$self->queues->indexes->create_one([ name => 1 ], { unique => true });
$self->nodes->indexes->create_one([ node => 1 ], { unique => true });
$self->mongo->coll('tasks.chunks')->indexes->create_one([ files_id => 1, n => 1 ], { unique => true });
$self->mongo->coll('tasks.files')->indexes->create_one([ filename => 1, 'metadata.task_id' => 1 ]);
} catch {
$self->logger->logdie("Could not ensure_indexes: $_")
};
}
# validates plugins for defined queues
sub validate_plugins {
my ($self) = @_;
my @queues = try { $self->queues->find->all } catch { $self->logger->error("Could not find queues: $_"); () };
for my $plugin (map { $_->{plugin} } @queues) {
next if exists $self->{plugins}{$plugin};
if ($plugin !~ /^[\w:]+$/) {
$self->logger->error("Illegal plugin value: $plugin");
} elsif (eval "require $plugin; $plugin->new->can('run');") {
$self->{plugins}{$plugin} = $plugin;
next if exists $self->{old_plugins}{$plugin};
$self->logger->info("$plugin is valid for queues");
} elsif (eval "require ${plugin}::Task; ${plugin}::Task->new->can('run');") {
$self->{plugins}{$plugin} = $plugin . '::Task';
next if exists $self->{old_plugins}{$plugin};
$self->logger->info("${plugin}::Task is valid for queues");
$self->logger->warn("Having a plugin format with a subpackage *::Task is deprecated");
} else {
$self->{plugins}{$plugin} = undef;
$self->logger->warn("Could not load $plugin, ignoring queues using it");
}
}
}
# clears plugin validation and re-runs
sub revalidate_plugins {
my ($self) = @_;
$self->{old_plugins} = $self->{plugins};
$self->{plugins} = {};
$self->validate_plugins;
}
# Builds a per-queue status report (id, plugin, name, threads, plus
# queued/running/completed task counts) for the Disbatch Command Interface.
# Throws if the queues query fails; the count_* helpers catch their own
# errors and yield undef counts.
sub scheduler_report {
    my ($self) = @_;
    my @report = map {
        my $qid = $_->{_id};
        {
            id        => $qid->{value},
            plugin    => $_->{plugin},
            name      => $_->{name},
            threads   => $_->{threads},
            queued    => $self->count_queued($qid),
            running   => $self->count_running($qid),
            completed => $self->count_completed($qid),
        }
    } $self->queues->find->all;
    return \@report;
}
# Old-API variant of scheduler_report(): identical report, but a failed
# queues query is logged and yields an empty report instead of throwing.
# FIX: this was a verbatim copy of scheduler_report()'s loop; it now
# delegates, so the two reports can no longer drift apart.
sub scheduler_report_old_api {
    my ($self) = @_;
    # scheduler_report() only throws from the queues find; count_* errors
    # are caught internally there and show up as undef counts.
    return try { $self->scheduler_report } catch { $self->logger->error("Could not find queues: $_"); [] };
}
# updates the nodes collection
# Upserts this node's document with its name and a fresh timestamp,
# preserving any other fields already present on the document. Errors are
# logged and the update is skipped.
sub update_node_status {
    my ($self) = @_;
    my $doc = try { $self->nodes->find_one({node => $self->{node}}) // {} } catch { $self->logger->error("Could not find node: $_"); undef };
    return unless defined $doc;
    $doc->{node}      = $self->{node};
    $doc->{timestamp} = Time::Moment->now_utc;
    try {
        $self->nodes->update_one({node => $self->{node}}, {'$set' => $doc}, {upsert => 1});
    }
    catch {
        $self->logger->error("Could not update node: $_");
    };
}
### Synacor::Disbatch::Queue like stuff ###
# will claim and return a task for given queue, or return undef
# Atomically claims one queued task (status -2, unowned) for this node by
# setting node/status/mtime. Claim order honors the queue's 'sort' field
# (fifo/lifo by _id; anything else falls back to natural order). The claimed
# task (or undef) is also remembered in {claimed_task}.
sub claim_task {
    my ($self, $queue) = @_;
    $queue->{sort} //= 'default';
    my $query  = { '$or' => [{node => undef}, {node => -1}], status => -2, queue => $queue->{_id} };
    my $update = { '$set' => {node => $self->{node}, status => -1, mtime => Time::Moment->now_utc} };
    # Claim order by queue sort setting; no sort option means natural order.
    my %sort_for = (fifo => { _id => 1 }, lifo => { _id => -1 });
    my $options;
    if (my $sort = $sort_for{$queue->{sort}}) {
        $options->{sort} = $sort;
    }
    elsif ($queue->{sort} ne 'default') {
        $self->logger->warn("$queue->{name}: unknown sort order '$queue->{sort}' -- using default");
    }
    $self->{claimed_task} = try { $self->tasks->find_one_and_update($query, $update, $options) } catch { $self->logger->error("Could not claim task: $_"); undef };
}
# will unclaim and return the task document for the given OID, or return undef
# Reverts a claim made by claim_task: only matches a task with this node's
# hostname and status -1, and resets it to unowned/queued (-2) with a fresh
# mtime. Retried on failure via Try::Tiny::Retry.
sub unclaim_task {
    my ($self, $task_id) = @_;
    my $query = { _id => $task_id, node => $self->{node}, status => -1 };
    my $update = { '$set' => {node => undef, status => -2, mtime => Time::Moment->now_utc} };
    # FIX: log message typo ("Unclaliming" -> "Unclaiming").
    $self->logger->warn("Unclaiming task $task_id");
    retry { $self->tasks->find_one_and_update($query, $update) } catch { $self->logger->error("Could not unclaim task $task_id: $_"); undef };
}
# Marks this node's stale claimed tasks (status -1, untouched for over 5
# minutes) as orphaned (status -6) so they are no longer counted as running.
# mtime may be stored either as a BSON Date (newer tasks, via Time::Moment)
# or as a plain epoch number (legacy tasks), so both forms are matched.
sub orphaned_tasks {
my ($self) = @_;
try { $self->tasks->update_many(
{
node => $self->{node},
status => -1,
'$or' => [
# BSON type 9 is Date: compare against "5 minutes ago" as a datetime.
{ '$and' => [{mtime => {'$type' => 9}}, {mtime => {'$lt' => Time::Moment->now_utc->minus_minutes(5)}}] },
# Non-Date mtime: legacy epoch seconds; compare numerically.
{ '$and' => [{mtime => {'$not' => {'$type' => 9}}}, {mtime => {'$lt' => time - 300}}] },
],
},
{'$set' => {status => -6, mtime => Time::Moment->now_utc}}
) }
catch { $self->logger->error("Could not find orphaned_tasks: $_") };
}
# will fork & exec to start a given task
# NOTE: $self->{config_file} is required
# Forks and execs {config}{task_runner} for the given claimed task. In the
# child, a failed exec reconnects to MongoDB (the child must not reuse the
# parent's connection), disables the queue (threads => 0), unclaims the
# task, and exits. On success the parent clears {claimed_task}.
sub start_task {
    my ($self, $queue, $task) = @_;
    my $command = $self->{config}{task_runner};
    my @args = (
        '--config' => $self->{config_file},
        '--task' => $task->{_id},
    );
    push @args, '--gfs', $self->{config}{gfs} if $self->{config}{gfs};
    push @args, '--quiet' if $self->{config}{quiet};
    push @args, '--testing' if $self->{config}{testing};
    $self->logger->info(join ' ', $command, @args);
    my $pid = fork;
    unless (defined $pid) {
        # FIX: fork returns undef on failure; the old `unless (fork)` treated
        # that as "child", so the *parent* would setsid/exec and disbatchd
        # itself would be replaced. Log, release the task, and bail instead.
        $self->logger->error("Could not fork for task $task->{_id}: $!");
        $self->unclaim_task($task->{_id});
        $self->{claimed_task} = undef;
        return;
    }
    unless ($pid) {
        # Child: detach from the parent's session, then exec the runner.
        setsid != -1 or die "Can't start a new session: $!";
        unless (exec $command, @args) {
            $self->mongo->reconnect;
            $self->logger->error("Could not exec '$command', unclaiming task $task->{_id} and setting threads to 0 for $queue->{name}");
            # FIX: the catch block previously built this message but never
            # logged it, silently discarding the error.
            retry { $self->queues->update_one({_id => $queue->{_id}}, {'$set' => {threads => 0}}) } catch { $self->logger->error("Could not set queues to 0 for $queue->{name}: $_") };
            $self->unclaim_task($task->{_id});
            exit;
        }
    }
    $self->{claimed_task} = undef;
}
# Counts tasks matching any combination of queue id, status (either may be a
# query-operator hashref), and node; undef arguments are left out of the
# query entirely. Returns the count, or undef after logging on error.
sub count_tasks {
    my ($self, $queue_id, $status, $node) = @_;
    my %query;
    $query{queue}  = $queue_id if defined $queue_id;
    $query{status} = $status   if defined $status;
    $query{node}   = $node     if defined $node;
    try { $self->tasks->count(\%query) } catch { $self->logger->error("Could not count tasks: $_"); undef };
}
# Thin wrappers around count_tasks() for the standard status buckets:
# queued (<= -2), running (-1 or 0), running on this node, completed (>= 1),
# and total (no status filter). Each returns a count or undef on error.
sub count_queued       { $_[0]->count_tasks($_[1], {'$lte' => -2}) }
sub count_running      { $_[0]->count_tasks($_[1], {'$in' => [-1,0]}) }
sub count_node_running { $_[0]->count_tasks($_[1], {'$in' => [-1,0]}, $_[0]->{node}) }
sub count_completed    { $_[0]->count_tasks($_[1], {'$gte' => 1}) }
sub count_total        { $_[0]->count_tasks($_[1]) }
# checks if this node will process (activequeues) or ignore (ignorequeues) a queue
# If config.activequeues is non-empty it is an allow-list; otherwise, if
# config.ignorequeues is non-empty it is a deny-list; otherwise every queue
# is active. Returns 1 or 0.
sub is_active_queue {
    my ($self, $queue_id) = @_;
    my $active = $self->{config}{activequeues};
    my $ignore = $self->{config}{ignorequeues};
    return(grep($queue_id->{value} eq $_, @$active) ? 1 : 0) if @$active;
    return(grep($queue_id->{value} eq $_, @$ignore) ? 0 : 1) if @$ignore;
    return 1;
}
# will run as many tasks for each queue as allowed
# For each queue with a validated plugin that this node serves, keeps
# claiming and starting tasks while both the queue's `threads` budget and
# the node's `maxthreads` budget allow. Queues whose plugin is not in the
# {plugins} cache trigger a single revalidate_plugins() pass at the end.
sub process_queues {
my ($self) = @_;
my $revalidate_plugins = 0;
# On a failed node lookup, fall back to maxthreads 0 (start nothing).
my $node = try { $self->nodes->find_one({node => $self->{node}}, {maxthreads => 1}) } catch { $self->logger->error("Could not find node: $_"); { maxthreads => 0 } };
# {'$exists' => 1} as the queue id counts running tasks across ALL queues.
my $node_running = $self->count_node_running({'$exists' => 1}) // 0;
return if defined $node and defined $node->{maxthreads} and $node_running >= $node->{maxthreads};
my @queues = try { $self->queues->find->all } catch { $self->logger->error("Could not find queues: $_"); () };
for my $queue (@queues) {
if ($self->{plugins}{$queue->{plugin}} and $self->is_active_queue($queue->{_id})) {
my $queue_running = $self->count_running($queue->{_id});
# Re-count after each start so both budgets stay accurate.
while (defined $queue_running and ($queue->{threads} // 0) > $queue_running and (!defined $node->{maxthreads} or $node->{maxthreads} > $node_running)) {
my $task = $self->claim_task($queue);
last unless defined $task;
$self->start_task($queue, $task);
$queue_running = $self->count_running($queue->{_id});
$node_running = $self->count_node_running({'$exists' => 1}) // 0;
}
} else {
$revalidate_plugins = 1;
}
}
$self->revalidate_plugins if $revalidate_plugins;
}
### END Synacor::Disbatch::Queue like stuff ###
# returns the file document
# throws any error
# Stores UTF-8 content in the custom GridFS-like layout: a metadata document
# in tasks.files plus 255 KiB string chunks in tasks.chunks. Returns the
# inserted tasks.files _id.
sub put_gfs {
    my ($self, $content, $filename, $metadata) = @_;
    $filename ||= 'unknown';
    my $chunk_size = 255 * 1024;    # standard GridFS chunk size
    my $file_doc = {
        # NOTE(review): DateTime is not use'd by this file; it appears to be
        # loaded transitively by the MongoDB driver -- confirm, or add an
        # explicit `use DateTime;`.
        uploadDate => DateTime->now,
        filename => $filename,
        chunkSize => $chunk_size,
        # length/md5 are computed over the encoded bytes, matching GridFS.
        length => defined $content ? length encode_utf8($content) : 0,
        md5 => Digest::MD5->new->add(defined $content ? encode_utf8($content) : '')->hexdigest,
    };
    if (defined $metadata) {
        die 'metadata must be a HASH' unless ref $metadata eq 'HASH';
        $file_doc->{metadata} = $metadata;
    }
    my $files_id = $self->mongo->coll('tasks.files')->insert_one($file_doc)->inserted_id;
    # FIX: removed a redundant outer `my $n = 0;` that was immediately
    # shadowed (and never used) by the loop's own $n.
    # 4-arg substr consumes $content one chunk at a time.
    for (my $n = 0; length $content; $n++) {
        my $data = substr $content, 0, $chunk_size, '';
        $self->mongo->coll('tasks.chunks')->insert_one({ n => $n, data => bless(\$data, 'MongoDB::BSON::String'), files_id => $files_id });
    }
    $files_id;
}
# returns the content as a string
# throws an error if the file document does not exist, and any other error
# Fetches content from the custom GridFS layout by MongoDB::OID or by
# filename (optionally narrowed by metadata), concatenating the tasks.chunks
# string chunks in order.
sub get_gfs {
    my ($self, $filename_or_id, $metadata) = @_;
    my $file_id;
    if ($filename_or_id->$_isa('MongoDB::OID')) {
        $file_id = $filename_or_id;
    } else {
        my $query = {};
        # FIX: was `$query->(unknown) = ...` -- an illegal assignment to a
        # code dereference; the lookup must match on the `filename` field
        # (see put_gfs, which stores it as `filename`).
        $query->{filename} = $filename_or_id if defined $filename_or_id;
        $query->{metadata} = $metadata if defined $metadata;
        $file_id = $self->mongo->coll('tasks.files')->find($query)->next->{_id}; # FIXME: why is this not find_one??
    }
    # this does no error-checking:
    my $result = $self->mongo->coll('tasks.chunks')->find({files_id => $file_id})->sort({n => 1})->result;
    my $data;
    while (my $chunk = $result->next) {
        $data .= $chunk->{data};
    }
    $data;
}
# Explicit destructor; intentionally empty. Present so attribute lookups on
# destruction don't fall through to AUTOLOAD-style surprises, and as a hook
# point for future cleanup.
DESTROY {
my ($self) = @_;
# this happens after the END block in the calling script, or if the object ever goes out of scope
}
1;
__END__
=encoding utf8
=head1 NAME
Disbatch - a scalable distributed batch processing framework using MongoDB.
=head1 VERSION
version 4.103
=head1 SUBROUTINES
=over 2
=item new(class => $class, ...)
"class" defaults to "Disbatch", and the value is then lowercased.
"node" is the hostname.
Anything else is put into $self.
=item logger($type)
Parameters: type (string, optional)
Returns a L<Log::Log4perl> object.
=item mongo
Parameters: none
Returns a L<MongoDB::Database> object.
=item nodes
Parameters: none
Returns a L<MongoDB::Collection> object for collection "nodes".
=item queues
Parameters: none
Returns a L<MongoDB::Collection> object for collection "queues".
=item tasks
Parameters: none
Returns a L<MongoDB::Collection> object for collection "tasks".
=item balance
Parameters: none
Returns a L<MongoDB::Collection> object for collection "balance".
=item load_config
Parameters: none
Loads C<< $self->{config_file} >> only if C<< $self->{config} >> is undefined.
Anything in the config file at startup is static and cannot be changed without restarting disbatchd.
Returns nothing.
=item ensure_indexes
Parameters: none
Ensures the proper MongoDB indexes are created for C<tasks>, C<tasks.files>, and C<tasks.chunks> collections.
Returns nothing.
=item validate_plugins
Parameters: none
Validates plugins for defined queues.
Returns nothing.
=item revalidate_plugins
Parameters: none
Clears plugin validation and re-runs C<validate_plugins()>.
Returns nothing.
=item scheduler_report
Parameters: none
Used by the Disbatch Command Interface to get queue information.
Returns an C<ARRAY> containing C<HASH>es of queue information.
Throws errors.
=item update_node_status
Parameters: none
Updates the node document with this node's name and the current timestamp, preserving any other fields already on the document.
Returns nothing.
=item claim_task($queue)
Parameters: queue document
Claims a task (sets status to -1 and sets node to hostname) for the given queue.
Returns a task document, or undef if no queued task found.
=item unclaim_task($task_id)
Parameters: L<MongoDB::OID> object for a task
Sets the task's node to null, status to -2, and updates mtime, if the task has status -1 and this node's hostname.
Returns a task document, or undef if a matching task is not found.
=item orphaned_tasks
Parameters: none
Sets status to -6 for all tasks for this node with status -1 and an mtime of more than 300 seconds ago.
Returns nothing.
=item start_task($queue, $task)
Parameters: queue document, task document
Will fork and exec C<< $self->{config}{task_runner} >> to start the given task.
If the exec fails, it will set threads to 0 for the given queue and call C<unclaim_task()>.
Returns nothing.
=item count_tasks($queue_id, $status, $node)
Parameters: L<MongoDB::OID> object for a queue or a query operator value or C<undef>, a status or a query operator value or C<undef>, a node or C<undef>.
Counts all tasks for the given C<$queue_id> with given C<$status> and C<$node>.
Used by the below C<count_*> subroutines. If any of the parameters are C<undef>, they will not be added to the query.
Returns: a non-negative integer, or undef if an error.
=item count_queued($queue_id)
=item count_running($queue_id)
=item count_node_running($queue_id)
=item count_completed($queue_id)
=item count_total($queue_id)
Parameters: L<MongoDB::OID> object for a queue or a query operator value or C<undef>
Counts queued (status <= -2), running (status of 0 or -1), running on this node, completed (status >= 1), or all tasks (no status filter) for the given queue.
Returns: a non-negative integer, or undef if an error.
=item is_active_queue($queue_id)
Parameters: L<MongoDB::OID> object for a queue
Checks C<config.activequeues> if it has entries, and returns 1 if given queue is defined in it or 0 if not.
If it does not have entries, checks C<config.ignorequeues> if it has entries, and returns 0 if given queue is defined in it or 1 if not.
Returns 1 or 0.
=item process_queues
Parameters: none
Will claim and start as many tasks for each queue as allowed by the current node's C<maxthreads> and each queue's C<threads>.
Returns nothing.
=item put_gfs($content, $filename, $metadata)
Parameters: UTF-8 content to store, optional filename to store it as, optional metadata C<HASH>
Stores UTF-8 content in a custom GridFS format that stores data as strings instead of as BinData.
Returns a C<MongoDB::OID> object for the ID inserted in the C<tasks.files> collection.
=item get_gfs($filename_or_id, $metadata)
Parameters: filename or C<MongoDB::OID> object, optional metadata C<HASH>
Gets UTF-8 content from the custom GridFS format. Metadata is only used if given a filename instead of a C<MongoDB::OID> object.
Returns: content string.
=back
=head1 SEE ALSO
L<Disbatch::Web>
L<Disbatch::Roles>
L<Disbatch::Plugin::Demo>
L<disbatchd>
L<disbatch.pl>
L<task_runner>
L<disbatch-create-users>
=head1 AUTHORS
Ashley Willis <awillis@synacor.com>
Matt Busigin
=head1 COPYRIGHT AND LICENSE
This software is Copyright (c) 2016, 2019 by Ashley Willis.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004