Skip to content


trestle.transforms.transformer_helper ¤

Transformer helper functions.


PropertyAccounting ¤

Property accounting class.

Help transformers do accounting.

Each time a new record is processed, the transformer calls count_group. For each attribute on that record, the transformer calls count_property: if an identical property has already been counted for the group, its count is incremented; otherwise, a new entry is made and its count is set to 1. When the transformer wants to know whether a property (name, value, ns, and class) is common to all records in the group, it calls is_common_property, which checks that the number of records in the group equals the number of times the property has been counted. If the two are equal, the property is common.

Source code in trestle/transforms/transformer_helper.py
class PropertyAccounting():
    """Property accounting class.

    Help transformers do accounting.

    > Each time a new record is processed the transformer calls count_group.
    > For each attribute on that record, the transformer calls count_property.
      - If the property already exactly exists, then its count is incremented.
      - Otherwise, a new entry is made and the count for that property is set to 1.
    > When the transformer wants to know if a property (name, value, ns, and class)
      is common to all records for the group, is_common_property is employed to check
      that the number of records in the group is equal to the number of duplicates
      there are for the property. If equal, then the property is common.
    """

    def __init__(self) -> None:
        """Initialize empty group and property counters."""
        # group -> number of records counted for the group
        self._group_map: Dict[str, int] = {}
        # group -> property key -> number of times the property was counted
        self._property_map: Dict[str, Dict[str, int]] = {}

    def count_group(self, group: Optional[str] = None) -> None:
        """Group accounting.

        Increment the record count for the group, starting at 1 for a new group.

        Raises:
            TrestleError: if group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('count_group created with group=None')
        self._group_map[group] = self._group_map.get(group, 0) + 1

    def count_property(
        self,
        group: str,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> None:
        """Property accounting.

        Increment the count of the (name, value, class_, ns) combination within
        the group, starting at 1 the first time that exact combination is seen.
        """
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        counts = self._property_map.setdefault(group, {})
        counts[key] = counts.get(key, 0) + 1

    def is_common_property(
        self,
        group: str,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> bool:
        """Check for common property.

        Returns:
            True when the property has been counted exactly as many times as the
            group has records; False otherwise, including when the group or the
            property has never been counted.
        """
        group_count = self._group_map.get(group)
        property_counts = self._property_map.get(group)
        # a group can have records but no counted properties; indexing
        # _property_map directly would raise KeyError in that case
        if group_count is None or property_counts is None:
            return False
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        return property_counts.get(key) == group_count
__init__(self) special ¤


Source code in trestle/transforms/transformer_helper.py
def __init__(self) -> None:
    """Create the accounting object with no groups and no properties counted."""
    # per-group property tallies: group -> property key -> count
    self._property_map: Dict[str, Dict[str, int]] = dict()
    # per-group record tallies: group -> count
    self._group_map: Dict[str, int] = dict()
count_group(self, group=None) ¤

Group accounting.

Source code in trestle/transforms/transformer_helper.py
def count_group(self, group: Optional[str] = None) -> None:
    """Record one more occurrence of the group.

    A group seen for the first time starts at a count of 1. A missing or
    otherwise falsy group is rejected with TrestleError.
    """
    if group:
        seen_so_far = self._group_map.get(group, 0)
        self._group_map[group] = seen_so_far + 1
        return
    raise TrestleError('count_group created with group=None')
count_property(self, group, name=None, value=None, class_=None, ns=None) ¤

Property accounting.

Source code in trestle/transforms/transformer_helper.py
def count_property(
    self,
    group: str,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> None:
    """Property accounting.

    Increment the count of the (name, value, class_, ns) combination within
    the group, starting at 1 the first time that exact combination is seen.
    """
    # the four segments are stringified, so a None segment becomes 'None' in the key
    key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
    counts = self._property_map.setdefault(group, {})
    counts[key] = counts.get(key, 0) + 1
is_common_property(self, group, name=None, value=None, class_=None, ns=None) ¤

Check for common property.

Source code in trestle/transforms/transformer_helper.py
def is_common_property(
    self,
    group: str,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> bool:
    """Check for common property.

    Returns:
        True when the property has been counted exactly as many times as the
        group has records; False otherwise, including when the group or the
        property has never been counted.
    """
    group_count = self._group_map.get(group)
    property_counts = self._property_map.get(group)
    # a group can have records but no counted properties; indexing
    # _property_map directly would raise KeyError in that case
    if group_count is None or property_counts is None:
        return False
    key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
    return property_counts.get(key) == group_count

PropertyManager ¤

Property manager class.

Help transformer manage properties.

Use materialize to fetch a property from the cache (if caching) or, on a miss, create a new property instance and keep it in the cache (if caching). Use put_common_property to remember a common property for a group. Use get_common_properties to recall the list of common properties for a group.

Source code in trestle/transforms/transformer_helper.py
class PropertyManager():
    """Property manager class.

    Help transformer manage properties.

    > Use materialize to: fetch a property from cache (if caching), else create a new
      property instance and keep in cache (if caching).
    > Use put_common_property to: keep common properties for each group.
    > Use get_common_properties to: recall the list of common properties for the group.
    """

    def __init__(self, caching: bool = True, checking: bool = False) -> None:
        """Initialize cache settings, cache statistics and empty property maps."""
        self._caching = caching
        self._checking = checking
        self._requests = 0
        self._hits = 0
        # property key -> cached property instance
        self._map_unique: Dict[str, Any] = {}
        # group -> property key -> common property instance
        self._map_common: Dict[str, Dict[str, Property]] = {}

    @property
    def requests(self) -> int:
        """Cache requests."""
        return self._requests

    @property
    def hits(self) -> int:
        """Cache hits."""
        return self._hits

    def materialize(
        self,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> Property:
        """Get property from cache or create new property.

        Every call counts as one request; a call answered from the cache also
        counts as one hit.
        """
        self._requests += 1
        # try fetch from cache
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        if key in self._map_unique:
            self._hits += 1
            return self._map_unique[key]
        # create new property and put into cache if caching
        prop = self._create(name=name, value=value, class_=class_, ns=ns)
        if self._caching:
            self._map_unique[key] = prop
        return prop

    def put_common_property(
        self,
        group: Optional[str] = None,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> None:
        """Remember common property.

        The property is materialized and stored for the group only the first
        time its (name, value, class_, ns) combination is put.

        Raises:
            TrestleError: if group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('put_common_property created with group=None')
        group_props = self._map_common.setdefault(group, {})
        key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
        if key not in group_props:
            group_props[key] = self.materialize(name=name, value=value, class_=class_, ns=ns)

    def get_common_properties(self, group: Optional[str] = None) -> Optional[List[Property]]:
        """Recall common properties for the group.

        Returns:
            The list of properties put for the group, or None when nothing was
            ever put for it.

        Raises:
            TrestleError: if group is None or otherwise falsy.
        """
        if not group:
            raise TrestleError('get_common_properties created with group=None')
        if group in self._map_common:
            return list(self._map_common[group].values())
        return None

    def _create(
        self,
        name: Optional[str] = None,
        value: Optional[str] = None,
        class_: Optional[str] = None,
        ns: Optional[str] = None
    ) -> Property:
        """Create new property.

        With checking enabled the regular Property constructor is used;
        otherwise Property.construct is used.
        """
        if self._checking:
            return Property(name=name, value=value, class_=class_, ns=ns)  # type: ignore
        return Property.construct(name=name, value=value, class_=class_, ns=ns)  # type: ignore
