Google Coral TPU with Edge Devices and MiNiFi

Designing Our Edge AI Flow with Cloudera Edge Flow Manager.

Configure Your Remote Process Group to Send Data to Your NiFi Cluster

Monitor Your Agents From the Events Screen

Let's grab all the new images and then delete them on completion

We have input and output ports for bidirectional communication with 0-n MiNiFi agents

Our NiFi flow to process calls from MiNiFi Agents running Coral TPUs

We run a query to check the TensorFlow Lite classification results and send out a slack message.

Let's push JSON data to a Kafka Cluster in AWS

Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi

Yes, we have bidirectional communication with MiNiFi agents from Apache NiFi via Site-to-Site (S2S) over HTTPS. This means I can push anything I want to the agent, including commands, files and updates.

I can also transmit data to edge agents via MQTT, REST and Kafka amongst other options.

NiFi Ready To Send and Receive Messages From Other NiFi Nodes, Clusters and MiNiFi Agents

Our NiFi flow is consuming Kafka and MQTT Messages, as well as reading updated model files and generating integration test sensor data.

MiNiFi Agents Have Downloaded The Model and Anything Else We Send Them

It's easy to configure MQTT message consumption in CEM: we just need the broker (with port) and, optionally, a topic to filter on.

To listen for files/models, you can easily add a REST endpoint to proxy in data of your choice, with or without SSL.

Here's an example curl command to test that REST endpoint (substitute the host, port and base path configured on your listener; ListenHTTP defaults to a base path of contentListener):

curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/contentListener
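The same test can be done from Python; a minimal sketch, assuming a hypothetical endpoint URL (use whatever host, port and base path your listener is actually configured with):

```python
# Sketch: POST the same JSON test payload from Python instead of curl.
# The endpoint URL below is an assumption, not a real deployment.
import json
import urllib.request

payload = json.dumps({"key1": "value1", "key2": "value2"}).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:9999/contentListener",  # hypothetical endpoint
    data=payload,
    method="POST",
)
req.add_header("Content-Type", "application/json")

# urllib.request.urlopen(req)  # uncomment to actually send the request
```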

We can generate JSON IoT Style Data for Integration Tests with ease using GenerateFlowFile:

Let's grab updated models when they change from my Data Science server:

I can read Kafka messages and send them to MiNiFi agents as well.

So I pushed a TFLite model, but ONNX, PMML, Docker or Pickle are all options.

Generating JSON Data in Apache NiFi

Example of a JSON IoT Generator

{
   "ip": ${ip()},
   "unique_id": "${UUID()}",
   "thread": "${thread()}",
   "hostname": "${hostname()}",
   "sensor_9": ${random():mod(100)},
   "sensor_id": ${random():mod(30)},
   "sensor_3": ${random():mod(50)},
   "sensor_2": ${random():mod(500)},
   "sensor_1": ${random():mod(110)},
   "sensor_0": ${random():mod(150)},
   "sensor_7": ${random():mod(255)},
   "sensor_6": ${random():mod(95)},
   "sensor_5": ${random():mod(80)},
   "sensor_ts": ${now():toNumber()},
   "sensor_8": ${random():mod(120)},
   "sensor_4": ${random():mod(60)},
   "sensor_11": ${random():mod(20)},
   "sensor_10": ${random():mod(10)}
}
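The same style of record can also be generated outside NiFi for integration tests. Here is a small Python sketch that mimics the GenerateFlowFile template above; the field names and moduli mirror the template, everything else is an assumption:

```python
# Sketch: generate a JSON IoT test record like the GenerateFlowFile
# template above. Field names mirror the template; values are random.
import json
import random
import socket
import time
import uuid

def make_sensor_record() -> str:
    record = {
        "unique_id": str(uuid.uuid4()),
        "hostname": socket.gethostname(),
        "sensor_ts": int(time.time() * 1000),  # epoch millis, like now():toNumber()
        "sensor_id": random.randrange(30),
    }
    # sensor_0 .. sensor_11 with the same moduli as the template
    moduli = [150, 110, 500, 50, 60, 80, 95, 255, 120, 100, 10, 20]
    for i, m in enumerate(moduli):
        record[f"sensor_{i}"] = random.randrange(m)
    return json.dumps(record)

print(make_sensor_record())
```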


Using Cloudera Streams Messaging Manager for Apache Kafka Monitoring, Management, Analytics and CRUD

SMM is a powerful tool for working with Apache Kafka, providing monitoring, management, analytics and Kafka topic creation. You will be able to monitor servers, brokers, consumers, producers, topics and messages. You will also be able to easily build alerts based on various events that can occur with those entities.

From Cloudera Manager, we can now install and manage Kafka, SMM, NiFi and Hadoop services.

Let's create a Kafka topic, no command-line!

For a simple topic, we select the Low size: a replication factor of one and a replica count of one. We also set a cleanup policy of delete.

Let's create an alert.

For this one, if the nifi-reader consumer group has lag, then send an email to me.

Let's browse our Kafka infrastructure in our AWS Cloudera Kafka cluster, so easy to navigate.

You can dive into a topic and see individual messages, see offsets, keys, values, timestamps and more.

Zoom into one message in a topic.

Let's analyze a topic's configuration.

The result of the alert we built is an email sent to me with this data:

Example Alert Sent

Notification id: 56d35dcc-8fc0-4c59-b70a-ccbd1bb35681,
Root resource name: nifi-reader,
Root resource type: CONSUMER,
Created timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366,
Last updated timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366, 
State: RAISED,

Alert policy : "ALERT IF ( CONSUMER (name="nifi-reader") CONSUMER_GROUP_LAG >= 100 )" has been evaluated to true Condition : "CONSUMER_GROUP_LAG>=100" has been evaluated to true for following CONSUMERS - CONSUMER = "nifi-reader" had following attribute values * CONSUMER_GROUP_LAG = 139
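Consumer group lag, the metric this alert fires on, is just the gap between the last offset written to a partition and the last offset the group has committed. A minimal sketch of how that number is derived (all names and sample offsets here are illustrative, not SMM's implementation):

```python
# Sketch: consumer group lag = log end offset - committed offset,
# summed over partitions. Names and sample data are illustrative only.
def consumer_group_lag(log_end_offsets, committed_offsets):
    """Both arguments map partition -> offset."""
    return sum(
        max(0, log_end_offsets[p] - committed_offsets.get(p, 0))
        for p in log_end_offsets
    )

# Example: partition 0 is fully caught up, partition 1 is 139 behind,
# enough to trip an "ALERT IF CONSUMER_GROUP_LAG >= 100" policy.
lag = consumer_group_lag({0: 500, 1: 1000}, {0: 500, 1: 861})
print(lag)  # 139
```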


  • CSP 2.1 
  • CDH 6.3.0
  • Cloudera Schema Registry 0.80
  • CFM
  • Apache NiFi Registry 0.3.0
  • Apache NiFi
  • JDK 1.8


GTFS Real-time Streaming with Apache NiFi

To facilitate ingesting GTFS Real-Time data, I have added a processor that converts GTFS (General Transit Feed Specification) formatted Protocol Buffers data into JSON, using the standard Google APIs. You can see the Protocol Buffers schema here.

We will be able to get data on trip updates including delays, service alerts including changed routes and vehicle positions which can have location and congestion information.   This is the same information that these public transit systems feed to Google Maps.

An Example NiFi Flow For Accessing GTFS Data

First we add my new NiFi processor, which you can get as source and build into a NAR, or you can download one of the release builds of my NiFi NAR archive. This is alpha-level code done very quickly; if you find bugs, please report them and suggest updates.

Once it is added to your canvas, you can change the name and scheduling.

The only setting currently is the URL of the GTFS resource (which may require a key from some transportation APIs).

Once it runs, it will quickly return a flow file containing a JSON formatted version of the GTFS result as well as some attributes with GTFS information.

The JSON has a lot of arrays and subsections; it is not very flat. We could dump it raw to Hive and query it as a JSON document, or we can parse it with FlattenJson, EvaluateJsonPath, SplitJson or various other options.
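To show what flattening does to this kind of nested payload, here is a small sketch of the idea behind FlattenJson; the sample record is made up, and real GTFS output has many more fields:

```python
# Sketch: flatten a nested GTFS-style JSON record into dotted keys,
# the same idea as NiFi's FlattenJson processor. Sample data is made up.
import json

def flatten(obj, prefix=""):
    flat = {}
    if isinstance(obj, dict):
        for k, v in obj.items():
            flat.update(flatten(v, f"{prefix}{k}."))
    elif isinstance(obj, list):
        for i, v in enumerate(obj):
            flat.update(flatten(v, f"{prefix}{i}."))
    else:
        flat[prefix.rstrip(".")] = obj
    return flat

sample = {
    "tripUpdate": {"trip": {"tripId": "123"}, "delay": 300},
    "vehicle": {"position": {"latitude": 40.7, "longitude": -74.0}},
}
print(json.dumps(flatten(sample), indent=2))
```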

As mentioned before the main sections are tripUpdate, vehicle and alerts.   I will parse this and use this for real-time Slack, Kafka, HBase and Kudu messaging.   This data can become critical to companies that have trucks or people trying to navigate systems.

My first use case is to see if I can surface this data in real time during emergencies or events to help communities. Non-profit agents will be able to have real-time status reports of what transportation is available. You have to ingest, cleanse and transform data for it to become knowledge, and that can empower the community.

Join me in Puerto Rico at the NetHope Global Summit.


NAR Release To Install in Apache NiFi 1.9.2


EFM Series: Using MiNiFi Agents on Raspberry Pi 4 with Intel Movidius Neural Compute Stick 2, Apache NiFi and AI

The good news is that the Raspberry Pi 4 can run MiNiFi Java agents, the Intel Movidius Neural Compute Stick 2 and AI libraries. You can now use this 4 GB RAM device to run IoT with AI on the edge.

Flow From MiNiFi Agent Running OpenVino, SysLog Tail and Grabbing WebCam Images

Configure The Execution of OpenVino Python Applications on RPI 4

Events Returning from Raspberry Pi 4

Models Used

Download model using downloader.


# Capture a timestamped webcam frame, then run the OpenVINO
# Python application against it.
DATE=$(date +"%Y-%m-%d_%H%M")
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg

python3 /opt/intel/openvino/build/ /opt/demo/images/$DATE.jpg


  • Apache NiFi
  • Apache NiFi - MiNiFi Agents
  • TensorFlow
  • OpenVino
  • Python 3
  • OpenCV DNN

Python Libraries

pip3 install getmac
pip3 install psutil
pip3 install --upgrade pip
pip3 install --upgrade setuptools

  • Intel Movidius Neural Compute Stick 2
  • Raspberry Pi 4 with 4GB RAM


We can run on-premise on VMs, containers, K8s or bare metal, or on one of the clouds such as Google.



  •  /root/inference_engine_samples_build/armv7l/Release
  • /opt/intel/openvino

Find cacerts From Java JRE Lib Security for NiFi SSL For REST Calls

On NiFi 1.9.2+

On My Mac


The default cacerts password is changeit.


On Mac Get the Directory

On Linux 

ps -ef | grep -i java
readlink -f /usr/java/default/bin/java


Walk back from the resolved java binary path to the JRE's lib/security directory to find cacerts.
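The walk-back itself is simple path manipulation; a sketch, where the example java path is an assumption (substitute what readlink reported for your machine):

```python
# Sketch: derive the cacerts path from a resolved java binary path.
# The example path is an assumption -- use what readlink reported.
# JDK 9+ layout shown; on JDK 8 cacerts lives under jre/lib/security.
import os

def cacerts_from_java(java_binary: str) -> str:
    """Walk back from .../bin/java to .../lib/security/cacerts."""
    java_home = os.path.dirname(os.path.dirname(java_binary))
    return os.path.join(java_home, "lib", "security", "cacerts")

print(cacerts_from_java("/usr/java/default/bin/java"))
# -> /usr/java/default/lib/security/cacerts
```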


Migrating Apache Flume Flows to Apache NiFi: Log to HDFS

Migrating Apache Flume Flows to Apache NiFi:  Log to HDFS

I am moving some Apache Flume flows over to Apache NiFi; this is the first one. I am grabbing log files from a directory and pushing them over to HDFS. We have added some features beyond what was available in the original Flume flow.

We also can visually see it running and get access to real-time metrics and stats via Apache NiFi's REST API.


  1. Enter the directory holding your logs or files. Check permissions and set the file regular expression.
  2. Update FetchFile properties.
  3. UpdateAttribute to change the file name to what you would like.
  4. PutHDFS to store your file to HDFS.
  5. Add notifications, alerts and failure handling.


Fetch the files from the directory and then delete when done.

Other Attributes for ListFile

View the State of Files as They Are Read

Replace the FileName for HDFS Safe Saves

You can change the HDFS owner

It will create new directories

Configure Your HDFS Directory to Store To (can be many directories/subdirectories programmatically)

Set The Size of an HDFS Block

Choose Your Compression

Choose How You Want to Write Data (Replace, Ignore, Fail, Append)

NiFi File Filter / Reg Ex To Match Input Name Patterns:   (wifi).*log

You can match any subset of files and directories that we may want to grab.

Also remove invalid characters from HDFS filename output
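Both the file filter and the filename cleanup are plain regex work. A sketch of the same logic; the invalid-character set used here is an assumption, the real rule is whatever your UpdateAttribute expression defines:

```python
# Sketch: match input files against the filter (wifi).*log and strip
# characters that are awkward in HDFS paths. The character set chosen
# here is an assumption, not NiFi's exact rule.
import re

FILE_FILTER = re.compile(r"(wifi).*log")

def matches_filter(filename: str) -> bool:
    # ListFile's File Filter matches against the full name.
    return FILE_FILTER.fullmatch(filename) is not None

def hdfs_safe_name(filename: str) -> str:
    # Replace characters like ':' and spaces that complicate HDFS paths.
    return re.sub(r"[:\s/\\]", "_", filename)

print(matches_filter("wifi-08-12-2019__100647.406.log"))  # True
print(hdfs_safe_name("wifi 08:12.log"))  # wifi_08_12.log
```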

NiFi Output:

Attribute Values


Files Migrated

hdfs dfs -ls /flume
Found 7 items
-rw-r--r--   1 tspann hdfs     120824 2019-08-13 19:49 /flume/wifi-08-12-2019__100647.406.log
-rw-r--r--   1 tspann hdfs     125888 2019-08-13 19:49 /flume/wifi-08-12-2019__100651.863.log
-rw-r--r--   1 tspann hdfs     252917 2019-08-13 19:49 /flume/wifi-08-12-2019__100839.901.log
-rw-r--r--   1 tspann hdfs     255873 2019-08-13 19:49 /flume/wifi-08-12-2019__100845.958.log
-rw-r--r--   1 tspann hdfs     271315 2019-08-13 19:49 /flume/wifi-08-12-2019__100848.189.log
-rw-r--r--   1 tspann hdfs     271413 2019-08-13 19:49 /flume/wifi-08-12-2019__100853.107.log
-rw-r--r--   1 tspann hdfs      17078 2019-08-13 19:49 /flume/wifi.log