Skip to content

merge

trestle.core.commands.merge ¤

Trestle Merge Command.

logger ¤

trace ¤

Classes¤

MergeCmd (CommandPlusDocs) ¤

Merge subcomponents on a trestle model.

Source code in trestle/core/commands/merge.py
class MergeCmd(CommandPlusDocs):
    """Merge subcomponents on a trestle model."""

    name = 'merge'

    def _init_arguments(self) -> None:
        self.add_argument(
            f'-{const.ARG_ELEMENT_SHORT}',
            f'--{const.ARG_ELEMENT}',
            help=f'{const.ARG_DESC_ELEMENT}(s) to be merged. The last element is merged into the second last element.',
            required=True
        )

    def _run(self, args: argparse.Namespace) -> int:
        """Merge elements into the parent oscal model."""
        try:
            log.set_log_level_from_args(args)

            # remove any quotes passed in as on windows platforms
            elements_clean = args.element.strip("'")
            element_paths = elements_clean.split(',')
            trace.log(f'merge _run element paths {element_paths}')
            cwd = Path.cwd()
            rc = self.perform_all_merges(element_paths, cwd, args.trestle_root)
            return rc
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while merging subcomponents on a trestle model')

    @classmethod
    def perform_all_merges(cls, element_paths: List[str], effective_cwd: Path, trestle_root: Path) -> int:
        """Run all merges over a list of element paths."""
        for element_path in element_paths:
            logger.debug(f'merge {element_path}')
            plan = cls.merge(effective_cwd, ElementPath(element_path), trestle_root)
            plan.execute()
        return CmdReturnCodes.SUCCESS.value

    @classmethod
    def merge(cls, effective_cwd: Path, element_path: ElementPath, trestle_root: Path) -> Plan:
        """Merge operations.

        It returns a plan for the operation
        """
        if not element_path.is_multipart():
            raise TrestleError(
                'Multiple parts of an element path must be passed to merge e.g. catalog.* or catalog.groups'
            )

        target_model_alias = element_path.get_last()
        logger.debug(f'merge element path list: {element_path} target model alias {target_model_alias}')
        # 1. Load desination model into a stripped model
        # Load destination model
        destination_path = element_path.get_preceding_path()
        destination_model_alias = destination_path.get_last()
        trace.log(f'merge destination model alias: {destination_model_alias}')
        trace.log('merge getting contextual file type effective working directory')
        # Destination model filetype
        file_type = file_utils.get_contextual_file_type(effective_cwd)
        trace.log(f'contextual file type is {file_type}')

        file_ext = FileContentType.to_file_extension(file_type)
        # Destination model filename
        destination_model_path = (
            effective_cwd / f'{classname_to_alias(destination_model_alias, AliasMode.JSON)}{file_ext}'
        )
        trace.log(f'destination model filename is {destination_model_path}')
        destination_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root)

        destination_model_object: OscalBaseModel = None
        if destination_model_path.exists():
            trace.log('dest filename exists so read it')
            destination_model_object = destination_model_type.oscal_read(destination_model_path)
        # 2. If target is wildcard, load distributed destination model and replace destination model.
        # Handle WILDCARD '*' match. Return plan to load the destination model, with its distributed attributes
        if target_model_alias == '*':
            trace.log('handle target model alias wildcard')
            collection_type = None
            if destination_model_type.is_collection_container():
                collection_type = destination_model_type.get_collection_type()

            merged_model_type, _, merged_model_instance = ModelUtils.load_distributed(
                destination_model_path, trestle_root, collection_type)
            plan = Plan()
            reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
            wrapper_alias = destination_model_alias
            write_destination_action = WriteFileAction(
                destination_model_path, Element(merged_model_instance, wrapper_alias), content_type=file_type
            )
            remove_path_folder = effective_cwd / destination_model_alias
            delete_target_action = RemovePathAction(remove_path_folder)
            plan: Plan = Plan()
            plan.add_action(reset_destination_action)
            plan.add_action(write_destination_action)
            plan.add_action(delete_target_action)
            return plan

        trace.log(f'get dest model with fields stripped: {target_model_alias}')
        # Get destination model without the target field stripped
        merged_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root,
                                                                  aliases_not_to_be_stripped=[target_model_alias])
        # 3. Load Target model. Target model could be stripped
        try:
            target_model_type = element_path.get_type(merged_model_type)
        except Exception as e:
            logger.debug(f'target model not found, element path list {element_path} type {merged_model_type}')
            raise TrestleError(
                f'Target model not found. Possibly merge of the elements not allowed at this point. {str(e)}'
            )
        target_model_path = effective_cwd / destination_model_alias
        trace.log(f'look for target model path {target_model_path} at dest alias {destination_model_alias} rel to cwd')

        # target_model filename - depends whether destination model is decomposed or not
        if target_model_path.exists():
            trace.log(f'target model path does exist so target path is subdir with target alias {target_model_alias}')
            target_model_path = target_model_path / target_model_alias
        else:
            trace.log(f'target model filename does not exist so target path is target alias {target_model_alias}')
            target_model_path = target_model_path / target_model_alias  # FIXME this is same as above
        trace.log(f'final target model path is {target_model_path}')

        # if target model is a file then handle file. If file doesn't exist, handle the directory,
        # but in this case it's a list or a dict collection type
        target_model_filename = target_model_path.with_suffix(file_ext)
        if target_model_filename.exists():
            trace.log(f'target model path with extension does exist so load distrib {target_model_filename}')
            _, _, target_model_object = ModelUtils.load_distributed(target_model_filename, trestle_root)
        else:
            target_model_filename = Path(target_model_path)
            trace.log(f'target model path plus extension does not exist so load distrib {target_model_filename}')
            trace.log(f'get collection type for model type {target_model_type}')
            collection_type = type_utils.get_origin(target_model_type)
            trace.log(f'load {target_model_filename} as collection type {collection_type}')
            _, _, target_model_object = ModelUtils.load_distributed(target_model_filename,
                                                                    trestle_root, collection_type)

        if hasattr(target_model_object, '__dict__') and '__root__' in target_model_object.__dict__:
            trace.log('loaded object has dict and root so set target model object to root contents')
            target_model_object = target_model_object.__dict__['__root__']
        # 4. Insert target model into destination model.
        merged_dict = {}
        if destination_model_object is not None:
            merged_dict = destination_model_object.__dict__
        merged_dict[target_model_alias] = target_model_object
        merged_model_object = merged_model_type(**merged_dict)
        merged_destination_element = Element(merged_model_object)
        # 5. Create action  plan
        trace.log(f'create path action clear content: {destination_model_path}')
        reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
        trace.log(f'write file action {destination_model_path}')
        write_destination_action = WriteFileAction(
            destination_model_path, merged_destination_element, content_type=file_type
        )
        # FIXME this will delete metadata.json but it will leave metadata/roles/roles.*
        # need to clean up all lower dirs
        trace.log(f'remove path action {target_model_filename}')
        delete_target_action = RemovePathAction(target_model_filename)

        plan: Plan = Plan()
        plan.add_action(reset_destination_action)
        plan.add_action(write_destination_action)
        plan.add_action(delete_target_action)

        # TODO: Destination model directory is empty or already merged? Then clean up.

        return plan
