Skip to content

trestle.transforms.implementations.osco

trestle.transforms.implementations.osco ¤

Facilitate OSCAL-OSCO transformation.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

OscalProfileToOscoProfileTransformer ¤

Bases: FromOscalTransformer

Interface for Oscal Profile to Osco Profile transformer.

Source code in trestle/transforms/implementations/osco.py
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
class OscalProfileToOscoProfileTransformer(FromOscalTransformer):
    """Interface for Oscal Profile to Osco Profile transformer.

    Builds an OSCO profile document from an OSCAL Profile: metadata props
    supply the profile names and the OSCO version, set-parameters become
    'setValues' and excluded controls become 'disableRules'.
    """

    def __init__(
        self,
        extends='ocp4-cis-node',
        api_version='compliance.openshift.io/v1alpha1',
        kind='TailoredProfile',
        name='customized-tailored-profile',
        namespace='openshift-compliance',
    ) -> None:
        """Initialize.

        Args:
            extends: fallback for spec 'extends' when the profile has no
                'base_profile_mnemonic' metadata prop.
            api_version: value emitted as 'apiVersion'.
            kind: value emitted as 'kind'.
            name: fallback for metadata 'name' (and spec 'description') when
                the profile has no 'profile_mnemonic' metadata prop.
            namespace: value emitted as metadata 'namespace'.
        """
        self._extends = extends
        self._api_version = api_version
        self._kind = kind
        self._name = name
        self._namespace = namespace

    def transform(self, profile: Profile) -> str:
        """Transform the Profile into an OSCO document.

        The document is serialized with json.dumps, so the returned string is
        JSON text.

        Args:
            profile: the OSCAL Profile to transform.

        Returns:
            The serialized document with keys 'apiVersion', 'kind',
            'metadata' and 'spec'.
        """
        self._profile = profile
        self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
        # set values
        set_values = self._get_set_values()
        # spec
        spec = {}
        if self._osco_version >= (0, 1, 40):
            # for versions 0.1.40 and beyond, include 'description';
            # it is inserted first so the serialized key order is unchanged
            spec['description'] = self._get_metadata_prop_value('profile_mnemonic', self._name)
        spec['extends'] = self._get_metadata_prop_value('base_profile_mnemonic', self._extends)
        spec['title'] = self._profile.metadata.title
        spec['setValues'] = set_values
        # 'disableRules' is only emitted when at least one rule is disabled
        disable_rules = self._get_disable_rules()
        if disable_rules:
            spec['disableRules'] = disable_rules
        # yaml data
        ydata = {
            'apiVersion': self._api_version,
            'kind': self._kind,
            'metadata': {
                'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
                'namespace': self._namespace,
            },
            'spec': spec,
        }
        return json.dumps(ydata)

    def _get_normalized_version(self, prop_name, prop_default) -> Tuple[int, int, int]:
        """Get normalized version.

        Normalize the "x.y.z" string value of the metadata prop to the
        integer tuple (x, y, z), which compares in version order.

        Args:
            prop_name: name of the metadata prop holding the version.
            prop_default: "x.y.z" string used when the prop is absent or its
                value cannot be parsed.

        Returns:
            The version as a tuple of three integers.
        """
        try:
            return self._parse_version(self._get_metadata_prop_value(prop_name, prop_default))
        except Exception:
            # deliberate best effort: any unusable prop value falls back to the default
            logger.warning(f'metadata prop name={prop_name} value error')
            return self._parse_version(prop_default)

    @staticmethod
    def _parse_version(value: str) -> Tuple[int, int, int]:
        """Convert an "x.y.z" string to (x, y, z); parts after the third are ignored."""
        vparts = value.split('.')
        return (int(vparts[0]), int(vparts[1]), int(vparts[2]))

    def _get_set_values(self) -> List[Dict]:
        """Extract set_parameter name/value pairs from profile.

        Only the first value of each set-parameter is used. A set-parameter
        without any values is skipped with a warning, since there is no value
        to emit for it.

        Returns:
            One dict with keys 'name', 'value' and 'rationale' per
            set-parameter that has a value.
        """
        set_values = []
        # for check versions prior to 0.1.59 include parameters
        # for later versions parameters should not be specified, caveat emptor
        if self._profile.modify is not None:
            for set_parameter in as_list(self._profile.modify.set_parameters):
                values = set_parameter.values
                if not values:
                    # covers both a missing (None) and an empty list of values
                    logger.warning(f'set-parameter {set_parameter.param_id} has no values, skipped')
                    continue
                name = self._format_osco_rule_name(set_parameter.param_id)
                rationale = self._get_rationale_for_set_value()
                set_values.append({'name': name, 'value': values[0], 'rationale': rationale})
        return set_values

    def _format_osco_rule_name(self, name: str) -> str:
        """Format for OSCO.

        1. remove every occurrence of xccdf_org.ssgproject.content_rule_
        2. change underscores to dashes
        3. add prefix ocp4- unless already present
        """
        normalized_name = name.replace('xccdf_org.ssgproject.content_rule_', '').replace('_', '-')
        if not normalized_name.startswith('ocp4-'):
            normalized_name = f'ocp4-{normalized_name}'
        return normalized_name

    def _get_metadata_prop_value(self, name: str, default_: str) -> str:
        """Extract metadata prop or else default if not present.

        The first prop whose name matches wins; falling back to the default
        is logged at info level.
        """
        for prop in as_list(self._profile.metadata.props):
            if prop.name == name:
                return prop.value
        logger.info(f'using default: {name} = {default_}')
        return default_

    def _get_disable_rules(self) -> List[Dict[str, str]]:
        """Extract disabled rules.

        Returns:
            One dict with keys 'name' and 'rationale' per id found in the
            exclude_controls of the profile imports.
        """
        value = []
        for _import in as_list(self._profile.imports):
            for control in as_list(_import.exclude_controls):
                self._add_disable_rules_for_control(value, control)
        return value

    def _add_disable_rules_for_control(self, value: List[Dict[str, str]], control) -> None:
        """Append one disabled-rule entry to value for each with_id of control."""
        for with_id in as_list(control.with_ids):
            name = self._format_osco_rule_name(with_id.__root__)
            rationale = self._get_rationale_for_disable_rule()
            entry = {'name': name, 'rationale': rationale}
            value.append(entry)

    def _get_rationale_for_set_value(self) -> str:
        """Rationale for set value."""
        return 'not determinable from specification'

    def _get_rationale_for_disable_rule(self) -> str:
        """Rationale for disable rule."""
        return 'not determinable from specification'
