Group
Extension

Hadoop-Admin/lib/Hadoop/Admin.pm

use strict;
package Hadoop::Admin;
{
  $Hadoop::Admin::VERSION = '0.4';
}
use warnings;
use Moose;
use LWP::UserAgent;
use JSON -support_by_pp;

# ---------------------------------------------------------------------
# Constructor attributes.  Host attributes are optional strings; each
# *_port attribute is an integer with a default value.
# ---------------------------------------------------------------------

# Hostname of the NameNode; used to build the NameNode JMX URL.
has namenode => (
    isa       => 'Str',
    is        => 'ro',
    predicate => 'has_namenode',
    reader    => 'get_namenode',
);

# HTTP port of the NameNode.
has namenode_port => (
    isa       => 'Int',
    is        => 'ro',
    predicate => 'has_namenode_port',
    reader    => 'get_namenode_port',
    default   => '50070',
);

# Hostname of the JobTracker; BUILD rejects it together with a
# resourcemanager.
has jobtracker => (
    isa       => 'Str',
    is        => 'ro',
    predicate => 'has_jobtracker',
    reader    => 'get_jobtracker',
);

# HTTP port of the JobTracker.
has jobtracker_port => (
    isa       => 'Int',
    is        => 'ro',
    predicate => 'has_jobtracker_port',
    reader    => 'get_jobtracker_port',
    default   => '50030',
);

# Hostname of the secondary NameNode.  Nothing else in this file reads it.
has secondarynamenode => (
    isa       => 'Str',
    is        => 'ro',
    predicate => 'has_secondarynamenode',
    reader    => 'get_secondarynamenode',
);

# Hostname of the ResourceManager; BUILD rejects it together with a
# jobtracker.
has resourcemanager => (
    isa       => 'Str',
    is        => 'ro',
    predicate => 'has_resourcemanager',
    reader    => 'get_resourcemanager',
);

# HTTP port of the ResourceManager.
has resourcemanager_port => (
    isa       => 'Int',
    is        => 'ro',
    predicate => 'has_resourcemanager_port',
    reader    => 'get_resourcemanager_port',
    default   => '8088',
);

# Hostname of an optional SOCKS proxy for the HTTP connections.
has socksproxy => (
    isa       => 'Str',
    is        => 'ro',
    predicate => 'has_socksproxy',
    reader    => 'get_socksproxy',
);

# Port of the SOCKS proxy.
has socksproxy_port => (
    isa       => 'Int',
    is        => 'ro',
    predicate => 'has_socksproxy_port',
    reader    => 'get_socksproxy_port',
    default   => '1080',
);

# User agent object used for the JMX HTTP requests; BUILD assigns it.
has ua => (
    isa       => 'Object',
    is        => 'rw',
    predicate => 'has_ua',
);

# Test hooks: each names a file that BUILD reads and parses instead of
# querying the corresponding server.
has _test_namenodeinfo => (
    isa => 'Str',
    is  => 'ro',
);

has _test_jobtrackerinfo => (
    isa => 'Str',
    is  => 'ro',
);

has _test_resourcemanagerinfo => (
    isa => 'Str',
    is  => 'ro',
);

# ABSTRACT: Module for administration of Hadoop clusters


# Read a whole file into a single string.
# Dies with "Couldn't open file: ..." when the file cannot be opened.
sub _slurp_file {
    my ($path) = @_;

    open(my $fh, '<', $path) or die "Couldn't open file: $!";
    local $/ = undef;    # slurp mode: read everything in one go
    my $content = <$fh>;
    close $fh;

    return $content;
}

# Moose post-construction hook.
#
# Validates the configuration, creates the HTTP user agent (routed
# through the SOCKS proxy when one was given) and then loads the
# NameNode, JobTracker and ResourceManager data, in that order.  For each
# service the data comes from the matching _test_*info file when that
# attribute is set; otherwise the server is queried if its hostname was
# supplied.
#
# Dies when both a resourcemanager and a jobtracker are configured, or
# when a test file cannot be opened.
sub BUILD {
    my $self = shift;

    if ( $self->has_resourcemanager && $self->has_jobtracker ) {
        die "Can't have both ResourceManager and JobTracker\n";
    }

    $self->ua( LWP::UserAgent->new() );
    if ( defined $self->get_socksproxy ) {
        # Honour the configured socksproxy_port (default 1080) rather
        # than a hard-coded port number.
        my $proxy_url = 'socks://'
            . $self->get_socksproxy . ':'
            . $self->get_socksproxy_port;
        $self->ua->proxy( [qw(http https)] => $proxy_url );
    }

    ## Hooks for testing during builds.  Doesn't connect to a real cluster.
    if ( defined $self->_test_namenodeinfo ) {
        $self->parse_nn_jmx( _slurp_file( $self->_test_namenodeinfo ) );
    }
    elsif ( defined $self->get_namenode ) {
        $self->gather_nn_jmx('NameNodeInfo');
    }

    ## Hooks for testing during builds.  Doesn't connect to a real cluster.
    if ( defined $self->_test_jobtrackerinfo ) {
        $self->parse_jt_jmx( _slurp_file( $self->_test_jobtrackerinfo ) );
    }
    elsif ( defined $self->get_jobtracker ) {
        $self->gather_jt_jmx('JobTrackerInfo');
    }

    ## Hooks for testing during builds.  Doesn't connect to a real cluster.
    if ( defined $self->_test_resourcemanagerinfo ) {
        $self->parse_rm_jmx( _slurp_file( $self->_test_resourcemanagerinfo ) );
    }
    elsif ( defined $self->get_resourcemanager ) {
        $self->gather_rm_jmx('RMNMInfo');
    }

    return $self;
}

