Group
Extension

App-Netdisco/lib/App/Netdisco/JobQueue/PostgreSQL.pm

package App::Netdisco::JobQueue::PostgreSQL;

use Dancer qw/:moose :syntax :script/;
use Dancer::Plugin::DBIC 'schema';

use App::Netdisco::Util::Device 'get_denied_actions';
use App::Netdisco::Backend::Job;
use App::Netdisco::DB::ExplicitLocking ':modes';

use JSON::PP ();
use Try::Tiny;

# Exporter configuration: nothing is exported by default (@EXPORT is empty).
# Callers import individual jq_* functions by name from @EXPORT_OK, or all of
# them at once through the ':all' tag defined in %EXPORT_TAGS.
use base 'Exporter';
our @EXPORT = ();
our @EXPORT_OK = qw/
  jq_warm_thrusters
  jq_getsome
  jq_locked
  jq_queued
  jq_lock
  jq_defer
  jq_complete
  jq_log
  jq_userlog
  jq_insert
  jq_delete
/;
our %EXPORT_TAGS = ( all => \@EXPORT_OK );

# Reset the DeviceSkip rows owned by this backend (the workers BACKEND
# setting). Everything happens inside one transaction:
#
#   1. every row's actionset is emptied;
#   2. rows for real devices (anything but 255.255.255.255) whose deferrals
#      exceed (max_deferrals - 1) are lowered to that value;
#   3. rows with an empty actionset and zero deferrals are deleted;
#   4. the 255.255.255.255 row is deleted if its actionset is empty.
sub jq_warm_thrusters {
  my $db      = schema(vars->{'tenant'});
  my $skips   = $db->resultset('DeviceSkip');
  my $backend = setting('workers')->{'BACKEND'};

  # pseudo-device address under which the backend-wide hint row is stored
  my $hint_device = '255.255.255.255';

  $db->txn_do(sub {
    # step 1: forget all previously recorded denied actions
    my $all_mine = $skips->search(
      { backend => $backend },
      { for => 'update' },
    );
    $all_mine->update({ actionset => [] });

    # step 2: on backend restart, allow one retry of all devices which
    # have reached max retry (max_deferrals)
    my $deferrals = setting('workers')->{'max_deferrals'} - 1;
    my $maxed_out = $skips->search(
      {
        backend   => $backend,
        device    => { '!=' => $hint_device },
        deferrals => { '>' => $deferrals },
      },
      { for => 'update' },
    );
    $maxed_out->update({ deferrals => $deferrals });

    # step 3: rows carrying no information at all can go
    # ({ -value => [] } is the special syntax for matching an empty ARRAY)
    my $empty_rows = $skips->search({
      backend   => $backend,
      actionset => { -value => [] },
      deferrals => 0,
    });
    $empty_rows->delete;

    # step 4: also clean out any previous backend hint;
    # the primeskiplist action will then run to recreate it
    my $stale_hint = $skips->search({
      backend   => $backend,
      device    => $hint_device,
      actionset => { -value => [] },
    });
    $stale_hint->delete;
  });
}

