streamsx.topology.topology

Streaming application definition.

Overview

IBM Streams is an advanced analytic platform that allows user-developed applications to quickly ingest, analyze and correlate information as it arrives from thousands of real-time sources. Streams can handle very high throughput rates: millions of events or messages per second.

With this API, Python developers can build streaming applications that are executed using IBM Streams, with the processing distributed across multiple computing resources (hosts or machines) for scalability.

Topology

A Topology declares a graph of streams and operations against tuples (data items) on those streams.

After being declared, a Topology is submitted to be compiled into a Streams application bundle (sab file) and then executed. The sab file is a self-contained bundle that can be executed in a distributed Streams instance, either using the Streaming Analytics service on IBM Cloud or an on-premises IBM Streams installation.

The compilation step invokes the Streams compiler to produce a bundle. From a Python point of view, this effectively produces a runnable version of the Python topology that includes application-specific Python C extensions to optimize performance.

The bundle also includes any required Python packages or modules that were used in the declaration of the application, excluding ones that are in a directory path containing site-packages.

The Python standard package tool pip uses a directory structure including site-packages when installing packages. Packages installed with pip can be included in the bundle with add_pip_package() when using a build service. This avoids the requirement that packages be preinstalled in cloud environments.
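
For example, a minimal sketch (the package name is illustrative; this requires submission through a build service):

from streamsx.topology.topology import Topology

topo = Topology()
# Request that the pip-installable package be fetched and added to the
# bundle at build time, rather than requiring it to be preinstalled.
topo.add_pip_package('scikit-learn')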

Local Python packages and modules containing callables used in transformations such as map() are copied into the bundle from their local location. The addition of local packages to the bundle can be controlled with Topology.include_packages and Topology.exclude_packages.
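
For example, a minimal sketch assuming hypothetical local package names:

from streamsx.topology.topology import Topology

topo = Topology()
# Force inclusion of a local package the automatic dependency scan may miss.
topo.include_packages.add('mylocalpackage')
# Exclude a local package known to be available on the runtime hosts.
topo.exclude_packages.add('mytestutils')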

The Streams runtime distributes the application’s operations across the resources available in the instance.

Note

Topology represents a declaration of a streaming application that will be executed by a Streams instance as a job, either using the Streaming Analytics service on IBM Cloud or an on-premises distributed instance. Topology does not represent a running application, so an instance of the Stream class does not contain the tuples; it is only a declaration of a stream.

Stream

A Stream can be an infinite sequence of tuples, such as a stream for a traffic flow sensor. Alternatively, a stream can be finite, such as a stream that is created from the contents of a file. When a stream processing application contains infinite streams, the application runs continuously without ending.
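
For example, a minimal sketch of a finite stream, assuming source() is passed an iterable directly:

from streamsx.topology.topology import Topology

topo = Topology()
# A finite stream: the source is exhausted after three tuples.
letters = topo.source(['a', 'b', 'c'])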

A stream has a schema that defines the type of each tuple on the stream. The schema for a stream is either:

  • Python - A tuple may be any Python object. This is the default when the schema is not explicitly or implicitly set.

  • String - Each tuple is a Unicode string.

  • Binary - Each tuple is a blob.

  • Json - Each tuple is a Python dict that can be expressed as a JSON object.

  • Structured - A stream that has a structured schema of an ordered list of attributes, with each attribute having a fixed type (e.g. float64 or int32) and a name. The schema of a structured stream is defined using a typed named tuple or StreamSchema.

A stream’s schema is implicitly derived from type hints declared for the callable of the transform that produces it. For example, readings defined as follows would have a structured schema matching SensorReading:

import typing

from streamsx.topology.topology import Topology

class SensorReading(typing.NamedTuple):
    sensor_id: str
    ts: int
    reading: float

def reading_from_json(value: dict) -> SensorReading:
    return SensorReading(value['id'], value['timestamp'], value['reading'])

topo = Topology()
# HttpReadings is a user-supplied source callable producing JSON-compatible dicts.
json_readings = topo.source(HttpReadings()).as_json()
readings = json_readings.map(reading_from_json)

Deriving schemas from type hints can be disabled by setting the topology’s type_checking attribute to false. For example, this would change readings in the previous example to have the generic Python object schema Python:

topo = Topology()
topo.type_checking = False

Stream processing

Callables

A stream is processed to produce zero or more transformed streams; for example, filtering a stream to drop unwanted tuples produces a stream that contains only the required tuples.

Stream processing is per-tuple based: as each tuple is submitted to a stream, consuming operators have their processing logic invoked for that tuple.

A functional operator is declared by methods on Stream such as map(), which maps the tuples on its input stream to tuples on its output stream. Stream uses a functional model where each stream processing operator is defined in terms of a Python callable that is invoked with input tuples and whose return value defines which output tuples are submitted for downstream processing.

The Python callable used for functional processing in this API may be:

  • A Python lambda function.

  • A Python function.

  • An instance of a Python callable class.

For example, a stream words containing only string objects can be processed by filter() using a lambda function:

# Filter the stream so it only contains words starting with py
pywords = words.filter(lambda word : word.startswith('py'))
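
Equivalently, a named function (or an instance of a callable class) can be used; a minimal sketch against the same words stream:

def starts_with_py(word: str) -> bool:
    # Return True to keep the tuple on the filtered stream.
    return word.startswith('py')

pywords = words.filter(starts_with_py)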

When a callable has type hints they are used to:

  • define the schema of the resulting transformation, see Stream.

  • type check the correctness of the transformation at topology declaration time.

For example, if the callable defining the source has type hints indicating it is an iterator of str objects, then the schema of the resultant stream would be String. If this source stream then undergoes a Stream.map() transform with a callable that has a type hint for its argument, a check is made to ensure that the type of the argument is compatible with str.
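
As a sketch of this behavior (the callables here are illustrative):

import typing

from streamsx.topology.topology import Topology

def words() -> typing.Iterable[str]:
    # The Iterable[str] hint gives the resulting stream the String schema.
    return iter(['python', 'streams', 'topology'])

def length(word: str) -> int:
    # The str argument hint is checked for compatibility with the
    # stream's tuple type at topology declaration time.
    return len(word)

topo = Topology()
strings = topo.source(words)
lengths = strings.map(length)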

Type hints are maintained through transforms regardless of resultant schema. For example a transform that has a return type hint of int defines the schema as Python, but the type hint is retained even though the schema is generic. Thus an error is raised at topology declaration time if a downstream transformation uses a callable with a type hint that is incompatible with being passed an int.

How type hints are used is specific to each transformation, such as source(), map(), filter() etc.

Type checking can be disabled by setting the topology’s type_checking attribute to false.

When a callable is a lambda or is defined inline (in the main Python script, a notebook, or an interactive session), a serialized copy of its definition becomes part of the topology. The types of global variables such callables may capture are limited, to avoid increasing the size of the application and to avoid serialization failures due to non-serializable objects directly or indirectly referenced from captured globals. The supported types of captured globals are constants (int, str, float, bool, bytes, complex), modules, module attributes (e.g. classes, functions and variables defined in a module), and inline classes and functions. If a lambda or inline callable causes an exception due to unsupported global capture, moving it to its own module is a solution.

Due to Python bug 36697, a lambda or inline callable can incorrectly capture a global variable. For example, an inline class using an attribute self.model will incorrectly capture the global variable model even if that global is never used within the class. To work around this bug, use attribute or variable names that do not shadow global variables (e.g. self._model).

Due to issue 2336, an inline class using super() will cause an AttributeError at runtime. The workaround is to call the superclass’s method directly, for example replace this code:

class A(X):
    def __init__(self):
        super().__init__()

with:

class A(X):
    def __init__(self):
        X.__init__(self)

or move the class to a module.

Stateful operations

Use of a class instance allows the operation to be stateful by maintaining state in instance attributes across invocations.
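
For example, a minimal sketch of a stateful callable class that maintains a running average across tuples, assuming a stream readings of numeric values:

class RunningAverage:
    def __init__(self):
        # Declared state; __init__ runs at topology declaration time.
        self.count = 0
        self.total = 0.0

    def __call__(self, value):
        # Instance attributes persist across invocations at runtime.
        self.count += 1
        self.total += value
        return self.total / self.count

averages = readings.map(RunningAverage())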

Note

For support with consistent region or checkpointing, instances should ensure that the object’s state can be pickled. See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects

Initialization and shutdown

Execution of a class instance effectively runs within a context manager, so that the instance’s __enter__ method is called when the processing element containing the instance is initialized, and its __exit__ method is called when the processing element is stopped. To take advantage of this, the class must define both __enter__ and __exit__ methods.

Note

Since an instance of a class is passed to methods such as map(), __init__ is only called when the topology is declared, not at runtime. Initialization at runtime, such as opening connections, occurs through the __enter__ method.

Example of using __enter__ to create custom metrics:

import streamsx.ec as ec

class Sentiment(object):
    def __init__(self):
        pass

    def __enter__(self):
        self.positive_metric = ec.CustomMetric(self, "positiveSentiment")
        self.negative_metric = ec.CustomMetric(self, "negativeSentiment")

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def __call__(self):
        pass

When an instance defines a valid __exit__ method, it will be called with the exception when:

  • the instance raises an exception during processing of a tuple

  • a data conversion exception is raised while converting a value to a structured schema tuple or attribute

If __exit__ returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
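
A minimal sketch of suppressing data conversion errors while letting any other exception terminate the processing element (the parsing logic is illustrative):

class ParseFloat:
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning a true value suppresses the exception: the offending
        # tuple is dropped and processing continues.
        return exc_type is not None and issubclass(exc_type, ValueError)

    def __call__(self, s):
        return float(s)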

Tuple semantics

Python objects on a stream may be passed by reference between callables (e.g. the value returned by a map callable may be passed by reference to a following filter callable). This can only occur when the callables are executing in the same PE (process). If an object is not passed by reference, a deep copy is passed. Tuples on streams that cross PE (process) boundaries are always passed by deep copy.

Thus if a stream is consumed by two map callables and one filter callable in the same PE, they may receive the same object reference that was sent by the upstream callable. If one (or more) of the callables modifies the passed-in reference, those changes may be seen by the upstream callable or by the other callables. The order of execution of the downstream callables is not defined. Such potential non-deterministic behavior can be prevented by one or more of these techniques:

  • Passing immutable objects

  • Not retaining a reference to an object that will be submitted on a stream

  • Not modifying input tuples in a callable

  • Using copy/deepcopy when returning a value that will be submitted to a stream.

Applications cannot rely on pass-by-reference; it is a performance optimization that can be made in some situations when stream connections are within a PE.
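
For example, a minimal sketch of a map callable that copies its input rather than mutating it in place:

import copy

def annotate(d: dict) -> dict:
    # Copy the input tuple so that other consumers of the same stream
    # never observe this modification.
    out = copy.deepcopy(d)
    out['annotated'] = True
    return out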

Application log and trace

IBM Streams provides application trace and log services which are accessible through standard Python loggers from the logging module.

See Application log and trace.
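
As a small illustration, a callable can use a standard Python logger; a minimal sketch, assuming messages are routed to the Streams trace service as described in Application log and trace:

import logging

trace = logging.getLogger(__name__)

def keep(t) -> bool:
    # Standard Python logging; at runtime the message is handled by the
    # Streams trace service.
    trace.info("Processing tuple: %s", t)
    return True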

SPL operators

In addition, an application declared by Topology can include stream processing defined by SPL primitive or composite operators. This allows reuse of adapters and analytics provided by IBM Streams, open source SPL toolkits, and third-party SPL toolkits.

See streamsx.spl.op
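
A minimal sketch of invoking an SPL operator, using the SPL standard toolkit Beacon operator (the schema and parameters are illustrative):

from streamsx.topology.topology import Topology
from streamsx.spl import op

topo = Topology()
# Invoke Beacon as a source producing 100 tuples with a uint64 attribute.
beacon = op.Source(topo, 'spl.utility::Beacon',
                   'tuple<uint64 seq>', params={'iterations': 100})
# Assign the output attribute using an SPL output function.
beacon.seq = beacon.output('IterationCount()')
stream = beacon.stream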

Module contents

Classes

PendingStream

Pending stream connection.

Routing

Defines how tuples are routed to channels in a parallel region.

Sink

Termination of a Stream.

Stream

The Stream class is the primary abstraction within a streaming application.

SubscribeConnection

Connection mode between a subscriber and matching publishers.

Topology

The Topology class is used to define data sources, and is passed as a parameter when submitting an application.

View

The View class provides access to a continuously updated sampling of data items on a Stream after submission.

Window

Declaration of a window of tuples on a Stream.