disbatch/lib/Disbatch/Web.pm
package Disbatch::Web;
$Disbatch::Web::VERSION = '4.103';
use 5.12.0;
use strict;
use warnings;
use Clone qw/clone/;
use Cpanel::JSON::XS;
use Data::Dumper;
use Disbatch;
use Exporter qw/ import /;
use File::Slurp;
use Limper::SendFile; # needed for public()
use Limper::SendJSON;
use Limper 0.015;
use MongoDB::OID 1.0.4;
use Safe::Isa;
use Scalar::Util qw/ looks_like_number /;
use Template;
use Time::Moment;
use Try::Tiny::Retry;
use URL::Encode qw/url_params_mixed/;
our @EXPORT = qw/ parse_params send_json_options template /;
my $oid_keys = [ qw/ queue / ]; # NOTE: in addition to _id

# Returns the flat key/value list of encoder options (allow_blessed, canonical
# and convert_blessed, all enabled) that the routes append to their send_json calls.
sub send_json_options {
    return (
        allow_blessed   => 1,
        canonical       => 1,
        convert_blessed => 1,
    );
}

# Template object used by template(); assigned in init().
my $tt;
# Renders the named template with the given params HASH and returns the output,
# setting the response Content-Type to text/html.
# This should be compatible with Dancer's template(), except we do not support the optional settings (third value), and it was unused by RemoteControl
sub template {
    my ($view, $vars) = @_;

    # values every view gets in addition to what the caller passed
    $vars->{perl_version}   = $];
    $vars->{limper_version} = $Limper::VERSION;
    $vars->{request}        = request;

    my $html = '';
    $tt->process($view, $vars, \$html) or die $tt->error();
    headers('Content-Type' => 'text/html');
    return $html;
}
# Disbatch object shared by all routes in this file; assigned in init().
my $disbatch;