Functions¤
__init__(extends='ocp4-cis-node', api_version='compliance.openshift.io/v1alpha1', kind='TailoredProfile', name='customized-tailored-profile', namespace='openshift-compliance') ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
def __init__(
    self,
    extends='ocp4-cis-node',
    api_version='compliance.openshift.io/v1alpha1',
    kind='TailoredProfile',
    name='customized-tailored-profile',
    namespace='openshift-compliance',
) -> None:
    """Store each constructor argument on the instance as a private attribute."""
    settings = (
        ('_extends', extends),
        ('_api_version', api_version),
        ('_kind', kind),
        ('_name', name),
        ('_namespace', namespace),
    )
    for attribute, setting in settings:
        setattr(self, attribute, setting)
transform(profile) ¤

Transform the Profile into an OSCO yaml.

Source code in trestle/transforms/implementations/osco.py
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
def transform(self, profile: Profile) -> str:
    """Build the OSCO document for the given Profile and return it serialized.

    The document holds 'apiVersion', 'kind', 'metadata' and 'spec', and is
    serialized with json.dumps.
    """
    self._profile = profile
    self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
    parameter_settings = self._get_set_values()
    # 'description' is only part of the spec for versions 0.1.40 and beyond;
    # it is inserted first to keep the serialized key order
    spec = {}
    if self._osco_version >= (0, 1, 40):
        spec['description'] = self._get_metadata_prop_value('profile_mnemonic', self._name)
    spec['extends'] = self._get_metadata_prop_value('base_profile_mnemonic', self._extends)
    spec['title'] = self._profile.metadata.title
    spec['setValues'] = parameter_settings
    # disabled rules are optional in the spec
    disabled = self._get_disable_rules()
    if disabled:
        spec['disableRules'] = disabled
    metadata = {
        'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
        'namespace': self._namespace,
    }
    document = {
        'apiVersion': self._api_version,
        'kind': self._kind,
        'metadata': metadata,
        'spec': spec,
    }
    return json.dumps(document)

