package App::Oozie::Deploy::Validate::Spec::Workflow; $App::Oozie::Deploy::Validate::Spec::Workflow::VERSION = '0.003'; use 5.010; use strict; use warnings; use namespace::autoclean -except => [qw/_options_data _options_config/]; use File::Basename; use Moo; use MooX::Options; use Types::Standard qw( CodeRef ); my @JOB_TYPES_NEEDING_QUEUE = qw( fs hive java shell spark sqoop sub-workflow ); with qw( App::Oozie::Role::Log App::Oozie::Role::Fields::Generic ); has queue_conf_key_name => ( is => 'rw', default => sub { 'mapreduce.job.queuename' }, ); has file => ( is => 'rw', required => 1, ); has file_size => ( is => 'ro', default => sub { my $file = shift->file; my $wf_size = (stat $file )[7] || die "$file either has zero size or I've failed to locate it"; $wf_size; } ); has max_wf_xml_length => ( is => 'rw', required => 1, ); has max_node_name_len => ( is => 'rw', required => 1, ); has email_validator => ( is => 'rw', required => 1, isa => CodeRef, ); has spec_queue_is_missing_message => ( is => 'rw', default => sub { <<'NO_QUEUE_MSG'; The action configuration property "%s" is not defined for these action(s): %s You don't have to add it to each individually; you can also add a global block which adds it to all of your action nodes at once. Example: [% PROCESS workflow_global_xml_start %] mapreduce.job.queuename PUT YOUR QUEUE NAME HERE [% PROCESS workflow_global_xml_end %] The [% ... %] tags are probably already in your workflow.xml. NO_QUEUE_MSG }, ); sub verify { my $self = shift; my $xml_in = shift; my $file = $self->file; my $wf_size = $self->file_size; my $max_wf_xml_length = $self->max_wf_xml_length; my $max_node_name_len = $self->max_node_name_len; my($validation_errors, $total_errors); if ( $wf_size > $max_wf_xml_length ) { my $msg = sprintf <<'ETOOFAT', basename( $file ), $wf_size, $max_wf_xml_length; Your %s has a size above the limit ( %s > %s ) please either modify it to reduce the size as your job will fail anyway if you push it as-is. ETOOFAT $self->logger->warn( $msg ); $validation_errors++; $total_errors++; } # check any action contains root.default or root.mapred queue conf (spark,hive or shell) my $FH; my $validation_queue_check = 0; open $FH, '<', $file or die "Cannot open $file"; while(my $String = <$FH>) { if($String =~ /(root.default)$/ || $String =~ /(root.mapred)$/ ) { $self->logger->error( "FIXME !!! queue configuration parameter in workflow.xml is set to default or mapred; you are not allowed to deploy workflows in root.mapred or root.default queue." ); $validation_errors++; $total_errors++; } if($String=~ /(mapreduce.job.queuename)/ || $String =~ /(spark.yarn.queue)/ ) { $validation_queue_check++; } } if ( !$validation_queue_check ) { $self->logger->error( "FIXME !!! queue configuration parameter in workflow.xml is not mentioned..Please set queue parameter either using --conf spark.yarn.queue or mapreduce.job.queuename. you are not allowed to deploy workflows in root.mapred or root.default queue." ); $validation_errors++; $total_errors++; } my $prop = $xml_in->{parameters} && $xml_in->{parameters}{property} ? $xml_in->{parameters}{property} : undef ; my $global_prop = $xml_in->{global} && $xml_in->{global}{configuration} && $xml_in->{global}{configuration}{property} ? $xml_in->{global}{configuration}{property} : undef ; $self->logger->info( "XML key validation for $file" ); # check some values in the XML files # in workflow.xml, check errorEmailTo, various params, and display a warning my @contact_mail = map { $_->{value} } grep { $_->{name} eq 'errorEmailTo' } @$prop; # check if global conf parameter contains mapred or default queue configuration my @queue_array = map { $_->{value} } grep { $_->{name} =~ 'queuename' } @$global_prop; foreach my $queue_value (@queue_array) { if ($queue_value =~ 'default' || $queue_value =~ 'mapred' ) { $self->logger->error( "FIXME !!! mapreduce.job.queuename parameter in workflow.xml is set to default or mapred; you are not allowed to deploy workflows in root.mapred or root.default queue" ); $validation_errors++; $total_errors++; } } if ( ! @contact_mail ) { $self->logger->warn( "FIXME !!! no errorEmailTo parameter in workflow.xml; you will not get error emails" ); $validation_errors++; $total_errors++; } else { my $validator = $self->email_validator; if ( ! $validator->( $self, @contact_mail ) ) { $self->logger->warn( sprintf "errorEmailTo=`%s` is invalid", @contact_mail ); $validation_errors++; $total_errors++; } } if ( my $action = $xml_in->{action} ) { foreach my $name ( keys %{ $action } ) { my $len = length $name; next if $len <= $max_node_name_len; # See https://issues.apache.org/jira/browse/OOZIE-2168 my $msg = <<"LONG_ACTION_NAME"; FIXME !!! The action name is longer than $max_node_name_len characters (it is $len characters to be precise) $name The restriction to $max_node_name_len characters is a hardcoded limit in the Oozie Java code (and its MySQL metastore). Plese rename it as your job will fail eventually at run time. LONG_ACTION_NAME $self->logger->warn( $msg ); $validation_errors++; $total_errors++; } my($action_validation_errors, $action_total_errors ) = $self->verify_queue_name( $action, $global_prop ); $validation_errors += $action_validation_errors; $total_errors += $action_total_errors; } return $validation_errors // 0, $total_errors // 0; } sub verify_queue_name { my($self, $action, $global_prop) = @_; my $logger = $self->logger; $logger->info( sprintf 'Verifying %s', $self->queue_conf_key_name ); # check if workflow has defined queuname globally my $needs_verification = ! $global_prop || ! exists $global_prop->{ $self->queue_conf_key_name } ; if ( ! $needs_verification ) { $logger->info( sprintf 'There is a global setting for %s', $self->queue_conf_key_name ); return 0, 0; } $logger->info( sprintf 'There is no global setting for "%s" defined in your workflow. The individual actions will now be verified instead.', $self->queue_conf_key_name ); my($validation_errors, $total_errors); # mapreduce.job.queuename is not defined globally # check if action has queuename property defined or not my @offenders; if ( exists $action->{name} ) { # There are only single actions (XML::Simple issue) $action = { $action->{name} => $action }; } foreach my $action_name (keys %{ $action } ) { my $action = $action->{$action_name}; foreach my $job_type ( grep { exists $action->{ $_ } } @JOB_TYPES_NEEDING_QUEUE ) { my $a_prop = $action->{ $job_type }{configuration} && $action->{ $job_type }{configuration}{property} ? $action->{ $job_type }{configuration}{property} : undef ; if ( ( ! $a_prop || ! exists $a_prop->{ $self->queue_conf_key_name } ) && ( ! exists $a_prop->{name} || $a_prop->{name} ne $self->queue_conf_key_name ) ) { push @offenders, $action_name; } last; } } if ( @offenders ) { my $flat_list = sprintf "\t- %s\n", join "\n\t- ", sort { lc $a cmp lc $b } @offenders; my $varname = $self->queue_conf_key_name; my $msg = sprintf $self->spec_queue_is_missing_message, $varname, $flat_list, ; $self->logger->warn( $msg ); $validation_errors = $total_errors = @offenders; } return $validation_errors // 0, $total_errors // 0; } 1; __END__ =pod =encoding UTF-8 =head1 NAME App::Oozie::Deploy::Validate::Spec::Workflow =head1 VERSION version 0.003 =head1 SYNOPSIS TBD =head1 DESCRIPTION TBD =head1 NAME App::Oozie::Deploy::Validate::Spec::Workflow - Part of the Oozie Workflow validator kit. =head1 Methods =head2 file =head2 file_size =head2 max_node_name_len =head2 max_wf_xml_length =head2 queue_conf_key_name =head2 spec_queue_is_missing_message =head2 verify =head2 verify_queue_name =head1 SEE ALSO L. =head1 AUTHORS =over 4 =item * David Morel =item * Burak Gursoy =back =head1 COPYRIGHT AND LICENSE This software is copyright (c) 2023 by Booking.com. This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself. =cut