# App-Oozie/lib/App/Oozie/Deploy.pm
package App::Oozie::Deploy;
use 5.014;
use strict;
use warnings;
our $VERSION = '0.020'; # VERSION
use namespace::autoclean -except => [qw/_options_data _options_config/];
use App::Oozie::Constants qw(
DEFAULT_DIR_MODE
DEFAULT_FILE_MODE
EMPTY_STRING
FILE_FIND_FOLLOW_SKIP_IGNORE_DUPLICATES
MILISEC_DIV
MODE_BITSHIFT_READ
SPACE_CHAR
STAT_MODE
TERMINAL_INFO_LINE_LEN
WEBHDFS_CREATE_CHUNK_SIZE
);
use Cwd 'abs_path';
use Moo;
use MooX::Options prefer_commandline => 0,
protect_argv => 0,
usage_string => <<'USAGE',
Usage: %c %o
Deploys workflows to HDFS. Specifying names as final arguments will upload only those
USAGE
;
use App::Oozie::Deploy::Template;
use App::Oozie::Deploy::Validate::Spec;
use App::Oozie::Types::Common qw( IsDir IsFile );
use App::Oozie::Util::Misc qw( resolve_tmp_dir trim_slashes );
use App::Oozie::Constants qw( OOZIE_STATES_RUNNING );
use Carp ();
use Config::Properties;
use Config::General ();
use DateTime::Format::Strptime;
use DateTime;
use Email::Valid;
use Fcntl qw( :mode );
use File::Basename qw( basename dirname );
use File::Find ();
use File::Find::Rule;
use File::Spec;
use File::Temp ();
use List::MoreUtils qw( uniq );
use List::Util qw( max );
use Path::Tiny qw( path );
use Ref::Util qw( is_arrayref is_hashref );
use Sys::Hostname ();
use Template;
use Text::Glob qw(
match_glob
glob_to_regex
);
use Time::Duration qw( duration_exact );
use Time::HiRes qw( time );
use Types::Standard qw(
ArrayRef
CodeRef
StrictNum
Str
);
with qw(
App::Oozie::Role::Log
App::Oozie::Role::Fields::Common
App::Oozie::Role::NameNode
App::Oozie::Role::Git
App::Oozie::Role::Meta
App::Oozie::Role::Info
);
# Command-line options (MooX::Options); each is also a regular accessor.

# Inject the ownership meta data into the generated workflow.xml.
# Enabled by default; flagged as temporary while under test.
option write_ownership_to_workflow_xml => (
    is      => 'rw',
    default => sub { 1 },
    doc     => 'Populate the meta file into workflow.xml? This option is temporary while testing',
);

# Explicit HDFS upload target, overriding the computed default.
option hdfs_dest => (
    is     => 'rw',
    format => 's',
    doc    => 'HDFS destination (default is <default_hdfs_destination>/<name>)',
);

# Keep the local staging directory after the run (see __collect_internal_conf).
option keep_deploy_path => (
    is    => 'rw',
    short => 'keep',
    doc   => 'Keep the temp files at the end of deployment',
);

# After uploading, remove files on HDFS that predate this deployment run.
option prune => (
    is    => 'rw',
    short => 'p',
    doc   => 'Prune obsolete files on HDFS',
);

# Enable the SLA blocks in the generated Oozie XML.
option sla => (
    is  => 'rw',
    doc => 'Enable SLA under Oozie?',
);

# Local directory that holds the workflow sources; defaults to
# "<local_oozie_code_path>/workflows".
option oozie_workflows_base => (
    is      => 'rw',
    format  => 's',
    default => sub {
        my $self = shift;
        File::Spec->catdir( $self->local_oozie_code_path, 'workflows' ),
    },
    lazy => 1,
);

# Debugging aid: convert the workflow XML files to JSON into the given
# directory. Forces a dryrun (see BUILD).
option dump_xml_to_json => (
    is     => 'rw',
    isa    => IsDir,
    format => 's',
    doc    => 'Specify a directory to convert and dump XML files in the workflow as JSON. This implies a dryrun.',
);

# Optional properties file fetched from HDFS and merged into the internal
# configuration (see __collect_internal_conf_hdfs).
option hdfs_properties_file => (
    is     => 'rw',
    isa    => Str,
    format => 's',
    # i.e.: /share/oozie.properties
    doc    => 'The location of the optional properties file on HDFS',
);
#------------------------------------------------------------------------------#
# Template Toolkit support files that must exist under ttlib_base_dir;
# BUILD asserts their presence before any compilation starts.
has required_tt_files => (
    is  => 'ro',
    isa => ArrayRef[Str],
    default => sub {
        return [qw(
            coordinator_config_xml
            ttree.cfg
            workflow_global_xml_end
            workflow_global_xml_start
            workflow_parameters_xml_end
            workflow_parameters_xml_start
            workflow_sla_xml
            workflow_xmlns
        )],
    },
);

# Base directory of the shared Template Toolkit library. Prefers
# "<local_oozie_code_path>/lib"; falls back to the "ttlib" directory
# shipped next to this module.
has ttlib_base_dir => (
    is   => 'rw',
    isa  => IsDir,
    lazy => 1,
    default => sub {
        my $self = shift;
        my $first_guess = File::Spec->catdir( $self->local_oozie_code_path, 'lib' );
        return $first_guess if App::Oozie::Types::Common->get_type(IsDir)->check( $first_guess );
        # Fall back to the "ttlib" dir sitting next to this very file.
        (my $whereami = __FILE__) =~ s{ [.]pm \z }{}xms;
        my $base = File::Spec->catdir( $whereami, 'ttlib' );
        return $base if App::Oozie::Types::Common->get_type(IsDir)->check( $base );
        die 'Failed to locate the ttlib path!';
    },
);

