streamsx.topology.composite

Composite transformations.

New in version 1.14.

Module contents

Classes

Composite

Composite transformations support a single logical transformation being a composite of one or more basic transformations.

ForEach

Abstract composite for each transformation.

Map

Abstract composite map transformation.

Source

Abstract composite source.

class streamsx.topology.composite.Composite

Bases: abc.ABC

Composite transformations support a single logical transformation being a composite of one or more basic transformations. Composites encapsulate complex transformations for being re-used.

A composite transformation is implemented as a sub-class of Source, Map or ForEach whose populate method populates the topology with the required basic transformations. For example a Source composite might have use source() followed by a filter() to filter out unwanted events and then a map() to parse the event into a structured schema.

Composites may use other composites during populate. The populate function implements the specific transformations of a composite.

Composites can control how the basic transformations are visually represented. By default any transformations within a composite are grouped visually. A composite may alter this using these attributes of the composite instance:

  • kind - Sets the name of operator kind for a group or single operator. Defaults to a combination of the module and class name of the composite, e.g. streamsx.standard.utility::Sequence. Set to a false value to disable any modification of the visual representation of the composite’s transformations.

  • group - Set to a false value to disable any grouping of multiple transformations. Defaults to True to enable grouping.

The values of kind and group are checked after the expansion of the composite using populate.

class streamsx.topology.composite.Source

Bases: streamsx.topology.composite.Composite

Abstract composite source.

An instance of a subclass can be passed to source() to create a source stream that is composed of one or more basic transformations, which must be implemented by the populate() method of the subclass.

Example assuming RawTweets is Python iterable that produces raw tweets:

class Tweets(streamsx.topology.composite.Source):
    def __init__(self, track):
        self.track = track

    def populate(self, topology, name, **options):
        # get all the tweets
        tweets = topology.source(RawTweets(track=self.track), name=name)
        # filter so that only with a message are returned
        return tweets.filter(lambda tweet : tweet['text'])

This class can then be used as follows:

topo = Topology()
gf_tweets = topo.source(Tweets(track=['glutenfree', 'gf']))
abstract populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)
Parameters
  • topology (Topology) – Topology containing the source.

  • name (Optional[str]) – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream

class streamsx.topology.composite.Map

Bases: streamsx.topology.composite.Composite

Abstract composite map transformation.

An instance of a subclass can be passed to map() to create a stream that is composed of one or more basic transformations of an input stream, which must be implemented by the populate() method of the subclass.

Example:

class WordCount(streamsx.topology.composite.Map):
    def __init__(self, period, update):
        self.period = period
        self.update = update

    def populate(self, topology, stream, schema, name, **options):
        words = stream.flat_map(lambda line : line.split())
        win = words.last(size=self.period).trigger(self.update).partition(lambda s : s)
        return win.aggregate(lambda values : (values[0], len(values)))
abstract populate(topology, stream, schema, name, **options)

Populate the topology with this composite map transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

transformed_stream = input_stream.map(myMapComposite)
Parameters
  • topology (Topology) – Topology containing the composite map.

  • stream (Stream) – Stream to be transformed.

  • schema (Union[StreamSchema, CommonSchema, str, NamedTuple]) – Schema passed into map.

  • name (Optional[str]) – Name passed into map.

  • **options – Future options passed to map.

Returns

Single stream representing the transformation of stream.

Return type

Stream

class streamsx.topology.composite.ForEach

Bases: streamsx.topology.composite.Composite

Abstract composite for each transformation.

An instance of a subclass can be passed to for_each() to create a sink (stream termination) that is composed of one or more basic transformations of an input stream. These transformations and the sink function must be implemented by the populate() method of the subclass.

abstract populate(topology, stream, name, **options)

Populate the topology with this composite for each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)
Parameters
  • topology (Topology) – Topology containing the composite map.

  • stream (Stream) – Stream to be transformed.

  • name (Optional[str]) – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink