39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
class CatalogReader:
    """
    Catalog reader.

    Catalog reader handles all operations related to
    reading catalog from markdown.
    """

    def __init__(self, catalog_interface: CatalogInterface):
        """Initialize catalog reader with the catalog interface it reads into."""
        self._catalog_interface = catalog_interface

    @staticmethod
    def _apply_profile_values(param_dict: Dict[str, Any], write_mode: bool) -> None:
        """
        Overwrite the param values with the profile values when profile values are present.

        Args:
            param_dict: parameter dict read from the control markdown, modified in place
            write_mode: when False the placeholder is stripped and the profile values key is removed
        """
        if const.PROFILE_VALUES not in param_dict:
            return
        profile_values = param_dict[const.PROFILE_VALUES]
        if profile_values != [] and profile_values is not None:
            if not write_mode and const.REPLACE_ME_PLACEHOLDER in profile_values:
                profile_values.remove(const.REPLACE_ME_PLACEHOLDER)
            # removing the placeholder may have emptied the list, so check again before overwriting
            if profile_values != []:
                param_dict[const.VALUES] = profile_values
        # NOTE(review): source indentation was lost; this pop is taken to apply whenever the key is present
        if not write_mode:
            param_dict.pop(const.PROFILE_VALUES)

    @staticmethod
    def _apply_profile_param_value_origin(param_dict: Dict[str, Any]) -> None:
        """
        Resolve the profile-param-value-origin entry of the param dict in place.

        If the profile origin was edited it replaces the param value origin.  If it still holds the
        placeholder, both the profile origin and any param value origin are removed because no value
        was provided and it should not inherit the value from the parent.
        """
        if const.PROFILE_PARAM_VALUE_ORIGIN not in param_dict:
            return
        profile_origin = param_dict[const.PROFILE_PARAM_VALUE_ORIGIN]
        if profile_origin != const.REPLACE_ME_PLACEHOLDER:
            param_dict[const.PARAM_VALUE_ORIGIN] = profile_origin
            param_dict.pop(const.PROFILE_PARAM_VALUE_ORIGIN)
        else:
            param_dict.pop(const.PROFILE_PARAM_VALUE_ORIGIN)
            param_dict.pop(const.PARAM_VALUE_ORIGIN, None)

    def read_additional_content(
        self,
        md_path: pathlib.Path,
        required_sections_list: List[str],
        label_map: Dict[str, Dict[str, str]],
        sections_dict: Dict[str, str],
        write_mode: bool
    ) -> Tuple[List[prof.Alter], Dict[str, Any], Dict[str, str]]:
        """
        Read all markdown controls and return list of alters plus control param dict and param sort map.

        Args:
            md_path: directory holding the group directories of control markdown files
            required_sections_list: passed through to ControlReader.read_editable_content
            label_map: passed through to ControlReader.read_editable_content
            sections_dict: passed through to ControlReader.read_editable_content
            write_mode: passed through to the reader; also controls whether profile values are kept

        Returns:
            The alters concatenated in order of control sort id, the param dicts keyed by param id,
            and a map of param id to the sort id of the control it was read from.
        """
        # each control contributes a list of alters, keyed by the control sort id
        alters_map: Dict[str, List[prof.Alter]] = {}
        final_param_dict: Dict[str, Any] = {}
        param_sort_map: Dict[str, str] = {}
        for group_path in CatalogInterface._get_group_ids_and_dirs(md_path).values():
            for control_file in group_path.glob('*.md'):
                sort_id, control_alters, control_param_dict = ControlReader.read_editable_content(
                    control_file,
                    required_sections_list,
                    label_map,
                    sections_dict,
                    write_mode
                )
                alters_map[sort_id] = control_alters
                for param_id, param_dict in control_param_dict.items():
                    # if profile_values are present, overwrite values with them
                    CatalogReader._apply_profile_values(param_dict, write_mode)
                    # verifies if at control profile edition the param value origin was modified
                    # through the profile-param-value-origin tag
                    CatalogReader._apply_profile_param_value_origin(param_dict)
                    final_param_dict[param_id] = param_dict
                    param_sort_map[param_id] = sort_id
        new_alters: List[prof.Alter] = []
        # fill the alters according to the control sorting order
        for key in sorted(alters_map):
            new_alters.extend(alters_map[key])
        return new_alters, final_param_dict, param_sort_map

    def read_catalog_from_markdown(self, md_path: pathlib.Path, set_parameters_flag: bool) -> cat.Catalog:
        """
        Read the groups and catalog controls from the given directory.

        This will overwrite the existing groups and controls in the catalog.

        Args:
            md_path: directory holding the group directories of control markdown files
            set_parameters_flag: passed through to ControlReader.read_control

        Returns:
            The catalog of the catalog interface after its groups and controls were replaced.
        """
        id_map = CatalogInterface._get_group_ids_and_dirs(md_path)
        groups: List[cat.Group] = []
        # read each group dir
        for group_id, group_dir in id_map.items():
            control_list_raw = []
            group_title = ''
            # Need to get group title from at least one control in this directory
            # All controls in dir should have same group title
            # Set group title to the first one found and warn if different non-empty title appears
            # Controls with empty group titles are tolerated but at least one title must be present or warning given
            # The special group with no name that has the catalog as parent is just a list and has no title
            for control_path in group_dir.glob('*.md'):
                control, control_group_title = ControlReader.read_control(control_path, set_parameters_flag)
                if control_group_title:
                    if not group_title:
                        group_title = control_group_title
                    elif control_group_title != group_title:
                        logger.warning(
                            f'Control {control.id} group title {control_group_title} differs from {group_title}'
                        )
                control_list_raw.append(control)
            control_list = sorted(control_list_raw, key=ControlInterface.get_sort_id)
            if group_id:
                if not group_title:
                    logger.warning(
                        f'No group title found in controls for group {group_id}.  The title will be recovered if assembling into an existing catalog with the group title defined.'  # noqa E501
                    )
                new_group = cat.Group(id=group_id, title=group_title)
                new_group.controls = none_if_empty(control_list)
                groups.append(new_group)
            else:
                # if the list of controls has no group id it also has no title and is just the controls of the catalog
                self._catalog_interface._catalog.controls = none_if_empty(control_list)
        self._catalog_interface._catalog.groups = none_if_empty(groups)
        self._catalog_interface._create_control_dict()
        self._catalog_interface._catalog.params = none_if_empty(self._catalog_interface._catalog.params)
        return self._catalog_interface._catalog

    @staticmethod
    def read_catalog_imp_reqs(md_path: pathlib.Path, context: ControlContext) -> List[comp.ImplementedRequirement]:
        """Read the full set of control implemented requirements from markdown.

        Args:
            md_path: Path to the markdown control files, with directories for each group
            context: Context for the operation

        Returns:
            List of implemented requirements gathered from each control, ordered by control sort id

        Notes:
            Results are keyed by sort id, so a later file with the same sort id replaces an earlier one.
            This is only used during component assemble and only for updating one component
            NOTE(review): earlier notes referred to an avail_comps mapping that is not a parameter here.
        """
        imp_req_map: Dict[str, comp.ImplementedRequirement] = {}
        for group_path in CatalogInterface._get_group_ids_and_dirs(md_path).values():
            for control_file in group_path.glob('*.md'):
                sort_id, imp_req = ControlReader.read_implemented_requirement(control_file, context)
                imp_req_map[sort_id] = imp_req
        return [imp_req_map[key] for key in sorted(imp_req_map)]

    @staticmethod
    def _get_imp_req_for_control(ssp: ossp.SystemSecurityPlan, control_id: str) -> ossp.ImplementedRequirement:
        """Return the imp_req of the ssp for the control, appending a newly generated one if none exists."""
        for imp_req in as_list(ssp.control_implementation.implemented_requirements):
            if imp_req.control_id == control_id:
                return imp_req
        imp_req = gens.generate_sample_model(ossp.ImplementedRequirement)
        imp_req.control_id = control_id
        # make sure there is a real list on the ssp before appending to it
        ssp.control_implementation.implemented_requirements = as_list(
            ssp.control_implementation.implemented_requirements
        )
        ssp.control_implementation.implemented_requirements.append(imp_req)
        return imp_req

    @staticmethod
    def _get_imp_req_for_statement(
        ssp: ossp.SystemSecurityPlan, control_id: str, statement_id: str
    ) -> ossp.ImplementedRequirement:
        """Return the imp_req for the control that has the statement, creating imp_req and/or statement if needed."""
        control_imp_req: Optional[ossp.ImplementedRequirement] = None
        for imp_req in as_list(ssp.control_implementation.implemented_requirements):
            if imp_req.control_id != control_id:
                continue
            # remember the last imp_req seen for this control in case none of them has the statement
            control_imp_req = imp_req
            if any(stat.statement_id == statement_id for stat in as_list(imp_req.statements)):
                return imp_req
        # we didn't find imp_req with statement so need to make statement and/or imp_req
        if not control_imp_req:
            control_imp_req = gens.generate_sample_model(ossp.ImplementedRequirement)
            control_imp_req.control_id = control_id
            control_imp_req.statements = None
            ssp.control_implementation.implemented_requirements = as_list(
                ssp.control_implementation.implemented_requirements
            )
            ssp.control_implementation.implemented_requirements.append(control_imp_req)
        CatalogReader._add_statement_to_impl_requirement(control_imp_req, statement_id)
        return control_imp_req

    @staticmethod
    def _add_statement_to_impl_requirement(impl_req: ossp.ImplementedRequirement, statement_id: str) -> ossp.Statement:
        """Update the implemented requirement with a new statement and return that statement."""
        statement = gens.generate_sample_model(ossp.Statement)
        statement.statement_id = statement_id
        statement.by_components = None
        impl_req.statements = as_list(impl_req.statements)
        impl_req.statements.append(statement)
        return statement

    @staticmethod
    def _find_by_comp(
        item: Union[ossp.ImplementedRequirement, ossp.Statement], comp_uuid: str
    ) -> Optional[ossp.ByComponent]:
        """Return the first by_comp of the item with the given component uuid, or None."""
        for by_comp in as_list(item.by_components):
            if by_comp.component_uuid == comp_uuid:
                return by_comp
        return None

    @staticmethod
    def _append_planned_by_comp(
        item: Union[ossp.ImplementedRequirement, ossp.Statement], comp_uuid: str
    ) -> ossp.ByComponent:
        """Append a newly generated by_comp with planned status for the component to the item and return it."""
        by_comp = gens.generate_sample_model(ossp.ByComponent)
        by_comp.component_uuid = comp_uuid
        by_comp.implementation_status = com.ImplementationStatus(state=const.STATUS_PLANNED)
        item.by_components = as_list(item.by_components)
        item.by_components.append(by_comp)
        return by_comp

    @staticmethod
    def _get_by_comp_from_imp_req(
        imp_req: ossp.ImplementedRequirement, statement_id: str, comp_uuid: str
    ) -> ossp.ByComponent:
        """
        Return the by_comp for the component, at statement level if statement_id is given else at control level.

        The statement and the by_comp are created and attached when they do not exist yet.
        """
        if not statement_id:
            holder = imp_req
            existing = CatalogReader._find_by_comp(imp_req, comp_uuid)
        else:
            holder = next(
                (statement for statement in as_list(imp_req.statements) if statement.statement_id == statement_id),
                None
            )
            if not holder:
                # If the statement doesn't exist, create it - it is new so there is no by_comp to look for
                holder = CatalogReader._add_statement_to_impl_requirement(imp_req, statement_id)
                existing = None
            else:
                existing = CatalogReader._find_by_comp(holder, comp_uuid)
        if existing is not None:
            return existing
        # didnt find bycomp or new statement so need to make one
        return CatalogReader._append_planned_by_comp(holder, comp_uuid)

    @staticmethod
    def _read_comp_info_from_md(control_file_path: pathlib.Path,
                                context: ControlContext) -> Tuple[Dict[str, Any], CompDict]:
        """Read the header and component info of a control markdown file, or two empty dicts if it is missing."""
        md_header: Dict[str, Any] = {}
        comp_dict: CompDict = {}
        if control_file_path.exists():
            md_header, comp_dict = ControlReader.read_control_info_from_md(control_file_path, context)
        return md_header, comp_dict

    @staticmethod
    def _update_ssp_with_comp_info(
        ssp: ossp.SystemSecurityPlan,
        control_id: str,
        gen_comp: generic.GenericComponent,
        comp_info_dict: Dict[str, ComponentImpInfo],
        part_id_map_by_label: Dict[str, Dict[str, str]]
    ) -> None:
        """Copy the prose and status of each labelled component info into the matching by_comp of the ssp."""
        # get imp req for control and find one with by_comp, creating if needed
        imp_req = CatalogReader._get_imp_req_for_control(ssp, control_id)
        # if control has no parts it will not have part id map and bycomps will go at control level
        control_part_id_map = part_id_map_by_label.get(control_id, {})
        for label, comp_info in comp_info_dict.items():
            # an unmapped label gives an empty part id, which selects the control-level by_comp
            part_id = control_part_id_map.get(label, '')
            by_comp = CatalogReader._get_by_comp_from_imp_req(imp_req, part_id, gen_comp.uuid)
            by_comp.description = comp_info.prose
            by_comp.implementation_status = comp_info.status

    @staticmethod
    def _insert_set_param_into_by_comps(
        item: Union[ossp.ImplementedRequirement, ossp.Statement],
        rule_id: str,
        param_name: str,
        param_values: List[str],
        comp_uuid: str
    ) -> None:
        """Set the param values in each by_comp of the item that is for the component and has the rule id prop."""
        for by_comp in as_list(item.by_components):
            if by_comp.component_uuid != comp_uuid:
                continue
            for prop in as_list(by_comp.props):
                if prop.name != const.RULE_ID or prop.value != rule_id:
                    continue
                for sp in as_list(by_comp.set_parameters):
                    if sp.param_id == param_name:
                        sp.values = param_values
                        break
                else:
                    # no existing set_parameter for this param so add one
                    sp = ossp.SetParameter(param_id=param_name, values=param_values)
                    by_comp.set_parameters = as_list(by_comp.set_parameters)
                    by_comp.set_parameters.append(sp)

    @staticmethod
    def _insert_param_dict_in_imp_req(
        imp_req: ossp.ImplementedRequirement,
        param_dict: Dict[str, Any],
        comp_name: str,
        md_header: Dict[str, Dict[str, str]],
        comp_uuid: str
    ) -> None:
        """Insert the param in the by_comps that are supported by the rule."""
        # given param name find rule_id in comp name header entry
        # then find all statements with by_comp that have that rule id in props
        rules_dict = md_header.get(const.RULES_PARAMS_TAG, {})
        comp_rules_params = rules_dict.get(comp_name, [])
        param_name = param_dict['name']
        param_values = param_dict['values']
        for comp_rule_param in comp_rules_params:
            if comp_rule_param['name'] != param_name:
                continue
            rule_id = comp_rule_param[const.HEADER_RULE_ID]
            # by_comps can sit at control level and at statement level, so update both
            CatalogReader._insert_set_param_into_by_comps(imp_req, rule_id, param_name, param_values, comp_uuid)
            for statement in as_list(imp_req.statements):
                CatalogReader._insert_set_param_into_by_comps(
                    statement, rule_id, param_name, param_values, comp_uuid
                )

    @staticmethod
    def _add_set_params_to_item(param_dict: Dict[str, str], item: TypeWithSetParams, param_id: str) -> None:
        """Replace any set_parameter of the item for param_id with one holding the ssp values of the param dict."""
        param_values = param_dict[const.SSP_VALUES]
        new_sp_list = [sp for sp in as_list(item.set_parameters) if sp.param_id != param_id]
        new_sp_list.append(ossp.SetParameter(param_id=param_id, values=param_values))
        # assign the finished list once so the item never holds a list that lacks the new entry
        item.set_parameters = new_sp_list

    @staticmethod
    def _add_props_to_imp_req(
        control_id: str,
        part_id_map_by_label: Dict[str, Dict[str, str]],
        yaml_header: Dict[str, Any],
        imp_req: ossp.ImplementedRequirement
    ) -> None:
        """Add the props from the yaml header to the imp_req."""
        control_part_id_map = part_id_map_by_label.get(control_id, {})
        props, props_by_id = ControlReader.get_props_list(control_id, control_part_id_map, yaml_header)
        # add the props at control level
        if props:
            imp_req.props = as_list(imp_req.props)
            imp_req.props.extend(props)
        # add the props at the part level
        for label, part_id in control_part_id_map.items():
            part_props = props_by_id.get(label, [])
            if not part_props:
                continue
            for statement in as_list(imp_req.statements):
                if statement.statement_id == part_id:
                    statement.props = as_list(statement.props)
                    statement.props.extend(part_props)

    @staticmethod
    def _update_ssp_with_md_header(
        ssp: ossp.SystemSecurityPlan,
        control_id: str,
        comp_dict: Dict[str, generic.GenericComponent],
        part_label_to_id_map: Dict[str, Dict[str, str]],
        md_header: Dict[str, Dict[str, str]]
    ) -> None:
        """Update the ssp with info from the header of an ssp control markdown file."""
        # rules param vals go in bycomps of imp_req
        # param vals go directly in imp_req
        rules_param_vals_dict = md_header.get(const.COMP_DEF_RULES_PARAM_VALS_TAG, {})
        imp_req = CatalogReader._get_imp_req_for_control(ssp, control_id)
        for comp_name, param_dict_list in rules_param_vals_dict.items():
            for param_dict in as_list(param_dict_list):
                if const.SSP_VALUES in param_dict:
                    # read with the same key that was just tested so guard and lookup cannot disagree
                    param_dict['values'] = param_dict[const.SSP_VALUES]
                    CatalogReader._insert_param_dict_in_imp_req(
                        imp_req, param_dict, comp_name, md_header, comp_dict[comp_name].uuid
                    )
        param_vals_dict = md_header.get(const.SET_PARAMS_TAG, {})
        for param_id, param_dict in param_vals_dict.items():
            if const.SSP_VALUES in param_dict:
                CatalogReader._add_set_params_to_item(param_dict, imp_req, param_id)
        CatalogReader._add_props_to_imp_req(control_id, part_label_to_id_map, md_header, imp_req)

    @staticmethod
    def read_ssp_md_content(
        md_path: pathlib.Path,
        ssp: ossp.SystemSecurityPlan,
        comp_dict: Dict[str, generic.GenericComponent],
        part_id_map_by_label: Dict[str, Dict[str, str]],
        context: ControlContext
    ) -> None:
        """
        Read md content into the ssp.

        Args:
            md_path: path to the catalog markdown
            ssp: ssp in which to insert the md content
            comp_dict: map of component name to component
            part_id_map_by_label: map label to part_id of control
            context: control context for the procedure

        Raises:
            TrestleError: if a control markdown file references a component that is not in comp_dict

        Notes:
            The ssp should already contain info from the comp defs and this fills in selected content from md.
            The only content read from md is:
                ssp values in the comp def rules param vals of the header
                ssp values in the set-params of the header
                all prose for implementation responses
                all status values
            ssp has components but may not have all needed imp reqs and bycomps
            know controlid and comp name in comp_dict
        """
        for group_path in CatalogInterface._get_group_ids_and_dirs(md_path).values():
            for control_file in group_path.glob('*.md'):
                # skip any file that sits somewhere below an inheritance view directory
                if any(parent.name == const.INHERITANCE_VIEW_DIR for parent in control_file.parents):
                    continue
                control_id = control_file.stem
                md_header, control_comp_dict = CatalogReader._read_comp_info_from_md(control_file, context)
                for comp_name, comp_info_dict in control_comp_dict.items():
                    if comp_name not in comp_dict:
                        err_msg = f'Control {control_id} references component {comp_name} not defined in a component-definition.'  # noqa E501
                        # give added guidance if no comp defs were specified at command line
                        if not context.comp_def_name_list:
                            err_msg += '  Please specify the names of any component-definitions needed for assembly.'
                        raise TrestleError(err_msg)
                    CatalogReader._update_ssp_with_comp_info(
                        ssp, control_id, comp_dict[comp_name], comp_info_dict, part_id_map_by_label
                    )
                CatalogReader._update_ssp_with_md_header(ssp, control_id, comp_dict, part_id_map_by_label, md_header)
|