Skip to content

trestle.common.model_utils

trestle.common.model_utils ¤

Common utilities for the OSCAL models and directories.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

ModelUtils ¤

Utilities for the OSCAL models input and output.

Source code in trestle/common/model_utils.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
class ModelUtils:
    """Utilities for the OSCAL models input and output."""

    @staticmethod
    def load_distributed(
        abs_path: Path,
        abs_trestle_root: Path,
        collection_type: Optional[Type[Any]] = None
    ) -> Tuple[Type[OscalBaseModel],
               str,
               Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]]:
        """
        Given path to a model, load the model.

        If the model is decomposed/split/distributed,the decomposed models are loaded recursively.

        Args:
            abs_path: The path to the file/directory to be loaded.
            abs_trestle_root: The trestle workspace root directory.
            collection_type: The type of collection model, if it is a collection model.
                typing.List is the only collection type handled or expected.
                Defaults to None.

        Returns:
            Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
            Model Alias (e.g. 'catalog.metadata') and Instance of the Model.
            If the model is decomposed/split/distributed, the instance of the model contains
                the decomposed models loaded recursively.

        Note:
            This does not validate the model.  You must either validate the model separately or use the load_validate
            utilities.
        """
        # if trying to load file that does not exist, load path instead
        if not abs_path.exists():
            abs_path = abs_path.with_name(abs_path.stem)

        if not abs_path.exists():
            raise TrestleNotFoundError(f'File {abs_path} not found for load.')

        if collection_type:
            # If the path contains a list type model
            if collection_type is list:
                return ModelUtils._load_list(abs_path, abs_trestle_root)
            # the only other collection type in OSCAL is dict, and it only applies to include_all,
            # which is too granular ever to be loaded by this routine
            else:
                raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')

        # Get current model
        primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
        primary_model_instance: Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]] = None

        # is this an attempt to load an actual json or yaml file?
        content_type = FileContentType.path_to_content_type(abs_path)
        # if file is sought but it doesn't exist, ignore and load as decomposed model
        if FileContentType.is_readable_file(content_type) and abs_path.exists():
            primary_model_instance = primary_model_type.oscal_read(abs_path)
        # Is model decomposed?
        decomposed_dir = abs_path.with_name(abs_path.stem)

        if decomposed_dir.exists():
            aliases_not_to_be_stripped = []
            instances_to_be_merged: List[OscalBaseModel] = []

            for local_path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
                if local_path.is_file():
                    model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path, abs_trestle_root)
                    aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                    instances_to_be_merged.append(model_instance)

                elif local_path.is_dir():
                    model_type, model_alias = ModelUtils.get_stripped_model_type(local_path, abs_trestle_root)
                    # Only load the directory if it is a collection model. Otherwise do nothing - it gets loaded when
                    # iterating over the model file

                    # If a model is just a container for a list e.g.
                    # class Foo(OscalBaseModel):  noqa: E800
                    #      __root__: List[Bar]    noqa: E800
                    # You need to test whether first a root key exists
                    # then whether the outer_type of root is a collection.
                    # Alternative is to do a try except to avoid the error for an unknown key.

                    if model_type.is_collection_container():
                        # This directory is a decomposed List or Dict
                        collection_type = model_type.get_collection_type()
                        model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path,
                                                                                              abs_trestle_root,
                                                                                              collection_type)
                        aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                        instances_to_be_merged.append(model_instance)
            primary_model_dict = {}
            if primary_model_instance is not None:
                primary_model_dict = primary_model_instance.__dict__

            merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(abs_path,
                                                                                       abs_trestle_root,
                                                                                       aliases_not_to_be_stripped)

            # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
            # There may be a better overall way to approach this.
            top_level = len(merged_model_alias.split('.')) == 1

            for i in range(len(aliases_not_to_be_stripped)):
                alias = aliases_not_to_be_stripped[i]
                instance = instances_to_be_merged[i]
                if hasattr(instance, '__dict__') and '__root__' in instance.__dict__ and isinstance(instance,
                                                                                                    OscalBaseModel):
                    instance = instance.__dict__['__root__']
                if top_level and not primary_model_dict:
                    primary_model_dict = instance.__dict__
                else:
                    primary_model_dict[alias] = instance

            merged_model_instance = merged_model_type(**primary_model_dict)
            return merged_model_type, merged_model_alias, merged_model_instance
        return primary_model_type, primary_model_alias, primary_model_instance

    @staticmethod
    def load_model_for_class(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: TG,
        file_content_type: Optional[FileContentType] = None
    ) -> Tuple[TG, pathlib.Path]:
        """Load a model by name and model class and infer file content type if not specified.

        If you need to load an existing model but its content type may not be known, use this method.
        But the file content type should be specified if it is somehow known.

        Note:
            This does not validate the model.  If you want to validate the model use the load_validate utilities.
        """
        root_model_path = ModelUtils._root_path_for_top_level_model(
            trestle_root, model_name, model_class
        )  # type: ignore
        if file_content_type is None:
            file_content_type = FileContentType.path_to_content_type(root_model_path)
        if not FileContentType.is_readable_file(file_content_type):
            raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')
        full_model_path = root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))
        _, _, model = ModelUtils.load_distributed(full_model_path, trestle_root)
        return model, full_model_path  # type: ignore

    @staticmethod
    def load_model_for_type(trestle_root: pathlib.Path, model_type: str,
                            model_name: str) -> Tuple[TopLevelOscalModel, pathlib.Path]:
        """Load model for the given type and name."""
        dir_name = ModelUtils.model_type_to_model_dir(model_type)
        model_path = trestle_root / dir_name / model_name

        if not model_path.exists():
            raise TrestleError(f'No model is found at path: {model_path}.')

        _, _, oscal_object = ModelUtils.load_distributed(model_path, trestle_root)

        return oscal_object, model_path  # type: ignore

    @staticmethod
    def save_top_level_model(
        model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
    ) -> None:
        """Save a model by name and infer model type by inspection.

        You don't need to specify the model type (catalog, profile, etc.) but you must specify the file content type.
        If the model directory does not exist, it is created.
        """
        root_model_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model)
        full_model_path = root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))
        if not full_model_path.parent.exists():
            full_model_path.parent.mkdir(parents=True, exist_ok=True)
        model.oscal_write(full_model_path)

    @staticmethod
    def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
        """
        Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

        Args:
            relative_path: Relative path of the model with respect to the root directory of the trestle workspace.
        Returns:
            Type of Oscal Model for the provided model
            Alias of that oscal model.
        """
        if len(relative_path.parts) < 2:
            raise TrestleError(
                'Insufficient path length to be a valid relative path w.r.t trestle workspace root directory.'
            )
        model_dir = relative_path.parts[0]
        model_relative_path = pathlib.Path(*relative_path.parts[2:])  # catalogs, profiles, etc

        if model_dir in const.MODEL_DIR_LIST:
            module_name = const.MODEL_DIR_TO_MODEL_MODULE[model_dir]
        else:
            raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')

        model_type, model_alias = ModelUtils.get_root_model(module_name)
        full_alias = model_alias

        for index, part in enumerate(model_relative_path.parts):
            alias = ModelUtils._extract_alias(part)
            if index > 0 or model_alias != alias:
                model_alias = alias
                full_alias = f'{full_alias}.{model_alias}'
                if utils.is_collection_field_type(model_type):
                    model_type = utils.get_inner_type(model_type)
                else:
                    model_type = model_type.alias_to_field_map()[alias].outer_type_

        return model_type, full_alias

    @staticmethod
    def get_stripped_model_type(
        absolute_path: pathlib.Path,
        absolute_trestle_root: pathlib.Path,
        aliases_not_to_be_stripped: List[str] = None
    ) -> Tuple[Type[OscalBaseModel], str]:
        """
        Get the stripped contextual model class and alias based on the contextual path.

        This function relies on the directory structure of the trestle model being edited to determine, based on the
        existing files and folder, which fields should be stripped from the model type represented by the
        path passed in as a parameter.
        """
        if aliases_not_to_be_stripped is None:
            aliases_not_to_be_stripped = []
        singular_model_type, model_alias = ModelUtils.get_relative_model_type(
            absolute_path.relative_to(absolute_trestle_root))
        logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

        # Stripped models do not apply to collection types such as List[] and Dict{}
        # if model type is a list or dict, generate a new wrapping model for it
        if utils.is_collection_field_type(singular_model_type):
            malias = model_alias.split('.')[-1]
            class_name = alias_to_classname(malias, AliasMode.JSON)
            logger.debug(f'collection field type class name {class_name} and alias {malias}')
            model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
            logger.debug(f'model_type created: {model_type}')
            return model_type, model_alias

        malias = model_alias.split('.')[-1]
        logger.debug(f'not collection field type, malias: {malias}')
        if absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name):
            split_subdir = absolute_path / malias
        else:
            split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

        aliases_to_be_stripped = set()
        if split_subdir.exists():
            for f in iterdir_without_hidden_files(split_subdir):
                alias = ModelUtils._extract_alias(f.name)
                if alias not in aliases_not_to_be_stripped:
                    aliases_to_be_stripped.add(alias)

        logger.debug(f'aliases to be stripped: {aliases_to_be_stripped}')
        if len(aliases_to_be_stripped) > 0:
            model_type = singular_model_type.create_stripped_model_type(
                stripped_fields_aliases=list(aliases_to_be_stripped)
            )
            logger.debug(f'model_type: {model_type}')
            return model_type, model_alias
        return singular_model_type, model_alias

    @staticmethod
    def model_type_to_model_dir(model_type: str) -> str:
        """Get plural model directory from model type."""
        if model_type not in const.MODEL_TYPE_LIST:
            raise err.TrestleError(f'Not a valid model type: {model_type}.')
        return const.MODEL_TYPE_TO_MODEL_DIR[model_type]

    @staticmethod
    def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
        """Get list of model names for requested type in trestle directory."""
        if model_type not in const.MODEL_TYPE_LIST:
            raise err.TrestleError(f'Model type {model_type} is not supported')
        # search relative to project root
        trestle_root = extract_trestle_project_root(root)
        if not trestle_root:
            logger.error(f'Given directory {root} is not within a trestle project.')
            raise err.TrestleError('Given directory is not within a trestle project.')

        # contruct path to the model file name
        model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
        root_model_dir = trestle_root / model_dir_name
        model_list = []
        for f in root_model_dir.glob('*/'):
            # only look for proper json and yaml files
            if not ModelUtils._should_ignore(f.stem):
                if not f.is_dir():
                    logger.warning(
                        f'Ignoring validation of misplaced file {f.name} '
                        + f'found in the model directory, {model_dir_name}.'
                    )
                else:
                    model_list.append(f.stem)
        return model_list

    @staticmethod
    def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
        """Get list of all models in trestle directory as tuples (model_type, model_name)."""
        full_list = []
        for model_type in const.MODEL_TYPE_LIST:
            models = ModelUtils.get_models_of_type(model_type, root)
            for m in models:
                full_list.append((model_type, m))
        return full_list

    @staticmethod
    def get_model_path_for_name_and_class(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: Type[TopLevelOscalModel],
        file_content_type: Optional[FileContentType] = None
    ) -> Optional[pathlib.Path]:
        """
        Find the full path of a model given its name, model type and file content type.

        If file_content_type is given it will not inspect the file system or confirm the needed path and file exists.
        """
        if file_content_type is None:
            root_model_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
            file_content_type = FileContentType.path_to_content_type(root_model_path)
            if not FileContentType.is_readable_file(file_content_type):
                return None

            return root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))

        root_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        return root_path.with_suffix(FileContentType.to_file_extension(file_content_type))

    @staticmethod
    def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
        """
        Get the alias in the singular form from a jsonpath.

        If contextual_mode is True and contextual_path is None, it assumes alias_path
        is relative to the directory the user is running trestle from.

        Args:
            alias_path: The current alias element path as a string
            relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.
        Returns:
            Alias as a string
        """
        if len(alias_path.strip()) == 0:
            raise err.TrestleError(f'Invalid jsonpath {alias_path}')

        singular_alias: str = ''

        full_alias_path = alias_path
        if relative_path:
            logger.debug(f'get_singular_alias contextual mode: {str}')
            _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
            first_alias_a = full_model_alias.split('.')[-1]
            first_alias_b = alias_path.split('.')[0]
            if first_alias_a == first_alias_b:
                full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
            full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

        path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
        logger.debug(f'path parts: {path_parts}')

        model_types = []

        root_model_alias = path_parts[0]
        found = False
        for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
            model_type, model_alias = ModelUtils.get_root_model(module_name)
            if root_model_alias == model_alias:
                found = True
                model_types.append(model_type)
                break

        if not found:
            raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

        if len(path_parts) == 1:
            return root_model_alias

        model_type = model_types[0]
        # go through path parts skipping first one
        for i in range(1, len(path_parts)):
            if utils.is_collection_field_type(model_type):
                # if it is a collection type and last part is * then break
                if i == len(path_parts) - 1 and path_parts[i] == '*':
                    break
                # otherwise get the inner type of items in the collection
                model_type = utils.get_inner_type(model_type)
                # and bump i
                i = i + 1
            else:
                path_part = path_parts[i]
                field_map = model_type.alias_to_field_map()
                if path_part not in field_map:
                    continue
                field = field_map[path_part]
                model_type = field.outer_type_
            model_types.append(model_type)

        last_alias = path_parts[-1]
        if last_alias == '*':
            last_alias = path_parts[-2]

        # generic model and not list, so return itself fixme doc
        if not utils.is_collection_field_type(model_type):
            return last_alias

        parent_model_type = model_types[-2]
        try:
            field_map = parent_model_type.alias_to_field_map()
            field = field_map[last_alias]
            outer_type = field.outer_type_
            inner_type = utils.get_inner_type(outer_type)
            inner_type_name = inner_type.__name__
            singular_alias = str_utils.classname_to_alias(inner_type_name, AliasMode.JSON)
        except Exception as e:
            raise err.TrestleError(f'Error in json path {alias_path}: {e}')

        return singular_alias

    @staticmethod
    def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
        """Get the root model class and alias based on the module."""
        try:
            module = importlib.import_module(module_name)
        except ModuleNotFoundError as e:
            raise err.TrestleError(str(e))

        if hasattr(module, 'Model'):
            model_metadata = next(iter(module.Model.__fields__.values()))
            return model_metadata.type_, model_metadata.alias
        raise err.TrestleError('Invalid module')

    @staticmethod
    def _root_path_for_top_level_model(
        trestle_root: pathlib.Path, model_name: str, model_class: Union[TopLevelOscalModel, Type[TopLevelOscalModel]]
    ) -> pathlib.Path:
        """
        Find the root path to a model given its name and class - with no suffix.

        This is a private method used only to construct the root filepath based on model name and type.
        It does not check for existence or content type and it does not create the directory if it does not exist.
        """
        if not hasattr(model_class, '__module__') or model_class.__module__ not in const.MODEL_MODULE_LIST:
            raise TrestleError(f'Unable to determine model type for model {model_name} with class {model_class}')
        model_alias = const.MODEL_MODULE_TO_MODEL_TYPE[model_class.__module__]
        model_dir = trestle_root / f'{const.MODEL_TYPE_TO_MODEL_DIR[model_alias]}/{model_name}'
        return model_dir / model_alias

    @staticmethod
    def _extract_alias(string_dir: str) -> str:
        """
        Extract alias from filename or directory name removing extensions and prefixes related to dict and list.

        As we need to do this for multiple parts of a path operating on strings is easier.
        """
        alias = string_dir.split('.')[0].split(
            const.IDX_SEP
        )[-1]  # get suffix of file or directory name representing list or dict item
        return alias

    @staticmethod
    def _should_ignore(name: str) -> bool:
        """Check if the file or directory should be ignored or not."""
        return name[0] == '.' or name[0] == '_'

    @staticmethod
    def _load_list(abs_path: Path, abs_trestle_root: Path) -> Tuple[Type[OscalBaseModel], str, List[OscalBaseModel]]:
        """Given path to a directory of list(array) models, load the distributed models."""
        aliases_not_to_be_stripped = []
        instances_to_be_merged: List[OscalBaseModel] = []
        collection_model_type, collection_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
        for path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(abs_path)):

            # ASSUMPTION HERE: if it is a directory, there's a file that can not be decomposed further.
            if path.is_dir():
                continue
            _, model_alias, model_instance = ModelUtils.load_distributed(path, abs_trestle_root)

            instances_to_be_merged.append(model_instance)
            aliases_not_to_be_stripped.append(model_alias.split('.')[-1])

        return collection_model_type, collection_model_alias, instances_to_be_merged

    @staticmethod
    def _parameter_to_dict_recurse(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Convert obj to dict containing only string values with recursion.

        Args:
            obj: The parameter or its consituent parts in recursive calls
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary
        """
        main_fields = ['id', 'label', 'values', 'select', 'choice', 'how_many', 'guidelines', 'prose']
        if isinstance(obj, common.Remarks):
            return obj.__root__
        if isinstance(obj, common.HowMany):
            return obj.value
        # it is either a string already or we cast it to string
        if not hasattr(obj, const.FIELDS_SET):
            return str(obj)
        # it is an oscal object and we need to recurse within its attributes
        res = {}
        for field in obj.__fields_set__:
            if partial and field not in main_fields:
                continue
            attr = getattr(obj, field)
            if not attr:
                continue
            if isinstance(attr, list):
                new_list = []
                for item in attr:
                    new_list.append(ModelUtils._parameter_to_dict_recurse(item, partial))
                res[field] = new_list
            elif isinstance(attr, str):
                res[field] = attr
            else:
                res[field] = ModelUtils._parameter_to_dict_recurse(attr, partial)
        return res

    @staticmethod
    def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Convert obj to dict containing only string values, storing only the fields that have values set.

        Args:
            obj: The parameter or its consituent parts in recursive calls
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary, with values as None if not present
        """
        res = ModelUtils._parameter_to_dict_recurse(obj, partial)
        if 'values' not in res:
            res['values'] = None  # type: ignore
        return res

    @staticmethod
    def _string_to_howmany(count_str: str) -> Optional[str]:
        clean_str = count_str.lower().strip().replace('-', ' ').replace('_', ' ')
        if clean_str == const.ONE:
            return common.HowMany.one  # type: ignore
        if clean_str == const.ONE_OR_MORE_SPACED:
            return common.HowMany.one_or_more  # type: ignore
        return None

    @staticmethod
    def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
        """
        Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

        Args:
            param_dict: Dictionary of pure string values representing Parameter contents

        Returns:
            A valid OSCAL Parameter

        Notes:
            This handles both partial and full parameter dictionaries
            It checks for validity of the values if a select and HowMany is specified
            There is special handling for values: If it is a single string it is converted to list of one ParameterValue
            But if it is a list of strings is regarded as a list of values and is converted to a list of ParameterValues
        """
        values = param_dict.get('values', [])
        # special handling when only one value present - convert to list of 1
        if isinstance(values, str):
            values = [values]
            param_dict['values'] = values
        if 'select' in param_dict and 'how_many' in param_dict['select']:
            count_str = param_dict['select']['how_many']
            how_many = ModelUtils._string_to_howmany(count_str)
            if how_many is None:
                raise TrestleError(f'Unrecognized HowMany value {how_many} in Parameter: should be one-or-more or one.')
            param_dict['select']['how_many'] = how_many
            if how_many == const.ONE and len(values) > 1:
                logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
            choices = param_dict['select'].get('choice', [])
            if choices and values:
                for value in values:
                    if value not in choices:
                        logger.warning(f"Parameter {param_dict['id']} has value \"{value}\" not in choices: {choices}.")
        props = param_dict.get('props', [])
        if const.DISPLAY_NAME in param_dict:
            display_name = param_dict.pop(const.DISPLAY_NAME)
            props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))
        if const.AGGREGATES in param_dict:
            # removing aggregates as this is prop just informative in markdown
            param_dict.pop(const.AGGREGATES)
        param_value_origin = None
        if const.PARAM_VALUE_ORIGIN in param_dict:
            param_value_origin = param_dict.pop(const.PARAM_VALUE_ORIGIN)
            if param_value_origin is not None:
                props.append(common.Property(name=const.PARAM_VALUE_ORIGIN, value=param_value_origin))
            else:
                raise TrestleError(
                    f'Parameter value origin property for parameter {param_dict["id"]}'
                    'is None and it should have a value'
                )
        if const.ALT_IDENTIFIER in param_dict:
            # removing alt-identifier as this is prop just informative in markdown
            param_dict.pop(const.ALT_IDENTIFIER)

        if 'ns' in param_dict:
            param_dict.pop('ns')
        param = common.Parameter(**param_dict)
        param.props = none_if_empty(props)
        return param

    @staticmethod
    def last_modified_at_time(timestamp: Optional[datetime] = None) -> datetime:
        """Generate a LastModified set to timestamp or now."""
        timestamp = timestamp if timestamp else datetime.now().astimezone()
        return timestamp

    @staticmethod
    def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
        """Update the LastModified timestamp in top level model to now."""
        timestamp = timestamp if timestamp else datetime.now().astimezone()
        model.metadata.last_modified = timestamp

    @staticmethod
    def model_age(model: TopLevelOscalModel) -> int:
        """Find time in seconds since LastModified timestamp."""
        # default to one year if no last_modified
        age_seconds = const.DAY_SECONDS * 365
        if model.metadata.last_modified:
            dt = datetime.now().astimezone() - model.metadata.last_modified
            age_seconds = dt.seconds
        return age_seconds

    @staticmethod
    def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
        """Traverse object and return list of values of specified name."""
        loe = []
        if isinstance(object_of_interest, BaseModel):
            value = getattr(object_of_interest, name_of_interest, None)
            if value is not None:
                loe.append(value)
            fields = getattr(object_of_interest, const.FIELDS_SET, None)
            if fields is not None:
                for field in fields:
                    loe.extend(
                        ModelUtils.find_values_by_name(getattr(object_of_interest, field, None), name_of_interest)
                    )
        elif type(object_of_interest) is list:
            for item in object_of_interest:
                loe.extend(ModelUtils.find_values_by_name(item, name_of_interest))
        elif type(object_of_interest) is dict:
            if name_of_interest in object_of_interest:
                loe.append(object_of_interest[name_of_interest])
            for item in object_of_interest.values():
                loe.extend(ModelUtils.find_values_by_name(item, name_of_interest))
        return loe

    @staticmethod
    def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
        """Determine if duplicate values of type exist in object."""
        loe = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
        set_loe = set(loe)
        if len(loe) == len(set_loe):
            return True
        items: Dict[str, Any] = {}
        for item in loe:
            items[item] = items.get(item, 0) + 1
        # now print items
        for item, instances in items.items():
            if instances > 1:
                logger.warning(f'Duplicate detected of item {item} with {instances} instances.')
        return False

    @staticmethod
    def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
        """Find uuid references made in prose and links."""
        # hrefs have form #foo or #uuid
        uuid_strs = ModelUtils.find_values_by_name(object_of_interest, 'href')

        # prose has uuid refs in markdown form: [foo](#bar) or [foo](#uuid)
        prose_list = ModelUtils.find_values_by_name(object_of_interest, 'prose')
        for prose in prose_list:
            matches = re.findall(const.MARKDOWN_URL_REGEX, prose)
            # the [1] is to extract the inner of 3 capture patterns
            new_uuids = [match[1] for match in matches]
            uuid_strs.extend(new_uuids)

        # collect the strings that start with # and are potential uuids
        uuid_strs = [uuid_str for uuid_str in uuid_strs if uuid_str and uuid_str[0] == '#']

        # go through all matches and build set of those that are uuids
        uuid_set = {uuid_match for uuid_str in uuid_strs for uuid_match in re.findall(const.UUID_REGEX, uuid_str[1:])}
        return uuid_set

    @staticmethod
    def _regenerate_uuids_in_place(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, Dict[str, str]]:
        """Update all uuids in model that require updating.

        Go through the model and replace all dicts with key == 'uuid' and replace the value with a new uuid4.
        Build a lookup table of the updates that were made.
        This function does not update the corresponding refs to those uuid's.  That is done by update_uuid_refs
        Note that this function needs to be started off with uuid_lut == {}, i.e. an empty dict.
        After that it recurses and grows the lut.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str will be updated
            uuid_lut: dict of the growing lut of old:new uuid's.  First call must be made with value {}

        Returns:
            The updated object_of_interest with new uuid's (but refs to them are not updated)
            The final lookup table of old:new uuid's

        """
        uuid_str = 'uuid'
        # Certain types are known not to need updating and should not change
        # Resources are identified by uuid, and the corresponding href will have # in front of the uuid string
        # Neither of these should change
        # If other similar types are found they should be added to the FixedUuidModel typevar to prevent updating
        if isinstance(object_of_interest, common.Resource):
            pass
        elif isinstance(object_of_interest, BaseModel):
            # fields has names of all fields in model
            fields = getattr(object_of_interest, const.FIELDS_SET, None)
            for field in fields:
                new_object = None
                if field == uuid_str:
                    orig_uuid = getattr(object_of_interest, field)
                    if orig_uuid:
                        new_object = str(uuid.uuid4())
                        uuid_lut[orig_uuid] = new_object
                else:
                    new_object, uuid_lut = ModelUtils._regenerate_uuids_in_place(
                        object_of_interest.__dict__[field],
                        uuid_lut
                    )
                object_of_interest.__dict__[field] = new_object
        elif type(object_of_interest) is list:
            new_list = []
            for item in object_of_interest:
                new_item, uuid_lut = ModelUtils._regenerate_uuids_in_place(item, uuid_lut)
                new_list.append(new_item)
            object_of_interest = new_list
        elif type(object_of_interest) is dict:
            new_dict = {}
            for key, value in object_of_interest.items():
                if key == uuid_str:
                    new_val = str(uuid.uuid4())
                    new_dict[uuid_str] = new_val
                    uuid_lut[value] = new_val
                else:
                    new_value, uuid_lut = ModelUtils._regenerate_uuids_in_place(value, uuid_lut)
                    new_dict[key] = new_value
            object_of_interest = new_dict
        return object_of_interest, uuid_lut

    @staticmethod
    def _update_new_uuid_refs(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, int]:
        """Update all refs to uuids that were changed."""
        n_refs_updated = 0
        if isinstance(object_of_interest, BaseModel):
            fields = getattr(object_of_interest, const.FIELDS_SET, None)
            for field in fields:
                new_object, n_new_updates = ModelUtils._update_new_uuid_refs(
                    object_of_interest.__dict__[field],
                    uuid_lut
                )
                n_refs_updated += n_new_updates
                object_of_interest.__dict__[field] = new_object
        elif type(object_of_interest) is list:
            new_list = []
            for item in object_of_interest:
                new_item, n_new_updates = ModelUtils._update_new_uuid_refs(item, uuid_lut)
                n_refs_updated += n_new_updates
                new_list.append(new_item)
            object_of_interest = new_list
        elif type(object_of_interest) is dict:
            new_dict = {}
            for key, value in object_of_interest.items():
                if isinstance(value, str):
                    if value in uuid_lut:
                        new_dict[key] = uuid_lut[value]
                        n_refs_updated += 1
                    else:
                        new_dict[key] = value
                else:
                    new_value, n_new_updates = ModelUtils._update_new_uuid_refs(value, uuid_lut)
                    n_refs_updated += n_new_updates
                    new_dict[key] = new_value
            object_of_interest = new_dict
        elif isinstance(object_of_interest, str):
            if object_of_interest in uuid_lut:
                n_refs_updated += 1
                object_of_interest = uuid_lut[object_of_interest]
        return object_of_interest, n_refs_updated

    @staticmethod
    def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
        """Regenerate all uuids in object and update corresponding references.

        Find all dicts with key == 'uuid' and replace the value with a new uuid4.
        Build a corresponding lookup table as you go, of old:new uuid values.
        Then make a second pass through the object and replace all string values
        present in the lookup table with the new value.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str will be updated

        Returns:
            The updated object with new uuid's and refs
            The final lookup table of old:new uuid's
            A count of the number of refs that were updated
        """
        new_object, uuid_lut = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
        new_object, n_refs_updated = ModelUtils._update_new_uuid_refs(new_object, uuid_lut)
        return new_object, uuid_lut, n_refs_updated

    @staticmethod
    def fields_set_non_none(obj: BaseModel) -> Set[str]:
        """Find the fields set with Nones and empty items removed."""
        return set(as_filtered_list(list(obj.__fields_set__), lambda f: getattr(obj, f)))

    @staticmethod
    def _objects_differ(
        obj_a: Any, obj_b: Any, ignore_type_list: List[Any], ignore_name_list: List[str], ignore_all_uuid: bool
    ) -> bool:
        """
        Compare two objects with option to ignore given types.

        This does not check for tuples or other structures that won't be found in JSON.
        """
        obj_a_type = type(obj_a)
        obj_b_type = type(obj_b)
        if bool(obj_a) != bool(obj_b) or obj_a_type != obj_b_type:
            return True
        if not bool(obj_a):
            return False
        if obj_a_type in ignore_type_list:
            return False
        if obj_a_type is str:
            return obj_a != obj_b
        elif isinstance(obj_a, BaseModel):
            fields_a = ModelUtils.fields_set_non_none(obj_a)
            fields_b = ModelUtils.fields_set_non_none(obj_b)
            if fields_a != fields_b:
                return True
            for field in list_utils.as_filtered_list(fields_a, lambda f: f not in ignore_name_list):  # type: ignore
                if ignore_all_uuid and 'uuid' in field:
                    continue
                if ModelUtils._objects_differ(getattr(obj_a, field),
                                              getattr(obj_b, field),
                                              ignore_type_list,
                                              ignore_name_list,
                                              ignore_all_uuid):
                    return True
        elif obj_a_type is list:
            if len(obj_a) != len(obj_b):
                return True
            for item_a, item_b in zip(obj_a, obj_b):
                if ModelUtils._objects_differ(item_a, item_b, ignore_type_list, ignore_name_list, ignore_all_uuid):
                    return True
        elif obj_a_type is dict:
            if obj_a.keys() != obj_b.keys():
                return True
            for key, val in obj_a.items():
                if ignore_all_uuid and 'uuid' in key:
                    continue
                if key not in ignore_name_list and ModelUtils._objects_differ(
                        val, obj_b[key], ignore_type_list, ignore_name_list, ignore_all_uuid):
                    return True
        elif obj_a != obj_b:
            return True
        return False

    @staticmethod
    def models_are_equivalent(
        model_a: Optional[TopLevelOscalModel],
        model_b: Optional[TopLevelOscalModel],
        ignore_all_uuid: bool = False
    ) -> bool:
        """
        Test if models are equivalent except for last modified and possibly uuid.

        If a model has had uuids regenerated, then all uuids *and references to them* are updated.  This means that
        special handling is required if a model has had uuids regenerated - when checking equivalence.
        """
        uuid_type_list = [
            common.LastModified,
            common.LocationUuid,
            common.PartyUuid,
            common.RelatedRisk,
            common.Source,
            assessment_plan.RelatedObservation,
            assessment_results.RelatedObservation,
            poam.RelatedObservation,
            poam.RelatedObservation1
        ]
        type_list = uuid_type_list if ignore_all_uuid else [common.LastModified]
        return not ModelUtils._objects_differ(model_a, model_b, type_list, ['last_modified'], ignore_all_uuid)

    @staticmethod
    def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
        """Get title from model at uri."""
        try:
            fetcher = cache.FetcherFactory.get_fetcher(trestle_root, uri)
            model, _ = fetcher.get_oscal()
            return model.metadata.title
        except TrestleError as e:
            logger.warning(f'Error finding title for model at uri {uri}: {e}')
            raise
