Skip to content

trestle.core.commands.merge

trestle.core.commands.merge ¤

Trestle Merge Command.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

trace = log.Trace(logger) module-attribute ¤

Classes¤

MergeCmd ¤

Bases: CommandPlusDocs

Merge subcomponents on a trestle model.

Source code in trestle/core/commands/merge.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class MergeCmd(CommandPlusDocs):
    """Merge subcomponents on a trestle model.

    The command takes a comma-separated list of element paths. For each path the last
    part (the target) is merged into the part preceding it (the destination), or, when
    the last part is the wildcard '*', everything distributed under the destination is
    merged back into the destination file.
    """

    name = 'merge'

    def _init_arguments(self) -> None:
        """Register the required element argument listing the element path(s) to merge."""
        self.add_argument(
            f'-{const.ARG_ELEMENT_SHORT}',
            f'--{const.ARG_ELEMENT}',
            help=f'{const.ARG_DESC_ELEMENT}(s) to be merged. The last element is merged into the second last element.',
            required=True
        )

    def _run(self, args: argparse.Namespace) -> int:
        """Merge elements into the parent oscal model.

        Args:
            args: parsed command line arguments; ``element`` holds the comma-separated
                element paths and ``trestle_root`` is passed through to the merges.

        Returns:
            The return code of perform_all_merges, or the code produced by
            handle_generic_command_exception if any exception is raised.
        """
        try:
            log.set_log_level_from_args(args)

            # remove any quotes passed in as on windows platforms
            elements_clean = args.element.strip("'")
            element_paths = elements_clean.split(',')
            trace.log(f'merge _run element paths {element_paths}')
            # merges are resolved relative to the directory the command is run from
            return self.perform_all_merges(element_paths, Path.cwd(), args.trestle_root)
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while merging subcomponents on a trestle model')

    @classmethod
    def perform_all_merges(cls, element_paths: List[str], effective_cwd: Path, trestle_root: Path) -> int:
        """Run all merges over a list of element paths.

        Each path is planned and executed in turn, so a later merge sees the file
        system as left by the earlier ones.

        Args:
            element_paths: element path strings, one merge per entry.
            effective_cwd: directory the merges are resolved against.
            trestle_root: passed through to merge.

        Returns:
            CmdReturnCodes.SUCCESS.value once every plan has been executed.
        """
        for element_path in element_paths:
            logger.debug(f'merge {element_path}')
            plan = cls.merge(effective_cwd, ElementPath(element_path), trestle_root)
            plan.execute()
        return CmdReturnCodes.SUCCESS.value

    @classmethod
    def merge(cls, effective_cwd: Path, element_path: ElementPath, trestle_root: Path) -> Plan:
        """Build the plan that merges the target of an element path into its destination.

        The plan is only constructed here; it is not executed.

        Args:
            effective_cwd: directory containing the destination model file.
            element_path: multipart element path; its last part is the target (or '*')
                and the part before it is the destination.
            trestle_root: passed through to the ModelUtils helpers.

        Returns:
            A Plan with three actions in order: reset the destination file, write the
            merged model to it, and remove the merged-in target path.

        Raises:
            TrestleError: if the element path is not multipart, or if the target type
                cannot be resolved on the destination model type.
        """
        if not element_path.is_multipart():
            raise TrestleError(
                'Multiple parts of an element path must be passed to merge e.g. catalog.* or catalog.groups'
            )

        target_model_alias = element_path.get_last()
        logger.debug(f'merge element path list: {element_path} target model alias {target_model_alias}')
        # 1. Load destination model into a stripped model
        destination_path = element_path.get_preceding_path()
        destination_model_alias = destination_path.get_last()
        trace.log(f'merge destination model alias: {destination_model_alias}')
        trace.log('merge getting contextual file type effective working directory')
        # Destination model filetype
        file_type = file_utils.get_contextual_file_type(effective_cwd)
        trace.log(f'contextual file type is {file_type}')

        file_ext = FileContentType.to_file_extension(file_type)
        # Destination model filename
        destination_model_path = (
            effective_cwd / f'{classname_to_alias(destination_model_alias, AliasMode.JSON)}{file_ext}'
        )
        trace.log(f'destination model filename is {destination_model_path}')
        destination_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root)

        # stays None when the destination file does not exist yet
        destination_model_object: OscalBaseModel = None
        if destination_model_path.exists():
            trace.log('dest filename exists so read it')
            destination_model_object = destination_model_type.oscal_read(destination_model_path)

        # 2. If target is wildcard, load distributed destination model and replace destination model.
        # Handle WILDCARD '*' match. Return plan to load the destination model, with its distributed attributes
        if target_model_alias == '*':
            trace.log('handle target model alias wildcard')
            collection_type = None
            if destination_model_type.is_collection_container():
                collection_type = destination_model_type.get_collection_type()

            _, _, merged_model_instance = ModelUtils.load_distributed(
                destination_model_path, trestle_root, collection_type
            )
            reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
            write_destination_action = WriteFileAction(
                destination_model_path, Element(merged_model_instance, destination_model_alias), content_type=file_type
            )
            # everything distributed under the destination has been merged, so drop its folder
            delete_target_action = RemovePathAction(effective_cwd / destination_model_alias)

            plan: Plan = Plan()
            plan.add_action(reset_destination_action)
            plan.add_action(write_destination_action)
            plan.add_action(delete_target_action)
            return plan

        trace.log(f'get dest model with fields stripped: {target_model_alias}')
        # Get destination model without the target field stripped
        merged_model_type, _ = ModelUtils.get_stripped_model_type(
            destination_model_path, trestle_root, aliases_not_to_be_stripped=[target_model_alias]
        )
        # 3. Load Target model. Target model could be stripped
        try:
            target_model_type = element_path.get_type(merged_model_type)
        except Exception as e:
            logger.debug(f'target model not found, element path list {element_path} type {merged_model_type}')
            # chain the original exception so the root cause stays visible in tracebacks
            raise TrestleError(
                f'Target model not found. Possibly merge of the elements not allowed at this point. {str(e)}'
            ) from e

        destination_dir = effective_cwd / destination_model_alias
        trace.log(f'look for target model path {destination_dir} at dest alias {destination_model_alias} rel to cwd')
        # The target path is the same whether or not the destination directory exists;
        # only the trace message differs.
        if destination_dir.exists():
            trace.log(f'target model path does exist so target path is subdir with target alias {target_model_alias}')
        else:
            trace.log(f'target model filename does not exist so target path is target alias {target_model_alias}')
        target_model_path = destination_dir / target_model_alias
        trace.log(f'final target model path is {target_model_path}')

        # if target model is a file then handle file. If file doesn't exist, handle the directory,
        # but in this case it's a list or a dict collection type
        target_model_filename = target_model_path.with_suffix(file_ext)
        if target_model_filename.exists():
            trace.log(f'target model path with extension does exist so load distrib {target_model_filename}')
            _, _, target_model_object = ModelUtils.load_distributed(target_model_filename, trestle_root)
        else:
            target_model_filename = Path(target_model_path)
            trace.log(f'target model path plus extension does not exist so load distrib {target_model_filename}')
            trace.log(f'get collection type for model type {target_model_type}')
            collection_type = type_utils.get_origin(target_model_type)
            trace.log(f'load {target_model_filename} as collection type {collection_type}')
            _, _, target_model_object = ModelUtils.load_distributed(
                target_model_filename, trestle_root, collection_type
            )

        if hasattr(target_model_object, '__dict__') and '__root__' in target_model_object.__dict__:
            trace.log('loaded object has dict and root so set target model object to root contents')
            target_model_object = target_model_object.__dict__['__root__']

        # 4. Insert target model into destination model.
        # Work on a copy so the loaded destination object's own __dict__ is not mutated.
        merged_dict = {}
        if destination_model_object is not None:
            merged_dict = dict(destination_model_object.__dict__)
        merged_dict[target_model_alias] = target_model_object
        merged_model_object = merged_model_type(**merged_dict)
        merged_destination_element = Element(merged_model_object)

        # 5. Create action plan
        trace.log(f'create path action clear content: {destination_model_path}')
        reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
        trace.log(f'write file action {destination_model_path}')
        write_destination_action = WriteFileAction(
            destination_model_path, merged_destination_element, content_type=file_type
        )
        # FIXME this will delete metadata.json but it will leave metadata/roles/roles.*
        # need to clean up all lower dirs
        trace.log(f'remove path action {target_model_filename}')
        delete_target_action = RemovePathAction(target_model_filename)

        plan: Plan = Plan()
        plan.add_action(reset_destination_action)
        plan.add_action(write_destination_action)
        plan.add_action(delete_target_action)

        # TODO: Destination model directory is empty or already merged? Then clean up.

        return plan
