################################################################################ # MogileFS::HTTPFile object # NOTE: This is meant to be used within IO::WrapTie... # package MogileFS::NewHTTPFile; use strict; no strict 'refs'; use Carp; use POSIX qw( EAGAIN ); use Socket qw( PF_INET SOCK_STREAM ); use Errno qw( EINPROGRESS EISCONN ); use vars qw($PROTO_TCP); use fields ('host', 'sock', # IO::Socket; created only when we need it 'uri', 'data', # buffered data we have 'pos', # simulated file position 'length', # length of data field 'content_length', # declared length of data we will be receiving (not required) 'mg', 'fid', 'devid', 'class', 'key', 'path', # full URL to save data to 'backup_dests', 'bytes_out', # count of how many bytes we've written to the socket 'data_in', # storage for data we've read from the socket 'create_close_args', # Extra arguments hashref for the do_request of create_close during CLOSE ); sub path { _getset(shift, 'path'); } sub class { _getset(shift, 'class', @_); } sub key { _getset(shift, 'key', @_); } sub _parse_url { my MogileFS::NewHTTPFile $self = shift; my $url = shift; return 0 unless $url =~ m!http://(.+?)(/.+)$!; $self->{host} = $1; $self->{uri} = $2; $self->{path} = $url; return 1; } sub TIEHANDLE { my MogileFS::NewHTTPFile $self = shift; $self = fields::new($self) unless ref $self; my %args = @_; return undef unless $self->_parse_url($args{path}); $self->{data} = ''; $self->{length} = 0; $self->{backup_dests} = $args{backup_dests} || []; $self->{content_length} = $args{content_length} + 0; $self->{pos} = 0; $self->{$_} = $args{$_} foreach qw(mg fid devid class key); $self->{bytes_out} = 0; $self->{data_in} = ''; $self->{create_close_args} = $args{create_close_args} || {}; return $self; } *new = *TIEHANDLE; sub _sock_to_host { # (host) my MogileFS::NewHTTPFile $self = shift; my $host = shift; # setup my ($ip, $port) = $host =~ /^(.*):(\d+)$/; my $sock = "Sock_$host"; my $proto = $PROTO_TCP ||= getprotobyname('tcp'); my $sin; # create the socket socket($sock, PF_INET, SOCK_STREAM, $proto); $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip)); # unblock the socket IO::Handle::blocking($sock, 0); # attempt a connection my $ret = connect($sock, $sin); if (!$ret && $! == EINPROGRESS) { my $win = ''; vec($win, fileno($sock), 1) = 1; # watch for writeability if (select(undef, $win, undef, 3) > 0) { $ret = connect($sock, $sin); # EISCONN means connected & won't re-connect, so success $ret = 1 if !$ret && $! == EISCONN; } } # just throw back the socket we have return $sock if $ret; return undef; } sub _connect_sock { my MogileFS::NewHTTPFile $self = shift; return 1 if $self->{sock}; my @down_hosts; while (!$self->{sock} && $self->{host}) { # attempt to connect return 1 if $self->{sock} = $self->_sock_to_host($self->{host}); push @down_hosts, $self->{host}; if (my $dest = shift @{$self->{backup_dests}}) { # dest is [$devid,$path] _debug("connecting to $self->{host} (dev $self->{devid}) failed; now trying $dest->[1] (dev $dest->[0])"); $self->_parse_url($dest->[1]) or _fail("bogus URL"); $self->{devid} = $dest->[0]; } else { $self->{host} = undef; } } _fail("unable to open socket to storage node (tried: @down_hosts): $!"); } # abstracted read; implements what ends up being a blocking read but # does it in terms of non-blocking operations. sub _getline { my MogileFS::NewHTTPFile $self = shift; my $timeout = shift || 3; return undef unless $self->{sock}; # short cut if we already have data read if ($self->{data_in} =~ s/^(.*?\r?\n)//) { return $1; } my $rin = ''; vec($rin, fileno($self->{sock}), 1) = 1; # nope, we have to read a line my $nfound; my $t1 = Time::HiRes::time(); while ($nfound = select($rin, undef, undef, $timeout)) { my $data; my $bytesin = sysread($self->{sock}, $data, 1024); if (defined $bytesin) { # we can also get 0 here, which means EOF. no error, but no data. $self->{data_in} .= $data if $bytesin; } else { next if $! == EAGAIN; _fail("error reading from node for device $self->{devid}: $!"); } # return a line if we got one if ($self->{data_in} =~ s/^(.*?\r?\n)//) { return $1; } # and if we got no data, it's time to return EOF unless ($bytesin) { $@ = "\$bytesin is 0"; return undef; } } # if we got here, nothing was readable in our time limit my $t2 = Time::HiRes::time(); $@ = sprintf("not readable in %0.02f seconds", $t2-$t1); return undef; } # abstracted write function that uses non-blocking I/O and checking for # writeability to ensure that we don't get stuck doing a write if the # node we're talking to goes down. also handles logic to fall back to # a backup node if we're on our first write and the first node is down. # this entire function is a blocking function, it just uses intelligent # non-blocking write functionality. # # this function returns success (1) or it croaks on failure. sub _write { my MogileFS::NewHTTPFile $self = shift; return undef unless $self->{sock}; my $win = ''; vec($win, fileno($self->{sock}), 1) = 1; # setup data and counters my $data = shift(); my $bytesleft = length($data); my $bytessent = 0; # main sending loop for data, will keep looping until all of the data # we've been asked to send is sent my $nfound; while ($bytesleft && ($nfound = select(undef, $win, undef, 3))) { my $bytesout = syswrite($self->{sock}, $data, $bytesleft, $bytessent); if (defined $bytesout) { # update our myriad counters $bytessent += $bytesout; $self->{bytes_out} += $bytesout; $bytesleft -= $bytesout; } else { # if we get EAGAIN, restart the select loop, else fail next if $! == EAGAIN; _fail("error writing to node for device $self->{devid}: $!"); } } return 1 unless $bytesleft; # at this point, we had a socket error, since we have bytes left, and # the loop above didn't finish sending them. if this was our first # write, let's try to fall back to a different host. unless ($self->{bytes_out}) { if (my $dest = shift @{$self->{backup_dests}}) { # dest is [$devid,$path] $self->_parse_url($dest->[1]) or _fail("bogus URL"); $self->{devid} = $dest->[0]; $self->_connect_sock; # now repass this write to try again return $self->_write($data); } } # total failure (croak) $self->{sock} = undef; _fail(sprintf("unable to write to any allocated storage node, last tried dev %s on host %s uri %s. Had sent %s bytes, %s bytes left", $self->{devid}, $self->{host}, $self->{uri}, $self->{bytes_out}, $bytesleft)); } sub PRINT { my MogileFS::NewHTTPFile $self = shift; # get data to send to server my $data = shift; my $newlen = length $data; $self->{pos} += $newlen; # now make socket if we don't have one if (!$self->{sock} && $self->{content_length}) { $self->_connect_sock; $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{content_length}\r\n\r\n"); } # write some data to our socket if ($self->{sock}) { # save the first 1024 bytes of data so that we can seek back to it # and do some work later if ($self->{length} < 1024) { if ($self->{length} + $newlen > 1024) { $self->{length} = 1024; $self->{data} .= substr($data, 0, 1024 - $self->{length}); } else { $self->{length} += $newlen; $self->{data} .= $data; } } # actually write $self->_write($data); } else { # or not, just stick it on our queued data $self->{data} .= $data; $self->{length} += $newlen; } } *print = *PRINT; sub CLOSE { my MogileFS::NewHTTPFile $self = shift; # if we're closed and we have no sock... unless ($self->{sock}) { $self->_connect_sock; $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{length}\r\n\r\n"); $self->_write($self->{data}); } # set a message in $! and $@ my $err = sub { $@ = "$_[0]\n"; return undef; }; # get response from put if ($self->{sock}) { my $line = $self->_getline(6); # wait up to 6 seconds for response to PUT. return $err->("Unable to read response line from server ($self->{sock}) after PUT of $self->{length} to $self->{uri}. _getline says: $@") unless defined $line; if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) { # all 2xx responses are success unless ($1 >= 200 && $1 <= 299) { my $errcode = $1; # read through to the body my ($found_header, $body); while (defined (my $l = $self->_getline)) { # remove trailing stuff $l =~ s/[\r\n\s]+$//g; $found_header = 1 unless $l; next unless $found_header; # add line to the body, with a space for readability $body .= " $l"; } $body = substr($body, 0, 512) if length $body > 512; return $err->("HTTP response $errcode from upload of $self->{uri} to $self->{sock}: $body"); } } else { return $err->("Response line not understood from $self->{sock}: $line"); } $self->{sock}->close; } my MogileFS $mg = $self->{mg}; my $domain = $mg->{domain}; my $fid = $self->{fid}; my $devid = $self->{devid}; my $path = $self->{path}; my $create_close_args = $self->{create_close_args}; my $key = shift || $self->{key}; my $rv = $mg->{backend}->do_request ("create_close", { %$create_close_args, fid => $fid, devid => $devid, domain => $domain, size => $self->{content_length} ? $self->{content_length} : $self->{length}, key => $key, path => $path, }); unless ($rv) { # set $@, as our callers expect $@ to contain the error message that # failed during a close. since we failed in the backend, we have to # do this manually. return $err->("$mg->{backend}->{lasterr}: $mg->{backend}->{lasterrstr}"); } return 1; } *close = *CLOSE; sub TELL { # return our current pos return $_[0]->{pos}; } *tell = *TELL; sub SEEK { # simply set pos... _fail("seek past end of file") if $_[1] > $_[0]->{length}; $_[0]->{pos} = $_[1]; } *seek = *SEEK; sub EOF { return ($_[0]->{pos} >= $_[0]->{length}) ? 1 : 0; } *eof = *EOF; sub BINMODE { # no-op, we're always in binary mode } *binmode = *BINMODE; sub READ { my MogileFS::NewHTTPFile $self = shift; my $count = $_[1] + 0; my $max = $self->{length} - $self->{pos}; $max = $count if $count < $max; $_[0] = substr($self->{data}, $self->{pos}, $max); $self->{pos} += $max; return $max; } *read = *READ; ################################################################################ # MogileFS::NewHTTPFile class methods # sub _fail { croak "MogileFS::NewHTTPFile: $_[0]"; } sub _debug { MogileFS::Client::_debug(@_); } sub _getset { my MogileFS::NewHTTPFile $self = shift; my $what = shift; if (@_) { # note: we're a TIEHANDLE interface, so we're not QUITE like a # normal class... our parameters tend to come in via an arrayref my $val = shift; $val = shift(@$val) if ref $val eq 'ARRAY'; return $self->{$what} = $val; } else { return $self->{$what}; } } sub _fid { my MogileFS::NewHTTPFile $self = shift; return $self->{fid}; } 1;