Skip to content

tanium_result_to_oscal_ar

trestle.tasks.tanium_result_to_oscal_ar ¤

OSCAL transformation tasks.

logger ¤

Classes¤

TaniumResultToOscalAR (TaskBase) ¤

Task to convert Tanium result to OSCAL json.

Attributes:

Name Type Description
name str

Name of the task.

Source code in trestle/tasks/tanium_result_to_oscal_ar.py
class TaniumResultToOscalAR(TaskBase):
    """
    Task to convert Tanium result to OSCAL json.

    Attributes:
        name: Name of the task.
    """

    name = 'tanium-result-to-oscal-ar'

    def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
        """
        Initialize trestle task tanium-result-to-oscal-ar.

        Args:
            config_object: Config section associated with the task.
        """
        super().__init__(config_object)

    def print_info(self) -> None:
        """Print the help string."""
        logger.info(f'Help information for {self.name} task.')
        logger.info('')
        logger.info(
            'Purpose: Transform Tanium files into Open Security Controls Assessment Language (OSCAL) results objects'
            + 'and serialize to a file.'
        )
        logger.info('')
        logger.info('Configuration flags sit under [task.tanium-result-to-oscal-ar]:')
        logger.info('  blocksize = (optional) the desired number Tanuim result input lines to process per CPU.')
        logger.info('  cpus-max  = (optional) the desired maximum number of CPUs to employ, default is 1.')
        logger.info('  cpus-min  = (optional) the desired minimum number of CPUs to employ.')
        logger.info('  aggregate = (optional) True indicates employ properties aggregation, default is True.')
        logger.info('  caching   = (optional) True indicates employ object caching, default is True.')
        logger.info(
            '  checking  = (optional) True indicates perform strict checking of OSCAL properties, default is False.'
        )
        logger.info('  input-dir = (required) the path of the input directory comprising Tanium results.')
        logger.info(
            '  output-dir = (required) the path of the output directory comprising synthesized OSCAL .json files.'
        )
        logger.info('  output-overwrite = (optional) true [default] or false; replace existing output when true.')
        logger.info(
            '  quiet = (optional) true or false [default]; display file creations and rules analysis when false.'
        )
        logger.info(
            '  timestamp = (optional) timestamp for the Observations in ISO 8601 format, such as '
            + '2021-01-04T00:05:23+04:00 for example; if not specified then value for "Timestamp" key in the Tanium '
            + 'result is used if present, otherwise current time is used.'
        )
        logger.info('')
        logger.info(
            'Operation: A transformation is performed on one or more Tanium input files to produce output in '
            + 'OSCAL partial results format.'
        )

    def simulate(self) -> TaskOutcome:
        """Provide a simulated outcome."""
        self._simulate = True
        return self._transform()

    def execute(self) -> TaskOutcome:
        """Provide an actual outcome."""
        self._simulate = False
        return self._transform()

    def _transform(self) -> TaskOutcome:
        """Perform transformation."""
        try:
            return self._transform_work()
        except Exception:
            logger.info(traceback.format_exc())
            mode = ''
            if self._simulate:
                mode = 'simulated-'
            return TaskOutcome(mode + 'failure')

    def _transform_work(self) -> TaskOutcome:
        """
        Perform the transformation work.

        Transformation work steps: read input, process, write output, display analysis.
        """
        mode = ''
        if self._simulate:
            mode = 'simulated-'
        if not self._config:
            logger.warning('Config missing')
            return TaskOutcome(mode + 'failure')
        # config required input & output dirs
        try:
            idir = self._config['input-dir']
            ipth = pathlib.Path(idir)
            odir = self._config['output-dir']
            opth = pathlib.Path(odir)
        except KeyError as e:
            logger.debug(f'key {e.args[0]} missing')
            return TaskOutcome(mode + 'failure')
        # config optional overwrite & quiet
        self._overwrite = self._config.getboolean('output-overwrite', True)
        quiet = self._config.get('quiet', False)
        self._verbose = not self._simulate and not quiet
        # config optional timestamp
        timestamp = self._config.get('timestamp')
        if timestamp is not None:
            try:
                TaniumTransformer.set_timestamp(timestamp)
            except Exception:
                logger.warning('config invalid "timestamp"')
                return TaskOutcome(mode + 'failure')
        # config optional performance
        modes = {
            'blocksize': self._config.getint('blocksize', 10000),
            'cpus_max': self._config.getint('cpus-max', 1),
            'cpus_min': self._config.getint('cpus-min', 1),
            'aggregate': self._config.getboolean('aggregate', True),
            'caching': self._config.getboolean('caching', True),
            'checking': self._config.getboolean('checking', False),
        }
        # insure output dir exists
        opth.mkdir(exist_ok=True, parents=True)
        # process
        for ifile in sorted(ipth.iterdir()):
            blob = self._read_file(ifile)
            tanium_transformer = TaniumTransformer()
            tanium_transformer.set_modes(modes)
            results = tanium_transformer.transform(blob)
            oname = ifile.stem + '.oscal' + '.json'
            ofile = opth / oname
            if not self._overwrite and pathlib.Path(ofile).exists():
                logger.warning(f'output: {ofile} already exists')
                return TaskOutcome(mode + 'failure')
            self._write_file(results, ofile)
            self._show_analysis(tanium_transformer)
        return TaskOutcome(mode + 'success')

    def _read_file(self, ifile: str) -> str:
        """Read raw input file."""
        if not self._simulate and self._verbose:
            logger.info(f'input: {ifile}')
        with open(ifile, 'r', encoding=const.FILE_ENCODING) as fp:
            blob = fp.read()
        return blob

    def _write_file(self, result: str, ofile: str) -> None:
        """Write oscal results file."""
        if not self._simulate:
            if self._verbose:
                logger.info(f'output: {ofile}')
            result.oscal_write(pathlib.Path(ofile))

    def _show_analysis(self, tanium_transformer: TaniumTransformer) -> None:
        """Show analysis."""
        if not self._simulate and self._verbose:
            analysis = tanium_transformer.analysis
            for line in analysis:
                logger.info(line)
