Skip to content

trestle.core.commands.split

trestle.core.commands.split ¤

Trestle Split Command.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

trace = log.Trace(logger) module-attribute ¤

Classes¤

AliasTracker ¤

Bases: TrestleBaseModel

Convenience class to track writing out of models.

Source code in trestle/core/commands/split.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class AliasTracker(TrestleBaseModel):
    """Keep track of the aliases split from an element and whether it was written.

    One tracker collects every part that must be split from a given element,
    together with a flag that lets the caller write that element out only once.
    """

    # names of the parts to be split from the tracked element, without duplicates
    aliases: List[str]
    # False until mark_written() is called
    written: bool = False

    def add_alias(self, alias: str) -> None:
        """Record an alias unless it is already being tracked."""
        if alias in self.aliases:
            return
        self.aliases.append(alias)

    def get_aliases(self) -> List[str]:
        """Return the tracked aliases (the stored list itself, not a copy)."""
        tracked = self.aliases
        return tracked

    def needs_writing(self) -> bool:
        """Report whether the model has yet to be written."""
        already_written = self.written
        return not already_written

    def mark_written(self) -> None:
        """Flag the model as written so that it is not written again."""
        setattr(self, 'written', True)
Attributes¤
aliases instance-attribute ¤
written = False class-attribute instance-attribute ¤
Functions¤
add_alias(alias) ¤

Add alias.

Source code in trestle/core/commands/split.py
50
51
52
53
def add_alias(self, alias: str) -> None:
    """Record an alias unless it is already being tracked."""
    if alias in self.aliases:
        return
    self.aliases.append(alias)
get_aliases() ¤

Get the list of aliases.

Source code in trestle/core/commands/split.py
55
56
57
def get_aliases(self) -> List[str]:
    """Return the tracked aliases (the stored list itself, not a copy)."""
    tracked = self.aliases
    return tracked
mark_written() ¤

Mark this model as written.

Source code in trestle/core/commands/split.py
63
64
65
def mark_written(self) -> None:
    """Flag the model as written so that it is not written again."""
    setattr(self, 'written', True)
needs_writing() ¤

Need to write the model.

Source code in trestle/core/commands/split.py
59
60
61
def needs_writing(self) -> bool:
    """Report whether the model has yet to be written."""
    already_written = self.written
    return not already_written

SplitCmd ¤

Bases: CommandPlusDocs

Split subcomponents on a trestle model.

