Group
Extension

App-ElasticSearch-Utilities/lib/App/ElasticSearch/Utilities/Metrics.pm

package App::ElasticSearch::Utilities::Metrics;
# ABSTRACT: Fetches performance metrics about the node


use v5.16;
use warnings;

our $VERSION = '8.8'; # VERSION

use App::ElasticSearch::Utilities qw(es_connect);
use CLI::Helpers qw(:output);
use JSON::MaybeXS;
use Ref::Util qw(is_ref is_arrayref is_hashref);
use Types::Standard qw( ArrayRef Bool HashRef InstanceOf Int Str );

use Moo;
use namespace::autoclean;


has 'connection' => (
    is      => 'ro',
    isa     => InstanceOf['App::ElasticSearch::Utilities::Connection'],
    default => sub { es_connect() },
    handles => [qw(host port request)],
);

my @_IGNORES = qw(
    _all _shards
    attributes id timestamp uptime_in_millis
);


has 'ignore' => (
    is      => 'lazy',
    isa     => ArrayRef[Str],
    default => sub {
        my ($self) = @_;

        my %roles = map { $_ => 1 } @{ $self->node_details->{roles} };
        my @ignore = qw(adaptive_selection discovery);

        # Easy roles and sections are the same
        foreach my $section ( qw(ingest ml transform) ) {
            push @ignore, $section
                unless $roles{$section};
        }

        if( ! $roles{ml} ) {
            push @ignore, qw(ml_datafeed ml_job_comms ml_utility);
        }

        # Skip some sections if we're not a data node
        if( ! grep { /^data/ } keys %roles ) {
            push @ignore, qw(force_merge indexing indices merges pressure recovery segments translog);
        }

        return \@ignore;
    },
);


has 'node_details' => (
    is => 'lazy',
    isa => HashRef,
    init_arg => undef,
);

sub _build_node_details {
    my ($self) = @_;

    if( my $res = $self->request('_nodes/_local')->content ) {
        if( my $nodes = $res->{nodes} ) {
            my ($id) = keys %{ $nodes };
            return {
                %{ $nodes->{$id} },
                id => $id,
            }
        }
    }

    # Fail our type check
    return;
}


has 'node_id' => (
    is  => 'lazy',
    isa => Str,
);

sub _build_node_id {
    my ($self) = @_;

    if( my $details = $self->node_details ) {
        return $details->{id};
    }

    warn sprintf "unable to determine node_id for %s:%d",
        $self->host, $self->port;

    # Fail our type check
    return;
}


has 'with_cluster_metrics' => (
    is      => 'lazy',
    isa     => Bool,
    builder => sub {
        my ($self) = @_;
        if( my $info = $self->node_details ) {
            return !!grep { $_ eq 'master' } @{ $info->{roles} };
        }
        return 0;
    },
);



has 'with_index_metrics' => (
    is      => 'lazy',
    isa     => Bool,
    builder => sub {
        my ($self) = @_;
        if( my $info = $self->node_details ) {
            return !!grep { /^data/ } @{ $info->{roles} };
        }
        return 0;
    },
);


sub get_metrics {
    my ($self) = @_;

    # Fetch Node Local Stats
    my @collected = $self->collect_node_metrics();

    push @collected, $self->collect_cluster_metrics()
        if $self->with_cluster_metrics;

    push @collected, $self->collect_index_metrics()
        if $self->with_index_metrics;

    # Flatten Collected and Return the Stats
    return \@collected;
}


sub collect_node_metrics {
    my ($self) = @_;

    if( my $res = $self->request('_nodes/_local/stats')->content ) {
        return $self->_stat_collector( $res->{nodes}{$self->node_id} );
    }

    # Explicit return of empty list
    return;
}


sub collect_cluster_metrics {
    my ($self) = @_;

    my @stats = ();

    if( my $res = $self->request('_cluster/health')->content ) {
        push @stats,
            { key => "cluster.nodes.total",         value => $res->{number_of_nodes},       },
            { key => "cluster.nodes.data",          value => $res->{number_of_data_nodes},  },
            { key => "cluster.shards.primary",      value => $res->{active_primary_shards}, },
            { key => "cluster.shards.active",       value => $res->{active_shards},         },
            { key => "cluster.shards.initializing", value => $res->{initializing_shards},   },
            { key => "cluster.shards.relocating",   value => $res->{relocating_shards},     },
            { key => "cluster.shards.unassigned",   value => $res->{unassigned_shards},     },
            ;
    }
    push @stats, $self->_collect_index_blocks();
    return @stats;
}

sub _collect_index_blocks {
    my ($self) = @_;

    my @req = (
        '_settings/index.blocks.*',
        {
            index => '_all',
            uri_param => {
                flat_settings => 'true',
            },
        },
    );

    if( my $res = $self->request(@req)->content ) {
        my %collected=();
        foreach my $idx ( keys %{ $res } ) {
            if( my $settings = $res->{$idx}{settings} ) {
                foreach my $block ( keys %{ $settings } ) {
                    my $value = $settings->{$block};
                    if( lc $value eq 'true') {
                        $collected{$block} ||= 0;
                        $collected{$block}++;
                    }
                }
            }
        }
        return map { { key => "cluster.$_", value => $collected{$_} } } sort keys %collected;
    }

    # Explicit return of empty list
    return;
}


