Group
Extension

Search-ElasticDirect/lib/Search/ElasticDirect.pm

# ABSTRACT:An interface to ElasticSearch API, version independent
#
# George Bouras, george.mpouras@yandex.com and Joan Ntzougani

package Search::ElasticDirect;
our $VERSION = '2.5.2';

use strict;
use warnings;
use Search::ElasticDirect;
use parent qw(Search::ElasticDirectHttp);
use URI::Escape;
use Cpanel::JSON::XS;

my $json = Cpanel::JSON::XS->new;
$json->allow_bignum(1);
$json->allow_blessed(1);
$json->canonical(0);
$json->pretty(0);
$json->utf8(1);

my $first_doc= 1;
my $more     = 1;
my $HashReply= {};
my $SendData = {};
my $cursor   = undef;
my $response = '';
my $options  = {};
my $url      = '';


    #############
    #  Methods  #
    #############


#   Create new object and connect to the ElasticSearch using varius methods
sub new
{
my $class = shift || __PACKAGE__;
my $self  = {

  host          => '127.0.0.1',
  port          => 9200,
  protocol      => 'https',
  timeout       => 360,
  scroll        => '5m',
  keep_alive    => 1,
  verify_SSL    => 0,
  error         => 0,
  authentication=> 1,
  username      => '',
  password      => '',
  method        => 'GET',
  certca        => undef, # define it if you have self sign certificates
  cert          => undef, # define it if you have Searchguard plugin
  key           => undef, # define it if you have Searchguard plugin
  __this_server => Search::ElasticDirectHttp::HostnameShort(),
  PIT           => 1,
  documents     => 9000,
  index         => '',
  sort          => [],
  path          => '_cat/health',
  param         => '',
  send_data     => '',
  format        => 'data' # json, json_pretty, yaml, txt, data
	};

# Set properties from arguments

  for (my($i,$j)=(0,1); $i < scalar(@_) - (scalar(@_) % 2); $i+=2,$j+=2) {

    if ( exists $self->{$_[$i]} ) {

      if ('_' eq substr $_[$i],0,1) {
      $self->{error} = "Private property \"$_[$i]\" is read only. Valid user properties are : ". join(', ', sort grep ! /^_/, keys %{$self});
      return $self
      }

    $self->{$_[$i]}=$_[$j]
    }
    else {
    $self->{error} = "Invalid property \"$_[$i]\" valid proerties are : ". join(', ', sort grep ! /^_/, keys %{$self});
    return $self
    }
  }

$self->{authentication} = $self->{authentication} =~/(?i)t|y|1/ ? 1:0;
$self->{keep_alive}     = $self->{keep_alive}     =~/(?i)t|y|1/ ? 1:0;
$self->{verify_SSL}     = $self->{verify_SSL}     =~/(?i)t|y|1/ ? 1:0;
$self->{PIT}            = $self->{PIT}            =~/(?i)t|y|1/ ? 1:0;
$self->{protocol}     //= 'https';

if ($self->{timeout} !~/^\d+$/) {$self->{error} = "port ($self->{timeout}) should be an integer\n";      return $self}
if ($self->{port}    !~/^\d+$/) {$self->{error} = "port ($self->{port}) should be an integer\n";         return $self}
if ($self->{port}    > 65535)   {$self->{error} = "port ($self->{port}) should be an less than 65536\n"; return $self}

  if ($self->{authentication}) {
  $self->{username} = URI::Escape::uri_escape( $self->{username} );
  $self->{password} = URI::Escape::uri_escape( $self->{password} );
  }
  else {
  delete $self->{username};
  delete $self->{password}
  }


  if ('https' eq $self->{protocol}) {

    # https searchguard
    if ((defined $self->{certca}) && (defined $self->{cert}) && (defined $self->{key})) {
    unless (-f $self->{certca}) {$self->{error} = "missing ssl certca file $self->{certca}"; return $self}
    unless (-f $self->{cert}  ) {$self->{error} = "missing ssl cert file $self->{cert}";     return $self}
    unless (-f $self->{key}   ) {$self->{error} = "missing ssl key file $self->{key}";       return $self}
    $self->{connect} = Search::ElasticDirectHttp::HTTP_Tiny_https_certca_cert_key($self->{timeout}, $self->{keep_alive}, $self->{verify_SSL}, $self->{certca}, $self->{cert}, $self->{key});

      if ($self->{authentication}) {
      $self->{url}         = "https://$self->{username}:$self->{password}\@$self->{host}:$self->{port}";
      $self->{description} = 'SearchGuard, CA certificate, cert admin, key admin'
      }
      else {
      $self->{url}         = "https://$self->{host}:$self->{port}";
      $self->{description} = 'SearchGuard, CA certificate, cert admin, key admin'
      }
    }

    # https username, password, certificate. Usually you want this one
    elsif ( ($self->{authentication}) && (defined $self->{certca}) && (defined $self->{username}) && (defined $self->{password}) ) {
    unless (-f $self->{certca}) {$self->{error} = "missing ssl certca file $self->{certca}"; return $self}
    $self->{connect}     = Search::ElasticDirectHttp::HTTP_Tiny_https_certca($self->{timeout}, $self->{keep_alive}, $self->{verify_SSL}, $self->{certca});
    $self->{url}         = "https://$self->{username}:$self->{password}\@$self->{host}:$self->{port}";
    $self->{description} = 'https, CA certificate, username, password';
    $self->{cert}        = undef;
    $self->{key}         = undef
    }

    # https username, password, when you have a signed host certificate
    elsif ( ($self->{authentication}) && (defined $self->{username}) && (defined $self->{password}) ) {
    $self->{connect}     = Search::ElasticDirectHttp::HTTP_Tiny_https($self->{timeout}, $self->{keep_alive}, $self->{verify_SSL});
    $self->{url}         = "https://$self->{username}:$self->{password}\@$self->{host}:$self->{port}";
    $self->{description} = 'https, username, password';
    $self->{certca}      = undef;
    $self->{cert}        = undef;
    $self->{key}         = undef
    }

    else {
    $self->{error} = "uknown $self->{protocol} connection mode";
    return $self
    }
  }
  elsif ('http' eq $self->{protocol}) {

    # http username, password
    if ( ($self->{authentication}) && (defined $self->{username}) && (defined $self->{password}) ) {
    $self->{connect}     = Search::ElasticDirectHttp::HTTP_Tiny_http($self->{timeout}, $self->{keep_alive});
    $self->{url}         = "http://$self->{username}:$self->{password}\@$self->{host}:$self->{port}";
    $self->{description} = 'http, username, password'
    }

    # http
    elsif (! $self->{authentication}) {
    $self->{connect}     = Search::ElasticDirectHttp::HTTP_Tiny_http($self->{timeout}, $self->{keep_alive});
    $self->{url}         = "http://$self->{host}:$self->{port}";
    $self->{description} = 'http';
    }

    else {
    $self->{error} = "unknown $self->{protocol} connection mode";
    return $self
    }
  }

  else {
  $self->{error} = "protocol ( $self->{protocol} ) should be http or https";
  return $self
  }

delete $self->{protocol};
bless $self,$class
}


