Skip to main content

Cloudera Edge2AI: MiNiFi Java Agent with Raspberry Pi and Thermal Camera and Air Quality Sensor - Part 1

MiNiFi with Thermal Cameras and Air Quality Sensors

Cloudera Edge2AI:  MiNiFi Java Agent with Raspberry Pi and Thermal Camera and Air Quality Sensor - Part 1

Use Case / Overview:

We need to track heat signatures, web camera images, gas and other readings from a remote workers office.   This is for occupancy analytics and safety monitoring.   We can extend this to field and remote sites where levels of temperatures, movements, noxious gases and other real world items may cause risk to our staff.

For tracking room temperature and humidity we have other sensors:   https://www.datainmotion.dev/2019/12/iot-series-minifi-agent-on-raspberry-pi.html
https://www.datainmotion.dev/2019/10/using-grovepi-with-raspberry-pi-and.html
https://www.datainmotion.dev/2019/09/powering-edge-ai-for-sensor-reading.html
https://www.datainmotion.dev/2019/12/easy-deep-learning-in-apache-nifi-with.html

We can also track things like GPS, overhead plane traffic, regionalized social media reports, news, government reports, weather, mass transit status, traffic cameras, smoke, stock market, database, files, syslog, logs and anything else we may need to add to improve our machine learning and deep learning models.

We want to send immediate alerts if anything is dangerous to the equipment or living things in the area.   We have deployed a pre-built TensorFlow Lite model to our edge device to execute on incoming images.   We can deploy our own models automagically via Apache NiFi - MiNiFi Agents and Edge Flow Manager:   https://www.datainmotion.dev/2019/08/updating-machine-learning-models-at.html.


Hardware Component List:

Software Component List:
  • Raspian
  • Python 3.7
  • OpenJDK 8 Java
  • Apache NiFi 1.10
  • MiniFi Agent 0.6.0 Java
  • Cloudera Edge Flow Manager (CEM)
  • Apache Kudu
  • Apache Kafka 2.x
  • Cloudera Streams Messaging Manager
  • Cloudera Manager 7.0.3
  • Apache Hue
  • Cloudera Schema Registry
  • Apache NiFi Registry
  • CDP on AWS with Kafka and NiFi Data Hub
Source Code:
Example JSON Data:

