Skip to content

trestle.tasks.xlsx_helper

trestle.tasks.xlsx_helper ¤

XLSX utilities.

Attributes¤

logger = logging.getLogger(__name__) module-attribute ¤

Classes¤

Column ¤

Spread sheet columns.

Source code in trestle/tasks/xlsx_helper.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Column():
    """Spread sheet columns."""

    control_id = 'ControlId'
    control_text = 'ControlText'
    goal_name_id = 'goal_name_id'
    goal_version = 'goal_version'
    rule_name_id = 'rule_name_id'
    rule_version = 'rule_version'
    nist_mappings = 'NIST Mappings'
    resource_title = 'ResourceTitle'
    parameter_opt_parm = 'Parameter [optional parameter]'
    values_alternatives = 'Values default , [alternatives]'
    filter_column = None

    tokens_nist_mappings = nist_mappings.split()
    tokens_parameter_opt_parm = parameter_opt_parm.split()
    rename_parameter_opt_parm = 'ParameterName'
    tokens_values_alternatives = values_alternatives.split()
    rename_values_alternatives = 'ParameterValue'

    help_list = []
    text1 = '                      '
    text2 = f'column "{control_id}" contains control ID.'
    help_list.append(text1 + text2)
    text2 = f'column "{control_text}" contains control text.'
    help_list.append(text1 + text2)
    text2 = f'columns "{nist_mappings}" contain NIST control mappings.'
    help_list.append(text1 + text2)
    text2 = f'column "{resource_title}" contains component name.'
    help_list.append(text1 + text2)
    text2 = f'column "{goal_name_id}" contains goal name.'
    help_list.append(text1 + text2)
    text2 = f'column "{goal_version}" contains goal version.'
    help_list.append(text1 + text2)
    text2 = f'column "{rule_name_id}" contains rule name.'
    help_list.append(text1 + text2)
    text2 = f'column "{rule_version}" contains rule version.'
    help_list.append(text1 + text2)
    text2 = f'column "{parameter_opt_parm}" contains parameter name + description, separated by newline.'
    help_list.append(text1 + text2)
    text2 = f'column "{values_alternatives}" contains parameter values.'
    help_list.append(text1 + text2)
Attributes¤
control_id = 'ControlId' class-attribute instance-attribute ¤
control_text = 'ControlText' class-attribute instance-attribute ¤
filter_column = None class-attribute instance-attribute ¤
goal_name_id = 'goal_name_id' class-attribute instance-attribute ¤
goal_version = 'goal_version' class-attribute instance-attribute ¤
help_list = [] class-attribute instance-attribute ¤
nist_mappings = 'NIST Mappings' class-attribute instance-attribute ¤
parameter_opt_parm = 'Parameter [optional parameter]' class-attribute instance-attribute ¤
rename_parameter_opt_parm = 'ParameterName' class-attribute instance-attribute ¤
rename_values_alternatives = 'ParameterValue' class-attribute instance-attribute ¤
resource_title = 'ResourceTitle' class-attribute instance-attribute ¤
rule_name_id = 'rule_name_id' class-attribute instance-attribute ¤
rule_version = 'rule_version' class-attribute instance-attribute ¤
text1 = ' ' class-attribute instance-attribute ¤
text2 = f'column "{values_alternatives}" contains parameter values.' class-attribute instance-attribute ¤
tokens_nist_mappings = nist_mappings.split() class-attribute instance-attribute ¤
tokens_parameter_opt_parm = parameter_opt_parm.split() class-attribute instance-attribute ¤
tokens_values_alternatives = values_alternatives.split() class-attribute instance-attribute ¤
values_alternatives = 'Values default , [alternatives]' class-attribute instance-attribute ¤

XlsxHelper ¤

Xlsx Helper common functions and assistance navigating spread sheet.