sub DESTROY {}


#   Auto run when a non existing subroutine is called
sub AUTOLOAD
{
my  $self	= shift;
my  $class	= ref $self || __PACKAGE__;
(my $method	= eval "\$$class\::AUTOLOAD");  # ή πιο απλά $ExampleClass::AUTOLOAD  αλλά θέλουμε να είναι γενικό
my  @Subs;

	foreach ( eval "keys \%$class\::" ) {
	next if ! $class->can($_);
	next if   /^_/;
	next if   $_ eq "\U$_";
	push @Subs, $_
	}

warn "Method $method is missing. Available methods are: ". join(',',@Subs) ."\n";
exit 1
}


sub __NOT_AN_OBJECT
{
my $caller = shift;
print "$caller method did not called as an object. Arguments : ".join(',', @_)."\n";
exit 1
}


#   Returns a copy of an existing object with its properties
sub clone
{
	# clone an existing object
	if ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) {
	my $obj   = shift;
	my $class = ref $obj;
	my $clone = {};
	foreach (keys %{$obj}) { $clone->{$_} = $obj->{$_} } # Αντιγραφή των ιδιοτήτων
	bless $clone,$class
	}

	# Create new object
	else {
	new(@_)
	}
}


#   TRUE if this server is the Active master otherelse FALSE
sub ActiveMaster
{
my $obj   = ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) ? shift : __NOT_AN_OBJECT([caller 0]->[3],@_);
$response = $obj->{connect}->request('GET', "$obj->{url}/_cat/nodes?format=json&pretty=false&h=name,master", {headers=> {'Content-Type'=>'application/json'}});
$obj->{error}=0;

  unless ($response->{success}) {
  $obj->{error} = 'method : '. [caller 0]->[3]." , status : $response->{status} , error : $response->{reason} , content : $response->{content}\n";
  return undef
  }

  foreach (grep $_->{master} eq '*', @{ $json->decode($response->{content}) }) {
  return 1 if $_->{name} =~/^$obj->{__this_server}/
  }