# Fetch candidate jobs for this backend and return them as a list of
# App::Netdisco::Backend::Job objects.
#
# $num_slots is passed as the last bind value to the Virtual::TastyJobs
# query (presumably its row limit -- confirm against that view). An empty
# list is returned when $num_slots is false or not positive.
#
# Side effects, per candidate job:
#   * for device jobs whose action is not listed in job_targets_prefix, any
#     actions denied by get_denied_actions() are recorded in the DeviceSkip
#     row for this backend/device, and the job is skipped if its own action
#     is among them;
#   * queued, non-user-submitted jobs with the same action/port/subaction
#     and device (or device_key) are marked status 'info' with a
#     'duplicate of <id>' log.
sub jq_getsome {
  my $num_slots = shift;
  return () unless $num_slots and $num_slots > 0;

  my $jobs = schema(vars->{'tenant'})->resultset('Admin');
  my @returned = ();

  my $tasty = schema(vars->{'tenant'})->resultset('Virtual::TastyJobs')
    ->search(undef,{ bind => [
      setting('workers')->{'BACKEND'}, setting('job_prio')->{'high'},
      setting('workers')->{'BACKEND'}, setting('workers')->{'max_deferrals'},
      setting('workers')->{'retry_after'}, $num_slots,
    ]});

  while (my $job = $tasty->next) {
    # an unset job_targets_prefix is treated as an empty list, the same way
    # jq_defer treats deferrable_actions
    if ($job->device
      and not scalar grep {$job->action eq $_} @{ setting('job_targets_prefix') || [] }) {

      # need to handle device discovered since backend daemon started
      # and the skiplist was primed. these should be checked against
      # the various acls and have device_skip entry added if needed,
      # and return false if it should have been skipped.
      my @badactions = get_denied_actions($job->device);
      if (scalar @badactions) {
        schema(vars->{'tenant'})->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
            schema(vars->{'tenant'})->resultset('DeviceSkip')->find_or_create({
              backend => setting('workers')->{'BACKEND'}, device => $job->device,
            },{ key => 'device_skip_pkey' })->add_to_actionset(@badactions);
        });

        # will now not be selected in a future _getsome()
        next if scalar grep {$_ eq $job->action} @badactions;
      }
    }

    # remove any duplicate jobs, including possibly this job if there
    # is already an equivalent job running

    # note that the self-removal of a job has an unhelpful log: it is
    # reported as a duplicate of itself! however what's happening is that
    # netdisco has seen another running job with same params (but the query
    # cannot see that ID to use it in the message).

    my %job_properties = (
      action => $job->action,
      port   => $job->port,
      subaction => $job->subaction,
      -or => [
        { device => $job->device },
        ($job->device_key ? ({ device_key => $job->device_key }) : ()),
      ],
      # never de-duplicate user-submitted jobs
      username => { '=' => undef },
      userip   => { '=' => undef },
    );

    # two kinds of row are cancelled: older queued jobs (lower id) with the
    # same properties, and this job itself when a newer equivalent job has
    # already been picked up by a backend and is not yet stale
    my $gone = $jobs->search({
      status => 'queued',
      -and => [
        %job_properties,
        -or => [{
          job => { '<' => $job->id },
        },{
          job => $job->id,
          -exists => $jobs->search({
            job => { '>' => $job->id },
            status => 'queued',
            backend => { '!=' => undef },
            started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
            %job_properties,
          })->as_query,
        }],
      ],
    }, { for => 'update' })
        ->update({ status => 'info', log => (sprintf 'duplicate of %s', $job->id) });

    # DBI reports "no rows affected" as the true string "0E0"; force numeric
    # context so the message shows a plain 0 instead
    debug sprintf 'getsome: cancelled %s duplicate(s) of job %s',
      ($gone ? ($gone + 0) : 0), $job->id;
    push @returned, App::Netdisco::Backend::Job->new({ $job->get_columns });
  }

  return @returned;
}

# Return, as App::Netdisco::Backend::Job objects, the queued jobs currently
# assigned to this backend whose start time is more recent than the
# jobs_stale_after interval.
sub jq_locked {
  my $held = schema(vars->{'tenant'})->resultset('Admin')->search({
    status  => 'queued',
    backend => setting('workers')->{'BACKEND'},
    started => \[q/> (LOCALTIMESTAMP - ?::interval)/, setting('jobs_stale_after')],
  });

  my @jobs;
  for my $row ($held->all) {
    my %columns = $row->get_columns;
    push @jobs, App::Netdisco::Backend::Job->new({ %columns });
  }

  return @jobs;
}

# Return the device column of every queued job with the given action,
# ignoring jobs that have no device.
sub jq_queued {
  my ($job_type) = @_;

  my $pending = schema(vars->{'tenant'})->resultset('Admin')->search({
    device => { '!=' => undef},
    action => $job_type,
    status => 'queued',
  });

  return $pending->get_column('device')->all;
}

# Claim a queued job for this backend.
#
# A job without an id is reported as locked without touching the database.
# Otherwise the matching queued row is stamped with this backend's name and
# the current time; true is returned only if at least one row was updated.
# Any exception is logged and results in false.
sub jq_lock {
  my ($job) = @_;
  return true if not $job->id;

  my $claimed = false;

  try {
    # lock db row and update to show job has been picked
    my $candidate = schema(vars->{'tenant'})->resultset('Admin')->search(
      { job => $job->id, status => 'queued' },
      { for => 'update' },
    );

    my $rows_touched = $candidate->update({
      status  => 'queued',
      backend => setting('workers')->{'BACKEND'},
      started => \"LOCALTIMESTAMP",
    });

    if ($rows_touched > 0) {
      $claimed = true;
    }
  }
  catch {
    error $_;
  };

  return $claimed;
}

