Stream Layer

The stream layer provides classes for creating custom stream-processing pipelines. Each pipeline consists of: a source, which injects items into the pipeline for processing; processors, which take items and process them; and a sink, which consumes the items coming out of the pipeline.

There are three base classes representing sources, processors and sinks (respectively StreamSource, StreamProcessor and StreamSink). Subclass these to provide customised stream-processing functionality. The Pipeline class gathers the individual stream elements into a whole, connects each element to the previous and next elements, and ensures that the elements follow the pipeline's calling semantics (see below).

Note that although the stream-processing elements are typed on the items they consume and produce, these types serve only static checking (linting) and are not enforced at run time.
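The sketch below illustrates what that means in practice, using typing.Generic to parameterise a processor stand-in. TypedProcessor and Stringify are hypothetical names, not this library's classes, and the simplified process_element signature is an assumption. A static checker such as mypy would reject the float argument, but Python runs the call without complaint.

```python
from typing import Callable, Generic, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


class TypedProcessor(Generic[In, Out]):
    """Hypothetical stand-in typed on consumed (In) and produced (Out) items."""

    def process_element(self, element: In, then: Callable[[Out], None]) -> None:
        raise NotImplementedError


class Stringify(TypedProcessor[int, str]):
    def process_element(self, element: int, then: Callable[[str], None]) -> None:
        then(str(element))


out: list = []
# A static checker rejects this call (2.5 is not an int), but nothing
# checks the annotation at run time, so it executes normally.
Stringify().process_element(2.5, out.append)
print(out)  # ['2.5']
```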

Calling semantics

For sources and processors, the ability to pass items on to later elements in the pipeline is provided through two callback functions, then and done: then forwards an item to the next element of the pipeline, and done signals that no more items will be forwarded. This design was chosen over alternatives such as iterators because, when an error occurs, the calls that led to it remain on the call stack, which is a useful aid for debugging.

However, this design also makes it possible to call the callbacks in an arbitrary order, when only certain orders are sensible. The intended calling semantics are: zero or more calls to then to forward items, followed by exactly one call to done to signal termination.

The Pipeline class constructs these callbacks so that an error occurs if they are not called in the correct order. Specifically, calling then after done has been called raises an error, as does never calling done at all. Calling done more than once is allowed, however: done is idempotent, so calling it multiple times has the same effect as calling it once.
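A minimal sketch of how such guarded callbacks might be built with plain Python closures. guard_callbacks and CallingSemanticsError are hypothetical names, not this library's API, and the extra check_finished function stands in for whatever the pipeline does to detect a missing done call once an element returns.

```python
from typing import Any, Callable, Tuple

Then = Callable[[Any], None]
Done = Callable[[], None]


class CallingSemanticsError(RuntimeError):
    """Hypothetical error raised when then/done are called out of order."""


def guard_callbacks(then: Then, done: Done) -> Tuple[Then, Done, Done]:
    """Wrap raw callbacks to enforce: 0+ calls to then, then one effective done."""
    finished = False

    def guarded_then(item: Any) -> None:
        if finished:
            raise CallingSemanticsError("then() called after done()")
        then(item)

    def guarded_done() -> None:
        nonlocal finished
        if not finished:  # idempotent: repeated calls are ignored
            finished = True
            done()

    def check_finished() -> None:
        # Run after the element returns: a missing done() is an error.
        if not finished:
            raise CallingSemanticsError("done() was never called")

    return guarded_then, guarded_done, check_finished


received = []
then, done, check = guard_callbacks(received.append, lambda: received.append("<done>"))
then(1)
then(2)
done()
done()  # second call has no effect
check()
print(received)  # [1, 2, '<done>']
```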

StreamSource

Stream sources should implement the produce method to inject items into the pipeline. The method should call the then callback once for each item, passing the item as its argument, and, once all items have been produced, call the done callback.
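A sketch of a custom source, assuming produce receives the then and done callbacks as its two arguments. StreamSource here is a minimal stand-in so the example runs on its own; the real base class may have a different signature, and RangeSource is hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable

Then = Callable[[Any], None]
Done = Callable[[], None]


class StreamSource(ABC):
    """Minimal stand-in for the library's StreamSource (assumed signature)."""

    @abstractmethod
    def produce(self, then: Then, done: Done) -> None: ...


class RangeSource(StreamSource):
    """Hypothetical source that injects the integers 0 .. n-1."""

    def __init__(self, n: int) -> None:
        self.n = n

    def produce(self, then: Then, done: Done) -> None:
        for i in range(self.n):
            then(i)  # forward each item to the next element
        done()  # signal that no more items will be produced


items = []
RangeSource(3).produce(items.append, lambda: items.append("<done>"))
print(items)  # [0, 1, 2, '<done>']
```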

StreamProcessor

Stream processors should implement the process_element method, which receives items from the previous element in the pipeline for processing. They should also implement the finish method to finalise processing after all items have been received.

Both methods take the then and done callbacks as arguments, so processors can process items and forward the processed results as they arrive, or batch them and perform processing/forwarding in bulk.

An optional start method is available, which can be overridden to perform any set-up required before the processor starts receiving items.
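The sketch below shows both styles described above: Doubler forwards each result as soon as it arrives, while Sorter uses start to create a buffer, collects items in process_element, and forwards them in bulk from finish. It assumes process_element(element, then, done) and finish(then, done) signatures and uses a minimal stand-in base class; apart from the method names, all names are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, List

Then = Callable[[Any], None]
Done = Callable[[], None]


class StreamProcessor(ABC):
    """Minimal stand-in for the library's StreamProcessor (assumed signatures)."""

    def start(self) -> None:
        """Optional set-up hook; does nothing by default."""

    @abstractmethod
    def process_element(self, element: Any, then: Then, done: Done) -> None: ...

    @abstractmethod
    def finish(self, then: Then, done: Done) -> None: ...


class Doubler(StreamProcessor):
    """Processes and forwards each item as soon as it arrives."""

    def process_element(self, element, then, done):
        then(element * 2)

    def finish(self, then, done):
        done()  # nothing buffered, so just signal termination


class Sorter(StreamProcessor):
    """Buffers all items, then forwards them in sorted order in bulk."""

    def start(self):
        self.buffer: List[Any] = []  # fresh buffer for each run

    def process_element(self, element, then, done):
        self.buffer.append(element)

    def finish(self, then, done):
        for item in sorted(self.buffer):
            then(item)
        done()


out = []
sorter = Sorter()
sorter.start()
for x in [3, 1, 2]:
    sorter.process_element(x, out.append, lambda: None)
sorter.finish(out.append, lambda: out.append("<done>"))
print(out)  # [1, 2, 3, '<done>']
```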

StreamSink

Stream sinks should implement the consume_element method to consume items produced by the pipeline. They should also implement the finish method to perform any tidy-up once the pipeline has terminated.

An optional start method is available, which can be overridden to perform any set-up required before the sink starts receiving items.
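A minimal sink sketch, assuming consume_element receives one item and finish takes no callbacks (a sink has no downstream element to forward to). StreamSink here is a stand-in for the real base class, and MeanSink is hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional


class StreamSink(ABC):
    """Minimal stand-in for the library's StreamSink (assumed signatures)."""

    def start(self) -> None:
        """Optional set-up hook; does nothing by default."""

    @abstractmethod
    def consume_element(self, element: Any) -> None: ...

    @abstractmethod
    def finish(self) -> None: ...


class MeanSink(StreamSink):
    """Hypothetical sink that records the mean of the numbers it receives."""

    def start(self) -> None:
        # Reset the running totals so the sink can be reused across runs.
        self.total = 0.0
        self.count = 0
        self.mean: Optional[float] = None

    def consume_element(self, element: float) -> None:
        self.total += element
        self.count += 1

    def finish(self) -> None:
        # Tidy-up: turn the running totals into the final result.
        self.mean = self.total / self.count if self.count else None


sink = MeanSink()
sink.start()
for x in [1, 2, 3, 4]:
    sink.consume_element(x)
sink.finish()
print(sink.mean)  # 2.5
```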

Pipeline

The Pipeline class takes an optional source, zero or more processors, and an optional sink as its constructor arguments. When the process method is called, the processing elements are connected together, the start methods of each processor and of the sink are called, and the source is instructed to start producing items. Optionally, an iterable can be passed to the process method to replace the pipeline's own source, and/or a function can be passed to replace the pipeline's sink.

When the pipeline is connecting the processing elements together, it also inserts checks for the calling semantics, which will raise an exception if they are not adhered to.
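To make the wiring concrete, here is a compact, self-contained sketch of how a pipeline like this could connect elements and guard every edge. It is not the library's implementation: the constructor keywords (source, processors, sink), the process parameters (items, sink_fn) and CallingSemanticsError are assumptions. Elements are duck-typed, so any object with the method names described above will do.

```python
from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple

Then = Callable[[Any], None]
Done = Callable[[], None]


class CallingSemanticsError(RuntimeError):
    """Hypothetical error raised when then/done are misused."""


def _guard(then: Then, done: Done) -> Tuple[Then, Done, Done]:
    """Wrap one edge of the pipeline so that misuse of then/done is detected."""
    finished = False

    def guarded_then(item: Any) -> None:
        if finished:
            raise CallingSemanticsError("then() called after done()")
        then(item)

    def guarded_done() -> None:
        nonlocal finished
        if not finished:  # idempotent: only the first call goes downstream
            finished = True
            done()

    def check() -> None:
        if not finished:
            raise CallingSemanticsError("done() was never called")

    return guarded_then, guarded_done, check


def _connect(proc: Any, then: Then, done: Done) -> Tuple[Then, Done]:
    """Turn a processor into the (then, done) pair its upstream element calls."""
    return (lambda item: proc.process_element(item, then, done),
            lambda: proc.finish(then, done))


class Pipeline:
    def __init__(self, source: Any = None, processors: Sequence[Any] = (),
                 sink: Any = None) -> None:
        self.source = source
        self.processors = list(processors)
        self.sink = sink

    def process(self, items: Optional[Iterable[Any]] = None,
                sink_fn: Optional[Then] = None) -> None:
        for proc in self.processors:
            proc.start()
        if sink_fn is not None:  # a function replaces the pipeline's sink
            then, done = sink_fn, (lambda: None)
        else:
            self.sink.start()
            then, done = self.sink.consume_element, self.sink.finish

        checks: List[Done] = []

        def link(then: Then, done: Done) -> Tuple[Then, Done]:
            g_then, g_done, check = _guard(then, done)
            checks.append(check)
            return g_then, g_done

        # Wire from the sink backwards, guarding every edge between elements.
        then, done = link(then, done)
        for proc in reversed(self.processors):
            then, done = link(*_connect(proc, then, done))

        if items is not None:  # an iterable replaces the pipeline's source
            for item in items:
                then(item)
            done()
        else:
            self.source.produce(then, done)

        # Check upstream edges first so the error points nearest its cause.
        for check in reversed(checks):
            check()


class AddOne:  # hypothetical processor used only for the demo
    def start(self) -> None:
        pass

    def process_element(self, item, then, done):
        then(item + 1)

    def finish(self, then, done):
        done()


out: List[int] = []
Pipeline(processors=[AddOne(), AddOne()]).process([1, 2, 3], sink_fn=out.append)
print(out)  # [3, 4, 5]
```

Wiring from the sink backwards means each processor is bound to downstream callbacks that already exist, and guarding every edge separately catches a misbehaving element at its own output rather than only at the end of the pipeline.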

Utilities

RequiresNoFinalisation

Mixin class that can be used with StreamProcessor or StreamSink to implement the finish method automatically. For processors, it calls the done callback if it has not already been called.
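A sketch of how such a mixin might work, assuming a processor's finish receives (then, done) and a sink's finish receives nothing. Rather than tracking whether done was already called, it relies on done being idempotent, as described under Calling semantics. The stand-in base class and Doubler are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional

Then = Callable[[Any], None]
Done = Callable[[], None]


class StreamProcessor(ABC):
    """Minimal stand-in for the library's base class (assumed signatures)."""

    @abstractmethod
    def process_element(self, element: Any, then: Then, done: Done) -> None: ...

    @abstractmethod
    def finish(self, then: Then, done: Done) -> None: ...


class RequiresNoFinalisation:
    """Sketch of the mixin: supplies finish() so subclasses need not."""

    def finish(self, then: Optional[Then] = None, done: Optional[Done] = None) -> None:
        # Processors are passed (then, done); sinks are passed nothing.
        # Calling done() unconditionally is safe only because the pipeline's
        # done callback is idempotent, so an earlier call is not repeated.
        if done is not None:
            done()


# Listing the mixin first makes its finish() override the abstract one.
class Doubler(RequiresNoFinalisation, StreamProcessor):
    def process_element(self, element, then, done):
        then(element * 2)


out = []
d = Doubler()
d.process_element(21, out.append, lambda: None)
d.finish(out.append, lambda: out.append("<done>"))
print(out)  # [42, '<done>']
```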

ProcessState

Descriptor class which adds per-stream state to a stream-processing element. Takes an initialiser function as its only constructor argument. The initialiser is used to re-initialise the state before each stream is processed by the pipeline that contains the element. The state can be modified during stream processing to track per-stream state, but the changes are discarded once a new stream is processed.
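A minimal sketch of a per-stream state descriptor, assuming the pipeline resets state through a hook before each stream. ProcessState here is an illustrative re-implementation, and reset_process_states and Counter are hypothetical names. State is stored in each instance's __dict__, so separate element instances never share it.

```python
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class ProcessState(Generic[T]):
    """Illustrative per-stream state descriptor (not the library's code)."""

    def __init__(self, initialiser: Callable[[], T]) -> None:
        self.initialiser = initialiser

    def __set_name__(self, owner: type, name: str) -> None:
        # Store the value under a private key in each instance's __dict__.
        self.attr = "_process_state_" + name

    def __get__(self, instance: Any, owner: Optional[type] = None) -> Any:
        if instance is None:
            return self  # accessed on the class itself
        if self.attr not in instance.__dict__:
            self.reset(instance)  # lazily initialise on first access
        return instance.__dict__[self.attr]

    def __set__(self, instance: Any, value: T) -> None:
        instance.__dict__[self.attr] = value

    def reset(self, instance: Any) -> None:
        """Discard any previous state and build fresh state for a new stream."""
        instance.__dict__[self.attr] = self.initialiser()


def reset_process_states(element: Any) -> None:
    """Hypothetical hook a pipeline could call before processing each stream."""
    seen = set()
    for klass in type(element).__mro__:
        for name, value in vars(klass).items():
            if name in seen:
                continue  # shadowed by a subclass attribute of the same name
            seen.add(name)
            if isinstance(value, ProcessState):
                value.reset(element)


class Counter:
    """Hypothetical processor that numbers the items within each stream."""

    count = ProcessState(lambda: 0)

    def process_element(self, element, then, done):
        self.count += 1
        then((self.count, element))


c, out = Counter(), []
reset_process_states(c)  # before the first stream
for x in "ab":
    c.process_element(x, out.append, lambda: None)
reset_process_states(c)  # before the second stream: the count starts again
c.process_element("z", out.append, lambda: None)
print(out)  # [(1, 'a'), (2, 'b'), (1, 'z')]
```

Passing a factory such as list or dict as the initialiser gives every stream a fresh mutable object, so state cannot leak from one stream into the next.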