Skip to content

trestle.transforms.implementations.xccdf

trestle.transforms.implementations.xccdf ¤

Facilitate OSCAL-XCCDF transformation.

Classes¤

RuleUse ¤

Represents one rule of XCCDF data.

Source code in trestle/transforms/implementations/xccdf.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class RuleUse:
    """Represents one rule of XCCDF data.

    Every attribute is copied unchanged from the ``args`` mapping handed to
    the constructor; ``inventory_key`` and ``ns`` are derived from them.
    """

    def __init__(self, args: Dict[str, Optional[str]]) -> None:
        """Initialize given specified args.

        Args:
            args: mapping holding one entry for each attribute assigned
                below.  The ``host_name`` entry may be ``None`` (see
                ``inventory_key``).

        Raises:
            KeyError: if any of the expected keys is absent from ``args``.
        """
        # identity of the rule use and of the scanned target
        self.id_ = args['id_']
        self.target = args['target']
        self.target_type = args['target_type']
        self.host_name = args['host_name']
        # benchmark and scanner that produced the result
        self.benchmark_href = args['benchmark_href']
        self.benchmark_id = args['benchmark_id']
        self.scanner_name = args['scanner_name']
        self.scanner_version = args['scanner_version']
        # the rule itself and its outcome
        self.idref = args['idref']
        self.version = args['version']
        self.time = args['time']
        self.result = args['result']
        self.severity = args['severity']
        self.weight = args['weight']

    @property
    def inventory_key(self) -> str:
        """Derive inventory key.

        Returns:
            ``<host_name>:<target_type>`` when a host name is present,
            otherwise ``<target>:<target_type>``.
        """
        # Per the original notes: OpenScap 1.3.3 data has no host name (fall
        # back to the target), whereas OpenScap 1.3.5 data provides one.
        owner = self.target if self.host_name is None else self.host_name
        return owner + ':' + self.target_type

    @property
    def ns(self) -> str:
        """Derive namespace, which ends with the scanner name."""
        return f'https://oscal-compass.github.io/compliance-trestle/schemas/oscal/ar/{self.scanner_name}'  # noqa: E231
Attributes¤
benchmark_href = args['benchmark_href'] instance-attribute ¤
benchmark_id = args['benchmark_id'] instance-attribute ¤
host_name = args['host_name'] instance-attribute ¤
id_ = args['id_'] instance-attribute ¤
idref = args['idref'] instance-attribute ¤
inventory_key property ¤

Derive inventory key.

ns property ¤

Derive namespace.

result = args['result'] instance-attribute ¤
scanner_name = args['scanner_name'] instance-attribute ¤
scanner_version = args['scanner_version'] instance-attribute ¤
severity = args['severity'] instance-attribute ¤
target = args['target'] instance-attribute ¤
target_type = args['target_type'] instance-attribute ¤
time = args['time'] instance-attribute ¤
version = args['version'] instance-attribute ¤
weight = args['weight'] instance-attribute ¤
Functions¤
__init__(args) ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/xccdf.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def __init__(self, args: Dict[str, Optional[str]]) -> None:
    """Initialize given specified args.

    Each attribute is copied unchanged from the entry of the same name in
    ``args``; a missing entry raises ``KeyError``.  The ``host_name`` entry
    may be ``None``, which ``inventory_key`` handles by using ``target``.
    """
    self.id_ = args['id_']
    self.target = args['target']
    self.target_type = args['target_type']
    self.host_name = args['host_name']
    self.benchmark_href = args['benchmark_href']
    self.benchmark_id = args['benchmark_id']
    self.scanner_name = args['scanner_name']
    self.scanner_version = args['scanner_version']
    self.idref = args['idref']
    self.version = args['version']
    self.time = args['time']
    self.result = args['result']
    self.severity = args['severity']
    self.weight = args['weight']

XccdfResultToOscalARTransformer ¤

Bases: ResultsTransformer

Interface for Xccdf transformer.

