Net-Async-UWSGI/lib/Net/Async/UWSGI/Server/Connection.pm
package Net::Async::UWSGI::Server::Connection;
$Net::Async::UWSGI::Server::Connection::VERSION = '0.006';
use strict;
use warnings;
use parent qw(IO::Async::Stream);
=head1 NAME
Net::Async::UWSGI::Server::Connection - represents an incoming connection to a server
=head1 VERSION
version 0.006
=head1 DESCRIPTION
=cut
use JSON::MaybeXS;
use URI::QueryParam;
use IO::Async::Timer::Countdown;
use Encode qw(encode);
use Protocol::UWSGI qw(:server);
use List::UtilsBy qw(bundle_by);
=head2 CONTENT_TYPE_HANDLER
=cut
# Maps a request Content-Type to the suffix of the content_handler_* method
# that dispatch_request selects for consuming the request body.
# Declared with `our`, so it is a package variable rather than a file lexical.
our %CONTENT_TYPE_HANDLER = (
'application/javascript' => 'json',
);
# Compile-time switch checked by write_response: while false (the current
# value) the response text is assembled by hand; when true the HTTP::Response
# branch is used instead.
# NOTE(review): HTTP::Response is not loaded in the visible part of this
# file - confirm before enabling.
use constant USE_HTTP_RESPONSE => 0;
=head1 METHODS
=cut
=head2 configure

Applies configuration parameters.

The keys listed here are stored on the instance; any other parameters are
passed on to the parent class C<configure>.

=over 4

=item * bus - the event bus

=item * on_request - callback when we get an incoming request

=item * default_content_handler - value returned by L</default_content_handler>

=back

=cut

sub configure {
    my ($self, %params) = @_;
    my @own_keys = qw(bus on_request default_content_handler);
    foreach my $key (@own_keys) {
        # Only touch keys the caller actually supplied, so an existing
        # value is not wiped by a later partial reconfiguration.
        next unless exists $params{$key};
        $self->{$key} = delete $params{$key};
    }
    return $self->SUPER::configure(%params);
}
=head2 default_content_handler

Returns the C<default_content_handler> value stored on the instance by
L</configure>, or undef if none has been set.

=cut

sub default_content_handler {
    my ($self) = @_;
    return $self->{default_content_handler};
}
=head2 json

Accessor for the current JSON state.

The L<JSON::MaybeXS> instance is created with C<utf8> enabled the first
time it is requested and then cached on the connection.

=cut

sub json {
    my ($self) = @_;
    $self->{json} ||= JSON::MaybeXS->new(utf8 => 1);
    return $self->{json};
}
=head2 on_read

Base read handler for incoming traffic.

Tries to extract a UWSGI frame from the buffer. Once a frame is available
it is stored as the request environment and L</dispatch_request> is
returned as the next read handler. If the stream hits EOF before a frame
could be extracted, the request is cancelled via L</cancel>.

Returns 0 when there is nothing further to do with the current buffer.

=cut

sub on_read {
    my ($self, $buffref, $eof) = @_;
    my $frame = extract_frame($buffref);
    if($frame) {
        # We have a request, start processing
        $self->{env} = $frame;
        return $self->can('dispatch_request');
    }
    # EOF before a valid request? Bail out immediately
    $self->cancel if $eof;
    return 0;
}
=head2 cancel

Cancels any request in progress by cancelling the L</response> future,
unless that future is already ready.

The original documentation notes that a client which is still connected
would receive a 500 response, but that it is far more likely the client has
gone away, in which case there is no response to send. This method itself
does not write anything to the client.

=cut

sub cancel {
    my ($self) = @_;
    my $response = $self->response;
    $response->cancel unless $response->is_ready
}
=head2 env

Accessor for the UWSGI environment, i.e. the frame stored by L</on_read>.
Returns undef until a frame has been received.

=cut

sub env {
    my ($self) = @_;
    return $self->{env};
}
=head2 response

Returns the response future for this connection. It is created through the
loop's C<new_future> on first access and cached, so repeated calls return
the same object. L</cancel> cancels this future.

=cut

sub response {
    my ($self) = @_;
    $self->{response} ||= $self->loop->new_future;
    return $self->{response};
}
=head2 dispatch_request

At this point we have a request including headers,
and we should know whether there's a body involved
somewhere.

Requests without a body go straight to L</finish_request>. Otherwise a
content handler is selected and stored as the input handler, and the next
read handler is returned: L</read_to_length> when C<CONTENT_LENGTH> is
present, L</read_chunked> when the transfer encoding is C<chunked>.