0
}


#   pass through the request to elasticsearch
sub direct
{
my $obj = ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) ? shift : __NOT_AN_OBJECT([caller 0]->[3],@_);
$obj->{error}     = 0;
$obj->{scroll}    = undef;
$obj->{send_data} = '';
$obj->{param}     = '';
$obj->{sort}      = [];

  # Set properties from arguments
  for (my($i,$j)=(0,1); $i < scalar(@_) - (scalar(@_) % 2); $i+=2,$j+=2) {

    if (exists $obj->{$_[$i]}) {
		
      if ('_' eq substr $_[$i],0,1) {
      $obj->{error} = "Private property \"$_[$i]\" is read only. Valid user properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
      return undef
      }

    $obj->{$_[$i]}=$_[$j]
    }
    else {
    $obj->{error} = "Invalid property $_[$i] valid properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
    return undef
    }
  }

  if ( $obj->{method} !~/(?i)^(GET|DELETE|PUT|POST)$/ ) {
  $obj->{error} = "Unknown request method $obj->{method}";
  return undef
  }

  if ($obj->{send_data}=~/\S/) {
  $options = {content=> $obj->{send_data}, headers=>{charset=>'utf-8', 'Content-Type'=>'application/json'}} 
  }
  else {
  $options = {headers=> {charset=>'utf-8'}};
  }

  if ($obj->{path} eq '') {
  $url = $obj->{url}
  }
  else {
  $obj->{path} =~s/^(?:\/|\\)*(.*?)(?:\/|\\)*$/$1/;
  $url = "$obj->{url}/$obj->{path}";
  $url =~s/[\\\/]*$//
  }

$obj->{param} =~tr/;/&/s;                    # replace old style ; with &
$obj->{param} =~s/format=[^=&]+//g;          # remove format parameter
$obj->{param} =~s/^[\?&]*(.*?)[\?&]*$/$^N/;  # remove ? & from start/end

  if (($obj->{format} eq '')) {
  $obj->{format} = 'data';
  $url .= '?format=json&pretty=false'
  }
  elsif ($obj->{format} =~/json_pretty/i) {
  $url .= '?format=json&pretty=true'
  }
  elsif (($obj->{format} =~/data/i) || ($obj->{format} =~/json/i)) {
  $url .= '?format=json&pretty=false'
  }
  elsif ($obj->{format} eq 'yaml') {$url .= '?format=yaml'}
  elsif ($obj->{format} eq 'csv')  {$url .= '?format=csv'}
  elsif ($obj->{format} eq 'cbor') {$url .= '?format=cbor'}
  elsif ($obj->{format} eq 'smile'){$url .= '?format=smile'}
  else  {$obj->{format} =  'txt';   $url .= '?format=txt'}

$url .= "&$obj->{param}" if $obj->{param} ne '';

#print "path      : $obj->{path}\n";      #exit;
#print "send_data : $obj->{send_data}\n"; #exit;


$response = $obj->{connect}->request("\U$obj->{method}\E", $url, $options);

  unless ($response->{success}) {
  $obj->{error} = 'method : '.[caller 0]->[3]." , status : $response->{status} , error : $response->{reason} , content : $response->{content}";
  return undef
  }

  if (length $response->{content}) {

    if ($obj->{format} eq 'data') {
    $json->decode($response->{content})
    }
    elsif ($obj->{format} eq 'txt') {
    wantarray ? split /\v+/, $response->{content} : $response->{content}
    }
    else {
    $response->{content} # yaml, json, json_prety, 
    }
  }
  else {

    if    ($obj->{format} eq 'data') {{}}
    elsif ($obj->{format}=~/^json/)  {'{}'}
    elsif ($obj->{format} eq 'yaml') {'---'}
    else  {''} # txt
  }
}