# this will be used for dynamic includes/directives etc. It will be inside
# ttlib_base_dir
has ttlib_dynamic_base_dir_name => (
    is  => 'ro',
    isa => Str,
    default => sub { '.oozie-deploy-lib' },
);

# Hi-res epoch captured at object construction; prune_path() uses it to
# tell pre-existing HDFS files from the ones uploaded by this run.
has deploy_start => (
    is      => 'ro',
    default => sub { time },
);

# Maximum Oozie action/node name length, fetched from the server once.
has max_node_name_len => (
    is   => 'ro',
    isa  => StrictNum, # range check?
    lazy => 1,
    default => sub {
        shift->oozie->max_node_name_len;
    },
);

# sprintf template used to report actions missing their queue property.
has spec_queue_is_missing_message => (
    is  => 'rw',
    isa => Str,
    default => sub {
        <<'NO_QUEUE_MSG';
The action configuration property "%s" is not
defined for these action(s):
%s
NO_QUEUE_MSG
    },
);
# Name of the meta file written into each deployed workflow directory.
has deployment_meta_file_name => (
    is      => 'rw',
    default => sub { '.deployment' },
);

# Property files merged, in order, into the internal configuration.
has configuration_files => (
    is => 'rw',
    # TODO: type needs fixing for coercion
    # isa => ArrayRef[IsFile],
    lazy    => 1,
    default => sub {
        my $self  = shift;
        my $ttlib = $self->ttlib_base_dir;
        return [
            File::Spec->catfile( $ttlib, 'common.properties' ),
        ],
    },
);

# Callback validating a comma separated list of email addresses (e.g. the
# errorEmailTo workflow parameter). Returns true when every address is
# valid; logs a warning per bogus address and returns false otherwise.
has email_validator => (
    is      => 'rw',
    default => sub {
        my $self = shift;
        # The returned closure is invoked method-style: it receives the
        # deployer object as its first argument.
        sub {
            my $self   = shift;
            my $emails = shift || do {
                $self->logger->warn( 'No email was set!' );
                return;
            };
            # Split on commas, trim surrounding whitespace and strip any
            # "+tag" suffix before handing each address to Email::Valid.
            my @splits = map s/\+.+?@/@/r, map s/^\s+|\s+$//gr, split q{,}, $emails; ## no critic (ProhibitStringySplit,RequireDotMatchAnything,RequireExtendedFormatting,RequireLineBoundaryMatching,ProhibitEscapedMetacharacters)
            my @invalids = grep { ! Email::Valid->address( $_ ) } @splits;
            return 1 if ! @invalids;
            for my $bogus ( @invalids ) {
                $self->logger->warn(
                    sprintf 'FIXME !!! errorEmailTo parameter in workflow.xml is not set to a proper address: %s',
                    $bogus,
                );
            }
            return;
        },
    },
    isa  => CodeRef,
    lazy => 1,
);

# Lazily collected configuration hash: local property files + optional
# HDFS properties + computed deployment paths. See __collect_internal_conf().
has internal_conf => (
    is      => 'ro',
    builder => '__collect_internal_conf',
    lazy    => 1,
);