Source code in trestle/transforms/implementations/xccdf.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class XccdfResultToOscalARTransformer(ResultsTransformer):
    """Interface for Xccdf transformer.

    The ``set_*`` methods store the title, description, type, tags and modes
    that ``transform`` hands to the results factory.  The constructor only
    gives the modes a default (an empty dict), so the other values are
    expected to be set before ``transform`` is called.
    NOTE(review): confirm whether the base class supplies defaults for them.
    """

    def __init__(self) -> None:
        """Initialize with an empty modes mapping."""
        self._modes = {}

    @property
    def analysis(self) -> List[str]:
        """Analysis, as reported by the results factory of the latest transform."""
        return self._results_factory.analysis

    @property
    def checking(self):
        """Return the ``checking`` mode, or False when that mode is not set."""
        return self._modes.get('checking', False)

    @property
    def tags(self):
        """Return tags, as stored by ``set_tags``."""
        return self._tags

    def set_tags(self, tags: Dict[str, str]) -> None:
        """Keep tags info (property name to property class)."""
        self._tags = tags

    def set_title(self, title: str) -> None:
        """Keep title info."""
        self._title = title

    def set_description(self, description: str) -> None:
        """Keep description info."""
        self._description = description

    def set_type(self, type_: str) -> None:
        """Keep type info."""
        self._type = type_

    def set_modes(self, modes: Dict[str, Any]) -> None:
        """Keep modes info; a ``None`` argument leaves the stored modes untouched."""
        if modes is None:
            return
        self._modes = modes

    def transform(self, blob: str) -> Results:
        """Transform the blob into a Results.

        The expected blob is a string that is one of:
            - data from OpenShift Compliance Operator (json, yaml, xml)
            - data from Auditree XCCDF fetcher/check (json)

        The formats are tried in the order xml, json, yaml; the first one
        that accepts the blob produces the returned Results.

        Raises:
            RuntimeError: when the blob reaches the yaml stage and ingesting
                it fails.
        """
        # a fresh factory per call, so results never leak between transforms
        self._results_factory = _OscalResultsFactory(
            self._title, self._description, self._type, self.get_timestamp(), self.checking, self.tags
        )
        results = self._ingest_xml(blob)
        if results is not None:
            return results
        results = self._ingest_json(blob)
        if results is not None:
            return results
        return self._ingest_yaml(blob)

    def _wrap_result(self) -> Results:
        """Return a new Results holding the current result of the factory."""
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_xml(self, blob: str) -> Optional[Results]:
        """Ingest xml data; return None when the blob does not start with ``<?xml``."""
        if not blob.startswith('<?xml'):
            return None
        self._results_factory.ingest_xml(blob)
        return self._wrap_result()

    def _ingest_configmaps(self, jdata: dict) -> None:
        """Ingest every configmap item whose ``data`` contains ``results``."""
        for item in jdata['items']:
            if 'data' in item and 'results' in item['data']:
                self._results_factory.ingest(item)

    def _ingest_auditree(self, jdata: dict) -> None:
        """Ingest each resource listed under each cluster of auditree data.

        The data is walked as top-level key -> group -> list of clusters;
        clusters without a ``resources`` entry are skipped.
        """
        for groups in jdata.values():
            for group in groups:
                for cluster in groups[group]:
                    if 'resources' not in cluster:
                        continue
                    for resource in cluster['resources']:
                        self._results_factory.ingest(resource)

    def _ingest_json(self, blob: str) -> Optional[Results]:
        """Ingest json data; return None when a JSON decode error occurs."""
        # The try deliberately spans decoding and ingestion, so a
        # JSONDecodeError raised by either yields None.
        try:
            # ? configmaps or auditree data
            jdata = json.loads(blob)
            # https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
            if jdata.get('kind') == 'ConfigMapList' and 'items' in jdata:
                self._ingest_configmaps(jdata)
            # https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
            else:
                self._ingest_auditree(jdata)
        except json.decoder.JSONDecodeError:
            return None
        return self._wrap_result()

    def _ingest_yaml(self, blob: str) -> Results:
        """Ingest yaml data.

        Raises:
            RuntimeError: wrapping any exception raised while loading or
                ingesting the blob; the original exception is chained as
                its cause.
        """
        try:
            # ? yaml data
            yaml = YAML(typ='safe')
            resource = yaml.load(blob)
            self._results_factory.ingest(resource)
        except Exception as e:
            # chain explicitly so the traceback names the original failure as the cause
            raise RuntimeError(e) from e
        return self._wrap_result()
