Skip to content

replicate

trestle.core.commands.replicate ¤

Trestle Replicate Command.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

ReplicateCmd ¤

Bases: CommandPlusDocs

Replicate a top level model within the trestle directory structure.

Source code in trestle/core/commands/replicate.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class ReplicateCmd(CommandPlusDocs):
    """Replicate a top level model within the trestle directory structure."""

    name = 'replicate'

    def _init_arguments(self) -> None:
        logger.debug('Init arguments')

        self.add_argument('model', help='Choose OSCAL model', choices=const.MODEL_TYPE_LIST)
        self.add_argument('-n', '--name', help='Name of model to replicate.', type=str, required=True)
        self.add_argument('-o', '--output', help='Name of replicated model.', type=str, required=True)
        self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)

    def _run(self, args: argparse.Namespace) -> int:
        """Execute and process the args."""
        try:
            log.set_log_level_from_args(args)
            return self.replicate_object(args.model, args)
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while replicating model')

    @classmethod
    def replicate_object(cls, model_alias: str, args: argparse.Namespace) -> int:
        """
        Core replicate routine invoked by subcommands.

        Args:
            model_alias: Name of the top level model in the trestle directory.
            args: CLI arguments
        Returns:
            A return code that can be used as standard posix codes. 0 is success.
        """
        logger.debug('Entering replicate_object.')

        # 1 Bad working directory if not running from current working directory
        trestle_root = args.trestle_root  # trestle root is set via command line in args. Default is cwd.
        if not trestle_root or not file_utils.is_valid_project_root(trestle_root):
            raise TrestleError(f'Given directory: {trestle_root} is not a trestle project.')

        plural_path = ModelUtils.model_type_to_model_dir(model_alias)

        # 2 Check that input file given exists.

        input_file_stem = trestle_root / plural_path / args.name / model_alias
        content_type = FileContentType.path_to_content_type(input_file_stem)
        if content_type == FileContentType.UNKNOWN:
            raise TrestleError(
                f'Input file {args.name} has no json or yaml file at expected location {input_file_stem}.'
            )

        input_file = input_file_stem.with_suffix(FileContentType.to_file_extension(content_type))

        # 3 Distributed load from file
        _, model_alias, model_instance = ModelUtils.load_distributed(input_file, trestle_root)

        rep_model_path = trestle_root / plural_path / args.output / (
            model_alias + FileContentType.to_file_extension(content_type)
        )

        if rep_model_path.exists():
            raise TrestleError(f'OSCAL file to be replicated here: {rep_model_path} exists.')

        if args.regenerate:
            logger.debug(f'regenerating uuids for model {input_file}')
            model_instance, uuid_lut, n_refs_updated = ModelUtils.regenerate_uuids(model_instance)
            logger.debug(f'{len(uuid_lut)} uuids generated and {n_refs_updated} references updated')

        # 4 Prepare actions and plan
        top_element = Element(model_instance)
        create_action = CreatePathAction(rep_model_path, True)
        write_action = WriteFileAction(rep_model_path, top_element, content_type)

        # create a plan to create the directory and imported file.
        replicate_plan = Plan()
        replicate_plan.add_action(create_action)
        replicate_plan.add_action(write_action)

        replicate_plan.execute()

        return CmdReturnCodes.SUCCESS.value
Attributes¤
name = 'replicate' class-attribute instance-attribute ¤
Functions¤
replicate_object(model_alias, args) classmethod ¤

Core replicate routine invoked by subcommands.

Parameters:

Name Type Description Default
model_alias str

Name of the top level model in the trestle directory.

required
args Namespace

CLI arguments

required

Returns: A return code that can be used as standard posix codes. 0 is success.

Source code in trestle/core/commands/replicate.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@classmethod
def replicate_object(cls, model_alias: str, args: argparse.Namespace) -> int:
    """
    Core replicate routine invoked by subcommands.

    Args:
        model_alias: Name of the top level model in the trestle directory.
        args: CLI arguments
    Returns:
        A return code that can be used as standard posix codes. 0 is success.
    """
    logger.debug('Entering replicate_object.')

    # 1 Bad working directory if not running from current working directory
    trestle_root = args.trestle_root  # trestle root is set via command line in args. Default is cwd.
    if not trestle_root or not file_utils.is_valid_project_root(trestle_root):
        raise TrestleError(f'Given directory: {trestle_root} is not a trestle project.')

    plural_path = ModelUtils.model_type_to_model_dir(model_alias)

    # 2 Check that input file given exists.

    input_file_stem = trestle_root / plural_path / args.name / model_alias
    content_type = FileContentType.path_to_content_type(input_file_stem)
    if content_type == FileContentType.UNKNOWN:
        raise TrestleError(
            f'Input file {args.name} has no json or yaml file at expected location {input_file_stem}.'
        )

    input_file = input_file_stem.with_suffix(FileContentType.to_file_extension(content_type))

    # 3 Distributed load from file
    _, model_alias, model_instance = ModelUtils.load_distributed(input_file, trestle_root)

    rep_model_path = trestle_root / plural_path / args.output / (
        model_alias + FileContentType.to_file_extension(content_type)
    )

    if rep_model_path.exists():
        raise TrestleError(f'OSCAL file to be replicated here: {rep_model_path} exists.')

    if args.regenerate:
        logger.debug(f'regenerating uuids for model {input_file}')
        model_instance, uuid_lut, n_refs_updated = ModelUtils.regenerate_uuids(model_instance)
        logger.debug(f'{len(uuid_lut)} uuids generated and {n_refs_updated} references updated')

    # 4 Prepare actions and plan
    top_element = Element(model_instance)
    create_action = CreatePathAction(rep_model_path, True)
    write_action = WriteFileAction(rep_model_path, top_element, content_type)

    # create a plan to create the directory and imported file.
    replicate_plan = Plan()
    replicate_plan.add_action(create_action)
    replicate_plan.add_action(write_action)

    replicate_plan.execute()

    return CmdReturnCodes.SUCCESS.value

Functions¤

handler: python