1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2024-10-06 10:51:13 +01:00

Unit tests

This commit is contained in:
Sebastian Goscik 2016-08-12 16:17:35 +01:00
parent 9416888361
commit 9046ddb15d
2 changed files with 483 additions and 128 deletions

View File

@ -1,15 +1,20 @@
# pylint: disable=R0201
from copy import deepcopy, copy
from unittest import TestCase
from nose.tools import assert_equal, assert_is
from mock.mock import MagicMock, Mock
from mock.mock import Mock
from wlauto.exceptions import ConfigError
from wlauto.core.configuration.tree import Node
from wlauto.core.configuration.configuration import (ConfigurationPoint, Configuration,
JobsConfiguration)
from wlauto.core.configuration.tree import SectionNode
from wlauto.core.configuration.configuration import (ConfigurationPoint,
Configuration,
RunConfiguration,
merge_using_priority_specificity,
get_type_name)
from wlauto.core.configuration.plugin_cache import PluginCache
from wlauto.utils.types import obj_dict
# A1
# / \
# B1 B2
@ -17,32 +22,75 @@ from wlauto.core.configuration.configuration import (ConfigurationPoint, Configu
# C1 C2 C3 C4
# \
# D1
a1 = Node("A1")
b1 = a1.add_section("B1")
b2 = a1.add_section("B2")
c1 = b1.add_section("C1")
c2 = b1.add_section("C2")
c3 = b2.add_section("C3")
c4 = b2.add_section("C4")
d1 = c2.add_section("D1")
a1 = SectionNode({"id": "A1"})
b1 = a1.add_section({"id": "B1"})
b2 = a1.add_section({"id": "B2"})
c1 = b1.add_section({"id": "C1"})
c2 = b1.add_section({"id": "C2"})
c3 = b2.add_section({"id": "C3"})
c4 = b2.add_section({"id": "C4"})
d1 = c2.add_section({"id": "D1"})
DEFAULT_PLUGIN_CONFIG = {
"device_config": {
"a": {
"test3": ["there"],
"test5": [5, 4, 3],
},
"b": {
"test4": 1234,
},
},
"some_device": {
"a": {
"test3": ["how are"],
"test2": "MANDATORY",
},
"b": {
"test3": ["you?"],
"test5": [1, 2, 3],
}
}
}
class NodeTest(TestCase):
def _construct_mock_plugin_cache(values=None):
if values is None:
values = deepcopy(DEFAULT_PLUGIN_CONFIG)
plugin_cache = Mock(spec=PluginCache)
plugin_cache.sources = ["a", "b", "c", "d", "e"]
def get_plugin_config(plugin_name):
return values[plugin_name]
plugin_cache.get_plugin_config.side_effect = get_plugin_config
def get_plugin_config_points(_):
return TestConfiguration.configuration
plugin_cache.get_plugin_config_points.side_effect = get_plugin_config_points
return plugin_cache
class TreeTest(TestCase):
def test_node(self):
node = Node(1)
node = SectionNode(1)
assert_equal(node.config, 1)
assert_is(node.parent, None)
assert_equal(node.workloads, [])
assert_equal(node.workload_entries, [])
assert_equal(node.children, [])
def test_add_workload(self):
node = Node(1)
node = SectionNode(1)
node.add_workload(2)
assert_equal(node.workloads, [2])
assert_equal(len(node.workload_entries), 1)
wk = node.workload_entries[0]
assert_equal(wk.config, 2)
assert_is(wk.parent, node)
def test_add_section(self):
node = Node(1)
node = SectionNode(1)
new_node = node.add_section(2)
assert_equal(len(node.children), 1)
assert_is(node.children[0], new_node)
@ -52,28 +100,31 @@ class NodeTest(TestCase):
def test_descendants(self):
for got, expected in zip(b1.descendants(), [c1, d1, c2]):
print "GOT:{} EXPECTED:{}".format(got.config, expected.config)
assert_is(got, expected)
print "----"
assert_equal(got.config, expected.config)
for got, expected in zip(a1.descendants(), [c1, d1, c2, b1, c3, c4, b2]):
print "GOT:{} EXPECTED:{}".format(got.config, expected.config)
assert_is(got, expected)
assert_equal(got.config, expected.config)
def test_ancestors(self):
for got, expected in zip(d1.ancestors(), [c2, b1, a1]):
print "GOT:{} EXPECTED:{}".format(got.config, expected.config)
assert_is(got, expected)
assert_equal(got.config, expected.config)
for _ in a1.ancestors():
raise Exception("A1 is the root, it shouldn't have ancestors")
def test_leaves(self):
for got, expected in zip(a1.leaves(), [c1, d1, c3, c4]):
print "GOT:{} EXPECTED:{}".format(got.config, expected.config)
assert_is(got, expected)
print "----"
assert_equal(got.config, expected.config)
for got, expected in zip(d1.leaves(), [d1]):
print "GOT:{} EXPECTED:{}".format(got.config, expected.config)
assert_is(got, expected)
assert_equal(got.config, expected.config)
def test_source_name(self):
assert_equal(a1.name, 'section "A1"')
global_section = SectionNode({"id": "global"})
assert_equal(global_section.name, "globally specified configuration")
a1.add_workload({'id': 'wk1'})
assert_equal(a1.workload_entries[0].name, 'workload "wk1" from section "A1"')
global_section.add_workload({'id': 'wk2'})
assert_equal(global_section.workload_entries[0].name, 'workload "wk2"')
class ConfigurationPointTest(TestCase):
@ -135,6 +186,11 @@ class ConfigurationPointTest(TestCase):
cp6.set_value(mock)
def test_validation(self):
#Test invalid default
with self.assertRaises(ValueError):
# pylint: disable=W0612
bad_cp = ConfigurationPoint("test", allowed_values=[1], default=100)
def is_even(value):
if value % 2:
return False
@ -144,7 +200,7 @@ class ConfigurationPointTest(TestCase):
cp2 = ConfigurationPoint("test", kind=list, allowed_values=[1, 2, 3, 4, 5])
cp3 = ConfigurationPoint("test", kind=int, constraint=is_even)
cp4 = ConfigurationPoint("test", kind=list, mandatory=True, allowed_values=[1, 99])
mock = MagicMock()
mock = obj_dict()
mock.name = "ConfigurationPoint Validation Unit Test"
# Test allowed values
@ -187,10 +243,10 @@ class ConfigurationPointTest(TestCase):
names = ["str", "list", "integer", "dummy"]
for kind, name in zip(types, names):
cp = ConfigurationPoint("test", kind=kind)
assert_equal(cp.get_type_name(), name)
assert_equal(get_type_name(cp.kind), name)
# Subclass just to add some config points to use in testing
# Subclass to add some config points for use in testing
class TestConfiguration(Configuration):
name = "Test Config"
__configuration = [
@ -205,25 +261,124 @@ class TestConfiguration(Configuration):
class ConfigurationTest(TestCase):
def test(self):
def test_merge_using_priority_specificity(self):
# Test good configs
plugin_cache = _construct_mock_plugin_cache()
expected_result = {
"test1": "hello",
"test2": "MANDATORY",
"test3": ["hello", "there", "how are", "you?"],
"test4": 1234,
"test5": [1, 2, 3],
}
result = merge_using_priority_specificity("device_config", "some_device", plugin_cache)
assert_equal(result, expected_result)
# Test missing mandatory parameter
plugin_cache = _construct_mock_plugin_cache(values={
"device_config": {
"a": {
"test1": "abc",
},
},
"some_device": {
"b": {
"test5": [1, 2, 3],
}
}
})
msg = 'No value specified for mandatory parameter "test2" in some_device.'
with self.assertRaisesRegexp(ConfigError, msg):
merge_using_priority_specificity("device_config", "some_device", plugin_cache)
# Test conflict
plugin_cache = _construct_mock_plugin_cache(values={
"device_config": {
"e": {
'test2': "NOT_CONFLICTING"
}
},
"some_device": {
'a': {
'test2': "CONFLICT1"
},
'b': {
'test2': "CONFLICT2"
},
'c': {
'test2': "CONFLICT3"
},
},
})
msg = ('Error in "e":\n'
'\t"device_config" configuration "test2" has already been specified more specifically for some_device in:\n'
'\t\ta\n'
'\t\tb\n'
'\t\tc')
with self.assertRaisesRegexp(ConfigError, msg):
merge_using_priority_specificity("device_config", "some_device", plugin_cache)
# Test invalid entries
plugin_cache = _construct_mock_plugin_cache(values={
"device_config": {
"a": {
"NOT_A_CFG_POINT": "nope"
}
},
"some_device": {}
})
msg = ('Error in "a":\n\t'
'Invalid entry\(ies\) for "some_device" in "device_config": "NOT_A_CFG_POINT"')
with self.assertRaisesRegexp(ConfigError, msg):
merge_using_priority_specificity("device_config", "some_device", plugin_cache)
plugin_cache = _construct_mock_plugin_cache(values={
"some_device": {
"a": {
"NOT_A_CFG_POINT": "nope"
}
},
"device_config": {}
})
msg = ('Error in "a":\n\t'
'Invalid entry\(ies\) for "some_device": "NOT_A_CFG_POINT"')
with self.assertRaisesRegexp(ConfigError, msg):
merge_using_priority_specificity("device_config", "some_device", plugin_cache)
# pylint: disable=no-member
def test_configuration(self):
# Test loading defaults
cfg = TestConfiguration()
expected = {
"test1": "hello",
"test2": None,
"test3": ["hello"],
"test4": 123,
"test5": None,
}
assert_equal(cfg.to_pod(), expected)
# If a cfg point is not set an attribute with value None should still be created
for name, value in expected.iteritems():
assert_equal(getattr(cfg, name), value)
assert_is(cfg.test2, None)
assert_is(cfg.test5, None)
# Testing pre finalization "set"
# Testing set
# Good value
cfg.set("test1", "there")
assert_equal(cfg.test1, "there") # pylint: disable=E1101
# Unknown value
with self.assertRaisesRegexp(ConfigError, 'Unknown Test Config configuration "nope"'):
cfg.set("nope", 123)
# check_mandatory
with self.assertRaises(ConfigError):
cfg.set("test2", value=None)
cfg.set("test2", value=None, check_mandatory=False)
# parameter constraints are tested in the ConfigurationPoint unit test
# since this just calls through to `ConfigurationPoint.set_value`
# Test validation
msg = 'No value specified for mandatory parameter "test2" in Test Config'
with self.assertRaisesRegexp(ConfigError, msg):
cfg.validate()
cfg.set("test2", 1)
cfg.validate()
# Testing setting values from a dict
new_values = {
@ -238,55 +393,64 @@ class ConfigurationTest(TestCase):
for k, v in new_values.iteritems():
assert_equal(getattr(cfg, k), v)
# Test finalization
#Testing podding
pod = cfg.to_pod()
new_pod = TestConfiguration.from_pod(copy(pod), None).to_pod()
assert_equal(pod, new_pod)
# This is a madatory cfg point so finalization should fail
cfg.configuration["test2"].set_value(cfg, value=None, check_mandatory=False)
msg = 'No value specified for mandatory parameter "test2" in Test Config'
#invalid pod entry
pod = {'invalid_entry': "nope"}
msg = 'Invalid entry\(ies\) for "Test Config": "invalid_entry"'
with self.assertRaisesRegexp(ConfigError, msg):
cfg.finalize()
assert_equal(cfg._finalized, False) # pylint: disable=W0212
TestConfiguration.from_pod(pod, None)
# Valid finalization
cfg.set("test2", "is")
cfg.finalize()
assert_equal(cfg._finalized, True) # pylint: disable=W0212
#failed pod validation
pod = {"test1": "testing"}
msg = 'No value specified for mandatory parameter "test2" in Test Config.'
with self.assertRaisesRegexp(ConfigError, msg):
TestConfiguration.from_pod(pod, None)
# post finalization set should failed
with self.assertRaises(RuntimeError):
cfg.set("test2", "is")
def test_run_configuration(self):
plugin_cache = _construct_mock_plugin_cache()
# Test `merge_device_config``
run_config = RunConfiguration()
run_config.set("device", "some_device")
run_config.merge_device_config(plugin_cache)
class JobsConfigurationTest(TestCase):
# Test `to_pod`
expected_pod = {
"device": "some_device",
"device_config": {
"test1": "hello",
"test2": "MANDATORY",
"test3": ["hello", "there", "how are", "you?"],
"test4": 1234,
"test5": [1, 2, 3],
},
"execution_order": "by_iteration",
"reboot_policy": "as_needed",
"retry_on_status": ['FAILED', 'PARTIAL'],
"max_retries": 3,
}
pod = run_config.to_pod()
assert_equal(pod, expected_pod)
def test_set_global_config(self):
jc = JobsConfiguration()
# Test to_pod > from_pod
new_pod = RunConfiguration.from_pod(copy(pod), plugin_cache).to_pod()
assert_equal(pod, new_pod)
jc.set_global_config("workload_name", "test")
assert_equal(jc.root_node.config.workload_name, "test")
# Aliased names (e.g. "name") should be resolved by the parser
# before being passed here.
# from_pod with invalid device_config
pod['device_config']['invalid_entry'] = "nope"
msg = 'Invalid entry "invalid_entry" for device "some_device".'
with self.assertRaisesRegexp(ConfigError, msg):
RunConfiguration.from_pod(copy(pod), plugin_cache)
with self.assertRaises(ConfigError):
jc.set_global_config("unknown", "test")
# from_pod with no device_config
pod.pop("device_config")
msg = 'No value specified for mandatory parameter "device_config".'
with self.assertRaisesRegexp(ConfigError, msg):
RunConfiguration.from_pod(copy(pod), plugin_cache)
jc.finalise_global_config()
with self.assertRaises(RuntimeError):
jc.set_global_config("workload_name", "test")
def test_tree_manipulation(self):
jc = JobsConfiguration()
workloads = [123, "hello", True]
for w in workloads:
jc.add_workload(w)
assert_equal(jc.root_node.workloads, workloads)
jc.add_section("section", workloads)
assert_equal(jc.root_node.children[0].config, "section")
assert_equal(jc.root_node.workloads, workloads)
def test_generate_job_specs(self):
# disable_instruments
# only_run_ids
def test_generate_job_spec(self):
pass

View File

@ -1,16 +1,16 @@
import os
from unittest import TestCase
from copy import copy
from nose.tools import assert_equal # pylint: disable=E0611
from mock.mock import Mock, MagicMock, call
from wlauto.exceptions import ConfigError
from wlauto.core.configuration.parsers import (get_aliased_param,
_load_file, ConfigParser, EnvironmentVarsParser,
CommandLineArgsParser)
from wlauto.core.configuration import (WAConfiguration, RunConfiguration, JobsConfiguration,
PluginCache)
from wlauto.utils.types import toggle_set
from wlauto.core.configuration.parsers import * # pylint: disable=wildcard-import
from wlauto.core.configuration.parsers import _load_file, _collect_valid_id, _resolve_params_alias
from wlauto.core.configuration import (WAConfiguration, RunConfiguration, JobGenerator,
PluginCache, ConfigurationPoint)
from wlauto.utils.types import toggle_set, reset_counter
class TestFunctions(TestCase):
@ -36,51 +36,98 @@ class TestFunctions(TestCase):
def test_get_aliased_param(self):
# Ideal case
d_correct = {"workload_parameters": [1, 2, 3],
"instruments": [2, 3, 4],
"some_other_param": 1234}
assert_equal(get_aliased_param(d_correct, [
cp1 = ConfigurationPoint("test", aliases=[
'workload_parameters',
'workload_params',
'params'
], default=[], pop=False), [1, 2, 3])
])
d_correct = {"workload_parameters": [1, 2, 3],
"instruments": [2, 3, 4],
"some_other_param": 1234}
assert_equal(get_aliased_param(cp1, d_correct, default=[], pop=False), [1, 2, 3])
# Two aliases for the same parameter given
d_duplicate = {"workload_parameters": [1, 2, 3],
"workload_params": [2, 3, 4]}
with self.assertRaises(ConfigError):
get_aliased_param(d_duplicate, [
'workload_parameters',
'workload_params',
'params'
], default=[])
get_aliased_param(cp1, d_duplicate, default=[])
# Empty dict
d_none = {}
assert_equal(get_aliased_param(d_none, [
'workload_parameters',
'workload_params',
'params'
], default=[]), [])
assert_equal(get_aliased_param(cp1, d_none, default=[]), [])
# Aliased parameter not present in dict
d_not_present = {"instruments": [2, 3, 4],
"some_other_param": 1234}
assert_equal(get_aliased_param(d_not_present, [
'workload_parameters',
'workload_params',
'params'
], default=1), 1)
assert_equal(get_aliased_param(cp1, d_not_present, default=1), 1)
# Testing pop functionality
assert_equal("workload_parameters" in d_correct, True)
get_aliased_param(d_correct, [
'workload_parameters',
'workload_params',
'params'
], default=[])
get_aliased_param(cp1, d_correct, default=[])
assert_equal("workload_parameters" in d_correct, False)
def test_merge_result_processor_instruments(self):
non_merge = {
"instrumentation": toggle_set(["one", "two"]),
}
expected_non_merge = copy(non_merge)
merge_result_processors_instruments(non_merge)
assert_equal(non_merge, expected_non_merge)
no_overlap = {
"instrumentation": ["one", "two"],
"result_processors": ["three", "~four"],
}
expected_no_overlap = {"instrumentation": toggle_set(["one", "two", "three", "~four"])}
merge_result_processors_instruments(no_overlap)
assert_equal(no_overlap, expected_no_overlap)
non_conflicting = {
"instrumentation": ["one", "two"],
"result_processors": ["two", "three"],
}
expected_non_conflicting = {"instrumentation": toggle_set(["one", "two", "three"])}
merge_result_processors_instruments(non_conflicting)
assert_equal(non_conflicting, expected_non_conflicting)
conflict = {
"instrumentation": ["one", "two"],
"result_processors": ["~two", "three"],
}
with self.assertRaises(ConfigError):
merge_result_processors_instruments(conflict)
def test_collect_valid_id(self):
msg = 'Invalid unit_test ID "uses-a-dash"; IDs cannot contain a "-"'
with self.assertRaisesRegexp(ConfigError, msg):
_collect_valid_id("uses-a-dash", set(), "unit_test")
msg = 'Invalid unit_test ID "global"; is a reserved ID'
with self.assertRaisesRegexp(ConfigError, msg):
_collect_valid_id("global", set(), "unit_test")
msg = 'Duplicate unit_test ID "duplicate"'
with self.assertRaisesRegexp(ConfigError, msg):
_collect_valid_id("duplicate", set(["duplicate"]), "unit_test")
def test_resolve_params_alias(self):
test = {"params": "some_value"}
_resolve_params_alias(test, "new_name")
assert_equal(test, {"new_name": "some_value"})
# Test it only affects "params"
_resolve_params_alias(test, "new_name")
assert_equal(test, {"new_name": "some_value"})
test["params"] = "some_other_value"
with self.assertRaises(ConfigError):
_resolve_params_alias(test, "new_name")
def test_construct_valid_entry(self):
raise Exception()
class TestConfigParser(TestCase):
@ -91,7 +138,7 @@ class TestConfigParser(TestCase):
run_config.configuration = RunConfiguration.configuration
config_parser = ConfigParser(wa_config,
run_config,
Mock(spec=JobsConfiguration),
Mock(spec=JobGenerator),
Mock(spec=PluginCache))
# "run_name" can only be in agenda config sections
@ -117,7 +164,7 @@ class TestConfigParser(TestCase):
run_config = Mock(spec=RunConfiguration)
run_config.configuration = RunConfiguration.configuration
jobs_config = Mock(spec=JobsConfiguration)
jobs_config = Mock(spec=JobGenerator)
plugin_cache = Mock(spec=PluginCache)
config_parser = ConfigParser(wa_config, run_config, jobs_config, plugin_cache)
@ -138,11 +185,9 @@ class TestConfigParser(TestCase):
call("project", "some project"),
call("project_stage", "stage 1")
], any_order=True)
print jobs_config.set_global_config.call_args_list
jobs_config.set_global_config.assert_has_calls([
jobs_config.set_global_value.assert_has_calls([
call("iterations", 9001),
call("workload_name", "name"),
call("instrumentation", toggle_set()),
call("instrumentation", toggle_set())
], any_order=True)
@ -153,19 +198,170 @@ class TestConfigParser(TestCase):
"result_processors": ["two", "three"]
}
config_parser.load(instruments_and_result_processors, "Unit test")
jobs_config.set_global_config.assert_has_calls([
call("instrumentation", toggle_set(["one", "two"])),
call("instrumentation", toggle_set(["two", "three"]))
jobs_config.set_global_value.assert_has_calls([
call("instrumentation", toggle_set(["one", "two", "three"]))
], any_order=True)
# Testing a empty config
jobs_config.reset_mock()
config_parser.load({}, "Unit test")
jobs_config.set_global_config.assert_has_calls([], any_order=True)
jobs_config.set_global_value.assert_has_calls([], any_order=True)
wa_config.set.assert_has_calls([], any_order=True)
run_config.set.assert_has_calls([], any_order=True)
class TestAgendaParser(TestCase):
# Tests Phase 1 & 2
def test_valid_structures(self):
wa_config = Mock(spec=WAConfiguration)
wa_config.configuration = WAConfiguration.configuration
run_config = Mock(spec=RunConfiguration)
run_config.configuration = RunConfiguration.configuration
jobs_config = Mock(spec=JobGenerator)
plugin_cache = Mock(spec=PluginCache)
agenda_parser = AgendaParser(wa_config, run_config, jobs_config, plugin_cache)
msg = 'Error in "Unit Test":\n\tInvalid agenda, top level entry must be a dict'
with self.assertRaisesRegexp(ConfigError, msg):
agenda_parser.load(123, "Unit Test")
def _test_bad_type(name, source, msg):
error_msg = msg.format(source=source, name=name)
with self.assertRaisesRegexp(ConfigError, error_msg):
agenda_parser.load({name: 123}, source)
msg = 'Error in "{source}":\n\tInvalid entry "{name}" - must be a dict'
_test_bad_type("config", "Unit Test", msg)
_test_bad_type("global", "Unit Test", msg)
msg = 'Error in "Unit Test":\n\tInvalid entry "{name}" - must be a list'
_test_bad_type("sections", "Unit Test", msg)
_test_bad_type("workloads", "Unit Test", msg)
msg = 'Error in "Unit Test":\n\tInvalid top level agenda entry\(ies\): "{name}"'
_test_bad_type("not_a_valid_entry", "Unit Test", msg)
# Test Phase 3
def test_id_collection(self):
wa_config = Mock(spec=WAConfiguration)
wa_config.configuration = WAConfiguration.configuration
run_config = Mock(spec=RunConfiguration)
run_config.configuration = RunConfiguration.configuration
jobs_config = Mock(spec=JobGenerator)
plugin_cache = Mock(spec=PluginCache)
agenda_parser = AgendaParser(wa_config, run_config, jobs_config, plugin_cache)
agenda = {
"workloads": [
{"id": "test1"},
{"id": "test2"},
],
"sections": [
{"id": "section1",
"workloads": [
{"id": "section1_workload"}
]}
]
}
workloads, sections = agenda_parser.load(agenda, "Unit Test")
assert_equal(sections, set(["section1"]))
assert_equal(workloads, set(["test1", "test2", "section1_workload"]))
# Test Phase 4
def test_id_assignment(self):
wa_config = Mock(spec=WAConfiguration)
wa_config.configuration = WAConfiguration.configuration
run_config = Mock(spec=RunConfiguration)
run_config.configuration = RunConfiguration.configuration
jobs_config = Mock(spec=JobGenerator)
plugin_cache = Mock(spec=PluginCache)
agenda_parser = AgendaParser(wa_config, run_config, jobs_config, plugin_cache)
# Helper function
def _assert_ids(ids, expected):
ids_set = set(ids)
assert_equal(len(ids), len(ids_set))
assert_equal(ids_set, set(expected))
def _assert_workloads_sections(jobs_config, expected_sect, expected_wk):
wk_ids = [wk[0][0]['id'] for wk in jobs_config.add_workload.call_args_list]
# section workloads
for s in jobs_config.add_section.call_args_list:
wk_ids += [wk['id'] for wk in s[0][1]]
#sections
sec_ids = set([s[0][0]['id'] for s in jobs_config.add_section.call_args_list])
_assert_ids(wk_ids, set(expected_wk))
_assert_ids(sec_ids, set(expected_sect))
_reset_jobs_config(jobs_config)
def _reset_jobs_config(jobs_config):
jobs_config.reset_mock()
reset_counter("wk")
reset_counter("s")
# Test auto id assignment
auto_id = {
"workloads": [
{"name": 1},
{"name": 2},
{"name": 3},
],
"sections": [
{"name": 4,
"workloads": [
{"name": 7},
{"name": 8},
{"name": 9},
]},
{"name": 5},
{"name": 6},
]
}
agenda_parser.load(auto_id, "Unit Test")
_assert_workloads_sections(jobs_config, ["s1", "s2", "s3"],
["wk1", "wk2", "wk3", "wk4", "wk5", "wk6"])
# Test user defined IDs
user_ids = {
"workloads": [
{"id": "user1"},
{"name": "autoid1"},
],
"sections": [
{"id": "user_section1",
"workloads": [
{"name": "autoid2"}
]}
]
}
agenda_parser.load(user_ids, "Unit Test")
_assert_workloads_sections(jobs_config, ["user_section1"],
["user1", "wk1", "wk2"])
# Test auto asigned ID already present
used_auto_id = {
"workloads": [
{"id": "wk2"},
{"name": 2},
{"name": 3},
],
}
agenda_parser.load(used_auto_id, "Unit Test")
_assert_workloads_sections(jobs_config, [], ["wk1", "wk2", "wk3"])
# Test string workload
string = {
"workloads": [
"test"
]
}
agenda_parser.load(string, "Unit Test")
workload = jobs_config.add_workload.call_args_list[0][0][0]
assert_equal(isinstance(workload, dict), True)
assert_equal(workload['workload_name'], "test")
class TestEnvironmentVarsParser(TestCase):
def test_environmentvarsparser(self):
@ -209,7 +405,7 @@ class TestEnvironmentVarsParser(TestCase):
class TestCommandLineArgsParser(TestCase):
wa_config = Mock(spec=WAConfiguration)
run_config = Mock(spec=RunConfiguration)
jobs_config = Mock(spec=JobsConfiguration)
jobs_config = Mock(spec=JobGenerator)
cmd_args = MagicMock(
verbosity=1,
@ -220,12 +416,7 @@ class TestCommandLineArgsParser(TestCase):
)
CommandLineArgsParser(cmd_args, wa_config, run_config, jobs_config)
wa_config.set.assert_has_calls([call("verbosity", 1)], any_order=True)
run_config.set.assert_has_calls([call("output_directory", "my_results")], any_order=True)
jobs_config.disable_instruments.assert_has_calls([
call(toggle_set(["~abc", "~def", "~ghi"]))
], any_order=True)
jobs_config.only_run_ids.assert_has_calls([call(["wk1", "s1_wk4"])], any_order=True)
class TestAgendaParser(TestCase):
pass