Group
Extension

Web-Hippie-PubSub/lib/Web/Hippie/PubSub.pm

package Web::Hippie::PubSub;

use strict;
use warnings;
our $VERSION = '0.08';
use parent 'Plack::Middleware';

use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::Handle;
use Plack::Request;
use Plack::Builder;
use Plack::App::Cascade;
use Web::Hippie;
use Web::Hippie::Pipe;
use JSON;
use Carp qw/croak cluck/;

# bus = AnyMQ pubsub client bus
# keep_alive = seconds between "ping" events
use Plack::Util::Accessor qw/
    bus keep_alive
/;

# PSGI entry point for this middleware.
#
# Hands the request environment to the application installed by
# prepare_app and passes its response back unchanged.
sub call {
    my $self = shift;
    my $env  = shift;

    # scalar() keeps the wrapped app invoked in scalar context
    return scalar $self->app->($env);
}

# Assemble the middleware stack that wraps the user's application.
#
# Dies when no bus was supplied. Otherwise a Plack::Builder is populated,
# in this order, with:
#   1. a tiny handler that answers /stats with the current counters,
#   2. Web::Hippie,
#   3. Web::Hippie::Pipe (given our bus),
#   4. the publish/subscribe event handler defined below,
# and the resulting application replaces $self->app.
sub prepare_app {
    my ($self) = @_;

    die "bus is a required builder argument for Web::Hippie::PubSub"
        unless $self->bus;

    my $builder = Plack::Builder->new;

    # stats server
    $builder->add_middleware(sub {
        my $app = shift;
        return sub {
            my $env = shift;
            my $req = Plack::Request->new($env);

            # anything other than /stats belongs to the rest of the stack
            return $app->($env) unless $req->path eq '/stats';

            # one "name: count" line per counter, sorted so the output
            # is stable between requests
            my $stats = $self->stats;
            my $body  = join '',
                map { "$_: $stats->{$_}\n" } sort keys %$stats;

            my $res = $req->new_response(200);
            $res->content_type('text/html; charset=utf-8');
            $res->content($body);
            return $res->finalize;
        };
    });

    # websocket/mxhr/poll handlers
    $builder->add_middleware('+Web::Hippie');

    # AnyMQ stuff for Web::Hippie
    $builder->add_middleware('+Web::Hippie::Pipe', bus => $self->bus);

    # our simple publish/subscribe event code
    $builder->add_middleware(sub {
        my $app = shift;
        return sub {
            # these are handlers for internal hippie events, NOT actual
            # URLs visited by the client
            # (/new_listener, /message, /error)
            my $env     = shift;
            my $channel = $env->{'hippie.args'};
            my $req     = Plack::Request->new($env);
            my $path    = $req->path;

            if ($path eq '/new_listener') {
                # called when we get a new topic subscription

                return [ 400, [], [ "Channel is required for new_listener" ] ] unless $channel;
                my $topic = eval { $env->{'hippie.bus'}->topic($channel) };
                unless ($topic) {
                    warn "Could not get topic for channel $channel: $@";
                    return [ 500, [ 'Content-Type' => 'text/plain' ], [ "Unable to create listener for channel $channel" ] ];
                }

                # subscribe client to events on $channel, then let the
                # wrapped application see the event as well
                my $res;
                my $ok = eval {
                    $env->{'hippie.listener'}->subscribe($topic);
                    $res = $app->($env);
                    1;
                };

                # report the channel name; $topic is an object and would
                # not stringify to anything useful
                warn "Error subscribing to channel '$channel': $@"
                    unless $ok;

                # Bookkeeping is done even when subscribing failed, so the
                # decrement performed for a later /error event stays
                # balanced.
                # NOTE(review): assumes /error is still delivered for a
                # connection whose subscription failed - confirm.
                $self->start_keepalive_timer($env);

                $self->increment_stats_counter('current_subscribers');
                $self->increment_stats_counter('total_subscribers');

                # do not claim success when the subscription failed
                return [ 500, [ 'Content-Type' => 'text/plain' ], [ "Unable to subscribe to channel $channel" ] ]
                    unless $ok;

                # success
                return $res || [ '200', [ 'Content-Type' => 'text/plain' ], [ "Now listening on $channel" ] ];

            } elsif ($path eq '/message') {
                # called when we are publishing a message

                # get message channel
                return [ '400', [ 'Content-Type' => 'text/plain' ], [ "Channel is required" ] ] unless $channel;

                # same guarded lookup as /new_listener, so a bus failure
                # becomes a 500 response instead of an uncaught exception
                my $topic = eval { $env->{'hippie.bus'}->topic($channel) };
                unless ($topic) {
                    warn "Could not get topic for channel $channel: $@";
                    return [ 500, [ 'Content-Type' => 'text/plain' ], [ "Unable to publish on channel $channel" ] ];
                }

                # get message, tack on sent time and from addr; a payload
                # that cannot be used as a hash is rejected rather than
                # allowed to die here
                my $msg = $env->{'hippie.message'};
                my $stamped = eval {
                    $msg->{time}    = time;
                    $msg->{address} = $env->{REMOTE_ADDR};
                    1;
                };
                return [ '400', [ 'Content-Type' => 'text/plain' ], [ "Message must be a hash" ] ]
                    unless $stamped;

                # publish event on the channel's topic
                # NOTE(review): an earlier comment here said local
                # listeners are not notified; nothing in this block
                # enforces that - confirm against the bus implementation.
                $topic->publish($msg);

                $self->increment_stats_counter('events_published');

                my $res = $app->($env);
                return $res || [ '200', [ 'Content-Type' => 'text/plain' ], [ "Event published on $channel" ] ];
            } elsif ($path eq '/error') {
                # stop pinging this listener and drop it from the live
                # count, then fall through to the wrapped application
                $self->stop_keepalive_timer($env);
                $self->decrement_stats_counter('current_subscribers');
            }

            my $res = $app->($env);

            # we didn't handle anything
            return $res || [ '404', [ 'Content-Type' => 'text/plain' ], [ "unknown event server path " . $path ] ];
        };
    });

    $self->app( $builder->to_app($self->app) );
}

