Skip to content

trestle.core.commands.import_

trestle.core.commands.import_ ¤

Trestle Import Command.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

ImportCmd ¤

Bases: CommandPlusDocs

Import an existing full OSCAL model into the trestle workspace.

Source code in trestle/core/commands/import_.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ImportCmd(CommandPlusDocs):
    """Import an existing full OSCAL model into the trestle workspace."""

    name = 'import'

    def _init_arguments(self) -> None:
        """Register the -f/--file, -o/--output and -r/--regenerate command line options."""
        logger.debug('Init arguments')
        self.add_argument(
            '-f', '--file', help='OSCAL file to import - either file path or url.', type=str, required=True
        )
        self.add_argument('-o', '--output', help='Name of output element.', type=str, required=True)
        self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)

    def _run(self, args: argparse.Namespace) -> int:
        """Top level import run command.

        The model is fetched and validated in memory first, so nothing is written
        to the workspace when validation fails or the destination already exists.

        Args:
            args: parsed command line arguments; this method reads trestle_root,
                file, output and regenerate from it.

        Returns:
            CmdReturnCodes.SUCCESS.value once the imported file has been written,
            CmdReturnCodes.COMMAND_ERROR.value if validation fails or the destination
            file already exists, otherwise the code produced by
            handle_generic_command_exception for any raised exception.
        """
        try:
            log.set_log_level_from_args(args)
            trestle_root = args.trestle_root
            if not file_utils.is_valid_project_root(trestle_root):
                raise TrestleRootError(f'Attempt to import from non-valid trestle project root {trestle_root}')

            input_uri = args.file
            if cache.FetcherFactory.in_trestle_directory(trestle_root, input_uri):
                raise TrestleError(
                    f'Imported file {input_uri} cannot be from current trestle project. Use duplicate instead.'
                )

            # the content type is taken from whatever follows the last '.' in the file path or url
            content_type = FileContentType.to_content_type('.' + input_uri.split('.')[-1])

            fetcher = cache.FetcherFactory.get_fetcher(trestle_root, str(input_uri))

            model_read, parent_alias = fetcher.get_oscal(True)

            # validate the loaded model in memory before writing out
            # this will do any needed fixes to the file, such as assign missing catalog group ids
            args_validate = argparse.Namespace(mode=const.VAL_MODE_ALL)
            validator: Validator = validator_factory.get(args_validate)
            if not validator.model_is_valid(model_read, True, trestle_root):  # type: ignore
                logger.warning(f'Validation of file to be imported {input_uri} did not pass.  Import failed.')
                return CmdReturnCodes.COMMAND_ERROR.value

            plural_path = ModelUtils.model_type_to_model_dir(parent_alias)

            output_name = args.output

            # destination is <trestle_root>/<plural_path>/<output_name>/<parent_alias><extension>
            desired_model_dir = trestle_root / plural_path
            desired_model_path: pathlib.Path = desired_model_dir / output_name / parent_alias
            file_extension = FileContentType.to_file_extension(content_type)
            desired_model_path = desired_model_path.with_suffix(file_extension).resolve()

            # never overwrite an existing file in the workspace
            if desired_model_path.exists():
                logger.warning(f'Cannot import because file to be imported here: {desired_model_path} already exists.')
                return CmdReturnCodes.COMMAND_ERROR.value

            if args.regenerate:
                logger.debug(f'regenerating uuids in imported file {input_uri}')
                model_read, lut, nchanged = ModelUtils.regenerate_uuids(model_read)
                logger.debug(f'uuid lut has {len(lut.items())} entries and {nchanged} refs were updated')

            top_element = Element(model_read)
            create_action = CreatePathAction(desired_model_path, True)
            write_action = WriteFileAction(desired_model_path, top_element, content_type)

            # create a plan to create the directory and write the imported file.
            import_plan = Plan()
            import_plan.add_action(create_action)
            import_plan.add_action(write_action)

            import_plan.execute()

            return CmdReturnCodes.SUCCESS.value

        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while importing OSCAL file')
Attributes¤
name = 'import' class-attribute instance-attribute ¤

Functions¤

handler: python