Skip to content

csv_to_oscal_cd

trestle.tasks.csv_to_oscal_cd ¤

OSCAL transformation tasks.

ADJUSTED_RISK_RATING ¤

CHECK_DESCRIPTION ¤

CHECK_ID ¤

COMPONENT_DESCRIPTION ¤

COMPONENT_TITLE ¤

COMPONENT_TYPE ¤

CONTROL_ID_LIST ¤

FETCHER_DESCRIPTION ¤

FETCHER_ID ¤

HEADER_DECORATION_CHAR ¤

NAMESPACE ¤

ORIGINAL_RISK_RATING ¤

PARAMETER ¤

PARAMETER_DESCRIPTION ¤

PARAMETER_ID ¤

PARAMETER_VALUE_ALTERNATIVES ¤

PARAMETER_VALUE_DEFAULT ¤

PROFILE_DESCRIPTION ¤

PROFILE_SOURCE ¤

RISK_ADJUSTMENT ¤

RULE_DESCRIPTION ¤

RULE_ID ¤

Row ¤

logger ¤

prefix_rule_set ¤

validation ¤

Classes¤

CsvColumn ¤

CsvColumn.

Source code in trestle/tasks/csv_to_oscal_cd.py
class CsvColumn():
    """
    Catalog of the csv column names known to the csv-to-oscal-cd task.

    Every list is a class attribute and every accessor is a static method, so the
    class serves as a namespace and does not need to be instantiated. All getters
    hand back a fresh list so that callers cannot alter the class-level lists.
    """

    # columns that get_required_column_names() reports as required
    _columns_required = [
        f'{COMPONENT_TITLE}',
        f'{COMPONENT_DESCRIPTION}',
        f'{COMPONENT_TYPE}',
        f'{RULE_ID}',
        f'{RULE_DESCRIPTION}',
        f'{PROFILE_SOURCE}',
        f'{PROFILE_DESCRIPTION}',
        f'{CONTROL_ID_LIST}',
        f'{NAMESPACE}',
    ]

    # columns that get_required_column_names_validation() reports as required
    _columns_required_validation = [
        f'{COMPONENT_TITLE}',
        f'{COMPONENT_DESCRIPTION}',
        f'{COMPONENT_TYPE}',
        f'{RULE_ID}',
        f'{NAMESPACE}',
        f'{CHECK_ID}',
        f'{CHECK_DESCRIPTION}',
    ]

    # columns that get_optional_column_names() reports as optional
    _columns_optional = [
        f'{CHECK_ID}',
        f'{CHECK_DESCRIPTION}',
        f'{ORIGINAL_RISK_RATING}',
        f'{ADJUSTED_RISK_RATING}',
        f'{RISK_ADJUSTMENT}',
    ]

    # name prefixes that is_column_name_parameter() recognizes
    _columns_parameter = [
        f'{PARAMETER_ID}',
        f'{PARAMETER_DESCRIPTION}',
        f'{PARAMETER_VALUE_ALTERNATIVES}',
        f'{PARAMETER_VALUE_DEFAULT}',
    ]

    # overall ordering consulted by get_order(): required, then parameter, then optional
    _columns_ordered = _columns_required + _columns_parameter + _columns_optional

    @staticmethod
    def get_order(column_name: str) -> int:
        """
        Get order for column_name.

        Args:
            column_name: The column name to locate.

        Returns:
            The index of column_name within the ordered column list, or sys.maxsize
            when the name is not in that list.
        """
        try:
            return CsvColumn._columns_ordered.index(column_name)
        except ValueError:
            return sys.maxsize

    @staticmethod
    def is_column_name_required(name: str) -> bool:
        """Is column name required, i.e. listed in either of the two required-column lists."""
        return name in CsvColumn._columns_required or name in CsvColumn._columns_required_validation

    @staticmethod
    def is_column_name_optional(name: str) -> bool:
        """Is column name optional, i.e. listed in the optional-column list."""
        return name in CsvColumn._columns_optional

    @staticmethod
    def is_column_name_parameter(name: str) -> bool:
        """Is column name parameter, i.e. does it start with one of the parameter column names."""
        # prefix match, so names carrying a suffix after a parameter column name are accepted too
        return any(name.startswith(cname) for cname in CsvColumn._columns_parameter)

    @staticmethod
    def get_required_column_names() -> List[str]:
        """Get required column names (a new list on every call)."""
        return list(CsvColumn._columns_required)

    @staticmethod
    def get_optional_column_names() -> List[str]:
        """Get optional column names (a new list on every call)."""
        return list(CsvColumn._columns_optional)

    @staticmethod
    def get_parameter_column_names() -> List[str]:
        """Get parameter column names (a new list on every call)."""
        # _columns_parameters is assigned further down in this class body; the lookup
        # happens at call time, by which point the class is fully built
        return list(CsvColumn._columns_parameters)

    @staticmethod
    def get_required_column_names_validation() -> List[str]:
        """Get required column names validation (a new list on every call)."""
        return list(CsvColumn._columns_required_validation)

    _rule_property_column_names = [
        f'{RULE_ID}',
        f'{RULE_DESCRIPTION}',
        f'{PARAMETER_ID}',
        f'{PARAMETER_DESCRIPTION}',
        f'{PARAMETER_VALUE_ALTERNATIVES}',
        f'{CHECK_ID}',
        f'{CHECK_DESCRIPTION}',
        f'{ORIGINAL_RISK_RATING}',
        f'{ADJUSTED_RISK_RATING}',
        f'{RISK_ADJUSTMENT}',
    ]

    @staticmethod
    def get_rule_property_column_names() -> List[str]:
        """Get rule property column names (a new list on every call)."""
        # copy, like the other getters, so callers cannot mutate the class-level list
        return list(CsvColumn._rule_property_column_names)

    # columns required which do not become properties
    _columns_required_filtered = [
        f'{COMPONENT_TITLE}',
        f'{COMPONENT_DESCRIPTION}',
        f'{COMPONENT_TYPE}',
        f'{PROFILE_SOURCE}',
        f'{PROFILE_DESCRIPTION}',
        f'{CONTROL_ID_LIST}',
        f'{NAMESPACE}',
    ]

    # optional columns which do not become properties, initially
    _columns_optional_filtered = []

    _columns_filtered = _columns_required_filtered + _columns_optional_filtered

    @staticmethod
    def get_filtered_required_column_names() -> List[str]:
        """Get filtered required column names, i.e. the required names that are not in the filtered list."""
        return [
            column_name for column_name in CsvColumn.get_required_column_names()
            if column_name not in CsvColumn._columns_filtered
        ]

    @staticmethod
    def get_filtered_optional_column_names() -> List[str]:
        """Get filtered optional column names, i.e. the optional names that are not in the filtered list."""
        return [
            column_name for column_name in CsvColumn.get_optional_column_names()
            if column_name not in CsvColumn._columns_filtered
        ]

    _check_property_column_names = [
        f'{RULE_ID}',
        f'{CHECK_ID}',
        f'{CHECK_DESCRIPTION}',
    ]

    @staticmethod
    def get_check_property_column_names() -> List[str]:
        """Get check property column names (a new list on every call)."""
        # copy, like the other getters, so callers cannot mutate the class-level list
        return list(CsvColumn._check_property_column_names)

    # optional columns which do become properties, afterwards
    _columns_parameters = [
        f'{PARAMETER_ID}',
        f'{PARAMETER_DESCRIPTION}',
        f'{PARAMETER_VALUE_ALTERNATIVES}',
    ]

    # optional columns which require Param_Id be present in the row
    _columns_parameters_dependent = [
        f'{PARAMETER_DESCRIPTION}',
        f'{PARAMETER_VALUE_ALTERNATIVES}',
        f'{PARAMETER_VALUE_DEFAULT}',
    ]
Methods¤
get_check_property_column_names() staticmethod ¤

Get check property column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_check_property_column_names() -> List[str]:
    """Get check property column names (a new list on every call)."""
    # copy, like the sibling getters, so callers cannot mutate the class-level list
    return list(CsvColumn._check_property_column_names)
