#  File: Stem/Cell.pm

#  This file is part of Stem.
#  Copyright (C) 1999, 2000, 2001 Stem Systems, Inc.

#  Stem is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.

#  Stem is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with Stem; if not, write to the Free Software
#  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

#  For a license to use the Stem under conditions other than those
#  described here, to purchase support for this software, or to purchase a
#  commercial warranty contract, please contact Stem Systems at:

#       Stem Systems, Inc.              781-643-7504
#       79 Everett St.                  info@stemsystems.com
#       Arlington, MA 02474
#       USA

package Stem::Cell ;

use strict ;

use Data::Dumper ;
use Carp qw( cluck ) ;

use Stem::Route qw( :cell ) ;
use Stem::AsyncIO ;
use Stem::Id ;
use Stem::Gather ;
use Stem::Cell::Clone ;
use Stem::Cell::Pipe ;
use Stem::Cell::Flow ;
use Stem::Cell::Work ;

use Stem::Trace 'log' => 'stem_status' , 'sub' => 'TraceStatus' ;

# maps an owner object's class name to the name of the attribute in
# which that class keeps its Stem::Cell info object. filled in by
# cell_init and read by _get_cell_info.

my %class_to_attr_name ;

# NOTE(review): the copy of this file that was reviewed had lost the
# text of every 'help' here-document in the table below (only the
# attribute names, types, defaults and env keys survived). the help
# strings here were re-written from the attribute names and from how
# the attributes are used in this file. restore the upstream wording
# before merging.

my $attr_spec = [

    {
        'name'      => 'reg_name',
        'help'      => <<HELP,
The name this Cell is registered under
HELP
    },
    {
        'name'      => 'cloneable',
        'type'      => 'boolean',
        'help'      => <<HELP,
The parent Cell is cloned each time it is triggered
HELP
    },
    {
        'name'      => 'data_addr',
        'type'      => 'address',
        'help'      => <<HELP,
Cell address that data read by this Cell is sent to
HELP
    },
    {
        'name'      => 'status_addr',
        'type'      => 'address',
        'help'      => <<HELP,
Cell address that status messages are sent to
HELP
    },
    {
        'name'      => 'send_data_on_close',
        'type'      => 'boolean',
        'help'      => <<HELP,
Buffer all read data and send it when the handle is closed
HELP
    },
    {
        'name'      => 'no_io',
        'type'      => 'boolean',
        'help'      => <<HELP,
This Cell does no I/O and needs no aio_args
HELP
    },
    {
        'name'      => 'pipe_addr',
        'type'      => 'address',
        'help'      => <<HELP,
Cell address to open a pipe to
HELP
    },
    {
        'name'      => 'pipe_args',
        'help'      => <<HELP,
Arguments passed when the pipe is opened
HELP
    },
    {
        'name'      => 'aio_args',
        'type'      => 'hash',
        'help'      => <<HELP,
Arguments passed to the Stem::AsyncIO constructor
HELP
    },
    {
        'name'      => 'errors_to_output',
        'env'       => 'errors_to_output',
        'help'      => <<HELP,
Send error text to the output
HELP
    },
    {
        'name'      => 'id_size',
        'default'   => 3,
        'help'      => <<HELP,
Size of the ID assigned to each cloned Cell
HELP
    },
    {
        'name'      => 'trigger_method',
        'default'   => 'triggered_cell',
        'help'      => <<HELP,
Method called in the owner object when the Cell is triggered
HELP
    },
    {
        'name'      => 'shut_down_method',
        'default'   => 'shut_down_cell',
        'help'      => <<HELP,
Method called in the owner object when the Cell is shut down
HELP
    },
    {
        'name'      => 'activated_method',
        'default'   => 'activate_cell',
        'help'      => <<HELP,
Method called in the owner object when the Cell is activated
HELP
    },
    {
        'name'      => 'sequence_done_method',
        'help'      => <<HELP,
Method called in the owner object when a sequence is done
HELP
    },
    {
        'name'      => 'codec',
        'help'      => <<HELP,
Codec passed to Stem::AsyncIO for the data
HELP
    },
    {
        'name'      => 'work_ready_addr',
        'type'      => 'address',
        'help'      => <<HELP,
Cell address to notify when this Cell is ready for work
HELP
    },
    {
        'name'      => 'stderr_log',
        'help'      => <<HELP,
Log that stderr data is written to
HELP
    },
] ;