# Return the live DataNodes: the keys of the NameNodeInfo_LiveNodes hash
# that parse_nn_jmx decodes from the NameNodeInfo bean.
sub datanode_live_list {
    my ($self) = @_;

    my @hosts = keys %{ $self->{'NameNodeInfo_LiveNodes'} };
    return @hosts;
}

# Return the dead DataNodes: the keys of the NameNodeInfo_DeadNodes hash
# that parse_nn_jmx decodes from the NameNodeInfo bean.
sub datanode_dead_list {
    my ($self) = @_;

    my @hosts = keys %{ $self->{'NameNodeInfo_DeadNodes'} };
    return @hosts;
}

# Return the decommissioning DataNodes: the keys of the
# NameNodeInfo_DecomNodes hash that parse_nn_jmx decodes from the
# NameNodeInfo bean.
sub datanode_decom_list {
    my ($self) = @_;

    my @hosts = keys %{ $self->{'NameNodeInfo_DecomNodes'} };
    return @hosts;
}


# Return the 'HostName' field of every entry in the
# RMNMInfo_LiveNodeManagers array that parse_rm_jmx decodes from the
# RMNMInfo bean, in array order.
sub nodemanager_live_list {
    my ($self) = @_;

    my @hosts;
    push @hosts, $_->{'HostName'}
        for @{ $self->{'RMNMInfo_LiveNodeManagers'} };

    return @hosts;
}

# Return the 'hostname' field of every entry in the
# JobTrackerInfo_AliveNodesInfoJson array that parse_jt_jmx decodes from
# the JobTrackerInfo bean, in array order.
sub tasktracker_live_list{
    my $self=shift;
    my @returnValue=();
    foreach my $hostref ( @{$self->{'JobTrackerInfo_AliveNodesInfoJson'}} ) {
	push @returnValue, $hostref->{'hostname'};
    }
    return @returnValue;
}

# Return the 'hostname' field of every entry in the
# JobTrackerInfo_BlacklistedNodesInfoJson array that parse_jt_jmx decodes
# from the JobTrackerInfo bean, in array order.
sub tasktracker_blacklist_list {
    my ($self) = @_;

    my @hosts;
    push @hosts, $_->{'hostname'}
        for @{ $self->{'JobTrackerInfo_BlacklistedNodesInfoJson'} };

    return @hosts;
}

# Return the 'hostname' field of every entry in the
# JobTrackerInfo_GraylistedNodesInfoJson array that parse_jt_jmx decodes
# from the JobTrackerInfo bean, in array order.
sub tasktracker_graylist_list{
    my $self=shift;
    my @returnValue=();
    # Read the graylist data, not the blacklist data.
    foreach my $hostref ( @{$self->{'JobTrackerInfo_GraylistedNodesInfoJson'}} ) {
	push @returnValue, $hostref->{'hostname'};
    }
    return @returnValue;
}

# Query the NameNode's JMX servlet for one bean and pass the response
# body to parse_nn_jmx.
#
# Parameters:
#   $bean - bean selector; only 'NameNodeInfo' maps to a query string.
#
# Dies with the HTTP status line when the request is not successful.
# Failures are raised with die, like the other errors in this module,
# so callers can trap them; the process is not terminated with exit.
sub gather_nn_jmx{
    my $self=shift;
    my $bean=shift;
    my $qry;
    if ($bean eq 'NameNodeInfo'){
	$qry='Hadoop%3Aservice%3DNameNode%2Cname%3DNameNodeInfo';
    }
    my $jmx_url= "http://".$self->get_namenode.":".$self->get_namenode_port."/jmx?qry=$qry";
    my $response = $self->ua->get($jmx_url);
    if (! $response->is_success) {
	# LWP reports failures through the response object, not $@.
	die "Can't get JMX data from Namenode: ".$response->status_line."\n";
    }
    $self->parse_nn_jmx($response->decoded_content);
}

