Group
Extension

AnyMQ-Pg/lib/AnyMQ/Trait/Pg.pm

package AnyMQ::Trait::Pg;

# use 5.010;

use Any::Moose 'Role';

use AnyEvent::Pg 0.04;
use JSON;
use Try::Tiny;

# When true, every LISTEN/UNLISTEN/NOTIFY command is echoed with warn()
# as it is sent.
has 'debug' => (
    is => 'rw',
    isa => 'Bool',
    default => 0,
);

# Connection string handed to AnyEvent::Pg->new as its first argument.
has 'dsn' => (
    is => 'ro',
    isa => 'Str',
    default => '',
);

# The AnyEvent::Pg connection. Built by _build_client; BUILD touches it so
# the connection is created as soon as the object has been constructed.
has '_client' => (
    is => 'ro',
    isa => 'AnyEvent::Pg',
    lazy => 1, # need to construct client after all param attributes have been created
    builder => '_build_client',
    predicate => '_client_exists',
);

# Optional callback, called as ($self, @connect_args) at the end of
# _on_connect.
has 'on_connect' => (
    is => 'ro',
    isa => 'Maybe[CodeRef]',
);

# Optional callback, called as ($self, $error_message) from _on_error.
# When it is not set, errors are reported with warn() instead.
has 'on_error' => (
    is => 'ro',
    isa => 'Maybe[CodeRef]',
);

# Channel names registered through listen(); _on_connect sends a LISTEN
# for each of them once the connection is up.
has 'channels' => (
    is => 'ro',
    isa => 'ArrayRef',
    default => sub { [] },
    traits => [ 'Array' ],
    handles => {
        'add_channel'  => 'push',
        'all_channels' => 'elements',
    },
);

# Notifications requested while disconnected. Each entry is an array
# reference holding the arguments of the deferred notify() call;
# _on_connect replays them.
has 'publish_queue' => (
    is => 'ro',
    isa => 'ArrayRef',
    default => sub { [] },
    traits => [ 'Array' ],
    handles => {
        'publish_queue_push'    => 'push',
        'publish_queue_unshift' => 'unshift',
    },
);

# Connection state flag: set to 1 by _on_connect and to 0 by
# _on_connect_error. It has no default, so it is undefined until one of
# those handlers has run.
has 'is_connected' => (
    is => 'rw',
    isa => 'Bool',
);

# JSON codec used by encode_event and _on_notify; built by _build__json.
has '_json' => ( is => 'rw', lazy_build => 1, isa => 'JSON' );

# Collects the values returned by the client's push_query calls.
# NOTE(review): presumably kept so the query watchers stay referenced while
# the queries are in flight -- nothing in this file removes them; confirm.
has '_pg_query_watchers' => (
    is => 'ro',
    isa => 'ArrayRef',
    default => sub { [] },
    traits => [ 'Array' ],
    handles => {
        '_pg_query_watcher_push' => 'push',
    }
);

# Construction hook. '_client' is a lazy attribute, so reading it here is
# what actually triggers _build_client once every other attribute is set.
sub BUILD {
    my $self = shift;

    $self->_client;
}

# JSON codec pack
# Builder for the '_json' attribute: a JSON codec with the utf8 option
# switched on.
sub _build__json {
    my $self = shift;

    my $codec = JSON->new;
    $codec->utf8;

    return $codec;
}

# Builder for the '_client' attribute.
#
# Creates the AnyEvent::Pg connection for our DSN and routes each of its
# events to the matching private handler on this object.
#
# NOTE(review): the callbacks close over $self while $self stores the
# client, which looks like a reference cycle -- confirm whether that is
# intended before relying on DEMOLISH to tear the connection down.
sub _build_client {
    my $self = shift;

    my @handlers = (
        on_connect       => sub { $self->_on_connect(@_) },
        on_connect_error => sub { $self->_on_connect_error(@_) },
        on_error         => sub { $self->_on_error(@_) },
        on_notify        => sub { $self->_on_notify(@_) },
    );

    return AnyEvent::Pg->new($self->dsn, @handlers);
}

