Skip to content

repository

trestle.core.repository ¤

Trestle Repository APIs.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

AgileAuthoring ¤

Bases: Repository

AgileAuthoring extends the Repository class for performing authoring specific operations on Trestle repository.

This class provides a set of APIs to perform generate and assemble authoring operations in the trestle repository rather than using the command line.

Source code in trestle/core/repository.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
class AgileAuthoring(Repository):
    """
    AgileAuthoring extends the Repository class for performing authoring specific operations on Trestle repository.

    This class provides a set of APIs to perform generate and assemble authoring operations in the trestle repository
    rather than using the command line.

    """

    def __init__(self, root_dir: pathlib.Path) -> None:
        """Initialize trestle repository object."""
        super().__init__(root_dir)

    def assemble_catalog_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        set_parameters: bool = False,
        regenerate: bool = False,
        version: Optional[str] = None
    ) -> bool:
        """Assemble catalog markdown into OSCAL Catalog in JSON."""
        logger.debug(f'Assembling model {name} of type catalog.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            markdown=markdown_dir,
            trestle_root=self.root_dir,
            set_parameters=set_parameters,
            regenerate=regenerate,
            version=version,
            verbose=verbose
        )

        try:
            ret = catalogauthorcmd.CatalogAssemble()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error assembling catalog {name}: {e}')

        logger.debug(f'Model {name} assembled successfully.')
        return success

    def assemble_profile_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        set_parameters: bool = False,
        regenerate: bool = False,
        version: Optional[str] = None,
        sections: Optional[str] = None,
        required_sections: Optional[str] = None,
        allowed_sections: Optional[str] = None
    ) -> bool:
        """Assemble profile markdown into OSCAL Profile in JSON."""
        logger.debug(f'Assembling model {name} of type profile.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            markdown=markdown_dir,
            trestle_root=self.root_dir,
            set_parameters=set_parameters,
            regenerate=regenerate,
            version=version,
            sections=sections,
            required_sections=required_sections,
            allowed_sections=allowed_sections,
            verbose=verbose
        )

        try:
            ret = profileauthorcmd.ProfileAssemble()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error assembling profile {name}: {e}')

        logger.debug(f'Model {name} assembled successfully.')
        return success

    def assemble_component_definition_markdown(
        self, name: str, output: str, markdown_dir: str, regenerate: bool = False, version: str = ''
    ) -> bool:
        """Assemble component definition markdown into OSCAL Component Definition in JSON."""
        logger.debug(f'Assembling model {name} of type component definition.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            markdown=markdown_dir,
            trestle_root=self.root_dir,
            regenerate=regenerate,
            version=version,
            verbose=verbose
        )

        try:
            ret = componentauthorcmd.ComponentAssemble()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error assembling component definition {name}: {e}')

        logger.debug(f'Model {name} assembled successfully.')
        return success

    def assemble_ssp_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        compdefs: str,
        regenerate: bool = False,
        version: Optional[str] = None
    ) -> bool:
        """Assemble ssp markdown into OSCAL SSP in JSON."""
        logger.debug(f'Assembling model {name} of type ssp.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            markdown=markdown_dir,
            compdefs=compdefs,
            trestle_root=self.root_dir,
            regenerate=regenerate,
            version=version,
            verbose=verbose
        )

        try:
            ret = sspauthorcmd.SSPAssemble()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error assembling ssp {name}: {e}')

        logger.debug(f'Model {name} assembled successfully.')
        return success

    def generate_catalog_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
        yaml_header: Optional[str] = None,
        overwrite_header_values: bool = False
    ) -> bool:
        """Generate catalog markdown from OSCAL Catalog in JSON."""
        logger.debug(f'Generating markdown for {name} of type catalog.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            trestle_root=self.root_dir,
            force_overwrite=force_overwrite,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values,
            verbose=verbose
        )

        try:
            ret = catalogauthorcmd.CatalogGenerate()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error generate markdown for catalog {name}: {e}')

        logger.debug(f'Model {name} markdown generated successfully.')
        return success

    def generate_profile_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
        yaml_header: Optional[str] = None,
        overwrite_header_values: bool = False,
        sections: Optional[str] = None,
        required_sections: Optional[str] = None
    ) -> bool:
        """Generate profile markdown from OSCAL Profile in JSON."""
        logger.debug(f'Generating markdown for {name} of type profile.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name,
            output=output,
            trestle_root=self.root_dir,
            force_overwrite=force_overwrite,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values,
            sections=sections,
            required_sections=required_sections,
            verbose=verbose
        )

        try:
            ret = profileauthorcmd.ProfileGenerate()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error generate markdown for profile {name}: {e}')

        logger.debug(f'Model {name} markdown generated successfully.')
        return success

    def generate_component_definition_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
    ) -> bool:
        """Generate component definition markdown from OSCAL Component Definition in JSON."""
        logger.debug(f'Generating markdown for {name} of type component definition.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            name=name, output=output, trestle_root=self.root_dir, force_overwrite=force_overwrite, verbose=verbose
        )

        try:
            ret = componentauthorcmd.ComponentGenerate()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error generating markdown for component definition {name}: {e}')

        logger.debug(f'Model {name} markdown generated successfully.')
        return success

    def generate_ssp_markdown(
        self,
        profile: str,
        output: str,
        compdefs: str,
        leveraged_ssp: Optional[str] = None,
        force_overwrite: bool = False,
        include_all_parts: bool = False,
        yaml_header: Optional[str] = None,
        overwrite_header_values: bool = False
    ) -> bool:
        """Generate ssp markdown from OSCAL Profile and Component Definitions."""
        logger.debug(f'Generating markdown for {output} of type ssp.')
        success = False

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            profile=profile,
            output=output,
            compdefs=compdefs,
            leveraged_ssp=leveraged_ssp,
            trestle_root=self.root_dir,
            force_overwrite=force_overwrite,
            include_all_parts=include_all_parts,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values,
            verbose=verbose
        )

        try:
            ret = sspauthorcmd.SSPGenerate()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error in generating markdown for ssp {output}: {e}')

        logger.debug(f'Model {output} markdown generated successfully.')
        return success
