Forks-Queue/lib/Forks/Queue/SQLite.pm
package Forks::Queue::SQLite;
use strict;
use warnings;
use Carp;
use JSON;
use DBI;
use DBD::SQLite;
use Time::HiRes 'time';
use base 'Forks::Queue';
use 5.010; # implementation contains // //= operators
our $VERSION = '0.15';
our ($DEBUG,$XDEBUG);
*DEBUG = \$Forks::Queue::DEBUG;
*XDEBUG = \$Forks::Queue::XDEBUG;
$SIG{IO} = sub { } if $Forks::Queue::NOTIFY_OK;
our $jsonizer = JSON->new->allow_nonref(1)->ascii(1);
sub new {
my $class = shift;
my %opts = (%Forks::Queue::OPTS, @_);
my $_DEBUG = $DEBUG || $opts{debug};
if ($opts{join} && !$opts{db_file}) {
croak "Forks::Queue::SQLite: db_file opt required with join";
}
if ($opts{file} && !$opts{db_file}) {
carp "file => passed to Forks::Queue::SQLite constructor! ",
"You probably meant db_file => ... !";
}
$opts{db_file} //= _impute_file(); # $opts{file} = $opts{db_file};
$opts{limit} //= -1;
$opts{on_limit} //= 'fail';
$opts{style} //= 'fifo';
my $list = delete $opts{list};
if (!$opts{join} && -f $opts{db_file}) {
carp "Forks::Queue: sqlite db file $opts{db_file} already exists!";
}
my $exists = $opts{join} && -f $opts{db_file};
$opts{_pid} = [ $$, TID() ];
# process id is tied to database handle. If process id doesn't match
# $self->{_pid}, we must open a new process id.
my $self = bless { %opts }, $class;
if (!$exists) {
# sometimes unlink doesn't work -- just on NTFS or on other
# systems, too?
my $z1 = -f $opts{db_file};
my $z2 = unlink $opts{db_file};
$_DEBUG && print STDERR "Creating new database at ",
$opts{db_file}," (unlink=$z1 ^ $z2)\n";
my $dbh = DBI->connect("dbi:SQLite:dbname=" . $opts{db_file},
"", "");
$self->{_dbh} = $opts{_dbh} = $dbh;
if ($z1 && !$z2) {
# file is not fresh. Reinitialize.
my $z3 = $dbh->do("DROP TABLE the_queue");
my $z4 = $dbh->do("DROP TABLE pids");
my $z5 = $dbh->do("DROP TABLE status");
$_DEBUG && print STDERR "Could not erase existing db file ",
$opts{db_file}, " for reuse ... reinitialized $z3 $z4 $z5\n";
}
if (!eval { $self->_init }) {
carp __PACKAGE__, ": db initialization failed";
return;
}
} else {
$_DEBUG && print STDERR "Using existing database at ",$opts{db_file},"\n";
$self->_dbh;
}
if (defined($list)) {
if (ref($list) eq 'ARRAY') {
$self->push( @$list );
} else {
carp "Forks::Queue::new: 'list' option must be an array ref";
}
}
return $self;
}
# wrapper for database operations I expect to succeed, but may fail with
# intermittent synchronization issues ("attempt to write to a readonly
# database...") on perl v5.10 and v5.12. Pausing and retrying the operation
# generally fixes these issues.
sub _try {
my ($count, $code) = @_;
$count = 1 if $] >= 5.014;
my $z = $code->();
my ($f0,$f1) = (1,1);
while (!$z) {
last if --$count <= 0;
($f0,$f1)=($f1,$f0+$f1);
my (undef,undef,$lcaller) = caller(0);
$DEBUG && print STDERR "retry after ${f0}s: $lcaller\a\n";
sleep $f0;
$z = $code->();
}
return $z;
}
sub _init {
my $self = shift;
my $dbh = $self->{_dbh};
my $z1 = $dbh->do("CREATE TABLE the_queue (
timestamp decimal(27,15), batchid mediumint,
item text)");
if (!$z1) {
carp __PACKAGE__, ": error creating init table";
return;
}
my $z2 = $dbh->do("CREATE TABLE pids (pid mediumint,tid mediumint)");
if (!$z2) {
carp __PACKAGE__, ": error creating init table";
return;
}
my $sth = $dbh->prepare("INSERT INTO pids VALUES (?,?)");
my $z3 = $sth->execute(@{$self->{_pid}});
if (!$z3) {
carp __PACKAGE__, ": error adding process id to tracker";
return;
}
my $z4 = $dbh->do("CREATE TABLE status(key text,value text)");
if (!$z4) {
carp __PACKAGE__, ": error creating init table";
return;
}
$self->_status("db_file", $self->{db_file});
$self->_status("owner", "@{$self->{_pid}}");
$self->_status("style", $self->{style});
$self->_status("limit", $self->{limit});
$self->_status("on_limit", $self->{on_limit});
return 1;
}
sub TID { $INC{'threads.pm'} ? threads->tid : 0 }
sub _dbh {
my $self = shift;
my $tid = TID();
if ($self->{_dbh} && $$ == $self->{_pid}[0] && $tid == $self->{_pid}[1]) {
return $self->{_dbh};
}
if (Forks::Queue::__inGD()) {
# database already destroyed? Don't try to recreate in GD.
return;
}
$self->{_pid} = [$$,$tid];
$self->{_dbh} =
DBI->connect("dbi:SQLite:dbname=".$self->{db_file},"","");
$self->{_dbh}{AutoCommit} = 1;
if (!$self->{_DESTROY}) {
$self->{_dbh}->begin_work;
$self->{_dbh}->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid");
$self->{_dbh}->do("INSERT INTO pids VALUES ($$,$tid)");
$self->{_dbh}->commit;
$self->{style} = $self->_status("style");
$self->{limit} = $self->_status("limit");
$self->{on_limit} = $self->_status("on_limit");
}
return $self->{_dbh};
}
# DESTROY responsibilities for F::Q::SQLite:
# remove pid+tid from pids table
# disconnect from database
# delete database file if last pid/tid and !persist
sub DESTROY {
my $self = shift;
$self->{_DESTROY}++;
my $_DEBUG = $self->{debug} // $DEBUG;
my $dbh;
if (Forks::Queue::__inGD()) {
return unless eval { $dbh = $self->_dbh; 1 };
} else {
$dbh = $self->_dbh;
}
my $tid = $self->{_pid} ? $self->{_pid}[1] : TID();
my $t = [[-1]];
my $pid_rm = $dbh && eval {
$dbh->{PrintWarn} = # suppress "attempt to write ..."
$dbh->{PrintError} = 0; # warnings, particularly on 5.010, 5.012
$dbh->begin_work;
my $z1 = _try(3, sub {
$dbh->do("DELETE FROM pids WHERE pid=$$ AND tid=$tid") } );
if ($z1) {
my $sth = $dbh->prepare("SELECT COUNT(*) FROM pids");
my $z2 = $sth->execute;
$t = $sth->fetchall_arrayref;
} else {
$_DEBUG && print STDERR "$$ DESTROY: DELETE FROM pids failed\n";
$t = [[-2]];
}
$dbh->commit;
$_DEBUG and print STDERR "$$ DESTROY npids=$t->[0][0]\n";
1;
};
$dbh && eval { $dbh->disconnect };
if ($t && $t->[0] && $t->[0][0] == 0) {
$_DEBUG and print STDERR "$$ Unlinking files from here\n";
if (!$self->{persist}) {
sleep 1;
unlink $self->{db_file};
}
} else {
}
}
sub _status {
# if transactions are desired, they must be provided by the caller
my $self = shift;
my $dbh = $self->_dbh;
return if !$dbh && $self->{_DESTROY};
if (@_ == 1) {
my $sth = $dbh->prepare("SELECT value FROM status WHERE key=?");
if (!$sth && $self->{_DESTROY}) {
warn "prepare failed in global destruction: $$";
return;
}
my $key = $_[0];
my $z = _try( 3, sub { $sth->execute($key) } );
if (!$z) {
carp __PACKAGE__, ": lookup on status key '$_[0]' failed";
return;
}
my $t = $sth->fetchall_arrayref;
if (@$t == 0) {
return; # no value
}
return $t->[0][0];
} elsif (@_ == 2) {
my ($key,$value) = @_;
my $sth1 = $dbh->prepare("DELETE FROM status WHERE key=?");
my $sth2 = $dbh->prepare("INSERT INTO status VALUES(?,?)");
my $z1 = _try( 3, sub { $sth1->execute($key) } );
my $z2 = $z1 && _try( 5, sub { $sth2->execute($key,$value) } );
return $z1 && $z2;
} else {
croak "Forks::Queue::SQLite: wrong number of args to _status call";
}
return;
}
sub end {
my $self = shift;
my $dbh = $self->_dbh;
my $end = $self->_end;
if ($end) {
carp "Forks::Queue: end() called from $$, ",
"previously called from $end";
}
if (!$end) {
$dbh->begin_work;
$self->_status("end",$$);
$dbh->commit;
}
$self->_notify;
return;
}
sub _end {
my $self = shift;
return $self->{_end} ||= $self->_status("end");
# XXX - can end condition be cleared? Not yet, but when it can,
# this code will have to change
}
# MagicLimit: a tie class to allow $q->limit to work as an lvalue
sub Forks::Queue::SQLite::MagicLimit::TIESCALAR {
my ($pkg,$obj) = @_;
return bless \$obj,$pkg;
}
sub Forks::Queue::SQLite::MagicLimit::FETCH {
$XDEBUG && print STDERR "MagicLimit::FETCH => ",${$_[0]}->{limit},"\n";
return ${$_[0]}->{limit};
}
sub Forks::Queue::SQLite::MagicLimit::STORE {
my ($tie,$val) = @_;
$XDEBUG && print STDERR "MagicLimit::STORE => $val\n";
my $queue = $$tie;
my $oldval = $queue->{limit};
$queue->{limit} = $val;
my $dbh = $queue->_dbh;
$dbh->begin_work;
$queue->_status("limit",$val);
$dbh->commit;
return $oldval;
}
sub limit :lvalue {
my $self = shift;
if (!$self->{_limit_magic}) {
tie $self->{_limit_magic}, 'Forks::Queue::SQLite::MagicLimit', $self;
$XDEBUG && print STDERR "tied \$self->\{_limit_magic\}\n";
}
if (@_) {
$self->_dbh->begin_work;
$XDEBUG && print STDERR "setting _limit_magic to $_[0]\n";
$self->_status("limit", shift);
if (@_) {
$XDEBUG && print STDERR "setting on_limit to $_[0]\n";
$self->_status("on_limit", $self->{on_limit} = $_[0]);
}
$self->_dbh->commit;
} else {
$self->{limit} = $self->_status("limit");
$XDEBUG && print STDERR "updating {limit} to $self->{limit}\n";
}
return $self->{_limit_magic};
}
sub status {
my $self = shift;
my $dbh = $self->_dbh;
my $status = {};
my $sth = $dbh->prepare("SELECT key,value FROM status");
my $z = $sth->execute;
my $tt = $sth->fetchall_arrayref;
foreach my $t (@$tt) {
$status->{$t->[0]} = $t->[1];
}
$status->{avail} = $self->_avail; # update {count}, {avail}
$status->{end} //= 0;
return $status;
}
sub _avail {
# if transactions are needed, set them up in the caller
my ($self,$dbh) = @_;
$dbh ||= $self->_dbh;
return unless $dbh;
my $sth = $dbh->prepare("SELECT COUNT(*) FROM the_queue");
return unless $sth;
my $z = $sth->execute;
my $tt = $sth->fetchall_arrayref;
return $self->{avail} = $tt->[0][0];
}
sub _maintain {
my ($self) = @_;
return;
}
sub push {
my ($self,@items) = @_;
$self->_push(+1,@items);
}
sub enqueue {
my ($self,@items) = @_;
my $tfactor = +1;
my (@deferred_items,$failed_items);
my $pushed = 0;
my $_DEBUG = $self->{debug} // $DEBUG;
if ($self->_end) {
carp "Forks::Queue: put call from process $$ ",
"after end call from process " . $self->{_end};
return 0;
}
my $limit = $self->{limit};
$limit = 9E9 if $self->{limit} <= 0;
my $dbh = $self->_dbh;
$dbh->begin_work;
my $stamp = Time::HiRes::time;
my $id = $self->_batch_id($stamp,$dbh);
# For Thread::queue compatibility, enqueue puts all items on
# the queue without blocking if there is even one free space,
if (@items && $self->_avail < $limit) {
foreach my $item (@items) {
$self->_add($item, $stamp, $id++);
$pushed++;
}
@items = ();
}
$dbh->commit;
if (@items > 0) {
@deferred_items = @items;
$failed_items = @deferred_items;
}
$self->_notify if $pushed;
if ($failed_items) {
if ($self->{on_limit} eq 'fail') {
carp "Forks::Queue: queue buffer is full ",
"and $failed_items items were not added";
} else {
$_DEBUG && print STDERR "$$ $failed_items on enqueue. ",
"Waiting for capacity\n";
$self->_wait_for_capacity;
$_DEBUG && print STDERR "$$ got some capacity\n";
$pushed += $self->enqueue(@deferred_items);
}
}
return $pushed;
}
sub unshift {
my ($self,@items) = @_;
$self->_push(-1,@items);
}
sub _add {
# do not use transactions here!
# if they are needed, call begin_work/commit from the caller
my ($self,$item,$timestamp,$id) = @_;
my $jitem = $jsonizer->encode($item);
my $dbh = $self->_dbh;
my $sth = $dbh->prepare("INSERT INTO the_queue VALUES(?,?,?)");
my $z = _try(3, sub { $sth->execute($timestamp, $id, $jitem) } );
return $z;
}
sub _push {
my ($self,$tfactor,@items) = @_;
my (@deferred_items,$failed_items);
my $pushed = 0;
my $_DEBUG = $self->{debug} // $DEBUG;
if ($self->_end) {
carp "Forks::Queue: put call from process $$ ",
"after end call from process " . $self->{_end};
return 0;
}
my $limit = $self->{limit};
$limit = 9E9 if $self->{limit} <= 0;
my $dbh = $self->_dbh;
$dbh->begin_work;
my $stamp = Time::HiRes::time;
my $id = $self->_batch_id($stamp,$dbh);
while (@items && $self->_avail < $limit) {
my $item = shift @items;
$self->_add($item, $stamp, $id++);
$pushed++;
}
$dbh->commit;
if (@items > 0) {
@deferred_items = @items;
$failed_items = @deferred_items;
}
$self->_notify if $pushed;
if ($failed_items) {
if ($self->{on_limit} eq 'fail') {
carp "Forks::Queue: queue buffer is full ",
"and $failed_items items were not added";
} else {
$_DEBUG && print STDERR "$$ $failed_items on put. ",
"Waiting for capacity\n";
$self->_wait_for_capacity;
$_DEBUG && print STDERR "$$ got some capacity\n";
$pushed += $self->_push($tfactor,@deferred_items);
}
}
return $pushed;
}
sub _wait_for_item {
my $self = shift;
my $ready = 0;
do {
$ready = $self->_avail || $self->_end || $self->_expired;
sleep($Forks::Queue::SLEEP_INTERVAL || 1) if !$ready;
} while !$ready;
return $self->{avail};
}
sub _wait_for_capacity {
my $self = shift;
if ($self->{limit} <= 0) {
return 9E9;
}
my $ready = 0;
my $count = @_ ? shift : 1;
while (!$ready) {
last if $self->_avail + $count <= $self->{limit};
last if $self->_end;
sleep($Forks::Queue::SLEEP_INTERVAL || 1);
}
return $self->{avail} + $count <= $self->{limit};
}
sub _batch_id {
my ($self,$stamp,$dbh) = @_;
$dbh ||= $self->_dbh;
my $sth = $dbh->prepare("SELECT MAX(batchid) FROM the_queue WHERE timestamp=?");
my $z = $sth->execute($stamp);
my $tt = $sth->fetchall_arrayref;
if (@$tt == 0) {
return 0;
} else {
return $tt->[0][0];
}
}
sub dequeue {
my $self = shift;
Forks::Queue::_validate_input($_[0], 'count', 1) if @_;
my $count = $_[0] || 1;
if ($self->limit > 0 && $count > $self->limit) {
croak "dequeue: exceeds queue size limit";
}
if ($self->{style} ne 'lifo') {
return @_ ? $self->_retrieve(-1,1,2,0,$_[0])
: $self->_retrieve(-1,1,2,0);
} else {
return @_ ? $self->_retrieve(+1,1,2,0,$_[0])
: $self->_retrieve(+1,1,2,0);
}
}
sub shift :method {
my $self = shift;
# purge, block
return @_ ? $self->_retrieve(-1,1,1,0,$_[0]) : $self->_retrieve(-1,1,1,0);
}
sub pop {
my $self = shift;
Forks::Queue::_validate_input($_[0], 'index', 1) if @_;
# purge, block
my @popped = $self->_retrieve(+1,1,1,0,$_[0] // 1);
return @_ ? reverse(@popped) : $popped[0];
}
sub shift_nb {
my $self = shift;
# purge, no block
return @_ ? $self->_retrieve(-1,1,0,0,$_[0]) : $self->_retrieve(-1,1,0,0);
}
sub pop_nb {
my $self = shift;
# purge, no block
my @popped = @_
? $self->_retrieve(+1,1,0,0,$_[0]) : $self->_retrieve(+1,1,0,0);
return @_ ? @popped : $popped[0];
return @popped;
}
sub extract {
my $self = shift;
Forks::Queue::_validate_input( $_[0], 'index' ) if @_;
my $index = shift || 0;
Forks::Queue::_validate_input( $_[0], 'count', 1) if @_;
my $count = $_[0] // 1;
my $reverse = 0;
my $tfactor = -1;
if ($self->{style} eq 'lifo') {
$tfactor = 1;
$reverse = 1;
}
if ($count <= 0) {
carp "Forks::Queue::extract: count must be positive";
return;
}
if ($index < 0) {
if ($index + $count > 0) {
$count = -$index;
}
$index = -$index - 1;
$index -= $count - 1;
$tfactor *= -1;
$reverse = !$reverse;
}
# purge, no block
my @items = $self->_retrieve( $tfactor, 1, 0, $index, $index+$count);
if ($reverse) {
@items = reverse(@items);
}
return @_ ? @items : $items[0] // ();
}
sub insert {
my ($self, $pos, @items) = @_;
Forks::Queue::_validate_input($pos,'index');
my (@deferred_items);
my $_DEBUG = $self->{debug} // $DEBUG;
my $inserted = 0;
if ($self->_end) {
carp "Forks::Queue: insert call from process $$ ",
"after end call from process " . $self->{_end} . "\n";
return 0;
}
my $limit = $self->{limit};
$limit = 9E9 if $self->{limit} <= 0;
if ($pos >= $self->_avail) {
if ($self->{on_limit} eq 'tq-compat') {
my $limit = $self->{limit};
$self->{limit} = 0;
my $enq = $self->enqueue(@items);
$self->{limit} = $limit;
return $enq;
} else {
return $self->put(@items);
}
}
if ($pos <= -$self->_avail) {
#return $self->unshift(@items);
$pos = 0;
}
if ($pos < 0) {
$pos += $self->_avail;
}
# find timestamps for items $pos and $pos+1
# choose 0+@items intermediate timestamps
# if $pos+1 is undef, use current time as timestamp
# as in the _push function, add items
my $dbh = $self->_dbh;
my $sths = $dbh->prepare(
"SELECT timestamp,batchid FROM the_queue ORDER BY timestamp,batchid LIMIT ?");
$dbh->begin_work;
my $z = $sths->execute($pos+1);
my $tt = $sths->fetchall_arrayref;
$DB::single = 1;
my ($t1,$t2,$b1,$b2);
if (@$tt > 0) {
$t2 = $tt->[-1][0];
$b2 = $tt->[-1][1];
} else {
$t2 = Time::HiRes::time();
$b2 = 0;
}
if (@$tt == $pos) {
$t1 = $t2;
$b1 = $b2;
$b2 = 0;
if ($t2 < 0) {
$t2 = -Time::HiRes::time();
} else {
$t2 = Time::HiRes::time();
}
} elsif ($pos == 0) {
$t1 = $t2 - 100000;
$b1 = 0;
} else {
$t1 = $tt->[-2][0];
$b1 = $tt->[-2][1];
}
my ($t3,$b3);
if ($t1 == $t2) {
my $sthr = $dbh->prepare("UPDATE the_queue SET batchid=batchid+?
WHERE timestamp=? AND batchid>=?");
$sthr->execute(0+@items,$t1,$b2);
$t3 = $t1;
$b3 = $b1+1;
} else {
$t3 = ($t1 + $t2) / 2;
$b3 = 0;
if ($t3 == $t1) {
$b3 = $b1+1;
}
}
if ($self->{on_limit} eq "tq-compat") {
for my $item (@items) {
$self->_add($item,$t3,$b3);
$inserted++;
$b3++;
}
@items = ();
} else {
while (@items && $self->_avail < $limit) {
my $item = shift @items;
_try(3, sub { $self->_add($item,$t3,$b3) });
$inserted++;
$b3++;
}
}
$dbh->commit;
if (@items > 0) {
@deferred_items = @items;
}
if (@deferred_items) {
if ($self->{on_limit} eq 'fail') {
carp "Forks::Queue: queue buffer is full and ",
0+@deferred_items," items were not inserted";
} else {
$_DEBUG && print STDERR "$$ ",0+@deferred_items, " on insert. ",
"Waiting for capacity\n";
$self->_wait_for_capacity;
$_DEBUG && print STDERR "$$ got some capacity\n";
$inserted += $self->insert($pos+$inserted,@deferred_items);
}
}
$self->_notify if $inserted;
return $inserted;
}
sub _retrieve {
my $self = shift;
my $tfactor = shift;
# tfactor = -1: select newest items first
# tfactor = +1: select oldest items first
my $purge = shift;
# purge = 0: do not delete items that we retrieve
# purge = 1: delete items that we retrieve
my $block = shift;
# block = 0: no block if queue is empty
# block = 1: block only if queue is empty
# block = 2: block if full request can not be fulfilled
my $lo = shift;
my $hi = @_ ? $_[0] : $lo+1;
return if $hi <= $lo;
# attempt to retrieve items $lo .. $hi and return them
# retrieved items are removed from the queue if $purge is set
# get newest items first if $tfactor > 0, oldest first if $tfactor < 0
# only block while
# $block is set
# zero items have been found
if ($lo > 0 && $block) {
carp __PACKAGE__, ": _retrieve() didn't expect block=$block and lo=$lo";
$block = 0;
}
my $order = $tfactor > 0
? "timestamp DESC,batchid DESC" : "timestamp,batchid";
my $dbh = $self->_dbh;
my $sths = $dbh->prepare(
"SELECT item,batchid,timestamp FROM the_queue
ORDER BY $order LIMIT ?");
my $sthd = $purge && $dbh->prepare(
"DELETE FROM the_queue WHERE item=? AND timestamp=? AND batchid=?");
my @return;
if (!$sths) {
warn "prepare queue SELECT statement failed: $dbh->errstr";
}
while (@return <= 0) {
my $limit = $hi - @return + ($lo < 0 ? $lo : 0);
$dbh->begin_work;
my $z = $sths && $sths->execute($limit);
my $tt = $sths && $sths->fetchall_arrayref;
if ($lo < 0 && -$lo > @$tt) {
$hi += (@$tt - $lo);
$lo += (@$tt - $lo);
}
if (!$tt || @$tt == 0) {
$dbh->rollback;
if ($block) {
$self->_wait_for_item;
next;
} else {
return;
}
} elsif ($block > 1 && $lo == 0 && @$tt < $hi) {
# not enough items on queue to satisfy request
$dbh->rollback;
next;
} elsif (@$tt <= $lo) {
# not enough items on queue to satisfy request
$dbh->rollback;
return;
}
$hi = @$tt if $hi > @$tt;
foreach my $itt ($lo .. $hi-1) {
if (!defined($tt->[$itt])) {
warn "\nResult $itt from $lo .. $hi-1 is undefined!";
}
my ($item,$bid,$timestamp) = @{$tt->[$itt]};
CORE::push @return, $jsonizer->decode($item);
if ($purge) {
my $zd = _try(4, sub { $sthd->execute($item,$timestamp,$bid)} );
if (!$zd) {
warn "Forks::Queue::SQLite: ",
"purge failed: $item,$timestamp,$bid";
}
}
}
$dbh->commit;
} continue {
if ($block) {
if ($self->_end || $self->_expired) {
$block = 0;
}
}
}
return @_ ? @return : $return[0] // ();
}
sub _pop {
my $self = shift;
my $tfactor = shift;
my $purge = shift;
my $block = shift;
my $wantarray = shift;
my ($count) = @_;
$count ||= 1;
my $order = "timestamp,batchid";
if ($tfactor > 0) {
$order = "timestamp DESC,batchid DESC";
}
my $dbh = $self->_dbh;
my $sths = $dbh->prepare(
"SELECT item,timestamp,pid FROM the_queue ORDER BY $order LIMIT ?");
my $sthd = $dbh->prepare(
"DELETE FROM the_queue WHERE item=? AND timestamp=? AND pid=?");
my @return = ();
while (@return == 0) {
my $limit = $count - @return;
my $z = $sths->execute($limit);
my $tt = $sths->fetchall_arrayref;
if (@$tt == 0) {
if ($block && $self->_wait_for_item) {
next;
} else {
last;
}
}
foreach my $t (@$tt) {
my ($item,$bid,$timestamp) = @$t;
CORE::push @return, $jsonizer->decode($item);
if ($purge) {
$dbh->begin_work;
my $zd = $sthd->execute($item,$timestamp,$bid);
if (!$zd) {
carp "purge failed: $item,$timestamp,$bid\n";
}
$dbh->commit;
}
}
}
return $wantarray ? @return : $return[0];
}
sub clear {
my $self = shift;
my $dbh = $self->_dbh;
$dbh->begin_work;
$dbh->do("DELETE FROM the_queue");
$dbh->commit;
}
sub peek_front {
my $self = shift;
my ($index) = @_;
$index ||= 0;
if ($index < 0) {
return $self->peek_back(-$index - 1);
}
# no purge, no block, always retrieve a single item
return $self->_retrieve(-1,0,0,$index);
}
sub peek_back {
my $self = shift;
my ($index) = @_;
$index ||= 0;
if ($index < 0) {
return $self->peek_front(-$index - 1);
}
# no purge, no block, always retrieve a single item
return $self->_retrieve(+1,0,0,$index);
}
sub _notify {
return unless $Forks::Queue::NOTIFY_OK;
my $self = shift;
my $dbh = $self->_dbh;
my $sth = $dbh->prepare("SELECT pid,tid FROM pids");
my $z = $sth->execute;
my $pt = $sth->fetchall_arrayref;
my @pids = map { $_->[0] } grep { $_->[0] != $$ } @$pt;
if (@pids) {
($self->{debug} // $DEBUG) && print STDERR "$$ notify: @pids\n";
kill 'IO', @pids;
}
my @tids = map { $_->[1] } grep { $_->[0] == $$ && $_->[1] != TID() } @$pt;
if (@tids) {
foreach my $tid (@tids) {
my $thr = threads->object($tid);
$thr && $thr->kill('IO');
}
}
}
my $id = 0;
sub _impute_file {
my $base = $0;
$base =~ s{.*[/\\](.)}{$1};
$base =~ s{[/\\]$}{};
$id++;
my @candidates;
if ($^O eq 'MSWin32') {
@candidates = (qw(C:/Temp C:/Windows/Temp));
} else {
@candidates = qw(/tmp /var/tmp);
}
for my $candidate ($ENV{FORKS_QUEUE_DIR},
$ENV{TMPDIR}, $ENV{TEMP},
$ENV{TMP}, @candidates,
$ENV{HOME}, ".") {
if (defined($candidate) && $candidate ne '' &&
-d $candidate && -w _ && -x _) {
return $candidate . "/fq-$$-$id-$base.sql3";
}
}
my $file = "./fq-$$-$id-$base.sql3";
carp __PACKAGE__, ": queue db file $file might not be a good location!";
return $file;
}
sub _DUMP {
my ($self,$fh_dump) = @_;
my $dbh = $self->_dbh;
$fh_dump ||= *STDERR;
my $sth = $dbh->prepare("SELECT * FROM pids");
my $z = $sth->execute;
print {$fh_dump} "\n\n=== pids ===\n------------\n";
foreach my $r (@{$sth->fetchall_arrayref}) {
print {$fh_dump} join("\t",@$r),"\n";
}
$sth = $dbh->prepare("SELECT * FROM status");
$z = $sth->execute;
print {$fh_dump} "\n\n=== status ===\n--------------\n";
foreach my $r (@{$sth->fetchall_arrayref}) {
print {$fh_dump} join("\t",@$r),"\n";
}
$sth = $dbh->prepare("SELECT * FROM the_queue");
$z = $sth->execute;
print {$fh_dump} "\n\n=== queue ===\n-------------\n";
foreach my $r (@{$sth->fetchall_arrayref}) {
print {$fh_dump} join("\t",@$r),"\n";
}
print {$fh_dump} "\n\n";
}
1;
=head1 NAME
Forks::Queue::SQLite - SQLite-based implementation of Forks::Queue
=head1 VERSION
0.15
=head1 SYNOPSIS
my $q = Forks::Queue->new( impl => 'SQLite', db_file => "queue-file" );
$q->put( "job1" );
$q->put( { name => "job2", task => "do something", data => [42,19] } );
...
$q->end;
for my $w (1 .. $num_workers) {
if (fork() == 0) {
my $task;
while (defined($task = $q->get)) {
... perform task in child ...
}
exit;
}
}
=head1 DESCRIPTION
SQLite-based implementation of L<Forks::Queue|Forks::Queue>.
It requires the C<sqlite3> libraries and the L<DBD::SQLite|DBD::SQLite>
Perl module.
=head1 METHODS
See L<Forks::Queue> for an overview of the methods supported by
this C<Forks::Queue> implementation.
=head2 new
=head2 $queue = Forks::Queue::SQLite->new( %opts )
=head2 $queue = Forks::Queue->new( impl => 'SQLite', %opts )
The C<Forks::Queue::SQLite> constructor recognized the following
configuration options.
=over 4
=item * db_file
The name of the file to use to store queue data and metadata.
If omitted, a temporary filename is chosen.
=item * style
=item * limit
=item * on_limit
=item * join
=item * persist
See L<Forks::Queue/"new"> for descriptions of these options.
=back
=head1 LICENSE AND COPYRIGHT
Copyright (c) 2017-2019, Marty O'Brien.
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.10.1 or,
at your option, any later version of Perl 5 you may have available.
See http://dev.perl.org/licenses/ for more information.
=cut