Source code in trestle/core/commands/split.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
class SplitCmd(CommandPlusDocs):
    """Split subcomponents on a trestle model."""

    name = 'split'

    def _init_arguments(self) -> None:
        """Register the optional file and element arguments of the split command."""
        file_flags = (f'-{const.ARG_FILE_SHORT}', f'--{const.ARG_FILE}')
        element_flags = (f'-{const.ARG_ELEMENT_SHORT}', f'--{const.ARG_ELEMENT}')

        # neither argument is mandatory for the argument parser
        self.add_argument(*file_flags, help=const.ARG_DESC_FILE + ' to split.', required=False)
        self.add_argument(*element_flags, help=const.ARG_DESC_ELEMENT + ' to split.', required=False)

    def _run(self, args: argparse.Namespace) -> int:
        """Split an OSCAL file into elements.

        Reads the file and element arguments from the parsed command line and
        delegates the work to perform_split, using the current working directory
        as the effective directory.

        Args:
            args: parsed command line arguments, including trestle_root

        Returns:
            The return code of perform_split, or the code produced by
            handle_generic_command_exception if any exception is raised.
        """
        try:
            log.set_log_level_from_args(args)
            trace.log('Entering trestle split.')
            # get the Model
            args_raw: Dict[str, str] = args.__dict__

            # the element argument is optional for the parser, so it may be absent;
            # report that explicitly rather than failing on None.strip below
            raw_elements = args_raw[const.ARG_ELEMENT]
            if raw_elements is None:
                raise TrestleError(f'No element path specified: --{const.ARG_ELEMENT} is required for split.')

            # remove any quotes passed in as on windows platforms
            elements_clean: str = raw_elements.strip("'")

            # an absent or empty file argument is passed on as ''
            file_name = args_raw[const.ARG_FILE] or ''
            # cwd must be in the model directory if file to split is not specified
            effective_cwd = pathlib.Path.cwd()

            return self.perform_split(effective_cwd, file_name, elements_clean, args.trestle_root)
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while performing a split operation')

    @classmethod
    def perform_split(
        cls, effective_cwd: pathlib.Path, file_name: str, elements: str, trestle_root: pathlib.Path
    ) -> int:
        """Perform the split operation.

        Args:
            effective_cwd: effective directory in which the split operation is performed
            file_name: file name of model to split, or '' if deduced from elements and cwd
            elements: comma separated list of paths to strip from the file, with quotes removed
            trestle_root: root directory of the trestle project; used to determine the model type
                and to express the directory of each split file relative to the project

        Returns:
            CmdReturnCodes.SUCCESS.value once every requested split has been executed

        Raises:
            TrestleError: if no model directory or matching file can be found, if a file does not
                exist, if the split is too fine grained, or if executing the split plan fails
        """
        file_path_list: List[Tuple[str, str]] = []

        if file_name:
            file_path_list.append((file_name, elements))
        else:
            # cwd must be in the model directory if file to split is not specified
            # find top directory for this model based on trestle root and cwd
            model_dir = file_utils.extract_project_model_path(effective_cwd)
            if model_dir is None:
                raise TrestleError('Current directory must be within a model directory if file is not specified')

            content_type: FileContentType = FileContentType.dir_to_content_type(model_dir)

            # determine the file needed for each split path
            for path in elements.split(','):
                element_path = ElementPath(path)
                # if element path is relative use directory context to determine absolute path
                element_path.make_absolute(model_dir, effective_cwd)
                file_path = element_path.find_last_file_in_path(content_type, model_dir)
                # now make the element path relative to the model file to be loaded
                if file_path is None or element_path.make_relative(file_path.relative_to(model_dir)) != 0:
                    raise TrestleError(f'Unable to match element path with files in model directory {element_path}')

                file_path_list.append((file_path, element_path.to_string()))  # type: ignore

        # match paths to corresponding files since several paths may be split from the same file
        file_path_dict: Dict[str, str] = {}
        for key, path in file_path_list:
            if key not in file_path_dict:
                file_path_dict[key] = path
            else:
                # several element paths for the same file are joined back into one comma separated string
                current_path = file_path_dict[key]
                file_path_dict[key] = f'{current_path},{path}'

        for raw_file_name, element_path in file_path_dict.items():
            file_path = file_utils.relative_resolve(pathlib.Path(raw_file_name), effective_cwd)
            # this makes assumptions that the path is relative.
            if not file_path.exists():
                raise TrestleError(f'File {file_path} does not exist.')

            content_type = FileContentType.to_content_type(file_path.suffix)

            # find the base directory of the file
            base_dir = file_path.parent
            model_type, _ = ModelUtils.get_stripped_model_type(file_path, trestle_root)

            model: OscalBaseModel = model_type.oscal_read(file_path)

            if cmd_utils.split_is_too_fine(element_path, model):
                raise TrestleError('Cannot split the model to the level of uuids, strings, etc.')

            # use the model itself to resolve any wildcards and create list of element paths
            logger.debug(f'split calling parse_element_args on {element_path}')
            # use contextual mode to parse

            element_paths: List[ElementPath] = cmd_utils.parse_element_args(
                model, element_path.split(','), base_dir.relative_to(trestle_root)
            )

            # analyze the split tree and determine which aliases should be stripped from each file
            aliases_to_strip = cls.find_aliases_to_strip(element_paths)

            # need the file name relative to the base directory
            file_name_no_path = str(file_path.name)

            split_plan = cls.split_model(
                model, element_paths, base_dir, content_type, file_name_no_path, aliases_to_strip
            )
            # store the original file before executing so it can be recovered if execution fails
            trash.store(file_path, True)

            try:
                split_plan.execute()
            except Exception as e:
                trash.recover(file_path, True)
                # chain the original exception so its traceback is not lost
                raise TrestleError(f'Split has failed with error: {e}.') from e

        return CmdReturnCodes.SUCCESS.value

    @classmethod
    def prepare_sub_model_split_actions(
        cls,
        sub_model_item: OscalBaseModel,
        sub_model_dir: pathlib.Path,
        file_prefix: str,
        content_type: FileContentType
    ) -> List[Action]:
        """Build the actions that write one sub model to its own file.

        Returns:
            A two element list: the action creating the file path, followed by
            the action writing the sub model, wrapped in an Element, to that path.
        """
        target_name = cmd_utils.to_model_file_name(sub_model_item, file_prefix, content_type)
        # the json alias of the sub model's class is used as the wrapper alias of the element
        wrapper_alias = classname_to_alias(type(sub_model_item).__name__, AliasMode.JSON)
        target_file = sub_model_dir / target_name
        return [
            CreatePathAction(target_file),
            WriteFileAction(target_file, Element(sub_model_item, wrapper_alias), content_type)
        ]

    @classmethod
    def split_model_at_path_chain(
        cls,
        model_obj: OscalBaseModel,
        element_paths: List[ElementPath],
        base_dir: pathlib.Path,
        content_type: FileContentType,
        cur_path_index: int,
        split_plan: Plan,
        strip_root: bool,
        root_file_name: str,
        aliases_to_strip: Dict[str, AliasTracker],
        last_one: bool = True
    ) -> int:
        """Recursively split the model at the provided chain of element paths.

        It assumes that a chain of element paths starts at the cur_path_index with the first path ending
        with a wildcard (*)

        If the wildcard follows an element that is inherently a list of items, the list of items is extracted.
        But if the wildcard follows a generic model then members of that model class found in the model will be
        split off.  But only the non-trivial elements are removed, i.e. not str, int, datetime, etc.

        Args:
            model_obj: The OscalBaseModel to be split
            element_paths: The List[ElementPath] of elements to split, including embedded wildcards
            base_dir: pathlib.Path of the file being split
            content_type: json or yaml files
            cur_path_index: Index into the list of element paths for the current split operation
            split_plan: The accumulated plan of actions needed to perform the split
            strip_root: Whether to strip elements from the root object
            root_file_name: Filename of root file that gets split into a list of items
            aliases_to_strip: AliasTracker previously loaded with aliases that need to be split from each element
            last_one: bool indicating last item in array has been split and stripped model can now be written

        Returns:
            int representing the index where the chain of the path ends.

        Raises:
            TrestleError: if split_plan is None or cur_path_index is negative

        Examples:
            For example, element paths could have a list of paths as below for a `ComponentDefinition` model where
            the first path is the start of the chain.

            For each of the sub model described by the first element path (e.g component-definition.components.*) in
            the chain, the subsequent paths (e.g component.control-implementations.*) will be applied recursively
            to retrieve the sub-sub models:
            [
                'component-definition.component.*',
                'component.control-implementations.*'
            ]
            for a command like below:
            trestle split -f component.yaml -e component-definition.components.*.control-implementations.*
        """
        if split_plan is None:
            raise TrestleError('Split plan must have been initialized')

        if cur_path_index < 0:
            raise TrestleError('Current index of the chain of paths cannot be less than 0')

        # if there are no more element_paths, return the current plan
        if cur_path_index >= len(element_paths):
            return cur_path_index

        # initialize local variables
        element = Element(model_obj)
        stripped_field_alias: List[str] = []

        # get the sub_model specified by the element_path of this round
        element_path = element_paths[cur_path_index]

        # does the next element_path point back at me
        next_index = cur_path_index + 1
        is_parent = next_index < len(element_paths) and element_paths[next_index].get_parent() == element_path

        # root dir name for sub models dir
        # 00000__group.json will have the root_dir name as 00000__group for sub models of group
        # catalog.json will have the root_dir name as catalog
        root_dir = ''
        if root_file_name != '':
            root_dir = str(pathlib.Path(root_file_name).with_suffix(''))

        sub_models = element.get_at(element_path, False)  # we call sub_models as in plural, but it can be just one

        # assume cur_path_index is the end of the chain
        # value of this variable may change during recursive split of the sub-models below
        path_chain_end = cur_path_index

        # if the element_path does NOT end in a wildcard and the next path in the chain has the current path
        # as its parent, recurse into the sub model with the next path.
        # compare the wildcard by value: identity comparison of strings only works by accident of interning
        if is_parent and element_path.get_last() != ElementPath.WILDCARD:
            # create dir for all sub model items
            sub_models_dir = base_dir / element_path.to_root_path()
            sub_model_plan = Plan()
            path_chain_end = cls.split_model_at_path_chain(
                sub_models,
                element_paths,
                sub_models_dir,
                content_type,
                cur_path_index + 1,
                sub_model_plan,
                True,
                '',
                aliases_to_strip
            )
            sub_model_actions = sub_model_plan.get_actions()
            split_plan.add_actions(sub_model_actions)
        elif element_path.get_last() == ElementPath.WILDCARD:
            # extract sub-models into a dict with appropriate prefix
            sub_model_items: Dict[str, OscalBaseModel] = {}
            sub_models_dir = base_dir / element_path.to_file_path(root_dir=root_dir)
            if isinstance(sub_models, list):
                for i, sub_model_item in enumerate(sub_models):
                    # e.g. `groups/00000_groups/`
                    prefix = str(i).zfill(const.FILE_DIGIT_PREFIX_LENGTH)
                    sub_model_items[prefix] = sub_model_item

            # recursively split each sub-model if there are more element paths to traverse
            # e.g. split component.control-implementations.*
            # this does not depend on the item, so it is decided once for the whole list
            require_recursive_split = is_parent

            # process list sub model items
            for count, (key, sub_model_item) in enumerate(sub_model_items.items(), start=1):
                if require_recursive_split:
                    # prepare individual directory for each sub-model
                    sub_root_file_name = cmd_utils.to_model_file_name(sub_model_item, key, content_type)
                    sub_model_plan = Plan()

                    # NOTE: this rebinds the last_one parameter, and the rebound value is also the one
                    # consulted by the strip_root handling further below
                    last_one = count == len(sub_model_items)
                    path_chain_end = cls.split_model_at_path_chain(
                        sub_model_item,
                        element_paths,
                        sub_models_dir,
                        content_type,
                        cur_path_index + 1,
                        sub_model_plan,
                        True,
                        sub_root_file_name,
                        aliases_to_strip,
                        last_one
                    )
                    sub_model_actions = sub_model_plan.get_actions()
                else:
                    sub_model_actions = cls.prepare_sub_model_split_actions(
                        sub_model_item, sub_models_dir, key, content_type
                    )

                split_plan.add_actions(sub_model_actions)
        else:
            # the chain of path ends at the current index.
            # so no recursive call. Let's just write the sub model to the file and get out
            if sub_models is not None:
                sub_model_file = base_dir / element_path.to_file_path(content_type, root_dir=root_dir)
                split_plan.add_action(CreatePathAction(sub_model_file))
                split_plan.add_action(
                    WriteFileAction(sub_model_file, Element(sub_models, element_path.get_element_name()), content_type)
                )

        # Strip the root model and add a WriteAction for the updated model object in the plan
        if strip_root:
            full_path = element_path.get_full()
            path = '.'.join(full_path.split('.')[:-1])
            aliases = [element_path.get_element_name()]
            need_to_write = True
            # when the parent path has a tracker, strip all of its aliases at once and write only once
            use_alias_dict = aliases_to_strip is not None and path in aliases_to_strip
            if use_alias_dict:
                aliases = aliases_to_strip[path].get_aliases()
                need_to_write = aliases_to_strip[path].needs_writing()

            stripped_model = model_obj.stripped_instance(stripped_fields_aliases=aliases)
            # can mark it written even if it doesn't need writing since it is empty
            # but if an array only mark it written if it's the last one
            if last_one and use_alias_dict:
                aliases_to_strip[path].mark_written()
            # If it's an empty model after stripping the fields, don't create path and don't write
            # NOTE(review): stripped_field_alias is never populated in this method, so this early return
            # only triggers for a model without any fields - confirm whether that is intended
            field_list = [x for x in model_obj.__fields__.keys() if model_obj.__fields__[x] is not None]
            if set(field_list) == set(stripped_field_alias):
                return path_chain_end

            if need_to_write:
                if root_file_name != '':
                    root_file = base_dir / root_file_name
                else:
                    root_file = base_dir / element_path.to_root_path(content_type)

                split_plan.add_action(CreatePathAction(root_file))
                wrapper_alias = classname_to_alias(stripped_model.__class__.__name__, AliasMode.JSON)
                split_plan.add_action(WriteFileAction(root_file, Element(stripped_model, wrapper_alias), content_type))

        # return the end of the current path chain
        return path_chain_end

    @classmethod
    def split_model(
        cls,
        model_obj: OscalBaseModel,
        element_paths: List[ElementPath],
        base_dir: pathlib.Path,
        content_type: FileContentType,
        root_file_name: str,
        aliases_to_strip: Dict[str, AliasTracker]
    ) -> Plan:
        """Build the plan of actions that splits the model at the given element paths.

        Every chain of element paths is handed to split_model_at_path_chain. Afterwards the actions
        that write the root model, stripped of the split-off fields, are appended unless every field
        of the model has been stripped.

        Returns:
            The plan holding all accumulated actions; it is not executed here.
        """
        plan = Plan()

        # aliases of the fields that are removed from the root model
        root_aliases = []
        index = 0
        while index < len(element_paths):
            current_path = element_paths[index]

            # a path without a parent contributes its second part as a field to strip from the root
            if current_path.get_parent() is None:
                parts = current_path.get()
                if len(parts) > 1:
                    top_part = parts[1]
                    if top_part == ElementPath.WILDCARD:
                        root_aliases.append('__root__')
                    elif top_part not in root_aliases:
                        root_aliases.append(top_part)

            # the call reports where the chain ended; continue with the path after it
            chain_end = cls.split_model_at_path_chain(
                model_obj,
                element_paths,
                base_dir,
                content_type,
                index,
                plan,
                False,
                root_file_name,
                aliases_to_strip
            )
            index = chain_end + 1

        stripped_root = model_obj.stripped_instance(stripped_fields_aliases=root_aliases)

        # nothing is written for the root when the stripped names cover all of the model's fields
        if set(model_obj.__fields__.keys()) == set(root_aliases):
            return plan

        if root_file_name == '':
            root_file = base_dir / element_paths[0].to_root_path(content_type)
        else:
            root_file = base_dir / root_file_name

        plan.add_action(CreatePathAction(root_file, True))
        wrapper_alias = classname_to_alias(stripped_root.__class__.__name__, AliasMode.JSON)
        plan.add_action(WriteFileAction(root_file, Element(stripped_root, wrapper_alias), content_type))

        return plan

    @classmethod
    def find_aliases_to_strip(cls, element_paths: List[ElementPath]) -> Dict[str, AliasTracker]:
        """Collect, per parent path, the aliases to strip when that element is written out.

        The same parent path can occur in several split actions. Gathering all of its aliases in one
        tracker allows them to be stripped together and the stripped model to be written only once.

        Returns:
            A dict mapping each parent path to the AliasTracker holding its aliases. Paths with fewer
            than three parts and paths ending in '*' are not included.
        """
        trackers: Dict[str, AliasTracker] = {}
        for element_path in element_paths:
            *parent_parts, alias = element_path.get_full().split('.')
            # skip paths that are too short or that end in a wildcard
            if len(parent_parts) < 2 or alias == '*':
                continue
            parent_path = '.'.join(parent_parts)
            tracker = trackers.get(parent_path)
            if tracker is None:
                trackers[parent_path] = AliasTracker(aliases=[alias])
            else:
                tracker.add_alias(alias)
        return trackers
