streamsx.service

Streams Jobs as a Cloud Pak for Data Service

Overview

A streams-application service can be used to insert data into and retrieve data from a Streams job. The service is created when you add one or more callables that use EndpointSource or EndpointSink to your topology application and submit the application to run as a job.

Data is exchanged with the job through a REST API.
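As a rough illustration of such an exchange, the sketch below builds (but does not send) an HTTP POST request with the Python standard library. The endpoint URL, the bearer token, and the `{"items": [...]}` body layout are placeholders assumed for this example; the actual paths and payload format are described in the REST API documentation generated for the service.

```python
import json
import urllib.request

def build_inject_request(url, token, tuples):
    """Build a POST request carrying tuples as JSON (hypothetical payload layout)."""
    # The 'items' wrapper is an assumption; check the service's REST API docs.
    body = json.dumps({"items": tuples}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token,
        },
    )

# Placeholder URL and token; substitute the values of your service instance.
req = build_inject_request(
    "https://cpd.example.com/streams/endpoint",
    "my-token",
    [{"id": "1", "number": 42}],
)
# urllib.request.urlopen(req) would send the request; it is omitted here.
```

Building the request separately from sending it makes the payload easy to inspect before any data reaches the job.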

The streams-application service instances are listed on the Services > Instances page of the Cloud Pak for Data web client. Selecting a service entry in the list opens the REST API documentation for the service.

See also

Streams Jobs as a Service

Resources for Streams developers in the IBM Community.

Module contents

Classes

EndpointSink

Creates a service endpoint through which data from the stream can be consumed via REST.

EndpointSource

Creates a service endpoint that produces a stream of data received via REST.

class streamsx.service.EndpointSink(buffer_size=None, consuming_reads=None, service_documentation=None, endpoint_documentation=None)

Bases: streamsx.topology.composite.ForEach

Creates a service endpoint through which data from the stream can be consumed via REST.

With this sink, the Streams job is enabled as a Cloud Pak for Data service and emits job data through a REST API.

Use an instance of this class in for_each() to terminate a stream:

from streamsx.service import EndpointSink
stream.for_each(EndpointSink())

Simple example without service or endpoint documentation:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes, ConfigParams, JobConfig
from streamsx.service import EndpointSink
from typing import Iterable, NamedTuple
import itertools, random

class SampleSourceSchema(NamedTuple):
    id: str
    num: int

# Callable of the Source
class SampleSource(object):
    def __call__(self) -> Iterable[SampleSourceSchema]:
        for num in itertools.count(1):
            output_event = SampleSourceSchema(
                id = str(num),
                num = random.randint(0,num)
            )
            yield output_event

topo = Topology('endpoint_sink_sample')

stream1 = topo.source(SampleSource())
stream1.for_each(EndpointSink(buffer_size=50000))

Example with service and endpoint documentation:

service_documentation={
   'title': 'streamsx-sample-endpoint-sink',
   'description': 'NUMBER GENERATOR',
   'version': '1.0.0',
   'externalDocsUrl': 'https://mycompany.com/numgen/doc',
   'externalDocsDescription': 'Number generator documentation'
}
tags = dict()
tag1 = {
   'Output': {
      'description': 'Output tag description',
      'externalDocs': {
         'url': 'https://mycompany.com/numgen/input/doc',
         'description': 'Output tag external doc description'
      }
   }
}
tags.update(tag1)
service_documentation['tags'] = tags

endpoint_documentation = dict()
endpoint_documentation['summary'] = 'Sample endpoint sink'
endpoint_documentation['tags'] = ['Output']
endpoint_documentation['description'] = 'Streams job emits some data with random numbers'

doc_attr = dict()
descr = {'id': {'description': 'IDENTIFIER (incremented by one per tuple)'}}
doc_attr.update(descr)
descr = {'num': {'description': 'RANDOM NUMBER'}}
doc_attr.update(descr)
endpoint_documentation['attributeDescriptions'] = doc_attr

stream1 = topo.source(SampleSource())
stream1.for_each(EndpointSink(
   buffer_size=50000,
   service_documentation=service_documentation,
   endpoint_documentation=endpoint_documentation), name='cpd_endpoint_sink')

New in version 1.18.

buffer_size

Size of the buffer. If the buffer capacity is reached, older tuples are removed to make room for the newer tuples. A warning is returned on an API request if the requested start time is before the oldest tuple in the buffer. The default buffer size is 1000.

Type

int
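The eviction behaviour described for buffer_size can be pictured with a small stand-alone model. This is only a sketch of the documented semantics (oldest tuples dropped first, a warning when the requested start time precedes the oldest buffered tuple), not the service's implementation; the class name `BoundedBuffer` and the integer timestamps are invented for the illustration.

```python
from collections import deque

class BoundedBuffer:
    """Toy model of a fixed-size buffer that discards the oldest entries."""

    def __init__(self, buffer_size=1000):
        # A deque with maxlen drops items from the left once it is full.
        self._items = deque(maxlen=buffer_size)

    def append(self, timestamp, value):
        self._items.append((timestamp, value))

    def read_since(self, start):
        """Return (values, warning) for entries with timestamp >= start."""
        warning = None
        if self._items and start < self._items[0][0]:
            warning = "requested start time precedes the oldest buffered tuple"
        values = [value for ts, value in self._items if ts >= start]
        return values, warning

buf = BoundedBuffer(buffer_size=3)
for ts in range(1, 6):  # timestamps 1..5; only 3, 4 and 5 fit in the buffer
    buf.append(ts, "tuple-%d" % ts)

values, warning = buf.read_since(1)  # asks for data that was already evicted
```

In this model a reader that polls less often than the buffer turns over loses tuples, which is the trade-off to weigh when choosing a buffer size.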

consuming_reads

Indicates whether tuples are removed from the endpoint buffer after they have been returned by a REST API call. The default value is False.

Type

bool
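The difference between the two settings of consuming_reads can be sketched with a stand-alone model. The class `EndpointBuffer` below is hypothetical and only mimics the documented behaviour; it is not part of streamsx.

```python
class EndpointBuffer:
    """Toy model of an endpoint buffer with optional consuming reads."""

    def __init__(self, consuming_reads=False):
        self.consuming_reads = consuming_reads
        self._items = []

    def add(self, item):
        self._items.append(item)

    def read(self):
        """Return the buffered items; clear them if reads are consuming."""
        result = list(self._items)
        if self.consuming_reads:
            self._items.clear()
        return result

keep = EndpointBuffer(consuming_reads=False)
drain = EndpointBuffer(consuming_reads=True)
for buffer in (keep, drain):
    buffer.add("a")
    buffer.add("b")

# Non-consuming: both reads see the same items.
first_keep, second_keep = keep.read(), keep.read()
# Consuming: the second read finds the buffer empty.
first_drain, second_drain = drain.read(), drain.read()
```

Consuming reads suit a single client that must see each tuple once; non-consuming reads let several clients read the same buffered data.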

service_documentation

Content to describe the service. This is set once per application only. Apply a dict containing one or more of the keys: ‘title’, ‘version’, ‘description’, ‘externalDocsUrl’, ‘externalDocsDescription’, ‘tags’:

d = {
   'title': <string value>,
   'version': <string value>,
   'description': <string value>,
   'externalDocsUrl': <string value>,
   'externalDocsDescription': <string value>,
   'tags': {<key>: {'description': <string value>, 'externalDocs': {'url': <string value>, 'description': <string value>}}, ...}
}

Type

dict
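Because the dict is free-form, a misspelled key is easy to miss. The helper below is a minimal sketch of a client-side check against the keys listed above; `check_service_documentation` is a hypothetical name, and whether the library itself rejects unknown keys is not stated here.

```python
# Keys documented for service_documentation.
ALLOWED_SERVICE_KEYS = {
    "title", "version", "description",
    "externalDocsUrl", "externalDocsDescription", "tags",
}

def check_service_documentation(doc):
    """Return doc unchanged, or raise ValueError on keys outside the documented set."""
    unknown = set(doc) - ALLOWED_SERVICE_KEYS
    if unknown:
        raise ValueError("unexpected keys: " + ", ".join(sorted(unknown)))
    return doc

service_documentation = check_service_documentation({
    "title": "streamsx-sample-endpoint-sink",
    "version": "1.0.0",
    "tags": {"Output": {"description": "Output tag description"}},
})
```

The checked dict can then be passed as the service_documentation argument.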

endpoint_documentation

Additional content to be included for an API endpoint to describe the endpoint sink. Apply a dict containing one or more of the keys: ‘summary’, ‘tags’, ‘description’, ‘attributeDescriptions’:

d = {
   'summary': <string value>,
   'tags': <array of strings>,
   'description': <string value>,
   'attributeDescriptions': {<key>: {'description': <string value>}, ...}
}

Type

dict
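When the stream schema is a NamedTuple, the attributeDescriptions entry can be derived from the schema's field names so that no attribute is left undocumented. The following is a sketch under that assumption; `attribute_descriptions` is a hypothetical helper, and the schema repeats the one from the sink example above.

```python
from typing import NamedTuple

class SampleSourceSchema(NamedTuple):
    id: str
    num: int

def attribute_descriptions(schema, texts):
    """Map every field of a NamedTuple class to {'description': text}."""
    missing = [field for field in schema._fields if field not in texts]
    if missing:
        raise KeyError("no description for: " + ", ".join(missing))
    return {field: {"description": texts[field]} for field in schema._fields}

endpoint_documentation = {
    "summary": "Sample endpoint sink",
    "tags": ["Output"],
    "attributeDescriptions": attribute_descriptions(
        SampleSourceSchema,
        {"id": "IDENTIFIER (incremented by one per tuple)", "num": "RANDOM NUMBER"},
    ),
}
```

Raising on a missing description keeps the generated API documentation in step with the schema when attributes are added later.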

Returns

streamsx.topology.topology.Sink: Stream termination.

populate(topology, stream, name, **options)

Populate the topology with this composite for_each transformation. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

sink = input_stream.for_each(myForEachComposite)

Parameters
  • topology – Topology containing the composite.

  • stream – Stream to be transformed.

  • name – Name passed into for_each.

  • **options – Future options passed to for_each.

Returns

Termination for this composite transformation of stream.

Return type

Sink

class streamsx.service.EndpointSource(schema, buffer_size=None, endpoint_documentation=None, service_documentation=None)

Bases: streamsx.topology.composite.Source

Creates a service endpoint that produces a stream of data received via REST.

With this source, the Streams job is enabled as a Cloud Pak for Data service and receives data through a REST API.

Use an instance of this class in source() and create a stream:

from streamsx.topology.schema import CommonSchema
from streamsx.service import EndpointSource
topo.source(EndpointSource(schema=CommonSchema.Json))

Example endpoint that receives data in JSON format:

from streamsx.topology.schema import CommonSchema
from streamsx.service import EndpointSource
s = topo.source(EndpointSource(schema=CommonSchema.Json), name='cpd_endpoint_src_json')

Example with structured Stream schema, service and endpoint documentation:

from streamsx.topology.topology import Topology
from streamsx.service import EndpointSource

topo = Topology('endpoint_source_sample')
service_documentation={
   'title': 'streamsx-sample-endpoint-source',
   'description': 'Streams job as service receives data',
   'version': '1.0.0'
}
endpoint_documentation = {
   'summary': 'Sample endpoint source',
   'description': 'CPD job endpoint injects some data'
}
schema = 'tuple<rstring id, int64 number>'
s = topo.source(EndpointSource(
   schema=schema,
   buffer_size=200000,
   service_documentation=service_documentation,
   endpoint_documentation=endpoint_documentation), name='cpd_endpoint_src')

New in version 1.18.

schema

Schema of the source stream.

Type

StreamSchema
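A client that posts data to the endpoint may want to check each record against the schema before sending it. The sketch below does this for the structured schema 'tuple<rstring id, int64 number>' used in the example above. The mapping of rstring to str and int64 to int, and the helper name `validate_tuple`, are assumptions made for this illustration.

```python
# Assumed Python equivalents of the SPL types in 'tuple<rstring id, int64 number>'.
EXPECTED_TYPES = {"id": str, "number": int}

def validate_tuple(record, expected=EXPECTED_TYPES):
    """Return a list of problems found in record; an empty list means it matches."""
    problems = []
    for name, py_type in expected.items():
        if name not in record:
            problems.append("missing attribute: " + name)
        elif not isinstance(record[name], py_type):
            problems.append("wrong type for " + name)
    for name in record:
        if name not in expected:
            problems.append("unknown attribute: " + name)
    return problems

ok = validate_tuple({"id": "a1", "number": 7})
bad = validate_tuple({"id": 5, "extra": True})
```

Returning a list of problems rather than raising on the first one lets the client report everything wrong with a record at once.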

buffer_size

Size of the buffer.

Type

int

service_documentation

Content to describe the service. This is set once per application only. Apply a dict containing one or more of the keys: ‘title’, ‘version’, ‘description’, ‘externalDocsUrl’, ‘externalDocsDescription’, ‘tags’:

d = {
   'title': <string value>,
   'version': <string value>,
   'description': <string value>,
   'externalDocsUrl': <string value>,
   'externalDocsDescription': <string value>,
   'tags': {<key>: {'description': <string value>, 'externalDocs': {'url': <string value>, 'description': <string value>}}, ...}
}

Type

dict

endpoint_documentation

Additional content to be included for an API endpoint to describe the endpoint source. Apply a dict containing one or more of the keys: ‘summary’, ‘tags’, ‘description’, ‘attributeDescriptions’:

d = {
   'summary': <string value>,
   'tags': <array of strings>,
   'description': <string value>,
   'attributeDescriptions': {<key>: {'description': <string value>}, ...}
}

Type

dict

Returns

Stream.

populate(topology, name, **options)

Populate the topology with this composite source. Subclasses must implement the populate function. populate is called when the composite is added to the topology with:

topo = Topology()
source_stream = topo.source(mySourceComposite)

Parameters
  • topology – Topology containing the source.

  • name – Name passed into source.

  • **options – Future options passed to source.

Returns

Single stream representing the source.

Return type

Stream