# Sets up the web application: creates the Disbatch object and loads its config,
# registers the public file root, loads each configured web extension (calling
# its init() when it has one), loads the catch-all file route, and builds the
# Template object used by template().
# Named args: config_file (defaults to /etc/disbatch/config.json).
sub init {
    my %args = @_;
    my $config_file = $args{config_file} // '/etc/disbatch/config.json';
    $disbatch = Disbatch->new(class => 'Disbatch::Web', config_file => $config_file);
    $disbatch->load_config;
    public($disbatch->{config}{web_root} // '/etc/disbatch/htdocs/');

    my $extensions = $disbatch->{config}{web_extensions} // {};
    for my $plugin (keys %$extensions) {
        # only word characters and colons are allowed, since the name is interpolated into a string eval
        unless ($plugin =~ /^[\w:]+$/) {
            Limper::warning("Illegal plugin value: $plugin, ignored");
            next;
        }
        unless (eval "require $plugin") {
            Limper::warning("Could not load $plugin, ignored");
            next;
        }
        Limper::info("$plugin found and loaded");
        next unless $plugin->can('init');
        # look the plugin's init up through its symbol table and hand it our Disbatch object and its own config
        no strict 'refs';
        ${"${plugin}::"}{init}->($disbatch, $extensions->{$plugin});
    }

    require Disbatch::Web::Files;	# this has a catch-all to send any matching file in the public root directory, so must be loaded last.

    # the following options should be compatible with previous Dancer usage:
    my $views_dir = $disbatch->{config}{views_dir} // '/etc/disbatch/views/';
    $tt = Template->new(
        ANYCASE      => 1,
        ABSOLUTE     => 1,
        ENCODING     => 'utf8',
        INCLUDE_PATH => $views_dir,
        START_TAG    => '\[%',
        END_TAG      => '%\]',
        WRAPPER      => 'layouts/main.tt',
    );
}
# Parses the parameters of the current request.
# The source depends on the request: a form-urlencoded body, a JSON body, or
# otherwise the query string (an empty HASH when there is none).
# When the parsed params are a HASH, every key starting with '.' is moved out of
# it into a separate options HASH.
# Returns ($params, $options) in list context and just $params in scalar context.
# NOTE: $params is whatever the decoder produced, so for a JSON body it may be an
# ARRAY or scalar, and on a JSON decode failure it is the error itself.
sub parse_params {
    my $params = {};
    # Compare only the media type: the header may carry parameters such as
    # "; charset=UTF-8", and media types are case-insensitive.
    my ($media_type) = lc(request->{headers}{'content-type'} // '') =~ m{\A\s*([^;\s]*)};
    if ($media_type eq 'application/x-www-form-urlencoded') {
        $params = url_params_mixed(request->{body}, 1);
    } elsif ($media_type eq 'application/json') {
        $params = try { Cpanel::JSON::XS->new->utf8->decode(request->{body}) } catch { $_ };
    } elsif (request->{query}) {
        $params = url_params_mixed(request->{query}, 1);
    }
    # Declare unconditionally: "my $x = ... if COND" has undefined behaviour in
    # Perl and can leave $x holding the value from an earlier call.
    my $options = {};
    if (ref $params eq 'HASH') {
        # put fields starting with '.' into their own HASH
        $options->{$_} = delete $params->{$_} for grep { /^\./ } keys %$params;
    }
    # NOTE: $options may contain: .limit .skip .count .pretty .terse .epoch
    wantarray ? ($params, $options) : $params;
}
# Parses the request's Accept header into a HASH of media type => quality.
# A media type listed without a q parameter gets quality 1. Optional whitespace
# around ',' and ';' is tolerated, media parameters other than q are ignored,
# and media types are lowercased. Returns an empty HASH when there is no header.
sub parse_accept {
    my %quality_for;
    for my $range (split /\s*,\s*/, request->{headers}{accept} // '') {
        my ($type, @media_params) = split /\s*;\s*/, $range;
        next unless defined $type and length $type;
        # take the first non-empty q=... parameter, if any
        my ($quality) = map { /\Aq=(.+)\z/i ? $1 : () } @media_params;
        $quality_for{lc $type} = $quality // 1;
    }
    \%quality_for;
}
# Returns 1 when the response should be JSON and 0 when it should be HTML,
# based on the qualities parse_accept() reports for the two media types.
sub want_json {
    my $quality_for = parse_accept();
    my $html_quality = $quality_for->{'text/html'}        // 0;
    my $json_quality = $quality_for->{'application/json'} // 1;
    # prefer 'text/html' over 'application/json' if equal, but default to 'application/json'
    return $html_quality >= $json_quality ? 0 : 1;
}
################
#### NEW API ###
################
# Index page: renders index.tt with the configured database, the names of the
# configured web extensions, and the GET routes whose path starts with '/'.
get '/' => sub {
    # NOTE: not doing just "template 'index.tt', $params;" because not using WRAPPER here
    my $index_tt = Template->new(
        ANYCASE      => 1,
        ABSOLUTE     => 1,
        ENCODING     => 'utf8',
        INCLUDE_PATH => $disbatch->{config}{views_dir} // '/etc/disbatch/views/',
        START_TAG    => '\[%',
        END_TAG      => '%\]',
    );
    # Load the route list into a named hash before calling keys() on it:
    # keys() applied to a scalar expression (such as an anonymous hash) is a
    # compile-time error on Perl 5.24 and later.
    my %get_route = @{ Limper::routes('GET') };
    my $params = {
        database       => $disbatch->{config}{database},
        web_extensions => [ sort keys %{ $disbatch->{config}{web_extensions} // {} } ],
        get_routes     => [ grep { m{^/} } sort keys %get_route ],
    };
    my $output = '';
    $index_tt->process('index.tt', $params, \$output) || die $index_tt->error();
    headers 'Content-Type' => 'text/html';
    $output;
};
# Sends, as JSON, the configured database, the names of the configured web
# extensions, and the route list of every HTTP verb.
get '/info' => sub {
    my $routes = Limper::routes();	# WARNING: do not modify $routes, it is a footgun!
    my %paths_for;
    for my $verb (keys %$routes) {
        my $pairs = $routes->{$verb};
        # this takes just the even elements of @$pairs and ensures they are strings, keeping their order
        $paths_for{$verb} = [ map { $pairs->[$_ * 2] . "" } (0 .. @$pairs / 2 - 1) ];
    }
    my @extensions = sort keys %{ $disbatch->{config}{web_extensions} // {} };
    my $info = {
        database       => $disbatch->{config}{database},
        web_extensions => \@extensions,
        routes         => \%paths_for,
    };
    return send_json($info, send_json_options());
};
# Takes an object with a hires_epoch method and returns that value multiplied
# by 1000 and truncated to an integer.
sub datetime_to_millisecond_epoch {
    my ($datetime) = @_;
    my $milliseconds = $datetime->hires_epoch * 1000;
    return int $milliseconds;
}
# Returns an ARRAY ref of the node documents matching $filter (all nodes when no
# filter is given), sorted by node name. Each document gets an 'id' field holding
# its stringified _id, and a DateTime 'timestamp' is replaced by its epoch in
# milliseconds.
# Errors are not caught here; callers are expected to handle them.
sub get_nodes {
    my ($filter) = @_;
    my $cursor = $disbatch->nodes->find($filter // {})->sort({ node => 1 });
    my @nodes = $cursor->all;
    for my $node (@nodes) {
        $node->{id} = "$node->{_id}";
        next unless ref $node->{timestamp} eq 'DateTime';
        $node->{timestamp} = datetime_to_millisecond_epoch($node->{timestamp});
    }
    return \@nodes;
}
# Sends all nodes as a JSON array, or a 400 with an error object when they
# cannot be fetched.
get '/nodes' => sub {
    undef $disbatch->{mongo};
    # on failure the catch block sets the status to 400 and yields the error message instead of the nodes
    my $result = try { get_nodes() } catch { status 400; "Could not get current nodes: $_" };
    my $failed = (status() // 200) == 400;
    unless ($failed) {
        return send_json($result, send_json_options());
    }
    Limper::warning($result);
    return send_json({ error => $result }, send_json_options());
};
# Sends a single node as JSON. The node is looked up by _id when the path
# segment can be turned into a MongoDB::OID, otherwise by its 'node' field.
# Only the first match is sent; lookup failures produce a 400 with an error object.
get qr'^/nodes/(?<node>.+)' => sub {
    my $wanted = $+{node};
    undef $disbatch->{mongo};
    my $filter = try { +{ _id => MongoDB::OID->new(value => $wanted) } } catch { +{ node => $wanted } };
    # on failure the catch block sets the status to 400 and yields the error message instead of the nodes
    my $result = try { get_nodes($filter) } catch { status 400; "Could not get node $wanted: $_" };
    my $failed = (status() // 200) == 400;
    unless ($failed) {
        return send_json($result->[0], send_json_options());
    }
    Limper::warning($result);
    return send_json({ error => $result }, send_json_options());
};
# Updates a node, found by _id when the path segment can be turned into a
# MongoDB::OID and by its 'node' field otherwise. 'maxthreads' is the only
# accepted param and must be a non-negative integer or null.
# Responds with the update result keyed by its class name; anything other than
# exactly one matched document is a 400 with an 'error' field.
# Example client call:
# postJSON('/nodes/' + row.rowId , { maxthreads: newValue}, loadQueues);
post qr'^/nodes/(?<node>.+)' => sub {
    my $node = $+{node};	# copy now: a later regex match in this scope clears %+
    undef $disbatch->{mongo};
    my $params = parse_params;
    # parse_params may return a non-HASH (a JSON array or scalar, or the decoder's
    # error on malformed JSON); reject it instead of dying on the dereference below
    unless (ref $params eq 'HASH') {
        status 400;
        return send_json({ error => 'Params must be a JSON object or form data' }, send_json_options());
    }
    unless (keys %$params) {
        status 400;
        return send_json({ error => 'No params' }, send_json_options());
    }
    my @valid_params = qw/maxthreads/;
    for my $param (keys %$params) {
        unless (grep $_ eq $param, @valid_params) {
            status 400;
            return send_json({ error => 'Invalid param', param => $param }, send_json_options());
        }
    }
    if (exists $params->{maxthreads} and defined $params->{maxthreads} and $params->{maxthreads} !~ /^\d+$/) {
        status 400;
        return send_json({ error => 'maxthreads must be a non-negative integer or null' }, send_json_options());
    }
    my $filter = try { +{ _id => MongoDB::OID->new(value => $node) } } catch { +{ node => $node } };
    # on failure $res is the exception rather than an update result
    my $res = try {
        $disbatch->nodes->update_one($filter, { '$set' => $params });
    } catch {
        Limper::warning("Could not update node $node: $_");
        $_;
    };
    # an exception is not necessarily a HASH-based object, so only copy and inspect it when it is
    my $res_is_hash = (Scalar::Util::reftype($res) // '') eq 'HASH';
    my $response = $res_is_hash ? +{ ref($res) => {%$res} } : {};
    my $matched = $res_is_hash ? ($res->{matched_count} // 0) : 0;
    unless ($matched == 1) {
        status 400;
        if ($res->$_isa('MongoDB::UpdateResult')) {
            $response->{error} = $response->{'MongoDB::UpdateResult'};
        } else {
            $response->{error} = "$res";
        }
    }
    send_json($response, send_json_options());
};
# Sends the configured plugins as JSON.
# This is needed at least to create queues in the web interface.
get '/plugins' => sub {
    my $plugins = $disbatch->{config}{plugins};
    return send_json($plugins, send_json_options());
};
# Sends the scheduler report for all queues as JSON, or a 400 with an error
# object when the report cannot be produced.
get '/queues' => sub {
    undef $disbatch->{mongo};
    # on failure the catch block sets the status to 400 and yields the error message instead of the report
    my $result = try { $disbatch->scheduler_report } catch { status 400; "Could not get current queues: $_" };
    my $failed = (status() // 200) == 400;
    unless ($failed) {
        return send_json($result, send_json_options());
    }
    Limper::warning($result);
    return send_json({ error => $result }, send_json_options());
};
# Sends the scheduler report entry of a single queue as JSON. The entry is
# matched on its 'id' when the path segment can be turned into a MongoDB::OID
# and on its 'name' otherwise; only the first match is sent.
get qr'^/queues/(?<queue>.+)$' => sub {
    my $wanted = $+{queue};
    undef $disbatch->{mongo};
    # the OID is only constructed to find out which field to compare against
    my $key = try { MongoDB::OID->new(value => $wanted); 'id' } catch { 'name' };
    # on failure the catch block sets the status to 400 and yields the error message instead of the report
    my $queues = try { $disbatch->scheduler_report } catch { status 400; "Could not get current queues: $_" };
    if ((status() // 200) == 400) {
        Limper::warning($queues);
        return send_json({ error => $queues }, send_json_options());
    }
    my ($queue) = grep { $_->{$key} eq $wanted } @$queues;
    return send_json($queue, send_json_options());
};
# Returns a HASH ref with one key (value 1) per configured plugin, for
# membership tests.
sub map_plugins {
    my @plugin_names = @{ $disbatch->{config}{plugins} };
    my %is_plugin;
    @is_plugin{@plugin_names} = (1) x @plugin_names;
    return \%is_plugin;
}
# Creates a queue. Requires exactly the params 'name' and 'plugin', and the
# plugin must be one of the configured plugins.
# Responds with the insert result keyed by its class name plus 'id' (the
# inserted _id); a failed insert is a 400 with an 'error' field.
post '/queues' => sub {
    undef $disbatch->{mongo};
    my $params = parse_params;
    # parse_params may return a non-HASH (a JSON array or scalar, or the decoder's
    # error on malformed JSON); reject it instead of dying on the dereference below
    unless (ref $params eq 'HASH') {
        status 400;
        return send_json({ error => 'Params must be a JSON object or form data' }, send_json_options());
    }
    unless (($params->{name} // '') and ($params->{plugin} // '')) {
        status 400;
        return send_json({ error => 'name and plugin required' }, send_json_options());
    }
    my @valid_params = qw/name plugin/;
    for my $param (keys %$params) {
        unless (grep $_ eq $param, @valid_params) {
            status 400;
            return send_json({ error => 'Invalid param', param => $param }, send_json_options());
        }
    }
    unless (map_plugins()->{ $params->{plugin} }) {
        status 400;
        return send_json({ error => 'Unknown plugin', plugin => $params->{plugin} }, send_json_options());
    }
    # on failure $res is the exception rather than an insert result
    my $res = try { $disbatch->queues->insert_one($params) } catch { Limper::warning("Could not create queue $params->{name}: $_"); $_ };
    # an exception is not necessarily a HASH-based object, so only copy and inspect it when it is
    my $res_is_hash = (Scalar::Util::reftype($res) // '') eq 'HASH';
    my $res_class = ref $res;
    my $inserted_id = $res_is_hash ? $res->{inserted_id} : undef;
    my $response = { id => $inserted_id };
    $response->{$res_class} = {%$res} if $res_is_hash;
    unless (defined $inserted_id) {
        status 400;
        $response->{error} = "$res";
        # replace a nested 'result' object with a plain HASH keyed by its class name
        if ($res_is_hash and ref $res->{result}) {
            $response->{$res_class}{result} = { ref $res->{result} => { %{ $res->{result} } } };
        }
    }
    send_json($response, send_json_options());
};
# Updates a queue, found by _id when the path segment can be turned into a
# MongoDB::OID and by name otherwise. Accepted params: threads (non-negative
# integer), name (non-empty string) and plugin (one of the configured plugins).
# Responds with the update result keyed by its class name; anything other than
# exactly one matched document is a 400 with an 'error' field.
post qr'^/queues/(?<queue>.+)$' => sub {
    my $queue = $+{queue};
    undef $disbatch->{mongo};
    my $params = parse_params;
    my @valid_params = qw/threads name plugin/;
    # parse_params may return a non-HASH (a JSON array or scalar, or the decoder's
    # error on malformed JSON); reject it instead of dying on the dereference below
    unless (ref $params eq 'HASH') {
        status 400;
        return send_json({ error => 'params must be a JSON object or form data' }, send_json_options());
    }
    unless (keys %$params) {
        status 400;
        return send_json({ error => 'no params' }, send_json_options());
    }
    for my $param (keys %$params) {
        unless (grep $_ eq $param, @valid_params) {
            status 400;
            return send_json({ error => 'unknown param', param => $param }, send_json_options());
        }
    }
    if (exists $params->{plugin} and !map_plugins()->{ $params->{plugin} }) {
        status 400;
        return send_json({ error => 'unknown plugin', plugin => $params->{plugin} }, send_json_options());
    }
    if (exists $params->{threads} and $params->{threads} !~ /^\d+$/) {
        status 400;
        return send_json({ error => 'threads must be a non-negative integer' }, send_json_options());
    }
    if (exists $params->{name} and (ref $params->{name} or !($params->{name} // ''))) {
        status 400;
        return send_json({ error => 'name must be a string' }, send_json_options());
    }
    my $filter = try { +{ _id => MongoDB::OID->new(value => $queue) } } catch { +{ name => $queue } };
    # on failure $res is the exception rather than an update result
    my $res = try {
        $disbatch->queues->update_one($filter, { '$set' => $params });
    } catch {
        Limper::warning("Could not update queue $queue: $_");
        $_;
    };
    # an exception is not necessarily a HASH-based object, so only copy and inspect it when it is
    my $res_is_hash = (Scalar::Util::reftype($res) // '') eq 'HASH';
    my $response = $res_is_hash ? +{ ref($res) => {%$res} } : {};
    my $matched = $res_is_hash ? ($res->{matched_count} // 0) : 0;
    unless ($matched == 1) {
        status 400;
        $response->{error} = "$res";
    }
    send_json($response, send_json_options());
};
# Deletes a queue, found by _id when the path segment can be turned into a
# MongoDB::OID and by name otherwise.
# Responds with the delete result keyed by its class name; when nothing was
# deleted the response is a 400 with an 'error' field.
del qr'^/queues/(?<queue>.+)$' => sub {
    my $queue = $+{queue};
    undef $disbatch->{mongo};
    my $filter = try { +{ _id => MongoDB::OID->new(value => $queue) } } catch { +{ name => $queue } };
    # on failure $res is the exception rather than a delete result
    my $res = try { $disbatch->queues->delete_one($filter) } catch { Limper::warning("Could not delete queue '$queue': $_"); $_ };
    # an exception is not necessarily a HASH-based object, so only copy and inspect it when it is
    my $res_is_hash = (Scalar::Util::reftype($res) // '') eq 'HASH';
    my $response = $res_is_hash ? +{ ref($res) => {%$res} } : {};
    unless ($res_is_hash and $res->{deleted_count}) {
        status 400;
        $response->{error} = "$res";
    }
    send_json($response, send_json_options());
};
# Takes either the string form of a queue's OID or a queue name, and returns
# the _id of the matching queue document, or undef if no queue was found.
# The name lookup is only attempted when the lookup by _id throws.
sub get_queue_oid {
    my ($queue) = @_;
    my $queue_doc = try {
        my $oid = MongoDB::OID->new(value => $queue);
        $disbatch->queues->find_one({ _id => $oid });
    } catch {
        try {
            $disbatch->queues->find_one({ name => $queue });
        } catch {
            Limper::warning("Could not find queue $queue: $_");
            undef;
        };
    };
    return undef unless defined $queue_doc;
    return $queue_doc->{_id};
}
# Creates one task document per element of @$tasks (each element becomes the
# task's params) for the given queue _id and inserts them all at once.
# Returns the result of insert_many, or the caught exception if it failed.
sub create_tasks {
    my ($queue_id, $tasks) = @_;
    my @documents = map {
        +{
            queue  => $queue_id,
            status => -2,
            stdout => undef,
            stderr => undef,
            node   => undef,
            params => $_,
            ctime  => Time::Moment->now_utc,
            mtime  => Time::Moment->now_utc,
        }
    } @$tasks;
    my $result = try {
        $disbatch->tasks->insert_many(\@documents);
    } catch {
        Limper::warning("Could not create tasks: $_");
        $_;
    };
    return $result;
}
# Creates tasks from the current request and sends the JSON response.
# $legacy_params, when given, is a HASH merged over the parsed request params.
# Accepted request shapes:
#   { "queue": queue, "params": single_task_params }
#   { "queue": queue, "params": [single_task_params, another_task_params, ...] }
#   { "queue": queue, "params": generic_task_params, "collection": collection, "filter": collection_filter }
# A bare JSON array is treated as the "params" list.
# In the collection form one task is created per document matching the filter;
# a generic param value of the form "document.FIELD" is replaced by that
# document's FIELD.
# Responds with the insert result keyed by its class name; every failure is a
# 400 with an 'error' field.
sub post_tasks {
    my ($legacy_params) = @_;
    undef $disbatch->{mongo};
    my $params = parse_params;
    $params = { params => $params } if ref $params eq 'ARRAY';
    # parse_params may still hand back a non-HASH (a JSON scalar, or the decoder's
    # error on malformed JSON); reject it instead of dying on the dereferences below
    unless (ref $params eq 'HASH') {
        status 400;
        return send_json({ error => 'invalid parameters passed' }, send_json_options());
    }
    $params = { %$params, %$legacy_params } if defined $legacy_params;

    my $queue_id = get_queue_oid($params->{queue});
    unless (defined $queue_id) {
        status 400;
        return send_json({ error => 'queue not found' }, send_json_options());
    }

    my $task_params = $params->{params};
    # the request shape is identified by its exact set of top-level keys
    my $keys = join(',', sort keys %$params);

    # { "queue": queue, "params": single_task_params }
    # NOTE: wait does anything use this??
    if ($keys eq 'params,queue' and ref $task_params eq 'HASH') {
        $task_params = [$task_params];
    }

    if ($keys eq 'params,queue' and ref $task_params eq 'ARRAY') {
        # { "queue": queue, "params": [single_task_params, another_task_params, ...] }
        # validate array of hash params
        if (!@$task_params or grep { ref $_ ne 'HASH' } @$task_params) {
            status 400;
            return send_json({ error => "'params' must be a JSON array of task params objects" }, send_json_options());
        } elsif (grep { keys %$_ == 0 } @$task_params) {
            status 400;
            return send_json({ error => "'params' must be a JSON array of task params objects with key/value pairs" }, send_json_options());
        }
        # $task_params is ready
    } elsif ($keys eq 'collection,filter,params,queue' and ref $task_params eq 'HASH') {
        # { "queue": queue, "params": generic_task_params, "collection": collection, "filter": collection_filter }
        # validate and parse
        # {"migration":"foo"}
        # {"migration":"document.migration","user1":"document.username"}
        if (ref $params->{filter} ne 'HASH') {
            status 400;
            return send_json({ error => "'filter' must be a JSON object" }, send_json_options());
        } elsif (ref $params->{collection} or !$params->{collection}) {
            # rejects a collection that is a reference, or that is missing/false
            status 400;
            return send_json({ error => "'collection' required and must be a scalar (string)'" }, send_json_options());
        }

        # names of the document fields referenced by the generic params; the
        # substitution below strips the "document." prefix from @fields in place
        my @fields = grep /^document\./, values %$task_params;
        my %fields = map { s/^document\.//; $_ => 1 } @fields;

        my $cursor = $disbatch->mongo->coll($params->{collection})->find($params->{filter})->fields(\%fields);
        # FIXME: maybe fail unless $cursor->has_next
        my @tasks;
        my $error;
        try {
            # NOTE: yes, this loads all of them into @tasks
            while (my $doc = $cursor->next) {
                my $task = clone $task_params;
                for my $key (keys %$task) {
                    if ($task->{$key} =~ /^document\./) {
                        for my $field (@fields) {
                            my $f = quotemeta $field;
                            if ($task->{$key} =~ /^document\.$f$/) {
                                $task->{$key} = $doc->{$field};
                            }
                        }
                    }
                }
                push @tasks, $task;
            }
        } catch {
            Limper::warning("Could not iterate on collection $params->{collection}: $_");
            $error = "$_";
        };
        if (defined $error) {
            status 400;
            return send_json({ error => $error }, send_json_options());
        }
        $task_params = \@tasks;
        # $task_params is ready
    } else {
        # fail
        status 400;
        return send_json({ error => 'invalid parameters passed' }, send_json_options());
    }

    my $res = create_tasks($queue_id, $task_params);	# doing 100k at once only take 12 seconds on my 13" rMBP

    # create_tasks returns the caught exception when the insert failed, which has
    # no 'inserted' list and is not necessarily a HASH-based object
    my $res_is_hash = (Scalar::Util::reftype($res) // '') eq 'HASH';
    my $response = $res_is_hash ? +{ ref($res) => {%$res} } : {};
    my $inserted = $res_is_hash ? $res->{inserted} : undef;
    if (ref $inserted eq 'ARRAY') {
        unless (@$inserted) {
            status 400;
            $response->{error} = 'Unknown error';
        }
    } else {
        status 400;
        $response->{error} = "$res";
    }
    send_json($response, send_json_options());
}
# Creates tasks from the request; see post_tasks() for the accepted shapes.
post '/tasks' => sub {
    # no legacy params here: everything comes from the request itself
    return post_tasks();
};
# Adjusts task documents in place according to the request options:
#   .terse - replace a defined, true, non-OID stdout/stderr with '[terse mode]'
#   .full  - replace a stdout/stderr that is a MongoDB::OID with what get_gfs() returns for it
#            (left unchanged, with a warning, if that fails); only considered when .terse is not set
#   .epoch - replace DateTime ctime/mtime values with their hires_epoch
# $tasks is an ARRAY ref of tasks, or a single task HASH.
# NOTE: i hate this, but it should maybe be here for backcompat
sub _munge_tasks {
    my ($tasks, $options) = @_;
    $tasks = [$tasks] if ref $tasks eq 'HASH';	# NOTE: if $options->{'.limit'} is 1
    my $terse = $options->{'.terse'};
    my $full  = $options->{'.full'} // 0;
    my $epoch = $options->{'.epoch'};
    for my $task (@$tasks) {
        for my $stream (qw/stdout stderr/) {
            my $value = $task->{$stream};
            if ($terse) {
                next unless defined $value;
                next if $value->$_isa('MongoDB::OID');
                $task->{$stream} = '[terse mode]' if $value;
            } elsif ($full and $value->$_isa('MongoDB::OID')) {
                $task->{$stream} = try {
                    $disbatch->get_gfs($value);
                } catch {
                    Limper::warning("Could not get task $task->{_id} $stream: $_");
                    $value;
                };
            }
        }
        next unless $epoch;
        for my $stamp (qw/ctime mtime/) {
            next unless ref $task->{$stamp} eq 'DateTime';
            $task->{$stamp} = $task->{$stamp}->hires_epoch;
        }
    }
}
# Task search. Without params or options, an HTML client gets the query form;
# otherwise the params are run through query() and the result is sent as JSON
# or rendered with query.tt, depending on what the client accepts. An error
# result is sent with status 400.
# FIXME: in query.tt at least toggleGroup() should run at $(document).ready() when returning a form because of invalid params, instead of only showing the limit (bug is there, not at all here)
get '/tasks' => sub {
    undef $disbatch->{mongo};	# FIXME: why is this added?
    my ($params, $options) = parse_params;	# NOTE: $options may contain: .limit .skip .count .pretty .terse .epoch .full
    $params = undef if defined $params and $params eq '';	# FIXME: maybe move to parse_params() above
    my $want_json = want_json;
    my $indexes = get_indexes($disbatch->tasks);

    # every indexed field is offered as a repeatable string param
    my %schema_params;
    for my $index (@$indexes) {
        for my $field (@$index) {
            $schema_params{$field} = { repeatable => 'yes', type => ['string'] };
        }
    }
    my $schema = {
        verb     => 'GET',
        limit    => 100,
        title    => 'Disbatch Tasks Query',
        subtitle => 'Warning: this can return a LOT of data!',
        params   => \%schema_params,
    };

    # nothing to search for and an HTML client: just show the form
    unless ($want_json or %$params or %$options) {
        return template('query.tt', { schema => $schema, indexes => $indexes });
    }

    my $result = query($params, $options, $schema->{title}, $oid_keys, $disbatch->tasks, request->{path}, $want_json, $indexes);

    unless ($want_json) {
        if (exists $result->{error}) {
            $result->{schema} = $schema;
            $result->{schema}{error} = $result->{error};
            status 400;
        }
        _munge_tasks($result, $options);	# FIXME: do we want _munge_tasks() here too? well let's TIAS
        return template('query.tt', $result);
    }

    status 400 if ref $result ne 'ARRAY' and exists $result->{error};
    _munge_tasks($result, $options);
    return send_json($result, send_json_options(), pretty => $options->{'.pretty'} // 0);
};
# Single task lookup by its 24 hex character id, sent as pretty JSON or rendered
# with query.tt depending on what the client accepts. A missing task is a 404
# and an error result is a 400.
get qr'^/tasks/(?<id>[0-9a-f]{24})$' => sub {
    my $id = $+{id};
    my $title = "Disbatch Single Task Query";
    my $want_json = want_json;
    my $result = query({ id => $id }, { '.limit' => 1 }, $title, $oid_keys, $disbatch->tasks, request->{path}, $want_json, [['id']]);

    unless ($want_json) {
        if (!defined $result->{result}) {
            status 404;
        } elsif (exists $result->{error}) {
            status 400;
        }
        return template('query.tt', $result);
    }

    if (!keys %$result) {
        status 404;
        $result = { error => "no task with id $id" };
    } elsif (exists $result->{error}) {
        status 400;
    }
    return send_json($result, send_json_options(), pretty => 1);
};
# Returns the balance document (or a HASH with a 'notice' when there is none)
# without its _id, extended with 'known_queues' (the distinct queue names) and
# 'settings' (the balance section of the config).
sub get_balance {
    my $balance = $disbatch->balance->find_one();
    $balance //= { notice => 'balance document not found' };
    delete $balance->{_id};
    my @queue_names = $disbatch->queues->distinct("name")->all;
    $balance->{known_queues} = \@queue_names;
    $balance->{settings} = $disbatch->{config}{balance};	# { log => 1, verbose => 0, pretend => 0, enabled => 0 }
    return $balance;
}
# Validates the posted balance settings and, when they are valid, upserts them
# into the balance collection.
# Returns a HASH with a single 'status' key: a 'success: ...' message after the
# update, or a 'failed: ...' message (with the response status set to 400) as
# soon as any validation step dies.
sub post_balance {
my $params = parse_params;
# TODO: make this not all hardcoded:
# the try block yields undef when every check passes; otherwise the catch block yields the failure HASH
my $error = try {
# the only accepted key sets are "max_tasks,queues" and "disabled,max_tasks,queues"
die join(',', sort keys %$params) unless join(',', sort keys %$params) =~ /^(?:disabled,)?max_tasks,queues$/;
if (defined $params->{disabled}) {
# 'disabled' must be all digits and, unless it is 0, must not be less than the current time
die unless $params->{disabled} =~ /^\d+$/;
die if $params->{disabled} and $params->{disabled} < time;
}
# 'queues' must be an ARRAY of ARRAYs
die unless ref $params->{queues} eq 'ARRAY';
ref $_ eq 'ARRAY' or die for @{$params->{queues}};
# @q collects every queue name from all of the inner ARRAYs
my @q;
my @known_queues = $disbatch->queues->distinct("name")->all;
for my $q (@{$params->{queues}}) {
# each entry must be a plain scalar made of word characters and '-', and must name a known queue
ref $_ and die ref $_ for @$q;
/^[\w-]+$/ or die for @$q;
for my $e (@$q) {
grep { /^$e$/ } @known_queues or die;
}
push @q, @$q;
}
# a queue name may appear only once across all of the inner ARRAYs
my %q = map { $_ => undef } @q;
die unless @q == keys %q;
die unless join(',', sort @q) eq join(',', sort keys %q);
# 'max_tasks' must be a HASH whose values are all digits and whose keys look like
# "D HH:MM", where D is '*' or a digit 0-6 and HH:MM is a 24-hour time
# NOTE(review): D is presumably a day of the week - confirm
die unless ref $params->{max_tasks} eq 'HASH';
/^\d+$/ or die for values %{$params->{max_tasks}};
/^[*0-6] (?:[01]\d|2[0-3]):[0-5]\d$/ or die for keys %{$params->{max_tasks}};
return undef;
} catch {
status 400;
return { status => 'failed: invalid json passed ' . $_ };
};
return $error if defined $error;
# turn the max_tasks values into numbers before storing them
$_ += 0 for values %{$params->{max_tasks}};
$disbatch->balance->update_one({}, {'$set' => $params }, {upsert => 1});
{ status => 'success: queuebalance modified' };
};
# Sends the balance data as pretty JSON, or renders it with balance.tt,
# depending on what the client accepts.
get '/balance' => sub {
    unless (want_json()) {
        return template('balance.tt', get_balance());
    }
    return send_json(get_balance(), send_json_options(), pretty => 1);
};
# Validates and stores the posted balance settings, and sends the resulting
# status HASH as JSON.
post '/balance' => sub {
    my $outcome = post_balance();
    return send_json($outcome, send_json_options());
};
# For Disbatch basic status.
# Returns hash with keys status and message (and, once nodes were found, nodes).
# A node counts as stale when its timestamp is more than 60 seconds old.
# NOTE: this *is* disbatch (web). but we now check if any nodes are running, instead of if the web server is running on a list of hosts (as old disbatch was monolithic)
sub check_disbatch {
    return try {
        # $nodes is an ARRAY of nodes, each HASH has a 'timestamp' field (in ms) so you can tell if it's running, as well as 'node' and 'id'
        my $nodes = get_nodes();
        return +{ status => 'WARNING', message => 'No Disbatch nodes found' } unless @$nodes;

        my $now = time;
        my $status = {};
        for my $node (@$nodes) {
            my $last_seen = int($node->{timestamp} / 1000);
            my $bucket = $last_seen + 60 < $now ? 'stale' : 'fresh';
            # record how many seconds ago the node was last seen
            $status->{$bucket}{ $node->{node} } = $now - $last_seen;
        }

        my $fresh_count = keys %{ $status->{fresh} };
        return $fresh_count
            ? +{ status => 'OK', message => 'Disbatch is running on one or more nodes', nodes => $status }
            : +{ status => 'CRITICAL', message => 'No active Disbatch nodes found', nodes => $status };
    } catch {
        return +{ status => 'CRITICAL', message => "Could not get current Disbatch nodes: $_" };
    };
}
# Returns a HASH with keys status and message describing queuebalance: OK when
# balancing is disabled in the config, CRITICAL when the balance document's
# timestamp is more than 60 seconds old or its status is not recognised, and
# otherwise the stored status and message.
sub check_queuebalance {
    unless ($disbatch->{config}{balance}{enabled}) {
        return { status => 'OK', message => 'queuebalance disabled' };
    }
    # FIXME: return some sort of OK status if 'balance' collection doesn't exist (no QueueBalance) or $qb below is undef
    my $projection = { status => 1, message => 1, timestamp => 1, _id => 0 };
    my $qb = $disbatch->balance->find_one({}, $projection);
    # error via _mongo()
    # FIXME: this will never happen because rewrite (wait why??), but maybe should check for timestamp anyway
    if ($qb->{status} eq 'CRITICAL' and !exists $qb->{timestamp}) {
        return $qb;
    }
    my $timestamp = delete $qb->{timestamp};
    if ($timestamp < time - 60) {
        return { status => 'CRITICAL', message => 'queuebalanced not running for ' . (time - $timestamp) . 's' };
    }
    if ($qb->{status} =~ /^(?:OK|WARNING|CRITICAL)$/) {
        return $qb;
    }
    return { status => 'CRITICAL', message => 'queuebalanced unknown status', result => $qb };
}
# Returns a HASH with the 'disbatch' and 'queuebalance' check results. When
# monitoring is not enabled in the config, both are reported as OK without
# running the checks.
sub checks {
    unless ($disbatch->{config}{monitoring}) {
        return {
            disbatch     => { status => 'OK', message => 'monitoring disabled' },
            queuebalance => { status => 'OK', message => 'monitoring disabled' },
        };
    }
    return {
        disbatch     => scalar check_disbatch(),
        queuebalance => scalar check_queuebalance(),
    };
}
# Sends the results of checks() as JSON.
get '/monitoring' => sub {
    my $report = checks();
    return send_json($report, send_json_options());
};
# Returns an ARRAY ref of the collection's indexes, each one an ARRAY ref of its
# field names in index order, derived from the index names.
# An index whose name is a proper prefix of another index's name is left out,
# and '_id' is reported as 'id'.
# Dies if the number of parsed field names does not match the number of keys in
# the index. NOTE: this rewrites the 'name' of the listed index documents.
sub get_indexes {
    my ($coll) = @_;
    my @indexes = $coll->indexes->list->all;
    my %index_for;
    for my $index (@indexes) {
        # treat every descending key as ascending; /g matters for indexes with more than one descending key
        $index->{name} =~ s/_-1(?=_|$)/_1/g;
        $index_for{ $index->{name} } = $index;
    }
    my @parsed;
    for my $name (sort keys %index_for) {
        my $prefix = quotemeta $name;
        next if grep { /^$prefix.+/ } keys %index_for;	# $name is a subset of another index, so ignore it
        my $index = $index_for{$name};
        my $count = keys %{ $index->{key} };
        $index->{name} =~ s/_1$//;
        my @fields = split /_1_/, $index->{name}, $count;
        die "Couldn't parse index: ", Cpanel::JSON::XS->new->convert_blessed->allow_blessed->pretty->encode($index) unless $count == @fields;
        $fields[0] = '_id' if $fields[0] eq '_id_';	# damn mongo for it ending in '_'
        for my $field (@fields) {
            $field = 'id' if $field eq '_id';	# damn T::T
        }
        push @parsed, \@fields;
    }
    \@parsed;
}
# Returns the list of param names that cannot be used for an indexed query.
# A param is valid when some index contains it and every field of that index
# to its left is also among the params; everything else is invalid.
sub invalid_params {
    my ($params, $indexes) = @_;
    my @rejected;
    PARAM: for my $name (keys %$params) {
        INDEX: for my $index (@$indexes) {
            next INDEX unless grep { $_ eq $name } @$index;
            # walk the index from the left up to and including $name: every field on the way must be a param
            my $covered = 1;
            for my $field (@$index) {
                $covered = 0 unless exists $params->{$field};
                last if !$covered or $field eq $name;
            }
            next PARAM if $covered;
        }
        # no index covers this param
        push @rejected, $name;
    }
    return @rejected;
}
# Builds a MongoDB query from request params.
# $params maps a field name to a value or an ARRAY of alternative values;
# $oid_keys lists the fields (besides 'id') whose values are OID strings.
# Params with an empty value are skipped, 'id' is queried as '_id', OID fields
# are converted with MongoDB::OID->new, and values that look like numbers are
# turned into numbers. An ARRAY value becomes an '$or' over its elements.
# Returns { '$and' => [...] } with one clause per param, or {} when there are none.
sub params_to_query {
    my ($params, $oid_keys) = @_;
    my @and = ();
    while (my ($k, $v) = each %$params) {
        next if !defined $v or $v eq '';
        my @values = ref($v) eq 'ARRAY' ? @$v : ($v);
        my $convert;
        if ($k eq 'id' or grep { $k eq $_ } @$oid_keys) {
            # TT doesn't like keys starting with an underscore:
            $k = '_id' if $k eq 'id';
            $convert = sub { MongoDB::OID->new(value => $_[0]) };
        } elsif (looks_like_number($values[0])) {	# NOTE: this only checks the first element in @$v
            $convert = sub { 0 + $_[0] };
        } else {
            $convert = sub { $_[0] };
        }
        # every '$or' alternative must be a full { field => value } clause, including for OID fields
        push @and, ref($v) eq 'ARRAY'
            ? { '$or' => [ map { +{ $k => $convert->($_) } } @values ] }
            : { $k => $convert->($values[0]) };
    }
    @and ? { '$and' => \@and } : {};
}
# get_indexes() invalid_params() params_to_query()
# FIXME: i hate this code
# Performs a MongoDB count or find on $collection, restricted to indexed params.
#
# Parameters:
#   $params     - HASH of request params to turn into the query (see params_to_query())
#   $options    - HASH of dot options: .count .limit .skip .fields (optional)
#   $title      - string used in the returned HASH (optional, defaults to '')
#   $oid_keys   - ARRAY of keys whose values are ObjectIds, passed to params_to_query()
#   $collection - collection object; count() and find() are called on it
#   $path       - string returned as 'path' (errors) or 'mypath' (results)
#   $raw        - if true, return the documents themselves instead of a result HASH
#   $indexes    - ARRAY of ARRAYs of index fields; looked up via get_indexes() if undef
#
# Returns one of:
#   * an error HASH (has an 'error' key) for bad .fields, non-indexed params, or an unrestricted find
#   * { count => N } if .count was requested
#   * if $raw: {} when nothing matched, else the single document (.limit == 1) or an ARRAY of documents
#   * otherwise a result HASH with result (pretty JSON string or undef), title, count, limit, skip,
#     params_str and mypath, plus json (the first document) when .limit == 1
sub query {
    my ($params, $options, $title, $oid_keys, $collection, $path, $raw, $indexes) = @_;
    $options //= {};    # .count .limit .skip .fields
    $title   //= '';
    $indexes //= get_indexes($collection);

    my $fields = $options->{'.fields'} || {};
    my $limit  = $options->{'.limit'}  || 0;
    my $skip   = $options->{'.skip'}   || 0;

    # FIXME: maybe move this $fields modification to parse_params()
    # NOTE: i don't like embedding json in url params
    unless (ref $fields) {
        # a malformed .fields string is a client error, so report it like the other
        # error cases below instead of dying out of the request handler
        my $decoded = do { local $@; eval { Cpanel::JSON::XS->new->utf8->decode($fields) } };
        return { title => $title, path => $path, error => 'invalid .fields given - must be a JSON object or array', indexes => $indexes }
            unless ref $decoded eq 'HASH' or ref $decoded eq 'ARRAY';
        $fields = $decoded;
    }
    $fields = { map { $_ => 1 } @$fields } if ref $fields eq 'ARRAY';

    # can only query indexed fields
    my @invalid_params = invalid_params($params, $indexes);
    return { title => $title, path => $path, error => 'non-indexed params given', invalid_params => \@invalid_params, indexes => $indexes } if @invalid_params;

    my $query = params_to_query($params, $oid_keys);
    return { count => $collection->count($query) } if $options->{'.count'};

    # we don't want to return the entire collection
    return { title => $title, path => $path, error => 'refusing to return everything - include one or more indexed search restrictions', indexes => $indexes } unless keys %$query or $limit > 0;

    my @documents = $collection->find($query)->fields($fields)->limit($limit)->skip($skip)->all;

    if ($raw // 0) {
        # FIXME: return [] if no @documents unless $limit == 1, then maybe return undef
        return {} unless @documents;
        return ($limit == 1 ? $documents[0] : \@documents);
    }

    # need allow_blessed for some reason because analysed value is a boolean. convert_blessed messes this up, but is needed for OIDs.
    # A ternary rather than "my $x = ... if COND": perlsyn documents a my() with a statement modifier as undefined behaviour.
    my $documents = @documents
        ? Cpanel::JSON::XS->new->convert_blessed->allow_blessed->pretty->encode($limit == 1 ? $documents[0] : \@documents)
        : undef;

    # Multi-valued params are ARRAYs; emit one name=value pair per element rather than "name=ARRAY(0x...)".
    # NOTE(review): values are not URL-encoded here, same as before -- presumably the consumer handles that; confirm.
    # FIXME: maybe we need $options in here too
    my $params_str = join '&', map {
        my $name = $_;
        map { "$name=$_" } ref $params->{$name} eq 'ARRAY' ? @{ $params->{$name} } : $params->{$name};
    } keys %$params;

    my $result = {
        result     => $documents,
        title      => "$title Results",
        count      => scalar @documents,
        limit      => $limit,
        skip       => $skip,
        params_str => $params_str,
        mypath     => $path,
    };
    $result->{json} = $documents[0] if $limit == 1;
    return $result;
}
1;
__END__
=encoding utf8
=head1 NAME
Disbatch::Web - Disbatch Command Interface (JSON REST API and web browser interface to Disbatch).
=head1 VERSION
version 4.103
=head1 EXPORTED
parse_params, send_json_options, template
=head1 SUBROUTINES
=over 2
=item init(config_file => $config_file)
Parameters: path to the Disbatch config file. Default is C</etc/disbatch/config.json>.
Initializes the settings for the web server, including loading any custom routes via C<config.web_extensions> (see L<CUSTOM ROUTES> below).
Returns nothing.
=item template($template, $params)
Parameters: template (C<.tt>) file name in the C<config.views_dir> directory, C<HASH> of parameters for the template.
Creates a web page based on the passed data.
Sets C<Content-Type> to C<text/html>.
Returns the generated html document.
NOTE: this sub is automatically exported, so any package using L<Disbatch::Web> can call it.
=item parse_params
Parameters: none
Parses request parameters in the following order:
* from the request body if the Content-Type is C<application/x-www-form-urlencoded>
* from the request body if the Content-Type is C<application/json>
* from the request query otherwise
It then puts any fields starting with C<.> into their own C<HASH> C<$options>.
Returns the C<HASH> of the parsed request parameters, and if C<wantarray> also returns the C<HASH> of options.
NOTE: this sub is automatically exported, so any package using L<Disbatch::Web> can call it.
=item send_json_options
Parameters: none
Used to enable the following options when returning JSON: C<allow_blessed>, C<canonical>, and C<convert_blessed>.
Returns a C<list> of key/value pairs of options to pass to C<send_json>.
NOTE: this sub is automatically exported, so any package using L<Disbatch::Web> can call it.
=item parse_accept
Parameters: none
Parses C<Accept> header.
Returns a C<HASH> where keys are types and values are q-factor weights.
=item want_json
Parameters: none
Returns true if C<Accept> header has C<application/json> with a higher q-factor weight than C<text/html>.
Note: if not specified, C<text/html> has an assumed q-factor weight of C<0> and C<application/json> has an assumed q-factor weight of C<1>.
=item get_nodes
Parameters: none
Returns an array of node objects defined, with C<timestamp> stringified and C<id> the stringified C<_id>.
=item get_plugins
Parameters: none
Returns a C<HASH> of defined queues plugins and any defined C<config.plugins>, where values match the keys.
=item get_queue_oid($queue)
Parameters: Queue ID as a string, or queue name.
Returns a C<MongoDB::OID> object representing this queue's _id.
=item create_tasks($queue_id, $tasks)
Parameters: C<MongoDB::OID> object of the queue _id, C<ARRAY> of task params.
Creates one queued task document for the given queue _id per C<$tasks> entry. Each C<$task> entry becomes the value of the C<params> field of the document.
Returns: the response object from a C<MongoDB::Collection#insert_many> request.
=item post_tasks($legacy_params)
Parameters: legacy params (optional, used by routes in Disbatch::Web::Tasks), also parses request parameters
Handles creating tasks to insert, and then creates them via C<create_tasks()>. See C<POST /tasks> below for usage.
Returns the response of C<create_tasks()> as JSON, where the key is the ref type of the response and the value is the response turned into a C<HASH>,
or on error sets HTTP status to C<400> and returns JSON of C<{"error":message}>.
=item _munge_tasks($tasks, $options)
Parameters: C<ARRAY> of task documents, C<HASH> of param options
Options handled are C<.terse>, C<.full>, and C<.epoch>, all booleans.
If C<.terse>, C<stdout> and C<stderr> values of each document will be C<[terse mode]> if defined and not a L<MongoDB::OID> object.
Else if C<.full>, C<stdout> and C<stderr> values of each document will be actual content instead of L<MongoDB::OID> objects.
If C<.epoch>, C<ctime> and C<mtime> will be turned into C<hires_epoch> (ex: C<1548272576.574>) instead of stringified (ex: C<2019-01-23T19:42:56>) if they are C<DateTime> objects.
Returns nothing, modifies passed tasks.
=item get_balance
Parameters: none
Returns a C<HASH> of the balance doc without the C<_id> field, with the following added:
field C<known_queues> with value an C<ARRAY> of all existing queue names, field C<settings> with value the C<HASH> of C<config.balance>.
If the balance doc does not exist, the field C<notice> with value C<balance document not found> is added.
=item post_balance
Parameters: none (but parses request parameters, see C<POST /balance> below)
Sets the C<balance> document fields given in the request parameters to the given values.
Returns C<< { status => 'success: queuebalance modified' } >> on success, or C<< { status => 'failed: invalid json passed ' . $_ } >> with HTTP status of C<400> on error.
=item check_disbatch
Parameters: none
Checks if Disbatch nodes exist and determines if any have been running within the last 60 seconds.
Returns C<< { status => 'WARNING', message => 'No Disbatch nodes found' } >> if no nodes,
C<< { status => 'OK', message => 'Disbatch is running on one or more nodes', nodes => $status } >> if at least one node recently running,
or C<< { status => 'CRITICAL', message => 'No active Disbatch nodes found', nodes => $status } >> if not.
On error, returns C<< { status => 'CRITICAL', message => "Could not get current Disbatch nodes: $_" } >>.
=item check_queuebalance
Parameters: none
Checks if QueueBalance has been running within the last 60 seconds.
Returns C<< { status => 'OK', message => 'queuebalance disabled' } >> if C<config.balance.enabled> is false.
If the balance doc has C<status> of C<CRITICAL> and no C<timestamp>, returns C<< { status => 'CRITICAL', message => $message } >>.
If the C<timestamp> value is older than 60 seconds, returns C<< { status => 'CRITICAL' , message => "queuebalanced not running for ${seconds}s" } >>.
If the C<status> value is not C<OK>, C<WARNING>, or C<CRITICAL>, returns C<< { status => 'CRITICAL', message => 'queuebalanced unknown status', result => $doc } >>.
Otherwise returns the doc: C<< { status => $status, message => $message, timestamp => $timestamp } >>.
=item checks
Parameters: none
Checks the status of Disbatch and QueueBalance.
If C<config.monitoring>, calls C<check_disbatch()> and C<check_queuebalance()>.
Returns C<< { disbatch => check_disbatch() , queuebalance => check_queuebalance() } >> if C<config.monitoring> is true, otherwise
C<< { disbatch => { status => 'OK', message => 'monitoring disabled' }, queuebalance => { status => 'OK', message => 'monitoring disabled' } } >>.
=item get_indexes($coll)
Parameters: C<MongoDB::Collection>.
Returns an C<ARRAY> of C<ARRAY>s of current indexes for the given collection.
Note: C<_id> is turned into C<id> because of L<Template>.
=item invalid_params($params, $indexes)
Parameters: MongoDB query params C<HASH>, current existing collection indexes (C<ARRAY> of C<ARRAY>s, as returned by C<get_indexes()>)
Returns a list of all params passed which do not match the given indexes. If the list is empty, the params are good.
Note: only looks at keys in C<$params>, not their values.
=item params_to_query($params, $oid_keys)
Parameters: C<HASH> form parameters for a MongoDB query, C<ARRAY> of index keys whose values are always ObjectIds, excluding C<_id>.
Turns fields from an HTTP request into a query suitable for L<MongoDB::Collection>.
=over 2
Skips key/value pairs where the value is the empty string.
If a key is C<id> or is in C<$oid_keys>, turns the value(s) which should be hex strings into L<MongoDB::OID> objects.
Otherwise if a value (or first element of an C<ARRAY> value) looks like a number, ensures the value (or elements) is a Perl number.
Any values which are C<ARRAY>s are turned into queries joined by C<$or>.
If at least one key/value pair remains after skipping, the resulting conditions are joined into an C<$and> query; otherwise the query is empty.
=back
Returns a query to pass a L<MongoDB::Collection> object.
=item query($params, $options, $title, $oid_keys, $collection, $path, $raw, $indexes)
Performs a MongoDB query (C<count> or C<find>).
Parameters: HTTP params (C<HASH>), options (C<HASH>), title (string), OID keys (C<ARRAY>), L<MongoDB::Collection> object,
form action path (string), return raw result (boolean), indexes (C<ARRAY> of arrays).
Form action path should be from C<< request->{path} >>.
Options can be C<.count>, C<.fields> to return, C<.limit>, and C<.skip>.
Raw and indexes key are optional -- raw defaults to 0, and indexes are queried if C<undef>.
Returns the result of the query as a C<HASH> or C<ARRAY>, or an error C<HASH>.
NOTE: I hate this code. Read it to determine the formats it might return.
=back
=head1 JSON ROUTES
NOTE: all JSON routes use C<send_json_options>, documented above.
=over 2
=item GET /info
Parameters: none.
Returns an object with the following fields: C<database> (the name of the MongoDB database used), C<web_extensions> (an array of configured web extensions for custom routes),
and C<routes> (an object where fields are HTTP verbs and values are routes in the order configured).
Note: new in Disbatch 4.2
=item GET /nodes
Parameters: none.
Returns an Array of node Objects defined (with C<id> the stringified C<_id>) on success, C<< { "error": "Could not get current nodes: $_" } >> on error.
Sets HTTP status to C<400> on error.
Note: new in Disbatch 4
=item GET /nodes/:node
URL: C<:node> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<node> name if it does not.
Parameters: none.
Returns node Object (with C<id> the stringified C<_id>) on success, C<< { "error": "Could not get node $node: $_" } >> on error.
Sets HTTP status to C<400> on error.
Note: new in Disbatch 4
=item POST /nodes/:node
URL: C<:node> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<node> name if it does not.
Parameters: C<< { "maxthreads": maxthreads } >>
"maxthreads" is a non-negative integer or null
Returns C<< { ref $res: Object } >> or C<< { ref $res: Object, "error": error_string_or_response_object } >>
Sets HTTP status to C<400> on error.
Note: new in Disbatch 4
=item GET /plugins
Parameters: none.
Returns an Array of allowed plugin names.
Should never fail.
Note: replaces /queue-prototypes-json
=item GET /queues
Parameters: none.
Returns an Array of queue Objects on success, C<< { "error": "Could not get current queues: $_" } >> on error.
Each item has the following keys: id, plugin, name, threads, queued, running, completed
Sets HTTP status to C<400> on error.
Note: replaces /scheduler-json
=item GET /queues/:queue
URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
Parameters: none.
Returns a queue Object on success, C<< { "error": "Could not get current queues: $_" } >> on error.
Each item has the following keys: id, plugin, name, threads, queued, running, completed
Sets HTTP status to C<400> on error.
=item POST /queues
Create a new queue.
Parameters: C<< { "name": name, "plugin": plugin } >>
C<name> is the desired name for the queue (must be unique), C<plugin> is the plugin name for the queue.
Returns: C<< { ref $res: Object, "id": $inserted_id } >> on success; C<< { "error": "name and plugin required" } >>,
C<< { "error": "Invalid param", "param": $param } >>, or C<< { "error": "Unknown plugin", "plugin": $plugin } >> on input error; or
C<< { ref $res: Object, "id": null, "error": "$res" } >> on MongoDB error.
Sets HTTP status to C<400> on error.
Note: replaces /start-queue-json
=item POST /queues/:queue
URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
Parameters: C<< { "name": name, "plugin": plugin, "threads": threads } >>
C<name> is the new name for the queue (must be unique), C<plugin> is the new plugin name for the queue (must be defined in the config file),
C<threads> must be a non-negative integer. Only one of C<name>, C<plugin>, and C<threads> is required, but any combination is allowed.
Returns C<< { ref $res: Object } >> or C<< { "error": error } >>
Sets HTTP status to C<400> on error.
Note: replaces /set-queue-attr-json
=item DELETE /queues/:queue
Deletes the specified queue.
URL: C<:queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
Parameters: none
Returns: C<< { ref $res: Object } >> on success, or C<< { ref $res: Object, "error": "$res" } >> on error.
Sets HTTP status to C<400> on error.
Note: replaces /delete-queue-json
=item GET /tasks
Parameters: anything indexed on the C<tasks> collection, as well as any dot options.
Options can be C<.count>, C<.fields> to return, query C<.limit> and C<.skip>, C<.terse> or C<.full> output, dates as C<.epoch>, and C<.pretty> print JSON result.
Performs a search of tasks, returning either JSON or a web page.
If C<want_json()> (based on the C<Accept> header), returns a JSON array (which may be pretty-printed if specified in the parameters) of task documents,
or on error an object with an C<error> field (and possibly other fields).
Otherwise, if no parameters returns a web form to perform a search of indexed fields. If parameters, returns a web page of results or error.
Sets HTTP status to C<400> on error.
Note: new in 4.2, replaces C<POST /tasks/search>
=item GET /tasks/:id
Parameters: Task OID in URL
Returns the task matching OID as JSON, or C<{ "error": "no task with id :id" }> and status C<404> if OID not found.
Or, via a web browser (based on C<Accept> header value), returns the task matching OID with some formatting, or C<No tasks found matching query> if OID not found.
=cut
=item POST /tasks
Parameters: C<{ "queue": queue, "params": [single_task_params, another_task_params, ...] }> or C< { "queue": queue, "params": generic_task_params, "collection": collection, "filter": filter }>.
C<queue> is the C<_id> if it matches C</\A[0-9a-f]{24}\z/>, or C<name> if it does not.
C<collection> is a MongoDB collection name.
C<filter> is a filter expression (query) object for the C<:collection> collection.
C<params> depends on if passing a collection and filter or not.
=over 2
If not, C<params> is an array of objects, each of which will be inserted as-is as the C<params> value in created tasks.
Otherwise, C<params> is an object of generic task params. To insert a document value from a query into the params, prefix the desired key name with C<document.> as a value.
=back
Returns: C<< { ref $res: Object } >> on success; C<< { "error": "params must be a JSON array of task params" } >>, C<< { "error": "filter and params required and must be name/value objects" } >>
or C<< { "error": "queue not found" } >> on input error; C<< { "error": "Could not iterate on collection $collection: $error" } >> on query error, or C<< { ref $res: Object, "error": "Unknown error" } >> on MongoDB error.
Sets HTTP status to C<400> on error.
Note: new in 4.2, replaces C<POST /tasks/:queue> and C<POST /tasks/:queue/:collection>
=item GET /balance
Parameters: none
Returns a web page to view and update Queue Balance settings if the C<Accept> header wants C<text/html>, otherwise returns a pretty JSON result of C<get_balance>
=item POST /balance
Parameters: C<{ "max_tasks": max_tasks, "queues": queues, "disabled": disabled }>
C<max_tasks> is a C<HASH> where keys match C</^[*0-6] (?:[01]\d|2[0-3]):[0-5]\d$/> (that is, C<0..6> or C<*> for DOW, followed by a space and a 24-hour time) and values are non-negative integers.
C<queues> is an C<ARRAY> of C<ARRAY>s of queue names which must exist
C<disabled> is a timestamp which must be in the future (optional)
Sets the C<balance> document fields given in the above parameters to the given values.
Returns JSON C<{"status":"success: queuebalance modified"}> on success, or C<{"status":"failed: invalid json passed " . $_}> with HTTP status of C<400> on error.
=item GET /monitoring
Parameters: none
Checks the status of Disbatch and QueueBalance.
Monitoring is controlled by setting C<config.monitoring> and C<config.balance.enabled>.
Returns as JSON the result of C<checks()>, documented above.
=back
=head1 CUSTOM ROUTES
You can set an object of package names and arguments (can be C<null>) to C<web_extensions> in the config file to load any custom routes and call
C<< init($disbatch, $arguments) >> if available.
Note that if a request which matches your custom route is also matched by an above route, your custom route will never be called.
If a custom route package needs to interface with Disbatch or have any arguments passed to it, it should have the following:
my $disbatch;
sub init {
($disbatch, my $args) = @_;
# do whatever you may need to do with $args
}
For examples see L<Disbatch::Web::Files> (which is automatically loaded at the end of C<init()>, after any custom routes) and L<Disbatch::Web::Tasks> (not loaded by default).
=head1 BROWSER ROUTES
=over 2
=item GET /
Returns the contents of "/index.html" – the queue browser page.
=item GET qr{^/}
Returns the contents of the request path.
Note: this is loaded from L<Disbatch::Web::Files>.
=back
=head1 SEE ALSO
L<Disbatch>
L<Disbatch::Roles>
L<Disbatch::Plugin::Demo>
L<disbatchd>
L<disbatch.pl>
L<task_runner>
L<disbatch-create-users>
=head1 AUTHORS
Ashley Willis <awillis@synacor.com>
Matt Busigin
=head1 COPYRIGHT AND LICENSE
This software is Copyright (c) 2015, 2016, 2019 by Ashley Willis.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004