Functions¤
dict_to_parameter(param_dict) staticmethod ¤

Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

Parameters:

Name Type Description Default
param_dict Dict[str, Any]

Dictionary of pure string values representing Parameter contents

required

Returns:

Type Description
Parameter

A valid OSCAL Parameter

Notes

This handles both partial and full parameter dictionaries It checks for validity of the values if a select and HowMany is specified There is special handling for values: If it is a single string it is converted to list of one ParameterValue But if it is a list of strings is regarded as a list of values and is converted to a list of ParameterValues

Source code in trestle/common/model_utils.py
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
@staticmethod
def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
    """
    Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

    Args:
        param_dict: Dictionary of pure string values representing Parameter contents

    Returns:
        A valid OSCAL Parameter

    Notes:
        This handles both partial and full parameter dictionaries
        It checks for validity of the values if a select and HowMany is specified
        There is special handling for values: If it is a single string it is converted to list of one ParameterValue
        But if it is a list of strings is regarded as a list of values and is converted to a list of ParameterValues
    """
    values = param_dict.get('values', [])
    # special handling when only one value present - convert to list of 1
    if isinstance(values, str):
        values = [values]
        param_dict['values'] = values
    if 'select' in param_dict and 'how_many' in param_dict['select']:
        count_str = param_dict['select']['how_many']
        how_many = ModelUtils._string_to_howmany(count_str)
        if how_many is None:
            raise TrestleError(f'Unrecognized HowMany value {how_many} in Parameter: should be one-or-more or one.')
        param_dict['select']['how_many'] = how_many
        if how_many == const.ONE and len(values) > 1:
            logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
        choices = param_dict['select'].get('choice', [])
        if choices and values:
            for value in values:
                if value not in choices:
                    logger.warning(f"Parameter {param_dict['id']} has value \"{value}\" not in choices: {choices}.")
    props = param_dict.get('props', [])
    if const.DISPLAY_NAME in param_dict:
        display_name = param_dict.pop(const.DISPLAY_NAME)
        props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))
    if const.AGGREGATES in param_dict:
        # removing aggregates as this is prop just informative in markdown
        param_dict.pop(const.AGGREGATES)
    param_value_origin = None
    if const.PARAM_VALUE_ORIGIN in param_dict:
        param_value_origin = param_dict.pop(const.PARAM_VALUE_ORIGIN)
        if param_value_origin is not None:
            props.append(common.Property(name=const.PARAM_VALUE_ORIGIN, value=param_value_origin))
        else:
            raise TrestleError(
                f'Parameter value origin property for parameter {param_dict["id"]}'
                'is None and it should have a value'
            )
    if const.ALT_IDENTIFIER in param_dict:
        # removing alt-identifier as this is prop just informative in markdown
        param_dict.pop(const.ALT_IDENTIFIER)

    if 'ns' in param_dict:
        param_dict.pop('ns')
    param = common.Parameter(**param_dict)
    param.props = none_if_empty(props)
    return param
