#!/usr/local/bin/perl -w # # spawn -- Run commands in parallel on otherwise idle hosts # John Harper # # $Id: spawn,v 1.16 1999/08/31 13:48:43 john Exp $ # # spawn OPTIONS... # # Reads commands to execute from standard input (each line equals one shell # command). Executes these commands in parallel (if possible) on all HOSTS. # # For a command to be run on a host, that host must not have anyone logged # onto its console, or that user must be idle for at least MINUTES. # # Note that each command may be run several times if necessary (i.e. if a # host goes down with a command running). use Getopt::Std; use FileHandle; use Cwd; use POSIX "sys_wait_h"; ######################################################################## # Standard lists of hosts @cs017_hosts = ("diamond", "zephyr", "khamsin", "hurricane", "bise", "squall", "blizzard", "simoom", "chinook", "curry"); @cs104_hosts = ("sapphire", "scatterbrain", "rush", "naughty", "chatterbox", "nosey", "daydream", "uppity", "tickle", "messy", "bounce", "sneeze", "silly", "mischief", "impossible", "bossy", "nonsense", "mustard"); @hpsg_hosts = ("vimto", "tizer", "sprite", "coke", "burroughs"); @all_hosts = (); push @all_hosts, @cs104_hosts; push @all_hosts, @cs017_hosts; @hosts = @all_hosts; %host_groups = (cs017 => \@cs017_hosts, cs104 => \@cs104_hosts, hpsg => \@hpsg_hosts, all => \@all_hosts); ######################################################################## # Configurable parameters # Number of minutes idle before a machine is considered available $def_minidle = 30; # Maximum value of the five-minute load average for starting jobs $def_maxloadav = 0.9; # Number of minutes between checking a machine's idle time and load avg $def_recheck = 5; # If a machine is down, wait at least this many minutes before retrying $def_down_recheck = 30; # Number of seconds to wait for output before assuming the machine is down $def_timeout = 60; # Nice level of spawned remote processes $def_nicelevel = 10; # +ve means output 
gobs of debugging info # -ve means output nothing at all $verbose = 0; # Programs to use $finger_program = "/usr/bin/finger"; $rup_program = "/usr/bin/rup"; $rsh_program = "/usr/bin/rsh"; $shell = "/bin/sh"; $nice_program = "/usr/bin/nice"; # Column position of relevant finger output $finger_tty_idx = 31; $finger_tty_len = 12; $finger_idle_idx = 43; $finger_idle_len = 5; # Big numbers used when we don't know the exact value for some reason $idle_bignum = 65536 * 60; $loadav_bignum = 100; $uptime_bignum = 24 * 60 * 60; ######################################################################## # Parse command line if (! getopts('h:s:i:u:n:g:f:w:vqVpc')) { my $host_list = join ',', @hosts; my $host_group_list = join ',', keys %host_groups; print <) { # Skip blank lines and comments next if /^#/; next if /^\s*$/; chop; push @hosts, $_; } } } if (defined $opt_s) { $shell = $opt_s; } if (defined $opt_i) { $def_minidle = $opt_i; } if (defined $opt_u) { $def_maxloadav = $opt_u; } if (defined $opt_n) { $def_nicelevel = $opt_n; } if (defined $opt_f) { open STDIN, $opt_f or die "Can't open $opt_f: $!\n"; } if (defined $opt_v) { $verbose = $opt_v; } if (defined $opt_q) { $verbose = -$opt_q; } if (defined $opt_V) { $opt_V = 0; # suppress warning print '$Id: spawn,v 1.16 1999/08/31 13:48:43 john Exp $', "\n"; exit 0; } ######################################################################## # Initialise variables and signal handlers # Initial working directory. $local_wd = cwd(); # FIFO queue of commands still to be run @command_queue = (); $watchdog_pid = 0; # Map hosts to remote working directories. In the future there'll be # a means to set this by hostname. (To support hosts that don't share # the same filesystem.) @host_wd = ((defined $opt_w) ? 
$opt_w : $local_wd) x scalar(@hosts); # Array of commands currently being executed on each host @host_command = (0) x scalar(@hosts); # Array of pids for each host with a command @host_pids = (0) x scalar(@hosts); # Local time at which each command was started @host_start_times = (0) x scalar(@hosts); # Idle time per host, -1 if uninitialised @host_idle_time = (-1) x scalar(@hosts); # 15-minute load avg per host @host_load_avg = (-1) x scalar(@hosts); # Time the host's been up (seconds) @host_uptime = (-1) x scalar(@hosts); # Time we last checked each host for idleness @host_checked_at = (0) x scalar(@hosts); # Map of hosts that aren't responding. If non-zero the value # is the time at which they were last tried @host_down = (0) x scalar(@hosts); @host_minidle = ($def_minidle) x scalar(@hosts); @host_maxloadav = ($def_maxloadav) x scalar(@hosts); @host_recheck = ($def_recheck) x scalar(@hosts); @host_down_recheck = ($def_down_recheck) x scalar(@hosts); @host_timeout = ($def_timeout) x scalar(@hosts); @host_nicelevel = ($def_nicelevel) x scalar(@hosts); # Number of running subprocesses $running_commands = 0; # Set signal handlers $got_sigchld = 0; $SIG{'CHLD'} = sub { $got_sigchld++; }; $SIG{'ALRM'} = sub { }; $got_sigusr1 = 0; $SIG{'USR1'} = sub { $got_sigusr1++; }; $got_sigusr2 = 0; $SIG{'USR2'} = sub { $got_sigusr2++; }; # Work through list of hosts, extracting per-host options where present for (my $i = 0; $i <= $#hosts; $i++) { $_ = $hosts[$i]; if (/^([a-zA-Z0-9.-]+)(\s+|:)(.*)$/) { # Hostname contains options $hosts[$i] = $1; my $opts = $3; while ($opts =~ /^\s*([a-zA-Z0-9_-]+)=([^, \t]+),?(.*)$/) { my $opt = $1; my $arg = $2; $opts = $3; if ($opt eq "idle") { $host_minidle[$i] = $arg; } elsif ($opt eq "loadav") { $host_maxloadav[$i] = $arg; } elsif ($opt eq "recheck") { $host_recheck[$i] = $arg; } elsif ($opt eq "down-recheck") { $host_down_recheck[$i] = $arg; } elsif ($opt eq "timeout") { $host_timeout[$i] = $arg; } elsif ($opt eq "nice") { 
$host_nicelevel[$i] = $arg; } elsif ($opt eq "wd") { $host_wd[$i] = $arg; } else { die "Unknown option for host: $hosts[$i], $opt\n"; } } } } ######################################################################## # Main control loop print STDERR "[Hosts] @hosts\n" if $verbose > 0; if (defined $opt_p) { $opt_p = 1; #suppress warning # Just ping each host printf "%-20s %8s %8s %8s\n", "Host", "Idle", "Load", "Ready"; for (my $i = 0; $i <= $#hosts; $i++) { &idle_time($i); my $idle = $host_idle_time[$i]; my $avail = (($host_idle_time[$i] >= $host_minidle[$i] && $host_load_avg[$i] < $host_maxloadav[$i]) ? 'Y' : 'N'); if ($idle == $idle_bignum) { $idle = '-'; } if ($host_load_avg[$i] != $loadav_bignum) { printf "%-20s %8s %8s %8s\n", $hosts[$i], $idle, $host_load_avg[$i], $avail; } } exit 0; } if (defined $opt_c) { $opt_c = 1; #suppress warning $| = 1; for (my $i = 0; $i <= $#hosts; $i++) { &idle_time($i); if ($host_uptime[$i] > 0) { print "$hosts[$i]: "; system ("$rsh_program $hosts[$i] uname -a"); if ($? >> 8 != 0) { print "failed\n"; } } } exit 0; } # Start the watchdog process &watchdog; my $got_eof = 0; do { my $command = 0; &check_signals; if (@command_queue) { # Read next command from head of queue $command = pop @command_queue; } elsif (!$got_eof) { # Read next command from standard input $command = &read_command; if (!$command) { $got_eof = 1; } } if ($command) { my $hostid = &get_host; &start_command($command, $hostid) } elsif (&reaper == 0) { &sleep_for(5 * 60) unless $got_sigchld; } } while (!$got_eof || $running_commands > 0 || @command_queue); if ($watchdog_pid) { &kill_process($watchdog_pid); } exit 0; ######################################################################## # Subroutines # read_command => COMMAND to execute or null sub read_command { TOP: $_ = ; if (! 
$_) { return 0; } # Skip blank lines and comments goto TOP if /^#/; goto TOP if /^\s*$/; chop; # Concatenate lines ending in a backslash while (/\\$/) { chop; $_ .= ; chop; } return $_; } # &get_host => HOST-IDX sub get_host { while (1) { do { &reaper; for (my $i = 0; $i <= $#hosts; $i++) { # Check idle time even if the host is in use. &idle_time($i); if (! $host_command[$i]) { # No command is running on host I if($verbose > 0) { print STDERR "$hosts[$i]: $host_idle_time[$i] minutes idle"; print STDERR ", $host_load_avg[$i] load avg"; print STDERR ", up $host_uptime[$i] seconds\n"; } if ($host_idle_time[$i] >= $host_minidle[$i] && $host_load_avg[$i] < $host_maxloadav[$i]) { print STDERR "$hosts[$i]: available\n" if $verbose > 0; return $i; } } } } while ($got_sigchld); # Nothing to do. Go to sleep. If any child processes # exit, the SIGCHLD will wake us up print STDERR "[no hosts available; sleeping...]\n" if $verbose > 0; # Make this as atomic as possible.. &sleep_for(5 * 60) unless $got_sigchld; &check_signals; } } # &start_command COMMAND HOST-IDX => PID sub start_command { my $command = shift; my $hostid = shift; my $wd = $host_wd[$hostid]; if (my $pid = fork) { # parent $host_command[$hostid] = $command; $host_pids[$hostid] = $pid; $host_start_times[$hostid] = time; $running_commands++; return $pid; } elsif (defined $pid) { # child sysopen(STDIN, "/dev/null", 0); if ($verbose >= 0) { my $out = "$hosts[$hostid]: $command\n"; syswrite(STDERR, $out, length($out)); } my $cmd = "$shell -c 'cd $wd; $command'"; if ($host_nicelevel[$hostid] != 0) { $cmd = "$nice_program -n $host_nicelevel[$hostid] $cmd"; } exec $rsh_program, $hosts[$hostid], $cmd; die "Can't exec $rsh_program: $!\n"; } else { die "Can't fork: $!\n"; } } # &watchdog # This process sends a SIGUSR1 every half an hour to the main process. # I think I may have fixed the bug that was causing the problem anyway... 
sub watchdog {
    # Fork (at most once; $watchdog_pid remembers the child) a helper
    # process that sends SIGUSR1 to this process every 30 minutes.  The
    # USR1 handling in check_signals then forces every host to be re-polled.
    # Dies if fork fails.
    return if $watchdog_pid;

    my $parent_pid = $$;
    my $pid = fork;
    die "Can't fork: $!\n" unless defined $pid;

    if ($pid) {
        # parent
        $watchdog_pid = $pid;
        print STDERR "[watchdog] running on pid $watchdog_pid\n"
            if $verbose > 0;
        return;
    }

    # child: never returns to the caller
    sysopen(STDIN, "/dev/null", 0);
    while (1) {
        select undef, undef, undef, 30 * 60;

        # If we have been re-parented the main process is gone; stop
        # rather than signalling an unrelated process forever.
        exit 0 unless getppid() == $parent_pid;

        if ($verbose > 0) {
            my $msg = "watchdog: signalling parent...\n";
            syswrite(STDERR, $msg, length($msg));
        }
        kill 'USR1', $parent_pid;
    }
}

