App-ElasticSearch-Utilities/scripts/es-search.pl
#!perl
# PODNAME: es-search.pl
# ABSTRACT: Provides a CLI for quick searches of data in ElasticSearch daily indexes
use strict;
use warnings;
use App::ElasticSearch::Utilities qw(:all);
use App::ElasticSearch::Utilities::Query;
use App::ElasticSearch::Utilities::QueryString;
use Carp;
use CLI::Helpers qw(:all);
use Getopt::Long qw(:config no_ignore_case no_ignore_case_always);
use JSON::MaybeXS qw(:legacy);
use Pod::Usage;
use POSIX qw(strftime);
use Ref::Util qw(is_ref is_arrayref is_hashref);
use Time::HiRes qw(sleep time);
use YAML::XS;
local $YAML::XS::Boolean = "JSON::PP";
#------------------------------------------------------------------------#
# Argument Parsing
#
# Collect all recognized switches into %OPT.  Getopt::Long is configured
# case-sensitively (see the import above); anything it does not consume
# is left in @ARGV and is later treated as the Lucene-style search string.
my %OPT;
GetOptions(\%OPT, qw(
all
asc
bases
bg-filter=s
by=s
desc
exists=s@
fields
filter
format=s
help|h
json|jq
manual|m
match-all
max-batch-size=i
missing=s@
no-decorators|no-header
no-implications|no-imply
precision=i
prefix=s@
pretty
show=s@
size|n|limit=i
sort=s
tail
timestamp=s
top=s
interval=s
with=s@
or
));
# Search string is the rest of the argument string
# --filter forces every query element into filter context (no scoring);
# the default is the scoring 'must' context.
my $context = $OPT{filter} ? 'filter' : 'must';
# Build the query-string expander; --or changes the default joiner for
# bare terms from AND to OR.
my $qs = App::ElasticSearch::Utilities::QueryString->new(
$OPT{filter} ? (context => 'filter') : (),
default_join => $OPT{or} ? 'OR' : 'AND',
);
# --match-all bypasses query-string expansion entirely and matches every
# document; otherwise the remaining CLI words become the query.
my $q = exists $OPT{'match-all'} && $OPT{'match-all'}
? App::ElasticSearch::Utilities::Query->new($context => { match_all => {} })
: $qs->expand_query_string(@ARGV);
$q->set_timeout('10s');
$q->set_scroll('30s');
# Each --prefix takes "field:string" and adds a Lucene prefix query;
# malformed entries (missing field or value) are silently skipped.
if( exists $OPT{prefix} ){
foreach my $prefix (@{ $OPT{prefix} }) {
my ($f,$v) = split /:/, $prefix, 2;
next unless $f && $v;
$q->add_bool( $context => { prefix => { $f => $v } } );
}
}
#------------------------------------------------------------------------#
# Documentation
pod2usage({-sections => 'SYNOPSIS'}) if $OPT{help};
pod2usage(-exitval => 0, -verbose => 2) if $OPT{manual};
# Anything left in @ARGV that still looks like a switch was not a valid
# option; bail out rather than silently searching for it.
my $unknown_options = join ', ', grep /^--/, @ARGV;
pod2usage({-exitval => 1, -sections => 'SYNOPSIS', -msg =>"Unknown option(s): $unknown_options"}) if $unknown_options;
#--------------------------------------------------------------------------#
# Information Gathering Routines
# --bases is informational only: list index bases and exit.
if( $OPT{bases} ) {
show_bases();
exit 0;
}
#--------------------------------------------------------------------------#
# App Config
#
# Normalized runtime settings derived from %OPT:
#   size           - number of results to display (default 20)
#   format         - record dump format: json | jsonpretty | yaml (default)
#   max-batch-size - docs fetched per scroll request (default 50)
#   precision      - decimal places for floating point output (default 3)
my %CONFIG = (
size => ($OPT{size} && $OPT{size} > 0) ? int($OPT{size}) : 20,
format => $OPT{json} ? 'json'
: $OPT{format} ? lc $OPT{format}
: 'yaml',
'max-batch-size' => $OPT{'max-batch-size'} || 50,
precision => $OPT{precision} || 3,
$OPT{timestamp} ? ( timestamp => $OPT{timestamp} ) : (),
);
# JSON output is assumed to be piped (e.g. through jq), so suppress headers
$OPT{'no-decorators'} = 1 if $CONFIG{format} eq 'json';
$CONFIG{pretty} = $OPT{pretty} ? 1
: $CONFIG{format} =~ /pretty/ ? 1
: 0;
# sprintf template used everywhere a float is rendered
$CONFIG{decimal_format} = "%0.$CONFIG{precision}f";
#------------------------------------------------------------------------#
# Handle Indices
#
# Determine result ordering, enumerate the selected indexes grouped by
# age (days old), merge their field mappings into %FIELDS, and resolve
# which field acts as the timestamp.
my $ORDER = exists $OPT{asc} && $OPT{asc} ? 'asc' : 'desc';
# Tailing only makes sense reading forward in time
$ORDER = 'asc' if exists $OPT{tail};
my %by_age  = ();   # age (days) => [ index names ]
my %indices = map { $_ => (es_index_days_old($_) || 0) } es_indices();
# Fixed typo in the user-facing message: "paramaters" -> "parameters"
die "# Failed to retrieve any indices using your parameters." unless keys %indices;