fields_set_non_none(obj) staticmethod ¤

Find the fields set with Nones and empty items removed.

Source code in trestle/common/model_utils.py
860
861
862
863
@staticmethod
def fields_set_non_none(obj: BaseModel) -> Set[str]:
    """Find the fields set with Nones and empty items removed."""
    return set(as_filtered_list(list(obj.__fields_set__), lambda f: getattr(obj, f)))
find_uuid_refs(object_of_interest) staticmethod ¤

Find uuid references made in prose and links.

Source code in trestle/common/model_utils.py
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
@staticmethod
def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
    """Find uuid references made in prose and links."""
    # hrefs have form #foo or #uuid
    uuid_strs = ModelUtils.find_values_by_name(object_of_interest, 'href')

    # prose has uuid refs in markdown form: [foo](#bar) or [foo](#uuid)
    prose_list = ModelUtils.find_values_by_name(object_of_interest, 'prose')
    for prose in prose_list:
        matches = re.findall(const.MARKDOWN_URL_REGEX, prose)
        # the [1] is to extract the inner of 3 capture patterns
        new_uuids = [match[1] for match in matches]
        uuid_strs.extend(new_uuids)

    # collect the strings that start with # and are potential uuids
    uuid_strs = [uuid_str for uuid_str in uuid_strs if uuid_str and uuid_str[0] == '#']

    # go through all matches and build set of those that are uuids
    uuid_set = {uuid_match for uuid_str in uuid_strs for uuid_match in re.findall(const.UUID_REGEX, uuid_str[1:])}
    return uuid_set
