App-MonM-Notifier/lib/App/MonM/Notifier/Store.pm
package App::MonM::Notifier::Store; # $Id: Store.pm 81 2022-09-16 10:21:57Z abalama $
use strict;
use utf8;
=encoding utf8
=head1 NAME
App::MonM::Notifier::Store - monotifier store class
=head1 VERSION
Version 1.02
=head1 SYNOPSIS
use App::MonM::Notifier::Store;
my $store = App::MonM::Notifier::Store->new(
dsn => "DBI:mysql:database=monotifier;host=mysql.example.com",
user => "username",
password => "password",
set => [
"RaiseError 0",
"PrintError 0",
"mysql_enable_utf8 1",
],
expires => 3600*24*7,
maxtime => 300,
);
die($store->error) if $store->error;
=head1 DESCRIPTION
DBI interface for monotifier store. This module provides store methods
=head2 new
my $store = App::MonM::Notifier::Store->new(
dsn => "DBI:mysql:database=monotifier;host=mysql.example.com",
user => "username",
password => "password",
set => [
"RaiseError 0",
"PrintError 0",
"mysql_enable_utf8 1",
],
expires => 3600*24*7,
maxtime => 300,
);
Creates DBI object
=over 8
=item B<expires>
Lifetime of a database record, in seconds
=item B<maxtime>
Maximum time, in seconds, allowed for sending one message
=back
=head2 cleanup
my $st = $store->cleanup;
Permanently removes queue entities based on their age
=head2 dequeue
my $st = $store->dequeue(
id => 1,
);
Dequeues the element by setting success status (STATUS_SENT)
=head2 delById
$store->delById($id) or die($store->error);
Delete record by id
=head2 dsn
my $dsn = $store->dsn;
Returns DSN string of current database connection
=head2 enqueue
$store->enqueue(
to => $user,
channel => $ch_name,
subject => $subject,
message => $message,
attributes => $ch, # Channel attributes
) or die($store->error);
Adds a new element at the end of the current queue
and returns queue element ID
=head2 error
my $error = $store->error;
Returns error message
my $error = $store->error( "Error message" );
Sets error message if argument is provided.
=head2 getById
my %data = $store->getById($id);
Returns data from database by id
=head2 getAll
my @table = $store->getAll();
my @table_100 = $store->getAll(100);
Returns all records from the database; an optional argument limits the number of records returned
=head2 is_sqlite
print $store->is_sqlite ? "Is SQLite" : "Is not SQLite"
Returns true if type of current database is SQLite
=head2 ping
$store->ping ? 'OK' : 'Database session is expired';
Checks the connection to database
=head2 requeue
my $st = $store->requeue(
id => 1,
code => 2,
error => "My Error",
);
Requeues an entity that was retrieved for processing earlier; sets its status to STATUS_FAIL
=head2 retrieve
my $entity = $store->retrieve(STATUS_FAIL);
Retrieves the next entity from the queue and returns it as hashref
or undef if no entity
=head2 serializer
my $serializer = $store->serializer;
Returns serializer object
=head2 purge
$store->purge or die($store->error);
Delete all records
=head1 DDL
CREATE TABLE IF NOT EXISTS monotifier (
`id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE,
`to` CHAR(255), -- Recipient name
`channel` CHAR(255), -- Recipient channel
`subject` TEXT, -- Message subject
`message` TEXT, -- Message content (BASE64)
`attributes` TEXT, -- Message attributes (JSON)
`published` BIGINT(20), -- The publication time (unixtime)
`scheduled` BIGINT(20), -- The scheduled time (unixtime)
`expired` BIGINT(20), -- The expiration time (unixtime)
`sent` BIGINT(20), -- The send time
`attempt` INTEGER DEFAULT 0, -- Count of failed attempts
`status` CHAR(32), -- Status of transaction
`errcode` INT(11), -- Error code
`errmsg` TEXT -- Error message
)
=head1 ERRORCODES
0 -- No errors found
1 -- Error of the notifier level (notify method)
2 -- Error of the notifier level (remind method)
255 -- Error of the cleanup level
=head1 SEE ALSO
L<CTK::DBI>, L<App::MonM>
=head1 AUTHOR
Serż Minus (Sergey Lepenkov) L<https://www.serzik.com> E<lt>abalama@cpan.orgE<gt>
=head1 COPYRIGHT
Copyright (C) 1998-2022 D&D Corporation. All Rights Reserved
=head1 LICENSE
This program is free software; you can redistribute it and/or
modify it under the same terms as Perl itself.
See C<LICENSE> file and L<https://dev.perl.org/licenses/>
=cut
use vars qw/$VERSION/;
$VERSION = '1.02';
use File::Spec;
use MIME::Base64 qw/encode_base64 decode_base64/;
use CTK::DBI;
use CTK::Util qw/ read_attributes touch /;
use CTK::ConfGenUtil;
use CTK::TFVals qw/ :ALL /;
use CTK::Serializer;
use App::MonM::Const;
use App::MonM::Util qw/ set2attr /;
# Module-wide defaults, serializer settings and queue status names
use constant {
EXPIRES => 30*24*60*60, # Default record lifetime in seconds (30 days); new() falls back to it when no "expires" argument is given
MAXTIME => 300, # Default maximum sending time in seconds (5 min); new() and cleanup() fall back to it
# Attribute sets passed to CTK::Serializer->new for the "json" format
JSON_ATTRS => [
{ # For serialize
utf8 => 0,
pretty => 1,
allow_nonref => 1,
allow_blessed => 1,
},
{ # For deserialize
utf8 => 0,
allow_nonref => 1,
allow_blessed => 1,
},
],
# Database
DB_FILENAME_NASK => 'monotifier-%s.db', # Default database file name mask; %s is replaced with the username in new()
DEFAULT_DSN_MASK => 'dbi:SQLite:dbname=%s', # Default DSN mask; %s is replaced with the database file path in new()
# Connection attributes that new() applies when the caller passes no "dsn"
DEFAULT_DBI_ATTR => {
dsn => '', # See DEFAULT_DSN_MASK
user => '',
password => '',
set => [
'RaiseError 0',
'PrintError 0',
'sqlite_unicode 1',
],
},
# Statuses
STATUS_NEW => 'NEW', # Assigned by enqueue()
STATUS_BUSY => 'BUSY', # Assigned by retrieve()
STATUS_FAIL => 'FAIL', # Assigned by requeue() and cleanup(); see Attempt
STATUS_SENT => 'SENT', # Assigned by dequeue()
};
# Table definition; new() executes it when the SQLite database file is missing or empty
use constant MONOTIFIER_DDL => <<'DDL';
CREATE TABLE IF NOT EXISTS monotifier (
`id` INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE,
`to` CHAR(255), -- Recipient name
`channel` CHAR(255), -- Recipient channel
`subject` TEXT, -- Message subject
`message` TEXT, -- Message content (BASE64)
`attributes` TEXT, -- Message attributes (JSON)
`published` BIGINT(20), -- The publication time (unixtime)
`scheduled` BIGINT(20), -- The scheduled time (unixtime)
`expired` BIGINT(20), -- The expiration time (unixtime)
`sent` BIGINT(20), -- The send time
`attempt` INTEGER DEFAULT 0, -- Count of failed attempts
`status` CHAR(32), -- Status of transaction
`errcode` INT(11), -- Error code
`errmsg` TEXT -- Error message
)
DDL
# Inserts one queue record; 13 placeholders in the column order listed (used by enqueue)
use constant MONOTIFIER_ADD => <<'DML';
INSERT INTO monotifier
(`to`,`channel`,`subject`,`message`,`attributes`,`published`,`scheduled`,`expired`,`sent`,`attempt`,`status`,`errcode`,`errmsg`)
VALUES
(?,?,?,?,?,?,?,?,?,?,?,?,?)
DML
# Selects one record with the given status whose scheduled time has come;
# placeholders: scheduled upper bound, status (used by retrieve)
use constant MONOTIFIER_GET_NEXT => <<'DML';
SELECT `id`,`to`,`channel`,`subject`,`message`,`attributes`,`published`,`scheduled`,`expired`,`sent`,`attempt`,`status`,`errcode`,`errmsg`
FROM `monotifier`
WHERE `scheduled` <= ? AND `status` = ?
LIMIT 1
DML
# Rewrites status, schedule, send time, attempt counter and error fields of one record;
# the last placeholder is the record id (used by retrieve and dequeue)
use constant MONOTIFIER_UPDATE_STATUS => <<'DML';
UPDATE `monotifier`
SET `status` = ?, `scheduled` = ?, `sent` = ?, `attempt` = ?, `errcode` = ?, `errmsg` = ?
WHERE `id` = ?
DML
# Sets status and error fields of one record by id (used by requeue)
use constant MONOTIFIER_UPDATE_ERROR => <<'DML';
UPDATE `monotifier`
SET `status` = ?, `errcode` = ?, `errmsg` = ?
WHERE `id` = ?
DML
# Deletes records whose expiration time is not later than the given time (used by cleanup)
use constant MONOTIFIER_CLEANUP => <<'DML';
DELETE FROM `monotifier`
WHERE `expired` <= ?
DML
# Sets status and error fields on every record that has one of two given statuses
# and a scheduled time earlier than the given time (used by cleanup)
use constant MONOTIFIER_FLUSH => <<'DML';
UPDATE `monotifier`
SET `status` = ?, `errcode` = ?, `errmsg` = ?
WHERE (`status` = ? OR `status` = ?) AND `scheduled` < ?
DML
# Deletes every record of the table (used by purge)
use constant MONOTIFIER_PURGE => <<'DML';
DELETE FROM monotifier
DML
# Selects all records, newest id first; getAll may append a LIMIT clause
use constant MONOTIFIER_GET_ALL => <<'DML';
SELECT `id`,`to`,`channel`,`subject`,`message`,`attributes`,`published`,`scheduled`,`expired`,`sent`,`attempt`,`status`,`errcode`,`errmsg`
FROM monotifier
ORDER BY `id` DESC
DML
# Selects one record by id (used by getById)
use constant MONOTIFIER_GET_BY_ID => <<'DML';
SELECT `id`,`to`,`channel`,`subject`,`message`,`attributes`,`published`,`scheduled`,`expired`,`sent`,`attempt`,`status`,`errcode`,`errmsg`
FROM monotifier
WHERE `id` = ?
DML
# Deletes one record by id (used by delById)
use constant MONOTIFIER_DEL_BY_ID => <<'DML';
DELETE FROM monotifier WHERE `id` = ?
DML
# Constructor. Connects to the database and, for a fresh SQLite file, creates
# the monotifier table.
#
# Arguments (hash): dsn, user, password, set, file, timeout, expires, maxtime.
# When no "dsn" is given, missing connection arguments are taken from
# DEFAULT_DBI_ATTR and a per-user SQLite file is used.
#
# Always returns an object; callers must check $store->error afterwards.
sub new {
    my $class = shift;
    my %args = @_;

    # Fill in defaults only when the caller did not supply a DSN.
    # Iterate over the KEYS of the defaults hash (iterating over the hash
    # itself would also visit its values and create junk entries in %args)
    unless ($args{dsn}) {
        my $dda = DEFAULT_DBI_ATTR;
        foreach my $key (keys %$dda) {
            $args{$key} //= $dda->{$key};
        }
    }

    my $username = getlogin() || (getpwuid($>))[0] || $ENV{LOGNAME} || $ENV{USER} || "anonymous";
    my $filename = sprintf(DB_FILENAME_NASK, $username);
    my $file = $args{file} || File::Spec->catfile(File::Spec->tmpdir(), $filename);
    my $dsn = $args{dsn} || sprintf(DEFAULT_DSN_MASK, $file);

    # DB
    my $db = CTK::DBI->new(
        -dsn => $dsn,
        -debug => 0,
        -username => $args{'user'},
        -password => $args{'password'},
        -attr => set2attr($args{'set'}),
        $args{timeout} ? (
            -timeout_connect => $args{timeout},
            -timeout_request => $args{timeout},
        ) : (),
    );
    # Plain ternary instead of "my ... if ...": a my() with a statement
    # modifier has undefined behaviour in Perl
    my $dbh = $db ? $db->connect : undef;

    # SQLite
    my $fnew = 0;
    my $issqlite = 0;
    if ($dbh && $dsn =~ /SQLite/i) {
        $file = $dbh->sqlite_db_filename();
        # A missing or empty file means the schema has to be created
        unless ($file && (-e $file) && !(-z $file)) {
            touch($file);
            chmod(0666, $file);
            $fnew = 1;
        }
        $issqlite = 1;
    }

    # Errors
    my $error = "";
    if (!$db) {
        $error = sprintf("Can't init database \"%s\"", $dsn);
    } elsif (!$dbh) {
        $error = sprintf("Can't connect to database \"%s\": %s", $dsn, $DBI::errstr || "unknown error");
    } elsif ($fnew) {
        $db->execute(MONOTIFIER_DDL);
        $error = $dbh->errstr() if $dbh->err;
    }
    # $dbh is defined here whenever $error is still empty
    unless ($error) {
        $error = sprintf("Can't init database \"%s\". Ping failed: %s",
            $dsn, $dbh->errstr() || "unknown error") unless $dbh->ping;
    }

    my $self = bless {
        file => $file,
        issqlite=> $issqlite,
        dsn => $dsn,
        error => $error,
        dbi => $db,
        expires => $args{expires} || EXPIRES,
        maxtime => $args{maxtime} || MAXTIME,
        serializer => CTK::Serializer->new('json', attrs => { json => JSON_ATTRS }),
    }, $class;
    return $self;
}
# Combined getter/setter for the last error message.
# With a defined argument the message is stored first; the current
# message is returned in both cases.
sub error {
    my ($self, $message) = @_;
    $self->{error} = $message if defined $message;
    return $self->{error};
}
# Checks the database connection.
# Returns 0 when the DSN, the DBI wrapper or its handle is missing,
# 1 when the handle has no ping method, otherwise the result of the
# handle's own ping.
sub ping {
    my ($self) = @_;
    my $handle = $self->{dsn} && $self->{dbi} && $self->{dbi}{dbh};
    return 0 unless $handle;
    return $handle->can('ping') ? $handle->ping() : 1;
}
# Returns the DSN string stored by the constructor
sub dsn {
    my ($self) = @_;
    return $self->{dsn};
}
# Returns the serializer object stored by the constructor
sub serializer {
    my ($self) = @_;
    return $self->{serializer};
}
# Returns 1 when the "issqlite" flag of the object is true, 0 otherwise
sub is_sqlite {
    my ($self) = @_;
    return 1 if $self->{issqlite};
    return 0;
}
# CRUD Methods
# Returns all records of the table (newest id first) as a list of rows.
#
# Arguments:
#   $limit - optional maximum number of rows; 0 or omitted means no limit.
#            Must be a non-negative integer.
#
# Returns an empty list when the connection is down, the limit is invalid
# or the query fails; in the last two cases the error message is set.
sub getAll {
    my $self = shift;
    my $limit = shift || 0;
    return () unless $self->ping;
    $self->error("");

    # The limit is interpolated into the SQL text, so accept digits only
    unless ($limit =~ /\A[0-9]+\z/) {
        $self->error(sprintf("Can't get records: incorrect limit value \"%s\"", $limit));
        return ();
    }

    my $dbi = $self->{dbi};
    my @tbl = $dbi->table(sprintf("%s%s", MONOTIFIER_GET_ALL, $limit ? " LIMIT $limit" : "" ));
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't get records: %s", uv2null($dbi->connect->errstr)));
        return ();
    }
    return @tbl;
}
# Returns one record as a hash, looked up by id.
#
# The "message" field is BASE64-decoded and the "attributes" field is
# deserialized before the record is returned.
#
# Returns an empty list when the connection is down, the query fails or
# the attributes cannot be deserialized (the error message is set in the
# last two cases).
sub getById {
    my $self = shift;
    my $id = shift || 0;
    # Same guard as the other database methods: without it a failed
    # connection leads to a method call on an undefined handle
    return () unless $self->ping;
    $self->error("");
    my $dbi = $self->{dbi};

    my %rec = $dbi->recordh(MONOTIFIER_GET_BY_ID, $id);
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't get record: %s", uv2null($dbi->connect->errstr)));
        return ();
    }
    if (defined($rec{message}) && length($rec{message})) {
        $rec{message} = decode_base64($rec{message});
    }
    if (defined($rec{attributes}) && length($rec{attributes})) {
        $rec{attributes} = $self->serializer->deserialize($rec{attributes});
        unless ($self->serializer->status) {
            $self->error(sprintf("Can't deserialize channel attributes: %s", uv2null($self->serializer->error)));
            return ();
        }
    }
    return %rec;
}
# Deletes one record by id.
# Returns 1 on success, 0 when the connection is down or the statement
# fails (the error message is set in the latter case).
sub delById {
    my $self = shift;
    my $id = shift || 0;
    # Same guard as the other database methods: return false instead of
    # dying on an undefined handle when the connection is not available
    return 0 unless $self->ping;
    $self->error("");
    my $dbi = $self->{dbi};

    $dbi->execute(MONOTIFIER_DEL_BY_ID, $id);
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't delete record: %s", uv2null($dbi->connect->errstr)));
        return 0;
    }
    return 1;
}
# Queue methods
# Adds a new record with STATUS_NEW to the queue.
#
# Arguments (see the alias lists below): recipient, channel name, channel
# attributes, subject, message.
#
# The channel attributes are serialized and the message is stored
# BASE64-encoded. A message containing characters above 0xFF is stored
# as its UTF-8 byte sequence.
#
# Returns the id of the new record, or 0 on failure.
sub enqueue { # Set STATUS_NEW
    my $self = shift;
    my ($to, $ch_name, $ch_attr, $subject, $message) =
        read_attributes([
            [qw/TO USER USERNAME RECIPIENT/],
            [qw/NAME CHANNEL CH_NAME/],
            [qw/ATTR ATTRS ATTRIBUTES CH_ATTR CH_ATTRS/],
            [qw/SUBJECT SUBJ SBJ/],
            [qw/MESSAGE MSG/],
        ], @_);
    return 0 unless $self->ping;
    $self->error("");
    my $dbi = $self->{dbi};

    my $now = time();
    my $json = $self->serializer->serialize($ch_attr);
    unless ($self->serializer->status) {
        $self->error(sprintf("Can't serialize channel attributes: %s", uv2null($self->serializer->error)));
        return 0;
    }

    # encode_base64 croaks on characters above 0xFF. Strings that fit into
    # bytes are passed on unchanged (downgrade succeeds); anything wider is
    # converted to UTF-8 bytes first
    my $bytes = $message // '';
    utf8::encode($bytes) unless utf8::downgrade($bytes, 1);

    # Add new record
    $dbi->execute(MONOTIFIER_ADD,
        $to, $ch_name, $subject, encode_base64($bytes), $json,
        $now, # published
        $now, # scheduled
        ($now + $self->{expires}), # expired
        undef, # sent
        0, # attempt
        STATUS_NEW, # status
        undef, # errcode
        undef, # errmsg
    );
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't insert new record: %s", uv2null($dbi->connect->errstr)));
        return 0;
    }

    # Get ID. For non-SQLite drivers use the documented four-argument form
    # of last_insert_id (catalog, schema, table, field) so that drivers
    # which need the table/column to find the generated key can do so
    my $id = $self->{issqlite}
        ? $dbi->connect->sqlite_last_insert_rowid()
        : $dbi->connect->last_insert_id(undef, undef, 'monotifier', 'id');
    return $id || 0;
}
# Fetches one due record with the requested status (STATUS_FAIL when no
# status is given), marks it as STATUS_BUSY with an incremented attempt
# counter and a new scheduled time, and returns it as a hashref with the
# message BASE64-decoded and the attributes deserialized.
#
# Returns nothing when the connection is down, no record matches or an
# error occurred (the error message is set in the latter case).
sub retrieve { # Set STATUS_BUSY
    my ($self, @params) = @_;
    my ($status) = read_attributes([ [qw/STATUS REQUIRE REQ/] ], @params);
    $self->ping or return;
    $self->error("");
    my $dbi = $self->{dbi};

    # Look for a record: status == ? || STATUS_FAIL; scheduled <= now()
    my $now = time();
    my %rec = $dbi->recordh(MONOTIFIER_GET_NEXT, $now, $status || STATUS_FAIL);
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't get record: %s", uv2null($dbi->connect->errstr)));
        return;
    }
    %rec or return;

    # Mark the record as busy; the delay is derived from the attempt
    # number as it was before the increment
    my $attempt = $rec{attempt} || 0;
    my $next_time = $now + _sheduled_calc($attempt);
    $attempt++;
    my @bind = (
        STATUS_BUSY, # status
        $next_time,  # scheduled
        undef,       # sent
        $attempt,    # attempt (new)
        undef,       # errcode
        undef,       # errmsg
        $rec{id} || 0,
    );
    $dbi->execute(MONOTIFIER_UPDATE_STATUS, @bind);
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't change status: %s", uv2null($dbi->connect->errstr)));
        return;
    }

    # Unpack the stored message and channel attributes
    $rec{message} = decode_base64($rec{message})
        if defined($rec{message}) && length($rec{message});
    if (defined($rec{attributes}) && length($rec{attributes})) {
        my $serializer = $self->serializer;
        $rec{attributes} = $serializer->deserialize($rec{attributes});
        unless ($self->serializer->status) {
            $self->error(sprintf("Can't deserialize channel attributes: %s", uv2null($self->serializer->error)));
            return;
        }
    }
    return \%rec;
}
# Marks a record as failed (STATUS_FAIL) and stores the error code and
# error message for it.
#
# Arguments (see the alias lists below): record id, error code, error message.
#
# Returns 1 on success, 0 when the connection is down or the update fails
# (the error message is set in the latter case).
sub requeue { # Set STATUS_FAIL
    my $self = shift;
    my ($id, $code, $error) =
        read_attributes([
            [qw/ID/],
            # ERRCODE is the intended alias; the misspelled ERRCODEE is
            # kept so that existing callers keep working
            [qw/CODE ERRCODE ERRCODEE ERR_CODE/],
            [qw/ERROR ERRMESSAGE ERRMSG ERR_MESSAGE ERR_MSG/],
        ], @_);
    return 0 unless $self->ping;
    $self->error("");
    my $dbi = $self->{dbi};

    # Set status
    $dbi->execute(MONOTIFIER_UPDATE_ERROR,
        STATUS_FAIL, # status
        $code, # errcode
        $error, # errmsg
        $id || 0
    );
    if ($dbi->connect->err) {
        $self->error(sprintf("Can't update record: %s", uv2null($dbi->connect->errstr)));
        return 0;
    }
    return 1;
}
# Marks a record as sent (STATUS_SENT): stores the current time as the
# send time, resets the attempt counter and clears the schedule and
# error fields.
#
# Returns the given id on success, 0 when the connection is down and
# nothing when the update fails (the error message is set then).
sub dequeue { # Set STATUS_SENT
    my ($self, @params) = @_;
    my ($id) = read_attributes([ [qw/ID/] ], @params);
    $self->ping or return 0;
    $self->error("");
    my $dbi = $self->{dbi};

    # Set status to STATUS_SENT
    my @bind = (
        STATUS_SENT, # status
        undef,       # scheduled
        time(),      # sent
        0,           # attempt
        undef,       # errcode
        undef,       # errmsg
        $id || 0,
    );
    $dbi->execute(MONOTIFIER_UPDATE_STATUS, @bind);
    return $id unless $dbi->connect->err;

    $self->error(sprintf("Can't change status: %s", uv2null($dbi->connect->errstr)));
    return;
}
# Housekeeping in two steps:
#   1. deletes the records whose expiration time has passed;
#   2. marks BUSY and NEW records scheduled earlier than (now - maxtime)
#      as failed with error code 255.
#
# Returns 1 on success, 0 when the connection is down or a step fails
# (the error message is set in the latter case).
sub cleanup { # Delete too old records by expired field
    my ($self) = @_;
    $self->ping or return 0;
    $self->error("");
    my $dbi = $self->{dbi};
    my $now = time();
    my $maxtime = $self->{maxtime} || MAXTIME;

    # Each step: error message format, statement, bind values
    my @steps = (
        [ # CleanUp (by expired)
            "Can't delete records (cleanup): %s",
            MONOTIFIER_CLEANUP,
            $now,
        ],
        [ # CleanUp (by maxtime)
            "Can't update records (cleanup): %s",
            MONOTIFIER_FLUSH,
            STATUS_FAIL, # status
            255, # errcode (Cleanup level)
            "Sending the message is taking too long!", # errmsg
            STATUS_BUSY, STATUS_NEW,
            $now - $maxtime,
        ],
    );
    foreach my $step (@steps) {
        my ($errfmt, $sql, @bind) = @$step;
        $dbi->execute($sql, @bind);
        next unless $dbi->connect->err;
        $self->error(sprintf($errfmt, uv2null($dbi->connect->errstr)));
        return 0;
    }
    return 1;
}
# Deletes all records of the table.
# Returns 1 on success, 0 when the connection is down or the statement
# fails (the error message is set in the latter case).
sub purge {
    my ($self) = @_;
    $self->ping or return 0;
    $self->error("");

    my $dbi = $self->{dbi};
    $dbi->execute(MONOTIFIER_PURGE);
    return 1 unless $dbi->connect->err;

    $self->error(sprintf("Can't purge table: %s", uv2null($dbi->connect->errstr)));
    return 0;
}
# Returns the delay in seconds before the next sending attempt.
# The delay grows with the attempt number; attempt numbers outside the
# table below (including negative ones) get the one-month delay.
sub _sheduled_calc {
    my $attempt = shift; # Attempt number

    # [ exclusive upper bound of the attempt number, delay in seconds ]
    my @schedule = (
        [  5, 60         ], # attempts 0..4:   1 min
        [  7, 60*5       ], # attempts 5..6:   5 min
        [ 10, 60*15      ], # attempts 7..9:   15 min
        [ 33, 60*60      ], # attempts 10..32: 1 hour
        [ 39, 60*60*24   ], # attempts 33..38: 1 day
        [ 42, 60*60*24*7 ], # attempts 39..41: 1 week
    );
    if ($attempt >= 0) {
        foreach my $step (@schedule) {
            my ($bound, $delay) = @$step;
            return $delay if $attempt < $bound;
        }
    }
    return 60*60*24*30; # every 1 month
}
1;
__END__