32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class Prune(Pipeline.Filter):
    """Prune the catalog based on the import include rule."""

    def __init__(self, import_: prof.Import, profile: prof.Profile) -> None:
        """
        Inject the import.

        This needs to be created prior to knowing the catalog.
        The profile itself is only used when building messages.
        The import is one of possibly several imports in that profile.
        """
        self._import = import_
        self._profile = profile
        # both stay None until _set_catalog is called from process()
        self._catalog_interface: Optional[CatalogInterface] = None
        self._catalog: Optional[cat.Catalog] = None

    def _set_catalog(self, catalog: cat.Catalog) -> None:
        """Set the catalog and build the catalog interface that wraps it."""
        self._catalog_interface = CatalogInterface(catalog)
        self._catalog = catalog

    def _controls_selected(self, select_list: Optional[List[prof.SelectControl]]) -> List[str]:
        """
        Collect the control ids named by a list of SelectControl entries.

        Ids are returned in the order they are encountered and may contain duplicates.
        When an entry has with_child_controls equal to 'yes', each of its ids is followed by
        the ids the catalog interface reports from get_dependent_control_ids for that id.
        A select_list of None gives an empty list.

        Raises TrestleError if any entry selects controls by matching.
        """
        control_ids: List[str] = []
        if select_list is None:
            return control_ids
        for select_control in select_list:
            if select_control.matching is not None:
                raise TrestleError('Profiles with SelectControl based on matching are not supported.')
            include_children = select_control.with_child_controls == 'yes'
            if not select_control.with_ids:
                continue
            for withid_ in select_control.with_ids:
                id_ = withid_.__root__
                control_ids.append(id_)
                if include_children:
                    control_ids.extend(self._catalog_interface.get_dependent_control_ids(id_))
        return control_ids

    def _find_needed_control_ids(self) -> List[str]:
        """
        Get the sorted list of control ids needed by the import.

        The included ids come from include_controls when it is present, otherwise every
        control id known to the catalog interface is included (with a warning if include_all
        was not given either).  Ids selected by exclude_controls are then removed.
        """
        if self._import.include_controls is not None:
            include_ids = self._controls_selected(self._import.include_controls)
        else:
            if self._import.include_all is None:
                logger.warning('Profile does not specify include-controls, so including all.')
            include_ids = self._catalog_interface.get_control_ids()

        # build the set once so each membership test below is constant time
        excluded = set(self._controls_selected(self._import.exclude_controls))

        if not excluded.issubset(include_ids):
            logger.debug(f'include_ids is not a superset of exclude_ids in import {self._import.href}')
        return sorted(id_ for id_ in include_ids if id_ not in excluded)

    def _prune_control(self, needed_ids: List[str], control: cat.Control, exclude_ids: List[str]) -> cat.Control:
        """
        Prune the control based on the Import requirements.

        This is only called if the control is needed.
        Some or all of its sub_controls may not be needed.
        This always returns the original control, possibly with fewer subcontrols.

        A sub_control is kept only if its id is in needed_ids and not already in exclude_ids.
        exclude_ids is modified in place: the id of every kept sub_control is appended to it
        after that sub_control has itself been pruned.
        """
        if control.controls is None:
            return control
        kept_controls = []
        for sub_control in control.controls:
            if sub_control.id in needed_ids and sub_control.id not in exclude_ids:
                kept_controls.append(self._prune_control(needed_ids, sub_control, exclude_ids))
                exclude_ids.append(sub_control.id)
        control.controls = none_if_empty(kept_controls)
        return control

    def _prune_controls(self, needed_ids: List[str]) -> List[str]:
        """
        Prune each needed control and return the ids that were processed directly.

        A control that was already kept as a sub_control of an earlier control is skipped,
        so it is not part of the returned list.

        Raises TrestleError if a needed control id is not found by the catalog interface.
        """
        # loaded_ids is also extended by _prune_control with the ids of kept sub_controls
        loaded_ids: List[str] = []
        final_ids: List[str] = []
        for control_id in needed_ids:
            if control_id in loaded_ids:
                continue
            control = self._catalog_interface.get_control(control_id)
            if control is None:
                msg = (
                    f'Profile titled "{self._profile.metadata.title}" references control {control_id} '
                    f'but it is not in catalog titled "{self._catalog.metadata.title}"'
                )
                raise TrestleError(msg)
            control = self._prune_control(needed_ids, control, loaded_ids)
            self._catalog_interface.replace_control(control)
            loaded_ids.append(control_id)
            final_ids.append(control_id)
        return final_ids

    def _re_insert_child_controls(self, control: cat.Control) -> cat.Control:
        """
        Re insert this control and its children recursively.

        The list of child controls is rebuilt at every level, and an empty list is replaced
        by the result of none_if_empty.  The same control object is returned.
        """
        new_controls = [self._re_insert_child_controls(sub_control) for sub_control in as_list(control.controls)]
        control.controls = none_if_empty(new_controls)
        return control

    def _re_insert_children(self) -> None:
        """Go through all controls in the control dict and re insert their child controls."""
        for control in self._catalog_interface.get_all_controls_from_dict():
            self._re_insert_child_controls(control)

    def _prune_catalog(self) -> cat.Catalog:
        """
        Prune the controls in the current catalog.

        Returns the current catalog unchanged if there is no import.  Otherwise a new catalog
        is built with a new uuid, the original metadata and params, only the needed controls
        (placed in their groups when they have one), and only the back matter resources whose
        uuid is referenced from the new catalog.
        """
        if self._import is None:
            return self._catalog

        needed_ids = self._find_needed_control_ids()

        # if a control includes controls - only include those that we know are needed
        final_control_ids = self._prune_controls(needed_ids)

        self._re_insert_children()

        cat_controls = []

        # build the needed groups of controls
        group_dict: Dict[str, cat.Group] = {}
        for control_id in final_control_ids:
            control = self._catalog_interface.get_control(control_id)
            group_id, group_title, group_class = self._catalog_interface.get_group_info_by_control(control_id)
            if not group_id:
                # no group id was reported, so the control goes directly in the catalog
                cat_controls.append(control)
                continue
            group = group_dict.get(group_id)
            if group is None:
                group_dict[group_id] = cat.Group(
                    id=group_id, title=group_title, class_=group_class, controls=[control]
                )
            else:
                group.controls.append(control)

        # should avoid empty lists so set to None if empty
        new_groups: Optional[List[cat.Group]] = none_if_empty(list(group_dict.values()))
        cat_controls = none_if_empty(cat_controls)

        new_cat = cat.Catalog(
            uuid=str(uuid4()),
            metadata=self._catalog.metadata,
            back_matter=common.BackMatter(),
            controls=cat_controls,
            groups=new_groups,
            params=self._catalog.params
        )

        # find all referenced uuids - they should be 1:1 with those in backmatter
        needed_uuid_refs = ModelUtils.find_uuid_refs(new_cat)

        # prune the list of resources to only those that are needed
        new_resources: Optional[List[common.Resource]] = []
        if self._catalog.back_matter and self._catalog.back_matter.resources:
            new_resources = [res for res in self._catalog.back_matter.resources if res.uuid in needed_uuid_refs]
        new_cat.back_matter.resources = none_if_empty(new_resources)

        return new_cat

    def process(self, catalog_iter: Iterator[cat.Catalog]) -> Iterator[cat.Catalog]:  # type: ignore
        """
        Prune the catalog based on the include rule in the import_.

        This only processes the one catalog yielded by the one import in this pipeline.
        It must yield in order to have the merge filter loop over available imported catalogs.

        Raises TrestleError if catalog_iter has no catalog to provide.
        """
        try:
            catalog = next(catalog_iter)
        except StopIteration:
            # a StopIteration escaping a generator body is turned into an opaque RuntimeError
            # (PEP 479), so report the missing catalog explicitly instead
            raise TrestleError('Prune filter was given no catalog to process.') from None
        self._set_catalog(catalog)
        logger.debug(f'prune yielding catalog {self._catalog.metadata.title} with import {self._import.href}')
        yield self._prune_catalog()
|