# Put a job back on the queue after a deferral.
#
# Under an EXCLUSIVE lock on DeviceSkip: unless the job has no device or its
# action is listed in the deferrable_actions setting, the deferral counter of
# the backend/device DeviceSkip row is incremented (the row is created if
# missing). The job's Admin row is then reset to 'queued' with no backend and
# no start time. Returns true on success; exceptions are logged and yield
# false.
#
# Note that the counter is per device, not per action: if both macsuck and
# arpnip are allowed but macsuck fails 10 times, then arpnip (and every
# other action) will be prevented on the device. Seeing as defer is only
# triggered by an SNMP connect failure, that seems reasonable (or even
# desirable). The deferrable_actions setting is the workaround for actions
# that should simply be retried without incrementing the counter.
sub jq_defer {
  my ($job) = @_;
  my $happy = false;

  try {
    my $db = schema(vars->{'tenant'});

    $db->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
      my $is_exempt = sub {
        scalar grep { $job->action eq $_ }
                    @{ setting('deferrable_actions') || [] };
      };

      if ($job->device and not $is_exempt->()) {
        my $skip_row = $db->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip_row->increment_deferrals;
      }

      debug sprintf 'defer: job %s', ($job->id || 'unknown');

      # lock db row and update to show job is available
      my $job_row = $db->resultset('Admin')->search(
        { job => $job->id },
        { for => 'update' },
      );
      $job_row->update({
        device  => $job->device, # if job had alias this sets to canonical
        status  => 'queued',
        backend => undef,
        started => undef,
        log     => $job->log,
      });
    });

    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}

# Record the final state of a job.
#
# Under an EXCLUSIVE lock on DeviceSkip: for an online device job whose
# action is not in job_targets_prefix, the backend/device DeviceSkip row has
# its deferrals reset to zero (the row is created if missing). The job's
# Admin row is then updated with status, log, started and finished; the
# subaction is also written for 'hook' jobs, and the action is rewritten to
# "action::namespace" when only_namespace is set. Returns true on success;
# exceptions are logged and yield false.
#
# Why reset deferrals: now that SNMP connect failures are deferrals and not
# errors, any complete status, whether success or failure, indicates an SNMP
# connect, so occasional connect glitches can be forgotten.
sub jq_complete {
  my ($job) = @_;
  my $happy = false;

  try {
    my $db = schema(vars->{'tenant'});

    $db->resultset('DeviceSkip')->txn_do_locked(EXCLUSIVE, sub {
      my $is_prefix_action = sub {
        scalar grep { $job->action eq $_ } @{ setting('job_targets_prefix') };
      };

      if ($job->device and not $job->is_offline and not $is_prefix_action->()) {
        my $skip_row = $db->resultset('DeviceSkip')->find_or_create(
          { backend => setting('workers')->{'BACKEND'}, device => $job->device },
          { key => 'device_skip_pkey' },
        );
        $skip_row->update({ deferrals => 0 });
      }

      # a log that is some kind of reference is stored as an empty string
      my %outcome = (
        status   => $job->status,
        log      => ((ref($job->log) eq ref('')) ? $job->log : ''),
        started  => $job->started,
        finished => $job->finished,
      );

      if ($job->action eq 'hook') {
        $outcome{subaction} = $job->subaction;
      }
      if ($job->only_namespace) {
        $outcome{action} = $job->action .'::'. $job->only_namespace;
      }

      # lock db row and update to show job is done/error
      my $job_row = $db->resultset('Admin')->search(
        { job => $job->id },
        { for => 'update' },
      );
      $job_row->update(\%outcome);
    });

    $happy = true;
  }
  catch {
    # use DDP; p $job;
    error $_;
  };

  return $happy;
}

# Return Admin (job) rows as plain hashes, newest first, limited to the
# jobs_qdepth setting (50 if unset), optionally filtered by these request
# parameters:
#
#   backend, action, username - exact match on the job column
#   device   - matches the job's device or the prefetched target's ip
#   status   - 'Running' means queued with a backend assigned; any other
#              value is lower-cased and matched against the status column
#   duration - minutes; matches unfinished jobs started longer ago than
#              that, or finished jobs that took longer than that
#
# Rows whose log starts with 'duplicate of ' are always left out.
sub jq_log {
  my %filter = (
    'me.log' => [
      { '=' => undef },
      { '-not_like' => 'duplicate of %' },
    ],
  );

  for my $column (qw/backend action/) {
    $filter{"me.$column"} = param($column) if param($column);
  }

  if (param('device')) {
    $filter{'-or'} = [
      { 'me.device' => param('device') },
      { 'target.ip' => param('device') },
    ];
  }

  $filter{'me.username'} = param('username') if param('username');

  if (param('status')) {
    if (param('status') eq 'Running') {
      $filter{'-and'} = [
        { 'me.backend' => { '!=' => undef } },
        { 'me.status'  => 'queued' },
      ];
    }
    else {
      $filter{'me.status'} = lc(param('status'));
    }
  }

  if (param('duration')) {
    my $span = param('duration') .' minutes';

    $filter{'-bool'} = [
      -or => [
        {
          'me.finished' => undef,
          'me.started'  => { '<' => \[q{(CURRENT_TIMESTAMP - ? ::interval)}, $span] },
        },
        -and => [
          { 'me.started'  => { '!=' => undef } },
          { 'me.finished' => { '!=' => undef } },
          \[ q{ (me.finished - me.started) > ? ::interval }, $span],
        ],
      ],
    ];
  }

  my $matching = schema(vars->{'tenant'})->resultset('Admin')->search(\%filter, {
    prefetch => 'target',
    order_by => { -desc => [qw/entered device action/] },
    rows     => (setting('jobs_qdepth') || 50),
  });

  return $matching->with_times->hri->all;
}