OscoResultToOscalARTransformer ¤

Bases: ResultsTransformer

Interface for Osco transformer.

Source code in trestle/transforms/implementations/osco.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class OscoResultToOscalARTransformer(ResultsTransformer):
    """Transformer from OSCO result data to OSCAL Results."""

    def __init__(self) -> None:
        """Start with no modes set."""
        self._modes = dict()

    @property
    def analysis(self) -> List[str]:
        """Analysis held by the results factory created in transform."""
        factory = self._results_factory
        return factory.analysis

    @property
    def checking(self):
        """Value of the 'checking' mode, False when it is not set."""
        return self._modes.get('checking', False)

    def set_modes(self, modes: Dict[str, Any]) -> None:
        """Replace the kept modes, unless None is given."""
        if modes is None:
            return
        self._modes = modes

    def transform(self, blob: str) -> Results:
        """Transform the blob into a Results.

        The expected blob is a string that is one of:
            - data from OpenShift Compliance Operator (json, yaml, xml)
            - data from Auditree OSCO fetcher/check (json)

        The formats are tried in the order xml, json, yaml; the first
        ingester that does not return None supplies the result.
        """
        self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
        for ingester in (self._ingest_xml, self._ingest_json, self._ingest_yaml):
            outcome = ingester(blob)
            if outcome is not None:
                return outcome
        return None

    def _as_results(self) -> Results:
        """Wrap the current result of the results factory in a new Results."""
        wrapped = Results()
        wrapped.__root__.append(self._results_factory.result)
        return wrapped

    def _ingest_xml(self, blob: str) -> Results:
        """Ingest xml data; return None when blob does not start with '<?xml'."""
        if not blob.startswith('<?xml'):
            return None
        self._results_factory.ingest_xml(blob)
        return self._as_results()

    def _ingest_json(self, blob: str) -> Results:
        """Ingest json data; return None when blob cannot be decoded as json."""
        try:
            # ? configmaps or auditree data
            jdata = json.loads(blob)
            top_keys = jdata.keys()
            is_configmap_list = 'kind' in top_keys and jdata['kind'] == 'ConfigMapList' and 'items' in top_keys
            if is_configmap_list:
                # https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
                for item in jdata['items']:
                    if 'data' not in item.keys():
                        continue
                    # only items whose data carries 'results' are ingested
                    if 'results' in item['data']:
                        self._results_factory.ingest(item)
            else:
                # https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
                for groups in jdata.values():
                    for group in groups:
                        for cluster in groups[group]:
                            if 'resources' not in cluster:
                                continue
                            for resource in cluster['resources']:
                                self._results_factory.ingest(resource)
        except json.decoder.JSONDecodeError:
            return None
        return self._as_results()

    def _ingest_yaml(self, blob: str) -> Results:
        """Ingest yaml data; any error raised while loading or ingesting propagates."""
        # ? yaml data
        parser = YAML(typ='safe')
        self._results_factory.ingest(parser.load(blob))
        return self._as_results()
Attributes¤
analysis property ¤

Analysis.

checking property ¤

Return checking.

Functions¤
__init__() ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
58
59
60
def __init__(self) -> None:
    """Start with an empty modes mapping."""
    self._modes = dict()
