Skip to content

base_model

trestle.core.base_model ¤

Pydantic base model for use within a trestle workspace and associated configuration.

The heart of the current OSCAL model within trestle is based on pydantic (https://pydantic-docs.helpmanual.io/) which itself is a veneer on-top of python data classes.

Functionality here defines a base-model which all trestle oscal data models inherit from. This allows additional functionality to be easily inserted.

I can write a comment in here and you can even edit on the same line.

logger ¤

Classes¤

OscalBaseModel (TrestleBaseModel) pydantic-model ¤

Trestle defined pydantic base model for use with OSCAL pydantic dataclasses.

This BaseModel provides two types of functionality: 1. Overrides default configuation of the pydantic library with behaviours required for trestle 2. Provides utility functions for trestle which are specific to OSCAL and the naming schema associated with it.

Source code in trestle/core/base_model.py
class OscalBaseModel(TrestleBaseModel):
    """
    Trestle defined pydantic base model for use with OSCAL pydantic dataclasses.

    This BaseModel provides two types of functionality:
    1. Overrides default configuation of the pydantic library with behaviours required for trestle
    2. Provides utility functions for trestle which are specific to OSCAL and the naming schema associated with it.
    """

    class Config:
        """Overriding configuration class for pydantic base model, for use with OSCAL data classes."""

        json_loads = orjson.loads
        # TODO: json_dumps with orjson.dumps see #840

        json_encoders = {datetime.datetime: lambda x: robust_datetime_serialization(x)}
        allow_population_by_field_name = True

        # Enforce strict schema
        extra = Extra.forbid

        # Validate on assignment of variables to ensure no escapes
        validate_assignment = True

    @classmethod
    def create_stripped_model_type(
        cls,
        stripped_fields: Optional[List[str]] = None,
        stripped_fields_aliases: Optional[List[str]] = None
    ) -> Type['OscalBaseModel']:
        """Create a pydantic model, which is derived from the current model, but missing certain fields.

        OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields
        are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing
        mandatory fields. This allows creation of derivative models missing certain fields.

        Args:
            stripped_fields: The fields to be removed from the current data class.
            stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

        Returns:
            Pydantic data class thta can be used to instanciate a model.

        Raises:
            TrestleError: If user provided both stripped_fields and stripped_field_aliases or neither.
            TrestleError: If incorrect aliases or field names are provided.
        """
        if stripped_fields is not None and stripped_fields_aliases is not None:
            raise err.TrestleError('Either "stripped_fields" or "stripped_fields_aliases" need to be passed, not both.')
        if stripped_fields is None and stripped_fields_aliases is None:
            raise err.TrestleError('Exactly one of "stripped_fields" or "stripped_fields_aliases" must be provided')

        # create alias to field_name mapping
        excluded_fields = []
        if stripped_fields is not None:
            excluded_fields = stripped_fields
        elif stripped_fields_aliases is not None:
            alias_to_field = cls.alias_to_field_map()
            try:
                excluded_fields = [alias_to_field[key].name for key in stripped_fields_aliases]
            except KeyError as e:
                raise err.TrestleError(f'Field {str(e)} does not exist in the model')

        current_fields = cls.__fields__
        new_fields_for_model = {}
        # Build field list
        for current_mfield in current_fields.values():
            if current_mfield.name in excluded_fields:
                continue
            # Validate name in the field
            # Cehcke behaviour with an alias
            if current_mfield.required:
                new_fields_for_model[
                    current_mfield.name
                ] = (current_mfield.outer_type_, Field(..., title=current_mfield.name, alias=current_mfield.alias))
            else:
                new_fields_for_model[current_mfield.name] = (
                    Optional[current_mfield.outer_type_],
                    Field(None, title=current_mfield.name, alias=current_mfield.alias)
                )
        new_model = create_model(cls.__name__, __base__=OscalBaseModel, **new_fields_for_model)  # type: ignore
        # TODO: This typing cast should NOT be necessary. Potentially fixable with a fix to pydantic. Issue #175
        new_model = cast(Type[OscalBaseModel], new_model)

        return new_model

    def get_field_by_alias(self, field_alias: str) -> Any:
        """Convert field alias to a field."""
        attr_field = self.alias_to_field_map().get(field_alias, None)
        return attr_field

    def get_field_value_by_alias(self, attr_alias: str) -> Optional[Any]:
        """Get attribute value by field alias."""
        # TODO: can this be restricted beyond Any easily.
        attr_field = self.get_field_by_alias(attr_alias)
        if isinstance(attr_field, ModelField):
            return getattr(self, attr_field.name, None)

        return None

    def stripped_instance(
        self,
        stripped_fields: Optional[List[str]] = None,
        stripped_fields_aliases: Optional[List[str]] = None
    ) -> 'OscalBaseModel':
        """Return a new model instance with the specified fields being stripped.

        Args:
            stripped_fields: The fields to be removed from the current data class.
            stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

        Returns:
            The current datamodel with the fields provided removed in a derivate (run time created) data model.

        Raises:
            err.TrestleError: If user provided both stripped_fields and stripped_field_aliases or neither.
            err.TrestleError: If incorrect aliases or field names are provided.
        """
        # stripped class type
        stripped_class: Type[OscalBaseModel] = self.create_stripped_model_type(
            stripped_fields=stripped_fields, stripped_fields_aliases=stripped_fields_aliases
        )

        # remaining values
        remaining_values = {}
        for field in self.__fields__.values():
            if field.name in stripped_class.__fields__:
                remaining_values[field.name] = self.__dict__[field.name]

        # create stripped model instance
        # TODO: Not sure if we can avoid type escapes here
        stripped_instance = stripped_class(**remaining_values)

        return stripped_instance

    def oscal_dict(self) -> Dict[str, Any]:
        """Return a dictionary including the root wrapping object key."""
        class_name = self.__class__.__name__
        result = {}
        raw_dict = self.dict(by_alias=True, exclude_none=True)
        # Additional check to avoid root serialization
        if '__root__' in raw_dict.keys():
            result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict['__root__']
        else:
            result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict
        return result

    def oscal_serialize_json_bytes(self, pretty: bool = False, wrapped: bool = True) -> bytes:
        """
        Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

        Args:
            pretty: Whether or not to pretty-print json output or have in compressed form.
        Returns:
            Oscal model serialized to a json object including packaging inside of a single top level key.
        """
        if wrapped:
            odict = self.oscal_dict()
        else:
            odict = self.dict(by_alias=True, exclude_none=True)
        if pretty:
            return orjson.dumps(odict, default=self.__json_encoder__, option=orjson.OPT_INDENT_2)  # type: ignore
        return orjson.dumps(odict, default=self.__json_encoder__)  # type: ignore

    def oscal_serialize_json(self, pretty: bool = False, wrapped: bool = True) -> str:
        """
        Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

        Args:
            pretty: Whether or not to pretty-print json output or have in compressed form.
        Returns:
            Oscal model serialized to a json object including packaging inside of a single top level key.
        """
        # This function is provided for backwards compatibility
        return self.oscal_serialize_json_bytes(pretty, wrapped).decode(const.FILE_ENCODING)

    def oscal_write(self, path: pathlib.Path) -> None:
        """
        Write out a pydantic data model in an oscal friendly way.

        OSCAL schema mandates that top level elements are wrapped in a singular
        json/yaml field. This function handles both json and yaml output as well
        as formatting of the json.

        Args:
            path: The output file location for the oscal object.

        Raises:
            err.TrestleError: If a unknown file extension is provided.
        """
        content_type = FileContentType.to_content_type(path.suffix)
        # The output will have \r\n newlines on windows and \n newlines elsewhere

        if content_type == FileContentType.YAML:
            write_file = pathlib.Path(path).open('w', encoding=const.FILE_ENCODING)
            yaml = YAML(typ='safe')
            yaml.dump(yaml.load(self.oscal_serialize_json()), write_file)
            write_file.flush()
            write_file.close()
        elif content_type == FileContentType.JSON:
            write_file = pathlib.Path(path).open('wb')  # type: ignore
            write_file.write(self.oscal_serialize_json_bytes(pretty=True))  # type: ignore
            # Flush / close required (by experience) due to flushing issues in tests.
            write_file.flush()
            write_file.close()

    @classmethod
    def oscal_read(cls, path: pathlib.Path) -> Optional['OscalBaseModel']:
        """
        Read OSCAL objects.

        Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

        Args:
            path: The path of the oscal object to read.
        Returns:
            The oscal object read into trestle oscal models.
        """
        # Create the wrapper model.
        alias = classname_to_alias(cls.__name__, AliasMode.JSON)

        content_type = FileContentType.to_content_type(path.suffix)
        logger.debug(f'oscal_read content type {content_type} and alias {alias} from {path}')

        if not path.exists():
            logger.warning(f'path does not exist in oscal_read: {path}')
            return None

        obj: Dict[str, Any] = {}
        try:
            if content_type == FileContentType.YAML:
                yaml = YAML(typ='safe')
                fh = path.open('r', encoding=const.FILE_ENCODING)
                obj = yaml.load(fh)
                fh.close()
            elif content_type == FileContentType.JSON:
                obj = load_file(
                    path,
                    json_loads=cls.__config__.json_loads,
                )
        except Exception as e:
            raise err.TrestleError(f'Error loading file {path} {str(e)}')
        try:
            if not len(obj) == 1:
                raise err.TrestleError(
                    f'Invalid OSCAL file structure, oscal file '
                    f'does not have a single top level key wrapping it. It has {len(obj)} keys.'
                )
            parsed = cls.parse_obj(obj[alias])
        except KeyError:
            raise err.TrestleError(f'Provided oscal file does not have top level key key: {alias}')
        except Exception as e:
            raise err.TrestleError(f'Error parsing file {path} {str(e)}')

        return parsed

    def copy_to(self, new_oscal_type: Type['OscalBaseModel']) -> 'OscalBaseModel':
        """
        Opportunistic copy operation between similar types of data classes.

        Due to the way in which oscal is constructed we get a set of similar / the same definition across various
        oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the mode.

        Args:
            new_oscal_type: The desired type of oscal model

        Returns:
            Opportunistic copy of the data into the new model type.
        """
        logger.debug('Copy to started')
        if self.__class__.__name__ == new_oscal_type.__name__:
            logger.debug('Json based copy')
            # Note: Json based oppportunistic copy
            # Dev notes: Do not change this from json. Due to enums (in particular) json is the closest we can get.
            return new_oscal_type.parse_raw(self.oscal_serialize_json(pretty=False, wrapped=False))

        if ('__root__' in self.__fields__ and len(self.__fields__) == 1 and '__root__' in new_oscal_type.__fields__
                and len(new_oscal_type.__fields__) == 1):
            logger.debug('Root element based copy too')
            return new_oscal_type.parse_obj(self.__root__)  # type: ignore

        # bad place here.
        raise err.TrestleError('Provided inconsistent classes to copy to methodology.')

    def copy_from(self, existing_oscal_object: 'OscalBaseModel') -> None:
        """
        Copy operation that implicitly does type conversion.

        Typically would
        be used to set an attribute, however, does not need to be.

        Deals with two scenarios:
        1) Casting across oscal models of equivalent type. The purpose if this
        is to cross class spaces.

        2) The same as above where the item is an array style object which does
        not correctly serialize to a dict.

        3) if the from and 'to' objects are root schema elements the copy operation
        will copy the root element to the value.

        Args:
            existing_oscal_object: The oscal object where fields are copied from.

        """
        recast_object = existing_oscal_object.copy_to(self.__class__)
        for raw_field in self.__dict__:
            self.__dict__[raw_field] = recast_object.__dict__[raw_field]

    @classmethod
    def alias_to_field_map(cls) -> Dict[str, ModelField]:
        """Create a map from field alias to field.

        Returns:
            A dict which has key's of aliases and Fields as values.
        """
        alias_to_field: Dict[str, ModelField] = {}
        for field in cls.__fields__.values():
            alias_to_field[field.alias] = field

        return alias_to_field

    @classmethod
    def is_collection_container(cls) -> bool:
        """
        Determine whether a pydantic model has being created to wrap a collection primitive (e.g a list or dict).

        In performing model decomposition it is possible using trestle framework to automatically generate a model
        which looks like

        class Foo(OscalBaseModel):
            __root__: List[Bar]

        Returns:
            Boolean on if it meets the above criteria

        When these cases exist we need special handling of the type information.
        """
        # Additional sanity check on field length
        if len(cls.__fields__) == 1 and '__root__' in cls.__fields__:
            # This is now a __root__ key only model
            if is_collection_field_type(cls.__fields__['__root__'].outer_type_):
                return True
        return False

    @classmethod
    def get_collection_type(cls) -> Optional[type]:
        """
        If the type wraps an collection, return the collection type.

        Returns:
            The collection type.

        Raises:
            err.TrestleError: if not a wrapper of the collection type.
        """
        if not cls.is_collection_container():
            raise err.TrestleError('OscalBaseModel is not wrapping a collection type')
        return get_origin(cls.__fields__['__root__'].outer_type_)
__class_vars__ special ¤
__custom_root_type__ special ¤
__doc__ special ¤
__exclude_fields__ special ¤
__fields__ special ¤
__include_fields__ special ¤
__post_root_validators__ special ¤
__pre_root_validators__ special ¤
__private_attributes__ special ¤
__schema_cache__ special ¤
__signature__ special ¤
__slots__: Tuple[str, ...] special ¤
__validators__ special ¤
Classes¤
Config ¤

Overriding configuration class for pydantic base model, for use with OSCAL data classes.

Source code in trestle/core/base_model.py
class Config:
    """Overriding configuration class for pydantic base model, for use with OSCAL data classes."""

    json_loads = orjson.loads
    # TODO: json_dumps with orjson.dumps see #840

    json_encoders = {datetime.datetime: lambda x: robust_datetime_serialization(x)}
    allow_population_by_field_name = True

    # Enforce strict schema
    extra = Extra.forbid

    # Validate on assignment of variables to ensure no escapes
    validate_assignment = True
allow_population_by_field_name ¤
extra ¤
json_encoders ¤
json_loads ¤
validate_assignment ¤
Methods¤
alias_to_field_map() classmethod ¤

Create a map from field alias to field.

Returns:

Type Description
Dict[str, pydantic.v1.fields.ModelField]

A dict which has key's of aliases and Fields as values.

Source code in trestle/core/base_model.py
@classmethod
def alias_to_field_map(cls) -> Dict[str, ModelField]:
    """Create a map from field alias to field.

    Returns:
        A dict which has key's of aliases and Fields as values.
    """
    alias_to_field: Dict[str, ModelField] = {}
    for field in cls.__fields__.values():
        alias_to_field[field.alias] = field

    return alias_to_field
copy_from(self, existing_oscal_object) ¤

Copy operation that implicitly does type conversion.

Typically would be used to set an attribute, however, does not need to be.

Deals with two scenarios: 1) Casting across oscal models of equivalent type. The purpose if this is to cross class spaces.