Attributes¤
name = 'split' class-attribute instance-attribute ¤
Functions¤
find_aliases_to_strip(element_paths) classmethod ¤

Find list of aliases that need to be stripped as each element written out.

Source code in trestle/core/commands/split.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
@classmethod
def find_aliases_to_strip(cls, element_paths: List[ElementPath]) -> Dict[str, AliasTracker]:
    """Collect, per parent path, the aliases to strip when that element is written out.

    The same parent path can occur in several split actions. Gathering all of its aliases in one
    tracker allows them to be stripped together and the stripped model to be written only once.

    Returns:
        A dict mapping each parent path to the AliasTracker holding its aliases. Paths with fewer
        than three parts and paths ending in '*' are not included.
    """
    trackers: Dict[str, AliasTracker] = {}
    for element_path in element_paths:
        *parent_parts, alias = element_path.get_full().split('.')
        # skip paths that are too short or that end in a wildcard
        if len(parent_parts) < 2 or alias == '*':
            continue
        parent_path = '.'.join(parent_parts)
        tracker = trackers.get(parent_path)
        if tracker is None:
            trackers[parent_path] = AliasTracker(aliases=[alias])
        else:
            tracker.add_alias(alias)
    return trackers
perform_split(effective_cwd, file_name, elements, trestle_root) classmethod ¤

