package UR::Service::RPC::Message;

use UR;
use FreezeThaw;
use IO::Select;

use strict;
use warnings;
our $VERSION = "0.46"; # UR $VERSION;

# Class definition for an RPC message.  target_class and method_name are
# required; everything else is optional.  params and return_values are
# multi-valued (is_many).  The class is declared non-transactional.
UR::Object::Type->define(
    class_name => 'UR::Service::RPC::Message',
    has => [
        target_class => { is => 'String' },
        method_name  => { is => 'String' },
    ],
    has_optional => [
        #arg_list     => { is => 'ARRAY' },
        params        => { is => 'Object', is_many => 1 },
        return_values => { is => 'Object', is_many => 1 },
        'wantarray'   => { is => 'Integer' },
        fh            => { is => 'IO::Handle' },
        exception     => { is => 'String' },
    ],
    is_transactional => 0,
);

# create(%params)
#
# Constructor.  Normalizes 'params' and 'return_values' so that each is
# always an array reference before delegating to the parent create():
#   - a missing or false value becomes an empty array ref
#   - a value that is not a plain ARRAY ref is wrapped in a one-element
#     array ref
# Returns whatever SUPER::create() returns.
sub create {
    my ($class, %params) = @_;

    foreach my $key ('params', 'return_values') {
        if (!$params{$key}) {
            $params{$key} = [];
        }
        elsif (ref($params{$key}) ne 'ARRAY') {
            $params{$key} = [ $params{$key} ];
        }
    }

    return $class->SUPER::create(%params);
}

# send([$fh])
#
# Serializes this message with FreezeThaw and writes it to $fh, or to the
# filehandle stored in the 'fh' property when $fh is not supplied.
#
# Wire format: a 4-byte length packed with pack('N', ...), followed by the
# frozen payload.
#
# Returns the total number of bytes written (length prefix included).
# Returns undef if there is no filehandle to write to, or if a write fails;
# in the write-failure case the filehandle is closed first.
sub send {
    my $self = shift;
    my $fh   = shift;
    $fh ||= $self->fh;

    # Without this guard the syswrite() call below would die with
    # "Can't call method on an undefined value" instead of reporting failure.
    unless ($fh) {
        $self->warning_message("Can't send message: no filehandle was given and none is stored in fh");
        return undef;
    }

    # Only these properties travel over the wire; fh is deliberately left out.
    my %struct;
    foreach my $key (qw(target_class method_name params wantarray return_values exception)) {
        $struct{$key} = $self->{$key};
    }

    my $payload = FreezeThaw::freeze(\%struct);
    my $string  = pack('N', length($payload)) . $payload;

    my $len  = length($string);
    my $sent = 0;
    while ($sent < $len) {
        # syswrite may write fewer bytes than asked for; continue from the
        # offset already sent until the whole buffer is out.
        my $wrote = $fh->syswrite($string, $len - $sent, $sent);
        if ($wrote) {
            $sent += $wrote;
        }
        else {
            # The filehandle closed for some reason
            $fh->close;
            # Kept as 'return undef' (not a bare return) so that callers in
            # list context keep receiving a single undef as before.
            return undef;
        }
    }

    return $sent;
}

