Kafka Ingestion
This guide illustrates how to develop a Swim application that ingests data from Kafka topics and instantiates logic-performing Web Agents.
We accomplish this by declaring two types of Web Agents:

- A singleton KafkaConsumingAgent responsible for consuming messages from a Kafka topic and relaying them to…
- …a dynamic number of VehicleAgents whose callback functions define the business logic.
Prerequisites
- Swim server libraries
- Kafka Client libraries
- A network-accessible Kafka topic
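If you build with Gradle, the dependency declarations look roughly like the following sketch (the coordinates are believed current, but verify them and pin concrete versions for real use):

// build.gradle.kts (sketch; verify coordinates and pin versions)
dependencies {
    implementation("org.swimos:swim-server:4.+")          // Swim server libraries
    implementation("org.apache.kafka:kafka-clients:3.+")  // Kafka client libraries
}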
Guide
Step 0: Example Data Definition and Business Logic Goals
Let’s envision a situation where vehicles continuously report their state to the Kafka topic. Messages in the topic take the following structure:
- key: a unique String identifying this vehicle
- value: a JSON string that looks like:
{
  "id": (string (same as key)),
  "timestamp": (number (Unix timestamp)),
  "latitude": (number),
  "longitude": (number),
  "speed": (number),
  "bearing": (number)
}
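For example, a single message's value might look like this (illustrative values only):

{
  "id": "V-1234",
  "timestamp": 1635319543000,
  "latitude": 34.2251,
  "longitude": -82.1513,
  "speed": 52.5,
  "bearing": 241.3
}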
We wish to have real-time access to present and historical data at vehicle-level granularity.
KafkaConsumer Instantiation

Step 1: Instantiate a KafkaConsumer – nothing special here, and certainly familiar to veteran Kafka users.
// Assets.java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class Assets {

  private Assets() {
  }

  private static KafkaConsumer<String, String> kafkaConsumer; // or other type parameters

  public static KafkaConsumer<String, String> kafkaConsumer() {
    return Assets.kafkaConsumer;
  }

  private static KafkaConsumer<String, String> loadKafkaConsumer() {
    final Properties props = new Properties();
    props.setProperty("bootstrap.servers", "your-bootstrap-host:9092");
    props.setProperty("group.id", "your-group");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Alternatively, load above from a .properties file
    return new KafkaConsumer<>(props);
  }

  public static void init() {
    Assets.kafkaConsumer = loadKafkaConsumer();
  }

}
Assets.kafkaConsumer will be the bridge between the Kafka topic and the Swim server.
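As the comment in loadKafkaConsumer() suggests, the configuration could instead be read from a properties file on the classpath. A minimal sketch, assuming a resource named kafka-consumer.properties (a hypothetical file name):

// Alternative loadKafkaConsumer() for Assets.java
private static KafkaConsumer<String, String> loadKafkaConsumer() {
  final Properties props = new Properties();
  // "kafka-consumer.properties" is a hypothetical resource name
  try (java.io.InputStream in = Assets.class.getResourceAsStream("/kafka-consumer.properties")) {
    if (in == null) {
      throw new java.io.FileNotFoundException("kafka-consumer.properties not on classpath");
    }
    props.load(in);
  } catch (java.io.IOException e) {
    // Fail fast: a missing or unreadable config should abort startup
    throw new RuntimeException("Failed to load Kafka consumer properties", e);
  }
  return new KafkaConsumer<>(props);
}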
KafkaConsumingAgent Implementation

Step 2: The Kafka-recommended pattern for consuming messages with a KafkaConsumer looks like:
while (true) {
  ConsumerRecords<?, ?> records = yourConsumer.poll(YOUR_POLL_DURATION_MS);
  for (ConsumerRecord<?, ?> record : records) {
    // Do something with record
  }
}
This is all it takes to implement that (clearly blocking) pattern within a Web Agent:
// KafkaConsumingAgent.java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import swim.api.agent.AbstractAgent;
import swim.concurrent.AbstractTask;
import swim.concurrent.TaskRef;

public class KafkaConsumingAgent extends AbstractAgent {

  // asyncStage() can safely run blocking, long-running operations
  private final TaskRef endlessConsumingTask = asyncStage().task(new AbstractTask() {

    @Override
    public void runTask() {
      while (true) {
        final ConsumerRecords<String, String> records = Assets.kafkaConsumer()
            .poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          // TODO: take an action on record
        }
      }
    }

    @Override
    public boolean taskWillBlock() {
      return true;
    }

  });

  @Override
  public void didStart() {
    this.endlessConsumingTask.cue();
  }

}
Note: because KafkaConsumingAgent is the only class that actively uses the KafkaConsumer, you may choose to instantiate the KafkaConsumer from within KafkaConsumingAgent instead. The current approach has the advantage of “fast-failing” the process, preventing any part of the Swim server from starting if there is an issue reaching the Kafka topic.
Warning
When configuring the Web Agent nodeUri routing paths (e.g. within server.recon), ensure that only one instance of KafkaConsumingAgent can be instantiated.
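For example, a server.recon along these lines gives the consumer a single, fixed node URI while leaving VehicleAgents dynamically instantiable (a sketch; the space name, package names, and port are assumptions):

# server.recon (sketch; names and port are assumptions)
vehicle: @fabric {
  @plane(class: "example.ApplicationPlane")
  @node {
    uri: "/kafka"
    @agent(class: "example.KafkaConsumingAgent")
  }
  @node {
    pattern: "/vehicle/:id"
    @agent(class: "example.VehicleAgent")
  }
}
@web(port: 9001) {
  space: "vehicle"
}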
VehicleAgent Implementation and Routing

Step 3: The code so far is fully capable of consuming the topic’s data. We must now create entities – VehicleAgents – that can accept and process this data. Each will merely contain a CommandLane (to receive messages) and a time series (to retain them).
// VehicleAgent.java
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.MapLane;
import swim.structure.Value;

public class VehicleAgent extends AbstractAgent {

  @SwimLane("addMessage")
  CommandLane<Value> addMessage = this.<Value>commandLane()
      .onCommand(v -> {
        this.history.put(v.get("timestamp").longValue(), v);
      });

  @SwimLane("history")
  MapLane<Long, Value> history = this.<Long, Value>mapLane()
      .didUpdate((k, n, o) -> {
        System.out.println(nodeUri() + ": received " + n);
      });

}
Deciding that the URIs for VehicleAgents will take the form /vehicle/:id, everything is in place to fill out our earlier for-loop’s TODO:
// KafkaConsumingAgent.java
// import ...
import swim.json.Json;
import swim.structure.Value;

public class KafkaConsumingAgent extends AbstractAgent {

  private final TaskRef endlessConsumingTask = asyncStage().task(new AbstractTask() {

    @Override
    public void runTask() {
      while (true) {
        final ConsumerRecords<String, String> records = Assets.kafkaConsumer()
            .poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          final String nodeUri = "/vehicle/" + record.key();
          final Value payload = Json.parse(record.value());
          command(nodeUri, "addMessage", payload);
        }
      }
    }

    // ... taskWillBlock() as before ...

  });

  // ... didStart() as before ...

}
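Once the server is running, any vehicle’s time series can be observed live; for instance, with the swim-cli utility (the host, port, and vehicle id below are assumptions):

swim-cli sync -h warp://localhost:9001 -n /vehicle/V-1234 -l history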
Step 4: Wrapping It Up
Minus the boilerplate that comes with every Swim application, namely:

- A server.recon to configure networking, routing, and additional kernels
- A runtime-providing Plane
- A main() method that loads the KafkaConsumer and the Swim server (sketched below)
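A minimal sketch of such an entry point, assuming the plane class is named ApplicationPlane (the class name and log message are illustrative):

// ApplicationPlane.java (sketch; names are assumptions)
import swim.api.plane.AbstractPlane;
import swim.kernel.Kernel;
import swim.server.ServerLoader;

public class ApplicationPlane extends AbstractPlane {

  public static void main(String[] args) {
    // Load the KafkaConsumer first, so that a bad Kafka configuration
    // fast-fails the process before the Swim server starts
    Assets.init();
    final Kernel kernel = ServerLoader.loadServer();
    kernel.start();
    System.out.println("Running server...");
    kernel.run();
  }

}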
We’re completely done! A standalone, directly-runnable project can be found here.