Source code in trestle/tasks/xlsx_helper.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
class XlsxHelper:
    """Xlsx Helper common functions and assistance navigating spread sheet."""

    by_goal = 'by-goal'
    by_rule = 'by-rule'
    by_control = 'by-control'
    by_check = 'by-check'

    profile_types = [by_goal, by_rule, by_control, by_check]

    def __init__(self) -> None:
        """Initialize."""
        self._column = Column()

    def print_info(self, name, oscal_name) -> None:
        """Print the help string."""
        logger.info(f'Help information for {name} task.')
        logger.info('')
        logger.info(f'Purpose: From spread sheet and catalog produce OSCAL {oscal_name} file.')
        logger.info('')
        logger.info(f'Configuration flags sit under [task.{name}]:')
        if oscal_name == 'component_definition':
            text1 = '  catalog-file      = '
            text2 = '(required) the path of the OSCAL catalog file.'
            logger.info(text1 + text2)
        text1 = '  spread-sheet-file = '
        text2 = '(required) the path of the spread sheet file.'
        logger.info(text1 + text2)
        text1 = '  work-sheet-name   = '
        text2 = '(required) the name of the work sheet in the spread sheet file.'
        logger.info(text1 + text2)
        for line in self._column.help_list:
            logger.info(line)
        text1 = '  output-dir        = '
        text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
        logger.info(text1 + text2)
        text1 = '  output-overwrite  = '
        text2 = '(optional) true [default] or false; replace existing output when true.'
        logger.info(text1 + text2)
        text1 = '  filter-column     = '
        text2 = '(optional) column heading of yes/no values; process only "yes" rows.'
        logger.info(text1 + text2)
        text1 = '  profile-type      = '
        text2 = f'(optional) one of {self.profile_types}'
        logger.info(text1 + text2)

    @property
    def profile_type(self) -> str:
        """Profile type."""
        return self._profile_type

    def configure(self, task: TaskBase) -> bool:
        """Configure."""
        if not task._config:
            logger.warning('config missing')
            return False
        # config verbosity
        quiet = task._config.get('quiet', False)
        task._verbose = not quiet
        # required for component-definition
        if not self.configure_cd(task):
            return False
        # required for profile
        if not self.configure_profile(task):
            return False
        # optional
        self._column.filter_column = task._config.get('filter-column', None)
        # config spread sheet
        spread_sheet = task._config.get('spread-sheet-file')
        if spread_sheet is None:
            logger.warning('config missing "spread-sheet"')
            return False
        if not pathlib.Path(spread_sheet).exists():
            logger.warning('"spread-sheet" not found')
            return False
        sheet_name = task._config.get('work-sheet-name')
        if sheet_name is None:
            logger.warning('config missing "work-sheet-name"')
            return False
        # announce spreadsheet
        if task._verbose:
            logger.info(f'input: {spread_sheet}')
        # get profile type
        if task.name == 'xlsx-to-oscal-profile':
            self._profile_type = task._config.get('profile-type', self.profile_types[0])
            if self._profile_type not in self.profile_types:
                logger.warning(f'invalid "profile-type" {self._profile_type} ')
                return False
        else:
            self._profile_type = None
        # load spread sheet
        self.load(spread_sheet, sheet_name)
        return True

    def configure_cd(self, task: TaskBase) -> bool:
        """Configure cd."""
        if task.name == 'xlsx-to-oscal-cd':
            catalog_file = task._config.get('catalog-file')
            if catalog_file is None:
                logger.warning('config missing "catalog-file"')
                return False
            try:
                catalog = Catalog.oscal_read(pathlib.Path(catalog_file))
                logger.debug(f'catalog: {catalog_file}')
            except Exception as e:  # pragma: no cover
                raise TrestleError(f'Error loading catalog {catalog_file}: {e}')
            task.catalog_interface = CatalogInterface(catalog)
        return True

    def configure_profile(self, task: TaskBase) -> bool:
        """Configure profile."""
        if task.name == 'xlsx-to-oscal-profile':
            profile_title = task._config.get('profile-title')
            if profile_title is None:
                logger.warning('config missing "profile-title"')
                return False
            spread_sheet_url = task._config.get('spread-sheet-url')
            if spread_sheet_url is None:
                logger.warning('config missing "spread-sheet-url"')
                return False
        return True

    def load(self, spread_sheet: str, sheet_name: str) -> None:
        """Load."""
        self._spread_sheet = spread_sheet
        self._sheet_name = sheet_name
        self._wb = load_workbook(self._spread_sheet)
        self._work_sheet = self._wb[self._sheet_name]
        self._map_name_to_letters = {}
        # accumulators
        self.rows_missing_control_id = []
        self.rows_missing_goal_name_id = []
        self.rows_invalid_goal_name_id = []
        self.rows_missing_rule_name_id = []
        self.rows_invalid_rule_name_id = []
        self.rows_invalid_parameter_name = []
        self.rows_missing_controls = []
        self.rows_missing_parameters = []
        self.rows_missing_parameters_values = []
        self.rows_filtered = []
        # map columns
        self._map_columns()

    def row_generator(self) -> Iterator[int]:
        """Generate rows until control_id is None."""
        row = 1
        rows_skipped_consecutive = 0
        # assume no more data when 100 consecutve rows no control id
        rows_skipped_consecutive_limit = 100
        while True:
            row = row + 1
            control_id = self._get_control_id(row)
            goal_id = self.get_goal_name_id(row)
            if control_id is None and goal_id is None:
                rows_skipped_consecutive += 1
                if rows_skipped_consecutive < rows_skipped_consecutive_limit:
                    continue
                logger.debug(f'break: {row} {rows_skipped_consecutive}')
                break
            if control_id is None:
                self._add_row(row, self.rows_missing_control_id)
                continue
            if goal_id is None:
                self._add_row(row, self.rows_missing_goal_name_id)
                continue
            if self._is_filtered(row):
                continue
            yield row
            rows_skipped_consecutive = 0

    def _is_filtered(self, row) -> bool:
        """Return True if row is to be skipped."""
        if self._column.filter_column is None:
            return False
        col = self._get_column_letter(self._column.filter_column)
        value = self._work_sheet[col + str(row)].value
        if value is None:
            return False
        if value.lower() != 'yes':
            return False
        self._add_row(row, self.rows_filtered)
        return True

    def get_goal_name_id(self, row: int, strict: bool = True) -> str:
        """Get goal_name_id from work_sheet."""
        col = self._get_column_letter(self._column.goal_name_id)
        value = self._work_sheet[col + str(row)].value
        if value is None:
            self._add_row(row, self.rows_missing_goal_name_id)
        else:
            value = str(value).strip()
            if strict:
                svalue = str(value).strip()
                value = ''.join(str(svalue).split())
                if value != svalue:
                    self._add_row(row, self.rows_invalid_goal_name_id)
        return value

    def get_check_name_id(self, row: int, strict: bool = False) -> str:
        """Get check_name_id from work_sheet."""
        return self.get_goal_name_id(row, strict)

    def get_rule_name_id(self, row: int, strict: bool = False) -> str:
        """Get rule_name_id from work_sheet."""
        col = self._get_column_letter(self._column.rule_name_id)
        value = self._work_sheet[col + str(row)].value
        if value is None:
            self._add_row(row, self.rows_missing_rule_name_id)
        else:
            value = str(value).strip()
            if strict:
                svalue = str(value).strip()
                value = ''.join(str(svalue).split())
                if value != svalue:
                    self._add_row(row, self.rows_invalid_rule_name_id)
        return value

    def get_parameter_usage(self, row: int) -> str:
        """Get parameter_usage from work_sheet."""
        return self.get_goal_remarks(row)

    def get_parameter_value_default(self, row: int) -> str:
        """Get parameter_value_default from work_sheet."""
        col = self._get_column_letter(self._column.rename_values_alternatives)
        value = self._work_sheet[col + str(row)].value
        if value is not None:
            value = str(value).split(',')[0].strip()
        return value

    def get_parameter_values(self, row: int) -> str:
        """Get parameter_values from work_sheet."""
        col = self._get_column_letter(self._column.rename_values_alternatives)
        value = self._work_sheet[col + str(row)].value
        if value is None and self.get_parameter_name(row) is not None:
            self._add_row(row, self.rows_missing_parameters_values)
        # massage into comma separated list of values
        else:
            value = str(value).strip().replace(' ', '')
            value = value.replace(',[]', '')
            value = value.replace('[', '')
            value = value.replace(']', '')
            value = value.split(',')
        return value

    def _get_goal_text(self, row: int) -> str:
        """Get goal_text from work_sheet."""
        col = self._get_column_letter(self._column.control_text)
        goal_text = self._work_sheet[col + str(row)].value
        # normalize & tokenize
        value = goal_text.replace('\t', ' ')
        return value

    def _get_goal_text_tokens(self, row: int) -> List[str]:
        """Get goal_text tokens from work_sheet."""
        goal_text = self._get_goal_text(row)
        tokens = goal_text.split()
        return tokens

    def get_goal_remarks(self, row: int) -> str:
        """Get goal_remarks from work_sheet."""
        tokens = self._get_goal_text_tokens(row)
        # replace "Check whether" with "Ensure", if present
        if tokens:
            if tokens[0] == 'Check':
                if len(tokens) > 1:
                    if tokens[1] == 'whether':
                        tokens.pop(0)
                tokens[0] = 'Ensure'
        value = ' '.join(tokens)
        return value

    def get_controls(self, row: int) -> Dict[str, List[str]]:
        """Produce dict of controls mapped to statements.

        Example: {'au-2': ['(a)', '(d)'], 'au-12': [], 'si-4': ['(a)', '(b)', '(c)']}
        """
        value = {}
        for col in self._get_column_letter(self._column.nist_mappings):
            control = self._work_sheet[col + str(row)].value
            if control is None:
                continue
            # remove blanks
            control = ''.join(control.split())
            if len(control) < 1 or control.lower() == 'none':
                continue
            # remove rhs of : inclusive
            if ':' in control:
                control = control.split(':')[0]
            # remove alphabet parts of control & accumulate in statements
            control, statements = self._normalize_control(control)
            # skip bogus control made up if dashes only
            if len(control.replace('-', '')) == 0:
                continue
            if control not in value.keys():
                value[control] = statements
        if len(value.keys()) == 0:
            self._add_row(row, self.rows_missing_controls)
        logger.debug(f'row: {row} controls {value}')
        return value

    def get_component_name(self, row: int) -> str:
        """Get component_name from work_sheet."""
        col = self._get_column_letter(self._column.resource_title)
        value = self._work_sheet[col + str(row)].value
        if value is None:
            raise RuntimeError(f'row {row} col {col} missing component name')
        return value.strip()

    def get_parameter_name(self, row: int) -> Tuple[str, str]:
        """Get parameter_name from work_sheet."""
        return self.get_parameter_name_and_description(row)[0]

    def get_parameter_name_and_description(self, row: int) -> Tuple[str, str]:
        """Get parameter_name and description from work_sheet."""
        name = None
        description = None
        col = self._get_column_letter(self._column.rename_parameter_opt_parm)
        combined_values = self._work_sheet[col + str(row)].value
        if combined_values is not None:
            if '\n' in combined_values:
                parameter_parts = combined_values.split('\n')
            elif ' ' in combined_values:
                parameter_parts = combined_values.split(' ', 1)
            else:
                parameter_parts = combined_values
            if len(parameter_parts) == 2:
                name = parameter_parts[1].strip()
                description = parameter_parts[0].strip()
                sname = str(name).strip()
                name = sname.replace(' ', '_')
                if name != sname:
                    self._add_row(row, self.rows_invalid_parameter_name)
            else:
                logger.info(f'row {row} col {col} invalid value')
        if name is None and self.get_parameter_value_default(row) is not None:
            self._add_row(row, self.rows_missing_parameters)
        value = name, description
        return value

    def _get_control_id(self, row: int) -> int:
        """Get control_id from work_sheet."""
        col = self._get_column_letter(self._column.control_id)
        value = self._work_sheet[col + str(row)].value
        return value

    def _get_column_letter(self, name: str) -> str:
        """Get column letter."""
        value = self.map_name_to_letters[name]
        if len(value) == 1:
            value = value[0]
        return value

    def _map_columns(self) -> None:
        """Map columns."""
        self.map_name_to_letters = {}
        columns = self._work_sheet.max_column
        for column in range(1, columns + 1):
            cell_value = self._cell_value(1, column)
            if cell_value is None:
                continue
            cell_tokens = cell_value.split()
            normalized_cell_value = ' '.join(cell_tokens)
            # find columns of interest
            if self._column.control_id in cell_tokens:
                self._add_column(self._column.control_id, column, 1)
            elif self._column.control_text in cell_tokens:
                self._add_column(self._column.control_text, column, 1)
            elif self._column.goal_name_id in cell_tokens:
                self._add_column(self._column.goal_name_id, column, 1)
            elif self._column.goal_version in cell_tokens:
                self._add_column(self._column.goal_version, column, 1)
            elif self._column.rule_name_id in cell_tokens:
                self._add_column(self._column.rule_name_id, column, 1)
            elif self._column.rule_version in cell_tokens:
                self._add_column(self._column.rule_version, column, 1)
            # parameters and alternatives (exact tokens match)
            elif cell_tokens == self._column.tokens_parameter_opt_parm:
                self._add_column(self._column.rename_parameter_opt_parm, column, 1)
            elif cell_tokens == self._column.tokens_values_alternatives:
                self._add_column(self._column.rename_values_alternatives, column, 1)
            # filter column (exact string match)
            elif self._column.filter_column == normalized_cell_value:
                self._add_column(self._column.filter_column, column, 1)
            # nist mappings and resource title (multiple columns match)
            elif is_ordered_sublist(self._column.tokens_nist_mappings, cell_tokens):
                self._add_column(self._column.nist_mappings, column, 0)
            elif self._column.resource_title in cell_tokens:
                self._add_column(self._column.resource_title, column, 0)
        # insure expected columns found
        for name in [self._column.control_id,
                     self._column.control_text,
                     self._column.rule_name_id,
                     self._column.rule_version,
                     self._column.goal_name_id,
                     self._column.goal_version,
                     self._column.nist_mappings,
                     self._column.resource_title,
                     self._column.rename_parameter_opt_parm,
                     self._column.rename_values_alternatives]:
            if name not in self.map_name_to_letters.keys():
                raise RuntimeError(f'missing column {name}')

    def _add_column(self, name: str, column: int, limit: int) -> None:
        """Add column."""
        if name not in self.map_name_to_letters:
            self.map_name_to_letters[name] = []
        if limit > 0 and len(self.map_name_to_letters[name]) == limit:
            raise RuntimeError(f'duplicate column {name} {get_column_letter(column)}')
        self.map_name_to_letters[name].append(get_column_letter(column))

    def _cell_value(self, row: int, col: int) -> Any:
        """Get value for cell, adjusting for merged cells."""
        cell = self._work_sheet.cell(row, col)
        retval = cell.value
        if isinstance(cell, MergedCell):
            # cell is merged
            for mc_range in self._work_sheet.merged_cells.ranges:
                coord = get_column_letter(col) + str(row)
                if coord in mc_range:
                    retval = mc_range.start_cell.value
        return retval

    def _normalize_control(self, control: str) -> Tuple[str, List[str]]:
        """Remove parenthesized characters from controls."""
        statements = []
        for i in string.ascii_lowercase:
            needle = '(' + i + ')'
            if needle in control:
                statements.append(needle)
                control = control.replace(needle, '')
        control = control.lower()
        return control, statements

    def _add_row(self, row: int, account: List[int]) -> None:
        """Add row to accounting list of rows."""
        if row not in account:
            account.append(row)

    def report_issues(self) -> None:
        """Report issues."""
        if self.rows_missing_control_id:
            logger.info(f'rows missing control_id: {self.rows_missing_control_id}')
        if self.rows_invalid_goal_name_id:
            logger.info(f'rows invalid goal_name_id: {self.rows_invalid_goal_name_id}')
        if self.rows_missing_rule_name_id:
            logger.info(f'rows missing rule_name_id: {self.rows_missing_rule_name_id}')
        if self.rows_invalid_rule_name_id:
            logger.info(f'rows invalid rule_name_id: {self.rows_invalid_rule_name_id}')
        if self.rows_invalid_parameter_name:
            logger.info(f'rows invalid parameter_name: {self.rows_invalid_parameter_name}')
        if self.rows_missing_controls:
            logger.info(f'rows missing controls: {self.rows_missing_controls}')
        if self.rows_missing_parameters:
            logger.info(f'rows missing parameters: {self.rows_missing_parameters}')
        if self.rows_missing_parameters_values:
            logger.info(f'rows missing parameters values: {self.rows_missing_parameters_values}')
        if self.rows_filtered:
            logger.info(f'rows filtered: {self.rows_filtered}')