Perform the split operation.

Parameters:

Name Type Description Default
effective_cwd Path

effective directory in which the split operation is performed

required
file_name str

file name of model to split, or '' if deduced from elements and cwd

required
elements str

comma separated list of paths to strip from the file, with quotes removed

required

Returns:

Type Description
int

0 on success and 1 on failure

Source code in trestle/core/commands/split.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@classmethod
def perform_split(
    cls, effective_cwd: pathlib.Path, file_name: str, elements: str, trestle_root: pathlib.Path
) -> int:
    """Perform the split operation.

    Args:
        effective_cwd: effective directory in which the split operation is performed
        file_name: file name of model to split, or '' if deduced from elements and cwd
        elements: comma separated list of paths to strip from the file, with quotes removed
        trestle_root: root directory of the trestle project; used to determine the model type
            and to express the directory of each split file relative to the project

    Returns:
        CmdReturnCodes.SUCCESS.value once every requested split has been executed

    Raises:
        TrestleError: if no model directory or matching file can be found, if a file does not
            exist, if the split is too fine grained, or if executing the split plan fails
    """
    file_path_list: List[Tuple[str, str]] = []

    if file_name:
        file_path_list.append((file_name, elements))
    else:
        # cwd must be in the model directory if file to split is not specified
        # find top directory for this model based on trestle root and cwd
        model_dir = file_utils.extract_project_model_path(effective_cwd)
        if model_dir is None:
            raise TrestleError('Current directory must be within a model directory if file is not specified')

        content_type: FileContentType = FileContentType.dir_to_content_type(model_dir)

        # determine the file needed for each split path
        for path in elements.split(','):
            element_path = ElementPath(path)
            # if element path is relative use directory context to determine absolute path
            element_path.make_absolute(model_dir, effective_cwd)
            file_path = element_path.find_last_file_in_path(content_type, model_dir)
            # now make the element path relative to the model file to be loaded
            if file_path is None or element_path.make_relative(file_path.relative_to(model_dir)) != 0:
                raise TrestleError(f'Unable to match element path with files in model directory {element_path}')

            file_path_list.append((file_path, element_path.to_string()))  # type: ignore

    # match paths to corresponding files since several paths may be split from the same file
    file_path_dict: Dict[str, str] = {}
    for key, path in file_path_list:
        if key not in file_path_dict:
            file_path_dict[key] = path
        else:
            # several element paths for the same file are joined back into one comma separated string
            current_path = file_path_dict[key]
            file_path_dict[key] = f'{current_path},{path}'

    for raw_file_name, element_path in file_path_dict.items():
        file_path = file_utils.relative_resolve(pathlib.Path(raw_file_name), effective_cwd)
        # this makes assumptions that the path is relative.
        if not file_path.exists():
            raise TrestleError(f'File {file_path} does not exist.')

        content_type = FileContentType.to_content_type(file_path.suffix)

        # find the base directory of the file
        base_dir = file_path.parent
        model_type, _ = ModelUtils.get_stripped_model_type(file_path, trestle_root)

        model: OscalBaseModel = model_type.oscal_read(file_path)

        if cmd_utils.split_is_too_fine(element_path, model):
            raise TrestleError('Cannot split the model to the level of uuids, strings, etc.')

        # use the model itself to resolve any wildcards and create list of element paths
        logger.debug(f'split calling parse_element_args on {element_path}')
        # use contextual mode to parse

        element_paths: List[ElementPath] = cmd_utils.parse_element_args(
            model, element_path.split(','), base_dir.relative_to(trestle_root)
        )

        # analyze the split tree and determine which aliases should be stripped from each file
        aliases_to_strip = cls.find_aliases_to_strip(element_paths)

        # need the file name relative to the base directory
        file_name_no_path = str(file_path.name)

        split_plan = cls.split_model(
            model, element_paths, base_dir, content_type, file_name_no_path, aliases_to_strip
        )
        # store the original file before executing so it can be recovered if execution fails
        trash.store(file_path, True)

        try:
            split_plan.execute()
        except Exception as e:
            trash.recover(file_path, True)
            # chain the original exception so its traceback is not lost
            raise TrestleError(f'Split has failed with error: {e}.') from e

    return CmdReturnCodes.SUCCESS.value
