xlsx_to_oscal_profile
trestle.tasks.xlsx_to_oscal_profile
¤
OSCAL transformation tasks.
logger
¤
Classes¤
XlsxToOscalProfile (TaskBase)
¤
Task to create OSCAL Profile json.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the task. |
Source code in trestle/tasks/xlsx_to_oscal_profile.py
class XlsxToOscalProfile(TaskBase):
"""
Task to create OSCAL Profile json.
Attributes:
name: Name of the task.
"""
name = 'xlsx-to-oscal-profile'
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task xlsx-to-oscal-profile.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
self.xlsx_helper = XlsxHelper()
self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
).isoformat()
def set_timestamp(self, timestamp: str) -> None:
"""Set the timestamp."""
self._timestamp = timestamp
def print_info(self) -> None:
"""Print the help string."""
self.xlsx_helper.print_info(self.name, 'profile')
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
def execute(self) -> TaskOutcome:
"""Provide an executed outcome."""
try:
return self._execute()
except Exception:
logger.info(traceback.format_exc())
return TaskOutcome('failure')
def _execute(self) -> TaskOutcome:
"""Execute path core."""
if not self.xlsx_helper.configure(self):
return TaskOutcome('failure')
# config output
odir = self._config.get('output-dir')
opth = pathlib.Path(odir)
self._overwrite = self._config.getboolean('output-overwrite', True)
# insure output dir exists
opth.mkdir(exist_ok=True, parents=True)
# calculate output file name & check writability
oname = 'profile.json'
ofile = opth / oname
if not self._overwrite and pathlib.Path(ofile).exists():
logger.error(f'output: {ofile} already exists')
return TaskOutcome('failure')
# create OSCAL Profile
metadata = Metadata(
title='Profile for ' + self._get_profile_title(),
last_modified=self._timestamp,
oscal_version=OSCAL_VERSION,
version=get_trestle_version(),
)
if self.xlsx_helper.profile_type == self.xlsx_helper.by_control:
imports = self._get_imports_by_control()
profile = Profile(
uuid=str(uuid.uuid4()),
metadata=metadata,
imports=imports,
)
else:
if self.xlsx_helper.profile_type == self.xlsx_helper.by_rule:
imports = self._get_imports_by_rule()
elif self.xlsx_helper.profile_type == self.xlsx_helper.by_check:
imports = self._get_imports_by_check()
else:
imports = self._get_imports_by_goal()
set_parameters = self._get_set_parameters()
modify = Modify(set_parameters=set_parameters)
profile = Profile(
uuid=str(uuid.uuid4()),
metadata=metadata,
imports=imports,
modify=modify,
)
# write OSCAL Profile to file
if self._verbose:
logger.info(f'output: {ofile}')
profile.oscal_write(pathlib.Path(ofile))
# issues
self._report_issues()
return TaskOutcome('success')
def _get_imports_by_goal(self) -> List[Import]:
"""Get imports by goal."""
return self._get_imports_by_check()
def _get_imports_by_control(self) -> List[Import]:
"""Get imports by control."""
import_ = Import(
href=self._get_spread_sheet_url(),
include_controls=[SelectControlById(with_ids=self._get_with_ids_by_control())],
)
imports = [import_]
return imports
def _get_imports_by_rule(self) -> List[Import]:
"""Get imports by rule."""
import_ = Import(
href=self._get_spread_sheet_url(),
include_controls=[SelectControlById(with_ids=self._get_with_ids_by_rule())],
)
imports = [import_]
return imports
def _get_imports_by_check(self) -> List[Import]:
"""Get imports by check."""
import_ = Import(
href=self._get_spread_sheet_url(),
include_controls=[SelectControlById(with_ids=self._get_with_ids_by_check())],
)
imports = [import_]
return imports
def _get_with_ids_by_control(self) -> List[str]:
"""Get controls from spread sheet."""
control_list = []
for row in self.xlsx_helper.row_generator():
# quit when first row with no goal_id encountered
controls = self.xlsx_helper.get_controls(row)
if controls is not None:
for control in controls:
control = self._oscal_namify(control)
if control in control_list:
continue
control_list.append(control)
return sorted(control_list, key=self._control_sort_key)
def _get_with_ids_by_rule(self) -> List[str]:
"""Get rules from spread sheet."""
rule_name_id_list = []
for row in self.xlsx_helper.row_generator():
# quit when first row with no goal_id encountered
rule_name_id = self.xlsx_helper.get_rule_name_id(row, strict=True)
if rule_name_id is not None:
if rule_name_id in rule_name_id_list:
continue
rule_name_id_list.append(rule_name_id)
return sorted(rule_name_id_list)
def _get_with_ids_by_check(self) -> List[str]:
"""Get check from spread sheet."""
check_name_id_list = []
for row in self.xlsx_helper.row_generator():
# quit when first row with no goal_id encountered
check_name_id = self.xlsx_helper.get_check_name_id(row, strict=True)
if check_name_id is not None:
if check_name_id in check_name_id_list:
continue
check_name_id_list.append(check_name_id)
return sorted(check_name_id_list)
def _control_sort_key(self, control: str) -> Tuple[str, int, int]:
"""Fabricate sort key."""
k1 = control.split('-')[0]
k2 = int(control.split('-')[1].split('.')[0])
if '.' in control:
k3 = int(control.split('-')[1].split('.')[1])
else:
k3 = 0
return (k1, k2, k3)
def _oscal_namify(self, control: str) -> str:
"""Rectify parenthesized numbers in controls."""
control = control.replace('(', '.')
control = control.replace(')', '')
return control
def _get_set_parameters(self) -> List[SetParameter]:
"""Get set parameters from spread sheet."""
set_parameters = []
for row in self.xlsx_helper.row_generator():
# quit when first row with no goal_id encountered
param_id, label = self.xlsx_helper.get_parameter_name_and_description(row)
usage = self.xlsx_helper.get_parameter_usage(row)
values = self.xlsx_helper.get_parameter_values(row)
if param_id is None:
continue
set_parameter = SetParameter(
param_id=param_id,
label=label,
usage=usage,
)
if values is not None:
set_parameter.values = values
set_parameters.append(set_parameter)
return set_parameters
def _get_profile_title(self) -> str:
"""Get profile title from config."""
value = self._config.get('profile-title')
logger.debug(f'profile-title: {value}')
return value
def _get_spread_sheet_url(self) -> str:
"""Get spread sheet url from config."""
value = self._config.get('spread-sheet-url')
logger.debug(f'spread-sheet-url: {value}')
return value
def _report_issues(self) -> None:
"""Report issues."""
self.xlsx_helper.report_issues()
name: str
¤
Methods¤
__init__(self, config_object)
special
¤
Initialize trestle task xlsx-to-oscal-profile.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_object |
Optional[configparser.SectionProxy] |
Config section associated with the task. |
required |
Source code in trestle/tasks/xlsx_to_oscal_profile.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task xlsx-to-oscal-profile.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
self.xlsx_helper = XlsxHelper()
self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
).isoformat()
execute(self)
¤
Provide an executed outcome.
Source code in trestle/tasks/xlsx_to_oscal_profile.py
def execute(self) -> TaskOutcome:
"""Provide an executed outcome."""
try:
return self._execute()
except Exception:
logger.info(traceback.format_exc())
return TaskOutcome('failure')
print_info(self)
¤
Print the help string.
Source code in trestle/tasks/xlsx_to_oscal_profile.py
def print_info(self) -> None:
"""Print the help string."""
self.xlsx_helper.print_info(self.name, 'profile')
set_timestamp(self, timestamp)
¤
Set the timestamp.
Source code in trestle/tasks/xlsx_to_oscal_profile.py
def set_timestamp(self, timestamp: str) -> None:
"""Set the timestamp."""
self._timestamp = timestamp
simulate(self)
¤
Provide a simulated outcome.
Source code in trestle/tasks/xlsx_to_oscal_profile.py
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
return TaskOutcome('simulated-success')
handler: python