Group
Extension

Minion-Backend-Redis/lib/Minion/Backend/Redis.pm

package Minion::Backend::Redis;
use Mojo::Base 'Minion::Backend';

use Carp 'croak';
use Digest::SHA 'sha256_base64';
use List::Util 'any';
use Mojo::IOLoop;
use Mojo::JSON qw(from_json to_json);
use Mojo::Redis;
use Mojo::Util 'encode';
use Sort::Versions 'versioncmp';
use Sys::Hostname 'hostname';
use Time::HiRes 'time';

use Data::Dumper;

our $VERSION = '0.003';

has 'redis';

sub new {
    my $self = shift->SUPER::new( redis => Mojo::Redis->new(@_) );

    my $redis_version =
      $self->redis->db->info_structured('server')->{redis_version};
    croak 'Redis Server 2.8.0 or later is required'
      if versioncmp( $redis_version, '2.8.0' ) == -1;

    return $self;
}

sub broadcast {
    my ( $self, $command, $args, $ids ) =
      ( shift, shift, shift || [], shift || [] );
    my $item = to_json( [ $command, @$args ] );
    my %worker_ids =
      map { ( $_ => 1 ) } @{ $self->redis->db->smembers('minion.workers') };
    $ids =
      @$ids
      ? [ grep { exists $worker_ids{$_} } @$ids ]
      : [ keys %worker_ids ];
    my $redis = $self->redis->db;
    $redis->multi;
    $redis->rpush( "minion.worker.$_.inbox", $item ) for @$ids;
    $redis->exec;
    return !!@$ids;
}

sub dequeue {
    my ( $self, $id, $wait, $options ) = @_;

    if ( ( my $job = $self->_try( $id, $options ) ) ) { return $job }
    return undef if Mojo::IOLoop->is_running;

    my $pubsub = $self->redis->pubsub;
    $pubsub->listen( "minion.job" => sub { Mojo::IOLoop->stop } );
    my $timer = Mojo::IOLoop->timer( $wait => sub { Mojo::IOLoop->stop } );
    Mojo::IOLoop->start;
    $pubsub->unlisten('minion.job') and Mojo::IOLoop->remove($timer);
    undef $pubsub;

    return $self->_try( $id, $options );
}