Functions¤
__init__(root_dir) ¤

Initialize trestle repository object.

Source code in trestle/core/repository.py
380
381
382
def __init__(self, root_dir: pathlib.Path) -> None:
    """Initialize trestle repository object."""
    super().__init__(root_dir)
assemble_catalog_markdown(name, output, markdown_dir, set_parameters=False, regenerate=False, version=None) ¤

Assemble catalog markdown into OSCAL Catalog in JSON.

Source code in trestle/core/repository.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
def assemble_catalog_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    set_parameters: bool = False,
    regenerate: bool = False,
    version: Optional[str] = None
) -> bool:
    """Assemble catalog markdown into OSCAL Catalog in JSON."""
    logger.debug(f'Assembling model {name} of type catalog.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        set_parameters=set_parameters,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = catalogauthorcmd.CatalogAssemble()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error assembling catalog {name}: {e}')

    logger.debug(f'Model {name} assembled successfully.')
    return success
assemble_component_definition_markdown(name, output, markdown_dir, regenerate=False, version='') ¤

Assemble component definition markdown into OSCAL Component Definition in JSON.

Source code in trestle/core/repository.py
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def assemble_component_definition_markdown(
    self, name: str, output: str, markdown_dir: str, regenerate: bool = False, version: str = ''
) -> bool:
    """Assemble component definition markdown into OSCAL Component Definition in JSON."""
    logger.debug(f'Assembling model {name} of type component definition.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = componentauthorcmd.ComponentAssemble()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error assembling component definition {name}: {e}')

    logger.debug(f'Model {name} assembled successfully.')
    return success
assemble_profile_markdown(name, output, markdown_dir, set_parameters=False, regenerate=False, version=None, sections=None, required_sections=None, allowed_sections=None) ¤

Assemble profile markdown into OSCAL Profile in JSON.

Source code in trestle/core/repository.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def assemble_profile_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    set_parameters: bool = False,
    regenerate: bool = False,
    version: Optional[str] = None,
    sections: Optional[str] = None,
    required_sections: Optional[str] = None,
    allowed_sections: Optional[str] = None
) -> bool:
    """Assemble profile markdown into OSCAL Profile in JSON."""
    logger.debug(f'Assembling model {name} of type profile.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        set_parameters=set_parameters,
        regenerate=regenerate,
        version=version,
        sections=sections,
        required_sections=required_sections,
        allowed_sections=allowed_sections,
        verbose=verbose
    )

    try:
        ret = profileauthorcmd.ProfileAssemble()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error assembling profile {name}: {e}')

    logger.debug(f'Model {name} assembled successfully.')
    return success
assemble_ssp_markdown(name, output, markdown_dir, compdefs, regenerate=False, version=None) ¤

Assemble ssp markdown into OSCAL SSP in JSON.

