streamsx.spl.spl

SPL Python primitive operators.

Overview

SPL primitive operators that call a Python function or class methods are created by decorators provided by this module.

The name of the function or callable class becomes the name of the operator.

A decorated function is a stateless operator while a decorated class is an optionally stateful operator.

These are the supported decorators that create an SPL operator:

  • @spl.source - Creates a source operator that produces tuples.

  • @spl.filter - Creates an operator that filters tuples.

  • @spl.map - Creates an operator that maps input tuples to output tuples.

  • @spl.for_each - Creates an operator that terminates a stream, processing each tuple.

  • @spl.primitive_operator - Creates an SPL primitive operator that has an arbitrary number of input and output ports.

Decorated functions and classes must be located in the directory opt/python/streams of the SPL toolkit. Each module in that directory is inspected for operators during extraction. Each module defines the SPL namespace for its operators with the function spl_namespace, for example:

from streamsx.spl import spl

def spl_namespace():
    return 'com.example.ops'

@spl.map()
def Pass(*tuple_):
    return tuple_

creates a pass-through operator com.example.ops::Pass.

SPL primitive operators are created by executing the extraction script spl-python-extract against the SPL toolkit. Once created, the operators become part of the toolkit and may be used like any other SPL operator.

Python classes as SPL operators

Overview

Decorating a Python class creates a stateful SPL operator where the instance fields of the class are the operator’s state. An instance of the class is created when the SPL operator invocation is initialized at SPL runtime. The instance of the Python class is private to the SPL operator and is maintained for the lifetime of the operator.

If the class has instance fields then they are the state of the operator and are private to each invocation of the operator.

If the __init__ method has parameters beyond the first self parameter then they are mapped to operator parameters. Any parameter that has a default value becomes an optional parameter to the SPL operator. Parameters of the form *args and **kwargs are not supported.

Warning

Parameter names must be valid SPL identifiers. SPL identifiers start with an ASCII letter or underscore, followed by ASCII letters, digits, or underscores. The name also must not be an SPL keyword.

The parameter names suppress and include are reserved.

The values of the operator parameters at SPL operator invocation are passed to the __init__ method. This is equivalent to creating an instance of the class by passing the operator parameters to the constructor.
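The mapping from __init__ parameters to operator parameters can be sketched with the standard inspect module. The helper operator_params below is hypothetical and is not part of streamsx or spl-python-extract; it only applies the rules stated above (skip self, reject *args and **kwargs, parameters with defaults are optional, names must be SPL identifiers and not reserved). The SPL keyword check is left out because the keyword list is not given here.

```python
import inspect
import re

# Rule from the warning above: ASCII letter or underscore, then ASCII letters, digits or underscores.
_SPL_IDENTIFIER = re.compile(r'[A-Za-z_][A-Za-z0-9_]*\Z')
_RESERVED = {'suppress', 'include'}

def operator_params(cls):
    """Hypothetical helper: map __init__ parameters to SPL operator parameters.

    Returns a dict of parameter name -> True if mandatory, False if optional.
    """
    params = list(inspect.signature(cls.__init__).parameters.values())[1:]  # skip self
    result = {}
    for p in params:
        if p.kind in (p.VAR_POSITIONAL, p.VAR_KEYWORD):
            raise TypeError('*args and **kwargs are not supported: ' + p.name)
        if not _SPL_IDENTIFIER.match(p.name) or p.name in _RESERVED:
            raise ValueError('invalid operator parameter name: ' + p.name)
        # A parameter without a default value is a mandatory operator parameter.
        result[p.name] = p.default is p.empty
    return result

class Range(object):
    def __init__(self, stop, start=0):
        self.start = start
        self.stop = stop

print(operator_params(Range))  # {'stop': True, 'start': False}
```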

For example, with this decorated class producing an SPL source operator:

@spl.source()
class Range(object):
    def __init__(self, stop, start=0):
        self.start = start
        self.stop = stop

    def __iter__(self):
        # Each value is wrapped in a 1-tuple to match the single-attribute output schema.
        return zip(range(self.start, self.stop))

The SPL operator Range has two parameters, stop is mandatory and start is optional, defaulting to zero. Thus the SPL operator may be invoked as:

// Produces the sequence of values from 0 to 99
//
// Creates an instance of the Python class
// Range using Range(100)
//
stream<int32 seq> R = Range() {
  param
    stop: 100;
}

or both operator parameters can be set:

// Produces the sequence of values from 50 to 74
//
// Creates an instance of the Python class
// Range using Range(75, 50)
//
stream<int32 seq> R = Range() {
  param
    start: 50;
    stop: 75;
}
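The Python side of these two invocations can be checked without SPL by instantiating the class directly. This sketch assumes the @spl.source() decorator is removed so that it runs in plain Python; the SPL runtime, not this code, performs the actual submission.

```python
class Range(object):
    # Same class as above, without the @spl.source() decorator so it runs standalone.
    def __init__(self, stop, start=0):
        self.start = start
        self.stop = stop

    def __iter__(self):
        # zip over a single iterable yields 1-tuples: (0,), (1,), ...
        return zip(range(self.start, self.stop))

first = list(Range(100))       # like: param stop: 100;
second = list(Range(75, 50))   # like: param start: 50; stop: 75;
print(first[:3], len(first))   # [(0,), (1,), (2,)] 100
print(second[0], second[-1])   # (50,) (74,)
```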

Operator state

Use of a class allows the operator to be stateful by maintaining state in instance attributes across invocations (tuple processing).

When the operator is in a consistent region or is checkpointed, its state is serialized using dill. The default serialization can be modified with the standard Python pickle mechanism of __getstate__ and __setstate__. This is required if the state includes objects that cannot be serialized, for example file descriptors. For details see https://docs.python.org/3.5/library/pickle.html#handling-stateful-objects .

If the class has __enter__ and __exit__ context manager methods then __enter__ is called after the instance has been deserialized by dill. Thus __enter__ is used to recreate runtime objects that cannot be serialized such as open files or sockets.
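A minimal sketch of this pattern, assuming the standard pickle module as a stand-in for dill (dill builds on the same __getstate__ and __setstate__ hooks). The class LineCounter is hypothetical, and calling __enter__ and __exit__ by hand imitates what the SPL runtime does around a checkpoint and restore.

```python
import os
import pickle
import tempfile

class LineCounter(object):
    """Hypothetical stateful operator: counts lines and appends them to a file."""
    def __init__(self, path):
        self.path = path
        self.count = 0      # operator state that must survive a checkpoint
        self.file = None    # runtime resource that cannot be serialized

    def __enter__(self):
        # (Re)create runtime objects; called at start-up and after a restore.
        self.file = open(self.path, 'a')
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.file is not None:
            self.file.close()
            self.file = None
        return False  # do not suppress exceptions

    def __getstate__(self):
        # Copy the instance state, dropping the open file handle.
        state = self.__dict__.copy()
        state['file'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

    def __call__(self, line):
        self.count += 1
        self.file.write(line + '\n')
        return self.count

path = os.path.join(tempfile.mkdtemp(), 'out.txt')
op = LineCounter(path)
op.__enter__()
op('a')
op('b')
saved = pickle.dumps(op)        # checkpoint: the file handle is not serialized
op.__exit__(None, None, None)

restored = pickle.loads(saved)  # restore: count is 2, file is None
restored.__enter__()            # reopen the file after deserialization
restored('c')
restored.__exit__(None, None, None)
```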

Operator initialization & shutdown

Execution of an operator's instance effectively runs in a context manager: the instance's __enter__ method is called when the processing element containing the operator is initialized, and its __exit__ method is called when the processing element is stopped. To take advantage of this the class must define both __enter__ and __exit__ methods.

Note

Initialization such as opening files should be in __enter__ in order to support stateful operator restart & checkpointing.

Example of using __enter__ and __exit__ to open and close a file:

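from streamsx.spl import spl
import streamsx.ec as ec

@spl.map()
class Sentiment(object):
    def __init__(self, name):
        self.name = name
        self.file = None

    def __enter__(self):
        # Open the file at start-up (and again after a restore from a checkpoint).
        self.file = open(self.name, 'r')

    def __exit__(self, exc_type, exc_value, traceback):
        # Close the file when the processing element stops.
        if self.file is not None:
            self.file.close()

    def __call__(self):
        # Tuple processing goes here; returning None submits no tuple.
        pass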

When an instance defines a valid __exit__ method then it will be called with an exception when:

  • the instance raises an exception during processing of a tuple

  • a data conversion exception is raised converting a Python value to an SPL tuple or attribute

If __exit__ returns a true value then the exception is suppressed and processing continues, otherwise the enclosing processing element will be terminated.
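The suppression rule can be sketched in plain Python. Both SafeDivide and run_operator are hypothetical: run_operator is a simplified stand-in for the SPL runtime that calls __exit__ with the exception, skips the tuple when __exit__ returns a true value, and otherwise re-raises (where a real processing element would terminate). Data conversion errors are not modeled.

```python
class SafeDivide(object):
    """Hypothetical map operator that divides its two input attributes."""
    def __init__(self):
        self.errors = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is ZeroDivisionError:
            self.errors += 1
            return True   # suppress: skip this tuple and keep processing
        return False      # any other exception is not suppressed


    def __call__(self, a, b):
        return (a / b,)


def run_operator(op, tuples):
    """Hypothetical stand-in for the runtime, following the rules above."""
    op.__enter__()
    results = []
    for t in tuples:
        try:
            results.append(op(*t))
        except Exception as e:
            if op.__exit__(type(e), e, e.__traceback__):
                continue  # exception suppressed, processing continues
            raise         # not suppressed: the processing element would terminate
    op.__exit__(None, None, None)  # normal shutdown
    return results

op = SafeDivide()
results = run_operator(op, [(4, 2), (1, 0), (9, 3)])
print(results)    # [(2.0,), (3.0,)]
print(op.errors)  # 1
```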

Application log and trace

IBM Streams provides application trace and log services, which are accessible through standard Python loggers from the logging module.

See Application log and trace.

Python functions as SPL operators

Decorating a Python function creates a stateless SPL operator. In SPL terms this is similar to an SPL Custom operator, where the code in the Python function is the custom code. For operators with input ports the function is called for each input tuple, passing a Python representation of the SPL input tuple. For an SPL source operator the function is called to obtain an iterable whose contents will be submitted to the output stream as SPL tuples.

Operator parameters are not supported for decorated functions.

An example SPL sink operator that prints each input SPL tuple after its conversion to a Python tuple:

@spl.for_each()
def PrintTuple(*tuple_):
    "Print each tuple to standard out."
    print(tuple_, flush=True)

Processing SPL tuples in Python

SPL tuples are converted to Python objects and passed to a decorated callable.

Overview

For each SPL tuple arriving at an input port a Python function is called with the SPL tuple converted to Python values suitable for the function call. How the tuple is passed is defined by the tuple passing style.

Tuple Passing Styles

An input tuple can be passed to a Python function using a number of different styles:
  • dictionary

  • tuple

  • attributes by name (not yet implemented)

  • attributes by position

Dictionary

Passing the SPL tuple as a Python dictionary is flexible and makes the operator independent of any schema. A disadvantage is the reduced readability of the Python function, which has no formal parameters, though getters such as tuple_['id'] mitigate that to some extent. If the function is general purpose and can derive meaning from the keys that are the attribute names, then **kwargs can be useful.

When the only function parameter is **kwargs (e.g. def myfunc(**tuple_):) then the passing style is dictionary.

All of the attributes are passed in the dictionary using the SPL schema attribute name as the key.

Tuple

Passing the SPL tuple as a Python tuple is flexible and makes the operator independent of any schema, but it is brittle to changes in the SPL schema. Another disadvantage is the reduced readability of the Python function, which has no formal parameters. However, if the function is general purpose and independent of the tuple contents, *args can be useful.

When the only function parameter is *args (e.g. def myfunc(*tuple_):) then the passing style is tuple.

All of the attributes are passed as a Python tuple with the order of values matching the order of the SPL schema.
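Both styles can be imitated in plain Python by holding an SPL tuple as an insertion-ordered dictionary. The variable spl_tuple and the functions as_dict and as_tuple are hypothetical; the real conversion from SPL is done by the operator runtime.

```python
# Hypothetical stand-in for an SPL tuple of type tuple<rstring id, float64 temp, boolean increase>
spl_tuple = {'id': 'battery', 'temp': 23.7, 'increase': True}

def as_dict(**tuple_):
    # Dictionary style: keys are the SPL attribute names.
    return tuple_

def as_tuple(*tuple_):
    # Tuple style: values in SPL schema order (dicts preserve insertion order).
    return tuple_

print(as_dict(**spl_tuple))           # {'id': 'battery', 'temp': 23.7, 'increase': True}
print(as_tuple(*spl_tuple.values()))  # ('battery', 23.7, True)
```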

Attributes by name

(not yet implemented)

Passing attributes by name can be robust against changes in the SPL schema, e.g. additional attributes being added in the middle of the schema, but does require that the SPL schema has matching attribute names.

When attributes by name is used then SPL tuple attributes are passed to the function by name for formal parameters. Order of the attributes and parameters need not match. This is supported for function parameters of kind POSITIONAL_OR_KEYWORD and KEYWORD_ONLY.

If the function signature also contains a parameter of the form **kwargs (VAR_KEYWORD) then any attributes not bound to formal parameters are passed in its dictionary using the SPL schema attribute name as the key.

If the function signature also contains an arbitrary argument list *args then any attributes not bound to formal parameters or to **kwargs are passed in order of the SPL schema.

If there are only formal parameters, any non-bound attributes are not passed into the function.
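The binding rules for attributes by name can be sketched with inspect. The helper call_by_name is hypothetical and is not how the operator runtime is implemented; it takes the SPL tuple as a dictionary in schema order. When the signature has *args but no **kwargs, the formal parameters are passed positionally, because Python does not allow positional values after keyword arguments.

```python
import inspect

def call_by_name(func, attrs):
    """Hypothetical sketch of the 'attributes by name' rules above.

    attrs: dict of SPL attribute name -> value, in SPL schema order.
    """
    params = list(inspect.signature(func).parameters.values())
    named = [p for p in params if p.kind in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY)]
    has_kwargs = any(p.kind == p.VAR_KEYWORD for p in params)
    has_args = any(p.kind == p.VAR_POSITIONAL for p in params)

    # Bind formal parameters by attribute name; a missing attribute is an error
    # unless the parameter has a default value.
    bound = {p.name: attrs[p.name] for p in named if p.name in attrs}
    for p in named:
        if p.name not in attrs and p.default is p.empty:
            raise TypeError('SPL schema has no attribute for parameter ' + p.name)
    rest = {k: v for k, v in attrs.items() if k not in bound}

    if has_kwargs:
        # Unbound attributes go into **kwargs, so *args (if any) receives nothing.
        return func(**bound, **rest)
    if has_args:
        positional = [bound.get(p.name, p.default) for p in named
                      if p.kind == p.POSITIONAL_OR_KEYWORD]
        keywords = {p.name: bound[p.name] for p in named
                    if p.kind == p.KEYWORD_ONLY and p.name in bound}
        return func(*positional, *rest.values(), **keywords)
    return func(**bound)  # non-bound attributes are not passed

spl_tuple = {'id': 'battery', 'temp': 23.7, 'increase': True}

def by_kwargs(id, **tuple_):
    return id, tuple_

def by_args(id, *tuple_):
    return id, tuple_

def subset(increase, temp, pressure=None):
    return increase, temp, pressure

print(call_by_name(by_kwargs, spl_tuple))  # ('battery', {'temp': 23.7, 'increase': True})
print(call_by_name(by_args, spl_tuple))    # ('battery', (23.7, True))
print(call_by_name(subset, spl_tuple))     # (True, 23.7, None)
```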

Attributes by position

Passing attributes by position allows the SPL operator to be independent of the SPL schema but is brittle to changes in the SPL schema. For example a function expecting an identifier and a sensor reading as the first two attributes would break if an attribute representing region was added as the first SPL attribute.

When attributes by position is used then SPL tuple attributes are passed to the function by position for formal parameters. The first SPL attribute in the tuple is passed as the first parameter. This is supported for function parameters of kind POSITIONAL_OR_KEYWORD.

If the function signature also contains an arbitrary argument list *args (VAR_POSITIONAL) then any attributes not bound to formal parameters are passed in order of the SPL schema.

The function signature must not contain a parameter of the form **kwargs (VAR_KEYWORD).

If there are only formal parameters, any non-bound attributes are not passed into the function.

The SPL schema must have at least the number of positional arguments the function requires.
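A comparable sketch for attributes by position follows. The helper call_by_position is hypothetical; it takes the SPL attribute values as a Python tuple in schema order and applies the rules above: no **kwargs, enough attributes for the required parameters, extra values to *args if present, otherwise dropped.

```python
import inspect

def call_by_position(func, values):
    """Hypothetical sketch of the 'attributes by position' rules above.

    values: the SPL attribute values in schema order.
    """
    params = list(inspect.signature(func).parameters.values())
    if any(p.kind == p.VAR_KEYWORD for p in params):
        raise TypeError('**kwargs is not allowed with attributes by position')
    formals = [p for p in params if p.kind == p.POSITIONAL_OR_KEYWORD]
    required = sum(1 for p in formals if p.default is p.empty)
    if len(values) < required:
        raise TypeError('SPL schema has fewer attributes than required parameters')
    if any(p.kind == p.VAR_POSITIONAL for p in params):
        return func(*values)                # extra values land in *args
    return func(*values[:len(formals)])     # non-bound attributes are not passed

values = ('battery', 23.7, True)

def key_value_up(key, value, up):
    return key, value, up

def first_two(a, b):
    return a, b

def key_rest(key, *tuple_):
    return key, tuple_

print(call_by_position(key_value_up, values))  # ('battery', 23.7, True)
print(call_by_position(first_two, values))     # ('battery', 23.7)
print(call_by_position(key_rest, values))      # ('battery', (23.7, True))
```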

Selecting the style

For signatures only containing a parameter of the form *args or **kwargs the style is implicitly defined:

  • def f(**tuple_) - dictionary - tuple_ will contain a dictionary of all of the SPL tuple attribute values, keyed by attribute name.

  • def f(*tuple_) - tuple - tuple_ will contain all of the SPL tuple attribute values in the order of the SPL schema definition.

Otherwise the style is set by the style parameter to the decorator, defaulting to attributes by name. The style value can be set to:

  • 'name' - attributes by name (the default)

  • 'position' - attributes by position
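The selection rules can be sketched as a small function over the signature of a plain Python function. passing_style is hypothetical and only mirrors the rules listed above; it does not handle callable classes, whose __call__ also has a self parameter.

```python
import inspect

def passing_style(func, style='name'):
    """Hypothetical sketch: choose the tuple passing style for a function."""
    kinds = [p.kind for p in inspect.signature(func).parameters.values()]
    if kinds == [inspect.Parameter.VAR_KEYWORD]:
        return 'dictionary'   # only **kwargs
    if kinds == [inspect.Parameter.VAR_POSITIONAL]:
        return 'tuple'        # only *args
    if style not in ('name', 'position'):
        raise ValueError('style must be name or position: ' + repr(style))
    return style

def f1(**tuple_): pass
def f2(*tuple_): pass
def f3(id, temp): pass

print(passing_style(f1), passing_style(f2), passing_style(f3))  # dictionary tuple name
print(passing_style(f3, style='position'))                      # position
```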

Examples

These examples show how an SPL tuple with the schema and value:

tuple<rstring id, float64 temp, boolean increase>
{id='battery', temp=23.7, increase=true}

is passed into a variety of functions by showing the effective Python call and the resulting values of the function’s parameters.

Dictionary consuming all attributes by **kwargs:

@spl.map()
def f(**tuple_):
    pass
# f(**{'id':'battery', 'temp':23.7, 'increase':True})
#     tuple_={'id':'battery', 'temp':23.7, 'increase':True}

Tuple consuming all attributes by *args:

@spl.map()
def f(*tuple_):
    pass
# f('battery', 23.7, True)
#     tuple_=('battery', 23.7, True)

Attributes by name consuming all attributes:

@spl.map()
def f(id, temp, increase):
    pass
# f(id='battery', temp=23.7, increase=True)
#     id='battery'
#     temp=23.7
#     increase=True

Attributes by name consuming a subset of attributes:

@spl.map()
def f(id, temp):
    pass
# f(id='battery', temp=23.7)
#    id='battery'
#    temp=23.7

Attributes by name consuming a subset of attributes in a different order:

@spl.map()
def f(increase, temp):
    pass
# f(temp=23.7, increase=True)
#    increase=True
#    temp=23.7

Attributes by name consuming id by name and remaining attributes by **kwargs:

@spl.map()
def f(id, **tuple_):
    pass
# f(id='battery', **{'temp':23.7, 'increase':True})
#    id='battery'
#    tuple_={'temp':23.7, 'increase':True}

Attributes by name consuming id by name and remaining attributes by *args:

@spl.map()
def f(id, *tuple_):
    pass
# f('battery', 23.7, True)
#    (id is matched by name but passed positionally so that *args can follow it)
#    id='battery'
#    tuple_=(23.7, True)

Attributes by position consuming all attributes:

@spl.map(style='position')
def f(key, value, up):
    pass
# f('battery', 23.7, True)
#    key='battery'
#    value=23.7
#    up=True

Attributes by position consuming a subset of attributes:

@spl.map(style='position')
def f(a, b):
    pass
# f('battery', 23.7)
#    a='battery'
#    b=23.7

Attributes by position consuming id by position and remaining attributes by *args:

@spl.map(style='position')
def f(key, *tuple_):
    pass
# f('battery', 23.7, True)
#    key='battery'
#    tuple_=(23.7, True)

In all cases the SPL tuple must be able to provide all parameters required by the function. If the SPL schema is insufficient then an error will result, typically an SPL compile time error.

The SPL schema can provide a subset of the formal parameters if the remaining parameters are optional (have a default value).

Attributes by name consuming a subset of attributes with an optional parameter not matched by the schema:

@spl.map()
def f(id, temp, pressure=None):
    pass
# f(id='battery', temp=23.7)
#     id='battery'
#     temp=23.7
#     pressure=None

Submission of SPL tuples from Python

The return from a decorated callable results in submission of SPL tuples on the associated output port.

A Python function must return:
  • None

  • a Python tuple

  • a Python dictionary

  • a list containing any of the above.

None

When None is returned, no tuple is submitted to the operator output port.

Python tuple

When a Python tuple is returned it is converted to an SPL tuple and submitted to the output port.

The values of a Python tuple are assigned to an output SPL tuple by position, so the first value in the Python tuple is assigned to the first attribute in the SPL tuple:

# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a, b):
    return (a, b, a+b)

# The SPL output will be:
# All values explicitly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y

The returned tuple may be sparse: any attribute value in the tuple that is None is set to its SPL default or copied from a matching attribute in the input tuple (same name and type, or same name and same type as the underlying type of an output attribute with an optional type), depending on the operator kind:

# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a, b):
    return (a, None, a+b)

# The SPL output will be:
# x is set to: x (explicitly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: x+y

When a returned tuple has fewer values than attributes in the SPL output schema the attributes not set by the Python function will be set to their SPL default or copied from a matching attribute in the input tuple (same name and type, or same name and same type as the underlying type of an output attribute with an optional type), depending on the operator kind:

# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a, b):
    return a,

# The SPL output will be:
# x is set to: x (explicitly set by returned Python tuple)
# y is set to: y (set by matching input SPL attribute)
# z is set to: 0 (default float32 value)

When a returned tuple has more values than attributes in the SPL output schema then the additional values are ignored:

# SPL input schema: tuple<int32 x, float64 y>
# SPL output schema: tuple<int32 x, float64 y, float32 z>
@spl.map(style='position')
def myfunc(a, b):
    return (a, b, a+b, a/b)

# The SPL output will be:
# All values explicitly set by returned Python tuple
# based on the x,y values from the input tuple
# x is set to: x
# y is set to: y
# z is set to: x+y
#
# The fourth value in the tuple a/b = x/y is ignored.
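The four cases above can be reproduced with a small sketch. The helper tuple_to_spl and the OUT_SCHEMA list of (name, SPL default) pairs are hypothetical; matching with the input tuple is simplified to name matching (type checks are omitted), float32 precision is not modeled, and in_tuple=None stands for an operator kind that does not copy from the input.

```python
# Hypothetical output schema tuple<int32 x, float64 y, float32 z> as (name, SPL default) pairs.
OUT_SCHEMA = [('x', 0), ('y', 0.0), ('z', 0.0)]

def tuple_to_spl(value, out_schema, in_tuple=None):
    """Hypothetical sketch: convert a returned Python tuple to an output SPL tuple (a dict)."""
    out = {}
    for i, (name, default) in enumerate(out_schema):
        # Values are assigned by position; values beyond the schema are never read.
        v = value[i] if i < len(value) else None
        if v is None:
            if in_tuple is not None and name in in_tuple:
                v = in_tuple[name]  # copied from the matching input attribute
            else:
                v = default         # SPL default value
        out[name] = v
    return out

in_tuple = {'x': 3, 'y': 1.5}
a, b = in_tuple['x'], in_tuple['y']
full = tuple_to_spl((a, b, a + b), OUT_SCHEMA, in_tuple)
sparse = tuple_to_spl((a, None, a + b), OUT_SCHEMA, in_tuple)
short = tuple_to_spl((a,), OUT_SCHEMA, in_tuple)
extra = tuple_to_spl((a, b, a + b, a / b), OUT_SCHEMA, in_tuple)
print(full)   # {'x': 3, 'y': 1.5, 'z': 4.5}
print(short)  # {'x': 3, 'y': 1.5, 'z': 0.0}
```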

Python dictionary

A Python dictionary is converted to an SPL tuple for submission to the associated output port. An SPL attribute is set from the dictionary if the dictionary contains a key equal to the attribute name. The value is used to set the attribute, unless the value is None.

If the value in the dictionary is None, or no matching key exists, then the attribute value is set to its SPL default or copied from a matching attribute in the input tuple (same name and type, or same name and same type as the underlying type of an output attribute with an optional type), depending on the operator kind.

Any keys in the dictionary that do not map to SPL attribute names are ignored.
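The dictionary rules can be sketched the same way. dict_to_spl and OUT_SCHEMA are hypothetical names, and matching with the input tuple is again simplified to attribute names only.

```python
# Hypothetical output schema tuple<int32 x, float64 y, float32 z> as (name, SPL default) pairs.
OUT_SCHEMA = [('x', 0), ('y', 0.0), ('z', 0.0)]

def dict_to_spl(value, out_schema, in_tuple=None):
    """Hypothetical sketch: convert a returned Python dict to an output SPL tuple (a dict)."""
    out = {}
    for name, default in out_schema:
        v = value.get(name)  # a missing key behaves like a None value
        if v is None:
            if in_tuple is not None and name in in_tuple:
                v = in_tuple[name]  # copied from the matching input attribute
            else:
                v = default         # SPL default value
        out[name] = v
    return out  # keys that are not attribute names are never read, so they are ignored

in_tuple = {'x': 3, 'y': 1.5}
mapped = dict_to_spl({'z': 4.5, 'extra': 'ignored'}, OUT_SCHEMA, in_tuple)
no_input = dict_to_spl({'x': 7, 'y': None}, OUT_SCHEMA)
print(mapped)    # {'x': 3, 'y': 1.5, 'z': 4.5}
print(no_input)  # {'x': 7, 'y': 0.0, 'z': 0.0}
```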

Python list

When a list is returned, each value is converted to an SPL tuple and submitted to the output port, in order of the list starting with the first element (position 0). If the list contains None at an index then no SPL tuple is submitted for that index.

The list must only contain Python tuples, dictionaries or None. The list can contain a mix of valid values.

The list may be empty resulting in no tuples being submitted.
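Taken together, the rules for None, tuples, dictionaries and lists decide which values are submitted for a single call. The function submissions below is a hypothetical sketch of those rules only; it returns the Python values that would each become one SPL tuple, in submission order, and does not perform the conversion itself.

```python
def submissions(returned):
    """Hypothetical sketch: the Python values submitted, in order, for one call."""
    if returned is None:
        return []            # nothing is submitted
    if isinstance(returned, (tuple, dict)):
        return [returned]    # a single SPL tuple
    if isinstance(returned, list):
        for item in returned:
            if item is not None and not isinstance(item, (tuple, dict)):
                raise TypeError('list may only contain tuples, dicts or None')
        # None entries are skipped; the rest keep their list order.
        return [item for item in returned if item is not None]
    raise TypeError('unsupported return type: ' + type(returned).__name__)

print(submissions(None))                           # []
print(submissions([('a', 1), None, {'id': 'b'}]))  # [('a', 1), {'id': 'b'}]
print(submissions([]))                             # []
```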

Module contents

Functions

extracting

Is a module being loaded by spl-python-extract.

ignore

Decorator to ignore a Python function.

Classes

PrimitiveOperator

Primitive operator super class.

filter

Decorator that creates a filter SPL operator from a callable class or function.

for_each

Creates an SPL operator with a single input port.

input_port

Declare an input port and its processor method.

map

Decorator to create a map SPL operator from a callable class or function.

primitive_operator

Creates an SPL primitive operator with an arbitrary number of input ports and output ports.

source

Creates a source SPL operator from an iterable.