ocp4_cis_profile_to_oscal_catalog
trestle.tasks.ocp4_cis_profile_to_oscal_catalog
¤
OSCAL transformation tasks.
logger
¤
Classes¤
Node (BaseModel)
pydantic-model
¤
Representation of CIS profile entry.
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
class Node(BaseModel):
    """One numbered entry parsed from a CIS profile file."""

    # dotted section number made of digits and decimal points
    name: Optional[str] = Field(default=None)
    # text that follows the section number on the profile line
    description: Optional[str] = Field(default=None)
__class_vars__
special
¤
__custom_root_type__
special
¤
__doc__
special
¤
__exclude_fields__
special
¤
__fields__
special
¤
__include_fields__
special
¤
__post_root_validators__
special
¤
__pre_root_validators__
special
¤
__private_attributes__
special
¤
__schema_cache__
special
¤
__signature__
special
¤
__slots__: Tuple[str, ...]
special
¤
__validators__
special
¤
description: str
pydantic-field
¤
name: str
pydantic-field
¤
Ocp4CisProfileToOscalCatalog (TaskBase)
¤
Task to transform OCP4 CIS profile to OSCAL catalog.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the task. |
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
class Ocp4CisProfileToOscalCatalog(TaskBase):
    """
    Task to transform OCP4 CIS profile to OSCAL catalog.

    The task reads the ``.profile`` files found in the configured ``input-dir``
    and writes a single ``catalog.json`` file into ``output-dir``.

    Attributes:
        name: Name of the task.
    """

    # task identifier; it is also echoed in the help text logged by print_info
    name = 'ocp4-cis-profile-to-oscal-catalog'
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Initialize trestle task ocp4-cis-profile-to-oscal-catalog.

    Args:
        config_object: Config section associated with the task.
    """
    super().__init__(config_object)
    # Timezone-aware UTC time, truncated to whole seconds and stored as ISO-8601 text.
    # datetime.utcnow() is deprecated since Python 3.12, so UTC is requested directly.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    self._timestamp = now_utc.replace(microsecond=0).isoformat()
def print_info(self) -> None:
    """Log the help text: the task purpose followed by its configuration flags."""
    # NOTE(review): _execute also reads 'quiet' and 'metadata-links', which are not listed here.
    preamble = (
        f'Help information for {self.name} task.',
        '',
        'Purpose: Create catalog from standard (e.g. CIS benchmark).',
        '',
        'Configuration flags sit under [task.ocp4-cis-profile-to-oscal-catalog]:',
    )
    for message in preamble:
        logger.info(message)
    # (flag label, explanation) pairs, logged one per line in this order
    flags = (
        (' input-dir = ', '(required) location to read the compliance-as-code profile files.'),
        (' output-dir = ', '(required) location to write the generated catalog.json file.'),
        (' output-overwrite = ', '(optional) true [default] or false; replace existing output when true.'),
    )
    for label, explanation in flags:
        logger.info(label + explanation)
def simulate(self) -> TaskOutcome:
    """Return the simulated-success outcome; no input is read and no output is written."""
    outcome = TaskOutcome('simulated-success')
    return outcome
def execute(self) -> TaskOutcome:
    """
    Run the task and report its outcome.

    Any exception raised by the real work is logged as a traceback and
    converted into a failure outcome instead of propagating to the caller.
    """
    try:
        outcome = self._execute()
    except Exception:
        logger.info(traceback.format_exc())
        outcome = TaskOutcome('failure')
    return outcome
def _execute(self) -> TaskOutcome:
    """
    Build the catalog from the profile files and write it to disk.

    Reads every ``.profile`` file in ``input-dir``, parses the numbered
    entries into nodes, arranges them into groups and controls, and writes
    the result as ``catalog.json`` in ``output-dir``.

    Returns:
        TaskOutcome('success') once the catalog has been written;
        TaskOutcome('failure') when the config is missing, a required key is
        absent, the output exists and overwriting is disabled, no
        ``.profile`` file is found, or no title line was parsed.
    """
    if not self._config:
        logger.warning('config missing')
        return TaskOutcome('failure')
    try:
        idir = self._config['input-dir']
        odir = self._config['output-dir']
    except KeyError as e:
        logger.info(f'key {e.args[0]} missing')
        return TaskOutcome('failure')
    # verbosity: parse as a boolean, otherwise the text 'false' is truthy and silences output
    quiet = self._config.getboolean('quiet', False)
    verbose = not quiet
    # output
    overwrite = self._config.getboolean('output-overwrite', True)
    opth = pathlib.Path(odir)
    # ensure output dir exists
    opth.mkdir(exist_ok=True, parents=True)
    # calculate output file name & check writability
    ofile = opth / 'catalog.json'
    if not overwrite and ofile.exists():
        logger.warning(f'output: {ofile} already exists')
        return TaskOutcome('failure')
    # metadata links (optional)
    metadata_links = self._config.get('metadata-links')
    # get list of <name>.profile files
    filelist = self._get_filelist(idir)
    if not filelist:
        logger.warning(f'input: {idir} no .profile file found')
        return TaskOutcome('failure')
    # start from a clean slate so nothing from an earlier run leaks in
    self._node_map = {}
    self._title = None
    # process files
    for fp in filelist:
        lines = self._get_content(fp)
        self._parse(lines)
    # the catalog metadata needs a title; report its absence clearly
    # instead of failing later on a missing attribute
    if self._title is None:
        logger.warning(f'input: {idir} no title found')
        return TaskOutcome('failure')
    # get root nodes
    root_nodes = self._get_root_nodes()
    # groups and controls
    root = Group(title='root', groups=[])
    for node in root_nodes:
        group = Group(title=f'{node.name} {node.description}')
        root.groups.append(group)
        depth = self._depth(node.name)
        if depth == 3:
            # three levels: sub-groups hold the controls
            self._add_groups(group, node.name, depth)
        elif depth == 2:
            # two levels: controls hang directly off the root group
            self._add_controls(group, node.name, depth)
    # metadata
    metadata = Metadata(
        title=self._title, last_modified=self._timestamp, oscal_version=OSCAL_VERSION, version=trestle.__version__
    )
    # metadata links: whitespace-separated hrefs from the config
    if metadata_links is not None:
        metadata.links = [Link(href=item) for item in metadata_links.split()]
    # catalog
    catalog = Catalog(uuid=_uuid(), metadata=metadata, groups=root.groups)
    # write OSCAL Catalog to file
    if verbose:
        logger.info(f'output: {ofile}')
    catalog.oscal_write(ofile)
    return TaskOutcome('success')
def _get_filelist(self, idir: str) -> List[pathlib.Path]:
"""Get filelist."""
return [x for x in pathlib.Path(idir).iterdir() if x.is_file() and x.suffix == '.profile']
def _get_content(self, fp: pathlib.Path) -> List[str]:
    """
    Read a file and return its lines.

    Args:
        fp: Path of the file to read.

    Returns:
        The lines of the file, as produced by ``readlines()``.

    Raises:
        Exception: Whatever opening or reading raised, re-raised after a warning is logged.
    """
    try:
        # the context manager closes the handle even when reading fails part-way
        with fp.open('r', encoding=const.FILE_ENCODING) as f:
            return f.readlines()
    except Exception:
        logger.warning(f'unable to process {fp.name}')
        # bare raise keeps the original traceback intact
        raise
def _parse(self, lines: List[str]) -> None:
    """
    Parse lines to build data structure.

    A line starting with ``title: `` that contains a single quote sets the
    catalog title to the text between the first pair of quotes. Any other
    line is split into three whitespace-separated parts; when the second
    part is a dotted number it is stored in the node map together with the
    remainder of the line as its description. All other lines are ignored.

    Args:
        lines: Raw lines of one profile file.

    Raises:
        RuntimeError: From _get_key when a number has more than three components.
    """
    digits = set('0123456789')
    for raw in lines:
        line = raw.strip()
        if line.startswith('title: ') and "'" in line:
            self._title = line.split("'")[1]
            continue
        # split into marker, number, description
        line_parts = line.split(None, 2)
        if len(line_parts) < 3:
            continue
        # normalized name and description
        name = line_parts[1].rstrip('.')
        description = line_parts[2]
        # every dotted component must be a non-empty run of digits; this also
        # rejects tokens such as '...' or '1..2' that cannot be converted to ints
        components = name.split('.')
        if not all(part and set(part) <= digits for part in components):
            continue
        # derive desired sortable key from name
        key = self._get_key(name)
        self._node_map[key] = Node(name=name, description=description)
def _get_key(self, name: str) -> Tuple[int, int, int]:
"""Convert name to desired sortable key."""
parts = name.split('.')
if len(parts) == 1:
key = (int(parts[0]), 0, 0)
elif len(parts) == 2:
key = (int(parts[0]), int(parts[1]), 0)
elif len(parts) == 3:
key = (int(parts[0]), int(parts[1]), int(parts[2]))
else:
text = f'Unexpected value: {name}'
raise RuntimeError(text)
return key
def _get_root_nodes(self) -> ValuesView[Node]:
    """
    Get root nodes.

    A root node is one whose name has no decimal point. Testing for the
    absence of a dot, rather than for a one-character name, keeps top-level
    sections numbered 10 and above.

    Returns:
        The root nodes, one per distinct name, in node-map iteration order.
    """
    root_nodes = {node.name: node for node in self._node_map.values() if '.' not in node.name}
    return root_nodes.values()
def _depth(self, prefix: str) -> int:
"""Get maximum depth for prefix."""
depth = 0
for node in self._node_map.values():
name = node.name
if not name.startswith(prefix):
continue
dots = name.split('.')
if len(dots) <= depth:
continue
depth = len(dots)
return depth
def _add_controls(self, group: Group, prefix: str, depth: int) -> None:
    """
    Add controls to group.

    Every node at exactly ``depth`` dotted components that is ``prefix``
    itself or sits below it becomes a control with id ``CIS-<name>``.
    Controls are added in sorted key order; the group is left untouched when
    nothing matches.

    Args:
        group: Group that receives the controls.
        prefix: Section number whose members are collected.
        depth: Number of dotted components a name must have to be a control.
    """
    # match on a section boundary so prefix '1.1' does not pick up '1.10.x'
    child_prefix = f'{prefix}.'
    controls = []
    for key in sorted(self._node_map):
        node = self._node_map[key]
        name = node.name
        if name != prefix and not name.startswith(child_prefix):
            continue
        if len(name.split('.')) != depth:
            continue
        controls.append(Control(id=f'CIS-{name}', title=f'{name} {node.description}'))
    if controls:
        group.controls = controls
def _add_groups(self, group: Group, prefix: str, depth: int) -> None:
    """
    Add sub-groups to group.

    Every node below ``prefix`` with ``depth - 1`` dotted components becomes
    a sub-group, and each sub-group is filled with its own controls at
    ``depth``. Sub-groups are added in sorted key order; the group is left
    untouched when nothing matches.

    Args:
        group: Parent group that receives the sub-groups.
        prefix: Section number of the parent group.
        depth: Depth at which controls are found.
    """
    # requiring the trailing dot excludes the prefix node itself and keeps
    # prefix '1' from matching unrelated sections such as '10.1'
    child_prefix = f'{prefix}.'
    groups = []
    for key in sorted(self._node_map):
        node = self._node_map[key]
        name = node.name
        if not name.startswith(child_prefix):
            continue
        if len(name.split('.')) != depth - 1:
            continue
        sub_group = Group(title=f'{name} {node.description}')
        groups.append(sub_group)
        self._add_controls(sub_group, name, depth)
    if groups:
        group.groups = groups
name: str
¤
Methods¤
__init__(self, config_object)
special
¤
Initialize trestle task ocp4-cis-profile-to-oscal-catalog.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_object |
Optional[configparser.SectionProxy] |
Config section associated with the task. |
required |
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Initialize trestle task ocp4-cis-profile-to-oscal-catalog.

    Args:
        config_object: Config section associated with the task.
    """
    super().__init__(config_object)
    # Timezone-aware UTC time, truncated to whole seconds and stored as ISO-8601 text.
    # datetime.utcnow() is deprecated since Python 3.12, so UTC is requested directly.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    self._timestamp = now_utc.replace(microsecond=0).isoformat()