prepare_sub_model_split_actions(sub_model_item, sub_model_dir, file_prefix, content_type) classmethod ¤

Create split actions of sub model.

Source code in trestle/core/commands/split.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@classmethod
def prepare_sub_model_split_actions(
    cls,
    sub_model_item: OscalBaseModel,
    sub_model_dir: pathlib.Path,
    file_prefix: str,
    content_type: FileContentType
) -> List[Action]:
    """Build the actions that write one sub model to its own file.

    Returns:
        A two element list: the action creating the file path, followed by
        the action writing the sub model, wrapped in an Element, to that path.
    """
    target_name = cmd_utils.to_model_file_name(sub_model_item, file_prefix, content_type)
    # the json alias of the sub model's class is used as the wrapper alias of the element
    wrapper_alias = classname_to_alias(type(sub_model_item).__name__, AliasMode.JSON)
    target_file = sub_model_dir / target_name
    return [
        CreatePathAction(target_file),
        WriteFileAction(target_file, Element(sub_model_item, wrapper_alias), content_type)
    ]
split_model(model_obj, element_paths, base_dir, content_type, root_file_name, aliases_to_strip) classmethod ¤

Split the model at the provided element paths.

It returns a plan for the operation

Source code in trestle/core/commands/split.py
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
@classmethod
def split_model(
    cls,
    model_obj: OscalBaseModel,
    element_paths: List[ElementPath],
    base_dir: pathlib.Path,
    content_type: FileContentType,
    root_file_name: str,
    aliases_to_strip: Dict[str, AliasTracker]
) -> Plan:
    """Create the plan that splits the model at the given element paths.

    Args:
        model_obj: The root model to be split
        element_paths: The element paths at which the model is split
        base_dir: Directory relative to which the split files are placed
        content_type: File content type of the files to be written
        root_file_name: Name of the root file; when empty the name is derived from the first element path
        aliases_to_strip: Alias trackers passed through to split_model_at_path_chain

    Returns:
        The plan holding all actions of the split. Nothing is executed here.
    """
    plan = Plan()
    root_aliases: List[str] = []  # aliases of the root level fields that get split off

    path_index = 0
    while path_index < len(element_paths):
        current_path = element_paths[path_index]

        # a path without a parent starts at the root model, so its second part names a root field
        if current_path.get_parent() is None:
            path_parts = current_path.get()
            if len(path_parts) > 1:
                root_part = path_parts[1]
                if root_part == ElementPath.WILDCARD:
                    root_aliases.append('__root__')
                elif root_part not in root_aliases:
                    root_aliases.append(root_part)

        # the chain split reports the index at which the chain ended; continue right after it
        chain_end = cls.split_model_at_path_chain(
            model_obj,
            element_paths,
            base_dir,
            content_type,
            path_index,
            plan,
            False,
            root_file_name,
            aliases_to_strip
        )
        path_index = chain_end + 1

    # strip the split off fields from the root model
    stripped_root = model_obj.stripped_instance(stripped_fields_aliases=root_aliases)

    # when every declared field was stripped there is no root file left to create or write
    if set(model_obj.__fields__.keys()) == set(root_aliases):
        return plan

    root_name = root_file_name if root_file_name != '' else element_paths[0].to_root_path(content_type)
    root_file = base_dir / root_name
    root_alias = classname_to_alias(stripped_root.__class__.__name__, AliasMode.JSON)
    plan.add_action(CreatePathAction(root_file, True))
    plan.add_action(WriteFileAction(root_file, Element(stripped_root, root_alias), content_type))

    return plan