Attributes¤
name = 'merge' class-attribute instance-attribute ¤
Functions¤
merge(effective_cwd, element_path, trestle_root) classmethod ¤

Merge operations.

It returns a plan for the operation; the plan is not executed by this method.

Source code in trestle/core/commands/merge.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@classmethod
def merge(cls, effective_cwd: Path, element_path: ElementPath, trestle_root: Path) -> Plan:
    """Build the plan that merges the target of an element path into its destination.

    The plan is only constructed here; it is not executed.

    Args:
        effective_cwd: directory containing the destination model file.
        element_path: multipart element path; its last part is the target (or '*')
            and the part before it is the destination.
        trestle_root: passed through to the ModelUtils helpers.

    Returns:
        A Plan with three actions in order: reset the destination file, write the
        merged model to it, and remove the merged-in target path.

    Raises:
        TrestleError: if the element path is not multipart, or if the target type
            cannot be resolved on the destination model type.
    """
    if not element_path.is_multipart():
        raise TrestleError(
            'Multiple parts of an element path must be passed to merge e.g. catalog.* or catalog.groups'
        )

    target_model_alias = element_path.get_last()
    logger.debug(f'merge element path list: {element_path} target model alias {target_model_alias}')
    # 1. Load destination model into a stripped model
    destination_path = element_path.get_preceding_path()
    destination_model_alias = destination_path.get_last()
    trace.log(f'merge destination model alias: {destination_model_alias}')
    trace.log('merge getting contextual file type effective working directory')
    # Destination model filetype
    file_type = file_utils.get_contextual_file_type(effective_cwd)
    trace.log(f'contextual file type is {file_type}')

    file_ext = FileContentType.to_file_extension(file_type)
    # Destination model filename
    destination_model_path = (
        effective_cwd / f'{classname_to_alias(destination_model_alias, AliasMode.JSON)}{file_ext}'
    )
    trace.log(f'destination model filename is {destination_model_path}')
    destination_model_type, _ = ModelUtils.get_stripped_model_type(destination_model_path, trestle_root)

    # stays None when the destination file does not exist yet
    destination_model_object: OscalBaseModel = None
    if destination_model_path.exists():
        trace.log('dest filename exists so read it')
        destination_model_object = destination_model_type.oscal_read(destination_model_path)

    # 2. If target is wildcard, load distributed destination model and replace destination model.
    # Handle WILDCARD '*' match. Return plan to load the destination model, with its distributed attributes
    if target_model_alias == '*':
        trace.log('handle target model alias wildcard')
        collection_type = None
        if destination_model_type.is_collection_container():
            collection_type = destination_model_type.get_collection_type()

        _, _, merged_model_instance = ModelUtils.load_distributed(
            destination_model_path, trestle_root, collection_type
        )
        reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
        write_destination_action = WriteFileAction(
            destination_model_path, Element(merged_model_instance, destination_model_alias), content_type=file_type
        )
        # everything distributed under the destination has been merged, so drop its folder
        delete_target_action = RemovePathAction(effective_cwd / destination_model_alias)

        plan: Plan = Plan()
        plan.add_action(reset_destination_action)
        plan.add_action(write_destination_action)
        plan.add_action(delete_target_action)
        return plan

    trace.log(f'get dest model with fields stripped: {target_model_alias}')
    # Get destination model without the target field stripped
    merged_model_type, _ = ModelUtils.get_stripped_model_type(
        destination_model_path, trestle_root, aliases_not_to_be_stripped=[target_model_alias]
    )
    # 3. Load Target model. Target model could be stripped
    try:
        target_model_type = element_path.get_type(merged_model_type)
    except Exception as e:
        logger.debug(f'target model not found, element path list {element_path} type {merged_model_type}')
        # chain the original exception so the root cause stays visible in tracebacks
        raise TrestleError(
            f'Target model not found. Possibly merge of the elements not allowed at this point. {str(e)}'
        ) from e

    destination_dir = effective_cwd / destination_model_alias
    trace.log(f'look for target model path {destination_dir} at dest alias {destination_model_alias} rel to cwd')
    # The target path is the same whether or not the destination directory exists;
    # only the trace message differs.
    if destination_dir.exists():
        trace.log(f'target model path does exist so target path is subdir with target alias {target_model_alias}')
    else:
        trace.log(f'target model filename does not exist so target path is target alias {target_model_alias}')
    target_model_path = destination_dir / target_model_alias
    trace.log(f'final target model path is {target_model_path}')

    # if target model is a file then handle file. If file doesn't exist, handle the directory,
    # but in this case it's a list or a dict collection type
    target_model_filename = target_model_path.with_suffix(file_ext)
    if target_model_filename.exists():
        trace.log(f'target model path with extension does exist so load distrib {target_model_filename}')
        _, _, target_model_object = ModelUtils.load_distributed(target_model_filename, trestle_root)
    else:
        target_model_filename = Path(target_model_path)
        trace.log(f'target model path plus extension does not exist so load distrib {target_model_filename}')
        trace.log(f'get collection type for model type {target_model_type}')
        collection_type = type_utils.get_origin(target_model_type)
        trace.log(f'load {target_model_filename} as collection type {collection_type}')
        _, _, target_model_object = ModelUtils.load_distributed(
            target_model_filename, trestle_root, collection_type
        )

    if hasattr(target_model_object, '__dict__') and '__root__' in target_model_object.__dict__:
        trace.log('loaded object has dict and root so set target model object to root contents')
        target_model_object = target_model_object.__dict__['__root__']

    # 4. Insert target model into destination model.
    # Work on a copy so the loaded destination object's own __dict__ is not mutated.
    merged_dict = {}
    if destination_model_object is not None:
        merged_dict = dict(destination_model_object.__dict__)
    merged_dict[target_model_alias] = target_model_object
    merged_model_object = merged_model_type(**merged_dict)
    merged_destination_element = Element(merged_model_object)

    # 5. Create action plan
    trace.log(f'create path action clear content: {destination_model_path}')
    reset_destination_action = CreatePathAction(destination_model_path, clear_content=True)
    trace.log(f'write file action {destination_model_path}')
    write_destination_action = WriteFileAction(
        destination_model_path, merged_destination_element, content_type=file_type
    )
    # FIXME this will delete metadata.json but it will leave metadata/roles/roles.*
    # need to clean up all lower dirs
    trace.log(f'remove path action {target_model_filename}')
    delete_target_action = RemovePathAction(target_model_filename)

    plan: Plan = Plan()
    plan.add_action(reset_destination_action)
    plan.add_action(write_destination_action)
    plan.add_action(delete_target_action)

    # TODO: Destination model directory is empty or already merged? Then clean up.

    return plan
perform_all_merges(element_paths, effective_cwd, trestle_root) classmethod ¤

Run all merges over a list of element paths.

Source code in trestle/core/commands/merge.py
66
67
68
69
70
71
72
73
@classmethod
def perform_all_merges(cls, element_paths: List[str], effective_cwd: Path, trestle_root: Path) -> int:
    """Plan and execute one merge per element path, in the order given.

    Each plan is executed before the next path is planned. Returns
    CmdReturnCodes.SUCCESS.value after the last plan has run.
    """
    for raw_path in element_paths:
        logger.debug(f'merge {raw_path}')
        parsed_path = ElementPath(raw_path)
        cls.merge(effective_cwd, parsed_path, trestle_root).execute()
    return CmdReturnCodes.SUCCESS.value

Functions¤

handler: python