Skip to main content

IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams
In this second part, I have incremented the functionality in the Python capture, MiniFi, NiFi and post-NiFi processing. I have added a Kafka Streams Java application.
With this NiFi flow we are consuming the MQTT and Kafka messages send by the Kafka Streams application.
In one flow, we received MQTT messages, pull out the entire flow as a message and send to a Slack channel.
In another flow we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.
In this flow we receive from the local NiFi router that was called by MiniFi over S2S/HTTP(s). We build two types of messages and send them to Kafka 2.0 brokers. One is the full JSON message with a schema, the other is just the temperature. We create a Kafka Key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
I decided to try some TensorFlow processing for our infinite sensor loop, it may be too much memory usage, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will not two extra attributes coming from the Python script running on the Raspberry Pi 3B+.
Another thing I wanted to do is try Kafka Streams since in Kafka 2.0 in HDP and HDF we have a fully supported version available. So based on example code I wrote a simple Kafka Streams Java 8 application that reads Kafka JSON messages sent from NiFi 1.8 and check for some conditions and push out data to MQTT and another Kafka topic.
If you don't have an MQTT broker. Here is a quick way to install a Mosquitto MQTT broker on Centos 7.

  1. sudo yum -y install mosquitto

  2. /etc/mosquitto/mosquitto.conf

  3. mkdir -p /var/log/mosquitto<br>chmod -R 777 /var/log/mosquitto/<br>touch /var/log/mosquitto/mosquitto.log<br>sudo systemctl start mosquitto<br>sudo systemctl enable mosquitto


Now that we have an MQTT broker our Kafka Streams app can send messages to it and NiFi can read messages from it.
In a future version I will use Hortonworks Schema Registry and Avro.
I have updated the Python script to include TensorFlow and to update to Python 3.5. Make sure you run with Python 3.5 and have all the libraries installed on your RPI/Linux device.
Some of the updated code for 3.5, note the message encoding. Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py

  1. def send_tcp(s, message):

  2. if not message:

  3. try:

  4. s.sendall(message.encode('utf-8')) <br> except:

  5. print("Failed to send message")



For testing IOT values, I have a GenerateFlowFile with this JSON:

  1. {

  2. "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",

  3. "BH1745_green" : "${random():mod(100):plus(1)} ",

  4. "ltr559_prox" : "0000",

  5. "end" : "${now():format('yyyyMMddHHmmss')}",

  6. "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",

  7. "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",

  8. "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",

  9. "cputemp" : ${random():mod(100):toNumber()},

  10. "BH1745_blue" : "9.0",

  11. "te" : "47.3427119255",

  12. "bme680_tempc" : "28.19",

  13. "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",

  14. "bme680_tempf" : "80.${random():mod(100):toNumber()}",

  15. "ltr559_lux" : "006.87",

  16. "memory" : 34.9,

  17. "VL53L1X_distance_in_mm" : 134,

  18. "bme680_humidity" : "${random():mod(100):toNumber()}",

  19. "host" : "vid5",

  20. "diskusage" : "8732.7",

  21. "ipaddress" : "192.168.1.167",

  22. "bme680_pressure" : "1017.31",

  23. "BH1745_clear" : "10.0",

  24. "BH1745_red" : "0.0",

  25. "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",

  26. "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"

  27. }


Kafka Streams Source Code:
Running the Fat Jar:

  1. java -jar target/kstreams-1.0.jar<br>******************************************* Started <br>**********2018/12/28 16:41:19<br>**********

  2. Memory Usage: 28284968


Updated Source Code:
Updated Example Run Output

  1. {

  2. "ltr559_lux" : "033.75",

  3. "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",

  4. "cputemp" : 51,

  5. "host" : "piups",

  6. "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",

  7. "bme680_tempc" : "24.96",

  8. "score" : "0.9694475",

  9. "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",

  10. "ltr559_prox" : "0000",

  11. "bme680_humidity" : "28.875",

  12. "diskusage" : "10058.7",

  13. "human_string" : "electric fan, blower",

  14. "bme680_pressure" : "1012.00",

  15. "BH1745_green" : "31.0",

  16. "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",

  17. "systemtime" : "12/28/2018 11:24:11",

  18. "BH1745_red" : "33.0",

  19. "starttime" : "12/28/2018 11:16:02",

  20. "BH1745_blue" : "19.8",

  21. "end" : "1546014251.2879872",

  22. "bme680_tempf" : "76.93",

  23. "VL53L1X_distance_in_mm" : 455,

  24. "te" : "488.33915853500366",

  25. "memory" : 70.8,

  26. "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",

  27. "ipaddress" : "192.168.1.166",

  28. "BH1745_clear" : "40.0"

  29. }


