Skip to content

xccdf

trestle.transforms.implementations.xccdf ¤

Facilitate OSCAL-XCCDF transformation.

Classes¤

RuleUse ¤

Represents one rule of XCCDF data.

Source code in trestle/transforms/implementations/xccdf.py
class RuleUse:
    """Hold the attributes describing one rule of XCCDF data."""

    def __init__(self, args: Dict[str, str]) -> None:
        """Copy each expected entry of args onto the instance.

        Every name listed below must be a key of args; a missing key
        raises KeyError.
        """
        for field in (
            'id_',
            'target',
            'target_type',
            'host_name',
            'benchmark_href',
            'benchmark_id',
            'scanner_name',
            'scanner_version',
            'idref',
            'version',
            'time',
            'result',
            'severity',
            'weight',
        ):
            setattr(self, field, args[field])

    @property
    def inventory_key(self):
        """Derive inventory key as '<host_name or target>:<target_type>'."""
        # No host name (OpenScap 1.3.3): use the target instead.
        # Host name present (OpenScap 1.3.5): use it.
        prefix = self.target if self.host_name is None else self.host_name
        return prefix + ':' + self.target_type

    @property
    def ns(self):
        """Derive namespace: the fixed schema URL followed by the scanner name."""
        return f'https://oscal-compass.github.io/compliance-trestle/schemas/oscal/ar/{self.scanner_name}'  # noqa: E231
Attributes¤
inventory_key property readonly ¤

Derive inventory key.

ns property readonly ¤

Derive namespace.

Methods¤
__init__(self, args) special ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/xccdf.py
def __init__(self, args: Dict[str, str]) -> None:
    """Copy each expected entry of args onto the instance.

    Every name listed below must be a key of args; a missing key raises
    KeyError.
    """
    for field in (
        'id_',
        'target',
        'target_type',
        'host_name',
        'benchmark_href',
        'benchmark_id',
        'scanner_name',
        'scanner_version',
        'idref',
        'version',
        'time',
        'result',
        'severity',
        'weight',
    ):
        setattr(self, field, args[field])

XccdfResultToOscalARTransformer (ResultsTransformer) ¤

Interface for Xccdf transformer.

Source code in trestle/transforms/implementations/xccdf.py
class XccdfResultToOscalARTransformer(ResultsTransformer):
    """Interface for Xccdf transformer.

    Title, description, type and tags are expected to be supplied through
    the set_* methods before transform() is called, because transform()
    reads them when it creates the results factory.
    """

    def __init__(self) -> None:
        """Initialize with an empty modes mapping."""
        self._modes = {}

    @property
    def analysis(self) -> List[str]:
        """Return the analysis held by the results factory.

        The factory is created in transform(), so this is only available
        after transform() has been called.
        """
        return self._results_factory.analysis

    @property
    def checking(self):
        """Return the 'checking' mode, or False when it is not set."""
        return self._modes.get('checking', False)

    @property
    def tags(self) -> Dict[str, str]:
        """Return the tags stored by set_tags()."""
        return self._tags

    def set_tags(self, tags: Dict[str, str]) -> None:
        """Keep tags info (property name to property class)."""
        self._tags = tags

    def set_title(self, title: str) -> None:
        """Keep title info."""
        self._title = title

    def set_description(self, description: str) -> None:
        """Keep description info."""
        self._description = description

    def set_type(self, type_: str) -> None:
        """Keep type info."""
        self._type = type_

    def set_modes(self, modes: Dict[str, Any]) -> None:
        """Keep modes info; a None argument leaves the current modes untouched."""
        if modes is not None:
            self._modes = modes

    def transform(self, blob: str) -> Results:
        """Transform the blob into a Results.

        The expected blob is a string that is one of:
            - data from OpenShift Compliance Operator (json, yaml, xml)
            - data from Auditree XCCDF fetcher/check (json)

        The formats are tried in the order xml, json, yaml; the first
        ingester that does not return None supplies the result.

        Raises:
            RuntimeError: when the yaml fallback fails to load or ingest.
        """
        self._results_factory = _OscalResultsFactory(
            self._title, self._description, self._type, self.get_timestamp(), self.checking, self.tags
        )
        results = self._ingest_xml(blob)
        if results is None:
            results = self._ingest_json(blob)
        if results is None:
            results = self._ingest_yaml(blob)
        return results

    def _ingest_xml(self, blob: str) -> Optional[Results]:
        """Ingest xml data.

        Returns None when the blob does not start with an xml declaration.
        """
        # ?xml data
        if not blob.startswith('<?xml'):
            return None
        self._results_factory.ingest_xml(blob)
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_configmaps(self, jdata: dict) -> None:
        """Ingest configmaps: every item whose 'data' entry contains 'results'."""
        for item in jdata['items']:
            if 'data' in item and 'results' in item['data']:
                self._results_factory.ingest(item)

    def _ingest_auditree(self, jdata: dict) -> None:
        """Ingest auditree: every entry of each cluster's 'resources' list."""
        for key in jdata.keys():
            for group in jdata[key]:
                for cluster in jdata[key][group]:
                    if 'resources' in cluster:
                        for resource in cluster['resources']:
                            self._results_factory.ingest(resource)

    def _ingest_json(self, blob: str) -> Optional[Results]:
        """Ingest json data.

        Returns None when a JSONDecodeError is raised while decoding or
        ingesting, so that the caller can try another format.
        """
        try:
            # ? configmaps or auditree data
            jdata = json.loads(blob)
            # https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
            if jdata.get('kind') == 'ConfigMapList' and 'items' in jdata:
                self._ingest_configmaps(jdata)
            # https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
            else:
                self._ingest_auditree(jdata)
        except json.decoder.JSONDecodeError:
            return None
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_yaml(self, blob: str) -> Results:
        """Ingest yaml data.

        Raises:
            RuntimeError: wrapping any exception raised while loading or
                ingesting the blob.
        """
        try:
            # ? yaml data
            yaml = YAML(typ='safe')
            resource = yaml.load(blob)
            self._results_factory.ingest(resource)
        except Exception as e:
            # Chain explicitly so the original failure is recorded as the cause.
            raise RuntimeError(e) from e
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results
Attributes¤
analysis: List[str] property readonly ¤