find_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Traverse object and return list of values of specified name.

Source code in trestle/common/model_utils.py
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
@staticmethod
def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
    """Traverse object and return list of values of specified name."""
    loe = []
    if isinstance(object_of_interest, BaseModel):
        value = getattr(object_of_interest, name_of_interest, None)
        if value is not None:
            loe.append(value)
        fields = getattr(object_of_interest, const.FIELDS_SET, None)
        if fields is not None:
            for field in fields:
                loe.extend(
                    ModelUtils.find_values_by_name(getattr(object_of_interest, field, None), name_of_interest)
                )
    elif type(object_of_interest) is list:
        for item in object_of_interest:
            loe.extend(ModelUtils.find_values_by_name(item, name_of_interest))
    elif type(object_of_interest) is dict:
        if name_of_interest in object_of_interest:
            loe.append(object_of_interest[name_of_interest])
        for item in object_of_interest.values():
            loe.extend(ModelUtils.find_values_by_name(item, name_of_interest))
    return loe
get_all_models(root) staticmethod ¤

Get list of all models in trestle directory as tuples (model_type, model_name).

Source code in trestle/common/model_utils.py
341
342
343
344
345
346
347
348
349
@staticmethod
def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
    """Get list of all models in trestle directory as tuples (model_type, model_name)."""
    full_list = []
    for model_type in const.MODEL_TYPE_LIST:
        models = ModelUtils.get_models_of_type(model_type, root)
        for m in models:
            full_list.append((model_type, m))
    return full_list
