from collections import Counter
from datetime import datetime, timedelta
import logging
import os
from wa import Command, settings
from wa.framework.configuration.core import Status
from wa.framework.output import RunOutput, discover_wa_outputs
from wa.utils.doc import underline
from wa.utils.log import COLOR_MAP, RESET_COLOR
from wa.utils.terminalsize import get_terminal_size
class ReportCommand(Command):
name = 'report'
description = '''
Monitor an ongoing run and provide information on its progress.
Specify the output directory of the run you would like the monitor;
alternatively report will attempt to discover wa output directories
within the current directory. The output includes run information such as
the UUID, start time, duration, project name and a short summary of the
run's progress (number of completed jobs, the number of jobs in each
different status).
If verbose output is specified, the output includes a list of all events
labelled as not specific to any job, followed by a list of the jobs in the
order executed, with their retries (if any), current status and, if the job
is finished, a list of events that occurred during that job's execution.
This is an example of a job status line:
wk1 (exoplayer) [1] - 2, PARTIAL
It contains two entries delimited by a comma: the job's descriptor followed
by its completion status (``PARTIAL``, in this case). The descriptor
consists of the following elements:
- the job ID (``wk1``)
- the job label (which defaults to the workload name) in parentheses
- job iteration number in square brakets (``1`` in this case)
- a hyphen followed by the retry attempt number.
(note: this will only be shown if the job has been retried as least
once. If the job has not yet run, or if it completed on the first
attempt, the hyphen and retry count -- which in that case would be
zero -- will not appear).
def initialize(self, context):
self.parser.add_argument('-d', '--directory',
Specify the WA output path. report will
otherwise attempt to discover output
directories in the current directory.
def execute(self, state, args):
if args.directory:
output_path = args.directory
run_output = RunOutput(output_path)
possible_outputs = list(discover_wa_outputs(os.getcwd()))
num_paths = len(possible_outputs)
if num_paths > 1:
print('More than one possible output directory found,'
' please choose a path from the following:'
for i in range(num_paths):
print("{}: {}".format(i, possible_outputs[i].basepath))
while True:
select = int(input())
except ValueError:
print("Please select a valid path number")
if select not in range(num_paths):
print("Please select a valid path number")
run_output = possible_outputs[select]
run_output = possible_outputs[0]
rm = RunMonitor(run_output)
class RunMonitor:
def elapsed_time(self):
if self._elapsed is None:
if self.ro.info.duration is None:
self._elapsed = datetime.utcnow() - self.ro.info.start_time
self._elapsed = self.ro.info.duration
return self._elapsed
def job_outputs(self):
if self._job_outputs is None:
self._job_outputs = {
(j_o.id, j_o.label, j_o.iteration): j_o for j_o in self.ro.jobs
return self._job_outputs
def projected_duration(self):
elapsed = self.elapsed_time.total_seconds()
proj = timedelta(seconds=elapsed * (len(self.jobs) / len(self.segmented['finished'])))
return proj - self.elapsed_time
def __init__(self, ro):
self.ro = ro
self._elapsed = None
self._p_duration = None
self._job_outputs = None
self._termwidth = None
self._fmt = _simple_formatter()
def get_data(self):
self.jobs = [state for label_id, state in self.ro.state.jobs.items()]
if self.jobs:
rc = self.ro.run_config
self.segmented = segment_jobs_by_state(self.jobs,
def generate_run_header(self):
info = self.ro.info
header = underline('Run Info')
header += "UUID: {}\n".format(info.uuid)
if info.run_name:
header += "Run name: {}\n".format(info.run_name)
if info.project:
header += "Project: {}\n".format(info.project)
if info.project_stage:
header += "Project stage: {}\n".format(info.project_stage)
if info.start_time:
duration = _seconds_as_smh(self.elapsed_time.total_seconds())
header += ("Start time: {}\n"
"Duration: {:02}:{:02}:{:02}\n"
duration[2], duration[1], duration[0],
if self.segmented['finished'] and not info.end_time:
p_duration = _seconds_as_smh(self.projected_duration.total_seconds())
header += "Projected time remaining: {:02}:{:02}:{:02}\n".format(
p_duration[2], p_duration[1], p_duration[0]
elif self.ro.info.end_time:
header += "End time: {}\n".format(info.end_time)
return header + '\n'
def generate_job_summary(self):
total = len(self.jobs)
num_fin = len(self.segmented['finished'])
summary = underline('Job Summary')
summary += 'Total: {}, Completed: {} ({}%)\n'.format(
total, num_fin, (num_fin / total) * 100
) if total > 0 else 'No jobs created\n'
ctr = Counter()
for run_state, jobs in ((k, v) for k, v in self.segmented.items() if v):
if run_state == 'finished':
ctr.update([job.status.name.lower() for job in jobs])
ctr[run_state] += len(jobs)
return summary + ', '.join(
[str(count) + ' ' + self._fmt.highlight_keyword(status) for status, count in ctr.items()]
) + '\n\n'
def generate_job_detail(self):
detail = underline('Job Detail')
for job in self.jobs:
detail += ('{} ({}) [{}]{}, {}\n').format(
' - ' + str(job.retries)if job.retries else '',
job_output = self.job_outputs[(job.id, job.label, job.iteration)]
for event in job_output.events:
detail += self._fmt.fit_term_width(
return detail
def generate_run_detail(self):
detail = underline('Run Events') if self.ro.events else ''
for event in self.ro.events:
detail += '{}\n'.format(event.summary)
return detail + '\n'
def generate_output(self, verbose):
if not self.jobs:
return 'No jobs found in output directory\n'
output = self.generate_run_header()
output += self.generate_job_summary()
if verbose:
output += self.generate_run_detail()
output += self.generate_job_detail()
return output
def _seconds_as_smh(seconds):
seconds = int(seconds)
hours = seconds // 3600
minutes = (seconds % 3600) // 60
seconds = seconds % 60
return seconds, minutes, hours
def segment_jobs_by_state(jobstates, max_retries, retry_status):
finished_states = [
Status.PARTIAL, Status.FAILED,
Status.ABORTED, Status.OK, Status.SKIPPED
segmented = {
'finished': [], 'other': [], 'running': [],
'pending': [], 'uninitialized': []
for jobstate in jobstates:
if (jobstate.status in retry_status) and jobstate.retries < max_retries:
elif jobstate.status in finished_states:
elif jobstate.status == Status.RUNNING:
elif jobstate.status == Status.PENDING:
elif jobstate.status == Status.NEW:
return segmented
class _simple_formatter:
color_map = {
'running': COLOR_MAP[logging.INFO],
'partial': COLOR_MAP[logging.WARNING],
'failed': COLOR_MAP[logging.CRITICAL],
'aborted': COLOR_MAP[logging.ERROR]
def __init__(self):
self.termwidth = get_terminal_size()[0]
self.color = settings.logging['color']
def fit_term_width(self, text):
text = text.expandtabs()
if len(text) <= self.termwidth:
return text
return text[0:self.termwidth - 4] + " ...\n"
def highlight_keyword(self, kw):
if not self.color or kw not in _simple_formatter.color_map:
return kw
color = _simple_formatter.color_map[kw.lower()]
return '{}{}{}'.format(color, kw, RESET_COLOR)