IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 1 - Introduction

An easy option for adding, removing, and prototyping sensor reads on a standard Raspberry Pi with no special wiring.
Hardware Component List:
  • Raspberry Pi
  • USB Power Cable
  • Pimoroni Breakout Garden Hat
  • 1.12" Mono OLED Breakout 128x128 White/Black Screen
  • BME680 Air Quality, Temperature, Pressure, Humidity Sensor
  • LSM303D 6DoF Motion Sensor (X, Y, Z axes)
  • BH1745 Luminance and Color Sensor
  • LTR-559 Light and Proximity Sensor, 0.01 lux to 64,000 lux
  • VL53L1X Time of Flight (ToF) Sensor (pew pew lasers!)
Software Component List:
  • Raspbian
  • Python 2.7
  • Java 8 (JDK)
  • Apache NiFi
  • MiniFi
Source Code: https://github.com/tspannhw/minifi-breakoutgarden
Summary
Our Raspberry Pi has a Breakout Garden Hat with five sensors and one small display. The display shows the last reading and updates constantly. For debugging purposes, it also shows the IP address so I can connect as needed.
We currently run via nohup; when we go into constant use, I will switch to a Linux service that runs on startup.
The Python script initializes the connections to all of the sensors, then goes into an infinite loop: it reads those values, builds a JSON packet, and sends it via TCP/IP over port 5005 to a listener. A MiniFi 0.5.0 Java agent uses ListenTCP on that port to capture these messages and filter them based on alarm values. If a reading is outside the checked parameters, we send it via S2S/HTTP(S) to an Apache NiFi server.
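Below is a minimal sketch of that loop. read_sensors() is a hypothetical placeholder for the real per-sensor reads (BME680, VL53L1X, and so on), and the host is an assumption that MiniFi runs on the same device; the newline delimiter lets ListenTCP split the stream into messages.

# Minimal Python 2.7 sketch of the collector loop
# (read_sensors() is a hypothetical stand-in for the Breakout Garden reads)
import json
import socket
import time

TCP_HOST = '127.0.0.1'  # assumption: MiniFi listens on the same device
TCP_PORT = 5005         # the port MiniFi's ListenTCP watches

def read_sensors():
    return {'bme680_tempf': '82.74', 'VL53L1X_distance_in_mm': 134}

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((TCP_HOST, TCP_PORT))
while True:
    packet = json.dumps(read_sensors())
    s.sendall(packet + '\n')  # newline-delimited so ListenTCP splits messages
    time.sleep(1)             # pacing between readings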
We also have a USB webcam (Sony PlayStation 3 Eye) that captures images; MiniFi reads those and sends them to NiFi as well.
The first thing we need to do is pretty easy: seat the Pimoroni Breakout Garden Hat on the Pi and plug in our six breakouts.
You have to do the standard installation of Python, Java 8, and MiniFi; I also recommend OpenCV. Make sure everything is plugged in securely, and in the correct orientation, before you power on the Raspberry Pi.
Install Python pip, then the Breakout Garden libraries (master.zip is the Pimoroni breakout-garden GitHub archive):

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
sudo python get-pip.py
unzip master.zip
cd breakout-garden-master
sudo ./install.sh
Running In NiFi

First we build our MiniFi Flow:
We have two objectives: listen for TCP/IP JSON messages from our running Python sensor collector and gather images captured by the PS3 Eye USB Webcam.
We then add content type and schema information to the attributes. We also extract a few values from the JSON stream to use for alerting.
We extract: $.cputemp, $.VL53L1X_distance_in_mm, $.bme680_humidity, $.bme680_tempf
These attributes are now attached to our flowfile, whose content is unchanged. We can now route on them.
So we route on a few alarm conditions:
${cputemp:gt(100)}
${humidity:gt(60)}
${tempf:gt(80)}
${distance:gt(0)}
We can easily add more conditions or change the set values. We can also populate these values from an HTTP or file lookup.
If these conditions are met, we send the data to our local Apache NiFi router, which can then do further analysis with the fuller NiFi processor set, including TensorFlow, MXNet, record processing, and lookups.
Local NiFi Routing
For now we are just splitting up the images and JSON and sending to two different remote ports on our cloud NiFi cluster.
These then arrive in the cloud.
Above you can see the list of flow files waiting to be processed (I haven't written that part yet). We are getting a few messages a second; we could handle 100,000 a second if we needed to. Just add nodes: instant scaling. Cloudbreak can do that for you.
In Part 2, we will start processing these data streams and images. We will also add Apache MXNet and TensorFlow at various points on the edge, the router, and the cloud, using Python and built-in deep learning NiFi processors I have authored. We will also break apart these records and send each sensor to its own Kafka topic to be processed with Kafka Streams, Druid, Hive, and HBase.
As part of our loop, we write the current values to our little screen.
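Updating the display takes only a few lines. Here is a sketch assuming the luma.oled library with an I2C SH1106-class driver at address 0x3C, as in the Breakout Garden examples; check the linked source for the exact driver setup.

# Sketch of the OLED status update (assumes luma.oled with an I2C
# SH1106-class driver at 0x3C; verify the driver for your exact screen)
from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import sh1106

serial = i2c(port=1, address=0x3C)
device = sh1106(serial)

def show_status(ipaddress, tempf):
    # redraw the IP address and the last reading on every loop pass
    with canvas(device) as draw:
        draw.text((0, 0), "IP: %s" % ipaddress, fill="white")
        draw.text((0, 20), "TempF: %s" % tempf, fill="white")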

Example Record
{
  "systemtime" : "12/19/2018 22:15:56",
  "BH1745_green" : "4.0",
  "ltr559_prox" : "0000",
  "end" : "1545275756.7",
  "uuid" : "20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2",
  "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
  "imgnamep" : "images/bog_image_p_20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2.jpg",
  "cputemp" : 51.0,
  "BH1745_blue" : "9.0",
  "te" : "47.3427119255",
  "bme680_tempc" : "28.19",
  "imgname" : "images/bog_image_20181220031556_e54721d6-6110-40a6-aa5c-72dbd8a8dcb2.jpg",
  "bme680_tempf" : "82.74",
  "ltr559_lux" : "006.87",
  "memory" : 34.9,
  "VL53L1X_distance_in_mm" : 134,
  "bme680_humidity" : "23.938",
  "host" : "vid5",
  "diskusage" : "8732.7",
  "ipaddress" : "192.168.1.167",
  "bme680_pressure" : "1017.31",
  "BH1745_clear" : "10.0",
  "BH1745_red" : "0.0",
  "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
  "starttime" : "12/19/2018 22:15:09"
}
NiFi Templates
Let's Build Those Topics Now
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bme680
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bh1745
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lsm303d
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic vl53l1x
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ltr559

Hopefully, in your environment you will be able to have a replication factor of 3, 5, or 7 and many partitions. I have one Kafka broker, so this is what we are starting with.

Reference

IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams

IoT Series: Sensors: Utilizing Breakout Garden Hat: Part 2 - Integrating MQTT, TensorFlow and Kafka Streams
In this second part, I have extended the functionality in the Python capture, MiniFi, NiFi, and post-NiFi processing, and I have added a Kafka Streams Java application.
With this NiFi flow, we consume the MQTT and Kafka messages sent by the Kafka Streams application.
In one flow, we receive MQTT messages, pull out the entire flow file content as a message, and send it to a Slack channel.
In another flow, we ingest two types of Kafka messages and store the JSON ones that have a schema in an HBase table via the record processor.
In this flow, we receive data from the local NiFi router that MiniFi called over S2S/HTTP(S). We build two types of messages and send them to Kafka 2.0 brokers: one is the full JSON message with a schema, the other is just the temperature. We create a Kafka key from the UUID. We also process the images sent from MiniFi with my native Java TensorFlow Inception processor.
I decided to try some TensorFlow processing in our infinite sensor loop. It may use too much memory, so I may have to pick a different TensorFlow model and switch to TF Lite (https://www.tensorflow.org/lite/devguide). You will note two extra attributes coming from the Python script running on the Raspberry Pi 3B+.
Another thing I wanted to try is Kafka Streams, since with Kafka 2.0 in HDP and HDF we have a fully supported version available. Based on example code, I wrote a simple Kafka Streams Java 8 application that reads the Kafka JSON messages sent from NiFi 1.8, checks for some conditions, and pushes data out to MQTT and another Kafka topic.
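The Streams application itself is Java (source linked below), but the routing logic is simple enough to sketch. Here is the equivalent check in Python with kafka-python and paho-mqtt, purely to illustrate the idea: the bme680 topic is from this article, while the warnings MQTT topic, the alerts output topic, and the 80°F threshold are example values.

# Illustrative Python equivalent of the Kafka Streams condition check
# (the real application is Java; the warnings/alerts names are examples)
import json
from kafka import KafkaConsumer, KafkaProducer
import paho.mqtt.publish as mqtt_publish

consumer = KafkaConsumer('bme680', bootstrap_servers='localhost:9092')
producer = KafkaProducer(bootstrap_servers='localhost:9092')

for msg in consumer:
    record = json.loads(msg.value.decode('utf-8'))
    tempf = float(record.get('bme680_tempf', 0))
    if tempf > 80:
        # warn over MQTT; a NiFi MQTT consumer can forward this to Slack
        mqtt_publish.single('warnings', 'Temperature warning %s' % tempf,
                            hostname='localhost')
        # and push the full record to another Kafka topic
        producer.send('alerts', json.dumps(record).encode('utf-8'))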
If you don't have an MQTT broker, here is a quick way to install a Mosquitto MQTT broker on CentOS 7.
sudo yum -y install mosquitto
# configuration lives in /etc/mosquitto/mosquitto.conf
mkdir -p /var/log/mosquitto
chmod -R 777 /var/log/mosquitto/
touch /var/log/mosquitto/mosquitto.log
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
Now that we have an MQTT broker our Kafka Streams app can send messages to it and NiFi can read messages from it.
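Here is a quick smoke test that the broker round-trips messages, assuming the paho-mqtt Python package (the test topic name is just for this check):

# Mosquitto smoke test with paho-mqtt: subscribe, publish, print
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(msg.topic, msg.payload)

client = mqtt.Client()
client.on_message = on_message
client.connect('localhost', 1883)
client.subscribe('test')
client.publish('test', 'hello from the breakout garden')
client.loop_forever()  # Ctrl-C to stop once the message prints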
In a future version I will use Hortonworks Schema Registry and Avro.
I have updated the Python script to include TensorFlow and moved it to Python 3.5. Make sure you run with Python 3.5 and have all the libraries installed on your RPi/Linux device.
Here is some of the updated code for 3.5; note the message encoding. Python: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/minifi35.py
def send_tcp(s, message):
    # Python 3 sockets send bytes, so encode the JSON string as UTF-8
    if not message:
        return
    try:
        s.sendall(message.encode('utf-8'))
    except:
        print("Failed to send message")
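Usage is unchanged apart from the encoding; a quick example against MiniFi's ListenTCP endpoint (the host and port are assumptions matching Part 1):

import json
import socket

row = {'bme680_tempf': '76.93'}      # one example reading
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('localhost', 5005))       # MiniFi ListenTCP endpoint
send_tcp(s, json.dumps(row) + '\n')  # newline-delimited for ListenTCP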

For testing IoT values, I have a GenerateFlowFile processor with this JSON:
{
  "systemtime" : "${now():format('MM/dd/yyyy HH:mm:ss')}",
  "BH1745_green" : "${random():mod(100):plus(1)} ",
  "ltr559_prox" : "0000",
  "end" : "${now():format('yyyyMMddHHmmss')}",
  "uuid" : "${now():format('yyyyMMddHHmmss')}_${UUID()}",
  "lsm303d_accelerometer" : "+00.06g : -01.01g : +00.04g",
  "imgnamep" : "images/bog_image_p_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "cputemp" : ${random():mod(100):toNumber()},
  "BH1745_blue" : "9.0",
  "te" : "47.3427119255",
  "bme680_tempc" : "28.19",
  "imgname" : "images/bog_image_${now():format('yyyyMMddHHmmss')}_${UUID()}.jpg",
  "bme680_tempf" : "80.${random():mod(100):toNumber()}",
  "ltr559_lux" : "006.87",
  "memory" : 34.9,
  "VL53L1X_distance_in_mm" : 134,
  "bme680_humidity" : "${random():mod(100):toNumber()}",
  "host" : "vid5",
  "diskusage" : "8732.7",
  "ipaddress" : "192.168.1.167",
  "bme680_pressure" : "1017.31",
  "BH1745_clear" : "10.0",
  "BH1745_red" : "0.0",
  "lsm303d_magnetometer" : "+00.04 : +00.34 : -00.10",
  "starttime" : "${now():format('MM/dd/yyyy HH:mm:ss')}"
}
Kafka Streams Source Code:
Running the Fat Jar:
java -jar target/kstreams-1.0.jar
******************************************* Started
********** 2018/12/28 16:41:19 **********
Memory Usage: 28284968
Updated Source Code:
Updated Example Run Output
{
  "ltr559_lux" : "033.75",
  "uuid" : "20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135",
  "cputemp" : 51,
  "host" : "piups",
  "lsm303d_magnetometer" : "-00.12 : +00.27 : +00.15",
  "bme680_tempc" : "24.96",
  "score" : "0.9694475",
  "lsm303d_accelerometer" : "+00.12g : -01.00g : +00.08g",
  "ltr559_prox" : "0000",
  "bme680_humidity" : "28.875",
  "diskusage" : "10058.7",
  "human_string" : "electric fan, blower",
  "bme680_pressure" : "1012.00",
  "BH1745_green" : "31.0",
  "imgnamep" : "/opt/demo/images/bog_image_p_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "systemtime" : "12/28/2018 11:24:11",
  "BH1745_red" : "33.0",
  "starttime" : "12/28/2018 11:16:02",
  "BH1745_blue" : "19.8",
  "end" : "1546014251.2879872",
  "bme680_tempf" : "76.93",
  "VL53L1X_distance_in_mm" : 455,
  "te" : "488.33915853500366",
  "memory" : 70.8,
  "imgname" : "/opt/demo/images/bog_image_20181228162321_cbd0cbd3-17f6-4730-ae43-1e7b46a01135.jpg",
  "ipaddress" : "192.168.1.166",
  "BH1745_clear" : "40.0"
}
From Kafka Streams, I am sending a temperature warning to MQTT, which NiFi forwards to Slack:
Temperature warning 82.74
Using HBase 2.0, we store our data as it streams from Kafka Streams through NiFi. We use PutHBaseRecord, which utilizes record processing and our schema to stream our JSON into HBase with ease.
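To spot-check what landed, here is a short read-back sketch, assuming the HBase Thrift server is running and the happybase Python package (table and column family as created below):

# Print one row streamed into HBase (assumes a running HBase Thrift
# server and the happybase package; table/family as created below)
import happybase

connection = happybase.Connection('localhost')  # Thrift server host
table = connection.table('breakout')
for key, data in table.scan(limit=1):
    print(key)
    for column, value in sorted(data.items()):
        print('  %s = %s' % (column, value))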
Updated Schema with TF Attributes
{
  "type": "record",
  "name": "garden",
  "fields": [
    { "name": "systemtime", "type": "string" },
    { "name": "BH1745_green", "type": "string" },
    { "name": "human_string", "type": "string", "default": "UNK" },
    { "name": "ltr559_prox", "type": "string" },
    { "name": "end", "type": "string" },
    { "name": "uuid", "type": "string" },
    { "name": "lsm303d_accelerometer", "type": "string" },
    { "name": "score", "type": "string", "default": "0" },
    { "name": "imgnamep", "type": "string" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '58.0'" },
    { "name": "BH1745_blue", "type": "string", "doc": "Type inferred from '\"10.8\"'" },
    { "name": "te", "type": "string", "doc": "Type inferred from '\"254.545491934\"'" },
    { "name": "bme680_tempc", "type": "string", "doc": "Type inferred from '\"29.13\"'" },
    { "name": "imgname", "type": "string" },
    { "name": "bme680_tempf", "type": "string", "doc": "Type inferred from '\"84.43\"'" },
    { "name": "ltr559_lux", "type": "string", "doc": "Type inferred from '\"077.95\"'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '37.6'" },
    { "name": "VL53L1X_distance_in_mm", "type": "int", "doc": "Type inferred from '161'" },
    { "name": "bme680_humidity", "type": "string", "doc": "Type inferred from '\"32.359\"'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"vid5\"'" },
    { "name": "diskusage", "type": "string", "doc": "Type inferred from '\"8357.6\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.167\"'" },
    { "name": "bme680_pressure", "type": "string", "doc": "Type inferred from '\"987.86\"'" },
    { "name": "BH1745_clear", "type": "string", "doc": "Type inferred from '\"90.0\"'" },
    { "name": "BH1745_red", "type": "string", "doc": "Type inferred from '\"33.0\"'" },
    { "name": "lsm303d_magnetometer", "type": "string" },
    { "name": "starttime", "type": "string" }
  ]
}
HBase table
create 'breakout', 'sensors'
Example Row
1546014251.2879872 column=sensors:BH1745_blue, timestamp=1546020326955, value=19.8
1546014251.2879872 column=sensors:BH1745_clear, timestamp=1546020326955, value=40.0
1546014251.2879872 column=sensors:BH1745_green, timestamp=1546020326955, value=31.0
1546014251.2879872 column=sensors:BH1745_red, timestamp=1546020326955, value=33.0
1546014251.2879872 column=sensors:VL53L1X_distance_in_mm, timestamp=1546020326955, value=455
1546014251.2879872 column=sensors:bme680_humidity, timestamp=1546020326955, value=28.875
1546014251.2879872 column=sensors:bme680_pressure, timestamp=1546020326955, value=1012.00
1546014251.2879872 column=sensors:bme680_tempc, timestamp=1546020326955, value=24.96
1546014251.2879872 column=sensors:bme680_tempf, timestamp=1546020326955, value=76.93
1546014251.2879872