Group
Extension

Kelp-Module-WebSocket-AnyEvent/lib/Kelp/Module/WebSocket/AnyEvent.pm

package Kelp::Module::WebSocket::AnyEvent;
$Kelp::Module::WebSocket::AnyEvent::VERSION = '1.05';
use Kelp::Base qw(Kelp::Module::Symbiosis::Base);
use Plack::App::WebSocket;
use Kelp::Module::WebSocket::AnyEvent::Connection;
use Carp qw(croak carp cluck);
use Try::Tiny;

# Name of the Kelp method that yields a serializer object, set in build().
# NOTE(review): the leading dash presumably marks the attribute as read-only
# in Kelp::Base - confirm against its documentation.
attr '-serializer';

# Storage for connections, an empty hash reference by default
attr '-connections' => sub { return {} };

# Event handlers. The open / close / message defaults are no-ops.
attr 'on_open' => sub { return sub { } };
attr 'on_close' => sub { return sub { } };
attr 'on_message' => sub { return sub { } };

# By default a malformed message rethrows the error, which is passed to the
# handler as its last argument
attr 'on_malformed_message' => sub {
	return sub { die pop };
};

# No default - psgi() only installs an error handler when this one is defined
attr 'on_error';

# Workaround for a Twiggy bug which silences errors raised in handlers.
# Runs the given block; on exception, emits the error as a warning with a
# stack trace (so it can be logged and spotted) and then rethrows it.
sub _trap(&)
{
	my $code = shift;

	return try { $code->() }
	catch {
		cluck $_;
		die $_;
	};
}

# Constant name of this module, always the string 'websocket'
sub name
{
	return 'websocket';
}

# Builds the websocket PSGI application.
# Creates a Plack::App::WebSocket whose on_establish callback wraps every new
# connection in Kelp::Module::WebSocket::AnyEvent::Connection and wires the
# configured on_* handlers to it. Returns the result of ->to_app.
sub psgi
{
	my ($self) = @_;

	# ids are handed out sequentially, the first connection gets 1
	my $last_id = 0;

	my @handlers;

	# on_error - optional, only installed if a handler is defined right now
	push @handlers, on_error => sub { $self->on_error->(@_) }
		if defined $self->on_error;

	# on_establish - mandatory
	push @handlers, on_establish => sub {
		my ($orig_conn, $env) = @_;

		my $conn = Kelp::Module::WebSocket::AnyEvent::Connection->new(
			connection => $orig_conn,
			id => ++$last_id,
			manager => $self
		);
		_trap { $self->on_open->($conn, $env) };

		my $handle_message = sub {
			my (undef, $message) = @_;
			my $err;

			# with a serializer configured, try to decode the payload first;
			# on failure $message is left as received and $err is set
			my $s = $self->get_serializer;
			if ($s) {
				try { $message = $s->decode($message) }
				catch { $err = $_ || 'unknown error' };
			}

			_trap {
				$err
					? $self->on_malformed_message->($conn, $message, $err)
					: $self->on_message->($conn, $message);
			};
		};

		my $handle_finish = sub {
			_trap { $self->on_close->($conn) };
			$conn->close;

			# clears the variable captured from the enclosing on_establish call
			undef $orig_conn;
		};

		$conn->connection->on(
			message => $handle_message,
			finish => $handle_finish,
		);
	};

	return Plack::App::WebSocket->new(@handlers)->to_app;
}

# Registers a handler for a websocket event.
# $type is the event name without the "on_" prefix, $sub must be a code
# reference. Croaks when $sub is not a coderef or when the object has no
# on_$type method. Returns whatever the underlying setter returns.
sub add
{
	my ($self, $type, $sub) = @_;

	if (ref $sub ne 'CODE') {
		croak "websocket handler for $type is not a code reference";
	}

	# handlers are stored in attributes named on_<event>
	$type = "on_$type";
	my $setter = $self->can($type);
	if (!defined $setter) {
		croak "unknown websocket event `$type`";
	}

	return $self->$setter($sub);
}

# Resolves the configured serializer name into a serializer object.
# Returns undef when no serializer name is configured. Otherwise looks up a
# method of that name on the Kelp application, calls it and returns its
# result. Croaks if the application has no such method.
sub get_serializer
{
	my ($self) = @_;

	my $name = $self->serializer;

	# explicit undef is kept (rather than a bare return) so that callers in
	# list context keep receiving a single value
	return undef unless defined $name;

	my $real_serializer_method = $self->app->can($name);

	# Method calls are not interpolated inside strings, so the name has to be
	# in a plain variable to show up in the message
	croak "Kelp doesn't have $name serializer"
		unless defined $real_serializer_method;

	return $real_serializer_method->($self->app);
}

# Module initialization.
# Delegates to the parent class build, stores the serializer name (the one
# from %args wins, otherwise the current attribute value is kept) and
# registers this object under the "websocket" name.
sub build
{
	my ($self, %args) = @_;
	$self->SUPER::build(%args);

	my $serializer = $args{serializer};
	$serializer = $self->serializer
		unless defined $serializer;

	# written directly into the object hash instead of through the accessor
	$self->{serializer} = $serializer;

	return $self->register(websocket => $self);
}

