Skip to content

cis_xlsx_to_oscal_catalog

trestle.tasks.cis_xlsx_to_oscal_catalog ¤

OSCAL transformation tasks.

logger ¤

timestamp ¤

Classes¤

CatalogHelper ¤

OSCAL Catalog Helper.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
class CatalogHelper:
    """OSCAL Catalog Helper."""

    def __init__(self, title: str, version: str) -> None:
        """Initialize."""
        # metadata
        self._metadata = Metadata(title=title, last_modified=timestamp, oscal_version=OSCAL_VERSION, version=version)
        self._root_group = OrderedDict()
        self._root_resources = OrderedDict()
        self._all_groups = OrderedDict()
        self._all_controls = OrderedDict()

    def add_group(self, section: str, title: str, props: List[Property], parts: List[Part]) -> None:
        """Add group."""
        numdots = section.count('.')
        if numdots == 0:
            group = Group(title=f'{title}', id=f'CIS-{section}')
            if props:
                group.props = props
            if parts:
                group.parts = parts
            self._root_group[section] = group
            self._all_groups[section] = group
        else:
            key = '.'.join(section.split('.')[:-1])
            parent = self._all_groups[key]
            if parent.groups is None:
                parent.groups = []
            group = Group(title=f'{title}', id=f'CIS-{section}')
            if props:
                group.props = props
            if parts:
                group.parts = parts
            parent.groups.append(group)
            self._all_groups[section] = group

    def _add_prop(self, control: Control, prop: Property) -> None:
        """Add property to control."""
        control_props = control.props
        control.props = []
        last = 0
        for i, control_prop in enumerate(control_props):
            if control_prop.name == prop.name:
                last = i
        for i, control_prop in enumerate(control_props):
            control.props.append(control_prop)
            if i == last:
                control.props.append(prop)

    def add_control(
        self,
        section: str,
        recommendation: str,
        title: str,
        props: List[Property],
        parts: List[Part],
        links: List[Link]
    ) -> None:
        """Add control."""
        group = self._all_groups[section]
        if group.controls is None:
            group.controls = []
        id_ = f'CIS-{recommendation}'
        if id_ in self._all_controls:
            control = self._all_controls[id_]
            for prop in props:
                if prop.name == 'profile':
                    self._add_prop(control, prop)
        else:
            title = f'{title}'
            control = Control(id=id_, title=title)
            self._all_controls[id_] = control
            if props:
                control.props = props
            if parts:
                control.parts = parts
            if links:
                control.links = links
            group.controls.append(control)

    def add_link(
        self,
        recommendation: str,
        reference: str,
        links: List[Link],
    ) -> None:
        """Add link."""
        id_ = f'CIS-{recommendation}'
        if id_ not in self._root_resources:
            res_id = str(uuid.uuid4())
            link = Link(href=f'#{res_id}', rel='reference')
            links.append(link)
            resource = Resource(
                uuid=res_id,
                description=reference,
            )
            self._root_resources[id_] = resource

    def get_catalog(self) -> Catalog:
        """Get catalog."""
        back_matter = BackMatter(resources=list(self._root_resources.values()))
        catalog = Catalog(
            uuid=str(uuid.uuid4()),
            metadata=self._metadata,
            groups=list(self._root_group.values()),
            back_matter=back_matter
        )
        return catalog
Methods¤
__init__(self, title, version) special ¤

Initialize.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def __init__(self, title: str, version: str) -> None:
    """Initialize."""
    # metadata
    self._metadata = Metadata(title=title, last_modified=timestamp, oscal_version=OSCAL_VERSION, version=version)
    self._root_group = OrderedDict()
    self._root_resources = OrderedDict()
    self._all_groups = OrderedDict()
    self._all_controls = OrderedDict()
add_control(self, section, recommendation, title, props, parts, links) ¤