2) The same as above where the item is an array style object which does not correctly serialize to a dict.

3) if the from and 'to' objects are root schema elements the copy operation will copy the root element to the value.

Parameters:

Name Type Description Default
existing_oscal_object OscalBaseModel

The oscal object where fields are copied from.

required
Source code in trestle/core/base_model.py
def copy_from(self, existing_oscal_object: 'OscalBaseModel') -> None:
    """
    Copy operation that implicitly does type conversion.

    Typically would
    be used to set an attribute, however, does not need to be.

    Deals with two scenarios:
    1) Casting across oscal models of equivalent type. The purpose if this
    is to cross class spaces.

    2) The same as above where the item is an array style object which does
    not correctly serialize to a dict.

    3) if the from and 'to' objects are root schema elements the copy operation
    will copy the root element to the value.

    Args:
        existing_oscal_object: The oscal object where fields are copied from.

    """
    recast_object = existing_oscal_object.copy_to(self.__class__)
    for raw_field in self.__dict__:
        self.__dict__[raw_field] = recast_object.__dict__[raw_field]
copy_to(self, new_oscal_type) ¤

Opportunistic copy operation between similar types of data classes.

Due to the way in which oscal is constructed we get a set of similar / the same definition across various oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the mode.

Parameters:

Name Type Description Default
new_oscal_type Type[OscalBaseModel]