# Decode a NameNode JMX document and store the NameNodeInfo bean on the
# object.
#
# Parameters:
#   $nn_content - JSON text with a top-level 'beans' array.
#
# For the bean named "Hadoop:service=NameNode,name=NameNodeInfo" every
# attribute is copied to $self->{"NameNodeInfo_<attribute>"}.  The
# LiveNodes, DeadNodes and DecomNodes attributes are themselves JSON
# text, so they are decoded a second time and stored as data structures.
# Other beans are ignored.  Dies if any of the JSON text cannot be
# decoded.
sub parse_nn_jmx{
    my $self=shift;
    my $nn_content=shift;

    # Every option setter returns the decoder itself, so configuring it
    # once is enough for all decode() calls below.
    my $json=JSON->new();
    $json->allow_nonref->utf8->relaxed->escape_slash->loose
	->allow_singlequote->allow_barekey->pretty;

    my $json_text = $json->decode($nn_content);
    foreach my $bean (@{$json_text->{beans}}){
	# A bean without a name cannot be the one we want; skipping it also
	# avoids an "uninitialized value" warning from eq.
	next unless defined $bean->{name};
	next unless $bean->{name} eq "Hadoop:service=NameNode,name=NameNodeInfo";

	foreach my $var (keys %{$bean}){
	    $self->{"NameNodeInfo_$var"}=$bean->{$var};
	}
	foreach my $nested (qw(LiveNodes DeadNodes DecomNodes)){
	    $self->{"NameNodeInfo_$nested"}=$json->decode($bean->{$nested});
	}
    }
}

# Query the JobTracker's JMX servlet for one bean and pass the response
# body to parse_jt_jmx.
#
# Parameters:
#   $bean - bean selector; only 'JobTrackerInfo' maps to a query string.
#
# Dies with the HTTP status line when the request is not successful.
# Failures are raised with die, like the other errors in this module,
# so callers can trap them; the process is not terminated with exit.
sub gather_jt_jmx{
    my $self=shift;
    my $bean=shift;
    my $qry;
    if ($bean eq "JobTrackerInfo"){
	$qry='Hadoop%3Aservice%3DJobTracker%2Cname%3DJobTrackerInfo';
    }
    my $jmx_url= "http://".$self->get_jobtracker.":".$self->get_jobtracker_port."/jmx?qry=$qry";
    my $response = $self->ua->get($jmx_url);
    if (! $response->is_success) {
	# LWP reports failures through the response object, not $@.
	die "Can't get JMX data from JobTracker: ".$response->status_line."\n";
    }
    $self->parse_jt_jmx($response->decoded_content);

}

# Decode a JobTracker JMX document and store its beans on the object.
#
# Every attribute of every bean is copied to
# $self->{"JobTrackerInfo_<attribute>"}.  For the bean named
# "Hadoop:service=JobTracker,name=JobTrackerInfo" the AliveNodesInfoJson,
# BlacklistedNodesInfoJson and GraylistedNodesInfoJson attributes are
# decoded again from JSON text and stored as data structures.
sub parse_jt_jmx {
    my ($self, $jt_content) = @_;

    # Option setters return the decoder, so one configuration pass covers
    # every decode() call that follows.
    my $decoder = JSON->new();
    $decoder->allow_nonref->utf8->relaxed->escape_slash->loose
        ->allow_singlequote->allow_barekey->pretty;

    my $document = $decoder->decode($jt_content);

    for my $bean ( @{ $document->{beans} } ) {
        for my $attribute ( keys %{$bean} ) {
            $self->{"JobTrackerInfo_$attribute"} = $bean->{$attribute};
        }

        if ( $bean->{name} eq "Hadoop:service=JobTracker,name=JobTrackerInfo" ) {
            for my $nested (qw(AliveNodesInfoJson BlacklistedNodesInfoJson GraylistedNodesInfoJson)) {
                $self->{"JobTrackerInfo_$nested"} = $decoder->decode( $bean->{$nested} );
            }
        }
    }
}
# Query the ResourceManager's JMX servlet for one bean and pass the
# response body to parse_rm_jmx.
#
# Parameters:
#   $bean - bean selector; only 'RMNMInfo' maps to a query string.
#
# Dies with the HTTP status line when the request is not successful.
# Failures are raised with die, like the other errors in this module,
# so callers can trap them; the process is not terminated with exit.
sub gather_rm_jmx{
    my $self=shift;
    my $bean=shift;
    my $qry;
    if ($bean eq "RMNMInfo"){
	$qry='Hadoop%3Aservice%3DResourceManager%2Cname%3DRMNMInfo';
    }
    my $jmx_url= "http://".$self->get_resourcemanager.":".$self->get_resourcemanager_port."/jmx?qry=$qry";
    my $response = $self->ua->get($jmx_url);
    if (! $response->is_success) {
	# LWP reports failures through the response object, not $@.
	die "Can't get JMX data from ResourceManager: ".$response->status_line."\n";
    }
    $self->parse_rm_jmx($response->decoded_content);
}

