Group
Extension

Mojo-Pg/lib/Mojo/Pg/PubSub.pm

package Mojo::Pg::PubSub;
use Mojo::Base 'Mojo::EventEmitter';

use Mojo::JSON   qw(from_json to_json);
use Scalar::Util qw(weaken);

has pg => undef, weak => 1;
has reconnect_interval => 1;

sub db {
  my $self = shift;

  return $self->{db} if $self->{db};

  my $db = $self->{db} = $self->pg->db;
  weaken $self;
  $db->on(
    notification => sub {
      my ($db, $name, $pid, $payload) = @_;
      return unless my $cbs = $self->{chans}{$name};
      $payload = eval { from_json $payload } if $self->{json}{$name};
      for my $cb (@$cbs) { $self->$cb($payload) }
    }
  );

  $db->once(close => sub { $self->emit(disconnect => delete $self->{db}) });
  $db->listen($_) for keys %{$self->{chans}}, 'mojo.pubsub';
  delete $self->{reconnecting};
  $self->emit(reconnect => $db);

  return $db;
}

sub DESTROY { shift->reset unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }

sub json { ++$_[0]{json}{$_[1]} and return $_[0] }

sub listen {
  my ($self, $name, $cb) = @_;
  $self->db->listen($name) if !@{$self->{chans}{$name} ||= []} && !$self->{reconnecting};
  push @{$self->{chans}{$name}}, $cb;
  return $cb;
}

sub new {
  my $self = shift->SUPER::new(@_);
  $self->on(disconnect => \&_disconnect);
  return $self;
}

sub notify { $_[0]->db->notify(_json(@_)) and return $_[0] }

sub reset {
  my $self = shift;
  delete @$self{qw(chans json pid)};
  return unless my $db = delete $self->{db};
  ++$db->dbh->{private_mojo_no_reuse} and $db->_unwatch;
}

sub unlisten {
  my ($self, $name, $cb) = @_;

  my $chan = $self->{chans}{$name};
  unless (@$chan = $cb ? grep { $cb ne $_ } @$chan : ()) {
    $self->db->unlisten($name) unless $self->{reconnecting};
    delete $self->{chans}{$name};
  }

  return $self;
}

sub _disconnect {
  my $self = shift;

  $self->{reconnecting} = 1;

  weaken $self;
  my $r;
  $r = Mojo::IOLoop->recurring(
    $self->reconnect_interval => sub {
      Mojo::IOLoop->remove($r) if eval { $self->db };
    }
  );
}

sub _json { $_[1], $_[0]{json}{$_[1]} ? to_json $_[2] : $_[2] }

1;

=encoding utf8

=head1 NAME

Mojo::Pg::PubSub - Publish/Subscribe

=head1 SYNOPSIS

  use Mojo::Pg::PubSub;

  my $pubsub = Mojo::Pg::PubSub->new(pg => $pg);
  my $cb = $pubsub->listen(foo => sub ($pubsub, $payload) {
    say "Received: $payload";
  });
  $pubsub->notify(foo => 'I ♥ Mojolicious!');
  $pubsub->unlisten(foo => $cb);

=head1 DESCRIPTION

L<Mojo::Pg::PubSub> is a scalable implementation of the publish/subscribe pattern used by L<Mojo::Pg>. It is based on
PostgreSQL notifications and allows many consumers to share the same database connection, to avoid many common
scalability problems.

=head1 EVENTS

L<Mojo::Pg::PubSub> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.

=head2 disconnect

  $pubsub->on(disconnect => sub ($pubsub, $db) {
    ...
  });

Emitted after the current database connection is lost.

=head2 reconnect

  $pubsub->on(reconnect => sub ($pubsub, $db) {
    ...
  });

Emitted after switching to a new database connection for sending and receiving notifications.

=head1 ATTRIBUTES

L<Mojo::Pg::PubSub> implements the following attributes.

=head2 pg

  my $pg  = $pubsub->pg;
  $pubsub = $pubsub->pg(Mojo::Pg->new);

L<Mojo::Pg> object this publish/subscribe container belongs to. Note that this attribute is weakened.

=head2 reconnect_interval

  my $interval = $pubsub->reconnect_interval;
  $pubsub      = $pubsub->reconnect_interval(0.1);

Amount of time in seconds to wait to reconnect after disconnecting, defaults to C<1>.

=head1 METHODS

L<Mojo::Pg::PubSub> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.

=head2 db

  my $db = $pubsub->db;

Build and cache or get cached L<Mojo::Pg::Database> connection from L</"pg">. Used to reconnect if disconnected.

  # Reconnect immediately
  $pubsub->unsubscribe('disconnect')->on(disconnect => sub ($pubsub, $db) { pubsub->db });

=head2 json

  $pubsub = $pubsub->json('foo');

Activate automatic JSON encoding and decoding with L<Mojo::JSON/"to_json"> and L<Mojo::JSON/"from_json"> for a channel.

  # Send and receive data structures
  $pubsub->json('foo')->listen(foo => sub ($pubsub, $payload) {
    say $payload->{bar};
  });
  $pubsub->notify(foo => {bar => 'I ♥ Mojolicious!'});

=head2 listen

  my $cb = $pubsub->listen(foo => sub {...});

Subscribe to a channel, there is no limit on how many subscribers a channel can have. Automatic decoding of JSON text
to Perl values can be activated with L</"json">.

  # Subscribe to the same channel twice
  $pubsub->listen(foo => sub ($pubsub, $payload) {
    say "One: $payload";
  });
  $pubsub->listen(foo => sub ($pubsub, $payload) {
    say "Two: $payload";
  });

=head2 new

  my $pubsub = Mojo::Pg::PubSub->new;
  my $pubsub = Mojo::Pg::PubSub->new(pg => Mojo::Pg->new);
  my $pubsub = Mojo::Pg::PubSub->new({pg => Mojo::Pg->new});

Construct a new L<Mojo::Pg::PubSub> object and subscribe to the L</"disconnect"> event with default reconnect logic.

=head2 notify

  $pubsub = $pubsub->notify('foo');
  $pubsub = $pubsub->notify(foo => 'I ♥ Mojolicious!');
  $pubsub = $pubsub->notify(foo => {bar => 'baz'});

Notify a channel. Automatic encoding of Perl values to JSON text can be activated with L</"json">.

=head2 reset

  $pubsub->reset;

Reset all subscriptions and the database connection. This is usually done after a new process has been forked, to
prevent the child process from stealing notifications meant for the parent process.

=head2 unlisten

  $pubsub = $pubsub->unlisten('foo');
  $pubsub = $pubsub->unlisten(foo => $cb);

Unsubscribe from a channel.

=head1 SEE ALSO

L<Mojo::Pg>, L<Mojolicious::Guides>, L<https://mojolicious.org>.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.