Group
Extension

BoardStreams/lib/Mojolicious/Plugin/BoardStreams.pm

package Mojolicious::Plugin::BoardStreams;

use Mojo::Base 'Mojolicious::Plugin', -signatures, -async_await;

use BoardStreams::Registry;
use BoardStreams::DBUtil qw/
    query_throwing_exception_object_p exists_p
    row_exists query_throwing_exception_object
/;
use BoardStreams::Util qw/
    trim :bool unique_id hashify next_tick_p sleep_p
    encode_json decode_json
/;
use BoardStreams::Exceptions qw/ jsonrpc_error /;
use BoardStreams::REs;
use BoardStreams::DBMigrations;

use Mojo::IOLoop;
use RxPerl::Mojo ':all';
use Syntax::Keyword::Try;
use Syntax::Keyword::Dynamically;
use Safe::Isa;
use Text::Trim;
use Struct::Diff 'diff';
use List::AllUtils 'pairs', 'min';
use Crypt::Digest::SHA256 'sha256_b64u';
use Storable 'dclone';
use Carp 'croak';

no autovivification;

use experimental 'isa';

# Key name that a duplicate-key DB error is compared against when creating a stream
# (see the create_stream helpers).
use constant UNIQUE_STREAM_NAME_INDEX => 'bs_streams_uidx_name';
# Defaults for the plugin's config options, plus the cap on outgoing websocket messages.
use constant {
    # default for config option 'heartbeat_interval' (used as seconds in the SQL staleness checks)
    DEFAULT_HEARTBEAT_INTERVAL => 5,
    # default for config option 'repair_interval' (period of the repairer election timer)
    DEFAULT_REPAIR_INTERVAL    => 40,
    # default for config option 'ping_interval' (sent to clients as 'pingInterval')
    DEFAULT_PING_INTERVAL      => 15,
    # default for config option 'notify_size_limit' (larger notifications are sent in chunks)
    DEFAULT_NOTIFY_SIZE_LIMIT  => 8_000,
    # upper bound on the size of a single outgoing websocket message (larger ones are chunked)
    MAX_WEBSOCKET_SIZE         => 262_144,
};

our $VERSION = "v0.0.36";

# Cache of listener observables, keyed by stream name.
has _listener_observables => sub { +{} };

# File-level lexical copy of the constant, read by _send_p.
my $MAX_WEBSOCKET_SIZE = MAX_WEBSOCKET_SIZE;

# Localized by the create_stream helpers right before they croak.
our @CARP_NOT;

