# File: Stem/AsyncIO.pm # This file is part of Stem. # Copyright (C) 1999, 2000, 2001 Stem Systems, Inc. # Stem is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # Stem is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with Stem; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # For a license to use the Stem under conditions other than those # described here, to purchase support for this software, or to purchase a # commercial warranty contract, please contact Stem Systems at: # Stem Systems, Inc. 781-643-7504 # 79 Everett St. info@stemsystems.com # Arlington, MA 02474 # USA package Stem::AsyncIO ; use strict ; use Data::Dumper ; use Stem::Vars ; my $attr_spec = [ { 'name' => 'object', 'required' => 1, 'help' => < 'read_method', 'default' => 'async_read_data', 'help' => < 'stderr_method', 'default' => 'async_stderr_data', 'help' => < 'closed_method', 'default' => 'async_closed', 'help' => < 'fh', 'help' => < 'read_fh', 'help' => < 'write_fh', 'help' => < 'stderr_fh', 'help' => < 'data_addr', 'type' => 'address', 'help' => < 'stderr_addr', 'type' => 'address', 'help' => < 'data_msg_type', 'default' => 'data', 'help' => < 'codec', 'help' => < 'stderr_msg_type', 'default' => 'stderr_data', 'help' => < 'from_addr', 'type' => 'address', 'help' => < 'send_data_on_close', 'type' => 'boolean', 'help' => < 'id', 'help' => < 'log_label', 'default' => 'AIO', 'help' => < 'log_level', 'default' => 5, 'help' => < 'read_log', 'help' => < 'stderr_log', 'help' => < 'write_log', 'help' => <{'data_addr'} && ! $self->{'from_addr'} ) { return "Using 'data_addr in AsyncIO requires a 'from_addr'" ; } if ( my $codec = $self->{'codec'} ) { require Stem::Packet ; my $packet = Stem::Packet->new( 'codec' => $codec ) ; return $packet unless ref $packet ; $self->{'packet'} = $packet ; } $self->{'stderr_addr'} ||= $self->{'data_addr'} ; $self->{'buffer'} = '' if $self->{'send_data_on_close'} ; $self->{ 'read_fh' } ||= $self->{ 'fh' } ; $self->{ 'write_fh' } ||= $self->{ 'fh' } ; if ( my $read_fh = $self->{'read_fh'} ) { my $read_event = Stem::Event::Read->new( 'object' => $self, 'fh' => $read_fh, ) ; return $read_event unless ref $read_event ; $self->{'read_event'} = $read_event ; } if ( my $stderr_fh = $self->{'stderr_fh'} ) { my $stderr_event = Stem::Event::Read->new( 'object' => $self, 'fh' => $stderr_fh, 'method' => 'stderr_readable', ) ; return $stderr_event unless ref $stderr_event ; $self->{'stderr_event'} = $stderr_event ; } if ( my $write_fh = $self->{'write_fh'} ) { my $write_event = Stem::Event::Write->new( 'object' => $self, 'fh' => $write_fh, ) ; return $write_event unless ref $write_event ; $self->{'write_event'} = $write_event ; $self->{'write_buf'} = '' ; } return $self ; } sub shut_down { my( $self ) = @_ ; #cluck "SHUT $self\n" ; if ( $self->{'shut_down'} ) { return ; } $self->{'shutting_down'} = 1 ; $self->read_shut_down() ; $self->write_shut_down() ; if ( my $event = delete $self->{'stderr_event'} ) { $event->cancel() ; close( $self->{'stderr_fh'} ) ; } $self->{'shut_down'} = 1 ; #print "DELETE OBJ", caller(), "\n" ; delete $self->{'object'} ; } sub read_shut_down { my( $self ) = @_ ; if ( my $event = delete $self->{'read_event'} ) { $event->cancel() ; } shutdown( $self->{'read_fh'}, 0 ) ; } sub write_shut_down { my( $self ) = @_ ; if ( exists( $self->{'write_buf'} ) && length( $self->{'write_buf'} ) ) { #print "write handle shut when empty\n" ; $self->{'shut_down_when_empty'} = 1 ; return ; } if ( my $event = delete $self->{'write_event'} ) { shutdown( $self->{'write_fh'}, 1 ) ; $event->cancel() ; } } sub readable { my( $self ) = @_ ; my( $read_buf ) ; return if $self->{'shut_down'} ; my $bytes_read = sysread( $self->{'read_fh'}, $read_buf, 8192 ) ; #print "READ: $bytes_read [$read_buf]\n" ; unless( defined( $bytes_read ) && $bytes_read > 0 ) { $self->read_shut_down() ; if ( $self->{'send_data_on_close'} && length( $self->{'buffer'} ) ) { $self->send_data() ; # since we sent the total read buffer, we don't do a closed callback. return ; } $self->_callback( 'closed_method' ) ; return ; } # decode the packet if needed if ( my $packet = $self->{packet} ) { my $buf_ref = \$read_buf ; while( my $data_ref = $packet->to_data( $buf_ref ) ) { $self->send_data( $data_ref ) ; $buf_ref = undef ; } return ; } if ( $self->{'send_data_on_close'} ) { $self->{'buffer'} .= $read_buf ; return ; } $self->send_data( \$read_buf ) ; } sub send_data { my( $self, $buffer ) = @_ ; my $buf_ref = $buffer || \$self->{'buffer'} ; $self->_send_data_msg( 'data_addr', 'data_msg_type', $buf_ref ) ; $self->_callback( 'read_method', $buf_ref ) ; return ; } sub stderr_readable { my( $self ) = @_ ; my( $read_buf ) ; my $bytes_read = sysread( $self->{'stderr_fh'}, $read_buf, 8192 ) ; # no callback on stderr close. let the read handle close deal with the # shutdown return if $bytes_read == 0 ; #print "STDERR READ [$read_buf]\n" ; $self->_send_data_msg( 'stderr_addr', 'stderr_msg_type', \$read_buf ) ; $self->_callback( 'stderr_method', \$read_buf ) ; } sub _send_data_msg { my( $self, $addr_attr, $type_attr, $data_ref ) = @_ ; my $to_addr = $self->{$addr_attr} or return ; my $msg = Stem::Msg->new( 'to' => $to_addr, 'from' => $self->{'from_addr'}, 'type' => $self->{$type_attr}, 'data' => $data_ref, ) ; #print $msg->dump( 'SEND DATA' ) ; $msg->dispatch() ; } sub _callback { my ( $self, $method_attr, @data ) = @_ ; my $obj = $self->{'object'} or return ; my $method = $self->{$method_attr} ; my $code = $obj->can( $method ) or return ; return $obj->$code( @data, $self->{'id'} ) ; } sub write { my( $self ) = shift ; return unless @_ ; return unless exists( $self->{'write_buf'} ) ; my $buffer = shift ; return if $self->{'shut_down'} ; # encode the data in a packet if needed if ( my $packet = $self->{packet} ) { my $buf_ref = $packet->to_packet( $buffer ) ; $self->{'write_buf'} .= ${$buf_ref} ; } else { $self->{'write_buf'} .= ref $buffer eq 'SCALAR' ? ${$buffer} : $buffer ; } $self->{'write_event'}->start() ; } sub final_write { my( $self ) = @_ ; $self->write( $_[1] ) ; $self->write_shut_down() ; } sub writeable { my( $self ) = @_ ; return if $self->{'shut_down'} ; my $buf_ref = \$self->{'write_buf'} ; my $buf_len = length $$buf_ref ; #print "BUFLEN [$buf_len]\n" ; unless ( $buf_len ) { #print "AIO W STOPPING\n" ; $self->{'write_event'}->stop() ; return ; } my $bytes_written = syswrite( $self->{'write_fh'}, $$buf_ref ) ; unless( defined( $bytes_written ) ) { # do a SHUTDOWN return ; } # remove the part of the buffer that was written substr( $$buf_ref, 0, $bytes_written, '' ) ; return if length( $$buf_ref ) ; $self->write_shut_down() if $self->{'shut_down_when_empty'} ; } # DESTROY { # my( $self ) = @_ ; # print "DESTROY $self\n" ; # } 1 ;