package Search::Elasticsearch::Role::Cxn; $Search::Elasticsearch::Role::Cxn::VERSION = '1.16'; use Moo::Role; use Search::Elasticsearch::Util qw(throw); use List::Util qw(min); use Try::Tiny; use URI(); use Search::Elasticsearch::Util qw(to_list); use namespace::clean; requires qw(protocol perform_request error_from_text); has 'host' => ( is => 'ro', required => 1 ); has 'port' => ( is => 'ro', required => 1 ); has 'uri' => ( is => 'ro', required => 1 ); has 'request_timeout' => ( is => 'ro', default => 30 ); has 'ping_timeout' => ( is => 'ro', default => 2 ); has 'sniff_timeout' => ( is => 'ro', default => 1 ); has 'sniff_request_timeout' => ( is => 'ro', default => 2 ); has 'next_ping' => ( is => 'rw', default => 0 ); has 'ping_failures' => ( is => 'rw', default => 0 ); has 'dead_timeout' => ( is => 'ro', default => 60 ); has 'max_dead_timeout' => ( is => 'ro', default => 3600 ); has 'serializer' => ( is => 'ro', required => 1 ); has 'logger' => ( is => 'ro', required => 1 ); has 'handle_args' => ( is => 'ro', default => sub { {} } ); my %Code_To_Error = ( 400 => 'Request', 403 => 'ClusterBlocked', 404 => 'Missing', 409 => 'Conflict', 503 => 'Unavailable' ); #=================================== sub is_live { !shift->next_ping } sub is_dead { !!shift->next_ping } #=================================== #=================================== sub mark_live { #=================================== my $self = shift; $self->ping_failures(0); $self->next_ping(0); } #=================================== sub mark_dead { #=================================== my $self = shift; my $fails = $self->ping_failures; $self->ping_failures( $fails + 1 ); my $timeout = min( $self->dead_timeout * 2**$fails, $self->max_dead_timeout ); my $next = $self->next_ping( time() + $timeout ); $self->logger->infof( 'Marking [%s] as dead. Next ping at: %s', $self->stringify, scalar localtime($next) ); } #=================================== sub force_ping { #=================================== my $self = shift; $self->ping_failures(0); $self->next_ping(-1); } #=================================== sub pings_ok { #=================================== my $self = shift; $self->logger->infof( 'Pinging [%s]', $self->stringify ); return try { $self->perform_request( { method => 'HEAD', path => '/', timeout => $self->ping_timeout, } ); $self->logger->infof( 'Marking [%s] as live', $self->stringify ); $self->mark_live; 1; } catch { $self->logger->debug("$_"); $self->mark_dead; 0; }; } #=================================== sub sniff { #=================================== my $self = shift; my $protocol = $self->protocol; $self->logger->infof( 'Sniffing [%s]', $self->stringify ); return try { $self->perform_request( { method => 'GET', path => '/_nodes/' . $protocol, qs => { timeout => 1000 * $self->sniff_timeout }, timeout => $self->sniff_request_timeout, } )->{nodes}; } catch { $self->logger->debug($_); return; }; } #=================================== sub process_response { #=================================== my ( $self, $params, $code, $msg, $body, $mime_type ) = @_; my $is_encoded = $mime_type && $mime_type ne 'text/plain'; if ( $code >= 200 and $code <= 209 ) { if ( defined $body and length $body ) { $body = $self->serializer->decode($body) if $is_encoded; return $code, $body; } return ( $code, 1 ) if $params->{method} eq 'HEAD'; return ( $code, '' ); } my @ignore = to_list( $params->{ignore} ); push @ignore, 404 if $params->{method} eq 'HEAD'; return ($code) if grep { $_ eq $code } @ignore; my $error_type = $Code_To_Error{$code}; unless ($error_type) { if ( defined $body and length $body ) { $msg = $body; $body = undef; } $error_type = $self->error_from_text( $code, $msg ); } delete $params->{data} if $params->{body}; my %error_args = ( status_code => $code, request => $params ); if ( $body = $self->serializer->decode($body) ) { $error_args{body} = $body; if ( ref $body ) { $msg = $body->{error} || $msg; } else { $msg = $body; } $error_args{current_version} = $1 if $error_type eq 'Conflict' and $msg =~ /: version conflict, current \[(\d+)\]/; } $msg ||= $error_type; chomp $msg; throw( $error_type, "[" . $self->stringify . "]-[$code] $msg", \%error_args ); } 1; # ABSTRACT: Provides common functionality to Cxn implementations __END__ =pod =encoding UTF-8 =head1 NAME Search::Elasticsearch::Role::Cxn - Provides common functionality to Cxn implementations =head1 VERSION version 1.16 =head1 DESCRIPTION L provides common functionality to the Cxn implementations. Cxn instances are created by a L implementation, using the L class. =head1 CONFIGURATION B The L, L, L, and L parameters default to values that allow this module to function with low powered hardware and slow networks. When you use Elasticsearch in production, you will probably want to reduce these timeout parameters to values that suit your environment. The configuration parameters are as follows: =head2 C $e = Search::Elasticsearch->new( request_timeout => 30 ); How long a normal request (ie not a ping or sniff request) should wait before throwing a C error. Defaults to C<30> seconds. B In production, no request should take 30 seconds to run, other than an L request. A more reasonable value for production would be C<10> seconds or lower. =head2 C $e = Search::Elasticsearch->new( ping_timeout => 2 ); How long a ping request should wait before throwing a C error. Defaults to C<2> seconds. The L module pings nodes on first use, after any failure, and periodically to ensure that nodes are healthy. The C should be long enough to allow nodes respond in time, but not so long that sick nodes cause delays. A reasonable value for use in production on reasonable hardware would be C<0.3>-C<1> seconds. =head2 C $e = Search::Elasticsearch->new( dead_timeout => 60 ); How long a Cxn should be considered to be I (not used to serve requests), before it is retried. The default is C<60> seconds. This value is increased by powers of 2 for each time a request fails. In other words, the delay after each failure is as follows: Failure Delay 1 60 * 1 = 60 seconds 2 60 * 2 = 120 seconds 3 60 * 4 = 240 seconds 4 60 * 8 = 480 seconds 5 60 * 16 = 960 seconds =head2 C $e = Search::Elasticsearch->new( max_dead_timeout => 3600 ); The maximum delay that should be applied to a failed node. If the L calculation results in a delay greater than C (default C<3,600> seconds) then the C is used instead. In other words, dead nodes will be retried at least once every hour by default. =head2 C $e = Search::Elasticsearch->new( sniff_request_timeout => 2 ); How long a sniff request should wait before throwing a C error. Defaults to C<2> seconds. A reasonable value for production would be C<0.5>-C<2> seconds. =head2 C $e = Search::Elasticsearch->new( sniff_timeout => 1 ); How long the node being sniffed should wait for responses from other nodes before responding to the client. Defaults to C<1> second. A reasonable value in production would be C<0.3>-C<1> seconds. B The C is distinct from the L. For example, let's say you have a cluster with 5 nodes, 2 of which are unhealthy (taking a long time to respond): =over =item * If you sniff an unhealthy node, the request will throw a C error after C seconds. =item * If you sniff a healthy node, it will gather responses from the other nodes, and give up after C seconds, returning just the information it has managed to gather from the healthy nodes. =back B The C must be longer than the C to ensure that you get information about healthy nodes from the cluster. =head2 C Any default arguments which should be passed when creating a new instance of the class which handles the network transport, eg L. =head1 METHODS None of the methods listed below are useful to the user. They are documented for those who are writing alternative implementations only. =head2 C $host = $cxn->host; The value of the C parameter, eg C. =head2 C $port = $cxn->port; The value of the C parameter, eg C<9200>. =head2 C $uri = $cxn->uri; A L object representing the node, eg C. =head2 C $bool = $cxn->is_dead Is the current node marked as I. =head2 C $bool = $cxn->is_live Is the current node marked as I. =head2 C $time = $cxn->next_ping($time) Get/set the time for the next scheduled ping. If zero, no ping is scheduled and the cxn is considered to be alive. If -1, a ping is scheduled before the next use. =head2 C $num = $cxn->ping_failures($num) The number of times that a cxn has been marked as dead. =head2 C $cxn->mark_dead Mark the cxn as I, set L and increment L. =head2 C Mark the cxn as I, set L and L to zero. =head2 C Set L to -1 (ie before next use) and L to zero. =head2 C $bool = $cxn->pings_ok Try to ping the node and call L or L depending on the success or failure of the ping. =head2 C $response = $cxn->sniff; Send a sniff request to the node and return the response. =head2 C ($code,$result) = $cxn->process_response($params, $code, $msg, $body ); Processes the response received from an Elasticsearch node and either returns the HTTP status code and the response body (deserialized from JSON) or throws an error of the appropriate type. The C<$params> are the original params passed to L, the C<$code> is the HTTP status code, the C<$msg> is the error message returned by the backend library and the C<$body> is the HTTP response body returned by Elasticsearch. =head1 AUTHOR Clinton Gormley =head1 COPYRIGHT AND LICENSE This software is Copyright (c) 2014 by Elasticsearch BV. This is free software, licensed under: The Apache License, Version 2.0, January 2004 =cut