Attributes¤
by_check = 'by-check' class-attribute instance-attribute ¤
by_control = 'by-control' class-attribute instance-attribute ¤
by_goal = 'by-goal' class-attribute instance-attribute ¤
by_rule = 'by-rule' class-attribute instance-attribute ¤
profile_type property ¤

Profile type.

profile_types = [by_goal, by_rule, by_control, by_check] class-attribute instance-attribute ¤
Functions¤
__init__() ¤

Initialize.

Source code in trestle/tasks/xlsx_helper.py
97
98
99
def __init__(self) -> None:
    """Initialize."""
    self._column = Column()
configure(task) ¤

Configure.

Source code in trestle/tasks/xlsx_helper.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def configure(self, task: TaskBase) -> bool:
    """Configure."""
    if not task._config:
        logger.warning('config missing')
        return False
    # config verbosity
    quiet = task._config.get('quiet', False)
    task._verbose = not quiet
    # required for component-definition
    if not self.configure_cd(task):
        return False
    # required for profile
    if not self.configure_profile(task):
        return False
    # optional
    self._column.filter_column = task._config.get('filter-column', None)
    # config spread sheet
    spread_sheet = task._config.get('spread-sheet-file')
    if spread_sheet is None:
        logger.warning('config missing "spread-sheet"')
        return False
    if not pathlib.Path(spread_sheet).exists():
        logger.warning('"spread-sheet" not found')
        return False
    sheet_name = task._config.get('work-sheet-name')
    if sheet_name is None:
        logger.warning('config missing "work-sheet-name"')
        return False
    # announce spreadsheet
    if task._verbose:
        logger.info(f'input: {spread_sheet}')
    # get profile type
    if task.name == 'xlsx-to-oscal-profile':
        self._profile_type = task._config.get('profile-type', self.profile_types[0])
        if self._profile_type not in self.profile_types:
            logger.warning(f'invalid "profile-type" {self._profile_type} ')
            return False
    else:
        self._profile_type = None
    # load spread sheet
    self.load(spread_sheet, sheet_name)
    return True
