* provides L<JSON::MaybeUTF8/encode_json_text>, L<JSON::MaybeUTF8/encode_json_utf8>,
L<JSON::MaybeUTF8/decode_json_text>, L<JSON::MaybeUTF8/decode_json_utf8>, L<JSON::MaybeUTF8/format_json_text>
=ite
se Scalar::Util;
use List::Util;
use List::Keywords;
use Future::Utils;
use Module::Load ();
use JSON::MaybeUTF8;
use Unicode::UTF8;
use Heap;
use IO::Async::Notifier;
use Log::Any qw($log);
use Op
'::' . $_} = JSON::MaybeUTF8->can($_) for qw(
encode_json_text
encode_json_utf8
decode_json_text
decode_json_utf8
format_json_text
try {
my $payload = $_;
my $message = Myriad::RPC::Message::from_json($payload);
if(my $pending = delete $pending_requests->{$message->message_id}) {
return $redis->xadd(
encode_utf8($stream) => '*',
data => encode_json_utf8($event),
);
}))->ordered_futures(
low => 100,
hi
for my $event (@events) {
try {
my $event_data = decode_json_utf8($event->{data}->[1]);
$log->tracef('Passing event: %s | from stream: %s
port.
=head2 deadline
An epoch timestamp that represents when the message times out.
=head2 args
A JSON-encoded string containing the arguments of the procedure.
=head2 response
The response to this mes
n a simple hash with the message data, it mustn't return nested hashes
so it will convert them to JSON encoded strings.
=head2 from_hash
a static method (can't be done with Object::Pad currently) th
he format returned by C<as_hash>.
=head2 as_json
Returns the message data as a JSON string.
=head2 from_json
A static method that tries to parse a JSON string
and return a L<Myriad::RPC::Message>.
ervice, $message->rpc);
try {
await $self->redis->publish($message->who, $message->as_json);
await $self->redis->ack($stream, $self->group_name, $message->transport_id);
$p
f('Received RPC response as %s', $payload);
my $message = Myriad::RPC::Message::from_json($payload);
if(my $pending = delete $pending_requests->{$message->message_id}) {
->response = { response => $response };
await $transport->publish($message->who, $message->as_json);
await $transport->ack_message($stream, $self->group_name, $message->transport_id);
$pro
>message, reason => $error->reason } };
await $transport->publish($message->who, $message->as_json);
await $transport->ack_message($stream, $self->group_name, $message->transport_id);
$pro
r doesn't
match the structure.
=cut
use Scalar::Util qw(blessed);
use Syntax::Keyword::Try;
use JSON::MaybeUTF8 qw(:v1);
has $rpc;
has $message_id;
has $transport_id;
has $who;
has $deadline;
has
represents when the timeout of the message.
=cut
# Read-only accessor: hands back the value held in the $deadline field.
method deadline {
    return $deadline;
}
=head2 args
A JSON-encoded string containing the arguments of the procedure.
=cut
# Read-only accessor: hands back the value held in $args.
method args {
    return $args;
}
=head2 respo
n a simple hash with the message data, it mustn't return nested hashes
so it will convert them to JSON encoded strings.
=cut
method as_hash () {
my $data = {
rpc => $rpc,
who =>