package MogileFS::IOStatWatcher; use strict; use Sys::Syscall 0.22; # We use it indirectly, and trigger bugs in earlier versions. use Danga::Socket; use IO::Socket::INET; =head1 Methods =head2 $iow = MogileFS::IOStatWatcher->new() Returns a new IOStatWatcher object. =cut sub new { my ($class) = @_; my $self = bless { hosts => {}, }, $class; $self->on_stats; # set an empty handler. return $self; } =head2 $iow->set_hosts( host1 [, host2 [, ...] ] ) Sets the list of hosts to connect to for collecting IOStat information. This call can block if you pass it hostnames instead of ip addresses. Upon successful connection, the on_stats callback will be called each time the statistics are collected. Error states (failed connections, etc.) will trigger retries on 60 second intervals, and disconnects will trigger an immediate reconnect. =cut sub set_hosts { my ($self, @ips) = @_; my $old_hosts = $self->{hosts}; my $new_hosts = {}; foreach my $host (@ips) { $new_hosts->{$host} = (delete $old_hosts->{$host}) || MogileFS::IOStatWatch::Client->new($host, $self); } # TODO: close hosts that were removed (things in %$old_hosts) $self->{hosts} = $new_hosts; } =head2 $iow->on_stats( coderef ) Sets the coderef called for the C callback. =cut sub on_stats { my ($self, $cb) = @_; unless (ref $cb eq 'CODE') { $cb = sub {}; } $self->{on_stats} = $cb; } =head1 Callbacks =head2 on_stats->( host, stats ) Called each time device use statistics are collected. The C argument is the value passed in to the C method. The C object is a hashref of mogile device numbers (without leading "dev") to their corresponding utilization percentages. =cut # Everything beyond here is internal. sub got_stats { my ($self, $host, $stats) = @_; $self->{on_stats}->($host, $stats); } sub restart_monitoring_if_needed { my ($self, $host) = @_; return unless $self->{hosts}->{$host} && $self->{hosts}->{$host}->{closed}; $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self); } sub got_error { my ($self, $host) = @_; Danga::Socket->AddTimer(60, sub { $self->restart_monitoring_if_needed($host); }); } sub got_disconnect { my ($self, $host) = @_; $self->{hosts}->{$host} = MogileFS::IOStatWatch::Client->new($host, $self); } # Support class that does the communication with individual hosts. package MogileFS::IOStatWatch::Client; use Socket qw(SO_KEEPALIVE); use strict; use warnings; use base 'Danga::Socket'; use fields qw(host watcher buffer active); sub new { my MogileFS::IOStatWatch::Client $self = shift; my $hostspec = shift; my $watcher = shift; my $sock = IO::Socket::INET->new( PeerAddr => $hostspec, PeerPort => MogileFS->config("mogstored_stream_port"), Proto => 'tcp', Blocking => 0, ); return unless $sock; $sock->sockopt(SO_KEEPALIVE, 1); $self = fields::new($self) unless ref $self; $self->SUPER::new($sock); $self->watch_write(1); $self->watch_read(1); $self->{watcher} = $watcher; $self->{buffer} = ''; $self->{host} = $hostspec; return $self; } sub event_write { my MogileFS::IOStatWatch::Client $self = shift; $self->{active} = 1; $self->write("watch\n"); $self->watch_write(0); # I hope I can safely assume that 6 characters will write properly. } sub event_read { my MogileFS::IOStatWatch::Client $self = shift; my $bref = $self->read(10240); return $self->close unless defined $bref; $self->{buffer} .= $$bref; if ($self->{buffer} =~ m/^ERR\s+(.*?)\s* $ /x) { # There was an error on the way to watching this machine, close it and stay quiet. $self->close; } # If we can yank off lines till there is one by itself with a . on it, we've gotten a full set of stats. while ($self->{buffer} =~ s/^(.*?\n)?\.\n//s) { my %stats; foreach my $line (split /\n+/, $1) { next unless $line; my ($devnum, $util) = split /\s+/, $line; $stats{$devnum} = $util; } $self->{watcher}->got_stats($self->{host}, \%stats); } } sub event_err { my MogileFS::IOStatWatch::Client $self = shift; $self->{watcher}->got_error($self->{host}); } sub event_hup { my MogileFS::IOStatWatch::Client $self = shift; $self->{watcher}->got_error($self->{host}); } sub close { my MogileFS::IOStatWatch::Client $self = shift; if ($self->{active}) { $self->{watcher}->got_disconnect($self->{host}); } else { $self->{watcher}->got_error($self->{host}); } $self->SUPER::close(@_); } 1;