Google Coral TPU with Edge Devices and MiNiFi



Designing Our Edge AI Flow with Cloudera Edge Flow Manager.


Configure Your Remote Process Group to Send Data to Your NiFi Cluster


Monitor Your Agents From the Events Screen


Let's grab all the new images and then delete them on completion



We use Input and Output Ports for bidirectional communication with 0-n MiNiFi agents


Our NiFi flow to process calls from MiNiFi Agents running Coral TPUs


We run a query to check the TensorFlow Lite classification results and send out a slack message.



Let's push JSON data to a Kafka Cluster in AWS





Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi


Yes, we have bidirectional communication with MiNiFi agents from Apache NiFi via Site-to-Site (S2S) over HTTPS.   This means I can push anything I want to the agent, including commands, files and updates.

I can also transmit data to edge agents via MQTT, REST and Kafka amongst other options.


NiFi Ready To Send and Receive Messages From Other NiFi Nodes, Clusters and MiNiFi Agents


Our NiFi flow is consuming Kafka and MQTT Messages, as well as reading updated model files and generating integration test sensor data.



MiNiFi Agents Have Downloaded The Model and Anything Else We Send to It



It's easy to configure MQTT message consumption in CEM: we just need the broker (with port) and, if you wish, a topic to filter on.




To listen for files/models, you can easily add a REST endpoint to proxy in data of your choice, with or without SSL.


Here's an example CURL script to test that REST API:

curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://ec2-3-85-54-189.compute-1.amazonaws.com:8899/upload


We can generate JSON IoT Style Data for Integration Tests with ease using GenerateFlowFile:


Let's grab updated models when they change from my Data Science server:


I can read Kafka messages and send them to MiNiFi agents as well.




So I pushed a TFLITE model, but ONNX, PMML, Docker or Pickle are all options.


Generating JSON Data in Apache NiFi



Example of a JSON IoT Generator


{
   "ip": ${ip()},
   "unique_id": "${UUID()}",
   "thread": "${thread()}",
   "hostname": "${hostname()}",
   "sensor_9": ${random():mod(100)},
   "sensor_id": ${random():mod(30)},
   "sensor_3": ${random():mod(50)},
   "sensor_2": ${random():mod(500)},
   "sensor_1": ${random():mod(110)},
   "sensor_0": ${random():mod(150)},
   "sensor_7": ${random():mod(255)},
   "sensor_6": ${random():mod(95)},
   "sensor_5": ${random():mod(80)},
   "sensor_ts": ${now():toNumber()},
   "sensor_8": ${random():mod(120)},
   "sensor_4": ${random():mod(60)},
   "sensor_11": ${random():mod(20)},
   "sensor_10": ${random():mod(10)}
}
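If you want the same style of test payload outside NiFi, for example to unit-test a downstream consumer, a quick Python sketch might look like the following. The helper name is hypothetical and it covers a subset of the fields; the sensor ranges are copied from the mod() values in the GenerateFlowFile template above.

```python
import json
import random
import socket
import time
import uuid

# Upper bounds mirror the mod() values in the GenerateFlowFile template above.
SENSOR_RANGES = {
    "sensor_0": 150, "sensor_1": 110, "sensor_2": 500, "sensor_3": 50,
    "sensor_4": 60, "sensor_5": 80, "sensor_6": 95, "sensor_7": 255,
    "sensor_8": 120, "sensor_9": 100, "sensor_10": 10, "sensor_11": 20,
}

def generate_iot_record():
    """Build one JSON IoT test record, similar to the NiFi template."""
    record = {
        "unique_id": str(uuid.uuid4()),
        "hostname": socket.gethostname(),
        "sensor_ts": int(time.time() * 1000),  # epoch millis, like now():toNumber()
        "sensor_id": random.randrange(30),
    }
    for name, bound in SENSOR_RANGES.items():
        record[name] = random.randrange(bound)
    return json.dumps(record)

print(generate_iot_record())
```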



Using Cloudera Streams Messaging Manager for Apache Kafka Monitoring, Management, Analytics and CRUD


SMM is a powerful tool for working with Apache Kafka, providing monitoring, management, analytics and Kafka topic creation.   You will be able to monitor servers, brokers, consumers, producers, topics and messages.   You will also be able to easily build alerts based on various events that can occur with those entities.

From Cloudera Manager, we can now install and manage Kafka, SMM, NiFi and Hadoop services.


Let's create a Kafka topic, no command-line!





For a simple topic, we select the Low size profile, which sets a replication factor of one and a replica count of one.  We also set a cleanup policy of delete.



Let's create an alert.


For this one, if the nifi-reader consumer group has lag, send an email to me.


Let's browse our Kafka infrastructure in our AWS Cloudera Kafka cluster; it's easy to navigate.



You can dive into a topic and see individual messages, including their offsets, keys, values, timestamps and more.


Zoom into one message in a topic.


Let's analyze a topic's configuration.



The result of the alert we built is an email sent to me with this data:


Example Alert Sent

Notification id: 56d35dcc-8fc0-4c59-b70a-ccbd1bb35681,
Root resource name: nifi-reader,
Root resource type: CONSUMER,
Created timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366,
Last updated timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366, 
State: RAISED,

Message:
Alert policy : "ALERT IF ( CONSUMER (name="nifi-reader") CONSUMER_GROUP_LAG >= 100 )" has been evaluated to true Condition : "CONSUMER_GROUP_LAG>=100" has been evaluated to true for following CONSUMERS - CONSUMER = "nifi-reader" had following attribute values * CONSUMER_GROUP_LAG = 139
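If you forward these alert emails to an automated handler, the reported lag can be pulled out with a quick regular expression. This is just a sketch against the example message above; the helper name is hypothetical.

```python
import re

# The alert body format from the SMM example email above.
alert_message = (
    'Alert policy : "ALERT IF ( CONSUMER (name="nifi-reader") '
    'CONSUMER_GROUP_LAG >= 100 )" has been evaluated to true '
    'Condition : "CONSUMER_GROUP_LAG>=100" has been evaluated to true '
    'for following CONSUMERS - CONSUMER = "nifi-reader" had following '
    'attribute values * CONSUMER_GROUP_LAG = 139'
)

def extract_lag(message):
    """Pull the reported consumer group lag out of an SMM alert body."""
    match = re.search(r"CONSUMER_GROUP_LAG = (\d+)", message)
    return int(match.group(1)) if match else None

print(extract_lag(alert_message))  # → 139
```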



Software



  • CSP 2.1 
  • CDH 6.3.0
  • Cloudera Schema Registry 0.80
  • CFM 1.0.1.0
  • Apache NiFi Registry 0.3.0
  • Apache NiFi 1.9.0.1.0
  • JDK 1.8










GTFS Real-time Streaming with Apache NiFi




To facilitate ingesting GTFS Real-Time data, I have added a processor that converts GTFS (General Transit Feed Specification) formatted ProtoBuf data into JSON, using the standard Google APIs.      You can see the Protocol Buffers schema here

We will be able to get data on trip updates (including delays), service alerts (including changed routes) and vehicle positions (which can include location and congestion information).   This is the same information that these public transit systems feed to Google Maps.

https://developers.google.com/transit/gtfs-realtime/


An Example NiFi Flow For Accessing GTFS Data




First we add my new NiFi processor, which you can get as source and build into a NAR, or you can download one of the release builds of my NiFi NAR archive.   This is alpha-level code done very quickly.   If you find bugs, please report them and suggest updates.





Once it is added to your canvas, you can change the name and scheduling.



The only setting currently is the URL of the GTFS resource (which may require a key from some transportation APIs).




Once it runs, it will quickly return a flow file containing a JSON formatted version of the GTFS result as well as some attributes with GTFS information.






The JSON has a lot of arrays and subsections; it is not very flat.   We could dump it raw to Hive and query it as a JSON document, or we can parse it with FlattenJson, EvaluateJsonPath, SplitJson or various other processors.
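To make the shape of that transform concrete, here is a rough Python sketch of what a FlattenJson-style pass does to this kind of nested payload. The sample entity below is abbreviated and hypothetical, not a real GTFS feed record.

```python
import json

def flatten(obj, prefix=""):
    """Flatten nested dicts/lists into dotted keys, FlattenJson-style."""
    flat = {}
    if isinstance(obj, dict):
        for key, value in obj.items():
            flat.update(flatten(value, f"{prefix}{key}."))
    elif isinstance(obj, list):
        for index, value in enumerate(obj):
            flat.update(flatten(value, f"{prefix}{index}."))
    else:
        flat[prefix[:-1]] = obj  # strip the trailing dot from the key
    return flat

# Abbreviated, hypothetical GTFS-style entity for illustration only.
entity = {
    "id": "1284",
    "vehicle": {
        "position": {"latitude": 40.35, "longitude": -74.66},
        "trip": {"tripId": "8766"},
    },
}

print(json.dumps(flatten(entity), indent=2))
```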

As mentioned before, the main sections are tripUpdate, vehicle and alerts.   I will parse this and use it for real-time Slack, Kafka, HBase and Kudu messaging.   This data can become critical to companies that have trucks or people trying to navigate transit systems.

My first use case is to see if I can get this data in real time during emergencies or events to help communities.    Non-profit agencies will be able to have real-time status reports of what transportation is available.   You have to ingest, cleanse and transform data for it to become knowledge, and that can empower the community.

Join me in Puerto Rico at the NetHope Global Summit.

Source

https://github.com/tspannhw/gtfs/

NAR Release To Install in Apache NiFi 1.9.2


https://github.com/tspannhw/gtfs/releases





EFM Series: Using MiNiFi Agents on Raspberry Pi 4 with Intel Movidius Neural Compute Stick 2, Apache NiFi and AI






The good news is that the Raspberry Pi 4 can run MiNiFi Java agents, the Intel Movidius Neural Compute Stick 2 and AI libraries.   You can now use this 4 GB RAM device to run IoT with AI on the edge.

Flow From MiNiFi Agent Running OpenVino, SysLog Tail and Grabbing WebCam Images



Configure The Execution of OpenVino Python Applications on RPI 4




Events Returning from Raspberry Pi 4




Models Used

Download the model using the downloader.

Github

https://github.com/tspannhw/minifi-rpi4-ncc2

DATE=$(date +"%Y-%m-%d_%H%M")
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg

python3 /opt/intel/openvino/build/test.py /opt/demo/images/$DATE.jpg
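The shell script above can also be sketched from Python, which is handy if MiNiFi is already invoking a Python entry point. This is a hypothetical wrapper: it only builds the capture and classify commands (using the paths from the script above) so it runs without the camera or the stick; on the Pi you would pass the lists to subprocess.run.

```python
from datetime import datetime

# Paths taken from the shell script above.
IMAGE_DIR = "/opt/demo/images"
CLASSIFIER = "/opt/intel/openvino/build/test.py"

def build_commands(stamp=None):
    """Build the webcam-capture and OpenVINO-classify commands
    as argument lists suitable for subprocess.run."""
    stamp = stamp or datetime.now().strftime("%Y-%m-%d_%H%M")
    image_path = f"{IMAGE_DIR}/{stamp}.jpg"
    capture = ["fswebcam", "-q", "-r", "1280x720", "--no-banner", image_path]
    classify = ["python3", CLASSIFIER, image_path]
    return capture, classify

capture_cmd, classify_cmd = build_commands("2019-08-12_1008")
print(capture_cmd)
print(classify_cmd)
# On the Pi: subprocess.run(capture_cmd, check=True), then classify_cmd.
```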

Software

  • Apache NiFi
  • Apache NiFi - MiNiFi Agents
  • TensorFlow
  • OpenVino
  • Python 3
  • FSWEBCAM
  • OpenCV DNN
  • PSUTIL

Python Libraries

pip3 install getmac
pip3 install psutil
pip3 install --upgrade pip
pip3 install --upgrade setuptools

Hardware

  • Intel Movidius Neural Compute Stick 2
  • Raspberry Pi 4 with 4GB RAM

Hosting

We can run on-premise on VMs, containers, Kubernetes or bare metal, or on one of the clouds such as Google.





Directories

  • /root/inference_engine_samples_build/armv7l/Release
  • /opt/intel/openvino

Find cacerts from Java JRE Lib Security for NiFi SSL For REST Calls


On NiFi 1.9.2+, on my Mac, the cacerts file is at:

/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre/lib/security/cacerts

The default password is changeit, the keystore type is JKS and the protocol is TLS.



See:


https://stackoverflow.com/questions/11936685/how-to-obtain-the-location-of-cacerts-of-the-default-java-installation

On Mac Get the Directory

/usr/libexec/java_home
On Linux 

ps -ef | grep -i java
readlink -f /usr/java/default/bin/java

/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre/lib/security/cacerts 

Walk back from


 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/bin/java
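That walk-back can be sketched in Python if you want to script it. This assumes the conventional JDK 8 layout shown above (bin/java alongside a jre directory); a standalone JRE's layout differs, and the helper name is hypothetical.

```python
import os

def cacerts_from_java(java_path):
    """Given the path to a JDK 8 java binary (e.g. from readlink -f),
    walk back to the conventional cacerts location under jre/lib/security."""
    jdk_home = os.path.dirname(os.path.dirname(java_path))  # strip bin/java
    return os.path.join(jdk_home, "jre", "lib", "security", "cacerts")

java_bin = "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/bin/java"
print(cacerts_from_java(java_bin))
```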

Migrating Apache Flume Flows to Apache NiFi: Log to HDFS






I am moving some Apache Flume flows over to Apache NiFi; this is the first one.    I am grabbing log files from a directory and pushing them over to HDFS.   We have added some features beyond what was available in the original Flume flow.

We can also see it running visually and get access to real-time metrics and stats via Apache NiFi's REST API.



Step-By-Step

  1. Enter the directory holding your logs or files.   Check permissions and set the file regular expression.
  2. Update FetchFile properties.
  3. Use UpdateAttribute to change your file name to what you would like.
  4. Use PutHDFS to store your file in HDFS.
  5. Add notifications, alerts and failure handling.



ListFile



Fetch the files from the directory and then delete them when done.


Other Attributes for ListFile

View the State of Files as They Are Read


Replace the FileName for HDFS Safe Saves

You can change the HDFS  Owner


It will create new directories



Configure Your HDFS Directory to Store To (can be many directories/subdirectories programmatically)



Set The Size of an HDFS Block


Choose Your Compression


Choose How You Want to Write Data (Replace, Ignore, Fail, Append)



NiFi File Filter / Reg Ex To Match Input Name Patterns:   (wifi).*log

You can match any subset of files and directories that you may want to grab.


Also remove invalid characters from the HDFS filename output
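The file filter and the filename cleanup can be sanity-checked outside NiFi with a few lines of Python. The regex is the one shown above; the sanitizer simply strips colons, which is what turns the listed source name into the HDFS-safe names shown in the migration output below. Both helper names are hypothetical.

```python
import re

# The ListFile file filter from above.
FILE_FILTER = re.compile(r"(wifi).*log")

def hdfs_safe_name(filename):
    """Strip characters like ':' that are awkward in HDFS paths."""
    return re.sub(r"[:]", "", filename)

name = "wifi-08-12-2019__10:08:53.107.log"
if FILE_FILTER.match(name):
    print(hdfs_safe_name(name))  # → wifi-08-12-2019__100853.107.log
```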


NiFi Output:

Attribute Values

absolute.path: /var/log/
file.creationTime: 2019-08-12T10:08:53-0400
file.group: wheel
file.lastAccessTime: 2019-08-12T10:08:53-0400
file.lastModifiedTime: 2019-08-12T10:08:53-0400
file.owner: root
file.permissions: rw-r--r--
file.size: 271413
filename: wifi-08-12-2019__10:08:53.107.log
path: ./
uuid: 6693546e-1241-4625-bbd8-b9dd5da9281a

Files Migrated



hdfs dfs -ls /flume
Found 7 items
-rw-r--r--   1 tspann hdfs     120824 2019-08-13 19:49 /flume/wifi-08-12-2019__100647.406.log
-rw-r--r--   1 tspann hdfs     125888 2019-08-13 19:49 /flume/wifi-08-12-2019__100651.863.log
-rw-r--r--   1 tspann hdfs     252917 2019-08-13 19:49 /flume/wifi-08-12-2019__100839.901.log
-rw-r--r--   1 tspann hdfs     255873 2019-08-13 19:49 /flume/wifi-08-12-2019__100845.958.log
-rw-r--r--   1 tspann hdfs     271315 2019-08-13 19:49 /flume/wifi-08-12-2019__100848.189.log
-rw-r--r--   1 tspann hdfs     271413 2019-08-13 19:49 /flume/wifi-08-12-2019__100853.107.log

-rw-r--r--   1 tspann hdfs      17078 2019-08-13 19:49 /flume/wifi.log


Source:

https://github.com/tspannhw/flume-to-nifi