This article explains how to set up a data pipeline that moves IoT data into Cassandra using Apache Kafka. The IoT devices communicate over MQTT, a lightweight machine-to-machine messaging protocol designed for Internet of Things connectivity.
Read more on MQTT here.
Everything in this example runs on a single server: a single-node Kafka broker and a single-node Cassandra instance.
This example uses the connect-standalone.sh tool, which ships with the Kafka download.
We also need a third-party plugin, stream-reactor, for setting up the different connectors in Kafka. A connector that brings data into Kafka is known as a source connector, and a connector that sends data out of Kafka is known as a sink connector. stream-reactor provides a number of source and sink connectors for Kafka, which can be downloaded from http://archive.landoop.com/stream-reactor/.
Use the latest version compatible with Kafka 2.1.1.
Download the archive and extract it to the /opt/stream-reactor folder (you can use any folder).
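For reference, the download and extraction steps look roughly like the following; the archive name is a placeholder, so substitute the actual release file you downloaded for your Kafka version.
# create the plugin folder and extract the downloaded stream-reactor archive into it
sudo mkdir -p /opt/stream-reactor
sudo tar -xzf stream-reactor-<version>.tar.gz -C /opt/stream-reactor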
This example uses the Mosquitto broker, running on its default port 1883, to receive the data from the IoT devices. The IoT data is simulated using the mosquitto_pub tool.
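If you want to confirm that Mosquitto is working before wiring it to Kafka, a quick local publish/subscribe test can be done with the standard Mosquitto client tools (assuming mosquitto_sub and mosquitto_pub are installed):
# subscribe to a test topic in one terminal
mosquitto_sub -h localhost -t test-topic
# publish a message from another terminal; it should appear in the subscriber terminal
mosquitto_pub -h localhost -t test-topic -m "hello"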
To run the example, we need to configure some basic parameters for Kafka Connect and for the MQTT source and Cassandra sink connectors.
Create a file kafka-connect.properties under the Kafka installation's config directory with the following parameters:
—————————————————————-
# kafka-connect.properties file
# modified for creating a data pipeline from mqtt to cassandra
# Kafka broker IP addresses to connect to
bootstrap.servers=localhost:9092
# Path to directory containing the connector jar
plugin.path=/usr/local/kafka/libs,/opt/stream-reactor
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=30000
———————————————————————-
Change bootstrap.servers and plugin.path according to your server. The offset.flush.interval.ms is set to 30000 (30 seconds) in this example and can be changed as per your requirements.
Note: No user authentication values are provided in the config file above. If authentication is needed, the following parameters must also be set in the kafka-connect.properties file.
———————————————————————–
#Kafka Connect properties
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="<password>";
#Source connector properties
producer.sasl.mechanism=SCRAM-SHA-256
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="<password>";
———————————————————————–
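The producer.* overrides above apply to source connectors, which write into Kafka. Since the Cassandra sink connector reads from Kafka, it would need the analogous consumer-side overrides as well; a sketch, assuming the same SCRAM credentials (depending on your listener setup, you may also need to set security.protocol, e.g. SASL_PLAINTEXT or SASL_SSL):
#Sink connector properties
consumer.sasl.mechanism=SCRAM-SHA-256
consumer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="<password>";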
The next step is to create a Kafka topic to receive the data from the IoT devices. We can use the following command (run from a terminal) to create the topic mqtt-source-data.
Go to the Kafka installation folder and run the command:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt-source-data
Once the command runs, the terminal shows a confirmation that the topic mqtt-source-data has been created. Other useful Kafka commands are:
# list all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mqtt-source-data
# describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mqtt-source-data
# running console consumer command
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt-source-data --from-beginning
Now that we have configured Kafka Connect, the next step is to set up the MQTT source connector. We use the MQTT source connector from the stream-reactor project. More configuration options can be found here.
The MQTT source connector moves all the data from an MQTT topic into a Kafka topic. In this example, the MQTT topic is mqtt-source and the Kafka topic is mqtt-source-data.
Create a file connect-mqtt-source.properties in the config folder with the content below:
———————————————————————————
name=mqtt-source
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasks.max=1
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.kcql=INSERT INTO mqtt-source-data SELECT * FROM mqtt-source WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.keep.alive=1000
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://localhost:1883
connect.mqtt.service.quality=1
connect.progress.enabled=true
———————————————————————————
Note: Make sure to update connect.mqtt.hosts and connect.mqtt.kcql to match your environment.
Next, we need to set up the Cassandra sink connector to move data from the Kafka topic to Cassandra. The Cassandra sink from the stream-reactor project is used for this example. For more configuration options, please visit this link.
The Cassandra sink connector consumes data from the Kafka topic and inserts it into the Cassandra database.
Create a file connect-cassandra-sink.properties in the config folder with the content below:
————————————————————————————-
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
connect.cassandra.key.space=sensordata
connect.cassandra.contact.points=localhost
tasks.max=1
topics=mqtt-source-data
connect.cassandra.kcql=INSERT INTO mqttsensordata SELECT * FROM mqtt-source-data
connect.cassandra.connection.clean=true
connect.cassandra.connection.timeout=1000
connect.cassandra.connection.keep.alive=1000
connect.cassandra.port=9042
name=cassandra-sink-sensor-data
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
————————————————————————————-
The Cassandra keyspace used is sensordata and the table is mqttsensordata. The following commands create the keyspace and table in Cassandra after connecting through cqlsh.
CREATE KEYSPACE sensordata WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'} AND durable_writes = true;

USE sensordata;

CREATE TABLE sensordata.mqttsensordata (
    deviceid int PRIMARY KEY,
    humidity int,
    temperature int
);
Run the following command in the terminal from the Kafka installation folder:
bin/connect-standalone.sh config/kafka-connect.properties config/connect-mqtt-source.properties config/connect-cassandra-sink.properties
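Once the worker starts, Kafka Connect also exposes a REST API (on port 8083 by default), which can be used to confirm that both connectors are loaded and running:
# list the loaded connectors
curl http://localhost:8083/connectors
# check the status of the MQTT source and Cassandra sink connectors
curl http://localhost:8083/connectors/mqtt-source/status
curl http://localhost:8083/connectors/cassandra-sink-sensor-data/status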
Before testing the data pipeline, make sure that the Mosquitto broker is running on port 1883, Kafka is up and running, and Cassandra is listening on port 9042.
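A quick way to check that all three services are listening on their expected ports (exact tooling varies by distribution):
# Mosquitto (1883), Kafka (9092) and Cassandra (9042) should all appear in the output
ss -ltn | grep -E '1883|9092|9042'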
Send some sample records using mosquitto_pub:
mosquitto_pub -h localhost -t mqtt-source -m "{\"deviceid\":1,\"temperature\":26,\"humidity\":51}"
mosquitto_pub -h localhost -t mqtt-source -m "{\"deviceid\":2,\"temperature\":23,\"humidity\":54}"
mosquitto_pub -h localhost -t mqtt-source -m "{\"deviceid\":3,\"temperature\":22,\"humidity\":55}"
Open a console consumer from another terminal to check whether the data was received on topic mqtt-source-data.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mqtt-source-data --from-beginning
You should see the records on the Kafka topic. Open another terminal and connect to Cassandra using the cqlsh command. Use the sensordata keyspace and select the data from the mqttsensordata table.
cqlsh localhost
USE sensordata;
SELECT * FROM mqttsensordata;
After a few seconds, you should see the three records inserted into the mqttsensordata table.
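The result should look roughly like the following (row order and formatting depend on your cqlsh version):
 deviceid | humidity | temperature
----------+----------+-------------
        1 |       51 |          26
        2 |       54 |          23
        3 |       55 |          22

(3 rows)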
Congratulations! You have successfully created a data pipeline that delivers MQTT messages to a Cassandra database using Apache Kafka.