Skip to content

trestle.core.resolver.merge

trestle.core.resolver.merge ¤

Create resolved catalog from profile.

Attributes¤

CATALOG_EXCLUDE = ['uuid', 'metadata', 'back_matter'] module-attribute ¤

CONTROL_EXCLUDE = [ID] module-attribute ¤

ID = 'id' module-attribute ¤

ITEM_EXCLUDE_MAP = {'Part': PART_EXCLUDE, 'Property': PROPERTY_EXCLUDE, 'Parameter': PARAMETER_EXCLUDE, 'Control': CONTROL_EXCLUDE, 'Catalog': CATALOG_EXCLUDE} module-attribute ¤

NAME = 'name' module-attribute ¤

PARAMETER_EXCLUDE = [ID] module-attribute ¤

PART_EXCLUDE = [NAME] module-attribute ¤

PROPERTY_EXCLUDE = [NAME] module-attribute ¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

Merge ¤

Bases: Filter

Merge the incoming catalogs according to rules in the profile.

The incoming catalogs have already been pruned based on the import. Now the controls must be gathered, merged, and grouped based on the merge settings.

Source code in trestle/core/resolver/merge.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class Merge(Pipeline.Filter):
    """
    Merge the incoming catalogs according to rules in the profile.

    The incoming catalogs have already been pruned based on the import.
    Now the controls must be gathered, merged, and grouped based on the merge settings.
    """

    def __init__(self, profile: prof.Profile) -> None:
        """Initialize the class with the profile."""
        logger.debug('merge filter initialize')
        self._profile = profile

    def _get_id(self, item: OBT) -> Optional[str]:
        id_ = getattr(item, ID, None)
        if id_ is None:
            id_ = getattr(item, NAME, None)
        return id_

    def _item_genertor(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> Iterator[OBT]:
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
        else:
            for item in src:
                # if there is an exact copy of this in dest then ignore it
                if item not in dest:
                    yield item

    def _merge_item(self, item: OBT, dest: List[OBT], merge_method: Optional[str]) -> bool:
        merged = False
        item_id = self._get_id(item)
        if item_id is not None:
            for other in dest:
                other_id = self._get_id(other)
                if other_id != item_id:
                    continue
                if merge_method == prof.CombinationMethodValidValues.merge.value:
                    self._merge_items(other, item, merge_method)
                merged = True
                break
        return merged

    def _merge_lists(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> None:
        added_items = []
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
            return
        for item in self._item_genertor(dest, src, merge_method):
            merged = self._merge_item(item, dest, merge_method)
            # it isn't already in dest and no match was found for merge, so append
            if not merged:
                added_items.append(item)
        dest.extend(added_items)

    def _merge_attrs(
        self, dest: Union[OBT, List[OBT]], src: Union[OBT, List[OBT]], attr: str, merge_method: Optional[str]
    ) -> None:
        """Merge this attr of src into the attr of dest."""
        src_attr = getattr(src, attr, None)
        if src_attr is None:
            return
        item_type = type(src).__name__
        if attr in ITEM_EXCLUDE_MAP.get(item_type, []):
            return
        dest_attr = getattr(dest, attr, None)
        if dest_attr and isinstance(dest_attr, list):
            self._merge_lists(dest_attr, src_attr, merge_method)
            setattr(dest, attr, dest_attr)
            return
        if dest_attr and merge_method == prof.CombinationMethodValidValues.use_first.value:
            return
        if dest_attr == src_attr and merge_method not in [None, prof.CombinationMethodValidValues.keep.value]:
            return
        setattr(dest, attr, src_attr)

    def _merge_items(self, dest: OBT, src: OBT, merge_method: Optional[str]) -> None:
        """Merge two items recursively."""
        for field in src.__fields_set__:
            self._merge_attrs(dest, src, field, merge_method)

    def _group_contents(self, group: cat.Group) -> Tuple[List[cat.Control], List[com.Parameter]]:
        """Get flattened content of group and its groups recursively."""
        controls = []
        params = []
        controls.extend(as_list(group.controls))
        params.extend(as_list(group.params))
        if group.groups is not None:
            for sub_group in group.groups:
                new_controls, new_params = self._group_contents(sub_group)
                controls.extend(new_controls)
                params.extend(new_params)
        return controls, params

    def _flatten_catalog(self, catalog: cat.Catalog, as_is: bool) -> cat.Catalog:
        """Flatten the groups of the catalog if as_is is False."""
        if as_is or catalog.groups is None:
            return catalog

        # as_is is False so flatten the controls into a single list
        catalog.controls = as_list(catalog.controls)
        catalog.params = as_list(catalog.params)
        for group in catalog.groups:
            new_controls, new_params = self._group_contents(group)
            catalog.controls.extend(new_controls)
            catalog.params.extend(new_params)
        catalog.controls = none_if_empty(catalog.controls)
        catalog.params = none_if_empty(catalog.params)
        catalog.groups = None
        return catalog

    def _merge_two_catalogs(
        self, dest: cat.Catalog, src: cat.Catalog, merge_method: Optional[str], as_is: bool
    ) -> cat.Catalog:
        # merge_method is use_first, merge, keep
        # no combine or merge_method equates to merge_method=keep
        # if as_is is false, the result is flattened

        dest = self._flatten_catalog(dest, as_is)
        src = self._flatten_catalog(src, as_is)

        self._merge_items(dest, src, merge_method)

        return dest

    def _merge_catalog(self, merged: Optional[cat.Catalog], catalog: cat.Catalog) -> cat.Catalog:
        """Merge the controls in the catalog into merged catalog."""
        # no merge means keep, including dups
        # same for merge with no combine
        # groups are merged only if separate directive such as as-is is given
        # use-first is a merge combination rule
        # merge is a merge combination rule for controls.  groups are not merged by this rule
        # merge/as-is and merge/custom are used for merging groups
        # if neither as-is nor custom is specified - just get single list of controls
        # unstructured controls should appear after any loose params

        # make copies to avoid changing input objects
        local_cat = catalog.copy(deep=True)
        local_merged = merged.copy(deep=True) if merged else None

        merge_method = prof.CombinationMethodValidValues.keep.value
        as_is = False
        if self._profile.merge is not None:
            if self._profile.merge.custom is not None:
                raise TrestleError('Profile with custom merge is not supported.')
            if self._profile.merge.as_is is not None:
                as_is = self._profile.merge.as_is
            if self._profile.merge.combine is None:
                logger.debug('Profile has merge but no combine so defaulting to combine/merge.')
                merge_method = prof.CombinationMethodValidValues.merge.value
            else:
                merge_combine = self._profile.merge.combine
                if merge_combine.method.value is None:
                    logger.debug('Profile has merge combine but no method.  Defaulting to merge.')
                    merge_method = prof.CombinationMethodValidValues.merge.value
                else:
                    merge_method = merge_combine.method.value

        if local_merged is None:
            return self._flatten_catalog(local_cat, as_is)

        # merge the incoming catalog with merged based on merge_method and as_is
        return self._merge_two_catalogs(local_merged, local_cat, merge_method, as_is)

    def process(self, pipelines: List[Pipeline]) -> Iterator[cat.Catalog]:  # type: ignore
        """
        Merge the incoming catalogs.

        This pulls from import and iterates over the incoming catalogs.
        The way groups, lists of controls, and controls themselves get merged is specified by the profile.
        """
        merged: Optional[cat.Catalog] = None
        logger.debug(f'merge entering process with {len(pipelines)} pipelines')
        for pipeline in pipelines:
            catalog = next(pipeline.process(None))
            merged = self._merge_catalog(merged, catalog)
        yield merged  # type: ignore
Functions¤
__init__(profile) ¤

Initialize the class with the profile.

Source code in trestle/core/resolver/merge.py
60
61
62
63
def __init__(self, profile: prof.Profile) -> None:
    """Initialize the class with the profile."""
    logger.debug('merge filter initialize')
    self._profile = profile
process(pipelines) ¤

Merge the incoming catalogs.

This pulls from import and iterates over the incoming catalogs. The way groups, lists of controls, and controls themselves get merged is specified by the profile.

Source code in trestle/core/resolver/merge.py
215
216
217
218
219
220
221
222
223
224
225
226
227
def process(self, pipelines: List[Pipeline]) -> Iterator[cat.Catalog]:  # type: ignore
    """
    Merge the incoming catalogs.

    This pulls from import and iterates over the incoming catalogs.
    The way groups, lists of controls, and controls themselves get merged is specified by the profile.
    """
    merged: Optional[cat.Catalog] = None
    logger.debug(f'merge entering process with {len(pipelines)} pipelines')
    for pipeline in pipelines:
        catalog = next(pipeline.process(None))
        merged = self._merge_catalog(merged, catalog)
    yield merged  # type: ignore

Functions¤

handler: python