Group
Extension

Mojo-WebSocketProxy/lib/Mojo/WebSocketProxy/Dispatcher.pm

package Mojo::WebSocketProxy::Dispatcher;

use strict;
use warnings;

use Mojo::Base 'Mojolicious::Controller';
use Mojo::WebSocketProxy::Parser;
use Mojo::WebSocketProxy::Config;

use Class::Method::Modifiers;

use JSON::MaybeUTF8    qw(:v1);
use Log::Any           qw($log);
use Unicode::Normalize ();
use Future::Mojo 0.004;    # ->new_timeout
use Future::Utils qw(fmap);
use Scalar::Util  qw(blessed);
use Encode;
use DataDog::DogStatsd::Helper qw(stats_inc);

use constant TIMEOUT => $ENV{MOJO_WEBSOCKETPROXY_TIMEOUT} || 15;
use Mojo::WebSocketProxy::RequestLogger;

our $VERSION = '0.16';     ## VERSION
around 'send' => sub {
    my ($orig, $c, $api_response, $req_storage) = @_;

    my $config = $c->wsp_config->{config};

    my $max_response_size = $config->{max_response_size};
    if ($max_response_size && length(encode_json_utf8($api_response)) > $max_response_size) {
        $api_response->{json} = $c->wsp_error('error', 'ResponseTooLarge', 'Response too large.');
    }

    my $before_send_api_response = $config->{before_send_api_response};
    $_->($c, $req_storage, $api_response->{json})
        for grep { $_ } (ref $before_send_api_response eq 'ARRAY' ? @{$before_send_api_response} : $before_send_api_response);

    my $ret = $orig->($c, $api_response);

    my $after_sent_api_response = $config->{after_sent_api_response};
    $_->($c, $req_storage) for grep { $_ } (ref $after_sent_api_response eq 'ARRAY' ? @{$after_sent_api_response} : $after_sent_api_response);

    return $ret;
};

sub ok {
    return 1;
}

sub open_connection {
    my ($c) = @_;

    my $remote_address = $c->tx->remote_address // 'undef';
    $log->debugf('accepting a websocket connection from %s', $remote_address);
    # Enable permessage-deflate
    $c->tx->with_compression;

    my $config = $c->wsp_config->{config};

    Mojo::IOLoop->singleton->stream($c->tx->connection)->timeout($config->{stream_timeout}) if $config->{stream_timeout};
    Mojo::IOLoop->singleton->max_connections($config->{max_connections})                    if $config->{max_connections};

    $config->{opened_connection}->($c) if $config->{opened_connection};

    $c->on(
        text => sub {
            my ($c, $msg) = @_;

            my $original = "$msg";
            # Incoming data will be JSON-formatted text, as a Unicode string.
            # We normalize the entire string before decoding.

            my $decoded = eval { Encode::decode_utf8($msg, Encode::FB_CROAK) } or do {
                $c->tx->emit(
                    encoding_error => _get_error_details(
                        code    => 'INVALID_UTF8',
                        reason  => 'Malformed UTF-8 data',
                        message => $msg
                    ));
                return;
            };

            # The Unicode::Normalize::NFC check is added as a safety net. However, the error is not triggered so far.
            my $normalized_msg = eval { Unicode::Normalize::NFC($decoded) } or do {
                $c->tx->emit(
                    encoding_error => _get_error_details(
                        code    => 'INVALID_UNICODE',
                        reason  => 'Malformed Unicode data',
                        message => $msg
                    ));
                return;
            };

            my $args = eval { decode_json_text($normalized_msg); } or do {
                $c->tx->emit(
                    encoding_error => _get_error_details(
                        code    => 'INVALID_JSON',
                        reason  => 'Malformed JSON data',
                        message => $msg
                    ));
                return;
            };

            on_message($c, $args);
        });

    $c->on(
        binary => sub {
            my ($d, $bytes) = @_;
            $config->{binary_frame}(@_) if $bytes and exists($config->{binary_frame});
        });

    $c->on(finish => $config->{finish_connection}) if $config->{finish_connection};

    return;
}

sub on_message {
    my ($c, $args) = @_;

    my $config = $c->wsp_config->{config};

    my $req_storage = {};
    $req_storage->{args} = $args;

    $req_storage->{logger} = Mojo::WebSocketProxy::RequestLogger->new;
    # We still want to run any hooks even for invalid requests.
    if (my $err = Mojo::WebSocketProxy::Parser::parse_req($c, $req_storage)) {
        $c->send({json => $err}, $req_storage);
        return $c->_run_hooks($config->{after_dispatch} || [])->retain;
    }

    my $action = $c->dispatch($args) or do {
        my $err = $c->wsp_error('error', UnrecognisedRequest => 'Unrecognised request');
        $c->send({json => $err}, $req_storage);
        return $c->_run_hooks($config->{after_dispatch} || [])->retain;
    };

    @{$req_storage}{keys %$action} = (values %$action);
    $req_storage->{method} = $req_storage->{name};

    # main processing pipeline
    my $f = $c->before_forward($req_storage)->transform(
        done => sub {
            # Note that we completely ignore the return value of ->before_forward here.
            return $req_storage->{instead_of_forward}->($c, $req_storage) if $req_storage->{instead_of_forward};
            return $c->forward($req_storage);
        }
    )->then(
        sub {
            my $result = shift;
            return $c->after_forward($result, $req_storage)->transform(done => sub { $result });
        },
        sub {
            my $result = shift;
            Future->done($result);
        });

    return Future->wait_any(
        Future::Mojo->new_timeout(TIMEOUT)->else(
            sub {
                return Future->done($c->wsp_error('error', Timeout => 'Timeout'));
            }
        ),
        $f
    )->then(
        sub {
            my ($result) = @_;
            $c->send({json => $result}, $req_storage) if $result;
            return $c->_run_hooks($config->{after_dispatch} || []);
        }
    )->on_fail(
        sub {
            $c->app->log->error("An error occurred handling on_message. Error @_");
        })->retain;
}