get_model_path_for_name_and_class(trestle_root, model_name, model_class, file_content_type=None) staticmethod ¤

Find the full path of a model given its name, model type and file content type.

If file_content_type is given it will not inspect the file system or confirm the needed path and file exists.

Source code in trestle/common/model_utils.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
@staticmethod
def get_model_path_for_name_and_class(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: Type[TopLevelOscalModel],
    file_content_type: Optional[FileContentType] = None
) -> Optional[pathlib.Path]:
    """
    Find the full path of a model given its name, model type and file content type.

    If file_content_type is given it will not inspect the file system or confirm the needed path and file exists.
    """
    if file_content_type is None:
        root_model_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        file_content_type = FileContentType.path_to_content_type(root_model_path)
        if not FileContentType.is_readable_file(file_content_type):
            return None

        return root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))

    root_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
    return root_path.with_suffix(FileContentType.to_file_extension(file_content_type))
get_models_of_type(model_type, root) staticmethod ¤

Get list of model names for requested type in trestle directory.

Source code in trestle/common/model_utils.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
@staticmethod
def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
    """Get list of model names for requested type in trestle directory."""
    if model_type not in const.MODEL_TYPE_LIST:
        raise err.TrestleError(f'Model type {model_type} is not supported')
    # search relative to project root
    trestle_root = extract_trestle_project_root(root)
    if not trestle_root:
        logger.error(f'Given directory {root} is not within a trestle project.')
        raise err.TrestleError('Given directory is not within a trestle project.')

    # contruct path to the model file name
    model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
    root_model_dir = trestle_root / model_dir_name
    model_list = []
    for f in root_model_dir.glob('*/'):
        # only look for proper json and yaml files
        if not ModelUtils._should_ignore(f.stem):
            if not f.is_dir():
                logger.warning(
                    f'Ignoring validation of misplaced file {f.name} '
                    + f'found in the model directory, {model_dir_name}.'
                )
            else:
                model_list.append(f.stem)
    return model_list