From Kafka Streams I am sending a warning on temperature to MQTT which NiFi sends to Slack.
Temperature warning 82.74
Using HBase 2.0, we are storing out data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord which utilizes record processing and our schema to stream our JSON into HBase with ease.
Updated Schema with TF Attributes

  1. {

  2. "type": "record",

  3. "name": "garden",

  4. "fields": [

  5. {

  6. "name": "systemtime",

  7. "type": "string"

  8. },

  9. {

  10. "name": "BH1745_green",

  11. "type": "string"

  12. },

  13. {

  14. "name": "human_string",

  15. "type": "string",

  16. "default": "UNK"

  17. },

  18. {

  19. "name": "ltr559_prox",

  20. "type": "string"

  21. },

  22. {

  23. "name": "end",

  24. "type": "string"

  25. },

  26. {

  27. "name": "uuid",

  28. "type": "string"

  29. },

  30. {

  31. "name": "lsm303d_accelerometer",

  32. "type": "string"

  33. },

  34. {

  35. "name": "score",

  36. "type": "string",

  37. "default": "0"

  38. },

  39. {

  40. "name": "imgnamep",

  41. "type": "string"

  42. },

  43. {

  44. "name": "cputemp",

  45. "type": "double",

  46. "doc": "Type inferred from '58.0'"

  47. },

  48. {

  49. "name": "BH1745_blue",

  50. "type": "string",

  51. "doc": "Type inferred from '\"10.8\"'"

  52. },

  53. {

  54. "name": "te",

  55. "type": "string",

  56. "doc": "Type inferred from '\"254.545491934\"'"

  57. },

  58. {

  59. "name": "bme680_tempc",

  60. "type": "string",

  61. "doc": "Type inferred from '\"29.13\"'"

  62. },

  63. {

  64. "name": "imgname",

  65. "type": "string"

  66. },

  67. {

  68. "name": "bme680_tempf",

  69. "type": "string",

  70. "doc": "Type inferred from '\"84.43\"'"

  71. },

  72. {

  73. "name": "ltr559_lux",

  74. "type": "string",

  75. "doc": "Type inferred from '\"077.95\"'"

  76. },

  77. {

  78. "name": "memory",

  79. "type": "double",

  80. "doc": "Type inferred from '37.6'"

  81. },

  82. {

  83. "name": "VL53L1X_distance_in_mm",

  84. "type": "int",

  85. "doc": "Type inferred from '161'"

  86. },

  87. {

  88. "name": "bme680_humidity",

  89. "type": "string",

  90. "doc": "Type inferred from '\"32.359\"'"

  91. },

  92. {

  93. "name": "host",

  94. "type": "string",

  95. "doc": "Type inferred from '\"vid5\"'"

  96. },

  97. {

  98. "name": "diskusage",

  99. "type": "string",

  100. "doc": "Type inferred from '\"8357.6\"'"

  101. },

  102. {

  103. "name": "ipaddress",

  104. "type": "string",

  105. "doc": "Type inferred from '\"192.168.1.167\"'"

  106. },

  107. {

  108. "name": "bme680_pressure",

  109. "type": "string",

  110. "doc": "Type inferred from '\"987.86\"'"

  111. },

  112. {

  113. "name": "BH1745_clear",

  114. "type": "string",

  115. "doc": "Type inferred from '\"90.0\"'"

  116. },

  117. {

  118. "name": "BH1745_red",

  119. "type": "string",

  120. "doc": "Type inferred from '\"33.0\"'"

  121. },

  122. {

  123. "name": "lsm303d_magnetometer",

  124. "type": "string"

  125. },

  126. {

  127. "name": "starttime",

  128. "type": "string"

  129. }

  130. ]

  131. }


HBase table
create 'breakout', 'sensors'
Example Row

  1. 1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8

  2. 1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0

  3. 1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0

  4. 1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0

  5. 1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455

  6. 1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875

  7. 1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00

  8. 1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96

  9. 1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93

  10. 1546014251.2879872 column=sensors:cputemp, timestamp=1546020326955, value=51.0

  11. 1546014251.2879872 column=sensors:diskusage, timestamp=1546020326955, value=10058.7

  12. 1546014251