#   Deep pagination using the scroll api
#   https://www.elastic.co/guide/en/elasticsearch/reference/current/scroll-api.html
sub DeepPagination_scroll
{
my $obj = ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) ? shift : __NOT_AN_OBJECT([caller 0]->[3],@_);
$obj->{error}     = 0;
$obj->{format}    = 'data';
$obj->{documents} = 10000;
$obj->{scroll}    = '10m';
$obj->{index}     = undef;
$obj->{send_data} = '{"query":{"match_all":{}}}';
$cursor           = undef;
$first_doc        = 1;
  
  for (my($i,$j)=(0,1); $i < scalar(@_) - (scalar(@_) % 2); $i+=2,$j+=2) {

    if (exists $obj->{$_[$i]}) {
		
      if ('_' eq substr $_[$i],0,1) {
      $obj->{error} = "Private property \"$_[$i]\" is read only. Valid user properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
      return undef
      }

    $obj->{$_[$i]}=$_[$j]
    }
    else {
    $obj->{error} = "Invalid property $_[$i] valid properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
    return undef
    }
  }

# Change the "size" at send_data by decompile and recompile the send text formatted as json
$HashReply         = $json->decode($obj->{send_data});
$HashReply->{size} = $obj->{documents};
$obj->{send_data}  = $json->encode($HashReply);
$HashReply         = {};
$obj->{scroll}   //= '10m';
$url               = "$obj->{url}/$obj->{index}/_search?scroll=$obj->{scroll}";
$options           = {content=> $obj->{send_data}, headers=> {charset=>'utf-8', 'Content-Type'=>'application/json'}};

  if   (($obj->{format} eq 'data') || ($obj->{format} eq 'json')) {$url .= '&format=json&pretty=false'}
  elsif ($obj->{format} eq 'json_pretty')                         {$url .= '&format=json&pretty=true'}
  elsif ($obj->{format} eq 'yaml')                                {$url .= '&format=yaml'}
  else  {$obj->{error} = "Format $obj->{format} is not one of choose data, yaml, json, json_pretty"; return undef}


  # return the scroll iterator
  sub {
  $response = $obj->{connect}->request('GET', $url, $options);
  CORE::die "method : iterator_scroll , status : $response->{status} , error : $response->{reason} , content : $response->{content}\n" unless $response->{success};

    # If there are more data
    if (($obj->{format} eq 'data') || ($obj->{format} =~/json/)) {
    $HashReply= $json->decode($response->{content});
    $more     = @{$HashReply->{hits}->{hits}} ? 1:0  # go for more if not empty
    }
    elsif ($obj->{format} eq 'yaml') {
    $more = $response->{content} =~/  hits:\s*\[\]/m ? 0:1
    }


    # run once, at the first result, to change the url path
    if ($first_doc) {

      if   (($obj->{format} eq 'data') || ($obj->{format} eq 'json')) {
      $cursor = $HashReply->{_scroll_id};
      $url    = "$obj->{url}/_search/scroll?format=json&pretty=false"
      }
      elsif ($obj->{format} eq 'json_pretty') {
      $cursor = $HashReply->{_scroll_id};
      $url    = "$obj->{url}/_search/scroll?format=json&pretty=true"
      }
      elsif ($obj->{format} eq 'yaml') {
      $url = "$obj->{url}/_search/scroll?format=yaml";

        if ( $response->{content} =~/_scroll_id:\s*"([^"]+)"/m ) {
        $cursor = $^N
        }
        else {
        CORE::die "Could not extract _scroll_id from yaml formatted reply\n$response->{content}\n"
        }
      }

    $options->{content} = "{\"scroll_id\":\"$cursor\",\"scroll\":\"$obj->{scroll}\"}";
    $first_doc=0
    }

    if ($more) {

        if ($obj->{format} eq 'data') {
        $HashReply->{hits}->{hits}
        }
        else {
        $response->{content}
        }
    }
    else {
    # All data retrieved, lets clear the scroll_id
    $response = $obj->{connect}->request('DELETE', "$obj->{url}/_search/scroll", {content=> "{\"scroll_id\":\"$cursor\"}", headers=> {'Content-Type'=>'application/json'}});
    unless ($response->{success}) { $obj->{error} = "method : clear_scroll_id , status : $response->{status} , error : $response->{reason} , content : $response->{content}"; return undef}
    undef
    }
  }
}