sub collect_index_metrics {
    my ($self) = @_;

    my $id = $self->node_id;
    my $shardres = $self->request('_cat/shards',
        {
            uri_param => {
                local  => 'true',
                format => 'json',
                bytes  => 'b',
                h => join(',', qw( index prirep docs store id state )),
            }
        }
    )->content;

    my %results;
    foreach my $shard ( @{ $shardres } ) {
        # Skip unallocated shards
        next unless $shard->{id};

        # Skip unless this shard is allocated to this shard
        next unless $shard->{id} eq $id;

        # Skip "Special" Indexes
        next if $shard->{index} =~ /^\./;

        # Figure out the Index Basename
        my $index = $shard->{index} =~ s/[-_]\d{4}([.-])\d{2}\g{1}\d{2}(?:[-_.]\d+)?$//r;
        next unless $index;
        $index =~ s/[^a-zA-Z0-9]+/_/g;

        my $type  = $shard->{prirep} eq 'p' ? 'primary' : 'replica';

        # Initialize
        $results{$index} ||=  { map { $_ => 0 } qw( docs bytes primary replica ) };
        $results{$index}->{state} ||= {};
        $results{$index}->{state}{$shard->{state}} ||= 0;
        $results{$index}->{state}{$shard->{state}}++;

        # Add it up, Add it up
        $results{$index}->{docs}  += $shard->{docs};
        $results{$index}->{bytes} += $shard->{store};
        $results{$index}->{$type}++;
    }

    my @results;
    foreach my $idx (sort keys %results) {
        foreach my $k ( sort keys %{ $results{$idx} } ) {
            # Skip the complex
            next if ref $results{$idx}->{$k};
            push @results,
                {
                    key => sprintf("node.indices.%s.%s", $idx, $k),
                    value => $results{$idx}->{$k},
                };
        }
        my $states = $results{$idx}->{state} || {};

        foreach my $k ( sort keys %{ $states } ) {
            push @results,
                {
                    key => sprintf("node.indices.%s.state.%s", $idx, $k),
                    value => $states->{$k},
                };
        }
    }
    return @results;
}

#------------------------------------------------------------------------#
# Parse Statistics Dynamically
sub _stat_collector {
    my $self  = shift;
    my $ref   = shift;
    my @path  = @_;
    my @stats = ();

    # Base Case
    return unless is_hashref($ref);

    my %ignores = map { $_ => 1 } @{ $self->ignore }, @_IGNORES;
    foreach my $key (sort keys %{ $ref }) {
        # Skip uninteresting keys
        next if $ignores{$key};

        # Skip peak values, we'll see those in the graphs.
        next if $key =~ /^peak/;

        # Sanitize Key Name
        my $key_name = $key;
        $key_name =~ s/(?:_time)?(?:_in)?_millis/_ms/;
        $key_name =~ s/(?:size_)?in_bytes/bytes/;
        $key_name =~ s/[^a-zA-Z0-9]+/_/g;

        if( is_hashref($ref->{$key}) ) {
            # Recurse
            push @stats, $self->_stat_collector($ref->{$key},@path,$key_name);
        }
        elsif( $ref->{$key} =~ /^\d+(?:\.\d+)?$/ ) {
            # Numeric
            push @stats, {
                key   => join('.',@path,$key_name),
                value => $ref->{$key},
            };
        }
    }

    return @stats;
}

__PACKAGE__->meta->make_immutable;

__END__

=pod

=head1 NAME

App::ElasticSearch::Utilities::Metrics - Fetches performance metrics about the node

=head1 VERSION

version 8.8

=head1 SYNOPSIS

This provides a simple API to export some core metrics from the local
ElasticSearch instance.

    use App::ElasticSearch::Utilities qw(es_connect);
    use App::ElasticSearch::Utilities::Metrics;

    my $metrics_fetcher = App::ElasticSearch::Utilities::Metrics->new(
        connection           => es_connect(),
        with_cluster_metrics => 1,
        with_index_metrics   => 1,
    );

    my $metrics = $metrics_fetcher->get_metrics();

=head1 ATTRIBUTES

=head2 connection

An `App::ElasticSearch::Utilities::Connection` instance, or automatically
created via C<es_connect()>.

=head2 ignore

An array of metric names to ignore, in addition to the static list when parsing
the `_node/_local/stats` stats.  Defaults to:

    [qw(adaptive_selection discovery)]

Plus ignores sections containing C<ingest>, C<ml>, C<transform> B<UNLESS> those
roles appear in the node's roles.  Also, unless the node is tagged as a
C<data*> node, the following keys are ignored:

    [qw(force_merge indexing indices merges pressure recovery segments translog)]

=head2 node_details

The Node details provided by the C<_nodes/_local> API.

=head2 node_id

The Node ID for the connection, will be automatically discovered

=head2 with_cluster_metrics

Boolean, set to true to collect cluster metrics in addition to node metrics

=head2 with_index_metrics

Boolean, set to true to collect index level metrics in addition to node metrics

=head1 METHODS

=head2 get_metrics()

Retrieves the metrics from the local node.

=head2 collect_node_metrics()

Returns all relevant stats from the C<_nodes/_local> API

=head2 collect_cluster_metrics()

Return all relevant stats from the C<_cluster/health> API as well as a count of
`index.blocks.*` in place.

=head2 collect_index_metrics()

This method totals the shard, and segment state and size for the current node by index base name.

=head1 AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2024 by Brad Lhotsky.

This is free software, licensed under:

  The (three-clause) BSD License

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.