hits: int property readonly ¤

Cache hits.

requests: int property readonly ¤

Cache requests.

__init__(self, caching=True, checking=False) special ¤


Source code in trestle/transforms/transformer_helper.py
def __init__(self, caching: bool = True, checking: bool = False) -> None:
    """Set up the manager with zeroed cache statistics and empty property maps.

    The caching and checking flags are stored as given.
    """
    # group -> property key -> common property instance
    self._map_common: Dict[str, Dict[str, Property]] = dict()
    # property key -> cached property instance
    self._map_unique: Dict[str, Any] = dict()
    self._hits = 0
    self._requests = 0
    self._checking = checking
    self._caching = caching
get_common_properties(self, group=None) ¤

Recall common properties for the group.

Source code in trestle/transforms/transformer_helper.py
def get_common_properties(self, group: Optional[str] = None) -> Optional[List[Property]]:
    """Return the common properties stored for the group.

    The result is a new list of the group's stored properties, or None when
    the group has no entry. A missing or otherwise falsy group is rejected
    with TrestleError.
    """
    if not group:
        raise TrestleError('get_common_properties created with group=None')
    if group not in self._map_common:
        return None
    stored = self._map_common[group]
    return [*stored.values()]
materialize(self, name=None, value=None, class_=None, ns=None) ¤

