Group
Extension

Metabrik-Repository/lib/Metabrik/Client/Elasticsearch/Query.pm

#
# $Id$
#
# client::elasticsearch::query Brik
#
package Metabrik::Client::Elasticsearch::Query;
use strict;
use warnings;

use base qw(Metabrik::Client::Elasticsearch);

# Describe this Brik: metadata, attributes with their defaults, the
# commands it exposes (with their argument names) and the modules it
# requires. A fresh structure is built on every call.
sub brik_properties {
   # Attributes; all but client are flagged as inherited by the original
   # author.
   my %attributes = (
      nodes => [ qw(node_list) ], # Inherited
      index => [ qw(index) ],     # Inherited
      type => [ qw(type) ],       # Inherited
      from => [ qw(number) ],     # Inherited
      size => [ qw(count) ],      # Inherited
      client => [ qw(INTERNAL) ],
   );

   # By default, search every index and every type.
   my %attributes_default = (
      index => '*',
      type => '*',
   );

   # Command name => list of argument names.
   my %commands = (
      create_client => [ ],
      reset_client => [ ],
      query => [ qw(query index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
      get_query_result_total => [ qw($query_result|OPTIONAL) ],
      get_query_result_hits => [ qw($query_result|OPTIONAL) ],
      get_query_result_aggregations => [ qw($query_result|OPTIONAL) ],
      get_query_result_timed_out => [ qw($query_result|OPTIONAL) ],
      get_query_result_took => [ qw($query_result|OPTIONAL) ],
      term => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
      unique_term => [ qw(unique kv index|OPTIONAL type|OPTIONAL) ],
      unique_values => [ qw(field index|OPTIONAL type|OPTIONAL) ],
      wildcard => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
      range => [ qw(kv_from kv_to index|OPTIONAL type|OPTIONAL) ],
      top => [ qw(kv_count index|OPTIONAL type|OPTIONAL) ],
      top_match => [ qw(kv_count kv_match index|OPTIONAL type|OPTIONAL) ],
      match => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
      match_phrase => [ qw(kv index|OPTIONAL type|OPTIONAL) ],
      from_json_file => [ qw(json_file index|OPTIONAL type|OPTIONAL) ],
      from_dump_file => [ qw(dump_file index|OPTIONAL type|OPTIONAL) ],
   );

   # Modules used by from_json_file and from_dump_file.
   my %require_modules = (
      'Metabrik::File::Json' => [ ],
      'Metabrik::File::Dump' => [ ],
   );

   return {
      revision => '$Revision$',
      tags => [ qw(unstable) ],
      author => 'GomoR <GomoR[at]metabrik.org>',
      license => 'http://opensource.org/licenses/BSD-3-Clause',
      attributes => \%attributes,
      attributes_default => \%attributes_default,
      commands => \%commands,
      require_modules => \%require_modules,
   };
}

# Return the cached client, opening and caching one first when none is
# stored yet. Returns nothing when open fails.
sub create_client {
   my ($self) = @_;

   my $client = $self->client;
   return $client if defined($client);

   # No client cached so far: open one and remember it.
   $client = $self->open or return;
   $self->client($client);

   return $client;
}

# Forget the cached client, if any. Always returns 1.
sub reset_client {
   my ($self) = @_;

   # Only touch the attribute when something is actually stored.
   $self->client(undef) if defined($self->client);

   return 1;
}

# Extract the total from a query result.
#
# $run: query result, checked to be a HASH; expected to hold {hits}{total}.
#
# Returns the value stored at $run->{hits}{total}. When the result lacks
# the expected keys, returns whatever the error logger returns. Returns
# nothing when one of the argument checks fails.
sub get_query_result_total {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_query_result_total', $run) or return;
   $self->brik_help_run_invalid_arg('get_query_result_total', $run, 'HASH') or return;

   return $self->log->error("get_query_result_total: invalid query result, no hits found")
      unless exists($run->{hits});

   my $hits = $run->{hits};
   return $self->log->error("get_query_result_total: invalid query result, no total found")
      unless exists($hits->{total});

   return $hits->{total};
}

# Extract the hits from a query result.
#
# $run: query result, checked to be a HASH; expected to hold {hits}{hits}.
#
# Returns the value stored at $run->{hits}{hits}. When the result lacks
# the expected keys, returns whatever the error logger returns. Returns
# nothing when one of the argument checks fails.
sub get_query_result_hits {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_query_result_hits', $run) or return;
   $self->brik_help_run_invalid_arg('get_query_result_hits', $run, 'HASH') or return;

   return $self->log->error("get_query_result_hits: invalid query result, no hits found")
      unless exists($run->{hits});

   my $outer = $run->{hits};
   return $self->log->error("get_query_result_hits: invalid query result, no hits in hits found")
      unless exists($outer->{hits});

   return $outer->{hits};
}

# Extract the aggregations from a query result.
#
# $run: query result, checked to be a HASH; expected to hold {aggregations}.
#
# Returns the value stored at $run->{aggregations}, or whatever the error
# logger returns when that key is absent. Returns nothing when one of the
# argument checks fails.
sub get_query_result_aggregations {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_query_result_aggregations', $run) or return;
   $self->brik_help_run_invalid_arg('get_query_result_aggregations', $run, 'HASH') or return;

   return $run->{aggregations} if exists($run->{aggregations});

   return $self->log->error(
      "get_query_result_aggregations: invalid query result, no aggregations found"
   );
}

# Tell whether a query result is flagged as timed out.
#
# $run: query result, checked to be a HASH; expected to hold {timed_out}.
#
# Returns 1 when $run->{timed_out} is true, 0 otherwise, or whatever the
# error logger returns when that key is absent. Returns nothing when one
# of the argument checks fails.
sub get_query_result_timed_out {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_query_result_timed_out', $run) or return;
   $self->brik_help_run_invalid_arg('get_query_result_timed_out', $run, 'HASH') or return;

   if (! exists($run->{timed_out})) {
      return $self->log->error(
         "get_query_result_timed_out: invalid query result, no timed_out found"
      );
   }

   # Normalize whatever truthy/falsy value is stored to a plain 1 or 0.
   return 1 if $run->{timed_out};
   return 0;
}

# Extract the took value from a query result.
#
# $run: query result, checked to be a HASH; expected to hold {took}.
#
# Returns the value stored at $run->{took}, or whatever the error logger
# returns when that key is absent. Returns nothing when one of the
# argument checks fails.
sub get_query_result_took {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_query_result_took', $run) or return;
   $self->brik_help_run_invalid_arg('get_query_result_took', $run, 'HASH') or return;

   return $run->{took} if exists($run->{took});

   return $self->log->error("get_query_result_took: invalid query result, no took found");
}

# Run a query through the parent class and sanity-check the result.
#
# $q:     query to run (mandatory).
# $index: optional, defaults to '*'.
# $type:  optional, defaults to '*'.
# $hash:  optional, handed to the parent query() untouched.
#
# Returns the result when it is a HASH holding {hits}{total}; otherwise
# returns whatever the error logger returns. Returns nothing when the
# argument check, the client creation or the parent query() fails.
sub query {
   my $self = shift;
   my ($q, $index, $type, $hash) = @_;

   $index ||= '*';
   $type ||= '*';
   $self->brik_help_run_undef_arg('query', $q) or return;

   # Make sure a client exists before delegating to the parent class.
   $self->create_client or return;

   my $r = $self->SUPER::query($q, $index, $type, $hash) or return;

   # Check each level before descending: this neither dies on a true
   # non-HASH result nor autovivifies $r->{hits} as a side effect.
   if (ref($r) eq 'HASH' && ref($r->{hits}) eq 'HASH'
   &&  exists($r->{hits}{total})) {
      return $r;
   }

   return $self->log->error("query: failed with [$r]");
}

#
# run client::elasticsearch::query term domain=example.com index1-*,index2-*
#
# Run a term query for one 'key=value' pair.
#
# $kv:    'key=value' string; the value is everything after the first '='.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv is malformed. Returns nothing when an argument check fails.
sub term {
   my $self = shift;
   my ($kv, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('term', $kv) or return;
   $self->brik_help_set_undef_arg('term', $index) or return;
   $self->brik_help_set_undef_arg('term', $type) or return;

   if ($kv !~ /^\S+?=.+$/) {
      return $self->log->error("term: kv must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a value that itself contains
   # '=' (URL query string, base64 padding, ...) is kept whole.
   my ($key, $value) = split(/=/, $kv, 2);

   $self->log->debug("term: key[$key] value[$value]");

   # Optimized version on ES 5.0.0
   my $q = {
      size => $self->size,
      query => {
         bool => {
            must => { term => { $key => $value } },
         },
      },
   };

   $self->log->verbose("term: keys [$key] value [$value] index [$index] type [$type]");

   return $self->query($q, $index, $type);
}

#
# run client::elasticsearch::query unique_term ip domain=example.com index1-*,index2-*
#
# Run a term query for one 'key=value' pair together with a cardinality
# aggregation on the $unique field. No hits are requested (size 0).
#
# $unique: field the cardinality aggregation is computed on.
# $kv:     'key=value' string; the value is everything after the first '='.
# $index:  optional, falls back to the index attribute.
# $type:   optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv is malformed. Returns nothing when an argument check fails.
sub unique_term {
   my $self = shift;
   my ($unique, $kv, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('unique_term', $unique) or return;
   $self->brik_help_run_undef_arg('unique_term', $kv) or return;
   $self->brik_help_set_undef_arg('unique_term', $index) or return;
   $self->brik_help_set_undef_arg('unique_term', $type) or return;

   if ($kv !~ m{^.+?=.+$}) {
      return $self->log->error("unique_term: kv [$kv] must be in the form ".
         "'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a value that itself contains
   # '=' is kept whole instead of being truncated.
   my ($key, $value) = split(/=/, $kv, 2);

   $self->log->debug("unique_term: key[$key] value[$value]");

   # Optimized version on ES 5.0.0
   my $q = {
      size => 0,
      query => {
         bool => {
            must => { term => { $key => $value } },
         },
      },
      aggs => {
         1 => {
            cardinality => {
               field => $unique,
               precision_threshold => 40000,
            },
         },
      },
   };

   $self->log->verbose("unique_term: unique [$unique] keys [$key] value [$value] ".
      "index [$index] type [$type]");

   return $self->query($q, $index, $type);
}

#
# run client::elasticsearch::query unique_values domain index1-*,index2-*
#
# Run a terms aggregation on $field, without requesting any hit (size 0).
#
# $field: field to aggregate on.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(). Returns nothing when an argument check
# fails.
sub unique_values {
   my ($self, $field, $index, $type) = @_;

   $index = $self->index unless $index;
   $type = $self->type unless $type;
   $self->brik_help_run_undef_arg('unique_values', $field) or return;
   $self->brik_help_set_undef_arg('unique_values', $index) or return;
   $self->brik_help_set_undef_arg('unique_values', $type) or return;

   # The aggregation asks for ten times the size attribute, and only for
   # partition 0 out of 10.
   # Original author's note: will return 10*100000=1_000_000 unique values.
   my %terms = (
      field => $field,
      size => $self->size * 10,
      include => { partition => 0, num_partitions => 10 },
   );

   my $q = {
      size => 0,
      aggs => {
         1 => { terms => \%terms },
      },
   };

   $self->log->verbose("unique_values: unique [$field] index [$index] type [$type]");

   return $self->query($q, $index, $type);
}

# Run a wildcard query for one 'key=value' pair.
#
# $kv:    'key=value' string; the value (the wildcard pattern) is
#         everything after the first '='.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv is malformed. Returns nothing when an argument check fails.
sub wildcard {
   my $self = shift;
   my ($kv, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('wildcard', $kv) or return;
   $self->brik_help_set_undef_arg('wildcard', $index) or return;
   $self->brik_help_set_undef_arg('wildcard', $type) or return;

   if ($kv !~ /^\S+?=.+$/) {
      return $self->log->error("wildcard: kv must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a pattern that itself
   # contains '=' is kept whole instead of being truncated.
   my ($key, $value) = split(/=/, $kv, 2);

   my $q = {
      size => $self->size,
      query => {
         #constant_score => {  # Does not like constant_score
            #filter => {
               wildcard => {
                  $key => $value,
               },
            #},
         #},
      },
   };

   $self->log->verbose("wildcard: keys [$key] value [$value] index [$index] type [$type]");

   return $self->query($q, $index, $type);
}

#
# run client::elasticsearch::query range ip_range.from=192.168.255.36 ip_range.to=192.168.255.36
#
# Find documents whose [from-field, to-field] interval contains the given
# values: from-field <= from-value AND to-field >= to-value.
#
# $kv_from: 'key=value' string for the lower bound field.
# $kv_to:   'key=value' string for the upper bound field.
# $index:   optional, falls back to the index attribute.
# $type:    optional, falls back to the type attribute.
#
# In both strings the value is everything after the first '='.
#
# Returns the result of query(), or whatever the error logger returns when
# one of the strings is malformed. Returns nothing when an argument check
# fails.
sub range {
   my $self = shift;
   my ($kv_from, $kv_to, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('range', $kv_from) or return;
   $self->brik_help_run_undef_arg('range', $kv_to) or return;
   $self->brik_help_set_undef_arg('range', $index) or return;
   $self->brik_help_set_undef_arg('range', $type) or return;

   if ($kv_from !~ /^\S+?=.+$/) {
      return $self->log->error("range: kv_from [$kv_from] must be in the form 'key=value'");
   }
   if ($kv_to !~ /^\S+?=.+$/) {
      return $self->log->error("range: kv_to [$kv_to] must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so values that themselves
   # contain '=' are kept whole instead of being truncated.
   my ($key_from, $value_from) = split(/=/, $kv_from, 2);
   my ($key_to, $value_to) = split(/=/, $kv_to, 2);

   #
   # http://stackoverflow.com/questions/40519806/no-query-registered-for-filtered
   # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-filtered-query.html
   # Compatible with ES 5.0
   #
   my $q = {
      size => $self->size,
      query => {
         bool => {
            must => [
               { range => { $key_to => { gte => $value_to } } },
               { range => { $key_from => { lte => $value_from } } },
            ],
         },
      },
   };

   return $self->query($q, $index, $type);
}

#
# run client::elasticsearch::query top name=10 users-*
#
# Run a terms aggregation returning the most frequent values of a field,
# ordered by descending count.
#
# $kv_count: 'field=count' string; count must be made of digits only.
# $index:    optional, falls back to the index attribute.
# $type:     optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv_count is malformed. Returns nothing when an argument check fails.
sub top {
   my $self = shift;
   my ($kv_count, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('top', $kv_count) or return;
   $self->brik_help_set_undef_arg('top', $index) or return;
   $self->brik_help_set_undef_arg('top', $type) or return;

   # Validate and extract in one step, so the two parts are exactly what
   # the pattern accepted. A separate unlimited split on '=' would hand a
   # non-numeric count to int() for inputs such as 'a=b=10'.
   my ($key_count, $value_count) = $kv_count =~ /^(\S+)=(\d+)$/;
   if (! defined($key_count)) {
      return $self->log->error("top: kv_count [$kv_count] must be in the form 'key=value'");
   }

   my $q = {
      aggs => {
         top_values => {
            terms => {
               field => $key_count,
               size => int($value_count),
               order => { _count => 'desc' },
            },
         },
      },
   };

   $self->log->verbose("top: key [$key_count] value [$value_count] index [$index] type [$type]");

   return $self->query($q, $index, $type);
}

# Run a match_phrase query for one 'key=value' pair.
#
# $kv:    'key=value' string; the value (the phrase) is everything after
#         the first '='.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv is malformed. Returns nothing when an argument check fails.
sub match_phrase {
   my $self = shift;
   my ($kv, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('match_phrase', $kv) or return;
   $self->brik_help_set_undef_arg('match_phrase', $index) or return;
   $self->brik_help_set_undef_arg('match_phrase', $type) or return;

   if ($kv !~ /^\S+?=.+$/) {
      return $self->log->error("match_phrase: kv must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a phrase that itself
   # contains '=' is kept whole instead of being truncated.
   my ($key, $value) = split(/=/, $kv, 2);

   $self->log->debug("match_phrase: key[$key] value[$value]");

   my $q = {
      size => $self->size,
      query => {
         match_phrase => {
            $key => $value,
         },
      },
   };

   return $self->query($q, $index, $type);
}

# Run a match query for one 'key=value' pair.
#
# $kv:    'key=value' string; the value is everything after the first '='.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# $kv is malformed. Returns nothing when an argument check fails.
sub match {
   my $self = shift;
   my ($kv, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('match', $kv) or return;
   $self->brik_help_set_undef_arg('match', $index) or return;
   $self->brik_help_set_undef_arg('match', $type) or return;

   if ($kv !~ /^\S+?=.+$/) {
      return $self->log->error("match: kv must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a value that itself contains
   # '=' is kept whole instead of being truncated.
   my ($key, $value) = split(/=/, $kv, 2);

   $self->log->debug("match: key[$key] value[$value]");

   my $q = {
      size => $self->size,
      query => {
         match => {
            $key => $value,
         },
      },
   };

   return $self->query($q, $index, $type);
}

#
# run client::elasticsearch::query top_match domain=10 host=*www*
#
# Run a match query for one 'key=value' pair together with a terms
# aggregation on another field.
#
# $kv_count: 'field=count' string: field to aggregate on and number of
#            terms wanted; count must be made of digits only.
# $kv_match: 'key=value' string for the match query; the value is
#            everything after the first '='.
# $index:    optional, falls back to the index attribute.
# $type:     optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# one of the strings is malformed. Returns nothing when an argument check
# fails.
sub top_match {
   my $self = shift;
   my ($kv_count, $kv_match, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('top_match', $kv_count) or return;
   $self->brik_help_run_undef_arg('top_match', $kv_match) or return;
   $self->brik_help_set_undef_arg('top_match', $index) or return;
   $self->brik_help_set_undef_arg('top_match', $type) or return;

   if ($kv_count !~ /^\S+?=.+$/) {
      return $self->log->error("top_match: kv_count [$kv_count] must be in the form 'key=value'");
   }
   if ($kv_match !~ /^\S+?=.+$/) {
      return $self->log->error("top_match: kv_match [$kv_match] must be in the form 'key=value'");
   }
   # Limit of 2: cut at the first '=' only, so a match value that itself
   # contains '=' is kept whole instead of being truncated.
   my ($key_count, $value_count) = split(/=/, $kv_count, 2);
   my ($key_match, $value_match) = split(/=/, $kv_match, 2);

   # The count is fed to int() below: reject anything that is not a plain
   # number instead of silently turning it into 0 (with a warning).
   if ($value_count !~ /^\d+$/) {
      return $self->log->error("top_match: kv_count [$kv_count] value must be a number");
   }

   my $q = {
      size => $self->size,
      query => {
         #constant_score => {   # Does not like constant_score
            #filter => {
               match => {
                  $key_match => $value_match,
               },
            #},
         #},
      },
      aggs => {
         top_values => {
            terms => {
               field => $key_count,
               size => int($value_count),
            },
         },
      },
   };

   return $self->query($q, $index, $type);
}

# Read a query from a JSON file and run it.
#
# $file:  path of the file to read with Metabrik::File::Json.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# what was read is empty. Returns nothing when an argument check, the
# reader creation or the read itself fails.
sub from_json_file {
   my ($self, $file, $index, $type) = @_;

   $index = $self->index unless $index;
   $type = $self->type unless $type;
   $self->brik_help_run_undef_arg('from_json_file', $file) or return;
   $self->brik_help_run_file_not_found('from_json_file', $file) or return;
   $self->brik_help_set_undef_arg('from_json_file', $index) or return;
   $self->brik_help_set_undef_arg('from_json_file', $type) or return;

   my $reader = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $content = $reader->read($file) or return;

   unless (defined($content) && length($content)) {
      return $self->log->error("from_json_file: nothing to read from this file [$file]");
   }

   return $self->query($content, $index, $type);
}

# Read a dump file and run its first element as a query.
#
# $file:  path of the file to read with Metabrik::File::Dump.
# $index: optional, falls back to the index attribute.
# $type:  optional, falls back to the type attribute.
#
# Returns the result of query(), or whatever the error logger returns when
# the first element read is undefined. Returns nothing when an argument
# check, the reader creation or the read itself fails.
sub from_dump_file {
   my ($self, $file, $index, $type) = @_;

   $index = $self->index unless $index;
   $type = $self->type unless $type;
   $self->brik_help_run_undef_arg('from_dump_file', $file) or return;
   $self->brik_help_run_file_not_found('from_dump_file', $file) or return;
   $self->brik_help_set_undef_arg('from_dump_file', $index) or return;
   $self->brik_help_set_undef_arg('from_dump_file', $type) or return;

   my $reader = Metabrik::File::Dump->new_from_brik_init($self) or return;
   my $dumped = $reader->read($file) or return;

   # Only the first element of what was read is used as the query.
   my $first = $dumped->[0];
   return $self->log->error("from_dump_file: nothing to read from this file [$file]")
      unless defined($first);

   return $self->query($first, $index, $type);
}

1;

__END__

=head1 NAME

Metabrik::Client::Elasticsearch::Query - client::elasticsearch::query Brik

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2014-2022, Patrice E<lt>GomoRE<gt> Auffret

You may distribute this module under the terms of The BSD 3-Clause License.
See LICENSE file in the source distribution archive.

=head1 AUTHOR

Patrice E<lt>GomoRE<gt> Auffret

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.