EdgeAI: Jetson Nano with MiNiFi C++ Agent

Building and Utilizing the Apache NiFi - MiNiFi C++ Agent for the Jetson Nano



Example attributes appended by the agent (see the AppendHostInfo configuration below):

source.hostname = jetsonnano
source.ipv4 = 192.168.1.217

The GetUSBCamera processor grabs frames at a configurable rate:

FPS: .5


Bootstrap and Build

From the build directory (/opt/demo/nifi-minifi-cpp-source/build), run bootstrap.sh and toggle the features you need.

Options:  Kafka, OpenCV, TensorFlow, USB Camera


****************************************
 Select MiNiFi C++ Features to toggle.
****************************************
A. Persistent Repositories .....Enabled
B. Lib Curl Features ...........Enabled
C. Lib Archive Features ........Enabled
D. Execute Script support ......Enabled
E. Expression Language support .Enabled
F. Kafka support ...............Enabled
G. PCAP support ................Disabled
H. USB Camera support ..........Enabled
I. GPS support .................Disabled
J. TensorFlow Support ..........Disabled
K. Bustache Support ............Disabled
L. MQTT Support ................Enabled
M. SQLite Support ..............Disabled
N. Python Support ..............Enabled
O. COAP Support ................Enabled
S. SFTP Support ................Enabled
V. AWS Support .................Disabled
T. OpenCV Support ..............Enabled
U. OPC-UA Support...............Enabled

****************************************

sudo apt-get install libcurl-dev libcurl4-openssl-dev -y
make


We can see when data arrives in NiFi from a MiNiFi Agent.



 We can publish to Kafka directly from our MiNiFi C++ agent.


If CEM/Edge Flow Manager is a mystery to you, check out the live Swagger REST Documentation.


With MiNiFi C++ I can add a USB Camera.




 In NiFi we can see the Host Information that MiNiFi attached.



Example Data



{"uuid": "nano_uuid_crr_20200218002610", "ipaddress": "192.168.1.217", "top1pct": 54.833984375, "top1": "cab, hack, taxi, taxicab", "cputemp": "45.5", "gputemp": "43.5", "gputempf": "110", "cputempf": "114", "runtime": "4", "host": "jetsonnano", "filename": "/opt/demo/images/image_esq_20200218002610.jpg", "imageinput": "/opt/demo/images/2020-02-17_1926.jpg", "host_name": "jetsonnano", "macaddress": "ec:08:6b:18:0d:7f", "end": "1581985574.6246474", "te": "4.158604383468628", "systemtime": "02/17/2020 19:26:14", "cpu": 51.8, "diskusage": "5479.7 MB", "memory": 71.4, "id": "20200218002610_8a12dd65-1038-41ac-b923-98fc907f5be0"}

Example Config.yml Section


  name: AppendHostInfo
  class: org.apache.nifi.minifi.processors.AppendHostInfo
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Hostname Attribute: source.hostname
    IP Attribute: source.ipv4
    Network Interface Name: wlan0

Example Output


[2020-02-11 19:35:09.116] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command /opt/demo/rundemo.sh 
[2020-02-11 19:35:11.275] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:13.742] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from ExecuteProcess - Python to relationship success
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command Complete /opt/demo/rundemo.sh status 0 pid 31004
[2020-02-11 19:35:15.569] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:15.649] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site to Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b sent flow 1flow records, with total size 3581
[2020-02-11 19:35:15.785] [org::apache::nifi::minifi::sitetosite::HttpSiteToSiteClient] [info] Site to Site closed transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site2Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b peer finished transaction
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::io::HttpStream] [warning] Future status already cleared for http://ec2-35-171-154-174.compute-1.amazonaws.com:8080/nifi-api/data-transfer/input-ports/17979d5f-0170-1000-0000-000011f1cc00/transactions/4d0b460e-e4f6-4ca1-8c56-30d310a0712b/flow-files, continuing
[2020-02-11 19:35:16.236] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:16.263] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from TailFile to relationship success
[2020-02-11 19:35:16.264] [org::apache::nifi::minifi::processors::TailFile] [info] TailFile nano.log for 616 bytes
[2020-02-11 19:35:16.273] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:16.274] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from PublishKafka to relationship success
[2020-02-11 19:35:18.748] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:21.260] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers

Using Apache NiFi - MiNiFi C++ Agent Elsewhere

I am working on a Jetbot robot powered by the NVIDIA Jetson Nano that will use the MiNiFi C++ agent.









Quick Tip: NiFi JSON Cleanup

From Vasilis Vagias:


Add an EvaluateJsonPath processor immediately after the InvokeHTTP and replace the flowfile content with $.response; NiFi then unescapes and removes the additional quotes automagically.

This is helpful when CDSW or other REST APIs return double-encoded JSON.
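What that does, illustrated in plain Python with a made-up double-encoded payload:

import json

# Hypothetical CDSW-style reply: the real JSON is stuffed into the
# "response" field as an escaped string (double-encoded).
raw = '{"response": "{\\"prediction\\": 0.97, \\"label\\": \\"cab\\"}"}'

outer = json.loads(raw)                 # parse the wrapper object
inner = json.loads(outer["response"])   # what $.response hands downstream
print(inner["prediction"])              # 0.97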

QuickTip: Ingesting Google Analytics API with Apache NiFi




Design your query / test the API here:

https://ga-dev-tools.appspot.com/query-explorer/



Building this NiFi flow is trivial.



Add your URL with tokens from the Query Explorer console.




You will need to reference the JRE that NiFi is using and its cacerts if you don't want to build your own trust store.   The default password for JDK 8 is changeit.   No, really.
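Outside of NiFi, the same call is just an HTTPS GET. A minimal Python sketch, assuming the requests library and a placeholder access token from the Query Explorer:

import requests

# Query built in the GA Query Explorer; the ids value and token are placeholders.
url = ("https://www.googleapis.com/analytics/v3/data/ga"
       "?ids=ga:33"
       "&metrics=ga:users,ga:percentNewSessions,ga:sessions"
       "&start-date=30daysAgo&end-date=yesterday"
       "&access_token=YOUR_TOKEN")

resp = requests.get(url)   # requests ships its own CA bundle, no cacerts fuss
resp.raise_for_status()
print(resp.json()["totalsForAllResults"])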



Here are our results in clean JSON



Here are some attributes NiFi shows.


Example JSON Results

{
  "kind": "analytics#gaData",
  "id": "https://www.googleapis.com/analytics/v3/data/ga?ids=ga:33&metrics=ga:users,ga:percentNewSessions,ga:sessions&start-date=30daysAgo&end-date=yesterday",
  "query": {
    "start-date": "30daysAgo",
    "end-date": "yesterday",
    "ids": "ga:33",
    "metrics": [
      "ga:users",
      "ga:percentNewSessions",
      "ga:sessions"
    ],
    "start-index": 1,
    "max-results": 1000
  },
  "itemsPerPage": 1000,
  "totalResults": 0,
  "selfLink": "https://www.googleapis.com/analytics/v3/data/ga?ids=ga:33&metrics=ga:users,ga:percentNewSessions,ga:sessions&start-date=30daysAgo&end-date=yesterday",
  "profileInfo": {
    "profileId": "333",
    "accountId": "333",
    "webPropertyId": "UA-333-3",
    "internalWebPropertyId": "33",
    "profileName": "monitorenergy.blogspot.com/",
    "tableId": "ga:33"
  },
  "containsSampledData": false,
  "columnHeaders": [
    {
      "name": "ga:users",
      "columnType": "METRIC",
      "dataType": "INTEGER"
    },
    {
      "name": "ga:percentNewSessions",
      "columnType": "METRIC",
      "dataType": "PERCENT"
    },
    {
      "name": "ga:sessions",
      "columnType": "METRIC",
      "dataType": "INTEGER"
    }
  ],
  "totalsForAllResults": {
    "ga:users": "0",
    "ga:percentNewSessions": "0.0",
    "ga:sessions": "0"
  }
}

You should have a lot more data depending on what you have Google Analytics pointing to.   From here you can use QueryRecord or another record processor to automatically convert, query or route this data.   You can infer a schema or build up a permanent one and store it in Cloudera Schema Registry.   I recommend doing that if this is a frequent process.

Download a reference NiFi flow here:

https://github.com/tspannhw/flows

References:

https://developers.google.com/analytics/devguides/reporting/core/v4

https://developers.google.com/analytics

Analyzing Wood Burning Stoves with FLaNK Stack Part 2 - Analytics


Part 1:  https://www.datainmotion.dev/2020/01/analyzing-wood-burning-stoves-with.html

See:   https://shop.pimoroni.com/products/sgp30-air-quality-sensor-breakout
  • Sensirion SGP30 TVOC and eCO2 sensor
  • TVOC sensing from 0-60,000 ppb (parts per billion)
  • CO2 sensing from 400 to 60,000 ppm (parts per million)
With the fire running, I can see that I am getting higher CO2 production than normal.

Since I stored my data in Kudu tables, it's easy to analyze with Impala and Hue.


select equivalentco2ppm, totalvocppb, systemtime
from gassensors
order by equivalentco2ppm desc



select avg( cast(  equivalentco2ppm as double) ) CO2PPM
from gassensors

The average was 493.

Now that we have some time series data, I can start feeding this to some standard machine learning algorithms and have CML and a data scientist give me some analytics and help me determine where I may want an alert.


Up to 400 is considered normal.

400 to 1,000 is typical of occupied locations with air exchange.

Once you get over 1,000 you start getting drowsy and feeling noticeable effects.

Over 2,000 you can get headaches; this is a concern.   Over 5,000 you should remove yourself from the situation.

select appx_median(cast(equivalentco2ppm as double)) median, min(cast(equivalentco2ppm as double)) min, 
       max(cast(equivalentco2ppm as double)) max, avg(cast(equivalentco2ppm as double)) avg, 
stddev(cast(equivalentco2ppm as double)) standarddev,
stddev_pop(cast(equivalentco2ppm as double)) standardpop
from gassensors


Let's start setting alerts at various levels.
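As a starting point, the bands above translate directly into code. A minimal Python sketch, assuming exactly the thresholds just listed:

def co2_alert_level(ppm: float) -> str:
    """Map an eCO2 reading to the bands described above."""
    if ppm <= 400:
        return "normal"
    if ppm <= 1000:
        return "occupied, air exchange"
    if ppm <= 2000:
        return "drowsiness likely"
    if ppm <= 5000:
        return "headaches, concern"
    return "leave the area"

print(co2_alert_level(493))   # our average -> "occupied, air exchange"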

We can also look at the indoor air quality.


As a baseline for the sensor, in an empty ventilated room my numbers are:


{"uuid": "sgp30_uuid_glv_20200123132631", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785991.7173052", "te": "0.0261075496673584", "systemtime": "01/23/2020 08:26:31", "cpu": 53.5, "diskusage": "109138.7 MB", "memory": 46.5, "equivalentco2ppm": "  412", "totalvocppb": "    6", "id": "20200123132631_dec207f1-9234-4bee-ad38-a0256629c976"}
{"uuid": "sgp30_uuid_snt_20200123132633", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785993.7479923", "te": "0.02589273452758789", "systemtime": "01/23/2020 08:26:33", "cpu": 55.6, "diskusage": "109137.0 MB", "memory": 46.5, "equivalentco2ppm": "  403", "totalvocppb": "    5", "id": "20200123132633_3bd5fb39-d6b2-4f23-8904-0ada862ede2b"}
{"uuid": "sgp30_uuid_uha_20200123132635", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785995.7779448", "te": "0.025917768478393555", "systemtime": "01/23/2020 08:26:35", "cpu": 51.1, "diskusage": "109135.3 MB", "memory": 46.5, "equivalentco2ppm": "  406", "totalvocppb": "    3", "id": "20200123132635_0412f445-9b8c-43a8-b34a-a5466f914be7"}
{"uuid": "sgp30_uuid_wau_20200123132637", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785997.8079107", "te": "0.02591681480407715", "systemtime": "01/23/2020 08:26:37", "cpu": 58.7, "diskusage": "109133.5 MB", "memory": 47.1, "equivalentco2ppm": "  406", "totalvocppb": "   13", "id": "20200123132637_73f069d9-0beb-4d06-a638-2bd92e50ece7"}
{"uuid": "sgp30_uuid_lse_20200123132639", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785999.83777", "te": "0.025897502899169922", "systemtime": "01/23/2020 08:26:39", "cpu": 53.1, "diskusage": "109131.6 MB", "memory": 46.5, "equivalentco2ppm": "  410", "totalvocppb": "    1", "id": "20200123132639_1aa392fe-0eb7-4332-9631-83ac5838e153"}

Very low parts per billion, between 1 and 13, with nothing changing in the static room; that looks like roughly a 10 ppb margin of error. We can run some queries in Hue for better stats.

Let's look at some data over time for TVOC.

select appx_median(cast(totalvocppb as double)) median, min(cast(totalvocppb as double)) min, 
       max(cast(totalvocppb as double)) max, avg(cast(totalvocppb as double)) avg, 
stddev(cast(totalvocppb as double)) standarddev,
stddev_pop(cast(totalvocppb as double)) standardpop
from gassensors



So what's a good TVOC?   On average we are below the range of potential irritation of 120 - 1200 ppb.   We do have some variance due to sensor capabilities and the lack of professional calibration.   Median and average numbers look good.   The maximum is a bit disturbing, but it could be sensor error, warm-up time or other data quality issues.   We'll have to dive more into the numbers.

Next we can look at PM 2.5 values.

Need to crowd source some science here.

We had 3,500+ records with TVOC readings over 120 ppb.

select count(*)
from gassensors
where  cast(totalvocppb as double) > 120

I can see the number of records and the data climb as the fire burns and we add more cherry wood.

select systemtime, equivalentco2ppm, totalvocppb
from gassensors
where  cast(totalvocppb as double) > 120
order by systemtime asc

I should also note that the time series data is coming in every 2 seconds.

select to_timestamp(systemtime, 'MM/dd/yyyy HH:mm:ss') as ts,
       extract(to_timestamp(systemtime, 'MM/dd/yyyy HH:mm:ss'), 'MINUTE') as minute,
       cast(totalvocppb as double) as TVOC,
       cast(equivalentco2ppm as double) as CO2PPM
from gassensors
order by systemtime desc

Analyzing Wood Burning Stoves with FLaNK Stack: MiNiFi, Flink, NiFi, Kafka, Kudu



Winter has arrived, finally.   The 50-70 F days are over; it dropped below 30 F in Princeton, so it's time to light up the wood burning stove and burn some seasoned cherry wood. (We get cherry wood from a local tree service that removes dead trees for people and then seasons the wood.  Recycle!)  It's great for camp fires, smoking meats and for heating up our house.  Also, if you have never smelled cherry wood smoke, it is amazing.   I wanted to see if having a fire that raised my house's temperature from 67 F to 87 F would produce noticeable sensor readings.   Fortunately, I have a thermal camera sensor (Pimoroni rocks! Add another thing to my list of things I love from Britain: Dr. Who, Jelly Babies, Pimoroni and my awesome boss Dan).  I also have Raspberry Pi sensors for temperature, humidity, light and various gases.   Let's see what the numbers look like.    The temperatures and images start green and yellow and as they heat up turn red, purple and then pure white.   That's real hot.   Fortunately the Raspberry Pis didn't overheat; we had to open a window when we got close to 90.   Yes, temperature regulation and maybe an automated wood feeder would be nice.
  




Inside the Stove


Cherry Wood burning nice in stove, notice Fire on Cloudera T-Shirt

Four USB PS3 Eye Cameras ($7!!!) attached to Raspberry Pi 3B+s and 4s.


A very organized professional assortment of Pis and sensors...





 It's very easy to spot check my sensor values as they stream through Apache Kafka with Cloudera SMM.


Some Sensor Readings:

{"bme280_tempf": "93.78", "uuid": "20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a", "systemtime": "01/17/2020 14:56:29", "bme280_altitude": "-41.31", "memory": 92.1, "max30105_value": "84.00", "end": "1579290989.4628081", "imgnamep": "images/bog_image_p_20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a.jpg", "max30105_temp": "34.56", "ipaddress": "192.168.1.251", "diskusage": "44726.6", "host": "garden2", "max30105timestamp": "20200117-145629-345697", "starttime": "01/17/2020 13:48:29", "bme280_altitude_feet": "-135.53", "max30105_delta": "0.00", "max30105_mean": "84.00", "max30105_detected": "False", "bme280_tempc": "34.32", "bme280_pressure": "1034.61", "cputemp": 59, "te": "4079.87322807312", "imgname": "images/bog_image_20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a.jpg"}


[{"uuid":"sgp30_uuid_xyg_20200117185015","ipaddress":"192.168.1.221","runtime":"0","host":"garden3","host_name":"garden3","macaddress":"dc:a6:32:32:98:20","end":"1579287015.6653564","te":"0.025962352752685547","systemtime":"01/17/2020 13:50:15","cpu":55.0,"diskusage":"109290.8 MB","memory":29.7,"equivalentco2ppm":"  400","totalvocppb":"   37","id":"20200117185015_b8fbd9c1-fa30-4f70-b20d-e43a2c703b18"}]

{"uuid": "rpi4_uuid_kse_20200117222947", "ipaddress": "192.168.1.243", "host": "rp4", "host_name": "rp4", "macaddress": "dc:a6:32:03:a6:e9", "systemtime": "01/17/2020 17:29:47", "cpu": 50.8, "diskusage": "46208.0 MB", "memory": 18.2, "id": "20200117222947_e9299089-d56f-468b-8bac-897a2918307a", "temperature": "48.96355155197982", "pressure": "1035.4460084255888", "humidity": "0.0", "lux": "49.0753", "proximity": "0", "gas": "Oxidising: 30516.85 Ohms\nReducing: 194406.50 Ohms\nNH3: 104000.00 Ohms"}

{"host": "rp4", "cputemp": "72", "ipaddress": "192.168.1.243", "endtime": "1579293634.02", "runtime": "0.00", "systemtime": "01/17/2020 15:40:34", "starttime": "01/17/2020 15:40:34", "diskfree": "46322.7", "memory": "17.1", "uuid": "20200117204034_99f49e71-7444-4fd7-b82e-7e03720c4c39", "image_filename": "20200117204034_d9f811a3-8582-4b47-b4b4-cb6ec51cca04.jpg"}

The next step is to have NiFi load the data to Kudu, Hive, HBase or Phoenix tables for analysis with Cloudera Data Science Workbench (CDSW) and some machine learning analytics in Python 3 on either Zeppelin or Jupyter notebooks feeding CDSW.   Then I can host my final model on Kubernetes within CDSW for a real edge-to-AI application and solve the issue of how much fire in my house is too much?

This article is part of the FLaNK Stack series, highlighting using the FL(ink) Apache NiFi Kafka Kudu stack for big data streaming development with IoT and AI applications.

FLaNK Stack: NiFi Processor for Kafka Consumption on Demand - REST Proxy Example

Writing a Custom Kafka Rest Proxy in 4 Hours


A custom processor for using NiFi as a REST Proxy to Kafka is very easy, so I made one in NiFi 1.10.   It's a simple Kafka smart client that accepts POSTs, GETs or whatever HTTP request and returns a message from a Kafka topic; the topic can be set via variables, the HTTP request or your choice.   To get one of the query parameters, you do so like this:   ${http.query.param.topic}.    I am using the plain old KafkaConsumer class.



If you need data, just CURL it or use your HTTP/REST client controls in Java, Python, Go, Scala, Ruby, C#, VB.NET or whatever.


curl http://localhost:9089?topic=bme680

Download a pre-built NAR and install.   Note:   this is a pre-release alpha that I quickly built and tested on a few clusters.   This is not an official project or product.   This is a POC for myself to see how hard it could be.   It's not!   Roll your own or join me in building out an open source project for one.   What requirements do you have?   This worked for me: send a curl, get a message.



Build a topic for Kafka with SMM in seconds


 Here's an entire Kafka REST Proxy in a few steps.

HandleHttpRequest (we could have hundreds of options here)
RouteOnAttribute


Provenance For An Example Kafka REST Call

To use the processor, we need to set some variables: Kafka Broker, Topic, Offset Reset, number of records to grab at a time, Client Id and Group Id (important for keeping your offset), auto commit, and deserializers for your key and value types - String is usually good, maybe Byte.
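The processor itself is Java, but the consume-on-demand idea is easy to sketch. Here is a hedged kafka-python version of the same pattern; the broker address, group id and record count are assumptions, and the topic matches the curl example above:

from kafka import KafkaConsumer  # pip3 install kafka-python

# Grab up to 10 records from a topic and return, the way the REST proxy
# processor answers one HTTP request.
consumer = KafkaConsumer(
    "bme680",                          # e.g. from ${http.query.param.topic}
    bootstrap_servers="localhost:9092",
    group_id="nifi-rest-proxy",        # keeps our offset between calls
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    consumer_timeout_ms=2000,          # return instead of blocking forever
    value_deserializer=lambda b: b.decode("utf-8"),
)
records = [msg.value for msg, _ in zip(consumer, range(10))]
consumer.close()
print(records)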


Kafka to HBase is Easy.

Kafka to Kudu is Easy.




Kafka Proxy Processor For Message Consumption Source


https://github.com/tspannhw/kafkarest-processor

Pre-Built NAR

https://github.com/tspannhw/kafkarest-processor/releases/download/0.1/nifi-kafkarest-nar-1.0.nar

It's so easy, didn't wake the cat.




Cloudera Edge2AI: MiNiFi Java Agent with Raspberry Pi and Thermal Camera and Air Quality Sensor - Part 1

MiNiFi with Thermal Cameras and Air Quality Sensors


Use Case / Overview:

We need to track heat signatures, web camera images, gas and other readings from a remote worker's office.   This is for occupancy analytics and safety monitoring.   We can extend this to field and remote sites where levels of temperature, movement, noxious gases and other real-world conditions may put our staff at risk.

For tracking room temperature and humidity we have other sensors:   https://www.datainmotion.dev/2019/12/iot-series-minifi-agent-on-raspberry-pi.html
https://www.datainmotion.dev/2019/10/using-grovepi-with-raspberry-pi-and.html
https://www.datainmotion.dev/2019/09/powering-edge-ai-for-sensor-reading.html
https://www.datainmotion.dev/2019/12/easy-deep-learning-in-apache-nifi-with.html

We can also track things like GPS, overhead plane traffic, regionalized social media reports, news, government reports, weather, mass transit status, traffic cameras, smoke, stock market, database, files, syslog, logs and anything else we may need to add to improve our machine learning and deep learning models.

We want to send immediate alerts if anything is dangerous to the equipment or living things in the area.   We have deployed a pre-built TensorFlow Lite model to our edge device to execute on incoming images.   We can deploy our own models automagically via Apache NiFi - MiNiFi Agents and Edge Flow Manager:   https://www.datainmotion.dev/2019/08/updating-machine-learning-models-at.html.


Hardware Component List:

Software Component List:
  • Raspian
  • Python 3.7
  • OpenJDK 8 Java
  • Apache NiFi 1.10
  • MiNiFi Java Agent 0.6.0
  • Cloudera Edge Flow Manager (CEM)
  • Apache Kudu
  • Apache Kafka 2.x
  • Cloudera Streams Messaging Manager
  • Cloudera Manager 7.0.3
  • Apache Hue
  • Cloudera Schema Registry
  • Apache NiFi Registry
  • CDP on AWS with Kafka and NiFi Data Hub
Source Code:
Example JSON Data:

{"uuid": "sgp30_uuid_qmy_20200104232746", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden2", "host_name": "garden2", "macaddress": "dc:a6:32:32:98:20", "end": "1578180466.510303", "te": "0.027782917022705078", "systemtime": "01/04/2020 18:27:46", "cpu": 3.9, "diskusage": "111786.0 MB", "memory": 9.7, "equivalentco2ppm": "  405", "totalvocppb": "   16", "id": "20200104232746_9379ae31-d848-4655-964f-92bd1b5e63fe"}

 { "node_id_0" : "511",
  "label_0" : "container ship, containership, container vessel",
  "result_0" : "0.439216",
  "node_id_1" : "650",
  "label_1" : "megalith, megalithic structure",
  "result_1" : "0.050980",
  "node_id_2" : "580",
  "label_2" : "grand piano, grand",
  "result_2" : "0.050980",
  "node_id_3" : "882",
  "label_3" : "upright, upright piano",
  "result_3" : "0.027451",
  "node_id_4" : "518",
  "label_4" : "crane",
  "result_4" : "0.023529",
  "uuid" : "tensorflow_uuid_mbk_20200106194910",
  "ipaddress" : "192.168.1.221",
  "runtime" : "0",
  "host" : "garden2",
  "host_name" : "garden2",
  "macaddress" : "dc:a6:32:32:98:20",
  "end" : "1578340151.1180859",
  "te" : "0.18831157684326172",
  "systemtime" : "01/06/2020 14:49:11",
  "cpu" : 45.8,
  "diskusage" : "109621.5 MB",
  "memory" : 50.2,
  "id" : "20200106194910_098df463-4d9e-4326-9f8c-ad12fe55c7d2" }

Example Thermal Data:

Example Web Cam Data:



Device:



Setup:
We need to make sure the Raspberry Pi is up to date and has some software available such as git, curl and unzip.    I can then build the MLX library.
I had to install sgp30-python using the install.sh from the GitHub repo; don't use the pip3 install.   Initial calibration will take a while, so let that happen.
A demo is installed:   /home/pi/Pimoroni/sgp30/examples
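Assuming the API that Pimoroni's sgp30-python exposes, a minimal reading loop looks roughly like this (the 2-second cadence matches the sensor JSON shown in these posts):

from sgp30 import SGP30
import time

sgp30 = SGP30()
sgp30.start_measurement()      # let the initial calibration happen
while True:
    result = sgp30.get_air_quality()
    print("eCO2: {:d} ppm  TVOC: {:d} ppb".format(
        result.equivalent_co2, result.total_voc))
    time.sleep(2.0)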
MLX Thermal Camera
There is a nice Python utility that will build a GIF animation from the camera.   /opt/demo/mlx90640-library/python
Make sure you build all examples.
Install Java for MiNiFi Agent
sudo apt install openjdk-8-jdk openjdk-8-jre

Download MiNiFi from Cloudera or nifi.apache.org and copy zip/tar/gz to your device.

MiNiFi Ingest:

Web Camera Images
Sensor JSON Logs
Thermal Videos
TensorFlow Lite Classification Results

MiNiFi can grab my data from my device whether it's files, logs, images, sensor readings, python app calls, shell scripts, TCP, UDP or whatever.  Goodbye cron.


Building a MiNiFi flow and pushing it out to MiNiFi agents running on various devices is a snap.   I drag and drop a few components, set some parameters such as which file to tail, what directory to list and what scripts to run, and bam, send it to the cloud.




It is very easy to monitor my progress with the EFM event viewer or REST API.


NiFi Processing:

  • Receive From MiNiFi Agents
  • Route Images to Image Processing
  • QueryRecord to limit and route records
  • Push to Kafka (see the sketch below)
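Pushing to Kafka is what PublishKafka does in the flow. The same publish, sketched with kafka-python; the broker address is an assumption and the topic name mirrors the gassensors table used later:

from kafka import KafkaProducer  # pip3 install kafka-python
import json

# Publish one sensor reading the way the flow's PublishKafka step does.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda d: json.dumps(d).encode("utf-8"),
)
reading = {"host": "garden3", "equivalentco2ppm": "412", "totalvocppb": "6"}
producer.send("gassensors", reading)
producer.flush()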



Additional NiFi Processing in Second AWS Cluster


  • Consume Kafka
  • Insert into Kudu Table







We need to add topics to Kafka so we can send our messages for further processing.


Before I can create Kafka topics, send messages or monitor Kafka, I need a Kafka cluster.   Using The Enterprise Data Cloud, Cloudera Data Platform lets me easily spin up a Kafka cluster on AWS or Azure (soon Google).



If I don't like fancy web UIs or need to script this, there is a CDP CLI with which I can create a cluster from a JSON template.


It is very easy to build NiFi and Kafka Data Hubs and make them available to users/developers in minutes.   So we are now ready to rock.
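With the cluster up, we can create those topics. SMM is the easy way; if you want it scripted, here is a hedged kafka-python sketch (the broker address is an assumption, and the topic names simply mirror the table names below):

from kafka.admin import KafkaAdminClient, NewTopic

# Create the topics our MiNiFi/NiFi flows publish to.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(name="gassensors", num_partitions=3, replication_factor=1),
    NewTopic(name="webcam", num_partitions=3, replication_factor=1),
])
admin.close()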






Since we have been producing and consuming thousands of messages in different topics to my cloud hosted Kafka cluster, let's see what's going on by using Cloudera's Streams Messaging Manager (SMM).   







Let's see the data now that it has landed in Impala/Kudu tables.   So easy to query my tables with Apache Hue.



We can see the data displayed in Slack channels.



I can see my tables have been built in Kudu.



Cloud Storage - Kudu Tables:

CREATE TABLE webcam ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING,
macaddress STRING, diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
node_id_0 INT, label_0 STRING, result_0 DOUBLE,
node_id_1 INT, label_1 STRING, result_1 DOUBLE,
node_id_2 INT, label_2 STRING, result_2 DOUBLE,
node_id_3 INT, label_3 STRING, result_3 DOUBLE,
node_id_4 INT, label_4 STRING, result_4 DOUBLE,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
CREATE TABLE gassensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING, equivalentco2ppm STRING,  totalvocppb STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1'); 
CREATE TABLE bme280sensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING,
te STRING,
host STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING,
host_name STRING, bme280_altitude STRING, bme280_tempf STRING, max30105timestamp STRING,
max30105_detected STRING, max30105_delta STRING, max30105_temp STRING, bme280_tempc STRING,
max30105_mean STRING, max30105_value STRING, bme280_altitude_feet STRING, bme280_pressure STRING,
starttime STRING, cputemp DOUBLE, imgnamep STRING, imgname STRING,
PRIMARY KEY (uuid, `end`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
Python Libraries:

wget https://dl.google.com/coral/python/tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl 
pip3 install scikit-image 
pip3 install getmac 
pip3 install psutil 
pip3 install --upgrade pip 
pip3 install --upgrade setuptools 
pip3 install tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl 
pip3 install easydict -U 
pip3 install scikit-learn -U 
pip3 install opencv-python -U --user 
pip3 install numpy -U 
pip3 install mxnet -U 
pip3 install gluoncv --upgrade 
pip3 install tensorflow
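With the tflite_runtime wheel above installed, classification on the Pi looks roughly like this. A sketch with hypothetical model and image paths, producing the node id / label / score values seen in the TensorFlow Lite JSON earlier:

import numpy as np
from PIL import Image
from tflite_runtime.interpreter import Interpreter  # wheel installed above

# Hypothetical paths; any MobileNet-style uint8 classifier works here.
interpreter = Interpreter(model_path="mobilenet_v1_1.0_224_quant.tflite")
interpreter.allocate_tensors()
inp = interpreter.get_input_details()[0]
out = interpreter.get_output_details()[0]

# Resize the capture to the model input and run inference.
_, height, width, _ = inp["shape"]
image = Image.open("/opt/demo/images/capture.jpg").resize((width, height))
interpreter.set_tensor(inp["index"], np.expand_dims(np.asarray(image), 0))
interpreter.invoke()

scores = np.squeeze(interpreter.get_tensor(out["index"]))
top5 = scores.argsort()[-5:][::-1]     # node ids for the result JSON
print(top5, scores[top5])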

Summary:
We read thermal images, sensors and camera images with a MiNiFi agent that sends this data via MQTT and HTTP(S) Site-to-Site (S2S).   We process MQTT, Kafka and S2S data streams with NiFi and easily push the final data into Kudu tables which we can query.

So we easily ingest structured, unstructured and semistructured data with live analytics.

In the second part of this article I will show you how we integrate with CDSW for additional machine learning on our data.

