package MogileFS::Worker::Replicate; # replicates files around use strict; use base 'MogileFS::Worker'; use fields ( 'fidtodo', # hashref { fid => 1 } ); use List::Util (); use MogileFS::Server; use MogileFS::Util qw(error every debug wait_for_readability); use MogileFS::Config; use MogileFS::ReplicationRequest qw(rr_upgrade); use Digest; use MIME::Base64 qw(encode_base64); sub new { my ($class, $psock) = @_; my $self = fields::new($class); $self->SUPER::new($psock); $self->{fidtodo} = {}; return $self; } # replicator wants sub watchdog_timeout { 90; } use constant SOCK_TIMEOUT => 45; sub work { my $self = shift; every(1.0, sub { $self->send_to_parent("worker_bored 100 replicate rebalance"); my $queue_todo = $self->queue_todo('replicate'); my $queue_todo2 = $self->queue_todo('rebalance'); return unless (@$queue_todo || @$queue_todo2); return unless $self->validate_dbh; my $sto = Mgd::get_store(); while (my $todo = shift @$queue_todo) { my $fid = $todo->{fid}; $self->replicate_using_torepl_table($todo); } while (my $todo = shift @$queue_todo2) { $self->still_alive; # deserialize the arg :/ $todo->{arg} = [split /,/, $todo->{arg}]; my $devfid = MogileFS::DevFID->new($todo->{devid}, $todo->{fid}); $self->rebalance_devfid($devfid, { target_devids => $todo->{arg} }); # If files error out, we want to send the error up to syslog # and make a real effort to chew through the queue. Users may # manually re-run rebalance to retry. $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE); } $_[0]->(0); # don't sleep. }); } # return 1 if we did something (or tried to do something), return 0 if # there was nothing to be done. sub replicate_using_torepl_table { my $self = shift; my $todo = shift; # find some fids to replicate, prioritize based on when they should be tried my $sto = Mgd::get_store(); my $fid = $todo->{fid}; $self->still_alive; my $errcode; my %opts; $opts{errref} = \$errcode; $opts{no_unlock} = 1; # to make it return an $unlock subref $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid}; my ($status, $unlock) = replicate($fid, %opts); if ($status) { # $status is either 0 (failure, handled below), 1 (success, we actually # replicated this file), or 2 (success, but someone else replicated it). # when $staus eq "lost_race", this delete is unnecessary normally # (somebody else presumably already deleted it if they # also replicated it), but in the case of running with old # replicators from previous versions, -or- simply if the # other guy's delete failed, this cleans it up.... $sto->delete_fid_from_file_to_replicate($fid); $unlock->() if $unlock; next; } debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2; # ERROR CASES: # README: please keep this up to date if you update the replicate() function so we ensure # that this code always does the right thing # # -- HARMLESS -- # failed_getting_lock => harmless. skip. somebody else probably doing. # # -- ACTIONABLE -- # too_happy => too many copies, attempt to rebalance. # # -- TEMPORARY; DO EXPONENTIAL BACKOFF -- # source_down => only source available is observed down. # policy_error_doing_failed => policy plugin fucked up. it's looping. # policy_error_already_there => policy plugin fucked up. it's dumb. # policy_no_suggestions => no copy was attempted. policy is just not happy. # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions. # # -- FATAL; DON'T TRY AGAIN -- # no_source => it simply exists nowhere. not that something's down, but file_on is empty. # bail if we failed getting the lock, that means someone else probably # already did it, so we should just move on if ($errcode eq 'failed_getting_lock') { $unlock->() if $unlock; next; } # logic for setting the next try time appropriately my $update_nexttry = sub { my ($type, $delay) = @_; my $sto = Mgd::get_store(); if ($type eq 'end_of_time') { # special; update to a time that won't happen again, # as we've encountered a scenario in which case we're # really hosed $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time); } elsif ($type eq "offset") { $sto->reschedule_file_to_replicate_relative($fid, $delay+0); } else { $sto->reschedule_file_to_replicate_absolute($fid, $delay+0); } }; # now let's handle any error we want to consider a total failure; do not # retry at any point. push this file off to the end so someone has to come # along and figure out what went wrong. if ($errcode eq 'no_source') { $update_nexttry->( end_of_time => 1 ); $unlock->() if $unlock; next; } # try to shake off extra copies. fall through to the backoff logic # so we don't flood if it's impossible to properly weaken the fid. # there's a race where the fid could be checked again, but the # exclusive locking prevents replication clobbering. if ($errcode eq 'too_happy') { $unlock->() if $unlock; $unlock = undef; my $f = MogileFS::FID->new($fid); my @devs = List::Util::shuffle($f->devids); my $devfid; # First one we can delete from, we try to rebalance away from. for (@devs) { my $dev = Mgd::device_factory()->get_by_id($_); # Not positive 'should_read_from' needs to be here. # We must be able to delete off of this dev so the fid can # move. if ($dev->can_delete_from && $dev->should_read_from) { $devfid = MogileFS::DevFID->new($dev, $f); last; } } if ($devfid) { if ($self->rebalance_devfid($devfid)) { # disable exponential backoff below if we rebalanced due to # excessive replication: $todo->{failcount} = 0; } } } # at this point, the rest of the errors require exponential backoff. define what this means # as far as failcount -> delay to next try. # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ... my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 ); $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) ); $unlock->() if $unlock; return 1; } # Return 1 on success, 0 on failure or no-op. sub rebalance_devfid { my ($self, $devfid, $opts) = @_; $opts ||= {}; MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids)); my $fid = $devfid->fid; # bail out early if this FID is no longer in the namespace (weird # case where file is in file_on because not yet deleted, but # has been replaced/deleted in 'file' table...). not too harmful # (just noisy) if this line didn't exist, but whatever... it # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing # dev machine... return 1 if ! $fid->exists; my $errcode; my ($ret, $unlock) = replicate($fid, mask_devids => { $devfid->devid => 1 }, no_unlock => 1, target_devids => $opts->{target_devids}, errref => \$errcode, ); my $fail = sub { my $error = shift; $unlock->(); error("Rebalance for $devfid (" . $devfid->url . ") failed: $error"); return 0; }; unless ($ret || $errcode eq "too_happy") { return $fail->("Replication failed"); } my $should_delete = 0; my $del_reason; if ($errcode eq "too_happy" || $ret eq "lost_race") { # for some reason, we did no work. that could be because # either 1) we lost the race, as the error code implies, # and some other process rebalanced this first, or 2) # the file is over-replicated, and everybody just thinks they # lost the race because the replication policy said there's # nothing to do, even with this devfid masked away. # so let's figure it out... if this devfid still exists, # we're over-replicated, else we just lost the race. if ($devfid->exists) { # over-replicated # see if some copy, besides this one we want # to delete, is currently alive & of right size.. # just as extra paranoid check before we delete it foreach my $test_df ($fid->devfids) { next if $test_df->devid == $devfid->devid; if ($test_df->size_matches) { $should_delete = 1; $del_reason = "over_replicated"; last; } } } else { # lost race $should_delete = 0; # no-op } } elsif ($ret eq "would_worsen") { # replication has indicated we would be making ruining this fid's day # if we delete an existing copy, so lets not do that. # this indicates a condition where there're no suitable devices to # copy new data onto, so lets be loud about it. return $fail->("no suitable destination devices available"); } else { $should_delete = 1; $del_reason = "did_rebalance;ret=$ret"; } my %destroy_opts; $destroy_opts{ignore_missing} = 1 if MogileFS::Config->config("rebalance_ignore_missing"); if ($should_delete) { eval { $devfid->destroy(%destroy_opts) }; if ($@) { return $fail->("HTTP delete (due to '$del_reason') failed: $@"); } } $unlock->(); return $should_delete; } # replicates $fid to make sure it meets its class' replicate policy. # # README: if you update this sub to return a new error code, please update the # appropriate callers to know how to deal with the errors returned. # # returns either: # $rv # ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock. # $rv is one of: # 0 = failure (failure written to ${$opts{errref}}) # 1 = success # "lost_race" = skipping, we did no work and policy was already met. # "nofid" => fid no longer exists. skip replication. sub replicate { my ($fid, %opts) = @_; $fid = MogileFS::FID->new($fid) unless ref $fid; my $fidid = $fid->id; debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2; my $errref = delete $opts{'errref'}; my $no_unlock = delete $opts{'no_unlock'}; my $fixed_source = delete $opts{'source_devid'}; my $mask_devids = delete $opts{'mask_devids'} || {}; my $avoid_devids = delete $opts{'avoid_devids'} || {}; my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids. die "unknown_opts" if %opts; die unless ref $mask_devids eq "HASH"; my $sdevid; my $sto = Mgd::get_store(); my $unlock = sub { $sto->note_done_replicating($fidid); }; my $retunlock = sub { my $rv = shift; my ($errmsg, $errcode); if (@_ == 2) { ($errcode, $errmsg) = @_; $errmsg = "$errcode: $errmsg"; # include code with message } else { ($errmsg) = @_; } $$errref = $errcode if $errref; my $ret; if ($errcode && $errcode eq "failed_getting_lock") { # don't emit a warning with error() on lock failure. not # a big deal, don't scare people. $ret = 0; } else { $ret = $rv ? $rv : error($errmsg); } if ($no_unlock) { die "ERROR: must be called in list context w/ no_unlock" unless wantarray; return ($ret, $unlock); } else { die "ERROR: must not be called in list context w/o no_unlock" if wantarray; $unlock->(); return $ret; } }; # hashref of devid -> MogileFS::Device my $devs = Mgd::device_factory()->map_by_id or die "No device map"; return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid") unless $sto->should_begin_replicating_fidid($fidid); # if the fid doesn't even exist, consider our job done! no point # replicating file contents of a file no longer in the namespace. return $retunlock->("nofid") unless $fid->exists; my $cls = $fid->class; my $polobj = $cls->repl_policy_obj; # learn what this devices file is already on my @on_devs; # all devices fid is on, reachable or not. my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about my @on_up_devid; # subset of @on_devs: just devs that are readable foreach my $devid ($fid->devids) { my $d = Mgd::device_factory()->get_by_id($devid) or next; push @on_devs, $d; if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) { push @on_devs_tellpol, $d; } if ($d->should_read_from) { push @on_up_devid, $devid; } } return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0; return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0; if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) { error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices"); } my %dest_failed; # devid -> 1 for each devid we were asked to copy to, but failed. my %source_failed; # devid -> 1 for each devid we had problems reading from. my $got_copy_request = 0; # true once replication policy asks us to move something somewhere my $copy_err; my $dest_devs = $devs; if (@$target_devids) { $dest_devs = {map { $_ => $devs->{$_} } @$target_devids}; } my $rr; # MogileFS::ReplicationRequest while (1) { $rr = rr_upgrade($polobj->replicate_to( fid => $fidid, on_devs => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise all_devs => $dest_devs, failed => \%dest_failed, min => $cls->mindevcount, )); last if $rr->is_happy; my @ddevs; # dest devs, in order of preference my $ddevid; # dest devid we've chosen to copy to if (@ddevs = $rr->copy_to_one_of_ideally) { if (my @not_masked_ids = (grep { ! $mask_devids->{$_} && ! $avoid_devids->{$_} } map { $_->id } @ddevs)) { $ddevid = $not_masked_ids[0]; } else { # once we masked devids away, there were no # ideal suggestions. this is the case of rebalancing, # which without this check could 'worsen' the state # of the world. consider the case: # h1[ d1 d2 ] h2[ d3 ] # and files are on d1 & d3, an ideal layout. # if d3 is being rebalanced, and masked away, the # replication policy could presumably say to put # the file on d2, even though d3 isn't dead. # so instead, when masking is in effect, we don't # use non-ideal placement, just bailing out. # this used to return "lost_race" as a lie, but rebalance was # happily deleting the masked fid if at least one other fid # existed... because it assumed it was over replicated. # now we tell rebalance that touching this fid would be # stupid. return $retunlock->("would_worsen"); } } elsif (@ddevs = $rr->copy_to_one_of_desperate) { # TODO: reschedule a replication for 'n' minutes in future, or # when new hosts/devices become available or change state $ddevid = $ddevs[0]->id; } else { last; } $got_copy_request = 1; # replication policy shouldn't tell us to put a file on a device # we've already told it that we've failed at. so if we get that response, # the policy plugin is broken and we should terminate now. if ($dest_failed{$ddevid}) { return $retunlock->(0, "policy_error_doing_failed", "replication policy told us to do something we already told it we failed at while replicating fid $fidid"); } # replication policy shouldn't tell us to put a file on a # device that it's already on. that's just stupid. if (grep { $_->id == $ddevid } @on_devs) { return $retunlock->(0, "policy_error_already_there", "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!"); } # find where we're replicating from { # TODO: use an observed good device+host as source to start. my @choices = grep { ! $source_failed{$_} } @on_up_devid; return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices; if ($fixed_source && grep { $_ == $fixed_source } @choices) { $sdevid = $fixed_source; } else { @choices = List::Util::shuffle(@choices); MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices); $sdevid = shift @choices; } } my $worker = MogileFS::ProcManager->is_child or die; my $digest; my $fid_checksum = $fid->checksum; $digest = Digest->new($fid_checksum->hashname) if $fid_checksum; $digest ||= Digest->new($cls->hashname) if $cls->hashtype; my $rv = http_copy( sdevid => $sdevid, ddevid => $ddevid, fid => $fid, errref => \$copy_err, callback => sub { $worker->still_alive; }, digest => $digest, ); die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/; unless ($rv) { error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)"); if ($copy_err eq "src_error") { $source_failed{$sdevid} = 1; if ($fixed_source && $fixed_source == $sdevid) { error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources"); } } else { $dest_failed{$ddevid} = 1; } next; } my $dfid = MogileFS::DevFID->new($ddevid, $fid); $dfid->add_to_db; if ($digest && !$fid->checksum) { $sto->set_checksum($fidid, $cls->hashtype, $digest->digest); } push @on_devs, $devs->{$ddevid}; push @on_devs_tellpol, $devs->{$ddevid}; push @on_up_devid, $ddevid; } # We are over replicated. Let caller decide if it should rebalance. if ($rr->too_happy) { return $retunlock->(0, "too_happy", "fid $fidid is on too many devices"); } if ($rr->is_happy) { return $retunlock->(1) if $got_copy_request; return $retunlock->("lost_race"); # some other process got to it first. policy was happy immediately. } return $retunlock->(0, "policy_no_suggestions", "replication policy ran out of suggestions for us replicating fid $fidid"); } # Returns a hashref with the following: # { # code => HTTP status code integer, # keep => boolean, whether to keep the connection after reading # len => value of the Content-Length header (integer) # } # Returns undef on timeout sub read_headers { my ($sock, $intercopy_cb) = @_; my $head = ''; do { wait_for_readability(fileno($sock), SOCK_TIMEOUT) or return; $intercopy_cb->(); my $r = sysread($sock, $head, 1024, length($head)); if (defined $r) { return if $r == 0; # EOF } elsif ($!{EAGAIN} || $!{EINTR}) { # loop again } else { return; } } until ($head =~ /\r?\n\r?\n/); my $data; ($head, $data) = split(/\r?\n\r?\n/, $head, 2); my @head = split(/\r?\n/, $head); $head = shift(@head); $head =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)! or return; my %rv = ( keep => $1 >= 1.1, code => $2 ); foreach my $line (@head) { if ($line =~ /\AConnection:\s*keep-alive\s*\z/is) { $rv{keep} = 1; } elsif ($line =~ /\AConnection:\s*close\s*\z/is) { $rv{keep} = 0; } elsif ($line =~ /\AContent-Length:\s*(\d+)\s*\z/is) { $rv{len} = $1; } } return (\%rv, $data); } # copies a file from one Perlbal to another utilizing HTTP sub http_copy { my %opts = @_; my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) = map { delete $opts{$_} } qw(sdevid ddevid fid callback errref digest ); die if %opts; $fid = MogileFS::FID->new($fid) unless ref($fid); my $fidid = $fid->id; my $expected_clen = $fid->length; my $clen; my $content_md5 = ''; my ($sconn, $dconn); my $fid_checksum = $fid->checksum; if ($fid_checksum && $fid_checksum->hashname eq "MD5") { # some HTTP servers may be able to verify Content-MD5 on PUT # and reject corrupted requests. no HTTP server should reject # a request for an unrecognized header my $b64digest = encode_base64($fid_checksum->{checksum}, ""); $content_md5 = "\r\nContent-MD5: $b64digest"; } $intercopy_cb ||= sub {}; my $err_common = sub { my ($err, $msg) = @_; $$errref = $err if $errref; $sconn->close($err) if $sconn; $dconn->close($err) if $dconn; return error($msg); }; # handles setting unreachable magic; $error->(reachability, "message") my $error_unreachable = sub { return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]"); }; my $dest_error = sub { return $err_common->("dest_error", $_[0]); }; my $src_error = sub { return $err_common->("src_error", $_[0]); }; # get some information we'll need my $sdev = Mgd::device_factory()->get_by_id($sdevid); my $ddev = Mgd::device_factory()->get_by_id($ddevid); return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid") unless $sdev && $ddev; my $s_dfid = MogileFS::DevFID->new($sdev, $fid); my $d_dfid = MogileFS::DevFID->new($ddev, $fid); my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid)); my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev)); my ($shostip, $sport) = ($shost->ip, $shost->http_port); if (MogileFS::Config->config("repl_use_get_port")) { $sport = $shost->http_get_port; } my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port); unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) { # show detailed information to find out what's not configured right error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid"); error(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath"); return 0; } my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" . "Content-length: $expected_clen$content_md5\r\n\r\n"; # need by webdav servers, like lighttpd... $ddev->vivify_directories($d_dfid->url); # call a hook for odd casing completely different source data # for specific files. my $shttphost; MogileFS::run_global_hook('replicate_alternate_source', $fid, \$shostip, \$sport, \$spath, \$shttphost); my $durl = "http://$dhostip:$dport$dpath"; my $surl = "http://$shostip:$sport$spath"; # okay, now get the file my %sopts = ( ip => $shostip, port => $sport ); my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n"; # plugin set a custom host. $get .= "Host: $shttphost\r\n" if $shttphost; my ($sock, $dsock); my ($wcount, $bytes_to_read, $written, $remain); my ($stries, $dtries) = (0, 0); my ($sres, $data, $bytes); retry: $sconn->close("retrying") if $sconn; $dconn->close("retrying") if $dconn; $dconn = undef; $stries++; $sconn = $shost->http_conn_get(\%sopts) or return $src_error->("Unable to create source socket to $shostip:$sport for $spath"); $sock = $sconn->sock; unless ($sock->write("$get\r\n")) { goto retry if $sconn->retryable && $stries == 1; return $src_error->("Pipe closed retrieving $spath from $shostip:$sport"); } # we just want a content length ($sres, $data) = read_headers($sock, $intercopy_cb); unless ($sres) { goto retry if $sconn->retryable && $stries == 1; return $error_unreachable->("Error: Resource $surl failed to return an HTTP response"); } unless ($sres->{code} >= 200 && $sres->{code} <= 299) { return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}"); } $clen = $sres->{len}; return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen") if $clen != $expected_clen; # open target for put $dtries++; $dconn = $dhost->http_conn_get or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath"); $dsock = $dconn->sock; unless ($dsock->write($put)) { goto retry if $dconn->retryable && $dtries == 1; return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport"); } # now read data and print while we're reading. $bytes = length($data); ($written, $remain) = (0, $clen); $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining $bytes_to_read = $remain if $remain < $bytes_to_read; $wcount = 0; while ($bytes_to_read) { unless (defined $bytes) { read_again: $bytes = sysread($sock, $data, $bytes_to_read); unless (defined $bytes) { if ($!{EAGAIN} || $!{EINTR}) { wait_for_readability(fileno($sock), SOCK_TIMEOUT) and goto read_again; } return $src_error->("error reading midway through source: $!"); } if ($bytes == 0) { return $src_error->("EOF reading midway through source: $!"); } } # now we've read in $bytes bytes $remain -= $bytes; $bytes_to_read = $remain if $remain < $bytes_to_read; $digest->add($data) if $digest; my $data_len = $bytes; $bytes = undef; my $data_off = 0; while (1) { my $wbytes = syswrite($dsock, $data, $data_len, $data_off); unless (defined $wbytes) { # it can take two writes to determine if a socket is dead # (TCP_NODELAY and TCP_CORK are (and must be) zero here) goto retry if (!$wcount && $dconn->retryable && $dtries == 1); return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath"); } $wcount++; $written += $wbytes; $intercopy_cb->(); last if ($data_len == $wbytes); $data_len -= $wbytes; $data_off += $wbytes; } die if $bytes_to_read < 0; } # source connection drained, return to pool if ($sres->{keep}) { $shost->http_conn_put($sconn); $sconn = undef; } else { $sconn->close("http_close"); } # callee will want this digest, too, so clone as "digest" is destructive $digest = $digest->clone->digest if $digest; if ($fid_checksum) { if ($digest ne $fid_checksum->{checksum}) { my $expect = $fid_checksum->hexdigest; $digest = unpack("H*", $digest); return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest"); } } # now read in the response line (should be first line) my ($dres, $ddata) = read_headers($dsock, $intercopy_cb); unless ($dres) { goto retry if (!$wcount && $dconn->retryable && $dtries == 1); return $dest_error->("Error: HTTP response line not recognized writing to $durl"); } # drain the response body if there is one # there may be no dres->{len}/Content-Length if there is no body my $dlen = ($dres->{len} || 0) - length($ddata); if ($dlen > 0) { my $r = $dsock->read($data, $dlen); # dres->{len} should be tiny if (defined $r) { if ($r != $dlen) { Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl"); $dres->{keep} = 0; } } else { Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)"); $dres->{keep} = 0; } } elsif ($dlen < 0) { Mgd::error("strange response Content-Length:$dres->{len} with ". length($ddata) . " extra bytes for PUT response on $durl ($!)"); $dres->{keep} = 0; } # return the connection back to the connection pool if ($dres->{keep}) { $dhost->http_conn_put($dconn); $dconn = undef; } else { $dconn->close("http_close"); } if ($dres->{code} >= 200 && $dres->{code} <= 299) { if ($digest) { my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname; if ($ddev->{reject_bad_md5} && ($alg eq "MD5")) { # dest device would've rejected us with a error, # no need to reread the file return 1; } my $httpfile = MogileFS::HTTPFile->at($durl); my $actual = $httpfile->digest($alg, $intercopy_cb); if ($actual ne $digest) { my $expect = unpack("H*", $digest); $actual = unpack("H*", $actual); return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $digest"); } } return 1; } return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl"); } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: __END__ =head1 NAME MogileFS::Worker::Replicate -- replicates files =head1 OVERVIEW This process replicates files enqueued in B table. The replication policy (which devices to replicate to) is pluggable, but only one policy comes with the server. See L =head1 SEE ALSO L L L