set_modes(modes) ¤

Keep modes info.

Source code in trestle/transforms/implementations/osco.py
72
73
74
75
def set_modes(self, modes: Dict[str, Any]) -> None:
    """Replace the kept modes with the given mapping; None leaves them untouched."""
    if modes is None:
        return
    self._modes = modes
transform(blob) ¤

Transform the blob into a Results.

The expected blob is a string that is one of:
  • data from OpenShift Compliance Operator (json, yaml, xml)
  • data from Auditree OSCO fetcher/check (json)
Source code in trestle/transforms/implementations/osco.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def transform(self, blob: str) -> Results:
    """Transform the blob into a Results.

    The expected blob is a string that is one of:
        - data from OpenShift Compliance Operator (json, yaml, xml)
        - data from Auditree OSCO fetcher/check (json)

    The formats are tried in the order xml, json, yaml; the first ingester
    that does not return None supplies the result.
    """
    self._results_factory = _OscalResultsFactory(self.get_timestamp(), self.checking)
    for ingester in (self._ingest_xml, self._ingest_json, self._ingest_yaml):
        outcome = ingester(blob)
        if outcome is not None:
            return outcome
    return None

OscoTransformer ¤

Bases: OscoResultToOscalARTransformer

Legacy class name.

Source code in trestle/transforms/implementations/osco.py
149
150
class OscoTransformer(OscoResultToOscalARTransformer):
    """Legacy class name.

    Subclasses OscoResultToOscalARTransformer without adding or overriding
    anything; presumably retained so existing callers using the old name
    keep working.
    """

RuleUse ¤

Represents one rule of OSCO data.

Source code in trestle/transforms/implementations/osco.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class RuleUse():
    """Holds the values describing one rule of OSCO data."""

    def __init__(self, args: Dict[str, str]) -> None:
        """Copy every expected entry of args onto the instance.

        Each key listed below must be present in args (KeyError otherwise)
        and becomes an attribute of the same name.
        """
        for field in (
            'id_',
            'target',
            'target_type',
            'host_name',
            'benchmark_href',
            'benchmark_id',
            'scanner_name',
            'scanner_version',
            'idref',
            'version',
            'time',
            'result',
            'severity',
            'weight',
        ):
            setattr(self, field, args[field])

    @property
    def inventory_key(self):
        """Key of the form '<host_name or target>:<target_type>'."""
        # OpenScap 1.3.3 has no host name, so the target is used;
        # OpenScap 1.3.5 provides the host name
        owner = self.target if self.host_name is None else self.host_name
        return owner + ':' + self.target_type
Attributes¤
benchmark_href = args['benchmark_href'] instance-attribute ¤
benchmark_id = args['benchmark_id'] instance-attribute ¤
host_name = args['host_name'] instance-attribute ¤
id_ = args['id_'] instance-attribute ¤
idref = args['idref'] instance-attribute ¤
inventory_key property ¤

Derive inventory key.

result = args['result'] instance-attribute ¤
scanner_name = args['scanner_name'] instance-attribute ¤
scanner_version = args['scanner_version'] instance-attribute ¤
severity = args['severity'] instance-attribute ¤
target = args['target'] instance-attribute ¤
target_type = args['target_type'] instance-attribute ¤
time = args['time'] instance-attribute ¤
version = args['version'] instance-attribute ¤
weight = args['weight'] instance-attribute ¤
Functions¤
__init__(args) ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/osco.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def __init__(self, args: Dict[str, str]) -> None:
    """Copy every expected entry of args onto the instance.

    Each key listed below must be present in args (KeyError otherwise) and
    becomes an attribute of the same name.
    """
    for field in (
        'id_',
        'target',
        'target_type',
        'host_name',
        'benchmark_href',
        'benchmark_id',
        'scanner_name',
        'scanner_version',
        'idref',
        'version',
        'time',
        'result',
        'severity',
        'weight',
    ):
        setattr(self, field, args[field])

Functions¤

handler: python