configure_cd(task) ¤

Configure cd.

Source code in trestle/tasks/xlsx_helper.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def configure_cd(self, task: TaskBase) -> bool:
    """Configure cd."""
    if task.name == 'xlsx-to-oscal-cd':
        catalog_file = task._config.get('catalog-file')
        if catalog_file is None:
            logger.warning('config missing "catalog-file"')
            return False
        try:
            catalog = Catalog.oscal_read(pathlib.Path(catalog_file))
            logger.debug(f'catalog: {catalog_file}')
        except Exception as e:  # pragma: no cover
            raise TrestleError(f'Error loading catalog {catalog_file}: {e}')
        task.catalog_interface = CatalogInterface(catalog)
    return True
configure_profile(task) ¤

Configure profile.

Source code in trestle/tasks/xlsx_helper.py
196
197
198
199
200
201
202
203
204
205
206
207
def configure_profile(self, task: TaskBase) -> bool:
    """Configure profile."""
    if task.name == 'xlsx-to-oscal-profile':
        profile_title = task._config.get('profile-title')
        if profile_title is None:
            logger.warning('config missing "profile-title"')
            return False
        spread_sheet_url = task._config.get('spread-sheet-url')
        if spread_sheet_url is None:
            logger.warning('config missing "spread-sheet-url"')
            return False
    return True
