diff --git a/wa/__init__.py b/wa/__init__.py index ab6a7cca..73e55d22 100644 --- a/wa/__init__.py +++ b/wa/__init__.py @@ -1,6 +1,7 @@ from wa.framework import pluginloader, log, signal from wa.framework.command import Command from wa.framework.configuration import settings +from wa.framework.configuration.core import Status from wa.framework.exception import HostError, JobError, InstrumentError, ConfigError from wa.framework.exception import (ResultProcessorError, ResourceError, CommandError, ToolError) @@ -10,4 +11,5 @@ from wa.framework.exception import WorkerThreadError, PluginLoaderError from wa.framework.instrumentation import (Instrument, very_slow, slow, normal, fast, very_fast) from wa.framework.plugin import Plugin, Parameter +from wa.framework.processor import ResultProcessor from wa.framework.workload import Workload diff --git a/wa/framework/configuration/core.py b/wa/framework/configuration/core.py index 8e2ed919..c34fa044 100644 --- a/wa/framework/configuration/core.py +++ b/wa/framework/configuration/core.py @@ -32,11 +32,10 @@ KIND_MAP = { dict: OrderedDict, } -RunStatus = enum(['NEW', 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', - 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) +Status = enum(['UNKNOWN', 'NEW', 'PENDING', + 'STARTED', 'CONNECTED', 'INITIALIZED', 'RUNNING', + 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) -JobStatus = enum(['NEW', 'PENDING', 'RUNNING', - 'SKIPPED', 'ABORTED', 'FAILED', 'PARTIAL', 'OK']) ########################## @@ -551,8 +550,7 @@ class MetaConfiguration(Configuration): 'wa.commands', 'wa.workloads', 'wa.instrumentation', - #'wa.result_processors', - #'wa.managers', + 'wa.processors', 'wa.framework.target.descriptor', 'wa.framework.resource_getters', ] @@ -741,9 +739,9 @@ class RunConfiguration(Configuration): ), ConfigurationPoint( 'retry_on_status', - kind=list_of(JobStatus), + kind=list_of(Status), default=['FAILED', 'PARTIAL'], - allowed_values=JobStatus.values, + allowed_values=Status.values[Status.RUNNING.value:], description=''' This is list of statuses on which a job will be cosidered to have failed and will be automatically retried up to ``max_retries`` @@ -774,6 +772,17 @@ class RunConfiguration(Configuration): .. note:: this number does not include the original attempt ''', ), + ConfigurationPoint( + 'result_processors', + kind=toggle_set, + default=['csv', 'status'], + description=''' + The list of output processors to be used for this run. Output processors + post-process data generated by workloads and instruments, e.g. to + generate additional reports, format the output in a certain way, or + export the output to an exeternal location. + ''', + ), ] configuration = {cp.name: cp for cp in config_points + meta_data} diff --git a/wa/framework/configuration/execution.py b/wa/framework/configuration/execution.py index 66df05ca..e568ad34 100644 --- a/wa/framework/configuration/execution.py +++ b/wa/framework/configuration/execution.py @@ -4,7 +4,7 @@ from itertools import izip_longest, groupby, chain from wa.framework import pluginloader from wa.framework.configuration.core import (MetaConfiguration, RunConfiguration, - JobGenerator, JobStatus, settings) + JobGenerator, Status, settings) from wa.framework.configuration.parsers import ConfigParser from wa.framework.configuration.plugin_cache import PluginCache from wa.framework.exception import NotFoundError @@ -88,12 +88,23 @@ class ConfigManager(object): for name in self.enabled_instruments: try: instruments.append(self.get_plugin(name, kind='instrument', - target=target)) + target=target)) except NotFoundError: msg = 'Instrument "{}" not found' raise NotFoundError(msg.format(name)) return instruments + def get_processors(self): + processors = [] + for name in self.run_config.result_processors: + try: + proc = self.plugin_cache.get_plugin(name, kind='result_processor') + except NotFoundError: + msg = 'Result processor "{}" not found' + raise NotFoundError(msg.format(name)) + processors.append(proc) + return processors + def finalize(self): if not self.agenda: msg = 'Attempting to finalize config before agenda has been set' diff --git a/wa/framework/execution.py b/wa/framework/execution.py index d4e1672d..51d22725 100644 --- a/wa/framework/execution.py +++ b/wa/framework/execution.py @@ -28,12 +28,13 @@ from itertools import izip_longest import wa.framework.signal as signal from wa.framework import instrumentation, pluginloader -from wa.framework.configuration.core import settings, RunStatus, JobStatus +from wa.framework.configuration.core import settings, Status from wa.framework.exception import (WAError, ConfigError, TimeoutError, InstrumentError, TargetError, TargetNotRespondingError) from wa.framework.output import init_job_output from wa.framework.plugin import Artifact +from wa.framework.processor import ProcessorManager from wa.framework.resource import ResourceResolver from wa.framework.run import RunState from wa.framework.target.info import TargetInfo @@ -95,22 +96,35 @@ class ExecutionContext(object): self.tm = tm self.run_output = output self.run_state = output.state + self.target_info = self.tm.get_target_info() self.logger.debug('Loading resource discoverers') self.resolver = ResourceResolver(cm) self.resolver.load() self.job_queue = None self.completed_jobs = None self.current_job = None + self.successful_jobs = 0 + self.failed_jobs = 0 def start_run(self): self.output.info.start_time = datetime.now() self.output.write_info() self.job_queue = copy(self.cm.jobs) self.completed_jobs = [] - self.run_state.status = RunStatus.STARTED + self.run_state.status = Status.STARTED + self.output.status = Status.STARTED self.output.write_state() def end_run(self): + if self.successful_jobs: + if self.failed_jobs: + status = Satus.PARTIAL + else: + status = Status.OK + else: + status = Status.FAILED + self.run_state.status = status + self.output.status = status self.output.info.end_time = datetime.now() self.output.info.duration = self.output.info.end_time -\ self.output.info.start_time @@ -144,7 +158,7 @@ class ExecutionContext(object): def skip_remaining_jobs(self): while self.job_queue: job = self.job_queue.pop(0) - job.status = JobStatus.SKIPPED + job.status = Status.SKIPPED self.run_state.update_job(job) self.completed_jobs.append(job) self.write_state() @@ -166,6 +180,9 @@ class ExecutionContext(object): classifiers=None): self.run_output.add_artifact(name, path, kind, description, classifiers) + def add_event(self, message): + self.output.add_event(message) + class Executor(object): """ @@ -228,8 +245,14 @@ class Executor(object): instrumentation.install(instrument) instrumentation.validate() + self.logger.info('Installing result processors') + pm = ProcessorManager() + for proc in config_manager.get_processors(): + pm.install(proc) + pm.validate() + self.logger.info('Starting run') - runner = Runner(context) + runner = Runner(context, pm) signal.send(signal.RUN_STARTED, self) runner.run() self.execute_postamble(context, output) @@ -244,7 +267,7 @@ class Executor(object): counter = context.run_state.get_status_counts() parts = [] - for status in reversed(JobStatus.values): + for status in reversed(Status.values): if status in counter: parts.append('{} {}'.format(counter[status], status)) self.logger.info(status_summary + ', '.join(parts)) @@ -272,9 +295,10 @@ class Runner(object): """ - def __init__(self, context): + def __init__(self, context, pm): self.logger = logging.getLogger('runner') self.context = context + self.pm = pm self.output = self.context.output self.config = self.context.cm @@ -290,6 +314,7 @@ class Runner(object): except KeyboardInterrupt: self.context.skip_remaining_jobs() except Exception as e: + self.context.add_event(e.message) if (not getattr(e, 'logged', None) and not isinstance(e, KeyboardInterrupt)): log.log_error(e, self.logger) @@ -302,6 +327,7 @@ class Runner(object): def initialize_run(self): self.logger.info('Initializing run') self.context.start_run() + self.pm.initialize() log.indent() for job in self.context.job_queue: job.initialize(self.context) @@ -311,6 +337,9 @@ class Runner(object): def finalize_run(self): self.logger.info('Finalizing run') self.context.end_run() + self.pm.process_run_output(self.context) + self.pm.export_run_output(self.context) + self.pm.finalize() def run_next_job(self, context): job = context.start_job() @@ -319,12 +348,13 @@ class Runner(object): try: log.indent() self.do_run_job(job, context) - job.status = JobStatus.OK + job.status = Status.OK except KeyboardInterrupt: - job.status = JobStatus.ABORTED + job.status = Status.ABORTED raise except Exception as e: - job.status = JobStatus.FAILED + job.status = Status.FAILED + context.add_event(e.message) if not getattr(e, 'logged', None): log.log_error(e, self.logger) e.logged = True @@ -337,7 +367,7 @@ class Runner(object): self.check_job(job) def do_run_job(self, job, context): - job.status = JobStatus.RUNNING + job.status = Status.RUNNING self.send(signal.JOB_STARTED) with signal.wrap('JOB_TARGET_CONFIG', self): @@ -353,15 +383,18 @@ class Runner(object): try: with signal.wrap('JOB_OUTPUT_PROCESSED', self): job.process_output(context) + self.pm.process_job_output(context) + self.pm.export_job_output(context) except Exception: - job.status = JobStatus.PARTIAL + job.status = Status.PARTIAL raise + except KeyboardInterrupt: - job.status = JobStatus.ABORTED + job.status = Status.ABORTED self.logger.info('Got CTRL-C. Aborting.') raise except Exception as e: - job.status = JobStatus.FAILED + job.status = Status.FAILED if not getattr(e, 'logged', None): log.log_error(e, self.logger) e.logged = True @@ -380,15 +413,17 @@ class Runner(object): self.logger.error(msg.format(job.id, job.status, job.iteration)) self.context.move_failed(job) job.retries += 1 - job.status = JobStatus.PENDING + job.status = Status.PENDING self.context.job_queue.insert(0, job) self.context.write_state() else: msg = 'Job {} iteration {} completed with status {}. '\ 'Max retries exceeded.' self.logger.error(msg.format(job.id, job.status, job.iteration)) + self.context.failed_jobs += 1 else: # status not in retry_on_status self.logger.info('Job completed with status {}'.format(job.status)) + self.context.successful_jobs += 1 def send(self, s): signal.send(s, self, self.context) diff --git a/wa/framework/job.py b/wa/framework/job.py index 3a3ee2d4..9b7bc7da 100644 --- a/wa/framework/job.py +++ b/wa/framework/job.py @@ -1,7 +1,7 @@ import logging from wa.framework import pluginloader, signal -from wa.framework.configuration.core import JobStatus +from wa.framework.configuration.core import Status class Job(object): @@ -18,15 +18,25 @@ class Job(object): def classifiers(self): return self.spec.classifiers + @property + def status(self): + return self._status + + @status.setter + def status(self, value): + self._status = value + if self.output: + self.output.status = value + def __init__(self, spec, iteration, context): self.logger = logging.getLogger('job') self.spec = spec self.iteration = iteration self.context = context - self.status = JobStatus.NEW self.workload = None self.output = None self.retries = 0 + self._status = Status.NEW def load(self, target, loader=pluginloader): self.logger.info('Loading job {}'.format(self.id)) @@ -40,7 +50,7 @@ class Job(object): self.logger.info('Initializing job {}'.format(self.id)) with signal.wrap('WORKLOAD_INITIALIZED', self, context): self.workload.initialize(context) - self.status = JobStatus.PENDING + self.status = Status.PENDING context.update_job_state(self) def configure_target(self, context): diff --git a/wa/framework/output.py b/wa/framework/output.py index 6111ecad..ead48c76 100644 --- a/wa/framework/output.py +++ b/wa/framework/output.py @@ -5,10 +5,11 @@ import string import sys import uuid from copy import copy -from datetime import timedelta +from datetime import datetime, timedelta -from wa.framework.configuration.core import JobSpec, RunStatus -from wa.framework.configuration.manager import ConfigManager +from wa.framework.configuration.core import JobSpec, Status +from wa.framework.configuration.execution import ConfigManager +from wa.framework.exception import HostError from wa.framework.run import RunState, RunInfo from wa.framework.target.info import TargetInfo from wa.utils.misc import touch, ensure_directory_exists @@ -21,13 +22,37 @@ logger = logging.getLogger('output') class Output(object): + kind = None + @property def resultfile(self): return os.path.join(self.basepath, 'result.json') + @property + def event_summary(self): + num_events = len(self.events) + if num_events: + lines = self.events[0].message.split('\n') + message = '({} event(s)): {}' + if num_events > 1 or len(lines) > 1: + message += '[...]' + return message.format(num_events, lines[0]) + return '' + + @property + def status(self): + if self.result is None: + return None + return self.result.status + + @status.setter + def status(self, value): + self.result.status = value + def __init__(self, path): self.basepath = path self.result = None + self.events = [] def reload(self): pod = read_pod(self.resultfile) @@ -36,11 +61,16 @@ class Output(object): def write_result(self): write_pod(self.result.to_pod(), self.resultfile) + def get_path(self, subpath): + return os.path.join(self.basepath, subpath.strip(os.sep)) + def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None): self.result.add_metric(name, value, units, lower_is_better, classifiers) def add_artifact(self, name, path, kind, description=None, classifiers=None): + if not os.path.exists(path): + path = self.get_path(path) if not os.path.exists(path): msg = 'Attempting to add non-existing artifact: {}' raise HostError(msg.format(path)) @@ -51,9 +81,14 @@ class Output(object): self.result.add_artifact(name, path, kind, description, classifiers) + def add_event(self, message): + self.result.add_event(message) + class RunOutput(Output): + kind = 'run' + @property def logfile(self): return os.path.join(self.basepath, 'run.log') @@ -96,7 +131,7 @@ class RunOutput(Output): self.info = None self.state = None self.result = None - self.jobs = None + self.jobs = [] if (not os.path.isfile(self.statefile) or not os.path.isfile(self.infofile)): msg = '"{}" does not exist or is not a valid WA output directory.' @@ -155,8 +190,10 @@ class RunOutput(Output): class JobOutput(Output): + kind = 'job' + def __init__(self, path, id, label, iteration, retry): - self.basepath = path + super(JobOutput, self).__init__(path) self.id = id self.label = label self.iteration = iteration @@ -170,13 +207,17 @@ class Result(object): @staticmethod def from_pod(pod): instance = Result() + instance.status = Status(pod['status']) instance.metrics = [Metric.from_pod(m) for m in pod['metrics']] instance.artifacts = [Artifact.from_pod(a) for a in pod['artifacts']] + instance.events = [Event.from_pod(e) for e in pod['events']] return instance def __init__(self): + self.status = Status.NEW self.metrics = [] self.artifacts = [] + self.events = [] def add_metric(self, name, value, units=None, lower_is_better=False, classifiers=None): @@ -190,10 +231,15 @@ class Result(object): logger.debug('Adding artifact: {}'.format(artifact)) self.artifacts.append(artifact) + def add_event(self, message): + self.events.append(Event(message)) + def to_pod(self): return dict( + status=str(self.status), metrics=[m.to_pod() for m in self.metrics], artifacts=[a.to_pod() for a in self.artifacts], + events=[e.to_pod() for e in self.events], ) @@ -349,6 +395,43 @@ class Metric(object): return '<{}>'.format(text) +class Event(object): + """ + An event that occured during a run. + + """ + + __slots__ = ['timestamp', 'message'] + + @staticmethod + def from_pod(pod): + instance = Event(pod['message']) + instance.timestamp = pod['timestamp'] + return instance + + @property + def summary(self): + lines = self.message.split('\n') + result = lines[0] + if len(lines) > 1: + result += '[...]' + return result + + def __init__(self, message): + self.timestamp = datetime.utcnow() + self.message = message + + def to_pod(self): + return dict( + timestamp=self.timestamp, + message=self.message, + ) + + def __str__(self): + return '[{}] {}'.format(self.timestamp, self.message) + + __repr__ = __str__ + def init_run_output(path, wa_state, force=False): if os.path.exists(path): @@ -382,7 +465,10 @@ def init_job_output(run_output, job): path = os.path.join(run_output.basepath, output_name) ensure_directory_exists(path) write_pod(Result().to_pod(), os.path.join(path, 'result.json')) - return JobOutput(path, job.id, job.iteration, job.label, job.retries) + job_output = JobOutput(path, job.id, job.iteration, job.label, job.retries) + job_output.status = job.status + run_output.jobs.append(job_output) + return job_output def _save_raw_config(meta_dir, state): diff --git a/wa/framework/processor.py b/wa/framework/processor.py new file mode 100644 index 00000000..aca240ae --- /dev/null +++ b/wa/framework/processor.py @@ -0,0 +1,87 @@ +import logging + +from wa.framework import pluginloader +from wa.framework.exception import ConfigError +from wa.framework.instrumentation import is_installed +from wa.framework.plugin import Plugin +from wa.utils.log import log_error, indent, dedent + + +class ResultProcessor(Plugin): + + kind = 'result_processor' + requires = [] + + def validate(self): + super(ResultProcessor, self).validate() + for instrument in self.requires: + if not is_installed(instrument): + msg = 'Instrument "{}" is required by {}, but is not installed.' + raise ConfigError(msg.format(instrument, self.name)) + + def initialize(self): + pass + + def finalize(self): + pass + + +class ProcessorManager(object): + + def __init__(self, loader=pluginloader): + self.loader = loader + self.logger = logging.getLogger('processor') + self.processors = [] + + def install(self, processor): + if not isinstance(processor, ResultProcessor): + processor = self.loader.get_result_processor(processor) + self.logger.debug('Installing {}'.format(processor.name)) + self.processors.append(processor) + + def validate(self): + for proc in self.processors: + proc.validate() + + def initialize(self): + for proc in self.processors: + proc.initialize() + + def finalize(self): + for proc in self.processors: + proc.finalize() + + def process_job_output(self, context): + self.do_for_each_proc('process_job_output', 'processing using "{}"', + context.job_output, context.target_info, + context.run_output) + + def export_job_output(self, context): + self.do_for_each_proc('export_job_output', 'Exporting using "{}"', + context.job_output, context.target_info, + context.run_output) + + def process_run_output(self, context): + self.do_for_each_proc('process_run_output', 'Processing using "{}"', + context.run_output, context.target_info) + + def export_run_output(self, context): + self.do_for_each_proc('export_run_output', 'Exporting using "{}"', + context.run_output, context.target_info) + + def do_for_each_proc(self, method_name, message, *args): + try: + indent() + for proc in self.processors: + proc_func = getattr(proc, method_name, None) + if proc_func is None: + continue + try: + self.logger.info(message.format(proc.name)) + proc_func(*args) + except Exception as e: + if isinstance(e, KeyboardInterrupt): + raise + log_error(e, self.logger) + finally: + dedent() diff --git a/wa/framework/run.py b/wa/framework/run.py index 50c35be5..8510fc12 100644 --- a/wa/framework/run.py +++ b/wa/framework/run.py @@ -18,7 +18,7 @@ from collections import OrderedDict, Counter from copy import copy from datetime import datetime, timedelta -from wa.framework.configuration.core import RunStatus, JobStatus +from wa.framework.configuration.core import Status class RunInfo(object): @@ -67,7 +67,7 @@ class RunState(object): @staticmethod def from_pod(pod): instance = RunState() - instance.status = RunStatus(pod['status']) + instance.status = Status(pod['status']) instance.timestamp = pod['timestamp'] jss = [JobState.from_pod(j) for j in pod['jobs']] instance.jobs = OrderedDict(((js.id, js.iteration), js) for js in jss) @@ -76,11 +76,11 @@ class RunState(object): @property def num_completed_jobs(self): return sum(1 for js in self.jobs.itervalues() - if js.status > JobStatus.SKIPPED) + if js.status > Status.SKIPPED) def __init__(self): self.jobs = OrderedDict() - self.status = RunStatus.NEW + self.status = Status.NEW self.timestamp = datetime.now() def add_job(self, job): @@ -110,7 +110,7 @@ class JobState(object): @staticmethod def from_pod(pod): - instance = JobState(pod['id'], pod['label'], JobStatus(pod['status'])) + instance = JobState(pod['id'], pod['label'], Status(pod['status'])) instance.retries = pod['retries'] instance.iteration = pod['iteration'] instance.timestamp = pod['timestamp'] diff --git a/wa/processors/__init__.py b/wa/processors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/wa/processors/csvproc.py b/wa/processors/csvproc.py new file mode 100644 index 00000000..8e9f68ea --- /dev/null +++ b/wa/processors/csvproc.py @@ -0,0 +1,87 @@ +import csv + +from wa import ResultProcessor, Parameter +from wa.framework.exception import ConfigError +from wa.utils.types import list_of_strings + + +class CsvReportProcessor(ResultProcessor): + + name = 'csv' + description = """ + Creates a ``results.csv`` in the output directory containing results for + all iterations in CSV format, each line containing a single metric. + + """ + + parameters = [ + Parameter('use_all_classifiers', kind=bool, default=False, + global_alias='use_all_classifiers', + description=""" + If set to ``True``, this will add a column for every classifier + that features in at least one collected metric. + + .. note:: This cannot be ``True`` if ``extra_columns`` is set. + + """), + Parameter('extra_columns', kind=list_of_strings, + description=""" + List of classifiers to use as columns. + + .. note:: This cannot be set if ``use_all_classifiers`` is + ``True``. + + """), + ] + + def validate(self): + super(CsvReportProcessor, self).validate() + if self.use_all_classifiers and self.extra_columns: + msg = 'extra_columns cannot be specified when '\ + 'use_all_classifiers is True' + raise ConfigError(msg) + + def initialize(self): + self.results_so_far = [] # pylint: disable=attribute-defined-outside-init + self.artifact_added = False + + def process_job_output(self, output, target_info, run_output): + self.results_so_far.append(output) + self._write_results(self.results_so_far, run_output) + if not self.artifact_added: + run_output.add_artifact('run_result_csv', 'results.csv', 'export') + self.artifact_added = True + + def process_run_result(self, output, target_info): + self.results_so_far.append(output.result) + self._write_results(self.rsults_so_far, output) + if not self.artifact_added: + output.add_artifact('run_result_csv', 'results.csv', 'export') + self.artifact_added = True + + def _write_results(self, results, output): + if self.use_all_classifiers: + classifiers = set([]) + for result in results: + for metric in result.metrics: + classifiers.update(metric.classifiers.keys()) + extra_columns = list(classifiers) + elif self.extra_columns: + extra_columns = self.extra_columns + else: + extra_columns = [] + + outfile = output.get_path('results.csv') + with open(outfile, 'wb') as wfh: + writer = csv.writer(wfh) + writer.writerow(['id', 'workload', 'iteration', 'metric', ] + + extra_columns + ['value', 'units']) + + for o in results: + header = [o.id, o.label, o.iteration] + for metric in o.result.metrics: + row = (header + [metric.name] + + [str(metric.classifiers.get(c, '')) + for c in extra_columns] + + [str(metric.value), metric.units or '']) + writer.writerow(row) diff --git a/wa/processors/status.py b/wa/processors/status.py new file mode 100644 index 00000000..9744519f --- /dev/null +++ b/wa/processors/status.py @@ -0,0 +1,59 @@ +# Copyright 2013-2015 ARM Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +# pylint: disable=R0201 +import os +import time +from collections import Counter + +from wa import ResultProcessor, Status +from wa.utils.misc import write_table + + +class StatusTxtReporter(ResultProcessor): + name = 'status' + description = """ + Outputs a txt file containing general status information about which runs + failed and which were successful + + """ + + def process_run_output(self, output, target_info): + counter = Counter() + for jo in output.jobs: + counter[jo.status] += 1 + + outfile = output.get_path('status.txt') + self.logger.info('Status available in {}'.format(outfile)) + with open(outfile, 'w') as wfh: + wfh.write('Run name: {}\n'.format(output.info.run_name)) + wfh.write('Run status: {}\n'.format(output.status)) + wfh.write('Date: {}\n'.format(time.strftime("%c"))) + if output.events: + wfh.write('Events:\n') + for event in output.events: + wfh.write('\t{}\n'.format(event.summary)) + + txt = '{}/{} iterations completed without error\n' + wfh.write(txt.format(counter[Status.OK], len(output.jobs))) + wfh.write('\n') + status_lines = [map(str, [o.id, o.label, o.iteration, o.status, + o.event_summary]) + for o in output.jobs] + write_table(status_lines, wfh, align='<<>><') + + output.add_artifact('run_status_summary', 'status.txt', 'export') +