Skip to content

osco

trestle.transforms.implementations.osco ¤

Facilitate OSCAL-OSCO transformation.

logger ¤

Classes¤

OscalProfileToOscoProfileTransformer (FromOscalTransformer) ¤

Interface for Oscal Profile to Osco Profile transformer.

Source code in trestle/transforms/implementations/osco.py
class OscalProfileToOscoProfileTransformer(FromOscalTransformer):
    """Interface for Oscal Profile to Osco Profile transformer."""

    def __init__(
        self,
        extends='ocp4-cis-node',
        api_version='compliance.openshift.io/v1alpha1',
        kind='TailoredProfile',
        name='customized-tailored-profile',
        namespace='openshift-compliance',
    ) -> None:
        """Initialize."""
        self._extends = extends
        self._api_version = api_version
        self._kind = kind
        self._name = name
        self._namespace = namespace

    def transform(self, profile: Profile) -> str:
        """Transform the Profile into a OSCO yaml."""
        self._profile = profile
        self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
        # set values
        set_values = self._get_set_values()
        # spec
        if self._osco_version < (0, 1, 40):
            # for versions prior to 0.1.40, exclude 'description'
            spec = {
                'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
                'title': self._profile.metadata.title,
                'setValues': set_values,
            }
        else:
            # for versions 0.1.40 and beyond, include 'description'
            spec = {
                'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
                'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
                'title': self._profile.metadata.title,
                'setValues': set_values,
            }
        disable_rules = self._get_disable_rules()
        if disable_rules:
            spec['disableRules'] = disable_rules
        # yaml data
        ydata = {
            'apiVersion': self._api_version,
            'kind': self._kind,
            'metadata': {
                'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
                'namespace': self._namespace,
            },
            'spec': spec,
        }
        return json.dumps(ydata)

    def _get_normalized_version(self, prop_name, prop_default) -> Tuple[int, int, int]:
        """Get normalized version.

        Normalize the "x.y.z" string value to an integer: 1,000,000*x + 1,000*y + z.
        """
        try:
            vparts = self._get_metadata_prop_value(prop_name, prop_default).split('.')
            normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
        except Exception:
            logger.warning(f'metadata prop name={prop_name} value error')
            vparts = prop_default.split('.')
            normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
        return normalized_version

    def _get_set_values(self) -> List[Dict]:
        """Extract set_paramater name/value pairs from profile."""
        set_values = []
        # for check versions prior to 0.1.59 include parameters
        # for later versions parameters should not be specified, caveat emptor
        if self._profile.modify is not None:
            for set_parameter in as_list(self._profile.modify.set_parameters):
                name = self._format_osco_rule_name(set_parameter.param_id)
                parameter_value = set_parameter.values[0]
                value = parameter_value
                rationale = self._get_rationale_for_set_value()
                set_value = {'name': name, 'value': value, 'rationale': rationale}
                set_values.append(set_value)
        return set_values

    def _format_osco_rule_name(self, name: str) -> str:
        """Format for OSCO.

        1. remove prefix xccdf_org.ssgproject.content_rule_
        2. change underscores to dashes
        3. add prefix ocp4-
        """
        normalized_name = name.replace('xccdf_org.ssgproject.content_rule_', '').replace('_', '-')
        if not normalized_name.startswith('ocp4-'):
            normalized_name = f'ocp4-{normalized_name}'
        return normalized_name

    def _get_metadata_prop_value(self, name: str, default_: str) -> str:
        """Extract metadata prop or else default if not present."""
        for prop in as_list(self._profile.metadata.props):
            if prop.name == name:
                return prop.value
        logger.info(f'using default: {name} = {default_}')
        return default_

    def _get_disable_rules(self) -> List[str]:
        """Extract disabled rules."""
        value = []
        for _import in as_list(self._profile.imports):
            for control in as_list(_import.exclude_controls):
                self._add_disable_rules_for_control(value, control)
        return value

    def _add_disable_rules_for_control(self, value, control):
        """Extract disabled rules for control."""
        for with_id in as_list(control.with_ids):
            name = self._format_osco_rule_name(with_id.__root__)
            rationale = self._get_rationale_for_disable_rule()
            entry = {'name': name, 'rationale': rationale}
            value.append(entry)

    def _get_rationale_for_set_value(self) -> str:
        """Rationale for set value."""
        return 'not determinable from specification'

    def _get_rationale_for_disable_rule(self) -> str:
        """Rationale for disable rule."""
        return 'not determinable from specification'