get_check_name_id(row, strict=False) ¤

Get check_name_id from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
285
286
287
def get_check_name_id(self, row: int, strict: bool = False) -> str:
    """Get check_name_id from work_sheet."""
    return self.get_goal_name_id(row, strict)
get_component_name(row) ¤

Get component_name from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
387
388
389
390
391
392
393
def get_component_name(self, row: int) -> str:
    """Get component_name from work_sheet."""
    col = self._get_column_letter(self._column.resource_title)
    value = self._work_sheet[col + str(row)].value
    if value is None:
        raise RuntimeError(f'row {row} col {col} missing component name')
    return value.strip()
get_controls(row) ¤

Produce dict of controls mapped to statements.

Example: {'au-2': ['(a)', '(d)'], 'au-12': [], 'si-4': ['(a)', '(b)', '(c)']}

Source code in trestle/tasks/xlsx_helper.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def get_controls(self, row: int) -> Dict[str, List[str]]:
    """Produce dict of controls mapped to statements.

    Example: {'au-2': ['(a)', '(d)'], 'au-12': [], 'si-4': ['(a)', '(b)', '(c)']}
    """
    value = {}
    for col in self._get_column_letter(self._column.nist_mappings):
        control = self._work_sheet[col + str(row)].value
        if control is None:
            continue
        # remove blanks
        control = ''.join(control.split())
        if len(control) < 1 or control.lower() == 'none':
            continue
        # remove rhs of : inclusive
        if ':' in control:
            control = control.split(':')[0]
        # remove alphabet parts of control & accumulate in statements
        control, statements = self._normalize_control(control)
        # skip bogus control made up if dashes only
        if len(control.replace('-', '')) == 0:
            continue
        if control not in value.keys():
            value[control] = statements
    if len(value.keys()) == 0:
        self._add_row(row, self.rows_missing_controls)
    logger.debug(f'row: {row} controls {value}')
    return value