# reaper => NUMBER-EXITED
#
# Poll every host that has a command running.  A command that exited with
# a non-zero status marks its host as down and is pushed back onto
# @command_queue.  A command that is still running is killed and requeued
# if the host's uptime is shorter than the time the command has been
# running (i.e. the host rebooted underneath it).
#
# Returns the number of commands found to have exited; commands requeued
# because of a reboot are not counted.
sub reaper {
    my $count = 0;

    # Better to clear this before calling waitpid, and maybe get called
    # again, than to clear it afterwards and maybe lose a signal.
    $got_sigchld = 0;

    # Call waitpid for each known pid.  wait() may lose some pids, and
    # we'd have to search for each pid anyway.
    for my $i (0 .. $#hosts) {
        next unless $host_command[$i];

        if (waitpid($host_pids[$i], WNOHANG) > 0) {
            my $rc = $? >> 8;
            print STDERR "$hosts[$i]: exited ($rc)\n" if $verbose > 0;

            if ($rc != 0) {
                # Command failed to be executed; mark the host as
                # being down..
                $host_down[$i]      = time();
                $host_idle_time[$i] = 0;
                $host_load_avg[$i]  = $loadav_bignum;
                $host_uptime[$i]    = 0;

                # ..and put the command back on to the queue
                push @command_queue, $host_command[$i];
                print STDERR "** $hosts[$i]: rsh failed; requeuing command\n"
                    if $verbose > 0;
            }

            $host_command[$i] = 0;
            $running_commands--;
            $count++;
        }
        else {
            # Command hasn't exited.  Has the host bounced since the
            # command was started?
            idle_time($i);
            if ($host_uptime[$i] < (time() - $host_start_times[$i])) {
                # 'Fraid so, requeue the command
                print STDERR "** $hosts[$i]: bounced, requeued command\n"
                    if $verbose >= 0;
                kill_process($host_pids[$i]);
                push @command_queue, $host_command[$i];
                $host_command[$i] = 0;
                $running_commands--;
            }
        }
    }

    if ($count > 0 && $verbose > 0 && $running_commands > 0) {
        # Summarise which hosts are still busy.
        my @busy = grep { $host_command[$_] } 0 .. $#hosts;
        print STDERR "[$running_commands running]",
            (map { " $hosts[$_]" } @busy), "\n";
    }

    return $count;
}

