package Net::Hadoop::WebHDFS;

use strict;
use warnings;
use Carp;

use JSON::XS qw//;

use Furl;
use File::Spec;
use URI;
use Try::Tiny;

use constant GENERIC_FS_ACTION_WITH_NO_PATH => '';

our $VERSION = "0.8";

# Operation name (e.g. 'CREATE') => array ref of the option names accepted by
# that operation. Entries are registered next to the method implementing the
# operation. Presumably consulted by check_options, which is defined outside
# this section -- confirm there.
our %OPT_TABLE = ();

# Constructor.
#
# Recognised options and the defaults applied when the given value is false:
#   host            => 'localhost'
#   port            => 50070
#   standby_host    => (none)
#   standby_port    => falls back to 'port', then to 50070
#   httpfs_mode     => 0
#   username, doas  => (none)
#   useragent       => 'Furl Net::Hadoop::WebHDFS (perl)'
#   timeout         => 10
#   suppress_errors => 0
#
# Defaults are applied with '||', so any false value (0, '', undef) selects
# the default. The object also carries 'last_error' (initially undef) and
# 'under_failover' (initially 0), plus a Furl::HTTP client in 'furl' that is
# created with max_redirects => 0.
sub new {
    my ($this, %opts) = @_;
    my $self = +{
        host            => $opts{host} || 'localhost',
        port            => $opts{port} || 50070,
        standby_host    => $opts{standby_host},
        standby_port    => ($opts{standby_port} || $opts{port} || 50070),
        httpfs_mode     => $opts{httpfs_mode} || 0,
        username        => $opts{username},
        doas            => $opts{doas},
        useragent       => $opts{useragent} || 'Furl Net::Hadoop::WebHDFS (perl)',
        timeout         => $opts{timeout} || 10,
        suppress_errors => $opts{suppress_errors} || 0,
        last_error      => undef,
        under_failover  => 0,
    };
    $self->{furl} = Furl::HTTP->new(
        agent         => $self->{useragent},
        timeout       => $self->{timeout},
        max_redirects => 0,
    );
    return bless $self, $this;
}

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
#                 [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
#                 [&permission=<OCTAL>][&buffersize=<INT>]"
#
# Sends $body to $path with op=CREATE. In httpfs_mode the option
# data => 'true' is added to the request options. Croaks with the message
# from check_options when it reports one. Returns true when the response
# code is 201.
sub create {
    my ($self, $path, $body, %options) = @_;
    if ($self->{httpfs_mode}) {
        %options = (%options, data => 'true');
    }
    my $err = $self->check_options('CREATE', %options);
    croak $err if $err;

    my $res = $self->operate_requests('PUT', $path, 'CREATE', \%options, $body);
    return $res->{code} == 201;
}
$OPT_TABLE{CREATE} = ['overwrite', 'blocksize', 'replication', 'permission', 'buffersize', 'data'];

# curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND
#                  [&buffersize=<INT>]"
#
# Sends $body to $path with op=APPEND. In httpfs_mode the option
# data => 'true' is added to the request options. Croaks with the message
# from check_options when it reports one. Returns true when the response
# code is 200.
sub append {
    my ($self, $path, $body, %options) = @_;
    if ($self->{httpfs_mode}) {
        %options = (%options, data => 'true');
    }
    my $err = $self->check_options('APPEND', %options);
    croak $err if $err;

    my $res = $self->operate_requests('POST', $path, 'APPEND', \%options, $body);
    return $res->{code} == 200;
}
$OPT_TABLE{APPEND} = ['buffersize', 'data'];

# curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
#             [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
#
# Issues op=OPEN for $path and returns the 'body' field of the response.
# Croaks with the message from check_options when it reports one.
sub read {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('OPEN', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', $path, 'OPEN', \%options);
    return $res->{body};
}
$OPT_TABLE{OPEN} = ['offset', 'length', 'buffersize'];

# Alias of read().
sub open { (shift)->read(@_); }

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS
#                 [&permission=<OCTAL>]"
#
# Issues op=MKDIRS for $path and returns whatever check_success_json yields
# for the 'boolean' attribute of the response.
sub mkdir {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('MKDIRS', %options);
    croak $err if $err;

    my $res = $self->operate_requests('PUT', $path, 'MKDIRS', \%options);
    return $self->check_success_json($res, 'boolean');
}
$OPT_TABLE{MKDIRS} = ['permission'];

# Alias of mkdir().
sub mkdirs { (shift)->mkdir(@_); }

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME
#                 &destination=<PATH>"
#
# Renames $path to $dest. A destination without a leading '/' gets one
# prepended. Croaks when $dest is undefined: the normalisation below would
# otherwise turn a missing destination into '/' after emitting
# "uninitialized value" warnings. Returns whatever check_success_json yields
# for the 'boolean' attribute of the response.
sub rename {
    my ($self, $path, $dest, %options) = @_;
    my $err = $self->check_options('RENAME', %options);
    croak $err if $err;

    unless (defined $dest) {
        croak "'rename' needs a destination path";
    }
    unless ($dest =~ m!^/!) {
        $dest = '/' . $dest;
    }
    my $res = $self->operate_requests('PUT', $path, 'RENAME', {%options, destination => $dest});
    return $self->check_success_json($res, 'boolean');
}

# curl -i -X DELETE "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=DELETE
#                    [&recursive=<true|false>]"
#
# Issues op=DELETE for $path and returns whatever check_success_json yields
# for the 'boolean' attribute of the response.
sub delete {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('DELETE', %options);
    croak $err if $err;

    my $res = $self->operate_requests('DELETE', $path, 'DELETE', \%options);
    return $self->check_success_json($res, 'boolean');
}
$OPT_TABLE{DELETE} = ['recursive'];

# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS"
#
# Returns whatever check_success_json yields for the 'FileStatus' attribute
# of the response.
sub stat {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('GETFILESTATUS', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', $path, 'GETFILESTATUS', \%options);
    return $self->check_success_json($res, 'FileStatus');
}

# Alias of stat().
sub getfilestatus { (shift)->stat(@_); }

# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
#
# Returns the 'FileStatus' member of the response's 'FileStatuses'
# attribute, or undef when that attribute is not available.
sub list {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('LISTSTATUS', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', $path, 'LISTSTATUS', \%options);
    my $statuses = $self->check_success_json($res, 'FileStatuses');

    # check_success_json is defined outside this section; the sibling methods
    # return its result directly as a success/failure value, so it is
    # presumably a plain false value after an unsuccessful request (e.g. with
    # suppress_errors enabled). Dereferencing that under strict refs would
    # die, so only dereference an actual hash.
    return ref($statuses) eq 'HASH' ? $statuses->{FileStatus} : undef;
}

# Alias of list().
sub liststatus { (shift)->list(@_); }

# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETCONTENTSUMMARY"
#
# Returns whatever check_success_json yields for the 'ContentSummary'
# attribute of the response.
sub content_summary {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('GETCONTENTSUMMARY', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', $path, 'GETCONTENTSUMMARY', \%options);
    return $self->check_success_json($res, 'ContentSummary');
}

# Alias of content_summary().
sub getcontentsummary { (shift)->content_summary(@_); }

# curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILECHECKSUM"
#
# Returns whatever check_success_json yields for the 'FileChecksum'
# attribute of the response.
sub checksum {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('GETFILECHECKSUM', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', $path, 'GETFILECHECKSUM', \%options);
    return $self->check_success_json($res, 'FileChecksum');
}

# Alias of checksum().
sub getfilechecksum { (shift)->checksum(@_); }

# curl -i "http://<HOST>:<PORT>/webhdfs/v1/?op=GETHOMEDIRECTORY"
#
# Takes no path argument; the request is always issued against '/'.
# Returns whatever check_success_json yields for the 'Path' attribute of
# the response.
sub homedir {
    my ($self, %options) = @_;
    my $err = $self->check_options('GETHOMEDIRECTORY', %options);
    croak $err if $err;

    my $res = $self->operate_requests('GET', '/', 'GETHOMEDIRECTORY', \%options);
    return $self->check_success_json($res, 'Path');
}

# Alias of homedir().
sub gethomedirectory { (shift)->homedir(@_); }

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETPERMISSION
#                 [&permission=<OCTAL>]"
#
# Sends $mode as the 'permission' parameter. Returns true when the response
# code is 200.
sub chmod {
    my ($self, $path, $mode, %options) = @_;
    my $err = $self->check_options('SETPERMISSION', %options);
    croak $err if $err;

    my $res = $self->operate_requests('PUT', $path, 'SETPERMISSION', {%options, permission => $mode});
    return $res->{code} == 200;
}

# Alias of chmod().
sub setpermission { (shift)->chmod(@_); }

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETOWNER
#                 [&owner=<USER>][&group=<GROUP>]"
#
# Croaks unless at least one of the 'owner' or 'group' options is defined.
# Returns true when the response code is 200.
sub chown {
    my ($self, $path, %options) = @_;
    my $err = $self->check_options('SETOWNER', %options);
    croak $err if $err;

    unless (defined($options{owner}) or defined($options{group})) {
        croak "'chown' needs at least one of owner or group";
    }

    my $res = $self->operate_requests('PUT', $path, 'SETOWNER', \%options);
    return $res->{code} == 200;
}
$OPT_TABLE{SETOWNER} = ['owner', 'group'];

# Alias of chown().
sub setowner { (shift)->chown(@_); }

# curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=SETREPLICATION
#                 [&replication=<SHORT>]"
#
# Sends $replnum as the 'replication' parameter. Returns whatever
# check_success_json yields for the 'boolean' attribute of the response.
sub replication {
    my ($self, $path, $replnum, %options) = @_;
    my $err = $self->check_options('SETREPLICATION', %options);
    croak $err if $err;

    my $res = $self->operate_requests('PUT', $path, 'SETREPLICATION', {%options, replication => $replnum});
    return $self->check_success_json($res, 'boolean');
}

# Alias of replication().
sub setreplication { (shift)->replication(@_); }

# curl -i -X PUT "http://:/webhdfs/v1/?op=SETTIMES # [&modificationtime=