package IPC::AnyEvent::Gearman; # ABSTRACT: IPC through gearmand. use Devel::GlobalDestruction; use namespace::autoclean; use Log::Log4perl qw(:easy); Log::Log4perl->easy_init($ERROR); use Scalar::Util qw(weaken); use Any::Moose; use Data::Dumper; use AnyEvent::Gearman; use AnyEvent::Gearman::Worker::RetryConnection; use UUID::Random; our $VERSION = '0.8'; # VERSION has 'job_servers' => (is => 'rw', isa => 'ArrayRef',required => 1); has channel => (is => 'rw', default=>sub{UUID::Random::generate;} ); has 'on_recv' => (is => 'rw', isa=>'CodeRef', default=>sub{return sub{WARN 'You need to set on_recv function'};} ); has 'on_sent' => (is => 'rw', isa=>'CodeRef', default=>sub{return sub{INFO 'Send OK '.$_[0]};} ); has 'on_fail' => (is => 'rw', isa=>'CodeRef', default=>sub{return sub{WARN 'Send FAIL '.$_[0]};} ); has 'client' => (is=>'rw', lazy=>1, isa=>'Object', default=>sub{ DEBUG 'lazy client'; my $self = shift; return gearman_client @{$self->job_servers()}; }, ); has 'worker' => (is=>'rw', isa=>'Object',); after 'channel' => sub{ my $self = shift; if( @_ && $self->{listening}){ $self->_renew_connection(); } }; after 'job_servers' => sub{ my $self = shift; if( @_ && $self->{listening}){ $self->_renew_connection(); } if( @_ ){ $self->client( gearman_client @{$self->job_servers()} ); } }; sub listen{ my $self = shift; $self->{listening} = 1; $self->_renew_connection(); } sub send{ my $self = shift; my $target_channel= shift; my $data = shift; $self->client->add_task( $target_channel => $data, on_complete => sub{ my $result = $_[1]; $self->on_sent()->($target_channel,$_[1]); }, on_fail => sub{ $self->on_fail()->($target_channel); } ); weaken($self); } sub _renew_connection{ my $self = shift; DEBUG "new Connection"; my $worker = gearman_worker @{$self->job_servers()}; $worker = AnyEvent::Gearman::Worker::RetryConnection::patch_worker($worker); $self->worker( $worker ); $self->worker->register_function( $self->channel() => sub{ my $job = shift; my $res = $self->on_recv()->($job->workload); $res = '' unless defined($res); $job->complete($res); } ); weaken($self); } sub DEMOLISH{ return if in_global_destruction; my $self = shift; DEBUG __PACKAGE__." DEMOLISHED"; } __PACKAGE__->meta->make_immutable; 1; =pod =head1 NAME IPC::AnyEvent::Gearman - IPC through gearmand. =head1 VERSION version 0.8 =head1 SYNOPSIS use AnyEvent; use IPC::AnyEvent::Gearman; #receive my $recv = IPC::AnyEvent::Gearman->new(job_servers=>['localhost:9999']); $recv->channel('BE_CALLED'); # channel is set with a random UUID by default $recv->on_recv(sub{ my $msg = shift; print "received msg : $data\n"; return "OK";#result }); $recv->listen(); my $cv = AE::cv; $cv->recv; #send my $ch = 'BE_CALLED'; my $send = IPC::AnyEvent::Gearman->new(server=>['localhost:9999']); my $result = $send->send($ch,"TEST DATA"); pritn $result; # prints "OK" =head1 ATTRIBUTES =head2 job_servers ArrayRef of hosts. *REQUIRED* =head2 channel get/set channel. When set, reconnect to new channel. It is set with Random-UUID by default. =head2 on_recv on_recv Hander. First argument is DATA which is sent. This can be invoked after listen(). =head2 on_sent on_sent handler. First argument is a channel string. =head2 on_fail on_fail handler. First argument is a channel string. =head1 METHODS =head2 listen To receive message, you MUST call listen(). my $sender = IPC::AnyEvent::Gearman->new(channel=>'ADMIN',job_servers=>['localhost:9998']); $sender->listen(); =head2 send To send data to process listening channel, use this. my $sender = IPC::AnyEvent::Gearman->new(job_servers=>['localhost:9998']); $sender->send($channel,'DATA'); =head1 AUTHOR KHS, HyeonSeung Kim =head1 COPYRIGHT AND LICENSE This software is copyright (c) 2012 by HyeonSeung Kim. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself. =cut __END__