38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class MergeCmd(CommandPlusDocs):
    """Merge subcomponents on a trestle model."""

    name = 'merge'

    def _init_arguments(self) -> None:
        """Register the required element argument naming the element path(s) to merge."""
        self.add_argument(
            f'-{const.ARG_ELEMENT_SHORT}',
            f'--{const.ARG_ELEMENT}',
            help=f'{const.ARG_DESC_ELEMENT}(s) to be merged. The last element is merged into the second last element.',
            required=True
        )

    def _run(self, args: argparse.Namespace) -> int:
        """Merge elements into the parent oscal model.

        ``args.element`` is split on commas into element paths, which are merged
        relative to the current working directory.

        Returns:
            The code returned by ``perform_all_merges``, or the code returned by
            ``handle_generic_command_exception`` when any exception is raised.
        """
        try:
            log.set_log_level_from_args(args)

            # remove any quotes passed in as on windows platforms
            elements_clean = args.element.strip("'")
            element_paths = elements_clean.split(',')
            trace.log(f'merge _run element paths {element_paths}')

            cwd = Path.cwd()
            return self.perform_all_merges(element_paths, cwd, args.trestle_root)
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while merging subcomponents on a trestle model')

    @classmethod
    def perform_all_merges(cls, element_paths: List[str], effective_cwd: Path, trestle_root: Path) -> int:
        """Run all merges over a list of element paths.

        Each plan is executed before the plan for the next element path is built.

        Returns:
            ``CmdReturnCodes.SUCCESS.value`` once every plan has been executed.
        """
        for element_path in element_paths:
            logger.debug(f'merge {element_path}')
            plan = cls.merge(effective_cwd, ElementPath(element_path), trestle_root)
            plan.execute()
        return CmdReturnCodes.SUCCESS.value

    @classmethod
    def merge(cls, effective_cwd: Path, element_path: ElementPath, trestle_root: Path) -> Plan:
        """Build the plan that merges one element into its destination model.

        The last part of ``element_path`` is the target to merge (``'*'`` selects the
        wildcard handling) and the part preceding it is the destination model, whose
        file is looked up directly under ``effective_cwd``.

        Args:
            effective_cwd: Directory in which the destination model file is expected.
            element_path: Multipart element path, e.g. ``catalog.*`` or ``catalog.groups``.
            trestle_root: Passed through to the ``ModelUtils`` loading helpers.

        Returns:
            A plan holding three actions in order: create the destination path with its
            content cleared, write the merged model to it, and remove the merged target path.
            The plan is only built here; it is not executed.

        Raises:
            TrestleError: If ``element_path`` is not multipart, or if the target type cannot
                be resolved from the destination model type.
        """
        if not element_path.is_multipart():
            raise TrestleError(
                'Multiple parts of an element path must be passed to merge e.g. catalog.* or catalog.groups'
            )

        target_model_alias = element_path.get_last()
        logger.debug(f'merge element path list: {element_path} target model alias {target_model_alias}')

        # 1. Load destination model into a stripped model
        # Load destination model
        destination_path = element_path.get_preceding_path()
        destination_model_alias = destination_path.get_last()
        trace.log(f'merge destination model alias: {destination_model_alias}')
        trace.log('merge getting contextual file type effective working directory')

        # Destination model filetype
        file_type = file_utils.get_contextual_file_type(effective_cwd)
        trace.log(f'contextual file type is {file_type}')
        file_ext = FileContentType.to_file_extension(file_type)

        # Destination model filename
        destination_model_path = (
            effective_cwd / f'{classname_to_alias(destination_model_alias, AliasMode.JSON)}{file_ext}'
        )
        trace.log(f'destination model filename is {destination_model_path}')
        destination_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root)

        # Stays None when the destination file does not exist yet.
        destination_model_object: OscalBaseModel = None
        if destination_model_path.exists():
            trace.log('dest filename exists so read it')
            destination_model_object = destination_model_type.oscal_read(destination_model_path)

        # 2. If target is wildcard, load distributed destination model and replace destination model.
        # Handle WILDCARD '*' match. Return plan to load the destination model, with its distributed attributes
        if target_model_alias == '*':
            trace.log('handle target model alias wildcard')
            collection_type = None
            if destination_model_type.is_collection_container():
                collection_type = destination_model_type.get_collection_type()

            # Only the loaded instance is needed here; the other returned values are unused.
            _, _, merged_model_instance = ModelUtils.load_distributed(
                destination_model_path, trestle_root, collection_type
            )

            reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
            wrapper_alias = destination_model_alias
            write_destination_action = WriteFileAction(
                destination_model_path, Element(merged_model_instance, wrapper_alias), content_type=file_type
            )
            remove_path_folder = effective_cwd / destination_model_alias
            delete_target_action = RemovePathAction(remove_path_folder)

            plan = Plan()
            plan.add_action(reset_destination_action)
            plan.add_action(write_destination_action)
            plan.add_action(delete_target_action)
            return plan

        trace.log(f'get dest model with fields stripped: {target_model_alias}')
        # Get destination model without the target field stripped
        merged_model_type, _ = ModelUtils.get_stripped_model_type(
            destination_model_path, trestle_root, aliases_not_to_be_stripped=[target_model_alias]
        )

        # 3. Load Target model. Target model could be stripped
        try:
            target_model_type = element_path.get_type(merged_model_type)
        except Exception as e:
            logger.debug(f'target model not found, element path list {element_path} type {merged_model_type}')
            # Chain the original exception so the root cause stays visible in tracebacks.
            raise TrestleError(
                f'Target model not found. Possibly merge of the elements not allowed at this point. {str(e)}'
            ) from e

        target_model_path = effective_cwd / destination_model_alias
        trace.log(f'look for target model path {target_model_path} at dest alias {destination_model_alias} rel to cwd')

        # The target path is the same whether or not the destination directory exists;
        # the existence check only selects which trace message is emitted.
        if target_model_path.exists():
            trace.log(f'target model path does exist so target path is subdir with target alias {target_model_alias}')
        else:
            trace.log(f'target model filename does not exist so target path is target alias {target_model_alias}')
        target_model_path = target_model_path / target_model_alias
        trace.log(f'final target model path is {target_model_path}')

        # if target model is a file then handle file. If file doesn't exist, handle the directory,
        # but in this case it's a list or a dict collection type
        target_model_filename = target_model_path.with_suffix(file_ext)
        if target_model_filename.exists():
            trace.log(f'target model path with extension does exist so load distrib {target_model_filename}')
            _, _, target_model_object = ModelUtils.load_distributed(target_model_filename, trestle_root)
        else:
            target_model_filename = Path(target_model_path)
            trace.log(f'target model path plus extension does not exist so load distrib {target_model_filename}')
            trace.log(f'get collection type for model type {target_model_type}')
            collection_type = type_utils.get_origin(target_model_type)
            trace.log(f'load {target_model_filename} as collection type {collection_type}')
            _, _, target_model_object = ModelUtils.load_distributed(
                target_model_filename, trestle_root, collection_type
            )

        # Unwrap objects that carry their payload under a '__root__' attribute.
        if hasattr(target_model_object, '__dict__') and '__root__' in target_model_object.__dict__:
            trace.log('loaded object has dict and root so set target model object to root contents')
            target_model_object = target_model_object.__dict__['__root__']

        # 4. Insert target model into destination model.
        merged_dict = {}
        if destination_model_object is not None:
            # Shallow copy so the loaded destination object's own attribute dict is not mutated.
            merged_dict = dict(destination_model_object.__dict__)
        merged_dict[target_model_alias] = target_model_object
        merged_model_object = merged_model_type(**merged_dict)
        merged_destination_element = Element(merged_model_object)

        # 5. Create action plan
        trace.log(f'create path action clear content: {destination_model_path}')
        reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
        trace.log(f'write file action {destination_model_path}')
        write_destination_action = WriteFileAction(
            destination_model_path, merged_destination_element, content_type=file_type
        )
        # FIXME this will delete metadata.json but it will leave metadata/roles/roles.*
        # need to clean up all lower dirs
        trace.log(f'remove path action {target_model_filename}')
        delete_target_action = RemovePathAction(target_model_filename)

        plan = Plan()
        plan.add_action(reset_destination_action)
        plan.add_action(write_destination_action)
        plan.add_action(delete_target_action)

        # TODO: Destination model directory is empty or already merged? Then clean up.
        return plan
|