# Begin sending periodic "ping" messages to the client behind $env.
#
# Does nothing when keep_alive is unset or zero. Otherwise a repeating
# AnyEvent timer is created with keep_alive as its interval and stored on
# the listener under the keepalive_timer key; that stored reference is
# what stop_keepalive_timer removes.
sub start_keepalive_timer {
    my ($self, $env) = @_;

    my $interval = $self->keep_alive;
    return unless $interval;

    my $handle = $env->{'hippie.handle'};

    my $send_ping = sub {
        # a failed send is treated as "client has disconnected":
        # stop firing
        eval {
            $handle->send_msg({
                type => 'ping',
                time => AnyEvent->now,
            });
            1;
        } or $self->stop_keepalive_timer($env);
    };

    my $timer = AnyEvent->timer(
        interval => $interval,
        cb       => $send_ping,
    );

    return $env->{'hippie.listener'}->{keepalive_timer} = $timer;
}

# Cancel the keep-alive timer stored on the listener in $env, if any.
#
# The listener is looked up first and nothing is touched when it is
# missing. Dereferencing $env->{'hippie.listener'} before checking it
# would create an empty listener entry in $env as a side effect.
#
# Removing the keepalive_timer entry drops the reference that
# start_keepalive_timer stored there.
sub stop_keepalive_timer {
    my ($self, $env) = @_;

    my $listener = $env->{'hippie.listener'}
        or return;

    delete $listener->{keepalive_timer};

    return;
}

# Add one to the named statistics counter, creating it at zero first if
# it has no (true) value yet. Counters live in $self->{_stats}.
sub increment_stats_counter {
    my ($self, $stat_name) = @_;

    $self->{_stats}{$stat_name} = 0
        unless $self->{_stats}{$stat_name};

    return $self->{_stats}{$stat_name}++;
}

# Subtract one from the named statistics counter, creating it at zero
# first if it has no (true) value yet. Counters live in $self->{_stats}
# and are not clamped, so a counter may become negative.
sub decrement_stats_counter {
    my ($self, $stat_name) = @_;

    $self->{_stats}{$stat_name} = 0
        unless $self->{_stats}{$stat_name};

    return $self->{_stats}{$stat_name}--;
}

# Return the hashref of statistics counters (name => count).
# When nothing has been counted yet, a new empty hashref is returned.
sub stats {
    my $self = shift;

    my $counters = $self->{_stats};

    return $counters ? $counters : {};
}

1;
__END__

=encoding utf-8

=for stopwords

=head1 NAME

Web::Hippie::PubSub - Comet/Long-poll event server using AnyMQ

=head1 SYNOPSIS

  use Plack::Builder;
  use AnyMQ;
  use AnyMQ::ZeroMQ;

  my $bus = AnyMQ->new_with_traits(
    traits            => [ 'ZeroMQ' ],
    subscribe_address => 'tcp://localhost:4001',
    publish_address   => 'tcp://localhost:4000',
  );

  # your plack application
  my $app = sub { ... };

  builder {
    # mount hippie server
    mount '/_hippie' => builder {
      enable "+Web::Hippie::PubSub",
        keep_alive => 30,   # send 'ping' event every 30 seconds
        bus        => $bus;
      sub {
        my $env = shift;
        my $args = $env->{'hippie.args'};
        my $handle = $env->{'hippie.handle'};
        # Your handler based on PATH_INFO: /new_listener, /message, /error
      }
    };
    mount '/' => $app;
  };

=head1 ATTRIBUTES

=over 4

=item bus

AnyMQ bus configured for publish/subscribe events

=item keep_alive

Number of seconds between keep-alive events. Web::Hippie::PubSub sends a
"ping" event at this interval to keep connections alive. Set to zero to
disable.

=back

=head1 METHODS

=over 4

=item stats

Returns hashref of statistical event handling information.

=back

=head1 DESCRIPTION

This module adds publish/subscribe capabilities to L<Web::Hippie> using
AnyMQ.

See eg/event_server.psgi for example usage.

=head1 SEE ALSO

L<Web::Hippie>, L<Web::Hippie::Pipe>, L<AnyMQ>,
L<ZeroMQ::PubSub>

=head1 AUTHOR

Mischa Spiegelmock E<lt>revmischa@cpan.orgE<gt>


Based on work by:

Chia-liang Kao E<lt>clkao@clkao.orgE<gt>

Jonathan Rockway E<lt>jrockway@cpan.orgE<gt>

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.