#   Deep pagination using the search_after
#   https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html
sub DeepPagination_search_after
{
my $obj = ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) ? shift : __NOT_AN_OBJECT([caller 0]->[3],@_);
$obj->{error}     = 0;
$obj->{index}     = undef;
$obj->{PIT}       = 0;
$obj->{keep_alive}= '5m';
$obj->{documents} = 10000;
$obj->{sort}      = [ {'@timestamp' => {order => 'asc'}} ];
$obj->{send_data} = '{"query":{"match_all":{}}}';

  for (my($i,$j)=(0,1); $i < scalar(@_) - (scalar(@_) % 2); $i+=2,$j+=2) {

    if (exists $obj->{$_[$i]}) {

      if ('_' eq substr $_[$i],0,1) {
      $obj->{error} = "Private property \"$_[$i]\" is read only. Valid user properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
      return undef
      }

    $obj->{$_[$i]}=$_[$j]
    }
    else {
    $obj->{error} = "Invalid property $_[$i] valid properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
    return undef
    }
  }

# Define the PIT, sort, size,  and change the "size" by decompiling the send_data 
$obj->{PIT}       = $obj->{PIT} =~/(?i)t|y|1/ ? 1:0;
$SendData         = $json->decode($obj->{send_data});
$SendData->{from} = 0;
$SendData->{size} = $obj->{documents};
$SendData->{sort} = $obj->{sort};

  # if using PIT (Point In Time) we must create it at the index
  if ($obj->{PIT}) {
  $response = $obj->{connect}->request('POST', "$obj->{url}/$obj->{index}/_pit?format=json&pretty=false&keep_alive=$obj->{keep_alive}");
  unless ($response->{success}) {CORE::die 'method : '. [caller 0]->[3]." , status : $response->{status} , error : $response->{reason} , content : $response->{content}\n"; return undef}
  $SendData->{pit} = { keep_alive => $obj->{keep_alive}, id => $json->decode($response->{content})->{id} };
  $url = "$obj->{url}/_search?format=json&pretty=false" # do not use the index
  }
  else {
  $url = "$obj->{url}/$obj->{index}/_search?format=json&pretty=false"
  }

$options = {content=> $json->encode($SendData), headers=> {charset=> 'utf-8', 'Content-Type'=> 'application/json'}};

  # return the search_after iterator
  sub {
  $response = $obj->{connect}->request('GET', $url, $options);
  CORE::die "method : iterator_search_after , status : $response->{status} , error : $response->{reason} , content : $response->{content}\n" unless $response->{success};
  $HashReply = $json->decode($response->{content});

    if (@{$HashReply->{hits}->{hits}}) {
    # use Data::Dumper; $Data::Dumper::Terse=1; $Data::Dumper::Indent=1; print Dumper $HashReply->{hits}->{hits};
    # grab the "sort" from the LAST [-1] document and with that update the search_after
    $SendData->{search_after} = $HashReply->{hits}->{hits}->[-1]->{sort};
    $options->{content}       = $json->encode($SendData); #use Data::Dumper; $Data::Dumper::Terse=1; $Data::Dumper::Indent=1; print Dumper $options;

    $HashReply->{hits}->{hits}
    }
    else {
    # All data retrieved, clear the PIT if used

      if ($obj->{PIT}) {
      $response = $obj->{connect}->request('DELETE', "$obj->{url}/_pit", {content=> "{\"id\":\"$SendData->{pit}->{id}\"}", headers=> {'Content-Type'=>'application/json'}});
      unless ($response->{success}) { $obj->{error} = "method : clear_PIT , status : $response->{status} , error : $response->{reason} , content : $response->{content}" }
      }

    undef
    }
  }
}


