Skip to content

pipeline

trestle.core.pipeline ¤

Abstract base class for pipelines and filters.

Classes¤

Pipeline ¤

Pipeline base class.

Source code in trestle/core/pipeline.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Pipeline():
    """Pipeline base class."""

    class Filter(ABC):
        """Filter class used by pipeline."""

        @abstractclassmethod
        def process(self, input_: Any) -> Any:
            """Process the input to output."""
            return input_

    def __init__(self, filters: List[Filter]) -> None:
        """Initialize the class."""
        self._filters = filters

    def process(self, input_: Any) -> Any:
        """Process the filter pipeline."""
        for filter_ in self._filters:
            input_ = filter_.process(input_)
        return input_
Classes¤
Filter ¤

Bases: ABC

Filter class used by pipeline.

Source code in trestle/core/pipeline.py
23
24
25
26
27
28
29
class Filter(ABC):
    """Filter class used by pipeline."""

    @abstractclassmethod
    def process(self, input_: Any) -> Any:
        """Process the input to output."""
        return input_
Functions¤
process(input_) ¤

Process the input to output.

Source code in trestle/core/pipeline.py
26
27
28
29
@abstractclassmethod
def process(self, input_: Any) -> Any:
    """Process the input to output."""
    return input_
Functions¤
__init__(filters) ¤

Initialize the class.

Source code in trestle/core/pipeline.py
31
32
33
def __init__(self, filters: List[Filter]) -> None:
    """Initialize the class."""
    self._filters = filters
process(input_) ¤

Process the filter pipeline.

Source code in trestle/core/pipeline.py
35
36
37
38
39
def process(self, input_: Any) -> Any:
    """Process the filter pipeline."""
    for filter_ in self._filters:
        input_ = filter_.process(input_)
    return input_

handler: python