Group
Extension

Mojo-Redis/t/pubsub.t

use Mojo::Base -strict;
use Test::More;
use Mojo::JSON qw(encode_json);
use Mojo::Redis;

# Every test below talks to a real Redis server, so the whole file is skipped
# unless TEST_ONLINE holds a connection URL.
plan skip_all => 'TEST_ONLINE=redis://localhost' unless $ENV{TEST_ONLINE};

# Test::Memory::Cycle is optional. A block eval is used instead of a string eval
# so no code string has to be compiled at runtime. When the module cannot be
# loaded, memory_cycle_ok() becomes a stub that records a passing test.
my $has_cycle_checker = eval { require Test::Memory::Cycle; 1 };
*memory_cycle_ok
  = $has_cycle_checker ? \&Test::Memory::Cycle::memory_cycle_ok : sub { ok 1, 'memory_cycle_ok' };

# Fixtures shared by all subtests.
my $redis  = Mojo::Redis->new($ENV{TEST_ONLINE});
my $db     = $redis->db;
my $pubsub = $redis->pubsub;

# @events   - [name, args...] records pushed by the pubsub event handlers
# @messages - "channel/message" strings collected by gather()
# @res      - values the most recent *_p promise resolved with
my (@events, @messages, @res);

# Check both fixtures for reference cycles before any subscription exists.
# The second check inspects $pubsub so that it matches its description.
subtest memory => sub {
  memory_cycle_ok($redis,  'cycle ok for Mojo::Redis');
  memory_cycle_ok($pubsub, 'cycle ok for Mojo::Redis::PubSub');
};

# Record pubsub events and subscribe gather() to two process-specific channels.
subtest events => sub {

  # Each handler drops the emitter (first argument) and stores the event name
  # followed by the remaining arguments.
  for my $name (qw(error psubscribe subscribe)) {
    $pubsub->on(
      $name => sub {
        my (undef, @args) = @_;
        push @events, [$name, @args];
      }
    );
  }

  my $cb = $pubsub->listen("rtest:$$:1" => \&gather);
  is ref($cb), 'CODE', 'listen';
  $pubsub->listen("rtest:$$:2" => \&gather);

  # Run the event loop for a short while before checking for cycles.
  note 'Waiting for subscriptions to be set up...';
  my $settle = Mojo::Promise->timer(0.15);
  $settle->wait;
  memory_cycle_ok($redis, 'cycle ok after listen');
};

# Publish one message through the pubsub object and one through a plain
# database connection, then verify that gather() saw both.
subtest notify => sub {
  $pubsub->notify("rtest:$$:1" => 'message one');
  $db->publish_p("rtest:$$:2" => 'message two')->wait;
  memory_cycle_ok($redis, 'cycle ok after notify');

  # The wait above only covers the publish_p() promise. Use the same helper as
  # the other subtests to keep the event loop running until both messages have
  # actually been gathered, instead of asserting right away.
  wait_for_messages(2);
  has_messages("rtest:$$:1/message one", "rtest:$$:2/message two");
};

# List the active channels and expect exactly the two this process listens to.
subtest channels => sub {

  # Forget earlier results so a promise that never resolves cannot leave stale
  # data behind for the assertion.
  @res = ();

  # The pattern is scoped to this process id; a bare "rtest*" would also match
  # channels opened by other test processes using the same server.
  $pubsub->channels_p("rtest:$$:*")->then(sub { @res = @_ })->wait;
  is_deeply [sort @{$res[0] || []}], ["rtest:$$:1", "rtest:$$:2"], 'channels_p';
};

# Ask how many subscribers the first channel has; one is expected.
subtest numsub => sub {
  my $channel = "rtest:$$:1";
  my $pending = $pubsub->numsub_p($channel)->then(sub { @res = @_ });
  $pending->wait;
  is_deeply $res[0], +{$channel => 1}, 'numsub_p';
};

# Ask for the number of pattern subscriptions; none have been made so far in
# this file, and zero is expected.
subtest numpat => sub {
  my $pending = $pubsub->numpat_p->then(sub { @res = @_ });
  $pending->wait;
  is_deeply $res[0], 0, 'numpat_p';
};

# Remove gather() from the first channel and make sure later messages on that
# channel are no longer collected.
subtest unlisten => sub {
  my $channel = "rtest:$$:1";

  my $returned = $pubsub->unlisten($channel, \&gather);
  is $returned, $pubsub, 'unlisten';
  memory_cycle_ok($pubsub, 'cycle ok after unlisten');

  # Fire and forget: the timer below keeps the event loop running.
  $db->publish_p($channel => 'nobody is listening to this');

  note 'Making sure the last message is not received';
  my $grace = Mojo::Promise->timer(0.15);
  $grace->wait;
  has_messages();
};