#   Deep pagination for SQL syntax using the cusror
#   https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-pagination.html
sub DeepPagination_sql
{
my $obj = ((exists $_[0]) && (__PACKAGE__ eq ref $_[0])) ? shift : __NOT_AN_OBJECT([caller 0]->[3],@_);
$obj->{error}     = 0;
$obj->{documents} = 9000;
$url = "$obj->{url}/_sql";
$first_doc = 1;

  for (my($i,$j)=(0,1); $i < scalar(@_) - (scalar(@_) % 2); $i+=2,$j+=2) {

    if (exists $obj->{$_[$i]}) {

      if ('_' eq substr $_[$i],0,1) {
      $obj->{error} = "Private property \"$_[$i]\" is read only. Valid user properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
      return undef
      }

    $obj->{$_[$i]}=$_[$j]
    }
    else {
    $obj->{error} = "Invalid property $_[$i] valid properties are : ". join(', ', sort grep ! /^(error|_)/, keys %{$obj});
    return undef
    }
  }

$SendData = $json->decode($obj->{send_data});
$SendData->{fetch_size} = $obj->{documents} unless exists $SendData->{fetch_size};
if (! exists $SendData->{query}) {$obj->{error} = 'You have not define the query'; return undef}
$options = {content=> $json->encode($SendData), headers=> {charset=> 'utf-8', 'Content-Type'=> 'application/json'}};

$obj->{param} =~s/format=[^=&]+//g;          # remove format parameter
$obj->{param} =~s/^[\?&]*(.*?)[\?&]*$/$^N/;  # remove ? & from start/end

  if (($obj->{format} eq '')) {
  $obj->{format} = 'data';
  $url .= '?format=json&pretty=false'
  }
  elsif ($obj->{format} =~/json_pretty/i) {
  $url .= '?format=json&pretty=true'
  }
  elsif (($obj->{format} =~/data/i) || ($obj->{format} =~/json/i)) {
  $url .= '?format=json&pretty=false'
  }
  elsif ($obj->{format} eq 'yaml') {$url .= '?format=yaml'}
  elsif ($obj->{format} eq 'cbor') {$url .= '?format=cbor'}
  elsif ($obj->{format} eq 'smile'){$url .= '?format=smile'}
  elsif ($obj->{format} eq 'csv')  {$url .= '?format=csv'}
  else  {$obj->{format} =  'txt';   $url .= '?format=txt'}

$url .= "&$obj->{param}" if $obj->{param} ne '';


  # return the cursor iterator
  sub {
  $response = $obj->{connect}->request('POST', $url, $options);
  CORE::die "method : iterator_search_after , status : $response->{status} , error : $response->{reason} , content : $response->{content}\n" unless $response->{success};
  $cursor=undef;

    # read the cursor at every possible fommat, well almost except the cbor,smile
    if (($obj->{format} eq 'csv') || ($obj->{format} eq 'txt')) {

      if (exists $response->{headers}->{cursor}) {
      $cursor = $response->{headers}->{cursor}
      }
    }
    elsif (($obj->{format} eq 'data') || ($obj->{format} =~/json/)) {
    $HashReply = $json->decode($response->{content});

      if (exists $HashReply->{cursor}) {
      $cursor =  $HashReply->{cursor};
      delete     $HashReply->{cursor}
      }
    }
    elsif ($obj->{format} eq 'yaml') {

      if ( $response->{content} =~/cursor: "([^"]+)"/m ) {
      $cursor = $^N
      }
    }

    if ($first_doc) {
    $first_doc=0;

        if (defined $cursor) {
        $options->{content} = "{\"cursor\":\"$cursor\"}" 
        }

    $obj->{format} eq 'data' ? $HashReply : $response->{content}
    }
    else {

      if (defined $cursor) {
      $obj->{format} eq 'data' ? $HashReply : $response->{content}
      }
      else {
      undef
      }
    }
  }
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Search::ElasticDirect - An interface to ElasticSearch API, version independent

=head1 VERSION

version 2.5.2

=head1 SYNOPSIS

  use Search::ElasticDirect;

  my $elk = Search::ElasticDirect->new( host=> 'SomeServer',  protocol=> 'http', authentication=> 'no' );
  die "Connection error because $elk->{error}\n" if $elk->{error};

  my $reply = $elk->direct( method=>'GET', path=>'_cat/nodes', param=>'h=name,ip,role,version&s=name:asc', format=>'data' );
  die "$elk->{error}\n" if $elk->{error};

  die "Allowed to run only at active master\n" unless $elk->ActiveMaster;
  die $elk->{error} if $elk->{error};

  foreach my $srv (@{$reply}) {
  say join ',', $srv->{name}, $srv->{ip}, $srv->{version}, $srv->{role}
  }

=head1 DESCRIPTION