get_relative_model_type(relative_path) staticmethod ¤

Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

Parameters:

Name Type Description Default
relative_path Path

Relative path of the model with respect to the root directory of the trestle workspace.

required

Returns: Type of Oscal Model for the provided model Alias of that oscal model.

Source code in trestle/common/model_utils.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
@staticmethod
def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
    """
    Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

    Args:
        relative_path: Relative path of the model with respect to the root directory of the trestle workspace.
    Returns:
        Type of Oscal Model for the provided model
        Alias of that oscal model.
    """
    if len(relative_path.parts) < 2:
        raise TrestleError(
            'Insufficient path length to be a valid relative path w.r.t trestle workspace root directory.'
        )
    model_dir = relative_path.parts[0]
    model_relative_path = pathlib.Path(*relative_path.parts[2:])  # catalogs, profiles, etc

    if model_dir in const.MODEL_DIR_LIST:
        module_name = const.MODEL_DIR_TO_MODEL_MODULE[model_dir]
    else:
        raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')

    model_type, model_alias = ModelUtils.get_root_model(module_name)
    full_alias = model_alias

    for index, part in enumerate(model_relative_path.parts):
        alias = ModelUtils._extract_alias(part)
        if index > 0 or model_alias != alias:
            model_alias = alias
            full_alias = f'{full_alias}.{model_alias}'
            if utils.is_collection_field_type(model_type):
                model_type = utils.get_inner_type(model_type)
            else:
                model_type = model_type.alias_to_field_map()[alias].outer_type_

    return model_type, full_alias
get_root_model(module_name) staticmethod ¤

Get the root model class and alias based on the module.

Source code in trestle/common/model_utils.py
464
465
466
467
468
469
470
471
472
473
474
475
@staticmethod
def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
    """Get the root model class and alias based on the module."""
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as e:
        raise err.TrestleError(str(e))

    if hasattr(module, 'Model'):
        model_metadata = next(iter(module.Model.__fields__.values()))
        return model_metadata.type_, model_metadata.alias
    raise err.TrestleError('Invalid module')
get_singular_alias(alias_path, relative_path=None) staticmethod ¤

Get the alias in the singular form from a jsonpath.

If contextual_mode is True and contextual_path is None, it assumes alias_path is relative to the directory the user is running trestle from.

Parameters:

Name Type Description Default
alias_path str

The current alias element path as a string

required
relative_path Optional[Path]

Optional relative path (w.r.t. trestle_root) to cater for relative element paths.

None

Returns: Alias as a string

Source code in trestle/common/model_utils.py
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
@staticmethod
def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
    """
    Get the alias in the singular form from a jsonpath.

    If contextual_mode is True and contextual_path is None, it assumes alias_path
    is relative to the directory the user is running trestle from.

    Args:
        alias_path: The current alias element path as a string
        relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.
    Returns:
        Alias as a string
    """
    if len(alias_path.strip()) == 0:
        raise err.TrestleError(f'Invalid jsonpath {alias_path}')

    singular_alias: str = ''

    full_alias_path = alias_path
    if relative_path:
        logger.debug(f'get_singular_alias contextual mode: {str}')
        _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
        first_alias_a = full_model_alias.split('.')[-1]
        first_alias_b = alias_path.split('.')[0]
        if first_alias_a == first_alias_b:
            full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
        full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

    path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
    logger.debug(f'path parts: {path_parts}')

    model_types = []

    root_model_alias = path_parts[0]
    found = False
    for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
        model_type, model_alias = ModelUtils.get_root_model(module_name)
        if root_model_alias == model_alias:
            found = True
            model_types.append(model_type)
            break

    if not found:
        raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

    if len(path_parts) == 1:
        return root_model_alias

    model_type = model_types[0]
    # go through path parts skipping first one
    for i in range(1, len(path_parts)):
        if utils.is_collection_field_type(model_type):
            # if it is a collection type and last part is * then break
            if i == len(path_parts) - 1 and path_parts[i] == '*':
                break
            # otherwise get the inner type of items in the collection
            model_type = utils.get_inner_type(model_type)
            # and bump i
            i = i + 1
        else:
            path_part = path_parts[i]
            field_map = model_type.alias_to_field_map()
            if path_part not in field_map:
                continue
            field = field_map[path_part]
            model_type = field.outer_type_
        model_types.append(model_type)

    last_alias = path_parts[-1]
    if last_alias == '*':
        last_alias = path_parts[-2]

    # generic model and not list, so return itself fixme doc
    if not utils.is_collection_field_type(model_type):
        return last_alias

    parent_model_type = model_types[-2]
    try:
        field_map = parent_model_type.alias_to_field_map()
        field = field_map[last_alias]
        outer_type = field.outer_type_
        inner_type = utils.get_inner_type(outer_type)
        inner_type_name = inner_type.__name__
        singular_alias = str_utils.classname_to_alias(inner_type_name, AliasMode.JSON)
    except Exception as e:
        raise err.TrestleError(f'Error in json path {alias_path}: {e}')

    return singular_alias
get_stripped_model_type(absolute_path, absolute_trestle_root, aliases_not_to_be_stripped=None) staticmethod ¤

Get the stripped contextual model class and alias based on the contextual path.

This function relies on the directory structure of the trestle model being edited to determine, based on the existing files and folder, which fields should be stripped from the model type represented by the path passed in as a parameter.

