Group
Extension

Metabrik-Repository/lib/Metabrik/Client/Elasticsearch.pm

#
# $Id$
#
# client::elasticsearch Brik
#
package Metabrik::Client::Elasticsearch;
use strict;
use warnings;

use base qw(Metabrik::Client::Rest);

#
# Static description of this Brik: its Attributes (with help descriptors and
# default values), its Commands (with their expected Arguments) and the Perl
# modules it requires.
#
sub brik_properties {
   return {
      revision => '$Revision$',
      tags => [ qw(unstable es) ],
      author => 'GomoR <GomoR[at]metabrik.org>',
      license => 'http://opensource.org/licenses/BSD-3-Clause',
      attributes => {
         datadir => [ qw(datadir) ],
         nodes => [ qw(node_list) ],
         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
         date => [ qw(date) ],
         index => [ qw(index) ],
         type => [ qw(type) ],
         from => [ qw(number) ],
         size => [ qw(count) ],
         max => [ qw(count) ],
         max_flush_count => [ qw(count) ],
         max_flush_size => [ qw(count) ],
         rtimeout => [ qw(seconds) ],
         sniff_rtimeout => [ qw(seconds) ],
         try => [ qw(count) ],
         use_bulk_autoflush => [ qw(0|1) ],
         use_indexing_optimizations => [ qw(0|1) ],
         use_ignore_id => [ qw(0|1) ],
         use_type => [ qw(0|1) ],
         csv_header => [ qw(fields) ],
         csv_encoded_fields => [ qw(fields) ],
         csv_object_fields => [ qw(fields) ],
         encoding => [ qw(utf8|ascii) ],
         disable_deprecation_logging => [ qw(0|1) ],
         # Holds a major version number (default '7'), not a boolean flag.
         es_version => [ qw(major_version) ],
         _es => [ qw(INTERNAL) ],
         _bulk => [ qw(INTERNAL) ],
         _scroll => [ qw(INTERNAL) ],
      },
      attributes_default => {
         nodes => [ qw(http://localhost:9200) ],
         cxn_pool => 'Sniff',
         from => 0,
         size => 10,
         max => 0,
         index => '*',
         type => '*',
         rtimeout => 60,
         sniff_rtimeout => 3,
         try => 3,
         max_flush_count => 1_000,
         max_flush_size => 1_000_000,
         use_bulk_autoflush => 1,
         use_indexing_optimizations => 0,
         use_ignore_id => 0,
         use_type => 1,
         encoding => 'utf8',
         disable_deprecation_logging => 0,
         es_version => '7',
      },
      commands => {
         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
         close_scroll => [ ],
         total_scroll => [ ],
         next_scroll => [ qw(count|OPTIONAL) ],
         reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
         get_reindex_tasks => [ ],
         cancel_reindex_task => [ qw(id) ],
         get_taskid => [ qw(id) ],
         show_reindex_progress => [ ],
         loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
         index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
         clean_deleted_from_index => [ qw(index) ],
         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
         update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
         bulk_flush => [ qw(index|OPTIONAL) ],
         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
         delete_index => [ qw(index|indices_list) ],
         update_alias => [ qw(new_index alias) ],
         delete_document => [ qw(index type id) ],
         delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
         show_indices => [ qw(string_filter|OPTIONAL) ],
         show_nodes => [ ],
         show_health => [ ],
         show_recovery => [ ],
         show_allocation => [ ],
         list_indices => [ qw(regex|OPTIONAL) ],
         get_indices => [ qw(string_filter|OPTIONAL) ],
         get_index => [ qw(index|indices_list) ],
         get_index_stats => [ qw(index) ],
         list_index_types => [ qw(index) ],
         list_index_fields => [ qw(index) ],
         list_indices_version => [ qw(index|indices_list) ],
         open_index => [ qw(index|indices_list) ],
         close_index => [ qw(index|indices_list) ],
         get_aliases => [ qw(index) ],
         put_alias => [ qw(index alias) ],
         delete_alias => [ qw(index alias) ],
         is_mapping_exists => [ qw(index mapping) ],
         get_mappings => [ qw(index type|OPTIONAL) ],
         create_index => [ qw(index shards|OPTIONAL) ],
         create_index_with_mappings => [ qw(index mappings) ],
         info => [ qw(nodes_list|OPTIONAL) ],
         version => [ qw(nodes_list|OPTIONAL) ],
         get_templates => [ ],
         list_templates => [ ],
         get_template => [ qw(name) ],
         put_mapping => [ qw(index type mapping) ],
         put_mapping_from_json_file => [ qw(index type file) ],
         update_mapping_from_json_file => [ qw(file index type) ],
         put_template => [ qw(name template) ],
         put_template_from_json_file => [ qw(file name|OPTIONAL) ],
         update_template_from_json_file => [ qw(file name|OPTIONAL) ],
         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
         set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
         reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
         list_index_readonly => [ ],
         set_index_number_of_replicas => [ qw(index|indices_list number) ],
         set_index_refresh_interval => [ qw(index|indices_list number) ],
         get_index_settings => [ qw(index|indices_list) ],
         get_index_readonly => [ qw(index|indices_list) ],
         get_index_number_of_replicas => [ qw(index|indices_list) ],
         get_index_refresh_interval => [ qw(index|indices_list) ],
         get_index_number_of_shards => [ qw(index|indices_list) ],
         delete_template => [ qw(name) ],
         is_index_exists => [ qw(index) ],
         is_type_exists => [ qw(index type) ],
         is_document_exists => [ qw(index type document) ],
         parse_error_string => [ qw(string) ],
         refresh_index => [ qw(index) ],
         export_as => [ qw(format index size|OPTIONAL callback|OPTIONAL) ],
         export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
         export_as_json => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
         import_from => [ qw(format input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
         import_from_csv => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
         import_from_json => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
         import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
         get_stats_process => [ ],
         get_process => [ ],
         get_cluster_state => [ ],
         get_cluster_health => [ ],
         get_cluster_settings => [ ],
         put_cluster_settings => [ qw(settings) ],
         count_green_indices => [ ],
         count_yellow_indices => [ ],
         count_red_indices => [ ],
         list_green_indices => [ ],
         list_yellow_indices => [ ],
         list_red_indices => [ ],
         count_indices => [ ],
         list_indices_status => [ ],
         count_shards => [ ],
         count_size => [ qw(string_filter|OPTIONAL) ],
         count_total_size => [ qw(string_filter|OPTIONAL) ],
         count_count => [ ],
         list_datatypes => [ ],
         get_hits_total => [ qw(results) ],
         disable_shard_allocation => [ ],
         enable_shard_allocation => [ ],
         flush_synced => [ ],
         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
         create_shared_fs_snapshot_repository => [ qw(location
            repository_name|OPTIONAL) ],
         get_snapshot_repositories => [ ],
         get_snapshot_status => [ ],
         delete_snapshot_repository => [ qw(repository_name) ],
         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
            body|OPTIONAL) ],
         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
            repository_name|OPTIONAL) ],
         is_snapshot_finished => [ ],
         get_snapshot_state => [ ],
         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
         delete_snapshot => [ qw(snapshot_name repository_name) ],
         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
      },
      require_modules => {
         'Metabrik::String::Json' => [ ],
         'Metabrik::File::Csv' => [ ],
         'Metabrik::File::Json' => [ ],
         'Metabrik::File::Dump' => [ ],
         'Metabrik::Format::Number' => [ ],
         'Metabrik::Worker::Parallel' => [ ],
         'Search::Elasticsearch' => [ ],
      },
   };
}

#
# Load Search::Elasticsearch at run time and log an error when it is missing
# or older than version 5, then hand over to the parent class brik_preinit.
#
sub brik_preinit {
   my $self = shift;

   # String eval defers the load to run time, so a missing module does not
   # abort the compilation of this Brik.
   eval("use Search::Elasticsearch;");

   # VERSION is undefined when the module could not be loaded: handle that
   # explicitly instead of comparing undef numerically (which warns).
   my $version = $Search::Elasticsearch::VERSION;
   if (! defined($version) || $version < 5) {
      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
         "with: run perl::module install Search::Elasticsearch");
   }

   return $self->SUPER::brik_preinit;
}

#
# Create a Search::Elasticsearch client and store it in the _es Attribute.
#
# Arguments:
#   nodes    - ARRAY of node URLs (defaults to the nodes Attribute)
#   cxn_pool - connection pool name (defaults to the cxn_pool Attribute)
#
# Returns the nodes ARRAY on success. On error, logs it and returns the
# result of the log call.
#
sub open {
   my $self = shift;
   my ($nodes, $cxn_pool) = @_;

   $nodes ||= $self->nodes;
   $cxn_pool ||= $self->cxn_pool;
   $self->brik_help_run_undef_arg('open', $nodes) or return;
   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
   $self->brik_help_run_empty_array_arg('open', $nodes) or return;

   for my $node (@$nodes) {
      # Anchored: the scheme has to be a prefix, as the error message states.
      if ($node !~ m{\Ahttps?://}) {
         return $self->log->error("open: invalid node[$node], must start with http(s)");
      }
   }

   my $timeout = $self->rtimeout;

   my $nodes_str = join('|', @$nodes);
   $self->log->debug("open: using nodes [$nodes_str]");

   #
   # Timeout description here:
   #
   # Search::Elasticsearch::Role::Cxn
   #

   my %args = (
      nodes => $nodes,
      cxn_pool => $cxn_pool,
      timeout => $timeout,
      max_retries => $self->try,
      retry_on_timeout => 1,
      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
      request_timeout => 60,  # seconds, default 30
      ping_timeout => 5,  # seconds, default 2
      dead_timeout => 120,  # seconds, default 60
      max_dead_timeout => 3600,  # seconds, default 3600
      sniff_request_timeout => 15, # seconds, default 2
      #trace_to => 'Stderr',  # For debug purposes
   );

   # Send deprecation messages to /dev/null when asked to silence them.
   if ($self->disable_deprecation_logging) {
      $args{deprecate_to} = ['File', '/dev/null'];
   }

   # The constructor may throw (bad parameters for instance): catch it so
   # that this Command reports errors the same way as the others do.
   my $es;
   eval {
      $es = Search::Elasticsearch->new(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("open: failed: [$@]");
   }
   if (! defined($es)) {
      return $self->log->error("open: failed");
   }

   $self->_es($es);

   return $nodes;
}

#
# Search::Elasticsearch::Client::5_0::Bulk
#
#
# Create a bulk helper from the opened client and store it in the _bulk
# Attribute.
#
# Arguments:
#   index - index given to the helper (defaults to the index Attribute)
#   type  - type given to the helper when use_type is set (defaults to the
#           type Attribute)
#
# Returns the nodes Attribute on success. On error, logs it and returns the
# result of the log call.
#
sub open_bulk_mode {
   my $self = shift;
   my ($index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;

   my %args = (
      index => $index,
      on_error => sub {
         #my ($action, $response, $i) = @_;

         # Data::Dumper is not loaded by this file: make sure it is available
         # before dumping the callback arguments.
         require Data::Dumper;
         print Data::Dumper::Dumper(\@_)."\n";
      },
   );

   if ($self->use_type) {
      $args{type} = $type;
   }

   if ($self->use_bulk_autoflush) {
      my $max_count = $self->max_flush_count || 1_000;
      my $max_size = $self->max_flush_size || 1_000_000;

      $args{max_count} = $max_count;
      $args{max_size} = $max_size;
      $args{max_time} = 0;

      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
         "max_flush_size [$max_size]");
   }
   else {
      # No automatic flushing: all limits are zeroed and the error callback
      # is removed.
      $args{max_count} = 0;
      $args{max_size} = 0;
      $args{max_time} = 0;
      $args{on_error} = undef;
      #$args{on_success} = sub {
         #my ($action, $response, $i) = @_;
      #};

      $self->log->info("open_bulk_mode: opening without automatic flushing");
   }

   my $bulk;
   eval {
      $bulk = $es->bulk_helper(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("open_bulk_mode: failed: [$@]");
   }

   $self->_bulk($bulk);

   return $self->nodes;
}

#
# Create a scroll helper using the 'scan' search type and store it in the
# _scroll Attribute. Refused when the version Command reports 5.0.0 or later.
#
# Arguments:
#   index - index to scroll on (defaults to the index Attribute)
#   size  - size given to the helper (defaults to the size Attribute)
#
# Returns the nodes Attribute on success. On error, logs it and returns the
# result of the log call.
#
sub open_scroll_scan_mode {
   my $self = shift;
   my ($index, $size) = @_;

   my $version = $self->version or return;

   # Compare the major number numerically: as plain strings, "10.0.0" sorts
   # before "5.0.0". Fall back to the string comparison when no leading
   # number can be extracted.
   my ($major) = $version =~ m{\A(\d+)};
   my $is_v5_or_later = defined($major) ? $major >= 5 : $version ge "5.0.0";
   if ($is_v5_or_later) {
      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
         "$version, try open_scroll Command instead");
   }

   $index ||= $self->index;
   $size ||= $self->size;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;

   my $scroll;
   eval {
      $scroll = $es->scroll_helper(
         index => $index,
         search_type => 'scan',
         size => $size,
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("open_scroll_scan_mode: failed: $@");
   }

   $self->_scroll($scroll);

   return $self->nodes;
}

#
# Search::Elasticsearch::Client::5_0::Scroll
#
#
# Create a scroll helper and store it in the _scroll Attribute. Refused when
# the version Command reports a version lower than 5.0.0.
#
# Arguments:
#   index - index to scroll on (defaults to the index Attribute)
#   size  - size given to the helper (defaults to the size Attribute)
#   type  - type to restrict to, used when use_type is set and type is
#           not '*' (defaults to the type Attribute)
#   query - request body (defaults to a match_all query)
#
# Returns the nodes Attribute on success. On error, logs it and returns the
# result of the log call.
#
sub open_scroll {
   my $self = shift;
   my ($index, $size, $type, $query) = @_;

   my $version = $self->version or return;

   # Compare the major number numerically: as plain strings, "10.0.0" sorts
   # before "5.0.0". Fall back to the string comparison when no leading
   # number can be extracted.
   my ($major) = $version =~ m{\A(\d+)};
   my $is_before_v5 = defined($major) ? $major < 5 : $version lt "5.0.0";
   if ($is_before_v5) {
      return $self->log->error("open_scroll: Command not supported for ES version ".
         "$version, try open_scroll_scan_mode Command instead");
   }

   $query ||= { query => { match_all => {} } };
   $index ||= $self->index;
   $type ||= $self->type;
   $size ||= $self->size;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
   $self->brik_help_run_undef_arg('open_scroll', $size) or return;

   my $timeout = $self->rtimeout;

   my %args = (
      scroll => "${timeout}s",
      # Starting with Search::Elasticsearch 7.x, scroll_in_qs does not exist anymore
      #scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
                          # it in query string.
      index => $index,
      size => $size,
      body => $query,
   );
   if ($self->use_type) {
      if ($type ne '*') {
         $args{type} = $type;
      }
   }

   #
   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
   #
   my $scroll;
   eval {
      $scroll = $es->scroll_helper(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("open_scroll: failed: $@");
   }

   $self->_scroll($scroll);

   $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");

   return $self->nodes;
}

#
# Search::Elasticsearch::Client::5_0::Scroll
#
#
# Finish the scroll held in the _scroll Attribute, if any, and forget it.
# Always returns 1.
#
sub close_scroll {
   my $self = shift;

   my $helper = $self->_scroll;
   if (defined($helper)) {
      $helper->finish;
      $self->_scroll(undef);
   }

   return 1;
}

#
# Return the total reported by the scroll helper held in the _scroll
# Attribute. On error, logs it and returns the result of the log call.
#
sub total_scroll {
   my $self = shift;

   my $helper = $self->_scroll;
   $self->brik_help_run_undef_arg('open_scroll', $helper) or return;

   my $total = eval { $helper->total };
   if ($@) {
      chomp($@);
      return $self->log->error("total_scroll: failed with: [$@]");
   }

   return $total;
}

#
# Fetch the next document(s) from the scroll helper held in the _scroll
# Attribute.
#
# Arguments:
#   count - number of documents to fetch (defaults to 1)
#
# Returns a single document when count is 1, or an ARRAY reference of
# documents when count is greater than 1 (undef when none is left). Failed
# fetches are retried, 10 seconds apart, up to the try Attribute number of
# attempts; after that an error is logged and the result of the log call is
# returned.
#
sub next_scroll {
   my $self = shift;
   my ($count) = @_;

   $count ||= 1;

   my $scroll = $self->_scroll;
   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;

   my $max_try = $self->try || 0;
   my $tried = 0;

   while (1) {
      my $next;
      eval {
         if ($count > 1) {
            my @docs = $scroll->next($count);
            if (@docs > 0) {
               $next = \@docs;
            }
         }
         else {
            $next = $scroll->next;
         }
      };
      if (! $@) {
         return $next;
      }

      # Keep a copy: $@ may be overwritten by the calls below.
      my $error = $@;
      chomp($error);

      # Count attempts upward so the message reports how many were really
      # made, and so a try Attribute of 0 cannot cause endless retries.
      $tried++;
      if ($tried >= $max_try) {
         return $self->log->error("next_scroll: failed after [$tried] tries ".
            "with error [$error]");
      }

      $self->log->warning("next_scroll: sleeping 10 seconds before retry cause error: [$error]");
      sleep 10;
   }
}

#
# Search::Elasticsearch::Client::5_0::Direct
#
#
# Index a single document through the opened client.
#
# Arguments:
#   doc   - HASH, the document to index
#   index - target index (defaults to the index Attribute)
#   type  - document type, sent when use_type is set (defaults to the type
#           Attribute)
#   hash  - optional HASH of extra arguments merged into the request. When
#           hash->{routing} names a field defined in doc, the value of that
#           field is used as routing.
#   id    - optional document id
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub index_document {
   my $self = shift;
   my ($doc, $index, $type, $hash, $id) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('index_document', $doc) or return;
   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my %request = (
      index => $index,
      body => $doc,
   );
   $request{id} = $id if defined($id);
   $request{type} = $type if $self->use_type;

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH')
         or return;

      # Work on a copy so the caller's hash is left untouched.
      my %extra = %$hash;
      my $field = $hash->{routing};
      if (defined($field) && defined($doc->{$field})) {
         $extra{routing} = $doc->{$field};
      }
      @request{keys %extra} = values %extra;
   }

   my $r = eval { $es->index(%request) };
   if ($@) {
      chomp($@);
      return $self->log->error("index_document: index failed for ".
         "index [$index]: [$@]");
   }

   return $r;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
#
#
# Start a reindex from one index to another. The request is sent with
# wait_for_completion set to 'false'.
#
# Arguments:
#   index - source index
#   new   - destination index
#   type  - optional destination type, only used when use_type is set
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub reindex {
   my $self = shift;
   my ($index, $new, $type) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('reindex', $index) or return;
   $self->brik_help_run_undef_arg('reindex', $new) or return;

   my %dest = ( index => $new );

   # Change the type for destination doc
   if ($self->use_type && defined($type)) {
      $dest{type} = $type;
   }

   my %request = (
      body => {
         conflicts => 'proceed',
         source => { index => $index },
         dest => \%dest,
      },
      wait_for_completion => 'false',  # Immediately return the task.
   );

   my $r = eval { $es->reindex(%request) };
   if ($@) {
      chomp($@);
      return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
   }

   return $r;
}

#
# List reindex tasks
#
# curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
#
# Cancel reindex task
#
# curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
#

#
# Search::Elasticsearch::Client::6_0::Direct::Tasks
#
#
# List running reindex tasks.
#
# Returns a HASH reference keyed by task id, holding every task whose action
# is 'indices:data/write/reindex' among those reported by the tasks list
# API. On error, logs it and returns the result of the log call.
#
sub get_reindex_tasks {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $list;
   eval {
      $list = $es->tasks->list;
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_reindex_tasks: failed: [$@]");
   }

   my $nodes = $list->{nodes};
   if (! defined($nodes)) {
      return $self->log->error("get_reindex_tasks: no nodes found");
   }

   my %tasks = ();
   for my $node (keys %$nodes) {
      # Tasks of a node are visited once; nodes without tasks are skipped.
      my $node_tasks = $nodes->{$node}{tasks} or next;
      for my $task (keys %$node_tasks) {
         my $action = $node_tasks->{$task}{action};
         next unless defined($action) && $action eq 'indices:data/write/reindex';

         # First occurrence of a task id wins.
         if (! exists($tasks{$task})) {
            $tasks{$task} = $node_tasks->{$task};
         }
      }
   }

   return \%tasks;
}

#
# Cancel a task through the tasks API.
#
# Arguments:
#   id - id of the task to cancel
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub cancel_reindex_task {
   my $self = shift;
   my ($id) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;

   # Trap client exceptions, as the other Commands of this Brik do.
   my $r;
   eval {
      $r = $es->tasks->cancel(task_id => $id);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("cancel_reindex_task: failed for task [$id]: [$@]");
   }

   return $r;
}

#
# Fetch a task through the tasks API.
#
# Arguments:
#   id - id of the task to fetch
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub get_taskid {
   my $self = shift;
   my ($id) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_taskid', $id) or return;

   # Trap client exceptions so that callers testing the return value (for
   # instance "get_taskid($id) or next") are not aborted by a thrown error.
   my $r;
   eval {
      $r = $es->tasks->get(task_id => $id);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_taskid: failed for task [$id]: [$@]");
   }

   return $r;
}

#
# Print, for every running reindex task, its description, completion
# percentage and created/deleted/updated/total counters.
#
# Returns 1 when at least one reindex task was found, 0 when none is in
# progress, and nothing when the task list cannot be fetched.
#
sub show_reindex_progress {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $tasks = $self->get_reindex_tasks or return;
   if (! keys %$tasks) {
      $self->log->info("show_reindex_progress: no reindex task in progress");
      return 0;
   }

   for my $id (keys %$tasks) {
      my $task = $self->get_taskid($id) or next;

      # Missing fields are shown as 0 (or an empty description) rather than
      # triggering undefined value warnings.
      my $status = $task->{task}{status} || {};
      my $desc = $task->{task}{description};
      $desc = '' if ! defined($desc);
      my $total = $status->{total} || 0;
      my $created = $status->{created} || 0;
      my $deleted = $status->{deleted} || 0;
      my $updated = $status->{updated} || 0;

      # A total of 0 would otherwise die with a division by zero.
      my $perc = $total > 0
         ? ($created + $deleted + $updated) / $total * 100
         : 0;

      printf("> Task [%s]: %.02f%%\n", $desc, $perc);
      print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
   }

   return 1;
}

#
# Call show_reindex_progress repeatedly, sleeping between calls, until it
# returns a false value.
#
# Arguments:
#   sec - seconds to sleep between two calls (defaults to 60)
#
# Returns nothing once the loop stops.
#
sub loop_show_reindex_progress {
   my $self = shift;
   my ($sec) = @_;

   $sec ||= 60;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   while ($self->show_reindex_progress) {
      sleep($sec);
   }

   return;
}

#
# Read a JSON file and call reindex with the source index, the destination
# index and the decoded JSON content.
#
# Arguments:
#   index - source index
#   new   - destination index
#   file  - path to the JSON file, which has to exist
#
# Returns what reindex returns, or nothing when a check or the file read
# fails.
#
sub reindex_with_mapping_from_json_file {
   my $self = shift;
   my ($index, $new, $file) = @_;

   my $command = 'reindex_with_mapping_from_json_file';

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg($command, $index) or return;
   $self->brik_help_run_undef_arg($command, $new) or return;
   $self->brik_help_run_undef_arg($command, $file) or return;
   $self->brik_help_run_file_not_found($command, $file) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $json = $fj->read($file) or return;

   # NOTE(review): reindex() treats its third Argument as the destination
   # type, yet the decoded JSON content is passed here - confirm whether the
   # mapping was meant to be applied to the destination index instead.
   return $self->reindex($index, $new, $json);
}

#
# Search::Elasticsearch::Client::5_0::Direct
#
# Executing this Command with routing requires passing the correct field
# value directly in $hash->{routing}. We cannot "guess" it from the arguments:
# that would be too complicated to do efficiently.
#
#
# Partially update a document, sending doc under the 'doc' key of the
# request body.
#
# Arguments:
#   doc   - HASH, the fields to update
#   id    - id of the document to update
#   index - target index (defaults to the index Attribute)
#   type  - document type, sent when use_type is set (defaults to the type
#           Attribute)
#   hash  - optional HASH of extra arguments merged into the request
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub update_document {
   my $self = shift;
   my ($doc, $id, $index, $type, $hash) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('update_document', $doc) or return;
   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
   $self->brik_help_run_undef_arg('update_document', $id) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my %request = (
      id => $id,
      index => $index,
      body => { doc => $doc },
   );
   $request{type} = $type if $self->use_type;

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
         or return;
      # Caller-provided entries override the ones set above.
      @request{keys %$hash} = values %$hash;
   }

   my $r = eval { $es->update(%request) };
   if ($@) {
      chomp($@);
      return $self->log->error("update_document: index failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::5_0::Bulk
#
#
# Queue an 'index' action for one document on the bulk helper held in the
# _bulk Attribute.
#
# Arguments:
#   doc   - the document, passed as the 'source' of the action
#   index - only used in error messages here (defaults to the index
#           Attribute)
#   type  - checked but not sent by this Command (defaults to the type
#           Attribute)
#   hash  - optional HASH of extra action arguments. When hash->{routing}
#           names a field defined in doc, the value of that field is used as
#           routing.
#   id    - optional document id
#
# Returns the helper response. On error, logs it and returns the result of
# the log call.
#
sub index_bulk {
   my $self = shift;
   my ($doc, $index, $type, $hash, $id) = @_;

   my $bulk = $self->_bulk;
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my %action = (
      source => $doc,
   );
   $action{id} = $id if defined($id);

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;

      # Work on a copy so the caller's hash is left untouched.
      my %extra = %$hash;
      my $field = $hash->{routing};
      if (defined($field) && defined($doc->{$field})) {
         $extra{routing} = $doc->{$field};
      }
      @action{keys %extra} = values %extra;
   }

   my $r = eval { $bulk->add_action(index => \%action) };
   if ($@) {
      chomp($@);
      my $p = $self->parse_error_string($@);
      if (defined($p) && exists($p->{class})) {
         return $self->log->error("index_bulk: failed for index [$index] with error ".
            "[$p->{class}] code [$p->{code}] for node [$p->{node}]");
      }
      return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Allows indexing multiple docs at once:
# $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
#
#
# Queue 'index' actions for a list of documents on the bulk helper held in
# the _bulk Attribute, in a single call.
#
# Arguments:
#   list  - non-empty ARRAY of documents
#   index - only used in error messages here (defaults to the index
#           Attribute)
#   type  - checked but not sent by this Command (defaults to the type
#           Attribute)
#   hash  - optional HASH of extra arguments applied to every action. When
#           hash->{routing} names a field defined in a document, the value
#           of that field is used as routing for that document.
#
# Returns the helper response. On error, logs it and returns the result of
# the log call.
#
sub index_bulk_from_list {
   my $self = shift;
   my ($list, $index, $type, $hash) = @_;

   my $bulk = $self->_bulk;
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
   $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
   $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY')
      or return;
   $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list)
      or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH')
         or return;
   }

   my @actions = ();
   for my $doc (@$list) {
      my %action = (
         source => $doc,
      );
      if (defined($hash)) {
         # Work on a copy: routing is resolved per document.
         my $this_hash = { %$hash };
         if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
            $this_hash->{routing} = $doc->{$hash->{routing}};
         }
         %action = ( %action, %$this_hash );
      }
      push @actions, \%action;
   }

   my $r;
   eval {
      $r = $bulk->index(@actions);
   };
   if ($@) {
      chomp($@);
      my $p = $self->parse_error_string($@);
      if (defined($p) && exists($p->{class})) {
         my $class = $p->{class};
         my $code = $p->{code};
         my $node = $p->{node};
         return $self->log->error("index_bulk_from_list: failed for index [$index] ".
            "with error [$class] code [$code] for node [$node]");
      }
      else {
         return $self->log->error("index_bulk_from_list: index failed for ".
            "index [$index]: [$@]");
      }
   }

   return $r;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
#
#
# Run a forcemerge with only_expunge_deletes set to 'true' on an index.
#
# Arguments:
#   index - index to process
#
# Returns the client response. On error, logs it and returns the result of
# the log call.
#
sub clean_deleted_from_index {
   my $self = shift;
   my ($index) = @_;

   $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $indices = $self->_es->indices;

   my $r = eval {
      $indices->forcemerge(
         index => $index,
         only_expunge_deletes => 'true',
      );
   };
   if ($@) {
      chomp($@);
      my $p = $self->parse_error_string($@);
      if (defined($p) && exists($p->{class})) {
         return $self->log->error("clean_deleted_from_index: failed for index ".
            "[$index] with error [$p->{class}] code [$p->{code}] for node [$p->{node}]");
      }
      return $self->log->error("clean_deleted_from_index: index failed for ".
         "index [$index]: [$@]");
   }

   return $r;
}

#
# Executing this Command with routing requires passing the correct field
# value directly in $hash->{routing}. We cannot "guess" it from the arguments:
# that would be too complicated to do efficiently.
#
#
# Queue an 'update' for one document on the bulk helper held in the _bulk
# Attribute.
#
# Arguments:
#   doc   - the fields to update, passed as 'doc'
#   index - target index (defaults to the index Attribute)
#   type  - document type, sent when use_type is set (defaults to the type
#           Attribute)
#   hash  - optional HASH of extra arguments merged into the update
#   id    - optional document id
#
# Returns the helper response. On error, logs it and returns the result of
# the log call.
#
sub update_document_bulk {
   my $self = shift;
   my ($doc, $index, $type, $hash, $id) = @_;

   my $bulk = $self->_bulk;
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
   $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my %action = (
      index => $index,
      doc => $doc,
   );
   $action{type} = $type if $self->use_type;
   $action{id} = $id if defined($id);

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
         or return;
      # Caller-provided entries override the ones set above.
      @action{keys %$hash} = values %$hash;
   }

   my $r = eval { $bulk->update(\%action) };
   if ($@) {
      chomp($@);
      my $p = $self->parse_error_string($@);
      if (defined($p) && exists($p->{class})) {
         return $self->log->error("update_document_bulk: failed for index [$index] ".
            "with error [$p->{class}] code [$p->{code}] for node [$p->{node}]");
      }
      return $self->log->error("update_document_bulk: index failed for ".
         "index [$index]: [$@]");
   }

   return $r;
}

#
# We may have to call refresh_index after a bulk_flush, so we accept an
# additional optional Argument naming the index to refresh.
#
#
# Flush the bulk helper held in the _bulk Attribute.
#
# Arguments:
#   index - optional index; when given, refresh_index is called on it after
#           a successful flush
#
# Returns the helper response. Failed flushes are retried, 10 seconds apart,
# up to the try Attribute number of attempts; after that an error is logged
# and the result of the log call is returned.
#
sub bulk_flush {
   my $self = shift;
   my ($index) = @_;

   my $bulk = $self->_bulk;
   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;

   my $max_try = $self->try || 0;
   my $tried = 0;

   my $r;
   while (1) {
      eval {
         $r = $bulk->flush;
      };
      last if ! $@;

      # Keep a copy: $@ may be overwritten by the calls below.
      my $error = $@;
      chomp($error);

      # Count attempts upward so the message reports how many were really
      # made, and so a try Attribute of 0 cannot cause endless retries.
      $tried++;
      if ($tried >= $max_try) {
         my $p = $self->parse_error_string($error);
         if (defined($p) && exists($p->{class})) {
            my $class = $p->{class};
            my $code = $p->{code};
            my $node = $p->{node};
            return $self->log->error("bulk_flush: failed after [$tried] tries with error ".
               "[$class] code [$code] for node [$node]");
         }
         else {
            return $self->log->error("bulk_flush: failed after [$tried] tries: [$error]");
         }
      }

      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
               "[$error]");
      sleep 10;
   }

   if (defined($index)) {
      $self->refresh_index($index);
   }

   return $r;
}

#
# Search::Elasticsearch::Client::2_0::Direct
# Search::Elasticsearch::Client::5_0::Direct
#
#
# Count documents.
#
# Arguments:
#   index - index to count in, left out of the request when '*' (defaults
#           to the index Attribute)
#   type  - type to count, only sent when use_type is set and it is not '*'
#           (defaults to the type Attribute)
#
# When the version Command reports 5.0.0 or later, the count API is used and
# its 'count' field returned; otherwise a search is run and hits.total is
# returned. On error, logs it and returns the result of the log call.
#
sub count {
   my $self = shift;
   my ($index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my %args = ();
   if (defined($index) && $index ne '*') {
      $args{index} = $index;
   }
   if ($self->use_type) {
      if (defined($type) && $type ne '*') {
         $args{type} = $type;
      }
   }

   #$args{body} = {
      #query => {
         #match => { title => 'Elasticsearch clients' },
      #},
   #}

   my $version = $self->version or return;

   # Compare the major number numerically: as plain strings, "10.0.0" sorts
   # before "5.0.0". Fall back to the string comparison when no leading
   # number can be extracted.
   my ($major) = $version =~ m{\A(\d+)};
   my $is_v5_or_later = defined($major) ? $major >= 5 : $version ge "5.0.0";

   my $r;
   eval {
      $r = $is_v5_or_later ? $es->count(%args) : $es->search(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("count: count failed for index [$index]: [$@]");
   }

   if ($is_v5_or_later) {
      if (exists($r->{count})) {
         return $r->{count};
      }
   }
   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
      return $r->{hits}{total};
   }

   return $self->log->error("count: nothing found");
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
#
# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
#
# To perform a query using routing, put the correct field value
# directly in $hash->{routing}. We cannot "guess" it from $q: that
# would be complicated to do in an efficient way.
#
# Run a search with $query (a HASH ref) as the request body against
# $index. Index and type fall back to the matching Attributes.
#
# $hash, when given, must be a HASH ref; its entries are merged into the
# search arguments and override the defaults built here. The type is set
# after the merge, when use_type is set and the type is not '*'.
#
# Returns the search result, or logs an error and returns its result
# when the search fails.
#
sub query {
   my $self = shift;
   my ($query, $index, $type, $hash) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('query', $query) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;
   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;

   my %args = (
      index => $index,
      body => $query,
   );

   # es_version may not be set: test for definedness first so the numeric
   # comparison does not emit an "uninitialized value" warning.
   my $es_version = $self->es_version;
   if (defined($es_version) && $es_version == 7) {
      $args{track_total_hits} = 'true';
   }

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
      %args = ( %args, %$hash );
   }

   if ($self->use_type) {
      if ($type ne '*') {
         $args{type} = $type;
      }
   }

   my $r;
   eval {
      $r = $es->search(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("query: failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Fetch one document by its id. Index and type fall back to the matching
# Attributes; the type is only sent when use_type is set.
#
sub get_from_id {
   my $self = shift;
   my ($id, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my $doc = eval {
      my %request = (index => $index, id => $id);
      $request{type} = $type if $self->use_type;
      $es->get(%request);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
   }

   return $doc;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
#
# Run a URI search (q= parameter) over plain HTTP against each configured
# node in turn. The first node that answers with decodable JSON wins and
# its decoded content is returned. Returns nothing when no node succeeds.
#
sub www_search {
   my $self = shift;
   my ($query, $index, $type) = @_;

   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_run_undef_arg('www_search', $query) or return;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   my $from = $self->from;
   my $size = $self->size;

   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;

   for my $node (@{ $self->nodes }) {
      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
      my @path = ($node, $index);
      push @path, $type if $self->use_type && $type ne '*';
      my $url = join('/', @path)."/_search/?from=$from&size=$size&q=".$query;

      my $get = $self->SUPER::get($url) or next;
      my $decoded = $sj->decode($get->{content}) or next;

      return $decoded;
   }

   return;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Delete one index (SCALAR) or several (ARRAY ref) through the indices
# API and return the server response.
#
sub delete_index {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('delete_index', $index) or return;
   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;

   my $r = eval { $es->indices->delete(index => $index) };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# To execute this Command using routing, put the correct field value
# directly in $hash->{routing}. We cannot "guess" it from arguments:
# that would be complicated to do in an efficient way.
#
# Delete the document with the given id from $index. The type is sent
# only when use_type is set. Entries of the optional $hash (a HASH ref)
# are merged last and so override the arguments built here.
#
sub delete_document {
   my $self = shift;
   my ($index, $type, $id, $hash) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('delete_document', $index) or return;
   $self->brik_help_run_undef_arg('delete_document', $id) or return;

   my %args = (index => $index, id => $id);
   $args{type} = $type if $self->use_type;

   if (defined($hash)) {
      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH')
         or return;
      @args{ keys %$hash } = values %$hash;
   }

   my $r = eval { $es->delete(%args) };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
   }

   return $r;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
#
# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
#
# Delete every document of $index matching $query (a HASH ref). The type
# is sent only when use_type is set. When $proceed is true, the request
# carries conflicts => 'proceed'.
#
# The index is refreshed after a successful delete; the outcome of that
# refresh is ignored. Returns the delete_by_query response.
#
sub delete_by_query {
   my $self = shift;
   my ($query, $index, $type, $proceed) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;

   my %args = (
      index => $index,
      body => $query,
   );

   if ($self->use_type) {
      $args{type} = $type;
   }

   if ($proceed) {
      $args{conflicts} = 'proceed';
   }

   my $r;
   eval {
      $r = $es->delete_by_query(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
   }

   # This may fail, we ignore it.
   $self->refresh_index($index);

   return $r;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Cat
#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
#
# Return the lines of the cat indices output as an ARRAY ref. When
# $string is given it is used as a pattern, and only matching lines are
# kept. A warning is logged when the server returned no line at all.
#
sub show_indices {
   my $self = shift;
   my ($string) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $out = eval { $es->cat->indices };
   if ($@) {
      chomp($@);
      return $self->log->error("show_indices: failed: [$@]");
   }

   my @lines = split(/\n/, $out);
   if (! @lines) {
      $self->log->warning("show_indices: nothing returned, no index?");
   }

   if (defined($string)) {
      @lines = grep { m{$string} } @lines;
   }

   return \@lines;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Cat
#
# Return the lines of the cat nodes output as an ARRAY ref. A warning is
# logged when there is no line.
#
sub show_nodes {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $out = eval { $es->cat->nodes };
   if ($@) {
      chomp($@);
      return $self->log->error("show_nodes: failed: [$@]");
   }

   my @lines = split(/\n/, $out);
   $self->log->warning("show_nodes: nothing returned, no nodes?")
      unless @lines;

   return \@lines;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Cat
#
# Return the lines of the cat health output as an ARRAY ref. A warning is
# logged when there is no line.
#
sub show_health {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $out = eval { $es->cat->health };
   if ($@) {
      chomp($@);
      return $self->log->error("show_health: failed: [$@]");
   }

   my @lines = split(/\n/, $out);
   $self->log->warning("show_health: nothing returned, no recovery?")
      unless @lines;

   return \@lines;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Cat
#
# Return the lines of the cat recovery output as an ARRAY ref. A warning
# is logged when there is no line.
#
sub show_recovery {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $out = eval { $es->cat->recovery };
   if ($@) {
      chomp($@);
      return $self->log->error("show_recovery: failed: [$@]");
   }

   my @lines = split(/\n/, $out);
   $self->log->warning("show_recovery: nothing returned, no index?")
      unless @lines;

   return \@lines;
}

#
# curl -s 'localhost:9200/_cat/allocation?v'
#
# Return the lines of the cat allocation output as an ARRAY ref. A
# warning is logged when there is no line.
#
sub show_allocation {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $out = eval { $es->cat->allocation };
   if ($@) {
      chomp($@);
      return $self->log->error("show_allocation: failed: [$@]");
   }

   my @lines = split(/\n/, $out);
   $self->log->warning("show_allocation: nothing returned, no index?")
      unless @lines;

   return \@lines;
}

#
# Return the sorted names of the indices reported by get_indices, as an
# ARRAY ref. When $regex is given, only names matching it are kept.
#
sub list_indices {
   my $self = shift;
   my ($regex) = @_;

   my $get = $self->get_indices or return;

   my @entries = @$get;
   if (defined($regex)) {
      @entries = grep { $_->{index} =~ m{$regex} } @entries;
   }

   my @names = map { $_->{index} } @entries;

   return [ sort { $a cmp $b } @names ];
}

#
# Parse the lines returned by show_indices into an ARRAY ref of HASH
# refs. The layout is guessed from the number of whitespace-separated
# columns on each line:
#
#   10 columns: full record. Example from version 5.0.0:
#   "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
#    9 columns: only the index name is kept, read from the third column
#    8 columns: only the index name is kept, read from the second column
#
# Lines with any other column count are skipped.
#
sub get_indices {
   my $self = shift;
   my ($string) = @_;

   my $lines = $self->show_indices($string) or return;
   if (! @$lines) {
      $self->log->warning("get_indices: no index found");
      return [];
   }

   my @indices = ();
   for my $line (@$lines) {
      my @t = split(/\s+/, $line);
      my $columns = scalar(@t);

      if ($columns == 10) {  # Version 5.0.0
         # The eighth column ($t[7]) is not stored.
         my %record;
         @record{qw(color state index id shards replicas count)} = @t[0 .. 6];
         @record{qw(total_size size)} = @t[8, 9];
         push @indices, \%record;
      }
      elsif ($columns == 9) {
         push @indices, { index => $t[2] };
      }
      elsif ($columns == 8) {
         push @indices, { index => $t[1] };
      }
   }

   return \@indices;
}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Fetch the definition of one index (SCALAR) or several (ARRAY ref)
# through the indices get API and return the server response.
#
sub get_index {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index', $index) or return;
   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;

   my $r = eval { $es->indices->get(index => $index) };
   if ($@) {
      chomp($@);
      return $self->log->error("get_index: get failed for index [$index]: [$@]");
   }

   return $r;
}

#
# Fetch the statistics of $index through the indices stats API and
# return the entry stored under {indices}{$index} in the response.
#
sub get_index_stats {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   # Report the missing argument under this Command's own name (it was
   # reported as 'get_index' before).
   $self->brik_help_run_undef_arg('get_index_stats', $index) or return;

   my %args = (
      index => $index,
   );

   my $r;
   eval {
      $r = $es->indices->stats(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_index_stats: get failed for index [$index]: ".
         "[$@]");
   }

   return $r->{indices}{$index};
}

#
# Return the sorted keys found under {mappings} for $index, as an ARRAY
# ref. An error is logged when get_mappings returns more than one index.
#
sub list_index_types {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;

   my $r = $self->get_mappings($index) or return;
   if (scalar(keys %$r) > 1) {
      return $self->log->error("list_index_types: multiple indices found, choose one");
   }

   my %uniq = ();
   for my $entry (values %$r) {
      $uniq{$_} = 1 for keys %{ $entry->{mappings} };
   }

   return [ sort { $a cmp $b } keys %uniq ];
}

#
# By default, if you provide only one index and no type,
# all types will be merged (including _default_)
# If you specify one type (other than _default_), _default_ will be merged to it.
#
# Return the sorted, de-duplicated names found under {properties} of
# every mapping of $index, as an ARRAY ref. When $type is given and a
# '_default_' mapping exists, that mapping is fetched and merged in.
# An error is logged when get_mappings returns more than one index.
#
sub list_index_fields {
   my $self = shift;
   my ($index, $type) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;

   # An undefined $type is the same as passing no type at all.
   my $r = $self->get_mappings($index, $type) or return;
   if (scalar(keys %$r) > 1) {
      return $self->log->error("list_index_fields: multiple indices found, choose one");
   }

   # _default_ mapping may not exists.
   if (defined($type) && $self->is_mapping_exists($index, '_default_')) {
      my $defaults = $self->get_mappings($index, '_default_');
      # Merge
      for my $name (keys %$defaults) {
         $r->{$name}{mappings}{_default_} =
            $defaults->{$name}{mappings}{'_default_'};
      }
   }

   my %uniq = ();
   for my $entry (values %$r) {
      for my $mapping (values %{ $entry->{mappings} }) {
         $uniq{$_} = 1 for keys %{ $mapping->{properties} };
      }
   }

   return [ sort { $a cmp $b } keys %uniq ];
}

#
# Return an ARRAY ref with one HASH ref per index returned by get_index,
# holding the index name and the value of its
# {settings}{index}{version}{created} entry.
#
sub list_indices_version {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
      or return;

   my $r = $self->get_index($index) or return;

   my @list = map {
      +{
         index => $_,
         version => $r->{$_}{settings}{index}{version}{created},
      }
   } keys %$r;

   return \@list;
}

#
# Open one index (SCALAR) or several (ARRAY ref) through the indices
# API and return the server response.
#
sub open_index {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('open_index', $index) or return;
   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;

   my $r = eval { $es->indices->open(index => $index) };
   if ($@) {
      chomp($@);
      return $self->log->error("open_index: failed: [$@]");
   }

   return $r;
}

#
# Close one index (SCALAR) or several (ARRAY ref) through the indices
# API and return the server response.
#
sub close_index {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('close_index', $index) or return;
   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;

   my $r = eval { $es->indices->close(index => $index) };
   if ($@) {
      chomp($@);
      return $self->log->error("close_index: failed: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Return a HASH ref mapping each index name returned by the indices get
# API to the {aliases} entry of that index. $index falls back to the
# index Attribute.
#
sub get_aliases {
   my $self = shift;
   my ($index) = @_;

   $index ||= $self->index;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   #
   # [DEPRECATION] [types removal] The parameter include_type_name should be
   # explicitly specified in get indices requests to prepare for 7.0. In 7.0
   # include_type_name will default to 'false', which means responses will
   # omit the type name in mapping definitions. - In request: {body => undef,
   # ignore => [],method => "GET",path => "/*",qs => {},serialize => "std"}
   #

   my $r = eval {
      $es->indices->get(
         index => $index,
         params => { include_type_name => 'false' },
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_aliases: get_aliases failed: [$@]");
   }

   my %aliases = map { ($_ => $r->{$_}{aliases}) } keys %$r;

   return \%aliases;
}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Attach the alias name $alias to $index through the indices put_alias
# API and return the server response.
#
sub put_alias {
   my $self = shift;
   my ($index, $alias) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('put_alias', $index) or return;
   $self->brik_help_run_undef_arg('put_alias', $alias) or return;

   my $r = eval {
      $es->indices->put_alias(index => $index, name => $alias);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("put_alias: put_alias failed: [$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::5_0::Direct::Indices
#
# Remove the alias name $alias from $index through the indices
# delete_alias API and return the server response.
#
sub delete_alias {
   my $self = shift;
   my ($index, $alias) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;

   my $r = eval {
      $es->indices->delete_alias(index => $index, name => $alias);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_alias: delete_alias failed: [$@]");
   }

   return $r;
}

#
# Move $alias to $new_index: look through get_aliases for an index that
# already carries the alias, delete the alias from it when one is found,
# then put the alias on $new_index. Returns the put_alias result.
#
sub update_alias {
   my $self = shift;
   my ($new_index, $alias) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
   $self->brik_help_run_undef_arg('update_alias', $alias) or return;

   # Search for previous index with that alias, if any.
   my $aliases = $self->get_aliases or return;
   my $prev_index;
   for my $name (keys %$aliases) {
      if (exists($aliases->{$name}{$alias})) {
         $prev_index = $name;
      }
      last if $prev_index;
   }

   # Delete previous alias if it exists.
   if (defined($prev_index)) {
      $self->delete_alias($prev_index, $alias) or return;
   }

   return $self->put_alias($new_index, $alias);
}

#
# Return 1 when a mapping named $mapping is found under {mappings} of
# any index returned by get_mappings for $index, 0 otherwise. Also
# returns 0 when is_index_exists reports the index as missing.
#
sub is_mapping_exists {
   my $self = shift;
   my ($index, $mapping) = @_;

   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;

   return 0 unless $self->is_index_exists($index);

   my $all = $self->get_mappings($index) or return;
   for my $entry (values %$all) {
      my $mappings = $entry->{mappings};
      return 1 if exists($mappings->{$mapping});
   }

   return 0;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Fetch the mappings of one index (SCALAR) or several (ARRAY ref). The
# request always carries include_type_name => 'false'; $type is added
# only when use_type is set. Returns the server response.
#
sub get_mappings {
   my $self = shift;
   my ($index, $type) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;

   my %args = (
      index => $index,
      params => { include_type_name => 'false' },
   );
   $args{type} = $type if $self->use_type;

   my $r = eval { $es->indices->get_mapping(%args) };
   if ($@) {
      chomp($@);
      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
         "[$@]");
   }

   return $r;
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Create $index. When $shards is defined it is sent as the
# number_of_shards index setting. Returns the server response.
#
sub create_index {
   my $self = shift;
   my ($index, $shards) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('create_index', $index) or return;

   my %args = (index => $index);
   if (defined($shards)) {
      $args{body} = {
         settings => { index => { number_of_shards => $shards } },
      };
   }

   my $r = eval { $es->indices->create(%args) };
   if ($@) {
      chomp($@);
      return $self->log->error("create_index: create failed ".
         "for index [$index]: [$@]");
   }

   return $r;
}

#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
#
# Create $index with $mappings (a HASH ref) sent as the mappings part of
# the request body. Returns the server response.
#
sub create_index_with_mappings {
   my $self = shift;
   my ($index, $mappings) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
      or return;

   my $r = eval {
      $es->indices->create(
         index => $index,
         body => { mappings => $mappings },
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("create_index_with_mappings: create failed for ".
         "index [$index]: [$@]");
   }

   return $r;
}

# GET http://localhost:9200/
#
# Send a GET to the first node of $nodes (an ARRAY ref, defaulting to
# the nodes Attribute) and return the content of the response.
#
sub info {
   my $self = shift;
   my ($nodes) = @_;

   $nodes ||= $self->nodes;
   $self->brik_help_run_undef_arg('info', $nodes) or return;
   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
   $self->brik_help_run_empty_array_arg('info', $nodes) or return;

   # Only the first node is queried.
   $self->get($nodes->[0]) or return;

   return $self->content;
}

#
# Send a GET to the first node of $nodes (an ARRAY ref, defaulting to
# the nodes Attribute) and return the {version}{number} entry of the
# response content.
#
sub version {
   my $self = shift;
   my ($nodes) = @_;

   $nodes ||= $self->nodes;
   $self->brik_help_run_undef_arg('version', $nodes) or return;
   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
   $self->brik_help_run_empty_array_arg('version', $nodes) or return;

   # Only the first node is queried.
   $self->get($nodes->[0]) or return;
   my $content = $self->content or return;

   return $content->{version}{number};
}

#
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Fetch all index templates through the indices get_template API, called
# without arguments, and return the server response.
#
sub get_templates {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $r = eval { $es->indices->get_template };
   if ($@) {
      chomp($@);
      return $self->log->error("get_templates: failed: [$@]");
   }

   return $r;
}

#
# Return the sorted keys of the get_templates result, as an ARRAY ref.
#
sub list_templates {
   my $self = shift;

   my $templates = $self->get_templates or return;
   my @names = sort { $a cmp $b } keys %$templates;

   return \@names;
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
#
# Fetch the index template named $template through the indices
# get_template API and return the server response.
#
sub get_template {
   my $self = shift;
   my ($template) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_template', $template) or return;

   my $r = eval { $es->indices->get_template(name => $template) };
   if ($@) {
      chomp($@);
      return $self->log->error("get_template: template failed for name [$template]: [$@]");
   }

   return $r;
}

#
# Send $mapping (a HASH ref) as the body of an indices put_mapping
# request for $index. $type is required, but only sent when use_type is
# set. Returns the server response.
#
sub put_mapping {
   my $self = shift;
   my ($index, $type, $mapping) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('put_mapping', $index) or return;
   $self->brik_help_run_undef_arg('put_mapping', $type) or return;
   $self->brik_help_run_undef_arg('put_mapping', $mapping) or return;
   $self->brik_help_run_invalid_arg('put_mapping', $mapping, 'HASH')
      or return;

   my $r = eval {
      my %request = (index => $index, body => $mapping);
      $request{type} = $type if $self->use_type;
      $es->indices->put_mapping(%request);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("put_mapping: mapping failed ".
         "for index [$index]: [$@]");
   }

   return $r;
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
#
# Store $template (a HASH ref) as the index template called $name,
# through the indices put_template API. Returns the server response.
#
sub put_template {
   my $self = shift;
   my ($name, $template) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('put_template', $name) or return;
   $self->brik_help_run_undef_arg('put_template', $template) or return;
   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
      or return;

   my $r = eval {
      $es->indices->put_template(name => $name, body => $template);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("put_template: template failed ".
         "for name [$name]: [$@]");
   }

   return $r;
}

#
# Read $json_file and hand its top-level 'mappings' entry to put_mapping
# for $index and $type. An error is logged when the file holds no
# 'mappings' key.
#
sub put_mapping_from_json_file {
   my $self = shift;
   my ($index, $type, $json_file) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   for my $arg ($index, $type, $json_file) {
      $self->brik_help_run_undef_arg('put_mapping_from_json_file', $arg)
         or return;
   }
   $self->brik_help_run_file_not_found('put_mapping_from_json_file',
      $json_file) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $data = $fj->read($json_file) or return;

   return $self->put_mapping($index, $type, $data->{mappings})
      if exists($data->{mappings});

   return $self->log->error("put_mapping_from_json_file: no mapping ".
      "data found");
}

#
# Read $json_file and hand its top-level 'mappings' entry to put_mapping
# for $index and $type. Same job as put_mapping_from_json_file, with the
# file given as first argument.
#
sub update_mapping_from_json_file {
   my $self = shift;
   my ($json_file, $index, $type) = @_;

   my $command = 'update_mapping_from_json_file';

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg($command, $json_file) or return;
   $self->brik_help_run_file_not_found($command, $json_file) or return;
   $self->brik_help_run_undef_arg($command, $type) or return;
   $self->brik_help_run_undef_arg($command, $index) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $data = $fj->read($json_file) or return;

   return $self->put_mapping($index, $type, $data->{mappings})
      if exists($data->{mappings});

   return $self->log->error("update_mapping_from_json_file: ".
      "no data found");
}

#
# Read $json_file and store its content as an index template through
# put_template. When $name is not given, it is taken from the file name
# with its directory part and '.json' suffix removed.
#
sub put_template_from_json_file {
   my $self = shift;
   my ($json_file, $name) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file)
      or return;
   $self->brik_help_run_file_not_found('put_template_from_json_file',
      $json_file) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $data = $fj->read($json_file) or return;

   # No name given: try to derive one from the file name.
   ($name) = $json_file =~ m{([^/]+)\.json$} unless defined($name);

   return $self->put_template($name, $data) if defined($name);

   return $self->log->error("put_template_from_json_file: no template ".
      "name found");
}

#
# Replace an index template with the content of $json_file: the template
# is deleted first (errors ignored), then stored again with put_template.
# When $name is not given, it is taken from the file name with its
# directory part and '.json' suffix removed.
#
sub update_template_from_json_file {
   my $self = shift;
   my ($json_file, $name) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('update_template_from_json_file',
      $json_file) or return;
   $self->brik_help_run_file_not_found('update_template_from_json_file',
      $json_file) or return;

   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
   my $data = $fj->read($json_file) or return;

   if (!defined($name)) {
      ($name) = $json_file =~ m{([^/]+)\.json$};
   }

   if (! defined($name)) {
      # Report under this Command's own name (the message used to name
      # put_template_from_json_file).
      return $self->log->error("update_template_from_json_file: no template ".
         "name found");
   }

   # We ignore errors, template may not exist.
   $self->delete_template($name);

   return $self->put_template($name, $data);
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Fetch index settings through the indices get_settings API. Both
# arguments are optional and each may be a SCALAR or an ARRAY ref:
# $indices is sent as 'index', $names as 'name'. Returns the server
# response.
#
sub get_settings {
   my $self = shift;
   my ($indices, $names) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my %args = ();
   if (defined($indices)) {
      $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
         or return;
      $args{index} = $indices;
   }
   if (defined($names)) {
      # $names holds setting names, not a file path: only its type is
      # checked (it used to be tested with a file-exists check).
      $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
         or return;
      $args{name} = $names;
   }

   my $r;
   eval {
      $r = $es->indices->get_settings(%args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("get_settings: failed: [$@]");
   }

   return $r;
}

#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
# Search::Elasticsearch::Client::2_0::Direct::Indices
#
# Example:
#
# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
#
# XXX: should be renamed to put_index_settings
#
# Send $settings (a HASH ref) as the body of an indices put_settings
# request. $indices is optional and may be a SCALAR or an ARRAY ref; it
# is sent as 'index' when given. Returns the server response.
#
sub put_settings {
   my $self = shift;
   my ($settings, $indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;

   my %args = (body => $settings);
   if (defined($indices)) {
      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
      $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
         or return;
      $args{index} = $indices;
   }

   my $r = eval { $es->indices->put_settings(%args) };
   if ($@) {
      chomp($@);
      return $self->log->error("put_settings: failed: [$@]");
   }

   return $r;
}

#
# Set 'blocks.read_only' on $indices (SCALAR or ARRAY ref) through
# put_settings. $bool defaults to true when not given; any defined value
# is turned into 'true' or 'false' by its truthiness.
# 'blocks.read_only_allow_delete' is always sent as 'true'.
#
sub set_index_readonly {
   my $self = shift;
   my ($indices, $bool) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
   $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
      or return;

   my $flag = (!defined($bool) || $bool) ? 'true' : 'false';

   return $self->put_settings(
      {
         'blocks.read_only' => $flag,
         'blocks.read_only_allow_delete' => 'true',
      },
      $indices,
   );
}

#
# curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
# PUT synscan-2018-05/_settings
# {
#  "index": {
#    "blocks":{
#      "read_only":"false",
#      "read_only_allow_delete":"true"
#    }
#  }
#}
#
#
# If it fails with the following error:
#
# [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
#
# Use Kibana dev console and copy/paste both requests:
#
# PUT _all/_settings
# {
#    "index": {
#       "blocks": {
#          "read_only_allow_delete": "false"
#       }
#    }
# }
#
#
# Clear the `read_only_allow_delete' block on the given indices.
#
# $indices: index name or ARRAY of index names, defaults to '*'.
#
# Returns 1 on success, undef when the settings update failed (the failure
# itself is logged by the put_settings Command).
#
sub reset_index_readonly {
   my $self = shift;
   my ($indices) = @_;

   $indices ||= '*';
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_invalid_arg('reset_index_readonly', $indices,
      'ARRAY', 'SCALAR') or return;

   my $settings = {
      blocks => {
         read_only_allow_delete => 'false',
      },
   };

   # Settings on '*' indices should be enough to reset for everyone.
   # Propagate a failure instead of unconditionally claiming success.
   $self->put_settings($settings, $indices) or return;

   return 1;
}

#
# List the indices that currently carry a read-only block.
#
# Every name returned by the list_indices Command is looked up with
# get_index_readonly, which returns a HASH keyed by index name. An index is
# kept when its `index.blocks.read_only' or
# `index.blocks.read_only_allow_delete' setting is set.
#
# NOTE(review): assumes block values come back as the strings 'true'/'false'
# (1 is accepted too) under settings->{index}{blocks} -- confirm against the
# Elasticsearch versions in use.
#
# Returns an ARRAY of index names (provided_name when available), or undef
# when indices cannot be listed.
#
sub list_index_readonly {
   my $self = shift;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   my $list = $self->list_indices or return;

   my @indices = ();
   for my $this (@$list) {
      my $ro = $self->get_index_readonly($this) or next;

      # Result is keyed by index name: walk it instead of reading a literal
      # `index' key at top level, which never exists there.
      for my $name (sort keys %$ro) {
         my $conf = $ro->{$name};
         next if ref($conf) ne 'HASH' || ref($conf->{index}) ne 'HASH';

         my $blocks = $conf->{index}{blocks};
         next if ref($blocks) ne 'HASH';

         my @set = grep {
            defined($blocks->{$_}) && $blocks->{$_} =~ m{^(?:true|1)$}
         } qw(read_only read_only_allow_delete);
         next if ! @set;

         my $provided = $conf->{index}{provided_name};
         push @indices, defined($provided) ? $provided : $name;
      }
   }

   return \@indices;
}

#
# Set the `number_of_replicas' setting on the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
# $number: value to set, handed to put_settings as given.
#
# Returns whatever the put_settings Command returns.
#
sub set_index_number_of_replicas {
   my ($self, $indices, $number) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
      or return;

   return $self->put_settings({ number_of_replicas => $number }, $indices);
}

#
# Set the `refresh_interval' setting on the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
# $number: value to set. A strictly positive integer given without a unit
# gets a `s' suffix (seconds); anything else (`30s', -1, 0, undef) is handed
# to put_settings untouched.
#
# Returns whatever the put_settings Command returns.
#
sub set_index_refresh_interval {
   my $self = shift;
   my ($indices, $number) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
      or return;

   # If there is a meaningful value not suffixed with a unit, we default to
   # adding a `s' for a number of seconds. The defined() guard keeps an
   # undefined value from raising an "uninitialized" warning in the match.
   if (defined($number) && $number =~ /^\d+$/ && $number > 0) {
      $number .= 's';
   }

   my $settings = { refresh_interval => $number };

   return $self->put_settings($settings, $indices);
}

#
# Get the settings of the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
#
# Returns a HASH mapping every index name found in the get_settings result
# to its `settings' entry, or undef when get_settings failed.
#
sub get_index_settings {
   my $self = shift;
   my ($indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
   $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
      or return;

   # Report a failed lookup as a failure, not as an empty result.
   my $settings = $self->get_settings($indices) or return;

   my %indices = ();
   for my $name (keys %$settings) {
      $indices{$name} = $settings->{$name}{settings};
   }

   return \%indices;
}

#
# Get the settings needed to tell whether the given indices are read-only.
#
# $indices: index name or ARRAY of index names (mandatory).
#
# Returns a HASH mapping every index name found in the get_settings result
# to its whole `settings' entry (no read-only specific filtering is done
# here), or undef when get_settings failed.
#
sub get_index_readonly {
   my $self = shift;
   my ($indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
   $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
      or return;

   # Report a failed lookup as a failure, not as an empty result.
   my $settings = $self->get_settings($indices) or return;

   my %indices = ();
   for my $name (keys %$settings) {
      #$indices{$name} = $settings->{$name}{settings}{index}{'blocks_write'};
      $indices{$name} = $settings->{$name}{settings};
   }

   return \%indices;
}

#
# Get the `number_of_replicas' setting of the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
#
# Returns a HASH mapping every index name found in the get_settings result
# to its settings->{index}{number_of_replicas} value (undef when absent),
# or undef when get_settings failed.
#
sub get_index_number_of_replicas {
   my $self = shift;
   my ($indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
      or return;

   # Report a failed lookup as a failure, not as an empty result.
   my $settings = $self->get_settings($indices) or return;

   my %indices = ();
   for my $name (keys %$settings) {
      $indices{$name} = $settings->{$name}{settings}{index}{number_of_replicas};
   }

   return \%indices;
}

#
# Get the `refresh_interval' setting of the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
#
# Returns a HASH mapping every index name found in the get_settings result
# to its settings->{index}{refresh_interval} value (undef when absent),
# or undef when get_settings failed.
#
sub get_index_refresh_interval {
   my $self = shift;
   my ($indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
      or return;

   # Report a failed lookup as a failure, not as an empty result.
   my $settings = $self->get_settings($indices) or return;

   my %indices = ();
   for my $name (keys %$settings) {
      $indices{$name} = $settings->{$name}{settings}{index}{refresh_interval};
   }

   return \%indices;
}

#
# Get the `number_of_shards' setting of the given indices.
#
# $indices: index name or ARRAY of index names (mandatory).
#
# Returns a HASH mapping every index name found in the get_settings result
# to its settings->{index}{number_of_shards} value (undef when absent),
# or undef when get_settings failed.
#
sub get_index_number_of_shards {
   my $self = shift;
   my ($indices) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
      or return;

   # Report a failed lookup as a failure, not as an empty result.
   my $settings = $self->get_settings($indices) or return;

   my %indices = ();
   for my $name (keys %$settings) {
      $indices{$name} = $settings->{$name}{settings}{index}{number_of_shards};
   }

   return \%indices;
}

#
# Delete an index template by name.
#
# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
#
# $name: name of the template to delete (mandatory).
#
# Returns the client response, or the result of the error logger when the
# request throws.
#
sub delete_template {
   my ($self, $name) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('delete_template', $name) or return;

   my $r = eval { $es->indices->delete_template(name => $name) };
   if ($@) {
      chomp($@);
      return $self->log->error("delete_template: failed for name [$name]: [$@]");
   }

   return $r;
}

#
# Tell whether an index exists.
#
# $index: index name (mandatory).
#
# Returns 1 when the client reports the index as existing, 0 otherwise, or
# the result of the error logger when the request throws.
#
sub is_index_exists {
   my ($self, $index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;

   my $found = eval { $es->indices->exists(index => $index) };
   if ($@) {
      chomp($@);
      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
   }

   return 1 if $found;
   return 0;
}

#
# Tell whether a type exists within an index.
#
# $index: index name (mandatory).
# $type: type name (mandatory); only sent to the client when the use_type
# Attribute is true.
#
# Returns 1 when the client reports the type as existing, 0 otherwise, or
# the result of the error logger when the request throws.
#
sub is_type_exists {
   my ($self, $index, $type) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;

   my $found = eval {
      my %this_args = (
         index => $index,
         ($self->use_type ? (type => $type) : ()),
      );
      $es->indices->exists_type(%this_args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("is_type_exists: failed for index [$index] and ".
         "type [$type]: [$@]");
   }

   return 1 if $found;
   return 0;
}

#
# Tell whether a document exists.
#
# $index: index name (mandatory).
# $type: type name (mandatory); only sent to the client when the use_type
# Attribute is true, in which case it overrides any `type' key of $document.
# $document: HASH of extra arguments merged into the client call (mandatory).
#
# Returns 1 when the client reports the document as existing, 0 otherwise,
# or the result of the error logger when the request throws.
#
sub is_document_exists {
   my ($self, $index, $type, $document) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;

   my $found = eval {
      my %this_args = (
         index => $index,
         %$document,
         ($self->use_type ? (type => $type) : ()),
      );
      $es->exists(%this_args);
   };
   if ($@) {
      chomp($@);
      return $self->log->error("is_document_exists: failed for index [$index] and ".
         "type [$type]: [$@]");
   }

   return 1 if $found;
   return 0;
}

#
# Split a client error string into its components.
#
# Expected layout:
#    [CLASS] ** [NODE]-[CODE] MESSAGE. With vars: DUMP
#
# $string: error string to parse (mandatory).
#
# Returns a HASH with class, node, code, message and dump keys when the
# string matches, the node starts with `http' and DUMP decodes to a HASH
# through Metabrik::String::Dump. Otherwise returns a HASH holding only a
# message key set to the untouched input string.
#
sub parse_error_string {
   my ($self, $string) = @_;

   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;

   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}

   # Every slot stays undef when the string does not match.
   my %parsed;
   @parsed{qw(class node code message dump)} = $string =~
      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};

   if (defined($parsed{dump}) && length($parsed{dump})) {
      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
      $parsed{dump} = $sd->decode($parsed{dump});
   }

   # Sanity check
   my $is_sane = defined($parsed{node}) && $parsed{node} =~ m{^http}
      && $parsed{code} =~ m{^\d+$}
      && defined($parsed{dump}) && ref($parsed{dump}) eq 'HASH';
   return \%parsed if $is_sane;

   # Were not able to decode, we return as-is.
   return {
      message => $string,
   };
}

#
# Refresh an index to receive latest additions
#
# Search::Elasticsearch::Client::5_0::Direct::Indices
# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
#
# $index: index name (mandatory).
#
# The request is attempted up to `try' Attribute times (at least once, even
# when the Attribute is unset, zero or not a positive integer), sleeping
# 60 seconds between two attempts.
#
# Returns the client response, or the result of the error logger once every
# attempt has failed.
#
sub refresh_index {
   my $self = shift;
   my ($index) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('refresh_index', $index) or return;

   # Always perform at least one attempt: a zero or negative count used to
   # make the retry counter skip past 0 and loop forever.
   my $tries = $self->try;
   if (! defined($tries) || $tries !~ m{^\d+$} || $tries < 1) {
      $tries = 1;
   }

   for my $attempt (1 .. $tries) {
      my $r;
      eval {
         $r = $es->indices->refresh(
            index => $index,
         );
      };
      return $r if ! $@;

      if ($attempt < $tries) {
         sleep 60;
         next;
      }

      # Keep our own copy: $@ may be reset by the calls made below.
      my $error = $@;
      chomp($error);
      my $p = $self->parse_error_string($error);
      if (defined($p) && exists($p->{class})) {
         my $class = $p->{class};
         my $code = $p->{code};
         my $node = $p->{node};
         return $self->log->error("refresh_index: failed for index [$index] ".
            "after [$tries] tries with error [$class] code [$code] for node [$node]");
      }

      return $self->log->error("refresh_index: failed for index [$index] ".
         "after [$tries]: [$error]");
   }

   return;
}

#
# Export the documents of an index to files under the datadir Attribute,
# one file per document type, named `INDEX:TYPE.csv' or `INDEX:TYPE.json'.
#
# $format: 'csv' or 'json' (mandatory).
# $index: index to export (mandatory).
# $size: scroll chunk size, defaults to 10_000.
# $cb: optional callback applied to every scrolled hit; when it returns
# undef the hit is skipped.
#
# The max Attribute, when greater than 0, caps the number of exported
# documents. Once done, statistics are written to `INDEX.exported' in
# datadir; when that file already exists the export is refused.
#
# Returns a HASH of statistics (read, exported, skipped, total_count,
# complete, duration, eps, files), or undef on error.
#
sub export_as {
   my $self = shift;
   my ($format, $index, $size, $cb) = @_;

   $size ||= 10_000;
   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('export_as', $format) or return;
   $self->brik_help_run_undef_arg('export_as', $index) or return;
   $self->brik_help_run_undef_arg('export_as', $size) or return;

   if ($format ne 'csv' && $format ne 'json') {
      return $self->log->error("export_as: unsupported export format ".
         "[$format]");
   }

   my $max = $self->max;
   my $datadir = $self->datadir;

   $self->log->debug("export_as: selecting scroll Command...");

   # Compare the major version numerically: a plain string comparison
   # would sort "10.0.0" before "5.0.0". The string comparison is kept as a
   # fallback for version strings that do not start with a number.
   my $version = $self->version or return;
   my ($major) = $version =~ m{^(\d+)\.};
   my $use_scan_mode = defined($major)
      ? ($major < 5)
      : ($version lt "5.0.0");

   my $scroll;
   if ($use_scan_mode) {
      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
   }
   else {
      $scroll = $self->open_scroll($index, $size) or return;
   }

   $self->log->debug("export_as: selecting scroll Command...OK.");

   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;

   my $out;
   my $csv_header;
   if ($format eq 'csv') {
      $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
      $out->encoding($self->encoding);
      $out->separator(',');
      $out->escape('\\');
      $out->append(1);
      $out->first_line_is_header(0);
      $out->write_header(1);
      $out->use_quoting(1);
      if (defined($self->csv_header)) {
         my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
         $out->header($sorted);
      }
      if (defined($self->csv_encoded_fields)) {
         $out->encoded_fields($self->csv_encoded_fields);
      }
      if (defined($self->csv_object_fields)) {
         $out->object_fields($self->csv_object_fields);
      }

      $csv_header = $out->header;
   }
   elsif ($format eq 'json') {
      $out = Metabrik::File::Json->new_from_brik_init($self) or return;
      $out->encoding($self->encoding);
   }

   my $total = $self->total_scroll;
   $self->log->info("export_as: total [$total] for index [$index]");

   my %types = ();
   my $read = 0;
   my $skipped = 0;
   my $exported = 0;
   my $start = time();
   my $done = $datadir."/$index.exported";
   my $start_time = time();
   my %chunk = ();

   # The label lets the max limit leave both loops at once; a bare `last'
   # would only leave the inner one and scrolling would go on.
   SCROLL:
   while (my $next = $self->next_scroll(10000)) {
      for my $this (@$next) {
         $read++;

         if (defined($cb)) {
            $this = $cb->($this);
            if (! defined($this)) {
               $self->log->error("export_as: callback failed for index ".
                  "[$index] at read [$read], skipping single entry");
               $skipped++;
               next;
            }
         }

         my $id = $this->{_id};
         my $doc = $this->{_source};
         # Prepare for when types will be removed from ES
         my $type = $this->{_type} || '_doc';
         if (! exists($types{$type})) {
            if ($format eq 'csv') {
               # If not given, we guess the CSV fields to use.
               if (! defined($csv_header)) {
                  my $fields = $self->list_index_fields($index, $type);
                  if (! $fields) {
                     # Do not leave the scroll open behind us.
                     $self->close_scroll;
                     return;
                  }
                  $types{$type}{header} = [ '_id', @$fields ];
               }
               else {
                  $types{$type}{header} = [ '_id', @$csv_header ];
               }

               $types{$type}{output} = $datadir."/$index:$type.csv";
            }
            elsif ($format eq 'json') {
               $types{$type}{output} = $datadir."/$index:$type.json";
            }

            # Verify it has not been exported yet
            if (-f $done) {
               # Do not leave the scroll open behind us.
               $self->close_scroll;
               return $self->log->error("export_as: export already done ".
                  "for index [$index]");
            }

            $self->log->info("export_as: exporting to file [".
               $types{$type}{output}."] for type [$type], using ".
               "chunk size of [$size]");
         }

         my $h = { _id => $id };

         for my $k (keys %$doc) {
            $h->{$k} = $doc->{$k};
         }

         if ($format eq 'csv') {
            $out->header($types{$type}{header});
         }

         # Documents are buffered per type and written 1000 at a time.
         push @{$chunk{$type}}, $h;
         if (@{$chunk{$type}} > 999) {
            my $r = $out->write($chunk{$type}, $types{$type}{output});
            if (!defined($r)) {
               $self->log->warning("export_as: unable to process entry, ".
                  "skipping");
               $skipped++;
               next;
            }
            $types{$type}{written} = 1;
            $chunk{$type} = [];
         }

         # Log a status sometimes.
         if (! (++$exported % 100_000)) {
            my $now = time();
            my $perc = sprintf("%.02f",
               $total ? ($exported / $total * 100) : 0);
            $self->log->info("export_as: fetched [$exported/$total] ".
               "($perc%) elements in ".($now - $start)." second(s) ".
               "from index [$index]");
            $start = time();
         }

         # Limit export to specified maximum
         if ($max > 0 && $exported >= $max) {
            $self->log->info("export_as: max export reached [$exported] ".
               "for index [$index], stopping");
            last SCROLL;
         }
      }
   }

   # Process remaining data waiting to be written and build output file list.
   # A file is listed as soon as at least one write to it succeeded, even
   # when nothing is left to flush for its type.
   my %files = ();
   for my $type (keys %types) {
      my $pending = $chunk{$type} || [];
      if (@$pending > 0) {
         my $r = $out->write($pending, $types{$type}{output});
         if (defined($r)) {
            $types{$type}{written} = 1;
         }
      }
      if ($types{$type}{written}) {
         $files{$types{$type}{output}}++;
      }
   }

   $self->close_scroll;

   my $stop_time = time();
   my $duration = $stop_time - $start_time;
   my $eps = $exported;
   if ($duration > 0) {
      $eps = $exported / $duration;
   }

   my $result = {
      read => $read,
      exported => $exported,
      skipped => $read - $exported,
      total_count => $total,
      complete => ($exported == $total) ? 1 : 0,
      duration => $duration,
      eps => $eps,
      files => [ sort { $a cmp $b } keys %files ],
   };

   # Say the file has been processed, and put resulting stats.
   $fd->write($result, $done) or return;

   $self->log->info("export_as: done.");

   return $result;
}

#
# Export an index as CSV: shortcut for the export_as Command with a `csv'
# format.
#
# $index: index to export (mandatory).
# $size: chunk size, defaults to 10_000.
# $cb: optional callback handed over to export_as.
#
# Returns whatever export_as returns.
#
sub export_as_csv {
   my ($self, $index, $size, $cb) = @_;

   $size = 10_000 unless $size;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   for my $arg ($index, $size) {
      $self->brik_help_run_undef_arg('export_as_csv', $arg) or return;
   }

   return $self->export_as('csv', $index, $size, $cb);
}

#
# Export an index as JSON: shortcut for the export_as Command with a `json'
# format.
#
# $index: index to export (mandatory).
# $size: chunk size, defaults to 10_000.
# $cb: optional callback handed over to export_as.
#
# Returns whatever export_as returns.
#
sub export_as_json {
   my ($self, $index, $size, $cb) = @_;

   $size = 10_000 unless $size;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   for my $arg ($index, $size) {
      $self->brik_help_run_undef_arg('export_as_json', $arg) or return;
   }

   return $self->export_as('json', $index, $size, $cb);
}

#
# Import a CSV or JSON file into an index, through bulk mode.
#
# Optimization instructions:
# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
#
# $format: 'csv' or 'json' (mandatory).
# $input: file to import (mandatory, must exist).
# $index, $type: destination. When not given they are guessed from a file
# name shaped like `INDEX:TYPE.csv' (directory part ignored), then taken
# from the index and type Attributes.
# $hash: optional HASH of extra arguments for index_bulk. Its `routing'
# entry is the name of the document field holding the routing value; $hash
# is only forwarded for documents where that field is defined.
# $cb: optional callback applied to every document; when it returns undef
# the document is skipped.
#
# While importing, refresh_interval is set to -1 (and number_of_replicas to
# 0 when use_indexing_optimizations is set); saved values are put back once
# the import is over, before final accounting.
#
# NOTE(review): a setting that is not explicitly present in the index
# settings is read back as undef and therefore not restored -- confirm
# whether refresh_interval should be reset to its default in that case.
#
# Returns a HASH of statistics, 0 when the `INPUT.imported' marker file
# already exists, or undef on error.
#
sub import_from {
   my $self = shift;
   my ($format, $input, $index, $type, $hash, $cb) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('import_from', $format) or return;
   $self->brik_help_run_undef_arg('import_from', $input) or return;
   $self->brik_help_run_file_not_found('import_from', $input) or return;

   if ($format ne 'csv' && $format ne 'json') {
      return $self->log->error("import_from: unsupported import format ".
         "[$format]");
   }

   # If index and/or types are not defined, we try to get them from
   # input filename
   if (! defined($index) || ! defined($type)) {
      # Example: index-DATE:type.csv
      # Only the file name is considered, so that a leading directory does
      # not end up in the index name.
      (my $basename = $input) =~ s{^.*/}{};
      if (my ($this_index, $this_type) = $basename =~
         m{^(.+):(.+)\.(?:csv|json)(?:.*)?$}) {
         $index ||= $this_index;
         $type ||= $this_type;
      }
   }

   # Verify it has not been indexed yet
   my $done = "$input.imported";
   if (-f $done) {
      $self->log->info("import_from: import already done for file ".
         "[$input]");
      return 0;
   }

   # And default to Attributes if guess failed.
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   if ($index eq '*') {
      return $self->log->error("import_from: cannot import to invalid ".
         "index [$index]");
   }
   if ($self->use_type) {
      if ($type eq '*') {
         return $self->log->error("import_from: cannot import to invalid ".
            "type [$type]");
      }
   }

   $self->log->debug("input [$input]");
   $self->log->debug("index [$index]");
   $self->log->debug("type [$type]");

   my $count_before = 0;
   if ($self->is_index_exists($index)) {
      $count_before = $self->count($index, $type);
      if (! defined($count_before)) {
         return;
      }
      $self->log->info("import_from: current index [$index] count is ".
         "[$count_before]");
   }

   my $max = $self->max;

   $self->open_bulk_mode($index, $type) or return;

   $self->log->info("import_from: importing file [$input] to index ".
      "[$index] with type [$type]");

   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;

   my $out;
   if ($format eq 'csv') {
      $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
      $out->encoding($self->encoding);
      $out->separator(',');
      $out->escape('\\');
      $out->first_line_is_header(1);
      $out->encoded_fields($self->csv_encoded_fields);
      $out->object_fields($self->csv_object_fields);
   }
   elsif ($format eq 'json') {
      $out = Metabrik::File::Json->new_from_brik_init($self) or return;
      $out->encoding($self->encoding);
   }

   my $refresh_interval;
   my $number_of_replicas;
   my $start = time();
   my $imported = 0;
   my $first = 1;
   my $read = 0;
   my $skipped_chunks = 0;
   my $start_time = time();
   while (my $this = $out->read_next($input)) {
      $read++;

      my $h = {};
      my $id = $self->use_ignore_id ? undef : $this->{_id};
      delete $this->{_id};
      for my $k (keys %$this) {
         my $value = $this->{$k};
         # We keep only fields when they have a value.
         # No need to index data that is empty.
         if (defined($value) && length($value)) {
            $h->{$k} = $value;
         }
      }

      if (defined($cb)) {
         $h = $cb->($h);
         if (! defined($h)) {
            $self->log->error("import_from: callback failed for ".
               "index [$index] at read [$read], skipping single entry");
            $skipped_chunks++;
            next;
         }
      }

      # Set routing based on the provided field name, if any.
      my $this_hash;
      if (defined($hash) && defined($hash->{routing})
      &&  defined($h->{$hash->{routing}})) {
         $this_hash = { %$hash };  # Make a copy to avoid overwriting
                                   # user provided value.
         $this_hash->{routing} = $h->{$hash->{routing}};
      }

      #$self->log->info(Data::Dumper::Dumper($h));

      my $r;
      eval {
         $r = $self->index_bulk($h, $index, $type, $this_hash, $id);
      };
      if ($@) {
         chomp($@);
         $self->log->warning("import_from: error [$@]");
      }
      if (! defined($r)) {
         $self->log->error("import_from: bulk processing failed for ".
            "index [$index] at read [$read], skipping chunk");
         $skipped_chunks++;
         next;
      }

      # Gather index settings, and set values for speed.
      # We don't do it earlier, cause we need index to be created,
      # and it should have been done from index_bulk Command.
      if ($first && $self->is_index_exists($index)) {
         # Save current values so we can restore them at the end of Command.
         # We ignore errors here, this is non-blocking for indexing.
         $refresh_interval = $self->get_index_refresh_interval($index);
         $refresh_interval = $refresh_interval->{$index};
         $number_of_replicas = $self->get_index_number_of_replicas($index);
         $number_of_replicas = $number_of_replicas->{$index};
         if ($self->use_indexing_optimizations) {
            $self->set_index_number_of_replicas($index, 0);
         }
         $self->set_index_refresh_interval($index, -1);
         $first = 0;
      }

      # Log a status sometimes.
      if (! (++$imported % 100_000)) {
         my $now = time();
         $self->log->info("import_from: imported [$imported] entries in ".
            ($now - $start)." second(s) to index [$index]");
         $start = time();
      }

      # Limit import to specified maximum
      if ($max > 0 && $imported >= $max) {
         $self->log->info("import_from: max import reached [$imported] for ".
            "index [$index], stopping");
         last;
      }
   }

   $self->bulk_flush;

   my $stop_time = time();
   my $duration = $stop_time - $start_time;
   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.

   $self->refresh_index($index);

   # Restore previous settings, if any. This is done before anything below
   # can return early, so the index is never left with tuned-for-import
   # settings.
   if (defined($refresh_interval)) {
      $self->set_index_refresh_interval($index, $refresh_interval);
   }
   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
      $self->set_index_number_of_replicas($index, $number_of_replicas);
   }

   # A count of 0 is a valid answer, only undef is a failure.
   my $count_current = $self->count($index, $type);
   if (! defined($count_current)) {
      return;
   }
   $self->log->info("import_from: after index [$index] count is [$count_current] ".
      "at EPS [$eps]");

   my $skipped = 0;
   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
   if ($complete) {  # If complete, import has been retried, and everything is now ok.
      $imported = $read;
   }
   else {
      $skipped = $read - ($count_current - $count_before);
   }

   my $result = {
      read => $read,
      imported => $imported,
      skipped => $skipped,
      previous_count => $count_before,
      current_count => $count_current,
      complete => $complete,
      duration => $duration,
      eps => $eps,
   };

   # Say the file has been processed, and put resulting stats.
   $fd->write($result, $done) or return;

   return $result;
}

#
# Import a CSV file: shortcut for the import_from Command with a `csv'
# format.
#
# $input: CSV file to import (mandatory, must exist).
# $index, $type, $hash, $cb: handed over to import_from as given.
#
# Returns whatever import_from returns.
#
sub import_from_csv {
   my ($self, $input, @others) = @_;
   my ($index, $type, $hash, $cb) = @others;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('import_from_csv', $input) or return;
   $self->brik_help_run_file_not_found('import_from_csv', $input)
      or return;

   return $self->import_from('csv', $input, $index, $type, $hash, $cb);
}

#
# Import a JSON file: shortcut for the import_from Command with a `json'
# format.
#
# $input: JSON file to import (mandatory, must exist).
# $index, $type, $hash, $cb: handed over to import_from as given.
#
# Returns whatever import_from returns.
#
sub import_from_json {
   my ($self, $input, @others) = @_;
   my ($index, $type, $hash, $cb) = @others;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('import_from_json', $input) or return;
   $self->brik_help_run_file_not_found('import_from_json', $input)
      or return;

   return $self->import_from('json', $input, $index, $type, $hash, $cb);
}

#
# Same as import_from_csv Command but in worker mode for speed.
#
# The CSV file is read by batches of 10_000 lines; every batch is handed to
# a Metabrik::Worker::Parallel worker (pool of 2) which filters empty
# fields, applies the callback and indexes the batch with
# index_bulk_from_list.
#
# $input_csv: CSV file to import (mandatory, must exist).
# $index, $type: destination. When not given they are guessed from a file
# name shaped like `INDEX:TYPE.csv' (directory part ignored), then taken
# from the index and type Attributes.
# $hash: optional HASH handed to index_bulk_from_list as given.
# $cb: optional callback applied to every document; when it returns undef
# the document is skipped.
#
# NOTE(review): the worker sub ends with exit(), so it is assumed to run in
# a forked child: counters it updates are not visible from the parent.
# The parent therefore counts dispatched entries itself -- confirm against
# Metabrik::Worker::Parallel.
#
# Returns a HASH of statistics, 0 when the `INPUT.imported' marker file
# already exists, or undef on error.
#
sub import_from_csv_worker {
   my $self = shift;
   my ($input_csv, $index, $type, $hash, $cb) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;
   $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv)
      or return;
   $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv)
      or return;

   # If index and/or types are not defined, we try to get them from input filename
   if (! defined($index) || ! defined($type)) {
      # Example: index-DATE:type.csv
      # Only the file name is considered, so that a leading directory does
      # not end up in the index name.
      (my $basename = $input_csv) =~ s{^.*/}{};
      if (my ($this_index, $this_type) = $basename =~
         m{^(.+):(.+)\.csv(?:.*)?$}) {
         $index ||= $this_index;
         $type ||= $this_type;
      }
   }

   # Verify it has not been indexed yet
   my $done = "$input_csv.imported";
   if (-f $done) {
      $self->log->info("import_from_csv_worker: import already done for ".
         "file [$input_csv]");
      return 0;
   }

   # And default to Attributes if guess failed.
   $index ||= $self->index;
   $type ||= $self->type;
   $self->brik_help_set_undef_arg('index', $index) or return;
   $self->brik_help_set_undef_arg('type', $type) or return;

   if ($index eq '*') {
      return $self->log->error("import_from_csv_worker: cannot import to invalid ".
         "index [$index]");
   }
   if ($self->use_type) {
      if ($type eq '*') {
         return $self->log->error("import_from_csv_worker: cannot import to ".
            "invalid type [$type]");
      }
   }

   $self->log->debug("input [$input_csv]");
   $self->log->debug("index [$index]");
   $self->log->debug("type [$type]");

   my $count_before = 0;
   if ($self->is_index_exists($index)) {
      $count_before = $self->count($index, $type);
      if (! defined($count_before)) {
         return;
      }
      $self->log->info("import_from_csv_worker: current index [$index] count is ".
         "[$count_before]");
   }

   my $max = $self->max;

   $self->open_bulk_mode($index, $type) or return;

   #my $batch = undef;
   my $batch = 10_000;

   $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
      "index [$index] with type [$type] and batch [$batch]");

   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;

   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
   $fc->separator(',');
   $fc->escape('\\');
   $fc->first_line_is_header(1);
   $fc->encoded_fields($self->csv_encoded_fields);
   $fc->object_fields($self->csv_object_fields);

   my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
   $wp->pool_size(2);

   $wp->create_manager or return;

   my $refresh_interval;
   my $number_of_replicas;
   my $start = time();
   my $imported = 0;
   my $first = 1;
   my $read = 0;
   my $skipped_chunks = 0;
   my $start_time = time();
   while (my $list = $fc->read_next($input_csv, $batch)) {

      $wp->start(sub {
         my @list = ();
         for my $this (@$list) {
            # Worker-side copy of the counter, only used for log messages.
            $read++;

            my $h = {};
            delete $this->{_id};
            for my $k (keys %$this) {
               my $value = $this->{$k};
               # We keep only fields when they have a value.
               # No need to index data that is empty.
               if (defined($value) && length($value)) {
                  $h->{$k} = $value;
               }
            }

            if (defined($cb)) {
               $h = $cb->($h);
               if (! defined($h)) {
                  $self->log->error("import_from_csv_worker: callback failed for ".
                     "index [$index] at read [$read], skipping single entry");
                  $skipped_chunks++;
                  next;
               }
            }

            push @list, $h;
         }

         my $r;
         eval {
            $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
         };
         if ($@) {
            chomp($@);
            $self->log->warning("import_from_csv_worker: error [$@]");
         }
         if (! defined($r)) {
            $self->log->error("import_from_csv_worker: bulk processing failed for ".
               "index [$index] at read [$read], skipping chunk");
            # End the worker here: a `next' from inside this sub would jump
            # into the enclosing read loop instead.
            exit(1);
         }

         # Log a status sometimes.
         if (! ($imported % 10_000)) {
            my $now = time();
            my $diff = sprintf("%.02f", $now - $start);
            # Less than a second elapsed: avoid divide by zero error.
            my $eps = sprintf("%.02f", $imported / ($diff > 0 ? $diff : 1));
            $self->log->info("import_from_csv_worker: imported [$imported] entries ".
               "in [$diff] second(s) to index [$index] at EPS [$eps]");
         }

         exit(0);
      });

      # Gather index settings, and set values for speed.
      # We don't do it earlier, cause we need index to be created,
      # and it should have been done from index_bulk Command.
      if ($first && $self->is_index_exists($index)) {
         # Save current values so we can restore them at the end of Command.
         # We ignore errors here, this is non-blocking for indexing.
         $refresh_interval = $self->get_index_refresh_interval($index);
         $refresh_interval = $refresh_interval->{$index};
         $number_of_replicas = $self->get_index_number_of_replicas($index);
         $number_of_replicas = $number_of_replicas->{$index};
         if ($self->use_indexing_optimizations) {
            $self->set_index_number_of_replicas($index, 0);
         }
         $self->set_index_refresh_interval($index, -1);
         $first = 0;
      }

      # Account for the batch just dispatched, last partial batch included.
      $read += @$list;
      $imported += @$list;

      # Limit import to specified maximum
      if ($max > 0 && $imported >= $max) {
         $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
            "index [$index], stopping");
         last;
      }

      last if (@$list < $batch);
   }

   $wp->stop;

   $self->bulk_flush;

   my $stop_time = time();
   my $duration = $stop_time - $start_time;
   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.

   $self->refresh_index($index);

   # Restore previous settings, if any. This is done before anything below
   # can return early, so the index is never left with tuned-for-import
   # settings.
   if (defined($refresh_interval)) {
      $self->set_index_refresh_interval($index, $refresh_interval);
   }
   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
      $self->set_index_number_of_replicas($index, $number_of_replicas);
   }

   # A count of 0 is a valid answer, only undef is a failure.
   my $count_current = $self->count($index, $type);
   if (! defined($count_current)) {
      return;
   }
   $self->log->info("import_from_csv_worker: after index [$index] count ".
      "is [$count_current] at EPS [$eps]");

   my $skipped = 0;
   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
   if ($complete) {  # If complete, import has been retried, and everything is now ok.
      $imported = $read;
   }
   else {
      $skipped = $read - ($count_current - $count_before);
   }

   my $result = {
      read => $read,
      imported => $imported,
      skipped => $skipped,
      previous_count => $count_before,
      current_count => $count_current,
      complete => $complete,
      duration => $duration,
      eps => $eps,
   };

   # Say the file has been processed, and put resulting stats.
   $fd->write($result, $done) or return;

   return $result;
}

#
# Fetch process statistics from the nodes stats API.
#
# REST equivalent: http://localhost:9200/_nodes/stats/process?pretty
# Client API: Search::Elasticsearch::Client::2_0::Direct::Nodes
#
# Returns the value given back by the client, or the result of log->error
# when the client call dies.
#
sub get_stats_process {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   # Only the process metric is requested.
   my $stats = eval {
      $client->nodes->stats(metric => [ 'process' ]);
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_stats_process: failed: [$failure]");
   }

   return $stats;
}

#
# Fetch process information from the nodes info API.
#
# REST equivalent: curl http://localhost:9200/_nodes/process?pretty
# Client API: Search::Elasticsearch::Client::2_0::Direct::Nodes
#
# Returns the value given back by the client, or the result of log->error
# when the client call dies.
#
sub get_process {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   # Only the process metric is requested.
   my $info = eval {
      $client->nodes->info(metric => [ 'process' ]);
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_process: failed: [$failure]");
   }

   return $info;
}

#
# Fetch the cluster state.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Cluster
#
# Returns the value given back by cluster->state, or the result of
# log->error when the client call dies.
#
sub get_cluster_state {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $state = eval { $client->cluster->state };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_cluster_state: failed: [$failure]");
   }

   return $state;
}

#
# Fetch the cluster health.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Cluster
#
# Returns the value given back by cluster->health, or the result of
# log->error when the client call dies.
#
sub get_cluster_health {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $health = eval { $client->cluster->health };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_cluster_health: failed: [$failure]");
   }

   return $health;
}

#
# Fetch the cluster settings.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Cluster
#
# Returns the value given back by cluster->get_settings, or the result of
# log->error when the client call dies.
#
sub get_cluster_settings {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $current = eval { $client->cluster->get_settings };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_cluster_settings: failed: [$failure]");
   }

   return $current;
}

#
# Update the cluster settings.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Cluster
#
# $settings is validated as a HASH and sent as the request body.
# Returns the value given back by cluster->put_settings, or the result of
# log->error when the client call dies.
#
sub put_cluster_settings {
   my ($self, $settings) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;
   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;

   my $reply = eval {
      $client->cluster->put_settings(body => $settings);
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("put_cluster_settings: failed: [$failure]");
   }

   return $reply;
}

#
# Count the lines returned by show_indices that start with the green status.
#
sub count_green_indices {
   my ($self) = @_;

   my $lines = $self->show_indices or return;

   # grep in scalar context yields the number of matching lines.
   my $green = grep { /^\s*green\s+/ } @$lines;

   return $green;
}

#
# Count the lines returned by show_indices that start with the yellow status.
#
sub count_yellow_indices {
   my ($self) = @_;

   my $lines = $self->show_indices or return;

   # grep in scalar context yields the number of matching lines.
   my $yellow = grep { /^\s*yellow\s+/ } @$lines;

   return $yellow;
}

#
# Count the lines returned by show_indices that start with the red status.
#
sub count_red_indices {
   my ($self) = @_;

   my $lines = $self->show_indices or return;

   # grep in scalar context yields the number of matching lines.
   my $red = grep { /^\s*red\s+/ } @$lines;

   return $red;
}

#
# Count the entries returned by show_indices.
#
sub count_indices {
   my ($self) = @_;

   my $lines = $self->show_indices or return;

   # Array in scalar assignment gives its number of elements.
   my $total = @$lines;

   return $total;
}

#
# Tally the lines returned by show_indices per leading status word.
#
# Returns a hashref with the keys red, yellow and green. Lines starting with
# none of these words are not counted.
#
sub list_indices_status {
   my ($self) = @_;

   my $lines = $self->show_indices or return;

   my %tally = (red => 0, yellow => 0, green => 0);
   for my $line (@$lines) {
      if ($line =~ /^\s*(red|yellow|green)\s+/) {
         $tally{$1}++;
      }
   }

   return {
      red => $tally{red},
      yellow => $tally{yellow},
      green => $tally{green},
   };
}

#
# Sum the shards field over every entry returned by get_indices.
#
sub count_shards {
   my ($self) = @_;

   my $indices = $self->get_indices or return;

   my $total = 0;
   $total += $_ for map { $_->{shards} } @$indices;

   return $total;
}

#
# Sum the size field of the indices returned by get_indices($string).
#
# Each size goes through Metabrik::Format::Number to_number before being
# added; the total is returned through from_number.
#
sub count_size {
   my ($self, $string) = @_;

   my $indices = $self->get_indices($string) or return;

   my $formatter = Metabrik::Format::Number->new_from_brik_init($self) or return;

   # Same setters, same order as a plain sequence of calls.
   my @setup = (
      [ decimal_point => "." ],
      [ kibi_suffix => "kb" ],
      [ mebi_suffix => "mb" ],
      [ gibi_suffix => "gb" ],
      [ kilo_suffix => "KB" ],
      [ mega_suffix => "MB" ],
      [ giga_suffix => "GB" ],
   );
   for my $pair (@setup) {
      my ($setter, $value) = @$pair;
      $formatter->$setter($value);
   }

   my $total = 0;
   for my $entry (@$indices) {
      $total += $formatter->to_number($entry->{size});
   }

   return $formatter->from_number($total);
}

#
# Sum the total_size field of the indices returned by get_indices($string).
#
# Each size goes through Metabrik::Format::Number to_number before being
# added; the total is returned through from_number.
#
sub count_total_size {
   my ($self, $string) = @_;

   my $indices = $self->get_indices($string) or return;

   my $formatter = Metabrik::Format::Number->new_from_brik_init($self) or return;

   # Same setters, same order as a plain sequence of calls.
   my @setup = (
      [ decimal_point => "." ],
      [ kibi_suffix => "kb" ],
      [ mebi_suffix => "mb" ],
      [ gibi_suffix => "gb" ],
      [ kilo_suffix => "KB" ],
      [ mega_suffix => "MB" ],
      [ giga_suffix => "GB" ],
   );
   for my $pair (@setup) {
      my ($setter, $value) = @$pair;
      $formatter->$setter($value);
   }

   my $total = 0;
   for my $entry (@$indices) {
      $total += $formatter->to_number($entry->{total_size});
   }

   return $formatter->from_number($total);
}

#
# Sum the count field of the indices returned by get_indices.
#
# The total is returned through Metabrik::Format::Number from_number, with
# the suffixes k, m and M set for kilo, mega and giga.
#
sub count_count {
   my ($self) = @_;

   my $indices = $self->get_indices or return;

   my $formatter = Metabrik::Format::Number->new_from_brik_init($self) or return;
   $formatter->kilo_suffix('k');
   $formatter->mega_suffix('m');
   $formatter->giga_suffix('M');

   my $total = 0;
   for my $entry (@$indices) {
      $total += $entry->{count};
   }

   return $formatter->from_number($total);
}

#
# List the index names whose color is green, from get_indices.
#
# Returns an arrayref, possibly empty.
#
sub list_green_indices {
   my ($self) = @_;

   my $all = $self->get_indices or return;

   my @names = map { $_->{index} } grep { $_->{color} eq 'green' } @$all;

   return \@names;
}

#
# List the index names whose color is yellow, from get_indices.
#
# Returns an arrayref, possibly empty.
#
sub list_yellow_indices {
   my ($self) = @_;

   my $all = $self->get_indices or return;

   my @names = map { $_->{index} } grep { $_->{color} eq 'yellow' } @$all;

   return \@names;
}

#
# List the index names whose color is red, from get_indices.
#
# Returns an arrayref, possibly empty.
#
sub list_red_indices {
   my ($self) = @_;

   my $all = $self->get_indices or return;

   my @names = map { $_->{index} } grep { $_->{color} eq 'red' } @$all;

   return \@names;
}

#
# List known Elasticsearch field datatypes.
#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
#
# Returns a hashref with a single key, core, holding the list of core
# datatype names.
#
sub list_datatypes {
   my $self = shift;

   return {
      # The date type was previously misspelled as "data".
      core => [ qw(string long integer short byte double float date boolean binary) ],
   };
}

#
# Return total hits from a search result given as $run.
#
# $run must be a hashref holding hits => { total => ... }. When total is
# itself a hashref (the ES 7.x form), its value field is returned; otherwise
# total is returned as is. Any other input ends in log->error.
#
sub get_hits_total {
   my ($self, $run) = @_;

   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;

   my $compatible = ref($run) eq 'HASH'
      && exists($run->{hits})
      && exists($run->{hits}{total});
   if (! $compatible) {
      return $self->log->error("get_hits_total: last Command not compatible");
   }

   # In ES 7.x, total is a hash: only the number is kept.
   my $total = $run->{hits}{total};
   if (ref($total) eq 'HASH') {
      return $total->{value};
   }

   return $total;
}

#
# Set the persistent cluster setting cluster.routing.allocation.enable to
# none, through put_cluster_settings.
#
sub disable_shard_allocation {
   my ($self) = @_;

   return $self->put_cluster_settings({
      persistent => {
         'cluster.routing.allocation.enable' => 'none',
      },
   });
}

#
# Set the persistent cluster setting cluster.routing.allocation.enable to
# all, through put_cluster_settings.
#
sub enable_shard_allocation {
   my ($self) = @_;

   return $self->put_cluster_settings({
      persistent => {
         'cluster.routing.allocation.enable' => 'all',
      },
   });
}

#
# Call indices->flush_synced on the client.
#
# Returns the value given back by the client, or the result of log->error
# when the client call dies.
#
sub flush_synced {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $reply = eval { $client->indices->flush_synced };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("flush_synced: failed: [$failure]");
   }

   return $reply;
}

#
# Create a snapshot repository.
#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
#
# Arguments are the request body first, then the optional repository name
# (defaults to 'repository'):
#
# run client::elasticsearch create_snapshot_repository
#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
#      myrepo
#
# You have to set path.repo in elasticsearch.yml like:
# path.repo: ["/home/gomor/es-backups"]
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Snapshot
#
sub create_snapshot_repository {
   my ($self, $body, $repository_name) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;
   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;

   $repository_name ||= 'repository';

   my $reply = eval {
      $client->snapshot->create_repository(
         repository => $repository_name,
         body => $body,
      );
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("create_snapshot_repository: failed: [$failure]");
   }

   return $reply;
}

#
# Create a shared filesystem (type 'fs') snapshot repository at $location.
#
# $location must start with a slash, otherwise log->error is returned.
# $repository_name is optional and defaults to 'repository'.
# The work is delegated to create_snapshot_repository, whose result is
# returned.
#
sub create_shared_fs_snapshot_repository {
   my $self = shift;
   my ($location, $repository_name) = @_;

   $repository_name ||= 'repository';
   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;

   if ($location !~ m{^/}) {
      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
         "a full directory path, this one is invalid [$location]");
   }

   my $body = {
      # The repository type is mandatory in the create repository request;
      # it was previously left commented out, so no type was ever sent.
      type => 'fs',
      settings => {
         compress => 'true',
         location => $location,
      },
   };

   return $self->create_snapshot_repository($body, $repository_name);
}

#
# Fetch the snapshot repositories.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Snapshot
#
# Returns the value given back by snapshot->get_repository, or the result
# of log->error when the client call dies.
#
sub get_snapshot_repositories {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $repositories = eval { $client->snapshot->get_repository };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_snapshot_repositories: failed: [$failure]");
   }

   return $repositories;
}

#
# Fetch the snapshot status.
#
# Client API: Search::Elasticsearch::Client::2_0::Direct::Snapshot
#
# Returns the value given back by snapshot->status, or the result of
# log->error when the client call dies.
#
sub get_snapshot_status {
   my ($self) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   my $status = eval { $client->snapshot->status };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_snapshot_status: failed: [$failure]");
   }

   return $status;
}

#
# Create a snapshot in a repository.
#
# Client API: Search::Elasticsearch::Client::5_0::Direct::Snapshot
#
# $snapshot_name defaults to 'snapshot' and $repository_name to
# 'repository'. $body is only sent when defined.
# Returns the value given back by snapshot->create, or the result of
# log->error when the client call dies.
#
sub create_snapshot {
   my ($self, $snapshot_name, $repository_name, $body) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   $snapshot_name ||= 'snapshot';
   $repository_name ||= 'repository';

   my @request = (
      repository => $repository_name,
      snapshot => $snapshot_name,
   );
   push @request, body => $body if defined($body);

   my $reply = eval { $client->snapshot->create(@request) };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("create_snapshot: failed: [$failure]");
   }

   return $reply;
}

#
# Create a snapshot limited to the given indices.
#
# $indices is placed under the indices key of the body handed to
# create_snapshot. $snapshot_name defaults to 'snapshot' and
# $repository_name to 'repository'.
#
sub create_snapshot_for_indices {
   my ($self, $indices, $snapshot_name, $repository_name) = @_;

   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;

   return $self->create_snapshot(
      $snapshot_name || 'snapshot',
      $repository_name || 'repository',
      { indices => $indices },
   );
}

#
# Tell whether get_snapshot_status reports no snapshot.
#
# Returns 1 when the snapshots list of the status is empty, 0 otherwise.
#
sub is_snapshot_finished {
   my ($self) = @_;

   my $status = $self->get_snapshot_status or return;

   my $running = @{$status->{snapshots}};

   return $running == 0 ? 1 : 0;
}

#
# Report which indices of the running snapshots are done.
#
# The snapshot status is fetched once. When it lists no snapshot, an info
# message is logged and the result of log->info is returned. Otherwise
# returns { done => [...], not_done => [...] }.
#
# The indices member of a snapshot may be:
#  - a hash keyed by index name (the shape documented for the snapshot
#    status API): index names are reported, sorted;
#  - an array of index entries: entries are reported unchanged.
#
# An index is done when its shards_stats done count is positive and has
# reached total. When total is absent, a true done value is enough.
#
sub get_snapshot_state {
   my $self = shift;

   # A single request: asking twice costs a round trip and the two answers
   # may disagree if the snapshot completes in between.
   my $status = $self->get_snapshot_status or return;

   my $snapshots = $status->{snapshots};
   if (ref($snapshots) ne 'ARRAY' || @$snapshots == 0) {
      return $self->log->info("get_snapshot_state: is already finished");
   }

   my @indices_done = ();
   my @indices_not_done = ();

   for my $snapshot (@$snapshots) {
      my $indices = $snapshot->{indices};

      # Normalize to [ what to report, details ] pairs.
      my @entries = ();
      if (ref($indices) eq 'HASH') {
         @entries = map { [ $_, $indices->{$_} ] } sort keys %$indices;
      }
      elsif (ref($indices) eq 'ARRAY') {
         @entries = map { [ $_, $_ ] } @$indices;
      }

      for my $entry (@entries) {
         my ($report, $details) = @$entry;

         my $stats = {};
         if (ref($details) eq 'HASH' && ref($details->{shards_stats}) eq 'HASH') {
            $stats = $details->{shards_stats};
         }

         my $done = $stats->{done} || 0;
         my $total = $stats->{total};
         my $is_done = defined($total) ? ($done > 0 && $done >= $total) : $done;

         if ($is_done) {
            push @indices_done, $report;
         }
         else {
            push @indices_not_done, $report;
         }
      }
   }

   return { done => \@indices_done, not_done => \@indices_not_done };
}

#
# Verify a snapshot repository.
#
# This Command was an empty stub; it now calls snapshot->verify_repository
# on the client, in the same way the other snapshot Commands wrap their
# client call.
#
# $repository_name is optional and defaults to 'repository'.
# Returns the value given back by the client, or the result of log->error
# when the client call dies.
#
sub verify_snapshot_repository {
   my $self = shift;
   my ($repository_name) = @_;

   my $es = $self->_es;
   $self->brik_help_run_undef_arg('open', $es) or return;

   $repository_name ||= 'repository';

   my $r;
   eval {
      $r = $es->snapshot->verify_repository(
         repository => $repository_name,
      );
   };
   if ($@) {
      chomp($@);
      return $self->log->error("verify_snapshot_repository: failed: [$@]");
   }

   return $r;
}

#
# Delete the snapshot repository named $repository_name (mandatory).
#
# Returns the value given back by snapshot->delete_repository, or the
# result of log->error when the client call dies.
#
sub delete_snapshot_repository {
   my ($self, $repository_name) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;
   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;

   my $reply = eval {
      $client->snapshot->delete_repository(repository => $repository_name);
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("delete_snapshot_repository: failed: [$failure]");
   }

   return $reply;
}

#
# Fetch a snapshot from a repository.
#
# $snapshot_name defaults to 'snapshot' and $repository_name to
# 'repository'.
# Returns the value given back by snapshot->get, or the result of
# log->error when the client call dies.
#
sub get_snapshot {
   my ($self, $snapshot_name, $repository_name) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;

   $snapshot_name ||= 'snapshot';
   $repository_name ||= 'repository';

   my $found = eval {
      $client->snapshot->get(
         repository => $repository_name,
         snapshot => $snapshot_name,
      );
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("get_snapshot: failed: [$failure]");
   }

   return $found;
}

#
# Delete a snapshot from a repository.
#
# Client API: Search::Elasticsearch::Client::5_0::Direct::Snapshot
#
# Both $snapshot_name and $repository_name are mandatory. The rtimeout
# Attribute, suffixed with "s", is sent as master_timeout.
# Returns the value given back by snapshot->delete, or the result of
# log->error when the client call dies.
#
sub delete_snapshot {
   my ($self, $snapshot_name, $repository_name) = @_;

   my $client = $self->_es;
   $self->brik_help_run_undef_arg('open', $client) or return;
   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;

   my $timeout = $self->rtimeout;

   my $reply = eval {
      $client->snapshot->delete(
         repository => $repository_name,
         snapshot => $snapshot_name,
         master_timeout => "${timeout}s",
      );
   };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("delete_snapshot: failed: [$failure]");
   }

   return $reply;
}

#
# Restore a snapshot from a repository.
#
# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
#
# $snapshot_name defaults to 'snapshot' and $repository_name to
# 'repository'. $body is only sent when defined.
# Returns the value given back by snapshot->restore, or the result of
# log->error when the client call dies.
#
sub restore_snapshot {
   my ($self, $snapshot_name, $repository_name, $body) = @_;

   my $client = $self->_es;

   # Defaults are applied before the argument checks below.
   $snapshot_name ||= 'snapshot';
   $repository_name ||= 'repository';

   $self->brik_help_run_undef_arg('open', $client) or return;
   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;

   my @request = (
      repository => $repository_name,
      snapshot => $snapshot_name,
   );
   push @request, body => $body if defined($body);

   my $reply = eval { $client->snapshot->restore(@request) };
   if (my $failure = $@) {
      chomp($failure);
      return $self->log->error("restore_snapshot: failed: [$failure]");
   }

   return $reply;
}

#
# Restore only the given indices from a snapshot.
#
# $indices is placed under the indices key of the body handed to
# restore_snapshot. $snapshot_name defaults to 'snapshot' and
# $repository_name to 'repository'.
#
sub restore_snapshot_for_indices {
   my ($self, $indices, $snapshot_name, $repository_name) = @_;

   # Defaults are applied before the argument checks below.
   $snapshot_name ||= 'snapshot';
   $repository_name ||= 'repository';

   for my $argument ($indices, $snapshot_name, $repository_name) {
      $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $argument) or return;
   }

   return $self->restore_snapshot(
      $snapshot_name,
      $repository_name,
      { indices => $indices },
   );
}

# shard occupation
#
# curl -XGET "http://127.0.0.1:9200/_cat/shards?v"
# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
#
# disk occupation:
# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
# 
#
# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
#

# Check memory lock

# curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
# {
#  "nodes" : {
#    "3XXX" : {
#      "process" : {
#        "mlockall" : true
#      }
#    }
#  }
# }

1;

__END__

=head1 NAME

Metabrik::Client::Elasticsearch - client::elasticsearch Brik

=head1 SYNOPSIS

   host:~> my $q = { term => { ip => "192.168.57.19" } }
   host:~> run client::elasticsearch open
   host:~> run client::elasticsearch query $q data-*

=head1 DESCRIPTION

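Elasticsearch client Brik. It wraps the Search::Elasticsearch client to open a connection to one or more nodes, then query and index documents (including bulk and CSV imports), and to inspect or administer indices, cluster settings and snapshots.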

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2014-2022, Patrice E<lt>GomoRE<gt> Auffret

You may distribute this module under the terms of The BSD 3-Clause License.
See LICENSE file in the source distribution archive.

=head1 AUTHOR

Patrice E<lt>GomoRE<gt> Auffret

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.