The desired type of oscal model

required

Returns:

Type Description
OscalBaseModel

Opportunistic copy of the data into the new model type.

Source code in trestle/core/base_model.py
def copy_to(self, new_oscal_type: Type['OscalBaseModel']) -> 'OscalBaseModel':
    """
    Opportunistic copy operation between similar types of data classes.

    Due to the way in which oscal is constructed we get a set of similar / the same definition across various
    oscal models. Due to the lack of guarantees that they are the same we cannot easily 'collapse' the mode.

    Args:
        new_oscal_type: The desired type of oscal model

    Returns:
        Opportunistic copy of the data into the new model type.
    """
    logger.debug('Copy to started')
    if self.__class__.__name__ == new_oscal_type.__name__:
        logger.debug('Json based copy')
        # Note: Json based oppportunistic copy
        # Dev notes: Do not change this from json. Due to enums (in particular) json is the closest we can get.
        return new_oscal_type.parse_raw(self.oscal_serialize_json(pretty=False, wrapped=False))

    if ('__root__' in self.__fields__ and len(self.__fields__) == 1 and '__root__' in new_oscal_type.__fields__
            and len(new_oscal_type.__fields__) == 1):
        logger.debug('Root element based copy too')
        return new_oscal_type.parse_obj(self.__root__)  # type: ignore

    # bad place here.
    raise err.TrestleError('Provided inconsistent classes to copy to methodology.')
