csv_to_oscal_cd
trestle.tasks.csv_to_oscal_cd
¤
OSCAL transformation tasks.
ADJUSTED_RISK_RATING
¤
CHECK_DESCRIPTION
¤
CHECK_ID
¤
COMPONENT_DESCRIPTION
¤
COMPONENT_TITLE
¤
COMPONENT_TYPE
¤
CONTROL_ID_LIST
¤
FETCHER_DESCRIPTION
¤
FETCHER_ID
¤
HEADER_DECORATION_CHAR
¤
NAMESPACE
¤
ORIGINAL_RISK_RATING
¤
PARAMETER
¤
PARAMETER_DESCRIPTION
¤
PARAMETER_ID
¤
PARAMETER_VALUE_ALTERNATIVES
¤
PARAMETER_VALUE_DEFAULT
¤
PROFILE_DESCRIPTION
¤
PROFILE_SOURCE
¤
RISK_ADJUSTMENT
¤
RULE_DESCRIPTION
¤
RULE_ID
¤
Row
¤
logger
¤
prefix_rule_set
¤
validation
¤
Classes¤
CsvColumn
¤
CsvColumn.
Source code in trestle/tasks/csv_to_oscal_cd.py
class CsvColumn():
"""CsvColumn."""
_columns_required = [
f'{COMPONENT_TITLE}',
f'{COMPONENT_DESCRIPTION}',
f'{COMPONENT_TYPE}',
f'{RULE_ID}',
f'{RULE_DESCRIPTION}',
f'{PROFILE_SOURCE}',
f'{PROFILE_DESCRIPTION}',
f'{CONTROL_ID_LIST}',
f'{NAMESPACE}',
]
_columns_required_validation = [
f'{COMPONENT_TITLE}',
f'{COMPONENT_DESCRIPTION}',
f'{COMPONENT_TYPE}',
f'{RULE_ID}',
f'{NAMESPACE}',
f'{CHECK_ID}',
f'{CHECK_DESCRIPTION}',
]
_columns_optional = [
f'{CHECK_ID}',
f'{CHECK_DESCRIPTION}',
f'{ORIGINAL_RISK_RATING}',
f'{ADJUSTED_RISK_RATING}',
f'{RISK_ADJUSTMENT}',
]
_columns_parameter = [
f'{PARAMETER_ID}',
f'{PARAMETER_DESCRIPTION}',
f'{PARAMETER_VALUE_ALTERNATIVES}',
f'{PARAMETER_VALUE_DEFAULT}',
]
_columns_ordered = _columns_required + _columns_parameter + _columns_optional
@staticmethod
def get_order(column_name: str) -> int:
"""Get order for column_name."""
rval = sys.maxsize
if column_name in CsvColumn._columns_ordered:
rval = CsvColumn._columns_ordered.index(column_name)
return rval
@staticmethod
def is_column_name_required(name: str) -> bool:
"""Is column name required."""
return name in (CsvColumn._columns_required + CsvColumn._columns_required_validation)
@staticmethod
def is_column_name_optional(name: str) -> bool:
"""Is column name optional."""
return name in (CsvColumn._columns_optional)
@staticmethod
def is_column_name_parameter(name: str) -> bool:
"""Is column name parameter."""
for cname in CsvColumn._columns_parameter:
if name.startswith(cname):
return True
return False
@staticmethod
def get_required_column_names() -> List[str]:
"""Get required column names."""
rval = []
rval += CsvColumn._columns_required
return rval
@staticmethod
def get_optional_column_names() -> List[str]:
"""Get optional column names."""
rval = []
rval += CsvColumn._columns_optional
return rval
@staticmethod
def get_parameter_column_names() -> List[str]:
"""Get parameter column names."""
rval = []
rval += CsvColumn._columns_parameters
return rval
@staticmethod
def get_required_column_names_validation() -> List[str]:
"""Get required column names validation."""
rval = []
rval += CsvColumn._columns_required_validation
return rval
_rule_property_column_names = [
f'{RULE_ID}',
f'{RULE_DESCRIPTION}',
f'{PARAMETER_ID}',
f'{PARAMETER_DESCRIPTION}',
f'{PARAMETER_VALUE_ALTERNATIVES}',
f'{CHECK_ID}',
f'{CHECK_DESCRIPTION}',
f'{ORIGINAL_RISK_RATING}',
f'{ADJUSTED_RISK_RATING}',
f'{RISK_ADJUSTMENT}',
]
@staticmethod
def get_rule_property_column_names() -> List[str]:
"""Get rule property column names."""
return CsvColumn._rule_property_column_names
# columns required which do not become properties
_columns_required_filtered = [
f'{COMPONENT_TITLE}',
f'{COMPONENT_DESCRIPTION}',
f'{COMPONENT_TYPE}',
f'{PROFILE_SOURCE}',
f'{PROFILE_DESCRIPTION}',
f'{CONTROL_ID_LIST}',
f'{NAMESPACE}',
]
# optional columns which do not become properties, initially
_columns_optional_filtered = []
_columns_filtered = _columns_required_filtered + _columns_optional_filtered
@staticmethod
def get_filtered_required_column_names() -> List[str]:
"""Get filtered required column names."""
rval = []
for column_name in CsvColumn.get_required_column_names():
if column_name not in CsvColumn._columns_filtered:
rval.append(column_name)
return rval
@staticmethod
def get_filtered_optional_column_names() -> List[str]:
"""Get filtered optional column names."""
rval = []
for column_name in CsvColumn.get_optional_column_names():
if column_name not in CsvColumn._columns_filtered:
rval.append(column_name)
return rval
_check_property_column_names = [
f'{RULE_ID}',
f'{CHECK_ID}',
f'{CHECK_DESCRIPTION}',
]
@staticmethod
def get_check_property_column_names() -> List[str]:
"""Get check property column names."""
return CsvColumn._check_property_column_names
# optional columns which do become properties, afterwards
_columns_parameters = [
f'{PARAMETER_ID}',
f'{PARAMETER_DESCRIPTION}',
f'{PARAMETER_VALUE_ALTERNATIVES}',
]
# optional columns which require Param_Id be present in the row
_columns_parameters_dependent = [
f'{PARAMETER_DESCRIPTION}',
f'{PARAMETER_VALUE_ALTERNATIVES}',
f'{PARAMETER_VALUE_DEFAULT}',
]
Methods¤
get_check_property_column_names()
staticmethod
¤
Get check property column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_check_property_column_names() -> List[str]:
"""Get check property column names."""
return CsvColumn._check_property_column_names
get_filtered_optional_column_names()
staticmethod
¤
Get filtered optional column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_filtered_optional_column_names() -> List[str]:
"""Get filtered optional column names."""
rval = []
for column_name in CsvColumn.get_optional_column_names():
if column_name not in CsvColumn._columns_filtered:
rval.append(column_name)
return rval
get_filtered_required_column_names()
staticmethod
¤
Get filtered required column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_filtered_required_column_names() -> List[str]:
"""Get filtered required column names."""
rval = []
for column_name in CsvColumn.get_required_column_names():
if column_name not in CsvColumn._columns_filtered:
rval.append(column_name)
return rval
get_optional_column_names()
staticmethod
¤
Get optional column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_optional_column_names() -> List[str]:
"""Get optional column names."""
rval = []
rval += CsvColumn._columns_optional
return rval
get_order(column_name)
staticmethod
¤
Get order for column_name.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_order(column_name: str) -> int:
"""Get order for column_name."""
rval = sys.maxsize
if column_name in CsvColumn._columns_ordered:
rval = CsvColumn._columns_ordered.index(column_name)
return rval
get_parameter_column_names()
staticmethod
¤
Get parameter column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_parameter_column_names() -> List[str]:
"""Get parameter column names."""
rval = []
rval += CsvColumn._columns_parameters
return rval
get_required_column_names()
staticmethod
¤
Get required column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_required_column_names() -> List[str]:
"""Get required column names."""
rval = []
rval += CsvColumn._columns_required
return rval
get_required_column_names_validation()
staticmethod
¤
Get required column names validation.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_required_column_names_validation() -> List[str]:
"""Get required column names validation."""
rval = []
rval += CsvColumn._columns_required_validation
return rval
get_rule_property_column_names()
staticmethod
¤
Get rule property column names.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_rule_property_column_names() -> List[str]:
"""Get rule property column names."""
return CsvColumn._rule_property_column_names
is_column_name_optional(name)
staticmethod
¤
Is column name optional.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_optional(name: str) -> bool:
"""Is column name optional."""
return name in (CsvColumn._columns_optional)
is_column_name_parameter(name)
staticmethod
¤
Is column name parameter.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_parameter(name: str) -> bool:
"""Is column name parameter."""
for cname in CsvColumn._columns_parameter:
if name.startswith(cname):
return True
return False
is_column_name_required(name)
staticmethod
¤
Is column name required.
Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_required(name: str) -> bool:
"""Is column name required."""
return name in (CsvColumn._columns_required + CsvColumn._columns_required_validation)
CsvToOscalComponentDefinition (TaskBase)
¤
Task to create OSCAL ComponentDefinition json.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the task. |
Source code in trestle/tasks/csv_to_oscal_cd.py
class CsvToOscalComponentDefinition(TaskBase):
"""
Task to create OSCAL ComponentDefinition json.
Attributes:
name: Name of the task.
"""
name = 'csv-to-oscal-cd'
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task csv-to-oscal-cd.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
def print_info(self) -> None:
"""Print the help string."""
name = self.name
oscal_name = 'component_definition'
#
logger.info(f'Help information for {name} task.')
logger.info('')
logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
logger.info('')
logger.info('')
logger.info(f'Configuration flags sit under [task.{name}]:')
text1 = ' title = '
text2 = '(required) the component definition title.'
logger.info(text1 + text2)
text1 = ' version = '
text2 = '(required) the component definition version.'
logger.info(text1 + text2)
text1 = ' csv-file = '
text2 = '(required) the path of the csv file.'
text3 = ' [1st row are column headings; 2nd row are column descriptions; 3rd row and beyond is data]'
logger.info(text1 + text2 + text3)
text1 = ' required columns: '
for text2 in CsvColumn.get_required_column_names():
if text2 in [f'{RULE_DESCRIPTION}', f'{PROFILE_SOURCE}', f'{PROFILE_DESCRIPTION}', f'{CONTROL_ID_LIST}']:
text2 += ' (see note 1)'
logger.info(text1 + '$$' + text2)
text1 = ' '
text1 = ' optional columns: '
for text2 in CsvColumn.get_optional_column_names():
if text2 in [f'{ORIGINAL_RISK_RATING}', f'{ADJUSTED_RISK_RATING}', f'{RISK_ADJUSTMENT}']:
text2 += ' (see note 1)'
else:
text2 += ' (see note 2)'
logger.info(text1 + '$' + text2)
text1 = ' '
for text2 in CsvColumn.get_parameter_column_names():
text2 += ' (see notes 1, 4)'
logger.info(text1 + '$' + text2)
text1 = ' '
text1 = ' comment columns: '
text2 = 'Informational (see note 3)'
logger.info(text1 + '#' + text2)
text1 = ' output-dir = '
text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
logger.info(text1 + text2)
text1 = ' component-definition = '
text2 = '(optional) the path of the existing component-definition OSCAL .json file.'
logger.info(text1 + text2)
text1 = ' class.column-name = '
text2 = f'(optional) the class to associate with the specified column name, e.g. class.{RULE_ID} = scc_class'
logger.info(text1 + text2)
text1 = ' output-overwrite = '
text2 = '(optional) true [default] or false; replace existing output when true.'
logger.info(text1 + text2)
text1 = ' validate-controls = '
text2 = '(optional) on, warn, or off [default]; validate controls exist in resolved profile.'
logger.info(text1 + text2)
# Notes
text1 = ''
text2 = ''
logger.info(text1 + text2)
text1 = 'Notes: '
text2 = '[1] column is ignored for validation component type'
logger.info(text1 + text2)
text1 = ' '
text2 = '[2] column is required for validation component type'
logger.info(text1 + text2)
text1 = ' '
text2 = '[3] column name starting with # causes column to be ignored'
logger.info(text1 + text2)
text1 = ' '
text2 = '[4] additional parameters are specified by adding a common suffix per set'
text3 = f', for example: {PARAMETER_ID}_1, {PARAMETER_DESCRIPTION}_1, ...{PARAMETER_ID}_2...'
logger.info(text1 + text2 + text3)
def configure(self) -> bool:
"""Configure."""
self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
).isoformat()
# config verbosity
self._quiet = self._config.get('quiet', False)
self._verbose = not self._quiet
# title
self._title = self._config.get('title')
if self._title is None:
logger.warning('config missing "title"')
return False
# version
self._version = self._config.get('version')
if self._version is None:
logger.warning('config missing "version"')
return False
# config csv
self._csv_file = self._config.get('csv-file')
if self._csv_file is None:
logger.warning('config missing "csv-file"')
return False
self._csv_path = pathlib.Path(self._csv_file)
if not self._csv_path.exists():
logger.warning('"csv-file" not found')
return False
# announce csv
if self._verbose:
logger.info(f'input: {self._csv_file}')
# config cd
self._cd_path = None
self._cd_file = self._config.get('component-definition')
if self._cd_file is not None:
self._cd_path = pathlib.Path(self._cd_file)
if not self._cd_path.exists():
logger.warning('"component-definition" not found')
return False
# workspace
self._workspace = os.getcwd()
# validate_controls
self._validate_controls = self._config.get('validate-controls', 'off')
return True
def get_class(self, name: str) -> str:
"""Get class value for specified name from config."""
key = f'class.{name}'
return self._config.get(key)
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
def execute(self) -> TaskOutcome:
"""Provide an executed outcome."""
try:
return self._execute()
except Exception:
logger.error(traceback.format_exc())
return TaskOutcome('failure')
def _execute(self) -> TaskOutcome:
"""Execute path core."""
if not self.configure():
return TaskOutcome('failure')
# config output
odir = self._config.get('output-dir')
opth = pathlib.Path(odir)
self._overwrite = self._config.getboolean('output-overwrite', True)
# insure output dir exists
opth.mkdir(exist_ok=True, parents=True)
# calculate output file name & check writability
oname = 'component-definition.json'
ofile = opth / oname
if not self._overwrite and pathlib.Path(ofile).exists():
logger.warning(f'output: {ofile} already exists')
return TaskOutcome('failure')
# fetch existing component-definition, if any
self._cd_mgr = _CdMgr(self._cd_path, self._title, self._timestamp, self._version)
# fetch csv
self._csv_mgr = _CsvMgr(self._csv_path)
# create resolved profile -> catalog helper
profile_list = self._csv_mgr.get_profile_list()
self._resolved_profile_catalog_helper = _ResolvedProfileCatalogHelper(profile_list, self._workspace)
self._unresolved_controls = []
# calculate deletion, addition & modification rule lists
rules = self._calculate_rules()
# calculate deletion, addition & modification set-parameter lists
set_params = self._calculate_set_params(rules[2])
# calculate deletion, addition & modification control mapping lists
control_mappings = self._calculate_control_mappings(rules[2])
# rule set manager
self._rule_set_id_mgr = _RuleSetIdMgr(self._cd_mgr.get_max_rule_set_number(), len(rules[1]))
# rule additions, deletions & modifications (by row)
self.rules_del(rules[0])
self.rules_add(rules[1])
self.rules_mod(rules[2])
# set-parameters additions, deletions & modifications (by row)
self.set_params_del(set_params[0])
self.set_params_add(set_params[1])
self.set_params_mod(set_params[2])
# control mappings additions, deletions & modifications (by row)
self.control_mappings_del(control_mappings[0])
self.control_mappings_add(control_mappings[1])
# note: control mappings mod is currently not possible
# note: add/del user columns not currently supported
if len(self._unresolved_controls) > 0:
text = f'Unresolved controls: {self._unresolved_controls}'
if self._validate_controls == 'warn':
logger.warning(text)
elif self._validate_controls == 'on':
raise RuntimeError(text)
# prepare new/revised component definition
component_definition = self._cd_mgr.get_component_definition()
# write OSCAL ComponentDefinition to file
if self._verbose:
logger.info(f'output: {ofile}')
component_definition.oscal_write(pathlib.Path(ofile))
return TaskOutcome('success')
def _calculate_rules(self) -> tuple:
"""Calculate rules add, delete, modify."""
cd_rules = self._cd_mgr.get_rule_keys()
csv_rules = self._csv_mgr.get_rule_keys()
del_rules = []
add_rules = []
mod_rules = []
for key in cd_rules:
if key in csv_rules:
continue
else:
del_rules.append(key)
logger.debug(f'rules del: {key}')
for key in csv_rules:
if key in cd_rules:
mod_rules.append(key)
logger.debug(f'rules mod: {key}')
else:
add_rules.append(key)
logger.debug(f'rules add: {key}')
return (del_rules, add_rules, mod_rules)
def _calculate_set_params(self, mod_rules: List) -> tuple:
"""Calculate set parameters add, delete, modify."""
cd_set_params = self._cd_mgr.get_set_params_keys()
csv_set_params = self._csv_mgr.get_set_params_keys()
del_set_params = []
add_set_params = []
mod_set_params = []
for key in cd_set_params:
rule_key = (key[0], key[1], key[2])
if rule_key not in mod_rules:
continue
if key in csv_set_params:
continue
else:
del_set_params.append(key)
logger.debug(f'params del: {key}')
for key in csv_set_params:
rule_key = (key[0], key[1], key[2])
if rule_key not in mod_rules:
continue
if key in cd_set_params:
mod_set_params.append(key)
logger.debug(f'params mod: {key}')
else:
add_set_params.append(key)
logger.debug(f'params add: {key}')
return (del_set_params, add_set_params, mod_set_params)
def _calculate_control_mappings(self, mod_rules: List) -> tuple:
"""Calculate control mappings add, delete, modify."""
cd_controls = self._cd_mgr.get_control_keys()
csv_controls = self._csv_mgr.get_control_keys()
del_control_mappings = []
add_control_mappings = []
mod_control_mappings = []
for key in cd_controls:
rule_key = (key[0], key[1], key[2])
if rule_key not in mod_rules:
continue
if key in csv_controls:
continue
else:
del_control_mappings.append(key)
logger.debug(f'ctl-maps del: {key}')
for key in csv_controls:
rule_key = (key[0], key[1], key[2])
if rule_key not in mod_rules:
continue
if key in cd_controls:
mod_control_mappings.append(key)
logger.debug(f'ctl-maps mod: {key}')
else:
add_control_mappings.append(key)
logger.debug(f'ctl-maps add: {key}')
return (del_control_mappings, add_control_mappings, mod_control_mappings)
def _get_namespace(self, rule_key: tuple) -> str:
"""Get namespace."""
return self._csv_mgr.get_value(rule_key, NAMESPACE).strip()
def _get_prop_name(self, column_name: str) -> str:
"""Get property name."""
return column_name.lstrip('$')
def rules_del(self, del_rules: List[str]) -> None:
"""Delete rules."""
for tokens in del_rules:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
description = ''
# component
component = self._cd_mgr.get_component(component_title, component_type, description)
# props
component.props = self._delete_rule_props(component, rule_id)
def _delete_rule_props(self, component: DefinedComponent, rule_id: str) -> List[Property]:
"""Delete rule props."""
props = []
rule_set = _RuleSetHelper.get_rule_set(component.props, rule_id)
for prop in component.props:
if prop.remarks != rule_set:
props.append(prop)
elif prop.name in self._csv_mgr.get_parameter_id_column_names():
self._delete_rule_set_parameter(component, prop.value)
elif prop.name == RULE_ID:
self._delete_rule_implemented_requirement(component, prop.value)
return props
def _control_implementation_generator(
self, control_implementations: List[ControlImplementation]
) -> Iterator[ControlImplementation]:
"""Control implementation generator."""
if control_implementations:
for control_implementation in control_implementations:
yield control_implementation
def _set_parameter_generator(self, set_parameters: List[SetParameter]) -> Iterator[SetParameter]:
"""Set parameter generator."""
if set_parameters:
for set_parameter in set_parameters:
yield set_parameter
def _implemented_requirement_generator(
self, implemented_requirements: List[ImplementedRequirement]
) -> Iterator[ImplementedRequirement]:
"""Implemented-requirement generator."""
if implemented_requirements:
for implemented_requirement in implemented_requirements:
yield implemented_requirement
def _delete_rule_set_parameter(self, component: DefinedComponent, parameter_id: str) -> None:
"""Delete rule set-parameter."""
control_implementations = component.control_implementations
for control_implementation in self._control_implementation_generator(control_implementations):
if control_implementation.set_parameters:
set_parameters = control_implementation.set_parameters
control_implementation.set_parameters = []
for set_parameter in set_parameters:
if set_parameter.param_id != parameter_id:
_OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
if not len(control_implementation.set_parameters):
control_implementation.set_parameters = None
def _delete_rule_implemented_requirement(self, component: DefinedComponent, rule_id: str) -> None:
"""Delete rule implemented_requirement."""
control_implementations = component.control_implementations
component.control_implementations = []
for control_implementation in self._control_implementation_generator(control_implementations):
if control_implementation.implemented_requirements:
implemented_requirements = control_implementation.implemented_requirements
control_implementation.implemented_requirements = []
for implemented_requirement in implemented_requirements:
self._delete_ir_props(implemented_requirement, rule_id)
self._delete_ir_statements(implemented_requirement, rule_id)
if len(as_list(implemented_requirement.props)) or len(as_list(implemented_requirement.statements)):
control_implementation.implemented_requirements.append(implemented_requirement)
if len(as_list(control_implementation.implemented_requirements)):
component.control_implementations.append(control_implementation)
def _delete_ir_statements(self, implemented_requirement: ImplementedRequirement, rule_id: str) -> None:
"""Delete implemented-requirement statements."""
if implemented_requirement.statements:
statements = implemented_requirement.statements
implemented_requirement.statements = []
for statement in statements:
statement.props = self._delete_props(statement.props, rule_id)
if not len(statement.props):
statement.props = None
if statement.props:
implemented_requirement.statements.append(statement)
if not len(implemented_requirement.statements):
implemented_requirement.statements = None
def _delete_ir_props(self, implemented_requirement: ImplementedRequirement, rule_id: str) -> None:
"""Delete implemented-requirement props."""
if implemented_requirement.props:
implemented_requirement.props = self._delete_props(implemented_requirement.props, rule_id)
if not len(implemented_requirement.props):
implemented_requirement.props = None
def _delete_props(self, props: List[Property], rule_id: str) -> List[property]:
"""Delete props."""
rval = []
if props:
for prop in props:
if prop.name == RULE_ID and prop.value == rule_id:
continue
rval.append(prop)
return rval
def rules_add(self, add_rules: List[str]) -> None:
"""Add rules."""
for rule_key in add_rules:
component_title = self._csv_mgr.get_value(rule_key, COMPONENT_TITLE)
component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
component_description = self._csv_mgr.get_value(rule_key, COMPONENT_DESCRIPTION)
# component
component = self._cd_mgr.get_component(component_title, component_type, component_description)
# props
component.props = as_list(component.props)
component.props = component.props + self._create_rule_props(rule_key)
# additional props, when not validation component
if not self._is_validation(rule_key):
# control implementation
source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE)
description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION)
control_implementation = self._get_control_implementation(component, source, description)
# set-parameters
set_parameters = self._create_set_parameters(rule_key)
if set_parameters:
control_implementation.set_parameters = as_list(control_implementation.set_parameters)
_OscalHelper.add_set_parameters(control_implementation.set_parameters, set_parameters)
# control-mappings
control_mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split()
self._add_rule_prop(control_implementation, control_mappings, rule_key)
def _is_validation(self, rule_key: tuple) -> bool:
"""Check for validation component."""
component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
return component_type.lower() == validation
def _add_rule_prop(
self, control_implementation: ControlImplementation, control_mappings: List[str], rule_key: tuple
) -> None:
"""Add rule prop."""
namespace = self._get_namespace(rule_key)
for control_mapping in control_mappings:
control_id = derive_control_id(control_mapping)
implemented_requirement = self._get_implemented_requirement(control_implementation, control_id)
# create rule implementation (as property)
name = RULE_ID
prop = Property(
name=name,
value=self._csv_mgr.get_value(rule_key, name),
ns=namespace,
class_=self.get_class(name),
)
part_id = derive_part_id(control_mapping)
if part_id is None:
implemented_requirement.props = as_list(implemented_requirement.props)
implemented_requirement.props.append(prop)
else:
statement = self._get_statement(implemented_requirement, part_id)
statement.props.append(prop)
def _create_rule_props(self, rule_key: tuple) -> List[Property]:
"""Create rule props."""
rule_set = self._rule_set_id_mgr.get_next_rule_set_id()
row_number = self._csv_mgr.get_row_number(rule_key)
rule_set_mgr = _RuleSetMgr(row_number, rule_set)
namespace = self._get_namespace(rule_key)
if self._is_validation(rule_key):
column_names = CsvColumn.get_check_property_column_names()
else:
column_names = CsvColumn.get_rule_property_column_names()
# req'd & optional props
for column_name in column_names:
prop_name = self._get_prop_name(column_name)
prop_value = self._csv_mgr.get_value(rule_key, column_name).strip()
rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name))
if not self._is_validation(rule_key):
# parameter columns
column_names = self._csv_mgr.get_parameter_column_names()
for column_name in column_names:
prop_name = self._get_prop_name(column_name)
prop_value = self._csv_mgr.get_value(rule_key, column_name).strip()
rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name))
# user props
column_names = self._csv_mgr.get_user_column_names()
for column_name in column_names:
if column_name.startswith('#'):
continue
prop_name = self._get_prop_name(column_name)
prop_value = self._csv_mgr.get_value(rule_key, column_name).strip()
rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name))
return rule_set_mgr.get_props()
def _get_control_implementation(
self, component: DefinedComponent, source: str, description: str
) -> ControlImplementation:
"""Find or create control implementation."""
component.control_implementations = as_list(component.control_implementations)
for control_implementation in component.control_implementations:
if control_implementation.source == source and control_implementation.description == description:
return control_implementation
control_implementation = ControlImplementation(
uuid=str(uuid.uuid4()), source=source, description=description, implemented_requirements=[]
)
component.control_implementations.append(control_implementation)
return control_implementation
def _str_to_list(self, value: str) -> List[str]:
"""Transform string to list."""
rval = []
if ',' in value:
values = value.split(',')
# remove leading/trailing whitespace
for v in values:
rval.append(v.strip())
else:
rval.append(value)
return rval
def _create_set_parameters(self, rule_key: tuple) -> List[SetParameter]:
"""Create set parameters."""
set_parameters = []
for parameter_id_column_name in self._csv_mgr.get_parameter_id_column_names():
suffix = parameter_id_column_name.replace(PARAMETER_ID, '')
parameter_value_default_column_name = f'{PARAMETER_VALUE_DEFAULT}{suffix}'
name = self._csv_mgr.get_value(rule_key, parameter_id_column_name)
value = self._csv_mgr.get_value(rule_key, parameter_value_default_column_name)
if name and value:
values = self._str_to_list(value)
set_parameter = SetParameter(
param_id=name,
values=values,
)
set_parameters.append(set_parameter)
elif name:
row_number = self._csv_mgr.get_row_number(rule_key)
text = f'row "{row_number}" missing value for "{parameter_value_default_column_name}"'
logger.debug(text)
return set_parameters
def _get_implemented_requirement(
self, control_implementation: ControlImplementation, control_id: str
) -> ImplementedRequirement:
"""Find or create implemented requirement."""
if self._validate_controls != 'off':
if not self._resolved_profile_catalog_helper.validate(control_id):
if control_id not in self._unresolved_controls:
self._unresolved_controls.append(control_id)
for implemented_requirement in control_implementation.implemented_requirements:
if implemented_requirement.control_id == control_id:
return implemented_requirement
implemented_requirement = ImplementedRequirement(
uuid=str(uuid.uuid4()),
control_id=control_id,
description='',
)
control_implementation.implemented_requirements.append(implemented_requirement)
return implemented_requirement
def _get_statement(self, implemented_requirement: ImplementedRequirement, part_id: str) -> Statement:
"""Find or create statement."""
implemented_requirement.statements = as_list(implemented_requirement.statements)
for statement in implemented_requirement.statements:
if statement.statement_id == part_id:
return statement
statement = Statement(
uuid=str(uuid.uuid4()),
statement_id=part_id,
description='',
props=[],
)
implemented_requirement.statements.append(statement)
return statement
def rules_mod(self, mod_rules: List[str]) -> None:
"""Modify rules."""
for rule_key in mod_rules:
component_title = self._csv_mgr.get_value(rule_key, COMPONENT_TITLE)
component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
component_description = self._csv_mgr.get_value(rule_key, COMPONENT_DESCRIPTION)
# component
component = self._cd_mgr.get_component(component_title, component_type, component_description)
# props
component.props = self._modify_rule_props(component, rule_key)
def _modify_rule_props(self, component: DefinedComponent, rule_key: tuple) -> List[Property]:
"""Modify rule props."""
rule_id = self._csv_mgr.get_value(rule_key, RULE_ID)
rule_set = _RuleSetHelper.get_rule_set(component.props, rule_id)
rule_ns = self._csv_mgr.get_value(rule_key, NAMESPACE)
column_names = CsvColumn.get_filtered_required_column_names() + CsvColumn.get_filtered_optional_column_names()
# req'd & optional props
for column_name in column_names:
column_value = self._csv_mgr.get_value(rule_key, column_name).strip()
class_ = self.get_class(column_name)
self._cd_mgr.update_rule_definition(component, rule_set, column_name, column_value, rule_ns, class_)
# parameter columns
column_names = self._csv_mgr.get_parameter_column_names()
for column_name in column_names:
column_value = self._csv_mgr.get_value(rule_key, column_name).strip()
class_ = self.get_class(column_name)
self._cd_mgr.update_rule_definition(component, rule_set, column_name, column_value, rule_ns, class_)
# user props
column_names = self._csv_mgr.get_user_column_names()
for column_name in column_names:
column_value = self._csv_mgr.get_value(rule_key, column_name).strip()
self._cd_mgr.update_rule_definition(component, rule_set, column_name, column_value, rule_ns, class_)
return component.props
def set_params_del(self, del_set_params: List[str]) -> None:
"""Set parameters delete."""
for tokens in del_set_params:
component_title = tokens[0]
component_type = tokens[1]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
if control_implementation:
set_parameters = control_implementation.set_parameters
control_implementation.set_parameters = []
for set_parameter in self._set_parameter_generator(set_parameters):
if set_parameter.param_id == param_id:
continue
_OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
if control_implementation.set_parameters == []:
control_implementation.set_parameters = None
def set_params_add(self, add_set_params: List[str]) -> None:
"""Set parameters add."""
for tokens in add_set_params:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
control_implementation.set_parameters = as_list(control_implementation.set_parameters)
# add
rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
values = [self._csv_mgr.get_default_value_by_id(rule_key, param_id)]
set_parameter = SetParameter(
param_id=param_id,
values=values,
)
_OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
def set_params_mod(self, mod_set_params: List[str]) -> None:
"""Set parameters modify."""
for tokens in mod_set_params:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
if control_implementation:
set_parameters = control_implementation.set_parameters
for set_parameter in self._set_parameter_generator(set_parameters):
if set_parameter.param_id != param_id:
continue
rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
values = [self._csv_mgr.get_default_value_by_id(rule_key, param_id)]
replacement = SetParameter(
param_id=param_id,
values=values,
)
if set_parameter.values == replacement.values:
continue
logger.debug(f'params-mod: {rule_id} {param_id} {set_parameter.values} -> {replacement.values}')
set_parameter.values = replacement.values
def _control_mappings_generator(self, control_mappings: List[str]) -> Iterator[List[str]]:
"""Control mappings generator."""
for tokens in control_mappings:
component_title = tokens[0]
component_type = tokens[1]
source = tokens[3]
description = tokens[4]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
if control_implementation:
yield tokens
def control_mappings_del(self, del_control_mappings: List[str]) -> None:
"""Control mappings delete."""
for tokens in self._control_mappings_generator(del_control_mappings):
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
smt_id = tokens[5]
control_id = derive_control_id(smt_id)
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
implemented_requirements = control_implementation.implemented_requirements
control_implementation.implemented_requirements = []
for implemented_requirement in self._implemented_requirement_generator(implemented_requirements):
if implemented_requirement.control_id == control_id:
implemented_requirement.statements = _OscalHelper.remove_rule_statement(
implemented_requirement.statements, rule_id, smt_id
)
implemented_requirement.props = _OscalHelper.remove_rule(implemented_requirement.props, rule_id)
if len(as_list(implemented_requirement.props)) or len(as_list(implemented_requirement.statements)):
control_implementation.implemented_requirements.append(implemented_requirement)
else:
control_implementation.implemented_requirements.append(implemented_requirement)
def control_mappings_add(self, add_control_mappings: List[str]) -> None:
"""Control mappings add."""
for tokens in self._control_mappings_generator(add_control_mappings):
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
smt_id = tokens[5]
control_id = derive_control_id(smt_id)
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
implemented_requirement = self._get_implemented_requirement(control_implementation, control_id)
# namespace
rule_key = (tokens[0], tokens[1], tokens[2])
ns = self._get_namespace(rule_key)
# create rule implementation (as property)
name = RULE_ID
prop = Property(
name=name,
value=rule_id,
ns=ns,
class_=self.get_class(name),
)
if smt_id == control_id:
implemented_requirement.props = as_list(implemented_requirement.props)
implemented_requirement.props.append(prop)
else:
statement = self._get_statement(implemented_requirement, smt_id)
statement.props.append(prop)
name: str
¤
Methods¤
__init__(self, config_object)
special
¤
Initialize trestle task csv-to-oscal-cd.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_object |
Optional[configparser.SectionProxy] |
Config section associated with the task. |
required |
Source code in trestle/tasks/csv_to_oscal_cd.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task csv-to-oscal-cd.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
configure(self)
¤
Configure.
Source code in trestle/tasks/csv_to_oscal_cd.py
def configure(self) -> bool:
"""Configure."""
self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
).isoformat()
# config verbosity
self._quiet = self._config.get('quiet', False)
self._verbose = not self._quiet
# title
self._title = self._config.get('title')
if self._title is None:
logger.warning('config missing "title"')
return False
# version
self._version = self._config.get('version')
if self._version is None:
logger.warning('config missing "version"')
return False
# config csv
self._csv_file = self._config.get('csv-file')
if self._csv_file is None:
logger.warning('config missing "csv-file"')
return False
self._csv_path = pathlib.Path(self._csv_file)
if not self._csv_path.exists():
logger.warning('"csv-file" not found')
return False
# announce csv
if self._verbose:
logger.info(f'input: {self._csv_file}')
# config cd
self._cd_path = None
self._cd_file = self._config.get('component-definition')
if self._cd_file is not None:
self._cd_path = pathlib.Path(self._cd_file)
if not self._cd_path.exists():
logger.warning('"component-definition" not found')
return False
# workspace
self._workspace = os.getcwd()
# validate_controls
self._validate_controls = self._config.get('validate-controls', 'off')
return True
control_mappings_add(self, add_control_mappings)
¤
Control mappings add.
Source code in trestle/tasks/csv_to_oscal_cd.py
def control_mappings_add(self, add_control_mappings: List[str]) -> None:
"""Control mappings add."""
for tokens in self._control_mappings_generator(add_control_mappings):
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
smt_id = tokens[5]
control_id = derive_control_id(smt_id)
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
implemented_requirement = self._get_implemented_requirement(control_implementation, control_id)
# namespace
rule_key = (tokens[0], tokens[1], tokens[2])
ns = self._get_namespace(rule_key)
# create rule implementation (as property)
name = RULE_ID
prop = Property(
name=name,
value=rule_id,
ns=ns,
class_=self.get_class(name),
)
if smt_id == control_id:
implemented_requirement.props = as_list(implemented_requirement.props)
implemented_requirement.props.append(prop)
else:
statement = self._get_statement(implemented_requirement, smt_id)
statement.props.append(prop)
control_mappings_del(self, del_control_mappings)
¤
Control mappings delete.
Source code in trestle/tasks/csv_to_oscal_cd.py
def control_mappings_del(self, del_control_mappings: List[str]) -> None:
"""Control mappings delete."""
for tokens in self._control_mappings_generator(del_control_mappings):
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
smt_id = tokens[5]
control_id = derive_control_id(smt_id)
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
implemented_requirements = control_implementation.implemented_requirements
control_implementation.implemented_requirements = []
for implemented_requirement in self._implemented_requirement_generator(implemented_requirements):
if implemented_requirement.control_id == control_id:
implemented_requirement.statements = _OscalHelper.remove_rule_statement(
implemented_requirement.statements, rule_id, smt_id
)
implemented_requirement.props = _OscalHelper.remove_rule(implemented_requirement.props, rule_id)
if len(as_list(implemented_requirement.props)) or len(as_list(implemented_requirement.statements)):
control_implementation.implemented_requirements.append(implemented_requirement)
else:
control_implementation.implemented_requirements.append(implemented_requirement)
execute(self)
¤
Provide an executed outcome.
Source code in trestle/tasks/csv_to_oscal_cd.py
def execute(self) -> TaskOutcome:
"""Provide an executed outcome."""
try:
return self._execute()
except Exception:
logger.error(traceback.format_exc())
return TaskOutcome('failure')
get_class(self, name)
¤
Get class value for specified name from config.
Source code in trestle/tasks/csv_to_oscal_cd.py
def get_class(self, name: str) -> str:
"""Get class value for specified name from config."""
key = f'class.{name}'
return self._config.get(key)
print_info(self)
¤
Print the help string.
Source code in trestle/tasks/csv_to_oscal_cd.py
def print_info(self) -> None:
"""Print the help string."""
name = self.name
oscal_name = 'component_definition'
#
logger.info(f'Help information for {name} task.')
logger.info('')
logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
logger.info('')
logger.info('')
logger.info(f'Configuration flags sit under [task.{name}]:')
text1 = ' title = '
text2 = '(required) the component definition title.'
logger.info(text1 + text2)
text1 = ' version = '
text2 = '(required) the component definition version.'
logger.info(text1 + text2)
text1 = ' csv-file = '
text2 = '(required) the path of the csv file.'
text3 = ' [1st row are column headings; 2nd row are column descriptions; 3rd row and beyond is data]'
logger.info(text1 + text2 + text3)
text1 = ' required columns: '
for text2 in CsvColumn.get_required_column_names():
if text2 in [f'{RULE_DESCRIPTION}', f'{PROFILE_SOURCE}', f'{PROFILE_DESCRIPTION}', f'{CONTROL_ID_LIST}']:
text2 += ' (see note 1)'
logger.info(text1 + '$$' + text2)
text1 = ' '
text1 = ' optional columns: '
for text2 in CsvColumn.get_optional_column_names():
if text2 in [f'{ORIGINAL_RISK_RATING}', f'{ADJUSTED_RISK_RATING}', f'{RISK_ADJUSTMENT}']:
text2 += ' (see note 1)'
else:
text2 += ' (see note 2)'
logger.info(text1 + '$' + text2)
text1 = ' '
for text2 in CsvColumn.get_parameter_column_names():
text2 += ' (see notes 1, 4)'
logger.info(text1 + '$' + text2)
text1 = ' '
text1 = ' comment columns: '
text2 = 'Informational (see note 3)'
logger.info(text1 + '#' + text2)
text1 = ' output-dir = '
text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
logger.info(text1 + text2)
text1 = ' component-definition = '
text2 = '(optional) the path of the existing component-definition OSCAL .json file.'
logger.info(text1 + text2)
text1 = ' class.column-name = '
text2 = f'(optional) the class to associate with the specified column name, e.g. class.{RULE_ID} = scc_class'
logger.info(text1 + text2)
text1 = ' output-overwrite = '
text2 = '(optional) true [default] or false; replace existing output when true.'
logger.info(text1 + text2)
text1 = ' validate-controls = '
text2 = '(optional) on, warn, or off [default]; validate controls exist in resolved profile.'
logger.info(text1 + text2)
# Notes
text1 = ''
text2 = ''
logger.info(text1 + text2)
text1 = 'Notes: '
text2 = '[1] column is ignored for validation component type'
logger.info(text1 + text2)
text1 = ' '
text2 = '[2] column is required for validation component type'
logger.info(text1 + text2)
text1 = ' '
text2 = '[3] column name starting with # causes column to be ignored'
logger.info(text1 + text2)
text1 = ' '
text2 = '[4] additional parameters are specified by adding a common suffix per set'
text3 = f', for example: {PARAMETER_ID}_1, {PARAMETER_DESCRIPTION}_1, ...{PARAMETER_ID}_2...'
logger.info(text1 + text2 + text3)
rules_add(self, add_rules)
¤
Add rules.
Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_add(self, add_rules: List[str]) -> None:
"""Add rules."""
for rule_key in add_rules:
component_title = self._csv_mgr.get_value(rule_key, COMPONENT_TITLE)
component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
component_description = self._csv_mgr.get_value(rule_key, COMPONENT_DESCRIPTION)
# component
component = self._cd_mgr.get_component(component_title, component_type, component_description)
# props
component.props = as_list(component.props)
component.props = component.props + self._create_rule_props(rule_key)
# additional props, when not validation component
if not self._is_validation(rule_key):
# control implementation
source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE)
description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION)
control_implementation = self._get_control_implementation(component, source, description)
# set-parameters
set_parameters = self._create_set_parameters(rule_key)
if set_parameters:
control_implementation.set_parameters = as_list(control_implementation.set_parameters)
_OscalHelper.add_set_parameters(control_implementation.set_parameters, set_parameters)
# control-mappings
control_mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split()
self._add_rule_prop(control_implementation, control_mappings, rule_key)
rules_del(self, del_rules)
¤
Delete rules.
Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_del(self, del_rules: List[str]) -> None:
"""Delete rules."""
for tokens in del_rules:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
description = ''
# component
component = self._cd_mgr.get_component(component_title, component_type, description)
# props
component.props = self._delete_rule_props(component, rule_id)
rules_mod(self, mod_rules)
¤
Modify rules.
Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_mod(self, mod_rules: List[str]) -> None:
"""Modify rules."""
for rule_key in mod_rules:
component_title = self._csv_mgr.get_value(rule_key, COMPONENT_TITLE)
component_type = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
component_description = self._csv_mgr.get_value(rule_key, COMPONENT_DESCRIPTION)
# component
component = self._cd_mgr.get_component(component_title, component_type, component_description)
# props
component.props = self._modify_rule_props(component, rule_key)
set_params_add(self, add_set_params)
¤
Set parameters add.
Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_add(self, add_set_params: List[str]) -> None:
"""Set parameters add."""
for tokens in add_set_params:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
control_implementation.set_parameters = as_list(control_implementation.set_parameters)
# add
rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
values = [self._csv_mgr.get_default_value_by_id(rule_key, param_id)]
set_parameter = SetParameter(
param_id=param_id,
values=values,
)
_OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
set_params_del(self, del_set_params)
¤
Set parameters delete.
Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_del(self, del_set_params: List[str]) -> None:
"""Set parameters delete."""
for tokens in del_set_params:
component_title = tokens[0]
component_type = tokens[1]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
if control_implementation:
set_parameters = control_implementation.set_parameters
control_implementation.set_parameters = []
for set_parameter in self._set_parameter_generator(set_parameters):
if set_parameter.param_id == param_id:
continue
_OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
if control_implementation.set_parameters == []:
control_implementation.set_parameters = None
set_params_mod(self, mod_set_params)
¤
Set parameters modify.
Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_mod(self, mod_set_params: List[str]) -> None:
"""Set parameters modify."""
for tokens in mod_set_params:
component_title = tokens[0]
component_type = tokens[1]
rule_id = tokens[2]
source = tokens[3]
description = tokens[4]
param_id = tokens[5]
control_implementation = self._cd_mgr.find_control_implementation(
component_title, component_type, source, description
)
if control_implementation:
set_parameters = control_implementation.set_parameters
for set_parameter in self._set_parameter_generator(set_parameters):
if set_parameter.param_id != param_id:
continue
rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
values = [self._csv_mgr.get_default_value_by_id(rule_key, param_id)]
replacement = SetParameter(
param_id=param_id,
values=values,
)
if set_parameter.values == replacement.values:
continue
logger.debug(f'params-mod: {rule_id} {param_id} {set_parameter.values} -> {replacement.values}')
set_parameter.values = replacement.values
simulate(self)
¤
Provide a simulated outcome.
Source code in trestle/tasks/csv_to_oscal_cd.py
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
Functions¤
derive_control_id(control_mapping)
¤
Derive control id.
Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_control_id(control_mapping: str) -> str:
"""Derive control id."""
rval = control_mapping.split('_smt')[0]
return rval
derive_part_id(control_mapping)
¤
Derive part id.
Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_part_id(control_mapping: str) -> str:
"""Derive part id."""
if '_smt.' in control_mapping:
rval = control_mapping
else:
rval = None
return rval
etype(target)
¤
Get etype.
Source code in trestle/tasks/csv_to_oscal_cd.py
def etype(target: str) -> str:
"""Get etype."""
if target:
return 'invalid'
else:
return 'missing'
row_property_builder(row, name, value, ns, class_, remarks)
¤
Row property builder.
Source code in trestle/tasks/csv_to_oscal_cd.py
def row_property_builder(row: int, name: str, value, ns: str, class_: str, remarks: str) -> Property:
"""Row property builder."""
# name
try:
Property(
name=name,
value='value',
)
except Exception:
text = f'property for row: {row} name: {name} is {etype(name)}'
raise RuntimeError(text)
# value
try:
Property(
name=name,
value=value,
)
except Exception:
text = f'property for row: {row} value: {value} is {etype(value)}'
raise RuntimeError(text)
# ns
try:
Property(
name=name,
value=value,
ns=ns,
)
except Exception:
text = f'property for row: {row} ns: {ns} is {etype(ns)}'
raise RuntimeError(text)
# class
try:
Property(
name=name,
value=value,
class_=class_,
)
except Exception:
text = f'property for row: {row} class: {class_} is {etype(class_)}'
raise RuntimeError(text)
# prop
prop = Property(
name=name,
value=value,
ns=ns,
class_=class_,
remarks=remarks,
)
return prop
handler: python