Methods¤
__init__(self, extends='ocp4-cis-node', api_version='compliance.openshift.io/v1alpha1', kind='TailoredProfile', name='customized-tailored-profile', namespace='openshift-compliance') special ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
def __init__(
    self,
    extends='ocp4-cis-node',
    api_version='compliance.openshift.io/v1alpha1',
    kind='TailoredProfile',
    name='customized-tailored-profile',
    namespace='openshift-compliance',
) -> None:
    """Initialize."""
    self._extends = extends
    self._api_version = api_version
    self._kind = kind
    self._name = name
    self._namespace = namespace
transform(self, profile) ¤

Transform the Profile into a OSCO yaml.

Source code in trestle/transforms/implementations/osco.py
def transform(self, profile: Profile) -> str:
    """Transform the Profile into a OSCO yaml."""
    self._profile = profile
    self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
    # set values
    set_values = self._get_set_values()
    # spec
    if self._osco_version < (0, 1, 40):
        # for versions prior to 0.1.40, exclude 'description'
        spec = {
            'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
            'title': self._profile.metadata.title,
            'setValues': set_values,
        }
    else:
        # for versions 0.1.40 and beyond, include 'description'
        spec = {
            'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
            'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
            'title': self._profile.metadata.title,
            'setValues': set_values,
        }
    disable_rules = self._get_disable_rules()
    if disable_rules:
        spec['disableRules'] = disable_rules
    # yaml data
    ydata = {
        'apiVersion': self._api_version,
        'kind': self._kind,
        'metadata': {
            'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
            'namespace': self._namespace,
        },
        'spec': spec,
    }
    return json.dumps(ydata)

OscoResultToOscalARTransformer (ResultsTransformer) ¤

Interface for Osco transformer.

Source code in trestle/transforms/implementations/osco.py
class OscoResultToOscalARTransformer(ResultsTransformer):
    """Interface for Osco transformer."""

    def __init__(self) -> None:
        """Initialize."""
        self._modes = {}

    @property
    def analysis(self) -> List[str]:
        """Analysis."""
        return self._results_factory.analysis

    @property
    def checking(self):
        """Return checking."""
        return self._modes.get('checking', False)

    def set_modes(self, modes: Dict[str, Any]) -> None:
        """Keep modes info."""
        if modes is not None:
            self._modes = modes

    def transform(self, blob: str) -> Results:
        """Transform the blob into a Results.

        The expected blob is a string that is one of:
            - data from OpenShift Compliance Operator (json, yaml, xml)
            - data from Auditree OSCO fetcher/check (json)
        """
        results = None
        self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
        if results is None:
            results = self._ingest_xml(blob)
        if results is None:
            results = self._ingest_json(blob)
        if results is None:
            results = self._ingest_yaml(blob)
        return results

    def _ingest_xml(self, blob: str) -> Results:
        """Ingest xml data."""
        # ?xml data
        if blob.startswith('<?xml'):
            resource = blob
            self._results_factory.ingest_xml(resource)
        else:
            return None
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_json(self, blob: str) -> Results:
        """Ingest json data."""
        try:
            # ? configmaps or auditree data
            jdata = json.loads(blob)
            # https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
            if 'kind' in jdata.keys() and jdata['kind'] == 'ConfigMapList' and 'items' in jdata.keys():
                items = jdata['items']
                for item in items:
                    if 'data' in item.keys():
                        data = item['data']
                        if 'results' in data:
                            resource = item
                            self._results_factory.ingest(resource)
            # https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
            else:
                for key in jdata.keys():
                    for group in jdata[key]:
                        for cluster in jdata[key][group]:
                            if 'resources' in cluster:
                                for resource in cluster['resources']:
                                    self._results_factory.ingest(resource)
        except json.decoder.JSONDecodeError:
            return None
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_yaml(self, blob: str) -> Results:
        """Ingest yaml data."""
        try:
            # ? yaml data
            yaml = YAML(typ='safe')
            resource = yaml.load(blob)
            self._results_factory.ingest(resource)
        except Exception as e:
            raise e
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results
Attributes¤
analysis: List[str] property readonly ¤