Source code in trestle/common/model_utils.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
@staticmethod
def get_stripped_model_type(
    absolute_path: pathlib.Path,
    absolute_trestle_root: pathlib.Path,
    aliases_not_to_be_stripped: List[str] = None
) -> Tuple[Type[OscalBaseModel], str]:
    """
    Get the stripped contextual model class and alias based on the contextual path.

    This function relies on the directory structure of the trestle model being edited to determine, based on the
    existing files and folder, which fields should be stripped from the model type represented by the
    path passed in as a parameter.
    """
    if aliases_not_to_be_stripped is None:
        aliases_not_to_be_stripped = []
    singular_model_type, model_alias = ModelUtils.get_relative_model_type(
        absolute_path.relative_to(absolute_trestle_root))
    logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

    # Stripped models do not apply to collection types such as List[] and Dict{}
    # if model type is a list or dict, generate a new wrapping model for it
    if utils.is_collection_field_type(singular_model_type):
        malias = model_alias.split('.')[-1]
        class_name = alias_to_classname(malias, AliasMode.JSON)
        logger.debug(f'collection field type class name {class_name} and alias {malias}')
        model_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
        logger.debug(f'model_type created: {model_type}')
        return model_type, model_alias

    malias = model_alias.split('.')[-1]
    logger.debug(f'not collection field type, malias: {malias}')
    if absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name):
        split_subdir = absolute_path / malias
    else:
        split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

    aliases_to_be_stripped = set()
    if split_subdir.exists():
        for f in iterdir_without_hidden_files(split_subdir):
            alias = ModelUtils._extract_alias(f.name)
            if alias not in aliases_not_to_be_stripped:
                aliases_to_be_stripped.add(alias)

    logger.debug(f'aliases to be stripped: {aliases_to_be_stripped}')
    if len(aliases_to_be_stripped) > 0:
        model_type = singular_model_type.create_stripped_model_type(
            stripped_fields_aliases=list(aliases_to_be_stripped)
        )
        logger.debug(f'model_type: {model_type}')
        return model_type, model_alias
    return singular_model_type, model_alias
get_title_from_model_uri(trestle_root, uri) staticmethod ¤

Get title from model at uri.

Source code in trestle/common/model_utils.py
943
944
945
946
947
948
949
950
951
952
@staticmethod
def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
    """Get title from model at uri."""
    try:
        fetcher = cache.FetcherFactory.get_fetcher(trestle_root, uri)
        model, _ = fetcher.get_oscal()
        return model.metadata.title
    except TrestleError as e:
        logger.warning(f'Error finding title for model at uri {uri}: {e}')
        raise
has_no_duplicate_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Determine if duplicate values of type exist in object.

Source code in trestle/common/model_utils.py
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
@staticmethod
def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
    """Determine if duplicate values of type exist in object."""
    loe = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
    set_loe = set(loe)
    if len(loe) == len(set_loe):
        return True
    items: Dict[str, Any] = {}
    for item in loe:
        items[item] = items.get(item, 0) + 1
    # now print items
    for item, instances in items.items():
        if instances > 1:
            logger.warning(f'Duplicate detected of item {item} with {instances} instances.')
    return False
last_modified_at_time(timestamp=None) staticmethod ¤

Generate a LastModified set to timestamp or now.

Source code in trestle/common/model_utils.py
655
656
657
658
659
@staticmethod
def last_modified_at_time(timestamp: Optional[datetime] = None) -> datetime:
    """Generate a LastModified set to timestamp or now."""
    timestamp = timestamp if timestamp else datetime.now().astimezone()
    return timestamp
load_distributed(abs_path, abs_trestle_root, collection_type=None) staticmethod ¤

Given path to a model, load the model.

If the model is decomposed/split/distributed,the decomposed models are loaded recursively.

Parameters:

Name Type Description Default
abs_path Path

The path to the file/directory to be loaded.

required
abs_trestle_root Path

The trestle workspace root directory.

required
collection_type Optional[Type[Any]]

The type of collection model, if it is a collection model. typing.List is the only collection type handled or expected. Defaults to None.

None

Returns:

Type Description
Type[OscalBaseModel]

Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),

str

Model Alias (e.g. 'catalog.metadata') and Instance of the Model.

Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]

If the model is decomposed/split/distributed, the instance of the model contains the decomposed models loaded recursively.

Note

This does not validate the model. You must either validate the model separately or use the load_validate utilities.

Source code in trestle/common/model_utils.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@staticmethod
def load_distributed(
    abs_path: Path,
    abs_trestle_root: Path,
    collection_type: Optional[Type[Any]] = None
) -> Tuple[Type[OscalBaseModel],
           str,
           Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]]:
    """
    Given path to a model, load the model.

    If the model is decomposed/split/distributed,the decomposed models are loaded recursively.

    Args:
        abs_path: The path to the file/directory to be loaded.
        abs_trestle_root: The trestle workspace root directory.
        collection_type: The type of collection model, if it is a collection model.
            typing.List is the only collection type handled or expected.
            Defaults to None.

    Returns:
        Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
        Model Alias (e.g. 'catalog.metadata') and Instance of the Model.
        If the model is decomposed/split/distributed, the instance of the model contains
            the decomposed models loaded recursively.

    Note:
        This does not validate the model.  You must either validate the model separately or use the load_validate
        utilities.
    """
    # if trying to load file that does not exist, load path instead
    if not abs_path.exists():
        abs_path = abs_path.with_name(abs_path.stem)

    if not abs_path.exists():
        raise TrestleNotFoundError(f'File {abs_path} not found for load.')

    if collection_type:
        # If the path contains a list type model
        if collection_type is list:
            return ModelUtils._load_list(abs_path, abs_trestle_root)
        # the only other collection type in OSCAL is dict, and it only applies to include_all,
        # which is too granular ever to be loaded by this routine
        else:
            raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')

    # Get current model
    primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
    primary_model_instance: Optional[Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]] = None

    # is this an attempt to load an actual json or yaml file?
    content_type = FileContentType.path_to_content_type(abs_path)
    # if file is sought but it doesn't exist, ignore and load as decomposed model
    if FileContentType.is_readable_file(content_type) and abs_path.exists():
        primary_model_instance = primary_model_type.oscal_read(abs_path)
    # Is model decomposed?
    decomposed_dir = abs_path.with_name(abs_path.stem)

    if decomposed_dir.exists():
        aliases_not_to_be_stripped = []
        instances_to_be_merged: List[OscalBaseModel] = []

        for local_path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
            if local_path.is_file():
                model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path, abs_trestle_root)
                aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                instances_to_be_merged.append(model_instance)

            elif local_path.is_dir():
                model_type, model_alias = ModelUtils.get_stripped_model_type(local_path, abs_trestle_root)
                # Only load the directory if it is a collection model. Otherwise do nothing - it gets loaded when
                # iterating over the model file

                # If a model is just a container for a list e.g.
                # class Foo(OscalBaseModel):  noqa: E800
                #      __root__: List[Bar]    noqa: E800
                # You need to test whether first a root key exists
                # then whether the outer_type of root is a collection.
                # Alternative is to do a try except to avoid the error for an unknown key.

                if model_type.is_collection_container():
                    # This directory is a decomposed List or Dict
                    collection_type = model_type.get_collection_type()
                    model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path,
                                                                                          abs_trestle_root,
                                                                                          collection_type)
                    aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                    instances_to_be_merged.append(model_instance)
        primary_model_dict = {}
        if primary_model_instance is not None:
            primary_model_dict = primary_model_instance.__dict__

        merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(abs_path,
                                                                                   abs_trestle_root,
                                                                                   aliases_not_to_be_stripped)

        # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
        # There may be a better overall way to approach this.
        top_level = len(merged_model_alias.split('.')) == 1

        for i in range(len(aliases_not_to_be_stripped)):
            alias = aliases_not_to_be_stripped[i]
            instance = instances_to_be_merged[i]
            if hasattr(instance, '__dict__') and '__root__' in instance.__dict__ and isinstance(instance,
                                                                                                OscalBaseModel):
                instance = instance.__dict__['__root__']
            if top_level and not primary_model_dict:
                primary_model_dict = instance.__dict__
            else:
                primary_model_dict[alias] = instance

        merged_model_instance = merged_model_type(**primary_model_dict)
        return merged_model_type, merged_model_alias, merged_model_instance
    return primary_model_type, primary_model_alias, primary_model_instance
load_model_for_class(trestle_root, model_name, model_class, file_content_type=None) staticmethod ¤