name ¤
Methods¤
merge(effective_cwd, element_path, trestle_root) classmethod ¤

Merge operations.

It returns a plan for the operation

Source code in trestle/core/commands/merge.py
@classmethod
def merge(cls, effective_cwd: Path, element_path: ElementPath, trestle_root: Path) -> Plan:
    """Merge operations.

    It returns a plan for the operation
    """
    if not element_path.is_multipart():
        raise TrestleError(
            'Multiple parts of an element path must be passed to merge e.g. catalog.* or catalog.groups'
        )

    target_model_alias = element_path.get_last()
    logger.debug(f'merge element path list: {element_path} target model alias {target_model_alias}')
    # 1. Load desination model into a stripped model
    # Load destination model
    destination_path = element_path.get_preceding_path()
    destination_model_alias = destination_path.get_last()
    trace.log(f'merge destination model alias: {destination_model_alias}')
    trace.log('merge getting contextual file type effective working directory')
    # Destination model filetype
    file_type = file_utils.get_contextual_file_type(effective_cwd)
    trace.log(f'contextual file type is {file_type}')

    file_ext = FileContentType.to_file_extension(file_type)
    # Destination model filename
    destination_model_path = (
        effective_cwd / f'{classname_to_alias(destination_model_alias, AliasMode.JSON)}{file_ext}'
    )
    trace.log(f'destination model filename is {destination_model_path}')
    destination_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root)

    destination_model_object: OscalBaseModel = None
    if destination_model_path.exists():
        trace.log('dest filename exists so read it')
        destination_model_object = destination_model_type.oscal_read(destination_model_path)
    # 2. If target is wildcard, load distributed destination model and replace destination model.
    # Handle WILDCARD '*' match. Return plan to load the destination model, with its distributed attributes
    if target_model_alias == '*':
        trace.log('handle target model alias wildcard')
        collection_type = None
        if destination_model_type.is_collection_container():
            collection_type = destination_model_type.get_collection_type()

        merged_model_type, _, merged_model_instance = ModelUtils.load_distributed(
            destination_model_path, trestle_root, collection_type)
        plan = Plan()
        reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
        wrapper_alias = destination_model_alias
        write_destination_action = WriteFileAction(
            destination_model_path, Element(merged_model_instance, wrapper_alias), content_type=file_type
        )
        remove_path_folder = effective_cwd / destination_model_alias
        delete_target_action = RemovePathAction(remove_path_folder)
        plan: Plan = Plan()
        plan.add_action(reset_destination_action)
        plan.add_action(write_destination_action)
        plan.add_action(delete_target_action)
        return plan

    trace.log(f'get dest model with fields stripped: {target_model_alias}')
    # Get destination model without the target field stripped
    merged_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root,
                                                              aliases_not_to_be_stripped=[target_model_alias])
    # 3. Load Target model. Target model could be stripped
    try:
        target_model_type = element_path.get_type(merged_model_type)
    except Exception as e:
        logger.debug(f'target model not found, element path list {element_path} type {merged_model_type}')
        raise TrestleError(
            f'Target model not found. Possibly merge of the elements not allowed at this point. {str(e)}'
        )
    target_model_path = effective_cwd / destination_model_alias
    trace.log(f'look for target model path {target_model_path} at dest alias {destination_model_alias} rel to cwd')

    # target_model filename - depends whether destination model is decomposed or not
    if target_model_path.exists():
        trace.log(f'target model path does exist so target path is subdir with target alias {target_model_alias}')
        target_model_path = target_model_path / target_model_alias
    else:
        trace.log(f'target model filename does not exist so target path is target alias {target_model_alias}')
        target_model_path = target_model_path / target_model_alias  # FIXME this is same as above
    trace.log(f'final target model path is {target_model_path}')

    # if target model is a file then handle file. If file doesn't exist, handle the directory,
    # but in this case it's a list or a dict collection type
    target_model_filename = target_model_path.with_suffix(file_ext)
    if target_model_filename.exists():
        trace.log(f'target model path with extension does exist so load distrib {target_model_filename}')
        _, _, target_model_object = ModelUtils.load_distributed(target_model_filename, trestle_root)
    else:
        target_model_filename = Path(target_model_path)
        trace.log(f'target model path plus extension does not exist so load distrib {target_model_filename}')
        trace.log(f'get collection type for model type {target_model_type}')
        collection_type = type_utils.get_origin(target_model_type)
        trace.log(f'load {target_model_filename} as collection type {collection_type}')
        _, _, target_model_object = ModelUtils.load_distributed(target_model_filename,
                                                                trestle_root, collection_type)

    if hasattr(target_model_object, '__dict__') and '__root__' in target_model_object.__dict__:
        trace.log('loaded object has dict and root so set target model object to root contents')
        target_model_object = target_model_object.__dict__['__root__']
    # 4. Insert target model into destination model.
    merged_dict = {}
    if destination_model_object is not None:
        merged_dict = destination_model_object.__dict__
    merged_dict[target_model_alias] = target_model_object
    merged_model_object = merged_model_type(**merged_dict)
    merged_destination_element = Element(merged_model_object)
    # 5. Create action  plan
    trace.log(f'create path action clear content: {destination_model_path}')
    reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
    trace.log(f'write file action {destination_model_path}')
    write_destination_action = WriteFileAction(
        destination_model_path, merged_destination_element, content_type=file_type
    )
    # FIXME this will delete metadata.json but it will leave metadata/roles/roles.*
    # need to clean up all lower dirs
    trace.log(f'remove path action {target_model_filename}')
    delete_target_action = RemovePathAction(target_model_filename)

    plan: Plan = Plan()
    plan.add_action(reset_destination_action)
    plan.add_action(write_destination_action)
    plan.add_action(delete_target_action)

    # TODO: Destination model directory is empty or already merged? Then clean up.

    return plan
perform_all_merges(element_paths, effective_cwd, trestle_root) classmethod ¤

Run all merges over a list of element paths.

Source code in trestle/core/commands/merge.py
@classmethod
def perform_all_merges(cls, element_paths: List[str], effective_cwd: Path, trestle_root: Path) -> int:
    """Run all merges over a list of element paths."""
    for element_path in element_paths:
        logger.debug(f'merge {element_path}')
        plan = cls.merge(effective_cwd, ElementPath(element_path), trestle_root)
        plan.execute()
    return CmdReturnCodes.SUCCESS.value

handler: python