s.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html>).
- The RPC protocol is JSON-RPC 2.0 (see the L<specification|https://www.jsonrpc.org/specification>).
- There is no message persistence in the broker
eekeeper> and
then C</etc/beekeeper>. File format is relaxed JSON, which allows comments and trailing commas.
The file C<pool.config.json> defines all worker pools running on a host, specifying whic
ers" : {
"MyApp::Worker" : { "worker_count" : 4 },
},
}]
The file C<bus.config.json> defines all logical buses used by the application, specifying the connection
parameters to the b
tension::SharedCache::Cache; # hide from PAUSE
use Beekeeper::Worker ':log';
use AnyEvent;
use JSON::XS;
use Fcntl qw(:DEFAULT :flock);
use Scalar::Util 'weaken';
use Carp;
use constant SYNC_REQUE
sh => sub {
# my ($payload_ref, $mqtt_properties) = @_;
my $entry = decode_json( ${$_[0]} );
$self->_merge($entry);
},
on_suback => sub {
ish => sub {
my ($payload_ref, $mqtt_properties) = @_;
my $dump = decode_json($$payload_ref);
$self->_merge_dump($dump);
$self->_sync_completed(1);
package Beekeeper::Config;
use strict;
use warnings;
our $VERSION = '0.10';
use JSON::XS;
use Carp;
my %Cache;
my $Config_dir;
sub set_config_dir {
my ($class, $dir) = @_;
croak "Couldn
s ($bus_id);
my $config = $class->read_config_file( 'bus.config.json' );
croak "Couldn't read config file bus.config.json: file not found\n" unless defined ($config);
my %bus_cfg = map
($pool_id);
my $config = $class->read_config_file( 'pool.config.json' );
croak "Couldn't read config file pool.config.json: file not found\n" unless defined ($config);
my %pool_cfg = ma
VERSION = '0.10';
use Beekeeper::AnyEvent;
use Beekeeper::MQTT;
use Beekeeper::JSONRPC;
use Beekeeper::Config;
use JSON::XS;
use Compress::Raw::Zlib ();
use Digest::MD5 'md5_base64';
use Carp;
use
bus_id => $bus_id );
croak "Bus '$bus_id' is not defined into config file bus.config.json" unless $config;
%args = ( %$config, %args );
}
else {
my
fault} } keys %$config;
croak "No default bus defined into config file bus.config.json" unless $default;
$bus_id = $config->{$default}->{'bus_id'};
%arg
=> 7;
use constant LOG_DEBUG => 8;
use constant LOG_TRACE => 9;
use JSON::XS;
use Exporter 'import';
use Time::HiRes;
my $JSON;
our @EXPORT_OK = qw(
LOG_FATAL
LOG_ALERT
LOG_CRIT
L
>{is_connected};
# JSON-RPC notification
$JSON = JSON::XS->new->utf8->allow_blessed->convert_blessed unless defined $JSON;
my $json = $JSON->encode({
jsonrpc => '2.0',
m
|;
$bus->publish(
topic => "log/$level/$service",
payload => \$json,
);
};
if ($@) {
my $msg = $@; chomp($msg);
print STDERR "[$tsta
keeper::JSONRPC;
use strict;
use warnings;
our $VERSION = '0.10';
use Beekeeper::JSONRPC::Request;
use Beekeeper::JSONRPC::Notification;
use Beekeeper::JSONRPC::Response;
use Beekeeper::JSONRPC::Er
Beekeeper::JSONRPC::Request->new(@_);
}
# Factory shortcut: builds a Beekeeper::JSONRPC::Notification object.
# The invocant (class name) is discarded; all remaining arguments are
# forwarded untouched to the Notification constructor.
sub notification {
    shift;  # drop invocant, it is not needed by the constructor
    return Beekeeper::JSONRPC::Notification->new(@_);
}
sub response {
my $class = shift;
Beekeeper::JSONRPC::Respo
{
my $class = shift;
Beekeeper::JSONRPC::Error->new(@_);
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::JSONRPC - Representation of JSON-RPC objects
=head1 VERSION
Version 0.
package Beekeeper::JSONRPC::Request;
use strict;
use warnings;
our $VERSION = '0.10';
sub new {
my $class = shift;
bless {
jsonrpc => '2.0',
method => undef,
param
d_response(@_);
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::JSONRPC::Request - Representation of a JSON-RPC request
=head1 VERSION
Version 0.10
=head1 SYNOPSIS
my $client =
print $req->result;
=head1 DESCRIPTION
Objects of this class represent a JSON-RPC request (see L<http://www.jsonrpc.org/specification>).
The method L<Beekeeper::Client::call_remote_async> ret
10';
use Beekeeper::Client ':worker';
use Beekeeper::Logger ':log_levels';
use Beekeeper::JSONRPC;
use JSON::XS;
use Time::HiRes ();
use Sys::Hostname ();
use Compress::Raw::Zlib ();
use Digest::MD5
our $REPORT_STATUS_PERIOD = 5;
our $UNSUBSCRIBE_LINGER = 2;
my %AUTH_TOKENS;
my $DEFLATE;
my $JSON;
sub new {
my ($class, %args) = @_;
# Parameters passed by WorkerPool->spawn_worker
ig => $args{'bus_config'}, # content of bus.config.json
pool_config => $args{'pool_config'}, # content of pool.config.json
pool_id => $args{'pool_id'},
bus_
per::JSONRPC::Error;
use strict;
use warnings;
our $VERSION = '0.10';
use overload '""' => sub { $_[0]->{error}->{message} };
sub new {
my ($class, %args) = @_;
bless {
jsonrpc =
uccess { 0 }
# Serialization hook (TO_JSON is the method name JSON::XS calls on blessed
# objects when convert_blessed is enabled).
# Returns a new, unblessed hash reference holding a shallow copy of the
# object's top-level fields; nested references are shared, not cloned.
sub TO_JSON {
    my $self = $_[0];
    my %fields = %{$self};
    return \%fields;
}
sub parse_error {
shift->new(
code => -32700,
message => "Parse error",
data => "Invalid JSON was received by the
ft->new(
code => -32600,
message => "Invalid request",
data => "The JSON sent is not a valid request object.",
@_
);
}
sub request_timeout {
shift->new(
package Beekeeper::JSONRPC::Notification;
use strict;
use warnings;
our $VERSION = '0.10';
sub new {
my $class = shift;
bless {
jsonrpc => '2.0',
method => undef,
::JSONRPC::Notification - Representation of a JSON-RPC notification
=head1 VERSION
Version 0.10
=head1 DESCRIPTION
Objects of this class represent a JSON-RPC notification (see L<http://www.jsonr
y. In order to add Router workers to a pool
these must be declared in the config file C<pool.config.json>.
=head1 METHODS
=head3 bind_remote_session ( address => $address )
Make authorization data pe
s> and C<on_error> must be coderefs and will receive respectively
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
=head3 unbind_remote_session_async ( on_succe
elf->{users} = {};
my $config = Beekeeper::Config->read_config_file( 'toybroker.config.json' );
# Start a default listener if no config found
$config = [ {} ] unless defined $confi
roker>
option set to a true value in its config file C<pool.config.json>.
ToyBroker is configured from file C<toybroker.config.json>, which is looked for
in ENV C<BEEKEEPER_CONFIG_DIR>, C<~/.config
ds_config) {
die "No bus with role '$frontend_role' was found into config file bus.config.json\n";
}
$self->{wait_frontends_up} = AnyEvent->condvar;
# Create a connection to ever
y. In order to add Router workers to a pool
these must be declared in the config file C<pool.config.json>:
[
{
"pool_id" : "myapp",
"bus_id" : "backend",
"workers
se base 'Beekeeper::Worker';
use Beekeeper::Logger ':log_levels';
use Scalar::Util 'weaken';
use JSON::XS;
my @Log_buffer;
sub authorize_request {
my ($self, $req) = @_;
return unless $se
=> sub {
# my ($payload_ref, $mqtt_properties) = @_;
$req = decode_json( ${$_[0]} );
push @Log_buffer, $req->{params};
shift @Log_buffe
ly. In order to add a LogTail worker to a
pool it must be declared in the config file C<pool.config.json>:
[
{
"pool_id" : "myapp",
"bus_id" : "backend",
"workers
= '0.10';
use Beekeeper::Worker ':log';
use base 'Beekeeper::Worker';
use Beekeeper::JSONRPC::Error;
use JSON::XS;
sub authorize_request {
my ($self, $req) = @_;
if ($req->{method} eq '_
ub {
my ($payload_ref, $properties) = @_;
$self->on_worker_status( decode_json($$payload_ref)->[1] );
},
on_suback => sub {
my ($success) = @_;
ueue.*" );
}
}
sub reject_request {
my ($self, $params, $req) = @_;
# Just return a JSONRPC error response
if ($req->mqtt_properties->{'auth'}) {
# When client provided some
package Beekeeper::JSONRPC::Response;
use strict;
use warnings;
our $VERSION = '0.10';
sub new {
my $class = shift;
bless {
jsonrpc => '2.0',
result => undef,
id
ub success { 1 }
1;
__END__
=pod
=encoding utf8
=head1 NAME
Beekeeper::JSONRPC::Response - Representation of a JSON-RPC response
=head1 VERSION
Version 0.10
=head1 SYNOPSIS
my $client =
bjects of this class represent a JSON-RPC response (see L<http://www.jsonrpc.org/specification>).
When an RPC call is made the worker replies with a L<Beekeeper::JSONRPC::Response> object
if the invo
ly. In order to add a LogTail worker to a
pool it must be declared in the config file C<pool.config.json>.
=head1 METHODS
=head3 tail ( %filters )
Returns all buffered entries that match the filter c
s> and C<on_error> must be coderefs and will receive respectively
L<Beekeeper::JSONRPC::Response> and L<Beekeeper::JSONRPC::Error> objects as arguments.
=head1 SEE ALSO
L<bkpr-log>, L<Beekeeper::Se
ess ($pool_cfg) {
die "Worker pool '$pool_id' is not defined into config file pool.config.json\n";
}
# Ensure that local bus is defined
my $bus_id = $pool_cfg->{'bus_id'};
un
config file bus.config.json\n";
}
# Merge pool.config.json contents
$self->{config}->{$_} = $pool_cfg->{$_} foreach (keys %$pool_cfg);
# Keep bus.config.json
$self->{bus_config}
respawns defunct ones.
=head1 CONFIGURATION
=head3 pool.config.json
Worker pools are defined in a file named C<pool.config.json>, which is searched
for into ENV C<BEEKEEPER_CONFIG_DIR>, C<~/.co