The content handler is looked up in C<%CONTENT_TYPE_HANDLER> using the
C<CONTENT_TYPE> exactly as received first. If that fails, the lookup is
retried with the lowercased media type stripped of any parameters, so that
C<application/javascript; charset=utf-8> selects the same handler as
C<application/javascript>. When nothing matches, the
L</default_content_handler> (or C<raw>) is used.

Dies if the request has a body but neither a content length nor chunked
transfer encoding.

=cut

sub dispatch_request {
    my ($self, $buffref, $eof) = @_;
    # Plain GET request? We might be able to bail out here
    return $self->finish_request unless $self->has_body;

    my $env = $self->env;
    my $handler = $self->default_content_handler || 'raw';
    if(my $type = $env->{CONTENT_TYPE}) {
        if(exists $CONTENT_TYPE_HANDLER{$type}) {
            $handler = $CONTENT_TYPE_HANDLER{$type};
        } else {
            # Media types are case-insensitive and may carry parameters
            # such as "; charset=utf-8" - retry with the bare type.
            my ($media_type) = lc($type) =~ m{\A\s*([^;\s]+)};
            $handler = $CONTENT_TYPE_HANDLER{$media_type}
                if defined $media_type && exists $CONTENT_TYPE_HANDLER{$media_type};
        }
    }
    $handler = 'content_handler_' . $handler;
    # Weak curry so the stored handler does not keep the connection alive
    $self->{input_handler} = $self->${\"curry::weak::$handler"};

    # Try to read N bytes if we have content length. Most UWSGI implementations seem
    # to set this.
    if(exists $env->{CONTENT_LENGTH}) {
        $self->{remaining} = $env->{CONTENT_LENGTH};
        return $self->can('read_to_length');
    }

    # Streaming might be nice, but nginx has no support for this
    if(exists $env->{HTTP_TRANSFER_ENCODING} && $env->{HTTP_TRANSFER_ENCODING} eq 'chunked') {
        return $self->can('read_chunked');
    }
    die "no idea how to handle this, missing length and not chunked";
}
=head2 finish_request

Called once the request (including any body) has been read.

For requests with a body, the input handler is called without arguments
and its result is stored as the L</request_body>. The C<on_request>
callback is then invoked with this connection; its result is chained
into L</write_response>, failures are reported through C<debug_printf>,
and C<close_now> runs once the chain is ready either way.

Returns a code reference which takes C<($self, $buffref, $eof)> and
cancels the pending completion if EOF arrives before it is ready. The
read handlers in this file return it as their result.

=cut

sub finish_request {
    my ($self) = @_;
    if($self->has_body) {
        # Calling the input handler with no data asks it for the final body
        $self->{request_body} = $self->{input_handler}->();
    }

    my $log_failure = sub {
        $self->debug_printf("Failed while attempting to handle request: %s (%s)", @_);
    };
    $self->{completion} = $self->{on_request}->($self)
        ->then($self->curry::write_response)
        ->on_fail($log_failure)
        ->on_ready($self->curry::close_now);

    return sub {
        # Works on the connection passed in rather than closing over the
        # outer $self.
        my ($conn, undef, $eof) = @_;
        if($eof) {
            my $pending = $conn->{completion};
            $pending->cancel unless $pending->is_ready;
        }
        return 0;
    };
}
{
# Request methods for which a body is read. PATCH is included because it
# carries a payload just like PUT and POST; without it PATCH bodies were
# never consumed.
my %methods_with_body = map { $_ => 1 } qw(
    PUT
    POST
    PATCH
    PROPPATCH
);

=head2 has_body

Returns true if we're expecting a request body
for the current request method.

The method is taken from C<REQUEST_METHOD> in the current L</env>; a
missing method counts as having no body. Returns 1 or 0.

=cut

sub has_body {
    my ($self) = @_;
    # Default to the empty string so a missing REQUEST_METHOD does not
    # trigger an "uninitialized" warning on the hash lookup.
    my $method = $self->env->{REQUEST_METHOD} // '';
    return $methods_with_body{$method} ? 1 : 0;
}
}
=head2 read_chunked

Read handler for chunked data. Unlikely to be used by any real implementations.

Works as a small state machine keyed on C<< $self->{chunk_remaining} >>:

=over 4

=item * undefined - waiting for a chunk size line terminated by CRLF

=item * positive - that many payload bytes are still expected for the current chunk

=item * zero - payload complete, waiting for the CRLF that terminates the chunk

=back