Source code in trestle/core/repository.py
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def assemble_ssp_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    compdefs: str,
    regenerate: bool = False,
    version: Optional[str] = None
) -> bool:
    """Assemble ssp markdown into OSCAL SSP in JSON."""
    logger.debug(f'Assembling model {name} of type ssp.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        compdefs=compdefs,
        trestle_root=self.root_dir,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = sspauthorcmd.SSPAssemble()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error assembling ssp {name}: {e}')

    logger.debug(f'Model {name} assembled successfully.')
    return success
generate_catalog_markdown(name, output, force_overwrite=False, yaml_header=None, overwrite_header_values=False) ¤

Generate catalog markdown from OSCAL Catalog in JSON.

Source code in trestle/core/repository.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
def generate_catalog_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
    yaml_header: Optional[str] = None,
    overwrite_header_values: bool = False
) -> bool:
    """Generate catalog markdown from OSCAL Catalog in JSON."""
    logger.debug(f'Generating markdown for {name} of type catalog.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        verbose=verbose
    )

    try:
        ret = catalogauthorcmd.CatalogGenerate()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error generate markdown for catalog {name}: {e}')

    logger.debug(f'Model {name} markdown generated successfully.')
    return success
generate_component_definition_markdown(name, output, force_overwrite=False) ¤

Generate component definition markdown from OSCAL Component Definition in JSON.

Source code in trestle/core/repository.py
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
def generate_component_definition_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
) -> bool:
    """Generate component definition markdown from OSCAL Component Definition in JSON."""
    logger.debug(f'Generating markdown for {name} of type component definition.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name, output=output, trestle_root=self.root_dir, force_overwrite=force_overwrite, verbose=verbose
    )

    try:
        ret = componentauthorcmd.ComponentGenerate()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error generating markdown for component definition {name}: {e}')

    logger.debug(f'Model {name} markdown generated successfully.')
    return success
generate_profile_markdown(name, output, force_overwrite=False, yaml_header=None, overwrite_header_values=False, sections=None, required_sections=None) ¤

Generate profile markdown from OSCAL Profile in JSON.

Source code in trestle/core/repository.py
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
def generate_profile_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
    yaml_header: Optional[str] = None,
    overwrite_header_values: bool = False,
    sections: Optional[str] = None,
    required_sections: Optional[str] = None
) -> bool:
    """Generate profile markdown from OSCAL Profile in JSON."""
    logger.debug(f'Generating markdown for {name} of type profile.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        sections=sections,
        required_sections=required_sections,
        verbose=verbose
    )

    try:
        ret = profileauthorcmd.ProfileGenerate()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error generate markdown for profile {name}: {e}')

    logger.debug(f'Model {name} markdown generated successfully.')
    return success
generate_ssp_markdown(profile, output, compdefs, leveraged_ssp=None, force_overwrite=False, include_all_parts=False, yaml_header=None, overwrite_header_values=False) ¤

Generate ssp markdown from OSCAL Profile and Component Definitions.

Source code in trestle/core/repository.py
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
def generate_ssp_markdown(
    self,
    profile: str,
    output: str,
    compdefs: str,
    leveraged_ssp: Optional[str] = None,
    force_overwrite: bool = False,
    include_all_parts: bool = False,
    yaml_header: Optional[str] = None,
    overwrite_header_values: bool = False
) -> bool:
    """Generate ssp markdown from OSCAL Profile and Component Definitions."""
    logger.debug(f'Generating markdown for {output} of type ssp.')
    success = False

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        profile=profile,
        output=output,
        compdefs=compdefs,
        leveraged_ssp=leveraged_ssp,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        include_all_parts=include_all_parts,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        verbose=verbose
    )

    try:
        ret = sspauthorcmd.SSPGenerate()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error in generating markdown for ssp {output}: {e}')

    logger.debug(f'Model {output} markdown generated successfully.')
    return success

ManagedOSCAL ¤

Object representing OSCAL models in repository for programmatic manipulation.

