Net-Hadoop-WebHDFS/lib/Net/Hadoop/WebHDFS.pm
package Net::Hadoop::WebHDFS;
use strict;
use warnings;
use Carp;
use JSON::XS qw//;
use Furl;
use File::Spec;
use URI;
use Try::Tiny;
# Placeholder "path" for operations that act on the file system as a whole
# (delegation token renewal/cancellation) rather than on a specific path.
use constant GENERIC_FS_ACTION_WITH_NO_PATH => '';
our $VERSION = "0.8";
# Maps a WebHDFS operation name (e.g. 'CREATE') to the list of option keys
# check_options() accepts for it. Entries are registered next to each
# operation below; an operation without an entry accepts no options.
our %OPT_TABLE = ();
# Constructor. Every option falls back to its default when missing or false
# ("||"), so an explicit 0 for port/timeout also selects the default.
# The standby NameNode port defaults to the primary port.
sub new {
    my ($this, %opts) = @_;

    my %self = (
        host            => ($opts{host} || 'localhost'),
        port            => ($opts{port} || 50070),
        standby_host    => $opts{standby_host},
        standby_port    => ($opts{standby_port} || $opts{port} || 50070),
        httpfs_mode     => ($opts{httpfs_mode} || 0),
        username        => $opts{username},
        doas            => $opts{doas},
        useragent       => ($opts{useragent} || 'Furl Net::Hadoop::WebHDFS (perl)'),
        timeout         => ($opts{timeout} || 10),
        suppress_errors => ($opts{suppress_errors} || 0),
        last_error      => undef,
        under_failover  => 0,
    );

    # Redirects are followed by hand in operate_requests(), so Furl must not.
    $self{furl} = Furl::HTTP->new(
        agent         => $self{useragent},
        timeout       => $self{timeout},
        max_redirects => 0,
    );

    return bless \%self, $this;
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
# [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
# [&permission=<OCTAL>][&buffersize=<INT>]"
#
# Writes $body to a new file at $path. Accepted %options are listed in
# $OPT_TABLE{CREATE} below; unknown ones make this croak. Returns true only
# when the server answers 201 Created. Request failures croak from the
# request layer unless suppress_errors is enabled, in which case false is
# returned.
sub create {
    my ($self, $path, $body, %options) = @_;
    # In HttpFS mode the payload is sent in this same request (operate_requests()
    # does not follow a redirect then), flagged with data=true.
    if ($self->{httpfs_mode}) {
        %options = (%options, data => 'true');
    }
    my $err = $self->check_options('CREATE', %options);
    croak $err if $err;
    my $res = $self->operate_requests('PUT', $path, 'CREATE', \%options, $body);
    # With suppress_errors, request() hands back a plain 0 instead of a
    # response hash; dereferencing that would die under strict refs.
    return ref($res) && $res->{code} == 201;
}
$OPT_TABLE{CREATE} = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data'];
# curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND
# [&buffersize=<INT>]"
#
# Appends $body to the existing file at $path. Accepted %options are listed
# in $OPT_TABLE{APPEND} below. Returns true when the server answers 200;
# false when suppress_errors swallowed a request failure.
sub append {
    my ($self, $path, $body, %options) = @_;
    # HttpFS mode sends the payload directly, flagged with data=true.
    if ($self->{httpfs_mode}) {
        %options = (%options, data => 'true');
    }
    my $err = $self->check_options('APPEND', %options);
    croak $err if $err;
    my $res = $self->operate_requests('POST', $path, 'APPEND', \%options, $body);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
$OPT_TABLE{APPEND} = ['buffersize', 'data'];
# curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
# [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
#
# Returns the content of the file at $path (accepted options: offset,
# length, buffersize). Returns undef when suppress_errors swallowed a
# request failure.
sub read {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('OPEN', %options);
    croak $err if $err;
    my $res = $self->operate_requests('GET', $path, 'OPEN', \%options);
    # request() yields a plain 0, not a response hash, on a suppressed error.
    return ref($res) ? $res->{body} : undef;
}
$OPT_TABLE{OPEN} = ['offset', 'length', 'buffersize'];
# Alias for read().
sub open { (shift)->read(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS
# [&permission=<OCTAL>]"
#
# Creates directory $path (accepted option: permission) and returns the
# "boolean" field of the JSON reply, as extracted by check_success_json().
sub mkdir {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('MKDIRS', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('PUT', $path, 'MKDIRS', \%options);
    return $self->check_success_json($response, 'boolean');
}
$OPT_TABLE{MKDIRS} = ['permission'];
# Alias for mkdir().
sub mkdirs { my $self = shift; return $self->mkdir(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME
# &destination=<PATH>"
#
# Renames $path to $dest. A relative $dest is made absolute by prefixing
# '/'. Croaks when $dest is missing or empty. Any option makes
# check_options() croak, since RENAME has no $OPT_TABLE entry. Returns the
# "boolean" field of the JSON reply.
sub rename {
    my ($self, $path, $dest, %options) = @_;
    my $err = $self->check_options('RENAME', %options);
    croak $err if $err;
    # Without this guard an undef/empty destination silently becomes "/",
    # which would move $path into the root directory.
    croak "'rename' needs a destination path"
        if !defined $dest || $dest eq '';
    $dest = '/' . $dest if $dest !~ m{\A/};
    my $res = $self->operate_requests('PUT', $path, 'RENAME', {%options, destination => $dest});
    return $self->check_success_json($res, 'boolean');
}
# curl -i -X DELETE "http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
# [&recursive=<true|false>]"
#
# Deletes $path (accepted option: recursive) and returns the "boolean"
# field of the JSON reply, as extracted by check_success_json().
$OPT_TABLE{DELETE} = ['recursive'];
sub delete {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('DELETE', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('DELETE', $path, 'DELETE', \%options);
    return $self->check_success_json($response, 'boolean');
}
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
#
# Returns the FileStatus hash of $path. There is no $OPT_TABLE entry for
# GETFILESTATUS in this file, so any option makes check_options() croak.
sub stat {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('GETFILESTATUS', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('GET', $path, 'GETFILESTATUS', \%options);
    return $self->check_success_json($response, 'FileStatus');
}
# Alias for stat().
sub getfilestatus { my $self = shift; return $self->stat(@_); }
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
#
# Returns the arrayref of FileStatus hashes for the entries of $path, or
# undef when the reply is not a successful JSON FileStatuses object (e.g.
# when suppress_errors swallowed a request failure).
sub list {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('LISTSTATUS', %options);
    croak $err if $err;
    my $res = $self->operate_requests('GET', $path, 'LISTSTATUS', \%options);
    my $statuses = $self->check_success_json($res, 'FileStatuses');
    # check_success_json() returns a false scalar on failure; dereferencing
    # it with ->{FileStatus} would die under strict refs.
    return ref($statuses) eq 'HASH' ? $statuses->{FileStatus} : undef;
}
# Alias for list().
sub liststatus { (shift)->list(@_); }
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY"
#
# Returns the ContentSummary hash of $path. There is no $OPT_TABLE entry for
# GETCONTENTSUMMARY, so any option makes check_options() croak.
sub content_summary {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('GETCONTENTSUMMARY', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('GET', $path, 'GETCONTENTSUMMARY', \%options);
    return $self->check_success_json($response, 'ContentSummary');
}
# Alias for content_summary().
sub getcontentsummary { my $self = shift; return $self->content_summary(@_); }
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM"
#
# Returns the FileChecksum hash of $path. GETFILECHECKSUM is listed in
# %REDIRECTED_OPERATIONS, so operate_requests() follows the NameNode redirect
# outside HttpFS mode.
sub checksum {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('GETFILECHECKSUM', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('GET', $path, 'GETFILECHECKSUM', \%options);
    return $self->check_success_json($response, 'FileChecksum');
}
# Alias for checksum().
sub getfilechecksum { my $self = shift; return $self->checksum(@_); }
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY"
#
# Returns the "Path" field of the GETHOMEDIRECTORY reply. The request is
# always issued against the root path '/'.
sub homedir {
    my ($self, %options) = @_;
    if (my $problem = $self->check_options('GETHOMEDIRECTORY', %options)) {
        croak $problem;
    }
    my $response = $self->operate_requests('GET', '/', 'GETHOMEDIRECTORY', \%options);
    return $self->check_success_json($response, 'Path');
}
# Alias for homedir().
sub gethomedirectory { my $self = shift; return $self->homedir(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
# [&permission=<OCTAL>]"
#
# Sets the permission of $path to the octal string $mode. Returns true on a
# 200 reply; false when suppress_errors swallowed a request failure.
sub chmod {
    my ($self, $path, $mode, %options) = @_;
    my $err = $self->check_options('SETPERMISSION', %options);
    croak $err if $err;
    my $res = $self->operate_requests('PUT', $path, 'SETPERMISSION', {%options, permission => $mode});
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
# Alias for chmod().
sub setpermission { (shift)->chmod(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
# [&owner=<USER>][&group=<GROUP>]"
#
# Changes the owner and/or group of $path; at least one of the two options
# must be given. Returns true on a 200 reply; false when suppress_errors
# swallowed a request failure.
sub chown {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('SETOWNER', %options);
    croak $err if $err;
    unless (defined($options{owner}) or defined($options{group})) {
        croak "'chown' needs at least one of owner or group";
    }
    my $res = $self->operate_requests('PUT', $path, 'SETOWNER', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
$OPT_TABLE{SETOWNER} = ['owner', 'group'];
# Alias for chown().
sub setowner { (shift)->chown(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
# [&replication=<SHORT>]"
#
# Sets the replication factor of $path to $replnum and returns the
# "boolean" field of the JSON reply.
sub replication {
    my ($self, $path, $replnum, %options) = @_;
    if (my $problem = $self->check_options('SETREPLICATION', %options)) {
        croak $problem;
    }
    my %query = (%options, replication => $replnum);
    my $response = $self->operate_requests('PUT', $path, 'SETREPLICATION', \%query);
    return $self->check_success_json($response, 'boolean');
}
# Alias for replication().
sub setreplication { my $self = shift; return $self->replication(@_); }
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETTIMES
# [&modificationtime=<TIME>][&accesstime=<TIME>]"
# modificationtime: radix-10 long integer
# accesstime: radix-10 long integer
#
# Sets the modification and/or access time of $path; at least one must be
# given. Returns true on a 200 reply; false when suppress_errors swallowed a
# request failure.
$OPT_TABLE{SETTIMES} = [ qw( modificationtime accesstime ) ];
sub touch {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('SETTIMES', %options);
    croak $err if $err;
    unless (defined($options{modificationtime}) or defined($options{accesstime})) {
        croak "'touch' needs at least one of modificationtime or accesstime";
    }
    my $res = $self->operate_requests('PUT', $path, 'SETTIMES', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
#---------------------------- EXTENDED ATTRIBUTES START -----------------------#
# Dispatcher for the extended-attribute helpers. $action selects the method
# _<action>_xattr, or failing that _<action>_xattrs (e.g. 'get' reaches
# _get_xattrs, 'create' reaches _create_xattr). The helper receives $path
# followed by @args. Croaks on a missing or unknown action.
sub xattr {
    my ($self, $path, $action, @args) = @_;
    croak "No action defined for xattr" unless $action;
    my ($method) = grep { defined }
                   map  { $self->can($_) } ("_${action}_xattr", "_${action}_xattrs");
    croak "invalid action `$action`" unless $method;
    return $self->$method($path, @args);
}
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETXATTRS
# &xattr.name=<XATTRNAME>&encoding=<ENCODING>"
#
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETXATTRS
# &xattr.name=<XATTRNAME1>&xattr.name=<XATTRNAME2>&encoding=<ENCODING>"
$OPT_TABLE{GETXATTRS} = [qw( names encoding flatten )];
# Fetches the extended attributes of $path.
#   names    => \@names  restrict the reply to these attribute names
#   encoding => $enc     passed through to the server
#   flatten  => 1        return a flat (name, value, ...) list instead of the
#                        arrayref of {name, value} hashes
# Returns nothing when the reply is not a successful XAttrs JSON object.
sub _get_xattrs {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('GETXATTRS', %options);
    croak $err if $err;
    # 'flatten' is a client-side switch and must not be sent to the server.
    my $flatten = delete $options{flatten};
    # Limit to a subset; otherwise all attributes are returned.
    if (my $names = delete $options{names}) {
        croak "getxattrs: names needs to be an arrayref" if ref($names) ne 'ARRAY';
        # URI->query_form() expands an arrayref into repeated xattr.name params.
        $options{'xattr.name'} = $names;
    }
    my $res = $self->operate_requests('GET', $path, 'GETXATTRS', \%options);
    my $rv = $self->check_success_json($res, 'XAttrs') or return;
    croak "Unexpected return value from getxattrs: $rv" if ref($rv) ne 'ARRAY';
    return $rv if !$flatten;
    return map { @{ $_ }{qw/ name value /} } @{ $rv };
}
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTXATTRS"
#
# Returns the list of extended attribute names set on $path, or an empty
# list when the reply is not a successful XAttrNames JSON object.
sub _list_xattrs {
    my ($self, $path) = @_;
    # An explicit (empty) params hash is required: _request() expands the
    # params argument with %$params, which dies on undef under strict refs.
    my $res = $self->operate_requests('GET', $path, 'LISTXATTRS', {});
    my $names_json = $self->check_success_json($res, 'XAttrNames') or return;
    # XAttrNames arrives as a string that itself holds a JSON array.
    my $attr = JSON::XS::decode_json($names_json);
    croak "Unexpected return value from listxattrs: $attr"
        if ref($attr) ne 'ARRAY';
    return @{ $attr };
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETXATTR
# &xattr.name=<XATTRNAME>&xattr.value=<XATTRVALUE>&flag=<FLAG>"
# https://blog.cloudera.com/blog/2014/06/why-extended-attributes-are-coming-to-hdfs/
# flag: [CREATE,REPLACE]
$OPT_TABLE{SETXATTR} = [qw( name value flag )];
# Sets extended attribute name => value on $path with the given flag. All
# three options are required (value may be empty but must be present).
# Returns true on a 200 reply; false when suppress_errors swallowed a
# request failure.
sub _set_xattr {
    my($self, $path, %options) = @_;
    my $err = $self->check_options('SETXATTR', %options);
    croak $err if $err;
    croak "value of xattr not set" if ! exists $options{value};
    # Rename the option keys to the xattr.* query parameters the server expects.
    $options{ 'xattr.name' } = delete $options{name} || croak "name of xattr not set";
    $options{ 'xattr.value' } = delete $options{value};
    croak 'flag was not specified.' if ! $options{flag};
    my $res = $self->operate_requests( PUT => $path, 'SETXATTR', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
# Sets extended attribute $name = $value on $path using the CREATE flag
# (delegates to _set_xattr).
sub _create_xattr {
    my ($self, $path, $name, $value) = @_;
    my @pairs = (name => $name, value => $value, flag => 'CREATE');
    return $self->_set_xattr($path, @pairs);
}
# Sets extended attribute $name = $value on $path using the REPLACE flag
# (delegates to _set_xattr).
sub _replace_xattr {
    my ($self, $path, $name, $value) = @_;
    my @pairs = (name => $name, value => $value, flag => 'REPLACE');
    return $self->_set_xattr($path, @pairs);
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=REMOVEXATTR
# &xattr.name=<XATTRNAME>"
#
# Removes extended attribute $name from $path; croaks when $name is not
# given. Returns true on a 200 reply; false when suppress_errors swallowed a
# request failure.
sub _remove_xattr {
    my($self, $path, $name) = @_;
    my %options;
    $options{'xattr.name'} = $name || croak "xattr name was not specified";
    my $res = $self->operate_requests( PUT => $path, 'REMOVEXATTR', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
#---------------------------- EXTENDED ATTRIBUTES END -------------------------#
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CHECKACCESS
# &fsaction=<FSACTION>
# this seems to be broken in some versions. You may get a "No enum constant ..."
# error if this is the case.
# Also see https://issues.apache.org/jira/browse/HDFS-9695
#
# Asks the server whether the user may perform $fsaction on $path. Croaks
# when $fsaction is missing. Returns true on a 200 reply; false when
# suppress_errors swallowed a request failure.
sub checkaccess {
    my($self, $path, $fsaction, %options) = @_;
    croak "checkaccess: fsaction parameter was not specified" if ! $fsaction;
    my $err = $self->check_options('CHECKACCESS', %options);
    croak $err if $err;
    $options{fsaction} = $fsaction;
    my $res = $self->operate_requests('GET', $path, 'CHECKACCESS', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
# curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CONCAT
# &sources=<PATHS>"
#
# Concatenates the @sources paths onto $path. The sources are sent as one
# comma-separated "sources" parameter. Croaks when no source is given.
# Returns true on a 200 reply; false when suppress_errors swallowed a
# request failure.
sub concat {
    my($self, $path, @sources) = @_;
    croak "At least one source path needs to be specified" if ! @sources;
    my $paths = join q{,}, @sources;
    my $res = $self->operate_requests(
        POST => $path,
        'CONCAT',
        { sources => $paths },
    );
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
# curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=TRUNCATE
# &newlength=<LONG>"
# Available after Hadoop v2.7
# https://issues.apache.org/jira/browse/HDFS-7655
#
# Truncates $path to $newlength bytes (default 0). Returns 1 when the
# reply's "boolean" field is true and 0 otherwise.
sub truncate {
    my($self, $path, $newlength) = @_;
    $newlength = 0 if ! defined $newlength;
    my $res = $self->operate_requests(
        POST => $path,
        'TRUNCATE',
        { newlength => $newlength },
    );
    # check_success_json() returns the decoded JSON boolean. JSON::XS
    # represents it as an object that behaves like 1/0 and never equals the
    # string 'true', so test its truth value instead.
    return $self->check_success_json($res, 'boolean') ? 1 : 0;
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATESYMLINK
# &destination=<PATH>[&createParent=<true |false>]"
# currently broken/disabled
# https://issues.apache.org/jira/browse/HADOOP-10019
#$OPT_TABLE{CREATESYMLINK} = [qw( destination createParent )];
#sub createsymlink {
# # Not available yet
# # https://issues.apache.org/jira/browse/HADOOP-10019
# my($self, $path, $destination, $createParent) = @_;
#
# croak "createsymlink: destination not specified" if ! $destination;
#
# my %options = (
# destination => $destination,
# ($createParent ? (
# createParent => $createParent ? 'true' : 'false',
# ) : ())
# );
#
# my $res = $self->operate_requests( PUT => $path, 'CREATESYMLINK', \%options);
# $res->{code} == 200;
#}
#---------------------------- DELEGATION TOKEN START --------------------------#
# Also see
# http://hadoop.apache.org/docs/r2.6.0/hadoop-hdfs-httpfs/httpfs-default.html
# GETDELEGATIONTOKENS: Obsolete and removed after HDFS-10200, HDFS-3667
#
# Dispatcher: $action selects the method _<action>_delegation_token
# (get / renew / cancel), which receives @args unchanged. Croaks on a
# missing or unknown action.
sub delegation_token {
    my ($self, $action, @args) = @_;
    croak "No action defined for delegation_token" unless $action;
    my $handler = "_${action}_delegation_token";
    $self->can($handler) or croak "invalid action $action";
    return $self->$handler(@args);
}
# curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETDELEGATIONTOKEN
# &renewer=<USER>&service=<SERVICE>&kind=<KIND>"
# kind: The kind of the delegation token requested
# <empty> (Server sets the default kind for the service)
# A string that represents token kind e.g "HDFS_DELEGATION_TOKEN" or "WEBHDFS delegation"
# service: The name of the service where the token is supposed to be used, e.g. ip:port of the namenode
#
# Requests a delegation token for $path and returns its urlString. The
# renewer defaults to the configured username when the caller gives none.
$OPT_TABLE{GETDELEGATIONTOKEN} = [qw( renewer service kind )];
sub _get_delegation_token {
    my ($self, $path, %options) = @_;
    if (my $problem = $self->check_options('GETDELEGATIONTOKEN', %options)) {
        croak $problem;
    }
    if ($self->{username} && !$options{renewer}) {
        $options{renewer} = $self->{username};
    }
    my $response = $self->operate_requests(GET => $path, 'GETDELEGATIONTOKEN', \%options);
    my $token = $self->check_success_json($response, 'Token');
    return $token ? $token->{urlString} : $token;
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=RENEWDELEGATIONTOKEN
# &token=<TOKEN>"
#
# Renews $token (croaks when it is missing) and returns the reply's "long"
# field: the new expiration time in milliseconds.
sub _renew_delegation_token {
    my ($self, $token) = @_;
    croak "No token was specified" unless $token;
    my $response = $self->operate_requests(
        PUT => GENERIC_FS_ACTION_WITH_NO_PATH,
        'RENEWDELEGATIONTOKEN',
        { token => $token },
    );
    my $expires_ms = $self->check_success_json($response, 'long');
    return $expires_ms;
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/?op=CANCELDELEGATIONTOKEN
# &token=<TOKEN>"
#
# Cancels $token (croaks when it is missing). Returns true on a 200 reply;
# false when suppress_errors swallowed a request failure.
sub _cancel_delegation_token {
    my($self, $token) = @_;
    croak "No token was specified" if ! $token;
    my $res = $self->operate_requests(
        PUT => GENERIC_FS_ACTION_WITH_NO_PATH,
        'CANCELDELEGATIONTOKEN',
        { token => $token },
    );
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
#---------------------------- DELEGATION TOKEN END ----------------------------#
#---------------------------- SNAPSHOT START ----------------------------------#
# Needs testing, seems to be buggy and can be destructive in earlier versions
# i.e.: https://issues.apache.org/jira/browse/HDFS-9406
#
# Snapshotting is not enabled by default and this needs to be executed as a super user:
# hdfs dfsadmin -allowSnapshot $path
#
# Dispatcher: $action selects the method _<action>_snapshot
# (create / rename / delete), which receives $path followed by @args.
# Croaks on a missing or unknown action.
sub snapshot {
    my($self, $path, $action, @args) = @_;
    croak "No action defined for snapshot" if ! $action;
    my $target = sprintf '_%s_snapshot', $action;
    croak sprintf "%s: invalid action $action", (caller 0)[3] if ! $self->can( $target );
    return $self->$target( $path => @args );
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATESNAPSHOT
# [&snapshotname=<SNAPSHOTNAME>]"
#
# Creates a snapshot of $path, named $snapshotname when one is given, and
# returns the "Path" field of the reply.
sub _create_snapshot {
    my ($self, $path, $snapshotname) = @_;
    my %options = $snapshotname ? (snapshotname => $snapshotname) : ();
    my $response = $self->operate_requests('PUT', $path, 'CREATESNAPSHOT', \%options);
    my $snapshot_path = $self->check_success_json($response, 'Path');
    return $snapshot_path;
}
# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAMESNAPSHOT
# &oldsnapshotname=<SNAPSHOTNAME>&snapshotname=<SNAPSHOTNAME>"
#
# Renames snapshot $oldsnapshotname of $path to $snapshotname. Returns true
# on a 200 reply; false when suppress_errors swallowed a request failure.
sub _rename_snapshot {
    my($self, $path, $oldsnapshotname, $snapshotname) = @_;
    my %options = (
        oldsnapshotname => $oldsnapshotname,
        snapshotname    => $snapshotname,
    );
    my $res = $self->operate_requests('PUT', $path, 'RENAMESNAPSHOT', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
# curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETESNAPSHOT
# &snapshotname=<SNAPSHOTNAME>"
#
# Deletes snapshot $snapshotname of $path (croaks when the name is
# missing). Returns true on a 200 reply; false when suppress_errors
# swallowed a request failure.
sub _delete_snapshot {
    my($self, $path, $snapshotname) = @_;
    croak "No snapshotname specified" if ! $snapshotname;
    my %options = (
        snapshotname => $snapshotname,
    );
    my $res = $self->operate_requests('DELETE', $path, 'DELETESNAPSHOT', \%options);
    # request() returns a plain 0 instead of a response hash on a suppressed error.
    return ref($res) && $res->{code} == 200;
}
#---------------------------- SNAPSHOT END ------------------------------------#
# Creates a zero-length file at $path via create(), passing overwrite=true
# so an existing file is replaced.
sub touchz {
    my ($self, $path) = @_;
    my $empty_body = '';
    return $self->create($path, $empty_body, overwrite => 'true');
}
# Alias for touch().
sub settimes { my $self = shift; return $self->touch(@_); }
# Validates %opts against the option names registered in $OPT_TABLE{$op}
# (an operation without an entry accepts no options). Returns an error
# message listing every unknown option, or nothing when all are allowed.
# The unknown names are sorted so the message is stable across runs; hash
# key order is otherwise random.
sub check_options {
    my ($self, $op, %opts) = @_;
    my %allowed = map { $_ => 1 } @{ $OPT_TABLE{$op} || [] };
    my @unknown = sort grep { !$allowed{$_} } keys %opts;
    return if !@unknown;
    return 'no such option: ' . join(' ', @unknown);
}
# Checks that $res is a 200 reply with an application/json content type.
# Without $attr, returns 1 on success. With $attr, decodes the body and
# returns its $attr field. Returns '' when the checks fail, including when
# $res is not a response hash: request() yields a plain 0 when
# suppress_errors swallowed a failure.
sub check_success_json {
    my ($self, $res, $attr) = @_;
    return '' if ref($res) ne 'HASH';
    return '' if !defined $res->{code} || $res->{code} != 200;
    return '' if !defined $res->{content_type}
              || $res->{content_type} !~ m!^application/json!;
    return 1 if !defined $attr;
    return JSON::XS::decode_json($res->{body})->{$attr};
}
# Prefixes $path with the WebHDFS REST root, inserting a '/' only when
# $path is relative.
sub api_path {
    my ($self, $path) = @_;
    my $separator = ($path =~ m{\A/}) ? '' : '/';
    return '/webhdfs/v1' . $separator . $path;
}
# Builds the path+query for operation $op on $path: the api_path() prefix
# followed by a query holding op, user.name and doas (when configured) and
# %params. Keys in %params override the defaults.
sub build_path {
    my ($self, $path, $op, %params) = @_;
    my %query = (op => $op);
    $query{'user.name'} = $self->{username} if $self->{username};
    $query{doas}        = $self->{doas}     if $self->{doas};
    @query{ keys %params } = values %params;
    my $uri = URI->new('', 'http');
    $uri->query_form(%query);
    # path_query() renders just the query part here, e.g. '?foo=1&bar=2'.
    return $self->api_path($path) . $uri->path_query;
}
# Returns the (host, port) pair to talk to: the standby NameNode while a
# failover is in effect, the primary one otherwise.
sub connect_to {
    my $self = shift;
    my $prefix = $self->{under_failover} ? 'standby_' : '';
    return ($self->{"${prefix}host"}, $self->{"${prefix}port"});
}
# Operations for which, outside HttpFS mode, the NameNode answers with a
# redirect (Location header) to the node that serves the data (presumably
# a DataNode).
our %REDIRECTED_OPERATIONS = (APPEND => 1, CREATE => 1, OPEN => 1, GETFILECHECKSUM => 1);
# Runs WebHDFS operation $op on $path with query %$params and an optional
# $payload against the currently active NameNode (see connect_to()).
# HttpFS mode and non-redirected operations use a single request.
# Redirected operations first ask the NameNode without the payload, then
# send the payload to the Location it returns. Returns request()'s result:
# a response hash, or 0 when suppress_errors swallowed an error.
sub operate_requests {
    my ($self, $method, $path, $op, $params, $payload) = @_;
    my ($host, $port) = $self->connect_to();
    my $headers = [];
    if ($self->{httpfs_mode} or not $REDIRECTED_OPERATIONS{$op}) {
        # Label any defined payload; an empty string still counts (empty files are ok).
        if ($self->{httpfs_mode} and defined($payload)) {
            $headers = ['Content-Type' => 'application/octet-stream'];
        }
        return $self->request($host, $port, $method, $path, $op, $params, $payload, $headers);
    }
    # Plain WebHDFS, redirected operation: ask the NameNode for the target first.
    my $res = $self->request($host, $port, $method, $path, $op, $params, undef);
    # A suppressed error arrives as a plain 0; pass it on rather than
    # dereferencing it (which would die under strict refs).
    return $res if !ref $res;
    unless ($res->{code} >= 300 and $res->{code} <= 399 and $res->{location}) {
        my $code = $res->{code};
        my $body = $res->{body};
        croak "NameNode returns non-redirection (or without location header), code:$code, body:$body.";
    }
    my $uri = URI->new($res->{location});
    $headers = ['Content-Type' => 'application/octet-stream'];
    # The redirect URL already carries the full path and query, hence no $op.
    return $self->request($uri->host, $uri->port, $method, $uri->path_query, undef, {}, $payload, $headers);
}
# Performs a request through _request(). Without suppress_errors, errors
# propagate. With it, the error is stored in $self->{last_error} and 0 is
# returned instead.
sub request {
    my ($self, @args) = @_;
    return $self->_request(@args) unless $self->{suppress_errors};
    # Try::Tiny runs the try block as a separate sub with an empty @_, so the
    # arguments must be captured in a lexical before entering it.
    return try {
        $self->_request(@args);
    } catch {
        $self->{last_error} = $_;
        0;
    };
}
# Server-side Java exceptions map to HTTP statuses as follows:
# IllegalArgumentException       400 Bad Request
# UnsupportedOperationException  400 Bad Request
# SecurityException              401 Unauthorized
# IOException                    403 Forbidden
# FileNotFoundException          404 Not Found
# RuntimeException               500 Internal Server Error
#
# Sends one HTTP request through Furl.
# - With $op, $path is expanded by build_path() using the %$params query
#   options. Without $op, $path is used verbatim as the path+query (the
#   redirect target).
# - Returns { code, body, location?, content_type? } for 2xx/3xx replies.
#   Any other status croaks with a typed message (ClientError,
#   SecurityError, IOError, FileNotFoundError, ServerError,
#   RequestFailedError).
# - A 403 carrying a StandbyException is retried once against the other
#   NameNode when standby_host is set and httpfs_mode is off.
sub _request {
    my ($self, $host, $port, $method, $path, $op, $params, $payload, $header) = @_;
    # Tolerate a missing params hash; %$params on undef dies under strict refs.
    my $request_path = $op ? $self->build_path($path, $op, %{ $params || {} }) : $path;
    my (undef, $code, undef, $headers, $body) = $self->{furl}->request(
        method     => $method,
        host       => $host,
        port       => $port,
        path_query => $request_path,
        headers    => $header,
        # Test length rather than truth so a body of "0" is still sent;
        # undef and '' keep sending no content.
        ((defined $payload && length $payload) ? (content => $payload) : ()),
    );
    my $res = { code => $code, body => $body };
    # Response headers arrive as a flat (name, value, name, value, ...) list.
    my @header_pairs = @{ $headers || [] };
    while (my ($name, $value) = splice @header_pairs, 0, 2) {
        if    ($name =~ m!^location$!i)     { $res->{location}     = $value; }
        elsif ($name =~ m!^content-type$!i) { $res->{content_type} = $value; }
    }
    return $res if $code >= 200 and $code <= 299;
    return $res if $code >= 300 and $code <= 399;
    my $errmsg = $res->{body} || 'Response body is empty...';
    $errmsg =~ s/\n//g;
    if ($code == 400) { croak "ClientError: $errmsg"; }
    elsif ($code == 401) { croak "SecurityError: $errmsg"; }
    elsif ($code == 403) {
        if ($errmsg =~ /org\.apache\.hadoop\.ipc\.StandbyException/) {
            if ($self->{httpfs_mode} || not defined($self->{standby_host})) {
                # failover is disabled
            } elsif ($self->{retrying}) {
                # already retrying: no further failover
                $self->{retrying} = 0;
            } else {
                # Switch NameNodes and retry once. local() restores the flag
                # even if the retry croaks, so a failed retry cannot suppress
                # the next failover attempt.
                $self->{under_failover} = not $self->{under_failover};
                local $self->{retrying} = 1;
                my ($next_host, $next_port) = $self->connect_to();
                my $val = $self->request($next_host, $next_port, $method, $path, $op, $params, $payload, $header);
                return $val;
            }
        }
        croak "IOError: $errmsg";
    }
    elsif ($code == 404) { croak "FileNotFoundError: $errmsg"; }
    elsif ($code == 500) { croak "ServerError: $errmsg"; }
    croak "RequestFailedError, code:$code, message:$errmsg";
}
# Returns the stat() hash of $path when it exists. Returns false (an empty
# return) when stat() fails with a "File does not exist" FileNotFoundError;
# any other error is re-thrown. Croaks when no path is given.
sub exists {
    my $self = shift;
    my $path = shift || croak "No HDFS path was specified";
    my $stat;
    eval {
        $stat = $self->stat( $path );
        1;
    } or do {
        my $eval_error = $@ || 'Zombie error';
        # Match the error type plus Hadoop's message text instead of a fixed
        # JSON prefix. The old pattern required "message" to be the first
        # key inside RemoteException; the key order presumably differs
        # between server implementations (WebHDFS vs HttpFS) -- verify if
        # adding new cases.
        return if $eval_error =~ m<
            FileNotFoundError: .*? \QFile does not exist\E
        >xms;
        # just re-throw
        croak $eval_error;
    };
    return $stat;
}
# Walks $file_path recursively and calls $cb->($dir, $status) for every
# DIRECTORY and FILE entry; a directory is reported before its contents.
# An optional trailing hashref may hold re_ignore, a pattern matched
# against entry names to skip them. With suppress_errors, failures in
# subdirectories and unknown entry types are downgraded to warnings.
sub find {
    my $self = shift;
    my $file_path = shift || croak "No file path specified";
    my $cb = shift;
    my $opt = @_ && ref $_[-1] eq 'HASH' ? pop @_ : {};
    if ( ref $cb ne 'CODE' ) {
        croak "Call back needs to be a CODE ref";
    }
    # can be used to quickly skip the java junk like, file names starting with
    # underscores, etc.
    my $re_ignore = $opt->{re_ignore} ? qr/$opt->{re_ignore}/ : undef;
    #
    # No such thing like symlinks (yet) in HDFS, in case you're wondering:
    # https://issues.apache.org/jira/browse/HADOOP-10019
    # although check that link yourself
    #
    # Recurse through a named helper: a closure that captures itself forms a
    # reference cycle that Perl's refcounting never frees.
    _find_walk( $self, $file_path, $cb, $re_ignore, $self->{suppress_errors} );
    return;
}

# Recursive worker for find(): lists $dir, reports each entry to $cb and
# descends into subdirectories. Called as a plain function, not a method.
sub _find_walk {
    my ( $self, $dir, $cb, $re_ignore, $suppress ) = @_;
    if ( ! $self->exists( $dir ) ) {
        # should happen at the start, so this will short-circuit the recursion
        warn "The HDFS directory specified ($dir) does not exist! Please guard your HDFS paths with exists()";
        return;
    }
    my $list = $self->list( $dir );
    foreach my $e ( @{ $list } ) {
        my $path = $e->{pathSuffix};
        my $type = $e->{type};
        next if $re_ignore && $path && $path =~ $re_ignore;
        if ( $type eq 'DIRECTORY' ) {
            $cb->( $dir, $e );
            eval {
                _find_walk( $self, File::Spec->catdir( $dir, $path ), $cb, $re_ignore, $suppress );
                1;
            } or do {
                my $eval_error = $@ || 'Zombie error';
                if ( $suppress ) {
                    warn "[ERROR DOWNGRADED] Failed to check $dir/$path: $eval_error";
                    next;
                }
                croak $eval_error;
            };
        }
        elsif ( $type eq 'FILE' ) {
            $cb->( $dir, $e );
        }
        else {
            my $msg = "I don't know what to do with type=$type!";
            if ( $suppress ) {
                warn "[ERROR DOWNGRADED] $msg";
                next;
            }
            croak $msg;
        }
    }
    return;
}
1;
__END__
=head1 NAME
Net::Hadoop::WebHDFS - Client library for Hadoop WebHDFS and HttpFs
=head1 SYNOPSIS
use Net::Hadoop::WebHDFS;
my $client = Net::Hadoop::WebHDFS->new( host => 'hostname.local', port => 50070 );
my $statusArrayRef = $client->list('/');
my $contentData = $client->read('/data.txt');
$client->create('/foo/bar/data.bin', $bindata);
=head1 DESCRIPTION
This module supports WebHDFS v1 on Hadoop 1.x (and CDH4.0.0 or later), and HttpFs on Hadoop 2.x (and CDH4 or later).
WebHDFS/HttpFs has two authentication methods: pseudo authentication and Kerberos, but this module supports pseudo authentication only.
=head1 METHODS
Net::Hadoop::WebHDFS class method and instance methods.
=head2 CLASS METHODS
=head3 C<< Net::Hadoop::WebHDFS->new( %args ) :Net::Hadoop::WebHDFS >>
Creates and returns a new client instance with I<%args>.
If you are using HttpFs, set I<< httpfs_mode => 1 >> and I<< port => 14000 >>.
I<%args> might be:
=over
=item host :Str = "namenode.local"
=item port :Int = 50070
=item standby_host :Str = "standby.namenode.local"
=item standby_port :Int = 50070
=item username :Str = "hadoop"
=item doas :Str = "hdfs"
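=item httpfs_mode :Bool = 0/1
=item useragent :Str = "Furl Net::Hadoop::WebHDFS (perl)"
=item timeout :Int = 10
=item suppress_errors :Bool = 0/1
When true, errors raised while performing HTTP requests are caught and stored in C<< $client->{last_error} >> instead of being thrown.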
=back
=head2 INSTANCE METHODS
=head3 C<< $client->create($path, $body, %options) :Bool >>
Creates file on HDFS with I<$body> data. If you want to create blank file, pass blank string.
I<%options> might be:
=over
=item overwrite :Str = "true" or "false"
=item blocksize :Int
=item replication :Int
=item permission :Str = "0600"
=item buffersize :Int
=back
=head3 C<< $client->append($path, $body, %options) :Bool >>
Append I<$body> data to I<$path> file.
I<%options> might be:
=over
=item buffersize :Int
=back
=head3 C<< $client->read($path, %options) :Str >>
Opens the file at I<$path> and returns its content. Alias: B<open>.
I<%options> might be:
=over
=item offset :Int
=item length :Int
=item buffersize :Int
=back
=head3 C<< $client->mkdir($path, [permission => '0644']) :Bool >>
Make directory I<$path> on HDFS. Alias: B<mkdirs>.
=head3 C<< $client->rename($path, $dest) :Bool >>
Renames the file or directory at I<$path> to I<$dest>.
=head3 C<< $client->delete($path, [recursive => 0/1]) :Bool >>
Delete file I<$path> from HDFS. With optional I<< recursive => 1 >>, files and directories are removed recursively (default false).
=head3 C<< $client->stat($path) :HashRef >>
Gets and returns the file status object for I<$path>. Alias: B<getfilestatus>.
=head3 C<< $client->list($path) :ArrayRef >>
Lists the directory I<$path> and returns an arrayref of the file status objects of its entries. Alias: B<liststatus>.
=head3 C<< $client->content_summary($path) :HashRef >>
Gets and returns the 'content summary' object for I<$path>. Alias: B<getcontentsummary>.
=head3 C<< $client->checksum($path) :HashRef >>
Get checksum information object for I<$path>. Alias: B<getfilechecksum>.
=head3 C<< $client->homedir() :Str >>
Returns the home directory path of the accessing user. Alias: B<gethomedirectory>.
=head3 C<< $client->chmod($path, $mode) :Bool >>
Set permission of I<$path> as octal I<$mode>. Alias: B<setpermission>.
=head3 C<< $client->chown($path, [owner => 'username', group => 'groupname']) :Bool >>
Set owner or group of I<$path>. One of owner/group must be specified. Alias: B<setowner>.
=head3 C<< $client->replication($path, $replnum) :Bool >>
Sets the replication factor of I<$path> to I<$replnum>. Alias: B<setreplication>.
=head3 C<< $client->touch($path, [modificationtime => $mtime, accesstime => $atime]) :Bool >>
Set mtime/atime of I<$path>. Alias: B<settimes>.
=head3 C<< $client->touchz($path) :Bool >>
Create a zero length file.
=head3 C<< $client->checkaccess( $path, $fsaction ) :Bool >>
Test if the user has the rights to do a file system action.
=head3 C<< $client->concat( $path, @source_paths ) :Bool >>
Concatenates the files in I<@source_paths> onto the file I<$path>.
=head3 C<< $client->truncate( $path, $newlength ) :Bool >>
Truncates the file at I<$path> to I<$newlength> bytes (defaults to 0).
=head3 C<< $client->delegation_token( $action, $path, @args ) >>
This is a method wrapping the multiple methods for delegation token
handling.
my $token = $client->delegation_token( get => $path );
print "Token: $token\n";
my $milisec = $client->delegation_token( renew => $token );
printf "Token expiration renewed until %s\n", scalar localtime $milisec / 1000;
if ( $client->delegation_token( cancel => $token ) ) {
print "Token cancelled. There will be a new one created.\n";
my $token_new = $client->delegation_token( get => $path );
print "New token: $token_new\n";
printf "New token is %s\n", $token_new eq $token ? 'the same' : 'different';
}
else {
warn "Failed to cancel token $token!";
}
=head4 C<< $client->delegation_token( get => $path, [renewer => $username, service => $service, kind => $kind ] ) :Str >>
Returns the delegation token id for the specified path.
=head4 C<< $client->delegation_token( renew => $token ) :Int >>
Returns the new expiration time of the specified delegation token, in milliseconds.
=head4 C<< $client->delegation_token( cancel => $token ) :Bool >>
Cancels the specified delegation token (which will force a new one to be created).
=head3 C<< $client->snapshot( $path, $action => @args ) >>
This is a method wrapping the multiple methods for snapshot handling.
=head4 C<< $client->snapshot( $path, create => [, $snapshotname ] ) :Str >>
Creates a new snapshot of the specified path and returns the snapshot's
path (the C<Path> field of the server response).
=head4 C<< $client->snapshot( $path, rename => $oldsnapshotname, $snapshotname ) :Bool >>
Renames the snapshot I<$oldsnapshotname> of I<$path> to I<$snapshotname>.
=head4 C<< $client->snapshot( $path, delete => $snapshotname ) :Bool >>
Deletes the specified snapshot.
=head3 C<< $client->xattr( $path, $action, @args ) >>
This is a method wrapping the multiple methods for extended attributes handling.
my @attr_names = $client->xattr( $path, 'list' );
my %attr = $client->xattr( $path, get => flatten => 1 );
if ( ! exists $attr{'user.bar'} ) {
warn "set user.bar = 42\n";
$client->xattr( $path, create => 'user.bar' => 42 )
|| warn "Failed to create user.bar";
}
else {
warn "alter user.bar = 24\n";
$client->xattr( $path, replace => 'user.bar' => 24 )
|| warn "Failed to replace user.bar";
;
}
if ( exists $attr{'user.foo'} ) {
warn "No more foo\n";
$client->xattr( $path, remove => 'user.foo')
|| warn "Failed to remove user.foo";
;
}
=head4 C<< $client->xattr( $path, get => [, names => \@attr_names] [, flatten => 1 ] [, encoding => $enc ] ) :Struct >>
Returns the extended attribute name/value pairs of a path. By default the
result is an arrayref of hashrefs (one per attribute, with C<name> and C<value>
keys); if C<flatten> is set to a true value, a flat name/value list suitable
for assigning to a hash is returned instead.
It is also possible to fetch only a subset of the attributes by passing their
names as an arrayref in the C<names> option.
=head4 C<< $client->xattr( $path, 'list' ) :List >>
Returns the names of all the extended attributes set on C<$path>.
=head4 C<< $client->xattr( $path, create => $attr_name => $value ) :Bool >>
It is possible to create a new extended attribute on a path with this method.
=head4 C<< $client->xattr( $path, replace => $attr_name => $value ) :Bool >>
It is possible to replace the value of an existing extended attribute on a path
with this method.
=head4 C<< $client->xattr( $path, remove => $attr_name ) :Bool >>
Deletes the specified attribute from C<$path>.
=head2 EXTENSIONS
=head3 C<< $client->exists($path) :Bool >>
Returns the C<< stat() >> hash if successful, and false otherwise. Dies on
interface errors.
=head3 C<< $client->find($path, $callback, $options_hash) >>
Loops recursively over the specified path:
$client->find(
'/user',
sub {
my($cwd, $path) = @_;
my $date = localtime $path->{modificationTime};
my $type = $path->{type} eq q{DIRECTORY} ? "[dir ]" : "[file]";
my $size = sprintf "% 10s",
$path->{blockSize}
? sprintf "%.2f MB", $path->{blockSize} / 1024**2
: 0;
print "$type $size $path->{permission} $path->{owner}:$path->{group} $cwd/$path->{pathSuffix}\n";
},
{ # optional
re_ignore => qr{
\A # Filter some filenames out even before reaching the callback
[_] # logs and meta data, java junk, _SUCCESS files, etc.
}xms,
}
);
=head1 AUTHOR
TAGOMORI Satoshi E<lt>tagomoris {at} gmail.comE<gt>
=head1 LICENSE
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut