Showing posts with label apache-nifi-1.11. Show all posts

Predicting Sensor Readings with Time Series Machine Learning



Sensor Unit (
  • BME280 temperature, pressure, humidity sensor
  • LTR-559 light and proximity sensor
  • MICS6814 analog gas sensor
  • ADS1015 ADC with spare channel for adding another analog sensor
  • MEMS microphone
  • 0.96-inch, 160 x 80 color LCD
  • Raspberry Pi 4
  • Intel Movidius 2
  • JDK 8
  • MiNiFi Java Agent 0.6.0
  • Python 3

Example Data

{"uuid": "rpi4_uuid_omi_20200417211935", "amplitude100": 0.3, "amplitude500": 0.1, "amplitude1000": 0.1, "lownoise": 0.1, "midnoise": 0.1, "highnoise": 0.1, "amps": 0.3, "ipaddress": "", "host": "rp4", "host_name": "rp4", "macaddress": "dc:a6:32:03:a6:e9", "systemtime": "04/17/2020 17:19:36", "endtime": "1587158376.22", "runtime": "36.47", "starttime": "04/17/2020 17:18:58", "cpu": 0.0, "cpu_temp": "59.0", "diskusage": "46651.6 MB", "memory": 6.3, "id": "20200417211935_7b7ae5da-905b-418b-94f1-270a15dbc1df", "temperature": "38.7", "adjtemp": "29.7", "adjtempf": "65.5", "temperaturef": "81.7", "pressure": 1015.6, "humidity": 6.8, "lux": 1.2, "proximity": 0, "oxidising": 8.3, "reducing": 306.4, "nh3": 129.5, "gasKO": "Oxidising: 8300.63 Ohms\nReducing: 306352.94 Ohms\nNH3: 129542.17 Ohms"}

SQL Reporting Task for Cloudera Flow Management / HDF / Apache NiFi


Would you like reporting tasks that gather metrics and send them from NiFi to your database or Kafka, based on a query of NiFi provenance, bulletins, metrics, processor status or other KPIs?

Now you can.   If you are using HDF 3.5.0, this reporting task NAR is preinstalled and ready to go.

Let's add some reporting tasks that use SQL!!!  QueryNiFiReportingTask.

The first one that interested me was querying provenance for one processor that consumes from a certain topic. I decided to run the query every 10 seconds.   My query and some results are below.

So let's go to Controller Settings / Reporting Tasks and then add QueryNiFiReportingTask:

We add one per item we want to monitor.   The reporting task then needs a place to send the records (a sink): DatabaseRecordSink (for a JDBC database), KafkaRecordSink, PrometheusRecordSink, ScriptedRecordSink or SiteToSiteReportingRecordSink.   I am going to use Kafka, but Prometheus, Database and S2S are good options.   If you choose SiteToSite, you can send to another NiFi cluster for processing, the NiFinception route.

We have to write an Apache Calcite-compliant SQL query, set our sink and decide if we want to include zero-record results (false is a good idea).

One option is to query the BULLETINS table, which holds NiFi cluster bulletins (warnings/errors).

Another option is the CONNECTION_STATUS table.

How about NiFi JVM Metrics?  That has some good stuff in there.

Let's configure some Kafka Record Sinks.  I am using Kafka 2.3, so I'll use the Kafka 2 sink.   I set some brokers (default port of 9092), created a new topic for it and chose the record writer.  I chose JSON, but it could be CSV, Avro, Parquet, XML or something else.   Stick with JSON or Avro for Kafka.

Data then starts getting sent. By default the schedule is every 5 minutes.   This can be adjusted; for provenance I set mine to every 10 seconds.


Let's look at the data as it passes through Kafka. In the follow-up to this article we'll land it in Impala/Kudu/Hive/Druid/Phoenix/HBase or HDFS/S3 and query it.   For now, we can examine the data within Kafka via Cloudera Streams Messaging Manager (SMM).

We can examine any of the topics in Kafka with SMM.   We can then start consuming these records and build live metric handling applications or send to live data marts and dashboards powered by Cloudera Data Platform (CDP).

We have a full set of SQL to use for these reporting tasks including selecting columns, doing aggregates like MAX and AVG, ordering, grouping, where clauses and row limits.
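
To get a feel for the kind of SQL this allows, here is a small sketch that runs an aggregate query with a WHERE clause, grouping, ordering and a row limit. It is an assumption-laden stand-in: it uses Python's built-in sqlite3 rather than NiFi's Calcite engine (the dialects differ in places), and the toy PROVENANCE table holds only a few columns borrowed from the provenance results in this post, plus one made-up row.

```python
import sqlite3

# In-memory SQLite database standing in for the reporting task's tables.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE PROVENANCE ("
    "eventId INTEGER, componentName TEXT, durationMillis INTEGER, entitySize INTEGER)"
)
rows = [
    (2724, "Consume Kafka iot messages", 69, 246),
    (2736, "Consume Kafka iot messages", 20, 247),
    (2740, "Some other processor", 5, 100),  # made-up row for illustration
]
conn.executemany("INSERT INTO PROVENANCE VALUES (?, ?, ?, ?)", rows)

# Aggregates, a WHERE clause, grouping, ordering and a row limit in one query.
query = """
SELECT componentName,
       COUNT(*)            AS events,
       MAX(durationMillis) AS maxDuration,
       AVG(entitySize)     AS avgSize
FROM PROVENANCE
WHERE durationMillis > 10
GROUP BY componentName
ORDER BY maxDuration DESC
LIMIT 5
"""
summary = conn.execute(query).fetchall()
print(summary)
```

In a real reporting task the query text would go into the task's configuration and the rows would flow to the chosen sink rather than to print.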

Provenance Query

SELECT eventId, durationMillis, lineageStart, timestampMillis,
updatedAttributes, entitySize, details
FROM PROVENANCE
WHERE componentName = 'Consume Kafka iot messages'

Provenance Results

[{"eventId":2724,"durationMillis":69,"lineageStart":1586294707989,"timestampMillis":1586294708002,"updatedAttributes":"{path=./, schema.protocol.version=1, filename=be499074-c595-46f5-a03a-482607fb9c8c, schema.identifier=1, kafka.partition=7, kafka.offset=36,, kafka.timestamp=1586294707933, kafka.topic=iot, schema.version=1, mime.type=application/json, uuid=be499074-c595-46f5-a03a-482607fb9c8c}","entitySize":246,"details":null},{"eventId":2736,"durationMillis":20,"lineageStart":1586294708905,"timestampMillis":1586294708916,"updatedAttributes":"{path=./, schema.protocol.version=1, filename=74c9e28d-82a4-4cea-8331-92b7a4bee1b3, schema.identifier=1, kafka.partition=3, kafka.offset=36,, kafka.timestamp=1586294708898, kafka.topic=iot, schema.version=1, mime.type=application/json, uuid=74c9e28d-82a4-4cea-8331-92b7a4bee1b3}","entitySize":247,"details":null},..]

I have classes on using this technology and some webinars coming to you on the East Coast of the US and virtually at my meetup.

Cloudera HDF 3.5.0 Is Now Available For Download

HDF 3.5.0 includes the following components:
  • Apache Ambari 2.7.5
  • Apache Kafka 2.3.1
  • Apache NiFi 1.11.1
  • NiFi Registry 0.5.0
  • Apache Ranger 1.2.0
  • Apache Storm 1.2.1
  • Apache ZooKeeper 3.4.6
  • Apache MiNiFi Java Agent 0.6.0
  • Apache MiNiFi C++ 0.6.0
  • Hortonworks Schema Registry 0.8.1
  • Hortonworks Streaming Analytics Manager 0.6.0
  • Apache Knox 1.0.0
  • SmartSense 1.5.0

SQL reporting task. The QueryNiFiReportingTask allows users to execute SQL queries against tables containing information on Connection Status, Processor Status, Bulletins, Process Group Status, JVM Metrics, Provenance and Connection Status Predictions. In combination with Site to Site, it is particularly useful to define fine-grained monitoring capabilities on top of the running workflows.

Using NiFi CLI to Restore NiFi Flows From Backups


Please note, Apache NiFi 1.11.4 is now available for download


#> registry list-buckets -u

#   Name   Id                                     Description 
-   ----   ------------------------------------   ----------- 
1   IOT    45834964-d022-4f4c-891f-695898e1e5f0   (empty)     
2   IoT    250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9   (empty)     
3   dev    46b7bab7-400f-44ae-a0e6-7340ff19c96f   (empty)     
4   iot    c594d6bc-7413-4f6a-ba9a-50b8020eec37   (empty)     
5   prod   0bf59d2e-1dd5-4d24-8aa0-0614bf991dc9   (empty)     

#> registry create-flow -verbose -u -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9 --flowName iotFlow


#> registry import-flow-version -verbose -u -f a5a4ac59-9aeb-416e-937f-e601ca8beba9 -i iot-1.json

#> registry list-flows  -u -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9

#   Name      Id                                     Description 
-   -------   ------------------------------------   ----------- 
1   iotFlow   a5a4ac59-9aeb-416e-937f-e601ca8beba9   (empty)     

New and Improved: It's NiFi

Apache NiFi 1.11.3

If you have downloaded anything after NiFi 1.10, please upgrade now.   This has some major improvements and some fixes.

Release note highlights can be found here:

I am running this now in Anaheim, and it's no Mickey Mouse upgrade.   It's fast and nice.

Some of the more recent upgrades:

Parameters, stateless execution and the ability to download a flow as JSON are worth the price of install.

See some more NiFi 1.11 features here:

EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)

EdgeAI:   Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi

Building MiNiFi IoT Apps with the new Cloudera EFM 

It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.

Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!

NiFi Flow Receiving From MiNiFi Java Agent

In my CDP-DC cluster, I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu.  We filter our data with Streaming SQL.

We can use SQL to route, create aggregates like averages, choose a subset of fields and limit the data returned.   Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer for record data types including CSV, XML, Avro, Parquet, JSON and Grokable text.   Read and write different formats and convert when your SQL is done.   Or just SELECT * FROM FLOWFILE to get everything.
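
In NiFi's QueryRecord processor, each SQL statement is attached to a named relationship, so one incoming flow file can feed several routes. The sketch below imitates that idea outside NiFi. It is only an approximation: it uses Python's sqlite3 instead of Calcite, the route names are hypothetical, and the second record is made up (the first borrows values from the example JSON record in this post, converted to numbers).

```python
import sqlite3

# Records standing in for the contents of one flow file.
records = [
    {"host": "coralenv", "temperature": 39.44, "humidity": 10.21},
    {"host": "coralenv", "temperature": 21.00, "humidity": 35.00},  # made-up row
]

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each result row convert to a dict
conn.execute("CREATE TABLE FLOWFILE (host TEXT, temperature REAL, humidity REAL)")
conn.executemany("INSERT INTO FLOWFILE VALUES (:host, :temperature, :humidity)", records)

# Hypothetical route names, each paired with the SQL that selects its records.
routes = {
    "everything": "SELECT * FROM FLOWFILE",
    "hot": "SELECT host, temperature FROM FLOWFILE WHERE temperature > 30",
    "averages": "SELECT host, AVG(temperature) AS avg_temp FROM FLOWFILE GROUP BY host",
}

results = {name: [dict(row) for row in conn.execute(sql)] for name, sql in routes.items()}
print(results["hot"])
```

Inside NiFi the record reader and writer handle the format conversion, so the same queries could read JSON and write Avro without any of this glue code.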

We can see this flow in Atlas as we trace the data lineage and provenance from the Kafka topic.

We can search Atlas for Kafka Topics.

From coral Kafka topic to NiFi to Kudu.

Details on Coral Kafka Topic

Examining the Hive Metastore Data on the Coral Kudu Table

NiFi Flow Details in Atlas

Details on Alerts Topic

Statistics from Atlas

Example Web Camera Image

 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]

Querying Kudu results in Hue

Pushing Alerts to Slack from NiFi

I am running on Apache NiFi 1.11.1 and wanted to point out a new feature.   Download flow:   Will download the highlighted flow/pgroup as JSON.

Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.

Shell (

DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/ --image /opt/demo/images/$DATE.jpg 2>/dev/null

Kudu Table DDL

Python 3 (

import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import json
import math
import random
import string
import argparse
import socket
from time import gmtime, strftime

import psutil
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
from edgetpu.classification.engine import ClassificationEngine

start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    # Each line is "<index> <label>"; build an index -> label lookup
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    # vcgencmd prints a line such as temp=59.0'C; return just the number as text
    res = os.popen('vcgencmd measure_temp').readline()
    return res.replace("temp=", "").replace("'C\n", "")

# Get MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if i.family == psutil.AF_LINK:
                return i.address

# /opt/demo/examples-camera/all_models
row = {}
# The try/except wrapper is an assumption, reconstructed from the indentation
# and the trailing "Error" assignment in the original listing
try:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()
    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img = Image.open(args.image)

    scores = {}
    kCount = 1

    # Iterate Inference Results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1

    enviro = EnviroBoard()
    # Assumption: cpuTemp was never defined in the listing; derive it from the helper above
    cpuTemp = int(float(getCPUtemperature()))
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp, 2)
    row['te'] = "{0:.2f}".format((end - start))
    row['starttime'] = starttf
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    # The example JSON record includes label_N / score_N, so merge the scores in
    row.update(scores)
    msg = 'Temp: {0}'.format(row['temperature'])
    msg += 'IP: {0}'.format(row['ip'])
    update_display(enviro.display, msg)
except Exception:
    row['message'] = "Error"

# Assumption: print the record as JSON so the calling agent can capture it from stdout
print(json.dumps(row))

Source Code:

Sensors / Devices / Hardware:

  • Humidity-HDC2010 humidity sensor
  • Light-OPT3002 ambient light sensor
  • Barometric-BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU


EdgeAI: Jetson Nano with MiNiFi C++ Agent

Building and Utilizing The Apache NiFi - MiNiFi C++ Agent For Jetson Nano

(EdgeAI:   Jetson Nano with MiNiFi C++ Agent)




FPS: .5

Bootstrap and Build


Options:  Kafka, OpenCV, TensorFlow, USB Camera

 Select MiNiFi C++ Features to toggle.
A. Persistent Repositories .....Enabled
B. Lib Curl Features ...........Enabled
C. Lib Archive Features ........Enabled
D. Execute Script support ......Enabled
E. Expression Language support .Enabled
F. Kafka support ...............Enabled
G. PCAP support ................Disabled
H. USB Camera support ..........Enabled
I. GPS support .................Disabled
J. TensorFlow Support ..........Disabled
K. Bustache Support ............Disabled
L. MQTT Support ................Enabled
M. SQLite Support ..............Disabled
N. Python Support ..............Enabled
O. COAP Support ................Enabled
S. SFTP Support ................Enabled
V. AWS Support .................Disabled
T. OpenCV Support ..............Enabled
U. OPC-UA Support...............Enabled


sudo apt-get install libcurl-dev libcurl4-openssl-dev -y

We can see when data arrives in NiFi from a MiNiFi Agent.

 We can publish to Kafka directly from our MiNiFi C++ agent.

If CEM/Edge Flow Manager is a mystery to you, check out the live Swagger REST Documentation.

With MiNiFi C++ I can add a USB Camera.

 In NiFi we can see the Host Information that MiNiFi attached.

Example Data

{"uuid": "nano_uuid_crr_20200218002610", "ipaddress": "", "top1pct": 54.833984375, "top1": "cab, hack, taxi, taxicab", "cputemp": "45.5", "gputemp": "43.5", "gputempf": "110", "cputempf": "114", "runtime": "4", "host": "jetsonnano", "filename": "/opt/demo/images/image_esq_20200218002610.jpg", "imageinput": "/opt/demo/images/2020-02-17_1926.jpg", "host_name": "jetsonnano", "macaddress": "ec:08:6b:18:0d:7f", "end": "1581985574.6246474", "te": "4.158604383468628", "systemtime": "02/17/2020 19:26:14", "cpu": 51.8, "diskusage": "5479.7 MB", "memory": 71.4, "id": "20200218002610_8a12dd65-1038-41ac-b923-98fc907f5be0"}

Example Config.yml Section

  name: AppendHostInfo
  class: org.apache.nifi.minifi.processors.AppendHostInfo
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Hostname Attribute: source.hostname
    IP Attribute: source.ipv4
    Network Interface Name: wlan0

Example Output

[2020-02-11 19:35:09.116] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command /opt/demo/ 
[2020-02-11 19:35:11.275] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:13.742] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from ExecuteProcess - Python to relationship success
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command Complete /opt/demo/ status 0 pid 31004
[2020-02-11 19:35:15.569] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:15.649] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site to Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b sent flow 1flow records, with total size 3581
[2020-02-11 19:35:15.785] [org::apache::nifi::minifi::sitetosite::HttpSiteToSiteClient] [info] Site to Site closed transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site2Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b peer finished transaction
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::io::HttpStream] [warning] Future status already cleared for, continuing
[2020-02-11 19:35:16.236] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:16.263] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from TailFile to relationship success
[2020-02-11 19:35:16.264] [org::apache::nifi::minifi::processors::TailFile] [info] TailFile nano.log for 616 bytes
[2020-02-11 19:35:16.273] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:16.274] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from PublishKafka to relationship success
[2020-02-11 19:35:18.748] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:21.260] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
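
Each line of this output follows the same [timestamp] [logger] [level] message layout, which makes it easy to filter, for instance to pull out only the warnings. Here is a minimal sketch, assuming that layout holds for every line. The LOG_LINE pattern and the parse_log_line helper are hypothetical names, and the two sample lines are copied from the output above.

```python
import re

# Pattern for "[timestamp] [logger] [level] message", as seen in the output above.
LOG_LINE = re.compile(
    r"^\[(?P<timestamp>[^\]]+)\] \[(?P<logger>[^\]]+)\] \[(?P<level>[^\]]+)\] (?P<message>.*)$"
)

def parse_log_line(line):
    """Return the line's fields as a dict, or None if it does not match."""
    match = LOG_LINE.match(line.strip())
    return match.groupdict() if match else None

sample_lines = [
    "[2020-02-11 19:35:16.264] [org::apache::nifi::minifi::processors::TailFile] "
    "[info] TailFile nano.log for 616 bytes",
    "[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::io::HttpStream] "
    "[warning] Future status already cleared for, continuing",
]

parsed = [parse_log_line(line) for line in sample_lines]
warnings = [entry for entry in parsed if entry and entry["level"] == "warning"]
print(warnings[0]["logger"].split("::")[-1], warnings[0]["message"])
```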

Using Apache NiFi - MiNiFi C++ Agent Elsewhere

I am working on a Jetbot robot powered by an NVIDIA Jetson Nano that will use the MiNiFi C++ agent.