$ wf g -w /path/to/warp10 io.warp10 warp10-plugin-kstreams 1.0.0-uberuberjar

/path/to/warp10/ is where Warp 10 is physically located.


Warp 10 Kafka Streams Plugin

This plugin allows to run Kafka Streams topologies defined entirely in WarpScript. The goal is to leverage the power and versatility of the Warp 10 ecosystem within the Kafka Streams paradigm.

The primary goal of the plugin is to define Kafka Streams topologies within a Warp 10 instance but it includes a launcher which can run topologies within a dedicated Java Virtual Machine.

Topology definition

A topology is a WarpScript program making use of some specific functions. Those functions allow to define the sources, sinks and processors of the topology in a way very similar to the Kafka Streams Processor API.

A single WarpScript program can contain the definition of several topologies and launch them in the same JVM.

The sequence of functions to use for defining and running a topology is:

KSTOPOLOGY          // Create a topology
{ ... } KSSOURCE    // Define a source and add it to the topology
{ ... } KSPROCESSOR // Define a processor and add it to the topology
{ ... } KSSINK      // Define a sink and add it to the topology

// The above functions can be used multiple times to add multiple instances of each node type

  ... // Kafka Streams configuration

This is all there is to it. Please refer to the documentation of each function to learn about their parameters.

Deploying topologies within a Warp 10 instance

The plugin needs to be configured for your Warp 10 instance:

warp10.plugin.kstreams = io.warp10.plugins.kstreams.KStreamsWarp10Plugin
kstreams.dir = /path/to/your/kstreams/topologies/directory

When the Kafka Streams Plugin is loaded into Warp 10, the directory specified under kstreams.dir is scanned periodically (as specified in kstreams.period or every 60000 milliseconds) and any new or modified .mc2 file will be executed. Those files are expected to contain topology definitions.

When a file is updated, the topologies it previously launched are first stopped and the file content is then executed, launching new topologies if KSSTART is called.

Launching topologies in a separate JVM

Launching topologies in a dedicated JVM is done via the use of the io.warp10.plugins.kstreams.KSLaunch class. Assuming the definition of the topologies is in file topologies.mc2, the following command would launch them:

java -cp warp10-plugins-kstreams-x.y.z-uberuberjar.jar -Dwarp10.conf=warp10.conf io.warp10.plugins.kstreams.KSLaunch topologies.mc2

Note that you have to use the -uberuberjar version of the plugin which embeds the WarpScript library. The -uberjar version only embeds the Kafka Streams dependencies, it is meant to be used within a Warp 10 instance.

The warp10.conf file contains the configuration of the WarpScript environment, such as the time units, the extensions to load, the WarpFleet URLs and any other configuration you may need.

Additional functions

The following functions are provided by the KStreamsWarpScriptExtension which comes with the Kafka Streams Plugin.

Topology definition functions

KSTOPOLOGYCreate an empty topology.
KSSOURCEAdd a source to a topology.
KSPROCESSORAdd a processor to a topology.
KSSINKAdd a sink to a topology.
KSSTARTStart a topology.

Processor functions

KSAPPIDRetrieve the application ID.
KSCOMMITRequest a commit of offsets.
KSCONFIGRetrieve the topology configuration.
KSHEADERSRetrieve the headers of the current record.
KSOFFSETRetrieve the offset of the current record.
KSPARTITIONRetrieve the partition of the current record.
KSTIMESTAMPRetrieve the timestamp of the current record.
KSTOPICRetrieve the topic of the current record.
KSTIMESTAMPRetrieve the timestamp of the current record. Timestamp is in milliseconds since the Unix Epoch.
KSSCHEDULESchedule or cancel a punctuation.
KSFORWARDForward a record to a downstream processor.






Last published






Path Size Creation time
  KSAPPID 824 bytes 2020-03-27
  KSCOMMIT 819 bytes 2020-03-27
  KSCONFIG 935 bytes 2020-03-27
  KSFORWARD 1046 bytes 2020-03-27
  KSHEADERS 898 bytes 2020-03-27
  KSOFFSET 818 bytes 2020-03-27
  KSPARTITION 836 bytes 2020-03-27
  KSPROCESSOR 1447 bytes 2020-03-27
  KSSCHEDULE 1260 bytes 2020-03-27
  KSSINK 2411 bytes 2020-03-26
  KSSOURCE 2605 bytes 2020-03-26
  KSSTART 913 bytes 2020-03-26
  KSTIMESTAMP 836 bytes 2020-03-27
  KSTOPIC 843 bytes 2020-03-27
  KSTOPOLOGY 634 bytes 2020-03-26



Configuration template


Describe here the typical configuration needed to enable

and use the module.


This includes configuration primitives for the module itself

or for provided macros.


This configuration file is used by the wf tool for documenting

your module on the WarpFleetâ„¢ site.

warp10.plugin.kstreams = io.warp10.plugins.kstreams.KStreamsWarp10Plugin

Path of the directory containing the topology definitions

kstreams.dir = /path/to/your/kstreams/topologies/directory

How often should kstreams.dir be scanned, in ms. Defaults to 60,000

kstreams.period = 60000