Group
Extension

Net-Async-Pusher/lib/Net/Async/Pusher/Connection.pm

package Net::Async::Pusher::Connection;

use strict;
use warnings;

our $VERSION = '0.005'; # VERSION
our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY

use parent qw(IO::Async::Notifier);

=head1 NAME

Net::Async::Pusher::Connection - represents one L<Net::Async::Pusher> server connection

=head1 DESCRIPTION

Provides basic integration with the L<https://pusher.com|Pusher> API. This implements
the protocol as documented in L<https://pusher.com/docs/pusher_protocol>.

=cut

use Syntax::Keyword::Try;
use Mixin::Event::Dispatch::Bus;
use Net::Async::WebSocket::Client;
use IO::Async::SSL;
use URI;
use HTML::Entities ();
use JSON::MaybeXS;
use JSON::MaybeUTF8 qw(:v1);
use curry::weak;
use Log::Any qw($log);
use Variable::Disposition qw(retain_future);

use Net::Async::Pusher::Channel;

=head1 METHODS

=cut

sub bus { shift->{bus} //= Mixin::Event::Dispatch::Bus->new }

sub json { shift->{json} //= JSON::MaybeXS->new( allow_nonref => 1) }

=head2 send_ping

Sends a ping request on this connection.

=cut

sub send_ping {
    my ($self) = @_;
    $self->client->send_frame(
        buffer => encode_json_utf8({
            event => 'pusher:ping',
            data  => { }
        }),
        masked => 1,
    );
    if(my $timer = $self->{inactivity_timer}) {
        $timer->stop if $timer->is_running;
        $timer->reset;
        $timer->start;
    }
    $self
}

=head2 incoming_frame

Deals with incoming frames.

=cut

sub incoming_frame {
    my $self = shift;
    my ($client, $frame) = @_;

    return unless defined($frame) && length($frame);

    try {
        $log->tracef("Frame [%s]", $frame);
        $self->{last_seen} = time;
        my $info = $self->json->decode($frame);
        if(exists $info->{channel}) {
            return $self->{channel}{$info->{channel}}->incoming_message($info);
        } elsif($info->{event} eq 'pusher:connection_established') {
            my $data = $self->json->decode($info->{data});
            $self->{socket_id} = $data->{socket_id};
            $log->tracef('Setting inactivity timeout to %d seconds', $data->{activity_timeout});
            $self->add_child(
                $self->{inactivity_timer} = IO::Async::Timer::Countdown->new(
                    delay     => $data->{activity_timeout},
                    on_expire => $self->curry::weak::send_ping,
                )
            );
            $self->{inactivity_timer}->start;
            return $self->connected->done;
        } elsif($info->{event} eq 'pusher:ping') {
            return $self->client->send_frame(
                buffer => encode_json_utf8({
                    event => 'pusher:pong',
                    data  => { }
                }),
                masked => 1,
            );
        } elsif($info->{event} eq 'pusher:pong') {
            return $log->trace("Pong event received from pusher");
        }
        die "unhandled"
    } catch {
        my $err = $@;
        $log->errorf("Unexpected frame (%s) [%s]", $err, $frame);
        $self->bus->invoke_event(
            error => $err, $frame
        );
    }
}

sub incoming_ping_frame {
    my $self = shift;
    my ($client, $frame) = @_;
    $log->debugf('Received ping frame');
    $self->client->send_pong_frame(
        '',
    );
}

sub socket_id { shift->{socket_id} }

=head2 client

Returns the L<Net::Async::WebSocket::Client> instance.

=cut

sub client { shift->{client} }

=head2 open_channel

Opens a channel.

 my $ch = $conn->open_channel(
  'xyz'
 )->get;

Resolves to a L<Net::Async::Pusher::Channel> instance.

=cut

sub open_channel {
    my ($self, $name, %args) = @_;
    $self->connected->then(sub {
        $log->tracef("Subscribing to [%s]", $name);
        my $ch = $self->{channel}{$name} = Net::Async::Pusher::Channel->new(
            name => $name,
        );
        $self->add_child($ch);
        my $frame = encode_json_utf8({
            event => 'pusher:subscribe',
            # double-encoded
            data  => {
                (exists $args{auth} ? (auth => $args{auth}) : ()),
                channel => $name
            }
        });
        $log->tracef("Subscribing: %s", $frame);
        $self->client->send_frame(
            buffer => $frame,
            masked => 1,
        );
        # We map the channel ourselves so that we don't end up with
        # the channel's ->subscribed method holding a strong reference
        # to itself
        $self->{channel}{$name}->subscribed->transform(
            done => sub { $ch }
        )
    });
}

=head2 connect

(Re)connects to the feed.

=cut

sub connect {
    my ($self) = @_;
    $self->add_child(
         $self->{client} = Net::Async::WebSocket::Client->new(
            on_frame => $self->curry::weak::incoming_frame,
            on_ping_frame => $self->curry::weak::incoming_ping_frame,
        )
    );
    my $uri = URI->new('wss://ws-mt1.pusher.com/app/' . $self->key . '?protocol=7&client=perl-net-async-pusher&version=' . ($self->VERSION || '1.0'));
    $log->tracef('Connecting to %s', $uri);
    $self->client->connect(
        url     => $uri,
    )->then(sub {
        $log->tracef('Connected to %s', $uri);
        # don't seem to get any response until we send something first
        $self->send_ping;
        Future->done($self)
    })->on_fail(sub {
        $log->errorf('Failed to connect - %s', join ',', @_)
    })->retain;
}

=head2 connected

L<Future> representing current connection state.

=cut

sub connected { $_[0]->{connected} ||= $_[0]->loop->new_future }

sub configure {
    my ($self, %args) = @_;
    for(grep exists $args{$_}, qw(key)) {
        $self->{$_} = delete $args{$_};
    }
    $self->SUPER::configure(%args);
}

=head2 key

The key.

=cut

sub key { shift->{key} }

1;

__END__

=head1 AUTHOR

Tom Molesworth <TEAM@cpan.org>

=head1 LICENSE

Copyright Tom Molesworth 2015-2021. Licensed under the same terms as Perl itself.



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.