Group
Extension

Net-Hadoop-YARN/lib/Net/Hadoop/YARN/Roles/JMX.pm

package Net::Hadoop::YARN::Roles::JMX;
$Net::Hadoop::YARN::Roles::JMX::VERSION = '0.203';
use 5.10.0;
use strict;
use warnings;

use Carp  qw( confess );
use Clone qw( clone   );
use Constant::FromGlobal DEBUG => { int => 1, default => 0, env => 1 };
use JSON::XS ();
use Moo::Role;
use Ref::Util qw(
    is_arrayref
    is_coderef
    is_hashref
    is_ref
);
use Scalar::Util qw( blessed );
use Time::HiRes  qw( time );

has 'target_host_port' => (
    is => 'rw',
    default => sub {
        my $self = shift;
        return join q{:}, $self->target_host, $self->target_port;
    }
);

has 'target_host' => (
    is      => 'rw',
    default => sub {
        confess "target_host is not defined";
    }
);

has 'target_port' => (
    is      => 'rw',
    default => sub {
        confess "target_port is not defined";
    }
);

has stats => (
    is      => 'rw',
    default => sub {
        shift->all_available_stats;
    },
    isa => sub {
        my $thing = shift;
        if ( ref $thing ne 'ARRAY' ) {
            confess "$thing must be an ARRAY";
        }
        # TODO: verify somehow
    },
    lazy => 1,
);

has flat => (
    is      => 'rw',
    default => sub { 0 },
    lazy    => 1,
);

has decode_json_substrings => (
    is      => 'rw',
    default => sub { 0 },
    lazy    => 1,
);

sub all_available_stats {
    my $self = shift;
    my $c = $self->clone;
    $c->stats( ['all'] );
    $c->decode_json_substrings( 0 );

    my @names;

    $c->_looper(
        $c->collect,
        sub {
            my($thing, $name_or_index, $is_hash) = @_;
            if ( $is_hash && $name_or_index eq 'ObjectName' ) {
                push @names, $thing->{$name_or_index};
            }
        },
    );

    if ( ! @names ) {
        confess "Failed to collect the avaialble stat names!";
    }

    return [ @names ];
}

sub collect {
    my $self               = shift;
    my $user_defined_stats = shift;
    my $host_port          = $self->target_host_port;

    my @stats = is_arrayref $user_defined_stats
                    ? @{ $user_defined_stats }
                    : @{ $self->stats        }
                ;

    my $has_all = grep { $_ eq 'all'} @stats;

    @stats = qw( all ) if $has_all; # ignore the rest, if any

    my $uri_tmpl = $has_all ? 'http://%s/jmx' : 'http://%s/jmx?qry=%s';
    my $is_flat  = $self->flat;
    my %rv;

    STATS: foreach my $stat ( @stats ) {
        my $uri = sprintf $uri_tmpl, $host_port, ( $has_all ? () : ( $stat ) );
        my $resp;
        eval {
            my $start = time;
            $resp = $self->agent_request( $uri ) || next STATS;
            DEBUG && sprintf "[ %s REST ] Took %.2f seconds\n", ref $self, time - $start;
            1;
        } or do {
            my $eval_error = $@ || 'Zombie error';
            my $msg = "Error from $host_port: $eval_error";
            die $msg;
        };

        next if ! keys %{ $resp };

        if ( $is_flat ) {
            if ( $has_all ) {
                foreach my $bean ( @{ $resp->{beans} } ) {
                    my $name = $bean->{ObjectName} || $bean->{name};
                    $rv{ $name } = $bean;
                }
            }
            else {
                $rv{ $stat } = $resp;
            }
            next STATS;
        }

        if ( $has_all ) {
            foreach my $bean ( @{ $resp->{beans} } ) {
                my $name = $bean->{ObjectName} || $bean->{name};
                $self->_expand( $name, $bean, \%rv );
            }
        }
        else {
            $self->_expand( $stat, $resp, \%rv );
        }
    }

    if ( $self->decode_json_substrings ) {
        $self->_expand_json_substrings_in_place( \%rv );
    }

    return \%rv;
}

sub _expand {
    my $self      = shift;
    my $stat_name = shift;
    my $response  = shift;
    my $rv        = shift;

    my @names = split m{ [.,=:] }xms, $stat_name;

    if ( @names > 1 ) {
        my $slot = $rv->{ shift @names } ||= {};

        while ( my $name = shift @names ) {
            if ( @names ) {
                $slot = $slot->{ $name } ||= {};
            }
            else {
                $slot->{$name} = $response;
            }
        }
    }
    else {
        $rv->{ $stat_name } = $response;
    }

    return;
}

sub _expand_json_substrings_in_place {
    my $self = shift;
    my $rv   = shift || die "No data set specified!";

    my $re_json = qr{ \A \{\" }xms;
    my $callback = sub {
        my($thing, $name_or_index, $is_hash) = @_;
        if ( $is_hash ) {
            my $item = $thing->{ $name_or_index };
            if ( $item =~ $re_json ) {
                $thing->{ $name_or_index } = JSON::XS::decode_json( $item ); # loop?
            }
        }
        else {
            my $item = $thing->[ $name_or_index ];
            # Array: We don't seem to have such a thing, but you can never know
            if ( $item =~ $re_json ) {
                $thing->[ $name_or_index ] = JSON::XS::decode_json( $item ); # loop?
            }
        }
        return;
    };

    $self->_looper( $rv, $callback );

    return;
}

sub _looper {
    my $self = shift;
    my $rv   = shift || die 'No data was specified!';
    my $callback = shift;

    if ( ! is_coderef $callback ) {
        die "callback needs to be a CODE ref";
    }

    my $looper;
    $looper = sub {
        my $thing = shift;
        my $cb    = shift;

        if ( is_hashref $thing ) {
            LOOPH: foreach my $name ( keys %{ $thing } ) {
                my $item = $thing->{ $name } || next;
                if ( is_ref $item ) {
                    $looper->( $item );
                    next LOOPH;
                }
                $callback->( $thing, $name, 1 );
            }
        }
        elsif ( is_arrayref $thing ) {
            LOOPA: foreach my $i ( 0.. $#{ $thing } ) {
                my $item = $thing->[ $i ];
                if ( ref $item ) {
                    $looper->( $item );
                    next LOOPA;
                }
                $callback->( $thing, $i );
            }
        }
        elsif ( is_ref $thing ) {
            if ( blessed $thing && $thing->isa('JSON::PP::Boolean') ) {
                return;
            }
            die "Unknown key: $thing";
        }
        else {
            die "$thing is not a reference!";
        }
    };

    $looper->( $rv, $callback );
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

Net::Hadoop::YARN::Roles::JMX

=head1 VERSION

version 0.203

=head1 SYNOPSIS

    -

=head1 DESCRIPTION

JMX helpers for YARN endpoints.

=head1 ACCESSORS

=head3 target_host

=head3 target_host_port

=head3 target_port

=head1 METHODS

=head2 all_available_stats

=head2 collect

=head1 AUTHOR

David Morel <david.morel@amakuru.net>

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2015 by David Morel & Booking.com.

This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.

=cut


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.