Payload bytes are passed to the input handler as they arrive. Chunk
extensions after the size are ignored. A zero-sized chunk ends the body and
hands over to L</on_trailing_header>.

Returns 0 when more data is needed, 1 when the handler should be called
again, or a code reference for the next read handler.

=cut

sub read_chunked {
    my ($self, $buffref, $eof) = @_;
    $self->debug_printf("Body read: $self, $buffref, $eof: [%s]", $$buffref);
    if(defined $self->{chunk_remaining}) {
        if($self->{chunk_remaining}) {
            my $data = substr $$buffref, 0, $self->{chunk_remaining}, '';
            $self->{chunk_remaining} -= length $data;
            $self->debug_printf("Had %d bytes, %d left in chunk", length($data), $self->{chunk_remaining});
            $self->{input_handler}->($data) if length $data;
            return 0 if $self->{chunk_remaining};
        }

        # Every chunk payload is followed by CRLF. It has to be consumed
        # here, otherwise it would be read as an empty size line, which
        # evaluates to 0 and ends the body after the first chunk.
        return 0 if length($$buffref) < 2;
        substr($$buffref, 0, 2, '') if substr($$buffref, 0, 2) eq "\x0D\x0A";
        $self->debug_printf("Look for next chunk");
        delete $self->{chunk_remaining};
        return 1;
    } else {
        return 0 if -1 == (my $size_len = index($$buffref, "\x0D\x0A"));
        my $size_line = substr $$buffref, 0, $size_len, '';
        substr $$buffref, 0, 2, '';
        # Only the leading hex digits are the size; anything after them
        # (chunk extensions) is ignored. A line without hex digits counts
        # as size 0, matching what hex() yielded for such input before.
        my ($hex_size) = $size_line =~ /\A\s*([0-9A-Fa-f]+)/;
        $self->{chunk_remaining} = defined $hex_size ? hex $hex_size : 0;
        $self->debug_printf("Have %d bytes in this chunk", $self->{chunk_remaining});
        return 1 if $self->{chunk_remaining};
        $self->debug_printf("End of chunked data, looking for trailing headers");
        return $self->can('on_trailing_header');
    }
}
=head2 on_trailing_header

Deal with trailing headers. Not yet implemented: whatever is left in the
buffer is discarded and the request is completed via L</finish_request>,
whose result is returned.

=cut

sub on_trailing_header {
    my ($self, $buffref, $eof) = @_;
    # FIXME not yet implemented - trailers are thrown away unparsed
    ${$buffref} = q{};
    return $self->finish_request;
}
=head2 read_to_length

Read up to the expected fixed length of data.

At most C<< $self->{remaining} >> bytes are taken from the buffer and
passed to the input handler; anything beyond the declared length is left in
the buffer. Once no bytes remain, the result of L</finish_request> is
returned, otherwise 0 to wait for more data.

=cut

sub read_to_length {
    my ($self, $buffref, $eof) = @_;
    # Never consume more than the declared length: taking the whole buffer
    # would drive the counter negative, and the request would never finish.
    my $wanted = $self->{remaining} || 0;
    $wanted = 0 if $wanted < 0;
    my $data = substr $$buffref, 0, $wanted, '';
    $self->{remaining} = $wanted - length $data;
    $self->debug_printf("Body read: $self, $buffref, $eof: %s with %d remaining", $data, $self->{remaining});
    $self->{input_handler}->($data);
    return $self->finish_request unless $self->{remaining};
    return 0;
}
=head2 request_body

Accessor for the request body, available to the L</finish_request> callback.
Holds whatever the input handler returned when L</finish_request> asked it
for the final body; undef if no body was stored.

=cut

sub request_body {
    my ($self) = @_;
    return $self->{request_body};
}
=head2 content_handler_raw

Handle content as an opaque string.

Called with defined data, appends it to the accumulated body. Called
without data (or with undef), returns everything accumulated so far.

=cut

sub content_handler_raw {
    my ($self, $data) = @_;
    # No data means the caller wants the collected body back
    return $self->{data} unless defined $data;
    $self->{data} .= $data;
}
=head2 content_handler_json

Handle JSON content.

Called with defined data, feeds it to the incremental parser of L</json>;
a parser exception is logged through C<debug_printf> and otherwise ignored.

Called without data, asks the parser for the decoded document and returns
it. If the parser throws or has no complete document, the problem is logged
and undef is returned.

=cut

