
IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

In this second part, I have extended the functionality of the Python capture script and the MiniFi, NiFi and post-NiFi processing, and I have added a Kafka Streams Java application.
With this NiFi flow we consume the MQTT and Kafka messages sent by the Kafka Streams application.
In one flow, we receive MQTT messages, pull out the entire flow file content as a message and send it to a Slack channel.
In another flow, we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.
In this flow, we receive data from the local NiFi router that was called by MiniFi over S2S/HTTP(S). We build two types of messages and send them to Kafka 2.0 brokers: one is the full JSON message with a schema, the other is just the temperature. We create a Kafka key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
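The two-message split can be sketched in a few lines of Python. This is not the NiFi flow itself, just a minimal illustration of the transformation it performs; `split_messages` is a hypothetical helper name, and the sample record is abbreviated.

```python
import json

def split_messages(flowfile_json):
    """Illustrative only: build the two Kafka messages the flow emits from one
    sensor reading - the full JSON record and a temperature-only message,
    both keyed by the record's uuid field."""
    record = json.loads(flowfile_json)
    key = record["uuid"]                          # Kafka key comes from the UUID
    full_message = (key, json.dumps(record))      # full record, all sensor fields
    temp_message = (key, record["bme680_tempf"])  # just the Fahrenheit temperature
    return full_message, temp_message

sample = '{"uuid": "20181228_abc", "bme680_tempf": "76.93", "memory": 70.8}'
full, temp = split_messages(sample)
```

Keying both messages by the same UUID keeps the full record and its temperature-only sibling on the same partition.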
I decided to try some TensorFlow processing in our infinite sensor loop. It may use too much memory, so I may have to pick a different TensorFlow model and switch to TensorFlow Lite (https://www.tensorflow.org/lite/devguide). You will note two extra attributes coming from the Python script running on the Raspberry Pi 3B+.
Another thing I wanted to do was try Kafka Streams, since with Kafka 2.0 in HDP and HDF we have a fully supported version available. Based on example code, I wrote a simple Kafka Streams Java 8 application that reads the JSON messages sent to Kafka from NiFi 1.8, checks for some conditions and pushes data out to MQTT and another Kafka topic.
If you don't have an MQTT broker, here is a quick way to install the Mosquitto MQTT broker on CentOS 7:
sudo yum -y install mosquitto
# edit /etc/mosquitto/mosquitto.conf as needed
mkdir -p /var/log/mosquitto
chmod -R 777 /var/log/mosquitto/
touch /var/log/mosquitto/mosquitto.log
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
Now that we have an MQTT broker, our Kafka Streams app can send messages to it and NiFi can read messages from it.
In a future version I will use Hortonworks Schema Registry and Avro.
I have updated the Python script to include TensorFlow and moved it to Python 3.5. Make sure you run it with Python 3.5 and have all the libraries installed on your RPi/Linux device.
Some of the updated code for 3.5 (note the message encoding). Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py
def send_tcp(s, message):
    # Skip empty messages; Python 3 requires bytes on the wire
    if not message:
        return
    try:
        s.sendall(message.encode('utf-8'))
    except OSError:
        print("Failed to send message")
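To sanity-check the helper without a MiniFi listener, a local socket pair works with no network at all. The helper is repeated here so the snippet runs standalone:

```python
import socket

def send_tcp(s, message):
    # Repeated from above so the snippet is self-contained.
    if not message:
        return  # nothing to send
    try:
        s.sendall(message.encode('utf-8'))  # Python 3 needs bytes on the wire
    except OSError:
        print("Failed to send message")

# A connected local socket pair stands in for the real TCP listener
a, b = socket.socketpair()
send_tcp(a, '{"bme680_tempf": "76.93"}')
received = b.recv(1024).decode('utf-8')
a.close()
b.close()
```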

For testing IoT values, I have a GenerateFlowFile with this JSON:
{
  "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",
  "BH1745_green" : "${random():mod(100):plus(1)} ",
  "ltr559_prox" : "0000",
  "end" : "${now():format('yyyyMMddHHmmss')}",
  "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",
  "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
  "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "cputemp" : ${random():mod(100):toNumber()},
  "BH1745_blue" : "9.0",
  "te" : "47.3427119255",
  "bme680_tempc" : "28.19",
  "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "bme680_tempf" : "80.${random():mod(100):toNumber()}",
  "ltr559_lux" : "006.87",
  "memory" : 34.9,
  "VL53L1X_distance_in_mm" : 134,
  "bme680_humidity" : "${random():mod(100):toNumber()}",
  "host" : "vid5",
  "diskusage" : "8732.7",
  "ipaddress" : "192.168.1.167",
  "bme680_pressure" : "1017.31",
  "BH1745_clear" : "10.0",
  "BH1745_red" : "0.0",
  "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
  "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"
}
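For local unit testing away from NiFi, a few of those Expression Language calls can be emulated in plain Python. This is an assumption-laden sketch (`fake_sensor_reading` is a hypothetical helper, and only a subset of the fields is shown), not NiFi's actual evaluation:

```python
import json
import random
import uuid
from datetime import datetime

def fake_sensor_reading():
    """Hypothetical helper: emulate a few NiFi Expression Language calls
    to produce a test record shaped like the GenerateFlowFile JSON."""
    now = datetime.now()
    ts = now.strftime('%Y%m%d%H%M%S')            # ${now():format('yyyyMMddHHmmss')}
    uid = '{0}_{1}'.format(ts, uuid.uuid4())     # timestamp plus ${UUID()}
    return {
        "systemtime": now.strftime('%m/%d/%Y %H:%M:%S'),
        "BH1745_green": str(random.randrange(100) + 1),  # ${random():mod(100):plus(1)}
        "uuid": uid,
        "cputemp": random.randrange(100),                # ${random():mod(100):toNumber()}
        "imgname": "images/bog_image_{0}.jpg".format(uid),
        "end": ts,
    }

record = fake_sensor_reading()
payload = json.dumps(record)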
Kafka Streams Source Code:
Running the Fat Jar:
java -jar target/kstreams-1.0.jar
******************************************* Started
**********2018/12/28 16:41:19
**********
Memory Usage: 28284968
Updated Source Code:
Updated Example Run Output
{
  "ltr559_lux" : "033.75",
  "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
  "cputemp" : 51,
  "host" : "piups",
  "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",
  "bme680_tempc" : "24.96",
  "score" : "0.9694475",
  "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",
  "ltr559_prox" : "0000",
  "bme680_humidity" : "28.875",
  "diskusage" : "10058.7",
  "human_string" : "electric fan, blower",
  "bme680_pressure" : "1012.00",
  "BH1745_green" : "31.0",
  "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "systemtime" : "12/28/2018 11:24:11",
  "BH1745_red" : "33.0",
  "starttime" : "12/28/2018 11:16:02",
  "BH1745_blue" : "19.8",
  "end" : "1546014251.2879872",
  "bme680_tempf" : "76.93",
  "VL53L1X_distance_in_mm" : 455,
  "te" : "488.33915853500366",
  "memory" : 70.8,
  "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "ipaddress" : "192.168.1.166",
  "BH1745_clear" : "40.0"
}
From Kafka Streams, I am sending a temperature warning to MQTT, which NiFi then sends to Slack.
Temperature warning 82.74
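The actual condition check lives in the Java Kafka Streams application; the analogous logic in Python looks roughly like the sketch below. The threshold value and the `check_temperature` helper are assumptions, not taken from the source:

```python
import json

TEMP_WARNING_F = 80.0  # assumed threshold; not specified in the article

def check_temperature(message):
    """Hypothetical Python analogue of the Kafka Streams condition check:
    return the MQTT warning text for a hot reading, or None."""
    record = json.loads(message)
    if float(record.get("bme680_tempf", "0")) > TEMP_WARNING_F:
        return "Temperature warning " + record["bme680_tempf"]
    return None

warning = check_temperature('{"bme680_tempf": "82.74"}')
no_warning = check_temperature('{"bme680_tempf": "76.93"}')
```

Keeping the raw string value in the warning text preserves the sensor's formatting in the Slack message.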
Using HBase 2.0, we are storing our data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord, which utilizes record processing and our schema to stream our JSON into HBase with ease.
Updated Schema with TF Attributes
{
  "type": "record",
  "name": "garden",
  "fields": [
    { "name": "systemtime", "type": "string" },
    { "name": "BH1745_green", "type": "string" },
    { "name": "human_string", "type": "string", "default": "UNK" },
    { "name": "ltr559_prox", "type": "string" },
    { "name": "end", "type": "string" },
    { "name": "uuid", "type": "string" },
    { "name": "lsm303d_accelerometer", "type": "string" },
    { "name": "score", "type": "string", "default": "0" },
    { "name": "imgnamep", "type": "string" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '58.0'" },
    { "name": "BH1745_blue", "type": "string", "doc": "Type inferred from '\"10.8\"'" },
    { "name": "te", "type": "string", "doc": "Type inferred from '\"254.545491934\"'" },
    { "name": "bme680_tempc", "type": "string", "doc": "Type inferred from '\"29.13\"'" },
    { "name": "imgname", "type": "string" },
    { "name": "bme680_tempf", "type": "string", "doc": "Type inferred from '\"84.43\"'" },
    { "name": "ltr559_lux", "type": "string", "doc": "Type inferred from '\"077.95\"'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '37.6'" },
    { "name": "VL53L1X_distance_in_mm", "type": "int", "doc": "Type inferred from '161'" },
    { "name": "bme680_humidity", "type": "string", "doc": "Type inferred from '\"32.359\"'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"vid5\"'" },
    { "name": "diskusage", "type": "string", "doc": "Type inferred from '\"8357.6\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.167\"'" },
    { "name": "bme680_pressure", "type": "string", "doc": "Type inferred from '\"987.86\"'" },
    { "name": "BH1745_clear", "type": "string", "doc": "Type inferred from '\"90.0\"'" },
    { "name": "BH1745_red", "type": "string", "doc": "Type inferred from '\"33.0\"'" },
    { "name": "lsm303d_magnetometer", "type": "string" },
    { "name": "starttime", "type": "string" }
  ]
}
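The `human_string` and `score` fields carry defaults so that records captured before the TensorFlow step ran still validate against the schema. A minimal sketch of what that defaulting amounts to (the `apply_defaults` helper is hypothetical; real Avro readers do this during deserialization):

```python
# The two fields the updated schema defaults when TensorFlow output is absent
DEFAULTS = {"human_string": "UNK", "score": "0"}

def apply_defaults(record):
    """Hypothetical helper: fill in the schema defaults so older records
    (captured before the TensorFlow attributes existed) still fit 'garden'."""
    merged = dict(DEFAULTS)
    merged.update(record)  # real values win over defaults
    return merged

fixed = apply_defaults({"uuid": "abc", "bme680_tempc": "24.96"})
```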
HBase table
create 'breakout', 'sensors'
Example Row
1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8
1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0
1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0
1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0
1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455
1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875
1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00
1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96
1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93
1546014251.2879872
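Note the row key matches the record's `end` field, with each remaining JSON field landing in the `sensors` column family. A sketch of that mapping in Python (the `to_hbase_columns` helper is hypothetical; PutHBaseRecord does this inside NiFi):

```python
def to_hbase_columns(record, family="sensors"):
    """Hypothetical helper mirroring the row shape above: the 'end' field
    becomes the row key, every other field a column in the given family."""
    row_key = record["end"]
    columns = {"{0}:{1}".format(family, k): str(v)
               for k, v in record.items() if k != "end"}
    return row_key, columns

row, cols = to_hbase_columns({
    "end": "1546014251.2879872",
    "BH1745_blue": "19.8",
    "VL53L1X_distance_in_mm": 455,
})
```

Using the epoch-style `end` timestamp as the row key keeps each reading's columns grouped under one row, at the cost of monotonically increasing keys.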
