streamsx.topology.topology¶
Streaming application definition.
Overview¶
IBM Streams is an advanced analytic platform that allows user-developed applications to quickly ingest, analyze and correlate information as it arrives from thousands of real-time sources. Streams can handle very high data throughput rates, millions of events or messages per second.
With this API Python developers can build streaming applications that can be executed using IBM Streams, including the processing being distributed across multiple computing resources (hosts or machines) for scalability.
Topology¶
A Topology declares a graph of streams and operations against tuples (data items) on those streams.
After being declared, a Topology is submitted to be compiled into a Streams application bundle (sab file) and then executed. The sab file is a self-contained bundle that can be executed in a distributed Streams instance, either using the Streaming Analytics service on IBM Cloud or an on-premises IBM Streams installation.
The compilation step invokes the Streams compiler to produce a bundle. From a Python point of view, this produces a runnable version of the Python topology that includes application-specific Python C extensions to optimize performance.
The bundle also includes any required Python packages or modules that were used in the declaration of the application, excluding ones that are in a directory path containing site-packages. The Python standard package tool pip uses a directory structure including site-packages when installing packages. Packages installed with pip can be included in the bundle with add_pip_package() when using a build service. This avoids the requirement to have packages preinstalled in cloud environments.
Local Python packages and modules containing callables used in transformations such as map() are copied into the bundle from their local location. The addition of local packages to the bundle can be controlled with Topology.include_packages and Topology.exclude_packages.
The Streams runtime distributes the application’s operations across the resources available in the instance.
Note
Topology represents a declaration of a streaming application that will be executed by a Streams instance as a job, either using the Streaming Analytics service on IBM Cloud or an on-premises distributed instance. Topology does not represent a running application, so an instance of the Stream class does not contain the tuples; it is only a declaration of a stream.
Stream¶
A Stream can be an infinite sequence of tuples, such as a stream for a traffic flow sensor. Alternatively, a stream can be finite, such as a stream that is created from the contents of a file. When a stream processing application contains infinite streams, the application runs continuously without ending.
A stream has a schema that defines the type of each tuple on the stream. The schema for a stream is one of:
Python - A tuple may be any Python object. This is the default when the schema is not explicitly or implicitly set.
String - Each tuple is a Unicode string.
Binary - Each tuple is a blob.
Json - Each tuple is a Python dict that can be expressed as a JSON object.
Structured - A stream with a structured schema: an ordered list of attributes, with each attribute having a fixed type (e.g. float64 or int32) and a name. The schema of a structured stream is defined using a typed named tuple or StreamSchema.
A stream’s schema is implicitly derived from type hints declared for the callable of the transform that produces it. For example, readings defined as follows would have a structured schema matching SensorReading:
class SensorReading(typing.NamedTuple):
    sensor_id: str
    ts: int
    reading: float

def reading_from_json(value: dict) -> SensorReading:
    return SensorReading(value['id'], value['timestamp'], value['reading'])

topo = Topology()
json_readings = topo.source(HttpReadings()).as_json()
readings = json_readings.map(reading_from_json)
Deriving schemas from type hints can be disabled by setting the topology’s type_checking attribute to false. For example, this would change readings in the previous example to have the generic Python object schema Python:
topo = Topology()
topo.type_checking = False
Stream processing¶
Callables¶
A stream is processed to produce zero or more transformed streams; for example, filtering a stream to drop unwanted tuples produces a stream that only contains the required tuples.
Stream processing is tuple based: as each tuple is submitted to a stream, the consuming operators have their processing logic invoked for that tuple.
A functional operator is declared by methods on Stream such as map(), which maps the tuples on its input stream to tuples on its output stream. Stream uses a functional model where each stream processing operator is defined in terms of a Python callable that is invoked with input tuples and whose return value defines what output tuples are submitted for downstream processing.
The Python callable used for functional processing in this API may be:
A Python lambda function.
A Python function.
An instance of a Python callable class.
For example, a stream words containing only string objects can be processed by a filter() using a lambda function:
# Filter the stream so it only contains words starting with py
pywords = words.filter(lambda word : word.startswith('py'))
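The same filter could instead use an instance of a callable class, which keeps the predicate reusable and configurable. This is a minimal sketch; the class name StartsWith is hypothetical and the predicate itself is plain Python:

```python
class StartsWith:
    """Callable class usable as a filter() predicate: keeps tuples
    (strings) that start with the configured prefix."""
    def __init__(self, prefix):
        self.prefix = prefix

    def __call__(self, word):
        return word.startswith(self.prefix)

# Hypothetical usage against a stream of words:
# pywords = words.filter(StartsWith('py'))
```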
When a callable has type hints they are used to:
define the schema of the resulting transformation, see Stream.
type check the correctness of the transformation at topology declaration time.
For example, if the callable defining the source had type hints indicating it was an iterator of str objects, then the schema of the resultant stream would be String. If this source stream then underwent a Stream.map() transform with a callable that had a type hint for its argument, a check is made to ensure that the type of the argument is compatible with str.
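The kind of declaration-time check described above can be illustrated in plain Python using the standard typing module. This is only an illustrative sketch of the idea, not the library's actual implementation; the function names are hypothetical:

```python
import typing

def source_readings() -> typing.Iterable[str]:
    """Hypothetical source callable: its return hint implies a String stream."""
    return iter(['py1', 'py2'])

def to_upper(word: str) -> str:
    """Hypothetical map callable with a hinted argument."""
    return word.upper()

# Illustrative compatibility check: compare the source's element type
# with the map callable's argument type.
src_hints = typing.get_type_hints(source_readings)
elem_type = typing.get_args(src_hints['return'])[0]   # str
map_hints = typing.get_type_hints(to_upper)
arg_type = map_hints['word']                          # str
compatible = issubclass(elem_type, arg_type)
```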
Type hints are maintained through transforms regardless of the resultant schema. For example, a transform that has a return type hint of int defines the schema as Python, but the type hint is retained even though the schema is generic. Thus an error is raised at topology declaration time if a downstream transformation uses a callable with a type hint that is incompatible with being passed an int.
How type hints are used is specific to each transformation, such as source(), map(), filter() etc.
Type checking can be disabled by setting the topology’s type_checking
attribute to false.
When a callable is a lambda or defined inline (defined in the main Python script, a notebook or an interactive session) then a serialized copy of its definition becomes part of the topology. The supported types of captured globals for these callables are limited to avoid increasing the size of the application and to avoid serialization failures due to non-serializable objects directly or indirectly referenced from captured globals. The supported types of captured globals are constants (int, str, float, bool, bytes, complex), modules, module attributes (e.g. classes, functions and variables defined in a module), and inline classes and functions. If a lambda or inline callable causes an exception due to unsupported global capture, moving it to its own module is a solution.
Due to Python bug 36697, a lambda or inline callable can incorrectly capture a global variable. For example, an inline class using an attribute self.model will incorrectly capture the global model even if the global variable model is never used within the class. To work around this bug, use attribute or variable names that do not shadow global variables (e.g. self._model).
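A minimal sketch of the naming workaround; the class and the global are hypothetical. The attribute is named self._model so it does not shadow the global model and therefore avoids the incorrect capture:

```python
# Hypothetical global that an inline class must not accidentally capture:
model = "large-global-model"

class Scorer:
    """Inline callable class: uses self._model (leading underscore) so the
    attribute name does not shadow the global 'model' (Python bug 36697)."""
    def __init__(self, model_name):
        self._model = model_name

    def __call__(self, tuple_):
        # Returns the tuple paired with the instance's own model name.
        return (tuple_, self._model)
```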
Due to issue 2336, an inline class using super() will cause an AttributeError at runtime. The workaround is to call the super class’s method directly; for example, replace this code:
class A(X):
    def __init__(self):
        super().__init__()
with:
class A(X):
    def __init__(self):
        X.__init__(self)
or move the class to a module.
Stateful operations¶
Use of a class instance allows the operation to be stateful by maintaining state in instance attributes across invocations.
Note
For support with consistent region or checkpointing, instances should ensure that the object’s state can be pickled. See https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects
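A minimal sketch of a stateful callable whose state pickles cleanly, assuming all instance attributes are plain Python values. The class name RunningMax is hypothetical:

```python
import pickle

class RunningMax:
    """Stateful map callable: tracks the maximum reading seen so far.
    Its state is a plain float, so the instance pickles cleanly,
    as required for checkpointing or consistent region support."""
    def __init__(self):
        self.maximum = float('-inf')

    def __call__(self, reading):
        if reading > self.maximum:
            self.maximum = reading
        return self.maximum

rm = RunningMax()
maxima = [rm(v) for v in [3.0, 1.0, 5.0, 2.0]]   # [3.0, 3.0, 5.0, 5.0]

# Round-trip through pickle, as checkpointing would:
restored = pickle.loads(pickle.dumps(rm))
```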
Initialization and shutdown¶
Execution of a class instance effectively runs in a context manager, so that an instance’s __enter__ method is called when the processing element containing the instance is initialized and its __exit__ method is called when the processing element is stopped. To take advantage of this the class must define both __enter__ and __exit__ methods.
Note
Since an instance of a class is passed to methods such as map(), __init__ is only called when the topology is declared, not at runtime. Initialization at runtime, such as opening connections, occurs through the __enter__ method.
Example of using __enter__ to create custom metrics:
import streamsx.ec as ec

class Sentiment(object):
    def __init__(self):
        pass

    def __enter__(self):
        self.positive_metric = ec.CustomMetric(self, "positiveSentiment")
        self.negative_metric = ec.CustomMetric(self, "negativeSentiment")

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def __call__(self):
        pass
When an instance defines a valid __exit__ method it will be called with an exception when:
the instance raises an exception during processing of a tuple
a data conversion exception is raised converting a value to a structured schema tuple or attribute
If __exit__ returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
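The true-return suppression rule is the standard Python context manager protocol, so it can be demonstrated with a plain with statement. The class name Tolerant is hypothetical:

```python
class Tolerant:
    """Context manager whose __exit__ returns True for ValueError:
    that exception is suppressed and execution continues."""
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return exc_type is ValueError   # suppress only ValueError

with Tolerant():
    raise ValueError("bad tuple")
# Execution reaches here because __exit__ returned True.
suppressed = True
```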
Tuple semantics¶
Python objects on a stream may be passed by reference between callables (e.g. the value returned by a map callable may be passed by reference to a following filter callable). This can only occur when the functions are executing in the same PE (process). If an object is not passed by reference a deep-copy is passed. Streams that cross PE (process) boundaries are always passed by deep-copy.
Thus if a stream is consumed by two map and one filter callables in the same PE they may receive the same object reference that was sent by the upstream callable. If one (or more) callable modifies the passed in reference those changes may be seen by the upstream callable or the other callables. The order of execution of the downstream callables is not defined. One can prevent such potential non-deterministic behavior by one or more of these techniques:
Passing immutable objects
Not retaining a reference to an object that will be submitted on a stream
Not modifying input tuples in a callable
Using copy/deepcopy when returning a value that will be submitted to a stream.
Applications cannot rely on pass-by-reference; it is a performance optimization that can be made in some situations when stream connections are within a PE.
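The deepcopy technique from the list above can be sketched in plain Python. Returning a deep copy means later mutations of the producer's object are never visible downstream, regardless of whether tuples are passed by reference:

```python
import copy

def make_tuple(shared):
    # Return a deep copy so downstream callables cannot observe
    # later mutations of 'shared' (and cannot mutate it themselves).
    return copy.deepcopy(shared)

shared = {'count': 1}
submitted = make_tuple(shared)
shared['count'] = 99          # later mutation by the producer
# submitted is unaffected: still {'count': 1}
```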
Application log and trace¶
IBM Streams provides application trace and log services which are accessible through standard Python loggers from the logging module.
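Because standard loggers are used, a callable simply logs through the logging module; the assumption here is that at runtime the Streams environment routes the records to its trace service via handlers it installs. The function clean_reading is hypothetical:

```python
import logging

# A standard Python logger; handler setup is assumed to be provided
# by the runtime environment.
trace = logging.getLogger(__name__)

def clean_reading(value: float) -> float:
    """Hypothetical map callable that traces and clamps bad readings."""
    if value < 0:
        trace.warning("negative reading %s clamped to 0", value)
        return 0.0
    return value
```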
SPL operators¶
In addition, an application declared by Topology can include stream processing defined by SPL primitive or composite operators. This allows reuse of adapters and analytics provided by IBM Streams, open source and third-party SPL toolkits.
See streamsx.spl.op
Module contents¶
Classes
PendingStream | Pending stream connection.
Routing | Defines how tuples are routed to channels in a parallel region.
Sink | Termination of a Stream.
Stream | The Stream class is the primary abstraction within a streaming application.
SubscribeConnection | Connection mode between a subscriber and matching publishers.
Topology | The Topology class is used to define data sources, and is passed as a parameter when submitting an application.
View | The View class provides access to a continuously updated sampling of data items on a stream.
Window | Declaration of a window of tuples on a Stream.