52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
class Merge(Pipeline.Filter):
    """
    Merge the incoming catalogs according to rules in the profile.

    The incoming catalogs have already been pruned based on the import.
    Now the controls must be gathered, merged, and grouped based on the merge settings.
    """

    def __init__(self, profile: prof.Profile) -> None:
        """Initialize the class with the profile."""
        logger.debug('merge filter initialize')
        self._profile = profile

    def _get_id(self, item: OBT) -> Optional[str]:
        """Return the item's ID attribute, falling back to its NAME attribute, or None if it has neither."""
        id_ = getattr(item, ID, None)
        if id_ is None:
            id_ = getattr(item, NAME, None)
        return id_

    def _item_genertor(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> Iterator[OBT]:
        """
        Yield the items of src that are candidates for merging into dest.

        For the keep method all of src is appended to dest and nothing is yielded.
        For any other method each src item with no equal item in dest is yielded.
        The spelling of the method name is kept as-is so existing callers keep working.
        """
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
        else:
            for item in src:
                # if there is an exact copy of this in dest then ignore it
                if item not in dest:
                    yield item

    def _merge_item(self, item: OBT, dest: List[OBT], merge_method: Optional[str]) -> bool:
        """
        Match item against dest by id and merge it into the first match if the method is merge.

        Returns True when an item with the same id was found in dest, whether or not
        the contents were actually merged.  For methods other than merge the match
        still counts as handled, so the caller does not append the item.
        Returns False when the item has no id or no item in dest shares it.
        """
        merged = False
        item_id = self._get_id(item)
        if item_id is not None:
            for other in dest:
                other_id = self._get_id(other)
                if other_id != item_id:
                    continue
                if merge_method == prof.CombinationMethodValidValues.merge.value:
                    self._merge_items(other, item, merge_method)
                merged = True
                break
        return merged

    def _merge_lists(self, dest: List[OBT], src: List[OBT], merge_method: Optional[str]) -> None:
        """
        Merge the list src into the list dest in place.

        With the keep method everything in src is appended, including duplicates.
        Otherwise items equal to one already in dest are skipped, items whose id matches
        one in dest are handled by _merge_item, and the remainder are appended.
        """
        if merge_method == prof.CombinationMethodValidValues.keep.value:
            dest.extend(src)
            return
        # collect unmatched items and append them only after iteration is complete,
        # so that items newly taken from src are not matched against each other
        added_items = []
        for item in self._item_genertor(dest, src, merge_method):
            merged = self._merge_item(item, dest, merge_method)
            # it isn't already in dest and no match was found for merge, so append
            if not merged:
                added_items.append(item)
        dest.extend(added_items)

    def _merge_attrs(
        self, dest: Union[OBT, List[OBT]], src: Union[OBT, List[OBT]], attr: str, merge_method: Optional[str]
    ) -> None:
        """Merge this attr of src into the attr of dest."""
        src_attr = getattr(src, attr, None)
        if src_attr is None:
            return
        # some attributes are excluded from merging for specific item types
        item_type = type(src).__name__
        if attr in ITEM_EXCLUDE_MAP.get(item_type, []):
            return
        dest_attr = getattr(dest, attr, None)
        # a non-empty list in dest is merged item by item
        if dest_attr and isinstance(dest_attr, list):
            self._merge_lists(dest_attr, src_attr, merge_method)
            setattr(dest, attr, dest_attr)
            return
        # use_first keeps whatever dest already has
        if dest_attr and merge_method == prof.CombinationMethodValidValues.use_first.value:
            return
        # nothing to do if the values already agree, unless the method is keep or unspecified
        if dest_attr == src_attr and merge_method not in [None, prof.CombinationMethodValidValues.keep.value]:
            return
        setattr(dest, attr, src_attr)

    def _merge_items(self, dest: OBT, src: OBT, merge_method: Optional[str]) -> None:
        """Merge two items recursively."""
        # only the fields recorded in src.__fields_set__ are considered
        for field in src.__fields_set__:
            self._merge_attrs(dest, src, field, merge_method)

    def _group_contents(self, group: cat.Group) -> Tuple[List[cat.Control], List[com.Parameter]]:
        """Get flattened content of group and its groups recursively."""
        controls = []
        params = []
        controls.extend(as_list(group.controls))
        params.extend(as_list(group.params))
        if group.groups is not None:
            for sub_group in group.groups:
                new_controls, new_params = self._group_contents(sub_group)
                controls.extend(new_controls)
                params.extend(new_params)
        return controls, params

    def _flatten_catalog(self, catalog: cat.Catalog, as_is: bool) -> cat.Catalog:
        """
        Flatten the groups of the catalog if as_is is False.

        The catalog is modified in place and also returned.
        """
        if as_is or catalog.groups is None:
            return catalog

        # as_is is False so flatten the controls into a single list
        catalog.controls = as_list(catalog.controls)
        catalog.params = as_list(catalog.params)
        for group in catalog.groups:
            new_controls, new_params = self._group_contents(group)
            catalog.controls.extend(new_controls)
            catalog.params.extend(new_params)
        catalog.controls = none_if_empty(catalog.controls)
        catalog.params = none_if_empty(catalog.params)
        catalog.groups = None
        return catalog

    def _merge_two_catalogs(
        self, dest: cat.Catalog, src: cat.Catalog, merge_method: Optional[str], as_is: bool
    ) -> cat.Catalog:
        """Merge src into dest using merge_method, flattening both first unless as_is, and return dest."""
        # merge_method is use_first, merge, keep
        # no combine or merge_method equates to merge_method=keep
        # if as_is is false, the result is flattened
        dest = self._flatten_catalog(dest, as_is)
        src = self._flatten_catalog(src, as_is)
        self._merge_items(dest, src, merge_method)
        return dest

    def _merge_catalog(self, merged: Optional[cat.Catalog], catalog: cat.Catalog) -> cat.Catalog:
        """
        Merge the controls in the catalog into merged catalog.

        Both inputs are deep-copied so the caller's objects are not changed.
        Raises TrestleError if the profile specifies a custom merge.
        """
        # no merge means keep, including dups
        # same for merge with no combine
        # groups are merged only if separate directive such as as-is is given
        # use-first is a merge combination rule
        # merge is a merge combination rule for controls.  groups are not merged by this rule
        # merge/as-is and merge/custom are used for merging groups
        # if neither as-is nor custom is specified - just get single list of controls
        # unstructured controls should appear after any loose params

        # make copies to avoid changing input objects
        local_cat = catalog.copy(deep=True)
        local_merged = merged.copy(deep=True) if merged else None

        merge_method = prof.CombinationMethodValidValues.keep.value
        as_is = False
        if self._profile.merge is not None:
            if self._profile.merge.custom is not None:
                raise TrestleError('Profile with custom merge is not supported.')
            if self._profile.merge.as_is is not None:
                as_is = self._profile.merge.as_is
            if self._profile.merge.combine is None:
                logger.debug('Profile has merge but no combine so defaulting to combine/merge.')
                merge_method = prof.CombinationMethodValidValues.merge.value
            else:
                merge_combine = self._profile.merge.combine
                combine_method = merge_combine.method
                # check the method itself before touching .value, so that a combine
                # with no method defaults to merge instead of raising AttributeError
                if combine_method is None or combine_method.value is None:
                    logger.debug('Profile has merge combine but no method. Defaulting to merge.')
                    merge_method = prof.CombinationMethodValidValues.merge.value
                else:
                    merge_method = combine_method.value

        if local_merged is None:
            return self._flatten_catalog(local_cat, as_is)

        # merge the incoming catalog with merged based on merge_method and as_is
        return self._merge_two_catalogs(local_merged, local_cat, merge_method, as_is)

    def process(self, pipelines: List[Pipeline]) -> Iterator[cat.Catalog]:  # type: ignore
        """
        Merge the incoming catalogs.

        This pulls from import and iterates over the incoming catalogs.
        The way groups, lists of controls, and controls themselves get merged is specified by the profile.
        One catalog is taken from each pipeline and a single merged result is yielded.
        If pipelines is empty the yielded value is None.
        """
        merged: Optional[cat.Catalog] = None
        logger.debug(f'merge entering process with {len(pipelines)} pipelines')
        for pipeline in pipelines:
            catalog = next(pipeline.process(None))
            merged = self._merge_catalog(merged, catalog)
        yield merged  # type: ignore
|