streamsx.spl.op

Integration of SPL operators.

Invoking SPL Operators

IBM Streams supports Streams Processing Language (SPL), a domain-specific language for streaming analytics. An SPL application is built as a graph of operator invocations, and each operator is declared in an SPL toolkit.

SPL streams have a structured schema, such as tuple<rstring id, timestamp ts, float64 value> for a sensor reading with a sensor identifier, timestamp and value. A schema is defined using StreamSchema.

A Python topology application can take advantage of SPL operators by using streams with structured schemas. A stream of Python objects can be converted to a structured stream using map() with the schema parameter set:

# s is a stream of Python objects, each representing a sensor reading
s = ...

# Map s to a structured stream using a lambda function.
# For each sensor reading r a Python tuple is created
# with the required values in the same order as the
# attributes of the structured schema.
s2 = s.map(lambda r: (r.sensor_id, r.reading_time, r.reading),
           schema='tuple<rstring id, timestamp ts, float64 value>')
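The correspondence between the Python tuple returned by the lambda and the schema is purely positional. The sketch below illustrates that pairing without a Streams installation; parse_schema and as_named are hypothetical helpers (not part of streamsx) and only handle flat schemas whose attribute types contain no commas.

```python
import re

def parse_schema(schema):
    """Hypothetical helper: return [(type, name), ...] for a flat SPL tuple schema."""
    m = re.fullmatch(r'\s*tuple<(.*)>\s*', schema)
    if m is None:
        raise ValueError('not a tuple schema: ' + schema)
    attrs = []
    for decl in m.group(1).split(','):
        spl_type, name = decl.split()  # e.g. 'rstring id' -> ('rstring', 'id')
        attrs.append((spl_type, name))
    return attrs

def as_named(schema, values):
    """Pair each value with the schema attribute at the same position."""
    attrs = parse_schema(schema)
    if len(attrs) != len(values):
        raise ValueError('expected %d values, got %d' % (len(attrs), len(values)))
    return {name: v for (_, name), v in zip(attrs, values)}

schema = 'tuple<rstring id, timestamp ts, float64 value>'
# None stands in for a timestamp value; real timestamp handling is not modelled.
print(as_named(schema, ('sensor-1', None, 21.5)))
```

Swapping two values in the lambda's tuple would silently assign them to the wrong attributes, which is why the order must match the schema.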

An SPL operator is invoked in an application by creating an instance of:

  • Invoke - Invocation of an arbitrary SPL operator.

  • Source - Invocation of an SPL source operator with one output port.

  • Map - Invocation of an SPL map operator with one input port and one output port.

  • Sink - Invocation of an SPL sink operator with one input port.

An SPL operator invocation can include a number of clauses, several of which are supported in Python.

Values for operator clauses

When an operator clause requires a value, the value may be passed as a constant, as an input attribute (using the attribute method of the invocation), or as an arbitrary SPL expression (a string or an Expression). Because a string is interpreted as an SPL expression, a string constant must be enclosed in double quotes inside the Python string (for example, '"a string constant"').
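Because a plain Python str is taken as SPL expression text, 'seq' would be parsed as the expression seq, whereas '"seq"' is the rstring constant seq. A minimal sketch of a hypothetical helper, spl_string_literal (not a streamsx function), that builds such a constant from arbitrary text is shown below; it assumes SPL string literals escape backslash and double quote with a backslash, as C does, and it does not handle control characters.

```python
def spl_string_literal(text):
    """Hypothetical helper: wrap text in double quotes as an SPL rstring literal.

    Assumes C-style escaping of backslash and double quote; control
    characters such as newlines are not escaped by this sketch.
    """
    escaped = text.replace('\\', '\\\\').replace('"', '\\"')
    return '"' + escaped + '"'

# Expression text (an attribute reference) versus a string constant.
as_expression = 'seq'
as_constant = spl_string_literal('seq')
print(as_expression, as_constant)
```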

SPL is strictly typed, so a constant passed as a value may need an explicit SPL type.

  • bool, int, float and str values map automatically to SPL boolean, int32, float64 and rstring respectively.

  • Enum values map to an operator custom literal using the symbolic name of the value. For custom literals only the symbolic name needs to match a value expected by the operator; the class name and the other values are arbitrary.

  • The module streamsx.spl.types provides functions to create typed SPL expressions from values.

An optional type may be set to SPL null by passing either Python None or the value returned from null().
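The rules above for non-string constants can be modelled in a few lines of plain Python. spl_type_name below is a hypothetical illustration, not the streamsx implementation: it reports the SPL type (or custom literal) a constant would correspond to. Strings are deliberately left out, since their handling as expression text is covered under Values for operator clauses. Enum is tested before bool and int because IntEnum members are ints, and bool before int because bool is a subclass of int.

```python
from enum import Enum, IntEnum

def spl_type_name(value):
    """Hypothetical model of the automatic Python -> SPL constant typing."""
    if value is None:
        return 'null'  # SPL null, meaningful for an optional type
    # Enum first: IntEnum members are also instances of int.
    if isinstance(value, Enum):
        return 'custom literal ' + value.name  # only the symbolic name is used
    # bool before int: bool is a subclass of int.
    if isinstance(value, bool):
        return 'boolean'
    if isinstance(value, int):
        return 'int32'
    if isinstance(value, float):
        return 'float64'
    raise TypeError('not covered by this sketch: %r' % (value,))

class DataFormats(IntEnum):
    csv = 0
    txt = 1

for v in (True, 100, 0.5, DataFormats.csv, None):
    print(repr(v), '->', spl_type_name(v))
```

Values that need another SPL type, such as uint64, are the case where the typed functions in streamsx.spl.types are needed instead.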

Param clause

Operator parameterization is through operator parameters that configure and modify the operator for the specific application.

Parameters are passed as a dict containing the parameter names and their values (see Values for operator clauses).

Examples

To invoke a Beacon operator from the SPL standard toolkit producing 100 tuples at the rate of two per second:

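from streamsx.topology.schema import StreamSchema
import streamsx.spl.op as op
# topology is an existing streamsx.topology.topology.Topology instance
schema = StreamSchema('tuple<uint64 seq>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema,
    params={'iterations': 100, 'period': 0.5})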

To use an IntEnum to pass a custom literal to the Parse operator:

from enum import IntEnum

class DataFormats(IntEnum):
    csv = 0
    txt = 1

...

params['format'] = DataFormats.csv

To create a count parameter of type uint64 for the SPL DeDuplicate operator:

import streamsx.spl.types

params['count'] = streamsx.spl.types.uint64(20)

After the instance representing the operator invocation has been created, additional parameters may be added through its params attribute. If the value is an expression that is only valid in the context of the operator invocation, the parameter must be added after the invocation has been created.

For example, the Filter operator uses an expression that usually depends on the context, filtering tuples based upon their attribute values:

fs = op.Map('spl.relational::Filter', beacon.stream)
fs.params['filter'] = fs.expression('seq % 2ul == 0ul')
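To see which tuples such a filter keeps, the SPL expression seq % 2ul == 0ul can be mirrored by a Python predicate. This is only a local model, not Streams code, and it assumes seq takes the values 0 to 99 (for example when seq is set with the IterationCount() output function shown in the Output clause section).

```python
def keep(tup):
    """Python mirror of the SPL filter expression seq % 2ul == 0ul."""
    return tup['seq'] % 2 == 0

# Assumed input: 100 tuples whose seq values are 0..99.
beacon_tuples = [{'seq': i} for i in range(100)]
passed = [t for t in beacon_tuples if keep(t)]
print(len(passed), passed[:3])
```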

Output clause

The operator output clause defines the values of attributes on outgoing tuples on the operator invocation’s output ports.

When an operator invocation submits a tuple, each of its attributes is set in one of three ways:

  • By the operator based upon its state and input tuples. For example, a US ZIP code operator might set the zipcode attribute based upon its lookup of the ZIP code from the address details in the input tuple.

  • By the operator implicitly setting output attributes from matching input attributes when those attributes have not been explicitly set elsewhere. Many streaming operators implicitly set output attributes so that attributes flow through the operator without any explicit coding. This only occurs when an output attribute is not explicitly set by the operator or the output clause, and the input tuple has an attribute that matches the output attribute (same name and type or, for an output attribute with an optional type, same name and the same type as its underlying type). For example, in the US ZIP code operator, if the output tuple included the attributes rstring city and rstring state matching input attributes, they would be implicitly copied from the input tuple to the output tuple.

  • By an output clause in the operator invocation. In this case the application invoking the operator is explicitly setting attributes using SPL expressions. An operator may provide output functions that return values based upon the operator’s state and input tuples. For example, the US ZIP code operator might provide a ZIPCode() output function rather than explicitly setting an output attribute. Then the application is free to use any attribute name to represent the ZIP code in its output tuple.
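The three ways listed above can be modelled in plain Python. build_output is a hypothetical sketch, not how any SPL operator is implemented: attributes set by the operator and by the output clause are applied first, and only the remaining output attributes are copied from an input attribute with the same name and type. The optional-type case is omitted, the sketch assumes an attribute is set by at most one of the operator and the output clause, and attributes matching no rule are simply left out of the result. The ZIP code names and values are illustrative.

```python
def build_output(out_schema, in_tuple, in_schema, operator_set, output_clause):
    """Hypothetical model of how an output tuple's attributes get their values.

    out_schema / in_schema: dict of attribute name -> SPL type name.
    operator_set: values the operator assigns from its own state and logic.
    output_clause: values the application assigns explicitly.
    """
    overlap = set(operator_set) & set(output_clause)
    if overlap:
        # Assumption of this sketch: each attribute is set by at most one source.
        raise ValueError('attribute set twice: %s' % sorted(overlap))
    out = {**operator_set, **output_clause}
    for name, spl_type in out_schema.items():
        if name in out:
            continue  # explicitly set, so no implicit copy
        # Implicit copy: input attribute with the same name and the same type.
        if in_schema.get(name) == spl_type:
            out[name] = in_tuple[name]
    return out

in_schema = {'street': 'rstring', 'city': 'rstring', 'state': 'rstring'}
in_tuple = {'street': '1 Main St', 'city': 'Springfield', 'state': 'IL'}
out_schema = {'zip': 'rstring', 'city': 'rstring', 'state': 'rstring'}

# The operator itself sets zip from its (hypothetical) lookup.
print(build_output(out_schema, in_tuple, in_schema, {'zip': '62701'}, {}))
```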

In Python, an output tuple attribute is set by assigning an attribute of the same name on the operator invocation instance to the value returned by its output method. The value passed to the output method is specified as described in Values for operator clauses.

For example, invoking an SPL Beacon operator using an output function to set the sequence number of a tuple and an SPL expression to set the timestamp:

schema = StreamSchema('tuple<uint64 seq, timestamp ts>')
beacon = op.Source(topology, 'spl.utility::Beacon', schema, params = {'period':0.1})

# Set the seq attribute using an output function provided by Beacon
beacon.seq = beacon.output('IterationCount()')

# Set the ts attribute using an SPL function that returns the current time
beacon.ts = beacon.output('getTimestamp()')

See also

Streams Processing Language (SPL) Reference

Reference documentation.

Developing Streams applications

Developing Streams applications.

Operator invocations

Operator invocations from the SPL reference documentation.

Module contents

Functions

main_composite

Wrap a main composite invocation as a Topology.

Classes

Expression

An SPL expression.

Invoke

Declaration of an invocation of an SPL operator in a Topology.

Map

Declaration of an invocation of an SPL map operator.

Sink

Declaration of an invocation of an SPL sink operator.

Source

Declaration of an invocation of an SPL source operator.