name: str ¤
Methods¤
__init__(self, config_object) special ¤

Initialize trestle task tanium-result-to-oscal-ar.

Parameters:

Name Type Description Default
config_object Optional[configparser.SectionProxy]

Config section associated with the task.

required
Source code in trestle/tasks/tanium_result_to_oscal_ar.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Initialize trestle task tanium-result-to-oscal-ar.

    Args:
        config_object: Config section associated with the task.
    """
    super().__init__(config_object)
execute(self) ¤

Provide an actual outcome.

Source code in trestle/tasks/tanium_result_to_oscal_ar.py
def execute(self) -> TaskOutcome:
    """Provide an actual outcome."""
    self._simulate = False
    return self._transform()
print_info(self) ¤

Print the help string.

Source code in trestle/tasks/tanium_result_to_oscal_ar.py
def print_info(self) -> None:
    """Print the help string."""
    logger.info(f'Help information for {self.name} task.')
    logger.info('')
    logger.info(
        'Purpose: Transform Tanium files into Open Security Controls Assessment Language (OSCAL) results objects'
        + 'and serialize to a file.'
    )
    logger.info('')
    logger.info('Configuration flags sit under [task.tanium-result-to-oscal-ar]:')
    logger.info('  blocksize = (optional) the desired number Tanuim result input lines to process per CPU.')
    logger.info('  cpus-max  = (optional) the desired maximum number of CPUs to employ, default is 1.')
    logger.info('  cpus-min  = (optional) the desired minimum number of CPUs to employ.')
    logger.info('  aggregate = (optional) True indicates employ properties aggregation, default is True.')
    logger.info('  caching   = (optional) True indicates employ object caching, default is True.')
    logger.info(
        '  checking  = (optional) True indicates perform strict checking of OSCAL properties, default is False.'
    )
    logger.info('  input-dir = (required) the path of the input directory comprising Tanium results.')
    logger.info(
        '  output-dir = (required) the path of the output directory comprising synthesized OSCAL .json files.'
    )
    logger.info('  output-overwrite = (optional) true [default] or false; replace existing output when true.')
    logger.info(
        '  quiet = (optional) true or false [default]; display file creations and rules analysis when false.'
    )
    logger.info(
        '  timestamp = (optional) timestamp for the Observations in ISO 8601 format, such as '
        + '2021-01-04T00:05:23+04:00 for example; if not specified then value for "Timestamp" key in the Tanium '
        + 'result is used if present, otherwise current time is used.'
    )
    logger.info('')
    logger.info(
        'Operation: A transformation is performed on one or more Tanium input files to produce output in '
        + 'OSCAL partial results format.'
    )
simulate(self) ¤

Provide a simulated outcome.

Source code in trestle/tasks/tanium_result_to_oscal_ar.py
def simulate(self) -> TaskOutcome:
    """Provide a simulated outcome."""
    self._simulate = True
    return self._transform()

handler: python