create_stripped_model_type(stripped_fields=None, stripped_fields_aliases=None) classmethod ¤

Create a pydantic model, which is derived from the current model, but missing certain fields.

OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing mandatory fields. This allows creation of derivative models missing certain fields.

Parameters:

Name Type Description Default
stripped_fields Optional[List[str]]

The fields to be removed from the current data class.

None
stripped_fields_aliases Optional[List[str]]

The fields to be removed from the current data class provided by alias.

None

Returns:

Type Description
Type[OscalBaseModel]

Pydantic data class thta can be used to instanciate a model.

Exceptions:

Type Description
TrestleError

If user provided both stripped_fields and stripped_field_aliases or neither.

TrestleError

If incorrect aliases or field names are provided.

Source code in trestle/core/base_model.py
@classmethod
def create_stripped_model_type(
    cls,
    stripped_fields: Optional[List[str]] = None,
    stripped_fields_aliases: Optional[List[str]] = None
) -> Type['OscalBaseModel']:
    """Create a pydantic model, which is derived from the current model, but missing certain fields.

    OSCAL mandates a 'strict' schema (e.g. unless otherwise stated no additional fields), and certain fields
    are mandatory. Given this the corresponding dataclasses are also strict. Workflows with trestle require missing
    mandatory fields. This allows creation of derivative models missing certain fields.

    Args:
        stripped_fields: The fields to be removed from the current data class.
        stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

    Returns:
        Pydantic data class thta can be used to instanciate a model.

    Raises:
        TrestleError: If user provided both stripped_fields and stripped_field_aliases or neither.
        TrestleError: If incorrect aliases or field names are provided.
    """
    if stripped_fields is not None and stripped_fields_aliases is not None:
        raise err.TrestleError('Either "stripped_fields" or "stripped_fields_aliases" need to be passed, not both.')
    if stripped_fields is None and stripped_fields_aliases is None:
        raise err.TrestleError('Exactly one of "stripped_fields" or "stripped_fields_aliases" must be provided')

    # create alias to field_name mapping
    excluded_fields = []
    if stripped_fields is not None:
        excluded_fields = stripped_fields
    elif stripped_fields_aliases is not None:
        alias_to_field = cls.alias_to_field_map()
        try:
            excluded_fields = [alias_to_field[key].name for key in stripped_fields_aliases]
        except KeyError as e:
            raise err.TrestleError(f'Field {str(e)} does not exist in the model')

    current_fields = cls.__fields__
    new_fields_for_model = {}
    # Build field list
    for current_mfield in current_fields.values():
        if current_mfield.name in excluded_fields:
            continue
        # Validate name in the field
        # Cehcke behaviour with an alias
        if current_mfield.required:
            new_fields_for_model[
                current_mfield.name
            ] = (current_mfield.outer_type_, Field(..., title=current_mfield.name, alias=current_mfield.alias))
        else:
            new_fields_for_model[current_mfield.name] = (
                Optional[current_mfield.outer_type_],
                Field(None, title=current_mfield.name, alias=current_mfield.alias)
            )
    new_model = create_model(cls.__name__, __base__=OscalBaseModel, **new_fields_for_model)  # type: ignore
    # TODO: This typing cast should NOT be necessary. Potentially fixable with a fix to pydantic. Issue #175
    new_model = cast(Type[OscalBaseModel], new_model)

    return new_model
