Showing posts with label deep-learning. Show all posts
Showing posts with label deep-learning. Show all posts

EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)

EdgeAI:   Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi

Building MiNiFi IoT Apps with the new Cloudera EFM 

It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.

Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!

NiFi Flow Receiving From MiNiFi Java Agent

In a cluster in my CDP-DC Cluster I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu.  We filter our data with Streaming SQL.

We can use SQL to route, create aggregates like averages, chose a subset of fields and limit data returned.   Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text.   Read and write different formats and convert when your SQL is done.   Or just to SELECT * FROM FLOWFILE to get everything.  

We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.

We can search Atlas for Kafka Topics.

From coral Kafka topic to NiFi to Kudu.

Details on Coral Kafka Topic

Examining the Hive Metastore Data on the Coral Kudu Table

NiFi Flow Details in Atlas

Details on Alerts Topic

Statistics from Atlas

Example Web Camera Image

 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]

Querying Kudu results in Hue

Pushing Alerts to Slack from NiFi

I am running on Apache NiFi 1.11.1 and wanted to point out a new feature.   Download flow:   Will download the highlighted flow/pgroup as JSON.

Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.

Shell (

DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/ --image /opt/demo/images/$DATE.jpg 2>/dev/null

Kudu Table DDL

Python 3 (

import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import base64
import json
from time import gmtime, strftime
import math
import random, string
import time
import psutil
import uuid 
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
import os
import argparse
from edgetpu.classification.engine import ClassificationEngine

# Importing socket library 
import socket 

start = time.time()
starttf ='%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()

# Get MAC address of a local interfaces
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    import psutil
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if == psutil.AF_LINK:
                return i.address
# /opt/demo/examples-camera/all_models  
row = { }
#i = 1
#while i == 1:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()
    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img =

    scores = {}
    kCount = 1

    # Iterate Inference Results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1    

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp,2)
    row['te'] = "{0:.2f}".format((end-start))
    row['starttime'] = starttf
    row['systemtime'] ='%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float( / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)    
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    msg = 'Temp: {0}'.format(row['temperature'])
    msg += 'IP: {0}'.format(row['ip'])
    update_display(enviro.display, msg)
#    i = 2
    row['message'] = "Error"

Source Code:

Sensors / Devices / Hardware:

  • Humdity-HDC2010 humidity sensor
  • Light-OPT3002 ambient light sensor
  • Barometric-BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU


IoT Series: MiNiFi Agent on Raspberry Pi 4 with Enviro+ Hat For Environmental Monitoring and Analytics

IoT Series:  MiNiFi Agent on Raspberry Pi 4 with Enviro+ Hat For Environmental Monitoring and Analytics

Summary:  Our powerful edge device streams sensor readings for environmental readings while also performing edge analytics with deep learning libraries and enhanced edge VPU.   We can perform complex running calculations on sensor data locally on the box before making potentially expense network calls.  We can also decide when to send out data based on heuristics, machine learning or simple if-then logic.

Use Case:   Monitor Environment.   Act Local, Report Global.

Stack:   FLANK

Category:   AI, IoT, Edge2AI, Real-Time Streaming, Sensors, Cameras, Telemetry.

Hardware:  Intel Movidius NCC 2 VPU (Neural Computing), Pimoroni Enviro Plus pHAT, Raspberry Pi 4 (4GB Edition).

Software:  Python 3 + Java + MiNiFi Agents + Cloudera Edge Flow Manager (EFM/CEM) + Apache NiFi.   Using Mm... FLaNK Stack.

Keywords:  Edge2AI, CV, AI, Deep Learning, Cloudera, NiFi, Raspberry Pi, Deep Learning, Sensors, IoT, IIoT, Devices, Java, Agents, FLaNK Stack, VPU, Movidius.

Open Source Assets:

I am running a Python script that streams sensor data continuously to MQTT to be picked up by MiNiFi agents or NiFi.   For development I am just running my Python script with a shell script and nohup.
python3 /opt/demo/

nohup ./ &

Example Enviro Plus pHAT Sensor Data

  "uuid" : "rpi4_uuid_xki_20191220215721",
  "ipaddress" : "",
  "host" : "rp4",
  "host_name" : "rp4",
  "macaddress" : "dc:a6:32:03:a6:e9",
  "systemtime" : "12/20/2019 16:57:21",
  "cpu" : 0.0,
  "diskusage" : "46958.1 MB",
  "memory" : 6.3,
  "id" : "20191220215721_938f2137-5adb-4c22-867d-cdfbce6431a8",
  "temperature" : "33.590520852237226",
  "pressure" : "1032.0433707836428",
  "humidity" : "7.793797584651376",
  "lux" : "0.0",
  "proximity" : "0",
  "gas" : "Oxidising: 3747.82 Ohms\nReducing: 479652.17 Ohms\nNH3: 60888.05 Ohms"


We are also running a standard MiNiFi Java Agent 0.6 that is running a Python application to do sensors, edge AI with Intel's OpenVino and some other analytics.


DATE=$(date +"%Y-%m-%d_%H%M")
source /opt/intel/openvino/bin/
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/intel/openvino/build/ /opt/demo/images/$DATE.jpg 2>/dev/null

Example OpenVino Data

{"host": "rp4", "cputemp": "67", "ipaddress": "", "endtime": "1577194586.66", "runtime": "0.00", "systemtime": "12/24/2019 08:36:26", "starttime": "12/24/2019 08:36:26", "diskfree": "46889.0", "memory": "15.1", "uuid": "20191224133626_55157415-1354-4137-8472-424779645fbe", "image_filename": "20191224133626_9317880e-ee87-485a-8627-c7088df734fc.jpg"}

In our flow I convert to Apache Avro, as you can see Avro schema is embedded.

The flow is very simple, consume MQTT messages from our broker on the topic we are pushing messages to from our field sensors.   We also ingest MiNiFi events through standard Apache NiFi HTTP(s) Site-to-Site (S2S).   We route images to our image processor and sensor data right to Kudu tables.

Now that the data is stored to Apache Kudu we can do our analytics.

Easy to Run an MQTT Broker


Demo Info:

Run Mosquitto MQTT on Local Machine (RPI, Mac, Win10, ...)

On OSX, brew install mosquitto


To have launchd start mosquitto now and restart at login:
  brew services start mosquitto

Or, if you don't want/need a background service you can just run:
  mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

For Python, we need pip3 install paho-mqtt.

Run Sensors on Device that pushes to MQTT

Python pushes continuous stream of sensor data to MQTT

MiNiFi Agent Reads Broker

Send to Kafka and/or NiFi

Example Image Grabbed From Webcam in Dark Office (It's Christmas Eve!)

 When ready we can push to a CDP Data Warehouse in AWS.

With CDP, it's very easy to have a data environment in many clouds to store my live sensor data.

 I can now use this data in Kudu tables from Cloudera Data Science Workbench for real Data Science, machine learning and insights.

What do we do with all of this data?   Check in soon for real-time analytics and dash boarding.

Google Coral TPU with Edge Devices and MiNiFi

Google Coral TPU with Edge Devices and MiNiFi 

Designing Our Edge AI Flow with Cloudera Edge Flow Manager.

Configure Your Remote Process Group to Send Data to Your NiFi Cluster

Monitor Your Agents From the Events Screen

Let's grab all the new images and then delete on completion

We have Input and Output Ports to have Bidirectional communication with 0-n MiNiFi agents

Our NiFi flow to process calls from MiNiFi Agents running Coral TPUs

We run a query to check the TensorFlow Lite classification results and send out a slack message.

Let's push JSON data to a Kafka Cluster in AWS

EFM Series: Using MiNiFi Agents on Raspberry Pi 4 with Intel Movidius Neural Compute Stick 2, Apache NiFi and AI

EFM Series:   Using MiNiFi Agents on Raspberry Pi 4 with Intel Movidius Neural Compute Stick 2, Apache NiFi and AI

The good news is Raspberry Pi 4 can run MiNiFi Java Agents, Intel Movidius Neural Compute Stick 2 and AI libraries.   You can now use this 4GB of RAM device to run IoT with AI on the edge.

Flow From MiNiFi Agent Running OpenVino, SysLog Tail and Grabbing WebCam Images

Configure The Execution of OpenVino Python Applications on RPI 4

Events Returning from Raspberry Pi 4

Models Used

Download model using downloader.


DATE=$(date +"%Y-%m-%d_%H%M")
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg

python3 /opt/intel/openvino/build/ /opt/demo/images/$DATE.jpg


  • Apache NiFi
  • Apache NiFi - MiNiFi Agents
  • TensorFlow
  • OpenVino
  • Python 3
  • OpenCV DNN

Python Libraries

pip3 install getmac
pip3 install psutil
pip3 install --upgrade pip
pip3 install --upgrade setuptools

  • Intel Movidius Neural Compute Stick 2
  • Raspberry Pi 4 with 4GB RAM


We can run On-Premise on VMs, Containers, K8 or Bare metal.   Or on own of the clouds such as Google.



  •  /root/inference_engine_samples_build/armv7l/Release
  • /opt/intel/openvino