split_model_at_path_chain(model_obj, element_paths, base_dir, content_type, cur_path_index, split_plan, strip_root, root_file_name, aliases_to_strip, last_one=True) classmethod ¤

Recursively split the model at the provided chain of element paths.

It assumes that a chain of element paths starts at the cur_path_index with the first path ending with a wildcard (*)

If the wildcard follows an element that is inherently a list of items, the list of items is extracted. But if the wildcard follows a generic model then members of that model class found in the model will be split off. But only the non-trivial elements are removed, i.e. not str, int, datetime, etc.

Parameters:

Name Type Description Default
model_obj OscalBaseModel

The OscalBaseModel to be split

required
element_paths List[ElementPath]

The List[ElementPath] of elements to split, including embedded wildcards

required
base_dir Path

pathlib.Path of the file being split

required
content_type FileContentType

json or yaml files

required
cur_path_index int

Index into the list of element paths for the current split operation

required
split_plan Plan

The accumulated plan of actions needed to perform the split

required
strip_root bool

Whether to strip elements from the root object

required
root_file_name str

Filename of root file that gets split into a list of items

required
aliases_to_strip Dict[str, AliasTracker]

AliasTracker previously loaded with aliases that need to be split from each element

required
last_one bool

bool indicating last item in array has been split and stripped model can now be written