get_collection_type() classmethod ¤

If the type wraps an collection, return the collection type.

Returns:

Type Description
Optional[type]

The collection type.

Exceptions:

Type Description
err.TrestleError

if not a wrapper of the collection type.

Source code in trestle/core/base_model.py
@classmethod
def get_collection_type(cls) -> Optional[type]:
    """
    If the type wraps an collection, return the collection type.

    Returns:
        The collection type.

    Raises:
        err.TrestleError: if not a wrapper of the collection type.
    """
    if not cls.is_collection_container():
        raise err.TrestleError('OscalBaseModel is not wrapping a collection type')
    return get_origin(cls.__fields__['__root__'].outer_type_)
get_field_by_alias(self, field_alias) ¤

Convert field alias to a field.

Source code in trestle/core/base_model.py
def get_field_by_alias(self, field_alias: str) -> Any:
    """Convert field alias to a field."""
    attr_field = self.alias_to_field_map().get(field_alias, None)
    return attr_field
get_field_value_by_alias(self, attr_alias) ¤

Get attribute value by field alias.

Source code in trestle/core/base_model.py
def get_field_value_by_alias(self, attr_alias: str) -> Optional[Any]:
    """Get attribute value by field alias."""
    # TODO: can this be restricted beyond Any easily.
    attr_field = self.get_field_by_alias(attr_alias)
    if isinstance(attr_field, ModelField):
        return getattr(self, attr_field.name, None)

    return None
