Skip to content

transformer_helper

trestle.transforms.transformer_helper ¤

Transformer helper functions.

Classes¤

PropertyAccounting ¤

Property accounting class.

Help transformers do accounting.

Each time a new record is processed, the transformer calls count_group. For each attribute on that record, the transformer calls count_property: if an identical property (same name, value, class, and ns) has already been recorded for the group, its count is incremented; otherwise, a new entry is made and the count for that property is set to 1. When the transformer wants to know whether a property (name, value, ns, and class) is common to all records for the group, is_common_property checks that the number of records in the group is equal to the number of times the property has been counted. If the two numbers are equal, then the property is common.

Source code in trestle/transforms/transformer_helper.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class PropertyAccounting():
    """Property accounting class.

    Help transformers do accounting.

    > Each time a new record is processed the transformer calls count_group.
    > For each attribute on that record, the transformer calls count_property.
      - If an identical property was already counted, its count is incremented.
      - Otherwise, a new entry is made and the count for that property is set to 1.
    > When the transformer wants to know if a property (name, value, ns, and class)
      is common to all records for the group, is_common_property is employed to check
      that the number of records in the group is equal to the number of times the
      property was counted. If equal, then the property is common.
    """

    def __init__(self) -> None:
        """Initialize with empty tallies."""
        # group -> number of count_group calls made for that group
        self._group_map: Dict[str, int] = {}
        # group -> property key -> number of count_property calls for that key
        self._property_map: Dict[str, Dict[str, int]] = {}

    def count_group(self, group: Optional[str] = None) -> None:
        """Add one to the tally of the given group.

        Raises TrestleError when group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('count_group created with group=None')
        self._group_map[group] = self._group_map.get(group, 0) + 1

    def count_property(
        self,
        group: str,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> None:
        """Add one to the tally of the (name, value, class_, ns) property within the group."""
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        group_tallies = self._property_map.setdefault(group, {})
        group_tallies[key] = group_tallies.get(key, 0) + 1

    def is_common_property(
        self,
        group: str,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> bool:
        """Check for common property.

        Return True only when the property was counted exactly as many times
        as the group itself. Return False when the group is unknown, when no
        property was ever counted for the group, or when this particular
        property was never counted for the group.
        """
        group_count = self._group_map.get(group)
        group_tallies = self._property_map.get(group)
        # A group may have been counted without any property being counted for
        # it; indexing self._property_map[group] directly would raise KeyError.
        if group_count is None or group_tallies is None:
            return False
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        if key not in group_tallies:
            return False
        return group_tallies[key] == group_count
Functions¤
__init__() ¤

Initialize.

Source code in trestle/transforms/transformer_helper.py
44
45
46
47
def __init__(self) -> None:
    """Start with no groups and no properties counted."""
    # group -> property key -> occurrence count
    self._property_map: Dict[str, Dict[str, int]] = {}
    # group -> occurrence count
    self._group_map: Dict[str, int] = {}
count_group(group=None) ¤

Group accounting.

Source code in trestle/transforms/transformer_helper.py
49
50
51
52
53
54
55
def count_group(self, group: Optional[str] = None) -> None:
    """Add one to the tally kept for the given group.

    Raises TrestleError when group is None or otherwise falsy.
    """
    if not group:
        raise TrestleError('count_group created with group=None')
    tally = self._group_map
    # an unseen group starts from zero
    tally[group] = tally.get(group, 0) + 1
count_property(group, name=None, value=None, class_=None, ns=None) ¤

Property accounting.

Source code in trestle/transforms/transformer_helper.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def count_property(
    self,
    group: str,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> None:
    """Add one to the tally of the (name, value, class_, ns) property within the group."""
    segments = [str(name), str(value), str(class_), str(ns)]
    key = _segment_separator.join(segments)
    # create the per-group tally on first use, then bump this property's count
    group_tallies = self._property_map.setdefault(group, {})
    group_tallies[key] = group_tallies.get(key, 0) + 1
is_common_property(group, name=None, value=None, class_=None, ns=None) ¤

Check for common property.

Source code in trestle/transforms/transformer_helper.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def is_common_property(
    self,
    group: str,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> bool:
    """Check for common property.

    Return True only when the property was counted exactly as many times as
    the group itself. Return False when the group is unknown, when no
    property was ever counted for the group, or when this particular property
    was never counted for the group.
    """
    group_count = self._group_map.get(group)
    group_tallies = self._property_map.get(group)
    # A group may have been counted without any property being counted for it;
    # indexing self._property_map[group] directly would raise KeyError.
    if group_count is None or group_tallies is None:
        return False
    key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
    if key not in group_tallies:
        return False
    return group_tallies[key] == group_count

PropertyManager ¤

Property manager class.

Help transformer manage properties.

Use materialize to fetch a property from the cache (if caching); when it is not cached, a new property instance is created and kept in the cache (if caching). Use put_common_property to keep the common properties for each group. Use get_common_properties to recall the list of common properties for a group.

Source code in trestle/transforms/transformer_helper.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class PropertyManager():
    """Property manager class.

    Help transformer manage properties.

    > materialize returns a property from the cache when one is stored under the
      same (name, value, class_, ns); otherwise it creates a new property instance
      and, if caching is enabled, stores it in the cache.
    > put_common_property keeps common properties for each group.
    > get_common_properties recalls the list of common properties for the group.
    """

    def __init__(self, caching: bool = True, checking: bool = False) -> None:
        """Store the caching/checking options and reset counters and maps."""
        self._checking = checking
        self._caching = caching
        self._hits = 0
        self._requests = 0
        # group -> property key -> property remembered as common for the group
        self._map_common: Dict[str, Dict[str, Property]] = {}
        # property key -> cached property instance
        self._map_unique: Dict[str, Any] = {}

    @property
    def requests(self) -> int:
        """Number of materialize calls made so far."""
        return self._requests

    @property
    def hits(self) -> int:
        """Number of materialize calls answered from the cache."""
        return self._hits

    def materialize(
        self,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> Property:
        """Return the cached property for these fields, or create a new one."""
        self._requests += 1
        segments = [str(name), str(value), str(class_), str(ns)]
        key = _segment_separator.join(segments)
        if key not in self._map_unique:
            # cache miss: build the property, remembering it only when caching
            fresh = self._create(name=name, value=value, class_=class_, ns=ns)
            if self._caching:
                self._map_unique[key] = fresh
            return fresh
        self._hits += 1
        return self._map_unique[key]

    def put_common_property(
        self,
        group: Optional[str] = None,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> None:
        """Record the property as common to the group, unless already recorded.

        Raises TrestleError when group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('put_common_property created with group=None')
        group_props = self._map_common.setdefault(group, {})
        segments = [str(name), str(value), str(class_), str(ns)]
        key = _segment_separator.join(segments)
        if key in group_props:
            return
        group_props[key] = self.materialize(name, value, class_, ns)

    def get_common_properties(self, group: Optional[str] = None) -> Optional[List[Property]]:
        """Return the properties recorded as common to the group, or None if there are none.

        Raises TrestleError when group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('get_common_properties created with group=None')
        if group not in self._map_common:
            return None
        return list(self._map_common[group].values())

    def _create(
        self,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> Property:
        """Build a property via Property(...) when checking, else via Property.construct(...)."""
        if not self._checking:
            return Property.construct(name=name, value=value, class_=class_, ns=ns)  # type: ignore
        return Property(name=name, value=value, class_=class_, ns=ns)  # type: ignore
Attributes¤
hits: int property ¤

Cache hits.

requests: int property ¤

Cache requests.

Functions¤
__init__(caching=True, checking=False) ¤

Initialize.

Source code in trestle/transforms/transformer_helper.py
100
101
102
103
104
105
106
107
def __init__(self, caching: bool = True, checking: bool = False) -> None:
    """Store the caching/checking options and reset counters and maps."""
    self._checking = checking
    self._caching = caching
    self._hits = 0
    self._requests = 0
    # group -> property key -> property remembered as common for the group
    self._map_common: Dict[str, Dict[str, Property]] = {}
    # property key -> cached property instance
    self._map_unique: Dict[str, Any] = {}
get_common_properties(group=None) ¤

Recall common properties for the group.

Source code in trestle/transforms/transformer_helper.py
157
158
159
160
161
162
163
164
def get_common_properties(self, group: Optional[str] = None) -> Optional[List[Property]]:
    """Return the properties recorded as common to the group, or None if there are none.

    Raises TrestleError when group is None or otherwise falsy.
    """
    if not group:
        raise TrestleError('get_common_properties created with group=None')
    if group not in self._map_common:
        return None
    return list(self._map_common[group].values())
materialize(name=None, value=None, class_=None, ns=None) ¤

Get property from cache or create new property.

Source code in trestle/transforms/transformer_helper.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def materialize(
    self,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> Property:
    """Return the cached property for these fields, or create a new one.

    Every call counts as a request; a call answered from the cache also
    counts as a hit.
    """
    self._requests += 1
    segments = [str(name), str(value), str(class_), str(ns)]
    key = _segment_separator.join(segments)
    if key not in self._map_unique:
        # cache miss: build the property, remembering it only when caching
        fresh = self._create(name=name, value=value, class_=class_, ns=ns)
        if self._caching:
            self._map_unique[key] = fresh
        return fresh
    self._hits += 1
    return self._map_unique[key]
put_common_property(group=None, name=None, value=None, class_=None, ns=None) ¤

Remember common property.

Source code in trestle/transforms/transformer_helper.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def put_common_property(
    self,
    group: Optional[str] = None,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> None:
    """Record the property as common to the group, unless already recorded.

    Raises TrestleError when group is None or otherwise falsy.
    """
    if not group:
        raise TrestleError('put_common_property created with group=None')
    # create the per-group map on first use
    group_props = self._map_common.setdefault(group, {})
    segments = [str(name), str(value), str(class_), str(ns)]
    key = _segment_separator.join(segments)
    if key in group_props:
        return
    group_props[key] = self.materialize(name, value, class_, ns)

TransformerHelper ¤

OSCAL transformer helper.

Source code in trestle/transforms/transformer_helper.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class TransformerHelper():
    """OSCAL transformer helper."""

    def remove_common_observation_properties(self, observations: List[Observation]) -> List[Property]:
        """Remove the properties shared by every observation and return them.

        A property is identified by its name, value and class_. When a
        property with the same identity appears in each and every observation,
        one matching instance is removed from each observation's props (the
        observations are modified in place) and a single instance is included
        in the returned list.
        """
        removed = {}
        total = len(observations)
        # number of observations that carry each property
        occurrence_counts = self._get_property_occurrence_counts(observations)
        for key, count in occurrence_counts.items():
            # skip property if not present in each and every observation
            if count != total:
                continue
            # remove property from each observation and keep one instance
            for observation in observations:
                for prop in as_list(observation.props):
                    if key == self._property_key(prop):
                        removed[key] = prop
                        observation.props.remove(prop)
                        # stop at once: the list being iterated was just mutated
                        break
        return list(removed.values())

    def _get_property_occurrence_counts(self, observations: List[Observation]) -> Dict[str, int]:
        """Count, for each property, the number of observations that contain it.

        A property repeated inside a single observation is counted once for
        that observation, so a count equal to the number of observations means
        the property is present in all of them.
        """
        property_occurrences: Dict[str, int] = {}
        for observation in observations:
            # as_list guards against observations without props (assumes it
            # yields an empty list for None -- TODO confirm); dict.fromkeys
            # drops duplicates while preserving first-seen order.
            keys = dict.fromkeys(self._property_key(prop) for prop in as_list(observation.props))
            for key in keys:
                property_occurrences[key] = property_occurrences.get(key, 0) + 1
        return property_occurrences

    @staticmethod
    def _property_key(prop: Property) -> str:
        """Build the identity key (name, value, class_) used to compare properties."""
        return f'{prop.name}:{prop.value}:{prop.class_}'
Functions¤
remove_common_observation_properties(observations) ¤

Remove common observation properties.

Source code in trestle/transforms/transformer_helper.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def remove_common_observation_properties(self, observations: List[Observation]) -> List[Property]:
    """Remove the properties shared by every observation and return them.

    A property is identified by its name, value and class_. For each property
    whose occurrence count equals the number of observations, the first
    matching instance is removed from each observation's props (in place) and
    one instance is included in the returned list.
    """
    removed = {}
    total = len(observations)
    # tally of each property across the observations
    occurrence_counts = self._get_property_occurrence_counts(observations)
    for key, count in occurrence_counts.items():
        # only properties whose count matches the number of observations qualify
        if count == total:
            for observation in observations:
                for prop in as_list(observation.props):
                    if f'{prop.name}:{prop.value}:{prop.class_}' != key:
                        continue
                    # keep one instance and drop it from this observation
                    removed[key] = prop
                    observation.props.remove(prop)
                    break
    # one instance per removed property, in the order the keys were first kept
    return list(removed.values())

Functions¤

handler: python