get_filtered_optional_column_names() staticmethod ¤

Get filtered optional column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_filtered_optional_column_names() -> List[str]:
    """Return the optional column names that are absent from the filtered-column list."""
    excluded = CsvColumn._columns_filtered
    return [candidate for candidate in CsvColumn.get_optional_column_names() if candidate not in excluded]
get_filtered_required_column_names() staticmethod ¤

Get filtered required column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_filtered_required_column_names() -> List[str]:
    """Return the required column names that are absent from the filtered-column list."""
    excluded = CsvColumn._columns_filtered
    return [candidate for candidate in CsvColumn.get_required_column_names() if candidate not in excluded]
get_optional_column_names() staticmethod ¤

Get optional column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_optional_column_names() -> List[str]:
    """Return a new list holding the optional column names."""
    return list(CsvColumn._columns_optional)
get_order(column_name) staticmethod ¤

Get order for column_name.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_order(column_name: str) -> int:
    """Return the index of column_name in the ordered column list, or sys.maxsize when it is not listed."""
    ordered = CsvColumn._columns_ordered
    return ordered.index(column_name) if column_name in ordered else sys.maxsize
get_parameter_column_names() staticmethod ¤

Get parameter column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_parameter_column_names() -> List[str]:
    """Return a new list holding the parameter column names."""
    return list(CsvColumn._columns_parameters)
get_required_column_names() staticmethod ¤

Get required column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_required_column_names() -> List[str]:
    """Return a new list holding the required column names."""
    return list(CsvColumn._columns_required)
get_required_column_names_validation() staticmethod ¤

Get required column names validation.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_required_column_names_validation() -> List[str]:
    """Return a new list holding the validation variant of the required column names."""
    return list(CsvColumn._columns_required_validation)
get_rule_property_column_names() staticmethod ¤

Get rule property column names.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def get_rule_property_column_names() -> List[str]:
    """Get rule property column names (a new list on every call)."""
    # copy, like the sibling getters, so callers cannot mutate the class-level list
    return list(CsvColumn._rule_property_column_names)
is_column_name_optional(name) staticmethod ¤

Is column name optional.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_optional(name: str) -> bool:
    """Tell whether name appears in the optional-column list."""
    optional_names = CsvColumn._columns_optional
    return name in optional_names
is_column_name_parameter(name) staticmethod ¤

Is column name parameter.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_parameter(name: str) -> bool:
    """Tell whether name begins with any of the parameter column names."""
    return any(name.startswith(prefix) for prefix in CsvColumn._columns_parameter)
is_column_name_required(name) staticmethod ¤

Is column name required.

Source code in trestle/tasks/csv_to_oscal_cd.py
@staticmethod
def is_column_name_required(name: str) -> bool:
    """Tell whether name appears in either required-column list (regular or validation)."""
    if name in CsvColumn._columns_required:
        return True
    return name in CsvColumn._columns_required_validation

CsvToOscalComponentDefinition (TaskBase) ¤

Task to create OSCAL ComponentDefinition json.

Attributes:

Name Type Description
name str

Name of the task.

