App-ElasticSearch-Utilities/lib/App/ElasticSearch/Utilities/Connection.pm
package App::ElasticSearch::Utilities::Connection;
# ABSTRACT: Abstract the connection element
use v5.16;
use warnings;
our $VERSION = '8.8'; # VERSION
use App::ElasticSearch::Utilities::HTTPRequest;
use CLI::Helpers qw(:output);
use JSON::MaybeXS;
use LWP::UserAgent;
use Module::Load;
use Ref::Util qw(is_ref is_arrayref is_hashref);
use Types::Standard qw( Enum HashRef InstanceOf Int Str );
use URI;
use URI::QueryParam;
use Moo;
use namespace::autoclean;
# Hostname or IP address of the ElasticSearch node to talk to.
has 'host' => (
    is      => 'ro',
    isa     => Str,
    default => sub { 'localhost' },
);

# TCP port of the cluster's HTTP transport.
has 'port' => (
    is      => 'ro',
    isa     => Int,
    default => sub { 9200 },
);

# URL scheme used to build request URIs.  This is the only read-write
# attribute, so the scheme can be switched after construction.
has 'proto' => (
    is      => 'rw',
    isa     => Enum[qw(http https)],
    default => sub { 'http' },
);

# Timeout, in seconds, handed to LWP::UserAgent when the UA is built.
has 'timeout' => (
    is      => 'ro',
    isa     => Int,
    default => sub { 10 },
);