Get property from cache or create new property.

Source code in trestle/transforms/transformer_helper.py
def materialize(
    self,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> Property:
    """Get property from cache or create new property.

    Every call counts as one request; a call answered from the cache also
    counts as one hit.
    """
    self._requests += 1
    # try fetch from cache
    key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
    if key in self._map_unique:
        self._hits += 1
        return self._map_unique[key]
    # create new property and put into cache if caching
    prop = self._create(name=name, value=value, class_=class_, ns=ns)
    if self._caching:
        self._map_unique[key] = prop
    return prop
put_common_property(self, group=None, name=None, value=None, class_=None, ns=None) ¤

Remember common property.

Source code in trestle/transforms/transformer_helper.py
def put_common_property(
    self,
    group: Optional[str] = None,
    name: Optional[str] = None,
    value: Optional[str] = None,
    class_: Optional[str] = None,
    ns: Optional[str] = None
) -> None:
    """Remember common property.

    The property is materialized and stored for the group only the first
    time its (name, value, class_, ns) combination is put.

    Raises:
        TrestleError: if group is None or otherwise falsy.
    """
    if not group:
        raise TrestleError('put_common_property created with group=None')
    group_props = self._map_common.setdefault(group, {})
    key = _segment_separator.join([str(name), str(value), str(class_), str(ns)])
    if key not in group_props:
        group_props[key] = self.materialize(name=name, value=value, class_=class_, ns=ns)

TransformerHelper ¤

OSCAL transformer helper.

Source code in trestle/transforms/transformer_helper.py
class TransformerHelper():
    """OSCAL transformer helper."""

    def remove_common_observation_properties(self, observations: List[Observation]) -> List[Property]:
        """Remove common observation properties.

        A property is treated as common when the occurrence count of its
        name, value and class_ combination equals the number of observations.
        One matching instance is removed from each observation's props.

        Returns:
            One instance of each removed (common) property.
        """
        props = {}
        observation_count = len(observations)
        # count each property occurrence in each observation
        props_occurrence_counts = self._get_property_occurrence_counts(observations)
        # remove common properties from observation
        for key, count in props_occurrence_counts.items():
            # skip property if not identical for each and every observation
            if count != observation_count:
                continue
            # remove property from each observation and keep one instance
            for observation in observations:
                for prop in as_list(observation.props):
                    if key == f'{prop.name}:{prop.value}:{prop.class_}':
                        props[key] = prop
                        observation.props.remove(prop)
                        # stop scanning: the list being iterated was just mutated
                        break
        # return list of removed properties
        return list(props.values())

    def _get_property_occurrence_counts(self, observations: List[Observation]) -> Dict[str, int]:
        """Count each property occurrence in each observation.

        Returns:
            A mapping of 'name:value:class_' key to the number of times a
            property with that key appears across all observations.
        """
        property_occurences = {}
        for observation in observations:
            # as_list matches the handling of observation.props used when removing
            for prop in as_list(observation.props):
                key = f'{prop.name}:{prop.value}:{prop.class_}'
                property_occurences[key] = property_occurences.get(key, 0) + 1
        return property_occurences
remove_common_observation_properties(self, observations) ¤

Remove common observation properties.

Source code in trestle/transforms/transformer_helper.py
def remove_common_observation_properties(self, observations: List[Observation]) -> List[Property]:
    """Remove common observation properties.

    A property is treated as common when the occurrence count of its
    name, value and class_ combination equals the number of observations.
    One matching instance is removed from each observation's props.

    Returns:
        One instance of each removed (common) property.
    """
    props = {}
    observation_count = len(observations)
    # count each property occurrence in each observation
    props_occurrence_counts = self._get_property_occurrence_counts(observations)
    # remove common properties from observation
    for key, count in props_occurrence_counts.items():
        # skip property if not identical for each and every observation
        if count != observation_count:
            continue
        # remove property from each observation and keep one instance
        for observation in observations:
            for prop in as_list(observation.props):
                if key == f'{prop.name}:{prop.value}:{prop.class_}':
                    props[key] = prop
                    observation.props.remove(prop)
                    # stop scanning: the list being iterated was just mutated
                    break
    # return list of removed properties
    return list(props.values())

handler: python