Analysis.

checking property readonly ¤

Return checking.

Methods¤
__init__(self) special ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
def __init__(self) -> None:
    """Initialize."""
    self._modes = {}
set_modes(self, modes) ¤

Keep modes info.

Source code in trestle/transforms/implementations/osco.py
def set_modes(self, modes: Dict[str, Any]) -> None:
    """Keep modes info."""
    if modes is not None:
        self._modes = modes
transform(self, blob) ¤

Transform the blob into a Results.

The expected blob is a string that is one of: - data from OpenShift Compliance Operator (json, yaml, xml) - data from Auditree OSCO fetcher/check (json)

Source code in trestle/transforms/implementations/osco.py
def transform(self, blob: str) -> Results:
    """Transform the blob into a Results.

    The expected blob is a string that is one of:
        - data from OpenShift Compliance Operator (json, yaml, xml)
        - data from Auditree OSCO fetcher/check (json)
    """
    results = None
    self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
    if results is None:
        results = self._ingest_xml(blob)
    if results is None:
        results = self._ingest_json(blob)
    if results is None:
        results = self._ingest_yaml(blob)
    return results

OscoTransformer (OscoResultToOscalARTransformer) ¤

Legacy class name.

Source code in trestle/transforms/implementations/osco.py
class OscoTransformer(OscoResultToOscalARTransformer):
    """Legacy class name."""

RuleUse ¤

Represents one rule of OSCO data.

Source code in trestle/transforms/implementations/osco.py
class RuleUse():
    """Represents one rule of OSCO data."""

    def __init__(self, args: Dict[str, str]) -> None:
        """Initialize given specified args."""
        self.id_ = args['id_']
        self.target = args['target']
        self.target_type = args['target_type']
        self.host_name = args['host_name']
        self.benchmark_href = args['benchmark_href']
        self.benchmark_id = args['benchmark_id']
        self.scanner_name = args['scanner_name']
        self.scanner_version = args['scanner_version']
        self.idref = args['idref']
        self.version = args['version']
        self.time = args['time']
        self.result = args['result']
        self.severity = args['severity']
        self.weight = args['weight']

    @property
    def inventory_key(self):
        """Derive inventory key."""
        if self.host_name is None:
            # OpenScap 1.3.3
            rval = self.target + ':' + self.target_type
        else:
            # OpenScap 1.3.5
            rval = self.host_name + ':' + self.target_type
        return rval
Attributes¤
inventory_key property readonly ¤

Derive inventory key.

Methods¤
__init__(self, args) special ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/osco.py
def __init__(self, args: Dict[str, str]) -> None:
    """Initialize given specified args."""
    self.id_ = args['id_']
    self.target = args['target']
    self.target_type = args['target_type']
    self.host_name = args['host_name']
    self.benchmark_href = args['benchmark_href']
    self.benchmark_id = args['benchmark_id']
    self.scanner_name = args['scanner_name']
    self.scanner_version = args['scanner_version']
    self.idref = args['idref']
    self.version = args['version']
    self.time = args['time']
    self.result = args['result']
    self.severity = args['severity']
    self.weight = args['weight']

handler: python