streamsx.topology.context

Context for submission and build of topologies.

Module contents

Functions

build

Build a topology to produce a Streams application bundle.

run

Run a topology in a distributed Streams instance.

submit

Submits a Topology (application) using the specified context type.

Classes

ConfigParams

Configuration options which may be used as keys in the submit() config parameter.

ContextTypes

Submission context types.

JobConfig

Job configuration.

SubmissionResult

Passed back to the user after a call to submit.

class streamsx.topology.context.ContextTypes

Bases: object

Submission context types.

A Topology is submitted using submit() and a context type. Submission of a Topology generally builds the application into a Streams application bundle (sab) file and then submits it for execution in the required context.

The Streams application bundle contains all the artifacts required by an application such that it can be executed remotely (e.g. on a Streaming Analytics service), including distributing the execution of the application across multiple resources (hosts).

The context type defines which context is used for submission.

The main context types result in a running application and are:

  • STREAMING_ANALYTICS_SERVICE - Application is submitted to a Streaming Analytics service running on IBM Cloud.

  • DISTRIBUTED - Application is submitted to an IBM Streams instance.

  • STANDALONE - Application is executed as a local process, IBM Streams standalone application. Typically this is used during development or testing.

The BUNDLE context type compiles the application (Topology) to produce a Streams application bundle (sab file). The bundle is not executed but may subsequently be submitted to a Streaming Analytics service or an IBM Streams instance. A bundle may be submitted multiple times to services or instances, each resulting in a unique job (running application).
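
For example, a minimal sketch of declaring and submitting an application, assuming a build environment is configured as described below (bundlePath is only available for BUNDLE submissions):

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes

# Declare a trivial application.
topo = Topology('HelloWorld')
topo.source(['Hello', 'World']).print()

# Build an application bundle; other context types run the application.
result = submit(ContextTypes.BUNDLE, topo)
print(result.bundlePath)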

BUILD_ARCHIVE = 'BUILD_ARCHIVE'

Creates a build archive.

This context type produces the intermediate code archive used for bundle creation.

Note

BUILD_ARCHIVE is typically only used when diagnosing issues with bundle generation.

BUNDLE = 'BUNDLE'

Create a Streams application bundle.

The Topology is compiled to produce a Streams application bundle (sab file).

The resultant application bundle can be:
  • Submitted to a Streaming Analytics service using the Streams console or the Streaming Analytics REST API.

  • Submitted to an IBM Streams instance using the Streams console, the JMX API or the command line streamtool submitjob.

  • Executed standalone for development or testing.

The bundle must be built on the same operating system version and architecture as the intended runtime environment. For the Streaming Analytics service this is currently RedHat/CentOS 7 on x86_64 architecture.

IBM Cloud Pak for Data integrated configuration

Projects (within cluster)

The Topology is compiled using the Streams build service for a Streams service instance running in the same Cloud Pak for Data cluster as the Jupyter notebook or script declaring the application.

The instance is specified in the configuration passed into submit(). The code that selects a service instance by name is:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='instanceName', instance_type="streams")

topo = Topology()
...
submit(ContextTypes.BUNDLE, topo, cfg)

The resultant cfg dict may be augmented with other values such as keys from ConfigParams.

External to cluster or project

The Topology is compiled using the Streams build service for a Streams service instance running in Cloud Pak for Data.

Environment variables:

These environment variables define how the application is built and submitted.

  • CP4D_URL - Cloud Pak for Data deployment URL, e.g. https://cp4d_server:31843

  • STREAMS_INSTANCE_ID - Streams service instance name.

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

  • STREAMS_PASSWORD - Password for authentication.
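
A sketch of this flow, using placeholder values (the variables would normally be exported in the shell rather than set in code):

import os
from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes

# Placeholder values for illustration only.
os.environ['CP4D_URL'] = 'https://cp4d_server:31843'
os.environ['STREAMS_INSTANCE_ID'] = 'sample-instance'
os.environ['STREAMS_PASSWORD'] = '...'

topo = Topology('SampleApp')
topo.source(['a', 'b', 'c']).print()

# The bundle is built remotely by the Streams build service.
result = submit(ContextTypes.BUNDLE, topo)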

IBM Cloud Pak for Data standalone configuration

The Topology is compiled using the Streams build service.

Environment variables:

These environment variables define how the application is built.

  • STREAMS_BUILD_URL - Streams build service URL, e.g. when the service is exposed as node port: https://<NODE-IP>:<NODE-PORT>

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

  • STREAMS_PASSWORD - Password for authentication.

IBM Streams on-premise 4.2 & 4.3

The Topology is compiled using a local IBM Streams installation.

Environment variables:

These environment variables define how the application is built.

  • STREAMS_INSTALL - Location of a local IBM Streams installation.

DISTRIBUTED = 'DISTRIBUTED'

Submission to an IBM Streams instance.

IBM Cloud Pak for Data integrated configuration

Projects (within cluster)

The Topology is compiled using the Streams build service and submitted to a Streams service instance running in the same Cloud Pak for Data cluster as the Jupyter notebook or script declaring the application.

The instance is specified in the configuration passed into submit(). The code that selects a service instance by name is:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from icpd_core import icpd_util

cfg = icpd_util.get_service_instance_details(name='instanceName', instance_type="streams")

topo = Topology()
...
submit(ContextTypes.DISTRIBUTED, topo, cfg)

The resultant cfg dict may be augmented with other values such as a JobConfig or keys from ConfigParams.

External to cluster or project

The Topology is compiled using the Streams build service and submitted to a Streams service instance running in Cloud Pak for Data.

Environment variables:

These environment variables define how the application is built and submitted.

  • CP4D_URL - Cloud Pak for Data deployment URL, e.g. https://cp4d_server:31843

  • STREAMS_INSTANCE_ID - Streams service instance name.

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

  • STREAMS_PASSWORD - Password for authentication.
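
For example, a minimal sketch of a distributed submission with a job name, assuming the environment variables above are set (SampleJob is a placeholder name):

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes, JobConfig

topo = Topology('SampleApp')
topo.source(['a', 'b', 'c']).print()

cfg = {}
JobConfig(job_name='SampleJob').add(cfg)
result = submit(ContextTypes.DISTRIBUTED, topo, cfg)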

IBM Cloud Pak for Data standalone configuration

The Topology is compiled using the Streams build service and submitted to a Streams service instance using REST apis.

Environment variables:

These environment variables define how the application is built and submitted.

  • STREAMS_BUILD_URL - Streams build service URL, e.g. when the service is exposed as node port: https://<NODE-IP>:<NODE-PORT>

  • STREAMS_REST_URL - Streams SWS service (REST API) URL, e.g. when the service is exposed as node port: https://<NODE-IP>:<NODE-PORT>

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

  • STREAMS_PASSWORD - Password for authentication.

IBM Streams on-premise 4.2 & 4.3

The Topology is compiled locally and the resultant Streams application bundle (sab file) is submitted to an IBM Streams instance.

Environment variables:

These environment variables define how the application is built and submitted.

  • STREAMS_INSTALL - Location of an IBM Streams installation (4.2 or 4.3).

  • STREAMS_DOMAIN_ID - Domain identifier for the Streams instance.

  • STREAMS_INSTANCE_ID - Instance identifier.

  • STREAMS_ZKCONNECT - (optional) ZooKeeper connection string for domain (when not using an embedded ZooKeeper)

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

Warning

streamtool is used to submit the job with on-premise 4.2 & 4.3 Streams and requires that streamtool does not prompt for authentication. This is achieved by using streamtool genkey.

EDGE = 'EDGE'

Submission to build service running on IBM Cloud Pak for Data to create an image for Edge.

The Topology is compiled and the resultant Streams application bundle (sab file) is added to an image for Edge.

IBM Cloud Pak for Data integrated configuration

Projects (within cluster)

The Topology is compiled using the Streams build service for a Streams service instance running in the same Cloud Pak for Data cluster as the Jupyter notebook or script declaring the application.

The instance is specified in the configuration passed into submit(). The code that selects a service instance by name is:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes
from icpd_core import icpd_util
cfg = icpd_util.get_service_instance_details(name='instanceName', instance_type="streams")

topo = Topology()
...
submit(ContextTypes.EDGE, topo, cfg)

The resultant cfg dict may be augmented with other values such as keys from ConfigParams or JobConfig. For example, apply imageName and imageTag:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes, JobConfig
from icpd_core import icpd_util
cfg = icpd_util.get_service_instance_details(name='instanceName', instance_type="streams")

topo = Topology()
...
jc = JobConfig()
jc.raw_overlay = {'edgeConfig': {'imageName':'py-sample-app', 'imageTag':'v1.0'}}
jc.add(cfg)

submit(ContextTypes.EDGE, topo, cfg)

External to cluster or project

The Topology is compiled using the Streams build service for a Streams service instance running in Cloud Pak for Data.

Environment variables:

These environment variables define how the application is built and submitted.

  • CP4D_URL - Cloud Pak for Data deployment URL, e.g. https://cp4d_server:31843

  • STREAMS_INSTANCE_ID - Streams service instance name.

  • STREAMS_USERNAME - (optional) User name to submit the job as, defaulting to the current operating system user name.

  • STREAMS_PASSWORD - Password for authentication.

Example code to query the base images:

from streamsx.build import BuildService

bs = BuildService.of_endpoint(verify=False)
baseImages = bs.get_base_images()
print('# images = ' + str(len(baseImages)))
for i in baseImages:
    print(i.id)
    print(i.registry)

Example code to select a base image for the image build:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes, JobConfig

# cfg obtained as shown above
topo = Topology()
...
jc = JobConfig()
jc.raw_overlay = {'edgeConfig': {'imageName':'py-sample-app', 'imageTag':'v1.0', 'baseImage':'streams-base-edge-conda-el7:5.3.0.0'}}
jc.add(cfg)

submit(ContextTypes.EDGE, topo, cfg)

EDGE configuration

The dict edgeConfig supports the following fields that are used for the image creation:

  • imageName - [str] name of the image

  • imageTag - [str] name of the image tag

  • baseImage - [str] identify the name of the base image

  • pipPackages - [list] identify one or more Python install packages that are to be included in the image.

  • condaPackages - [list] identify one or more anaconda packages that are to be included in the image.

  • rpms - [list] identify one or more linux RPMs that are to be included in the image

  • locales - [list] identify one or more locales that are to be included in the image. The first item in the list is the “default” locale. The locales are identified in the Java format <language>_<country>_<variant>. Example: “en_US”

Example with adding pip packages and rpms:

jc.raw_overlay = {'edgeConfig': {'imageName': image_name, 'imageTag': image_tag, 'pipPackages':['pandas','numpy'], 'rpms':['atlas-devel']}}

EDGE_BUNDLE = 'EDGE_BUNDLE'

Creates a Streams application bundle.

The Topology is compiled on the build service running on IBM Cloud Pak for Data and the resultant Streams application bundle (sab file) is downloaded.

Note

EDGE_BUNDLE is typically only used when diagnosing issues with applications for EDGE.

STANDALONE = 'STANDALONE'

Build and execute locally.

Compiles and executes the Topology locally in IBM Streams standalone mode as a separate sub-process. Typically used for development and testing.

The call to submit() returns when (and if) the application completes. An application completes when it has finite source streams and all tuples from those streams have been processed by the complete topology. If the source streams are infinite (e.g. reading tweets) then the standalone application will not complete.

Environment variables:

This environment variable defines how the application is built.

  • STREAMS_INSTALL - Location of an IBM Streams installation (4.0.1 or later).
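
For example, a minimal sketch of a standalone run with a finite source, assuming STREAMS_INSTALL is set:

from streamsx.topology.topology import Topology
from streamsx.topology.context import submit, ContextTypes

# Finite source: the run completes once all tuples are processed.
topo = Topology('FiniteApp')
topo.source(range(100)).print()
submit(ContextTypes.STANDALONE, topo)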

STREAMING_ANALYTICS_SERVICE = 'STREAMING_ANALYTICS_SERVICE'

Submission to Streaming Analytics service running on IBM Cloud.

The Topology is compiled and the resultant Streams application bundle (sab file) is submitted for execution on the Streaming Analytics service.

When STREAMS_INSTALL is not set, or the submit() config parameter has FORCE_REMOTE_BUILD set to True, the application is compiled remotely by the service. This allows creation and submission of Streams applications without a local install of IBM Streams.

When STREAMS_INSTALL is set and the submit() config parameter has FORCE_REMOTE_BUILD set to False or not set, the Streams application bundle is created locally and then submitted for execution on the service.

Environment variables:

These environment variables define how the application is built and submitted.

  • STREAMS_INSTALL - (optional) Location of an IBM Streams installation (4.0.1 or later). The install must be running on RedHat/CentOS 7 on x86_64 architecture.

TOOLKIT = 'TOOLKIT'

Creates an SPL toolkit.

Topology applications are implemented as an SPL application before compilation into a Streams application bundle. This context type produces the intermediate SPL toolkit that is input to the SPL compiler for bundle creation.

Note

TOOLKIT is typically only used when diagnosing issues with bundle generation.

class streamsx.topology.context.ConfigParams

Bases: object

Configuration options which may be used as keys in the submit() config parameter.

FORCE_REMOTE_BUILD = 'topology.forceRemoteBuild'

Force a remote build of the application.

When submitting to STREAMING_ANALYTICS_SERVICE a local build of the Streams application bundle will occur if the environment variable STREAMS_INSTALL is set. Setting this flag to True ignores the local Streams install and forces the build to occur remotely using the service.
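
For example, a sketch forcing a remote build (topo is a previously declared Topology):

from streamsx.topology.context import submit, ContextTypes, ConfigParams

cfg = {ConfigParams.FORCE_REMOTE_BUILD: True}
submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topo, cfg)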

JOB_CONFIG = 'topology.jobConfigOverlays'

Key for a JobConfig object representing a job configuration for a submission.

SC_OPTIONS = 'topology.sc.options'

Options to be passed to IBM Streams sc command.

A topology is compiled into a Streams application bundle (sab) using the SPL compiler sc.

Additional options to be passed to sc may be set using this key. The value can be a single string option (e.g. --c++std=c++11 to select C++ 11 compilation) or a list of strings for multiple options.

Setting sc options may be required when invoking SPL operators directly or testing SPL applications.
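
For example, a sketch of setting sc options in a submission configuration:

from streamsx.topology.context import ConfigParams

cfg = {}
# A single option as a string ...
cfg[ConfigParams.SC_OPTIONS] = '--c++std=c++11'
# ... or options as a list of strings.
cfg[ConfigParams.SC_OPTIONS] = ['--c++std=c++11']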

Warning

Options that modify the requested submission context (e.g. setting a different main composite) or deprecated options should not be specified.

New in version 1.12.10.

SERVICE_DEFINITION = 'topology.service.definition'

Streaming Analytics service definition. Identifies the Streaming Analytics service to use. The definition can be one of:

  • The service credentials copied from the Service credentials page of the service console (not the Streams console). Credentials are provided in JSON format. They contain information such as the API key and secret, as well as connection information for the service.

  • A JSON object (dict) created from the service credentials, for example with json.loads(service_credentials)

  • A JSON object (dict) of the form: { "type": "streaming-analytics", "name": "service name", "credentials": ... } with the service credentials as the value of the credentials key. The value of the credentials key can be a JSON object (dict) or a str copied from the Service credentials page of the service console.

This key takes precedence over VCAP_SERVICES and SERVICE_NAME.
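
For example, a sketch passing credentials copied from the service console (service_credentials and topo are placeholders):

import json
from streamsx.topology.context import submit, ContextTypes, ConfigParams

# service_credentials: JSON string from the Service credentials page.
cfg = {ConfigParams.SERVICE_DEFINITION: json.loads(service_credentials)}
submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topo, cfg)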

SERVICE_NAME = 'topology.service.name'

Streaming Analytics service name.

Selects the specific Streaming Analytics service from VCAP service definitions defined by the environment variable VCAP_SERVICES or the key VCAP_SERVICES in the submit config.

SSL_VERIFY = 'topology.SSLVerify'

Key for the SSL verification value passed to requests as its verify option for distributed contexts. By default set to True.

Note

Only True or False is supported. Behaviour is undefined when passing a path to a CA_BUNDLE file or directory with certificates of trusted CAs.
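
For example, a sketch disabling certificate verification for a distributed submission:

from streamsx.topology.context import ConfigParams

cfg = {}
cfg[ConfigParams.SSL_VERIFY] = False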

New in version 1.11.

STREAMS_CONNECTION = 'topology.streamsConnection'

Key for a StreamsConnection object for connecting to a running IBM Streams instance. Only supported for Streams 4.2 and 4.3. Requires environment variable STREAMS_INSTANCE_ID to be set.
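
For example, a sketch assuming streamsx.rest.StreamsConnection with user-password authentication (the credentials are placeholders):

from streamsx.rest import StreamsConnection
from streamsx.topology.context import ConfigParams

sc = StreamsConnection(username='user', password='passwd')
cfg = {ConfigParams.STREAMS_CONNECTION: sc}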

VCAP_SERVICES = 'topology.service.vcap'

Streaming Analytics service definitions including credentials in VCAP_SERVICES format.

Provides the connection credentials when connecting to a Streaming Analytics service using context type STREAMING_ANALYTICS_SERVICE. The streaming-analytics service to use within the service definitions is identified by name using SERVICE_NAME.

The key overrides the environment variable VCAP_SERVICES.

The value can be:
  • Path to a local file containing a JSON representation of the VCAP services information.

  • Dictionary containing the VCAP services information.
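
For example, a sketch combining VCAP_SERVICES with SERVICE_NAME (the path, service name and topo are placeholders):

from streamsx.topology.context import submit, ContextTypes, ConfigParams

cfg = {
    ConfigParams.VCAP_SERVICES: '/path/to/vcap_services.json',
    ConfigParams.SERVICE_NAME: 'my-service',
}
submit(ContextTypes.STREAMING_ANALYTICS_SERVICE, topo, cfg)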

See also

VCAP services

class streamsx.topology.context.JobConfig(job_name=None, job_group=None, preload=False, data_directory=None, tracing=None)

Bases: object

Job configuration.

JobConfig allows configuration of the job that will result from submission of a Topology (application).

A JobConfig is set in the config dictionary passed to submit() using the key JOB_CONFIG. add() exists as a convenience method to add it to a submission configuration.

A JobConfig can also be used when submitting a Streams application bundle through the Streaming Analytics REST API method submit_job().

Parameters
  • job_name (str) – The name that is assigned to the job. A job name must be unique within a Streams instance. When set to None a system generated name is used.

  • job_group (str) – The job group to use to control permissions for the submitted job.

  • preload (bool) – Specifies whether to preload the job onto all resources in the instance, even if the job is not currently needed on each. Preloading the job can improve PE restart performance if the PEs are relocated to a new resource.

  • data_directory (str) – Specifies the location of the optional data directory. The data directory is a path within the cluster that is running the Streams instance.

  • tracing – Specifies the application trace level. See tracing.

Example:

from streamsx.topology import context
from streamsx.topology.context import JobConfig

# Submit a job with the name NewsIngester
cfg = {}
job_config = JobConfig(job_name='NewsIngester')
job_config.add(cfg)
context.submit('STREAMING_ANALYTICS_SERVICE', topo, cfg)

add(config)

Add this JobConfig into a submission configuration object.

Parameters

config (dict) – Submission configuration.

Returns

config.

Return type

dict

as_overlays()

Return this job configuration as a complete job configuration overlays object.

Converts this job configuration into the full format supported by IBM Streams. The returned dict contains:

  • jobConfigOverlays key with an array containing a single job configuration overlay.

  • an optional comment key containing the comment str.

For example with this JobConfig:

jc = JobConfig(job_name='TestIngester')
jc.comment = 'Test configuration'
jc.target_pe_count = 2

the returned dict would be:

{"comment": "Test configuration",
    "jobConfigOverlays":
        [{"jobConfig": {"jobName": "TestIngester"},
        "deploymentConfig": {"fusionTargetPeCount": 2, "fusionScheme": "manual"}}]}

The returned overlays object can be saved as JSON in a file using json.dump. A file can be used with job submission mechanisms that support a job config overlays file, such as streamtool submitjob or the IBM Streams console.

Example of saving a JobConfig instance as a file:

import json
from streamsx.topology.context import JobConfig

jc = JobConfig(job_name='TestIngester')
with open('jobconfig.json', 'w') as f:
    json.dump(jc.as_overlays(), f)

Returns

Complete job configuration overlays object built from this object.

Return type

dict

New in version 1.9.

property comment

Comment for job configuration.

The comment does not change the functionality of the job configuration.

Returns

Comment text, None if it has not been set.

Return type

str

New in version 1.9.

static from_overlays(overlays)

Create a JobConfig instance from a full job configuration overlays object.

All logical items, such as comment and job_name, are extracted from overlays. The remaining information in the single job config overlay in overlays is set as raw_overlay.

Parameters

overlays (dict) – Full job configuration overlays object.

Returns

Instance representing logical view of overlays.

Return type

JobConfig

New in version 1.9.
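
For example, a sketch recreating a JobConfig from an overlays file saved by as_overlays() (reusing the jobconfig.json file name from the example above):

import json
from streamsx.topology.context import JobConfig

with open('jobconfig.json') as f:
    jc = JobConfig.from_overlays(json.load(f))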

property raw_overlay

Raw Job Config Overlay.

A submitted job is configured using a Job Config Overlay, which is represented as JSON. JobConfig exposes a Job Config Overlay logically with properties such as job_name and tracing. This property (a dict) allows merging of the configuration defined by this object with a raw representation of a Job Config Overlay. This can be used when a capability of Job Config Overlay is not exposed logically through this class.

For example, the threading model can be set by:

jc = streamsx.topology.context.JobConfig()
jc.raw_overlay = {'deploymentConfig': {'threadingModel': 'manual'}}

Any logical items set by this object overwrite any set with raw_overlay. For example this sets the job name to the value set in the constructor (DBIngest), not the value in raw_overlay (Ingest):

jc = streamsx.topology.context.JobConfig(job_name='DBIngest')
jc.raw_overlay = {'jobConfig': {'jobName': 'Ingest'}}

Note

The contents of raw_overlay must be a dict that matches a single Job Config Overlay and is serializable as JSON in the correct format.

New in version 1.9.

property submission_parameters

Job submission parameters.

Submission parameter values for the job. A dict object that maps submission parameter names to values.
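
For example, a sketch setting a value, assuming the returned dict can be updated in place (threshold is a hypothetical submission parameter declared by the topology):

from streamsx.topology.context import JobConfig

jc = JobConfig()
# 'threshold' is a hypothetical submission parameter name.
jc.submission_parameters['threshold'] = 42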

New in version 1.9.

property target_pe_count

Target processing element count.

When submitted against a Streams instance, target_pe_count provides a hint to the scheduler as to how to partition the topology across processing elements (processes) for the job execution. When a job contains multiple processing elements (PEs) the Streams scheduler can distribute the PEs across the resources (hosts) running in the instance.

When set to None (the default) no hint is supplied to the scheduler. The number of PEs in the submitted job will be determined by the scheduler.

The value is only a target and may be ignored when the topology contains isolate() calls.

Note

Only supported in Streaming Analytics service and IBM Streams 4.2 or later.

property tracing

Runtime application trace level.

The runtime application trace level can be a string with value error, warn, info, debug or trace.

In addition a level from the Python logging module can be used, with CRITICAL and ERROR mapping to error, WARNING to warn, INFO to info and DEBUG to debug.

Setting tracing to None or logging.NOTSET will result in the job submission using the Streams instance application trace level.

The value of tracing is the level as a string (error, warn, info, debug or trace) or None.
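
For example, a sketch setting the level as a string or as a logging level:

import logging
from streamsx.topology.context import JobConfig

jc = JobConfig(tracing='debug')  # string level
jc.tracing = logging.INFO        # logging level, mapped to 'info'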

class streamsx.topology.context.SubmissionResult(results)

Bases: object

Passed back to the user after a call to submit. Allows the user to use dot notation to access dictionary elements.

Example accessing result files when using BUNDLE:

import os
from streamsx.topology.context import submit, ContextTypes

submission_result = submit(ContextTypes.BUNDLE, topology, config)
print(submission_result.bundlePath)
...
os.remove(submission_result.bundlePath)
os.remove(submission_result.jobConfigPath)

Result contains the generated toolkit location when using TOOLKIT:

submission_result = submit(ContextTypes.TOOLKIT, topology, config)
print(submission_result.toolkitRoot)

The result when using DISTRIBUTED depends on whether the Topology is compiled locally and the resultant Streams application bundle (sab file) is submitted to an IBM Streams instance, or compiled on the build service and submitted to an instance in Cloud Pak for Data:

submission_result = submit(ContextTypes.DISTRIBUTED, topology, config)
print(submission_result)

Result contains the generated image, imageDigest, submitMetrics (building the bundle) and submitImageMetrics (building the image) when using EDGE:

submission_result = submit(ContextTypes.EDGE, topology, config)
print(submission_result.image)
print(submission_result.imageDigest)

cancel_job_button(description=None)

Display a button that will cancel the submitted job.

Used in a Jupyter IPython notebook to provide an interactive mechanism to cancel a job submitted from the notebook.

Once clicked the button is disabled unless the cancel fails.

A job may be cancelled directly using:

submission_result = submit(ctx_type, topology, config)
submission_result.job.cancel()

Parameters

description (str) – Text used as the button description, defaults to value based upon the job name.

Warning

Behavior when called outside a notebook is undefined.

New in version 1.12.

property job

REST binding for the job associated with the submitted build.

Returns

REST binding for running job or None if connection information was not available or no job was submitted.

Return type

Job

streamsx.topology.context.submit(ctxtype, graph, config=None, username=None, password=None)

Submits a Topology (application) using the specified context type.

Used to submit an application for compilation into a Streams application and execution within a Streaming Analytics service or an IBM Streams instance.

ctxtype defines how the application will be submitted, see ContextTypes.

The parameters username and password are only required when submitting to an IBM Streams instance and the code performing the submit requires access to the Streams REST API. Accessing data from views created by view() requires access to the Streams REST API.

Parameters
  • ctxtype (str) – Type of context the application will be submitted to. A value from ContextTypes.

  • graph (Topology) – The application topology to be submitted.

  • config (dict) – Configuration for the submission, augmented with values such as a JobConfig or keys from ConfigParams.

  • username (str) – Deprecated: Username for the Streams REST api. Use environment variable STREAMS_USERNAME if using user-password authentication.

  • password (str) – Deprecated: Password for username. Use environment variable STREAMS_PASSWORD if using user-password authentication.

Returns

Result of the submission. Content depends on ContextTypes constant passed as ctxtype.

Return type

SubmissionResult

streamsx.topology.context.build(topology, config=None, dest=None, verify=None)

Build a topology to produce a Streams application bundle.

Builds a topology using submit() with context type BUNDLE. The result is a sab file on the local file system along with a job config overlay file matching the application.

The build uses a build service or a local install, see BUNDLE for details.

Parameters
  • topology (Topology) – Application topology to be built.

  • config (dict) – Configuration for the build.

  • dest (str) – Destination directory for the sab and JCO files. Default is context specific.

  • verify – SSL verification used by requests when using a build service. Defaults to enabling SSL verification.

Returns

3-element tuple containing

  • bundle_path (str): path to the bundle (sab file) or None if not created.

  • jco_path (str): path to file containing the job config overlay for the application or None if not created.

  • result (SubmissionResult): value returned from submit.
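
For example, a sketch unpacking the returned tuple (topo is a previously declared Topology):

from streamsx.topology.context import build

bundle_path, jco_path, result = build(topo)
print(bundle_path, jco_path)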

See also

BUNDLE for details on how to configure the build service to use.

New in version 1.14.

streamsx.topology.context.run(topology, config=None, job_name=None, verify=None, ctxtype='DISTRIBUTED')

Run a topology in a distributed Streams instance.

Runs a topology using submit() with context type DISTRIBUTED (by default). The result is a running Streams job.

Parameters
  • topology (Topology) – Application topology to be run.

  • config (dict) – Configuration for the build.

  • job_name (str) – Optional job name. If set will override any job name in config.

  • verify – SSL verification used by requests when using a build service. Defaults to enabling SSL verification.

  • ctxtype (str) – Context type for submission.

Returns

2-element tuple containing

  • job (Job): REST binding object for the running job or None if no job was submitted.

  • result (SubmissionResult): value returned from submit.
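
For example, a sketch submitting and accessing the job binding (topo is a previously declared Topology; SampleJob is a placeholder name):

from streamsx.topology.context import run

job, result = run(topo, job_name='SampleJob')
if job is not None:
    print(job.id)  # id of the running job from its REST binding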

See also

DISTRIBUTED for details on how to configure the Streams instance to use.

New in version 1.14.