is_collection_container() classmethod ¤

Determine whether a pydantic model has being created to wrap a collection primitive (e.g a list or dict).

In performing model decomposition it is possible using trestle framework to automatically generate a model which looks like

class Foo(OscalBaseModel): root: List[Bar]

Returns:

Type Description
bool

Boolean on if it meets the above criteria

When these cases exist we need special handling of the type information.

Source code in trestle/core/base_model.py
@classmethod
def is_collection_container(cls) -> bool:
    """
    Determine whether a pydantic model has being created to wrap a collection primitive (e.g a list or dict).

    In performing model decomposition it is possible using trestle framework to automatically generate a model
    which looks like

    class Foo(OscalBaseModel):
        __root__: List[Bar]

    Returns:
        Boolean on if it meets the above criteria

    When these cases exist we need special handling of the type information.
    """
    # Additional sanity check on field length
    if len(cls.__fields__) == 1 and '__root__' in cls.__fields__:
        # This is now a __root__ key only model
        if is_collection_field_type(cls.__fields__['__root__'].outer_type_):
            return True
    return False
oscal_dict(self) ¤

Return a dictionary including the root wrapping object key.

Source code in trestle/core/base_model.py
def oscal_dict(self) -> Dict[str, Any]:
    """Return a dictionary including the root wrapping object key."""
    class_name = self.__class__.__name__
    result = {}
    raw_dict = self.dict(by_alias=True, exclude_none=True)
    # Additional check to avoid root serialization
    if '__root__' in raw_dict.keys():
        result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict['__root__']
    else:
        result[classname_to_alias(class_name, AliasMode.JSON)] = raw_dict
    return result
oscal_read(path) classmethod ¤

Read OSCAL objects.

Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

Parameters:

Name Type Description Default
path Path

The path of the oscal object to read.

required

Returns:

Type Description
Optional[OscalBaseModel]

The oscal object read into trestle oscal models.