{"uuid": "sgp30_uuid_qmy_20200104232746", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden2", "host_name": "garden2", "macaddress": "dc:a6:32:32:98:20", "end": "1578180466.510303", "te": "0.027782917022705078", "systemtime": "01/04/2020 18:27:46", "cpu": 3.9, "diskusage": "111786.0 MB", "memory": 9.7, "equivalentco2ppm": "  405", "totalvocppb": "   16", "id": "20200104232746_9379ae31-d848-4655-964f-92bd1b5e63fe"}

 { "node_id_0" : "511",
  "label_0" : "container ship, containership, container vessel",
  "result_0" : "0.439216",
  "node_id_1" : "650",
  "label_1" : "megalith, megalithic structure",
  "result_1" : "0.050980",
  "node_id_2" : "580",
  "label_2" : "grand piano, grand",
  "result_2" : "0.050980",
  "node_id_3" : "882",
  "label_3" : "upright, upright piano",
  "result_3" : "0.027451",
  "node_id_4" : "518",
  "label_4" : "crane",
  "result_4" : "0.023529",
  "uuid" : "tensorflow_uuid_mbk_20200106194910",
  "ipaddress" : "192.168.1.221",
  "runtime" : "0",
  "host" : "garden2",
  "host_name" : "garden2",
  "macaddress" : "dc:a6:32:32:98:20",
  "end" : "1578340151.1180859",
  "te" : "0.18831157684326172",
  "systemtime" : "01/06/2020 14:49:11",
  "cpu" : 45.8,
  "diskusage" : "109621.5 MB",
  "memory" : 50.2,
  "id" : "20200106194910_098df463-4d9e-4326-9f8c-ad12fe55c7d2" }

Example Thermal Data:

Example Web Cam Data:



Device:



Setup:
We need to make sure the Raspberry Pi is up to date and has some software available such as git, curl and unzip.    I can then build the MLX library.
I had to install the sgp30-python using the install.sh from the github, don't use the pip3 install.   Initial calibration will take a while, so let that happen.
A demo is installed:   /home/pi/Pimoroni/sgp30/examples
MLX Thermal Camera
There is a nice Python utility that will build a GIF animation from the camera.   /opt/demo/mlx90640-library/python
Make sure you build all examples.
Install Java for MiNiFi Agent
sudo apt install openjdk-8-jdk openjdk-8-jre

Download MiNiFi from Cloudera or nifi.apache.org and copy zip/tar/gz to your device.

MiNiFi Ingest:

Web Camera Images
Sensor JSON Logs
Thermal Videos
TensorFlow Lite Classification Results

MiNiFi can grab my data from my device whether it's files, logs, images, sensor readings, python app calls, shell scripts, TCP, UDP or whatever.  Goodbye cron.


Building a MiNiFi flow and pushing that to make MiNiFi agents running on various devices is a snap.   I drag and drop a few components, set some parameters such which file to tail, what directory to list and what scripts to run and bam send it to the cloud.




It is very easy to monitor my progress with EFM event viewer or REST API.


NiFi Processing:

  • Receive From MiNiFi Agents
  • Route Images to Image Processing
  • QueryRecord to limit and route records
  • Push to Kafka



Additional NiFi Processing in Second AWS Cluster


  • Consume Kafka
  • Insert into Kudu Table







We need to add topics to Kafka so we can send our messages for further processing.


Before I can create Kafka topics, send messages or monitor Kafka, I need a Kafka cluster.   Using The Enterprise Data Cloud, Cloudera Data Platform lets me easily spin up a Kafka cluster on AWS or Azure (soon Google).



If I don't like fancy web UIs or need to script this, there is a CDP CLI that I can create a cluster using a JSON template.


It is very easy to build NiFi and Kafka Data Hubs and make them available to users/developers in minutes.   So we are now ready to rock.






Since we have been producing and consuming thousands of messages in different topics to my cloud hosted Kafka cluster, let's see what's going on by using Cloudera's Streams Messaging Manager (SMM).   







Let's see the data now that it has landed in Impala/Kudu tables.   So easy to query my tables with Apache Hue.



We can see the data displayed in Slack channels.



I can see my tables have been built in Kudu.



Cloud Storage - Kudu Tables:

CREATE TABLE webcam ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING,
macaddress STRING, diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
node_id_0 INT, label_0 STRING, result_0 DOUBLE,
node_id_1 INT, label_1 STRING, result_1 DOUBLE,
node_id_2 INT, label_2 STRING, result_2 DOUBLE,
node_id_3 INT, label_3 STRING, result_3 DOUBLE,
node_id_4 INT, label_4 STRING, result_4 DOUBLE,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
CREATE TABLE gassensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING, equivalentco2ppm STRING,  totalvocppb STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1'); 
CREATE TABLE bme280sensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING,
te STRING,
host STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING,
host_name STRING, bme280_altitude STRING, bme280_tempf STRING, max30105timestamp STRING,
max30105_detected STRING, max30105_delta STRING, max30105_temp STRING, bme280_tempc STRING,
max30105_mean STRING, max30105_value STRING, bme280_altitude_feet STRING, bme280_pressure STRING,
starttime STRING, cputemp DOUBLE, imgnamep STRING, imgname STRING,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
Python Libraries:

wget https://dl.google.com/coral/python/tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl 
pip3 install scikit-image 
pip3 install getmac 
pip3 install psutilpip3 install --upgrade pip 
pip3 install --upgrade setuptools 
pip3 install tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl 
pip3 install easydict -U 
pip3 install scikit-learn -U 
pip3 install opencv-python -U --user 
pip3 install numpy -U 
pip3 install mxnet -U 
pip3 install gluoncv --upgrade 
pip3 install tensorflow

Summary:
We read thermal images, sensors and camera images with a MiNiFi agent that sends this data via MQTT and HTTP(S) Site-to-Site (S2S).   We process MQTT, Kafka and S2S data streams with NiFi and easily push the final data into Kudu tables which we can query.

So we easily ingest structured, unstructured and semistructured data with live analytics.

In the second part of this article I will show you how we integrate with CDSW for additional machine learning on our data.


References

Popular posts from this blog

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / Hive

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / HiveArticle 7 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_9.html Article 6 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_35.html
Article 5 - 
Article 4 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_8.html Article 3 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html Article 2 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache.html Article 1https://www.datainmotion.dev/2019/08/migrating-apache-flume-flows-to-apache.html Source Code:  https://github.com/tspannhw/flume-to-nifi
This is one possible simple, fast replacement for "Flafka".



Consume / Publish Kafka And Store to Files, HDFS, Hive 3.1, Kudu

Consume Kafka Flow 

 Merge Records And Store As AVRO or ORC
Consume Kafka, Update Records via Machine Learning Models In CDSW And Store to Kudu

Sour…

Exploring Apache NiFi 1.10: Stateless Engine and Parameters

Exploring Apache NiFi 1.10:   Stateless Engine and Parameters Apache NiFi is now available in 1.10!
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993

You can now use JDK 8 or JDK 11!   I am running in JDK 11, seems a bit faster.

A huge feature is the addition of Parameters!   And you can use these to pass parameters to Apache NiFi Stateless!

A few lesser Processors have been moved from the main download, see here for migration hints:
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance

Release Notes:   https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0

Example Source Code:https://github.com/tspannhw/stateless-examples

More New Features:

ParquetReader/Writer (See:  https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html)Prometheus Reporting Task.   Expect more Prometheus stuff coming.Experimental Encrypted content repository.   People asked me for this one before.Par…

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice In Part 1, we will setup our drone, our communication environment, capture the data and do initial analysis. We will eventually grab live video stream for object detection, real-time flight control and real-time data ingest of photos, videos and sensor readings. We will have Apache NiFi react to live situations facing the drone and have it issue flight commands via UDP. In this initial section, we will control the drone with Python which can be triggered by NiFi. Apache NiFi will ingest log data that is stored as CSV files on a NiFi node connected to the drone's WiFi. This will eventually move to a dedicated embedded device running MiniFi. This is a small personal drone with less than 13 minutes of flight time per battery. This is not a commercial drone, but gives you an idea of the what you can do with drones. Drone Live Communications for Sensor Readings and Drone Control You must connect to the drone…