# Subscribe with a wildcard channel name and check that messages sent to two
# matching channels are both gathered.
subtest 'listen patterns' => sub {
  $pubsub->listen("rtest:$$:*" => \&gather);
  my $settle = Mojo::Promise->timer(0.1);
  $settle->wait;

  for my $pair (["rtest:$$:4" => 'message four'], ["rtest:$$:5" => 'message five']) {
    $pubsub->notify(@$pair);
  }
  wait_for_messages(2);

  # has_messages() sorts both sides, so the order given here does not matter.
  my @expected = ("rtest:$$:5/message five", "rtest:$$:4/message four");
  has_messages(@expected);
  $pubsub->unlisten("rtest:$$:*");
};

# Inspect the connection used by the pubsub object, then discard the pubsub
# object and check that a database handle does not get that same connection.
subtest connection => sub {
  my $conn = $pubsub->connection;

  # "is" imposes scalar context on its first argument, so this compares the
  # number of 'response' subscribers on the connection.
  is @{$conn->subscribers('response')}, 1, 'only one message subscriber';

  # Drop both references to the pubsub object: the file-level variable and the
  # value stored in $redis. NOTE(review): presumably this lets the later
  # $redis->pubsub call build a fresh object - confirm against Mojo::Redis.
  undef $pubsub;
  delete $redis->{pubsub};
  isnt $redis->db->connection, $conn, 'pubsub connection cannot be re-used';
};

# Send three payloads over one channel with JSON handling switched on for it:
# a string that is not valid JSON, a hash and a plain string.
subtest 'json data' => sub {
  my $channel = "rtest:$$:1";

  $pubsub = $redis->pubsub;
  $pubsub->listen($channel => \&gather);
  my $settle = Mojo::Promise->timer(0.1);
  $settle->wait;

  # The statement order matters: the broken payload is sent before json() is
  # called, and the expected list below has it gathered as "undef".
  $pubsub->notify_p($channel => '{"invalid"');
  $pubsub->json($channel);
  $pubsub->notify($channel => {some => 'data'});
  $pubsub->notify($channel => 'just a string');
  wait_for_messages(3);

  my @expected = ("rtest:$$:1/undef", qq(rtest:$$:1/HASH/{"some":"data"}), "rtest:$$:1/just a string");
  has_messages(@expected);
};

# Compare the names of all recorded events, in sorted order, against the
# expected one pattern subscription and two plain subscriptions.
subtest events => sub {
  my @names = map { $_->[0] } @events;
  is_deeply [sort @names], [qw(psubscribe subscribe subscribe)], 'events';
};

done_testing;

# Listener callback: drops the first argument, reverses the remaining ones and
# appends them to @messages as one "/"-joined string. Undefined values are
# written as "undef"; references become their type name followed by their JSON
# encoding.
sub gather {
  my (undef, @args) = @_;

  my @parts;
  for my $arg (reverse @args) {
    if    (!defined $arg) { push @parts, 'undef' }
    elsif (ref $arg)      { push @parts, ref($arg), encode_json($arg) }
    else                  { push @parts, $arg }
  }

  push @messages, join '/', @parts;
}

# Assert that the gathered messages equal the given list, ignoring order, and
# dump the gathered messages on failure. The message buffer is emptied
# afterwards so the next subtest starts clean.
sub has_messages {
  my @expected = sort @_;
  my @got      = sort @messages;
  is_deeply(\@got, \@expected, 'has messages') or diag explain \@messages;
  @messages = ();
}

# Run the event loop until at least $n messages have been gathered.
#
# Arguments:
#   $n       - number of entries required in @messages
#   $timeout - optional, seconds to wait before giving up (default: 10)
#
# A lost message would otherwise make this loop, and the whole test run, hang
# forever. On timeout a diagnostic is printed and the sub returns, leaving it
# to the following has_messages() call to report the failure.
sub wait_for_messages {
  my ($n, $timeout) = @_;
  $timeout //= 10;

  # The timer both flags the timeout and guarantees that one_tick() wakes up.
  my $timed_out = 0;
  my $tid       = Mojo::IOLoop->timer($timeout => sub { $timed_out = 1 });

  Mojo::IOLoop->one_tick until @messages >= $n || $timed_out;

  Mojo::IOLoop->remove($tid);
  diag "Timed out after ${timeout}s waiting for $n message(s)" if $timed_out;
  return;
}


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.