# Hook to rewrite variable names found in coordinator directives.
# The default is the identity function.
has process_coord_directive_varname => (
    is  => 'rw',
    isa => CodeRef,
    default => sub {
        sub {
            my $name = shift;
            return $name;
        },
    },
);
sub BUILD {
    my ($self, $args) = @_;
    # Post-construction checks: every required template file must exist,
    # and --dump_xml_to_json implies --dryrun.
    my $logger         = $self->logger;
    my $oozie_base_dir = $self->local_oozie_code_path;
    my $ttlib_base_dir = $self->ttlib_base_dir;
    my $verbose        = $self->verbose;
    my $is_file        = IsFile->library->get_type( IsFile );
    foreach my $file ( @{ $self->required_tt_files } ) {
        my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $file );
        if ( $verbose ) {
            $logger->debug("Assert file: $absolute_path");
        }
        # assert_valid() does not display the error message, hence the manual check.
        # validate() returns undef for a valid path, so "|| next" skips good files.
        my $error = $is_file->validate( $absolute_path ) || next;
        $logger->logdie( sprintf 'required_tt_files(): %s', $error );
    }
    if ( $verbose ) {
        $logger->debug( join q{=}, $_, $self->$_ ) for qw(
            local_oozie_code_path
            ttlib_base_dir
        );
    }
    if ( $self->dump_xml_to_json && ! $self->dryrun ) {
        $self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
        $self->dryrun( 1 );
    }
    return;
}
sub run {
    my $self      = shift;
    my $workflows = shift;
    # Main entry point: verify & compile the requested workflows, upload the
    # compiled artifacts to HDFS, optionally prune stale remote files.
    my $logger  = $self->logger;
    my $config  = $self->internal_conf;
    my $dryrun  = $self->dryrun;
    my $verbose = $self->verbose;
    my $run_start_epoch = time;
    my $log_marker      = q{#} x TERMINAL_INFO_LINE_LEN;
    $logger->info(
        sprintf '%s Starting deployment in %s%s %s',
        $log_marker,
        $self->cluster_name,
        $verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands',
        $log_marker,
    );
    $self->log_versions if $verbose;
    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );
    if (!$self->secure_cluster) {
        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
        chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
    }
    my $success = $self->upload_to_hdfs;
    # Notify/refresh any coordinators currently running on the updated paths.
    $self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };
    if ($self->prune) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        for my $workflow ( @{ $workflows } ) {
            $self->prune_path(
                File::Spec->catdir(
                    $config->{hdfs_dest},
                    basename $workflow
                )
            );
        }
    }
    $logger->info(
        sprintf '%s Completed successfully in %s (took %s) %s',
        $log_marker,
        sprintf( '%s%s', $self->cluster_name, ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
        duration_exact( time - $run_start_epoch ),
        $log_marker,
    );
    return $success;
}
sub _verify_and_compile_all_workflows {
    my $self      = shift;
    my $workflows = shift;
    # Validate the CLI input, run the global pre-checks and compile every
    # requested workflow, accumulating error counts. Dies when any errors
    # were found; returns an arrayref of running-coordinator records.
    my $logger = $self->logger;

    if ( ! is_arrayref $workflows || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    if (   $self->gitfeatures
        && ! $self->gitforce
    ) {
        $self->verify_git_tag;
    }

    my $wfs = $self->collect_names_to_deploy( $workflows );

    # Start from zero so the "+=" below never operates on undef.
    my($total_errors, $validation_errors) = (0, 0);
    my @update_coord;

    for my $workflow ( @{ $wfs } ) {
        my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );
        # Accumulate each counter into its matching total. These two were
        # previously cross-assigned (validation into total and vice versa),
        # corrupting both counts.
        $validation_errors += $t_validation_errors;
        $total_errors      += $t_total_errors;
    }

    if ($total_errors) {
        $logger->fatal( sprintf 'ERROR: %s errors were encountered during this run. Please fix it!', $total_errors );
        $logger->fatal( 'The --force option has been disabled, as not enough people really paid attention.' );
        $logger->fatal( 'Fixing the errors is really your best and easiest option.' );
        $logger->logdie( 'Failed.' );
    }

    return \@update_coord;
}
sub process_workflow {
    my ($self, $workflow) = @_;
    # Thin wrapper around the template pipeline, kept as its own method so
    # subclasses can hook around the processing of a single workflow.
    # Returns: (validation_errors, total_errors, staging_dest, coord_var_cache).
    return $self->process_templates( $workflow );
}
sub pre_verification {
    # Intentionally a no-op in the base class; subclasses may override to
    # run site-specific checks before any deployment work starts.
    return;
}
sub destination_path {
    my ($self, $path) = @_;
    # Fall back to the configured default when no path was given.
    $path ||= $self->default_hdfs_destination;
    # Fully qualified HDFS URIs pass through untouched; anything else is
    # normalized into an absolute, slash-rooted path.
    if ( $path =~ m{ \A hdfs:// }xms ) {
        return $path;
    }
    return File::Spec->canonpath( File::Spec->catdir( q{/}, $path ) );
}
sub __collect_internal_conf_hdfs {
    my $self = shift;
    # Fetch and parse the optional properties file from HDFS. Returns an
    # empty hashref when no file was configured or it does not exist.
    my $logger = $self->logger;
    my $file   = $self->hdfs_properties_file;
    return {} if ! $file; # not specified at all
    $logger->debug( sprintf 'If exists, fetching from HDFS: %s', $file );
    return {} if ! $self->_hdfs_exists_no_exception( $file );
    return {
        Config::General::ParseConfig(
            -String => $self->hdfs->read( $file ),
        )
    };
}
sub __collect_internal_conf {
    my $self   = shift;
    my $keep   = $self->keep_deploy_path;
    my $logger = $self->logger;
    # This will load static properties that we will reuse as variables in the
    # template and merge it with the common.properties file
    my $properties = Config::Properties->new;
    my $config     = {};
    if ( my $files = $self->configuration_files ) {
        my $verbose = $self->verbose;
        foreach my $file ( @{ $files } ) {
            if ( $verbose ) {
                $logger->debug( sprintf 'Processing conf file %s ...', $file );
            }
            open my $FH, '<', $file or $logger->logdie( sprintf 'Failed to read %s: %s', $file, $! );
            $properties->load( $FH );
            if ( ! close $FH ) {
                $logger->warn(
                    sprintf 'Failed to close %s: %s',,
                    $file,
                    $!,
                );
            }
            # Later files win: each file is layered on top of the previous ones.
            $config = {
                %{ $config },
                $properties->properties,
            };
        }
    }
    # Settings collected from HDFS (if any) override the local files.
    $config = {
        %{ $config },
        %{ $self->__collect_internal_conf_hdfs },
    };
    # Local staging directory; auto-removed at exit unless --keep is set.
    my $base_dest = File::Temp::tempdir(
        CLEANUP => ! $keep,
        DIR     => resolve_tmp_dir(),
    );
    $config->{base_dest} = $base_dest;
    $self->logger->info(
        "Output directory: `$base_dest`.",
        ( $keep ? ' You have decided to keep it after completion' : EMPTY_STRING )
    );
    # Override the parameters only when they are not set.
    # For example, these keys might be set with an HDFS
    # config file, collected before this point.
    #
    $config->{workflowsBaseDir} //= $self->oozie_basepath;
    $config->{clusterName}      //= $self->cluster_name;
    $config->{hdfs_dest}        //= $self->destination_path( $config->{workflowsBaseDir} );
    # If YARN, use a different property.
    # The oozie syntax doesn't change
    # (still uses the jobtracker property)
    $config->{jobTracker} //= $config->{resourceManager} || $self->resource_manager;
    $config->{nameNode}   //= $self->template_namenode;
    $config->{has_sla}      = $self->sla;
    $self->logger->info( sprintf 'Upload directory: %s', $self->destination_path );
    return $config;
}
sub max_wf_xml_length {
    my $self = shift;
    # Maximum size Oozie accepts for a single workflow definition, read
    # from the live server configuration. Dies when the value is missing.
    my $ooz_admin = $self->oozie->admin('configuration');
    my $conf_val  = $ooz_admin->{'oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength'};
    return $conf_val
        # fixed typo ("ooozie") in the error message
        || $self->logger->logdie( 'Unable to fetch the Oozie configuration WorkflowDefinitionMaxLength!' );
}
sub guess_running_coordinator {
    # Lookup table of Oozie states considered "running".
    state $is_running = { map { $_ => 1 } OOZIE_STATES_RUNNING };
    my $self     = shift;
    my $workflow = shift;
    my $cvc      = shift;
    my $dest     = shift;
    # Map the local workflow directory onto its HDFS path and return one
    # record per running coordinator currently attached to that path.
    my $logger = $self->logger;
    $logger->info( 'Probing for existing coordinators ...' );
    my $local_base = $self->oozie_workflows_base;
    # Relative path of the workflow under the local base dir.
    (my $rel_path = $workflow) =~ s{ \A \Q$local_base\E [/]? }{}xms;
    my $remote_path = File::Spec->catfile( $self->destination_path, $rel_path );
    my $paths = $self->oozie
                    ->active_job_paths(
                        coordinator => $self->destination_path
                    );
    my @rv;
    foreach my $path ( grep { $_ =~ m{ \Q$remote_path\E \b \z }xms } keys %{ $paths } ) {
        my $e = $paths->{ $path };
        if ( @{ $e } > 1 ) {
            # TODO: multi path
        }
        foreach my $jobs ( @{ $e } ) {
            foreach my $cid ( keys %{ $jobs } ) {
                # multiple coordinators
                my $job = $jobs->{ $cid };
                next if ! $is_running->{ $job->{status} };
                push @rv,
                    {
                        path     => $path,
                        coord_id => $cid,
                        job      => $job,
                        cvc      => $cvc,
                        workflow => $workflow,
                        dest     => $dest,
                    };
            }
        }
    }
    return @rv;
}
sub maybe_update_coordinators {
    my ($self, $coords) = @_;
    # Placeholder: subclasses are expected to override this and act on
    # each running coordinator record collected by run().
    for my $coord ( @{ $coords } ) {
        # stub: better override in a subclass
    }
    return;
}
sub _get_spec_validator {
    my($self, $dest) = @_;
    # Build a spec validator for the compiled workflow located at $dest,
    # forwarding the relevant deployer settings verbatim.
    my @pass_through = qw(
        email_validator
        max_node_name_len
        max_wf_xml_length
        oozie_cli
        oozie_client_jar
        oozie_uri
        spec_queue_is_missing_message
        timeout
        verbose
    );
    return App::Oozie::Deploy::Validate::Spec->new(
        ( map { $_ => $self->$_ } @pass_through ),
        local_path => $dest,
    );
}
sub __maybe_dump_xml_to_json {
    my $self      = shift;
    my $dump_path = $self->dump_xml_to_json || return;
    my $logger    = $self->logger;

    # Lazy-load: only needed when --dump_xml_to_json is in effect.
    require JSON;
    # File::Path was previously available only as a side effect of
    # File::Temp loading it; load it explicitly.
    require File::Path;

    my $sv                    = shift || $logger->logdie( 'Spec validator not specified!' );
    my $validation_errors_ref = shift;
    my $total_errors_ref      = shift;

    for my $xml_file ( $sv->local_xml_files ) {
        my $parsed = $sv->maybe_parse_xml( $xml_file );
        if ( my $error = $parsed->{error} ) {
            $logger->fatal(
                sprintf q{We can't validate %s since parsing failed: %s},
                $parsed->{relative_file_name},
                $error,
            );
            ${ $validation_errors_ref }++;
            ${ $total_errors_ref }++;
            # No valid XML at this point; nothing to dump for this file.
            next;
        }
        $logger->info( sprintf 'Dumping xml to json within %s', $dump_path );
        my $json_filename = File::Spec->catfile(
            $dump_path,
            File::Basename::basename( $xml_file, '.xml' ) . '.json',
        );
        File::Path::make_path( $dump_path );
        open my $JSON_FH, '>', $json_filename
            or $logger->logdie( sprintf 'Failed to create %s: %s', $json_filename, $! );
        print $JSON_FH JSON->new->pretty->encode( $parsed->{xml_in} );
        if ( ! close $JSON_FH ) {
            $logger->warn(
                sprintf 'Failed to close %s: %s',
                $json_filename,
                $!,
            );
        }
    }

    return;
}
sub compile_templates {
    my $self                  = shift;
    my $workflow              = shift;
    my $validation_errors_ref = shift;
    my $total_errors_ref      = shift;
    # Compile the Template Toolkit sources of a single workflow into the
    # local staging area, accumulating error counts into the two scalar refs.
    if ( ! -d $workflow ) {
        die sprintf 'The workflow path `%s` either does not exist or not a directory',
            $workflow;
    }
    # Deployer settings copied verbatim into the template engine.
    state $pass_through = [
        qw(
            dryrun
            effective_username
            internal_conf
            oozie_workflows_base
            process_coord_directive_varname
            timeout
            ttlib_base_dir
            ttlib_dynamic_base_dir_name
            verbose
            write_ownership_to_workflow_xml
        )
    ];
    my $t = App::Oozie::Deploy::Template->new(
        map { $_ => $self->$_ }
            @{ $pass_through }
    );
    my($template_validation_errors,
        $template_total_errors,
        $dest,
    ) = $t->compile( $workflow );
    # Variable cache collected from coordinator directives during compile.
    my $cvc = $t->coordinator_directive_var_cache;
    ${ $validation_errors_ref } += $template_validation_errors;
    ${ $total_errors_ref }      += $template_total_errors;
    # Returns the staging destination dir and the coordinator var cache.
    return $dest, $cvc;
}
sub process_templates {
    my $self     = shift;
    my $workflow = shift || die 'No workflow path specified!';
    # Full validation pipeline for one workflow: template compilation, meta
    # file check, optional XML->JSON dump, spec verification and writing of
    # the deployment meta file.
    # Returns: (validation_errors, total_errors, staging_dest, coord_var_cache).

    if ( ! -d $workflow ) {
        die sprintf 'The workflow path %s either does not exist or not a directory',
            $workflow,
        ;
    }

    # Initialize to zero so the "+=" updates below (and the ones done via
    # the references inside compile_templates) never operate on undef.
    my($validation_errors, $total_errors) = (0, 0);

    my($dest, $cvc) = $self->compile_templates(
        $workflow,
        \$validation_errors,
        \$total_errors,
    );

    if ( $self->write_ownership_to_workflow_xml ) {
        $self->validate_meta_file(
            File::Spec->catfile( $workflow, $self->meta->file ),
            \$validation_errors,
            \$total_errors,
            {},
        );
    }

    my $sv = $self->_get_spec_validator( $dest );

    $self->__maybe_dump_xml_to_json(
        $sv,
        \$validation_errors,
        \$total_errors,
    ) if $self->dump_xml_to_json;

    my($spec_validation_errors, $spec_total_errors) = $sv->verify( $workflow );

    $validation_errors += $spec_validation_errors;
    $total_errors      += $spec_total_errors;

    $self->create_deployment_meta_file( $dest, $workflow, $total_errors );

    if ( $validation_errors ) {
        $self->logger->error( 'Oozie deployment validation status: !!!!! FAILED !!!!!' );
    }
    else {
        $self->logger->info( 'Oozie deployment validation status: OK' );
    }

    return $validation_errors, $total_errors, $dest, $cvc;
}
sub validate_meta_file {
    my ($self, $file) = @_;
    # Hook point: the base class only announces the file; subclasses may
    # override to run real validation against it.
    $self->logger->info( sprintf 'Extra validation for %s', $file );
    return;
}
sub verify_temp_dir {
    my $self = shift;
    # Sanity-check a user-supplied $TMPDIR; drop it from the environment
    # when it is missing or not group/other readable.
    my $user_setting = $ENV{TMPDIR} || return;
    my $logger = $self->logger;
    # The path needs to be readable by mapred in order the deploy to be successful
    # Some users have this set in their environment to paths lacking relevant
    # permissions leading to failures.
    #
    # If the path is bogus, then by removing the setting locally in here
    # will lead the temporary directory to be created inside "/tmp" by default.
    #
    # Otherwise it can still be altered to elsewhere by changing the
    # environment by the users.
    #
    my $remove;
    if ( ! -d $user_setting ) {
        $logger->warn( sprintf q{You have TMPDIR=%s but it doesn't exist! I will ignore/remove that setting!}, $user_setting );
        $remove = 1;
    }
    else {
        # Check the group and other read bits on the directory.
        my $mode       = (stat $user_setting)[STAT_MODE];
        my $group_read = ( $mode & S_IRGRP ) >> MODE_BITSHIFT_READ;
        my $other_read = $mode & S_IROTH;
        if ( ! $group_read || ! $other_read ) {
            $logger->warn(
                sprintf q{You have TMPDIR=%s and it is not group/other readable (mode=%04o)! I will ignore/remove that setting!},
                $user_setting,
                S_IMODE( $mode ),
            );
            $remove = 1;
        }
    }
    delete $ENV{TMPDIR} if $remove;
    return;
}
sub collect_names_to_deploy {
    my $self  = shift;
    my $names = shift || die 'No workflow names were specified!';
    # Expand the user-supplied names / glob patterns into the list of local
    # workflow directories eligible for deployment. Dies when nothing (or
    # fewer directories than patterns) can be located.
    my $owf_base = $self->oozie_workflows_base;
    my $logger   = $self->logger;
    my $verbose  = $self->verbose;

    if ( ! is_arrayref $names ) {
        die 'Workflow names need to be specified as an arrayref';
    }

    my(@firstLevelMatchingPatterns, @secondLevelMatchingPatterns);
    my @workflow = map { trim_slashes( $_ ) } @{ $names };
    my $workflowPatternCount = @workflow;

    for my $w (@workflow) {
        # Nesting level = number of path separators in the pattern.
        my $separators = () = $w =~ m{ [/] }xmsg;
        # disallow the case with going up the tree ".." ?
        if ( $separators == 0 ) {
            push @firstLevelMatchingPatterns, $w;
        }
        elsif ($separators == 1 ) {
            push @secondLevelMatchingPatterns, $w;
        }
        else {
            # Removed a leftover "\n"," artifact from this message.
            my $msg = <<"MSG";
=> Only first or second-level folders inside the workflow folder can be used to
store workflows which are eligible for deployment.
The following path or pattern will, therefore, be ignored: $w
MSG
            $logger->info( $msg );
        }
    }

    # First-level patterns are matched as fully anchored regexes.
    @firstLevelMatchingPatterns = map {
        qr{ \A \Q$_\E \z }xms
    }
    @firstLevelMatchingPatterns;

    # Transform the patterns in actual, existing directories
    my @firstLevelWorkflows =
        File::Find::Rule->directory
                        ->maxdepth( 1 )
                        ->mindepth( 1 )
                        ->extras({
                            follow      => 1,
                            follow_skip => FILE_FIND_FOLLOW_SKIP_IGNORE_DUPLICATES,
                        })
                        ->name(@firstLevelMatchingPatterns)
                        ->in( $owf_base );

    # Don't want to be matching the 'workflows' part in workflows/stuff/workflow
    my $workflowFolderPrefixLength = length( $owf_base ) + 1;

    my @secondLevelWorkflows =
        File::Find::Rule
            ->directory
            ->maxdepth( 2 )
            ->mindepth( 2 )
            ->extras({
                follow      => 1,
                follow_skip => FILE_FIND_FOLLOW_SKIP_IGNORE_DUPLICATES,
            })
            ->exec(
                sub {
                    my $str = substr $_[2], $workflowFolderPrefixLength;
                    # might be a good idea to limit matching
                    # globs to the last level of folder structure
                    # only (e.g. no "f*g/k*s")
                    return grep { match_glob($_, $str) }
                            @secondLevelMatchingPatterns
                }
            )
            ->in( $owf_base );

    # A first-level directory without a workflow.xml is either a bundle
    # (kept, when it has a bundle.xml) or a plain container whose sub
    # directories hold the real workflows (replaced by those). The previous
    # implementation spliced the array while foreach-ing over a precomputed
    # index range and tried to rewind with "$i--", which cannot work and
    # skipped elements after a removal; build a filtered copy instead.
    my @keptFirstLevelWorkflows;
    for my $candidate ( @firstLevelWorkflows ) {
        my $workflowFileLocationGuess = File::Spec->rel2abs( $candidate . '/workflow.xml' );
        my $bundleFileLocationGuess   = File::Spec->rel2abs( $candidate . '/bundle.xml' );
        if ( -f $workflowFileLocationGuess ) {
            push @keptFirstLevelWorkflows, $candidate;
            next;
        }
        my $msg = q{It doesn't look like there's a workflow at `%s`. }
                . q{I will process its subfolders, if any, instead.};
        $logger->info( sprintf $msg, $candidate );
        my @subs = File::Find::Rule->directory
                                    ->maxdepth(1)
                                    ->mindepth(1)
                                    ->in( $candidate )
                                    ;
        for my $subx ( @subs ) {
            $logger->debug(
                sprintf 'I will additionally look for workflows in the following sub folder: %s',
                $subx,
            );
        }
        push @secondLevelWorkflows, @subs;
        # When deploying a wf/coord, it makes no sense to upload what we have in parent,
        # otherwise, yes, we'll need to update bundle.xml along with any other file on it
        if ( -f $bundleFileLocationGuess ) {
            $self->logger->debug( 'We have identified this as a bundle.' );
            push @keptFirstLevelWorkflows, $candidate;
        }
    }
    @firstLevelWorkflows = @keptFirstLevelWorkflows;

    @firstLevelWorkflows  = uniq @firstLevelWorkflows;
    @secondLevelWorkflows = uniq @secondLevelWorkflows;

    my $num_workflows = @secondLevelWorkflows + @firstLevelWorkflows;

    if ( ! $num_workflows ) {
        die "Exiting: found no workflows to deploy under `$owf_base`.";
    }

    if ($workflowPatternCount > $num_workflows) {
        my @uniq_wfs = (@secondLevelWorkflows, @firstLevelWorkflows);
        my @params = (
            $num_workflows,
            $workflowPatternCount,
            join(qq{\n\t}, @{ $names } ),
            join(qq{\n\t}, @uniq_wfs),
        );
        my $msg = sprintf <<"ERROR", @params;
Exiting: only %s workflow folders found when we expected at least %s (from the number of command-line arguments).
Expected workflows:
\t%s
Computed:
\t%s
Hint: you might have a character which could look like a dash but it is not in your arguments.
If this is the case, such an argument will be treated as a workflow name.
ERROR
        $logger->logdie( $msg );
    }

    @workflow = ( @firstLevelWorkflows, @secondLevelWorkflows );

    for my $wf ( @workflow ) {
        $logger->info( "Workflow to be deployed: $wf" );
    }

    return \@workflow;
}
sub collect_data_for_deployment_meta_file {
    my $self         = shift;
    my $workflow     = shift;
    my $total_errors = shift;
    # Assemble the records written into the ".deployment" meta file:
    # who deployed what, when, from where, plus optional git state.
    my $use_git = $self->gitfeatures;
    my $obase   = $self->oozie_workflows_base;
    # returns values for "workflow" might be overridden to be absolute from
    # the subclasses
    my $source_dir = File::Spec->file_name_is_absolute( $workflow )
                    ? $workflow
                    : File::Spec->catfile( $obase, $workflow )
                    ;
    my @meta = (
        {
            display => 'User',
            name    => 'user',
            value   => $self->effective_username,
        },
        {
            display => 'Deployment date',
            name    => 'date',
            value   => $self->date->epoch_yyyy_mm_dd_hh_mm_ss( time ),
        },
        {
            display => 'Deployment host',
            name    => 'host',
            value   => Sys::Hostname::hostname(),
        },
        {
            display => 'Deployed directory',
            name    => 'source_dir',
            value   => $source_dir,
        },
        {
            display => 'Validation errors',
            name    => 'total_errors',
            value   => $total_errors,
        },
    );
    if ( $use_git ) {
        my $repo_dir = $self->git_repo_path;
        my $git_log  = join "\n", $self->get_git_info_on_all_files_in( $workflow );
        $git_log ||= 'N/A. The files were not committed';
        push @meta,
            {
                display => 'Latest local SHA1',
                name    => 'git_hash',
                value   => scalar $self->get_latest_git_commit,
            },
            {
                display => 'Latest workflow SHA1',
                name    => 'git_folder_hash',
                value   => scalar $self->get_git_sha1_of_folder( abs_path $workflow ) },
            {
                display => 'Latest git-deploy tag',
                name    => 'git_tag',
                value   => $self->get_latest_git_tag,
            },
            {
                display => 'Git status from deployment location',
                name    => 'git_status',
                value   => scalar $self->get_git_status,
            },
            {
                display => "Git data on files (assuming git repo to be in $repo_dir)",
                name    => 'git_log',
                value   => $git_log,
            },
        ;
    }
    # We also want to have this convertible/accessible as a hash,
    # so guard against duplicate "name" keys.
    my %seen;
    for my $e ( @meta ) {
        if ( ++$seen{ $e->{name} } > 1 ) {
            die "$e->{name} is used more than once in the deployment meta spec!";
        }
    }
    return \@meta;
}
sub create_deployment_meta_file {
    my ($self, $path, $workflow, $total_errors) = @_;
    die 'No path was specified!'     if ! $path;
    die 'No workflow was specified!' if ! $workflow;
    # Gather the key/value records and persist them next to the compiled
    # workflow artifacts.
    $self->write_deployment_meta_file(
        $path,
        $self->collect_data_for_deployment_meta_file( $workflow, $total_errors ),
    );
    return;
}
sub write_deployment_meta_file {
    my $self = shift;
    my $path = shift;
    my $meta = shift;

    # Widest "display" label among single-line entries; used to align the
    # values into a column. Multi-line values get their own paragraph.
    my $max_len = max map  { length $_->{display} }
                      grep { $_->{value} !~ m{\n}xms }
                      @{ $meta }
                      ;

    my $file = File::Spec->catfile( $path, $self->deployment_meta_file_name );

    my $compute_meta_row = sub {
        my $this_row = shift;
        my($display, $value) = @{ $this_row }{qw/ display value /};
        my $multi_line = $value =~ m{\n}xms;
        # Left-justify the label to the computed column width. The format
        # string was previously corrupted into "% -$61,692s", which never
        # used $max_len at all.
        return sprintf "%s%-${max_len}s:%s%s%s",
                ( $multi_line ? "\n" : EMPTY_STRING ),
                $display,
                ( $multi_line ? "\n\n" : SPACE_CHAR ),
                $value,
                ( $multi_line ? "\n\n" : "\n" ),
                ;
    };

    open my $FH, '>', $file or die "Could not create $file: $!";
    for my $row ( @{ $meta } ) {
        # print, not printf: the row is already fully formatted and the
        # values may legitimately contain "%" characters.
        print {$FH} $compute_meta_row->( $row );
    }
    if ( ! close $FH ) {
        $self->logger->warn(
            sprintf 'Failed to close %s: %s',
            $file,
            $!,
        );
    }
    return;
}
sub prune_path {
    my $self = shift;
    my $path = shift || die 'No path was specified';
    # Recursively delete files under $path (on HDFS) that predate this
    # deployment run. Returns true when everything underneath was (or, in
    # dryrun mode, would have been) removed, letting the caller delete the
    # now-empty directory.
    my $files         = $self->hdfs->list($path);
    my $total_files   = scalar @{ $files };
    my $deleted_files = 0;
    my $deploy_start  = $self->deploy_start;
    my $dryrun        = $self->dryrun;
    for my $file ( @{ $files } ) {
        #next if $file->{pathSuffix} =~ /^(\.deployment|coordinator\.xml)$/;
        # Files whose mtime (ms on the wire) is older than the deployment
        # start are considered stale.
        if ( $file->{type} eq 'FILE'
            && $file->{modificationTime} / MILISEC_DIV < $deploy_start
        ) {
            my $msg = sprintf 'Old file found in destination: %s (mtime %s) -> %s',
                        $file->{pathSuffix},
                        $self->date->epoch_yyyy_mm_dd_hh_mm_ss(
                            int( $file->{modificationTime} / MILISEC_DIV )
                        ),
                        $dryrun ? 'would have deleted if dryrun was not specified' : 'is now deleted',
                    ;
            $self->logger->info( $msg );
            $self->hdfs->delete("$path/$file->{pathSuffix}") if ! $dryrun;
            # Counted even under dryrun so the emptiness check below still
            # simulates the real outcome.
            $deleted_files++;
        }
        # check directories regardless of age
        if( $file->{type} eq 'DIRECTORY' ) {
            my $msg = sprintf 'Directory found in destination: %s (mtime %s) -> checking contents',
                        $file->{pathSuffix},
                        $self->date->epoch_yyyy_mm_dd_hh_mm_ss(
                            int( $file->{modificationTime} / MILISEC_DIV )
                        ),
                    ;
            $self->logger->info( $msg );
            #recurse down to check lower directories
            my $empty = $self->prune_path("$path/$file->{pathSuffix}");
            if( $empty ) {
                $self->logger->info( "$file->{pathSuffix} is empty, " . ( $dryrun ? 'would have deleted if dryrun was not specified' : 'deleting' ) );
                $self->hdfs->delete("$path/$file->{pathSuffix}") if ! $dryrun;
                $deleted_files++;
            } else {
                $self->logger->info( "$file->{pathSuffix} has current files, keeping it" );
            }
        }
    }
    return ($total_files == $deleted_files);
}
sub upload_to_hdfs {
    my $self = shift;
    my $conf = $self->internal_conf;
    # Honor --dryrun: report what would have happened and pretend success.
    if ( $self->dryrun ) {
        $self->logger->warn(
            sprintf 'Skipping upload to HDFS as dryrun was set. Would have uploaded from %s to %s',
            $conf->{base_dest},
            $conf->{hdfs_dest},
        );
        return 1;
    }
    # Push the compiled staging directory to its HDFS destination.
    return $self->_copy_to_hdfs_with_webhdfs( $conf->{base_dest}, $conf->{hdfs_dest} );
}
sub _hdfs_exists_no_exception {
    my $self = shift;
    my $path = shift;
    # exists() can throw instead of returning false on some WebHDFS
    # back ends; swallow the exception and treat it as "does not exist"
    # (returns undef in that case).
    my $hdfs = $self->hdfs;
    my $rv;
    eval {
        $rv = $hdfs->exists( $path );
        1;
    } or do {
        my $eval_error = $@ || 'Zombie error';
        if ( $self->verbose ) {
            $self->logger->debug(
                sprintf 'WebHDFS exists() failed with exception, however since this is a silent call, it is ignored: %s',
                $eval_error,
            )
        }
    };
    return $rv;
}
sub _copy_to_hdfs_with_webhdfs {
    my $self         = shift;
    my $sourceFolder = shift;
    my $destFolder   = shift;
    # Recursively copy a local staging directory to HDFS via WebHDFS:
    # first the files sitting directly in this level, then one recursive
    # call per sub directory. Always returns 1 (failures logdie).
    my $hdfs    = $self->hdfs;
    my $logger  = $self->logger;
    my $verbose = $self->verbose;
    $logger->info(
        sprintf 'copying from `%s` to `%s`',
        $sourceFolder,
        $destFolder,
    );
    if ( ! $self->_hdfs_exists_no_exception( $destFolder ) ) {
        if ( $verbose ) {
            $logger->debug(
                sprintf 'HDFS destination %s does not exist',
                $destFolder,
            );
        }
        # NOTE(review): splitpath() only yields (volume, directories,
        # basename), not one element per path component -- splitdir() looks
        # like the intended call. The current code still works because the
        # whole parent path is mkdir'ed in one go; confirm before changing.
        my(undef, @paths) = File::Spec->splitpath( $destFolder );
        my $remote_base;
        for my $chunk ( @paths ) {
            if ( $remote_base ) {
                $remote_base = File::Spec->catdir( $remote_base, $chunk);
            }
            else {
                $remote_base = $chunk;
            }
            if ( $self->_hdfs_exists_no_exception( $remote_base ) ) {
                next;
            }
            if ( $verbose ) {
                $logger->debug(
                    sprintf 'Attempting to mkdir HDFS destination %s',
                    $remote_base,
                );
            }
            $hdfs->mkdir( $remote_base );
            $hdfs->chmod( $remote_base, DEFAULT_DIR_MODE );
        }
        # since the above calls were silent, see if this throws anything
        if ( $hdfs->exists($destFolder) ) {
            if ( $verbose ) {
                $logger->debug(
                    sprintf 'HDFS destination %s exists',
                    $destFolder,
                );
            }
        }
    }
    else {
        if ( $verbose ) {
            $logger->debug(
                sprintf 'HDFS destination %s exists',
                $destFolder,
            );
        }
    }
    # Upload the plain files located directly in this directory.
    my $f_rule = File::Find::Rule->new->file->maxdepth(1)->mindepth(1);
    my @files  = $f_rule->in($sourceFolder);
    foreach my $file (@files)
    {
        my $filename   = basename($file);
        my $dest       = File::Spec->catfile($destFolder, $filename);
        my $filehandle = path( $file );
        my $data       = $filehandle->slurp_raw;
        if($verbose){
            $logger->debug("Creating $dest");
        }
        $hdfs->touchz( $dest );
        if ( ! $hdfs->create(
                    $dest,
                    $data,
                    overwrite => 'true',
                )
        ) {
            $logger->logdie(
                sprintf 'Failed to create %s through WebHDFS',
                $dest
            );
        }
        # NOTE(review): files are chmod'ed with DEFAULT_DIR_MODE rather
        # than DEFAULT_FILE_MODE -- possibly intentional; verify.
        $hdfs->chmod( $dest, DEFAULT_DIR_MODE );
    }
    # Recurse into the sub directories.
    my $d_rule  = File::Find::Rule->new->directory->maxdepth(1)->mindepth(1);
    my @folders = $d_rule->in($sourceFolder);
    foreach my $folder (@folders)
    {
        my $foldername = basename($folder);
        my $dest       = File::Spec->catfile($destFolder, $foldername);
        $self->_copy_to_hdfs_with_webhdfs($folder, $dest)
    }
    return 1;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 VERSION
version 0.020
=head1 SYNOPSIS
use App::Oozie::Deploy;
App::Oozie::Deploy->new_with_options->run;
=head1 DESCRIPTION
This is an action/program in the Oozie Tooling.
=for Pod::Coverage BUILD
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 Methods
=head2 collect_data_for_deployment_meta_file
=head2 collect_names_to_deploy
=head2 compile_templates
=head2 create_deployment_meta_file
=head2 destination_path
=head2 guess_running_coordinator
=head2 max_wf_xml_length
=head2 maybe_update_coordinators
=head2 pre_verification
=head2 process_templates
=head2 process_workflow
=head2 prune_path
=head2 run
=head2 upload_to_hdfs
=head2 validate_meta_file
=head2 verify_temp_dir
=head2 write_deployment_meta_file
=head1 Accessors
=head2 Overridable from cli
=head3 dump_xml_to_json
=head3 hdfs_dest
=head3 keep_deploy_path
=head3 oozie_workflows_base
=head3 prune
=head3 sla
=head3 write_ownership_to_workflow_xml
=head2 Overridable from sub-classes
=head3 configuration_files
=head3 deploy_start
=head3 deployment_meta_file_name
=head3 email_validator
=head3 internal_conf
=head3 max_node_name_len
=head3 process_coord_directive_varname
=head3 required_tt_files
=head3 spec_queue_is_missing_message
=head3 ttlib_base_dir
=head3 ttlib_dynamic_base_dir_name
=head1 SEE ALSO
L<App::Oozie>.
=head1 AUTHORS
=over 4
=item *
David Morel
=item *
Burak Gursoy
=back
=head1 COPYRIGHT AND LICENSE
This software is copyright (c) 2023 by Booking.com.
This is free software; you can redistribute it and/or modify it under
the same terms as the Perl 5 programming language system itself.
=cut