Source code in trestle/tasks/csv_to_oscal_cd.py
class CsvToOscalComponentDefinition(TaskBase):
    """
    Task to create OSCAL ComponentDefinition json.

    Attributes:
        name: Name of the task.
    """

    name = 'csv-to-oscal-cd'

    def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
        """
        Initialize trestle task csv-to-oscal-cd.

        The config section is passed straight through to the base-class initializer;
        no other state is set up here.

        Args:
            config_object: Config section associated with the task.
        """
        super().__init__(config_object)

    def print_info(self) -> None:
        """
        Log the help text for this task.

        Emits, via logger.info, the task purpose, each configuration flag under
        [task.<name>], the required / optional / parameter / comment csv columns, and
        the numbered notes those column lines refer to.
        """
        name = self.name
        oscal_name = 'component_definition'
        #
        logger.info(f'Help information for {name} task.')
        logger.info('')
        logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
        logger.info('')
        logger.info('')
        logger.info(f'Configuration flags sit under [task.{name}]:')
        text1 = '  title                = '
        text2 = '(required) the component definition title.'
        logger.info(text1 + text2)
        text1 = '  version              = '
        text2 = '(required) the component definition version.'
        logger.info(text1 + text2)
        text1 = '  csv-file             = '
        text2 = '(required) the path of the csv file.'
        text3 = ' [1st row are column headings; 2nd row are column descriptions; 3rd row and beyond is data]'
        logger.info(text1 + text2 + text3)
        # required columns: the label is shown on the first line only, later lines are padded with blanks
        text1 = '  required columns:      '
        for text2 in CsvColumn.get_required_column_names():
            if text2 in [f'{RULE_DESCRIPTION}', f'{PROFILE_SOURCE}', f'{PROFILE_DESCRIPTION}', f'{CONTROL_ID_LIST}']:
                text2 += ' (see note 1)'
            # required column names are displayed with a '$$' prefix
            logger.info(text1 + '$$' + text2)
            text1 = '                         '
        # optional columns: same label-then-padding layout
        text1 = '  optional columns:      '
        for text2 in CsvColumn.get_optional_column_names():
            if text2 in [f'{ORIGINAL_RISK_RATING}', f'{ADJUSTED_RISK_RATING}', f'{RISK_ADJUSTMENT}']:
                text2 += ' (see note 1)'
            else:
                text2 += ' (see note 2)'
            # optional column names are displayed with a single '$' prefix
            logger.info(text1 + '$' + text2)
            text1 = '                         '
        # parameter columns continue the optional-columns listing (text1 is still the padding)
        for text2 in CsvColumn.get_parameter_column_names():
            text2 += ' (see notes 1, 4)'
            logger.info(text1 + '$' + text2)
            text1 = '                         '
        text1 = '  comment columns:       '
        text2 = 'Informational (see note 3)'
        # comment column names are displayed with a '#' prefix
        logger.info(text1 + '#' + text2)
        text1 = '  output-dir           = '
        text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
        logger.info(text1 + text2)
        text1 = '  component-definition = '
        text2 = '(optional) the path of the existing component-definition OSCAL .json file.'
        logger.info(text1 + text2)
        text1 = '  class.column-name    = '
        text2 = f'(optional) the class to associate with the specified column name, e.g. class.{RULE_ID} = scc_class'
        logger.info(text1 + text2)
        text1 = '  output-overwrite     = '
        text2 = '(optional) true [default] or false; replace existing output when true.'
        logger.info(text1 + text2)
        text1 = '  validate-controls    = '
        text2 = '(optional) on, warn, or off [default]; validate controls exist in resolved profile.'
        logger.info(text1 + text2)
        # Notes (preceded by one empty log line as a separator)
        text1 = ''
        text2 = ''
        logger.info(text1 + text2)
        text1 = 'Notes: '
        text2 = '[1] column is ignored for validation component type'
        logger.info(text1 + text2)
        text1 = '       '
        text2 = '[2] column is required for validation component type'
        logger.info(text1 + text2)
        text1 = '       '
        text2 = '[3] column name starting with # causes column to be ignored'
        logger.info(text1 + text2)
        text1 = '       '
        text2 = '[4] additional parameters are specified by adding a common suffix per set'
        text3 = f', for example: {PARAMETER_ID}_1, {PARAMETER_DESCRIPTION}_1, ...{PARAMETER_ID}_2...'
        logger.info(text1 + text2 + text3)

    def configure(self) -> bool:
        """
        Read and check the task configuration.

        Populates the instance attributes used by the execute path: timestamp,
        quiet/verbose flags, title, version, csv file and path, optional existing
        component-definition file and path, workspace (current working directory)
        and the validate-controls setting.

        Returns:
            True when the configuration is usable; False, after logging a warning,
            when "title", "version" or "csv-file" is missing, or when the csv file or
            the given component-definition file does not exist.
        """
        # timezone-aware UTC timestamp truncated to whole seconds, in ISO format
        self._timestamp = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()
        # config verbosity; getboolean so that a configured value such as "false" is
        # honored rather than being treated as a truthy non-empty string
        self._quiet = self._config.getboolean('quiet', False)
        self._verbose = not self._quiet
        # title
        self._title = self._config.get('title')
        if self._title is None:
            logger.warning('config missing "title"')
            return False
        # version
        self._version = self._config.get('version')
        if self._version is None:
            logger.warning('config missing "version"')
            return False
        # config csv
        self._csv_file = self._config.get('csv-file')
        if self._csv_file is None:
            logger.warning('config missing "csv-file"')
            return False
        self._csv_path = pathlib.Path(self._csv_file)
        if not self._csv_path.exists():
            logger.warning('"csv-file" not found')
            return False
        # announce csv
        if self._verbose:
            logger.info(f'input: {self._csv_file}')
        # config cd (optional; when given, the file must exist)
        self._cd_path = None
        self._cd_file = self._config.get('component-definition')
        if self._cd_file is not None:
            self._cd_path = pathlib.Path(self._cd_file)
            if not self._cd_path.exists():
                logger.warning('"component-definition" not found')
                return False
        # workspace
        self._workspace = os.getcwd()
        # validate_controls
        self._validate_controls = self._config.get('validate-controls', 'off')
        return True

    def get_class(self, name: str) -> str:
        """Look up the config entry 'class.<name>' and return whatever the config holds for it."""
        return self._config.get(f'class.{name}')

    def simulate(self) -> TaskOutcome:
        """Report a simulated success without doing any work."""
        outcome = TaskOutcome('simulated-success')
        return outcome

    def execute(self) -> TaskOutcome:
        """Run the task; any exception is logged with its traceback and reported as a failure outcome."""
        try:
            outcome = self._execute()
        except Exception:
            logger.error(traceback.format_exc())
            outcome = TaskOutcome('failure')
        return outcome

    def _execute(self) -> TaskOutcome:
        """
        Execute path core.

        Configures the task, prepares the output location, loads the existing
        component definition (if any) and the csv, works out which rules,
        set-parameters and control mappings must be deleted, added or modified,
        applies those changes, and writes component-definition.json to the output
        directory.

        Returns:
            TaskOutcome('success') once the file has been written; TaskOutcome('failure')
            when configuration fails, "output-dir" is missing, or the output file
            exists and overwriting is disabled.

        Raises:
            RuntimeError: When unresolved controls were recorded and validate-controls is 'on'.
        """
        if not self.configure():
            return TaskOutcome('failure')
        # config output
        odir = self._config.get('output-dir')
        if odir is None:
            # without this guard pathlib.Path(None) raises TypeError; report the
            # missing setting the same way configure() reports the other required ones
            logger.warning('config missing "output-dir"')
            return TaskOutcome('failure')
        opth = pathlib.Path(odir)
        self._overwrite = self._config.getboolean('output-overwrite', True)
        # ensure output dir exists
        opth.mkdir(exist_ok=True, parents=True)
        # calculate output file name & check writability
        oname = 'component-definition.json'
        ofile = opth / oname
        if not self._overwrite and ofile.exists():
            logger.warning(f'output: {ofile} already exists')
            return TaskOutcome('failure')
        # fetch existing component-definition, if any
        self._cd_mgr = _CdMgr(self._cd_path, self._title, self._timestamp, self._version)
        # fetch csv
        self._csv_mgr = _CsvMgr(self._csv_path)
        # create resolved profile -> catalog helper
        profile_list = self._csv_mgr.get_profile_list()
        self._resolved_profile_catalog_helper = _ResolvedProfileCatalogHelper(profile_list, self._workspace)
        self._unresolved_controls = []
        # calculate deletion, addition & modification rule lists
        del_rules, add_rules, mod_rules = self._calculate_rules()
        # calculate deletion, addition & modification set-parameter lists (restricted to modified rules)
        del_set_params, add_set_params, mod_set_params = self._calculate_set_params(mod_rules)
        # calculate deletion, addition & modification control mapping lists (restricted to modified rules)
        control_mappings = self._calculate_control_mappings(mod_rules)
        # rule set manager
        self._rule_set_id_mgr = _RuleSetIdMgr(self._cd_mgr.get_max_rule_set_number(), len(add_rules))
        # rule additions, deletions & modifications (by row)
        self.rules_del(del_rules)
        self.rules_add(add_rules)
        self.rules_mod(mod_rules)
        # set-parameters additions, deletions & modifications (by row)
        self.set_params_del(del_set_params)
        self.set_params_add(add_set_params)
        self.set_params_mod(mod_set_params)
        # control mappings additions, deletions & modifications (by row)
        self.control_mappings_del(control_mappings[0])
        self.control_mappings_add(control_mappings[1])
        # note: control mappings mod is currently not possible
        # note: add/del user columns not currently supported
        if len(self._unresolved_controls) > 0:
            text = f'Unresolved controls: {self._unresolved_controls}'
            if self._validate_controls == 'warn':
                logger.warning(text)
            elif self._validate_controls == 'on':
                raise RuntimeError(text)
        # prepare new/revised component definition
        component_definition = self._cd_mgr.get_component_definition()
        # write OSCAL ComponentDefinition to file
        if self._verbose:
            logger.info(f'output: {ofile}')
        component_definition.oscal_write(ofile)
        return TaskOutcome('success')

    def _calculate_rules(self) -> tuple:
        """
        Compare rule keys of the component definition with those of the csv.

        Returns:
            A (delete, add, modify) tuple of key lists: keys only in the component
            definition, keys only in the csv, and keys present in both.
        """
        cd_rules = self._cd_mgr.get_rule_keys()
        csv_rules = self._csv_mgr.get_rule_keys()
        to_delete = []
        to_add = []
        to_modify = []
        # keys known to the component definition but gone from the csv
        for key in cd_rules:
            if key not in csv_rules:
                to_delete.append(key)
                logger.debug(f'rules del: {key}')
        # keys in the csv are either new or already known
        for key in csv_rules:
            if key not in cd_rules:
                to_add.append(key)
                logger.debug(f'rules add: {key}')
            else:
                to_modify.append(key)
                logger.debug(f'rules mod: {key}')
        return (to_delete, to_add, to_modify)

    def _calculate_set_params(self, mod_rules: List) -> tuple:
        """
        Compare set-parameter keys of the component definition with those of the csv.

        Only keys whose first three elements form a rule key found in mod_rules are considered.

        Returns:
            A (delete, add, modify) tuple of key lists.
        """
        cd_set_params = self._cd_mgr.get_set_params_keys()
        csv_set_params = self._csv_mgr.get_set_params_keys()
        to_delete = []
        to_add = []
        to_modify = []
        for key in cd_set_params:
            rule_key = (key[0], key[1], key[2])
            # act only on modified rules, and only when the csv no longer has the key
            if rule_key in mod_rules and key not in csv_set_params:
                to_delete.append(key)
                logger.debug(f'params del: {key}')
        for key in csv_set_params:
            rule_key = (key[0], key[1], key[2])
            if rule_key not in mod_rules:
                continue
            if key not in cd_set_params:
                to_add.append(key)
                logger.debug(f'params add: {key}')
            else:
                to_modify.append(key)
                logger.debug(f'params mod: {key}')
        return (to_delete, to_add, to_modify)

    def _calculate_control_mappings(self, mod_rules: List) -> tuple:
        """
        Compare control keys of the component definition with those of the csv.

        Only keys whose first three elements form a rule key found in mod_rules are considered.

        Returns:
            A (delete, add, modify) tuple of key lists.
        """
        cd_controls = self._cd_mgr.get_control_keys()
        csv_controls = self._csv_mgr.get_control_keys()
        to_delete = []
        to_add = []
        to_modify = []
        for key in cd_controls:
            rule_key = (key[0], key[1], key[2])
            # act only on modified rules, and only when the csv no longer has the key
            if rule_key in mod_rules and key not in csv_controls:
                to_delete.append(key)
                logger.debug(f'ctl-maps del: {key}')
        for key in csv_controls:
            rule_key = (key[0], key[1], key[2])
            if rule_key not in mod_rules:
                continue
            if key not in cd_controls:
                to_add.append(key)
                logger.debug(f'ctl-maps add: {key}')
            else:
                to_modify.append(key)
                logger.debug(f'ctl-maps mod: {key}')
        return (to_delete, to_add, to_modify)

    def _get_namespace(self, rule_key: tuple) -> str:
        """Fetch the namespace column value for the rule and strip surrounding whitespace."""
        raw_namespace = self._csv_mgr.get_value(rule_key, NAMESPACE)
        return raw_namespace.strip()

    def _get_prop_name(self, column_name: str) -> str:
        """Get property name."""
        return column_name.lstrip('$')

    def rules_del(self, del_rules: List[str]) -> None:
        """Remove, for every given rule key, the rule's props from the component it belongs to."""
        for rule_key in del_rules:
            # key layout: component title, component type, rule id
            title, kind, rule_id = rule_key[0], rule_key[1], rule_key[2]
            # component (looked up with an empty description)
            component = self._cd_mgr.get_component(title, kind, '')
            # props
            component.props = self._delete_rule_props(component, rule_id)

    def _delete_rule_props(self, component: DefinedComponent, rule_id: str) -> List[Property]:
        """
        Collect the component props that do not belong to the rule's rule set.

        Props whose remarks equal the rule set are left out; on the way, a parameter-id
        prop triggers removal of the matching set-parameter and a rule-id prop triggers
        removal of the matching implemented-requirement entries.
        """
        survivors = []
        rule_set = _RuleSetHelper.get_rule_set(component.props, rule_id)
        for prop in component.props:
            if prop.remarks != rule_set:
                survivors.append(prop)
                continue
            # prop belongs to the rule set being deleted
            if prop.name in self._csv_mgr.get_parameter_id_column_names():
                self._delete_rule_set_parameter(component, prop.value)
                continue
            if prop.name == RULE_ID:
                self._delete_rule_implemented_requirement(component, prop.value)
        return survivors

    def _control_implementation_generator(
        self, control_implementations: List[ControlImplementation]
    ) -> Iterator[ControlImplementation]:
        """Yield each control implementation in turn; yield nothing for a falsy (e.g. None or empty) input."""
        if not control_implementations:
            return
        yield from control_implementations

    def _set_parameter_generator(self, set_parameters: List[SetParameter]) -> Iterator[SetParameter]:
        """Yield each set-parameter in turn; yield nothing for a falsy (e.g. None or empty) input."""
        if not set_parameters:
            return
        yield from set_parameters

    def _implemented_requirement_generator(
        self, implemented_requirements: List[ImplementedRequirement]
    ) -> Iterator[ImplementedRequirement]:
        """Yield each implemented requirement in turn; yield nothing for a falsy (e.g. None or empty) input."""
        if not implemented_requirements:
            return
        yield from implemented_requirements

    def _delete_rule_set_parameter(self, component: DefinedComponent, parameter_id: str) -> None:
        """Drop the set-parameter with the given param id from every control implementation of the component."""
        for control_implementation in self._control_implementation_generator(component.control_implementations):
            existing = control_implementation.set_parameters
            if not existing:
                continue
            # rebuild the list without the parameter being deleted
            control_implementation.set_parameters = []
            for set_parameter in existing:
                if set_parameter.param_id == parameter_id:
                    continue
                _OscalHelper.add_set_parameter(control_implementation.set_parameters, set_parameter)
            # an emptied list is replaced by None
            if len(control_implementation.set_parameters) == 0:
                control_implementation.set_parameters = None

    def _delete_rule_implemented_requirement(self, component: DefinedComponent, rule_id: str) -> None:
        """
        Strip the rule from the component's implemented requirements.

        Implemented requirements left with neither props nor statements are dropped, and
        control implementations left without implemented requirements are dropped too.
        """
        previous = component.control_implementations
        component.control_implementations = []
        for control_implementation in self._control_implementation_generator(previous):
            requirements = control_implementation.implemented_requirements
            if requirements:
                control_implementation.implemented_requirements = []
                for requirement in requirements:
                    self._delete_ir_props(requirement, rule_id)
                    self._delete_ir_statements(requirement, rule_id)
                    # keep the requirement only while something still hangs off it
                    if len(as_list(requirement.props)) > 0 or len(as_list(requirement.statements)) > 0:
                        control_implementation.implemented_requirements.append(requirement)
            # keep the control implementation only while it still has requirements
            if len(as_list(control_implementation.implemented_requirements)) > 0:
                component.control_implementations.append(control_implementation)

    def _delete_ir_statements(self, implemented_requirement: ImplementedRequirement, rule_id: str) -> None:
        """
        Strip the rule from the statements of an implemented requirement.

        Statements left without props are dropped; when no statement remains, the
        statements attribute is set to None.
        """
        statements = implemented_requirement.statements
        if not statements:
            return
        implemented_requirement.statements = []
        for statement in statements:
            statement.props = self._delete_props(statement.props, rule_id)
            if len(statement.props) == 0:
                # nothing left on this statement: clear its props and do not keep it
                statement.props = None
            else:
                implemented_requirement.statements.append(statement)
        if len(implemented_requirement.statements) == 0:
            implemented_requirement.statements = None

    def _delete_ir_props(self, implemented_requirement: ImplementedRequirement, rule_id: str) -> None:
        """Strip the rule from the props of an implemented requirement; an emptied list becomes None."""
        if not implemented_requirement.props:
            return
        implemented_requirement.props = self._delete_props(implemented_requirement.props, rule_id)
        if len(implemented_requirement.props) == 0:
            implemented_requirement.props = None

    def _delete_props(self, props: List[Property], rule_id: str) -> List[Property]:
        """
        Delete props.

        Args:
            props: The props to filter; a falsy value (None or empty) yields an empty list.
            rule_id: The rule id whose rule-id props are to be removed.

        Returns:
            A new list holding every prop except those named RULE_ID whose value equals rule_id.
        """
        return [prop for prop in (props or []) if not (prop.name == RULE_ID and prop.value == rule_id)]

    def rules_add(self, add_rules: List[str]) -> None:
        """
        Add each given rule to its component.

        Rule props are always appended to the component. For components that are not of
        the validation type, the rule's set-parameters and control mappings are added to
        the matching control implementation as well.
        """
        for rule_key in add_rules:
            title = self._csv_mgr.get_value(rule_key, COMPONENT_TITLE)
            kind = self._csv_mgr.get_value(rule_key, COMPONENT_TYPE)
            component_description = self._csv_mgr.get_value(rule_key, COMPONENT_DESCRIPTION)
            # component
            component = self._cd_mgr.get_component(title, kind, component_description)
            # props
            component.props = as_list(component.props)
            component.props = component.props + self._create_rule_props(rule_key)
            # validation components get no control implementation details
            if self._is_validation(rule_key):
                continue
            # control implementation
            source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE)
            description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION)
            control_implementation = self._get_control_implementation(component, source, description)
            # set-parameters
            set_parameters = self._create_set_parameters(rule_key)
            if set_parameters:
                control_implementation.set_parameters = as_list(control_implementation.set_parameters)
                _OscalHelper.add_set_parameters(control_implementation.set_parameters, set_parameters)
            # control-mappings (whitespace-separated list in the csv cell)
            control_mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split()
            self._add_rule_prop(control_implementation, control_mappings, rule_key)

    def _is_validation(self, rule_key: tuple) -> bool:
        """Tell whether the rule's component type, compared case-insensitively, is the validation type."""
        return self._csv_mgr.get_value(rule_key, COMPONENT_TYPE).lower() == validation

    def _add_rule_prop(
        self, control_implementation: ControlImplementation, control_mappings: List[str], rule_key: tuple
    ) -> None:
        """
        Attach a rule-id property for every control mapping.

        When the mapping yields a part id, the property goes onto the corresponding
        statement; otherwise it goes onto the implemented requirement itself.
        """
        namespace = self._get_namespace(rule_key)
        for control_mapping in control_mappings:
            control_id = derive_control_id(control_mapping)
            implemented_requirement = self._get_implemented_requirement(control_implementation, control_id)
            # create rule implementation (as property); a fresh object per mapping
            prop = Property(
                name=RULE_ID,
                value=self._csv_mgr.get_value(rule_key, RULE_ID),
                ns=namespace,
                class_=self.get_class(RULE_ID),
            )
            part_id = derive_part_id(control_mapping)
            if part_id is not None:
                self._get_statement(implemented_requirement, part_id).props.append(prop)
                continue
            implemented_requirement.props = as_list(implemented_requirement.props)
            implemented_requirement.props.append(prop)

    def _create_rule_props(self, rule_key: tuple) -> List[Property]:
        """
        Build the props of one rule set from the rule's csv row.

        Props are added in this order: the fixed check or rule property columns
        (depending on whether the component is of the validation type), then the
        parameter columns (non-validation only), then the user columns whose name does
        not start with '#'.
        """
        rule_set = self._rule_set_id_mgr.get_next_rule_set_id()
        row_number = self._csv_mgr.get_row_number(rule_key)
        rule_set_mgr = _RuleSetMgr(row_number, rule_set)
        namespace = self._get_namespace(rule_key)

        def add_column(column_name: str) -> None:
            """Add one prop whose name and stripped value come from the given column."""
            prop_name = self._get_prop_name(column_name)
            prop_value = self._csv_mgr.get_value(rule_key, column_name).strip()
            rule_set_mgr.add_prop(prop_name, prop_value, namespace, self.get_class(prop_name))

        # req'd & optional props
        if self._is_validation(rule_key):
            fixed_columns = CsvColumn.get_check_property_column_names()
        else:
            fixed_columns = CsvColumn.get_rule_property_column_names()
        for column_name in fixed_columns:
            add_column(column_name)
        # parameter columns
        if not self._is_validation(rule_key):
            for column_name in self._csv_mgr.get_parameter_column_names():
                add_column(column_name)
        # user props, skipping comment columns
        for column_name in self._csv_mgr.get_user_column_names():
            if not column_name.startswith('#'):
                add_column(column_name)
        return rule_set_mgr.get_props()

    def _get_control_implementation(
        self, component: DefinedComponent, source: str, description: str
    ) -> ControlImplementation:
        """Return the component's matching control implementation, creating it if absent.

        Args:
            component: Component whose control implementations are searched.
            source: Source value to match.
            description: Description value to match.

        Returns:
            The first control implementation whose source and description both
            match, or a newly appended one when there is no match.
        """
        component.control_implementations = as_list(component.control_implementations)
        found = next(
            (
                candidate for candidate in component.control_implementations
                if candidate.source == source and candidate.description == description
            ),
            None,
        )
        if found is not None:
            return found
        created = ControlImplementation(
            uuid=str(uuid.uuid4()), source=source, description=description, implemented_requirements=[]
        )
        component.control_implementations.append(created)
        return created

    def _str_to_list(self, value: str) -> List[str]:
        """Transform string to list."""
        rval = []
        if ',' in value:
            values = value.split(',')
            # remove leading/trailing whitespace
            for v in values:
                rval.append(v.strip())
        else:
            rval.append(value)
        return rval

    def _create_set_parameters(self, rule_key: tuple) -> List[SetParameter]:
        """Build the set-parameters described by one csv row.

        Args:
            rule_key: Key of the csv row to read.

        Returns:
            One SetParameter per parameter id column that has both an id and a
            default value; ids lacking a default are reported at debug level.
        """
        collected = []
        for id_column in self._csv_mgr.get_parameter_id_column_names():
            # the default-value column shares the suffix of its id column
            suffix = id_column.replace(PARAMETER_ID, '')
            default_column = f'{PARAMETER_VALUE_DEFAULT}{suffix}'
            param_id = self._csv_mgr.get_value(rule_key, id_column)
            default = self._csv_mgr.get_value(rule_key, default_column)
            if not param_id:
                continue
            if default:
                collected.append(SetParameter(
                    param_id=param_id,
                    values=self._str_to_list(default),
                ))
                continue
            row_number = self._csv_mgr.get_row_number(rule_key)
            logger.debug(f'row "{row_number}" missing value for "{default_column}"')
        return collected

    def _get_implemented_requirement(
        self, control_implementation: ControlImplementation, control_id: str
    ) -> ImplementedRequirement:
        """Return the implemented requirement for control_id, creating it if absent.

        Unless control validation is 'off', a control id rejected by the
        resolved profile catalog helper is remembered once in the list of
        unresolved controls.

        Args:
            control_implementation: Control implementation to search and extend.
            control_id: Control id to look up.

        Returns:
            The existing or newly appended implemented requirement.
        """
        if (
            self._validate_controls != 'off'
            and not self._resolved_profile_catalog_helper.validate(control_id)
            and control_id not in self._unresolved_controls
        ):
            self._unresolved_controls.append(control_id)
        found = next(
            (
                candidate for candidate in control_implementation.implemented_requirements
                if candidate.control_id == control_id
            ),
            None,
        )
        if found is not None:
            return found
        created = ImplementedRequirement(
            uuid=str(uuid.uuid4()),
            control_id=control_id,
            description='',
        )
        control_implementation.implemented_requirements.append(created)
        return created

    def _get_statement(self, implemented_requirement: ImplementedRequirement, part_id: str) -> Statement:
        """Return the statement with the given id, creating it if absent.

        Args:
            implemented_requirement: Requirement whose statements are searched.
            part_id: Statement id to look up.

        Returns:
            The existing statement, or a new one (empty description and props)
            appended to the requirement.
        """
        implemented_requirement.statements = as_list(implemented_requirement.statements)
        found = next(
            (candidate for candidate in implemented_requirement.statements if candidate.statement_id == part_id),
            None,
        )
        if found is not None:
            return found
        created = Statement(
            uuid=str(uuid.uuid4()),
            statement_id=part_id,
            description='',
            props=[],
        )
        implemented_requirement.statements.append(created)
        return created

    def rules_mod(self, mod_rules: List[str]) -> None:
        """Apply csv changes to the rule properties of existing rules.

        Args:
            mod_rules: Keys of the csv rows whose rules are modified.
        """
        for rule_key in mod_rules:
            # component identity comes from the csv row
            title, type_, text = (
                self._csv_mgr.get_value(rule_key, column)
                for column in (COMPONENT_TITLE, COMPONENT_TYPE, COMPONENT_DESCRIPTION)
            )
            component = self._cd_mgr.get_component(title, type_, text)
            # props
            component.props = self._modify_rule_props(component, rule_key)

    def _modify_rule_props(self, component: DefinedComponent, rule_key: tuple) -> List[Property]:
        """Update a component's rule definition from one csv row.

        Every required, optional, parameter and user column of the row is
        pushed to the component definition manager for the rule set that holds
        the row's rule id.

        Args:
            component: Component whose props hold the rule set.
            rule_key: Key of the csv row supplying the new values.

        Returns:
            The component's props after the updates.
        """
        rule_id = self._csv_mgr.get_value(rule_key, RULE_ID)
        rule_set = _RuleSetHelper.get_rule_set(component.props, rule_id)
        rule_ns = self._csv_mgr.get_value(rule_key, NAMESPACE)
        column_groups = (
            # req'd & optional props
            CsvColumn.get_filtered_required_column_names() + CsvColumn.get_filtered_optional_column_names(),
            # parameter columns
            self._csv_mgr.get_parameter_column_names(),
            # user props
            self._csv_mgr.get_user_column_names(),
        )
        for column_names in column_groups:
            for column_name in column_names:
                column_value = self._csv_mgr.get_value(rule_key, column_name).strip()
                # the class is looked up per column; user columns previously
                # inherited the class of whichever column was processed last
                class_ = self.get_class(column_name)
                self._cd_mgr.update_rule_definition(component, rule_set, column_name, column_value, rule_ns, class_)
        return component.props

    def set_params_del(self, del_set_params: List[str]) -> None:
        """Remove set-parameters from their control implementations.

        Args:
            del_set_params: Token sequences; indexes 0, 1, 3, 4 and 5 are read
                as component title, component type, source, description and
                parameter id.
        """
        for tokens in del_set_params:
            component_title, component_type = tokens[0], tokens[1]
            source, description, param_id = tokens[3], tokens[4], tokens[5]
            control_implementation = self._cd_mgr.find_control_implementation(
                component_title, component_type, source, description
            )
            if not control_implementation:
                continue
            previous = control_implementation.set_parameters
            # rebuild the list without the deleted parameter
            control_implementation.set_parameters = []
            for current in self._set_parameter_generator(previous):
                if current.param_id != param_id:
                    _OscalHelper.add_set_parameter(control_implementation.set_parameters, current)
            # an empty list is replaced by None
            if control_implementation.set_parameters == []:
                control_implementation.set_parameters = None

    def set_params_add(self, add_set_params: List[str]) -> None:
        """Add set-parameters to their control implementations.

        Args:
            add_set_params: Token sequences; indexes 0-5 are read as component
                title, component type, rule id, source, description and
                parameter id.
        """
        for tokens in add_set_params:
            component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
            source, description, param_id = tokens[3], tokens[4], tokens[5]
            control_implementation = self._cd_mgr.find_control_implementation(
                component_title, component_type, source, description
            )
            control_implementation.set_parameters = as_list(control_implementation.set_parameters)
            # the single value is the csv default for this parameter id
            rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
            default_value = self._csv_mgr.get_default_value_by_id(rule_key, param_id)
            addition = SetParameter(
                param_id=param_id,
                values=[default_value],
            )
            _OscalHelper.add_set_parameter(control_implementation.set_parameters, addition)

    def set_params_mod(self, mod_set_params: List[str]) -> None:
        """Refresh the values of existing set-parameters from the csv defaults.

        Args:
            mod_set_params: Token sequences; indexes 0-5 are read as component
                title, component type, rule id, source, description and
                parameter id.
        """
        for tokens in mod_set_params:
            component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
            source, description, param_id = tokens[3], tokens[4], tokens[5]
            control_implementation = self._cd_mgr.find_control_implementation(
                component_title, component_type, source, description
            )
            if not control_implementation:
                continue
            for current in self._set_parameter_generator(control_implementation.set_parameters):
                if current.param_id != param_id:
                    continue
                rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
                default_value = self._csv_mgr.get_default_value_by_id(rule_key, param_id)
                candidate = SetParameter(
                    param_id=param_id,
                    values=[default_value],
                )
                # only touch (and log) parameters whose values actually differ
                if current.values != candidate.values:
                    logger.debug(f'params-mod: {rule_id} {param_id} {current.values} -> {candidate.values}')
                    current.values = candidate.values

    def _control_mappings_generator(self, control_mappings: List[str]) -> Iterator[List[str]]:
        """Yield the mappings whose control implementation can be found.

        Args:
            control_mappings: Token sequences; indexes 0, 1, 3 and 4 are read
                as component title, component type, source and description.

        Yields:
            Each token sequence for which the component definition manager
            finds a control implementation.
        """
        for tokens in control_mappings:
            located = self._cd_mgr.find_control_implementation(tokens[0], tokens[1], tokens[3], tokens[4])
            if not located:
                continue
            yield tokens

    def control_mappings_del(self, del_control_mappings: List[str]) -> None:
        """Remove rule-to-control mappings from their control implementations.

        Args:
            del_control_mappings: Token sequences; indexes 0-5 are read as
                component title, component type, rule id, source, description
                and statement (or control) id.
        """
        for tokens in self._control_mappings_generator(del_control_mappings):
            component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
            source, description, smt_id = tokens[3], tokens[4], tokens[5]
            control_id = derive_control_id(smt_id)
            control_implementation = self._cd_mgr.find_control_implementation(
                component_title, component_type, source, description
            )
            previous = control_implementation.implemented_requirements
            # rebuild the list, dropping requirements left without any rule
            control_implementation.implemented_requirements = []
            for requirement in self._implemented_requirement_generator(previous):
                if requirement.control_id == control_id:
                    requirement.statements = _OscalHelper.remove_rule_statement(
                        requirement.statements, rule_id, smt_id
                    )
                    requirement.props = _OscalHelper.remove_rule(requirement.props, rule_id)
                    still_used = bool(as_list(requirement.props)) or bool(as_list(requirement.statements))
                    if not still_used:
                        continue
                control_implementation.implemented_requirements.append(requirement)

    def control_mappings_add(self, add_control_mappings: List[str]) -> None:
        """Add rule-to-control mappings to their control implementations.

        Args:
            add_control_mappings: Token sequences; indexes 0-5 are read as
                component title, component type, rule id, source, description
                and statement (or control) id.
        """
        for tokens in self._control_mappings_generator(add_control_mappings):
            component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
            source, description, smt_id = tokens[3], tokens[4], tokens[5]
            control_id = derive_control_id(smt_id)
            control_implementation = self._cd_mgr.find_control_implementation(
                component_title, component_type, source, description
            )
            requirement = self._get_implemented_requirement(control_implementation, control_id)
            # namespace comes from the csv row of the rule
            namespace = self._get_namespace((component_title, component_type, rule_id))
            # create rule implementation (as property)
            rule_prop = Property(
                name=RULE_ID,
                value=rule_id,
                ns=namespace,
                class_=self.get_class(RULE_ID),
            )
            if smt_id != control_id:
                # mapping targets a statement of the control
                self._get_statement(requirement, smt_id).props.append(rule_prop)
                continue
            requirement.props = as_list(requirement.props)
            requirement.props.append(rule_prop)