get_goal_name_id(row, strict=True) ¤

Get goal_name_id from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def get_goal_name_id(self, row: int, strict: bool = True) -> str:
    """Get goal_name_id from work_sheet."""
    col = self._get_column_letter(self._column.goal_name_id)
    value = self._work_sheet[col + str(row)].value
    if value is None:
        self._add_row(row, self.rows_missing_goal_name_id)
    else:
        value = str(value).strip()
        if strict:
            svalue = str(value).strip()
            value = ''.join(str(svalue).split())
            if value != svalue:
                self._add_row(row, self.rows_invalid_goal_name_id)
    return value
get_goal_remarks(row) ¤

Get goal_remarks from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
345
346
347
348
349
350
351
352
353
354
355
356
def get_goal_remarks(self, row: int) -> str:
    """Get goal_remarks from work_sheet."""
    tokens = self._get_goal_text_tokens(row)
    # replace "Check whether" with "Ensure", if present
    if tokens:
        if tokens[0] == 'Check':
            if len(tokens) > 1:
                if tokens[1] == 'whether':
                    tokens.pop(0)
            tokens[0] = 'Ensure'
    value = ' '.join(tokens)
    return value
get_parameter_name(row) ¤

Get parameter_name from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
395
396
397
def get_parameter_name(self, row: int) -> Tuple[str, str]:
    """Get parameter_name from work_sheet."""
    return self.get_parameter_name_and_description(row)[0]