sub before_forward {
    my ($c, $req_storage) = @_;

    my $config = $c->wsp_config->{config};

    my $before_forward_hooks = [];

    # Global hooks are always first
    for ($config, $req_storage) {
        push @$before_forward_hooks, ref($_->{before_forward}) eq 'ARRAY' ? @{$_->{before_forward}} : $_->{before_forward};
    }

    # We always want to clear these after every request.
    delete $req_storage->{before_forward};

    return $c->_run_hooks($before_forward_hooks, $req_storage);
}

sub after_forward {
    my ($c, $result, $req_storage) = @_;

    my $config = $c->wsp_config->{config};
    return $c->_run_hooks($config->{after_forward} || [], $result, $req_storage);
}

sub _run_hooks {
    my @hook_params = @_;
    my $c           = shift @hook_params;
    my $hooks       = shift @hook_params;

    my $result_f = fmap {
        my $hook   = shift;
        my $result = $hook->($c, @hook_params) or return Future->done;
        return $result if blessed($result) && $result->isa('Future');
        return Future->fail($result);
    }
    foreach        => [grep { defined } @$hooks],
        concurrent => 1;
    return $result_f;
}

sub dispatch {
    my ($c, $args) = @_;

    my $log = $c->app->log;
    $log->debug("websocket got json " . $c->dumper($args));

    my ($action) =
        sort { $a->{order} <=> $b->{order} }
        grep { defined }
        map  { $c->wsp_config->{actions}->{$_} } keys %$args;

    return $action;
}

sub forward {
    my ($c, $req_storage) = @_;

    my $config = $c->wsp_config->{config};

    for my $hook (qw/ before_call before_get_rpc_response after_got_rpc_response /) {
        $req_storage->{$hook} = [
            grep { $_ } (ref $config->{$hook} eq 'ARRAY'      ? @{$config->{$hook}}      : $config->{$hook}),
            grep { $_ } (ref $req_storage->{$hook} eq 'ARRAY' ? @{$req_storage->{$hook}} : $req_storage->{$hook}),
        ];
    }

    # default to config (generic) only if call specific is not defined
    $req_storage->{rpc_failure_cb} //= $config->{rpc_failure_cb};

    my $backend_name = $req_storage->{backend} // "default";
    my $backend      = $c->wsp_config->{backends}{$backend_name}
        or die "Cannot dispatch request - no backend named '$backend_name'";

    $backend->call_rpc($c, $req_storage);

    return;
}

sub _get_error_details {
    my (%args) = @_;

    return {
        error   => 'Error Processing Request',
        details => {
            error_code   => $args{code},
            reason       => $args{reason},
            request_body => $args{message},
        },
    };
}

1;

__END__

=head1 NAME

Mojo::WebSocketProxy::Dispatcher

=head1 DESCRIPTION

Using this module you can forward websocket JSON-RPC 2.0 requests to RPC server.
See L<Mojo::WebSocketProxy> for details on how to use hooks and parameters.

=head1 METHODS

=head2 open_connection

Run while opening new wss connection.
Run hook when connection is opened.
Set finish connection callback.

=head2 on_message

Handle message - parse and dispatch request messages.
Dispatching action and forward to RPC server. Note that all
incoming JSON messages are first normalised using
L<NFC|https://www.w3.org/International/articles/unicode-migration/#normalization>.
 


=head2 before_forward

Run hooks.

=head2 after_forward

Run hooks.

=head2 dispatch

Dispatch request using message json key.

=head2 forward

Forward call to RPC server using global and action hooks.
Don't forward call to RPC if any before_forward hook returns response.
Or if there is instead_of_forward action.

=head2 _get_error_details

Generates and returns a hash for error reporting

=head2 ok

=head1 SEE ALSO

L<Mojolicious::Plugin::WebSocketProxy>,
L<Mojo::WebSocketProxy>,
L<Mojo::WebSocketProxy::Backend>,
L<Mojo::WebSocketProxy::Dispatcher>,
L<Mojo::WebSocketProxy::Config>
L<Mojo::WebSocketProxy::Parser>

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.