sub register ($self, $app, $config) {
    # config options (each one falls back to its DEFAULT_* constant when not provided)
    my $HEARTBEAT_INTERVAL = $config->{heartbeat_interval} // DEFAULT_HEARTBEAT_INTERVAL;
    my $REPAIR_INTERVAL = $config->{repair_interval} // DEFAULT_REPAIR_INTERVAL;
    my $PING_INTERVAL = $config->{ping_interval} // DEFAULT_PING_INTERVAL;
    # Stored as one less than the configured limit.
    # NOTE(review): presumably because a NOTIFY payload must be strictly shorter than the limit — confirm.
    my $NOTIFY_SIZE_LIMIT = ($config->{notify_size_limit} // DEFAULT_NOTIFY_SIZE_LIMIT) - 1;

    # State shared by all the helpers and callbacks defined below.
    # NOTE(review): $config->{Pg} is presumably a Mojo::Pg instance (->db, ->pubsub are called on it) — confirm.
    my $pg = $config->{Pg};
    my $registry = BoardStreams::Registry->new;
    my $worker_id;   # UUID
    my $am_repairer; # is this worker process the repairer?
    my $is_finished; # is this worker process shutting down?
    # flipped to 1 once the pubsub connection is up, back to 0 when the worker must stop
    my $can_accept_clients = 0;

    # Migrate database to newest schema
    BoardStreams::DBMigrations->apply_migrations($pg);

    # Expose this worker's id (undefined until the first event-loop tick assigns it).
    $app->helper('bs.worker_id' => sub ($c) { $worker_id });

    # Ask the event loop to shut down gracefully, unless this worker is already finishing.
    my sub stop_gracefully {
        $is_finished or Mojo::IOLoop->stop_gracefully;
    }

    # Derive the pubsub channel name of a stream: the sha256_b64u digest of
    # "boardstreams.<stream name>".
    my sub get_pg_channel_name ($stream_name) {
        my $channel_source = 'boardstreams.' . $stream_name;
        return sha256_b64u($channel_source);
    }

    # Cleanup that runs when a client connection finishes (it is called from the connection's
    # 'finish' handler and from the event loop's 'finish' handler).
    #
    # It unregisters the connection, waits until the connection has no pending joins, and then,
    # for every stream the connection is still a member of, removes its memberships, drops its
    # listener subscription and runs the stream's leave sub (if one is registered).
    #
    # $c is the connection's controller. Returns a promise (async sub).
    my $on_client_finish_sub = async sub ($c) {
        # already cleaned up (or never registered): nothing to do. This also makes a second
        # invocation for the same connection a no-op.
        $registry->is_conn_registered($c) or return;
        $registry->unregister_conn($c);
        # wait for the connection's pending-joins counter to reach zero
        await first_value_from(
            $registry->pending_joins->{$c}->pipe(
                op_filter(sub { $_ == 0 }),
            ),
        );
        await next_tick_p; # to allow pending join handlers to send their return values
        my $streams_and_counts_of_conn = $registry->get_streams_and_counts_of_conn($c) or return;

        # one db handle for the whole cleanup; it is also published through $BoardStreams::db,
        # which the helpers prefer over a fresh $pg->db
        my $db = dynamically $BoardStreams::db = $pg->db;

        PAIR:
        foreach my $stream_name (keys %$streams_and_counts_of_conn) {
            $registry->is_member_of($c, $stream_name) or next PAIR;

            # NOTE(review): when we 'next PAIR' before the commit below, $tx just goes out of
            # scope — presumably that rolls the transaction back; confirm.
            my $tx = $db->begin;

            # lock the stream's row; skip the stream if it does not exist anymore
            await exists_p(
                $db,
                'bs_streams',
                { name => $stream_name },
                { for => 'update' },
            ) or next PAIR;

            # membership is checked again, since it may have changed while we were awaiting
            $registry->is_member_of($c, $stream_name) or next PAIR;

            if (my $leave_sub = $registry->get_leave($stream_name)) {
                # remove memberships one by one, running the leave sub after each removal;
                # remove_membership returning true ends the loop (and triggers the unsubscribe)
                my $left_completely;
                while (! $left_completely) {
                    $left_completely = $registry->remove_membership($c, $stream_name)
                        and (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;

                    try {
                        my $result = $leave_sub->($c, $stream_name);
                        # the leave sub may be synchronous or return a promise
                        await $result if $result->$_can('then');
                    } catch ($e) {
                        $c->log->fatal("Couldn't execute leave sub, stopping worker: $e");
                        # so as to not leave the streams in an inconsistent state
                        stop_gracefully();
                    };
                }
            } else {
                # no leave sub: just drop all memberships and the listener subscription
                1 until $registry->remove_membership($c, $stream_name);
                (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe;
            }

            $tx->commit;
        }
    };

    # All workers execute this once, on the first tick of the event loop. It:
    #   - assigns this worker its id,
    #   - tracks the pubsub connection to decide whether clients can be accepted,
    #   - inserts the worker's row and keeps its heartbeat fresh,
    #   - periodically elects a repairer and, if this worker is the repairer, repairs streams,
    #   - on event-loop finish, makes clients leave their streams and deletes the worker's row.
    Mojo::IOLoop->next_tick(sub {
        $worker_id = unique_id;

        # set $can_accept_clients
        {
            # emits 1 on pubsub 'reconnect', 0 on 'disconnect'; starts at 0
            my $pubsub_connected_o = rx_merge(
                rx_from_event($pg->pubsub, 'reconnect')->pipe(op_map_to(1)),
                rx_from_event($pg->pubsub, 'disconnect')->pipe(op_map_to(0)),
            )->pipe(op_start_with(0));

            # stop & kick everyone on these events
            rx_merge(
                # if disconnect from pg (connected -> disconnected transition)
                $pubsub_connected_o->pipe(
                    op_pairwise(),
                    op_filter(sub {
                        my ($prev, $curr) = @$_;
                        return $prev && ! $curr;
                    })
                ),

                # if not connected to pg the first three seconds
                rx_timer(3)->pipe(
                    op_take_until(
                        $pubsub_connected_o->pipe(op_filter(sub { $_ })),
                    ),
                ),
            )->subscribe(sub {
                $can_accept_clients = 0;
                stop_gracefully();
            });

            # allow connections whenever pubsub reports being connected
            $pubsub_connected_o->pipe(
                op_filter(sub { $_ }),
            )->subscribe(sub { $can_accept_clients = 1 });
        }

        # NOTE(review): listening on a dummy channel presumably forces the pubsub connection to
        # be opened, so that the reconnect/disconnect events above can fire — confirm.
        $pg->pubsub->listen('boardstreams.dummy' => sub {});

        # create worker row (stop if no row was inserted)
        $pg->db->insert(
            'bs_workers',
            {
                id           => $worker_id,
                dt_heartbeat => \'CURRENT_TIMESTAMP',
            },
            { on_conflict => undef },
        )->rows or stop_gracefully();

        # store heartbeat, shutdown if worker row is missing
        # (the random first delay spreads the workers' timers apart)
        rx_timer(rand($HEARTBEAT_INTERVAL), $HEARTBEAT_INTERVAL)->subscribe(async sub {
            # update heartbeat or stop and finish
            (await $pg->db->update_p(
                'bs_workers',
                { dt_heartbeat => \'CURRENT_TIMESTAMP' },
                { id => $worker_id },
            ))->rows or stop_gracefully();
        });

        # elect repairer if needed, and repair
        rx_timer(rand($REPAIR_INTERVAL), $REPAIR_INTERVAL)->subscribe(async sub {
            my $db = $pg->db;

            # revolt against absent ruler: drop a repairer whose heartbeat is older than
            # two heartbeat intervals
            await $db->delete_p(
                'bs_workers',
                {
                    is_repairer  => 1,
                    dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] }
                }
            );

            my $repairer_row = (await $db->select_p(
                'bs_workers',
                'id',
                { is_repairer => 1 },
            ))->hashes->[0];

            if ($repairer_row) {
                $am_repairer = $repairer_row->{id} eq $worker_id;
            } else {
                # Nobody holds the role: try to claim it. The outcome is assigned either way,
                # so that a "true" left over from an earlier round cannot survive a claim that
                # updated no row (e.g. because this worker's own row is gone).
                my $claim = await $db->update_p(
                    'bs_workers',
                    { is_repairer => 1 },
                    { id => $worker_id },
                    { on_conflict => undef },
                );
                $am_repairer = !!$claim->rows;
            }

            if ($am_repairer) {
                await $app->bs->repair_p;
            }
        });

        # on ioloop finish, make clients leave and remove worker row
        rx_from_event(Mojo::IOLoop->singleton, 'finish')->pipe(
            op_take(1),
        )->subscribe(async sub {
            $is_finished = 1;
            # this is to make clients leave their streams before deleting the worker row, to
            # avoid having the repairer repairing w/o reason
            $_->finish foreach $registry->get_all_conns->@*;
            foreach my $conn ($registry->get_all_conns->@*) {
                await $on_client_finish_sub->($conn);
            };

            await $pg->db->delete_p(
                'bs_workers',
                { id => $worker_id },
            );
        });
    });

    # Repair all streams that need repairing, i.e. streams that still have guard rows which
    # point to workers that no longer exist.
    #
    # For each such stream (one at a time, with its rows locked inside a transaction) the
    # stream's repair sub, if registered, is called as
    #     $repair_sub->($c, $stream_name, $get_dead_worker_ids)
    # where $get_dead_worker_ids is an async sub that resolves to a hashref whose keys are the
    # ids of the dead workers guarding that stream. Afterwards those workers' guard rows for
    # the stream are deleted.
    #
    # Returns a promise (async sub).
    $app->helper('bs.repair_p' => async sub ($c) {
        my $db = dynamically $BoardStreams::db = $pg->db;

        # remove workers whose heartbeat is older than two heartbeat intervals
        await $db->delete_p(
            'bs_workers',
            { dt_heartbeat => { '<', \["NOW() - INTERVAL '1 SECOND' * ?", 2 * $HEARTBEAT_INTERVAL] } },
        );

        # ids of workers that are referenced by guards but have no worker row;
        # nothing to repair if there are none
        my @dead_worker_ids = (await $db->select_p(
            [
                'bs_guards',
                [-left, 'bs_workers', id => 'worker_id'],
            ],
            [\'DISTINCT bs_guards.worker_id'],
            { 'bs_workers.id' => undef },
        ))->hashes->map(sub {$_->{worker_id}})->@* or return;

        while (1) {
            my $tx = $db->begin;

            # lock one stream that needs repair
            my $stream_row = (await $db->select_p(
                [
                    'bs_guards',
                    ['bs_streams', id => 'stream_id'],
                ],
                ['bs_streams.id', 'bs_streams.name'],
                { 'bs_guards.worker_id' => {-in, \@dead_worker_ids} },
                {
                    limit => 1,
                    for => 'update',
                },
            ))->hashes->[0] or last;
            my ($stream_id, $stream_name) = $stream_row->@{qw/ id name /};

            my @new_dead_worker_ids_for_stream;
            my $fetched_dead_worker_ids = 0;
            my $get_dead_worker_ids = async sub {
                my $results = await $db->select_p(
                    [
                        'bs_guards',
                        [-left, 'bs_workers', id => 'worker_id'],
                    ],
                    [\'DISTINCT bs_guards.worker_id'],
                    {
                        'bs_guards.stream_id' => $stream_id,
                        'bs_workers.id'       => undef,
                    },
                );
                @new_dead_worker_ids_for_stream = $results->arrays->map(sub {$_->[0]})->@*;
                $fetched_dead_worker_ids = 1;
                return { map {( $_ => 1 )} @new_dead_worker_ids_for_stream };
            };

            # repair stream
            if (my $stream_repair_sub = $registry->get_repair($stream_name)) {
                my $ret = $stream_repair_sub->($c, $stream_name, $get_dead_worker_ids);
                # the repair sub may be synchronous or return a promise
                await $ret if $ret->$_can('then');
            }

            # If the stream has no repair sub, or its repair sub never asked for the dead
            # workers, look them up here. Otherwise no guard would be deleted below and this
            # loop would select the very same stream again, forever.
            await $get_dead_worker_ids->() unless $fetched_dead_worker_ids;

            # delete guards pointing to this stream
            await $db->delete_p(
                'bs_guards',
                {
                    stream_id => $stream_id,
                    worker_id => {-in, \@new_dead_worker_ids_for_stream},
                },
            ) if @new_dead_worker_ids_for_stream;

            $tx->commit;
        }
    });

    # JSON-encode $data and send it to the client as a binary websocket message.
    #
    # A message that fits within the size cap is sent whole. A larger one is split into
    # chunks, each prefixed with ":<identifier> <chunk number>: " (the last chunk has a '$'
    # after the chunk number), yielding to the event loop between chunks.
    #
    # Resolves to 1 when everything was sent, or to false as soon as the connection's
    # transaction is found to be gone.
    async sub _send_p ($c, $data) {
        # current size cap: the transaction's own limit, but never above $MAX_WEBSOCKET_SIZE
        my $size_cap = sub {
            my $tx = $c->tx or return $MAX_WEBSOCKET_SIZE;
            return min($tx->max_websocket_size, $MAX_WEBSOCKET_SIZE);
        };

        my $message = encode_json $data;
        my $whole_length = length $message;

        # the simple case: a single frame is enough
        if ($whole_length <= $size_cap->()) {
            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $message });
            return 1;
        }

        # per-connection identifier tying the chunks of this message together
        my $identifier = $c->stash->{'boardstreams.outgoing_uuid'}++;

        my ($chunk_no, $offset) = (0, 0);
        while (1) {
            my $limit = $size_cap->();

            # this is the last chunk if the remainder fits together with the ending prefix
            my $final_prefix = ":$identifier $chunk_no\$: ";
            my $is_final = length($final_prefix) + $whole_length - $offset <= $limit;
            my $prefix = $is_final ? $final_prefix : ":$identifier $chunk_no: ";

            my $room = $limit - length $prefix;
            my $piece = $offset <= $whole_length ? substr($message, $offset, $room) : '';
            $offset += $room;

            $c->tx or return !!0; # check if transaction is destroyed
            $c->send({ binary => $prefix . $piece });

            last if $is_final;

            # don't cause other threads to hang if message is very large
            await next_tick_p;
            $chunk_no++;
        }

        return 1;
    }

    # Set up a freshly connected websocket client.
    #
    # Responds with 503 if this worker cannot accept clients right now. Otherwise it registers
    # the connection, installs its 'finish' and 'text' handlers and finally sends the client a
    # 'config' message carrying the ping interval.
    #
    # Incoming text messages are either { type => 'ping' } (answered with a 'pong') or
    # JSON-RPC 2.0 messages with method 'doAction' or 'doRequest' and params
    # [$stream_name, $action_or_request_name, $payload]. Requests are answered with a JSON-RPC
    # result or error; errors of messages without an id are only logged.
    #
    # Returns a promise (async sub).
    $app->helper('bs.init_client_p' => async sub ($c) {
        $can_accept_clients or return $c->rendered(503);

        $registry->conn_subscriptions->{$c} = {};
        $registry->pending_joins->{$c} = rx_behavior_subject->new(0);
        $registry->register_conn($c);
        # first identifier for chunked outgoing messages (post-incremented by _send_p)
        $c->stash->{'boardstreams.outgoing_uuid'} = 'a';

        $c->on(finish => async sub ($_c, @) {
            await $on_client_finish_sub->($_c);
        });

        await sleep_p(0.25); # mojo issue 1895

        $c->on(text => async sub ($_c, $bytes) {
            # Clients are untrusted: a message that is not valid JSON, or that is not a JSON
            # object, is logged and dropped here instead of escaping from this handler as an
            # unhandled exception.
            my $data = eval { decode_json($bytes) };
            if (ref($data) ne 'HASH') {
                $_c->log->error('ignoring incoming message that is not a JSON object');
                return;
            }

            # declared outside the try, because the catch block needs it to address its reply
            my $id;

            # pong and return on ping
            if (($data->{type} // '') eq 'ping') {
                await _send_p($_c, { type => 'pong' });
                return;
            }

            try {
                defined $data->{jsonrpc} and $data->{jsonrpc} eq '2.0'
                    or die 'incoming message is not jsonrpc 2.0';

                (my $method, my $params, $id) = $data->@{qw/ method params id /};

                ! $is_finished or die jsonrpc_error 503, 'this server worker stopped, please try again';
                $registry->is_conn_registered($_c)
                    or die jsonrpc_error 503, "can't receive because connection is closing";

                if ($method eq 'doAction') {
                    # params
                    my ($stream_name, $action_name, $payload) = @$params;

                    # validation (actions are fire-and-forget, so they must not carry an id;
                    # the special '!open' stream needs no membership)
                    ! defined $id or die "action '$action_name' on stream '$stream_name' contains extra id ($id)\n";
                    $registry->is_member_of($_c, $stream_name)
                        or die "Connection has not joined '$stream_name' but tried to do action '$action_name'\n"
                        unless $stream_name eq '!open';

                    # fetch + act
                    my $action_sub = $registry->get_action($stream_name, $action_name)
                        or die "invalid action '$action_name' on stream '$stream_name'\n";

                    my $ret = $action_sub->($_c, $stream_name, $payload);
                    await $ret if $ret->$_can('then');
                } elsif ($method eq 'doRequest') {
                    # params
                    my ($stream_name, $request_name, $payload) = @$params;

                    # validation (requests need a defined, non-reference id to answer to;
                    # the special '!open' stream needs no membership)
                    defined $id and ! length ref $id
                        or die "request '$request_name' on stream '$stream_name' has missing or invalid id ("
                            . ($id // 'undef') . ")\n";
                    $registry->is_member_of($_c, $stream_name)
                        or die jsonrpc_error(
                            403,
                            "Connection has not joined '$stream_name' but tried to do request '$request_name'"
                        )
                        unless $stream_name eq '!open';

                    # fetch + act
                    my $request_sub = $registry->get_request($stream_name, $request_name)
                        or die "invalid request '$request_name' on stream '$stream_name'\n";

                    my $result = $request_sub->($_c, $stream_name, $payload);
                    $result = await $result if $result->$_can('then');

                    # respond
                    await _send_p($_c, {
                        jsonrpc => '2.0',
                        result  => $result,
                        id      => $id,
                    });
                } else {
                    die jsonrpc_error -32_601, 'invalid method', { method => $method };
                }
            } catch ($e) {
                $_c->log->error(trim "$e");
                # only messages that carry an id get an error reply; a plain exception is
                # wrapped into a JSON-RPC error with code 500
                if (defined $id) {
                    my $jsonrpc_error =
                        $e isa 'BoardStreams::Error::JSONRPC' ? $e
                        : jsonrpc_error 500, trim("$e");

                    await _send_p($_c, {
                        jsonrpc => '2.0',
                        error   => $jsonrpc_error,
                        id      => $id,
                    });
                }
            };
        }) if $c->tx;

        await _send_p($c, {
            type => 'config',
            data => {
                pingInterval => 0 + $PING_INTERVAL,
            },
        });
    });

    # Wrap every action. For the last action of a route whose stash has 'boardstreams.endpoint'
    # set, run the action and then initialize the BoardStreams client on the connection. If
    # either step fails, the client is told about the failure, the connection is finished and
    # the error is logged and rethrown. Every other action passes straight through.
    $app->hook(around_action => async sub ($next, $c, $action, $last) {
        return $next->() unless $last and $c->stash->{'boardstreams.endpoint'};

        $c->render_later;
        try {
            my $action_result = $next->();
            # the action may be synchronous or return a promise
            await $action_result if $action_result->$_can('then');
            return await $c->bs->init_client_p;
        } catch ($e) {
            await sleep_p(1.5);
            my $request_id = eval {$c->req->request_id};
            await _send_p($c, { type => 'connection failure', requestId => $request_id });
            $c->finish if $c->tx;
            $c->log->error($e);
            die $e;
        };
    });

    # Create a stream, asynchronously. Must be called while the event loop is running
    # (croaks otherwise; bs->create_stream is the variant for the other case).
    #
    # Parameters:
    #   $stream_name     - must match $BoardStreams::REs::STREAM_NAME
    #   $starting_state  - initial state of the stream, stored as JSON; must be defined
    #   %opts            - type:        stored in the stream's 'type' column
    #                      keep_events: normalized to 0/1, defaults to 1
    #
    # Resolves to 1 if the stream was created, or to false if a stream with that name already
    # exists (either found beforehand, or reported by the DB as a duplicate on the unique
    # stream-name index). Any other DB error is rethrown.
    $app->helper('bs.create_stream_p' => async sub ($c, $stream_name, $starting_state, %opts) {
        # @CARP_NOT is only localized when we are about to croak (comma operator + modifier)
        local @CARP_NOT = qw/ Mojolicious::Renderer /,
            croak 'Not in an event loop, using bs->create_stream_p; use bs->create_stream instead'
            unless Mojo::IOLoop->is_running;

        # opts can be: type, keep_events
        my $type = $opts{type};
        my $keep_events = exists $opts{keep_events} ? int(!!$opts{keep_events}) : 1;

        # validate params
        $stream_name =~ $BoardStreams::REs::STREAM_NAME
            or croak "invalid stream name: '$stream_name'";
        defined $starting_state
            or croak "starting state not defined";

        # reuse the db handle of the surrounding operation, if there is one
        my $db = $BoardStreams::db // $pg->db;

        # TODO: Consider locking this row for update
        return !!0 if await exists_p($db, 'bs_streams', { name => $stream_name });

        # Try to set a savepoint. $in_txn ends up true only if that succeeded.
        # NOTE(review): presumably the SAVEPOINT query fails when $db is not inside a
        # transaction, which is what makes this a "are we in a transaction" probe — confirm.
        my $savepoint_name = unique_id;
        my ($in_txn) = eval {
            await $db->query_p(qq{SAVEPOINT "$savepoint_name"});
            1
        };

        try {
            await query_throwing_exception_object_p($db, 'insert_p', [
                'bs_streams',
                {
                    name        => $stream_name,
                    state       => { -json => $starting_state },
                    type        => $type,
                    keep_events => $keep_events,
                },
            ]);

            return 1;
        } catch ($e) {
            # only a duplicate on the unique stream-name index means "already exists"
            die $e
                unless $e isa 'BoardStreams::Error::DB::Duplicate'
                    and $e->data->{key_name} eq UNIQUE_STREAM_NAME_INDEX;

            # undo the failed insert so that the surrounding transaction stays usable
            await $db->query_p(qq{ROLLBACK TO "$savepoint_name"}) if $in_txn;
            return !!0;
        };
    });

    # Create a stream using blocking DB calls. Meant for use while the event loop is NOT
    # running (it croaks if the loop is running; bs->create_stream_p is the variant for that).
    #
    # Parameters:
    #   $stream_name     - must match $BoardStreams::REs::STREAM_NAME
    #   $starting_state  - initial state of the stream, stored as JSON; must be defined
    #   %opts            - type:        stored in the stream's 'type' column
    #                      keep_events: normalized to 0/1, defaults to 1
    #
    # The body returns 1 if the stream was created, or false if a stream with that name already
    # exists (either found beforehand, or reported by the DB as a duplicate on the unique
    # stream-name index). Any other DB error is rethrown.
    #
    # NOTE(review): this helper is declared `async sub` although its body never awaits. As far
    # as I can tell, that makes callers receive a promise (always true in boolean context)
    # instead of the plain 1/false value, and turns the croaks below into promise rejections.
    # Confirm whether a plain `sub` was intended before relying on the return value.
    $app->helper('bs.create_stream' => async sub ($c, $stream_name, $starting_state, %opts) {
        # @CARP_NOT is only localized when we are about to croak (comma operator + modifier)
        local @CARP_NOT = qw/ Mojolicious::Renderer /,
            croak 'In an event loop, using bs->create_stream; use await bs->create_stream_p instead'
            if Mojo::IOLoop->is_running;

        # opts can be: type, keep_events
        my $type = $opts{type};
        my $keep_events = exists $opts{keep_events} ? int(!!$opts{keep_events}) : 1;

        # validate params
        $stream_name =~ $BoardStreams::REs::STREAM_NAME
            or croak "invalid stream name: '$stream_name'";
        defined $starting_state
            or croak "starting state not defined";

        # reuse the db handle of the surrounding operation, if there is one
        my $db = $BoardStreams::db // $pg->db;

        # TODO: Consider locking this row for update
        return !!0 if row_exists($db, 'bs_streams', { name => $stream_name });

        # Try to set a savepoint. $in_txn ends up true only if that succeeded.
        # NOTE(review): presumably the SAVEPOINT query fails when $db is not inside a
        # transaction, which is what makes this a "are we in a transaction" probe — confirm.
        my $savepoint_name = unique_id;
        my ($in_txn) = eval { $db->query(qq{SAVEPOINT "$savepoint_name"}); 1 };

        try {
            query_throwing_exception_object($db, 'insert', [
                'bs_streams',
                {
                    name        => $stream_name,
                    state       => { -json => $starting_state },
                    type        => $type,
                    keep_events => $keep_events,
                },
            ]);

            return 1;
        } catch ($e) {
            # only a duplicate on the unique stream-name index means "already exists"
            die $e
                unless $e isa 'BoardStreams::Error::DB::Duplicate'
                    and $e->data->{key_name} eq UNIQUE_STREAM_NAME_INDEX;

            # undo the failed insert so that the surrounding transaction stays usable
            $db->query(qq{ROLLBACK TO "$savepoint_name"}) if $in_txn;
            return !!0;
        };
    });

    # Lock one or more streams, let $sub compute their changes, then persist and broadcast them.
    #
    # Parameters:
    #   $stream_name - a stream name, or an arrayref of stream names
    #   $sub         - called with the current states of the locked streams (in the order of
    #                  the given names); may return its result directly or through a promise.
    #                  For a single stream name the result is the list
    #                      ($new_event, $new_state, $extra_guards)
    #                  and for an arrayref of names it is a list of arrayrefs with those three
    #                  elements, one per stream, in the same order.
    #
    # Per stream, inside the transaction run by bs->do_txn_p:
    #   - a defined $new_event is stored in bs_events if the stream keeps events,
    #   - a defined $new_state (and/or a stored event) updates the stream's row,
    #   - $extra_guards (positive or negative) adjusts this worker's guard count on the stream,
    #   - the event and/or the diff between the old and new state (without their top-level
    #     '_secret' keys) is sent as a notification on the stream's pubsub channel, split into
    #     chunks if it exceeds the notify size limit.
    #
    # Dies if any of the streams does not exist. Resolves to undef.
    $app->helper('bs.lock_stream_p' => async sub ($c, $stream_name, $sub) {
        my $is_array = ref $stream_name eq 'ARRAY';
        my $stream_names = $is_array ? $stream_name : [$stream_name];
        await $c->bs->do_txn_p(async sub ($db) {
            # lock the rows of all requested streams
            my $stream_rows = (await $db->select_p(
                'bs_streams',
                [qw/ id name state keep_events /],
                { name => {-in, $stream_names} },
                { for => 'update' },
            ))->expand->hashes;
            # rows are looked up by stream name from here on
            my $streams = hashify($stream_rows, ['name']);
            my ($missing_stream) = grep { not exists $streams->{$_} } @$stream_names;
            ! defined $missing_stream or die "Couldn't find stream $missing_stream in database to lock\n";
            # my ($stream_id, $old_state, $keep_events) = $stream_row->@{qw/ id state keep_events /};

            # keep deep copies of the old states, since $sub receives the originals and may
            # modify them (wrapping in an arrayref lets dclone handle non-reference states too)
            my @old_states = map $_->{state}, $streams->@{@$stream_names};
            @old_states = map { (dclone [$_])->[0] } @old_states;

            my @ret = $sub->(map $_->{state}, $streams->@{@$stream_names});
            @ret = await $ret[0] if $ret[0]->$_can('then');
            # normalize the single-stream result to the list-of-arrayrefs shape
            $is_array or @ret = ([@ret]);

            foreach my $sname (@$stream_names) {
                my $old_state = shift @old_states;
                my $stream_id = $streams->{$sname}{id};
                my ($new_event, $new_state, $extra_guards) = (shift @ret)->@*;

                # store the event, if there is one and the stream keeps its events
                my $new_event_id;
                $new_event_id = (await $db->insert_p(
                    'bs_events',
                    {
                        stream_id => $stream_id,
                        data      => { -json => $new_event },
                    },
                    { returning => 'id' },
                ))->hashes->[0]{id} if $streams->{$sname}{keep_events} and defined $new_event;

                if (defined $new_event or defined $new_state) {
                    # no event row was inserted: take the next id from the events' sequence
                    $new_event_id //= (await $db->query_p("SELECT nextval('bs_events_id_seq')"))->arrays->[0][0];

                    # the stream's row is not touched for an event that is not kept and
                    # comes without a new state
                    await $db->update_p(
                        'bs_streams',
                        {
                            defined $new_state ? (state => { -json => $new_state }) : (),
                            event_id => $new_event_id,
                            last_dt  => \'CURRENT_TIMESTAMP',
                        },
                        { id => $stream_id },
                    ) if (defined $new_event and $streams->{$sname}{keep_events}) or defined $new_state;
                }

                # diff of the states, with their top-level '_secret' keys removed;
                # an empty diff counts as no diff
                my $diff;
                if (defined $new_state) {
                    delete $old_state->{_secret} if ref $old_state eq 'HASH';
                    delete $new_state->{_secret} if ref $new_state eq 'HASH';
                    $diff = diff($old_state, $new_state, noO => 1, noU => 1);
                    $diff = undef if ! %$diff;
                }

                if (defined $extra_guards) {
                    if ($extra_guards > 0) {
                        # add to this worker's guard count on the stream (creating the row if needed)
                        await $db->insert_p(
                            'bs_guards',
                            {
                                worker_id => $worker_id,
                                stream_id => $stream_id,
                                count     => $extra_guards,
                            },
                            {
                                on_conflict => [
                                    [ 'worker_id', 'stream_id' ] => { count => \'bs_guards.count + EXCLUDED.count' },
                                ],
                            }
                        );
                    } elsif ($extra_guards < 0) {
                        # subtract from this worker's guard count on the stream
                        my $guard_row = (await $db->update_p(
                            'bs_guards',
                            { count => \['"count" - ?', abs($extra_guards)] },
                            {
                                worker_id => $worker_id,
                                stream_id => $stream_id,
                            },
                            { returning => 'count' },
                        ))->hashes->[0] or croak "missing guard row";

                        # remove the guard row once its count has dropped to zero
                        if (! $guard_row->{count}) {
                            await $db->delete_p(
                                'bs_guards',
                                {
                                    worker_id => $worker_id,
                                    stream_id => $stream_id,
                                    count     => 0, # because user may have forgotten to start a txn
                                },
                            );
                        }
                    }
                }

                # broadcast the event and/or the state patch to the stream's listeners
                if (defined $diff or defined $new_event) {
                    my $pg_channel = get_pg_channel_name($sname);
                    my $notification = encode_json({
                        id    => int $new_event_id,
                        event => $new_event,
                        patch => $diff,
                    });
                    my $whole_length = length $notification;
                    if ($whole_length <= $NOTIFY_SIZE_LIMIT) {
                        $db->notify($pg_channel, $notification);
                    } else {
                        # send notification in chunks, each prefixed with
                        # ":<event id> <chunk number>: " (the last one has a '$' after the number)
                        for (my ($i, $cursor, $sent_ending) = (0, 0, 0); ! $sent_ending; $i++) {
                            my $bytes_prefix;
                            my $ending_bytes_prefix = ":$new_event_id $i\$: ";
                            # this assumes that length $ending_bytes_prefix < $NOTIFY_SIZE_LIMIT
                            if (length($ending_bytes_prefix) + $whole_length - $cursor <= $NOTIFY_SIZE_LIMIT) {
                                $bytes_prefix = $ending_bytes_prefix;
                                $sent_ending = 1;
                            } else {
                                $bytes_prefix = ":$new_event_id $i: ";
                            }

                            my $max_sublength = $NOTIFY_SIZE_LIMIT - length $bytes_prefix;
                            my $substring = $cursor <= $whole_length ? substr($notification, $cursor, $max_sublength) : '';
                            $cursor += $max_sublength;

                            $db->notify($pg_channel, $bytes_prefix . $substring);

                            # don't cause other threads to hang if notification is very large
                            # (no yielding after the very last chunk of the very last stream)
                            await next_tick_p unless $stream_names->[-1] eq $sname and $sent_ending;
                        }
                    }
                }
            }
        });

        return undef;
    });

    # Register $sub in the registry as the handler of action $action_name on stream $stream_name.
    $app->helper('bs.set_action' => sub ($c, $stream_name, $action_name, $sub) {
        return $registry->set_action_request(action => $stream_name, $action_name, $sub);
    });

    # Register $sub in the registry as the handler of request $request_name on stream $stream_name.
    $app->helper('bs.set_request' => sub ($c, $stream_name, $request_name, $sub) {
        return $registry->set_action_request(request => $stream_name, $request_name, $sub);
    });

    # Register $sub in the registry as the 'join' handler of stream $stream_name.
    $app->helper('bs.set_join' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request(join_leave => $stream_name, 'join', $sub);
    });

    # Register $sub in the registry as the 'leave' handler of stream $stream_name.
    $app->helper('bs.set_leave' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request(join_leave => $stream_name, 'leave', $sub);
    });

    # Register $sub in the registry as the 'repair' handler of stream $stream_name
    # (it is stored under the same 'join_leave' kind as the join and leave handlers).
    $app->helper('bs.set_repair' => sub ($c, $stream_name, $sub) {
        return $registry->set_action_request(join_leave => $stream_name, 'repair', $sub);
    });

    # Fetch the state of a stream, selecting its row FOR UPDATE.
    # Resolves to the stream's (expanded) state, or to undef if there is no such stream.
    $app->helper('bs.get_state_p' => async sub ($c, $stream_name) {
        # reuse the db handle of the surrounding operation, if there is one
        my $db = $BoardStreams::db // $pg->db;

        my $results = await $db->select_p(
            'bs_streams',
            ['state'],
            { name => $stream_name },
            { for => 'update' },
        );
        my $stream_row = $results->expand->hashes->[0];

        return $stream_row ? $stream_row->{state} : undef;
    });

    # Request 'join' on the special '!open' stream: make the connection a member of a stream.
    #
    # Payload: { name => $stream_name, last_id => $last_id }. A defined last_id marks a
    # reconnect and limits the returned past events to those with a greater id.
    #
    # With the stream's row locked, the stream's join sub is called as
    #     $join_sub->($c, $stream_name, { is_reconnect => ... })
    # A false result refuses the join (JSON-RPC error 403). A true result may be a hashref
    # with a 'limit' key: how many past events to return (a number, or 'all'; default 0).
    #
    # Resolves to
    #     { state => { id => ..., data => ... }, events => [ { id => ..., event => ... }, ... ] }
    # with the top-level '_secret' key removed from the state.
    #
    # The connection's pending-joins counter is raised while the join is in progress and is
    # lowered again on every way out, since the connection's cleanup waits for it to be zero.
    $app->bs->set_request('!open', 'join', async sub ($c, $, $payload) {
        my ($stream_name, $last_id) = $payload->@{qw/ name last_id /};

        # fetch + act
        my $join_sub = $registry->get_join($stream_name)
            or die "stream '$stream_name' has no join method\n";

        my $db = dynamically $BoardStreams::db = $pg->db;
        my $tx = $db->begin;

        # lock the stream's row for the duration of the join
        await exists_p(
            $db,
            'bs_streams',
            { name => $stream_name },
            { for => 'update' },
        ) or die "stream '$stream_name' does not exist\n";

        $registry->inc_pending_joins_by($c, 1);
        my $result = do {
            try {
                my $result = $join_sub->($c, $stream_name, {
                    is_reconnect => defined $last_id,
                });
                # the join sub may be synchronous or return a promise
                $result = await $result if $result->$_can('then');
                $result;
            } catch ($e) {
                $registry->inc_pending_joins_by($c, -1);
                die $e;
            };
        };

        if (! $result) {
            # A refused join is not pending anymore. Without this decrement the counter would
            # stay above zero forever, and the connection's cleanup would never run.
            $registry->inc_pending_joins_by($c, -1);
            die jsonrpc_error 403, 'joining not allowed';
        }

        return do {
            try {
                # subscribe to the stream's listener on the connection's first membership
                if (my $is_first_join = $registry->add_membership($c, $stream_name)) {
                    my $o = $c->bs->_get_listener_observable($stream_name);
                    my $s = $o->subscribe();
                    $registry->conn_subscriptions->{$c}{$stream_name} = $s;
                }

                # the join sub's result is not necessarily a hashref, hence the eval
                my $limit = eval { $result->{limit} } // 0;

                my $stream_row = (await $db->select_p(
                    'bs_streams',
                    [qw/ id event_id state /],
                    { name => $stream_name },
                ))->expand->hashes->[0] or die "Stream $stream_name does not exist in database";
                my ($stream_id, $stream_event_id, $stream_state) = $stream_row->@{qw/ id event_id state /};
                my $past_event_rows = !$limit ? [] : (await $db->select_p(
                    'bs_events',
                    [qw/ id data /],
                    {
                        stream_id => $stream_id,
                        # Both bounds live in ONE condition hash, which SQL::Abstract joins
                        # with AND. Two separate 'id' keys would collide in this hash, and
                        # the upper bound would be silently lost whenever last_id is given.
                        id        => {
                            '<=' => $stream_event_id,
                            defined($last_id) ? ('>' => $last_id) : (),
                        },
                    },
                    {
                        # newest first so that the limit keeps the most recent events;
                        # the rows are put back into chronological order right below
                        order_by => { -desc, 'id' },
                        $limit ne 'all' ? (limit => $limit) : (),
                    }
                ))->expand->hashes->reverse;

                $tx->commit;

                delete $stream_state->{_secret} if ref $stream_state eq 'HASH';

                +{
                    state  => {
                        id   => int $stream_event_id,
                        data => $stream_state,
                    },
                    events => [
                        map +{
                            id    => int $_->{id},
                            event => $_->{data},
                        }, @$past_event_rows
                    ],
                };
            } catch ($e) {
                # undo the membership (and the listener subscription, if it was the only one)
                my $is_last_leave = $registry->remove_membership($c, $stream_name);
                if ($is_last_leave and exists $registry->conn_subscriptions->{$c}{$stream_name}) {
                    (delete $registry->conn_subscriptions->{$c}{$stream_name})->unsubscribe();
                }
                die $e;
            } finally {
                $registry->inc_pending_joins_by($c, -1);
            };
        };
    });

    # Request handler for 'leave' (registered through set_request with '!open'
    # as its first argument): removes one membership of connection $c from the
    # stream named by the payload.
    #
    # Resolves to 'ok' on success, or to nothing when the stream row does not
    # exist. If the stream has a registered leave sub and it throws, the
    # error is logged, the worker is asked to stop gracefully and the
    # (stringified) error is rethrown.
    $app->bs->set_request('!open', 'leave', async sub ($c, $, $payload) {
        my $stream_name = $payload;

        my $db = dynamically $BoardStreams::db = $pg->db;
        my $tx = $db->begin;

        # Lock the stream row (SELECT ... FOR UPDATE); bail out if there is no
        # such stream. The uncommitted transaction is dropped on return.
        await exists_p(
            $db,
            'bs_streams',
            { name => $stream_name },
            { for => 'update' },
        ) or return;

        my $leave_sub = $registry->get_leave($stream_name);

        # Drop one membership; on the last one also tear down the connection's
        # listener subscription. The subscription may be absent, so guard the
        # method call the same way the join handler's cleanup does instead of
        # dying on an undefined value.
        if (my $is_last_leave = $registry->remove_membership($c, $stream_name)) {
            my $subscription = delete $registry->conn_subscriptions->{$c}{$stream_name};
            $subscription->unsubscribe if defined $subscription;
        }

        if ($leave_sub) {
            try {
                my $result = $leave_sub->($c, $stream_name);
                await $result if $result->$_can('then');
            } catch ($e) {
                $c->log->fatal("Couldn't execute leave sub, stopping worker: $e");
                # so as to not leave the streams in an inconsistent state
                stop_gracefully();
                die "$e";
            };

            # Only the leave-sub path commits; without a leave sub the
            # transaction merely held the row lock.
            $tx->commit;
        }

        return 'ok';
    });

    # Helper bs._get_listener_observable($stream_name): returns the observable
    # for the given stream, creating and caching it in
    # $self->_listener_observables on first use. When subscribed, the observable
    # listens on the stream's Pg pubsub channel, reassembles chunked
    # notifications into whole messages, and forwards each message to every
    # connection currently registered for the stream.
    $app->helper('bs._get_listener_observable' => sub ($c, $stream_name) {
        return $self->_listener_observables->{$stream_name} //= rx_observable->new(sub ($subscriber) {
            my $pg_channel = get_pg_channel_name($stream_name);
            # Reassembly state: id of the event being accumulated, the text
            # gathered so far, and the index of the last chunk accepted
            # (-1 = expecting chunk 0, -2 = closed to appends).
            my ($acc_event_id, $acc_string, $acc_i) = (0, '', -1);
            my $cb = $pg->pubsub->listen($pg_channel => sub ($, $payload) {
                my $msg;
                # A chunk carries a ":EVENT_ID INDEX: " header, with "$" after
                # INDEX on the final chunk; the substitution strips the header
                # and leaves only the chunk text in $payload.
                if ($payload =~ s/^\:([0-9]+) ([0-9]+)(\$)?\: //) {
                    my ($event_id, $i, $is_final) = ($1, $2, $3);
                    $event_id >= $acc_event_id or die 'event_id < acc_event_id';
                    # A chunk of a newer event resets the accumulator.
                    $event_id == $acc_event_id or ($acc_event_id, $acc_string, $acc_i) = ($event_id, '', -1);
                    # Chunks must arrive in sequence; anything else is dropped.
                    # With $acc_i == -2 this can never match, since $i >= 0.
                    $i == $acc_i + 1 or return; # on listen, maybe we receive second part of old message
                    $acc_string .= $payload;
                    if ($is_final) {
                        $msg = decode_json $acc_string;
                        ($acc_string, $acc_i) = ('', -2); # -2 is closed to appends
                    } else {
                        $acc_i = $i;
                    }
                } else {
                    # No header: the payload is one complete JSON message.
                    $msg = decode_json $payload;
                    my ($event_id) = $msg->{id};
                    $event_id > $acc_event_id or die 'event_id <= acc_event_id';
                    ($acc_event_id, $acc_string, $acc_i) = ($event_id, '', -2);
                }
                # Emit only once a complete message has been decoded.
                $subscriber->next($msg) if defined $msg;
            });

            # Teardown: stop listening on the channel and drop the cached
            # observable, so the next request for this stream builds a new one.
            return sub {
                $pg->pubsub->unlisten($pg_channel => $cb);
                delete $self->_listener_observables->{$stream_name};
            };
        })->pipe(
            # wait until all clients have been sent their message before going on to the next msg
            op_concat_map(sub ($payload, @) {
                # The connection list is read when the message is processed.
                my @conns = $registry->get_conns_of_stream($stream_name)->@*;
                my $msg = {
                    type   => 'eventPatch',
                    stream => $stream_name,
                    # payload is +{id, event, patch}
                    %$payload,
                };
                # One send per connection, merged into a single inner observable.
                return rx_merge(
                    map rx_from(_send_p($_, $msg)), @conns
                );
            }),
            # NOTE(review): op_share() presumably lets all subscribers of the
            # cached observable share one underlying listener — confirm
            # against the RxPerl documentation.
            op_share(),
        );
    });

    # Helper bs.delete_events_p($stream_name, %opts): prunes stored events of
    # a stream from bs_events.
    #
    # Options (either, both or none):
    #   keep_num - keep this many of the newest events (ordered by id)
    #   keep_dur - keep events whose datetime lies within the last keep_dur seconds
    #
    # Each given option yields a cutoff id (0 when nothing qualifies); events
    # with id <= the smallest cutoff are deleted, so with both options an event
    # is removed only if both allow it. With no option at all, every event of
    # the stream is deleted. Resolves to nothing if the stream does not exist.
    $app->helper("bs.delete_events_p", async sub ($c, $stream_name, %opts) {
        my $db = $pg->db;

        my $stream_row = (await $db->select_p(
            'bs_streams',
            'id',
            { name => $stream_name },
        ))->hashes->[0] or return;
        my $stream_id = $stream_row->{id};

        # One [where, options] pair per requested retention rule, in the order
        # the lookups are to be executed.
        my @cutoff_queries;

        if (defined $opts{keep_num}) {
            # The row just past the newest keep_num events.
            push @cutoff_queries, [
                { stream_id => $stream_id },
                {
                    order_by => { -desc => 'id' },
                    offset   => $opts{keep_num},
                    limit    => 1,
                },
            ];
        }

        if (defined $opts{keep_dur}) {
            # The most recent row that is older than keep_dur seconds.
            push @cutoff_queries, [
                {
                    stream_id => $stream_id,
                    datetime  => { '<' => \["CURRENT_TIMESTAMP - INTERVAL '1 SECOND' * ?", $opts{keep_dur}] },
                },
                {
                    order_by => { -desc => 'datetime' },
                    limit    => 1,
                },
            ];
        }

        my @cutoff_ids;
        foreach my $query (@cutoff_queries) {
            my ($where, $options) = @$query;
            my $hit = (await $db->select_p('bs_events', 'id', $where, $options))->hashes->[0];
            push @cutoff_ids, $hit ? $hit->{id} : 0;
        }

        # undef when no retention rule was given, i.e. no id restriction.
        my $cutoff_id = min @cutoff_ids;

        my %delete_where = (stream_id => $stream_id);
        $delete_where{id} = { '<=' => $cutoff_id } if defined $cutoff_id;

        await $db->delete_p('bs_events', \%delete_where);
    });

    # Stream names for which a bs.delete_stream_p call is currently running.
    my $deleting_streams = {};

    # Helper bs.delete_stream_p($stream_name): removes every connection's
    # membership of the stream (running the stream's leave sub once per removed
    # membership, if one is registered) and then deletes the stream row, all
    # inside one do_txn_p transaction.
    #
    # Resolves to 1 if the stream row existed and was deleted, and to 0 if a
    # deletion of the same stream is already flagged as running or if the
    # stream does not exist.
    $app->helper("bs.delete_stream_p", async sub ($c, $stream_name) {
        return 0 if $deleting_streams->{$stream_name};
        # NOTE(review): Syntax::Keyword::Dynamically documents that dynamic
        # values are swapped out while an async sub is suspended at an await;
        # confirm this guard covers the overlap it is meant to prevent.
        dynamically $deleting_streams->{$stream_name} = 1;

        # Set by the transaction callback. A `return` inside the callback only
        # leaves the callback, so the outcome is carried out through this flag.
        my $stream_found = 0;

        await $c->bs->do_txn_p(async sub ($db) {
            # Lock the stream row; nothing to delete if it is not there.
            await exists_p(
                $db,
                'bs_streams',
                { name => $stream_name },
                { for => 'update' },
            ) or return 0;
            $stream_found = 1;

            my $leave_sub = $registry->get_leave($stream_name);

            # Snapshot the connections up front: memberships are removed while
            # looping, which may alter the registry's own list.
            my @conns = $registry->get_conns_of_stream($stream_name)->@*;

            if ($leave_sub) {
                foreach my $conn (@conns) {
                    my $left_completely;
                    # A connection may hold several memberships; remove them one
                    # by one, calling the leave sub after each removal.
                    while (! $left_completely) {
                        if ($left_completely = $registry->remove_membership($conn, $stream_name)) {
                            my $subscription = delete $registry->conn_subscriptions->{$conn}{$stream_name};
                            $subscription->unsubscribe if defined $subscription;
                        }

                        try {
                            my $result = $leave_sub->($conn, $stream_name);
                            await $result if $result->$_can('then');
                        } catch ($e) {
                            $c->log->fatal("Couldn't execute leave sub, stopping worker: $e");
                            # so as to not leave the streams in an inconsistent state
                            stop_gracefully();
                        };
                    }
                }
            } else {
                foreach my $conn (@conns) {
                    1 until $registry->remove_membership($conn, $stream_name);
                    my $subscription = delete $registry->conn_subscriptions->{$conn}{$stream_name};
                    $subscription->unsubscribe if defined $subscription;
                }
            }

            await $db->delete_p(
                'bs_streams',
                { name => $stream_name },
            );
        });

        delete $deleting_streams->{$stream_name};

        return $stream_found;
    });

    # Helper bs.do_txn_p($sub): runs $sub->($db) inside a database transaction
    # and resolves to the callback's result.
    #
    # If $BoardStreams::db is already set, that handle is reused and no
    # transaction is begun or committed here. Otherwise a new handle and
    # transaction are created, $BoardStreams::db is set dynamically for the
    # duration of the call, and the transaction is committed once the callback
    # (and the promise it returns, if any) has finished. If the callback
    # throws, the commit is never reached.
    $app->helper('bs.do_txn_p', async sub ($c, $sub) {
        my $tx;
        dynamically $BoardStreams::db = do { my $db = $pg->db; $tx = $db->begin; $db }
            if ! $BoardStreams::db;
        my $db = $BoardStreams::db;

        # Keep the callback's value (awaiting it when it is thenable) so it can
        # be handed back to the caller instead of being thrown away.
        my $ret = $sub->($db);
        $ret = await $ret if $ret->$_can('then');

        # Only the call that opened the transaction commits it.
        $tx->commit if $tx;

        return $ret;
    });
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.