An interface to ElasticSearch API, version independent

The DeepPagination methods are added to help with complexity of the scroll/cursor/search_after APIs

=head1 NAME

Search::ElasticDirect - An interface to ElasticSearch API, version independent

=head1 VERSION

version 2.5.1

=head1 ERRORS

Check for errors the property  $obj->{error} . if everything is ok it is B<FALSE>

=head1 METHODS

=head2 new

Create a new ElasticDirect object and define the connection parameters to the elasticsearch cluster. 

=head3 https, username, password, CA certificate

Most of the times you will use this one

  my $elk = Search::ElasticDirect->new(
  
    host           => 'SomeServer',
    port           => 9200,
    protocol       => 'https',
    timeout        => 1800,
    keep_alive     => 'yes',
    authentication => 'yes',
    username       => 'Joe',
    password       => 'JoePass',
    certca         => '/etc/elasticsearch/chain.pem',
    verify_SSL     => 'no' );
  
  die $elk->{error} if $elk->{error};

=head3 https, CA certificate, server key, server certificate

If you are using the B<SearchGuard> security plugin and you need node admin key/cert without username and password to perform admin tasks e.g. to delete an immutable index

  my $elk = Search::ElasticDirect->new(
  
    host           => 'SomeServer',
    port           => 9200,
    protocol       => 'https',
    timeout        => 1800,
    keep_alive     => 'yes',
    authentication => 'no',
    certca         => '/etc/elasticsearch/chain.pem',
    cert           => '/etc/elasticsearch/SomeServer.pem',
    key            => '/etc/elasticsearch/SomeServer.key',  
    verify_SSL     =>'no' );
  
  die $elk->{error} if $elk->{error};

=head3 https, username, password

Without certificate

  my $elk = Search::ElasticDirect->new(
  
    host           => 'SomeServer',
    port           => 9200,
    protocol       => 'https',
    timeout        => 1800,
    keep_alive     => 'yes',
    authentication => 'yes',
    username       => 'Joe',
    password       => 'JoePass' );
  
  die $elk->{error} if $elk->{error};

=head3 http, username, password

Plain text with authentication

  my $elk = Search::ElasticDirect->new(

    host           => 'SomeServer',
    port           => 9200,
    protocol       => 'http',
    timeout        => 1800,
    keep_alive     => 'yes',
    authentication => 'yes',
    username       => 'Joe',
    password       => 'SomePass' );

  die $elk->{error} if $elk->{error};

=head3 http

Plain text without authentication

  my $elk = Search::ElasticDirect->new(

    host           => 'SomeServer',
    port           => 9200,
    protocol       => 'http',
    timeout        => 1800,
    keep_alive     => 'yes',
    authentication => 'no' );

  die $elk->{error} if $elk->{error};

=head2 ActiveMaster

If you have schedule your script at multiple servers for high availability but it must run once at active master server

  my  $obj = Search::ElasticDirect->new(...);
  die $elk->{error} if $elk->{error};

  die "Allowed to run only at active master\n" unless $elk->ActiveMaster;
  die $elk->{error} if $elk->{error};

=head2 direct

Pass through the request to ElasticSeach API transparent without much validation, so you must know the correct syntax and parameters of the API/path you are calling

You can receive the result at the format  B<data>, B<yaml>, B<json>, B<json_pretty>, B<txt>

The data format returns a Perl data structure e.g. hash or array, while the other are text formatted as defined

Examples

=head3 Health as yaml

  my $reply = $elk->direct(method=>'GET', path=>'_cat/health', param=>'filter_path=status', format=>'yaml');
  die $elk->{error} if $elk->{error};
  say $reply

=head3 Health as human readable json

  $elk->direct(method=>'GET', path=>'_cat/health', param=>'filter_path=status', format=>'json_pretty');

=head3 Health as data; no filter_path is required with data

  $reply = $elk->direct(method=>'GET', path=>'_cat/health', format=>'data');
  say $reply->[0]->{status};

=head3 Indicies sorted

  $reply = $elk->direct(method=>'GET', path=>'_cat/indices', param=>'h=index,health,docs.count,store.size&bytes=g&s=index:asc', format=>'data');
  die $elk->{error} if $elk->{error};

  foreach ( @{ $reply } ) {
  say "$_->{index}, $_->{health}, $_->{'docs.count'}, $_->{'store.size'}"
  }

