osco
trestle.transforms.implementations.osco
¤
Facilitate OSCAL-OSCO transformation.
logger
¤
Classes¤
OscalProfileToOscoProfileTransformer (FromOscalTransformer)
¤
Interface for Oscal Profile to Osco Profile transformer.
Source code in trestle/transforms/implementations/osco.py
class OscalProfileToOscoProfileTransformer(FromOscalTransformer):
"""Interface for Oscal Profile to Osco Profile transformer."""
def __init__(
self,
extends='ocp4-cis-node',
api_version='compliance.openshift.io/v1alpha1',
kind='TailoredProfile',
name='customized-tailored-profile',
namespace='openshift-compliance',
) -> None:
"""Initialize."""
self._extends = extends
self._api_version = api_version
self._kind = kind
self._name = name
self._namespace = namespace
def transform(self, profile: Profile) -> str:
"""Transform the Profile into a OSCO yaml."""
self._profile = profile
self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
# set values
set_values = self._get_set_values()
# spec
if self._osco_version < (0, 1, 40):
# for versions prior to 0.1.40, exclude 'description'
spec = {
'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
'title': self._profile.metadata.title,
'setValues': set_values,
}
else:
# for versions 0.1.40 and beyond, include 'description'
spec = {
'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
'title': self._profile.metadata.title,
'setValues': set_values,
}
disable_rules = self._get_disable_rules()
if disable_rules:
spec['disableRules'] = disable_rules
# yaml data
ydata = {
'apiVersion': self._api_version,
'kind': self._kind,
'metadata': {
'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
'namespace': self._namespace,
},
'spec': spec,
}
return json.dumps(ydata)
def _get_normalized_version(self, prop_name, prop_default) -> Tuple[int, int, int]:
"""Get normalized version.
Normalize the "x.y.z" string value to an integer: 1,000,000*x + 1,000*y + z.
"""
try:
vparts = self._get_metadata_prop_value(prop_name, prop_default).split('.')
normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
except Exception:
logger.warning(f'metadata prop name={prop_name} value error')
vparts = prop_default.split('.')
normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
return normalized_version
def _get_set_values(self) -> List[Dict]:
"""Extract set_paramater name/value pairs from profile."""
set_values = []
# for check versions prior to 0.1.59 include parameters
# for later versions parameters should not be specified, caveat emptor
if self._profile.modify is not None:
for set_parameter in as_list(self._profile.modify.set_parameters):
name = self._format_osco_rule_name(set_parameter.param_id)
parameter_value = set_parameter.values[0]
value = parameter_value
rationale = self._get_rationale_for_set_value()
set_value = {'name': name, 'value': value, 'rationale': rationale}
set_values.append(set_value)
return set_values
def _format_osco_rule_name(self, name: str) -> str:
"""Format for OSCO.
1. remove prefix xccdf_org.ssgproject.content_rule_
2. change underscores to dashes
3. add prefix ocp4-
"""
normalized_name = name.replace('xccdf_org.ssgproject.content_rule_', '').replace('_', '-')
if not normalized_name.startswith('ocp4-'):
normalized_name = f'ocp4-{normalized_name}'
return normalized_name
def _get_metadata_prop_value(self, name: str, default_: str) -> str:
"""Extract metadata prop or else default if not present."""
for prop in as_list(self._profile.metadata.props):
if prop.name == name:
return prop.value
logger.info(f'using default: {name} = {default_}')
return default_
def _get_disable_rules(self) -> List[str]:
"""Extract disabled rules."""
value = []
for _import in as_list(self._profile.imports):
for control in as_list(_import.exclude_controls):
self._add_disable_rules_for_control(value, control)
return value
def _add_disable_rules_for_control(self, value, control):
"""Extract disabled rules for control."""
for with_id in as_list(control.with_ids):
name = self._format_osco_rule_name(with_id.__root__)
rationale = self._get_rationale_for_disable_rule()
entry = {'name': name, 'rationale': rationale}
value.append(entry)
def _get_rationale_for_set_value(self) -> str:
"""Rationale for set value."""
return 'not determinable from specification'
def _get_rationale_for_disable_rule(self) -> str:
"""Rationale for disable rule."""
return 'not determinable from specification'
Methods¤
__init__(self, extends='ocp4-cis-node', api_version='compliance.openshift.io/v1alpha1', kind='TailoredProfile', name='customized-tailored-profile', namespace='openshift-compliance')
special
¤
Initialize.
Source code in trestle/transforms/implementations/osco.py
def __init__(
self,
extends='ocp4-cis-node',
api_version='compliance.openshift.io/v1alpha1',
kind='TailoredProfile',
name='customized-tailored-profile',
namespace='openshift-compliance',
) -> None:
"""Initialize."""
self._extends = extends
self._api_version = api_version
self._kind = kind
self._name = name
self._namespace = namespace
transform(self, profile)
¤
Transform the Profile into a OSCO yaml.
Source code in trestle/transforms/implementations/osco.py
def transform(self, profile: Profile) -> str:
"""Transform the Profile into a OSCO yaml."""
self._profile = profile
self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
# set values
set_values = self._get_set_values()
# spec
if self._osco_version < (0, 1, 40):
# for versions prior to 0.1.40, exclude 'description'
spec = {
'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
'title': self._profile.metadata.title,
'setValues': set_values,
}
else:
# for versions 0.1.40 and beyond, include 'description'
spec = {
'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
'title': self._profile.metadata.title,
'setValues': set_values,
}
disable_rules = self._get_disable_rules()
if disable_rules:
spec['disableRules'] = disable_rules
# yaml data
ydata = {
'apiVersion': self._api_version,
'kind': self._kind,
'metadata': {
'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
'namespace': self._namespace,
},
'spec': spec,
}
return json.dumps(ydata)
OscoResultToOscalARTransformer (ResultsTransformer)
¤
Interface for Osco transformer.
Source code in trestle/transforms/implementations/osco.py
class OscoResultToOscalARTransformer(ResultsTransformer):
"""Interface for Osco transformer."""
def __init__(self) -> None:
"""Initialize."""
self._modes = {}
@property
def analysis(self) -> List[str]:
"""Analysis."""
return self._results_factory.analysis
@property
def checking(self):
"""Return checking."""
return self._modes.get('checking', False)
def set_modes(self, modes: Dict[str, Any]) -> None:
"""Keep modes info."""
if modes is not None:
self._modes = modes
def transform(self, blob: str) -> Results:
"""Transform the blob into a Results.
The expected blob is a string that is one of:
- data from OpenShift Compliance Operator (json, yaml, xml)
- data from Auditree OSCO fetcher/check (json)
"""
results = None
self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
if results is None:
results = self._ingest_xml(blob)
if results is None:
results = self._ingest_json(blob)
if results is None:
results = self._ingest_yaml(blob)
return results
def _ingest_xml(self, blob: str) -> Results:
"""Ingest xml data."""
# ?xml data
if blob.startswith('<?xml'):
resource = blob
self._results_factory.ingest_xml(resource)
else:
return None
results = Results()
results.__root__.append(self._results_factory.result)
return results
def _ingest_json(self, blob: str) -> Results:
"""Ingest json data."""
try:
# ? configmaps or auditree data
jdata = json.loads(blob)
# https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
if 'kind' in jdata.keys() and jdata['kind'] == 'ConfigMapList' and 'items' in jdata.keys():
items = jdata['items']
for item in items:
if 'data' in item.keys():
data = item['data']
if 'results' in data:
resource = item
self._results_factory.ingest(resource)
# https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
else:
for key in jdata.keys():
for group in jdata[key]:
for cluster in jdata[key][group]:
if 'resources' in cluster:
for resource in cluster['resources']:
self._results_factory.ingest(resource)
except json.decoder.JSONDecodeError:
return None
results = Results()
results.__root__.append(self._results_factory.result)
return results
def _ingest_yaml(self, blob: str) -> Results:
"""Ingest yaml data."""
try:
# ? yaml data
yaml = YAML(typ='safe')
resource = yaml.load(blob)
self._results_factory.ingest(resource)
except Exception as e:
raise e
results = Results()
results.__root__.append(self._results_factory.result)
return results
Attributes¤
analysis: List[str]
property
readonly
¤
Analysis.
checking
property
readonly
¤
Return checking.
Methods¤
__init__(self)
special
¤
Initialize.
Source code in trestle/transforms/implementations/osco.py
def __init__(self) -> None:
"""Initialize."""
self._modes = {}
set_modes(self, modes)
¤
Keep modes info.
Source code in trestle/transforms/implementations/osco.py
def set_modes(self, modes: Dict[str, Any]) -> None:
"""Keep modes info."""
if modes is not None:
self._modes = modes
transform(self, blob)
¤
Transform the blob into a Results.
The expected blob is a string that is one of: - data from OpenShift Compliance Operator (json, yaml, xml) - data from Auditree OSCO fetcher/check (json)
Source code in trestle/transforms/implementations/osco.py
def transform(self, blob: str) -> Results:
"""Transform the blob into a Results.
The expected blob is a string that is one of:
- data from OpenShift Compliance Operator (json, yaml, xml)
- data from Auditree OSCO fetcher/check (json)
"""
results = None
self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
if results is None:
results = self._ingest_xml(blob)
if results is None:
results = self._ingest_json(blob)
if results is None:
results = self._ingest_yaml(blob)
return results
OscoTransformer (OscoResultToOscalARTransformer)
¤
Legacy class name.
Source code in trestle/transforms/implementations/osco.py
class OscoTransformer(OscoResultToOscalARTransformer):
"""Legacy class name."""
RuleUse
¤
Represents one rule of OSCO data.
Source code in trestle/transforms/implementations/osco.py
class RuleUse():
"""Represents one rule of OSCO data."""
def __init__(self, args: Dict[str, str]) -> None:
"""Initialize given specified args."""
self.id_ = args['id_']
self.target = args['target']
self.target_type = args['target_type']
self.host_name = args['host_name']
self.benchmark_href = args['benchmark_href']
self.benchmark_id = args['benchmark_id']
self.scanner_name = args['scanner_name']
self.scanner_version = args['scanner_version']
self.idref = args['idref']
self.version = args['version']
self.time = args['time']
self.result = args['result']
self.severity = args['severity']
self.weight = args['weight']
@property
def inventory_key(self):
"""Derive inventory key."""
if self.host_name is None:
# OpenScap 1.3.3
rval = self.target + ':' + self.target_type
else:
# OpenScap 1.3.5
rval = self.host_name + ':' + self.target_type
return rval
Attributes¤
inventory_key
property
readonly
¤
Derive inventory key.
Methods¤
__init__(self, args)
special
¤
Initialize given specified args.
Source code in trestle/transforms/implementations/osco.py
def __init__(self, args: Dict[str, str]) -> None:
"""Initialize given specified args."""
self.id_ = args['id_']
self.target = args['target']
self.target_type = args['target_type']
self.host_name = args['host_name']
self.benchmark_href = args['benchmark_href']
self.benchmark_id = args['benchmark_id']
self.scanner_name = args['scanner_name']
self.scanner_version = args['scanner_version']
self.idref = args['idref']
self.version = args['version']
self.time = args['time']
self.result = args['result']
self.severity = args['severity']
self.weight = args['weight']
handler: python