# Subscribes to notifications on $channel.
#
# The channel is remembered so that _on_connect can (re)issue LISTEN for
# it. Each channel is recorded only once: repeated listen() calls for the
# same name no longer grow the channel list or cause duplicate LISTEN
# commands on connect.
#
# Arguments:
#   $channel    - name of the notification channel
#   %query_opts - extra options forwarded to the client's push_query
#
# Returns nothing while disconnected (the LISTEN is deferred until the
# connection is up); otherwise returns what _push_listen returns.
sub listen {
    my ($self, $channel, %query_opts) = @_;

    $self->add_channel($channel)
        unless grep { $_ eq $channel } $self->all_channels;

    return unless $self->is_connected;

    return $self->_push_listen($channel, %query_opts);
}

# Sends a LISTEN command for $channel right away, without recording the
# channel. Extra options are forwarded untouched; the return value is
# whatever _push_notif_command returns.
sub _push_listen {
    my $self = shift;
    my ($channel, %query_opts) = @_;

    return $self->_push_notif_command(LISTEN => $channel, %query_opts);
}

# Cancels the subscription to $channel.
#
# Besides sending UNLISTEN, the channel is dropped from the remembered
# channel list. Channels on that list get a LISTEN from _on_connect, so
# leaving the name there would silently re-subscribe on the next connect.
#
# Arguments:
#   $channel    - name of the notification channel
#   %query_opts - extra options forwarded to the client's push_query
#
# Returns whatever _push_notif_command returns.
sub unlisten {
    my ($self, $channel, %query_opts) = @_;

    # 'channels' is read-only, so edit the referenced array in place.
    my $channels = $self->channels;
    @$channels = grep { $_ ne $channel } @$channels;

    return $self->_push_notif_command('UNLISTEN', $channel, %query_opts);
}

# publishes notification with $payload on channel
# Publishes a notification on $channel, optionally carrying $payload.
#
# Arguments:
#   $channel    - name of the notification channel
#   $payload    - optional payload string
#   %query_opts - extra options forwarded to the client's push_query
#
# While disconnected the call is only queued (its argument list is stored
# on the publish queue) and nothing is returned; _on_connect replays the
# queue later.
sub notify {
    my ($self, @rest) = @_;
    my ($channel, $payload, %query_opts) = @rest;

    unless ($self->is_connected) {
        $self->publish_queue_push(\@rest);
        return;
    }

    my $dbc = $self->_client->dbc;

    # The channel is an SQL identifier, so it needs identifier quoting:
    # string-literal escaping leaves an embedded '"' untouched and would
    # let it terminate the quoted name.
    my $query = 'NOTIFY ' . $dbc->escapeIdentifier($channel);

    # Test for presence rather than truth so that a payload of "0" is
    # still delivered.
    $query = join(',', $query, $dbc->escapeLiteral($payload))
        if defined $payload && length $payload;

    warn $query if $self->debug;
    my $qw = $self->_client->push_query(query => $query, %query_opts);
    $self->_pg_query_watcher_push($qw);
}

# handles LISTEN/UNLISTEN
# Sends a LISTEN or UNLISTEN command for one channel.
#
# Arguments:
#   $cmd     - SQL keyword, 'LISTEN' or 'UNLISTEN'
#   $channel - name of the notification channel
#   %opts    - extra options forwarded to the client's push_query
#
# Returns the value produced by push_query, which is also stored on the
# query-watcher list.
sub _push_notif_command {
    my ($self, $cmd, $channel, %opts) = @_;

    my $client = $self->_client;

    # The channel is an SQL identifier, so it needs identifier quoting:
    # string-literal escaping leaves an embedded '"' untouched and would
    # let it terminate the quoted name.
    my $query = $cmd . ' ' . $client->dbc->escapeIdentifier($channel);

    # Log before sending, as notify() does.
    warn $query if $self->debug;

    my $qw = $client->push_query(
        query => $query,
        %opts
    );
    $self->_pg_query_watcher_push($qw);

    return $qw;
}