get_parameter_name_and_description(row) ¤

Get parameter_name and description from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def get_parameter_name_and_description(self, row: int) -> Tuple[str, str]:
    """Get parameter_name and description from work_sheet."""
    name = None
    description = None
    col = self._get_column_letter(self._column.rename_parameter_opt_parm)
    combined_values = self._work_sheet[col + str(row)].value
    if combined_values is not None:
        if '\n' in combined_values:
            parameter_parts = combined_values.split('\n')
        elif ' ' in combined_values:
            parameter_parts = combined_values.split(' ', 1)
        else:
            parameter_parts = combined_values
        if len(parameter_parts) == 2:
            name = parameter_parts[1].strip()
            description = parameter_parts[0].strip()
            sname = str(name).strip()
            name = sname.replace(' ', '_')
            if name != sname:
                self._add_row(row, self.rows_invalid_parameter_name)
        else:
            logger.info(f'row {row} col {col} invalid value')
    if name is None and self.get_parameter_value_default(row) is not None:
        self._add_row(row, self.rows_missing_parameters)
    value = name, description
    return value
get_parameter_usage(row) ¤

Get parameter_usage from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
304
305
306
def get_parameter_usage(self, row: int) -> str:
    """Get parameter_usage from work_sheet."""
    return self.get_goal_remarks(row)
get_parameter_value_default(row) ¤

Get parameter_value_default from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
308
309
310
311
312
313
314
def get_parameter_value_default(self, row: int) -> str:
    """Get parameter_value_default from work_sheet."""
    col = self._get_column_letter(self._column.rename_values_alternatives)
    value = self._work_sheet[col + str(row)].value
    if value is not None:
        value = str(value).split(',')[0].strip()
    return value
get_parameter_values(row) ¤

Get parameter_values from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def get_parameter_values(self, row: int) -> str:
    """Get parameter_values from work_sheet."""
    col = self._get_column_letter(self._column.rename_values_alternatives)
    value = self._work_sheet[col + str(row)].value
    if value is None and self.get_parameter_name(row) is not None:
        self._add_row(row, self.rows_missing_parameters_values)
    # massage into comma separated list of values
    else:
        value = str(value).strip().replace(' ', '')
        value = value.replace(',[]', '')
        value = value.replace('[', '')
        value = value.replace(']', '')
        value = value.split(',')
    return value
get_rule_name_id(row, strict=False) ¤

Get rule_name_id from work_sheet.

Source code in trestle/tasks/xlsx_helper.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def get_rule_name_id(self, row: int, strict: bool = False) -> str:
    """Get rule_name_id from work_sheet."""
    col = self._get_column_letter(self._column.rule_name_id)
    value = self._work_sheet[col + str(row)].value
    if value is None:
        self._add_row(row, self.rows_missing_rule_name_id)
    else:
        value = str(value).strip()
        if strict:
            svalue = str(value).strip()
            value = ''.join(str(svalue).split())
            if value != svalue:
                self._add_row(row, self.rows_invalid_rule_name_id)
    return value
load(spread_sheet, sheet_name) ¤

Load.