Load a model by name and model class and infer file content type if not specified.

If you need to load an existing model but its content type may not be known, use this method. But the file content type should be specified if it is somehow known.

Note

This does not validate the model. If you want to validate the model use the load_validate utilities.

Source code in trestle/common/model_utils.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@staticmethod
def load_model_for_class(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: TG,
    file_content_type: Optional[FileContentType] = None
) -> Tuple[TG, pathlib.Path]:
    """Load a model by name and model class and infer file content type if not specified.

    If you need to load an existing model but its content type may not be known, use this method.
    But the file content type should be specified if it is somehow known.

    Note:
        This does not validate the model.  If you want to validate the model use the load_validate utilities.
    """
    root_model_path = ModelUtils._root_path_for_top_level_model(
        trestle_root, model_name, model_class
    )  # type: ignore
    if file_content_type is None:
        file_content_type = FileContentType.path_to_content_type(root_model_path)
    if not FileContentType.is_readable_file(file_content_type):
        raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')
    full_model_path = root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))
    _, _, model = ModelUtils.load_distributed(full_model_path, trestle_root)
    return model, full_model_path  # type: ignore
load_model_for_type(trestle_root, model_type, model_name) staticmethod ¤

Load model for the given type and name.

Source code in trestle/common/model_utils.py
188
189
190
191
192
193
194
195
196
197
198
199
200
@staticmethod
def load_model_for_type(trestle_root: pathlib.Path, model_type: str,
                        model_name: str) -> Tuple[TopLevelOscalModel, pathlib.Path]:
    """Load model for the given type and name."""
    dir_name = ModelUtils.model_type_to_model_dir(model_type)
    model_path = trestle_root / dir_name / model_name

    if not model_path.exists():
        raise TrestleError(f'No model is found at path: {model_path}.')

    _, _, oscal_object = ModelUtils.load_distributed(model_path, trestle_root)

    return oscal_object, model_path  # type: ignore
model_age(model) staticmethod ¤

Find time in seconds since LastModified timestamp.

Source code in trestle/common/model_utils.py
667
668
669
670
671
672
673
674
675
@staticmethod
def model_age(model: TopLevelOscalModel) -> int:
    """Find time in seconds since LastModified timestamp."""
    # default to one year if no last_modified
    age_seconds = const.DAY_SECONDS * 365
    if model.metadata.last_modified:
        dt = datetime.now().astimezone() - model.metadata.last_modified
        age_seconds = dt.seconds
    return age_seconds
model_type_to_model_dir(model_type) staticmethod ¤

Get plural model directory from model type.

Source code in trestle/common/model_utils.py
307
308
309
310
311
312
@staticmethod
def model_type_to_model_dir(model_type: str) -> str:
    """Get plural model directory from model type."""
    if model_type not in const.MODEL_TYPE_LIST:
        raise err.TrestleError(f'Not a valid model type: {model_type}.')
    return const.MODEL_TYPE_TO_MODEL_DIR[model_type]
models_are_equivalent(model_a, model_b, ignore_all_uuid=False) staticmethod ¤

Test if models are equivalent except for last modified and possibly uuid.

If a model has had uuids regenerated, then all uuids and references to them are updated. This means that special handling is required if a model has had uuids regenerated - when checking equivalence.

Source code in trestle/common/model_utils.py
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
@staticmethod
def models_are_equivalent(
    model_a: Optional[TopLevelOscalModel],
    model_b: Optional[TopLevelOscalModel],
    ignore_all_uuid: bool = False
) -> bool:
    """
    Test if models are equivalent except for last modified and possibly uuid.

    If a model has had uuids regenerated, then all uuids *and references to them* are updated.  This means that
    special handling is required if a model has had uuids regenerated - when checking equivalence.
    """
    uuid_type_list = [
        common.LastModified,
        common.LocationUuid,
        common.PartyUuid,
        common.RelatedRisk,
        common.Source,
        assessment_plan.RelatedObservation,
        assessment_results.RelatedObservation,
        poam.RelatedObservation,
        poam.RelatedObservation1
    ]
    type_list = uuid_type_list if ignore_all_uuid else [common.LastModified]
    return not ModelUtils._objects_differ(model_a, model_b, type_list, ['last_modified'], ignore_all_uuid)
parameter_to_dict(obj, partial) staticmethod ¤

Convert obj to dict containing only string values, storing only the fields that have values set.

Parameters:

Name Type Description Default
obj Union[OscalBaseModel, str]

The parameter or its consituent parts in recursive calls

required
partial bool

Whether to convert the entire param or just the parts needed for markdown header

required

Returns:

Type Description
Union[str, Dict[str, Any]]

The converted parameter as dictionary, with values as None if not present

Source code in trestle/common/model_utils.py
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
@staticmethod
def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
    """
    Convert obj to dict containing only string values, storing only the fields that have values set.

    Args:
        obj: The parameter or its consituent parts in recursive calls
        partial: Whether to convert the entire param or just the parts needed for markdown header

    Returns:
        The converted parameter as dictionary, with values as None if not present
    """
    res = ModelUtils._parameter_to_dict_recurse(obj, partial)
    if 'values' not in res:
        res['values'] = None  # type: ignore
    return res
regenerate_uuids(object_of_interest) staticmethod ¤

Regenerate all uuids in object and update corresponding references.

Find all dicts with key == 'uuid' and replace the value with a new uuid4. Build a corresponding lookup table as you go, of old:new uuid values. Then make a second pass through the object and replace all string values present in the lookup table with the new value.

Parameters:

Name Type Description Default
object_of_interest Any

pydantic.BaseModel, list, dict or str will be updated

required

Returns:

Type Description
Any

The updated object with new uuid's and refs

Dict[str, str]

The final lookup table of old:new uuid's

int

A count of the number of refs that were updated

Source code in trestle/common/model_utils.py
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
@staticmethod
def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
    """Regenerate all uuids in object and update corresponding references.

    Find all dicts with key == 'uuid' and replace the value with a new uuid4.
    Build a corresponding lookup table as you go, of old:new uuid values.
    Then make a second pass through the object and replace all string values
    present in the lookup table with the new value.

    Args:
        object_of_interest: pydantic.BaseModel, list, dict or str will be updated

    Returns:
        The updated object with new uuid's and refs
        The final lookup table of old:new uuid's
        A count of the number of refs that were updated
    """
    new_object, uuid_lut = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
    new_object, n_refs_updated = ModelUtils._update_new_uuid_refs(new_object, uuid_lut)
    return new_object, uuid_lut, n_refs_updated
save_top_level_model(model, trestle_root, model_name, file_content_type) staticmethod ¤

Save a model by name and infer model type by inspection.

You don't need to specify the model type (catalog, profile, etc.) but you must specify the file content type. If the model directory does not exist, it is created.

Source code in trestle/common/model_utils.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@staticmethod
def save_top_level_model(
    model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
) -> None:
    """Save a model by name and infer model type by inspection.

    You don't need to specify the model type (catalog, profile, etc.) but you must specify the file content type.
    If the model directory does not exist, it is created.
    """
    root_model_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model)
    full_model_path = root_model_path.with_suffix(FileContentType.to_file_extension(file_content_type))
    if not full_model_path.parent.exists():
        full_model_path.parent.mkdir(parents=True, exist_ok=True)
    model.oscal_write(full_model_path)
update_last_modified(model, timestamp=None) staticmethod ¤

Update the LastModified timestamp in top level model to now.

Source code in trestle/common/model_utils.py
661
662
663
664
665
@staticmethod
def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
    """Update the LastModified timestamp in top level model to now."""
    timestamp = timestamp if timestamp else datetime.now().astimezone()
    model.metadata.last_modified = timestamp

Functions¤

handler: python