
Predicting Sensor Readings with Time Series Machine Learning



Sensor Unit and supporting hardware/software:
  • BME280 temperature, pressure, humidity sensor
  • LTR-559 light and proximity sensor
  • MICS6814 analog gas sensor
  • ADS1015 ADC with spare channel for adding another analog sensor
  • MEMS microphone
  • 0.96-inch, 160 x 80 color LCD
  • Raspberry Pi 4
  • Intel Movidius 2
  • JDK 8
  • MiNIFi Java Agent 0.6.0
  • Python 3

Example Data

{"uuid": "rpi4_uuid_omi_20200417211935", "amplitude100": 0.3, "amplitude500": 0.1, "amplitude1000": 0.1, "lownoise": 0.1, "midnoise": 0.1, "highnoise": 0.1, "amps": 0.3, "ipaddress": "", "host": "rp4", "host_name": "rp4", "macaddress": "dc:a6:32:03:a6:e9", "systemtime": "04/17/2020 17:19:36", "endtime": "1587158376.22", "runtime": "36.47", "starttime": "04/17/2020 17:18:58", "cpu": 0.0, "cpu_temp": "59.0", "diskusage": "46651.6 MB", "memory": 6.3, "id": "20200417211935_7b7ae5da-905b-418b-94f1-270a15dbc1df", "temperature": "38.7", "adjtemp": "29.7", "adjtempf": "65.5", "temperaturef": "81.7", "pressure": 1015.6, "humidity": 6.8, "lux": 1.2, "proximity": 0, "oxidising": 8.3, "reducing": 306.4, "nh3": 129.5, "gasKO": "Oxidising: 8300.63 Ohms\nReducing: 306352.94 Ohms\nNH3: 129542.17 Ohms"}
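Each reading arrives as a single flat JSON document, so downstream processing only needs `json.loads` and a little type coercion (several numeric values, such as `temperature` and `cpu_temp`, are shipped as strings). A minimal sketch, using an abridged copy of the record above:

```python
import json

# One Enviro+ reading as shipped by the agent (abridged from the example above).
record = ('{"uuid": "rpi4_uuid_omi_20200417211935", "temperature": "38.7", '
          '"pressure": 1015.6, "humidity": 6.8, "cpu_temp": "59.0", "host": "rp4"}')

row = json.loads(record)

# Numeric fields that arrive as strings are coerced before alerting or insert.
for field in ("temperature", "cpu_temp"):
    row[field] = float(row[field])

print(row["host"], row["temperature"], row["pressure"])
```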

EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)


Building MiNiFi IoT Apps with the new Cloudera EFM 

It is very easy to build a drag-and-drop EdgeAI application with EFM and then push it to all your MiNiFi agents.

Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!

NiFi Flow Receiving From MiNiFi Java Agent

In my CDP-DC cluster I consume the Kafka messages sent from my remote NiFi gateway, publish alerts to Kafka, and push records to Apache HBase and Apache Kudu.  We filter our data with Streaming SQL.

We can use SQL to route data, create aggregates like averages, choose a subset of fields and limit the data returned.   Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer for record data types including CSV, XML, Avro, Parquet, JSON and Grok-able text.   Read and write different formats and convert when your SQL is done, or just SELECT * FROM FLOWFILE to get everything.
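The same routing and aggregation SQL can be prototyped locally. The sketch below uses SQLite purely as a stand-in for NiFi's Calcite engine (QueryRecord exposes the incoming records as a FLOWFILE table); the host names and temperature values are made up, and Calcite's dialect differs slightly from SQLite's:

```python
import sqlite3

# Stand-in for NiFi QueryRecord: incoming records appear as a FLOWFILE table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (host TEXT, temperature REAL, humidity REAL)")
conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?, ?)",
                 [("rp4", 38.7, 6.8), ("rp4", 41.2, 7.1), ("garden2", 22.5, 40.0)])

# Route only the hot readings, and compute a per-host average in one pass.
hot = conn.execute(
    "SELECT host, temperature FROM FLOWFILE WHERE temperature > 40").fetchall()
avgs = dict(conn.execute(
    "SELECT host, AVG(temperature) FROM FLOWFILE GROUP BY host").fetchall())
print(hot, avgs)
```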

We can see this flow from Atlas as we trace the data lineage and provenance from the Kafka topic.

We can search Atlas for Kafka Topics.

From the coral Kafka topic to NiFi to Kudu.

Details on Coral Kafka Topic

Examining the Hive Metastore Data on the Coral Kudu Table

NiFi Flow Details in Atlas

Details on Alerts Topic

Statistics from Atlas

Example Web Camera Image

 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
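Downstream, the scored classes can be pulled back out of that record. A small sketch (field names match the record above, with the record abridged) that re-pairs the label_N/score_N fields and picks the winning class:

```python
import json

# Abridged copy of the example record above (a one-element JSON array).
record = json.loads('[{"score_1":"0.29","label_1":"hair spray",'
                    '"score_2":"0.14","label_2":"syringe","host":"coralenv"}]')[0]

# Re-pair label_N with score_N, then take the highest-scoring class.
pairs = []
n = 1
while ('label_%d' % n) in record:
    pairs.append((record['label_%d' % n], float(record['score_%d' % n])))
    n += 1
top_label, top_score = max(pairs, key=lambda p: p[1])
print(top_label, top_score)
```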

Querying Kudu results in Hue

Pushing Alerts to Slack from NiFi

I am running Apache NiFi 1.11.1 and wanted to point out a new feature, Download Flow, which downloads the highlighted flow/process group as JSON.

Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.

Shell script:

DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/ --image /opt/demo/images/$DATE.jpg 2>/dev/null

Kudu Table DDL

Python 3 script:

import time
import os
import socket
import argparse
import uuid
import json
import traceback
import datetime
import psutil
from time import gmtime, strftime
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image
from edgetpu.classification.engine import ClassificationEngine

start = time.time()
starttf ='%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    # vcgencmd output looks like: temp=59.0'C
    res = os.popen('vcgencmd measure_temp').readline()
    return float(res.replace("temp=", "").replace("'C\n", ""))

# Get the MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if == psutil.AF_LINK:
                return i.address

# Models live in /opt/demo/examples-camera/all_models
row = {}
try:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()

    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img =

    scores = {}
    kCount = 1

    # Iterate inference results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    cpuTemp = getCPUtemperature()
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    row.update(scores)
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp, 2)
    row['te'] = "{0:.2f}".format(end - start)
    row['starttime'] = starttf
    row['systemtime'] ='%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float( / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    msg = 'Temp: {0}'.format(row['temperature'])
    msg += '\nIP: {0}'.format(row['ip'])
    update_display(enviro.display, msg)
except Exception:
    traceback.print_exc()
    row['message'] = "Error"

# Emit the record as a single JSON line for MiNiFi to pick up.
print(json.dumps(row))

Source Code:

Sensors / Devices / Hardware:

  • HDC2010 humidity sensor
  • OPT3002 ambient light sensor
  • BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU


Cloudera Edge2AI: MiNiFi Java Agent with Raspberry Pi and Thermal Camera and Air Quality Sensor - Part 1

MiNiFi with Thermal Cameras and Air Quality Sensors

Cloudera Edge2AI:  MiNiFi Java Agent with Raspberry Pi and Thermal Camera and Air Quality Sensor - Part 1

Use Case / Overview:

We need to track heat signatures, web camera images, gas readings and other data from a remote worker's office.   This is for occupancy analytics and safety monitoring.   We can extend this to field and remote sites where temperature levels, movement, noxious gases and other real-world conditions may put our staff at risk.

For tracking room temperature and humidity we have other sensors:

We can also track things like GPS, overhead plane traffic, regionalized social media reports, news, government reports, weather, mass transit status, traffic cameras, smoke, stock market, database, files, syslog, logs and anything else we may need to add to improve our machine learning and deep learning models.

We want to send immediate alerts if anything is dangerous to the equipment or living things in the area.   We have deployed a pre-built TensorFlow Lite model to our edge device to execute on incoming images.   We can deploy our own models automagically via Apache NiFi - MiNiFi Agents and Edge Flow Manager:

Hardware Component List:

Software Component List:
  • Raspbian
  • Python 3.7
  • OpenJDK 8 Java
  • Apache NiFi 1.10
  • MiNiFi Java Agent 0.6.0
  • Cloudera Edge Flow Manager (CEM)
  • Apache Kudu
  • Apache Kafka 2.x
  • Cloudera Streams Messaging Manager
  • Cloudera Manager 7.0.3
  • Apache Hue
  • Cloudera Schema Registry
  • Apache NiFi Registry
  • CDP on AWS with Kafka and NiFi Data Hub
Source Code:
Example JSON Data:

{"uuid": "sgp30_uuid_qmy_20200104232746", "ipaddress": "", "runtime": "0", "host": "garden2", "host_name": "garden2", "macaddress": "dc:a6:32:32:98:20", "end": "1578180466.510303", "te": "0.027782917022705078", "systemtime": "01/04/2020 18:27:46", "cpu": 3.9, "diskusage": "111786.0 MB", "memory": 9.7, "equivalentco2ppm": "  405", "totalvocppb": "   16", "id": "20200104232746_9379ae31-d848-4655-964f-92bd1b5e63fe"}
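Note that the SGP30 fields arrive as right-aligned, space-padded strings (`"  405"`); Python's `int()` tolerates the padding, so thresholding is simple. A minimal sketch with an abridged copy of the record above (the thresholds are illustrative, not from the original flow):

```python
import json

# Abridged copy of the example SGP30 record above.
reading = json.loads('{"equivalentco2ppm": "  405", "totalvocppb": "   16", '
                     '"host": "garden2"}')

# int() strips the space padding in the sensor's string fields.
co2 = int(reading["equivalentco2ppm"])
voc = int(reading["totalvocppb"])

# Outdoor background CO2 is roughly 400 ppm; alert well above that
# (these thresholds are illustrative).
alert = co2 > 1000 or voc > 500
print(co2, voc, alert)
```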

 { "node_id_0" : "511",
  "label_0" : "container ship, containership, container vessel",
  "result_0" : "0.439216",
  "node_id_1" : "650",
  "label_1" : "megalith, megalithic structure",
  "result_1" : "0.050980",
  "node_id_2" : "580",
  "label_2" : "grand piano, grand",
  "result_2" : "0.050980",
  "node_id_3" : "882",
  "label_3" : "upright, upright piano",
  "result_3" : "0.027451",
  "node_id_4" : "518",
  "label_4" : "crane",
  "result_4" : "0.023529",
  "uuid" : "tensorflow_uuid_mbk_20200106194910",
  "ipaddress" : "",
  "runtime" : "0",
  "host" : "garden2",
  "host_name" : "garden2",
  "macaddress" : "dc:a6:32:32:98:20",
  "end" : "1578340151.1180859",
  "te" : "0.18831157684326172",
  "systemtime" : "01/06/2020 14:49:11",
  "cpu" : 45.8,
  "diskusage" : "109621.5 MB",
  "memory" : 50.2,
  "id" : "20200106194910_098df463-4d9e-4326-9f8c-ad12fe55c7d2" }
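Records like this one feed the alerting path mentioned earlier (publish alerts to Kafka, push the rest to Kudu). A sketch of that routing decision, assuming a confidence threshold on the top result; the topic names match the flow described in this article, but the threshold value is illustrative:

```python
def route(record, threshold=0.40):
    """Pick a Kafka topic the way the flow routes records:
    high-confidence detections go to 'alerts', everything else to 'coral'.
    The threshold here is illustrative."""
    if float(record.get("result_0", 0)) >= threshold:
        return "alerts"
    return "coral"

# The container-ship detection above scores 0.439216, so it routes to alerts.
record = {"label_0": "container ship, containership, container vessel",
          "result_0": "0.439216"}
print(route(record))
```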

Example Thermal Data:

Example Web Cam Data:


We need to make sure the Raspberry Pi is up to date and has some software available such as git, curl and unzip.    I can then build the MLX library.
I had to install sgp30-python using the install script from its GitHub repository; don't use pip3 install.   Initial calibration will take a while, so let that happen.
A demo is installed:   /home/pi/Pimoroni/sgp30/examples
MLX Thermal Camera
There is a nice Python utility that will build a GIF animation from the camera.   /opt/demo/mlx90640-library/python
Make sure you build all examples.
Install Java for MiNiFi Agent
sudo apt install openjdk-8-jdk openjdk-8-jre

Download MiNiFi from Cloudera and copy the zip/tar.gz to your device.

MiNiFi Ingest:

  • Web Camera Images
  • Sensor JSON Logs
  • Thermal Videos
  • TensorFlow Lite Classification Results

MiNiFi can grab my data from my device whether it's files, logs, images, sensor readings, python app calls, shell scripts, TCP, UDP or whatever.  Goodbye cron.

Building a MiNiFi flow and pushing it to MiNiFi agents running on various devices is a snap.   I drag and drop a few components, set some parameters such as which file to tail, what directory to list and what scripts to run, and bam, send it to the cloud.

It is very easy to monitor my progress with the EFM event viewer or REST API.

NiFi Processing:

  • Receive From MiNiFi Agents
  • Route Images to Image Processing
  • QueryRecord to limit and route records
  • Push to Kafka

Additional NiFi Processing in Second AWS Cluster

  • Consume Kafka
  • Insert into Kudu Table

We need to add topics to Kafka so we can send our messages for further processing.
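Once a broker is reachable, topics can be created with the stock `` tool shipped with Apache Kafka 2.x. A sketch that builds the commands for the two topics used in this article; the broker address, partition count and replication factor are illustrative:

```python
def create_topic_cmd(topic, bootstrap, partitions=3, replication=3):
    # Argument list for the stock tool (Apache Kafka 2.2+
    # supports --bootstrap-server); counts here are illustrative.
    return ["", "--create",
            "--bootstrap-server", bootstrap,
            "--topic", topic,
            "--partitions", str(partitions),
            "--replication-factor", str(replication)]

# One command per topic used by the flow (topic names from this article).
cmds = [create_topic_cmd(t, "broker:9092") for t in ("coral", "alerts")]
print(cmds[0])
# To actually create them, pass each cmd to, check=True).
```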

Before I can create Kafka topics, send messages or monitor Kafka, I need a Kafka cluster.   As the Enterprise Data Cloud, Cloudera Data Platform lets me easily spin up a Kafka cluster on AWS or Azure (soon Google).

If I don't like fancy web UIs or need to script this, the CDP CLI lets me create a cluster from a JSON template.

It is very easy to build NiFi and Kafka Data Hubs and make them available to users/developers in minutes.   So we are now ready to rock.

Since we have been producing and consuming thousands of messages in different topics on my cloud-hosted Kafka cluster, let's see what's going on by using Cloudera's Streams Messaging Manager (SMM).

Let's see the data now that it has landed in Impala/Kudu tables.   So easy to query my tables with Apache Hue.

We can see the data displayed in Slack channels.

I can see my tables have been built in Kudu.

Cloud Storage - Kudu Tables:

CREATE TABLE webcam ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING,
macaddress STRING, diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
node_id_0 INT, label_0 STRING, result_0 DOUBLE,
node_id_1 INT, label_1 STRING, result_1 DOUBLE,
node_id_2 INT, label_2 STRING, result_2 DOUBLE,
node_id_3 INT, label_3 STRING, result_3 DOUBLE,
node_id_4 INT, label_4 STRING, result_4 DOUBLE,
PRIMARY KEY (uuid, `end`))
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

CREATE TABLE gassensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING, equivalentco2ppm STRING,  totalvocppb STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
PRIMARY KEY (uuid, `end`))
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

CREATE TABLE bme280sensors ( uuid STRING,  `end` STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING,
host STRING,
macaddress STRING,  diskusage STRING, memory DOUBLE, ipaddress STRING,
host_name STRING, bme280_altitude STRING, bme280_tempf STRING, max30105timestamp STRING,
max30105_detected STRING, max30105_delta STRING, max30105_temp STRING, bme280_tempc STRING,
max30105_mean STRING, max30105_value STRING, bme280_altitude_feet STRING, bme280_pressure STRING,
starttime STRING, cputemp DOUBLE, imgnamep STRING, imgname STRING,
PRIMARY KEY (uuid, `end`))
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
Python Libraries:

pip3 install scikit-image
pip3 install getmac
pip3 install psutil
pip3 install --upgrade pip
pip3 install --upgrade setuptools
pip3 install tflite_runtime-1.14.0-cp37-cp37m-linux_armv7l.whl
pip3 install easydict -U
pip3 install scikit-learn -U
pip3 install opencv-python -U --user
pip3 install numpy -U
pip3 install mxnet -U
pip3 install gluoncv --upgrade
pip3 install tensorflow

We read thermal images, sensors and camera images with a MiNiFi agent that sends this data via MQTT and HTTP(S) Site-to-Site (S2S).   We process MQTT, Kafka and S2S data streams with NiFi and easily push the final data into Kudu tables which we can query.

So we easily ingest structured, unstructured and semi-structured data with live analytics.

In the second part of this article I will show you how we integrate with CDSW for additional machine learning on our data.