Source code in trestle/tasks/xlsx_helper.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def load(self, spread_sheet: str, sheet_name: str) -> None:
    """Load."""
    self._spread_sheet = spread_sheet
    self._sheet_name = sheet_name
    self._wb = load_workbook(self._spread_sheet)
    self._work_sheet = self._wb[self._sheet_name]
    self._map_name_to_letters = {}
    # accumulators
    self.rows_missing_control_id = []
    self.rows_missing_goal_name_id = []
    self.rows_invalid_goal_name_id = []
    self.rows_missing_rule_name_id = []
    self.rows_invalid_rule_name_id = []
    self.rows_invalid_parameter_name = []
    self.rows_missing_controls = []
    self.rows_missing_parameters = []
    self.rows_missing_parameters_values = []
    self.rows_filtered = []
    # map columns
    self._map_columns()
print_info(name, oscal_name) ¤

Print the help string.

Source code in trestle/tasks/xlsx_helper.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def print_info(self, name, oscal_name) -> None:
    """Print the help string."""
    logger.info(f'Help information for {name} task.')
    logger.info('')
    logger.info(f'Purpose: From spread sheet and catalog produce OSCAL {oscal_name} file.')
    logger.info('')
    logger.info(f'Configuration flags sit under [task.{name}]:')
    if oscal_name == 'component_definition':
        text1 = '  catalog-file      = '
        text2 = '(required) the path of the OSCAL catalog file.'
        logger.info(text1 + text2)
    text1 = '  spread-sheet-file = '
    text2 = '(required) the path of the spread sheet file.'
    logger.info(text1 + text2)
    text1 = '  work-sheet-name   = '
    text2 = '(required) the name of the work sheet in the spread sheet file.'
    logger.info(text1 + text2)
    for line in self._column.help_list:
        logger.info(line)
    text1 = '  output-dir        = '
    text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
    logger.info(text1 + text2)
    text1 = '  output-overwrite  = '
    text2 = '(optional) true [default] or false; replace existing output when true.'
    logger.info(text1 + text2)
    text1 = '  filter-column     = '
    text2 = '(optional) column heading of yes/no values; process only "yes" rows.'
    logger.info(text1 + text2)
    text1 = '  profile-type      = '
    text2 = f'(optional) one of {self.profile_types}'
    logger.info(text1 + text2)
report_issues() ¤

Report issues.

Source code in trestle/tasks/xlsx_helper.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
def report_issues(self) -> None:
    """Report issues."""
    if self.rows_missing_control_id:
        logger.info(f'rows missing control_id: {self.rows_missing_control_id}')
    if self.rows_invalid_goal_name_id:
        logger.info(f'rows invalid goal_name_id: {self.rows_invalid_goal_name_id}')
    if self.rows_missing_rule_name_id:
        logger.info(f'rows missing rule_name_id: {self.rows_missing_rule_name_id}')
    if self.rows_invalid_rule_name_id:
        logger.info(f'rows invalid rule_name_id: {self.rows_invalid_rule_name_id}')
    if self.rows_invalid_parameter_name:
        logger.info(f'rows invalid parameter_name: {self.rows_invalid_parameter_name}')
    if self.rows_missing_controls:
        logger.info(f'rows missing controls: {self.rows_missing_controls}')
    if self.rows_missing_parameters:
        logger.info(f'rows missing parameters: {self.rows_missing_parameters}')
    if self.rows_missing_parameters_values:
        logger.info(f'rows missing parameters values: {self.rows_missing_parameters_values}')
    if self.rows_filtered:
        logger.info(f'rows filtered: {self.rows_filtered}')
row_generator() ¤

Generate rows until control_id is None.

Source code in trestle/tasks/xlsx_helper.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def row_generator(self) -> Iterator[int]:
    """Generate rows until control_id is None."""
    row = 1
    rows_skipped_consecutive = 0
    # assume no more data when 100 consecutve rows no control id
    rows_skipped_consecutive_limit = 100
    while True:
        row = row + 1
        control_id = self._get_control_id(row)
        goal_id = self.get_goal_name_id(row)
        if control_id is None and goal_id is None:
            rows_skipped_consecutive += 1
            if rows_skipped_consecutive < rows_skipped_consecutive_limit:
                continue
            logger.debug(f'break: {row} {rows_skipped_consecutive}')
            break
        if control_id is None:
            self._add_row(row, self.rows_missing_control_id)
            continue
        if goal_id is None:
            self._add_row(row, self.rows_missing_goal_name_id)
            continue
        if self._is_filtered(row):
            continue
        yield row
        rows_skipped_consecutive = 0

Functions¤

get_trestle_version() ¤

Get trestle version wrapper.

Source code in trestle/tasks/xlsx_helper.py
37
38
39
def get_trestle_version() -> str:
    """Get trestle version wrapper."""
    return __version__

handler: python