Add control.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def add_control(
    self,
    section: str,
    recommendation: str,
    title: str,
    props: List[Property],
    parts: List[Part],
    links: List[Link]
) -> None:
    """Add control."""
    group = self._all_groups[section]
    if group.controls is None:
        group.controls = []
    id_ = f'CIS-{recommendation}'
    if id_ in self._all_controls:
        control = self._all_controls[id_]
        for prop in props:
            if prop.name == 'profile':
                self._add_prop(control, prop)
    else:
        title = f'{title}'
        control = Control(id=id_, title=title)
        self._all_controls[id_] = control
        if props:
            control.props = props
        if parts:
            control.parts = parts
        if links:
            control.links = links
        group.controls.append(control)
add_group(self, section, title, props, parts) ¤

Add group.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def add_group(self, section: str, title: str, props: List[Property], parts: List[Part]) -> None:
    """Add group."""
    numdots = section.count('.')
    if numdots == 0:
        group = Group(title=f'{title}', id=f'CIS-{section}')
        if props:
            group.props = props
        if parts:
            group.parts = parts
        self._root_group[section] = group
        self._all_groups[section] = group
    else:
        key = '.'.join(section.split('.')[:-1])
        parent = self._all_groups[key]
        if parent.groups is None:
            parent.groups = []
        group = Group(title=f'{title}', id=f'CIS-{section}')
        if props:
            group.props = props
        if parts:
            group.parts = parts
        parent.groups.append(group)
        self._all_groups[section] = group

Add link.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def add_link(
    self,
    recommendation: str,
    reference: str,
    links: List[Link],
) -> None:
    """Add link."""
    id_ = f'CIS-{recommendation}'
    if id_ not in self._root_resources:
        res_id = str(uuid.uuid4())
        link = Link(href=f'#{res_id}', rel='reference')
        links.append(link)
        resource = Resource(
            uuid=res_id,
            description=reference,
        )
        self._root_resources[id_] = resource
get_catalog(self) ¤

Get catalog.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def get_catalog(self) -> Catalog:
    """Get catalog."""
    back_matter = BackMatter(resources=list(self._root_resources.values()))
    catalog = Catalog(
        uuid=str(uuid.uuid4()),
        metadata=self._metadata,
        groups=list(self._root_group.values()),
        back_matter=back_matter
    )
    return catalog

CisXlsxToOscalCatalog (TaskBase) ¤

Task to transform CIS .xlsx to OSCAL catalog.

Attributes:

Name Type Description
name str