# Return the Admin (job) rows submitted by $user that finished within the
# last five seconds, excluding rows whose log starts with 'duplicate of '.
#
# The log filter explicitly accepts a NULL log: in SQL, NULL NOT LIKE '...'
# is NULL rather than true, so a bare NOT LIKE would silently drop jobs that
# finished without a log message. jq_log uses the same NULL-tolerant form.
sub jq_userlog {
  my $user = shift;
  return schema(vars->{'tenant'})->resultset('Admin')->search({
    username => $user,
    log      => [
      { '=' => undef },
      { '-not_like' => 'duplicate of %' },
    ],
    finished => { '>' => \"(CURRENT_TIMESTAMP - interval '5 seconds')" },
  })->with_times->all;
}

# Queue one or more jobs. $jobs is a job spec hashref or an arrayref of them.
#
# Normally each spec becomes a 'queued' row in the Admin table. The one
# exception is a single spec with a device whose action is listed in the
# _inline_actions setting: that is handled immediately as a custom field
# update on the Device (or DevicePort, when a port is given) row, writing
# the JSON-encoded value into the row's custom_fields via jsonb_set. In that
# path the caller's spec is modified in place (subaction becomes the encoded
# value and the 'cf_' prefix is stripped from action).
#
# All work runs in one transaction. Returns true on success; exceptions are
# logged and yield false.
sub jq_insert {
  my $jobs = shift;
  $jobs = [$jobs] unless ref($jobs) eq ref([]);

  my $happy = false;
  try {
    my $db = schema(vars->{'tenant'});

    $db->txn_do(sub {
      my $single_inline =
        scalar(@$jobs) == 1
        && defined($jobs->[0]->{device})
        && scalar(grep { $_ eq $jobs->[0]->{action} }
                       @{ setting('_inline_actions') || [] });

      if ($single_inline) {
          # bit of a hack for heroku hosting to avoid DB overload
          return true if setting('defanged_admin') ne 'admin';

          my $spec = $jobs->[0];
          my $on_port = $spec->{port} ? 1 : 0;

          # look up the row that owns the custom field
          my $row = undef;
          if ($on_port) {
              $row = $db->resultset('DevicePort')
                        ->find($spec->{port}, $spec->{device});
          }
          else {
              $row = $db->resultset('Device')
                        ->find($spec->{device});
          }

          # the action must be 'cf_' plus the name of a configured custom
          # field of the matching kind, otherwise the row is discarded
          my $kind = $on_port ? 'device_port' : 'device';
          my $is_known_field = 0;
          for my $field (@{ setting('custom_fields')->{$kind} || [] }) {
              my $name = $field->{name};
              next unless defined $name;
              $is_known_field++ if ('cf_'. $name) eq $spec->{action};
          }
          undef $row unless $is_known_field;

          die 'failed to find row for custom field update' unless $row;

          my $coder = JSON::PP->new->utf8(0)->allow_nonref(1)->allow_unknown(1);
          $spec->{subaction} = $coder->encode( $spec->{extra} || $spec->{subaction} );
          $spec->{action} =~ s/^cf_//;

          my $json_path = qq{{$spec->{action}}};
          $row->make_column_dirty('custom_fields');
          $row->update({
            custom_fields => \['jsonb_set(custom_fields, ?, ?)'
                              => ($json_path, $spec->{subaction}) ]
            })->discard_changes();
      }
      else {
          my @queue_rows;
          for my $spec (@$jobs) {
              push @queue_rows, {
                device     => $spec->{device},
                device_key => $spec->{device_key},
                port       => $spec->{port},
                action     => $spec->{action},
                subaction  => ($spec->{extra} || $spec->{subaction}),
                username   => $spec->{username},
                userip     => $spec->{userip},
                status     => 'queued',
              };
          }

          $db->resultset('Admin')->populate(\@queue_rows);
      }
    });
    $happy = true;
  }
  catch {
    error $_;
  };

  return $happy;
}

# Delete jobs from the Admin table inside a transaction. With a true $id,
# only that job is removed; otherwise every job is removed except those
# whose action is 'primeskiplist'.
sub jq_delete {
  my ($id) = @_;

  my $db = schema(vars->{'tenant'});
  my $doomed = $id
    ? { job => $id }
    : { action => { '!=' => 'primeskiplist'} };

  $db->txn_do(sub {
    $db->resultset('Admin')->search($doomed)->delete;
  });
}

true;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.