xccdf_result_to_oscal_ar
trestle.tasks.xccdf_result_to_oscal_ar
¤
OSCAL transformation tasks.
default_description
¤
default_title
¤
default_type
¤
logger
¤
Classes¤
XccdfResultToOscalAR (TaskBase)
¤
Task to convert Xccdf result to OSCAL json.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the task. |
Source code in trestle/tasks/xccdf_result_to_oscal_ar.py
class XccdfResultToOscalAR(TaskBase):
"""
Task to convert Xccdf result to OSCAL json.
Attributes:
name: Name of the task.
"""
name = 'xccdf-result-to-oscal-ar'
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task xccdf-result-to-oscal-ar.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
def print_info(self) -> None:
"""Print the help string."""
opt = '(optional)'
req = '(required)'
logger.info(f'Help information for {self.name} task.')
logger.info('')
logger.info(
'Purpose: Transform Xccdf files into Open Security Controls Assessment Language (OSCAL) '
+ 'partial results files.'
)
logger.info('')
logger.info('Configuration flags sit under [task.xccdf-result-to-oscal-ar]:')
#
t1 = f' input-dir = {req} '
t2 = 'the path of the input directory comprising Xccdf results.'
logger.info(f'{t1}{t2}')
t1 = f' output-dir = {req} '
t2 = 'the path of the output directory comprising synthesized OSCAL .json files.'
logger.info(f'{t1}{t2}')
t1 = f' checking = {opt} '
t2 = 'True indicates perform strict checking of OSCAL properties, default is False.'
logger.info(f'{t1}{t2}')
t1 = f' output-overwrite = {opt} '
t2 = 'true [default] or false; replace existing output when true.'
logger.info(f'{t1}{t2}')
t1 = f' quiet = {opt} '
t2 = 'true or false [default]; display file creations and rules analysis when false.'
logger.info(f'{t1}{t2}')
t1 = f' title = {opt} '
t2 = f'default={default_title}.'
logger.info(f'{t1}{t2}')
t1 = f' description = {opt} '
t2 = f'default={default_description}.'
logger.info(f'{t1}{t2}')
t1 = f' type = {opt} '
t2 = f'default={default_type}.'
logger.info(f'{t1}{t2}')
t1 = f' property-name-to-class = {opt} '
t2 = 'list of name:class pairs for tagging named property with class, '
t3 = 'e.g. "target:scc_inventory_item_id, version:scc_check_version".'
logger.info(f'{t1}{t2}{t3}')
t1 = f' timestamp = {opt} '
t2 = 'timestamp for the Observations in ISO 8601 format, such as '
t3 = ' 2021-01-04T00:05:23+04:00 for example; if not specified then value for "Timestamp" key in the Xccdf '
t4 = ' result is used if present, otherwise current time is used.'
logger.info(f'{t1}{t2}{t3}{t4}')
#
logger.info('')
logger.info(
'Operation: A transformation is performed on one or more Xccdf input files to produce output in OSCAL '
+ 'partial results format.'
)
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
self._simulate = True
return self._transform()
def execute(self) -> TaskOutcome:
"""Provide an actual outcome."""
self._simulate = False
return self._transform()
def _transform(self) -> TaskOutcome:
"""Perform transformation."""
try:
return self._transform_work()
except Exception:
logger.debug(traceback.format_exc())
mode = ''
if self._simulate:
mode = 'simulated-'
return TaskOutcome(mode + 'failure')
def _transform_work(self) -> TaskOutcome:
"""
Perform transformation work steps.
Work steps: read input, process, write output, display analysis
"""
mode = ''
if self._simulate:
mode = 'simulated-'
if not self._config:
logger.warning('config missing')
return TaskOutcome(mode + 'failure')
# config required input & output dirs
try:
idir = self._config['input-dir']
ipth = pathlib.Path(idir)
odir = self._config['output-dir']
opth = pathlib.Path(odir)
except KeyError as e:
logger.debug(f'key {e.args[0]} missing')
return TaskOutcome(mode + 'failure')
# config optional overwrite & quiet
self._overwrite = self._config.getboolean('output-overwrite', True)
quiet = self._config.get('quiet', False)
self._verbose = not self._simulate and not quiet
# title, description, type
title = self._config.get('title', default_title)
description = self._config.get('description', default_description)
type_ = self._config.get('type', default_type)
# property-name-to-class
tags = self._get_tags()
# config optional timestamp
timestamp = self._config.get('timestamp')
if timestamp is not None:
try:
XccdfTransformer.set_timestamp(timestamp)
except Exception:
logger.warning('config invalid "timestamp"')
return TaskOutcome(mode + 'failure')
# config optional performance
modes = {
'checking': self._config.getboolean('checking', False),
}
# insure output dir exists
opth.mkdir(exist_ok=True, parents=True)
# process
for ifile in sorted(ipth.iterdir()):
if ifile.suffix not in ['.json', '.jsn', '.yaml', '.yml', '.xml']:
continue
blob = self._read_file(ifile)
xccdf_transformer = XccdfTransformer()
xccdf_transformer.set_title(title)
xccdf_transformer.set_description(description)
xccdf_transformer.set_type(type_)
xccdf_transformer.set_modes(modes)
xccdf_transformer.set_tags(tags)
results = xccdf_transformer.transform(blob)
oname = ifile.stem + '.oscal' + '.json'
ofile = opth / oname
if not self._overwrite and pathlib.Path(ofile).exists():
logger.warning(f'output: {ofile} already exists')
return TaskOutcome(mode + 'failure')
self._write_file(results, ofile)
self._show_analysis(xccdf_transformer)
return TaskOutcome(mode + 'success')
def _get_tags(self) -> Dict:
"""Get property name to class tags, if any."""
tags = {}
data = self._config.get('property-name-to-class')
if data is not None:
for item in data.split(','):
item = item.strip()
parts = item.split(':')
if len(parts) == 2:
name = parts[0]
value = parts[1]
tags[name] = value
return tags
def _read_file(self, ifile: str) -> str:
"""Read raw input file."""
if not self._simulate and self._verbose:
logger.info(f'input: {ifile}')
with open(ifile, encoding=const.FILE_ENCODING) as fp:
blob = fp.read()
return blob
def _write_file(self, result: str, ofile: str) -> None:
"""Write oscal results file."""
if not self._simulate:
if self._verbose:
logger.info(f'output: {ofile}')
result.oscal_write(pathlib.Path(ofile))
def _show_analysis(self, xccdf_transformer: XccdfTransformer) -> None:
"""Show analysis."""
if not self._simulate and self._verbose:
analysis = xccdf_transformer.analysis
for line in analysis:
logger.info(line)
name: str
¤
Methods¤
__init__(self, config_object)
special
¤
Initialize trestle task xccdf-result-to-oscal-ar.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_object |
Optional[configparser.SectionProxy] |
Config section associated with the task. |
required |
Source code in trestle/tasks/xccdf_result_to_oscal_ar.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
"""
Initialize trestle task xccdf-result-to-oscal-ar.
Args:
config_object: Config section associated with the task.
"""
super().__init__(config_object)
execute(self)
¤
Provide an actual outcome.
Source code in trestle/tasks/xccdf_result_to_oscal_ar.py
def execute(self) -> TaskOutcome:
"""Provide an actual outcome."""
self._simulate = False
return self._transform()
print_info(self)
¤
Print the help string.
Source code in trestle/tasks/xccdf_result_to_oscal_ar.py
def print_info(self) -> None:
"""Print the help string."""
opt = '(optional)'
req = '(required)'
logger.info(f'Help information for {self.name} task.')
logger.info('')
logger.info(
'Purpose: Transform Xccdf files into Open Security Controls Assessment Language (OSCAL) '
+ 'partial results files.'
)
logger.info('')
logger.info('Configuration flags sit under [task.xccdf-result-to-oscal-ar]:')
#
t1 = f' input-dir = {req} '
t2 = 'the path of the input directory comprising Xccdf results.'
logger.info(f'{t1}{t2}')
t1 = f' output-dir = {req} '
t2 = 'the path of the output directory comprising synthesized OSCAL .json files.'
logger.info(f'{t1}{t2}')
t1 = f' checking = {opt} '
t2 = 'True indicates perform strict checking of OSCAL properties, default is False.'
logger.info(f'{t1}{t2}')
t1 = f' output-overwrite = {opt} '
t2 = 'true [default] or false; replace existing output when true.'
logger.info(f'{t1}{t2}')
t1 = f' quiet = {opt} '
t2 = 'true or false [default]; display file creations and rules analysis when false.'
logger.info(f'{t1}{t2}')
t1 = f' title = {opt} '
t2 = f'default={default_title}.'
logger.info(f'{t1}{t2}')
t1 = f' description = {opt} '
t2 = f'default={default_description}.'
logger.info(f'{t1}{t2}')
t1 = f' type = {opt} '
t2 = f'default={default_type}.'
logger.info(f'{t1}{t2}')
t1 = f' property-name-to-class = {opt} '
t2 = 'list of name:class pairs for tagging named property with class, '
t3 = 'e.g. "target:scc_inventory_item_id, version:scc_check_version".'
logger.info(f'{t1}{t2}{t3}')
t1 = f' timestamp = {opt} '
t2 = 'timestamp for the Observations in ISO 8601 format, such as '
t3 = ' 2021-01-04T00:05:23+04:00 for example; if not specified then value for "Timestamp" key in the Xccdf '
t4 = ' result is used if present, otherwise current time is used.'
logger.info(f'{t1}{t2}{t3}{t4}')
#
logger.info('')
logger.info(
'Operation: A transformation is performed on one or more Xccdf input files to produce output in OSCAL '
+ 'partial results format.'
)
simulate(self)
¤
Provide a simulated outcome.
Source code in trestle/tasks/xccdf_result_to_oscal_ar.py
def simulate(self) -> TaskOutcome:
"""Provide a simulated outcome."""
self._simulate = True
return self._transform()
handler: python