# constructor. returns the new Stem::Cell object, or a non-reference
# error value when the arguments could not be parsed.
#
# NOTE(review): everything in this sub before the trailing
# "_dump( 'NEW' ) ; return( $self ) ;" was lost in the reviewed copy.
# the body was rebuilt on the assumption that it uses
# Stem::Class::parse_args with $attr_spec - confirm against upstream.

sub new {

    my( $class ) = shift ;

    my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
    return $self unless ref $self ;

#print $self->_dump( 'NEW' ) ;

    return( $self ) ;
}

# this is only called in Stem::Conf for this class.
# it initializes the cell info object inside its owner object.
#
#   $owner_obj      - the cell object that owns this Stem::Cell object
#   $cell_name      - name used to build this cell's from address
#   $cell_info_attr - name of the owner attribute that holds this object

sub cell_init {

    my( $self, $owner_obj, $cell_name, $cell_info_attr ) = @_ ;

# the $owner_obj is the cell that owns this Stem::Cell object

    $self->{'owner_obj'} = $owner_obj ;
    $self->{'cell_name'} = $cell_name ;

#   $self->{'from_addr'} = $cell_name ;
    $self->{'from_addr'} = Stem::Msg::make_address_string(
        $Stem::Vars::Hub_name,
        $cell_name
    ) ;

    $self->{'cell_info_attr'} = $cell_info_attr ;

# save the attribute name that the owner class uses for the cell info.
# this is how a cell info object can be found given an owner cell object.
# also keep this name in the info itself

#print "OWNER [$owner_obj]\n" ;

    $class_to_attr_name{ ref $owner_obj } ||= $cell_info_attr ;

# a cloneable cell gets an id generator and is marked as the parent
# (data_in ignores messages sent to a parent)

    if ( $self->{'cloneable'} ) {

        $self->{'id_obj'} = Stem::Id->new(
            'size' => $self->{'id_size'}
        ) ;

        $self->{'is_parent'} = 1 ;
        $self->{'target'} = '' ;
    }
}

# get the cell info whether we were called from the owner object or
# the cell info itself ;
#
# returns the Stem::Cell object, or an error string when $self is not
# a reference. only cell_trigger checks for that error string.

sub _get_cell_info {

    my ( $self ) = @_ ;

    my $class = ref $self ;

    return "can't get cell info from '$self'\n" unless $class ;

    return $self if $class eq __PACKAGE__ ;

#print "CLASS [$class][$class_to_attr_name{ $class }]\n" ;

    return $self->{ $class_to_attr_name{ $class } } ;
}

# trigger this cell: clone it if needed, store the passed key/value
# args, set up the pipe, gather the I/O args and call the owner's
# trigger method.
#
# returns nothing on success (or if the cell is already triggered),
# otherwise the error value. on a gather or callback error the cell is
# shut down before the error is returned.

sub cell_trigger {

    my ( $self, @args ) = @_ ;

    my $self_info = $self->_get_cell_info() ;
    return $self_info unless ref $self_info ;

    return if $self_info->{'triggered'} ;

# clone this cell and its info if needed
# $cell will either be $self or a clone of $self

    my $cell = $self_info->_clone() ;
    my $cell_info = $cell->_get_cell_info() ;

    $cell_info->{'triggered'} = 1 ;

#print $cell_info->_dump( 'TRIGGER' ) ;

# set any args (e.g. from trigger message) into this cell

    $cell_info->cell_set_args( @args ) ;

    $cell_info->_cell_pipe() ;

    if ( my $err = $cell_info->_gather_io_args() ) {

        $cell_info->cell_shut_down( $err ) ;
        return $err ;
    }

# do the callback into the (possibly cloned) cell

    if ( my $err = $cell_info->_callback( 'trigger_method' ) ) {

#print "CALLBACK $err\n" ;
        $cell_info->cell_shut_down( $err ) ;
        return $err ;
    }

#   return $cell_info ;
    return ;
}