# HTTP Basic Authorization username.  Defaults to $ENV{USER}; when USER is
# not set in the environment (cron jobs, containers, ...) an empty string is
# used instead, because an undef default fails the Str constraint and would
# make object construction die.
has 'username' => (
    is      => 'ro',
    isa     => Str,
    default => sub { $ENV{USER} // '' },
);

# HTTP Basic Authorization password.  No type constraint and no default;
# the rest of this class only checks whether it has a non-zero length.
has 'password' => (
    is => 'ro',
);

# Options passed through unchanged as the ssl_opts of LWP::UserAgent.
has 'ssl_opts' => (
    is      => 'ro',
    isa     => HashRef,
    default => sub { {} },
);

# The LWP::UserAgent used for all requests; built by _build_ua() the first
# time it is accessed.
has 'ua' => (
    is  => 'lazy',
    isa => InstanceOf["LWP::UserAgent"],
);
# Builder for the lazy 'ua' attribute.
#
# Creates an LWP::UserAgent configured from this object's timeout and
# ssl_opts and installs a response_done handler that replaces the response
# content with a decoded structure:
#   * application/json       -> the value returned by decode_json()
#   * successful text/plain  -> array reference of the non-blank lines
# Any other response is left untouched.  A JSON body that fails to decode is
# also left untouched, and the failure is reported through debug().
#
# Warns when a password is configured but the protocol is not https.
#
# Returns the configured LWP::UserAgent.
sub _build_ua {
    my ($self) = @_;

    # Construct the UA Object
    # NOTE(review): $VERSION is read through a string eval, presumably so this
    # keeps working when no "our $VERSION" line is present -- confirm.
    ## no critic
    my $local_version = eval '$VERSION' || '999.9';
    ## use critic
    my $ua = LWP::UserAgent->new(
        keep_alive        => 3,
        agent             => sprintf("%s/%s (Perl %s)", __PACKAGE__, $local_version, $^V),
        protocols_allowed => [qw(http https)],
        timeout           => $self->timeout,
        ssl_opts          => $self->ssl_opts,
    );
    debug({color=>'cyan'}, sprintf "Initialized a UA: %s%s", $ua->agent, $self->password ? ' (password provided)' : '');

    # Decode the response content automatically
    $ua->add_handler( response_done => sub {
        my ($response) = @_;
        debug( {color=>'magenta'}, "response_done handler, got:");
        debug_var($response);
        my $ctype = $response->content_type() || 'invalid';

        if( $ctype =~ m{^application/json\b} ) {
            # JSON Transform
            debug({color=>'yellow',indent=>1},"JSON Decoding Response Content");
            my $decoded_ok = eval {
                my $decoded = decode_json( $response->content );
                $response->content($decoded);
                1;
            };
            if( !$decoded_ok ) {
                # Best effort: keep the raw content, but say why it was kept.
                my $error = $@ || 'unknown error';
                debug({color=>'red',indent=>1}, "JSON decoding failed, content left as-is: $error");
            }
        }
        elsif ( $response->is_success && $ctype =~ m{^text/plain} ) {
            # Plain text transform for the _cat API
            debug({color=>'yellow',indent=>1},"Plain Text Transform Response Content");
            my $decoded = [
                grep { defined && length && !/^\s+$/ }
                    split /\r?\n/, $response->content
            ];
            debug_var($decoded);
            $response->content($decoded);
        }

        if( my $content = $response->content ) {
            debug({color=>'yellow'}, "After translation:");
            if( is_ref($content) ) {
                debug_var( $content );
            }
            else{
                debug( $content );
            }
        }

        # Hand the response object back through the aliased first argument.
        $_[0] = $response;
    });

    # Warn About Basic Auth without TLS
    warn "HTTP Basic Authorization configured and not using TLS, this is not supported"
        if length $self->password && $self->proto ne 'https';

    return $ua;
}
# Send one HTTP request to the cluster and return the response.
#
# Arguments:
#   $url     - command/endpoint such as '_search'; only used when
#              $options->{command} does not already hold a true value.
#   $options - hash reference; recognised keys:
#                index     - joined in front of the command to form the path
#                command   - endpoint, takes precedence over $url
#                uri_param - hash reference of query-string parameters
#                method    - HTTP method, any case; GET when missing/empty
#                body      - request body, used when $body is not defined
#   $body    - request body; takes precedence over $options->{body}.
#
# Side effects on the caller's $options: 'command' is filled in from $url
# when unset, and 'body' is removed when it supplies the request body.
#
# A Basic Authorization header is only added when a password is set and the
# protocol is https.
#
# Returns the value of $self->ua->request().
sub request {
    my ($self,$url,$options,$body) = @_;

    # Build the Path
    $options->{command} ||= $url;
    my @path = grep { defined and length } @{ $options }{qw(index command)};
    my $path = join('/', @path);
    debug(sprintf "calling %s->request(%s)", ref $self, $path);

    # An IPv6 literal has to be wrapped in brackets inside a URI, otherwise
    # its colons are confused with the port separator.  Only values made up
    # purely of hex digits, dots and at least two colons are treated as such.
    my $host = $self->host;
    $host = "[$host]"
        if ($host =~ tr/://) >= 2 && $host =~ /\A[0-9A-Fa-f:.]+\z/;

    # Build a URI
    my $uri = URI->new( sprintf "%s://%s:%d",
        $self->proto,
        $host,
        $self->port,
    );
    $uri->path($path);

    # Query String
    if( exists $options->{uri_param} and is_hashref($options->{uri_param}) ) {
        foreach my $k ( keys %{ $options->{uri_param} } ) {
            $uri->query_param( $k => $options->{uri_param}{$k} );
        }
    }

    # Body Translations
    if(!defined $body && exists $options->{body}) {
        $body = delete $options->{body};
    }

    # Determine request method; a missing, undefined or empty value means GET
    my $method = ( defined $options->{method} && length $options->{method} )
               ? uc $options->{method}
               : 'GET';

    # Special Case for Index Creation
    if( $method eq 'PUT' && $options->{index} && ( $options->{command} // '' ) eq '/' ) {
        $uri->path($options->{index});
    }

    debug({color=>'magenta'}, sprintf "Issuing %s with URI of '%s'", $method, $uri->as_string);
    if( defined $body ) {
        if( is_ref($body) ) {
            debug_var({indent=>1}, $body);
        }
        else {
            debug({indent=>1}, split /\r?\n/, $body);
        }
    }

    # Make the request
    my $req = App::ElasticSearch::Utilities::HTTPRequest->new( $method => $uri->as_string );

    # Authentication
    $req->authorization_basic( $self->username, $self->password )
        if length $self->password and $self->proto eq 'https';

    $req->content($body) if defined $body;

    return $self->ua->request( $req );
}
# Check whether an index exists.
#
# Arguments (named):
#   index - name of the index to look for (required)
#
# Issues a HEAD request for the index and returns the response's
# is_success() value.  Returns nothing (empty list / undef) without making a
# request when no usable index name is given.
sub exists {
    my ($self,%options) = @_;

    # An undefined or empty index would be dropped from the path by
    # request(), turning this into a HEAD of the cluster root, which would
    # report success regardless of any index.
    return unless defined $options{index} and length $options{index};

    my %params = (
        method => 'HEAD',
        index  => $options{index},
    );

    return $self->request('', \%params)->is_success;
}
# PUT a body to the cluster, optionally scoped to an index.
#
# Arguments (named):
#   body  - content to send (required)
#   index - index to address (optional)
#
# Returns the list ( HTTP status code, response content ), or nothing when
# no body was supplied.
sub put {
    my $self    = shift;
    my %options = @_;

    # Without a body there is nothing to put.
    exists $options{body} or return;

    my $request_opts = {
        method => 'PUT',
        ( exists $options{index} ? ( index => $options{index} ) : () ),
    };
    my $response = $self->request( '', $request_opts, $options{body} );

    return ( $response->code, $response->content );
}
# POST a body to the _bulk endpoint, optionally scoped to an index.
#
# Arguments (named):
#   body  - bulk payload to send (required)
#   index - index to address (optional)
#
# Returns the list ( HTTP status code, response content ), or nothing when
# no body was supplied.
sub bulk {
    my ($self, %options) = @_;

    # Nothing to send without a body.
    return if !exists $options{body};

    my @index_opt = exists $options{index} ? ( index => $options{index} ) : ();
    my $response  = $self->request(
        '_bulk',
        { method => 'POST', @index_opt },
        $options{body},
    );

    return ( $response->code, $response->content );
}
# Finalise the class through its metaclass.  This is the last statement
# before __END__, so its value is also what the file returns when loaded.
# NOTE(review): make_immutable is the Moose idiom, while this class is built
# with Moo; presumably kept for Moose compatibility -- confirm before removing.
__PACKAGE__->meta->make_immutable;
__END__
=pod
=head1 NAME
App::ElasticSearch::Utilities::Connection - Abstract the connection element
=head1 VERSION
version 8.8
=head1 SYNOPSIS
For most users, this code will never be called directly since this module
doesn't handle parameter parsing on the CLI. To get an object, instead call:
use App::ElasticSearch::Utilities qw(es_connect);
my $es = es_connect();
my $http_response_obj = $es->request('_search',
{
index=>'logstash',
uri_param => {
size => 10,
}
},
{
query => {
query_string => { query => "program:sshd" },
}
}
);
Though even this is overkill. The B<es_request> method maintains compatibility with older versions and emulates
the API you'd expect from B<Elastijk>.
=head1 ATTRIBUTES
=head2 host
Hostname or ip to connect to, default 'B<localhost>'
=head2 port
Port to connect the HTTP transport for the ElasticSearch cluster, default is B<9200>
=head2 proto
Protocol to use, defaults to 'B<http>'.
This module converts from the performance concerned backend of B<Hijk> and B<Elastijk>, to the feature
rich B<LWP::UserAgent>. This means we can now support TLS communication to the ES back-end and things like
basic authentication.
=head2 timeout
Connection and Read Timeout for the HTTP connection, defaults to B<10> seconds.
=head2 username
HTTP Basic Authorization username, defaults to C<$ENV{USER}>.
=head2 password
HTTP Basic Authorization password, if set, we'll try authentication.
=head2 ssl_opts
SSL Options for L<LWP::UserAgent/ssl_opts>.
=head2 ua
Lazy built B<LWP::UserAgent> to access LWP::UserAgent directly.
=head1 METHODS
=head2 request( $command, { index => ... uri_param => { size => 1 } }, $body )
This method provides a wrapper between the Hijk/Elastijk request syntax and the
LWP::UserAgent flow. Its return value is the B<HTTP::Response> object from
B<LWP::UserAgent> instead of the more simplistic return values of B<Hijk> and
B<Elastijk>. Use B<App::ElasticSearch::Utilities::es_request> for a simpler
interface.
=head2 exists( index => 'name' )
Takes the name of an index, returns true if the index exists, false otherwise.
=head2 put( body => ... , index => ... )
Parameter B<body> is required. Puts something to an index. This is often used to
put settings and/or mappings to an index.
Returns a list containing the HTTP Status Code, and the Response Content.
=head2 bulk( body => ..., index => ... )
Parameter B<body> is required. The body should be an array containing the command and documents to send to the
ElasticSearch bulk API, see: L<Bulk API|https://www.elastic.co/guide/en/elasticsearch/reference/2.3/docs-bulk.html>
Returns a list containing the HTTP Status Code, and the Response Content.
=head1 AUTHOR
Brad Lhotsky <brad@divisionbyzero.net>
=head1 COPYRIGHT AND LICENSE
This software is Copyright (c) 2024 by Brad Lhotsky.
This is free software, licensed under:
The (three-clause) BSD License
=cut