1;
__END__

=pod

=head1 NAME

Kelp::Module::WebSocket::AnyEvent - AnyEvent websocket server integration with Kelp

=head1 SYNOPSIS

	# in config

	modules => [qw(Symbiosis WebSocket::AnyEvent)],
	modules_init => {
		"WebSocket::AnyEvent" => {
			mount => '/ws',
			serializer => "json",
		},
	},


	# in application's build method

	my $ws = $self->websocket;
	$ws->add(message => sub {
		my ($conn, $msg) = @_;
		$conn->send({received => $msg});
	});

	# can also be mounted like this, if not specified in config
	$self->symbiosis->mount("/ws" => $ws);         # by module object
	$self->symbiosis->mount("/ws" => 'websocket'); # by name


	# in psgi script

	$app = MyApp->new;
	$app->run_all;


=head1 DESCRIPTION

This is a module that integrates a websocket instance into Kelp using L<Kelp::Module::Symbiosis>. To run it, a non-blocking Plack server based on AnyEvent is required, like L<Twiggy>. All this module does is wrap L<Plack::App::WebSocket> instance in Kelp's module, introduce a method to get this instance in Kelp and integrate it into running alongside Kelp using Symbiosis. An instance of this class will be available in Kelp under the I<websocket> method.

=head1 METHODS INTRODUCED TO KELP

=head2 websocket

Returns the instance of this class (Kelp::Module::WebSocket::AnyEvent).

=head1 METHODS

=head2 name

	sig: name($self)

Reimplemented from L<Kelp::Module::Symbiosis::Base>. Returns a name of a module that can be used in C<< $symbiosis->loaded >> hash or when mounting by name. The return value is constant string I<'websocket'>.

Requires Symbiosis version I<1.10> for name mounting to function.

=head2 connections

	sig: connections($self)

Returns a hashref containing all available L<Kelp::Module::WebSocket::AnyEvent::Connection> instances (open connections) keyed by their unique id. An id is autoincremented from 1 and guaranteed not to change and not to be replaced by a different connection unless the server restarts.

A connection holds some additional data that can be used to hold custom data associated with that connection:

	# set / get data fields (it's an empty hash ref by default)
	$connection->data->{internal_id} = $internal_id;

	# get the entire hash reference
	$hash_ref = $connection->data;

	# replace the hash reference with something else
	$connection->data($something_else);

=head2 middleware

	sig: middleware($self)

Returns an arrayref of all middlewares in format: C<[ middleware_class, [ middleware_config ] ]>.

=head2 psgi

	sig: psgi($self)

Returns a PSGI application created from a new instance of L<Plack::App::WebSocket> (the result of its C<to_app> method).

=head2 run

	sig: run($self)

Same as psgi, but wraps the instance in all wanted middlewares.

=head2 add

	sig: add($self, $event, $handler)

Registers a C<$handler> (coderef) for websocket C<$event> (string). The handler for I<message> will be passed an instance of L<Kelp::Module::WebSocket::AnyEvent::Connection> and an incoming message; see L</EVENTS> for the arguments passed to the other handlers. C<$event> can be one of: I<open close message malformed_message error>. You can only specify one handler for each event type.

=head1 EVENTS

All event handlers must be code references.

=head2 open

	open => sub ($new_connection, $env) { ... }

B<Optional>. Called when a new connection opens. A good place to set up its variables.

=head2 message

	message => sub ($connection, $message) { ... }

B<Optional>. This is where you handle all the incoming websocket messages. If a serializer is specified, C<$message> will already be unserialized.

=head2 malformed_message

	malformed_message => sub ($connection, $message, $error) { ... }

B<Optional>. This is where you handle the incoming websocket messages which could not be unserialized by a serializer. By default, an exception will be re-thrown, and effectively the connection will be closed.

If Kelp JSON module is initialized with I<'allow_nonref'> flag then this event will never occur.

C<$error> will likely not be fit for an end user message, as it will contain file names and line numbers.

=head2 close

	close => sub ($connection) { ... }

B<Optional>. Called when the connection is closing.

=head2 error

	error => $psgi_app

B<Optional>. The code reference should be a psgi application. It will be called if an error occurs and the websocket connection has to be closed.

=head1 CONFIGURATION

=head2 middleware, middleware_init

See L<Kelp::Module::Symbiosis::Base/middleware, middleware_init>.

=head2 mount

See L<Kelp::Module::Symbiosis::Base/mount>.

=head2 serializer

Contains the name of the method that will be called to obtain an instance of serializer. Kelp instance must have that method registered. It must be able to C<< ->encode >> and C<< ->decode >>. Should also throw exception on error.

=head1 SEE ALSO

=over 2

=item * L<Dancer2::Plugin::WebSocket>, same integration for Dancer2 framework this module was inspired by

=item * L<Kelp>, the framework

=item * L<Twiggy>, a server capable of running this websocket

=back

=head1 AUTHOR

Bartosz Jarzyna, E<lt>bbrtj.pro@gmail.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2020 - 2022 by Bartosz Jarzyna

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.