# command message handler that triggers the cell.
#
# the message data becomes the trigger args:
#   plain string - parsed as key=value pairs, or passed whole as 'args'
#   HASH ref     - its key/value pairs
#   ARRAY ref    - its elements
# a SCALAR ref is dereferenced first. the message itself is always
# added under 'triggering_msg'.
#
# an error is printed; it is only returned when it is a reference.

sub cell_trigger_cmd {

    my ( $self, $msg ) = @_ ;

    my @args ;

    if ( my $data = $msg->data() ) {

        $data = ${$data} if ref $data eq 'SCALAR' ;
        my $ref = ref $data ;

        if ( ! $ref && defined $data ) {

            unless ( @args = $data =~ /(\S+)=(\S+)/g ) {

                @args = ( 'args' => $data ) ;
            }
        }
        elsif ( $ref eq 'HASH' ) {

            @args = %{$data} ;
        }
        elsif ( $ref eq 'ARRAY' ) {

            @args = @{$data} ;
        }
    }

    push( @args, triggering_msg => $msg ) ;

    my $err = $self->cell_trigger( @args ) ;

    print "TRIG ERR [$err]\n" if $err ;

    return $err if ref $err ;

    return ;
}

# shut down the cell: stop its AsyncIO and Gather objects, close the
# pipe, delete the clone and clear the trigger args and state flags.
#
# does nothing unless an error is passed or the cell is active.
# the error (if any) is saved in the cell info under 'error'.