=head3 DSL query

  $reply    = $elk->direct(
  method    => 'GET',
  path      => 'foo/_search',
  format    => 'data',
  send_data => '{"size": 2, "query": {"range": {"@timestamp": {"from": "1970-01-01T00:00:00.000Z", "to": "2036-01-03T23:59:59.999Z"}}}}'
  );

  die $elk->{error} if $elk->{error};

  foreach ( @{ $reply->{hits}->{hits} } ) {
  say say "$_->{_source}->{'@timestamp'} , $_->{_source}->{id}"
  }

=head3 SQL query

format can be here B<data>, B<txt>, B<csv>, B<yaml>, B<json>, B<json_pretty>, B<cbor>, B<smile>

  $reply = $elk->direct(
  method    => 'POST',
  path      => '_sql',
  format    => 'data',
  send_data => "{\"query\": \"SELECT * FROM foo WHERE id >= '01' ORDER BY id DESC LIMIT 100\"}"
  );

  die $elk->{error} if $elk->{error};

  if ($elk->{format} eq 'data') {
  
    foreach ( @{$reply->{rows}} ) {
    say join ' , ', @{$_}
    }
  }
  else {
  say $reply
  }

=head2 DeepPagination_scroll

Retrieve massive amount of results using the B<scroll> API

documents here is how many records to fetch per iteration

format can be B<data>, B<yaml>, B<json> or B<json_pretty>

  my $iterator = $elk->DeepPagination_scroll(
  index     => 'foo',
  documents => 9000,
  format    => 'data',
  send_data => '{"query": {"range": {"@timestamp": {"from": "1970-01-01T00:00:00.000Z", "to": "2036-01-03T23:59:59.999Z"}}}}',
  scroll    => '10m'
  );

  die $elk->{error} if $elk->{error};

  while (my $Documents = $iterator->()) {

    if ($elk->{format} eq 'data') {

      foreach my $doc (@{$Documents}) {
    
        foreach my $key (sort keys %{$doc->{_source}}) {
        say "$key : $doc->{_source}->{$key}";
        }
      }
    }
    else {
    say $Documents
    }
  }

=head2 DeepPagination_search_after

Retrieve massive amount of results using the B<search_after> API

If you choose to use the PIT (point in time) it will created automatic for you

documents here is how many records to fetch per iteration

At sort, define as less fields you can that produce a unique combination. The fields must be sortable reasonable.

  my $iterator = $elk->DeepPagination_search_after(
  index     => 'foo',
  PIT       => 'yes',
  keep_alive=> '10m',
  documents => 9000,
  send_data => '{ "query": {"range": {"@timestamp": {"from": "1970-01-01T00:00:00.000Z", "to": "2036-01-03T23:59:59.999Z"}}} }',
  sort      => [ {'@timestamp'=> {order=> 'asc', format=> "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"}} , {id=>'asc'} ],
  );

  while (my $Documents = $iterator->()) {

    foreach my $doc (@{$Documents}) {
  
      foreach my $key (sort keys %{$doc->{_source}}) {
      say "$key : $doc->{_source}->{$key}"
      }
    }
  }

=head2 DeepPagination_sql

Retrieve massive amount of results using the B<sql> API

documents here is how many records to fetch per iteration

format can be B<data>, B<csv>, B<txt>, B<yaml>, B<json>, B<json_pretty>, B<cbor>, B<smile>

  my $iterator = $elk->DeepPagination_sql(
  documents => 100,
  format    => 'data',
  send_data => '{ "query": "SELECT * FROM foo ORDER BY id DESC" }'
  );

  die $elk->{error} if $elk->{error};

  while (my $Documents = $iterator->()) {

    if ($elk->{format} eq 'data') {
    use Data::Dumper; say Dumper $Documents
    }
    else {
    print $Documents
    }
  }

=head1 SEE OTHER

B<Search::Elasticsearch> The official client for Elasticsearch

B<Dancer2::Plugin::ElasticSearch> ElasticSearch wrapper for Dancer

=head1 SUPPORT

Any ideas or contributors are welcome

=head1 AUTHOR

George Bouras <george.mpouras@yandex.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2023 by George Bouras.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=head1 AUTHOR

George Bouras <george.mpouras@yandex.com>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2023 by George Bouras.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.