Job-Machine/lib/Job/Machine/DB.pm
package Job::Machine::DB;
BEGIN {
$Job::Machine::DB::VERSION = '0.18';
}
use strict;
use warnings;
use Carp qw/croak confess/;
use DBI;
use JSON;
use constant QUEUE_PREFIX => 'jm:';
use constant RESPONSE_PREFIX => 'jmr:';
sub new {
my ($class, %args) = @_;
croak "No connect information" unless $args{dbh} or $args{dsn};
croak "invalid queue" if ref $args{queue} and ref $args{queue} ne 'ARRAY';
$args{user} ||= undef;
$args{password} ||= undef;
$args{db_attr} ||= undef;
$args{dbh} ||= DBI->connect($args{dsn},$args{user},$args{password},$args{db_attr});
$args{schema} ||= 'jobmachine';
return bless \%args, $class;
}
sub json {
my ($self) = @_;
return $self->{json} ||= JSON->new->allow_nonref;
}
sub listen {
my ($self, %args) = @_;
my $queue = $args{queue} || return undef;
my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
for my $q (ref $queue ? @$queue : ($queue)) {
$self->{dbh}->do(qq{listen "$prefix$q";});
}
}
sub unlisten {
my ($self, %args) = @_;
my $queue = $args{queue} || return undef;
my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
for my $q (ref $queue ? @$queue : ($queue)) {
$self->{dbh}->do(qq{unlisten "$prefix$q";});
}
}
sub notify {
my ($self, %args) = @_;
my $queue = $args{queue} || return undef;
my $payload = $args{payload};
my $prefix = $args{reply} ? RESPONSE_PREFIX : QUEUE_PREFIX;
$queue = $prefix . $queue;
my $sql = qq{SELECT pg_notify(?,?)};
my $task = $self->select_first(
sql => $sql,
data => [ $queue, $payload],
);
}
sub get_notification {
my ($self,$timeout) = @_;
my $dbh = $self->dbh;
my $notifies = $dbh->func('pg_notifies');
return $notifies;
}
sub set_listen {
my ($self,$timeout) = @_;
my $dbh = $self->dbh;
my $notifies = $dbh->func('pg_notifies');
if (!$notifies) {
my $fd = $dbh->{pg_socket};
vec(my $rfds='',$fd,1) = 1;
my $n = select($rfds, undef, undef, $timeout);
$notifies = $dbh->func('pg_notifies');
}
return $notifies || [0,0];
}
sub fetch_work_task {
my ($self,$pid) = @_;
my $queue = ref $self->{queue} ? $self->{queue} : [$self->{queue}];
$self->{current_table} = 'task';
my $elems = join(',', ('?') x @$queue);
my $sql = qq{
UPDATE
"$self->{schema}".$self->{current_table} t
SET
status=100,
modified=default
FROM
"jobmachine".class cx
WHERE
t.class_id = cx.class_id
AND
task_id = (
SELECT
min(task_id)
FROM
"$self->{schema}".$self->{current_table} t
JOIN
"jobmachine".class c
USING
(class_id)
WHERE
t.status=0
AND
c.name IN ($elems)
AND
t.run_after IS NULL
OR
t.run_after > now()
)
AND
t.status=0
RETURNING
*
;
};
my $task = $self->select_first(
sql => $sql,
data => $queue
) || return;
$self->{task_id} = $task->{task_id};
$task->{data} = $self->_decode(delete $task->{parameters});
return $task;
}
sub insert_task {
my ($self,$data,$queue) = @_;
my $class = $self->fetch_class($queue);
$self->{current_table} = 'task';
my $frozen = $self->json->encode($data);
my $sql = qq{
INSERT INTO
"$self->{schema}".$self->{current_table}
(class_id,parameters,status)
VALUES
(?,?,?)
RETURNING
task_id
};
$self->insert(sql => $sql,data => [$class->{class_id},$frozen,0]);
}
sub set_task_status {
my ($self,$status) = @_;
my $id = $self->task_id;
$self->{current_table} = 'task';
my $sql = qq{
UPDATE
"$self->{schema}".$self->{current_table}
SET
status=?
WHERE
task_id=?
};
$self->update(sql => $sql,data => [$status,$id]);
}
sub fetch_class {
my ($self,$queue) = @_;
$self->{current_table} = 'class';
my $sql = qq{
SELECT
*
FROM
"$self->{schema}".$self->{current_table}
WHERE
name=?
};
return $self->select_first(sql => $sql,data => [$queue]) || $self->insert_class($queue);
}
sub insert_class {
my ($self,$queue) = @_;
my $sql = qq{
INSERT INTO
"$self->{schema}".$self->{current_table}
(name)
VALUES
(?)
RETURNING
class_id
};
$self->select_first(sql => $sql,data => [$queue]);
}
sub insert_result {
my ($self,$data,$queue) = @_;
$self->{current_table} = 'result';
my $frozen = $self->json->encode($data);
my $sql = qq{
INSERT INTO
"$self->{schema}".$self->{current_table}
(task_id,result)
VALUES
(?,?)
RETURNING
result_id
};
$self->insert(sql => $sql,data => [$self->{task_id},$frozen]);
}
sub fetch_result {
my ($self,$id) = @_;
$self->{current_table} = 'result';
my $sql = qq{
SELECT
*
FROM
"$self->{schema}".$self->{current_table}
WHERE
task_id=?
ORDER BY
result_id DESC
};
my $result = $self->select_first(sql => $sql,data => [$id]) || return;
return $self->_decode($result->{result})->{data};
}
sub fetch_results {
my ($self,$id) = @_;
$self->{current_table} = 'result';
my $sql = qq{
SELECT
*
FROM
"$self->{schema}".$self->{current_table}
WHERE
task_id=?
ORDER BY
result_id DESC
};
my $results = $self->select_all(sql => $sql,data => [$id]) || return;
return [map { $self->_decode($_->{result}) } @{ $results } ];
}
sub _decode {
my ($self,$data) = @_;
my $resultdata;
eval {
$resultdata = $self->json->utf8(!utf8::is_utf8($resultdata))->decode($data);
};
if ($@) {
warn $@;
return;
}
return $resultdata;
}
# 1. Find started tasks that have passed the time limit, most probably because
# of a dead worker. (status 100, modified < now - max_runtime)
# 2. Trim status so task can be tried again
sub revive_tasks {
my ($self,$max) = @_;
$self->{current_table} = 'task';
my $status = 100;
my $sql = qq{
UPDATE
"$self->{schema}".$self->{current_table}
SET
status=0
WHERE
status=?
AND
modified < now() - INTERVAL '$max seconds'
};
my $result = $self->do(sql => $sql,data => [$status]);
return $result;
}
# 1. Find tasks that have failed too many times (# of result rows > $self->retries
# 2. fail them (Set status 900)
# There's a hard limit (100) for how many tasks can be failed at one time for
# performance resons
sub fail_tasks {
my ($self,$retries) = @_;
$self->{current_table} = 'result';
my $limit = 100;
my $sql = qq{
SELECT
task_id
FROM
"$self->{schema}".$self->{current_table}
GROUP BY
task_id
HAVING
count(*)>?
LIMIT ?
};
my $result = $self->select_all(sql => $sql,data => [$retries,$limit]) || return 0;
return 0 unless @$result;
my $task_ids = join ',',map {$_->{task_id}} @$result;
$self->{current_table} = 'task';
my $status = 900;
$sql = qq{
UPDATE
"$self->{schema}".$self->{current_table}
SET
status=?
WHERE
task_id IN ($task_ids)
};
$self->do(sql => $sql,data => [$status]);
return scalar @$result;
}
# 3. Find tasks that should be removed (remove_task < now)
# - delete them
# - log
sub remove_tasks {
my ($self,$after) = @_;
return 0 unless $after;
$self->{current_table} = 'task';
my $limit = 100;
my $sql = qq{
DELETE FROM
"$self->{schema}".$self->{current_table}
WHERE
modified < now() - INTERVAL '$after days'
};
my $result = $self->do(sql => $sql,data => []);
return $result;
}
sub select_first {
my ($self, %args) = @_;
my $sth = $self->dbh->prepare($args{sql}) || return 0;
unless($sth->execute(@{$args{data}})) {
my @c = caller;
print STDERR "File: $c[1] line $c[2]\n";
print STDERR $args{sql}."\n" if($args{sql});
return 0;
}
my $r = $sth->fetchrow_hashref();
$sth->finish();
return ( $r );
}
sub select_all {
my ($self, %args) = @_;
my $sth = $self->dbh->prepare($args{sql}) || return 0;
$self->set_bind_type($sth,$args{data} || []);
unless($sth->execute(@{$args{data}})) {
my @c = caller;
print STDERR "File: $c[1] line $c[2]\n";
print STDERR $args{sql}."\n" if($args{sql});
return 0;
}
my @result;
while( my $r = $sth->fetchrow_hashref) {
push(@result,$r);
}
$sth->finish();
return ( \@result );
}
sub set_bind_type {
my ($self,$sth,$data) = @_;
for my $i (0..scalar(@$data)-1) {
next unless(ref($data->[$i]));
$sth->bind_param($i+1, undef, $data->[$i]->[1]);
$data->[$i] = $data->[$i]->[0];
}
return;
}
sub do {
my ($self, %args) = @_;
my $sth = $self->dbh->prepare($args{sql}) || return 0;
$sth->execute(@{$args{data}});
my $rows = $sth->rows;
$sth->finish();
return $rows;
}
sub insert {
my ($self, %args) = @_;
my $sth = $self->dbh->prepare($args{sql}) || return 0;
$sth->execute(@{$args{data}});
my $retval = $sth->fetch()->[0];
$sth->finish();
return $retval;
}
sub update {
my $self = shift;
$self->do(@_);
return;
}
sub dbh {
return $_[0]->{dbh} || confess "No database handle";
}
sub task_id {
return $_[0]->{task_id} || confess "No task id";
}
sub disconnect {
return $_[0]->{dbh}->disconnect if $_[0]->{dbh};
}
sub DESTROY {
$_[0]->disconnect();
return;
}
1;
=pod
=head1 NAME
Job::Machine::DB
=head1 VERSION
version 0.18
=head1 NAME
Job::Machine::DB - Database class for Job::Machine
=head1 METHODS
=head2 new
my $client = Job::Machine::DB->new(
dbh => $dbh,
queue => 'queue.subqueue',
);
my $client = Job::Machine::Base->new(
dsn => @dsn,
);
=head2 set_listen
$self->listen( queue => 'queue_name' );
$self->listen( queue => \@queues, reply => 1 );
Sets up the listener. Quit listening to the named queues. If 'reply' is
passed, we unlisten to the related reply queue instead of the task queue.
Return undef immediately if no queue is provided.
=head2 unlisten
$self->unlisten( queue => 'queue_name' );
$self->unlisten( queue => \@queues, reply => 1 );
Quit listening to the named queues. If 'reply' is passed, we unlisten
to the related reply queue instead of the task queue.
Return undef immediately if no queue is provided.
=head2 notify
$self->notify( queue => 'queue_name' );
$self->notify( queue => 'queue_name', reply => 1, payload => $data );
Sends an asynchronous notification to the named queue, with an optional
payload. If 'reply' is true, then the queue names are taken to be reply.
Return undef immediately if no queue name is provided.
=head2 get_notification
my $notifies = $self->get_notification();
Retrievies the pending notifications. The return value is an arrayref where
each row looks like this:
my ($name, $pid, $payload) = @$notify;
=head1 AUTHOR
Kaare Rasmussen <kaare at cpan dot net>
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2011 by Kaare Rasmussen.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut
__END__