Group
Extension

App-rhttp/bin/rhttp.pl

#!/usr/bin/env perl

package Base;

use Modern::Perl;
use Carp qw();

# Kudos to Mojo::Base

# Exporter hook: installs a "has" function into the calling package.
# The installed function forwards its arguments to attr() with the calling
# package's name as the class in which accessors are generated.
sub import {
    my ($class) = @_;

    my $target    = caller;
    my $installer = sub { attr($target, @_) };

    # The glob name is built from a string, which needs symbolic refs.
    no strict 'refs';
    *{"${target}::has"} = $installer;
}

# Generates accessor methods in $class (an object or a package name).
#
#   $attrs   - one attribute name or an array reference of names
#   $default - optional constant, or a code reference that is called with
#              the invocant to produce the value on first read
#
# Each generated accessor returns the stored value when called without
# arguments (filling in the default if the slot does not exist yet) and,
# when called with a value, stores it and returns the invocant.
# Croaks on a non-CODE reference default, an invalid attribute name, or
# if the generated code fails to compile.
sub attr {
    my ($class, $attrs, $default) = @_;

    $class = ref $class || $class;
    return unless $class && $attrs;

    if (ref($default) && ref($default) ne 'CODE') {
        Carp::croak('Default has to be a code reference or constant value');
    }

    my @names = ref $attrs eq 'ARRAY' ? @$attrs : ($attrs);

    for my $attr (@names) {
        Carp::croak(qq{Attribute "$attr" invalid}) unless $attr =~ /^[a-zA-Z_]\w*$/;

        # Source text of the hash slot the accessor reads and writes.
        my $slot = "\$_[0]{'$attr'}";

        # Body of the "called without a value" branch.
        my $getter;
        if (defined $default) {
            # The generated code refers to the lexical $default of this
            # call, which the string eval below closes over.
            my $fallback = ref $default eq 'CODE' ? '$default->($_[0]);' : '$default;';
            $getter = "    return $slot if exists $slot;\n"
                    . "    return $slot = $fallback";
        }
        else {
            $getter = "    return $slot;";
        }

        # Header (argument check), getter, setter, footer (return invocant).
        my $code = "package $class;\nsub $attr {\n  if (\@_ == 1) {\n"
                 . $getter
                 . "\n  }\n  $slot = \$_[1];\n"
                 . "  \$_[0];\n}";

        Carp::croak("error: $@") unless eval "$code;1";
    }
}

package Backend;

use strict;
use warnings;

# Backend is the proxy's handle for the upstream server connection.
our @ISA = qw(Base AnyEvent::Handle);
BEGIN {
    # Install has() at compile time so the bare "has ..." calls below resolve.
    Base->import;
}

use AnyEvent::Socket;
use IO::String;

# Accessors generated by Base::attr (no defaults; init() sets the values).
has qw(browser);          # paired Browser handle
has qw(headers);          # IO::String buffer collecting response header lines
has qw(headers_done);     # true once the blank line ending the headers was seen
has qw(wtf_buffer);       # lines that arrived after headers_done was set
has qw(send_size);        # body bytes still to be forwarded to the browser
has qw(content_length);   # value of the response Content-Length header
has qw(keep_alive);       # cleared when the response says "Connection: close"
has qw(websocket);        # true once a Sec-WebSocket-Accept header was seen

# Intentionally empty destructor; the commented-out say() is a leftover
# debugging trace.
# NOTE(review): being defined in Backend, this takes precedence over any
# DESTROY inherited through @ISA - confirm that skipping it is intended.
sub DESTROY {
    # say("DESTROY: Backend");
}

# Constructor for the upstream (backend) side of a proxied connection.
#
# Named arguments:
#   fh      - connected socket to the backend server
#   browser - the Browser handle this backend serves
#   vhost   - vhost config hash; a true {tls} enables TLS in "connect" mode
#
# Wraps the socket in the parent handle class with a 30 second timeout,
# links browser and backend to each other, resets per-request state via
# init() and returns the new handle.
sub new {
    my ($class, %ops) = @_;

    my %tls = $ops{vhost}{tls} ? (tls => "connect") : ();

    # Declared before construction so the callbacks can close over it.
    my $backend;
    $backend = $class->SUPER::new(
        fh         => $ops{fh},
        timeout    => 30,
        on_read    => \&default_read,
        on_timeout => sub {
            AE::log error => "Operation timed out";

            $backend->destroy;
        },
        on_error => sub {
            my ($handle, undef, $message) = @_;

            AE::log error => $message;
            $handle->destroy;
        },
        on_eof => sub {
            $backend->destroy;
            undef($backend);
            AE::log info => "Done.";
        },
        %tls,
    );

    # Cross-link the two ends of the proxied connection.
    my $browser = delete $ops{browser};
    $backend->browser($browser);
    $backend->browser->backend($backend);

    $backend->init;

    return($backend);
}

# Default on_read handler for the backend: queues the reader that matches
# the connection's current mode. In websocket mode the raw buffer is piped
# through; otherwise the next line is handed to get_headers.
sub default_read {
    my $backend = shift;

    my @reader = $backend->websocket()
        ? (\&pipe_websocket)
        : (line => \&get_headers);

    $backend->push_read(@reader);
}

# Resets the backend's per-response state. Called from new() and again
# from Browser::restart before the next request on the same connection.
sub init {
    my $self = shift;

    # Header collection starts over with an empty buffer.
    $self->headers(IO::String->new);
    $self->headers_done(0);
    $self->wtf_buffer("");

    # No body is expected until a Content-Length header says otherwise.
    $self->content_length(undef);
    $self->send_size(0);

    # Connection mode flags.
    $self->keep_alive(1);
    $self->websocket(0);
}

# Line reader for the backend response (queued by default_read with
# push_read(line => ...)). Called once per received line.
#
#   $backend - the Backend handle
#   $line    - the line without its terminator
#   $eol     - the terminator that ended the line
#
# Header lines are appended to the headers buffer until a false $line
# (the blank line ending the header block) arrives. At that point the
# buffered headers are scanned, forwarded to the browser, and the next
# step is chosen: pipe the body, switch to websocket piping, or restart
# for the next request.
sub get_headers {
    my ($backend, $line, $eol) = @_;

    my $h = $backend->headers;

    # NOTE(review): "!$line" is also true for a line consisting of "0".
    if (!$line) {
        # Terminate the buffered header block and stop reading while the
        # response is dispatched.
        print($h "\015\012");
        $backend->headers_done(1);
        $backend->stop_read;

        # Scan the collected headers for the fields that drive proxying.
        # NOTE(review): these matches are case-sensitive and unanchored;
        # HTTP field names are case-insensitive - confirm backends always
        # send this exact spelling.
        $h->setpos(0);
        while (<$h>) {
            print(">>> $_") if ($ENV{RHTTP_PROXY_LOG});

            if (/Content-Length: (\d+)/) {
                # send_size counts down from the full length in pipe_body.
                $backend->content_length($1);
                $backend->send_size($1);
            }
            if (/Connection: close/) {
                $backend->keep_alive(0);
            }
            if (/Sec-WebSocket-Accept/) {
                # Flag both ends so their default_read switches to piping.
                $backend->websocket(1);
                $backend->browser->websocket(1);
            }
        }
        $h->setpos(0);

        # "pipe" content if avail, if not, then restart
        if ($backend->content_length) {
            # Send the response header, then "pipe" the content
            $backend->browser->push_write(${ $backend->headers->string_ref });
            # Once the header has drained to the browser, queue pipe_body
            # at the front of the backend's read queue.
            $backend->browser->on_drain(sub { $backend->unshift_read(sub { shift->pipe_body($backend->browser) }) });  # Who sells see shores?
        }
        elsif ($backend->websocket && $backend->browser->websocket) {
            # Handshake response: resume reading and forward the headers.
            $backend->start_read;
            $backend->browser->push_write(${ $backend->headers->string_ref });

            # After the handshake has drained: raise both timeouts to 60
            # seconds and remove the drain handlers.
            $backend->browser->on_drain(sub { 
                my $browser = shift;
                $browser->timeout(60);
                $browser->backend->timeout(60);
                say("+++ Websocket pipe") if $ENV{RHTTP_PROXY_LOG};
                $browser->on_drain(undef);
                $browser->backend->on_drain(undef);
            });
        }
        else {
            # No body announced: forward the headers, then reset both
            # handles for the next request.
            $backend->browser->push_write(${ $backend->headers->string_ref });
            $backend->browser->on_drain(sub { shift->restart($backend) });
        }
    }
    elsif ($line && !$backend->headers_done) {
        # Still inside the header block: buffer the line as received.
        print($h "$line$eol");
    }
    else {
        # A line arrived after the headers were complete; keep it so
        # pipe_body can forward it as part of the body.
        $backend->wtf_buffer($backend->wtf_buffer ."$line$eol");
    }
}

# Websocket pass-through, backend to browser: takes everything currently
# in the backend's read buffer, empties the buffer and writes the bytes to
# the browser unchanged.
sub pipe_websocket {
    my $backend = shift;

    my $payload = $backend->rbuf;
    substr($backend->rbuf, 0) = "";    # rbuf is used as an lvalue here

    my $browser = $backend->browser;
    $browser->push_write($payload);

    say(">>> [ws:brw] " . length($payload)) if $ENV{RHTTP_PROXY_LOG};
}

# Forwards response body bytes from the backend to the browser. Queued by
# get_headers via unshift_read once the response header has drained.
#
#   $backend - the Backend handle
#   $browser - the Browser handle receiving the body
#
# Returns 1 when the announced Content-Length has been forwarded in full,
# 0 otherwise.
# NOTE(review): presumably the true return tells the handle's read queue
# that this reader is finished - confirm against AnyEvent::Handle.
sub pipe_body {
    my ($backend, $browser) = @_;

    # send_size starts equal to content_length, so this holds before any
    # body bytes were subtracted: drop the drain handler that queued us.
    if ($backend->content_length == $backend->send_size) {
        $browser->on_drain(undef);
    }

    # Flush lines that get_headers collected after the header block ended.
    if ($backend->wtf_buffer) {
        $browser->push_write($backend->wtf_buffer);
        $backend->send_size($backend->send_size - length($backend->wtf_buffer));
        say(">>> [f] " . $backend->content_length() . " " . $backend->send_size() . " " . length($backend->wtf_buffer)) if $ENV{RHTTP_PROXY_LOG};
        $backend->wtf_buffer(undef);
    }

    # Take everything currently in the read buffer and empty it.
    my $msg = $backend->rbuf;
    substr($backend->rbuf, 0) = "";

    $backend->send_size($backend->send_size - length($msg));

    # NOTE(review): completion is detected only by send_size reaching
    # exactly 0; a backend sending more than Content-Length would make it
    # negative - confirm how that case is meant to end.
    if (0 == $backend->send_size) {
        say(">>> [s] " . $backend->content_length() . " " . $backend->send_size() . " " . length($msg)) if $ENV{RHTTP_PROXY_LOG};
        # Body complete: pause both sides while the last chunk is written.
        $browser->stop_read;
        $backend->stop_read;
        $browser->push_write($msg);

        if ($browser->keep_alive && $backend->keep_alive) {
            # Both ends allow reuse: reset for the next request once the
            # final chunk has drained.
            $browser->on_drain( sub { shift->restart($backend) } );
        }
        else {
            # Otherwise shut down the write side of the backend socket and
            # shorten its timeout to 1 second.
            $browser->on_drain( sub { shutdown($$backend{fh}, 1); $backend->timeout(1); } );
        }

        return 1;
    }
    else {
        say(">>> [w] " . $backend->content_length() . " " . $backend->send_size() . " " . length($msg)) if $ENV{RHTTP_PROXY_LOG};
        # More body expected: forward this chunk and wait for the next.
        $browser->push_write($msg);
    }

    return 0;
}

package Browser;

use strict;
use warnings;

# Browser is the proxy's handle for the client (browser) connection.
our @ISA = qw(Base AnyEvent::Handle);
BEGIN {
    # Install has() at compile time so the bare "has ..." calls below resolve.
    Base->import;
}


use AnyEvent::Socket;
use IO::String;

# Accessors generated by Base::attr (no defaults; init() sets the values,
# except backend, which Backend::new assigns).
has qw(backend);          # paired Backend handle
has qw(headers);          # IO::String buffer collecting request header lines
has qw(headers_done);     # true once the blank line ending the headers was seen
has qw(host_header);      # host taken from the request's Host header
has qw(content_length);   # value of the request Content-Length header
has qw(send_size);        # request body bytes still to be forwarded
has qw(keep_alive);       # cleared when the request says "Connection: close"
has qw(websocket);        # set by the backend when a websocket handshake succeeds

# Destructor: when a backend is still attached to this browser handle,
# destroy it as well.
sub DESTROY {
    my $browser = shift;

    # say("DESTROY: Browser");

    my $backend = $browser->backend or return;
    $backend->destroy;
}

# Constructor for the client (browser) side of a proxied connection.
#
#   $fh    - accepted client socket
#   $host  - listen address the client connected to
#   $port  - listen port the client connected to
#
# TLS is enabled in "accept" mode when a cert_file is configured and the
# "$host:$port" listener is flagged for TLS. The handle gets a 30 second
# timeout, has its per-request state reset via init() and is returned.
#
# Timeout, error and EOF all tear down the paired backend first and the
# browser second. The order matters: once a handle has been destroyed,
# method calls on it are silently ignored and return nothing, so asking a
# destroyed browser for its backend can never find one.
sub new {
    my ($self, $fh, $host, $port) = @_;

    my %tls = ();
    if ($main::Config{cert_file}) {
        if ($main::Config{listen}{"${host}:$port"}{tls}) {
            $tls{tls} = "accept";
            $tls{tls_ctx} = { cert_file => $main::Config{cert_file}, key_file => $main::Config{key_file} };
        }
    }

    # Declared before construction so the callbacks can close over it.
    my $browser;

    # Shared teardown: backend first (while the browser can still report
    # it), then the browser itself.
    my $teardown = sub {
        my $backend = $browser->backend;
        $backend->destroy if $backend;

        $browser->destroy;
    };

    $browser = $self->SUPER::new(
        fh => $fh,
        timeout => 30,
        on_timeout => sub {
            # Include the request line in the log entry, unless the
            # connection uses TLS.
            my $h = $browser->headers;
            $h->setpos(0);

            my $line = <$h> || "";
            $line =~ s#\015\012##;
            chomp($line);

            $line = $line && !$tls{tls} ? " [$line]" : "";

            AE::log error => "Operation timed out$line";

            $teardown->();
        },
        on_error => sub {
            AE::log error => $_[2];
            $teardown->();
        },
        on_eof => sub {
            $teardown->();
            undef($browser);
            AE::log info => "Done.";
        },
        on_read => \&default_read,
        %tls,
    );

    $browser->init;

    return($browser);
}

# Resets the browser's per-request state. Called from new() and again
# from restart() before the next request on the same connection. The
# attached backend is left untouched.
sub init {
    my $self = shift;

    # Header collection starts over with an empty buffer.
    $self->headers(IO::String->new);
    $self->headers_done(0);
    $self->host_header("");

    # No request body is expected until a Content-Length header is seen.
    $self->content_length(undef);
    $self->send_size(0);

    # Connection mode flags.
    $self->keep_alive(1);
    $self->websocket(0);
}

# Prepares both ends of the connection for the next request: resets their
# per-request state, removes their drain handlers and resumes reading.
#
#   $browser - the Browser handle
#   $backend - the Backend handle paired with it
sub restart {
    my ($browser, $backend) = @_;

    say("===") if $ENV{RHTTP_PROXY_LOG};

    for my $handle ($backend, $browser) {
        $handle->init;
    }

    for my $handle ($browser, $backend) {
        $handle->on_drain(undef);
    }

    $browser->start_read;
    $backend->start_read;
}

# Websocket pass-through, browser to backend: takes everything currently
# in the browser's read buffer, empties the buffer and writes the bytes to
# the backend unchanged.
sub pipe_websocket {
    my $browser = shift;

    my $payload = $browser->rbuf;
    substr($browser->rbuf, 0) = "";    # rbuf is used as an lvalue here

    my $backend = $browser->backend;
    $backend->push_write($payload);

    say(">>> [ws:bck] " . length($payload)) if $ENV{RHTTP_PROXY_LOG};
}

# Forwards request body bytes from the browser to the backend. Takes the
# whole read buffer, subtracts its length from send_size and writes it to
# the backend.
#
# Returns 1 once send_size has reached 0 (the announced Content-Length
# was forwarded in full), 0 otherwise.
sub pipe_browser_content {
    my $browser = shift;

    my $chunk = $browser->rbuf;
    substr($browser->rbuf, 0) = "";

    $browser->send_size($browser->send_size - length($chunk));

    if ($ENV{RHTTP_PROXY_LOG}) {
        say(">>> " . $browser->content_length() . " " . $browser->send_size() . " " . length($chunk));
    }
    if ($ENV{RHTTP_PROXY_LOG}) {
        say("    >>> $chunk");
    }

    $browser->backend->push_write($chunk);

    return 0 == $browser->send_size ? 1 : 0;
}

# Default on_read handler for the browser: queues the reader that matches
# the connection's current mode. In websocket mode the raw buffer is piped
# through; otherwise the next line is handed to get_headers.
sub default_read {
    my $browser = shift;

    my @reader = $browser->websocket()
        ? (\&pipe_websocket)
        : (line => \&get_headers);

    $browser->push_read(@reader);
}

# Line reader for the client request (queued by default_read with
# push_read(line => ...)). Called once per received line.
#
#   $browser - the Browser handle
#   $line    - the line without its terminator
#   $eol     - the terminator that ended the line
#
# Header lines are appended to the headers buffer until a false $line
# (the blank line ending the header block) arrives; the buffered headers
# are then scanned and connect_backend is called.
sub get_headers {
    my ($browser, $line, $eol) = @_;

    my $h = $browser->headers;

    # NOTE(review): "!$line" is also true for a line consisting of "0".
    if (!$line) {
        # Terminate the buffered header block.
        print($h "\015\012");
        $browser->headers_done(1);

        # Scan the collected headers for the fields that drive proxying.
        # NOTE(review): these matches are case-sensitive and unanchored;
        # HTTP field names are case-insensitive - confirm clients always
        # send this exact spelling.
        $h->setpos(0);
        while (<$h>) {
            if (/Host:\s+([:\d\w\.-]+)/) {
                # Used by connect_backend to pick the vhost.
                my $tmp = $1;
                $tmp =~ s#\015\012##;
                $browser->host_header($tmp);
            }
            if (/Content-Length: (\d+)/) {
                # send_size counts down in pipe_browser_content.
                $browser->content_length($1);
                $browser->send_size($1);
            }
            if (/Connection: close/) {
                $browser->keep_alive(0);
            }
            print("<<< $_") if ($ENV{RHTTP_PROXY_LOG});
        }
        $h->setpos(0);

        $browser->connect_backend;
    }
    elsif ($line && !$browser->headers_done) {
        # Still inside the header block: buffer the line as received.
        print($h "$line$eol");
    }
    else {
        # A non-empty line after the headers were already complete.
        # NOTE(review): dies without a message - confirm this state is
        # really unreachable for well-behaved clients.
        die;
    }
}

# Picks the backend for the request's Host header and connects to it.
#
# Lookup order:
#   1. database-backed vhosts (config keys starting with "dbi"), asked in
#      sorted key order; the first one returning a "host:port[:tls]"
#      string wins;
#   2. otherwise the statically configured vhost whose key equals the
#      Host header.
#
# Returns without connecting when no usable host AND port were found.
# On a successful connect a Backend is created (which links itself to the
# browser) and the buffered request is forwarded via parse_header.
#
# NOTE(review): when a backend is already attached nothing is forwarded
# from here - confirm how follow-up requests on a kept-alive connection
# are expected to reach the backend.
sub connect_backend {
    my ($browser) = @_;

    my $vhosts = $main::Config{vhost};

    my $host_header = $browser->host_header;
    my $host;
    my $port;
    my $vhost;

    # Database-backed vhosts take precedence over static entries. Sorting
    # makes the choice independent of hash ordering.
    foreach my $key (sort grep { m/^dbi/ } keys %$vhosts) {
        my $config = DBX->vhost($key, $host_header) or next;

        ($host, $port, my $tls) = split(/:/, $config);
        $vhost = { host => $host, port => $port, tls => $tls };

        last;
    }

    # Static vhost. DSN entries are stored as a plain true value, so only
    # a hash reference is a usable static configuration.
    if (!$vhost) {
        my $static = $vhosts->{$host_header};
        if (ref $static eq 'HASH') {
            $host  = $static->{host};
            $port  = $static->{port};
            $vhost = $static;
        }
    }

    # Both parts are required; connecting with only one of them cannot work.
    if (!$host || !$port) {
        if ($ENV{RHTTP_PROXY_LOG}) {
            say("Can't tcp_connect without a host and port: " . $host_header);
        }

        return;
    }

    tcp_connect($host => $port, sub {
        eval {
            my $fh = shift or die "unable to connect: [$host:$port]: $!";
            my $tls = $$vhost{tls} // '';
            say("tcp_connect($_[0]:$_[1]) [$tls]") if ($ENV{RHTTP_PROXY_LOG});
            my $backend = Backend->new(fh => $fh, browser => $browser, vhost => $vhost);

            $backend->browser->parse_header;
        };
        warn($@) if $@;
    }) unless $browser->backend;
}

# Forwards the buffered request headers to the backend. When the request
# announced a body (true Content-Length), reading from the browser is
# paused first and, once the headers have drained to the backend,
# pipe_browser_content is queued at the front of the browser's read queue.
sub parse_header {
    my $browser = shift;

    my $backend      = $browser->backend;
    my $request_head = ${ $browser->headers->string_ref };

    if (!$browser->content_length) {
        # Write the non post request
        $backend->push_write($request_head);
    }
    else {
        # Send headers to backend, then "pipe" the content data
        $browser->stop_read;
        $backend->push_write($request_head);
        $backend->on_drain(sub { $browser->unshift_read(sub { shift->pipe_browser_content }) });
    }
}

package DBX;

use DBIx::Connector;

# Looks up the backend for a Host header in the database behind $dsn and
# balances round-robin across all rows configured for that header.
#
#   $self   - class name (called as DBX->vhost)
#   $dsn    - DBI data source name; one connector is cached per DSN
#   $header - Host header value to look up
#
# Returns the selected "config" column value, or undef when the header
# has no rows. Headers without rows leave no round-robin state behind, so
# arbitrary Host headers cannot grow the state table.
#
# NOTE(review): the connector is created with AutoCommit off and nothing
# here commits or rolls back - confirm a permanently open transaction is
# acceptable for the database in use.
sub vhost {
    my ($self, $dsn, $header) = @_;

    state $conn = {};
    state $lb = {};

    $$conn{$dsn} //= DBIx::Connector->new($dsn, undef, undef, {
        RaiseError => 1,
        PrintError => 0,
        AutoCommit => 0,
        pg_server_prepare => 0,
        pg_enable_utf8 => 1,
    });

    my $sql = "SELECT COUNT(config) FROM vhost WHERE header = ?";
    my $count = col($$conn{$dsn}, $sql, undef, $header) // 0;

    # Nothing configured for this header: forget any old state and skip
    # the second query.
    if ($count < 1) {
        delete $$lb{$header};

        return(undef);
    }

    # Start over whenever the number of candidate rows changes.
    if (!$$lb{$header} || $count != $$lb{$header}{count}) {
        $$lb{$header} = { count => $count, offset => 0 };
    }

    my $offset = $$lb{$header}{offset};

    # It's a loadbalancer: the next call gets the next row, wrapping around.
    $$lb{$header}{offset} = ($offset + 1) % $count;

    # $offset is an internally maintained integer; %d keeps it that way.
    $sql = sprintf("SELECT config FROM vhost WHERE header = ? ORDER BY config LIMIT 1 OFFSET %d", $offset);

    return(col($$conn{$dsn}, $sql, undef, $header));
}
# Runs a query and returns the first value of its first column.
#
#   $conn  - connection object providing dbh()
#   $sql   - SQL statement, optionally with placeholders
#   $attrs - attribute hash passed through to selectcol_arrayref (or undef)
#   @vars  - bind values for the placeholders
#
# Returns the value when it is defined - including 0 and the empty
# string, so a COUNT(...) of zero comes back as 0 - and undef when the
# query produced no rows or a NULL.
sub col {
    my ($conn, $sql, $attrs, @vars) = @_;

    my $ret = $conn->dbh()->selectcol_arrayref($sql, $attrs, @vars);
    if ($ret && @$ret && defined $$ret[0]) {
        return($$ret[0]);
    }

    return(undef);
}

package main;

use strict;
use warnings;

use Modern::Perl;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Log;
use AnyEvent::Socket;
use Getopt::Long;
use JSON::PP;

our $VERSION = "0.03";

# Autoflush the currently selected output handle so log lines appear
# immediately.
$| = 1;

# Log AnyEvent messages from level "info" upwards.
$AnyEvent::Log::FILTER->level("info");

# Built-in defaults. Replaced wholesale by the contents of the JSON config
# file once that has been read.
our %Config = (
    config_file => "rhttp.json",   # where the configuration is stored
    cert_file => "",               # TLS certificate (enables TLS listeners)
    key_file => "",                # TLS private key
    vhost => {},                   # Host header (or DSN) => backend config
    listen => {}                   # "ip:port" => listener config
);

# load balancer with round-robin, least-connected
# NOTE(review): the line above reads like a feature note rather than a
# description of the statement below - confirm or remove.

# Command line options are stored straight into %Config; a parse error
# ends the program.
GetOptions(\%Config, "config_file|config=s", "host=s", "port=s", "add=s", "del=s") or exit;

# -add SPEC: merge one setting into the stored configuration, write the
# file back and exit. Recognised specs:
#   vhost:NAME=HOST:PORT:tls_on|tls_off
#   vhost:dsn:DSN
#   listen:IP:PORT:tls_on|tls_off
#   host:VALUE  port:VALUE  cert_file:PATH  key_file:PATH
if ($Config{add}) {
    my $stored = config($Config{config_file});

    # The stored configuration (if any) replaces the in-memory defaults.
    my $spec = delete($Config{add});
    %Config = %{$stored} if defined $stored;

    if ($spec =~ m#^vhost:(?<vhost>[^=]+)=(?<host>[^:]+):(?<port>\d+):tls_(?<tls>off|on)#) {
        my %found = %+;

        my $name = $found{vhost};
        $Config{vhost}{$name}{host} = $found{host};
        $Config{vhost}{$name}{port} = $found{port};
        $Config{vhost}{$name}{tls}  = $found{tls} eq "off" ? 0 : 1;
    }
    elsif ($spec =~ m#^vhost:dsn:(?<dsn>.*)#) {
        # Database-backed vhosts are stored as a flag keyed by their DSN.
        $Config{vhost}{ $+{dsn} } = 1;
    }
    elsif ($spec =~ m#^listen:(?<ip>[^:]+):(?<port>\d+):tls_(?<tls>off|on)#) {
        my %found = %+;

        $Config{listen}{"$found{ip}:$found{port}"} = {
            ip   => $found{ip},
            port => $found{port},
            tls  => $found{tls} eq "off" ? 0 : 1,
        };
    }
    elsif ($spec =~ m#^host:(?<host>.*)#) {
        $Config{host} = $+{host};
    }
    elsif ($spec =~ m#^port:(?<port>.*)#) {
        $Config{port} = $+{port};
    }
    elsif ($spec =~ m#^cert_file:(?<cert_file>.*)#) {
        $Config{cert_file} = $+{cert_file};
    }
    elsif ($spec =~ m#^key_file:(?<key_file>.*)#) {
        $Config{key_file} = $+{key_file};
    }
    else {
        die("Unknown -add option ($spec)\n");
    }

    my $encoder = JSON::PP->new->ascii->pretty;
    config($Config{config_file}, $encoder->encode(\%Config));

    exit;
}

# -del SPEC: remove one setting from the stored configuration, write the
# file back and exit. Recognised specs:
#   vhost:NAME
#   listen:IP:PORT
# Dies when the named entry does not exist or the spec is not recognised,
# so the config file is only rewritten after a real change.
if ($Config{del}) {
    my $json_config = config($Config{config_file});

    # The stored configuration (if any) replaces the in-memory defaults.
    my $del = delete($Config{del});
    %Config = %{$json_config} if defined $json_config;

    if ($del =~ m#^vhost:(?<vhost>\S+)#) {
        my ($vhost) = ($+{vhost});

        if ($Config{vhost}{$vhost}) {
            delete($Config{vhost}{$vhost});
        }
        else {
            die("No vhost config found for ($vhost)\n");
        }
    }
    elsif ($del =~ m#^listen:(?<ip>[^:]+):(?<port>\d+)#) {
        # The capture is named "ip"; listeners are keyed by "ip:port".
        my ($ip, $port) = ($+{ip}, $+{port});

        my $listen = "${ip}:$port";

        if ($Config{listen}{$listen}) {
            delete($Config{listen}{$listen});
        }
        else {
            die("No listen config found for ($listen)\n");
        }
    }
    else {
        die("Unknown -del option ($del)\n");
    }

    config($Config{config_file}, JSON::PP->new->ascii->pretty->encode(\%Config));

    exit;
}

# Normal start-up: load the stored configuration (if the file exists) in
# place of the built-in defaults.
my $json_config = config($Config{config_file});
%Config = %{$json_config} if defined $json_config;

# At least one vhost and one listener are needed; otherwise print
# examples of how to configure them and stop.
if (0 == keys %{$Config{vhost}} || 0 == keys %{$Config{listen}}) {
    warn("Please configure rhttp.pl.\n");
    warn("Possibly:\n\n");

    warn("\$ perl rhttp.pl -add vhost:domain.com:80=127.0.0.1:3001:tls_off\n");
    warn("\$ perl rhttp.pl -add vhost:domain.com=127.0.0.1:3001:tls_off\n");
    warn("\$ perl rhttp.pl -add listen:192.168.10.12:80:tls_off\n");
    warn("\$ perl rhttp.pl -add listen:192.168.10.12:443:tls_on\n");

    exit;
}

# One listening socket per configured "ip:port" entry. The returned guard
# objects are kept in %server.
my %server = ();

for my $endpoint (sort keys %{ $Config{listen} }) {
    my ($host, $port) = split(/:/, $endpoint);

    if ($ENV{RHTTP_PROXY_LOG}) {
        say("tcp_server($host:$port) [tls: $Config{listen}{$endpoint}{tls}]");
    }

    # Every accepted client connection becomes a Browser handle.
    my $on_accept = sub {
        my ($fh, $peerhost, $peerport) = @_;

        my $listen = "${host}:$port";

        if ($ENV{RHTTP_PROXY_LOG}) {
            say("proxy_accept($peerhost:$peerport) [tls: $Config{listen}{$listen}{tls}]");
        }

        Browser->new($fh, $host, $port);
    };

    $server{$endpoint} = tcp_server($host, $port, $on_accept);
}

# Run the event loop until SIGINT arrives.
my $done = AnyEvent->condvar;
my $w = AnyEvent->signal(
    signal => "INT",
    cb     => sub { $done->send },
);
$done->recv;

# Debugging helper: prints a Data::Dumper dump of its argument list (as
# one array reference) to the currently selected output handle.
# Data::Dumper is loaded on first use.
sub dumper {
    require Data::Dumper;
    # Useqq is set globally (not localized) and stays set afterwards.
    # NOTE(review): the assignment is repeated - presumably to avoid the
    # "used only once" warning for a variable of a module that is only
    # loaded at run time; confirm before removing either line.
    $Data::Dumper::Useqq = 1;
    $Data::Dumper::Useqq = 1;

    print Data::Dumper::Dumper(\@_);
}

# Reads or writes the JSON configuration file.
#
#   config($file)        - returns the decoded contents of $file, or undef
#                          when $file is not an existing plain file
#   config($file, $json) - writes the JSON text $json to $file and returns
#                          its decoded form
#
# Dies when the file cannot be opened, when the written data cannot be
# flushed on close, or when the JSON text cannot be decoded.
sub config {
    my ($file, $json) = @_;

    if ($json) {
        # Three-argument open: the file name is never parsed for a mode.
        open(my $h, '>', $file) or die("error: open: $file\n");
        print {$h} $json;

        # Buffered write errors only surface when the handle is closed.
        close($h) or die("error: close: $file\n");

        return(JSON::PP::decode_json($json));
    }

    unless (-f $file) {
        return(undef);
    }

    open(my $h, '<', $file) or die("error: open: $file\n");
    my $text = join("", <$h>);
    close($h);

    return(JSON::PP::decode_json($text));
}

__END__


=pod

=head1 NAME

rhttp.pl - Reverse HTTP Proxy

=head1 SYNOPSIS

B<rhttp.pl>

=head1 OPTIONS

=over 4

=item B<-config>

Which config file to use.

=item B<-add>

Modify stored config file.  See below.

=item B<-del>

Modify stored config file.  See below.

=item B<-host>

IP to listen on.  Can be set in config file.

=item B<-port>

Port to listen on.  Can be set in config file.

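=item B<-cert_file>

SSL cert file.  Stored in the config file with C<-add cert_file:PATH>.

=item B<-key_file>

SSL key file.  Stored in the config file with C<-add key_file:PATH>.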

=back

=head1 DESCRIPTION

This is a reverse http proxy that supports:

    HTTP/HTTPS reverse proxy (GET/POST)

    WebSockets

    VirtualHosts

    SSL

=head1 EXAMPLES

Run a reverse http proxy on domain.com and domain.net that routes to
127.0.0.1:3001 and 127.0.0.1:3002.  The proxy will listen on ip 192.168.10.12.

    $ perl rhttp.pl -add vhost:domain.com:80=127.0.0.1:3001:tls_off
    $ perl rhttp.pl -add vhost:domain.com=127.0.0.1:3001:tls_off
    $ perl rhttp.pl -add vhost:localhost=127.0.0.1:3001:tls_off
    $ perl rhttp.pl -add vhost:127.0.0.1:80=127.0.0.1:3001:tls_off
    $ perl rhttp.pl -add listen:127.0.0.1:80:tls_off
    $ perl rhttp.pl

    $ perl rhttp.pl -del vhost:localhost
    $ perl rhttp.pl -del listen:127.0.0.1:80

=head1 NOTES

This program is considered beta.

=head1 AUTHOR
 
Brian Medley - C<bpmedley@cpan.org>

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.