Source code in trestle/core/base_model.py
@classmethod
def oscal_read(cls, path: pathlib.Path) -> Optional['OscalBaseModel']:
    """
    Read OSCAL objects.

    Handles the fact OSCAL wraps top level elements and also deals with both yaml and json.

    Args:
        path: The path of the oscal object to read.
    Returns:
        The oscal object read into trestle oscal models.
    """
    # Create the wrapper model.
    alias = classname_to_alias(cls.__name__, AliasMode.JSON)

    content_type = FileContentType.to_content_type(path.suffix)
    logger.debug(f'oscal_read content type {content_type} and alias {alias} from {path}')

    if not path.exists():
        logger.warning(f'path does not exist in oscal_read: {path}')
        return None

    obj: Dict[str, Any] = {}
    try:
        if content_type == FileContentType.YAML:
            yaml = YAML(typ='safe')
            fh = path.open('r', encoding=const.FILE_ENCODING)
            obj = yaml.load(fh)
            fh.close()
        elif content_type == FileContentType.JSON:
            obj = load_file(
                path,
                json_loads=cls.__config__.json_loads,
            )
    except Exception as e:
        raise err.TrestleError(f'Error loading file {path} {str(e)}')
    try:
        if not len(obj) == 1:
            raise err.TrestleError(
                f'Invalid OSCAL file structure, oscal file '
                f'does not have a single top level key wrapping it. It has {len(obj)} keys.'
            )
        parsed = cls.parse_obj(obj[alias])
    except KeyError:
        raise err.TrestleError(f'Provided oscal file does not have top level key key: {alias}')
    except Exception as e:
        raise err.TrestleError(f'Error parsing file {path} {str(e)}')

    return parsed
oscal_serialize_json(self, pretty=False, wrapped=True) ¤

Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

Parameters:

Name Type Description Default
pretty bool

Whether or not to pretty-print json output or have in compressed form.

False

Returns:

Type Description
str

Oscal model serialized to a json object including packaging inside of a single top level key.

Source code in trestle/core/base_model.py
def oscal_serialize_json(self, pretty: bool = False, wrapped: bool = True) -> str:
    """
    Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

    Args:
        pretty: Whether or not to pretty-print json output or have in compressed form.
    Returns:
        Oscal model serialized to a json object including packaging inside of a single top level key.
    """
    # This function is provided for backwards compatibility
    return self.oscal_serialize_json_bytes(pretty, wrapped).decode(const.FILE_ENCODING)
oscal_serialize_json_bytes(self, pretty=False, wrapped=True) ¤

Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

Parameters:

Name Type Description Default
pretty bool

Whether or not to pretty-print json output or have in compressed form.

False

Returns:

Type Description
bytes

Oscal model serialized to a json object including packaging inside of a single top level key.

Source code in trestle/core/base_model.py
def oscal_serialize_json_bytes(self, pretty: bool = False, wrapped: bool = True) -> bytes:
    """
    Return an 'oscal wrapped' json object serialized in a compressed form as bytes.

    Args:
        pretty: Whether or not to pretty-print json output or have in compressed form.
    Returns:
        Oscal model serialized to a json object including packaging inside of a single top level key.
    """
    if wrapped:
        odict = self.oscal_dict()
    else:
        odict = self.dict(by_alias=True, exclude_none=True)
    if pretty:
        return orjson.dumps(odict, default=self.__json_encoder__, option=orjson.OPT_INDENT_2)  # type: ignore
    return orjson.dumps(odict, default=self.__json_encoder__)  # type: ignore
oscal_write(self, path) ¤

Write out a pydantic data model in an oscal friendly way.

OSCAL schema mandates that top level elements are wrapped in a singular json/yaml field. This function handles both json and yaml output as well as formatting of the json.

Parameters:

Name Type Description Default
path Path

The output file location for the oscal object.

required

Exceptions:

Type Description
err.TrestleError

If a unknown file extension is provided.

