base_model

trestle.core.base_model ¤

Pydantic base model for use within a trestle workspace and associated configuration.

The heart of the current OSCAL model within trestle is based on pydantic (https://pydantic-docs.helpmanual.io/), which itself is a veneer on top of python data classes.

Functionality here defines a base-model which all trestle oscal data models inherit from. This allows additional functionality to be easily inserted.
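
As a quick illustration, the generated OSCAL classes are subclasses of OscalBaseModel and therefore pick up all of the helpers documented below. A minimal sketch, assuming the generated trestle.oscal.catalog.Catalog model is installed with trestle:

from trestle.core.base_model import OscalBaseModel
from trestle.oscal.catalog import Catalog  # assumption: generated OSCAL models are available

# Every generated OSCAL model derives from OscalBaseModel, so helpers such as
# oscal_read, oscal_write and copy_to are available on all of them.
assert issubclass(Catalog, OscalBaseModel)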

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

OscalBaseModel ¤

Bases: TrestleBaseModel

Trestle defined pydantic base model for use with OSCAL pydantic dataclasses.

This BaseModel provides two types of functionality:

1. Overrides the default configuration of the pydantic library with behaviours required for trestle.
2. Provides utility functions for trestle which are specific to OSCAL and the naming schema associated with it.

Source code in trestle/core/base_model.py
class OscalBaseModel(TrestleBaseModel):
    """
    Trestle defined pydantic base model for use with OSCAL pydantic dataclasses.

    This BaseModel provides two types of functionality:
    1. Overrides default configuration of the pydantic library with behaviours required for trestle
    2. Provides utility functions for trestle which are specific to OSCAL and the naming schema associated with it.
    """

    class Config:
        """Overriding configuration class for pydantic base model, for use with OSCAL data classes."""

        json_loads = orjson.loads
        # TODO: json_dumps with orjson.dumps see #840

        json_encoders = {datetime.datetime: lambda x: robust_datetime_serialization(x)}
        allow_population_by_field_name = True

        # Enforce strict schema
        extra = Extra.forbid

        # Validate on assignment of variables to ensure no escapes
        validate_assignment = True

    @classmethod
    def create_stripped_model_type(
        cls,
        stripped_fields: Optional[List[str]] = None,
        stripped_fields_aliases: Optional[List[str]] = None
    ) -> Type['OscalBaseModel']:
        """Create a pydantic model, which is derived from the current model, but missing certain fields.

        OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields
        are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing
        mandatory fields. This allows creation of derivative models missing certain fields.

        Args:
            stripped_fields: The fields to be removed from the current data class.
            stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

        Returns:
            Pydantic data class that can be used to instantiate a model.

        Raises:
            TrestleError: If user provided both stripped_fields and stripped_fields_aliases or neither.
            TrestleError: If incorrect aliases or field names are provided.
        """
        if stripped_fields is not None and stripped_fields_aliases is not None:
            raise err.TrestleError('Either "stripped_fields" or "stripped_fields_aliases" need to be passed, not both.')
        if stripped_fields is None and stripped_fields_aliases is None:
            raise err.TrestleError('Exactly one of "stripped_fields" or "stripped_fields_aliases" must be provided')

        # create alias to field_name mapping
        excluded_fields = []
        if stripped_fields is not None:
            excluded_fields = stripped_fields
        elif stripped_fields_aliases is not None:
            alias_to_field = cls.alias_to_field_map()
            try:
                excluded_fields = [alias_to_field[key].name for key in stripped_fields_aliases]
            except KeyError as e:
                raise err.TrestleError(f'Field {str(e)} does not exist in the model')

        current_fields = cls.__fields__
        new_fields_for_model = {}
        # Build field list
        for current_mfield in current_fields.values():
            if current_mfield.name in excluded_fields:
                continue
            # Validate name in the field
            # Check behaviour with an alias
            if current_mfield.required:
                new_fields_for_model[
                    current_mfield.name
                ] = (current_mfield.outer_type_, Field(..., title=current_mfield.name, alias=current_mfield.alias))
            else:
                new_fields_for_model[current_mfield.name] = (
                    Optional[current_mfield.outer_type_],
                    Field(None, title=current_mfield.name, alias=current_mfield.alias)
                )
        new_model = create_model(cls.__name__, __base__=OscalBaseModel, **new_fields_for_model)  # type: ignore
        # TODO: This typing cast should NOT be necessary. Potentially fixable with a fix to pydantic. Issue #175
        new_model = cast(Type[OscalBaseModel], new_model)

        return new_model

    def get_field_by_alias(self, field_alias: str) -> Any:
        """Convert field alias to a field."""
        attr_field = self.alias_to_field_map().get(field_alias, None)
        return attr_field

    def get_field_value_by_alias(self, attr_alias: str) -> Optional[Any]:
        """Get attribute value by field alias."""
        # TODO: can this be restricted beyond Any easily.
        attr_field = self.get_field_by_alias(attr_alias)
        if isinstance(attr_field, ModelField):
            return getattr(self, attr_field.name, None)

        return None

    def stripped_instance(
        self,
        stripped_fields: Optional[List[str]] = None,
        stripped_fields_aliases: Optional[List[str]] = None
    ) -> 'OscalBaseModel':
        """Return a new model instance with the specified fields being stripped.

        Args:
            stripped_fields: The fields to be removed from the current data class.
            stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

        Returns:
            The current datamodel with the fields provided removed in a derivative (run-time created) data model.

        Raises:
            err.TrestleError: If user provided both stripped_fields and stripped_fields_aliases or neither.
            err.TrestleError: If incorrect aliases or field names are provided.
        """
        # stripped class type
        stripped_class: Type[OscalBaseModel] = self.create_stripped_model_type(
            stripped_fields=stripped_fields, stripped_fields_aliases=stripped_fields_aliases
        )

        # remaining values
        remaining_values = {}
        for field in self.__fields__.values():
            if field.name in stripped_class.__fields__:
                remaining_values[field.name] = self.__dict__[field.name]

        # create stripped model instance
        # TODO: Not sure if we can avoid type escapes here
        stripped_instance = stripped_class(**remaining_values)

        return stripped_instance

    def oscal_dict(self) -> Dict[str, Any]:
        """Return a dictionary including the root wrapping object key."""
        class_name = self.__class__.__name__
        result = {}
        raw_dict = self.dict(by_alias=True, exclude_none=True)
        # Additional check to avoid root serialization
        if '__root__' in raw_dict.keys():
            result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict['__root__']
        else:
            result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict
        return result

    def oscal_serialize_json_bytes(self, pretty: bool = False, wrapped: bool = True) -> bytes:
        """
        Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

        Args:
            pretty: Whether or not to pretty-print json output or have in compressed form.
        Returns:
            Oscal model serialized to a json object including packaging inside of a single top level key.
        """
        if wrapped:
            odict = self.oscal_dict()
        else:
            odict = self.dict(by_alias=True, exclude_none=True)
        if pretty:
            return orjson.dumps(odict, default=self.__json_encoder__, option=orjson.OPT_INDENT_2)  # type: ignore
        return orjson.dumps(odict, default=self.__json_encoder__)  # type: ignore

    def oscal_serialize_json(self, pretty: bool = False, wrapped: bool = True) -> str:
        """
        Return an 'oscal wrapped' json object serialized in a compressed form as a string.

        Args:
            pretty: Whether or not to pretty-print json output or have in compressed form.
        Returns:
            Oscal model serialized to a json object including packaging inside of a single top level key.
        """
        # This function is provided for backwards compatibility
        return self.oscal_serialize_json_bytes(pretty, wrapped).decode(const.FILE_ENCODING)

    def oscal_write(self, path: pathlib.Path) -> None:
        """
        Write out a pydantic data model in an oscal friendly way.

        OSCAL schema mandates that top level elements are wrapped in a singular
        json/yaml field. This function handles both json and yaml output as well
        as formatting of the json.

        Args:
            path: The output file location for the oscal object.

        Raises:
            err.TrestleError: If an unknown file extension is provided.
        """
        content_type = FileContentType.to_content_type(path.suffix)
        # The output will have \r\n newlines on windows and \n newlines elsewhere

        if content_type == FileContentType.YAML:
            write_file = pathlib.Path(path).open('w', encoding=const.FILE_ENCODING)
            yaml = YAML(typ='safe')
            yaml.dump(yaml.load(self.oscal_serialize_json()), write_file)
            write_file.flush()
            write_file.close()
        elif content_type == FileContentType.JSON:
            write_file = pathlib.Path(path).open('wb')  # type: ignore
            write_file.write(self.oscal_serialize_json_bytes(pretty=True))  # type: ignore
            # Flush / close required (by experience) due to flushing issues in tests.
            write_file.flush()
            write_file.close()

    @classmethod
    def oscal_read(cls, path: pathlib.Path) -> Optional['OscalBaseModel']:
        """
        Read OSCAL objects.

        Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

        Args:
            path: The path of the oscal object to read.
        Returns:
            The oscal object read into trestle oscal models.
        """
        # Create the wrapper model.
        alias = classname_to_alias(cls.__name__, AliasMode.JSON)

        content_type = FileContentType.to_content_type(path.suffix)
        logger.debug(f'oscal_read content type {content_type} and alias {alias} from {path}')

        if not path.exists():
            logger.warning(f'path does not exist in oscal_read: {path}')
            return None

        obj: Dict[str, Any] = {}
        try:
            if content_type == FileContentType.YAML:
                yaml = YAML(typ='safe')
                fh = path.open('r', encoding=const.FILE_ENCODING)
                obj = yaml.load(fh)
                fh.close()
            elif content_type == FileContentType.JSON:
                obj = load_file(
                    path,
                    json_loads=cls.__config__.json_loads,
                )
        except Exception as e:
            raise err.TrestleError(f'Error loading file {path} {str(e)}')
        try:
            if not len(obj) == 1:
                raise err.TrestleError(
                    f'Invalid OSCAL file structure, oscal file '
                    f'does not have a single top level key wrapping it. It has {len(obj)} keys.'
                )
            parsed = cls.parse_obj(obj[alias])
        except KeyError:
            raise err.TrestleError(f'Provided oscal file does not have top level key: {alias}')
        except Exception as e:
            raise err.TrestleError(f'Error parsing file {path} {str(e)}')

        return parsed

    def copy_to(self, new_oscal_type: Type['OscalBaseModel']) -> 'OscalBaseModel':
        """
        Opportunistic copy operation between similar types of data classes.

        Due to the way in which oscal is constructed we get a set of similar / the same definition across various
        oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the model.

        Args:
            new_oscal_type: The desired type of oscal model

        Returns:
            Opportunistic copy of the data into the new model type.
        """
        logger.debug('Copy to started')
        if self.__class__.__name__ == new_oscal_type.__name__:
            logger.debug('Json based copy')
            # Note: Json based opportunistic copy
            # Dev notes: Do not change this from json. Due to enums (in particular) json is the closest we can get.
            return new_oscal_type.parse_raw(self.oscal_serialize_json(pretty=False, wrapped=False))

        if ('__root__' in self.__fields__ and len(self.__fields__) == 1 and '__root__' in new_oscal_type.__fields__
                and len(new_oscal_type.__fields__) == 1):
            logger.debug('Root element based copy too')
            return new_oscal_type.parse_obj(self.__root__)  # type: ignore

        # bad place here.
        raise err.TrestleError('Provided inconsistent classes to copy to methodology.')

    def copy_from(self, existing_oscal_object: 'OscalBaseModel') -> None:
        """
        Copy operation that implicitly does type conversion.

        Typically would
        be used to set an attribute, however, does not need to be.

        Deals with three scenarios:
        1) Casting across oscal models of equivalent type. The purpose of this
        is to cross class spaces.

        2) The same as above where the item is an array style object which does
        not correctly serialize to a dict.

        3) if the from and 'to' objects are root schema elements the copy operation
        will copy the root element to the value.

        Args:
            existing_oscal_object: The oscal object where fields are copied from.

        """
        recast_object = existing_oscal_object.copy_to(self.__class__)
        for raw_field in self.__dict__:
            self.__dict__[raw_field] = recast_object.__dict__[raw_field]

    @classmethod
    def alias_to_field_map(cls) -> Dict[str, ModelField]:
        """Create a map from field alias to field.

        Returns:
            A dict which has aliases as keys and Fields as values.
        """
        alias_to_field: Dict[str, ModelField] = {}
        for field in cls.__fields__.values():
            alias_to_field[field.alias] = field

        return alias_to_field

    @classmethod
    def is_collection_container(cls) -> bool:
        """
        Determine whether a pydantic model has been created to wrap a collection primitive (e.g. a list or dict).

        In performing model decomposition it is possible using trestle framework to automatically generate a model
        which looks like

        class Foo(OscalBaseModel):
            __root__: List[Bar]

        Returns:
            Boolean indicating whether it meets the above criteria

        When these cases exist we need special handling of the type information.
        """
        # Additional sanity check on field length
        if len(cls.__fields__) == 1 and '__root__' in cls.__fields__:
            # This is now a __root__ key only model
            if is_collection_field_type(cls.__fields__['__root__'].outer_type_):
                return True
        return False

    @classmethod
    def get_collection_type(cls) -> Optional[type]:
        """
        If the type wraps a collection, return the collection type.

        Returns:
            The collection type.

        Raises:
            err.TrestleError: if not a wrapper of the collection type.
        """
        if not cls.is_collection_container():
            raise err.TrestleError('OscalBaseModel is not wrapping a collection type')
        return get_origin(cls.__fields__['__root__'].outer_type_)
Classes¤
Config ¤

Overriding configuration class for pydantic base model, for use with OSCAL data classes.

Source code in trestle/core/base_model.py
class Config:
    """Overriding configuration class for pydantic base model, for use with OSCAL data classes."""

    json_loads = orjson.loads
    # TODO: json_dumps with orjson.dumps see #840

    json_encoders = {datetime.datetime: lambda x: robust_datetime_serialization(x)}
    allow_population_by_field_name = True

    # Enforce strict schema
    extra = Extra.forbid

    # Validate on assignment of variables to ensure no escapes
    validate_assignment = True
Attributes¤
allow_population_by_field_name = True class-attribute instance-attribute ¤
extra = Extra.forbid class-attribute instance-attribute ¤
json_encoders = {datetime.datetime: lambda x: robust_datetime_serialization(x)} class-attribute instance-attribute ¤
json_loads = orjson.loads class-attribute instance-attribute ¤
validate_assignment = True class-attribute instance-attribute ¤
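
A minimal sketch of what these settings mean in practice, using a hypothetical Widget model (not part of trestle):

from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only to illustrate the Config behaviour."""
    title: str

w = Widget(title='demo')

try:
    Widget(title='demo', colour='blue')  # extra = Extra.forbid: unknown fields are rejected
except Exception as error:
    print(type(error).__name__)  # ValidationError

try:
    w.title = None  # validate_assignment = True: assignments are re-validated
except Exception as error:
    print(type(error).__name__)  # ValidationError
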
Functions¤
alias_to_field_map() classmethod ¤

Create a map from field alias to field.

Returns:

    Dict[str, ModelField]: A dict which has aliases as keys and ModelFields as values.

Source code in trestle/core/base_model.py
@classmethod
def alias_to_field_map(cls) -> Dict[str, ModelField]:
    """Create a map from field alias to field.

    Returns:
        A dict which has aliases as keys and Fields as values.
    """
    alias_to_field: Dict[str, ModelField] = {}
    for field in cls.__fields__.values():
        alias_to_field[field.alias] = field

    return alias_to_field
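
For example, a sketch assuming the generated OSCAL models are installed; the exact module layout and aliases depend on the trestle/OSCAL version in use:

from trestle.oscal.common import Metadata  # assumption: generated OSCAL models are available

field_map = Metadata.alias_to_field_map()
print(field_map['last-modified'].name)  # -> 'last_modified'
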
copy_from(existing_oscal_object) ¤

Copy operation that implicitly does type conversion.

Typically would be used to set an attribute, however, does not need to be.

Deals with three scenarios: 1) Casting across oscal models of equivalent type. The purpose of this is to cross class spaces.

2) The same as above where the item is an array style object which does not correctly serialize to a dict.

3) if the from and 'to' objects are root schema elements the copy operation will copy the root element to the value.

Parameters:

    existing_oscal_object (OscalBaseModel): The oscal object where fields are copied from. Required.

Source code in trestle/core/base_model.py
def copy_from(self, existing_oscal_object: 'OscalBaseModel') -> None:
    """
    Copy operation that implicitly does type conversion.

    Typically would
    be used to set an attribute, however, does not need to be.

    Deals with three scenarios:
    1) Casting across oscal models of equivalent type. The purpose of this
    is to cross class spaces.

    2) The same as above where the item is an array style object which does
    not correctly serialize to a dict.

    3) if the from and 'to' objects are root schema elements the copy operation
    will copy the root element to the value.

    Args:
        existing_oscal_object: The oscal object where fields are copied from.

    """
    recast_object = existing_oscal_object.copy_to(self.__class__)
    for raw_field in self.__dict__:
        self.__dict__[raw_field] = recast_object.__dict__[raw_field]
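
A minimal sketch of the in-place field copy, using a hypothetical Widget model (not part of trestle):

from typing import Optional
from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str
    remarks: Optional[str] = None

current = Widget(title='old title')
incoming = Widget(title='new title', remarks='updated elsewhere')

# Overwrite the fields of `current` in place with the values from `incoming`.
# In trestle this is typically used with equivalent classes from different oscal modules.
current.copy_from(incoming)
print(current.title)  # 'new title'
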
copy_to(new_oscal_type) ¤

Opportunistic copy operation between similar types of data classes.

Due to the way in which oscal is constructed we get a set of similar / the same definition across various oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the model.

Parameters:

    new_oscal_type (Type[OscalBaseModel]): The desired type of oscal model. Required.

Returns:

    OscalBaseModel: Opportunistic copy of the data into the new model type.

Source code in trestle/core/base_model.py
def copy_to(self, new_oscal_type: Type['OscalBaseModel']) -> 'OscalBaseModel':
    """
    Opportunistic copy operation between similar types of data classes.

    Due to the way in which oscal is constructed we get a set of similar / the same definition across various
    oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the model.

    Args:
        new_oscal_type: The desired type of oscal model

    Returns:
        Opportunistic copy of the data into the new model type.
    """
    logger.debug('Copy to started')
    if self.__class__.__name__ == new_oscal_type.__name__:
        logger.debug('Json based copy')
        # Note: Json based oppportunistic copy
        # Dev notes: Do not change this from json. Due to enums (in particular) json is the closest we can get.
        return new_oscal_type.parse_raw(self.oscal_serialize_json(pretty=False, wrapped=False))

    if ('__root__' in self.__fields__ and len(self.__fields__) == 1 and '__root__' in new_oscal_type.__fields__
            and len(new_oscal_type.__fields__) == 1):
        logger.debug('Root element based copy too')
        return new_oscal_type.parse_obj(self.__root__)  # type: ignore

    # bad place here.
    raise err.TrestleError('Provided inconsistent classes to copy to methodology.')
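
A sketch of the root-element based branch, using two hypothetical __root__ wrappers (not part of trestle) that share the same shape:

from typing import List
from trestle.core.base_model import OscalBaseModel

class Emails(OscalBaseModel):
    """Hypothetical __root__ wrapper, similar to those trestle generates during decomposition."""
    __root__: List[str]

class Recipients(OscalBaseModel):
    """Structurally identical wrapper living in a different 'class space'."""
    __root__: List[str]

emails = Emails(__root__=['ciso@example.com'])
recipients = emails.copy_to(Recipients)  # root element based copy
print(type(recipients).__name__, recipients.__root__)
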
create_stripped_model_type(stripped_fields=None, stripped_fields_aliases=None) classmethod ¤

Create a pydantic model, which is derived from the current model, but missing certain fields.

OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing mandatory fields. This allows creation of derivative models missing certain fields.

Parameters:

    stripped_fields (Optional[List[str]]): The fields to be removed from the current data class. Default: None.
    stripped_fields_aliases (Optional[List[str]]): The fields to be removed from the current data class, provided by alias. Default: None.

Returns:

    Type[OscalBaseModel]: Pydantic data class that can be used to instantiate a model.

Raises:

    TrestleError: If the user provided both stripped_fields and stripped_fields_aliases or neither.
    TrestleError: If incorrect aliases or field names are provided.

Source code in trestle/core/base_model.py
@classmethod
def create_stripped_model_type(
    cls,
    stripped_fields: Optional[List[str]] = None,
    stripped_fields_aliases: Optional[List[str]] = None
) -> Type['OscalBaseModel']:
    """Create a pydantic model, which is derived from the current model, but missing certain fields.

    OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields
    are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing
    mandatory fields. This allows creation of derivative models missing certain fields.

    Args:
        stripped_fields: The fields to be removed from the current data class.
        stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

    Returns:
        Pydantic data class that can be used to instantiate a model.

    Raises:
        TrestleError: If user provided both stripped_fields and stripped_fields_aliases or neither.
        TrestleError: If incorrect aliases or field names are provided.
    """
    if stripped_fields is not None and stripped_fields_aliases is not None:
        raise err.TrestleError('Either "stripped_fields" or "stripped_fields_aliases" need to be passed, not both.')
    if stripped_fields is None and stripped_fields_aliases is None:
        raise err.TrestleError('Exactly one of "stripped_fields" or "stripped_fields_aliases" must be provided')

    # create alias to field_name mapping
    excluded_fields = []
    if stripped_fields is not None:
        excluded_fields = stripped_fields
    elif stripped_fields_aliases is not None:
        alias_to_field = cls.alias_to_field_map()
        try:
            excluded_fields = [alias_to_field[key].name for key in stripped_fields_aliases]
        except KeyError as e:
            raise err.TrestleError(f'Field {str(e)} does not exist in the model')

    current_fields = cls.__fields__
    new_fields_for_model = {}
    # Build field list
    for current_mfield in current_fields.values():
        if current_mfield.name in excluded_fields:
            continue
        # Validate name in the field
        # Check behaviour with an alias
        if current_mfield.required:
            new_fields_for_model[
                current_mfield.name
            ] = (current_mfield.outer_type_, Field(..., title=current_mfield.name, alias=current_mfield.alias))
        else:
            new_fields_for_model[current_mfield.name] = (
                Optional[current_mfield.outer_type_],
                Field(None, title=current_mfield.name, alias=current_mfield.alias)
            )
    new_model = create_model(cls.__name__, __base__=OscalBaseModel, **new_fields_for_model)  # type: ignore
    # TODO: This typing cast should NOT be necessary. Potentially fixable with a fix to pydantic. Issue #175
    new_model = cast(Type[OscalBaseModel], new_model)

    return new_model
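
For example, a sketch with a hypothetical Widget model (not part of trestle) where the mandatory title field is stripped so a partial document can be instantiated:

from typing import Optional
from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str
    remarks: Optional[str] = None

# Derive a runtime model type with the mandatory 'title' field removed,
# e.g. because the title is being edited in a separate file during a split.
PartialWidget = Widget.create_stripped_model_type(stripped_fields=['title'])

partial = PartialWidget(remarks='title is tracked elsewhere')
print(sorted(PartialWidget.__fields__))  # ['remarks']
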
get_collection_type() classmethod ¤

If the type wraps a collection, return the collection type.

Returns:

    Optional[type]: The collection type.

Raises:

    TrestleError: If not a wrapper of the collection type.

Source code in trestle/core/base_model.py
@classmethod
def get_collection_type(cls) -> Optional[type]:
    """
    If the type wraps a collection, return the collection type.

    Returns:
        The collection type.

    Raises:
        err.TrestleError: if not a wrapper of the collection type.
    """
    if not cls.is_collection_container():
        raise err.TrestleError('OscalBaseModel is not wrapping a collection type')
    return get_origin(cls.__fields__['__root__'].outer_type_)
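
For example, with a hypothetical __root__ wrapper (not part of trestle) such as trestle generates during decomposition:

from typing import List
from trestle.core.base_model import OscalBaseModel

class Roles(OscalBaseModel):
    """Hypothetical __root__ wrapper of a collection."""
    __root__: List[str]

print(Roles.is_collection_container())  # True
print(Roles.get_collection_type())      # <class 'list'>
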
get_field_by_alias(field_alias) ¤

Convert field alias to a field.

Source code in trestle/core/base_model.py
def get_field_by_alias(self, field_alias: str) -> Any:
    """Convert field alias to a field."""
    attr_field = self.alias_to_field_map().get(field_alias, None)
    return attr_field
get_field_value_by_alias(attr_alias) ¤

Get attribute value by field alias.

Source code in trestle/core/base_model.py
def get_field_value_by_alias(self, attr_alias: str) -> Optional[Any]:
    """Get attribute value by field alias."""
    # TODO: can this be restricted beyond Any easily.
    attr_field = self.get_field_by_alias(attr_alias)
    if isinstance(attr_field, ModelField):
        return getattr(self, attr_field.name, None)

    return None
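
A small sketch with a hypothetical Widget model (not part of trestle); the pydantic Field import assumes the pydantic v1 API this module is written against:

from typing import Optional
from pydantic import Field
from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    short_name: Optional[str] = Field(None, alias='short-name')

w = Widget(short_name='w1')
print(w.get_field_value_by_alias('short-name'))   # 'w1'
print(w.get_field_value_by_alias('not-a-field'))  # None
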
is_collection_container() classmethod ¤

Determine whether a pydantic model has been created to wrap a collection primitive (e.g. a list or dict).

In performing model decomposition it is possible, using the trestle framework, to automatically generate a model which looks like

class Foo(OscalBaseModel):
    __root__: List[Bar]

Returns:

    bool: Boolean indicating whether the model meets the above criteria.

When these cases exist we need special handling of the type information.

Source code in trestle/core/base_model.py
@classmethod
def is_collection_container(cls) -> bool:
    """
    Determine whether a pydantic model has been created to wrap a collection primitive (e.g. a list or dict).

    In performing model decomposition it is possible using trestle framework to automatically generate a model
    which looks like

    class Foo(OscalBaseModel):
        __root__: List[Bar]

    Returns:
        Boolean indicating whether it meets the above criteria

    When these cases exist we need special handling of the type information.
    """
    # Additional sanity check on field length
    if len(cls.__fields__) == 1 and '__root__' in cls.__fields__:
        # This is now a __root__ key only model
        if is_collection_field_type(cls.__fields__['__root__'].outer_type_):
            return True
    return False
oscal_dict() ¤

Return a dictionary including the root wrapping object key.

Source code in trestle/core/base_model.py
def oscal_dict(self) -> Dict[str, Any]:
    """Return a dictionary including the root wrapping object key."""
    class_name = self.__class__.__name__
    result = {}
    raw_dict = self.dict(by_alias=True, exclude_none=True)
    # Additional check to avoid root serialization
    if '__root__' in raw_dict.keys():
        result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict['__root__']
    else:
        result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict
    return result
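
For example, with a hypothetical Widget model (not part of trestle); the wrapping key is derived from the class name:

from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str

print(Widget(title='demo').oscal_dict())  # {'widget': {'title': 'demo'}}
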
oscal_read(path) classmethod ¤

Read OSCAL objects.

Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

Parameters:

    path (Path): The path of the oscal object to read. Required.

Returns:

    Optional[OscalBaseModel]: The oscal object read into trestle oscal models.

Source code in trestle/core/base_model.py
@classmethod
def oscal_read(cls, path: pathlib.Path) -> Optional['OscalBaseModel']:
    """
    Read OSCAL objects.

    Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

    Args:
        path: The path of the oscal object to read.
    Returns:
        The oscal object read into trestle oscal models.
    """
    # Create the wrapper model.
    alias = classname_to_alias(cls.__name__, AliasMode.JSON)

    content_type = FileContentType.to_content_type(path.suffix)
    logger.debug(f'oscal_read content type {content_type} and alias {alias} from {path}')

    if not path.exists():
        logger.warning(f'path does not exist in oscal_read: {path}')
        return None

    obj: Dict[str, Any] = {}
    try:
        if content_type == FileContentType.YAML:
            yaml = YAML(typ='safe')
            fh = path.open('r', encoding=const.FILE_ENCODING)
            obj = yaml.load(fh)
            fh.close()
        elif content_type == FileContentType.JSON:
            obj = load_file(
                path,
                json_loads=cls.__config__.json_loads,
            )
    except Exception as e:
        raise err.TrestleError(f'Error loading file {path} {str(e)}')
    try:
        if not len(obj) == 1:
            raise err.TrestleError(
                f'Invalid OSCAL file structure, oscal file '
                f'does not have a single top level key wrapping it. It has {len(obj)} keys.'
            )
        parsed = cls.parse_obj(obj[alias])
    except KeyError:
        raise err.TrestleError(f'Provided oscal file does not have top level key: {alias}')
    except Exception as e:
        raise err.TrestleError(f'Error parsing file {path} {str(e)}')

    return parsed
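
For example, a sketch assuming the generated OSCAL models are installed and a catalog file exists at the hypothetical path below:

import pathlib

from trestle.oscal.catalog import Catalog  # assumption: generated OSCAL models are available

catalog = Catalog.oscal_read(pathlib.Path('catalogs/example/catalog.json'))  # hypothetical path
if catalog is not None:
    print(catalog.metadata.title)
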
oscal_serialize_json(pretty=False, wrapped=True) ¤

Return an 'oscal wrapped' json object serialized in a compressed form as a string.

Parameters:

    pretty (bool): Whether to pretty-print the json output or emit it in compressed form. Default: False.

Returns:

    str: Oscal model serialized to a json object including packaging inside of a single top level key.

Source code in trestle/core/base_model.py
def oscal_serialize_json(self, pretty: bool = False, wrapped: bool = True) -> str:
    """
    Return an 'oscal wrapped' json object serialized in a compressed form as a string.

    Args:
        pretty: Whether or not to pretty-print json output or have in compressed form.
    Returns:
        Oscal model serialized to a json object including packaging inside of a single top level key.
    """
    # This function is provided for backwards compatibility
    return self.oscal_serialize_json_bytes(pretty, wrapped).decode(const.FILE_ENCODING)
oscal_serialize_json_bytes(pretty=False, wrapped=True) ¤

Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

Parameters:

    pretty (bool): Whether to pretty-print the json output or emit it in compressed form. Default: False.

Returns:

    bytes: Oscal model serialized to a json object including packaging inside of a single top level key.

Source code in trestle/core/base_model.py
def oscal_serialize_json_bytes(self, pretty: bool = False, wrapped: bool = True) -> bytes:
    """
    Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

    Args:
        pretty: Whether or not to pretty-print json output or have in compressed form.
    Returns:
        Oscal model serialized to a json object including packaging inside of a single top level key.
    """
    if wrapped:
        odict = self.oscal_dict()
    else:
        odict = self.dict(by_alias=True, exclude_none=True)
    if pretty:
        return orjson.dumps(odict, default=self.__json_encoder__, option=orjson.OPT_INDENT_2)  # type: ignore
    return orjson.dumps(odict, default=self.__json_encoder__)  # type: ignore
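
A short sketch of both serialization helpers, using a hypothetical Widget model (not part of trestle):

from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str

w = Widget(title='demo')
print(w.oscal_serialize_json())                     # compact and wrapped: {"widget":{"title":"demo"}}
print(w.oscal_serialize_json(pretty=True))          # indented output
print(w.oscal_serialize_json_bytes(wrapped=False))  # b'{"title":"demo"}' without the wrapping key
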
oscal_write(path) ¤

Write out a pydantic data model in an oscal friendly way.

OSCAL schema mandates that top level elements are wrapped in a singular json/yaml field. This function handles both json and yaml output as well as formatting of the json.

Parameters:

    path (Path): The output file location for the oscal object. Required.

Raises:

    TrestleError: If an unknown file extension is provided.

Source code in trestle/core/base_model.py
def oscal_write(self, path: pathlib.Path) -> None:
    """
    Write out a pydantic data model in an oscal friendly way.

    OSCAL schema mandates that top level elements are wrapped in a singular
    json/yaml field. This function handles both json and yaml output as well
    as formatting of the json.

    Args:
        path: The output file location for the oscal object.

    Raises:
        err.TrestleError: If an unknown file extension is provided.
    """
    content_type = FileContentType.to_content_type(path.suffix)
    # The output will have \r\n newlines on windows and \n newlines elsewhere

    if content_type == FileContentType.YAML:
        write_file = pathlib.Path(path).open('w', encoding=const.FILE_ENCODING)
        yaml = YAML(typ='safe')
        yaml.dump(yaml.load(self.oscal_serialize_json()), write_file)
        write_file.flush()
        write_file.close()
    elif content_type == FileContentType.JSON:
        write_file = pathlib.Path(path).open('wb')  # type: ignore
        write_file.write(self.oscal_serialize_json_bytes(pretty=True))  # type: ignore
        # Flush / close required (by experience) due to flushing issues in tests.
        write_file.flush()
        write_file.close()
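
For example, with a hypothetical Widget model (not part of trestle); the file suffix selects the serialization format:

import pathlib

from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str

w = Widget(title='demo')
w.oscal_write(pathlib.Path('widget.json'))  # pretty printed json wrapped in a 'widget' key
w.oscal_write(pathlib.Path('widget.yaml'))  # the same content serialized as yaml
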
stripped_instance(stripped_fields=None, stripped_fields_aliases=None) ¤

Return a new model instance with the specified fields being stripped.

Parameters:

    stripped_fields (Optional[List[str]]): The fields to be removed from the current data class. Default: None.
    stripped_fields_aliases (Optional[List[str]]): The fields to be removed from the current data class, provided by alias. Default: None.

Returns:

    OscalBaseModel: The current datamodel with the provided fields removed, as a derivative (run-time created) data model.

Raises:

    TrestleError: If the user provided both stripped_fields and stripped_fields_aliases or neither.
    TrestleError: If incorrect aliases or field names are provided.

Source code in trestle/core/base_model.py
def stripped_instance(
    self,
    stripped_fields: Optional[List[str]] = None,
    stripped_fields_aliases: Optional[List[str]] = None
) -> 'OscalBaseModel':
    """Return a new model instance with the specified fields being stripped.

    Args:
        stripped_fields: The fields to be removed from the current data class.
        stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

    Returns:
        The current datamodel with the fields provided removed in a derivative (run-time created) data model.

    Raises:
        err.TrestleError: If user provided both stripped_fields and stripped_fields_aliases or neither.
        err.TrestleError: If incorrect aliases or field names are provided.
    """
    # stripped class type
    stripped_class: Type[OscalBaseModel] = self.create_stripped_model_type(
        stripped_fields=stripped_fields, stripped_fields_aliases=stripped_fields_aliases
    )

    # remaining values
    remaining_values = {}
    for field in self.__fields__.values():
        if field.name in stripped_class.__fields__:
            remaining_values[field.name] = self.__dict__[field.name]

    # create stripped model instance
    # TODO: Not sure if we can avoid type escapes here
    stripped_instance = stripped_class(**remaining_values)

    return stripped_instance
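
A minimal sketch, again using a hypothetical Widget model (not part of trestle):

from typing import Optional
from trestle.core.base_model import OscalBaseModel

class Widget(OscalBaseModel):
    """Hypothetical model used only for illustration."""
    title: str
    remarks: Optional[str] = None

w = Widget(title='demo', remarks='to be split out')
header_only = w.stripped_instance(stripped_fields=['remarks'])
print(header_only.dict())  # {'title': 'demo'}
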

Functions¤

robust_datetime_serialization(input_dt) ¤

Return a nicely formatted string in a format compatible with OSCAL specifications.

Parameters:

    input_dt (datetime): Input datetime to convert to a string. Required.

Returns:

    str: String in isoformat to the millisecond, enforcing that a timezone offset is provided.

Raises:

    TrestleError: Raised if the datetime object does not contain sufficient timezone information.

Source code in trestle/core/base_model.py
def robust_datetime_serialization(input_dt: datetime.datetime) -> str:
    """Return a nicely formatted string for in a format compatible with OSCAL specifications.

    Args:
        input_dt: Input datetime to convert to a string.

    Returns:
        String in isoformat to the millisecond enforcing that timezone offset is provided.

    Raises:
        TrestleError: Error is raised if datetime object does not contain sufficient timezone information.
    """
    # fail if the input datetime is not aware - ie it has no associated timezone
    if input_dt.tzinfo is None:
        raise err.TrestleError('Missing timezone in datetime')
    if input_dt.tzinfo.utcoffset(input_dt) is None:
        raise err.TrestleError('Missing utcoffset in datetime')

    # use this to leave in the original timezone rather than utc
    # return input_dt.astimezone().isoformat(timespec='milliseconds')  noqa: E800

    # force it to be utc
    return input_dt.astimezone(datetime.timezone.utc).isoformat(timespec='milliseconds')
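
For example:

import datetime

from trestle.core.base_model import robust_datetime_serialization

aware = datetime.datetime(2023, 1, 1, 12, 30, tzinfo=datetime.timezone.utc)
print(robust_datetime_serialization(aware))  # '2023-01-01T12:30:00.000+00:00'

naive = datetime.datetime(2023, 1, 1, 12, 30)
# robust_datetime_serialization(naive) raises TrestleError: 'Missing timezone in datetime'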