Analysis.

checking property readonly ¤

Return checking.

tags property readonly ¤

Return tags.

Methods¤
__init__(self) special ¤

Initialize.

Source code in trestle/transforms/implementations/xccdf.py
def __init__(self) -> None:
    """Start with an empty modes mapping."""
    self._modes = dict()
set_description(self, description) ¤

Keep description info.

Source code in trestle/transforms/implementations/xccdf.py
def set_description(self, description: str) -> None:
    """Store the given description on the instance as _description."""
    self._description = description
set_modes(self, modes) ¤

Keep modes info.

Source code in trestle/transforms/implementations/xccdf.py
def set_modes(self, modes: Dict[str, Any]) -> None:
    """Store the given modes; a None argument leaves the current modes as is."""
    if modes is None:
        return
    self._modes = modes
set_tags(self, tags) ¤

Keep tags info (property name to property class).

Source code in trestle/transforms/implementations/xccdf.py
def set_tags(self, tags: Dict[str, str]) -> None:
    """Store the tags mapping (property name to property class) as _tags."""
    self._tags = tags
set_title(self, title) ¤

Keep title info.

Source code in trestle/transforms/implementations/xccdf.py
def set_title(self, title: str) -> None:
    """Store the given title on the instance as _title."""
    self._title = title
set_type(self, type_) ¤

Keep type info.

Source code in trestle/transforms/implementations/xccdf.py
def set_type(self, type_: str) -> None:
    """Store the given type on the instance as _type."""
    self._type = type_
transform(self, blob) ¤

Transform the blob into a Results.

The expected blob is a string that is one of:

- data from OpenShift Compliance Operator (json, yaml, xml)
- data from Auditree XCCDF fetcher/check (json)

Source code in trestle/transforms/implementations/xccdf.py
def transform(self, blob: str) -> Results:
    """Convert the blob into a Results.

    The blob is expected to be a string holding one of:
        - data from OpenShift Compliance Operator (json, yaml, xml)
        - data from Auditree XCCDF fetcher/check (json)

    The ingesters are tried in the order xml, json, yaml; the first one
    that does not return None supplies the result.
    """
    self._results_factory = _OscalResultsFactory(
        self._title, self._description, self._type, self.get_timestamp(), self.checking, self.tags
    )
    outcome = None
    for ingest in (self._ingest_xml, self._ingest_json, self._ingest_yaml):
        outcome = ingest(blob)
        if outcome is not None:
            break
    return outcome

XccdfTransformer (XccdfResultToOscalARTransformer) ¤

Legacy class name.

Source code in trestle/transforms/implementations/xccdf.py
class XccdfTransformer(XccdfResultToOscalARTransformer):
    """Legacy class name; a subclass of XccdfResultToOscalARTransformer that adds nothing."""

handler: python