# check_signals
#
# Act on signals recorded by the USR1/USR2 handlers.  USR1 forces every
# host to be re-polled; USR2 additionally forgets which hosts were thought
# to be down.  Either one bumps $got_sigchld so that a pending sleep in the
# caller is skipped.
sub check_signals {
    if ($got_sigusr1 > 0) {
        # Recheck all hosts for idleness
        @host_checked_at = (0) x scalar(@hosts);
        $got_sigusr1 = 0;
        $got_sigchld++;
    }

    if ($got_sigusr2 > 0) {
        # Recheck all hosts that are thought of as being down
        @host_down       = (0) x scalar(@hosts);
        @host_checked_at = (0) x scalar(@hosts);
        # Consume the request; otherwise a single USR2 would reset the
        # tables on every subsequent call.
        $got_sigusr2 = 0;
        $got_sigchld++;
    }
}

# idle_time HOST-IDX
#
# Refresh $host_idle_time[], $host_load_avg[] and $host_uptime[] for one
# host by running finger and rup against it.  Does nothing if the host is
# marked down and its down-recheck interval has not elapsed, or if it was
# polled less than its recheck interval ago.  If either program produces
# no output within the host's timeout the host is marked as down.
#
# Dies if either pipe cannot be opened or the rup output contains no
# recognisable uptime.
sub idle_time {
    my $hostid = shift;
    my $now    = time();

    # Host is down, and not yet time to recheck
    return
        if $host_down[$hostid]
        && $host_down[$hostid] + $host_down_recheck[$hostid] * 60 > $now;

    # Polled recently; not time to requery yet.  (This test used `<',
    # which skipped the poll only once the interval had already elapsed.)
    return
        if $host_checked_at[$hostid]
        && $host_checked_at[$hostid] + $host_recheck[$hostid] * 60 > $now;

    my $handle  = FileHandle->new;
    my $timeout = $host_timeout[$hostid];

    # Pessimistic defaults, used for anything we fail to parse below.
    $host_idle_time[$hostid] = $idle_bignum;
    $host_load_avg[$hostid]  = $loadav_bignum;
    $host_uptime[$hostid]    = $uptime_bignum;

    # Finger the host..
    # NOTE(review): the bare `/dev/null' argument in this command (and in
    # the rup command below) looks like the remnant of shell redirections
    # lost from the source text -- confirm against the upstream script.
    my $pid = open($handle, "$finger_program \@$hosts[$hostid] /dev/null |")
        or die "Can't open finger connection: $!\n";

    # then grovel in the output for a user on the console.  This
    # assumes the Solaris output format :-(

    # First line should be '[HOST]'
    my $line = read_or_timeout($handle, $timeout);
    return _host_unresponsive($hostid, $pid)
        unless $line && $line =~ /^\[.+\]\s*$/;

    # Second line is headings; ignore
    read_or_timeout($handle, $timeout)
        or return _host_unresponsive($hostid, $pid);

    # Keep reading lines until EOF, or until the console login is found.
    # No timeout here: the host has just answered, so assume it is up.
    while (defined(my $entry = <$handle>)) {
        my $tty = substr $entry, $finger_tty_idx, $finger_tty_len;
        next unless $tty =~ /^\s*console\s*$/;

        # Found whoever's on the console, parse the idle time (minutes)
        my $idle = substr $entry, $finger_idle_idx, $finger_idle_len;
        if ($idle =~ /^\s*(\d+)\s*$/) {
            # "MM"
            $host_idle_time[$hostid] = $1;
        }
        elsif ($idle =~ /^\s*(\d\d):\s*$/) {
            # "HH:"
            $host_idle_time[$hostid] = $1 * 60;
        }
        elsif ($idle =~ /^\s*(\d):(\d\d)\s*$/) {
            # "H:MM"
            $host_idle_time[$hostid] = $1 * 60 + $2;
        }
        elsif ($idle =~ /^\s*(\d+)d\s*$/) {
            # "DDd"
            $host_idle_time[$hostid] = $1 * 24 * 60;
        }
        else {
            $host_idle_time[$hostid] = 0;
        }
        last;
    }
    close $handle;
    # Closing the pipe reaps the child, which raises a SIGCHLD that is
    # not one of our remote commands; compensate for it.
    $got_sigchld--;

    # Now try for load average.  A failed open must not reach the error
    # path with an undefined pid, so treat it like the finger open.
    $pid = open($handle, "$rup_program $hosts[$hostid] /dev/null |")
        or die "Can't open rup connection: $!\n";
    $line = read_or_timeout($handle, $timeout);
    return _host_unresponsive($hostid, $pid) unless $line;

    # The second of the three reported load averages is the one used.
    if ($line =~ /load average:\s*([0-9.]+)\s*,\s*([0-9.]+)\s*,\s*([0-9.]+)/) {
        $host_load_avg[$hostid] = $2;
    }

    # Uptime, converted to seconds.
    if ($line =~ /\Wup\s+([0-9]+)\s*days?,\s*([0-9]+):([0-9]+)\s*,/) {
        $host_uptime[$hostid] = (($1 * 24 + $2) * 60 + $3) * 60;
    }
    elsif ($line =~ /\Wup\s+([0-9]+)\s*mins?/) {
        $host_uptime[$hostid] = $1 * 60;
    }
    elsif ($line =~ /\Wup\s+([0-9]+):([0-9]+)\s*,/) {
        $host_uptime[$hostid] = (($1 * 60) + $2) * 60;
    }
    elsif ($line =~ /\Wup\s+([0-9]+)\s*days?,\s*([0-9]+)\s*mins?,/) {
        $host_uptime[$hostid] = (($1 * 24 * 60) + $2) * 60;
    }
    else {
        die "Unable to parse uptime: `$line'\n";
    }
    close $handle;
    $got_sigchld--;

    $host_checked_at[$hostid] = time();
    return;
}

