Group
Extension

Net-Async-Trello/lib/Net/Async/Trello/WS.pm

package Net::Async::Trello::WS;

use strict;
use warnings;

our $VERSION = '0.007'; # VERSION

use parent qw(IO::Async::Notifier);

use Syntax::Keyword::Try;

use JSON::MaybeXS;
use Net::Async::WebSocket::Client;

use Log::Any qw($log);

my $json = JSON::MaybeXS->new;

sub configure {
	my ($self, %args) = @_;
	for my $k (grep exists $args{$_}, qw(token trello)) {
		$self->{$k} = delete $args{$k};
	}
	$self->SUPER::configure(%args);
}

sub connection {
    my ($self, %args) = @_;
    $self->{ws_connection} ||= do {
        $log->tracef('Set up WS connection');
        my $uri = $self->trello->endpoint(
            'websockets',
            token => $self->token,
        );
        $log->tracef('Connecting to websocket endpoint %s', "$uri");
        $self->{ws}->connect(
            url        => $uri,
            host       => $uri->host,
            (
                $uri->scheme eq 'wss'
                ? (
                    SSL_hostname => $uri->host,
                ) : ()
            )
        )->then(sub {
            my ($conn) = @_;
            $log->tracef("Connected");
            # $conn->send_frame($json->encode({"type"=> "ping","reqid"=>0}));
            Future->done($conn);
        });
    };
}

my %model_for_type = (
    board => 'Board',
    card => 'Card'
);

sub next_request_id {
    return (shift->{last_request_id} //= 0)++;
}

sub subscribe {
    my ($self, $type, $id) = @_;
    $self->connection->then(sub {
        my ($conn) = @_;
        $log->tracef("Subscribing to %s %s", $type, $id);
        my $src = $self->{update_channel}{$id} //= $self->ryu->source;
        $conn->send_frame(
            buffer => $json->encode({
                idModel          => $id,
                invitationTokens => [],
                modelType        => $model_for_type{$type},
                reqid            => $self->next_request_id,
                tags             => [qw(clientActions updates)],
                type             => "subscribe",
            }),
            masked => 1
        );
        return Future->done($src);
    })
}

sub websocket_events { my ($self) = @_; $self->{websocket_events} //= $self->ryu->source }

sub on_frame {
	my ($self, $ws, $bytes) = @_;
    my $text = $bytes; # Encode::decode_utf8($bytes);
    $log->debugf("Have frame [%s]", $text);
    $self->websocket_events->emit($text);

    if(length $text) {
        $log->tracef("<< %s", $text);
        try {
            my $data = $json->decode($text);
            if(my $chan = $data->{idModelChannel}) {
                $log->tracef("Notification for [%s] - %s", $chan, $data);
                $self->{update_channel}{$chan}->emit($data->{notify});
            } else {
                $log->warnf("No idea what %s is", $data);
            }
        } catch {
            $log->errorf("Exception in websocket raw frame handling: %s (original text %s)", $@, $text);
        }
    } else {
        $log->tracef("<< ping received, responding");
        # Empty frame is used for PING, send a response back
        $self->pong;
    }
}

sub pong {
    my ($self) = @_;
    $self->{ws}->send_frame(
        masked => 1,
        buffer => ''
    );
}

sub _add_to_loop {
    my ($self, $loop) = @_;

    $self->add_child(
        $self->{ryu} = Ryu::Async->new
    );

    $self->add_child(
        $self->{ws} = Net::Async::WebSocket::Client->new(
            on_frame => $self->curry::weak::on_frame,
        )
    );
    if(0) {
        $self->add_child(
            my $timer = IO::Async::Timer::Periodic->new(
                interval => 15,
                on_tick => $self->curry::weak::on_tick,
            )
        );
        $timer->start;
        Scalar::Util::weaken($self->{timer} = $timer);
    }
}

sub on_tick {
    my ($self) = @_;
    my $ws = $self->connection;
    return unless $ws->is_ready;
    $self->pong;
}

sub trello { shift->{trello} }
sub token { shift->{token} }

sub ryu { shift->trello->ryu(@_) }

1;



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.