# recv($fh [, $timeout])      -- class method form
# recv()                      -- instance method form
#
# Reads one length-prefixed, FreezeThaw-serialized message from $fh and
# returns a new message object whose 'fh' property is set to $fh.
#
# $timeout is passed to IO::Select::can_read() before every read.  When the
# timeout argument is omitted altogether it defaults to 5 seconds.
#
# When invoked on an existing message object, the filehandle stored in that
# object is used, and a timeout passed as the second argument is honoured.
#
# On timeout, EOF, read error, or an undecodable payload a warning_message
# is issued and nothing (empty list / undef) is returned.
sub recv {
    my ($class, $fh, $timeout) = @_;

    # You can also call recv on a message object previously created
    if (ref($class) && $class->isa('UR::Service::RPC::Message')) {
        my $stored_fh  = $class->fh;
        my $real_class = ref($class);
        # Forward the timeout only when the caller actually supplied one, so
        # the "argument omitted => 5 seconds" default below still applies.
        return @_ < 3
            ? $real_class->recv($stored_fh)
            : $real_class->recv($stored_fh, $timeout);
    }

    if (@_ < 3) {        # if they didn't specify a timeout
        $timeout = 5;    # Default wait 5 sec
    }

    my $select = IO::Select->new($fh);

    # read in the message len, 4 chars
    my $msglen   = '';
    my $numchars = 0;
    while ($numchars < 4) {
        unless ($select->can_read($timeout)) {
            $class->warning_message("Can't get message length, timed out");
            return;
        }

        my $read = $fh->sysread($msglen, 4 - $numchars, $numchars);
        unless ($read) {
            # sysread returns 0 at EOF and undef on error; $! is only
            # meaningful in the error case.
            my $reason = defined($read) ? 'filehandle closed (EOF)' : $!;
            $class->warning_message("Can't get message length: $reason");
            return;
        }
        $numchars += $read;
    }
    $msglen = unpack('N', $msglen);

    # read in the payload, which may arrive in several pieces
    my $string = '';
    $numchars = 0;
    while ($numchars < $msglen) {
        unless ($select->can_read($timeout)) {
            $class->warning_message("Timed out reading message after $numchars bytes");
            return;
        }

        my $read = $fh->sysread($string, $msglen - $numchars, $numchars);
        unless ($read) {
            my $reason = defined($read) ? 'filehandle closed (EOF)' : $!;
            $class->warning_message("Error reading message after $numchars bytes: $reason");
            return;
        }
        $numchars += $read;
    }

    # NOTE(review): thaw() rebuilds whatever data structure the peer sent,
    # presumably including blessed objects -- confirm that this is only used
    # on trusted connections.
    my $struct;
    my $thawed_ok = eval { ($struct) = FreezeThaw::thaw($string); 1 };
    unless ($thawed_ok && ref($struct) eq 'HASH') {
        # A corrupt, empty or non-hash payload is reported like every other
        # receive failure instead of dying inside thaw() or on %$struct.
        my $reason = $thawed_ok ? 'payload is not a hash reference' : $@;
        $class->warning_message("Can't decode message: $reason");
        return;
    }

    my $obj = $class->create(%$struct, fh => $fh);
    return $obj;
}

1;

=pod

=head1 NAME

UR::Service::RPC::Message - Serializable object appropriate for sending RPC messages

=head1 SYNOPSIS

  my $msg = UR::Service::RPC::Message->create(
                target_class => 'URT::RPC::Thingy',
                method_name => 'join',
                params => ['-', @join_args],
                'wantarray' => 0,
            );
  $msg->send($fh);

  my $resp = UR::Service::RPC::Message->recv($fh, 5);

=head1 DESCRIPTION

This class is used as a message-passing interface by the RPC service modules.

=head1 PROPERTIES

These properties should be filled in by the initiating caller

=over 4

=item method_name => Text

The name of the subroutine the initiator wishes to call.

=item target_class => Text

The namespace the initiator wants the subroutine to be called in

=item params => ARRAY

List of parameters to pass to the subroutine

=item wantarray => Boolean

What wantarray() context the subroutine should be called in.

=back

These properties are assigned after the RPC call to the subroutine

=over 4

=item return_values => ARRAY

List of values returned by the subroutine

=item exception

On the receiving side, the subroutine is called within an eval.  If there
was an exception, C<exception> stores the value of $@, or the empty string.
The receiving side should also fill-in C<exception> if there was an
authentication failure.

=item fh

C<recv> fills this in with the file handle the message was read from.

=back

=head1 METHODS

=over 4

=item send

  $bytes = $msg->send($fh);

Serializes the Message object with FreezeThaw and writes the data to the
filehandle $fh.  Returns the number of bytes written.  $bytes will be false
if there was an error.

=item recv

  $response = UR::Service::RPC::Message->recv($fh,$timeout);

  $response = $msg->recv();

Reads a serialized Message from the filehandle and constructs a Message
object that is then returned to the caller.  In the first case, it reads from
the given filehandle, waiting a maximum of $timeout seconds with select
before giving up.  In the second case, it reads from whatever filehandle is
stored in $msg to read data from.

=back

=head1 SEE ALSO

UR::Service::RPC::Server, UR::Service::RPC::Executor

=cut