# _host_unresponsive HOST-IDX PID
#
# Failure path of idle_time: kill the probe process PID and record the
# host as down as of now, with zero idle time and zero uptime.
sub _host_unresponsive {
    my ($hostid, $pid) = @_;

    kill_process($pid);
    $host_down[$hostid]      = time();
    $host_idle_time[$hostid] = 0;
    $host_uptime[$hostid]    = 0;
    print STDERR "$hosts[$hostid]: unreponsive\n" if $verbose >= 0;
    return;
}

# read_or_timeout FILEHANDLE TIMEOUT => LINE or ""
#
# Wait up to TIMEOUT seconds for FILEHANDLE to become readable, then read
# one line from it.  Returns "" if nothing arrived in time (or select
# failed); the result is false at end of file as well.
sub read_or_timeout {
    my ($handle, $time) = @_;

    # I did try this using SIGALRM timeouts but it gave me
    # errors about trying to free zero referenced objects?
    my $nfound;
    do {
        my $rin = "";
        vec($rin, fileno($handle), 1) = 1;
        $nfound = select($rin, undef, undef, $time);
        # Retry when interrupted by a signal.  %! is used instead of
        # matching the error text, which varies with the locale.
    } while ($nfound < 0 and $!{EINTR});

    # Force scalar context so that exactly one line is read even if a
    # caller evaluates us in list context.
    return ($nfound > 0) ? scalar(<$handle>) : "";
}

# kill_process PID
#
# Send SIGTERM to PID and wait up to five seconds for it to exit; if it
# has not gone by then, send SIGKILL.  Does nothing unless PID is a
# positive process id: kill() treats 0 and negative values as process
# groups, which would include this script itself.
sub kill_process {
    my $pid = shift;
    return unless defined $pid && $pid > 0;

    if (kill 'TERM', $pid) {
        eval {
            local $SIG{ALRM} = sub { die "timeout" };
            alarm 5;
            waitpid $pid, 0;
            alarm 0;
        };
        if ($@ and $@ =~ /timeout/) {
            kill 'KILL', $pid;
        }
    }
    return;
}

# sleep_for SECONDS
#
# Sleep using select(), so that an arriving signal ends the sleep early.
sub sleep_for {
    my ($secs) = @_;
    select undef, undef, undef, $secs;
}

# end of spawn.