Source code in trestle/core/base_model.py
def oscal_write(self, path: pathlib.Path) -> None:
    """
    Write out a pydantic data model in an oscal friendly way.

    OSCAL schema mandates that top level elements are wrapped in a singular
    json/yaml field. This function handles both json and yaml output as well
    as formatting of the json.

    Args:
        path: The output file location for the oscal object.

    Raises:
        err.TrestleError: If a unknown file extension is provided.
    """
    content_type = FileContentType.to_content_type(path.suffix)
    # The output will have \r\n newlines on windows and \n newlines elsewhere

    if content_type == FileContentType.YAML:
        write_file = pathlib.Path(path).open('w', encoding=const.FILE_ENCODING)
        yaml = YAML(typ='safe')
        yaml.dump(yaml.load(self.oscal_serialize_json()), write_file)
        write_file.flush()
        write_file.close()
    elif content_type == FileContentType.JSON:
        write_file = pathlib.Path(path).open('wb')  # type: ignore
        write_file.write(self.oscal_serialize_json_bytes(pretty=True))  # type: ignore
        # Flush / close required (by experience) due to flushing issues in tests.
        write_file.flush()
        write_file.close()
stripped_instance(self, stripped_fields=None, stripped_fields_aliases=None) ¤

Return a new model instance with the specified fields being stripped.

Parameters:

Name Type Description Default
stripped_fields Optional[List[str]]

The fields to be removed from the current data class.

None
stripped_fields_aliases Optional[List[str]]

The fields to be removed from the current data class provided by alias.

None

Returns:

Type Description
OscalBaseModel

The current datamodel with the fields provided removed in a derivate (run time created) data model.

Exceptions:

Type Description
err.TrestleError

If user provided both stripped_fields and stripped_field_aliases or neither.

err.TrestleError

If incorrect aliases or field names are provided.

Source code in trestle/core/base_model.py
def stripped_instance(
    self,
    stripped_fields: Optional[List[str]] = None,
    stripped_fields_aliases: Optional[List[str]] = None
) -> 'OscalBaseModel':
    """Return a new model instance with the specified fields being stripped.

    Args:
        stripped_fields: The fields to be removed from the current data class.
        stripped_fields_aliases: The fields to be removed from the current data class provided by alias.

    Returns:
        The current datamodel with the fields provided removed in a derivate (run time created) data model.

    Raises:
        err.TrestleError: If user provided both stripped_fields and stripped_field_aliases or neither.
        err.TrestleError: If incorrect aliases or field names are provided.
    """
    # stripped class type
    stripped_class: Type[OscalBaseModel] = self.create_stripped_model_type(
        stripped_fields=stripped_fields, stripped_fields_aliases=stripped_fields_aliases
    )

    # remaining values
    remaining_values = {}
    for field in self.__fields__.values():
        if field.name in stripped_class.__fields__:
            remaining_values[field.name] = self.__dict__[field.name]

    # create stripped model instance
    # TODO: Not sure if we can avoid type escapes here
    stripped_instance = stripped_class(**remaining_values)

    return stripped_instance

Functions¤

robust_datetime_serialization(input_dt) ¤

Return a nicely formatted string for in a format compatible with OSCAL specifications.

Parameters:

Name Type Description Default
input_dt datetime

Input datetime to convert to a string.

required

Returns:

Type Description
str

String in isoformat to the millisecond enforcing that timezone offset is provided.

Exceptions:

Type Description
TrestleError

Error is raised if datetime object does not contain sufficient timezone information.

Source code in trestle/core/base_model.py
def robust_datetime_serialization(input_dt: datetime.datetime) -> str:
    """Return a nicely formatted string for in a format compatible with OSCAL specifications.

    Args:
        input_dt: Input datetime to convert to a string.

    Returns:
        String in isoformat to the millisecond enforcing that timezone offset is provided.

    Raises:
        TrestleError: Error is raised if datetime object does not contain sufficient timezone information.
    """
    # fail if the input datetime is not aware - ie it has no associated timezone
    if input_dt.tzinfo is None:
        raise err.TrestleError('Missing timezone in datetime')
    if input_dt.tzinfo.utcoffset(input_dt) is None:
        raise err.TrestleError('Missing utcoffset in datetime')

    # use this leave in original timezone rather than utc
    # return input_dt.astimezone().isoformat(timespec='milliseconds')  noqa: E800

    # force it to be utc
    return input_dt.astimezone(datetime.timezone.utc).isoformat(timespec='milliseconds')

handler: python