# Decode a ResourceManager JMX document and store its beans on the
# object.
#
# Every attribute of every bean is copied to
# $self->{"RMNMInfo_<attribute>"}.  For the bean named
# "Hadoop:service=ResourceManager,name=RMNMInfo" the LiveNodeManagers
# attribute is decoded again from JSON text and stored as a data
# structure.
sub parse_rm_jmx {
    my ($self, $rm_content) = @_;

    # Option setters return the decoder, so one configuration pass covers
    # every decode() call that follows.
    my $decoder = JSON->new();
    $decoder->allow_nonref->utf8->relaxed->escape_slash->loose
        ->allow_singlequote->allow_barekey->pretty;

    my $document = $decoder->decode($rm_content);

    for my $bean ( @{ $document->{beans} } ) {
        for my $attribute ( keys %{$bean} ) {
            $self->{"RMNMInfo_$attribute"} = $bean->{$attribute};
        }

        if ( $bean->{name} eq "Hadoop:service=ResourceManager,name=RMNMInfo" ) {
            $self->{'RMNMInfo_LiveNodeManagers'} =
                $decoder->decode( $bean->{LiveNodeManagers} );
        }
    }
}

1;


__END__
=pod

=head1 NAME

Hadoop::Admin - Module for administration of Hadoop clusters

=head1 VERSION

version 0.4

=head1 SYNOPSIS

    use Hadoop::Admin; 

    my $cluster=Hadoop::Admin->new({
      'namenode'          => 'namenode.host.name',
      'jobtracker'        => 'jobtracker.host.name',
    });

    print $cluster->datanode_live_list();

=head1 DESCRIPTION

This module connects to Hadoop servers using http.  The JMX Proxy
Servlet is queried for specific mbeans.

This module requires the Hadoop changes in
https://issues.apache.org/jira/browse/HADOOP-7144.  They are available
in versions 0.20.204.0, 0.23.0 or later.

=head1 INTERFACE FUNCTIONS

=head2 new ()

=over 4

=item Description

Create a new instance of the Hadoop::Admin class.  

The method requires a hash containing at minimum one of the
namenode's, the resourcemanager's, or the jobtracker's hostnames.
Optionally, you may provide a socksproxy for the http connection.  Use
of both a jobtracker and a resourcemanager is prohibited.  It is not a
valid cluster configuration to have both a jobtracker and a
resourcemanager.

Creation of this object will cause an immediate query to the servers
provided to the constructor.

=item namenode => <hostname>

=item namenode_port => <port number>

=item jobtracker => <hostname>

=item jobtracker_port => <port number>

=item resourcemanager => <hostname>

=item resourcemanager_port => <port number>

=item socksproxy => <hostname>

=item socksproxy_port => <port number>

=back

=head2 datanode_live_list ()

=over 4

=item Description

Returns a list of the current live DataNodes.

=item Return values

Array containing hostnames.

=back

=head2 datanode_dead_list ()

=over 4

=item Description

Returns a list of the current dead DataNodes.

=item Return values

Array containing hostnames.

=back

=head2 datanode_decom_list ()

=over 4

=item Description

Returns a list of the currently decommissioning DataNodes.

=item Return values

Array containing hostnames.

=back

=head2 nodemanager_live_list ()

=over 4

=item Description

Returns a list of the current live NodeManagers.

=item Return values

Array containing hostnames.

=back

=head2 tasktracker_live_list ()

=over 4

=item Description

Returns a list of the current live TaskTrackers.

=item Return values

Array containing hostnames.

=back

=head2 tasktracker_blacklist_list ()

=over 4

=item Description

Returns a list of the current blacklisted TaskTrackers.

=item Return values

Array containing hostnames.

=back

=head2 tasktracker_graylist_list ()

=over 4

=item Description

Returns a list of the current graylisted TaskTrackers.

=item Return values

Array containing hostnames.

=back

=head1 KNOWN BUGS

None known at this time.  Please log issues at: 

https://github.com/cwimmer/hadoop-admin/issues

=head1 AVAILABILITY

Source code is available on GitHub:

https://github.com/cwimmer/hadoop-admin

Module available on CPAN as Hadoop::Admin:

http://search.cpan.org/~cwimmer/

=for Pod::Coverage gather_jt_jmx
gather_nn_jmx
gather_rm_jmx
parse_jt_jmx
parse_nn_jmx
parse_rm_jmx
BUILD

=head1 AUTHOR

Charles A. Wimmer (charles@wimmer.net)

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2012 by Charles A. Wimmer.

This is free software, licensed under:

  The (three-clause) BSD License

=cut



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.