Source code in trestle/core/repository.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class ManagedOSCAL:
    """Object representing OSCAL models in repository for programmatic manipulation."""

    def __init__(self, root_dir: pathlib.Path, model_type: Type[OscalBaseModel], name: str) -> None:
        """Initialize repository OSCAL model object."""
        if not file_utils.is_valid_project_root(root_dir):
            raise TrestleError(f'Provided root directory {str(root_dir)} is not a valid Trestle root directory.')
        self.root_dir = root_dir
        self.model_type = model_type
        self.model_name = name

        # set model alais and dir
        self.model_alias = classname_to_alias(self.model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(self.model_alias) is None:
            raise TrestleError(f'Given model {self.model_alias} is not a top level model.')

        plural_path = ModelUtils.model_type_to_model_dir(self.model_alias)
        self.model_dir = self.root_dir / plural_path / self.model_name

        if not self.model_dir.exists() or not self.model_dir.is_dir():
            raise TrestleError(f'Model dir {self.model_name} does not exist.')

        file_content_type = FileContentType.path_to_content_type(self.model_dir / self.model_alias)
        if file_content_type == FileContentType.UNKNOWN:
            raise TrestleError(f'Model file for model {self.model_name} does not exist.')
        self.file_content_type = file_content_type

        filepath = pathlib.Path(
            self.model_dir,
            self.model_alias + FileContentType.path_to_file_extension(self.model_dir / self.model_alias)
        )

        self.filepath = filepath

    def read(self) -> OscalBaseModel:
        """Read OSCAL model from repository."""
        logger.debug(f'Reading model {self.model_name}.')
        model = load_validate_model_path(self.root_dir, self.filepath)
        return model

    def write(self, model: OscalBaseModel) -> bool:
        """Write OSCAL model to repository."""
        logger.debug(f'Writing model {self.model_name}.')
        model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        # split directory if the model was split
        split_dir = pathlib.Path(self.model_dir, self.model_alias)

        # Prepare actions; delete split model dir if any, recreate model file, and write to filepath
        top_element = Element(model)
        remove_action = RemovePathAction(split_dir)
        create_action = CreatePathAction(self.filepath, True)
        write_action = WriteFileAction(self.filepath, top_element, self.file_content_type)

        # create a plan to create the directory and imported file.
        import_plan = Plan()
        import_plan.add_action(remove_action)
        import_plan.add_action(create_action)
        import_plan.add_action(write_action)

        import_plan.execute()

        logger.debug(f'Model {self.model_name} written to repository')
        return True

    def split(self, model_file: pathlib.Path, elements: List[str]) -> bool:
        """Split the given OSCAL model file in repository.

        Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST
        then model file path can be 'catalog/metadata.json' if metadata is to be split.

        Elements should be specified relative to model file, e.g., 'metadata.props.*'
        """
        logger.debug(f'Splitting model {self.model_name}, file {model_file}.')
        # input model_file should be relative to the model dir
        model_file_path = self.model_dir / model_file
        model_file_path = model_file_path.resolve()
        file_parent = model_file_path.parent
        filename = model_file_path.name

        elems = ''
        first = True
        for elem in elements:
            if first:
                elems = elem
                first = False
            else:
                elems = elems + ',' + elem

        success = False
        try:
            ret = splitcmd.SplitCmd().perform_split(file_parent, filename, elems, self.root_dir)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error in splitting model: {e}')

        logger.debug(f'Model {self.model_name}, file {model_file} splitted successfully.')
        return success

    def merge(self, elements: List[str], parent_model_dir: Optional[pathlib.Path] = None) -> bool:
        """Merge OSCAL elements in repository.

        The parent_model_dir specifies the parent model direcotry in which to merge relative to main model dir.
        For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog'
        dir that contains the 'metadata.json' file or the 'metadata' directory
        """
        logger.debug(f'Merging model {self.model_name}, parent dir {parent_model_dir}.')
        if parent_model_dir is None:
            effective_cwd = self.model_dir
        else:
            effective_cwd = self.model_dir / parent_model_dir

        success = True
        try:
            for elem in elements:
                plan = mergecmd.MergeCmd.merge(effective_cwd, ElementPath(elem), self.root_dir)
                plan.execute()

        except Exception as e:
            raise TrestleError(f'Error in merging model: {e}')

        logger.debug(f'Model {self.model_name} merged successfully.')
        return success

    def validate(self) -> bool:
        """Validate OSCAL model in repository."""
        logger.debug(f'Validating model {self.model_name}.')
        repo = Repository(self.root_dir)
        success = repo.validate_model(self.model_type, self.model_name)
        return success
Attributes¤
file_content_type = file_content_type instance-attribute ¤
filepath = filepath instance-attribute ¤
model_alias = classname_to_alias(self.model_type.__name__, AliasMode.JSON) instance-attribute ¤
model_dir = self.root_dir / plural_path / self.model_name instance-attribute ¤
model_name = name instance-attribute ¤
model_type = model_type instance-attribute ¤
root_dir = root_dir instance-attribute ¤
Functions¤
__init__(root_dir, model_type, name) ¤

Initialize repository OSCAL model object.

Source code in trestle/core/repository.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(self, root_dir: pathlib.Path, model_type: Type[OscalBaseModel], name: str) -> None:
    """Initialize repository OSCAL model object."""
    if not file_utils.is_valid_project_root(root_dir):
        raise TrestleError(f'Provided root directory {str(root_dir)} is not a valid Trestle root directory.')
    self.root_dir = root_dir
    self.model_type = model_type
    self.model_name = name

    # set model alais and dir
    self.model_alias = classname_to_alias(self.model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(self.model_alias) is None:
        raise TrestleError(f'Given model {self.model_alias} is not a top level model.')

    plural_path = ModelUtils.model_type_to_model_dir(self.model_alias)
    self.model_dir = self.root_dir / plural_path / self.model_name

    if not self.model_dir.exists() or not self.model_dir.is_dir():
        raise TrestleError(f'Model dir {self.model_name} does not exist.')

    file_content_type = FileContentType.path_to_content_type(self.model_dir / self.model_alias)
    if file_content_type == FileContentType.UNKNOWN:
        raise TrestleError(f'Model file for model {self.model_name} does not exist.')
    self.file_content_type = file_content_type

    filepath = pathlib.Path(
        self.model_dir,
        self.model_alias + FileContentType.path_to_file_extension(self.model_dir / self.model_alias)
    )

    self.filepath = filepath
merge(elements, parent_model_dir=None) ¤

Merge OSCAL elements in repository.

The parent_model_dir specifies the parent model direcotry in which to merge relative to main model dir. For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog' dir that contains the 'metadata.json' file or the 'metadata' directory

Source code in trestle/core/repository.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def merge(self, elements: List[str], parent_model_dir: Optional[pathlib.Path] = None) -> bool:
    """Merge OSCAL elements in repository.

    The parent_model_dir specifies the parent model direcotry in which to merge relative to main model dir.
    For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog'
    dir that contains the 'metadata.json' file or the 'metadata' directory
    """
    logger.debug(f'Merging model {self.model_name}, parent dir {parent_model_dir}.')
    if parent_model_dir is None:
        effective_cwd = self.model_dir
    else:
        effective_cwd = self.model_dir / parent_model_dir

    success = True
    try:
        for elem in elements:
            plan = mergecmd.MergeCmd.merge(effective_cwd, ElementPath(elem), self.root_dir)
            plan.execute()

    except Exception as e:
        raise TrestleError(f'Error in merging model: {e}')

    logger.debug(f'Model {self.model_name} merged successfully.')
    return success
read() ¤

Read OSCAL model from repository.

Source code in trestle/core/repository.py
84
85
86
87
88
def read(self) -> OscalBaseModel:
    """Read OSCAL model from repository."""
    logger.debug(f'Reading model {self.model_name}.')
    model = load_validate_model_path(self.root_dir, self.filepath)
    return model
split(model_file, elements) ¤

Split the given OSCAL model file in repository.

Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST then model file path can be 'catalog/metadata.json' if metadata is to be split.

Elements should be specified relative to model file, e.g., 'metadata.props.*'

Source code in trestle/core/repository.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def split(self, model_file: pathlib.Path, elements: List[str]) -> bool:
    """Split the given OSCAL model file in repository.

    Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST
    then model file path can be 'catalog/metadata.json' if metadata is to be split.

    Elements should be specified relative to model file, e.g., 'metadata.props.*'
    """
    logger.debug(f'Splitting model {self.model_name}, file {model_file}.')
    # input model_file should be relative to the model dir
    model_file_path = self.model_dir / model_file
    model_file_path = model_file_path.resolve()
    file_parent = model_file_path.parent
    filename = model_file_path.name

    elems = ''
    first = True
    for elem in elements:
        if first:
            elems = elem
            first = False
        else:
            elems = elems + ',' + elem

    success = False
    try:
        ret = splitcmd.SplitCmd().perform_split(file_parent, filename, elems, self.root_dir)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error in splitting model: {e}')

    logger.debug(f'Model {self.model_name}, file {model_file} splitted successfully.')
    return success
validate() ¤

Validate OSCAL model in repository.

Source code in trestle/core/repository.py
177
178
179
180
181
182
def validate(self) -> bool:
    """Validate OSCAL model in repository."""
    logger.debug(f'Validating model {self.model_name}.')
    repo = Repository(self.root_dir)
    success = repo.validate_model(self.model_type, self.model_name)
    return success
write(model) ¤

Write OSCAL model to repository.

Source code in trestle/core/repository.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def write(self, model: OscalBaseModel) -> bool:
    """Write OSCAL model to repository."""
    logger.debug(f'Writing model {self.model_name}.')
    model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    # split directory if the model was split
    split_dir = pathlib.Path(self.model_dir, self.model_alias)

    # Prepare actions; delete split model dir if any, recreate model file, and write to filepath
    top_element = Element(model)
    remove_action = RemovePathAction(split_dir)
    create_action = CreatePathAction(self.filepath, True)
    write_action = WriteFileAction(self.filepath, top_element, self.file_content_type)

    # create a plan to create the directory and imported file.
    import_plan = Plan()
    import_plan.add_action(remove_action)
    import_plan.add_action(create_action)
    import_plan.add_action(write_action)

    import_plan.execute()

    logger.debug(f'Model {self.model_name} written to repository')
    return True

Repository ¤

Repository class for performing operations on Trestle repository.

This class provides a set of APIs to perform operations on trestle repository programmatically rather than using the command line. It takes the trestle root directory as input while creating an instance of this object. Operations such as import and get model return a ManagedOSCAL object representing the specific model that can be used to perform operations on the specific models.

Source code in trestle/core/repository.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class Repository:
    """Repository class for performing operations on Trestle repository.

    This class provides a set of APIs to perform operations on trestle repository programmatically
    rather than using the command line. It takes the trestle root directory as input while creating
    an instance of this object. Operations such as import and get model return a ManagedOSCAL object
    representing the specific model that can be used to perform operations on the specific models.

    """

    def __init__(self, root_dir: pathlib.Path) -> None:
        """Initialize trestle repository object."""
        if not file_utils.is_valid_project_root(root_dir):
            raise TrestleError(f'Provided root directory {root_dir} is not a valid Trestle root directory.')
        self.root_dir = root_dir

    def import_model(
        self, model: OscalBaseModel, name: str, content_type: FileContentType = FileContentType.JSON
    ) -> ManagedOSCAL:
        """Import OSCAL object into trestle repository."""
        logger.debug(f'Importing model {name} of type {model.__class__.__name__}.')
        model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        # Work out output directory and file
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)

        desired_model_dir = self.root_dir / plural_path
        desired_model_path = desired_model_dir / name / (model_alias + FileContentType.to_file_extension(content_type))
        desired_model_path = desired_model_path.resolve()

        if desired_model_path.exists():
            raise TrestleError(f'OSCAL file to be created here: {desired_model_path} exists.')

        content_type = FileContentType.to_content_type(pathlib.Path(desired_model_path).suffix)

        # Prepare actions
        top_element = Element(model)
        create_action = CreatePathAction(desired_model_path, True)
        write_action = WriteFileAction(desired_model_path, top_element, content_type)

        # create a plan to create the directory and imported file.
        import_plan = Plan()
        import_plan.add_action(create_action)
        import_plan.add_action(write_action)
        import_plan.execute()

        # Validate the imported file, rollback if unsuccessful
        success = False
        errmsg = ''
        try:
            success = self.validate_model(model.__class__, name)
            if not success:
                errmsg = f'Validation of model {name} did not pass'
                logger.error(errmsg)
        except Exception as err:
            logger.error(errmsg)
            errmsg = f'Import of model {name} failed. Validation failed with error: {err}'

        if not success:
            # rollback in case of validation error or failure
            logger.debug(f'Rolling back import of model {name} to {desired_model_path}')
            try:
                import_plan.rollback()
            except TrestleError as err:
                logger.error(f'Failed to rollback: {err}. Remove {desired_model_path} to resolve state.')
            else:
                logger.debug(f'Successful rollback of import to {desired_model_path}')

            # raise trestle error
            raise TrestleError(errmsg)

        # all well; model was imported and validated successfully
        logger.debug(f'Model {name} of type {model.__class__.__name__} imported successfully.')
        return ManagedOSCAL(self.root_dir, model.__class__, name)

    def load_and_import_model(
        self,
        model_path: pathlib.Path,
        name: str,
        content_type: FileContentType = FileContentType.JSON
    ) -> ManagedOSCAL:
        """Load the model at the specified path into trestle with the specified name."""
        fetcher = cache.FetcherFactory.get_fetcher(self.root_dir, str(model_path))
        model, _ = fetcher.get_oscal(True)

        return self.import_model(model, name, content_type)

    def list_models(self, model_type: Type[OscalBaseModel]) -> List[str]:
        """List models of a given type in trestle repository."""
        logger.debug(f'Listing models of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        models = ModelUtils.get_models_of_type(model_alias, self.root_dir)

        return models

    def get_model(self, model_type: Type[OscalBaseModel], name: str) -> ManagedOSCAL:
        """Get a specific OSCAL model from repository."""
        logger.debug(f'Getting model {name} of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)
        desired_model_dir = self.root_dir / plural_path / name

        if not desired_model_dir.exists() or not desired_model_dir.is_dir():
            raise TrestleError(f'Model {name} does not exist.')

        return ManagedOSCAL(self.root_dir, model_type, name)

    def delete_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
        """Delete an OSCAL model from repository."""
        logger.debug(f'Deleting model {name} of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)
        desired_model_dir = self.root_dir / plural_path / name

        if not desired_model_dir.exists() or not desired_model_dir.is_dir():
            raise TrestleError(f'Model {name} does not exist.')
        shutil.rmtree(desired_model_dir)

        # remove model from dist directory if it exists
        dist_model_dir = self.root_dir / const.TRESTLE_DIST_DIR / plural_path
        file_content_type = FileContentType.path_to_content_type(dist_model_dir / name)
        if file_content_type != FileContentType.UNKNOWN:
            file_path = pathlib.Path(
                dist_model_dir, name + FileContentType.path_to_file_extension(dist_model_dir / name)
            )
            logger.debug(f'Deleting model {name} from dist directory.')
            os.remove(file_path)

        logger.debug(f'Model {name} deleted successfully.')
        return True

    def assemble_model(self, model_type: Type[OscalBaseModel], name: str, extension: str = 'json') -> bool:
        """Assemble an OSCAL model in repository and publish it to 'dist' directory."""
        logger.debug(f'Assembling model {name} of type {model_type.__name__}.')
        success = False

        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            type=model_alias, name=name, extension=extension, trestle_root=self.root_dir, verbose=verbose
        )

        try:
            ret = assemblecmd.AssembleCmd().assemble_model(model_alias, args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error in assembling model: {e}')

        logger.debug(f'Model {name} assembled successfully.')
        return success

    def validate_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
        """Validate an OSCAL model in repository."""
        logger.debug(f'Validating model {name} of type {model_type.__name__}.')
        success = False

        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(type=model_alias, name=name, trestle_root=self.root_dir, verbose=verbose, quiet=False)

        try:
            ret = validatecmd.ValidateCmd()._run(args)
            if ret == 0:
                success = True
        except Exception as e:
            raise TrestleError(f'Error in validating model: {e}')

        logger.debug(f'Model {name} validated successfully.')
        return success
Attributes¤
root_dir = root_dir instance-attribute ¤
Functions¤
__init__(root_dir) ¤

Initialize trestle repository object.

Source code in trestle/core/repository.py
195
196
197
198
199
def __init__(self, root_dir: pathlib.Path) -> None:
    """Initialize trestle repository object."""
    if not file_utils.is_valid_project_root(root_dir):
        raise TrestleError(f'Provided root directory {root_dir} is not a valid Trestle root directory.')
    self.root_dir = root_dir
assemble_model(model_type, name, extension='json') ¤

Assemble an OSCAL model in repository and publish it to 'dist' directory.

Source code in trestle/core/repository.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def assemble_model(self, model_type: Type[OscalBaseModel], name: str, extension: str = 'json') -> bool:
    """Assemble an OSCAL model in repository and publish it to 'dist' directory."""
    logger.debug(f'Assembling model {name} of type {model_type.__name__}.')
    success = False

    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        type=model_alias, name=name, extension=extension, trestle_root=self.root_dir, verbose=verbose
    )

    try:
        ret = assemblecmd.AssembleCmd().assemble_model(model_alias, args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error in assembling model: {e}')

    logger.debug(f'Model {name} assembled successfully.')
    return success
delete_model(model_type, name) ¤

Delete an OSCAL model from repository.

Source code in trestle/core/repository.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def delete_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
    """Delete an OSCAL model from repository."""
    logger.debug(f'Deleting model {name} of type {model_type.__name__}.')
    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')
    plural_path = ModelUtils.model_type_to_model_dir(model_alias)
    desired_model_dir = self.root_dir / plural_path / name

    if not desired_model_dir.exists() or not desired_model_dir.is_dir():
        raise TrestleError(f'Model {name} does not exist.')
    shutil.rmtree(desired_model_dir)

    # remove model from dist directory if it exists
    dist_model_dir = self.root_dir / const.TRESTLE_DIST_DIR / plural_path
    file_content_type = FileContentType.path_to_content_type(dist_model_dir / name)
    if file_content_type != FileContentType.UNKNOWN:
        file_path = pathlib.Path(
            dist_model_dir, name + FileContentType.path_to_file_extension(dist_model_dir / name)
        )
        logger.debug(f'Deleting model {name} from dist directory.')
        os.remove(file_path)

    logger.debug(f'Model {name} deleted successfully.')
    return True
get_model(model_type, name) ¤

Get a specific OSCAL model from repository.

Source code in trestle/core/repository.py
284
285
286
287
288
289
290
291
292
293
294
295
296
def get_model(self, model_type: Type[OscalBaseModel], name: str) -> ManagedOSCAL:
    """Get a specific OSCAL model from repository."""
    logger.debug(f'Getting model {name} of type {model_type.__name__}.')
    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')
    plural_path = ModelUtils.model_type_to_model_dir(model_alias)
    desired_model_dir = self.root_dir / plural_path / name

    if not desired_model_dir.exists() or not desired_model_dir.is_dir():
        raise TrestleError(f'Model {name} does not exist.')

    return ManagedOSCAL(self.root_dir, model_type, name)
import_model(model, name, content_type=FileContentType.JSON) ¤

Import OSCAL object into trestle repository.

Source code in trestle/core/repository.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def import_model(
    self, model: OscalBaseModel, name: str, content_type: FileContentType = FileContentType.JSON
) -> ManagedOSCAL:
    """Import OSCAL object into trestle repository."""
    logger.debug(f'Importing model {name} of type {model.__class__.__name__}.')
    model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    # Work out output directory and file
    plural_path = ModelUtils.model_type_to_model_dir(model_alias)

    desired_model_dir = self.root_dir / plural_path
    desired_model_path = desired_model_dir / name / (model_alias + FileContentType.to_file_extension(content_type))
    desired_model_path = desired_model_path.resolve()

    if desired_model_path.exists():
        raise TrestleError(f'OSCAL file to be created here: {desired_model_path} exists.')

    content_type = FileContentType.to_content_type(pathlib.Path(desired_model_path).suffix)

    # Prepare actions
    top_element = Element(model)
    create_action = CreatePathAction(desired_model_path, True)
    write_action = WriteFileAction(desired_model_path, top_element, content_type)

    # create a plan to create the directory and imported file.
    import_plan = Plan()
    import_plan.add_action(create_action)
    import_plan.add_action(write_action)
    import_plan.execute()

    # Validate the imported file, rollback if unsuccessful
    success = False
    errmsg = ''
    try:
        success = self.validate_model(model.__class__, name)
        if not success:
            errmsg = f'Validation of model {name} did not pass'
            logger.error(errmsg)
    except Exception as err:
        logger.error(errmsg)
        errmsg = f'Import of model {name} failed. Validation failed with error: {err}'

    if not success:
        # rollback in case of validation error or failure
        logger.debug(f'Rolling back import of model {name} to {desired_model_path}')
        try:
            import_plan.rollback()
        except TrestleError as err:
            logger.error(f'Failed to rollback: {err}. Remove {desired_model_path} to resolve state.')
        else:
            logger.debug(f'Successful rollback of import to {desired_model_path}')

        # raise trestle error
        raise TrestleError(errmsg)

    # all well; model was imported and validated successfully
    logger.debug(f'Model {name} of type {model.__class__.__name__} imported successfully.')
    return ManagedOSCAL(self.root_dir, model.__class__, name)
list_models(model_type) ¤

List models of a given type in trestle repository.

Source code in trestle/core/repository.py
274
275
276
277
278
279
280
281
282
def list_models(self, model_type: Type[OscalBaseModel]) -> List[str]:
    """List models of a given type in trestle repository."""
    logger.debug(f'Listing models of type {model_type.__name__}.')
    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')
    models = ModelUtils.get_models_of_type(model_alias, self.root_dir)

    return models
load_and_import_model(model_path, name, content_type=FileContentType.JSON) ¤

Load the model at the specified path into trestle with the specified name.

Source code in trestle/core/repository.py
262
263
264
265
266
267
268
269
270
271
272
def load_and_import_model(
    self,
    model_path: pathlib.Path,
    name: str,
    content_type: FileContentType = FileContentType.JSON
) -> ManagedOSCAL:
    """Load the model at the specified path into trestle with the specified name."""
    fetcher = cache.FetcherFactory.get_fetcher(self.root_dir, str(model_path))
    model, _ = fetcher.get_oscal(True)

    return self.import_model(model, name, content_type)
validate_model(model_type, name) ¤

Validate an OSCAL model in repository.

Source code in trestle/core/repository.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def validate_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
    """Validate an OSCAL model in repository."""
    logger.debug(f'Validating model {name} of type {model_type.__name__}.')
    success = False

    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(type=model_alias, name=name, trestle_root=self.root_dir, verbose=verbose, quiet=False)

    try:
        ret = validatecmd.ValidateCmd()._run(args)
        if ret == 0:
            success = True
    except Exception as e:
        raise TrestleError(f'Error in validating model: {e}')

    logger.debug(f'Model {name} validated successfully.')
    return success

Functions¤

handler: python