True

Returns:

Type Description
int

int representing the index where the chain of the path ends.

Examples:

For example, element paths could have a list of paths as below for a ComponentDefinition model where the first path is the start of the chain.

For each of the sub models described by the first element path (e.g. component-definition.components.*) in the chain, the subsequent paths (e.g. component.control-implementations.*) will be applied recursively to retrieve the sub-sub models: [ 'component-definition.components.*', 'component.control-implementations.*' ] for a command like below: trestle split -f component.yaml -e component-definition.components.*.control-implementations.*

Source code in trestle/core/commands/split.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
@classmethod
def split_model_at_path_chain(
    cls,
    model_obj: OscalBaseModel,
    element_paths: List[ElementPath],
    base_dir: pathlib.Path,
    content_type: FileContentType,
    cur_path_index: int,
    split_plan: Plan,
    strip_root: bool,
    root_file_name: str,
    aliases_to_strip: Dict[str, AliasTracker],
    last_one: bool = True
) -> int:
    """Recursively split the model at the provided chain of element paths.

    It assumes that a chain of element paths starts at the cur_path_index with the first path ending
    with a wildcard (*)

    If the wildcard follows an element that is a list of items, each item of the list is split off into
    its own file, prefixed with its zero padded position in the list.  Values that are not lists produce
    no per-item actions in the wildcard case.

    Args:
        model_obj: The OscalBaseModel to be split
        element_paths: The List[ElementPath] of elements to split, including embedded wildcards
        base_dir: pathlib.Path of the file being split
        content_type: json or yaml files
        cur_path_index: Index into the list of element paths for the current split operation
        split_plan: The accumulated plan of actions needed to perform the split
        strip_root: Whether to strip elements from the root object
        root_file_name: Filename of root file that gets split into a list of items
        aliases_to_strip: AliasTracker previously loaded with aliases that need to be split from each element
        last_one: bool indicating last item in array has been split and stripped model can now be written

    Returns:
        int representing the index where the chain of the path ends.

    Raises:
        TrestleError: If split_plan is None or cur_path_index is negative.

    Examples:
        For example, element paths could have a list of paths as below for a `ComponentDefinition` model where
        the first path is the start of the chain.

        For each of the sub models described by the first element path (e.g. component-definition.components.*)
        in the chain, the subsequent paths (e.g. component.control-implementations.*) will be applied recursively
        to retrieve the sub-sub models:
        [
            'component-definition.components.*',
            'component.control-implementations.*'
        ]
        for a command like below:
        trestle split -f component.yaml -e component-definition.components.*.control-implementations.*
    """
    if split_plan is None:
        raise TrestleError('Split plan must have been initialized')

    if cur_path_index < 0:
        raise TrestleError('Current index of the chain of paths cannot be less than 0')

    # if there are no more element_paths, return the current plan
    if cur_path_index >= len(element_paths):
        return cur_path_index

    # initialize local variables
    element = Element(model_obj)
    stripped_field_alias: List[str] = []

    # get the sub_model specified by the element_path of this round
    element_path = element_paths[cur_path_index]

    # does the next element_path point back at me
    next_path_index = cur_path_index + 1
    is_parent = (
        next_path_index < len(element_paths) and element_paths[next_path_index].get_parent() == element_path
    )

    # root dir name for sub models dir
    # 00000__group.json will have the root_dir name as 00000__group for sub models of group
    # catalog.json will have the root_dir name as catalog
    root_dir = ''
    if root_file_name != '':
        root_dir = str(pathlib.Path(root_file_name).with_suffix(''))

    sub_models = element.get_at(element_path, False)  # we call sub_models as in plural, but it can be just one

    # assume cur_path_index is the end of the chain
    # value of this variable may change during recursive split of the sub-models below
    path_chain_end = cur_path_index

    # the wildcard is compared by value (!=, ==): identity comparison of strings is not reliable
    if is_parent and element_path.get_last() != ElementPath.WILDCARD:
        # the next path continues below a single, non-wildcard element: recurse into it
        sub_models_dir = base_dir / element_path.to_root_path()
        sub_model_plan = Plan()
        path_chain_end = cls.split_model_at_path_chain(
            sub_models,
            element_paths,
            sub_models_dir,
            content_type,
            next_path_index,
            sub_model_plan,
            True,
            '',
            aliases_to_strip
        )
        sub_model_actions = sub_model_plan.get_actions()
        split_plan.add_actions(sub_model_actions)
    elif element_path.get_last() == ElementPath.WILDCARD:
        # extract sub-models into a dict with appropriate prefix
        sub_model_items: Dict[str, OscalBaseModel] = {}
        sub_models_dir = base_dir / element_path.to_file_path(root_dir=root_dir)
        if isinstance(sub_models, list):
            for i, sub_model_item in enumerate(sub_models):
                # e.g. `groups/00000_groups/`
                prefix = str(i).zfill(const.FILE_DIGIT_PREFIX_LENGTH)
                sub_model_items[prefix] = sub_model_item

        # process list sub model items
        item_total = len(sub_model_items)
        for count, (key, sub_model_item) in enumerate(sub_model_items.items(), start=1):
            # recursively split the sub-model if there are more element paths to traverse
            # e.g. split component.control-implementations.*
            # (is_parent does not depend on the item, so it is evaluated once above)
            if is_parent:
                # prepare individual directory for each sub-model
                sub_root_file_name = cmd_utils.to_model_file_name(sub_model_item, key, content_type)
                sub_model_plan = Plan()

                # kept in its own local so that the last_one argument of this call is still
                # intact when the root model is stripped below
                item_is_last = count == item_total
                path_chain_end = cls.split_model_at_path_chain(
                    sub_model_item,
                    element_paths,
                    sub_models_dir,
                    content_type,
                    next_path_index,
                    sub_model_plan,
                    True,
                    sub_root_file_name,
                    aliases_to_strip,
                    item_is_last
                )
                sub_model_actions = sub_model_plan.get_actions()
            else:
                sub_model_actions = cls.prepare_sub_model_split_actions(
                    sub_model_item, sub_models_dir, key, content_type
                )

            split_plan.add_actions(sub_model_actions)
    else:
        # the chain of path ends at the current index.
        # so no recursive call. Let's just write the sub model to the file and get out
        if sub_models is not None:
            sub_model_file = base_dir / element_path.to_file_path(content_type, root_dir=root_dir)
            split_plan.add_action(CreatePathAction(sub_model_file))
            split_plan.add_action(
                WriteFileAction(sub_model_file, Element(sub_models, element_path.get_element_name()), content_type)
            )

    # Strip the root model and add a WriteAction for the updated model object in the plan
    if strip_root:
        full_path = element_path.get_full()
        # drop the last part of the full path; the remainder is the key into aliases_to_strip
        path = '.'.join(full_path.split('.')[:-1])
        aliases = [element_path.get_element_name()]
        need_to_write = True
        use_alias_dict = aliases_to_strip is not None and path in aliases_to_strip
        if use_alias_dict:
            aliases = aliases_to_strip[path].get_aliases()
            need_to_write = aliases_to_strip[path].needs_writing()

        stripped_model = model_obj.stripped_instance(stripped_fields_aliases=aliases)
        # can mark it written even if it doesn't need writing since it is empty
        # but if an array only mark it written if it's the last one
        if last_one and use_alias_dict:
            aliases_to_strip[path].mark_written()
        # If it's an empty model after stripping the fields, don't create path and don't write
        # NOTE(review): stripped_field_alias is never populated in this method, so this early return
        # only triggers when field_list is empty - confirm whether `aliases` was meant here
        field_list = [x for x in model_obj.__fields__.keys() if model_obj.__fields__[x] is not None]
        if set(field_list) == set(stripped_field_alias):
            return path_chain_end

        if need_to_write:
            if root_file_name != '':
                root_file = base_dir / root_file_name
            else:
                root_file = base_dir / element_path.to_root_path(content_type)

            split_plan.add_action(CreatePathAction(root_file))
            wrapper_alias = classname_to_alias(stripped_model.__class__.__name__, AliasMode.JSON)
            split_plan.add_action(WriteFileAction(root_file, Element(stripped_model, wrapper_alias), content_type))

    # return the end of the current path chain
    return path_chain_end

Functions¤

handler: python