The KSSOURCE function creates a Kafka Streams Source node and adds it to a Kafka Streams topology.

The source node is defined using a map with the following keys:

name: Name of the node. Must be unique within the topology.
tsextractor: Optional macro used to extract the record timestamp. This macro is called with a map containing the record details and is expected to return a LONG. If this macro is undefined, the default extractor defined in the configuration is used.
keydeser: Optional macro used to deserialize a record key. If unset, the default key deserializer is used.
valuedeser: Optional macro used to deserialize a record value. If unset, the default value deserializer is used.
topics: Regular expression identifying the topics the source consumes.
reset: Optional auto offset reset policy to use if no committed offsets are found, either EARLIEST or LATEST. Defaults to EARLIEST.
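The rules for the definition map can be modeled outside WarpScript. The Python sketch below is a hypothetical, language-neutral check, not the KSSOURCE implementation. `normalize_source_def` and `consumed_topics` are invented names. The sketch assumes the `topics` pattern must match a whole topic name, as Java's `Matcher.matches()` does in Kafka's pattern subscriptions. Python's `re` syntax only approximates Java regular expressions.

```python
import re
from typing import Any, Dict, List, Set

# Keys the documentation marks as required; the others are optional.
REQUIRED_KEYS = {"name", "topics"}
RESET_POLICIES = {"EARLIEST", "LATEST"}


def normalize_source_def(spec: Dict[str, Any], existing_names: Set[str]) -> Dict[str, Any]:
    """Hypothetical check of a KSSOURCE-style definition map (not the real implementation)."""
    missing = REQUIRED_KEYS - set(spec)
    if missing:
        raise ValueError(f"missing required keys: {sorted(missing)}")
    # 'name' must be unique within the topology.
    if spec["name"] in existing_names:
        raise ValueError(f"node name {spec['name']!r} is already used in the topology")
    reset = spec.get("reset", "EARLIEST")  # documented default
    if reset not in RESET_POLICIES:
        raise ValueError(f"reset must be EARLIEST or LATEST, got {reset!r}")
    normalized = dict(spec)
    normalized["reset"] = reset
    # Compiling the 'topics' regular expression surfaces syntax errors early.
    normalized["topics"] = re.compile(spec["topics"])
    return normalized


def consumed_topics(pattern: re.Pattern, topics: List[str]) -> List[str]:
    """Topics the source would consume, ASSUMING the pattern must match the whole name."""
    return [t for t in topics if pattern.fullmatch(t)]
```

With `topics` set to `metrics-.*`, a topic named `xmetrics-b` is not consumed under the whole-name assumption. A substring search would accept it.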

Deserializers are called with a parameter map containing two keys: topic, the topic of the record, and data, the byte array to deserialize.
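The following Python model shows the deserializer contract. It is a value deserializer that receives the documented map with `topic` and `data` keys. Several choices here are hypothetical and are not KSSOURCE behavior: the topic-based convention (JSON for topics ending in `-json`, UTF-8 text otherwise) and passing through a missing payload unchanged.

```python
import json
from typing import Any, Dict, Optional


def value_deser(params: Dict[str, Any]) -> Optional[Any]:
    """Hypothetical deserializer modeled on the documented parameter map.

    params['topic'] -> name of the topic the record came from
    params['data']  -> the raw byte array to deserialize
    """
    topic: str = params["topic"]
    data: Optional[bytes] = params["data"]
    if data is None:
        # Defensive assumption: a record without payload is passed through as None.
        return None
    text = bytes(data).decode("utf-8")
    # Hypothetical convention: the topic name selects the decoding.
    if topic.endswith("-json"):
        return json.loads(text)
    return text
```

Passing the topic lets a single macro serve a `topics` pattern that spans topics with different encodings.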

The record details passed to tsextractor are contained in a map with the following keys:

timestamp: The current record timestamp.
timestampType: The type of timestamp: NoTimestampType, CreateTime or LogAppendTime.
topic: Name of the topic from which the record was consumed.
offset: Offset of the record.
partition: Partition of the record.
partitionTime: The highest extracted valid timestamp of the current record's partition (may be -1 if unknown).
key: Record key.
value: Record value.
headers: Map of record headers; keys are STRINGs and values are byte arrays.

The tsextractor macro is expected to return a timestamp expressed in milliseconds since the Unix Epoch.
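Here is a Python model of a timestamp extractor that operates on the record details map. Several parts are hypothetical: the `event-time-s` header (epoch seconds as ASCII text), the fallback order (header, then the record timestamp, then partitionTime), and the decision to raise an error when no valid timestamp exists. The essential point is that the returned value is always in milliseconds since the Unix Epoch, whatever its source.

```python
from typing import Any, Dict


def ts_extractor(record: Dict[str, Any]) -> int:
    """Hypothetical extractor returning milliseconds since the Unix Epoch."""
    headers = record.get("headers") or {}
    # Hypothetical header carrying the event time in epoch SECONDS as ASCII text.
    raw = headers.get("event-time-s")
    if raw is not None:
        seconds = float(bytes(raw).decode("ascii"))
        return int(round(seconds * 1000))  # seconds -> milliseconds
    # Fall back to the record's own timestamp when it is valid (non-negative).
    if record["timestamp"] >= 0:
        return record["timestamp"]
    # Then to partitionTime, which may be -1 if unknown.
    if record["partitionTime"] >= 0:
        return record["partitionTime"]
    raise ValueError("no valid timestamp for record at offset %d" % record["offset"])
```

Forgetting the seconds-to-milliseconds conversion would place events near January 1970, which is a common source of unexpectedly dropped or misordered records in time-based processing.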