name: str ¤
Methods¤
__init__(self, config_object) special ¤

Initialize trestle task csv-to-oscal-cd.

Parameters:

Name Type Description Default
config_object Optional[configparser.SectionProxy]

Config section associated with the task.

required
Source code in trestle/tasks/csv_to_oscal_cd.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Set up the csv-to-oscal-cd task by delegating to the parent initializer.

    Args:
        config_object: Config section associated with the task; handed
            unchanged to the parent class.
    """
    super().__init__(config_object)
configure(self) ¤

Configure.

Source code in trestle/tasks/csv_to_oscal_cd.py
def configure(self) -> bool:
    """Configure the task from its config section.

    Reads title, version, csv-file, component-definition, quiet and
    validate-controls from the config and records the timestamp and the
    current working directory.

    Returns:
        True when title, version and csv-file are present and every
        referenced file exists; False (after logging a warning) otherwise.
    """
    # timezone-aware "now"; datetime.utcnow() is deprecated as of Python 3.12
    self._timestamp = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()
    # config verbosity; getboolean is used because a plain get() returns the
    # raw string, which made e.g. quiet = false evaluate as truthy
    self._quiet = self._config.getboolean('quiet', fallback=False)
    self._verbose = not self._quiet
    # title
    self._title = self._config.get('title')
    if self._title is None:
        logger.warning('config missing "title"')
        return False
    # version
    self._version = self._config.get('version')
    if self._version is None:
        logger.warning('config missing "version"')
        return False
    # config csv
    self._csv_file = self._config.get('csv-file')
    if self._csv_file is None:
        logger.warning('config missing "csv-file"')
        return False
    self._csv_path = pathlib.Path(self._csv_file)
    if not self._csv_path.exists():
        logger.warning('"csv-file" not found')
        return False
    # announce csv
    if self._verbose:
        logger.info(f'input: {self._csv_file}')
    # config cd (optional); when given, the file must already exist
    self._cd_path = None
    self._cd_file = self._config.get('component-definition')
    if self._cd_file is not None:
        self._cd_path = pathlib.Path(self._cd_file)
        if not self._cd_path.exists():
            logger.warning('"component-definition" not found')
            return False
    # workspace
    self._workspace = os.getcwd()
    # validate_controls
    self._validate_controls = self._config.get('validate-controls', 'off')
    return True
control_mappings_add(self, add_control_mappings) ¤

Control mappings add.

Source code in trestle/tasks/csv_to_oscal_cd.py
def control_mappings_add(self, add_control_mappings: List[str]) -> None:
    """Add rule-to-control mappings to their control implementations.

    Args:
        add_control_mappings: Token sequences; indexes 0-5 are read as
            component title, component type, rule id, source, description
            and statement (or control) id.
    """
    for tokens in self._control_mappings_generator(add_control_mappings):
        component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
        source, description, smt_id = tokens[3], tokens[4], tokens[5]
        control_id = derive_control_id(smt_id)
        control_implementation = self._cd_mgr.find_control_implementation(
            component_title, component_type, source, description
        )
        requirement = self._get_implemented_requirement(control_implementation, control_id)
        # namespace comes from the csv row of the rule
        namespace = self._get_namespace((component_title, component_type, rule_id))
        # create rule implementation (as property)
        rule_prop = Property(
            name=RULE_ID,
            value=rule_id,
            ns=namespace,
            class_=self.get_class(RULE_ID),
        )
        if smt_id != control_id:
            # mapping targets a statement of the control
            self._get_statement(requirement, smt_id).props.append(rule_prop)
            continue
        requirement.props = as_list(requirement.props)
        requirement.props.append(rule_prop)
control_mappings_del(self, del_control_mappings) ¤

Control mappings delete.

Source code in trestle/tasks/csv_to_oscal_cd.py
def control_mappings_del(self, del_control_mappings: List[str]) -> None:
    """Remove rule-to-control mappings from their control implementations.

    Args:
        del_control_mappings: Token sequences; indexes 0-5 are read as
            component title, component type, rule id, source, description
            and statement (or control) id.
    """
    for tokens in self._control_mappings_generator(del_control_mappings):
        component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
        source, description, smt_id = tokens[3], tokens[4], tokens[5]
        control_id = derive_control_id(smt_id)
        control_implementation = self._cd_mgr.find_control_implementation(
            component_title, component_type, source, description
        )
        previous = control_implementation.implemented_requirements
        # rebuild the list, dropping requirements left without any rule
        control_implementation.implemented_requirements = []
        for requirement in self._implemented_requirement_generator(previous):
            if requirement.control_id == control_id:
                requirement.statements = _OscalHelper.remove_rule_statement(
                    requirement.statements, rule_id, smt_id
                )
                requirement.props = _OscalHelper.remove_rule(requirement.props, rule_id)
                still_used = bool(as_list(requirement.props)) or bool(as_list(requirement.statements))
                if not still_used:
                    continue
            control_implementation.implemented_requirements.append(requirement)
execute(self) ¤

Provide an executed outcome.

Source code in trestle/tasks/csv_to_oscal_cd.py
def execute(self) -> TaskOutcome:
    """Run the task and report its outcome.

    Returns:
        Whatever the internal _execute returns, or a 'failure' outcome when it
        raises; the traceback is logged as an error in that case.
    """
    try:
        outcome = self._execute()
    except Exception:
        logger.error(traceback.format_exc())
        outcome = TaskOutcome('failure')
    return outcome
get_class(self, name) ¤

Get class value for specified name from config.

Source code in trestle/tasks/csv_to_oscal_cd.py
def get_class(self, name: str) -> str:
    """Look up the configured class for a name.

    Args:
        name: Name whose 'class.<name>' config entry is read.

    Returns:
        The result of the config lookup for that key.
    """
    return self._config.get(f'class.{name}')
print_info(self) ¤

Print the help string.

Source code in trestle/tasks/csv_to_oscal_cd.py
def print_info(self) -> None:
    """Print the help string.

    Logs, at info level, the task purpose, the configuration flags expected
    under the task's config section, the csv column names (required,
    optional, parameter and comment columns) and the explanatory notes that
    the column listings refer to.
    """
    name = self.name
    oscal_name = 'component_definition'
    #
    logger.info(f'Help information for {name} task.')
    logger.info('')
    logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
    logger.info('')
    logger.info('')
    logger.info(f'Configuration flags sit under [task.{name}]:')
    # each flag line is a fixed-width label (text1) followed by its description
    text1 = '  title                = '
    text2 = '(required) the component definition title.'
    logger.info(text1 + text2)
    text1 = '  version              = '
    text2 = '(required) the component definition version.'
    logger.info(text1 + text2)
    text1 = '  csv-file             = '
    text2 = '(required) the path of the csv file.'
    text3 = ' [1st row are column headings; 2nd row are column descriptions; 3rd row and beyond is data]'
    logger.info(text1 + text2 + text3)
    # required columns are listed with a '$$' prefix; only the first line
    # carries the label, later lines are indented with blanks
    text1 = '  required columns:      '
    for text2 in CsvColumn.get_required_column_names():
        if text2 in [f'{RULE_DESCRIPTION}', f'{PROFILE_SOURCE}', f'{PROFILE_DESCRIPTION}', f'{CONTROL_ID_LIST}']:
            text2 += ' (see note 1)'
        logger.info(text1 + '$$' + text2)
        text1 = '                         '
    # optional columns are listed with a '$' prefix
    text1 = '  optional columns:      '
    for text2 in CsvColumn.get_optional_column_names():
        if text2 in [f'{ORIGINAL_RISK_RATING}', f'{ADJUSTED_RISK_RATING}', f'{RISK_ADJUSTMENT}']:
            text2 += ' (see note 1)'
        else:
            text2 += ' (see note 2)'
        logger.info(text1 + '$' + text2)
        text1 = '                         '
    # parameter columns continue the optional listing (text1 is not reset here)
    for text2 in CsvColumn.get_parameter_column_names():
        text2 += ' (see notes 1, 4)'
        logger.info(text1 + '$' + text2)
        text1 = '                         '
    text1 = '  comment columns:       '
    text2 = 'Informational (see note 3)'
    logger.info(text1 + '#' + text2)
    text1 = '  output-dir           = '
    text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
    logger.info(text1 + text2)
    text1 = '  component-definition = '
    text2 = '(optional) the path of the existing component-definition OSCAL .json file.'
    logger.info(text1 + text2)
    text1 = '  class.column-name    = '
    text2 = f'(optional) the class to associate with the specified column name, e.g. class.{RULE_ID} = scc_class'
    logger.info(text1 + text2)
    text1 = '  output-overwrite     = '
    text2 = '(optional) true [default] or false; replace existing output when true.'
    logger.info(text1 + text2)
    text1 = '  validate-controls    = '
    text2 = '(optional) on, warn, or off [default]; validate controls exist in resolved profile.'
    logger.info(text1 + text2)
    # Notes
    # an empty line separates the flags from the notes
    text1 = ''
    text2 = ''
    logger.info(text1 + text2)
    text1 = 'Notes: '
    text2 = '[1] column is ignored for validation component type'
    logger.info(text1 + text2)
    text1 = '       '
    text2 = '[2] column is required for validation component type'
    logger.info(text1 + text2)
    text1 = '       '
    text2 = '[3] column name starting with # causes column to be ignored'
    logger.info(text1 + text2)
    text1 = '       '
    text2 = '[4] additional parameters are specified by adding a common suffix per set'
    text3 = f', for example: {PARAMETER_ID}_1, {PARAMETER_DESCRIPTION}_1, ...{PARAMETER_ID}_2...'
    logger.info(text1 + text2 + text3)
rules_add(self, add_rules) ¤

Add rules.

Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_add(self, add_rules: List[str]) -> None:
    """Add new rules, and for non-validation rows their control data, to components.

    Args:
        add_rules: Keys of the csv rows whose rules are added.
    """
    for rule_key in add_rules:
        # component identity comes from the csv row
        title, type_, text = (
            self._csv_mgr.get_value(rule_key, column)
            for column in (COMPONENT_TITLE, COMPONENT_TYPE, COMPONENT_DESCRIPTION)
        )
        component = self._cd_mgr.get_component(title, type_, text)
        # props
        component.props = as_list(component.props)
        component.props = component.props + self._create_rule_props(rule_key)
        # validation components get no control implementation data
        if self._is_validation(rule_key):
            continue
        # control implementation
        profile_source = self._csv_mgr.get_value(rule_key, PROFILE_SOURCE)
        profile_description = self._csv_mgr.get_value(rule_key, PROFILE_DESCRIPTION)
        control_implementation = self._get_control_implementation(component, profile_source, profile_description)
        # set-parameters
        new_set_parameters = self._create_set_parameters(rule_key)
        if new_set_parameters:
            control_implementation.set_parameters = as_list(control_implementation.set_parameters)
            _OscalHelper.add_set_parameters(control_implementation.set_parameters, new_set_parameters)
        # control-mappings: whitespace separated list in the csv cell
        mappings = self._csv_mgr.get_value(rule_key, CONTROL_ID_LIST).split()
        self._add_rule_prop(control_implementation, mappings, rule_key)
rules_del(self, del_rules) ¤

Delete rules.

Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_del(self, del_rules: List[str]) -> None:
    """Remove the properties of deleted rules from their components.

    Args:
        del_rules: Token sequences; indexes 0, 1 and 2 are read as component
            title, component type and rule id.
    """
    for tokens in del_rules:
        component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
        # the component is looked up with an empty description
        component = self._cd_mgr.get_component(component_title, component_type, '')
        # props
        component.props = self._delete_rule_props(component, rule_id)
rules_mod(self, mod_rules) ¤

Modify rules.

Source code in trestle/tasks/csv_to_oscal_cd.py
def rules_mod(self, mod_rules: List[str]) -> None:
    """Apply csv changes to the rule properties of existing rules.

    Args:
        mod_rules: Keys of the csv rows whose rules are modified.
    """
    for rule_key in mod_rules:
        # component identity comes from the csv row
        title, type_, text = (
            self._csv_mgr.get_value(rule_key, column)
            for column in (COMPONENT_TITLE, COMPONENT_TYPE, COMPONENT_DESCRIPTION)
        )
        component = self._cd_mgr.get_component(title, type_, text)
        # props
        component.props = self._modify_rule_props(component, rule_key)
set_params_add(self, add_set_params) ¤

Set parameters add.

Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_add(self, add_set_params: List[str]) -> None:
    """Add set-parameters to their control implementations.

    Args:
        add_set_params: Token sequences; indexes 0-5 are read as component
            title, component type, rule id, source, description and
            parameter id.
    """
    for tokens in add_set_params:
        component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
        source, description, param_id = tokens[3], tokens[4], tokens[5]
        control_implementation = self._cd_mgr.find_control_implementation(
            component_title, component_type, source, description
        )
        control_implementation.set_parameters = as_list(control_implementation.set_parameters)
        # the single value is the csv default for this parameter id
        rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
        default_value = self._csv_mgr.get_default_value_by_id(rule_key, param_id)
        addition = SetParameter(
            param_id=param_id,
            values=[default_value],
        )
        _OscalHelper.add_set_parameter(control_implementation.set_parameters, addition)
set_params_del(self, del_set_params) ¤

Set parameters delete.

Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_del(self, del_set_params: List[str]) -> None:
    """Remove set-parameters from their control implementations.

    Args:
        del_set_params: Token sequences; indexes 0, 1, 3, 4 and 5 are read
            as component title, component type, source, description and
            parameter id.
    """
    for tokens in del_set_params:
        component_title, component_type = tokens[0], tokens[1]
        source, description, param_id = tokens[3], tokens[4], tokens[5]
        control_implementation = self._cd_mgr.find_control_implementation(
            component_title, component_type, source, description
        )
        if not control_implementation:
            continue
        previous = control_implementation.set_parameters
        # rebuild the list without the deleted parameter
        control_implementation.set_parameters = []
        for current in self._set_parameter_generator(previous):
            if current.param_id != param_id:
                _OscalHelper.add_set_parameter(control_implementation.set_parameters, current)
        # an empty list is replaced by None
        if control_implementation.set_parameters == []:
            control_implementation.set_parameters = None
set_params_mod(self, mod_set_params) ¤

Set parameters modify.

Source code in trestle/tasks/csv_to_oscal_cd.py
def set_params_mod(self, mod_set_params: List[str]) -> None:
    """Refresh the values of existing set-parameters from the csv defaults.

    Args:
        mod_set_params: Token sequences; indexes 0-5 are read as component
            title, component type, rule id, source, description and
            parameter id.
    """
    for tokens in mod_set_params:
        component_title, component_type, rule_id = tokens[0], tokens[1], tokens[2]
        source, description, param_id = tokens[3], tokens[4], tokens[5]
        control_implementation = self._cd_mgr.find_control_implementation(
            component_title, component_type, source, description
        )
        if not control_implementation:
            continue
        for current in self._set_parameter_generator(control_implementation.set_parameters):
            if current.param_id != param_id:
                continue
            rule_key = _CsvMgr.get_rule_key(component_title, component_type, rule_id)
            default_value = self._csv_mgr.get_default_value_by_id(rule_key, param_id)
            candidate = SetParameter(
                param_id=param_id,
                values=[default_value],
            )
            # only touch (and log) parameters whose values actually differ
            if current.values != candidate.values:
                logger.debug(f'params-mod: {rule_id} {param_id} {current.values} -> {candidate.values}')
                current.values = candidate.values
simulate(self) ¤

Provide a simulated outcome.

Source code in trestle/tasks/csv_to_oscal_cd.py
def simulate(self) -> TaskOutcome:
    """Report a simulated run.

    Returns:
        A 'simulated-success' outcome; no other work is performed.
    """
    outcome = TaskOutcome('simulated-success')
    return outcome

Functions¤

derive_control_id(control_mapping) ¤

Derive control id.

Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_control_id(control_mapping: str) -> str:
    """Return the part of a control mapping that precedes the first '_smt'.

    A mapping without '_smt' is returned unchanged.
    """
    control_id, _, _ = control_mapping.partition('_smt')
    return control_id

derive_part_id(control_mapping) ¤

Derive part id.

Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_part_id(control_mapping: str) -> Optional[str]:
    """Derive part id.

    Args:
        control_mapping: Control mapping id taken from the csv.

    Returns:
        The mapping itself when it contains '_smt.' (i.e. it names a
        statement), otherwise None.
    """
    return control_mapping if '_smt.' in control_mapping else None

etype(target) ¤

Get etype.

Source code in trestle/tasks/csv_to_oscal_cd.py
def etype(target: str) -> str:
    """Classify a rejected value: 'invalid' when truthy, 'missing' when falsy."""
    return 'invalid' if target else 'missing'

row_property_builder(row, name, value, ns, class_, remarks) ¤

Row property builder.

Source code in trestle/tasks/csv_to_oscal_cd.py
def row_property_builder(row: int, name: str, value, ns: str, class_: str, remarks: str) -> Property:
    """Row property builder.

    Probes the name, value, ns and class one after another by constructing
    throw-away properties, so that a failure can be attributed to the
    offending field before the real property is built.

    Args:
        row: Row number, used only in error messages.
        name: Property name.
        value: Property value.
        ns: Property namespace.
        class_: Property class.
        remarks: Property remarks.

    Returns:
        The property built from all of the given fields.

    Raises:
        RuntimeError: When a probe fails; the message names the row and the
            field, and the original exception is chained as the cause.
    """
    # (label used in the message, value under test, kwargs of the probe)
    probes = (
        ('name', name, {'name': name, 'value': 'value'}),
        ('value', value, {'name': name, 'value': value}),
        ('ns', ns, {'name': name, 'value': value, 'ns': ns}),
        ('class', class_, {'name': name, 'value': value, 'class_': class_}),
    )
    for label, target, kwargs in probes:
        try:
            Property(**kwargs)
        except Exception as err:
            text = f'property for row: {row} {label}: {target} is {etype(target)}'
            raise RuntimeError(text) from err
    # prop
    return Property(
        name=name,
        value=value,
        ns=ns,
        class_=class_,
        remarks=remarks,
    )

handler: python