Skip to content

merge

trestle.core.resolver.merge ¤

Create resolved catalog from profile.

CATALOG_EXCLUDE ¤

CONTROL_EXCLUDE ¤

ID ¤

ITEM_EXCLUDE_MAP ¤

NAME ¤

PARAMETER_EXCLUDE ¤

PART_EXCLUDE ¤

PROPERTY_EXCLUDE ¤

logger ¤

Classes¤

Merge (Filter) ¤

Merge the incoming catalogs according to rules in the profile.

The incoming catalogs have already been pruned based on the import. Now the controls must be gathered, merged, and grouped based on the merge settings.

Source code in trestle/core/resolver/merge.py
class Merge(Pipeline.Filter):
    """
    Merge the incoming catalogs according to rules in the profile.

    The incoming catalogs have already been pruned based on the import.
    Now the controls must be gathered, merged, and grouped based on the merge settings.
    """

    def __init__(self, profile: prof.Profile) -> None:
        """Initialize the class with the profile."""
        logger.debug('merge filter initialize')
        self._profile = profile

    def _get_id(self, item: OBT) -> Optional[str]:
        """Return the attribute named by ID, falling back to the one named by NAME, else None."""
        id_ = getattr(item, ID, None)
        if id_ is None:
            id_ = getattr(item, NAME, None)
        return id_

    def _item_genertor(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> Iterator[OBT]:
        """
        Yield the items of src that have no exact (==) copy in dest.

        When merge_method is keep nothing is yielded and src is appended to dest instead.
        Being a generator, that only happens once the result is iterated.
        """
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
        else:
            for item in src:
                # if there is an exact copy of this in dest then ignore it
                if item not in dest:
                    yield item

    def _merge_item(self, item: OBT, dest: List[OBT], merge_method: Optional[str]) -> bool:
        """
        Find the first entry of dest sharing item's id and merge item into it if the method is merge.

        Returns True when a matching entry exists, whether or not it was actually merged,
        so that for other methods the caller leaves dest's entry untouched and drops item.
        Items without an id never match.
        """
        merged = False
        item_id = self._get_id(item)
        if item_id is not None:
            for other in dest:
                other_id = self._get_id(other)
                if other_id != item_id:
                    continue
                if merge_method == prof.CombinationMethodValidValues.merge.value:
                    self._merge_items(other, item, merge_method)
                merged = True
                break
        return merged

    def _merge_lists(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> None:
        """
        Merge the list src into the list dest in place.

        With keep, src is simply appended, duplicates included.  Otherwise each src item that is not
        already present in dest is matched by id against dest; unmatched items are appended.
        """
        added_items = []
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
            return
        for item in self._item_genertor(dest, src, merge_method):
            merged = self._merge_item(item, dest, merge_method)
            # it isn't already in dest and no match was found for merge, so append
            if not merged:
                added_items.append(item)
        # appended only after the loop, so new items are not match candidates for later src items
        dest.extend(added_items)

    def _merge_attrs(
        self, dest: Union[OBT, List[OBT]], src: Union[OBT, List[OBT]], attr: str, merge_method: Optional[str]
    ) -> None:
        """
        Merge this attr of src into the attr of dest.

        Nothing is done when src has no value for attr or when attr is listed in ITEM_EXCLUDE_MAP
        for src's class name.  A non-empty list on dest is merged element-wise; otherwise the value
        from src overwrites the value on dest unless use_first applies and dest already has a value.
        """
        src_attr = getattr(src, attr, None)
        if src_attr is None:
            return
        item_type = type(src).__name__
        if attr in ITEM_EXCLUDE_MAP.get(item_type, []):
            return
        dest_attr = getattr(dest, attr, None)
        if dest_attr and isinstance(dest_attr, list):
            self._merge_lists(dest_attr, src_attr, merge_method)
            setattr(dest, attr, dest_attr)
            return
        # use_first keeps whatever truthy value dest already holds
        if dest_attr and merge_method == prof.CombinationMethodValidValues.use_first.value:
            return
        if dest_attr == src_attr and merge_method not in [None, prof.CombinationMethodValidValues.keep.value]:
            return
        setattr(dest, attr, src_attr)

    def _merge_items(self, dest: OBT, src: OBT, merge_method: Optional[str]) -> None:
        """Merge two items recursively, visiting only the fields explicitly set on src."""
        for field in src.__fields_set__:
            self._merge_attrs(dest, src, field, merge_method)

    def _group_contents(self, group: cat.Group) -> Tuple[List[cat.Control], List[com.Parameter]]:
        """Get flattened content of group and its groups recursively."""
        controls = []
        params = []
        controls.extend(as_list(group.controls))
        params.extend(as_list(group.params))
        if group.groups is not None:
            for sub_group in group.groups:
                new_controls, new_params = self._group_contents(sub_group)
                controls.extend(new_controls)
                params.extend(new_params)
        return controls, params

    def _flatten_catalog(self, catalog: cat.Catalog, as_is: bool) -> cat.Catalog:
        """
        Flatten the groups of the catalog if as_is is False.

        The catalog is modified in place and returned: controls and params of all groups are moved
        to the catalog level and the groups are removed.
        """
        if as_is or catalog.groups is None:
            return catalog

        # as_is is False so flatten the controls into a single list
        catalog.controls = as_list(catalog.controls)
        catalog.params = as_list(catalog.params)
        for group in catalog.groups:
            new_controls, new_params = self._group_contents(group)
            catalog.controls.extend(new_controls)
            catalog.params.extend(new_params)
        catalog.controls = none_if_empty(catalog.controls)
        catalog.params = none_if_empty(catalog.params)
        catalog.groups = None
        return catalog

    def _merge_two_catalogs(
        self, dest: cat.Catalog, src: cat.Catalog, merge_method: Optional[str], as_is: bool
    ) -> cat.Catalog:
        """Merge src into dest using merge_method, flattening both first unless as_is, and return dest."""
        # merge_method is use_first, merge, keep
        # no combine or merge_method equates to merge_method=keep
        # if as_is is false, the result is flattened

        dest = self._flatten_catalog(dest, as_is)
        src = self._flatten_catalog(src, as_is)

        self._merge_items(dest, src, merge_method)

        return dest

    def _merge_catalog(self, merged: Optional[cat.Catalog], catalog: cat.Catalog) -> cat.Catalog:
        """
        Merge the controls in the catalog into merged catalog.

        Both inputs are deep-copied, so neither is modified.  Raises TrestleError when the profile
        requests a custom merge.  When merged is None the (possibly flattened) copy of catalog is
        returned.
        """
        # no merge means keep, including dups
        # same for merge with no combine
        # groups are merged only if separate directive such as as-is is given
        # use-first is a merge combination rule
        # merge is a merge combination rule for controls.  groups are not merged by this rule
        # merge/as-is and merge/custom are used for merging groups
        # if neither as-is nor custom is specified - just get single list of controls
        # unstructured controls should appear after any loose params

        # make copies to avoid changing input objects
        local_cat = catalog.copy(deep=True)
        local_merged = merged.copy(deep=True) if merged else None

        merge_method = prof.CombinationMethodValidValues.keep.value
        as_is = False
        if self._profile.merge is not None:
            if self._profile.merge.custom is not None:
                raise TrestleError('Profile with custom merge is not supported.')
            if self._profile.merge.as_is is not None:
                as_is = self._profile.merge.as_is
            if self._profile.merge.combine is None:
                logger.debug('Profile has merge but no combine so defaulting to combine/merge.')
                merge_method = prof.CombinationMethodValidValues.merge.value
            else:
                combine_method = self._profile.merge.combine.method
                # check the method itself first: reading .value on a missing method would raise
                if combine_method is None or combine_method.value is None:
                    logger.debug('Profile has merge combine but no method.  Defaulting to merge.')
                    merge_method = prof.CombinationMethodValidValues.merge.value
                else:
                    merge_method = combine_method.value

        if local_merged is None:
            return self._flatten_catalog(local_cat, as_is)

        # merge the incoming catalog with merged based on merge_method and as_is
        return self._merge_two_catalogs(local_merged, local_cat, merge_method, as_is)

    def process(self, pipelines: List[Pipeline]) -> Iterator[cat.Catalog]:  # type: ignore
        """
        Merge the incoming catalogs.

        This pulls from import and iterates over the incoming catalogs.
        The way groups, lists of controls, and controls themselves get merged is specified by the profile.
        A single merged catalog is yielded; it is None when pipelines is empty.
        """
        merged: Optional[cat.Catalog] = None
        logger.debug(f'merge entering process with {len(pipelines)} pipelines')
        for pipeline in pipelines:
            catalog = next(pipeline.process(None))
            merged = self._merge_catalog(merged, catalog)
        yield merged  # type: ignore
Methods¤
__init__(self, profile) special ¤

Initialize the class with the profile.

Source code in trestle/core/resolver/merge.py
def __init__(self, profile: prof.Profile) -> None:
    """Create the merge filter, keeping a reference to the given profile."""
    self._profile = profile
    logger.debug('merge filter initialize')
process(self, pipelines) ¤

Merge the incoming catalogs.

This pulls from import and iterates over the incoming catalogs. The way groups, lists of controls, and controls themselves get merged is specified by the profile.

Source code in trestle/core/resolver/merge.py
def process(self, pipelines: List[Pipeline]) -> Iterator[cat.Catalog]:  # type: ignore
    """
    Fold the catalogs produced by the incoming pipelines into one catalog.

    The first catalog of each pipeline is pulled in turn and merged into the running result.
    How groups, control lists and individual controls are combined is dictated by the profile.
    Exactly one value is yielded: the final merged catalog.
    """
    result: Optional[cat.Catalog] = None
    logger.debug(f'merge entering process with {len(pipelines)} pipelines')
    # lazy generator: each catalog is fetched right before it is merged
    incoming = (next(stage.process(None)) for stage in pipelines)
    for next_catalog in incoming:
        result = self._merge_catalog(result, next_catalog)
    yield result  # type: ignore

handler: python