my %FIELDS = ();
my $TimeStampCheck = 0;   # only consult local index metadata once
foreach my $index (sort by_index_age keys %indices) {
    my $age = $indices{$index};
    $by_age{$age} ||= [];
    push @{ $by_age{$age} }, $index;
    # First index to declare a field wins; later mappings don't override
    my $fields = es_index_fields($index);
    foreach my $k ( keys %{ $fields } ) {
        $FIELDS{$k} = $fields->{$k}
            unless $FIELDS{$k};
    }
    # Lookup the Index in our local YAML (~/.es-utils.yaml meta section)
    if( !$TimeStampCheck ) {
        $TimeStampCheck++;
        $CONFIG{timestamp} ||= es_local_index_meta(timestamp => $index);
    }
}
# Set fields so we know how to construct complex aggs
$q->fields_meta( \%FIELDS );
#------------------------------------------------------------------------#
# Figure out the timestamp: CLI > local meta > global config > @timestamp
$CONFIG{timestamp} ||= es_globals('timestamp') || '@timestamp';
debug_var(\%by_age);
my @AGES = sort { $ORDER eq 'asc' ? $b <=> $a : $a <=> $b } keys %by_age;
# Summarize across batches only for a plain --top over multiple ages
$CONFIG{summary} = @AGES > 1 && $OPT{top} && ( !$OPT{by} && !$OPT{with} && !$OPT{interval} );
debug({color=>"cyan"}, "Fields discovered.");
if( $OPT{fields} ) {
    show_fields();
    exit 0;
}
# Attempt date autodiscovery: if the configured timestamp field doesn't
# exist in the mappings, fall back to a lone date field, or bail out
# with suggestions when the choice is ambiguous.
if( !exists $FIELDS{$CONFIG{timestamp}} ) {
    my @dates = grep { $FIELDS{$_}->{type} eq 'date' } keys %FIELDS;
    if( @dates == 0 ) {
        output({color=>'red',stderr=>1},"FATAL: No date fields found in the indices specified" );
        exit 1;
    }
    elsif( @dates == 1 ) {
        output({color=>'yellow',stderr=>1}, "WARNING: Timestamp field '$CONFIG{timestamp}' not found, using '$dates[0]' instead");
        $CONFIG{timestamp} = $dates[0];
    }
    else {
        output({color=>'red',stderr=>1},
            sprintf "FATAL: Timestamp field '%s' not found and discovered multiple date fields: %s",
                $CONFIG{timestamp},
                join(', ', sort @dates)
        );
        output({color=>'yellow',indent=>1}, "Try again with '--timestamp $dates[0]' for example.");
        exit 1;
    }
}
# Which fields to show
# @SHOW holds the flattened list of --show fields (comma-splitting each
# occurrence); when set, output switches from full-record dumps to
# tab-separated columns.
my @SHOW = ();
if ( exists $OPT{show} && scalar @{ $OPT{show} } ) {
foreach my $args (@{ $OPT{show} }) {
push @SHOW, grep { defined && length } split /,/, $args;
}
$q->set_fields([$CONFIG{timestamp},@SHOW]);
}
# How to sort
# Default sort is the timestamp in $ORDER direction; --sort accepts a
# comma list of "field" or "field:direction" entries.
my $SORT = [ { $CONFIG{timestamp} => $ORDER } ];
if( exists $OPT{sort} && length $OPT{sort} ) {
$SORT = [
map { /:/ ? +{ split /:/ } : $_ }
split /,/,
$OPT{sort}
]
}
$q->set_sort($SORT);
# Improper Usage
# Reject empty queries and mutually exclusive option combinations.
pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>'No search string specified'})
unless keys %{ $q->query };
pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>'Cannot use --tail and --top together'})
if exists $OPT{tail} && $OPT{top};
pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>'Cannot use --tail and --sort together'})
if exists $OPT{tail} && $OPT{sort};
pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>'Cannot use --sort along with --asc or --desc'})
if $OPT{sort} && ($OPT{asc} || $OPT{desc});
pod2usage({-exitval=>1, -verbose=>0, -sections=>'SYNOPSIS', -msg=>'Please specify --show or --jq with --tail'})
if exists $OPT{tail} && !( @SHOW || $OPT{json});
# Process extra parameters
# --exists becomes a 'must exists' clause, --missing a 'must_not exists';
# each option accepts comma- or colon-separated field lists.
foreach my $presence ( qw( exists missing ) ) {
if( exists $OPT{$presence} ) {
my @fields = map { split /[,:]/ } (is_arrayref($OPT{$presence}) ? @{ $OPT{$presence} } : ($OPT{$presence}));
my $context = $presence eq 'exists' ? 'must' : 'must_not';
foreach my $field (@fields) {
$q->add_bool( $context => { exists => { field => $field } } );
}
}
}
# Sub-aggregations usable with --by; all return a single numeric value.
my %SUPPORTED_AGGREGATIONS = map {$_=>'simple_value'} qw(cardinality sum min max avg value_count);
my $agg_header = '';
# --top builds a bucket aggregation (default: terms) on a single field,
# optionally ordered by a --by metric and enriched with --with sub-aggs.
if( exists $OPT{top} ) {
my @top = split /:/, $OPT{top};
my $top_field = pop @top;
my $top_agg = @top ? shift @top : 'terms';
# Only fields present in the discovered mappings are accepted
my @agg_fields = grep { exists $FIELDS{$_} } split /\s*,\s*/, $top_field;
croak(sprintf("Option --top takes a field, found %d fields: %s\n", scalar(@agg_fields),join(',',@agg_fields)))
unless @agg_fields == 1;
my %agg = ();
my %sub_agg = ();
# --by <type>:<field> orders the top buckets by a metric sub-agg
if( $OPT{by}) {
my ($type,$field) = split /\:/, $OPT{by};
if( exists $SUPPORTED_AGGREGATIONS{$type} ) {
$sub_agg{by} = { $type => {field => $field} };
}
else {
output({color=>'red'}, "Aggregation '$type' is not currently supported, ignoring.");
}
}
# --with [<type>:]<field>[:<arg>] attaches reporting sub-aggregations.
# The trailing <arg> is overloaded: percentiles list, terms size, or
# histogram interval, depending on the aggregation type.
if( $OPT{with} ) {
my @with = is_arrayref($OPT{with}) ? @{ $OPT{with} } : ( $OPT{with} );
foreach my $with ( @with ) {
my @attrs = split /:/, $with, 3;
# Process Args from Right to Left
my $arg = @attrs == 3 ? pop @attrs
: $attrs[-1] =~ /^\d/ ? pop @attrs
: '';
my $pcts = $arg =~ /^\d{1,2}(?:\.\d+)?(?:,\d{1,2}(?:\.\d+)?)*$/ ? $arg : '25,50,75,90,95,99';
my $size = $arg =~ /^\d+$/ ? $arg : 3;
my $hi = $arg || 0.1;
my $field = exists $FIELDS{$attrs[-1]} ? pop @attrs : undef;
my $type = @attrs ? pop @attrs : 'terms';
# Skip invalid elements
next unless defined $field and defined $size and $size > 0;
my %params = ();
my $id = "$type-$field";
# If a term agg and we haven't used this field name, simplify it
if( $type =~ /terms$/ && !$sub_agg{$field} ) {
$id = $field;
$params{size} = $size;
$params{missing} = 'MISSING' if $OPT{'with-missing'};
}
# Statistical aggs imply the field must exist, unless suppressed
if( $type =~ /histogram|stats|percentiles/ && !$OPT{'no-implications'} ) {
output({color=>'magenta',sticky=>1}, "* Using a statistical aggregation implies an exists filter on $field, use --no-implications to disable this");
$q->add_bool( must => { exists => { field => $field } } );
}
$sub_agg{$id} = {
$type => {
field => $field,
$type eq 'percentiles' ? ( percents => [split /,/, $pcts] ) : (),
$type eq 'histogram' ? ( interval => $hi ) : (),
%params,
}
};
}
}
my %params = ();
$params{missing} = 'MISSING' if $OPT{'with-missing'} and $top_agg eq 'terms';
my $field = shift @agg_fields;
$agg_header = "count\tpct\t" . $field;
$agg{$top_agg} = { field => $field, %params };
# --bg-filter only applies to significant_terms aggregations
if( $OPT{'bg-filter'} && $top_agg eq 'significant_terms' ) {
my $bgf = App::ElasticSearch::Utilities::QueryString->new();
my $bgq = $bgf->expand_query_string($OPT{'bg-filter'});
$agg{$top_agg}->{background_filter} = $bgq->query;
}
# With a --by metric, order buckets by it before doc count
if( exists $sub_agg{by} ) {
$agg_header = "$OPT{by}\t" . $agg_header;
$agg{$top_agg}->{order} = [ { by => $ORDER }, { "_count" => "desc" } ];
}
$agg{aggregations} = \%sub_agg if keys %sub_agg;
if( exists $OPT{all} ) {
verbose({color=>'cyan'}, "# Aggregations with --all are limited to returning 1,000,000 results.");
$agg{$top_agg}->{size} = 1_000_000;
}
else {
$agg{$top_agg}->{size} = $CONFIG{size};
}
$q->add_aggregations( top => \%agg );
# out_of: cardinality of the top field, used for "N of T" reporting
$q->add_aggregations( out_of => { cardinality => { field => $field } } );
# --interval wraps everything in a date_histogram for change-over-time
if( $OPT{interval} ) {
$q->wrap_aggregations( step => {
date_histogram => {
field => $CONFIG{timestamp},
interval => $OPT{interval},
}
});
}
}
elsif(exists $OPT{tail}) {
# Tail mode: fetch in batches and restrict to the most recent age bucket
$q->set_size($CONFIG{'max-batch-size'});
@AGES = ($AGES[-1]);
}
elsif( $OPT{all} ) {
$q->set_size( $CONFIG{'max-batch-size'} );
}
else {
# Never request more per batch than we intend to display
$q->set_size( $CONFIG{size} < $CONFIG{'max-batch-size'} ? $CONFIG{size} : $CONFIG{'max-batch-size'} );
}
# Mutable state shared by the main search loop and the reporting subs.
my %displayed_indices = ();   # indexes actually queried
my $TOTAL_HITS = 0;           # sum of hits.total across requests
my $OUT_OF = 0;               # max cardinality seen from the out_of agg
my $last_hit_ts = undef;      # newest timestamp seen (drives --tail)
my $duration = 0;             # cumulative request time in seconds
my $displayed = 0;            # records/buckets shown so far
my $header = 0;               # whether the column header was printed
my $age = undef;              # current age bucket being queried
my %last_batch_id = ();       # doc _ids from the last batch (tail de-dupe)
my %AGGS_TOTALS = ();         # per-key doc_count totals for --top summary
my %AGES_SEEN = ();           # age buckets already announced
# Handle CTRL+C During the Loop
my $DONE = 0;
local $SIG{INT} = sub { $DONE=1 };
verbose({color=>'green',level=>1}, "= Query setup complete, beginning request.");
# Main loop: one pass per index-age bucket (oldest or newest first per
# $ORDER).  In --tail mode the single remaining bucket is re-queried
# forever with a growing timestamp floor.  Inner while() walks scroll
# pages within a bucket.
AGES: while( !$DONE && @AGES ) {
# With --tail, we don't want to deplete @AGES
$age = $OPT{tail} ? $AGES[0] : shift @AGES;
# Pause for 200ms if we're tailing
sleep(0.2) if exists $OPT{tail} && $last_hit_ts;
my $start=time();
# Seed the tail window 30s in the past on the first iteration
$last_hit_ts ||= strftime('%Y-%m-%dT%H:%M:%S%z',localtime($start-30));
# If we're tailing, bump the @query with a timestamp range
$q->stash( filter => {range => { $CONFIG{timestamp} => {gte => $last_hit_ts}}} ) if $OPT{tail};
# Header
if( !exists $AGES_SEEN{$age} ) {
output({color=>'yellow'}, "= Querying Indexes: " . join(',', @{ $by_age{$age} })) unless $OPT{'no-decorators'};
$AGES_SEEN{$age}=1;
$header=0;
}
debug("== Request Parameters");
debug_var($q->uri_params);
debug("== Query");
debug(to_json $q->request_body,{allow_nonref=>1,canonical=>1,pretty=>1});
# Execute the query
my $result = $q->execute( $by_age{$age} );
debug({clear=>1},"== Results");
debug_var($result);
$duration += time() - $start;
# Advance if we don't have a result
next unless defined $result;
# Error payloads may be structured hashes or legacy string messages;
# try the structured path first, fall back to regex extraction.
if ( $result->{error} ) {
my $simple_error;
eval {
$simple_error = $result->{error}{caused_by}{caused_by}{reason};
} or do {
($simple_error) = $result->{error} =~ m/(QueryParsingException\[\[[^\]]+\][^\]]+\]\]);/;
};
$simple_error ||= '';
output({stderr=>1,color=>'red'},
"# Received an error from the cluster. $simple_error"
);
last;
}
$displayed_indices{$_} = 1 for @{ $by_age{$age} };
$TOTAL_HITS += $result->{hits}{total} if $result->{hits}{total};
# With decorators on, the timestamp is always the first column
my @always = ();
push @always, $CONFIG{timestamp} unless $OPT{'no-decorators'};
if(!$header && @SHOW) {
output({color=>'cyan'}, join("\t", @always, @SHOW));
$header++;
}
# Walk the scroll pages for this bucket
while( $result && !$DONE ) {
my $hits = is_arrayref($result->{hits}{hits}) ? $result->{hits}{hits} : [];
# Handle Aggregations
if( my $aggregations = $result->{aggregations} ) {
display_aggregations($aggregations, $result->{hits}{total});
next AGES;
}
# Reset the last batch ID if we have new data
%last_batch_id = () if @{$hits} > 0 && $last_hit_ts ne $hits->[-1]->{_source}{$CONFIG{timestamp}};
debug({color=>'magenta'}, "+ ID cache is now empty.") unless keys %last_batch_id;
foreach my $hit (@{ $hits }) {
# Skip if we've seen this record
next if exists $last_batch_id{$hit->{_id}};
$last_hit_ts = $hit->{_source}{$CONFIG{timestamp}};
$last_batch_id{$hit->{_id}}=1;
my $record = {};
# Add the _id field to the source so that it is listed
# when showing full records and can be used in @SHOW
$hit->{_source}->{_id} = $hit->{_id} unless defined($hit->{_source}->{_id});
if( @SHOW ) {
# Resolve each --show field: flattened key, recursive lookdown,
# then dotted-path prefix/array-index-stripped matching.
my $flat = es_flatten_hash( $hit->{_source} );
debug_var($flat);
foreach my $f (@always) {
$record->{$f} = $flat->{$f};
}
foreach my $f (@SHOW) {
my $value = undef;
if( exists $flat->{$f} ) {
$value = $flat->{$f};
}
elsif( my $v = document_lookdown($hit->{_source},$f) ) {
$value = $v;
}
elsif(index($f, '.') > 0) {
# Try path matching the key
my @values = ();
foreach my $k (keys %{ $flat }) {
if( index($k,$f) == 0 ) {
push @values, $flat->{$k};
}
elsif( $k =~ /\.\d+\./ ) {
# Drop numeric array indexes and compare the remaining path
my $flatter = join '.', grep { !/^\d+$/ } split /\./, $k;
if ( $flatter eq $f ) {
push @values, $flat->{$k};
}
}
}
$value = @values ? @values == 1 ? $values[0] : \@values : undef;
}
$record->{$f} = $value;
}
}
else {
$record = $hit->{_source};
}
# Determine how this record is output
my $output = undef;
if( @SHOW ) {
# Tab-separated columns; refs are JSON-encoded, missing values are '-'
my @cols=();
foreach my $f (@always,@SHOW) {
my $v = '-';
if( exists $record->{$f} && defined $record->{$f} ) {
$v = is_arrayref($record->{$f}) && @{ $record->{$f} } == 1 ? $record->{$f}[0]
: is_ref($record->{$f}) ? to_json($record->{$f},{allow_nonref=>1,canonical=>1})
: $record->{$f};
}
push @cols,$v;
}
$output = join("\t",@cols);
}
else {
# Full-record dump in the configured format (json* or yaml)
$output = $CONFIG{format} =~ /^json/? to_json($record,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}})
: Dump $record;
}
output({data=>1}, $output);
$displayed++;
last if all_records_displayed();
}
last if all_records_displayed();
# Scroll forward
$start = time;
$result = $q->scroll_results();
$duration += time - $start;
# Check if we need to keep going
last unless defined $result;
last unless $result->{hits} && $result->{hits}{hits} && @{ $result->{hits}{hits} } > 0
}
last if all_records_displayed();
}
# Final footer: echo the query, the N-of-T result counts, the elapsed
# time, and which indexes were searched (suppressed by --no-decorators).
output({stderr=>1,color=>'yellow'},
"# Search Parameters:",
(map { "# $_" } split /\r?\n/, to_json($q->query,{allow_nonref=>1,canonical=>1,pretty=>$CONFIG{pretty}})),
sprintf("# Displaying %d of %d results%s took %0.2f seconds.",
$displayed,
$OUT_OF || $TOTAL_HITS,
$OUT_OF ? " in $TOTAL_HITS documents" : '',
$duration,
),
sprintf("# Indexes (%d of %d) searched: %s\n",
scalar(keys %displayed_indices),
scalar(keys %indices),
join(',', sort keys %displayed_indices)
),
) unless $OPT{'no-decorators'};
# For simple --top runs spanning multiple age buckets, print the merged
# per-key totals accumulated in %AGGS_TOTALS across all batches.
if($CONFIG{summary} && keys %AGGS_TOTALS) {
unless ( $OPT{'no-decorators'} ) {
output({color=>'yellow'}, '#', '# Totals across batch', '#');
output({color=>'cyan'},$agg_header);
}
foreach my $k (sort { $AGGS_TOTALS{$b} <=> $AGGS_TOTALS{$a} } keys %AGGS_TOTALS) {
output({data=>1,color=>'green'}, join "\t",
$AGGS_TOTALS{$k}, sprintf($CONFIG{decimal_format}, $AGGS_TOTALS{$k} / $TOTAL_HITS), $k
);
}
}
sub all_records_displayed {
    # True once we should stop emitting results: the user hit CTRL+C
    # ($DONE), or the --size cap has been reached.  In --tail or --all
    # mode there is no cap, so only CTRL+C ends the run.
    return 1 if $DONE;
    if( !exists $OPT{tail} && !exists $OPT{all} ) {
        return $displayed >= $CONFIG{size} ? 1 : 0;
    }
    return 0;
}
sub document_lookdown {
    # Depth-first search of a (possibly nested) hash for $field.
    #
    # Args:    $href  - hash reference to search
    #          $field - key name to locate at any depth
    # Returns: the first value found, or an empty list/undef when absent.
    my ($href,$field) = @_;

    return $href->{$field} if exists $href->{$field};

    # Sort for deterministic traversal order across runs
    foreach my $k (sort keys %{ $href }) {
        next unless is_hashref($href->{$k});
        # Bug fix: previously the result of the FIRST nested hash was
        # returned unconditionally, so a miss in that branch hid matches
        # in sibling branches.  Now we only return when actually found
        # (list context distinguishes "found undef" from "not found").
        my @found = document_lookdown($href->{$k},$field);
        return $found[0] if @found;
    }
    return;
}
sub show_fields {
    # Print every searchable field from the merged mappings (%FIELDS),
    # color-coded by type, followed by a total and a per-type roll up.
    output({color=>'cyan'}, 'Fields available for search:' );
    my $field_count   = 0;
    my %count_by_type = ();
    foreach my $name (sort keys %FIELDS) {
        $field_count++;
        my $type = $FIELDS{$name}->{type};
        $count_by_type{$type} ||= 0;
        $count_by_type{$type}++;
        # Color by field type so numerics/dates/geo stand out
        my $color = 'white';
        if(    $type eq 'ip' )                                { $color = 'magenta' }
        elsif( $type eq 'text' )                              { $color = 'red' }
        elsif( $type =~ /float|integer|short|byte|double/ )   { $color = 'cyan' }
        elsif( $type =~ /^geo/ )                              { $color = 'green' }
        elsif( $type =~ /^date/ )                             { $color = 'yellow' }
        output({indent=>1,kv=>1,color=>$color}, $name => $type);
        # Nested fields report their nesting path and key
        if( exists $FIELDS{$name}->{nested_path} ) {
            output({indent=>2}, sprintf "nested: %s - %s",
                @{ $FIELDS{$name} }{qw(nested_path nested_key)}
            );
        }
    }
    output({color=>"yellow"},
        sprintf("# Fields: %d from a combined %d indices.\n",
            $field_count,
            scalar(keys %indices),
        )
    );
    # Type Meta Roll Up, most common type first
    output({indent=>1}, join(', ',
        map  { "$count_by_type{$_} $_ fields" }
        sort { $count_by_type{$b} <=> $count_by_type{$a} }
        keys %count_by_type
    ));
}
sub show_bases {
    # List every index base available for --base, tracking the youngest
    # and oldest index age (days) observed per base.  Ages are shown only
    # at higher verbosity.
    output({color=>'cyan'}, 'Bases available for search:' );
    my @indexes       = es_indices(_all => 1);
    my %age_range_for = ();
    foreach my $idx (@indexes) {
        # Ignore system/dot indexes like .kibana
        next if $idx =~ /^\./;
        my $age = es_index_days_old( $idx ) || 0;
        next unless defined $age;
        $age = 0 if $age < 0;   # clamp future-dated indexes to "today"
        foreach my $base (es_index_bases($idx)) {
            my $range = $age_range_for{$base};
            if( !$range ) {
                $age_range_for{$base} = { oldest => $age, youngest => $age };
                next;
            }
            $range->{oldest}   = $age if $age > $range->{oldest};
            $range->{youngest} = $age if $age < $range->{youngest};
        }
    }
    foreach my $base (sort keys %age_range_for) {
        output({indent=>1,color=>'green'},$base);
        verbose({indent=>2,kv=>1},
            map {
                $_ => sprintf "%d days old", $age_range_for{$base}->{$_}
            } qw( youngest oldest )
        );
    }
    output({color=>"yellow"},
        sprintf("# Bases: %d from a combined %d indices.\n",
            scalar(keys %age_range_for),
            scalar(@indexes),
        )
    );
}
sub display_aggregations {
# Render the aggregation portion of a search response: the --top buckets,
# optional --by ordering values, and any --with sub-aggregations.  When
# --interval was used, results arrive wrapped in a date_histogram ('step')
# and each time bucket is rendered with an extra indent.
# NOTE: this mutates the response structure (delete) while formatting.
my ($aggregations,$total_docs) = (@_);
# Track the largest cardinality seen for the "N of T" footer
my $out_of = $aggregations->{out_of}{value} || 0;
$OUT_OF = $out_of if $out_of > $OUT_OF;
my $steps = exists $aggregations->{step} ? $aggregations->{step}{buckets}
: [ $aggregations ];
my $indent = exists $aggregations->{step} ? 1 : 0;
foreach my $step ( @$steps ) {
my $aggs = exists $step->{top} ? $step->{top}{buckets} : [];
# Time-bucket header when wrapped in a date_histogram
if( exists $step->{key_as_string} ) {
output({color=>'cyan',clear=>1}, sprintf "%d\t%s", @{$step}{qw(doc_count key_as_string)});
}
if( @$aggs ) {
# For top the N of T needs to represent maximums
$displayed = scalar(@$aggs) if scalar(@$aggs) > $displayed;
output({color=>'cyan',indent=>$indent},$agg_header) unless $OPT{'no-decorators'};
foreach my $agg ( @$aggs ) {
# Accumulate per-key totals for the cross-batch summary
$AGGS_TOTALS{$agg->{key}} ||= 0;
$AGGS_TOTALS{$agg->{key}} += $agg->{doc_count};
my @out = ();
my $top_docs;
# Standard bucket columns; doc_count also emits a percentage.
# Keys are deleted so only sub-aggs remain in $agg afterwards.
foreach my $k (qw(score doc_count bg_count key)) {
next unless exists $agg->{$k};
my $value = delete $agg->{$k};
push @out, defined $value ? ($k eq 'score' ? sprintf $CONFIG{decimal_format}, $value : $value ) : '-';
if( $k eq 'doc_count' ) {
push @out, sprintf $CONFIG{decimal_format}, $value ? $value / $total_docs : 0;
$top_docs = $value;
}
}
# --by metric value becomes the leading column
if(exists $agg->{by} ) {
my $by = delete $agg->{by};
if( exists $by->{value_as_string} ) {
unshift @out, $by->{value_as_string};
}
elsif( exists $by->{value} ) {
my $v = $by->{value} =~ /^\d+\.\d+$/ ? sprintf($CONFIG{decimal_format}, $by->{value}) : $by->{value};
unshift @out, $v;
}
}
# Handle the --with elements
# Whatever keys remain on $agg are sub-aggregations; each is
# normalized into rows of columns in %subaggs.
my %subaggs = ();
if( keys %{ $agg } ) {
foreach my $k (sort keys %{ $agg }) {
next unless is_hashref($agg->{$k});
# Bucket sub-aggs (terms, histogram, ...): one row per bucket
if( exists $agg->{$k}{buckets} ) {
my @sub;
foreach my $subagg (@{ $agg->{$k}{buckets} }) {
my @elms = ();
next unless exists $subagg->{key};
push @elms, $subagg->{key};
foreach my $dk (qw(score doc_count bg_count)) {
next unless exists $subagg->{$dk};
my $v = delete $subagg->{$dk};
push @elms, defined $v ? ($dk eq 'score' ? sprintf $CONFIG{decimal_format}, $v : $v ) : '-';
push @elms, sprintf $CONFIG{decimal_format}, $v / $top_docs
if $dk eq 'doc_count' and $top_docs;
}
push @sub, \@elms;
}
$subaggs{$k} = \@sub if @sub;
}
# Simple Numeric Aggs
elsif( exists $agg->{$k}{value_as_string} ) {
$subaggs{$k} = [ [ $agg->{$k}{value_as_string} ] ];
}
elsif( exists $agg->{$k}{value} ) {
my $v = $agg->{$k}{value} =~ /^\d+\.\d+$/ ? sprintf $CONFIG{decimal_format}, $agg->{$k}{value}
: $agg->{$k}{value};
$subaggs{$k} = [ [ $v ] ];
}
# Percentiles
elsif( exists $agg->{$k}{values} ) {
my @pcts;
foreach my $pctl (sort { $a <=> $b } keys %{ $agg->{$k}{values} }) {
push @pcts, "p$pctl", sprintf $CONFIG{decimal_format}, $agg->{$k}{values}{$pctl};
}
$subaggs{$k} = [ \@pcts ];
}
else {
# Statistics
my @stats;
my %alias = qw( variance var std_deviation stdev );
foreach my $stat (qw(count min avg max sum variance std_deviation)) {
next unless exists $agg->{$k}{$stat};
my $v = $agg->{$k}{$stat} =~ /\./ ? sprintf $CONFIG{decimal_format}, $agg->{$k}{$stat}
: $agg->{$k}{$stat};
push @stats, $alias{$stat} || $stat => $v;
}
$subaggs{$k} = [ \@stats ] if @stats;
}
}
}
# With sub-aggs: repeat the bucket columns once per sub-agg row;
# more --with options mean more rows, not more columns.
if( keys %subaggs ) {
foreach my $subagg (sort keys %subaggs) {
foreach my $extra ( @{ $subaggs{$subagg} } ) {
output({indent=>$indent,data=>1},
join "\t", @out, $subagg, @{ $extra }
);
}
}
}
else {
# Simple output
output({indent=>$indent,data=>!$CONFIG{summary}}, join("\t",@out));
}
}
}
elsif(exists $aggregations->{top}) {
output({indent=>1,color=>'red'}, "= No results.");
}
}
}
sub by_index_age {
    # sort() comparator over index names ($a/$b), comparing ages from the
    # global %indices map.  'asc' streams oldest-first; 'desc' newest-first.
    return $indices{$b} <=> $indices{$a} if $ORDER eq 'asc';
    return $indices{$a} <=> $indices{$b};
}
__END__
=pod
=head1 NAME
es-search.pl - Provides a CLI for quick searches of data in ElasticSearch daily indexes
=head1 VERSION
version 8.8
=head1 SYNOPSIS
es-search.pl [search string]
Options:
--help print help
--manual print full manual
--filter Force filter context for all query elements
--show Comma separated list of fields to display, default is ALL, switches to tab output
--tail Continue the query until CTRL+C is sent
--top Perform an aggregation on the fields, by a comma separated list of up to 2 items
--by Perform an aggregation using the result of this, example: --by cardinality:src_ip
--with Perform a sub aggregation on the query
--bg-filter Only used if --top aggregation is significant_terms, applies a background filter
--match-all Enables the ElasticSearch match_all operator
--interval       When running aggregations, wrap the aggregation in a date_histogram with this interval
--prefix Takes "field:string" and enables the Lucene prefix query for that field
--exists Field which must be present in the document
--missing Field which must not be present in the document
--size Result size, default is 20, aliased to -n and --limit
--max-batch-size When making requests to ES, retrieve this many docs in a single request, defaults to 50
--all Don't consider result size, just give me *everything*
--asc Sort by ascending timestamp
--desc Sort by descending timestamp (Default)
--sort List of fields for custom sorting
--format When --show isn't used, use this method for outputting the record, supported: json, jsonpretty, yaml
json assumes --no-decorator as we assume you're piping through jq
--pretty Where possible, use JSON->pretty
--precision For floating point values, use this many digits of precision, defaults to 3
--no-decorators Do not show the header with field names in the query results
--no-header Same as above
--no-implications Don't attempt to imply filters from statistical aggregations
--fields Display the field list for this index!
--bases Display the index base list for this cluster.
--timestamp Field to use as the date object, default: @timestamp
From App::ElasticSearch::Utilities:
--local Use localhost as the elasticsearch host
--host ElasticSearch host to connect to
--port HTTP port for your cluster
--proto Defaults to 'http', can also be 'https'
--http-username HTTP Basic Auth username
--password-exec Script to run to get the users password
--insecure Don't verify TLS certificates
--cacert Specify the TLS CA file
--capath Specify the directory with TLS CAs
--cert Specify the path to the client certificate
--key Specify the path to the client private key file
--noop Any operations other than GET are disabled, can be negated with --no-noop
--timeout Timeout to ElasticSearch, default 10
--keep-proxy Do not remove any proxy settings from %ENV
--index Index to run commands against
--base For daily indexes, reference only those starting with "logstash"
(same as --pattern logstash-* or logstash-DATE)
--pattern Use a pattern to operate on the indexes
--days If using a pattern or base, how many days back to go, default: 1
See also the "CONNECTION ARGUMENTS" and "INDEX SELECTION ARGUMENTS" sections from App::ElasticSearch::Utilities.
From CLI::Helpers:
--data-file Path to a file to write lines tagged with 'data => 1'
--tags A comma separated list of tags to display
--color Boolean, enable/disable color, default use git settings
--verbose Incremental, increase verbosity (Alias is -v)
--debug Show developer output
--debug-class Show debug messages originating from a specific package, default: main
--quiet Show no output (for cron)
--syslog Generate messages to syslog as well
--syslog-facility Default "local0"
--syslog-tag The program name, default is the script name
--syslog-debug Enable debug messages to syslog if in use, default false
--nopaste Use App::Nopaste to paste output to configured paste service
--nopaste-public Defaults to false, specify to use public paste services
--nopaste-service Comma-separated App::Nopaste service, defaults to Shadowcat
=head1 DESCRIPTION
This tool takes a search string parameter to search the cluster. It is in the format of the Lucene
L<query string|http://lucene.apache.org/core/2_9_4/queryparsersyntax.html>
Examples might include:
# Search for past 10 days vhost admin.example.com and client IP 1.2.3.4
es-search.pl --days=10 --size=100 dst:"admin.example.com" AND src_ip:"1.2.3.4"
# Search for all apache logs past with status 500
es-search.pl program:"apache" AND crit:500
# Search for all apache logs with status 500 show only file and out_bytes
es-search.pl program:"apache" AND crit:500 --show file,out_bytes
# Search for ip subnet client IP 1.2.3.0 to 1.2.3.255 or 1.2.0.0 to 1.2.255.255
es-search.pl --size=100 dst:"admin.example.com" AND src_ip:"1.2.3.0/24"
es-search.pl --size=100 dst:"admin.example.com" AND src_ip:"1.2.0/16"
# Show the top src_ip for 'www.example.com'
es-search.pl --base access dst:www.example.com --top src_ip
# Tail the access log for www.example.com 404's
es-search.pl --base access --tail --show src_ip,file,referer_domain dst:www.example.com AND crit:404
=head1 NAME
es-search.pl - Search a logging cluster for information
=head1 OPTIONS
=over 8
=item B<help>
Print this message and exit
=item B<manual>
Print detailed help with examples
=item B<filter>
Forces filter context for all query parameters, the default is using query context.
=item B<show>
Comma separated list of fields to display in the dump of the data
--show src_ip,crit,file,out_bytes
=item B<sort>
Use this option to sort your documents on fields other than the timestamp. Fields are given as a comma separated list:
--sort field1,field2
To specify per-field sort direction use:
--sort field1:asc,field2:desc
Using this option together with C<--asc>, C<--desc> or C<--tail> is not possible.
=item B<format>
Output format to use when the full record is dumped. The default is 'yaml', but 'json' is also supported.
--format json
=item B<precision>
For output involving floating point numbers, use this many places to the right of the decimal point. The default is 3.
=item B<tail>
Repeats the query every second until CTRL+C is hit, displaying new results. Due to the implementation,
this mode enforces that only the most recent indices are searched. Also, given the output is continuous, you must
specify --show with this option.
=item B<top>
Perform an aggregation returning the top field. Limited to a single field at this time.
This option is not available when using --tail.
--top src_ip
You can override the default of the C<terms> bucket aggregation by prefixing
the parameter with the required bucket aggregation, i.e.:
--top significant_terms:src_ip
=item B<by>
Perform a sub aggregation on the top terms aggregation and order by the result of this aggregation.
Aggregation syntax is as follows:
--by <type>:<field>
A full example might look like this:
$ es-search.pl --base access dst:www.example.com --top src_ip --by cardinality:acct
This will show the top source IP's ordered by the cardinality (count of the distinct values) of accounts logging
in as each source IP, instead of the source IP with the most records.
Supported sub aggregations and formats:
cardinality:<field>
min:<field>
max:<field>
avg:<field>
sum:<field>
=item B<with>
Perform a subaggregation on the top terms and report that sub aggregation details in the output. The format is:
--with <aggregation>:<field>:<size>
The default B<size> is 3.
The default B<aggregation> is 'terms'.
B<field> is the only required element.
e.g.
$ es-search.pl --base logstash error --top program --size 2 --by cardinality:host --with host:5
This will show the top 2 programs with log messages containing the word error by the cardinality (count
distinct host) of hosts showing the top 5 hosts
Without the --with, the results might look like this:
112314 0.151 sshd
21224 0.151 ntp
The B<--with> option would expand that output to look like this:
112314 0.151 host bastion-804 12431 0.111 sshd
112314 0.151 host bastion-803 10009 0.089 sshd
112314 0.151 host bastion-805 9768 0.087 sshd
112314 0.151 host bastion-801 8789 0.078 sshd
112314 0.151 host bastion-802 4121 0.037 sshd
21224 0.016 host webapp-324 21223 0.999 ntp
21224 0.016 host mail-42 1 0.000 ntp
This may be specified multiple times, the result is more I<rows>, not more I<columns>, e.g.
$ es-search.pl --base logstash error --top program --size 2 --by cardinality:host --with host:5 --with dc:2
Produces:
112314 0.151 dc arlington 112314 1.000 sshd
112314 0.151 host bastion-804 12431 0.111 sshd
112314 0.151 host bastion-803 10009 0.089 sshd
112314 0.151 host bastion-805 9768 0.087 sshd
112314 0.151 host bastion-801 8789 0.078 sshd
112314 0.151 host bastion-802 4121 0.037 sshd
21224 0.016 dc amsterdam 21223 0.999 ntp
21224 0.016 dc la 1 0.000 ntp
21224 0.016 host webapp-324 21223 0.999 ntp
21224 0.016 host mail-42 1 0.000 ntp
You may sub aggregate using any L<bucket aggregation|https://www.elastic.co/guide/en/elasticsearch/reference/master/search-aggregations-bucket.html>
as long as the aggregation provides a B<key> element. Additionally, doc_count, score, and bg_count will be reported in the output.
Other examples:
--with significant_terms:crime
--with cardinality:accts
--with min:out_bytes
--with max:out_bytes
--with avg:out_bytes
--with sum:out_bytes
--with stats:out_bytes
--with extended_stats:out_bytes
--with percentiles:out_bytes
--with percentiles:out_bytes:50,95,99
--with histogram:out_bytes:1024
=item B<with-missing>
For terms aggregations, adds a C<MISSING> bucket.
=item B<bg-filter>
Only used if the C<--top> aggregation is C<significant_terms>. Sets the
background filter for the C<significant_terms> aggregation.
es-search.pl --top significant_terms:src_ip method:POST file:\/get\/sensitive_data --bg-filter method:POST
=item B<interval>
When performing aggregations, wrap those aggregations in a date_histogram of this interval. This
helps flush out "what changed in the last hour."
=item B<match-all>
Apply the ElasticSearch "match_all" search operator to query on all documents
in the index. This is the default with no search parameters.
=item B<prefix>
Takes a "field:string" combination. You may use multiple --prefix options; they will be "AND"'d together.
Example:
--prefix useragent:'Go '
Will search for documents where the useragent field matches a prefix search on the string 'Go '
JSON Equivalent is:
{ "prefix": { "useragent": "Go " } }
=item B<exists>
Filter results to those containing a valid, not null field
--exists referer
Only show records with a referer field in the document.
=item B<missing>
Filter results to those not containing a valid, not null field
--missing referer
Only show records without a referer field in the document.
=item B<bases>
Display a list of bases that can be used with the --base option.
Use with --verbose to show age information on the indexes in each base.
=item B<fields>
Display a list of searchable fields
=item B<index>
Search only this index for data, may also be a comma separated list
=item B<days>
The number of days back to search, the default is 5
=item B<base>
Index base name, will be expanded using the days back parameter. The default
is 'logstash' which will expand to 'logstash-YYYY.MM.DD'
=item B<timestamp>
The field in your documents that we'll treat as a "date" type in our queries.
May also be specified in the C<~/.es-utils.yaml> file per index, or index base:
---
host: es-readonly-01
port: 9200
meta:
bro:
timestamp: 'record_ts'
mayans-2012.12.21:
timestamp: 'end_of_the_world'
Then running:
# timestamp is set to '@timestamp', the default
es-search.pl --base logstash --match-all
# timestamp is set to 'record_ts', from ~/.es-utils.yaml
es-search.pl --base bro --match-all
# timestamp is set to '@timestamp', the default
es-search.pl --base mayans --match-all
# timestamp is set to 'end_of_the_world', from ~/.es-utils.yaml
es-search.pl --index mayans-2012.12.21 --match-all
=item B<size>
The number of results to show, default is 20.
=item B<max-batch-size>
When building result sets, this tool uses scroll searches. This parameter
controls how many docs are in each scroll. It defaults to 50, but will be
scaled down lower if C<size> is smaller.
=item B<all>
If specified, ignore the --size parameter and show me everything within the date range I specified.
In the case of --top, this limits the result set to 1,000,000 results.
=back
=head1 Extended Syntax
The search string is pre-analyzed before being sent to ElasticSearch. The following plugins
work to manipulate the query string and provide richer, more complete syntax for CLI applications.
=head2 App::ElasticSearch::Utilities::QueryString::Barewords
The following barewords are transformed:
or => OR
and => AND
not => NOT
=head2 App::ElasticSearch::Utilities::QueryString::Text
Provides field prefixes to manipulate the text search capabilities.
=head3 Terms Query via '='
Provide an '=' prefix to a query string parameter to promote that parameter to a C<term> filter.
This allows for exact matches of a field without worrying about escaping Lucene special character filters.
E.g.:
user_agent:"Mozilla/5.0 (iPhone; CPU iPhone OS 12_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1"
Is evaluated into a weird query that doesn't do what you want. However:
=user_agent:"Mozilla/5.0 (iPhone; CPU iPhone OS 12_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1"
Is translated into:
{ term => { user_agent => "Mozilla/5.0 (iPhone; CPU iPhone OS 12_1_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1" } }
=head3 Wildcard Query via '*'
Provide an '*' prefix to a query string parameter to promote that parameter to a C<wildcard> filter.
This uses the wild card match for text fields to make matching more intuitive.
E.g.:
*user_agent:"Mozilla*"
Is translated into:
{ wildcard => { user_agent => "Mozilla*" } }
=head3 Regexp Query via '/'
Provide a '/' prefix to a query string parameter to promote that parameter to a C<regexp> filter.
If you want to use regexp matching for finding data, you can use:
/message:'\\bden(ial|ied|y)'
Is translated into:
{ regexp => { message => "\\bden(ial|ied|y)" } }
=head3 Fuzzy Matching via '~'
Provide a '~' prefix to a query string parameter to promote that parameter to a C<fuzzy> filter.
~message:deny
Is translated into:
{ fuzzy => { message => "deny" } }
=head3 Phrase Matching via '+'
Provide a '+' prefix to a query string parameter to promote that parameter to a C<match_phrase> filter.
+message:"login denied"
Is translated into:
{ match_phrase => { message => "login denied" } }
=head3 Automatic Match Queries for Text Fields
If the field meta data is provided and the field is a C<text> type, the query
will automatically be mapped to a C<match> query.
# message field is text
message:"foo"
Is translated into:
{ match => { message => "foo" } }
=head2 App::ElasticSearch::Utilities::QueryString::IP
If a field is an IP address and uses CIDR notation, it's expanded to a range query.
src_ip:10.0/8 => src_ip:[10.0.0.0 TO 10.255.255.255]
=head2 App::ElasticSearch::Utilities::QueryString::Ranges
This plugin translates some special comparison operators so you don't need to
remember them anymore.
Example:
price:<100
Will translate into a:
{ range: { price: { lt: 100 } } }
And:
price:>50,<100
Will translate to:
{ range: { price: { gt: 50, lt: 100 } } }
=head3 Supported Operators
B<gt> via E<gt>, B<gte> via E<gt>=, B<lt> via E<lt>, B<lte> via E<lt>=
=head2 App::ElasticSearch::Utilities::QueryString::Underscored
This plugin translates some special underscore surrounded tokens into
the Elasticsearch Query DSL.
Implemented:
=head3 _prefix_
Example query string:
_prefix_:useragent:'Go '
Translates into:
{ prefix => { useragent => 'Go ' } }
=head2 App::ElasticSearch::Utilities::QueryString::FileExpansion
If the match ends in .dat, .txt, .csv, or .json then we attempt to read a file with that name and OR the condition:
$ cat test.dat
50 1.2.3.4
40 1.2.3.5
30 1.2.3.6
20 1.2.3.7
Or
$ cat test.csv
50,1.2.3.4
40,1.2.3.5
30,1.2.3.6
20,1.2.3.7
Or
$ cat test.txt
1.2.3.4
1.2.3.5
1.2.3.6
1.2.3.7
Or
$ cat test.json
{ "ip": "1.2.3.4" }
{ "ip": "1.2.3.5" }
{ "ip": "1.2.3.6" }
{ "ip": "1.2.3.7" }
We can source that file:
src_ip:test.dat => src_ip:(1.2.3.4 1.2.3.5 1.2.3.6 1.2.3.7)
src_ip:test.json[ip] => src_ip:(1.2.3.4 1.2.3.5 1.2.3.6 1.2.3.7)
This makes it simple to use the --data-file output options and build queries
based off previous queries. For .txt and .dat files, the delimiter for columns
in the file must be either a tab or a null. For files ending in
.csv, Text::CSV_XS is used for accurate parsing of the file format. Files
ending in .json are considered to be newline-delimited JSON.
You can also specify the column of the data file to use, the default being the last column or (-1). Columns are
B<zero-based> indexing. This means the first column is index 0, the second is index 1, and so on. The previous example can be rewritten
as:
src_ip:test.dat[1]
or:
src_ip:test.dat[-1]
For newline delimited JSON files, you need to specify the key path you want to extract from the file. If we have a
JSON source file with:
{ "first": { "second": { "third": [ "bob", "alice" ] } } }
{ "first": { "second": { "third": "ginger" } } }
{ "first": { "second": { "nope": "fred" } } }
We could search using:
actor:test.json[first.second.third]
Which would expand to:
{ "terms": { "actor": [ "alice", "bob", "ginger" ] } }
This option will iterate through the whole file and unique the elements of the list. They will then be transformed into
an appropriate L<terms query|http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl-terms-query.html>.
=head3 Wildcards
We can also have a group of wildcard or regexp in a file:
$ cat wildcards.dat
*@gmail.com
*@yahoo.com
To enable wildcard parsing, prefix the filename with a C<*>.
es-search.pl to_address:*wildcards.dat
Which expands the query to:
{
"bool": {
"minimum_should_match":1,
"should": [
{"wildcard":{"to_address":{"value":"*@gmail.com"}}},
{"wildcard":{"to_address":{"value":"*@yahoo.com"}}}
]
}
}
No attempt is made to verify or validate the wildcard patterns.
=head3 Regular Expressions
If you'd like to specify a file full of regexp, you can do that as well:
$ cat regexp.dat
.*google\.com$
.*yahoo\.com$
To enable regexp parsing, prefix the filename with a C<~>.
es-search.pl to_address:~regexp.dat
Which expands the query to:
{
"bool": {
"minimum_should_match":1,
"should": [
{"regexp":{"to_address":{"value":".*google\\.com$"}}},
{"regexp":{"to_address":{"value":".*yahoo\\.com$"}}}
]
}
}
No attempt is made to verify or validate the regexp expressions.
=head2 App::ElasticSearch::Utilities::QueryString::Nested
Implement the proposed nested query syntax early. Example:
nested_path:"field:match AND string"
=head1 Meta-Queries
Helpful in building queries is the --bases and --fields options which lists the index bases and fields:
es-search.pl --bases
es-search.pl --fields
es-search.pl --base access --fields
=head1 AUTHOR
Brad Lhotsky <brad@divisionbyzero.net>
=head1 COPYRIGHT AND LICENSE
This software is Copyright (c) 2024 by Brad Lhotsky.
This is free software, licensed under:
The (three-clause) BSD License
=cut