Name of the task.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
class CisXlsxToOscalCatalog(TaskBase):
    """
    Task to transform CIS .xlsx to OSCAL catalog.

    Attributes:
        name: Name of the task.
    """

    name = 'cis-xlsx-to-oscal-catalog'
    ns = 'https://oscal-compass.github.io/compliance-trestle/schemas/oscal/catalog/cis'

    def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
        """
        Initialize trestle task.

        Args:
            config_object: Config section associated with the task.
        """
        super().__init__(config_object)

    def print_info(self) -> None:
        """Print the help string."""
        logger.info(f'Help information for {self.name} task.')
        logger.info('')
        logger.info('Purpose: Create catalog from standard (e.g. CIS benchmark).')
        logger.info('')
        logger.info('Configuration flags sit under [task.cis-xlsx-to-oscal-catalog]:')
        text1 = '  input-file             = '
        text2 = '(required) path to read the compliance-as-code .xlsx spread sheet file.'
        logger.info(text1 + text2)
        text1 = '  output-dir             = '
        text2 = '(required) location to write the generated catalog.json file.'
        logger.info(text1 + text2)
        text1 = '  title                  = '
        text2 = '(required) title of the CIS catalog.'
        logger.info(text1 + text2)
        text1 = '  version                = '
        text2 = '(required) version of the CIS catalog.'
        logger.info(text1 + text2)
        text1 = '  output-overwrite       = '
        text2 = '(optional) true [default] or false; replace existing output when true.'
        logger.info(text1 + text2)

    def simulate(self) -> TaskOutcome:
        """Provide a simulated outcome."""
        return TaskOutcome('simulated-success')

    def execute(self) -> TaskOutcome:
        """Provide an actual outcome."""
        try:
            return self._execute()
        except Exception:
            logger.info(traceback.format_exc())
            return TaskOutcome('failure')

    def _get_normalized_name(self, key: str) -> str:
        """Get normalized name."""
        name = key
        name = name.replace(' ', '_')
        name = name.replace('&', 'a')
        name = name.replace('(', '')
        name = name.replace(')', '')
        return name

    def _create_property(self, name: str, value: str) -> Property:
        return Property(name=name, value=value, ns=CisXlsxToOscalCatalog.ns)

    def _add_property(self, xlsx_helper: XlsxHelper, props: List[Property], row: int, key: str) -> None:
        """Add property."""
        name = self._get_normalized_name(key)
        value = xlsx_helper.get(row, key)
        if value:
            props.append(self._create_property(name, value))

    def _add_property_boolean(self, xlsx_helper: XlsxHelper, props: List[Property], row: int, key: str) -> None:
        """Add property."""
        name = self._get_normalized_name(key)
        value = xlsx_helper.get(row, key)
        if value:
            props.append(self._create_property(name, 'True'))
        else:
            props.append(self._create_property(name, 'False'))

    def _add_part(self, xlsx_helper: XlsxHelper, parts: List[Part], id_: str, row: int, key: str) -> None:
        """Add part."""
        value = xlsx_helper.get(row, key)
        if value:
            name = self._get_normalized_name(key)
            parts.append(Part(id=id_, name=name, prose=value))

    def _add_links(
        self, xlsx_helper: XlsxHelper, catalog_helper: CatalogHelper, links: List[Link], row: int, key: str
    ) -> None:
        """Add links."""
        value = xlsx_helper.get(row, key)
        if value:
            recommendation = xlsx_helper.get(row, 'recommendation #')
            catalog_helper.add_link(recommendation, value, links)

    def _execute(self) -> TaskOutcome:
        """Wrap the execute for exception handling."""
        if not self._config:
            logger.warning('config missing')
            return TaskOutcome('failure')
        try:
            ifile = self._config['input-file']
            odir = self._config['output-dir']
            title = self._config['title']
            version = self._config['version']
        except KeyError as e:
            logger.info(f'key {e.args[0]} missing')
            return TaskOutcome('failure')
        # verbosity
        _quiet = self._config.get('quiet', False)
        _verbose = not _quiet
        # output
        overwrite = self._config.getboolean('output-overwrite', True)
        opth = pathlib.Path(odir)
        # insure output dir exists
        opth.mkdir(exist_ok=True, parents=True)
        # calculate output file name & check writability
        oname = 'catalog.json'
        ofile = opth / oname
        if not overwrite and pathlib.Path(ofile).exists():
            logger.warning(f'output: {ofile} already exists')
            return TaskOutcome('failure')
        xlsx_helper = XlsxHelper(ifile)
        catalog_helper = CatalogHelper(title, version)
        if xlsx_helper.is_ocp():
            self._process_ocp(xlsx_helper, catalog_helper)
        else:
            self._process_rhel(xlsx_helper, catalog_helper)
        catalog = catalog_helper.get_catalog()
        # write OSCAL ComponentDefinition to file
        if _verbose:
            logger.info(f'output: {ofile}')
        catalog.oscal_write(pathlib.Path(ofile))
        return TaskOutcome('success')

    def _process_ocp(self, xlsx_helper: XlsxHelper, catalog_helper: CatalogHelper) -> None:
        """Process OCP."""
        # transform each row into OSCAL equivalent
        for row in xlsx_helper.row_generator():
            section = xlsx_helper.get(row, 'section #')
            recommendation = xlsx_helper.get(row, 'recommendation #')
            title = xlsx_helper.get(row, 'title')
            # init
            props = []
            parts = []
            links = []
            # props
            self._add_property(xlsx_helper, props, row, 'profile')
            self._add_property(xlsx_helper, props, row, 'status')
            self._add_property(xlsx_helper, props, row, 'assessment status')
            if recommendation is None:
                frag = section
            else:
                frag = recommendation
            # parts
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_smt', row, 'statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_rat', row, 'rationale statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_imp', row, 'impact statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_rem', row, 'remediation procedure')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_aud', row, 'audit procedure')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_inf', row, 'additional information')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_ctl', row, 'CIS Controls')
            # group or control
            if recommendation is None:
                catalog_helper.add_group(section, title, props, parts)
            else:
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG1')
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG2')
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG3')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG1')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG2')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG3')
                self._add_property(xlsx_helper, props, row, 'MITRE ATT&CK Mappings')
                self._add_links(xlsx_helper, catalog_helper, links, row, 'references')
                catalog_helper.add_control(section, recommendation, title, props, parts, links)

    def _process_rhel(self, xlsx_helper: XlsxHelper, catalog_helper: CatalogHelper) -> None:
        """Process RHEL."""
        # transform each row into OSCAL equivalent
        for row in xlsx_helper.row_generator():
            section = xlsx_helper.get(row, 'section #')
            recommendation = xlsx_helper.get(row, 'recommendation #')
            title = xlsx_helper.get(row, 'title')
            # init
            props = []
            parts = []
            links = []
            # props
            self._add_property(xlsx_helper, props, row, 'profile')
            self._add_property(xlsx_helper, props, row, 'assessment status')
            if recommendation is None:
                frag = section
            else:
                frag = recommendation
            # parts
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_smt', row, 'statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_rat', row, 'rational statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_imp', row, 'impact statement')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_rem', row, 'remediation procedure')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_aud', row, 'audit procedure')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_inf', row, 'additional information')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_ctl', row, 'CIS Controls')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_1v8', row, 'CIS Safeguards 1 (v8)')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_2v8', row, 'CIS Safeguards 2 (v8)')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_3v8', row, 'CIS Safeguards 3 (v8)')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_1v7', row, 'CIS Safeguards 1 (v7)')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_2v7', row, 'CIS Safeguards 2 (v7)')
            self._add_part(xlsx_helper, parts, f'CIS-{frag}_3v7', row, 'CIS Safeguards 3 (v7)')
            # group or control
            if recommendation is None:
                catalog_helper.add_group(section, title, props, parts)
            else:
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG1')
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG2')
                self._add_property_boolean(xlsx_helper, props, row, 'v7 IG3')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG1')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG2')
                self._add_property_boolean(xlsx_helper, props, row, 'v8 IG3')
                self._add_links(xlsx_helper, catalog_helper, links, row, 'references')
                catalog_helper.add_control(section, recommendation, title, props, parts, links)