# Serialises an event for transport: references are encoded with the JSON
# codec, anything that is not a reference is returned unchanged.
sub encode_event {
    my ($self, $evt) = @_;

    return ref $evt
        ? $self->_json->encode($evt)
        : $evt;
}

# Connection-established handler (wired to the client's on_connect event).
#
# In order: marks the bus as connected, sends LISTEN for every registered
# channel, replays the notifications that were queued while disconnected,
# and finally invokes the user's on_connect callback, if any, with
# ($self, @connect_args).
sub _on_connect {
    my $self = shift;

    $self->is_connected(1);

    # subscribe to channels
    for my $channel ($self->all_channels) {
        $self->_push_listen($channel);
    }

    # Publish outstanding notifications. The queue is emptied before the
    # replay so each entry is sent exactly once: leaving the entries in
    # place would keep them in memory forever and re-send them on any
    # later connect.
    my $pub_queue = $self->publish_queue;
    my @pending = $pub_queue ? splice(@$pub_queue) : ();
    for my $evt (@pending) {
        $self->notify(@$evt);
    }

    # time to call our connect callback
    $self->on_connect->($self, @_) if $self->on_connect;
}

# Connection-failure handler: flags the bus as disconnected, then reports
# the failure through the regular error path with the same arguments.
sub _on_connect_error {
    my $self = shift;

    $self->is_connected(0);

    return $self->_on_error(@_);
}

# Error handler. Reads the current error message from the connection that
# reported the problem and hands it to the user's on_error callback as
# ($self, $message); without a callback the message is emitted via warn().
sub _on_error {
    my ($self, $pg) = @_;

    my $message = $pg->dbc->errorMessage;

    my $handler = $self->on_error;
    return $handler->($self, $message) if $handler;

    warn "AnyMQ::Pg error: $message";
}

# Notification handler (wired to the client's on_notify event).
#
# Arguments: the connection, the channel name, the notifying backend's pid
# and the payload string.
#
# The payload is turned into an event and appended to the queues of the
# topic named after the channel:
#   - a payload that parses as JSON becomes the decoded structure,
#   - any other non-empty payload is passed on as the raw string,
#   - a missing or empty payload becomes { name => $channel }.
sub _on_notify {
    my ($self, $pg, $channel, $pid, $payload) = @_;

    my $evt;

    # PostgreSQL reports a NOTIFY without payload as an empty string, so an
    # empty payload has to be treated as "no payload"; otherwise the
    # default event below would never be used and listeners would receive
    # ''.
    if (defined $payload && length $payload) {
        # assume payload is JSON
        try {
            $evt = $self->_json->decode($payload);
        } catch {
            # not JSON: the event is the raw payload
            $evt = $payload;
        };
    }

    # no payload at all
    $evt //= { 'name' => $channel };

    # notify listeners
    $self->topic($channel)->append_to_queues($evt);
}

# Creates a topic bound to this bus and carrying the 'Pg' topic trait.
#
# $opt is either a plain topic name or a hash reference of constructor
# arguments; a plain name is treated as { name => $opt }.
sub new_topic {
    my ($self, $opt) = @_;

    my %topic_args = ref $opt ? %$opt : (name => $opt);

    return AnyMQ::Topic->new_with_traits(
        %topic_args,
        traits => [ 'Pg' ],
        bus => $self,
    );
}

# Empty DEMOLISH so the 'after' modifier below always has a method to
# attach to.
sub DEMOLISH { }

# Destructor hook: tears down the database client, but only when one was
# actually built and the interpreter is not in global destruction.
after 'DEMOLISH' => sub {
    my ($self, $in_global_destruction) = @_;

    return if $in_global_destruction;
    return unless $self->_client_exists;

    $self->_client->destroy;
};

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.