Attributes¤
analysis property ¤

Analysis.

checking property ¤

Return checking.

tags property ¤

Return tags.

Functions¤
__init__() ¤

Initialize.

Source code in trestle/transforms/implementations/xccdf.py
53
54
55
def __init__(self) -> None:
    """Initialize with an empty modes mapping."""
    self._modes = {}
set_description(description) ¤

Keep description info.

Source code in trestle/transforms/implementations/xccdf.py
80
81
82
def set_description(self, description: str) -> None:
    """Keep description info.

    The value is stored on the instance; transform() later passes it to the
    results factory.
    """
    self._description = description
set_modes(modes) ¤

Keep modes info.

Source code in trestle/transforms/implementations/xccdf.py
88
89
90
91
def set_modes(self, modes: Dict[str, Any]) -> None:
    """Keep modes info.

    A ``None`` argument is ignored, leaving the currently stored modes
    untouched; any other value replaces them.
    """
    if modes is None:
        return
    self._modes = modes
set_tags(tags) ¤

Keep tags info (property name to property class).

Source code in trestle/transforms/implementations/xccdf.py
72
73
74
def set_tags(self, tags: Dict[str, str]) -> None:
    """Keep tags info (property name to property class).

    The mapping is stored as given and is what the ``tags`` property returns.
    """
    self._tags = tags
set_title(title) ¤

Keep title info.

Source code in trestle/transforms/implementations/xccdf.py
76
77
78
def set_title(self, title: str) -> None:
    """Keep title info.

    The value is stored on the instance; transform() later passes it to the
    results factory.
    """
    self._title = title
set_type(type_) ¤

Keep type info.

Source code in trestle/transforms/implementations/xccdf.py
84
85
86
def set_type(self, type_: str) -> None:
    """Keep type info.

    The value is stored on the instance; transform() later passes it to the
    results factory.
    """
    self._type = type_
transform(blob) ¤

Transform the blob into a Results.

The expected blob is a string that is one of:
  • data from OpenShift Compliance Operator (json, yaml, xml)
  • data from Auditree XCCDF fetcher/check (json)
Source code in trestle/transforms/implementations/xccdf.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def transform(self, blob: str) -> Results:
    """Transform the blob into a Results.

    The expected blob is a string that is one of:
        - data from OpenShift Compliance Operator (json, yaml, xml)
        - data from Auditree XCCDF fetcher/check (json)

    The formats are tried in the order xml, json, yaml, and the outcome of
    the first one that accepts the blob is returned.
    """
    # a new results factory is created on every call
    self._results_factory = _OscalResultsFactory(
        self._title, self._description, self._type, self.get_timestamp(), self.checking, self.tags
    )
    results = self._ingest_xml(blob)
    if results is not None:
        return results
    results = self._ingest_json(blob)
    if results is not None:
        return results
    # neither xml nor json: yaml is the last format attempted
    return self._ingest_yaml(blob)

XccdfTransformer ¤

Bases: XccdfResultToOscalARTransformer

Legacy class name.

Source code in trestle/transforms/implementations/xccdf.py
172
173
class XccdfTransformer(XccdfResultToOscalARTransformer):
    """Legacy class name.

    Adds nothing to XccdfResultToOscalARTransformer; it only makes that
    class available under this older name.
    """

handler: python