IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

In this second part, I have extended the functionality in the Python capture, MiniFi, NiFi, and post-NiFi processing, and I have added a Kafka Streams Java application.
With this NiFi flow we are consuming the MQTT and Kafka messages sent by the Kafka Streams application.
In one flow, we receive MQTT messages, pull out the entire flow file content as a message, and send it to a Slack channel.
In another flow we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.
In this flow we receive from the local NiFi router that was called by MiniFi over S2S/HTTP(s). We build two types of messages and send them to Kafka 2.0 brokers. One is the full JSON message with a schema, the other is just the temperature. We create a Kafka Key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
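As a rough illustration of the two message types described above, here is a minimal Python sketch (the real work is done by NiFi processors, not by this code). The function name `build_kafka_messages` is hypothetical, and the choice of `bme680_tempf` as "the temperature" is an assumption; the flow may use a different field.

```python
import json


def build_kafka_messages(record, temp_field="bme680_tempf"):
    """Return (key, full_json_value, temperature_value) for one sensor record.

    temp_field is an assumption; swap in whichever temperature attribute
    your flow actually publishes.
    """
    key = record["uuid"].encode("utf-8")                   # Kafka key built from the UUID
    full_value = json.dumps(record).encode("utf-8")        # full JSON message
    temp_value = str(record[temp_field]).encode("utf-8")   # temperature-only message
    return key, full_value, temp_value


sample = {"uuid": "20181228162321_cbd0cbd3", "bme680_tempf": "76.93", "host": "piups"}
key, full_value, temp_value = build_kafka_messages(sample)
print(key, temp_value)
```

Keying both messages by the same UUID means the full record and its temperature reading can be matched up again downstream.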
I decided to try some TensorFlow processing in our infinite sensor loop. It may use too much memory, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will note two extra attributes coming from the Python script running on the Raspberry Pi 3B+.
Another thing I wanted to do is try Kafka Streams, since Kafka 2.0 in HDP and HDF gives us a fully supported version. So, based on example code, I wrote a simple Kafka Streams Java 8 application that reads Kafka JSON messages sent from NiFi 1.8, checks for some conditions, and pushes data out to MQTT and another Kafka topic.
If you don't have an MQTT broker, here is a quick way to install a Mosquitto MQTT broker on CentOS 7.
  sudo yum -y install mosquitto
  # configuration file: /etc/mosquitto/mosquitto.conf
  mkdir -p /var/log/mosquitto
  chmod -R 777 /var/log/mosquitto/
  touch /var/log/mosquitto/mosquitto.log
  sudo systemctl start mosquitto
  sudo systemctl enable mosquitto
Now that we have an MQTT broker our Kafka Streams app can send messages to it and NiFi can read messages from it.
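Before pointing Kafka Streams or NiFi at the broker, it can help to confirm that something is listening. This is a minimal sketch using only the Python standard library; the helper name `broker_reachable` is hypothetical, and it assumes Mosquitto is on the default MQTT port 1883. It checks only that a TCP connection can be opened, not that the MQTT handshake works.

```python
import socket


def broker_reachable(host, port=1883, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures
        return False


if __name__ == "__main__":
    # "localhost" is a placeholder; use the host where Mosquitto runs
    print("MQTT broker reachable:", broker_reachable("localhost"))
```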
In a future version I will use Hortonworks Schema Registry and Avro.
I have updated the Python script to include TensorFlow and to run on Python 3.5. Make sure you run with Python 3.5 and have all the libraries installed on your RPI/Linux device.
Here is some of the updated code for 3.5; note the message encoding. Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py
  def send_tcp(s, message):
      # Nothing to send for an empty message
      if not message:
          return
      try:
          # Python 3 sockets send bytes, so encode the string first
          s.sendall(message.encode('utf-8'))
      except Exception:
          print("Failed to send message")
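To see why the encoding matters, here is a small self-contained sketch that exercises a variant of `send_tcp` over a local socket pair instead of a real network connection. The boolean return values and the sample payload are my additions for illustration; they are not in the original script.

```python
import socket


def send_tcp(s, message):
    """Send a text message over an already connected socket."""
    if not message:
        return False
    try:
        # In Python 3, sendall() needs bytes, so the str is encoded first
        s.sendall(message.encode("utf-8"))
        return True
    except OSError:
        print("Failed to send message")
        return False


# A connected pair of local sockets stands in for the device and the listener
left, right = socket.socketpair()
send_tcp(left, '{"cputemp": 51}')
received = right.recv(1024).decode("utf-8")
left.close()
right.close()
print(received)
```

The receiving side gets bytes and has to decode them with the same encoding, which is the part that changed when moving from Python 2 to 3.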

For testing IOT values, I have a GenerateFlowFile with this JSON:
  {
    "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",
    "BH1745_green" : "${random():mod(100):plus(1)} ",
    "ltr559_prox" : "0000",
    "end" : "${now():format('yyyyMMddHHmmss')}",
    "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",
    "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
    "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
    "cputemp" : ${random():mod(100):toNumber()},
    "BH1745_blue" : "9.0",
    "te" : "47.3427119255",
    "bme680_tempc" : "28.19",
    "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
    "bme680_tempf" : "80.${random():mod(100):toNumber()}",
    "ltr559_lux" : "006.87",
    "memory" : 34.9,
    "VL53L1X_distance_in_mm" : 134,
    "bme680_humidity" : "${random():mod(100):toNumber()}",
    "host" : "vid5",
    "diskusage" : "8732.7",
    "ipaddress" : "192.168.1.167",
    "bme680_pressure" : "1017.31",
    "BH1745_clear" : "10.0",
    "BH1745_red" : "0.0",
    "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
    "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"
  }
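If you want similar test data outside of NiFi, the template above can be approximated in plain Python. This is a sketch under assumptions: the function name `mock_reading` is hypothetical, only a subset of the fields is generated, and the random ranges mirror the `random():mod(100)` expressions rather than real sensor ranges.

```python
import json
import random
import uuid
from datetime import datetime


def mock_reading(now=None):
    """Build a partial mock sensor record shaped like the GenerateFlowFile JSON."""
    now = now or datetime.now()
    stamp = now.strftime("%Y%m%d%H%M%S")
    return {
        "systemtime": now.strftime("%m/%d/%Y %H:%M:%S"),
        "uuid": "{}_{}".format(stamp, uuid.uuid4()),
        "imgname": "images/bog_image_{}_{}.jpg".format(stamp, uuid.uuid4()),
        "cputemp": random.randrange(100),                        # 0..99, numeric
        "BH1745_green": str(random.randrange(100) + 1),          # 1..100, string
        "bme680_tempf": "80.{}".format(random.randrange(100)),   # "80.x" as a string
        "bme680_humidity": str(random.randrange(100)),
        "host": "vid5",
    }


print(json.dumps(mock_reading(), indent=2))
```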
Kafka Streams Source Code:
Running the Fat Jar:
  java -jar target/kstreams-1.0.jar
  ******************************************* Started
  **********2018/12/28 16:41:19
  **********
  Memory Usage: 28284968
Updated Source Code:
Updated Example Run Output
  {
    "ltr559_lux" : "033.75",
    "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
    "cputemp" : 51,
    "host" : "piups",
    "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",
    "bme680_tempc" : "24.96",
    "score" : "0.9694475",
    "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",
    "ltr559_prox" : "0000",
    "bme680_humidity" : "28.875",
    "diskusage" : "10058.7",
    "human_string" : "electric fan, blower",
    "bme680_pressure" : "1012.00",
    "BH1745_green" : "31.0",
    "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
    "systemtime" : "12/28/2018 11:24:11",
    "BH1745_red" : "33.0",
    "starttime" : "12/28/2018 11:16:02",
    "BH1745_blue" : "19.8",
    "end" : "1546014251.2879872",
    "bme680_tempf" : "76.93",
    "VL53L1X_distance_in_mm" : 455,
    "te" : "488.33915853500366",
    "memory" : 70.8,
    "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
    "ipaddress" : "192.168.1.166",
    "BH1745_clear" : "40.0"
  }
From Kafka Streams I am sending a temperature warning to MQTT, which NiFi forwards to Slack.
Temperature warning 82.74
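The kind of condition check behind that warning can be sketched in a few lines. Note that this is plain Python, not the Kafka Streams Java application, and the 80.0 °F threshold and the use of `bme680_tempf` are assumptions chosen so that the sample reading above would trigger a warning.

```python
import json

TEMP_WARNING_F = 80.0  # hypothetical threshold, not taken from the real app


def temperature_warning(message_json, threshold=TEMP_WARNING_F):
    """Return a warning string for a hot reading, or None otherwise."""
    record = json.loads(message_json)
    try:
        temp_f = float(record["bme680_tempf"])
    except (KeyError, TypeError, ValueError):
        return None  # no usable temperature in this message
    if temp_f > threshold:
        return "Temperature warning {}".format(record["bme680_tempf"])
    return None


print(temperature_warning('{"bme680_tempf": "82.74"}'))
```

In the real pipeline the returned string would be published to the MQTT topic that NiFi reads, and messages returning None would simply pass through.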
Using HBase 2.0, we are storing our data as it streams from Kafka Streams to NiFi. We use PutHBaseRecord, which utilizes record processing and our schema to stream our JSON into HBase with ease.
Updated Schema with TF Attributes
  {
    "type": "record",
    "name": "garden",
    "fields": [
      { "name": "systemtime", "type": "string" },
      { "name": "BH1745_green", "type": "string" },
      { "name": "human_string", "type": "string", "default": "UNK" },
      { "name": "ltr559_prox", "type": "string" },
      { "name": "end", "type": "string" },
      { "name": "uuid", "type": "string" },
      { "name": "lsm303d_accelerometer", "type": "string" },
      { "name": "score", "type": "string", "default": "0" },
      { "name": "imgnamep", "type": "string" },
      { "name": "cputemp", "type": "double", "doc": "Type inferred from '58.0'" },
      { "name": "BH1745_blue", "type": "string", "doc": "Type inferred from '\"10.8\"'" },
      { "name": "te", "type": "string", "doc": "Type inferred from '\"254.545491934\"'" },
      { "name": "bme680_tempc", "type": "string", "doc": "Type inferred from '\"29.13\"'" },
      { "name": "imgname", "type": "string" },
      { "name": "bme680_tempf", "type": "string", "doc": "Type inferred from '\"84.43\"'" },
      { "name": "ltr559_lux", "type": "string", "doc": "Type inferred from '\"077.95\"'" },
      { "name": "memory", "type": "double", "doc": "Type inferred from '37.6'" },
      { "name": "VL53L1X_distance_in_mm", "type": "int", "doc": "Type inferred from '161'" },
      { "name": "bme680_humidity", "type": "string", "doc": "Type inferred from '\"32.359\"'" },
      { "name": "host", "type": "string", "doc": "Type inferred from '\"vid5\"'" },
      { "name": "diskusage", "type": "string", "doc": "Type inferred from '\"8357.6\"'" },
      { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.167\"'" },
      { "name": "bme680_pressure", "type": "string", "doc": "Type inferred from '\"987.86\"'" },
      { "name": "BH1745_clear", "type": "string", "doc": "Type inferred from '\"90.0\"'" },
      { "name": "BH1745_red", "type": "string", "doc": "Type inferred from '\"33.0\"'" },
      { "name": "lsm303d_magnetometer", "type": "string" },
      { "name": "starttime", "type": "string" }
    ]
  }
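The two TensorFlow fields carry defaults so that records without an image classification still fit the schema. Here is a minimal sketch of that idea in Python. It is not an Avro implementation and not how NiFi's record readers work internally; the function `conform` and the trimmed `SCHEMA` are hypothetical, and only the three primitive types used above are handled.

```python
# Trimmed copy of the schema above, just enough fields to show the idea
SCHEMA = {
    "type": "record",
    "name": "garden",
    "fields": [
        {"name": "uuid", "type": "string"},
        {"name": "human_string", "type": "string", "default": "UNK"},
        {"name": "score", "type": "string", "default": "0"},
        {"name": "cputemp", "type": "double"},
        {"name": "VL53L1X_distance_in_mm", "type": "int"},
    ],
}

# Python types accepted for each primitive type used in the schema
AVRO_TO_PYTHON = {"string": str, "double": (int, float), "int": int}


def conform(record, schema):
    """Fill in schema defaults and check primitive types.

    Raises ValueError for a missing field without a default or a wrong type.
    """
    out = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            value = record[name]
        elif "default" in field:
            value = field["default"]
        else:
            raise ValueError("missing field: " + name)
        expected = AVRO_TO_PYTHON[field["type"]]
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError("wrong type for field: " + name)
        out[name] = value
    return out


print(conform({"uuid": "abc", "cputemp": 51, "VL53L1X_distance_in_mm": 455}, SCHEMA))
```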
HBase table
create 'breakout', 'sensors'
Example Row
  1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8
  1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0
  1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0
  1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0
  1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455
  1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875
  1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00
  1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96
  1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93
  1546014251.2879872
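In the example row, the row key matches the record's `end` value and every other attribute lands in the `sensors` column family. The mapping can be pictured with the sketch below. The function `to_hbase_cells` is hypothetical and only illustrates the shape of the data; leaving the row key field out of the columns is an assumption, and the actual write is done by PutHBaseRecord.

```python
def to_hbase_cells(record, row_field="end", family="sensors"):
    """Flatten one record into (row key, family:qualifier, value) tuples."""
    row = str(record[row_field])
    return [
        (row, "{}:{}".format(family, name), str(value))
        for name, value in sorted(record.items())  # uppercase sorts first, as in the scan
        if name != row_field                       # assumption: row key is not a column
    ]


record = {"end": "1546014251.2879872", "bme680_tempc": "24.96", "BH1745_blue": "19.8"}
for cell in to_hbase_cells(record):
    print(cell)
```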

Apache NiFi Operations and Monitoring 101

NiFi Operations






https://community.hortonworks.com/articles/183217/devops-backing-up-apache-nifi-registry-flows.html

Real-Time Energy Monitoring With Apache NiFi

Monitoring Energy




A simple example of using Apache NiFi to receive and process electric data stored as JSON. We add SQL-based alerting and storage to Parquet files with no code.



Cloudera Data Science Workbench



Using PySpark SQL to analyze the Parquet files built from electric data stored to HDFS via Apache NiFi.


 We can display alerts easily with Slack.



If you want to know how something works in the Apache NiFi UI, you can analyze the REST calls it makes, which use JSON.






Python Libraries
pip3 install pyhs100
pip3 install psutil 

Github Repos
Hardware Meter

  • https://www.amazon.com/TP-Link-HS110-Monitoring-Required-Assistant/dp/B0178IC5ZY


Resources




Barcelona DataWorks Summit March 2019

I just returned from this awesome event. Not even a rough plane trip can dampen my spirits after seeing all the amazing things and all that we got to do this year. It was nice to see familiar faces among the attendees from 2017 and 2018, including my friends from Prague and Germany!

Thanks to Andy LoPresto, George Vetticaden, Dinesh Chandrasekhar, Purnima, Nathan, Dan Chaffelson for great pictures, talks, support and being an amazing team for Data in Motionists.





Meetup

The meetup was great and in the same hall as some other amazing meetups at the same time. A great experience for those at Summit early (and open to all people for free).





Highlight: Dan spinning up NiFi at scale in the audience on Google Cloud on K8s with ease!

Highlight: Andy crushing it with his MiNiFi and NiFi presentation! I think he has too many RPIs!




Demopalooza








Edge to AI






Apache Deep Learning 201


All the Githubs

All the great Apache NiFi content from Andy LoPresto is here including custom processors:
https://github.com/alopresto/slides/blob/master/dws_barcelona_2019/
https://github.com/alopresto/slides

Exporting and Importing Data from MongoDB in the Cloud with Apache NiFi

We have data stored in a MongoDB from a third party application in Amazon.
Export from MongoDB to Parquet.
Moving data from a single-purpose data silo to your Enterprise Data Lake is a common use case. Using Apache NiFi we can easily save your data from this remote silo and bring it streaming into your analytics store for machine learning and deep analytics with Impala, Hive and Spark. It doesn't matter which cloud you are coming from or going to, or whether you are moving from cloud to on-premises or through various hybrid situations. Apache NiFi will work in all of these situations with full data lineage and provenance on what it did and when.
I have created a mock dataset with Mockaroo. It's all about yummy South Jersey sandwiches.
Our easy MongoDB flows: one to ingest Mongo data into our Data Lake and another to load MongoDB.
In our test, we loaded all the data from our Mock REST API into a MongoDB in the cloud. In the real world an application populated that dataset and now we need to bring it into our central data lake for analytics.
We use Jolt to replace the non-Hadoop friendly built-in MongoDB _id with a friendly name mongo_id.
Storing to Parquet on HDFS is Easy (Let's compress with Snappy)
Connecting to MongoDB is easy: set up a controller service and specify the database and collection.
For our MongoDB connection service, just enter your URI in the form username:password@server.
GetHTTP URL
https://my.api.mockaroo.com/hoagie.json

GetHTTP Filename
${filename:append('hoagie.'):append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}

JSON Path Expression
$.*

JOLT Chain
[{
"operation": "shift",
"spec": {
"_id": "mongo_id",
"*": "&"
}
}]
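For readers who do not know Jolt, the shift spec above moves `_id` to `mongo_id` and copies every other top-level key unchanged. The same effect on a single document can be sketched in Python; the function name `rename_mongo_id` and the sample document are made up for illustration.

```python
def rename_mongo_id(document):
    """Return a copy of the document with top-level _id renamed to mongo_id."""
    return {
        ("mongo_id" if key == "_id" else key): value
        for key, value in document.items()
    }


doc = {"_id": "5c9a1f", "sandwich": "hoagie", "price": 8.5}
print(rename_mongo_id(doc))
```

Only the top level is touched, so nested values are carried over as they are.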

Mongo URI
mongodb://user:userpassword@server.cloud.com:13916/nifi
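If a connection fails, it is worth double checking how the URI breaks down. Here is a small sketch with Python's standard `urllib.parse`; the helper name `describe_mongo_uri` is hypothetical. It assumes the simple single-host form shown above, so URIs with several hosts or with options would need a real MongoDB driver to parse.

```python
from urllib.parse import urlsplit


def describe_mongo_uri(uri):
    """Split a simple single-host MongoDB URI into its parts."""
    parts = urlsplit(uri)
    return {
        "user": parts.username,
        "has_password": parts.password is not None,  # avoid echoing the password
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


print(describe_mongo_uri("mongodb://user:userpassword@server.cloud.com:13916/nifi"))
```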
Many files stored in HDFS as Parquet