sub cell_shut_down {

    my( $self, $error ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

#cluck "CELL SHUT\n" ;
#print $cell_info->_dump( 'SHUT' ) ;

    return unless $error || $cell_info->{'active'} ;

    $cell_info->{'error'} = $error ;

#print $cell_info->_dump( "SHUT $error" ) ;

    if ( my $aio = delete $cell_info->{'aio'} ) {

        $aio->shut_down() ;
    }

    if ( my $gather = delete $cell_info->{'gather'} ) {

        $gather->shut_down() ;
    }

    $cell_info->_close_pipe() ;

    $cell_info->_clone_delete() ;

    delete $cell_info->{'args'} ;
#   delete $cell_info->{'data_addr'} ;

    $cell_info->{'active'} = 0 ;
    $cell_info->{'triggered'} = 0 ;

    TraceStatus "cell shut down done" ;

    return ;
}

# store key/value args in the cell info and, if a gather is pending,
# tell it which keys have arrived.
#
# returns the gather's error value, if any, otherwise nothing.

sub cell_set_args {

    my( $self, %args ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    @{$cell_info->{'args'}}{ keys %args } = values %args ;

    if ( my $gather = $cell_info->{'gather'} ) {

        my $err = $gather->gathered( keys %args ) ;
        return $err if $err ;
    }

    return ;
}

# return the values of the named args, in the order the keys are given

sub cell_get_args {

    my( $self, @arg_keys ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    return( @{$cell_info->{'args'}}{@arg_keys } ) ;
}

# get (and optionally set) the 'info' value that status_cmd reports

sub cell_info {

    my( $self ) = shift ;

    my $cell_info = $self->_get_cell_info() ;

    $cell_info->{'info'} = shift if @_ ;

    return $cell_info->{'info'} ;
}

# create the Stem::Gather object that calls _cell_activate_io once
# 'aio_args' has been set (and 'data_addr' too for a piped cell that
# has none configured). skipped entirely for a 'no_io' cell.
#
# returns an error value on failure and nothing otherwise.

sub _gather_io_args {

    my( $self ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    return if $cell_info->{'no_io'} ;

    my @gather_keys = 'aio_args' ;

    push( @gather_keys, 'data_addr' ) if $cell_info->{'piped'} &&
                                         ! $cell_info->{'data_addr'} ;

    my $gather = Stem::Gather->new(
        'object'            => $cell_info,
        'keys'              => \@gather_keys,
        'gathered_method'   => '_cell_activate_io',
    ) ;

# a non-reference result from the constructor is its error value

    return $gather unless ref $gather ;

    $cell_info->{'gather'} = $gather ;

# report the args that were already set before the gather existed

    my $err = $gather->gathered( keys %{$cell_info->{'args'}} ) ;
    return $err if $err ;

# explicit empty return so that the success value does not depend on
# the last expression evaluated

    return ;
}

# gathered callback (see _gather_io_args): mark the cell active and
# create its Stem::AsyncIO object.
#
# returns nothing on success, otherwise an error value.

sub _cell_activate_io {

    my ( $self ) = @_ ;

    TraceStatus "cell activated" ;

    $self->{'active'} = 1 ;

#print $self->_dump( "BEFORE AIO" ) ;

    my @aio_args ;

# get any config args

    if ( my $aio_args = $self->{'aio_args'} ) {

        push( @aio_args, %{$aio_args} ) ;
    }

# args from a trigger message override any config args

    if ( my $msg_aio_args = $self->{'args'}{'aio_args'} ) {

# NOTE(review): the text of this error here-document and the
# statements up to the data_addr assignment were lost in the reviewed
# copy. the error wording and the push below are reconstructed -
# confirm against upstream.

        ref $msg_aio_args eq 'ARRAY' or return <<ERR ;
trigger aio_args is not an ARRAY ref
ERR

        push( @aio_args, @{$msg_aio_args} ) ;
    }

# a data address from the trigger args wins over the configured one

    my $data_addr = $self->{'args'}{'data_addr'} ||
                    $self->{'data_addr'} ;

    my $aio = Stem::AsyncIO->new(

        'object'                => $self->{'owner_obj'},
        'data_addr'             => $data_addr,
        'from_addr'             => $self->{'from_addr'},
        'send_data_on_close'    => $self->{'send_data_on_close'},
        'codec'                 => $self->{'codec'},
        @aio_args,
    ) ;

    print "AIO ERR [$aio]\n" unless ref $aio ;
    return $aio unless ref $aio ;

    $self->{'aio'} = $aio ;

#print $self->_dump( "AFTER AIO" ) ;

    return ;
}

# mark the cell active without creating any I/O object

sub cell_activate {

    my( $self ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    $cell_info->{'active'} = 1 ;
}

*cell_status_cmd = \&status_cmd ;

# command handler that returns a text report of the cell's state
#
# NOTE(review): the start of the here-document (up to the from
# address) was lost in the reviewed copy and all of its whitespace was
# collapsed. the 'Class:' and 'Addr:' labels and the line layout are
# reconstructed - confirm against upstream.

sub status_cmd {

    my( $self ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    my $info = $cell_info->{'info'} ||
               $cell_info->{'args'}{'info'} ||
               '' ;

# indent every line of the info text

    $info =~ s/^/\t\t/mg ;

    my $class = ref $cell_info->{'owner_obj'} ;

#   my $data_addr = Stem::Msg::address_string(
    my $data_addr = $cell_info->{'data_addr'} ||
                    $cell_info->{'args'}{'data_addr'} ||
                    '[NONE]' ;

    my $active = ( $cell_info->{'active'} ) ? 'Active' : 'Inactive' ;

    my $codec = $cell_info->{codec} || 'NONE' ;

print "CELL STATUS\n" ;

#my $dump = $cell_info->_dump( 'STATUS' ) ;
    my $dump = '' ;

    return <<STATUS ;
Class: $class
Addr: $cell_info->{'from_addr'}
Status: $active
Data Addr: $data_addr
Codec: $codec
Info:$info
SELF: $self
CELL: $cell_info
AIO: $cell_info->{aio}
FH: $cell_info->{fh}
$dump
STATUS
}

# handle an incoming data message by writing its data to the cell's
# AsyncIO object. messages to a parent (cloneable) cell or to an
# inactive cell are traced and ignored.

sub data_in {

    my( $self, $msg ) = @_ ;

#print "DATA SELF $self\n" ;
#print $msg->dump( 'CELL IN' ) ;

    my $cell_info = $self->_get_cell_info() ;

    if ( $cell_info->{'is_parent'} ) {

#print "PARENT\n" ;
        TraceStatus
            "parent cell $cell_info->{'from_addr'} ignoring msg" ;
        return ;
    }

    unless( $cell_info->{'active'} ) {

#print "INACTIVE\n" ;
        TraceStatus "cell not active. msg ignored FOO" ;
        return ;
    }

#print $cell_info->_dump( "DATA IN" ) ;

    $cell_info->{data_in_msg} = $msg ;

    $cell_info->cell_write( $msg->data() ) ;
}

# write data to the cell's AsyncIO object and return what its write
# method returns.
#
# a cell has no AsyncIO object when it was activated with
# cell_activate or after cell_shut_down has removed it. that case is
# traced and nothing is returned instead of dying on a method call on
# an undefined value.

sub cell_write {

    my( $self, $data ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    my $aio = $cell_info->{'aio'} ;

    unless ( $aio ) {

        TraceStatus "cell has no aio object. write ignored" ;
        return ;
    }

    return $aio->write( $data ) ;
}

# write data directly with syswrite to the 'fh' found in the trigger
# message's aio_args, bypassing the AsyncIO object. $data may be a
# string or a reference to one. does nothing if there is no such
# handle.

sub _cell_write_sync {

    my( $self, $data ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

#print "SYNC $$data\n" ;
#print $cell_info->_dump( 'SYNC' ) ;

    if ( my $aio_args = $cell_info->{'args'}{'aio_args'} ) {

        my %aio_args = @{$aio_args} ;

        if ( my $fh = $aio_args{'fh'} ) {

#           $fh->blocking( 1 ) ;
            $fh->syswrite( (ref $data) ? $$data : $data ) ;
        }
    }
}

# handle stderr data as plain data

*stderr_data_in = \&data_in ;

# $cell_info is the Stem::Cell object of the parent cell. the name is
# not self as it is differentiated from $clone_info.

# call the owner object's method whose name is stored in this cell
# info under $method_name (e.g. 'trigger_method'), passing @data.
#
# returns whatever the method returns. if no method name is set or
# the owner can't do it, that is traced and nothing is returned.

sub _callback {

    my ( $self, $method_name, @data ) = @_ ;

    my $method = $self->{$method_name} ;

    my $owner_obj = $self->{'owner_obj'} ;

# some method attributes (e.g. sequence_done_method) have no default
# so the method name may not be set at all

    if ( defined $method && $owner_obj->can( $method ) ) {

        return $owner_obj->$method( @data ) ;
    }

    my $shown = defined $method ? $method : '' ;

    TraceStatus "can't call $shown in $owner_obj" ;

    return ;
}

# return the address string this cell sends messages from

sub cell_from_addr {

    my ( $self ) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    return( $cell_info->{'from_addr'} ) ;
}

use Stem::Debug qw( dump_data ) ;

# return the optional label text followed by a dump_data dump of
# this object.
#
# this sub used to carry a hand-written key by key dumper after an
# unconditional return. that code could never run and was removed.
# the label now defaults to an empty string so a call with no label
# (see dump_cmd) no longer concatenates an undefined value. the
# returned text is unchanged.

sub _dump {

    my ( $self, $text ) = @_ ;

    $text = '' unless defined $text ;

    return $text . dump_data( $self ) ;
}

# command handler that returns the cell info dumped with both _dump
# and Data::Dumper

sub dump_cmd {

    my ($self) = @_ ;

    my $cell_info = $self->_get_cell_info() ;

    return $cell_info->_dump() . Dumper $cell_info ;
}

1 ;