sub content_handler_json {
    my ($self, $data) = @_;
    if(defined $data) {
        eval {
            # Not the last statement of the eval, so incr_parse is called in
            # void context here.
            $self->json->incr_parse($data);
            1
        } or do {
            $self->debug_printf("Invalid JSON received: %s", $@);
        };
    } else {
        my $decoded = eval { $self->json->incr_parse };
        return $decoded if defined $decoded;

        # $@ is empty when the parser simply had no complete document
        my $reason = $@ || 'incomplete or empty document';
        $self->debug_printf("Invalid JSON from incr_parse: %s", $reason);
        # Return undef explicitly: previously the result of debug_printf
        # leaked out as the request body. A plain "return;" would change
        # the result in list context, so a single undef is kept.
        return undef;
    }
}
# Reason phrases for the status line, keyed by HTTP status code.
# write_response falls back to 'Unknown' for any code not listed here.
my %status = (
100 => 'Continue',
101 => 'Switching protocols',
102 => 'Processing',
200 => 'OK',
201 => 'Created',
202 => 'Accepted',
203 => 'Non-authoritative information',
204 => 'No content',
205 => 'Reset content',
206 => 'Partial content',
207 => 'Multi-status',
208 => 'Already reported',
226 => 'IM used',
300 => 'Multiple choices',
301 => 'Moved permanently',
302 => 'Found',
303 => 'See other',
304 => 'Not modified',
305 => 'Use proxy',
307 => 'Temporary redirect',
308 => 'Permanent redirect',
400 => 'Bad request',
401 => 'Unauthorised',
402 => 'Payment required',
403 => 'Forbidden',
404 => 'Not found',
405 => 'Method not allowed',
406 => 'Not acceptable',
407 => 'Proxy authentication required',
408 => 'Request timeout',
409 => 'Conflict',
410 => 'Gone',
411 => 'Length required',
412 => 'Precondition failed',
413 => 'Payload too large',
414 => 'URI too long',
415 => 'Unsupported media type',
416 => 'Range not satisfiable',
417 => 'Expectation failed',
421 => 'Misdirected request',
422 => 'Unprocessable entity',
423 => 'Locked',
424 => 'Failed dependency',
426 => 'Upgrade required',
428 => 'Precondition required',
429 => 'Too many requests',
431 => 'Request header fields too large',
451 => 'Unavailable for legal reasons',
500 => 'Internal server error',
501 => 'Not implemented',
502 => 'Bad gateway',
503 => 'Service unavailable',
504 => 'Gateway timeout',
505 => 'HTTP version not supported',
506 => 'Variant also negotiates',
507 => 'Insufficient storage',
508 => 'Loop detected',
510 => 'Not extended',
511 => 'Network authentication required',
);
=head2 write_response

Writes an HTTP/1.1 response to the stream.

Takes the status code, an optional arrayref of extra headers as a flat
C<< name => value >> list, and the body. A reference body is serialised
with C<encode_json> and sent as C<text/javascript>; anything else is
encoded as UTF-8 and sent as C<text/plain>. A missing body is sent as an
empty one. C<Content-Type> and C<Content-Length> are always emitted ahead
of the extra headers.

Returns the result of the underlying C<write> call.

=cut

sub write_response {
    my ($self, $code, $hdr, $body) = @_;
    my $type = ref($body) ? 'text/javascript' : 'text/plain';
    # Treat an absent body as the empty string: encode() hands back undef
    # for undef input, which would produce an empty Content-Length value
    # (plus warnings) instead of "Content-Length: 0".
    my $content = ref($body) ? encode_json($body) : encode(
        'UTF-8' => $body // ''
    );
    $hdr ||= [];
    if(USE_HTTP_RESPONSE) {
        return $self->write(
            'HTTP/1.1 ' . HTTP::Response->new(
                $code => ($status{$code} // 'Unknown'), [
                    'Content-Type' => $type,
                    'Content-Length' => length $content,
                    @$hdr
                ],
                $content
            )->as_string("\x0D\x0A")
        )
    } else {
        # Status line, headers, blank separator line, then the body, all
        # joined with CRLF. $content is already encoded, so length() counts
        # bytes here.
        return $self->write(
            join "\015\012", (
                'HTTP/1.1 ' . $code . ' ' . ($status{$code} // 'Unknown'),
                'Content-Type: ' . $type,
                'Content-Length: ' . length($content),
                (bundle_by { join ': ', @_ } 2, @$hdr),
                '',
                $content
            )
        )
    }
}
1;
__END__
=head1 AUTHOR
Tom Molesworth <cpan@perlsite.co.uk>
=head1 LICENSE
Copyright Tom Molesworth 2013-2015. Licensed under the same terms as Perl itself.