name: str ¤
ns ¤
Methods¤
__init__(self, config_object) special ¤

Initialize trestle task.

Parameters:

Name Type Description Default
config_object Optional[configparser.SectionProxy]

Config section associated with the task.

required
Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Initialize trestle task.

    Args:
        config_object: Config section associated with the task.
    """
    super().__init__(config_object)
execute(self) ¤

Provide an actual outcome.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def execute(self) -> TaskOutcome:
    """Provide an actual outcome."""
    try:
        return self._execute()
    except Exception:
        logger.info(traceback.format_exc())
        return TaskOutcome('failure')
print_info(self) ¤

Print the help string.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def print_info(self) -> None:
    """Print the help string."""
    logger.info(f'Help information for {self.name} task.')
    logger.info('')
    logger.info('Purpose: Create catalog from standard (e.g. CIS benchmark).')
    logger.info('')
    logger.info('Configuration flags sit under [task.cis-xlsx-to-oscal-catalog]:')
    text1 = '  input-file             = '
    text2 = '(required) path to read the compliance-as-code .xlsx spread sheet file.'
    logger.info(text1 + text2)
    text1 = '  output-dir             = '
    text2 = '(required) location to write the generated catalog.json file.'
    logger.info(text1 + text2)
    text1 = '  title                  = '
    text2 = '(required) title of the CIS catalog.'
    logger.info(text1 + text2)
    text1 = '  version                = '
    text2 = '(required) version of the CIS catalog.'
    logger.info(text1 + text2)
    text1 = '  output-overwrite       = '
    text2 = '(optional) true [default] or false; replace existing output when true.'
    logger.info(text1 + text2)
simulate(self) ¤

Provide a simulated outcome.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def simulate(self) -> TaskOutcome:
    """Provide a simulated outcome."""
    return TaskOutcome('simulated-success')

XlsxHelper ¤

Xlsx Helper common functions and assistance navigating spread sheet.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
class XlsxHelper:
    """Xlsx Helper common functions and assistance navigating spread sheet."""

    def __init__(self, file: str) -> None:
        """Initialize."""
        self._spread_sheet = file
        self._wb = load_workbook(self._spread_sheet)
        sheet_candidates = ['Combined Profiles', 'Combined']
        self._sheet_name = None
        for sheet_candidate in sheet_candidates:
            if sheet_candidate in self._wb.sheetnames:
                self._sheet_name = sheet_candidate
                break
        if not self._sheet_name:
            raise RuntimeError(f'{file} missing one of {sheet_candidates} sheet')
        self._work_sheet = self._wb[self._sheet_name]
        self._mapper()
        self._key_to_col_map = {'statement': 'description'}

    def is_ocp(self) -> bool:
        """Check if sheet is for OCP."""
        return self._sheet_name == 'Combined Profiles'

    def _normalize(self, name: str) -> str:
        """Normalize."""
        return name.lower()

    def _translate(self, name: str) -> str:
        """Translate name key to column name."""
        return self._key_to_col_map.get(name, name)

    def _mapper(self) -> None:
        """Map columns heading names to column numbers."""
        self._col_name_to_number = {}
        cols = self._work_sheet.max_column
        row = 1
        for col in range(row, cols):
            cell = self._work_sheet.cell(row, col)
            if cell.value:
                name = self._normalize(cell.value)
                self._col_name_to_number[name] = col

    def row_generator(self) -> Iterator[int]:
        """Generate rows until max reached."""
        row = 2
        while row <= self._work_sheet.max_row:
            yield row
            row += 1

    def get(self, row: int, name: str) -> str:
        """Get cell value for given row and column name."""
        nname = self._normalize(name)
        cname = self._translate(nname)
        col = self._col_name_to_number[cname]
        cell = self._work_sheet.cell(row, col)
        return cell.value
Methods¤
__init__(self, file) special ¤

Initialize.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def __init__(self, file: str) -> None:
    """Initialize."""
    self._spread_sheet = file
    self._wb = load_workbook(self._spread_sheet)
    sheet_candidates = ['Combined Profiles', 'Combined']
    self._sheet_name = None
    for sheet_candidate in sheet_candidates:
        if sheet_candidate in self._wb.sheetnames:
            self._sheet_name = sheet_candidate
            break
    if not self._sheet_name:
        raise RuntimeError(f'{file} missing one of {sheet_candidates} sheet')
    self._work_sheet = self._wb[self._sheet_name]
    self._mapper()
    self._key_to_col_map = {'statement': 'description'}
get(self, row, name) ¤

Get cell value for given row and column name.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def get(self, row: int, name: str) -> str:
    """Get cell value for given row and column name."""
    nname = self._normalize(name)
    cname = self._translate(nname)
    col = self._col_name_to_number[cname]
    cell = self._work_sheet.cell(row, col)
    return cell.value
is_ocp(self) ¤

Check if sheet is for OCP.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def is_ocp(self) -> bool:
    """Check if sheet is for OCP."""
    return self._sheet_name == 'Combined Profiles'
row_generator(self) ¤

Generate rows until max reached.

Source code in trestle/tasks/cis_xlsx_to_oscal_catalog.py
def row_generator(self) -> Iterator[int]:
    """Generate rows until max reached."""
    row = 2
    while row <= self._work_sheet.max_row:
        yield row
        row += 1

handler: python