execute(self)
¤
Provide an actual outcome.
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
def execute(self) -> TaskOutcome:
    """
    Run the task and report its outcome.

    Any exception raised by the real work is logged as a traceback and
    converted into a failure outcome instead of propagating to the caller.
    """
    try:
        outcome = self._execute()
    except Exception:
        logger.info(traceback.format_exc())
        outcome = TaskOutcome('failure')
    return outcome
print_info(self)
¤
Print the help string.
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
def print_info(self) -> None:
    """Log the help text: the task purpose followed by its configuration flags."""
    preamble = (
        f'Help information for {self.name} task.',
        '',
        'Purpose: Create catalog from standard (e.g. CIS benchmark).',
        '',
        'Configuration flags sit under [task.ocp4-cis-profile-to-oscal-catalog]:',
    )
    for message in preamble:
        logger.info(message)
    # (flag label, explanation) pairs, logged one per line in this order
    flags = (
        (' input-dir = ', '(required) location to read the compliance-as-code profile files.'),
        (' output-dir = ', '(required) location to write the generated catalog.json file.'),
        (' output-overwrite = ', '(optional) true [default] or false; replace existing output when true.'),
    )
    for label, explanation in flags:
        logger.info(label + explanation)
simulate(self)
¤
Provide a simulated outcome.
Source code in trestle/tasks/ocp4_cis_profile_to_oscal_catalog.py
def simulate(self) -> TaskOutcome:
    """Return the simulated-success outcome; no input is read and no output is written."""
    outcome = TaskOutcome('simulated-success')
    return outcome
handler: python