oscal_catalog_to_csv
trestle.tasks.oscal_catalog_to_csv
¤
OSCAL transformation tasks.
level_control
¤
level_default
¤
level_list
¤
level_statement
¤
logger
¤
recurse
¤
timestamp
¤
Classes¤
CatalogHelper
¤
OSCAL Catalog Helper.
Source code in trestle/tasks/oscal_catalog_to_csv.py
class CatalogHelper:
"""OSCAL Catalog Helper."""
def __init__(self, path) -> None:
"""Initialize."""
self.path = path
self.catalog = Catalog.oscal_read(path)
self.catalog_interface = CatalogInterface(self.catalog)
self._init_control_parent_map()
def _init_control_parent_map(self, recurse=True) -> None:
"""Initialize map: Child Control.id to parent Control."""
self._control_parent_map = {}
for control in self.catalog_interface.get_all_controls_from_catalog(recurse):
parents = self.catalog_interface.get_dependent_control_ids(control.id)
for parent in parents:
# assert child has only one parent
if parent in self._control_parent_map.keys():
raise RuntimeError('{parent} duplicate?')
self._control_parent_map[parent] = control
def get_parent_control(self, ctl_id: str) -> Control:
"""Return parent Control of child Control.id, if any."""
return self._control_parent_map.get(ctl_id)
def get_family_controls(self, ctl_id: str) -> List[Control]:
"""Return family of controls for Control.id, if any."""
rval = []
search_id = ctl_id.split('.')[0]
for control in self.catalog_interface.get_all_controls_from_catalog(recurse):
if control.id.startswith(search_id):
rval.append(control)
return rval
def get_controls(self, recurse=True) -> Iterator:
"""Return controls iterator."""
for control in self.catalog_interface.get_all_controls_from_catalog(recurse):
yield control
def get_statement_text_for_control(self, control: Control) -> Optional[str]:
"""Get statement text for control."""
statement_text = self._withdrawn(control)
return statement_text
def get_statement_text_for_part(self, control: Control, part: Part) -> Optional[str]:
"""Get statement text for part."""
statement_text = self._derive_text(control, part)
if part.parts:
for subpart in part.parts:
if '_smt' in subpart.id:
partial_text = self._derive_text(control, subpart)
statement_text = join_str(statement_text, partial_text)
return statement_text
def _withdrawn(self, control: Control) -> Optional[str]:
"""Check if withdrawn."""
rval = None
for prop in control.props:
if prop.name.lower() == 'status' and prop.value.lower() == 'withdrawn':
status = self._get_status(control)
rval = join_str('Withdrawn', status, '')
rval = f'[{rval}]'
break
return rval
def _link_generator(self, control: Control) -> Iterator[Link]:
"""Link generator."""
if control.links:
for link in control.links:
yield link
def _get_status(self, control: Control) -> Optional[str]:
"""Get status."""
rval = None
ilist = None
for link in self._link_generator(control):
if link.rel.lower() == 'moved-to':
moved = self._href_to_control(link.href)
rval = f': Moved to {moved}.'
break
if link.rel.lower() == 'incorporated-into':
incorporated = self._href_to_control(link.href)
if ilist is None:
ilist = f'{incorporated}'
else:
ilist = f'{ilist}, {incorporated}'
if ilist:
rval = f': Incorporated into {ilist}.'
return rval
def _href_to_control(self, href: str) -> str:
"""Convert href to control."""
rval = href.replace('#', '').upper()
return rval
def _derive_text(self, control: Control, part: Part) -> Optional[str]:
"""Derive control text."""
rval = None
if part.prose:
id_ = self._derive_id(part.id)
text = self._resolve_parms(control, part.prose)
rval = join_str(id_, text)
return rval
def _derive_id(self, id_: str) -> str:
"""Derive control text sub-part id."""
rval = None
id_parts = id_.split('_smt')
if id_parts[1]:
id_sub_parts = id_parts[1].split('.')
if len(id_sub_parts) == 2:
rval = f'{id_sub_parts[1]}.'
elif len(id_sub_parts) == 3:
rval = f'{id_sub_parts[2]}.'
elif len(id_sub_parts) == 4:
rval = f'({id_sub_parts[3]})'
return rval
def _resolve_parms(self, control: Control, utext: str) -> str:
"""Resolve parm."""
rtext = self._resolve_parms_for_control(control, utext)
if '{{' in rtext:
parent_control = self.get_parent_control(control.id)
if parent_control:
rtext = self._resolve_parms_for_control(parent_control, rtext)
if '{{' in rtext:
family_controls = self.get_family_controls(control.id)
for family_control in family_controls:
rtext = self._resolve_parms_for_control(family_control, rtext)
if '{{' in rtext:
text = f'control.id: {control.id} unresolved: {rtext}'
raise RuntimeError(text)
return rtext
def _resolve_parms_for_control(self, control: Control, utext: str) -> str:
"""Resolve parms for control."""
rtext = utext
staches: List[str] = re.findall(r'{{.*?}}', utext)
if staches:
for stach in staches:
parm_id = stach
parm_id = parm_id.replace('{{', '')
parm_id = parm_id.replace('}}', '')
parm_id = parm_id.split(',')[1].strip()
value = self._get_parm_value(control, parm_id)
if value:
rtext = rtext.replace(stach, value)
return rtext
def _get_parm_value(self, control: Control, parm_id: str) -> str:
"""Get parm value."""
rval = None
if control.params:
for param in control.params:
if param.id != parm_id:
continue
if param.label:
rval = f'[Assignment: {param.label}]'
elif param.select:
choices = self._get_parm_choices(control, param)
if param.select.how_many == HowMany.one:
rval = f'[Selection (one): {choices}]'
else:
rval = f'[Selection (one or more): {choices}]'
break
return rval
def _get_parm_choices(self, control: Control, param: Parameter) -> str:
"""Get parm choices."""
choices = ''
for choice in param.select.choice:
rchoice = self._resolve_parms(control, choice)
if choices:
choices += f'; {rchoice}'
else:
choices += f'{rchoice}'
return choices
Methods¤
__init__(self, path)
special
¤
Initialize.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def __init__(self, path) -> None:
"""Initialize."""
self.path = path
self.catalog = Catalog.oscal_read(path)
self.catalog_interface = CatalogInterface(self.catalog)
self._init_control_parent_map()
get_controls(self, recurse=True)
¤
Return controls iterator.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_controls(self, recurse=True) -> Iterator:
"""Return controls iterator."""
for control in self.catalog_interface.get_all_controls_from_catalog(recurse):
yield control
get_family_controls(self, ctl_id)
¤
Return family of controls for Control.id, if any.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_family_controls(self, ctl_id: str) -> List[Control]:
"""Return family of controls for Control.id, if any."""
rval = []
search_id = ctl_id.split('.')[0]
for control in self.catalog_interface.get_all_controls_from_catalog(recurse):
if control.id.startswith(search_id):
rval.append(control)
return rval
get_parent_control(self, ctl_id)
¤
Return parent Control of child Control.id, if any.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_parent_control(self, ctl_id: str) -> Control:
"""Return parent Control of child Control.id, if any."""
return self._control_parent_map.get(ctl_id)
get_statement_text_for_control(self, control)
¤
Get statement text for control.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_statement_text_for_control(self, control: Control) -> Optional[str]:
"""Get statement text for control."""
statement_text = self._withdrawn(control)
return statement_text
get_statement_text_for_part(self, control, part)
¤
Get statement text for part.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_statement_text_for_part(self, control: Control, part: Part) -> Optional[str]:
"""Get statement text for part."""
statement_text = self._derive_text(control, part)
if part.parts:
for subpart in part.parts:
if '_smt' in subpart.id:
partial_text = self._derive_text(control, subpart)
statement_text = join_str(statement_text, partial_text)
return statement_text
ContentManager
¤
Content manager.
Source code in trestle/tasks/oscal_catalog_to_csv.py
class ContentManager():
"""Content manager."""
def __init__(self, catalog_helper: CatalogHelper) -> None:
"""Initialize."""
self.catalog_helper = catalog_helper
self.rows = []
self.row_template = None
def add(self, row: List):
"""Add row."""
n_row = copy.copy(row)
t_row = self.row_template
if t_row:
for index in range(3):
if n_row[index] == t_row[index]:
n_row[index] = None
self.rows.append(n_row)
self.row_template = row
def get_content(self, level: str) -> List:
"""Get content."""
if level == level_control:
rval = self._get_content_by_control()
else:
rval = self._get_content_by_statement()
return rval
def _get_content_by_statement(self) -> List:
"""Get content by statement."""
catalog_helper = self.catalog_helper
header = ['Control Identifier', 'Control Title', 'Statement Identifier', 'Statement Text']
self.rows.append(header)
for control in catalog_helper.get_controls():
control_id = convert_control_id(control.id)
if control.parts:
self._add_parts_by_statement(control)
else:
statement_text = catalog_helper.get_statement_text_for_control(control)
row = [control_id, control.title, '', statement_text]
self.add(row)
return self.rows
def _add_subparts_by_statement(self, control: Control, part: Part) -> None:
"""Add subparts by statement."""
catalog_helper = self.catalog_helper
control_id = convert_control_id(control.id)
for subpart in part.parts:
if '_smt' in subpart.id:
statement_text = catalog_helper.get_statement_text_for_part(control, subpart)
row = [control_id, control.title, convert_smt_id(subpart.id), statement_text]
self.add(row)
def _add_parts_by_statement(self, control: Control) -> None:
"""Add parts by statement."""
catalog_helper = self.catalog_helper
control_id = convert_control_id(control.id)
for part in control.parts:
if part.id:
if '_smt' not in part.id:
continue
if part.parts:
self._add_subparts_by_statement(control, part)
else:
statement_text = catalog_helper.get_statement_text_for_part(control, part)
row = [control_id, control.title, convert_smt_id(part.id), statement_text]
self.add(row)
def _get_content_by_control(self) -> List:
"""Get content by statement."""
catalog_helper = self.catalog_helper
header = ['Control Identifier', 'Control Title', 'Control Text']
self.rows.append(header)
for control in catalog_helper.get_controls():
control_id = convert_control_id(control.id)
if control.parts:
self._add_parts_by_control(control)
else:
control_text = catalog_helper.get_statement_text_for_control(control)
row = [control_id, control.title, control_text]
self.add(row)
return self.rows
def _add_subparts_by_control(self, control: Control, part: Part, control_text) -> str:
"""Add subparts by control."""
catalog_helper = self.catalog_helper
for subpart in part.parts:
if '_smt' in subpart.id:
statement_text = catalog_helper.get_statement_text_for_part(control, subpart)
control_text = join_str(control_text, statement_text)
return control_text
def _add_parts_by_control(self, control: Control) -> None:
"""Add parts by control."""
catalog_helper = self.catalog_helper
control_id = convert_control_id(control.id)
control_text = None
for part in control.parts:
if part.id:
if '_smt' not in part.id:
continue
if part.parts:
control_text = self._add_subparts_by_control(control, part, control_text)
else:
statement_text = catalog_helper.get_statement_text_for_part(control, part)
control_text = join_str(control_text, statement_text)
row = [control_id, control.title, control_text]
self.add(row)
Methods¤
__init__(self, catalog_helper)
special
¤
Initialize.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def __init__(self, catalog_helper: CatalogHelper) -> None:
"""Initialize."""
self.catalog_helper = catalog_helper
self.rows = []
self.row_template = None
add(self, row)
¤
Add row.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def add(self, row: List):
"""Add row."""
n_row = copy.copy(row)
t_row = self.row_template
if t_row:
for index in range(3):
if n_row[index] == t_row[index]:
n_row[index] = None
self.rows.append(n_row)
self.row_template = row
get_content(self, level)
¤
Get content.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def get_content(self, level: str) -> List:
"""Get content."""
if level == level_control:
rval = self._get_content_by_control()
else:
rval = self._get_content_by_statement()
return rval
CsvHelper
¤
Csv Helper.
Source code in trestle/tasks/oscal_catalog_to_csv.py
class CsvHelper:
"""Csv Helper."""
def __init__(self, path) -> None:
"""Initialize."""
self.path = path
def write(self, rows: List[List[str]]) -> None:
"""Write csv file."""
with open(self.path, 'w', newline='', encoding='utf-8') as output:
csv_writer = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in rows:
csv_writer.writerow(row)
Methods¤
__init__(self, path)
special
¤
Initialize.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def __init__(self, path) -> None:
"""Initialize."""
self.path = path
write(self, rows)
¤
Write csv file.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def write(self, rows: List[List[str]]) -> None:
"""Write csv file."""
with open(self.path, 'w', newline='', encoding='utf-8') as output:
csv_writer = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in rows:
csv_writer.writerow(row)
OscalCatalogToCsv (TaskBase)
¤
Task to transform OSCAL catalog to .csv.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the task. |
Source code in trestle/tasks/oscal_catalog_to_csv.py
class OscalCatalogToCsv(TaskBase):
"""
Task to transform OSCAL catalog to .csv.
Attributes:
name: Name of the task.
"""
name = 'oscal-catalog-to-csv'
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
def print_info(self) -> None:
"""Print the help string."""
logger.info(f'Help information for {self.name} task.')
logger.info('')
logger.info('Purpose: Create .csv from OSCAL catalog.')
logger.info('')
logger.info('Configuration flags sit under [task.oscal-catalog-to-csv]:')
text1 = ' input-file = '
text2 = '(required) path of file to read the catalog.'
logger.info(text1 + text2)
text1 = ' output-dir = '
text2 = '(required) path of directory to write the generated .csv file.'
logger.info(text1 + text2)
text1 = ' output-name = '
text2 = '(optional) name of the generated .csv file [default is name of input file with .csv suffix].'
logger.info(text1 + text2)
text1 = ' output-overwrite = '
text2 = '(optional) true [default] or false; replace existing output when true.'
logger.info(text1 + text2)
text1 = ' level = '
text2 = f'(optional) one of: {level_control} or {level_statement} [default].'
logger.info(text1 + text2)
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
def execute(self) -> TaskOutcome:
"""Provide an actual outcome."""
try:
return self._execute()
except Exception:
logger.info(traceback.format_exc())
return TaskOutcome('failure')
def _execute(self) -> TaskOutcome:
"""Wrap the execute for exception handling."""
# config processing
if not self._config:
logger.warning('config missing')
return TaskOutcome('failure')
# input
ifile = self._config.get('input-file')
if not ifile:
logger.warning('input-file missing')
return TaskOutcome('failure')
ipth = pathlib.Path(ifile)
# overwrite
self._overwrite = self._config.getboolean('output-overwrite', True)
# output
odir = self._config.get('output-dir')
if not odir:
logger.warning('output-dir missing')
return TaskOutcome('failure')
opth = pathlib.Path(odir)
opth.mkdir(exist_ok=True, parents=True)
iname = ipth.name.split('.')[0]
oname = self._config.get('output-name', f'{iname}.csv')
opth = opth / oname
if not self._overwrite and opth.exists():
logger.warning(f'output: {opth} already exists')
return TaskOutcome('failure')
csv_helper = CsvHelper(opth)
# level
level = self._config.get('level', level_default)
if level not in level_list:
logger.warning(f'level: {level} unknown')
return TaskOutcome('failure')
# helper
catalog_helper = CatalogHelper(ipth)
# process
content_manager = ContentManager(catalog_helper)
rows = content_manager.get_content(level)
# write
csv_helper.write(rows)
logger.info(f'output-file: {opth}')
# success
return TaskOutcome('success')
name: str
¤
Methods¤
__init__(self, config_object)
special
¤
Initialize trestle task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_object |
Optional[configparser.SectionProxy] |
Config section associated with the task. |
required |
Source code in trestle/tasks/oscal_catalog_to_csv.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
execute(self)
¤
Provide an actual outcome.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def execute(self) -> TaskOutcome:
"""Provide an actual outcome."""
try:
return self._execute()
except Exception:
logger.info(traceback.format_exc())
return TaskOutcome('failure')
print_info(self)
¤
Print the help string.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def print_info(self) -> None:
"""Print the help string."""
logger.info(f'Help information for {self.name} task.')
logger.info('')
logger.info('Purpose: Create .csv from OSCAL catalog.')
logger.info('')
logger.info('Configuration flags sit under [task.oscal-catalog-to-csv]:')
text1 = ' input-file = '
text2 = '(required) path of file to read the catalog.'
logger.info(text1 + text2)
text1 = ' output-dir = '
text2 = '(required) path of directory to write the generated .csv file.'
logger.info(text1 + text2)
text1 = ' output-name = '
text2 = '(optional) name of the generated .csv file [default is name of input file with .csv suffix].'
logger.info(text1 + text2)
text1 = ' output-overwrite = '
text2 = '(optional) true [default] or false; replace existing output when true.'
logger.info(text1 + text2)
text1 = ' level = '
text2 = f'(optional) one of: {level_control} or {level_statement} [default].'
logger.info(text1 + text2)
simulate(self)
¤
Provide a simulated outcome.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
Functions¤
convert_control_id(control_id)
¤
Convert control id.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def convert_control_id(control_id: str) -> str:
"""Convert control id."""
rval = copy.copy(control_id)
rval = rval.upper()
if '.' in rval:
rval = rval.replace('.', '(')
rval = rval + ')'
return rval
convert_smt_id(smt_id)
¤
Convert smt id.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def convert_smt_id(smt_id: str) -> str:
"""Convert smt id."""
parts = smt_id.split('_smt')
seg1 = convert_control_id(parts[0])
seg2 = ''
if len(parts) == 2:
seg2 = parts[1]
if '.' in seg2:
seg2 = seg2.replace('.', '(')
seg2 = seg2 + ')'
rval = f'{seg1}{seg2}'
return rval
join_str(s1, s2, sep=' ')
¤
Join strings.
Source code in trestle/tasks/oscal_catalog_to_csv.py
def join_str(s1: Optional[str], s2: Optional[str], sep: str = ' ') -> Optional[str]:
"""Join strings."""
if s1 is None:
rval = s2
elif s2 is None:
rval = s1
else:
rval = f'{s1}{sep}{s2}'
return rval
handler: python