sub enqueue {
    my ( $self, $task, $args, $options ) =
      ( shift, shift, shift || [], shift || {} );

    my $id = $self->redis->db->incr('minion.last_job_id');

    my %notes = %{ $options->{notes} || {} };
    $_ = to_json($_) for values %notes;
    my $parents  = $options->{parents} || [];
    my $queue    = $options->{queue} // 'default';
    my $attempts = $options->{attempts} // 1;
    my $priority = $options->{priority} // 0;
    my $now      = time;
    my $delayed  = $now + ( $options->{delay} // 0 );

    my $redis = $self->redis->db;
    $redis->multi;
    $redis->hmset(
        "minion.job.$id",
        id       => $id,
        args     => to_json($args),
        attempts => $attempts,
        created  => $now,
        delayed  => $delayed,
        parents  => to_json($parents),
        priority => $priority,
        queue    => $queue,
        retries  => 0,
        state    => 'inactive',
        task     => $task,
    );

    $redis->hmset( "minion.job.$id.notes", %notes ) if %notes;
    $redis->sadd( "minion.job.$id.parents", @$parents ) if @$parents;
    $redis->sadd( "minion.job.$_.children", $id ) for @$parents;

    $redis->sadd( "minion.job_queue.$queue",                 $id );
    $redis->sadd( 'minion.job_state.inactive',               $id );
    $redis->sadd( 'minion.job_state.inactive,active,failed', $id );
    $redis->sadd( "minion.job_task.$task",                   $id );
    $redis->sadd( 'minion.jobs',                             $id );

    my $alphaid = sprintf '%012d', $id;
    $redis->zadd( "minion.inactive_job_queue.$queue",
        ( 0 - $priority ) => $alphaid );
    $redis->zadd( "minion.inactive_job_task.$task", 0        => $alphaid );
    $redis->zadd( 'minion.inactive_job_delayed',    $delayed => $id );

    $redis->exec;

    $self->_notify_job if $delayed <= $now;

    return $id;
}

sub fail_job   { shift->_update( 1, @_ ) }
sub finish_job { shift->_update( 0, @_ ) }

sub history {
    my $self = shift;

    my $db = $self->redis->db;

    # TODO: Not implemented.
    my @daily_ordered = [];
    return { daily => \@daily_ordered };
}

sub list_jobs {
    my ( $self, $offset, $limit, $options ) = @_;

    my $redis = $self->redis->db;
    $redis->multi;
    my @sets = (
        'minion.jobs',
        map    { "minion.job_$_.$options->{$_}" }
          grep { defined $options->{$_} } qw(queue state task)
    );
    if ( defined( my $ids = $options->{ids} ) ) {
        my $key = 'minion.temp.jobs.' . join( ',', @$ids );
        $redis->del($key);
        $redis->sadd( $key, @$ids ) if @$ids;
        $redis->expire( $key, 60 );
        push @sets, $key;
    }

    if ( defined( my $notes = $options->{notes} ) ) {
        croak 'Listing jobs by existence of notes is unimplemented';
    }

    my $jobs_hash = sha256_base64( join '$', $$, time );
    my $jobs_key  = "minion.temp.list_jobs.$jobs_hash";
    $redis->sinterstore( $jobs_key, @sets );
    $redis->expire( $jobs_key, 60 );
    $redis->exec;

    my $job_ids = $redis->sort( $jobs_key, LIMIT => $offset, $limit, 'DESC' );
    my $total   = $redis->scard($jobs_key);

    my @jobs;
    foreach my $id (@$job_ids) {

        my %job_info = %{ $redis->hgetall("minion.job.$id") };
        my $children = $redis->smembers("minion.job.$id.children");
        my %notes    = %{ $redis->hgetall("minion.job.$id.notes") };
        $_ = from_json($_) for values %notes;

        if ( defined( my $states = $options->{states} ) ) {
            unless ( grep { $_ eq $job_info{state} } @$states ) {
                next;
            }
        }

        push @jobs,
          {
            id       => $job_info{id},
            args     => from_json( $job_info{args} // 'null' ),
            attempts => $job_info{attempts},
            children => $children,
            created  => $job_info{created},
            delayed  => $job_info{delayed},
            finished => $job_info{finished},
            notes    => \%notes,
            parents  => from_json( $job_info{parents} // 'null' ),
            priority => $job_info{priority},
            queue    => $job_info{queue},
            result   => from_json( $job_info{result} // 'null' ),
            retried  => $job_info{retried},
            retries  => $job_info{retries},
            started  => $job_info{started},
            state    => $job_info{state},
            task     => $job_info{task},
            worker   => $job_info{worker},
          };
    }
    return { jobs => \@jobs, total => $total };
}

sub list_locks {
    my ( $self, $offset, $limit, $options ) = @_;

    my $redis = $self->redis->db;
    my $keys  = $redis->keys('minion.lock.*');
    my @locks;
    foreach my $name (@$keys) {

        my $jobname = substr( $name, 12 );

        # Filter out the job if the name is wrong
        if ( defined( my $names = $options->{names} ) ) {
            my %split_names = map { $_ => 1 } @$names;
            unless ( exists( $split_names{$jobname} ) ) { next; }
        }

        my %named_locks =
          @{ $redis->zrangebyscore( $name, '-inf', '+inf', 'WITHSCORES' ) };

        foreach my $lock_id ( keys %named_locks ) {

            # Craft lock info hashes by hand.
            push @locks,
              {
                name    => $jobname,
                expires => $named_locks{$lock_id}
              };
        }
    }

    # Reorder locks by expiration date
    @locks = sort { $a->{expires} cmp $b->{expires} } @locks;
    my $total = scalar @locks;

    if ( $offset > 0 ) {
        splice( @locks, 0, $offset );
    }

    if ( $limit > 0 ) {
        splice( @locks, $limit );
    }

    return { locks => \@locks, total => $total };
}

sub list_workers {
    my ( $self, $offset, $limit, $options ) = @_;

    my $worker_ids = $self->redis->db->sort(
        'minion.workers',
        LIMIT => $offset,
        $limit, 'DESC'
    );
    my $total = $self->redis->db->scard('minion.workers');

    my @workers;
    foreach my $id (@$worker_ids) {

        my %worker_info = %{ $self->redis->db->hgetall("minion.worker.$id") };

        my $notified =
          $self->redis->db->zscore( 'minion.worker_notified', $id );
        my $jobs = $self->redis->db->sinter( "minion.worker.$id.jobs",
            'minion.job_state.active' );

        push @workers,
          {
            id       => $worker_info{id},
            notified => $notified,
            jobs     => $jobs,
            host     => $worker_info{host},
            pid      => $worker_info{pid},
            status   => from_json( $worker_info{status} // 'null' ),
            started  => $worker_info{started},
          };
    }

    return { total => $total, workers => \@workers };
}

sub lock {
    my ( $self, $name, $duration, $options ) =
      ( shift, shift, shift, shift // {} );
    $self->redis->db->zremrangebyscore( "minion.lock.$name", '-inf',
        '(' . time );
    my $redis = $self->redis->db;
    my $locks = $redis->zcard("minion.lock.$name");

    $redis->watch("minion.lock.$name");
    $redis->multi;
    return !!0 if $locks >= ( $options->{limit} || 1 );

    if ( defined $duration and $duration > 0 ) {
        $self->redis->db->incr('minion.last_lock_id');
        my @lock_id = @{ $redis->exec };
        $redis->zadd( "minion.lock.$name", time + $duration, $lock_id[0] );
    }
    return !!1;
}

sub note {
    my ( $self, $id, $merge ) = @_;
    my $redis = $self->redis->db;
    return !!0 unless $redis->exists("minion.job.$id");
    $redis->watch("minion.job.$id");
    $redis->multi;

    foreach my $key ( keys %$merge ) {

        croak qq{Invalid note key '$key'; must not contain '.', '[', or ']'}
          if $key =~ m/[\[\].]/;

        if ( defined $merge->{$key} ) {
            $redis->hset( "minion.job.$id.notes", $key,
                to_json( $merge->{$key} ) );
        }
        else {
            $redis->hdel( "minion.job.$id.notes", $key );
        }
    }

    $redis->exec;
    return !!1;
}

sub receive {
    my ( $self, $id ) = @_;
    my $redis = $self->redis->db;
    my $items = $redis->lrange( "minion.worker.$id.inbox", 0, -1 );
    $redis->watch("minion.worker.$id.inbox");
    $redis->multi;
    $redis->del("minion.worker.$id.inbox");
    $redis->exec;
    return [ map { from_json($_) } @$items ];
}

sub register_worker {
    my ( $self, $id, $options ) = ( shift, shift, shift || {} );

    $id //= $self->redis->db->incr('minion.last_worker_id');

    my $now = time;

    my $redis = $self->redis->db;
    $redis->multi;
    $redis->hmset(
        "minion.worker.$id",
        id     => $id,
        status => to_json( $options->{status} // {} ),
    );
    $redis->hsetnx( "minion.worker.$id", host => $self->{host} //= hostname );
    $redis->hsetnx( "minion.worker.$id", pid  => $$ );
    $redis->hsetnx( "minion.worker.$id", started => $now );
    $redis->zadd( 'minion.worker_notified', $now => $id );
    $redis->sadd( 'minion.workers', $id );
    $redis->exec;

    return $id;
}

sub remove_job {
    my ( $self, $id ) = @_;

    my $redis = $self->redis->db;
    my ( $queue, $state, $task, $worker ) =
      @{ $redis->hmget( "minion.job.$id", qw(queue state task worker) ) };
    return !!0
      unless defined $state
      and ($state eq 'inactive'
        or $state eq 'failed'
        or $state eq 'finished' );

    $redis->watch("minion.job.$id");
    $redis->multi;

    _delete_job( $redis, $id, $queue, $state, $task, $worker );
    $redis->exec;

    return 1;
}

sub repair {
    my $self = shift;

    # Workers without heartbeat
    my $redis  = $self->redis->db;
    my $minion = $self->minion;
    $redis->watch('minion.worker_notified');
    my $missing = $redis->zrangebyscore( 'minion.worker_notified',
        '-inf', '(' . ( time - $minion->missing_after ) );
    _delete_worker( $redis, $_ ) for @$missing;

    # Jobs with missing worker (can be retried)
    $redis->watch('minion.jobs_missing_worker');
    $redis->multi;

    $redis->sinter( 'minion.job_state.active', 'minion.jobs_missing_worker' );
    $redis->del('minion.jobs_missing_worker');
    my $orphaned_jobs = $redis->exec;

    # Duct tape to ensure we get a flat list of job IDs
    my @flattened_jobs = _flatten(@$orphaned_jobs);

    foreach my $id (@flattened_jobs) {
        my ( $queue, $retries ) =
          @{ $redis->hmget( "minion.job.$id", qw(queue retries) ) };

        if ( !$queue || $queue ne "minion_foreground" ) {
            $self->fail_job( $id, $retries, 'Worker went away' );
        }
    }

    # Old jobs with no unresolved dependencies
    my $old_jobs = $redis->zrangebyscore( 'minion.job_finished',
        '-inf', '(' . ( time - $minion->remove_after ) );
    foreach my $id (@$old_jobs) {
        my ( $queue, $state, $task, $worker ) =
          @{ $redis->hmget( "minion.job.$id", qw(queue state task worker) ) };
        next
          if @{
            $redis->sdiff( "minion.job.$id.children",
                'minion.job_state.finished' )
          };
        $redis->watch( "minion.job.$id", "minion.job.$id.children" );
        $redis->multi;
        _delete_job( $redis, $id, $queue, $state, $task, $worker );
        $redis->exec;
    }
}

sub reset {
    my ($self) = @_;
    my $redis = $self->redis->db;
    $redis->watch( 'minion.jobs', 'minion.workers' );
    my $keys = $redis->keys('minion.*');
    $redis->multi;
    $redis->del(@$keys) if @$keys;
    $redis->exec;
}

sub retry_job {
    my ( $self, $id, $retries, $options ) =
      ( shift, shift, shift, shift || {} );

    my $now = time;
    my %set;
    $set{attempts} = $options->{attempts} if defined $options->{attempts};
    $set{delayed}  = my $delayed = $now + ( $options->{delay} // 0 );
    $set{priority} = $options->{priority} if defined $options->{priority};
    $set{retried}  = $now;

    my $redis = $self->redis->db;
    $redis->watch("minion.job.$id");
    my ( $curr_queue, $curr_priority, $curr_retries, $curr_state, $task ) = @{
        $self->redis->db->hmget( "minion.job.$id",
            qw(queue priority retries state task) )
    };
    return !!0 unless defined $curr_retries and $curr_retries == $retries;

    $redis->multi;
    $redis->hmset( "minion.job.$id", %set );
    $redis->hincrby( "minion.job.$id", retries => 1 );

    my $alphaid = sprintf '%012d', $id;
    if ( defined $options->{queue} ) {
        $redis->hset( "minion.job.$id", queue => $options->{queue} );
        $redis->srem( "minion.job_queue.$curr_queue", $id );
        $redis->sadd( "minion.job_queue.$options->{queue}", $id );
        $redis->zrem( "minion.inactive_job_queue.$curr_queue", $alphaid );
    }

    $redis->hset( "minion.job.$id", state => 'inactive' );
    $redis->srem( "minion.job_state.$curr_state", $id );
    $redis->sadd( 'minion.job_state.inactive',               $id );
    $redis->sadd( 'minion.job_state.inactive,active,failed', $id );

    my $priority = $options->{priority} // $curr_priority;
    my $queue    = $options->{queue}    // $curr_queue;
    $redis->zadd( "minion.inactive_job_queue.$queue",
        ( 0 - $priority ) => $alphaid );
    $redis->zadd( "minion.inactive_job_task.$task", 0        => $alphaid );
    $redis->zadd( 'minion.inactive_job_delayed',    $delayed => $id );

    $redis->exec;

    $self->_notify_job if $delayed <= $now;

    return 1;
}

sub stats {
    my $self = shift;

    my %stats;
    $stats{inactive_jobs} =
      $self->redis->db->scard('minion.job_state.inactive');
    $stats{active_jobs} = $self->redis->db->scard('minion.job_state.active');
    $stats{failed_jobs} = $self->redis->db->scard('minion.job_state.failed');
    $stats{finished_jobs} =
      $self->redis->db->scard('minion.job_state.finished');
    $stats{delayed_jobs} =
      $self->redis->db->zcount( 'minion.inactive_job_delayed', time, '+inf' );
    $stats{active_workers} = 0;

    my $locks = 0;
    my $keys  = $self->redis->db->keys('minion.lock.*');

    foreach my $name (@$keys) {
        $locks += $self->redis->db->zcount( $name, time, '+inf' );
    }

    $stats{active_locks} = $locks;

    foreach my $id ( @{ $self->redis->db->smembers('minion.workers') } ) {
        $stats{active_workers}++
          if @{
            $self->redis->db->sinter( 'minion.job_state.active',
                "minion.worker.$id.jobs" )
          };
    }
    $stats{enqueued_jobs} = $self->redis->db->get('minion.last_job_id') // 0;
    $stats{inactive_workers} =
      $self->redis->db->scard('minion.workers') - $stats{active_workers};

    $stats{uptime} =
      $self->redis->db->info_structured('server')->{uptime_in_seconds};

    return \%stats;
}

sub unlock {
    my ( $self, $name ) = @_;
    my $redis = $self->redis->db;
    $redis->multi;
    $redis->zremrangebyscore( "minion.lock.$name", '-inf', '(' . time );
    $redis->zremrangebyrank( "minion.lock.$name", 0, 0 );
    my $res = $redis->exec;
    return !!$res->[1];
}

sub unregister_worker {
    my ( $self, $id ) = @_;
    my $redis = $self->redis->db;
    _delete_worker( $redis, $id );
}

sub _flatten {    # no prototype for this one to avoid warnings
    return map { ref eq 'ARRAY' ? _flatten(@$_) : $_ } @_;
}

sub _delete_job {
    my ( $redis, $id, $queue, $state, $task, $worker ) = @_;
    $redis->del(
        "minion.job.$id",         "minion.job.$id.notes",
        "minion.job.$id.parents", "minion.job.$id.children"
    );
    $redis->srem( "minion.job_queue.$queue",                 $id );
    $redis->srem( "minion.job_state.$state",                 $id );
    $redis->srem( "minion.job_task.$task",                   $id );
    $redis->srem( 'minion.job_state.inactive,active,failed', $id );
    $redis->srem( 'minion.jobs',                             $id );
    my $alphaid = sprintf '%012d', $id;
    $redis->zrem( "minion.inactive_job_queue.$queue", $alphaid );
    $redis->zrem( "minion.inactive_job_task.$task",   $alphaid );
    $redis->zrem( 'minion.inactive_job_delayed',      $id );
    $redis->zrem( 'minion.job_finished',              $id );
    $redis->srem( "minion.worker.$worker.jobs", $id ) if defined $worker;
}

sub _delete_worker {
    my ( $redis, $id ) = @_;
    $redis->multi;
    $redis->sunionstore( 'minion.jobs_missing_worker',
        'minion.jobs_missing_worker', "minion.worker.$id.jobs" );
    $redis->del( "minion.worker.$id", "minion.worker.$id.inbox",
        "minion.worker.$id.jobs" );
    $redis->srem( 'minion.workers', $id );
    $redis->zrem( 'minion.worker_notified', $id );
    $redis->exec;
}

sub _notify_job { shift->redis->pubsub->notify( 'minion.job', '' ) }

sub _try {
    my ( $self, $id, $options ) = @_;

    my $queues = $options->{queues} || ['default'];
    my $tasks  = [ keys %{ $self->minion->tasks } ];

    my $job;
    my $redis_job = $self->redis->db;
    my $now       = time;
    if ( defined $options->{id} ) {

        # ensure job isn't taken by someone else
        $redis_job->watch("minion.job.$options->{id}");
        $redis_job->multi;
        $redis_job->hmget( "minion.job.$options->{id}", qw(queue task) );
        my $result = $redis_job->exec;

        my ( $queue, $task ) = @{$result};

        if (    defined $task
            and exists $self->minion->tasks->{$task}
            and defined $queue
            and ( any { $_ eq $queue } @$queues ) )
        {
            $job = $self->_try_job( $options->{id}, $now );
        }
    }
    else {
        my $queue_hash = sha256_base64( encode 'UTF-8', join( ',', @$queues ) );
        my $queue_key  = "minion.temp.queues.$queue_hash";
        my $task_hash  = sha256_base64( encode 'UTF-8', join( ',', @$tasks ) );
        my $task_key   = "minion.temp.tasks.$task_hash";

        my $redis = $self->redis->db;
        $redis->multi;
        $redis->del($queue_key);
        $redis->zunionstore( $queue_key, scalar(@$queues),
            map { "minion.inactive_job_queue.$_" } @$queues )
          if @$queues;
        $redis->expire( $queue_key, 60 );
        $redis->del($task_key);
        $redis->zunionstore( $task_key, scalar(@$tasks),
            map { "minion.inactive_job_task.$_" } @$tasks )
          if @$tasks;
        $redis->expire( $task_key, 60 );

        my $priority_hash =
          sha256_base64( join '$', $queue_hash, $task_hash, $$, $now );
        my $priority_key = "minion.temp.inactive_jobs.$priority_hash";
        $redis->del($priority_key);
        $redis->zinterstore(
            $priority_key, 2, $queue_key, $task_key,
            WEIGHTS => 1,
            0
        );
        $redis->expire( $priority_key, 60 );
        $redis->exec;

        my $i = 0;
        while (
            my @check = @{
                $self->redis->db->zrangebyscore(
                    $priority_key,
                    '-inf', '+inf',
                    LIMIT => $i,
                    1
                )
            }
          )
        {
            my $check_id = 0 + $check[0];

            # ensure job isn't taken by someone else
            #$redis_job->watch("minion.job.$check_id");
            #$redis_job->multi;
            #print "ersdfsdf";
            $job = $self->_try_job( $check_id, $now );
            last if $job;
        }
        continue {
            #$redis_job->discard;
            $i++;
        }
    }

    return undef unless defined $job;

    $redis_job->multi;
    $redis_job->hmset(
        "minion.job.$job->{id}",
        started => time,
        state   => 'active',
        worker  => $id,
    );
    $redis_job->srem( 'minion.job_state.inactive', $job->{id} );
    $redis_job->sadd( 'minion.job_state.active', $job->{id} );
    $redis_job->srem( "minion.worker.$job->{worker}.jobs", $job->{id} )
      if defined $job->{worker};
    $redis_job->sadd( "minion.worker.$id.jobs", $job->{id} );
    my $alphaid = sprintf '%012d', $job->{id};
    $redis_job->zrem( "minion.inactive_job_queue.$job->{queue}", $alphaid );
    $redis_job->zrem( "minion.inactive_job_task.$job->{task}",   $alphaid );
    $redis_job->zrem( 'minion.inactive_job_delayed',             $job->{id} );
    $redis_job->exec;

    return {
        id      => $job->{id},
        args    => from_json( $job->{args} // 'null' ),
        retries => $job->{retries},
        task    => $job->{task},
    };
}

sub _try_job {
    my ( $self, $id, $now ) = @_;
    my ( $state, $delayed ) =
      @{ $self->redis->db->hmget( "minion.job.$id", qw(state delayed) ) };
    return undef
      unless defined $state
      and $state eq 'inactive'
      and defined $delayed
      and $delayed <= $now;
    my $pending = @{
        $self->redis->db->sinter( "minion.job.$id.parents",
            'minion.job_state.inactive,active,failed' )
    };
    return undef if $pending;
    my %job;
    @job{qw(id args queue retries task worker)} = @{
        $self->redis->db->hmget( "minion.job.$id",
            qw(id args queue retries task worker) )
    };
    return \%job;
}

sub _update {
    my ( $self, $fail, $id, $retries, $result ) = @_;

    my $state = $fail ? 'failed' : 'finished';
    my $redis = $self->redis->db;

    my ( $attempts, $curr_retries, $curr_state ) =
      @{ $redis->hmget( "minion.job.$id", qw(attempts retries state) ) };

    return undef
      unless defined $curr_retries
      and $curr_retries == $retries
      and defined $curr_state
      and $curr_state eq 'active';

    $redis->watch("minion.job.$id");
    $redis->multi;
    my $now = time;
    $redis->hmset(
        "minion.job.$id",
        finished => $now,
        result   => to_json($result),
        state    => $state,
    );
    $redis->srem( 'minion.job_state.active',                 $id );
    $redis->srem( 'minion.job_state.inactive,active,failed', $id )
      unless $fail;
    $redis->sadd( "minion.job_state.$state", $id );
    $redis->zadd( 'minion.job_finished', $now => $id ) unless $fail;
    $redis->exec;

    return 1 if !$fail || $attempts == 1;
    return 1 if $retries >= ( $attempts - 1 );
    my $delay = $self->minion->backoff->($retries);
    return $self->retry_job( $id, $retries, { delay => $delay } );
}

1;

=head1 NAME

Minion::Backend::Redis - Redis backend for Minion job queue

=head1 SYNOPSIS

  use Minion::Backend::Redis;
  my $backend = Minion::Backend::Redis->new('redis://127.0.0.1:6379/5');

  # Minion
  use Minion;
  my $minion = Minion->new(Redis => 'redis://127.0.0.1:6379');

  # Mojolicious (via Mojolicious::Plugin::Minion)
  $self->plugin(Minion => { Redis => 'redis://127.0.0.1:6379/2' });

  # Mojolicious::Lite (via Mojolicious::Plugin::Minion)
  plugin Minion => { Redis => 'redis://x:s3cret@127.0.0.1:6379' };

=head1 DESCRIPTION

L<Minion::Backend::Redis> is a backend for L<Minion> based on L<Mojo::Redis>.
Note that L<Redis Server|https://redis.io/download> version C<2.8.0> or newer
is required to use this backend.

=head1 CAUTION

This is a slightly hackish modification of the original code by L<Dan Book|https://github.com/Grinnz/Minion-Backend-Redis> to use L<Mojo::Redis> instead of L<Mojo::Redis2>.

Due to the original code being written against an older Minion version, "history" is currently unimplemented.

=head1 PERFORMANCE

You can run examples/minion_bench.pl to get some performance metrics.  

  Clean start with 10000 jobs
  Enqueued 10000 jobs in 52.6373450756073 seconds (189.979/s)
  4 workers finished 1000 jobs each in 76.6429250240326 seconds (52.190/s)
  4 workers finished 1000 jobs each in 64.2053661346436 seconds (62.300/s)
  Requesting job info 100 times
  Received job info 100 times in 0.783659934997559 seconds (127.606/s)
  Requesting stats 100 times
  Received stats 100 times in 0.595925092697144 seconds (167.806/s)
  Repairing 100 times
  Repaired 100 times in 0.28698992729187 seconds (348.444/s)
  Acquiring locks 1000 times
  Acquired locks 1000 times in 2.0602331161499 seconds (485.382/s)
  Releasing locks 1000 times
  Releasing locks 1000 times in 1.19675707817078 seconds (835.591/s)

=head1 ATTRIBUTES

L<Minion::Backend::Redis> inherits all attributes from L<Minion::Backend> and
implements the following new ones.

=head2 redis

  my $redis = $backend->redis;
  $backend  = $backend->redis(Mojo::Redis->new);

L<Mojo::Redis> object used to store all data.

=head1 METHODS

L<Minion::Backend::Redis> inherits all methods from L<Minion::Backend> and
implements the following new ones.

=head2 new

  my $backend = Minion::Backend::Redis->new;
  my $backend = Minion::Backend::Redis->new('redis://x:s3cret@localhost:6379/5');

Construct a new L<Minion::Backend::Redis> object.

=head2 broadcast

  my $bool = $backend->broadcast('some_command');
  my $bool = $backend->broadcast('some_command', [@args]);
  my $bool = $backend->broadcast('some_command', [@args], [$id1, $id2, $id3]);

Broadcast remote control command to one or more workers.

=head2 dequeue

  my $job_info = $backend->dequeue($worker_id, 0.5);
  my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});

Wait a given amount of time in seconds for a job, dequeue it and transition
from C<inactive> to C<active> state, or return C<undef> if queues were empty.

These options are currently available:

=over 2

=item id

  id => '10023'

Dequeue a specific job.

=item queues

  queues => ['important']

One or more queues to dequeue jobs from, defaults to C<default>.

=back

These fields are currently available:

=over 2

=item args

  args => ['foo', 'bar']

Job arguments.

=item id

  id => '10023'

Job ID.

=item retries

  retries => 3

Number of times job has been retried.

=item task

  task => 'foo'

Task name.

=back

=head2 enqueue

  my $job_id = $backend->enqueue('foo');
  my $job_id = $backend->enqueue(foo => [@args]);
  my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});

Enqueue a new job with C<inactive> state.

These options are currently available:

=over 2

=item attempts

  attempts => 25

Number of times performing this job will be attempted, with a delay based on
L<Minion/"backoff"> after the first attempt, defaults to C<1>.

=item delay

  delay => 10

Delay job for this many seconds (from now).

=item notes

  notes => {foo => 'bar', baz => [1, 2, 3]}

Hash reference with arbitrary metadata for this job.

=item parents

  parents => [$id1, $id2, $id3]

One or more existing jobs this job depends on, and that need to have
transitioned to the state C<finished> before it can be processed.

=item priority

  priority => 5

Job priority, defaults to C<0>. Jobs with a higher priority get performed first.

=item queue

  queue => 'important'

Queue to put job in, defaults to C<default>.

=back

=head2 fail_job

  my $bool = $backend->fail_job($job_id, $retries);
  my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
  my $bool = $backend->fail_job(
    $job_id, $retries, {msg => 'Something went wrong!'});

Transition from C<active> to C<failed> state, and if there are attempts
remaining, transition back to C<inactive> with an exponentially increasing
delay based on L<Minion/"backoff">.

=head2 finish_job

  my $bool = $backend->finish_job($job_id, $retries);
  my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
  my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});

Transition from C<active> to C<finished> state.

=head2 history

  my $history = $backend->history;

Get history information for job queue. Unimplemented for now.

These fields are currently available:

=over 2

=item daily

  daily => [{epoch => 12345, finished_jobs => 95, failed_jobs => 2}, ...]

Hourly counts for processed jobs from the past day.

=back

=head2 list_jobs

  my $results = $backend->list_jobs($offset, $limit);
  my $results = $backend->list_jobs($offset, $limit, {state => 'inactive'});

Returns the information about jobs in batches.

  # Check job state
  my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
  my $state = $results->{jobs}[0]{state};

  # Get job result
  my $results = $backend->list_jobs(0, 1, {ids => [$job_id]});
  my $result = $results->{jobs}[0]{result};

These options are currently available:

=over 2

=item ids

  ids => ['23', '24']

List only jobs with these ids.

=item queue

  queue => 'important'

List only jobs in this queue.

=item state

  state => 'inactive'

List only jobs in this state.

=item task

  task => 'test'

List only jobs for this task.

=back

These fields are currently available:

=over 2

=item args

  args => ['foo', 'bar']

Job arguments.

=item attempts

  attempts => 25

Number of times performing this job will be attempted.

=item children

  children => ['10026', '10027', '10028']

Jobs depending on this job.

=item created

  created => 784111777

Epoch time job was created.

=item delayed

  delayed => 784111777

Epoch time job was delayed to.

=item finished

  finished => 784111777

Epoch time job was finished.

=item notes

  notes => {foo => 'bar', baz => [1, 2, 3]}

Hash reference with arbitrary metadata for this job.

=item parents

  parents => ['10023', '10024', '10025']

Jobs this job depends on.

=item priority

  priority => 3

Job priority.

=item queue

  queue => 'important'

Queue name.

=item result

  result => 'All went well!'

Job result.

=item retried

  retried => 784111777

Epoch time job has been retried.

=item retries

  retries => 3

Number of times job has been retried.

=item started

  started => 784111777

Epoch time job was started.

=item state

  state => 'inactive'

Current job state, usually C<active>, C<failed>, C<finished> or C<inactive>.

=item task

  task => 'foo'

Task name.

=item worker

  worker => '154'

Id of worker that is processing the job.

=back

=head2 list_locks

  my $results = $backend->list_locks($offset, $limit);
  my $results = $backend->list_locks($offset, $limit, {names => ['foo']});

Returns information about locks in batches.

  # Get the total number of results (without limit)
  my $num = $backend->list_locks(0, 100, {names => ['bar']})->{total};

  # Check expiration time
  my $results = $backend->list_locks(0, 1, {names => ['foo']});
  my $expires = $results->{locks}[0]{expires};

These options are currently available:

=over 2

=item names

  names => ['foo', 'bar']

List only locks with these names.

=back

These fields are currently available:

=over 2

=item expires

  expires => 784111777

Epoch time this lock will expire.

=item name

  name => 'foo'

Lock name.

=back

=head2 list_workers

  my $results = $backend->list_workers($offset, $limit);
  my $results = $backend->list_workers($offset, $limit, {ids => [23]});

Returns information about workers in batches.

  # Check worker host
  my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
  my $host    = $results->{workers}[0]{host};

These options are currently available:

=over 2

=item ids

  ids => ['23', '24']

List only workers with these ids.

=back

These fields are currently available:

=over 2

=item host

  host => 'localhost'

Worker host.

=item jobs

  jobs => ['10023', '10024', '10025', '10029']

Ids of jobs the worker is currently processing.

=item notified

  notified => 784111777

Epoch time worker sent the last heartbeat.

=item pid

  pid => 12345

Process id of worker.

=item started

  started => 784111777

Epoch time worker was started.

=item status

  status => {queues => ['default', 'important']}

Hash reference with whatever status information the worker would like to share.

=back

=head2 lock

  my $bool = $backend->lock('foo', 3600);
  my $bool = $backend->lock('foo', 3600, {limit => 20});

Try to acquire a named lock that will expire automatically after the given
amount of time in seconds.

These options are currently available:

=over 2

=item limit

  limit => 20

Number of shared locks with the same name that can be active at the same time,
defaults to C<1>.

=back

=head2 note

  my $bool = $backend->note($job_id, foo => 'bar');

Change a metadata field for a job.

=head2 receive

  my $commands = $backend->receive($worker_id);

Receive remote control commands for worker.

=head2 register_worker

  my $worker_id = $backend->register_worker;
  my $worker_id = $backend->register_worker($worker_id);
  my $worker_id = $backend->register_worker(
    $worker_id, {status => {queues => ['default', 'important']}});

Register worker or send heartbeat to show that this worker is still alive.

These options are currently available:

=over 2

=item status

  status => {queues => ['default', 'important']}

Hash reference with whatever status information the worker would like to share.

=back

=head2 remove_job

  my $bool = $backend->remove_job($job_id);

Remove C<failed>, C<finished> or C<inactive> job from queue.

=head2 repair

  $backend->repair;

Repair worker registry and job queue if necessary.

=head2 reset

  $backend->reset;

Reset job queue.

=head2 retry_job

  my $bool = $backend->retry_job($job_id, $retries);
  my $bool = $backend->retry_job($job_id, $retries, {delay => 10});

Transition job back to C<inactive> state, already C<inactive> jobs may also be
retried to change options.

These options are currently available:

=over 2

=item attempts

  attempts => 25

Number of times performing this job will be attempted.

=item delay

  delay => 10

Delay job for this many seconds (from now).

=item priority

  priority => 5

Job priority.

=item queue

  queue => 'important'

Queue to put job in.

=back

=head2 stats

  my $stats = $backend->stats;

Get statistics for jobs and workers.

These fields are currently available:

=over 2

=item active_jobs

  active_jobs => 100

Number of jobs in C<active> state.

=item active_workers

  active_workers => 100

Number of workers that are currently processing a job.

=item delayed_jobs

  delayed_jobs => 100

Number of jobs in C<inactive> state that are scheduled to run at specific time
in the future. Note that this field is EXPERIMENTAL and might change without
warning!

=item enqueued_jobs

  enqueued_jobs => 100000

Rough estimate of how many jobs have ever been enqueued. Note that this field is
EXPERIMENTAL and might change without warning!

=item failed_jobs

  failed_jobs => 100

Number of jobs in C<failed> state.

=item finished_jobs

  finished_jobs => 100

Number of jobs in C<finished> state.

=item inactive_jobs

  inactive_jobs => 100

Number of jobs in C<inactive> state.

=item inactive_workers

  inactive_workers => 100

Number of workers that are currently not processing a job.

=item uptime

  uptime => 1000

Uptime in seconds.

=back

=head2 unlock

  my $bool = $backend->unlock('foo');

Release a named lock.

=head2 unregister_worker

  $backend->unregister_worker($worker_id);

Unregister worker.

=head1 BUGS

Report any issues on the public bugtracker.

=head1 AUTHOR

Dan Book <dbook@cpan.org>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2017 by Dan Book.

This is free software, licensed under:

  The Artistic License 2.0 (GPL Compatible)

=head1 SEE ALSO

L<Minion>, L<Mojo::Redis>


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.