Bivio::ShellUtil
# Copyright (c) 2000-2012 bivio Software, Inc. All rights reserved. # $Id$ package Bivio::ShellUtil; use strict; use Bivio::Base 'Collection.Attributes'; use File::Spec (); use POSIX (); # C<Bivio::ShellUtil> is the base class for command line utilities. # All shell utilities take a I<command> as their first argument # followed by zero or more arguments. I<command> must map to a # method in the subclass. The arguments are parsed by the method. # # L<setup|"setup"> creates a request from the standard # voptions (I<user>, I<db>, and I<realm>). It is called # implicitly by L<get_request|"get_request"> # # Options precede the command. See L<OPTIONS|"OPTIONS">. If the options # contain references or are C<undef>, the value is used verbatim. If # the option value is a string, it will be parsed with the C<from_literal> # of the option's type. # # For an example, see L<Bivio::Biz::Util::File|Bivio::Biz::Util::File> # and L<Bivio::Biz::Util::Filtrum|Bivio::Biz::Util::Filtrum> (less complex). # # When implementing a subclass, try to avoid assumptions about $self. # For example, don't assume $self is a reference and instead load things # on the request. As an example, in Bivio::Biz::Util::File, the volume # is loaded on the request once it is parsed from $self if it is available. # # ShellUtils can't be subclassed and commands may not begin with "handle_". # See _method_ok() below. # # # # argv : array_ref # # Unmodified argument vector. # # db : string [undef] # # Name of database to connect to. # # detach : boolean [0] # # Detach the process from standard output. Output will receive all output. # # email : string [undef] # # Where to mail the output. Uses I<result_subject>, I<result_type> and # I<result_name>, if available. If there is an exception, will email # the die as a string instead of the text result. # # force : boolean [0] # # If true, L<are_you_sure|"are_you_sure"> will always return true. # # input : string [-] # # Reads the input file. If C<->, reads from stdin. See # L<read_input|"read_input">. # # input : string_ref # # The contents of the input file. Value is returned verbatim from # L<read_input|"read_input">. # # noexecute : boolean [1] # # Won't execute any "modifying" operations. Will not call # commit on termination. # # program : string # # Name of the program sans suffix and directory. # # output : string # # Name of the file to write the output to. # # realm : string [undef] # # The auth realm in which we are operating. # # req : Bivio::Agent::Request # # Request used for the call. Initialized by L<setup|"setup">. # # result_name : string [] # # File name of the result as set by the caller I<command> method. # # result_type : string [] # # MIME type of the result as set by the caller I<command> method. # # user : string [undef or first_admin] # # The auth user used to execute I<command>. If not set and # I<realm> is set, will be implicitly set to the first_admin # as defined by # L<Bivio::Biz::Model::RealmAdminList|Bivio::Biz::Model::RealmAdminList>. b_use('IO.Trace'); our($_TRACE); my($_IDI) = __PACKAGE__->instance_data_index; # Map of class to Attributes which contains result of _parse_options() my(%_DEFAULT_OPTIONS); my($_A) = b_use('IO.Alert'); my($_C) = b_use('IO.Config'); my($_CA) = b_use('Collection.Attributes'); my($_CL) = b_use('IO.ClassLoader'); my($_DIE) = b_use('Bivio.Die'); my($_F) = b_use('IO.File'); my($_FP) = b_use('Type.FilePath'); my($_L) = b_use('IO.Log'); my($_M) = b_use('Biz.Model'); my($_TE) = b_use('Bivio.TypeError'); my($_MAP_NAME) = 'ShellUtil'; $_C->register(my $_CFG = { lock_directory => '/tmp', $_C->NAMED => { daemon_max_children => 1, daemon_sleep_after_start => 60, daemon_sleep_after_reap => 0, daemon_max_child_run_seconds => 0, daemon_max_child_term_seconds => 0, daemon_log_file => $_C->REQUIRED, }, }); my($_HANDLERS) = b_use('Biz.Registrar')->new; sub OPTIONS { # Returns a mapping of options to bivio types and default values. # # Boolean is treated specially, but all other options are parsed # with L<Bivio::Type::from_literal|Bivio::Type/"from_literal">. # If an option is C<undef>, it was passed but not set properly. # If an option does not exist, it wasn't passed. # # You should always use L<getopt|"getopt">, because # it will return C<undef> in all cases, even if called statically. # # If the default value is C<undef>, the option will not be set. # # If the option begins with a unique first letter, the single # letter version is also supported. return { #TODO: Add option -query which sets the query on the request db => ['Name', undef], detach => ['Boolean', 0], detach_log => ['Text', undef], email => ['Text', undef], force => ['Boolean', 0], input => ['Text', '-'], live => ['Boolean', 0], noexecute => ['Boolean', 0], realm => ['Line', undef], user => ['Line', undef], output => ['Line', undef], verbose => ['Boolean', 0], }; } sub OPTIONS_USAGE { return <<'EOF'; options: -db - name of database connection -detach - calls detach process before executing command -detach_log - log file to use when detached -email - who to mail the results to (may be a comma separated list) -force - don't ask "are you sure?" -input - a file to read from ("-" is STDIN) -live - don't die on errors (used in weird circumstances) -noexecute - don't commit -output - a file to write the output to ("-" is STDOUT) -realm - realm_id or realm name -user - user_id or user name EOF } sub USAGE { my($proto) = @_; # B<Subclasses may override this method to provide command details # in which case they should return the complete usage string, e.g. # # usage: b-db-util [options] command [args...] # commands: # remote_sqlplus host db_login actions # copy_logs_to_standby # recover_standby # sql2csv file.sql # switch_logs_and_count_rows die('abstract method') if $proto->package_name eq __PACKAGE__; return join("\n", 'usage: '._cleanup_command_name().' [options] command [args..]', 'commands:', map(" $_", @{$proto->shell_commands}), ''); } sub are_you_sure { my($self, $prompt) = @_; # Writes I<prompt> (default: "Are you sure?") to STDERR. User must # answer "yes", on STDIN or the routine throws an exception. # # Does not prompt if: # # * STDIN is not a tty (-t STDIN returns false) # * self is not a reference (called statically) # * -force option is true # # It is assumed STDERR is set up for autoflushing. # Not a tty? return unless -t STDIN; # Not an instance? return unless ref($self); # Force? return if $self->unsafe_get('force'); $prompt ||= 'Are you sure?'; $self->usage_error("Operation aborted") unless $self->readline_stdin($prompt." (yes or no) ") eq 'yes'; # Yes answer return; } sub arg_list { my($proto, $args, $decls) = @_; $_A->warn_deprecated('use name_args'); return $proto->name_args($decls, $args); } sub assert_dev { return $_C->assert_dev; } sub assert_have_user { my($self) = @_; $self->usage_error('must select a user with -user') unless $self->req('auth_user'); return; } sub assert_not_general { my($self) = @_; $self->usage_error('must select a realm with -realm') if $self->req('auth_realm')->is_general; return; } sub assert_not_root { my($self) = @_; # Ensure the current command-line user is not root. $self->usage_error('this utility method may not be run as root') if $> == 0; return; } sub assert_test { return $_C->assert_test; } sub command_line { my($self) = @_; # Returns the command line that was used to execute this command. return ref($self) ? join(' ', $self->unsafe_get('program') || '', map { defined($_) ? $_ : '<undef>' } @{$self->unsafe_get('argv') || []}) : 'N/A'; } sub commit_or_rollback { my($self, $abort) = @_; my($method) = $self->unsafe_get('noexecute') || $abort ? 'rollback' : 'commit'; b_use('Agent.Task')->$method($self->req); return; } sub convert_literal { my($proto, $type) = (shift, shift); # Calls L<Bivio::Type::from_literal_or_die|Bivio::Type/"from_literal_or_die"> # on I<value> by loading I<type> first. return $proto->use('Type', $type)->from_literal_or_die(@_); } sub detach_process { my($self) = @_; my($pid) = fork; die("fork: $!") unless defined($pid); return $pid if $pid; # Child my($log) = $_F->absolute_path(_detach_log($self)); open(STDIN, '< /dev/null'); open(STDOUT, "+> $log"); open(STDERR, '>&STDOUT'); select(STDERR); $| = 1; select(STDOUT); $| = 1; $_A->set_printer('FILE', $log); eval { require POSIX; POSIX::setsid(); }; return; } sub do_backticks { my($self, $command, $ignore_exit_code) = @_; my($res) = $self->piped_exec( $command, undef, $ignore_exit_code, ); return wantarray ? split(/(?<=\n)/, $$res) : $$res; } sub finish { my($self, $abort) = @_; # Calls L<commit_or_rollback|"commit_or_rollback"> and undoes setup. my($fields) = $self->[$_IDI]; $self->commit_or_rollback($abort); $self->get_request->call_process_cleanup; b_use('SQL.Connection')->set_dbi_name($fields->{prior_db}) if $fields->{prior_db}; return; } sub shell_util_request_instance { return b_use('Test.Request')->get_instance->put_durable(is_secure => 1); } sub get_request { my($self) = @_; return $self->unsafe_get('req') || $self->setup->get('req') if ref($self); my($req) = b_use('Agent.Request')->get_current; $_DIE->die('no request') unless $req; return $req; } sub group_args { my($proto, $group_size, $args) = @_; # Returns an array of I<group_size> tuples (array_refs). Calls # L<usage_error|"usage_error"> if I<args> not modulo I<group_size>. # # I<args> is modified. $proto->usage_error("arguments must come in $group_size-tuples") unless @$args % $group_size == 0; my($res) = []; push(@$res, [splice(@$args, 0, $group_size)]) while @$args; return $res; } sub handle_call_autoload { my($proto) = shift; b_die($proto, ': arguments not allowed') if @_; return $proto unless !$proto->equals_class_name('ShellUtil') and my $req = b_use('Agent.Request')->get_current; return $proto->new(\@_, $req); } sub handle_config { my(undef, $cfg) = @_; # daemon_log_file : string (named, required) # # Name of the log file for the daemon process. Will be passed to # L<Bivio::IO::Log::file_name|Bivio::IO::Log/"file_name">, so may be relative. # The log file is openned at each write to avoid collisions and to make log # rotation easier. # # daemon_max_children : int [1] (named) # # Number of children for the worker. This creates a single queue. # # daemon_max_child_run_seconds : int [0] (named) # # Maximum elapsed run-time in seconds for a single process. If zero, no maximum. # If greater than zero, child will be killed with TERM after run-time exceeded. # # daemon_max_child_term_seconds : int [0] (named) # # Elapsed run-time after kill TERM, before kill KILL is sent to the child. # # daemon_sleep_after_reap : int [0] (named) # # If 0, then L<run_daemon|"run_daemon"> calls C<wait> and blocks forever # until any children exit. This is normal behavior. # # If greater than 0, then childred are reaped by polling C<waitpid> with # C<POSIX::WNOHANG>. After all children are reaped, the reaper (run_daemon) # sleeps for I<daemon_sleep_after_reap> before doing anything else. # # daemon_sleep_after_start : int [60] (named) # # Sleep after starts and before retries. # # lock_directory : string [/tmp] # # Where L<lock_action|"lock_action"> directories are created. Must be absolute, # writable directory. $_DIE->die($cfg->{lock_directory}, ': not a writable directory') unless length($cfg->{lock_directory}) && -w $cfg->{lock_directory} && -d _; $_DIE->die($cfg->{lock_directory}, ': not absolute') unless File::Spec->file_name_is_absolute($cfg->{lock_directory}); $_CFG = $cfg; return; } sub if_option_execute { my($self, $op) = @_; return $op->() unless $self->unsafe_get('noexecute'); return; } sub initialize_fully { # Same as initialize_ui(1). return shift->initialize_ui(1); } sub initialize_ui { my($self, $fully) = @_; # Initializes the UI and sets up the default facade. This takes some time, so # classes should use this sparingly. If I<fully> is true, initializes all # facades. Otherwise, only initializes the default facade, and does not setup # tasks for execution. my($req) = $self->get_request; if ($req->can('setup_all_facades')) { b_use('Agent.Dispatcher')->initialize(!$fully); $req->setup_all_facades if $fully; } b_use('UI.Facade')->setup_request(undef, $req) unless $req->unsafe_get('UI.Facade'); $req->put_durable( task => b_use('Agent.Task')->get_by_id($req->get('task_id'))) if $req->unsafe_get('task_id'); return $req; } sub is_execute { return shift->unsafe_get('noexecute') ? 0 : 1; } sub is_loadavg_ok { my($line) = $_F->read('/proc/loadavg'); # Returns TRUE if the machine load is below a configurable # threshold. # # TODO: Make threshold configurable my(@load) = $$line =~ /^([\d\.]+)\s+([\d\.]+)\s+([\d\.]+)/; return $load[0] < 4 ? 1 : 0; } sub lock_action { my($proto, $op, $name, $no_warn) = @_; # Creates a file lock for I<name> in /tmp/. If I<name> is undef, # uses C<caller> subroutine name. The usage is: # # sub my_action { # my($self, ...) = @_; # return Bivio::ShellUtil->lock_action(sub { # do something; # }); # } # # Prints a warning of the lock couldn't be obtained. If I<op> dies, # rethrows die after removing lock. # # The lock is a directory, and is owned by process. If that process dies, # the lock is removed and re-acquired by this process. # # Returns the result of $op if lock was obtained and I<op> executed without # dying. Returns () if lock could not be acquired. # # # B<DEPRECATED USAGE BELOW> # # # Creates a file lock for I<action> in I<lock_directory>. If I<action> is undef, # uses C<caller> sub. The usage is: # # sub my_action { # my($self, ...) = @_; # return # unless $self->lock_action; # } # # This method forks a new process and returns true in the child process. # The parent waits for the child and returns. There is no timeout, so # child must be designed to be robust. # # Catches TERM signal and resignals (to previous handler) bug first removes the # lock. return _deprecated_lock_action($op || (caller(1))[3]) unless ref($op) eq 'CODE'; my($lock_dir, $lock_pid) = _lock_files($name || (caller(1))[3]); my($clean) = sub { $_F->rm_rf($lock_dir); return; }; my($this_host) = b_use('Bivio.BConf')->bconf_host_name; foreach my $retry (1, 0) { last if mkdir($lock_dir, 0700); unless ($retry) { b_warn($lock_dir, ': unable to delete lock for dead process'); _lock_warning($lock_dir) unless $no_warn; return; } my($pid, $host) = -r $lock_pid ? split(/\s+/, ${$_F->read($lock_pid)}) : (); if (($host && $host ne $this_host) || _process_exists($pid)) { _lock_warning($lock_dir) unless $no_warn; return; } b_warn($pid, ": process doesn't exist, removing ", $lock_dir); # Don't test results, because there may be contention $clean->(); } # Write host after pid to be backwards compatible with just pid format. $_F->write($lock_pid, $$ . ' ' . $this_host); my($prev) = $SIG{TERM}; local($SIG{TERM}) = sub { $clean->(); $SIG{TERM} = $prev; kill('TERM', $$); return; }; return $_DIE->catch_and_rethrow($op, $clean); } sub lock_realm { my($self) = @_; # Locks the current realm. Dies if general realm is auth_realm. # Handles re-locking existing realm. my($req) = $self->get_request; $_DIE->die("can't lock general realm") if $req->get('auth_realm')->get('type') == Bivio::Auth::RealmType->GENERAL(); b_use('Model.Lock')->execute_unless_acquired($req); return; } sub main { my($proto, @argv) = @_; # Parses its arguments. If I<argv[0]> contains is a valid public # method (definition: begins with a letter), will call it. # The rest of the arguments are passed verbatim # to this method. If an error occurs, L<usage|"usage"> is called. # # Global options precede the command and are set on the instance. # # Returns the result as a string_ref if there is a result and wantarray is true. # This backward compatible feature was added to ease testing. local($|) = 1; my(@new_args); unless (ref($proto)) { push(@new_args, shift(@argv)) if $argv[0] && $argv[0] =~ /^[A-Z]/; push(@new_args, \@argv); } # Forces a setup, if called as $self my($self) = ref($proto) ? $proto->setup(_initialize($proto, \@argv)) : $proto->new(@new_args); my($fields) = $self->[$_IDI]; $fields->{in_main} = 1; if ($self->unsafe_get('db')) { # Setup DBI connection to access a probably non-default database $self->setup(); } my($p) = $0 || ''; $p =~ s!.*/!!; $p =~ s!\.\w+$!!; $self->put(program => $p); my($cmd, $res); my($die) = $_DIE->catch(sub { if (@argv and $cmd = _method_ok($self, $argv[0])) { shift(@argv); } else { $self->usage(@argv ? ($argv[0], ': unknown command') : 'missing command'); } return; }); unless ($die) { my($pid); if ($self->unsafe_get('detach')) { $_A->info('log=', _detach_log($self)); if ($pid = $self->detach_process) { $res = "$pid\n"; $self->SUPER::delete(qw(output email req)); } } $die = $_DIE->catch(sub { $res = $self->$cmd(@argv); }) unless $pid; } $fields->{in_main} = 0; # Don't finish if setup never called. $self->finish($die ? 1 : 0) if $self->unsafe_get('req'); if ($die) { # Email error and re-throw $self->put(result_type => undef, result_subject => 'ERROR from: '.$self->command_line); _result_email($self, $cmd, $die->as_string); if ($self->unsafe_get('live')) { $_A->warn($die); return; } $die->throw(); # DOES NOT RETURN } return $res if $res = $self->result($cmd, $res) and wantarray; return; } sub model { shift->get_request; return $_M->new_other_with_query(@_); } sub name_args { my($proto, $decls, $args) = @_; my($last_decl) = $decls->[$#$decls]; my($res) = {}; return ( $proto, @{$proto->map_together(sub { my($arg, $decl) = @_; $decl ||= $last_decl; $decl = [$decl] unless ref($decl); my($name, $type, $default) = @$decl; ($default, $type) = ($type, undef) if ref($type) eq 'CODE'; my($has_default) = $name =~ s/^\?// || defined($default) || @$decl > 2; $type ||= $name; $type = "Type.$type" unless $type =~ /\W/; $type = $proto->use($type); my($v, $e) = $type->from_literal($arg); return $res->{$name} = $v if defined($v); unless ($e) { return $res->{$name} = ref($default) eq 'CODE' ? $default->($proto, $res) : $default if $has_default; $e = $_TE->NULL; } $proto->usage_error( $arg, ': invalid ', $name, ': ', $e->get_long_desc, '; see Type.', $type, "\n"); # DOES NOT RETURN }, $args, $decls)}, ); return; } sub new { my($proto, $class, $argv, $req) = @_; # Initializes a new instance with these command line arguments. if ($class && !ref($class)) { $proto = _other($proto, $class); } else { $argv = $class; $_DIE->die($proto, ': must not be called as ShellUtil->new') if $proto eq __PACKAGE__; } return _initialize($proto->SUPER::new, $argv, $req); } sub new_other { my($self, $class) = @_; # Instantiates a new ShellUtil, whose class is I<class>. Will load class # dynamically (must be fully qualified). Passes standard options from I<self> # to I<other> # Calls I<put_request> on I<other> if there's a request on I<self>, i.e. # L<get_request|"get_request"> has been called. # # If I<self> is not an instance, no options are passed (defaults will # be used in I<other>). # # You can override options by calling I<put> on I<other> after this # call returns. # explicit die if not found # ClassLoader calls throw_quietly() which has no output my($c) = _other($self, $class); my($options) = []; if (ref($self)) { my($standard) = __PACKAGE__->OPTIONS(); while (my($k, $v) = each(%$standard)) { if ($v->[0] eq 'Boolean') { push(@$options, '-'.$k) if $self->unsafe_get($k); } else { # We don't pass undef options. my($actual) = $self->unsafe_get($k); push(@$options, '-'.$k, $actual) if defined($actual) != defined($v) || defined($v) && $v ne $actual; } } } my($other) = $c->new($options); $other->put_request($self->get_request) if $self->unsafe_get('req'); $other->put(program => $self->unsafe_get('program')) if ref($self) && $self->has_keys('program'); return $other; } sub piped_exec { my(undef, $command, $input, $ignore_exit_code) = @_; # Runs I<command> with I<input> (or empty input) and returns output. # I<input> may be C<undef>. # # Throws exception if it can't write the input. Throws exception if the # command returns a non-zero exit result unless ignore_exit_code is # specified. The L<$_DIE|$_DIE> has an I<exit_code> attribute. my($in) = ref($input) ? $input : \$input; $$in = '' unless defined($$in); my($pid) = open(IN, "-|"); defined($pid) || die("fork: $!"); #TODO: Use IO::File and $_F unless ($pid) { $_HANDLERS->call_fifo('handle_piped_exec_child'); (ref($command) eq 'ARRAY' ? open(OUT, '|-', @$command) : open(OUT, "| exec $command") ) || $_DIE->die($command, ": open failed: $!"); print(OUT $$in); close(OUT); # If there is a signal, return 99. Otherwise, return exit code. CORE::exit($? ? ($? >> 8) ? ($? >> 8) : 99 : 0); } my($res); if ($_TRACE) { _trace('START: ', $command); while (defined(my $line = <IN>)) { $res .= $line; _trace($line); } _trace('END: ', $command); } else { local($/) = undef; $res = <IN>; } # May be undef $res .= ''; unless (close(IN)) { $_DIE->throw_die('DIE', { message => 'command died with non-zero status', entity => $command, input => $in, output => \$res, exit_code => $?, }) unless $ignore_exit_code; } return \$res; } sub piped_exec_remote { my($self, $host, $command, $input, $ignore_exit_code) = @_; # Run I<command> remotely using C<ssh>. Returns result. Assumes remote shell # understands single quote escaping. if (defined($host)) { $command =~ s/'/'\''/g; $command = "ssh $host '($command) && echo OK$$'"; } my($res) = $self->piped_exec($command, $input, $ignore_exit_code); $_DIE->throw_die('DIE', { message => 'remote command failed', host => $host, entity => $command, output => $res, }) unless $$res =~ s/OK$$\n// || $ignore_exit_code; return $res; } sub print { # Writes output to STDERR. Returns result of print. # This method may be overriden. shift; return print(STDERR @_); } sub print_line { return shift->print(@_, "\n"); } sub put { my($self) = shift; # If called statically, has no effect. Otherwise, just calls # L<Bivio::Collection::Attributes::put|Bivio::Collection::Attributes/"put">. return unless ref($self); return $self->SUPER::put(@_); } sub put_request { my($self, $req) = @_; # Puts I<req> on I<self> and modifies other values appropriately. # Sets the current request to I<req>. return $self->put(req => $req); } sub read_input { my($self) = @_; # Returns the contents if I<input> argument. If no argument, reads # from STDIN. If I<input> is a ref, just return that. my($input) = $self->get('input'); return ref($input) ? $input : $_F->read($input); } sub readline_stdin { my($self, $prompt) = @_; # Prints I<prompt>, and returns answer stripped of leading and trailing # whitespace. $self->print($prompt); my $answer = <STDIN>; chomp($answer); $answer =~ s/^\s+|\s+$//g; return $answer; } sub ref_to_string { return shift->use('IO.Ref')->to_string(@_); } sub register_handler { shift; $_HANDLERS->push_object(@_); return; } sub required_main { my($proto, $class, @args) = @_; my($pkgs) = $_CL->list_simple_packages_in_map($_MAP_NAME); $proto->usage_error( join("\n", 'first argument must be a class name. Available classes:', @$pkgs, ) . "\n" ) unless $class; if ($proto->is_simple_package_name($class)) { my($c) = grep(/^\Q$class\E$/i, @$pkgs); $proto->usage_error($class, ": class not found in $_MAP_NAME map") unless $c; $class = $c; } return ref($proto->new($_CL->map_require($_MAP_NAME => $class))) ->main(@args); } sub result { my($self, $cmd, $res) = @_; # Processes I<res> by sending via I<email> and writing to I<output> # or printing to STDOUT. Returns a reference to result or undef. $res = _result_ref($self, $res); return undef unless $res; print(STDOUT $$res, $$res =~ /\n$/s ? () : "\n") unless _result_email($self, $cmd, $res) + _result_output($self, $cmd, $res); return $res; } sub run_daemon { my($self, $next_command, $cfg_name) = @_; # Starts a collection of processes using config defined by # I<cfg_name> (see L<handle_config|"handle_config">. $self->get_request; my($cfg) = $_C->get($cfg_name); # Makes log rotating simple: All processes share a log $_A->set_printer('FILE', $_L->file_name($cfg->{daemon_log_file})) if $cfg->{daemon_log_file}; _check_cfg($cfg, $cfg_name); my($children) = {}; my($ref) = b_use('IO.Ref'); while (1) { my($max_duplicates) = $cfg->{daemon_max_children}; while (keys(%$children) < $cfg->{daemon_max_children}) { my($args) = $next_command->(); last unless $args; _reap_daemon_children($children, 0, 0, $cfg); if (grep( $ref->nested_equals($args, $_->{args}), values(%$children), )) { _trace('already running: ', $args) if $_TRACE; # protects against infinite loop when daemon_max_children # is greater than the number of jobs. last if --$max_duplicates <= 0; } else { $children->{_start_daemon_child($self, $args, $cfg)} = { args => $args, $cfg->{daemon_max_child_run_seconds} > 0 ? (max_time => time + $cfg->{daemon_max_child_run_seconds}) : (), }; sleep($cfg->{daemon_sleep_after_start}); } } return unless %$children; _reap_daemon_children( $children, $cfg->{daemon_sleep_after_reap} > 0 ? (0, $cfg->{daemon_sleep_after_reap}) : (wait, 0), $cfg, ); } return; } sub send_mail { my($self, $email, $subject, $body) = @_; my($msg) = b_use('Mail.Outgoing')->new; my($req) = $self->get_request; $msg->set_recipients($email, $req); $msg->set_header('Subject', $subject); $msg->set_header('To', $email); $msg->set_from_with_user($req); if (ref($body) eq 'CODE') { $body->($msg); } elsif ($_CL->was_required('Model.RealmFile') && b_use('Model.RealmFile')->is_blesser_of($body), ) { $msg->set_content_type('multipart/mixed'); $msg->attach({ content => $body->get_content, content_type => $body->get_content_type, filename => $_FP->get_tail($body->get('path')), }); } elsif (ref($body) eq 'SCALAR') { $msg->set_body($body); } else { b_die($body, ': invalid message body type'); } $msg->send($req) unless $self->unsafe_get('noexecute'); return; } sub set_realm_and_user { my($self, $realm, $user) = @_; $realm = b_use('Auth.Realm')->get_general() unless defined($realm); my($req) = $self->get_request; $req->set_realm($realm); if (defined($user)) { $req->set_user($user); return $self; } $self->set_user_to_any unless $req->get('auth_realm')->is_general; return $self; } sub set_user_to_any { return _any_user(unsafe_get_any_online_admin => @_); } sub set_user_to_any_online_admin { return _any_user(get_any_online_admin => @_); } sub setup { my($self) = @_; # Configures the environment for request. Does nothing if already setup. my($fields) = $self->[$_IDI]; $fields->{in_main} ? _setup_for_main($self) : _setup_for_call($self); return $self; } sub shell_commands { my($proto) = @_; no strict qw(refs); my($cmd_usage) = sub { my($cmd) = @_; my($fn) = $proto->can("${cmd}_USAGE"); $cmd .= ' '.$fn->() if ref($fn) eq 'CODE'; return $cmd; }; return [ map($cmd_usage->($_), sort( grep(!/(^b_|^_|^[A-Z0-9_]+$|_USAGE$)/ && *{$proto->package_name.'::'.$_}{CODE}, keys(%{*{$proto->package_name.'::'}}))))]; } sub unauth_model { shift->get_request; return $_M->new_other_with_query(@_); } sub unauth_realm_id { return shift->model('RealmOwner')->unauth_load_and_get_id(@_); } sub unsafe_get { my($self) = shift; return $self->SUPER::unsafe_get(@_) if ref($self); $_DEFAULT_OPTIONS{$self} = $_CA->new(_parse_options($self, [])) unless $_DEFAULT_OPTIONS{$self}; return $_DEFAULT_OPTIONS{$self}->unsafe_get(@_); } sub usage { my($proto) = shift; $proto->usage_error(@_, "\n", $proto->USAGE, $proto->OPTIONS_USAGE); # DOES NOT RETURN } sub usage_error { my(undef, @args) = @_; my($msg) = $_A->format_args(@args); $_A->print_literally('ERROR: ', $msg); $_DIE->throw_quietly('DIE', {message => $msg}); # DOES NOT RETURN } sub write_file { my(undef, $file_name, $contents) = @_; # DEPRECATED: See L<$_F::write|$_F/"write"> $_A->warn_deprecated('use $_F->write'); return $_F->write($file_name, $contents); } sub _any_user { my($method, $self) = @_; $self->req->set_user( $self->model('RealmUser')->$method() || $self->model('User') ->do_iterate(sub {0}, unauth_iterate_start => 'user_id asc') ->unsafe_get('user_id') || $self->unauth_model(RealmOwner => {name => 'user'}) ->unsafe_get('realm_id'), ); $self->put(user => my $u = $self->ureq(qw(auth_user name))); return $u; } sub _check_cfg { my($cfg, $cfg_name) = @_; # Asserts config is valid while (my($k, $v) = each(%$cfg)) { next unless $k =~ /_(?:sleep|max|child)_/; next if $v =~ /^\d+$/ && $v >= 0; my($dv) = $_CFG->{$_C->NAMED}->{$k}; $_A->warn($v, ': bad value for ', ($cfg_name ? "$cfg_name." : ''), $k, '; using ', $dv); $cfg->{$k} = $dv; } if ($cfg->{daemon_max_child_run_seconds} > 0 && $cfg->{daemon_sleep_after_reap} <= 0) { $_A->warn('daemon_sleep_after_reap must be non-zero', ' when daemon_max_child_run_seconds is non-zero; using 1 second'); $cfg->{daemon_sleep_after_reap} = 1; } return; } sub _cleanup_command_name { $0 =~ qr{([-\w]+)$}; #exclude path to $0 in output return _special_handling_to_append_argv0_for_bivio($1); } sub _compile_options { my($self) = @_; # Compiles the options string. Returns a map of options to declarations # as a hash_ref and an array_ref of the declarations. A declaration # is an array_ref (name, type, default). my($options) = $self->OPTIONS; return ({}, []) unless $options && keys(%$options); my($map) = {}; my($opts) = []; foreach my $k (keys(%$options)) { die("$k: options must be valid perl idents with at least on character") unless $k =~ /^[a-z]\w+$/i; my($first) = substr($k, 0, 1); my($type, $default) = @{$options->{$k}}; my($opt) = [$k, b_use(Type => $type)]; $opt->[2] = _parse_option_value($self, $opt, $default); $map->{$first} = exists($map->{$first}) ? 0 : $opt; $map->{$k} = $opt; push(@$opts, $opt); } while (my($k, $v) = each(%$map)) { delete($map->{$k}) unless $v; } return ($map, $opts); } sub _deprecated_lock_action { my($action) = @_; # Implements deprecated form of lock_action. my($dir) = _lock_files($action); unless (mkdir($dir, 0700)) { _lock_warning($dir); return 0; } my($pid) = fork; defined($pid) || die("fork: $!"); return 1 unless $pid; # Parent process waits for child to finish my($res) = waitpid($pid, 0) == -1 ? undef : $?; # Don't need an error check; rather have $res always returned rmdir($dir); die("waitpid failed: $!\nsomething seriously wrong") unless defined($res); die("$action failed\n") if $res; # Tell caller to return, not exit() return 0; } sub _detach_log { my($self) = @_; return $self->get_if_exists_else_put(detach_log => sub { return $_F->absolute_path( b_use('Type.DateTime')->local_now_as_file_name . '-' . $self->get('program') . '.log', ); }); } sub _initialize { my($self, $argv, $req) = @_; # Initializes the instance with the appropriate params. $argv ||= []; my($orig_argv) = [@$argv]; $self->[$_IDI] ||= {}; return $self->put( %{_parse_options($self, $argv)}, argv => $orig_argv, )->put_request($req); } sub _lock_files { my($name) = @_; # Returns the $name converted to (lock_dir, lock_pid) # Strip illegal chars $name =~ s{@{[b_use('Type.FileName')->ILLEGAL_CHAR_REGEXP]}+}{}g; my($d) = File::Spec->catdir($_CFG->{lock_directory}, "$name.lockdir"); return ($d, File::Spec->catfile($d, 'pid')); } sub _lock_warning { my($lock_dir) = @_; my($seconds) = (stat($lock_dir))[9]; b_warn( $lock_dir, ': not acquired; lock age=', defined($seconds) ? ((time - $seconds) . 's') : 'unknown', ); return; } sub _method_ok { my($self, $method) = @_; return undef unless $method =~ /^([a-z]\w*)$/i; return undef if $method =~ /^handle_/; return $method if $method eq 'usage'; foreach my $c (ref($self), @{$self->inheritance_ancestors}) { last if $c =~ /::ShellUtil$/; my($m) = $c->grep_subroutines(qr{^(?:u_)?\Q$method\E$}); return $m->[0] if @$m; } return undef; } sub _monitor_daemon_children { my($children, $cfg) = @_; # Monitor children max_time if daemon_max_child_run_seconds is greater than # zero. return unless $cfg->{daemon_max_child_run_seconds} > 0; my($t) = time; while (my($pid, $child) = each(%$children)) { next if $t < $child->{max_time}; my($sig) = $child->{kill_term}++ ? 'KILL' : 'TERM'; kill($sig, $pid); $_A->info("Sent SIG$sig: pid=", $pid, ' args=', join(' ', splice(@{$child->{args}}, 2))); $child->{max_time} = $t + $cfg->{daemon_max_child_term_seconds} } return; } sub _other { my($self, $class) = @_; my($die); $class = "$_MAP_NAME.$class" if $self->is_simple_package_name($class); return $_DIE->catch_quietly( sub {b_use($class)}, \$die, ) || $_DIE->die($class, ": not found or syntax error: ", $die); } sub _parse_option_value { my($self, $opt, $value) = @_; # Returns the options that were set. return $value if ref($value) || !defined($value); my($v, $e) = $opt->[1]->from_literal($value); $self->usage("-$opt->[0] '$value': ", $e->get_long_desc) if $e; return $v; } sub _parse_options { my($self, $argv) = @_; # Returns the options that were set. my($res) = {}; my($map, $opts) = _compile_options($self); return {} unless %$map; # Parse the options while (@$argv && $argv->[0] =~ /^-/) { my($k) = shift(@$argv); $k =~ s/^-//; my($opt) = $map->{$k}; $self->usage("-$k: unknown option") unless $opt; if ($opt->[1] eq 'Bivio::Type::Boolean') { $res->{$opt->[0]} = 1; next; } $self->usage("-$k: missing an argument") unless @$argv; $res->{$opt->[0]} = _parse_option_value($self, $opt, shift(@$argv)); } # Set the (defined) defaults foreach my $opt (@$opts) { next if exists($res->{$opt->[0]}); next unless defined($opt->[2]); $res->{$opt->[0]} = $opt->[2]; } $res->{output} = $_F->absolute_path($res->{output}) if defined($res->{output}) && $res->{output} ne '-'; _trace($res) if $_TRACE; return $res; } sub _parse_realm { my($self, $attr) = @_; # Returns the id or undef for realm. return undef unless my $realm = $self->unsafe_get($attr); my($ro) = $self->model('RealmOwner'); $self->usage_error($realm, ': no such ', $attr) unless $ro->unauth_load_by_email_id_or_name($realm); return $ro; } sub _process_exists { my($pid) = @_; return 0 unless $pid; # Returns true if $pid exists $! = undef; return kill(0, $pid) || $! != POSIX::ESRCH() ? 1 : 0; } sub _reap_daemon_children { my($children, $stopped, $sleep, $cfg) = @_; # Reap children without blocking while (1) { if ($stopped > 0) { if (my $child = delete($children->{$stopped})) { $_A->info('Stopped: pid=', $stopped, ' args=', join(' ', splice(@{$child->{args}}, 2))); } else { $_A->warn($stopped, ': unknown pid'); } } $stopped = waitpid(-1, POSIX::WNOHANG()); last unless $stopped > 0; } _monitor_daemon_children($children, $cfg); sleep($sleep) if $sleep; return; } sub _result_email { my($self, $cmd, $res) = @_; # Emails the result if there is an email option (returns true in that case). my($email) = $self->unsafe_get('email'); return 0 unless $email; my($name, $type, $subject) = $self->unsafe_get( qw(result_name result_type result_subject)); $self->send_mail( $email, $subject || $name || 'Output from: '.$self->command_line(), sub { my($msg) = @_; if ($type) { $msg->set_content_type('multipart/mixed'); # Can't use -B and couldn't get IO::Scalar to work. # Just assume is binary $msg->attach($res, $type, $name || $cmd, 1); } else { $msg->set_body($res); } return; } ); return 1; } sub _result_output { my($self, $cmd, $res) = @_; # Returns true if there is an output option and it is written to a file. my($output) = $self->unsafe_get('output'); return 0 unless $output; $_F->write($output, $res); return 1; } sub _result_ref { my($self, $res) = @_; # Returns a scalar reference to the result or undef if no result to print. # Will print any structure. return undef unless defined($res); my($ref) = \$res; if (ref($res)) { return $self->ref_to_string($res) unless ref($res) eq 'SCALAR'; $ref = $res; } return defined($$ref) && length($$ref) ? $ref : undef; } sub _setup_for_call { my($self) = @_; # Called from within a program. Request must be setup already or dies. # Doesn't allow certain attributes. Sets user and realm only if passed # explicitly. my($req) = b_use('Agent.Request')->get_current; $_DIE->die(ref($self), ": called without first creating a request") unless $req; $self->put_request($req); foreach my $x (qw(realm user)) { next unless my $v =_parse_realm($self, $x); my($m) = "set_$x"; $req->$m($v); } foreach my $attr (qw(db)) { $_DIE->die($attr, ': cannot pass to ', ref($self), ' call') if $self->unsafe_get($attr); } return; } sub _setup_for_main { my($self) = @_; # Called from "main". Always creates a Job::Request. Initializes db. # Sets realm/user. my($fields) = $self->[$_IDI]; my($db, $user, $realm) = $self->unsafe_get(qw(db user realm)); my($p) = b_use('SQL.Connection')->set_dbi_name($db); $fields->{prior_db} = $p unless $fields->{prior_db}; $self->put_request( b_use('Test.Request')->get_instance->put_durable(is_secure => 1), ) unless $self->unsafe_get('req'); $self->set_realm_and_user(map(_parse_realm($self, $_), qw(realm user))); return; } sub _special_handling_to_append_argv0_for_bivio { my($cmd) = @_; return $cmd eq 'bivio' ? "$1 $ARGV[0]" : $cmd; } sub _start_daemon_child { my($self, $args, $cfg) = @_; # Starts child process, appending to log. Returns pid. # Force a reconnect for both child and parent; avoids errors in # logs for parent. b_use('SQL.Connection')->disconnect; $_A->reset_warn_counter; RETRY: { my($child) = fork; unless (defined($child)) { b_warn($args, " fork: $!; sleeping before retry"); sleep($cfg->{daemon_sleep_after_start}); redo RETRY; } if ($child) { _trace('started: ', $child, ' ', $args) if $_TRACE; return $child; } $self->get_request->clear_current; $0 = join(' ', @$args); $_A->info('Starting: pid=', $$, ' args=', join(' ', @$args[2 .. $#$args])); # Reset so we can send signals in _monitor_daemon_children() local($SIG{TERM}) = 'DEFAULT'; $args->[0]->main(@$args[1 .. $#$args]); CORE::exit(0); } } 1;