Using Cloudera Streams Messaging Manager for Apache Kafka Monitoring, Management, Analytics and CRUD


SMM is a powerful tool for working with Apache Kafka that provides monitoring, management, analytics and Kafka topic creation.   You will be able to monitor servers, brokers, consumers, producers, topics and messages.   You will also be able to easily build alerts based on various events that can occur with those entities.

From Cloudera Manager, we can now install and manage Kafka, SMM, NiFi and Hadoop services.


Let's create a Kafka topic, no command-line!





For a simple topic, we select the Low size, which gives us a replication factor of one and a replica count of one.  We also set a cleanup policy of delete.
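For comparison, the same topic settings can be created programmatically through the Kafka admin protocol. Below is a minimal sketch using the kafka-python client (my choice for illustration; the broker address and topic name are placeholders, and SMM does all of this for you through the UI):

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to a Kafka broker (placeholder address)
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# One partition, replication factor of one, delete cleanup policy
topic = NewTopic(name="smm-demo-topic",
                 num_partitions=1,
                 replication_factor=1,
                 topic_configs={"cleanup.policy": "delete"})

admin.create_topics([topic])
admin.close()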



Let's create an alert.


For this one, if the nifi-reader consumer group has lag, then send an email to me.


Let's browse our Kafka infrastructure in our AWS Cloudera Kafka cluster; it is easy to navigate.



You can dive into a topic and see individual messages, including offsets, keys, values, timestamps and more.


Zoom into one message in a topic.


Let's analyze a topic's configuration.



The result of the alert we built is an email sent to me with this data:


Example Alert Sent

Notification id: 56d35dcc-8fc0-4c59-b70a-ccbd1bb35681,
Root resource name: nifi-reader,
Root resource type: CONSUMER,
Created timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366,
Last updated timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366, 
State: RAISED,

Message:
Alert policy : "ALERT IF ( CONSUMER (name="nifi-reader") CONSUMER_GROUP_LAG >= 100 )" has been evaluated to true Condition : "CONSUMER_GROUP_LAG>=100" has been evaluated to true for following CONSUMERS - CONSUMER = "nifi-reader" had following attribute values * CONSUMER_GROUP_LAG = 139



Software



  • CSP 2.1 
  • CDH 6.3.0
  • Cloudera Schema Registry 0.80
  • CFM 1.0.1.0
  • Apache NiFi Registry 0.3.0
  • Apache NiFi 1.9.0.1.0
  • JDK 1.8



Resources







GTFS Real-time Streaming with Apache NiFi




To facilitate ingesting GTFS Real-Time data, I have added a processor that converts GTFS (General Transit Feed Specification) ProtoBuf data into JSON.   It uses the standard Google APIs to do the conversion.      You can see the Protocol Buffers schema here.

We will be able to get data on trip updates (including delays), service alerts (including changed routes) and vehicle positions (which can include location and congestion information).   This is the same information that these public transit systems feed to Google Maps.

https://developers.google.com/transit/gtfs-realtime/
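The conversion itself leans on the standard Google GTFS Realtime bindings. The processor is written in Java, but a minimal Python sketch of the same idea (the feed URL is a placeholder, and many agencies require an API key) shows how little is involved:

import urllib.request
from google.transit import gtfs_realtime_pb2
from google.protobuf.json_format import MessageToJson

# Fetch the raw ProtoBuf feed (placeholder URL)
url = "https://example.com/gtfs-realtime/vehiclepositions.pb"
data = urllib.request.urlopen(url).read()

# Parse the ProtoBuf payload into a GTFS Realtime FeedMessage
feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(data)

# Convert to JSON; each entity carries a tripUpdate, vehicle or alert section
print(MessageToJson(feed))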


An Example NiFi Flow For Accessing GTFS Data




First we add my new NiFi processor, which you can get as source and build into a NAR, or download as one of the release builds of the NAR archive.   This is alpha-level code done very quickly, so if you find bugs please report them and suggest updates.





Once it is added to your canvas, you can change the name and scheduling.



The only setting currently is the URL of the GTFS resource (which may require a key from some transportation APIs).




Once it runs, it will quickly return a flow file containing a JSON-formatted version of the GTFS result, as well as some attributes with GTFS information.






The JSON has a lot of arrays and subsections and is not very flat.   We could dump it raw to Hive and query it as a JSON document, or we can parse it with FlattenJson, EvaluateJsonPath, SplitJson or various other options.

As mentioned before, the main sections are tripUpdate, vehicle and alerts.   I will parse this data and use it for real-time Slack, Kafka, HBase and Kudu messaging.   This data can become critical to companies that have trucks or people trying to navigate transit systems.

My first use case is to see if I can get this data in real time during emergencies or events to help communities.    Non-profit agencies will be able to have real-time status reports of what transportation is available.   You have to ingest, cleanse and transform data into knowledge, and that knowledge can empower the community.

Join me in Puerto Rico at the NetHope Global Summit.

Source

https://github.com/tspannhw/gtfs/

NAR Release To Install in Apache NiFi 1.9.2


https://github.com/tspannhw/gtfs/releases

Resources




EFM Series: Using MiNiFi Agents on Raspberry Pi 4 with Intel Movidius Neural Compute Stick 2, Apache NiFi and AI






The good news is that the Raspberry Pi 4 can run MiNiFi Java agents, the Intel Movidius Neural Compute Stick 2 and AI libraries.   You can now use this device with 4 GB of RAM to run IoT with AI on the edge.

Flow From MiNiFi Agent Running OpenVINO, Tailing Syslog and Grabbing WebCam Images



Configure The Execution of OpenVINO Python Applications on RPI 4




Events Returning from Raspberry Pi 4




Models Used

Download the models using the OpenVINO model downloader.

Github

https://github.com/tspannhw/minifi-rpi4-ncc2

DATE=$(date +"%Y-%m-%d_%H%M")

# Capture a 1280x720 image from the web camera with no banner overlay
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg

# Run the OpenVINO inference script against the newly captured image
python3 /opt/intel/openvino/build/test.py /opt/demo/images/$DATE.jpg

Software

  • Apache NiFi
  • Apache NiFi - MiNiFi Agents
  • TensorFlow
  • OpenVINO
  • Python 3
  • FSWEBCAM
  • OpenCV DNN
  • PSUTIL

Python Libraries

pip3 install getmac
pip3 install psutil
pip3 install --upgrade pip
pip3 install --upgrade setuptools

Hardware

  • Intel Movidius Neural Compute Stick 2
  • Raspberry Pi 4 with 4GB RAM

Hosting

We can run on-premises on VMs, containers, Kubernetes or bare metal, or on one of the clouds such as Google.



Resources


Directories

  • /root/inference_engine_samples_build/armv7l/Release
  • /opt/intel/openvino

Find cacerts from Java JRE Lib Security for NiFi SSL For REST Calls


On NiFi 1.9.2+

On My Mac

/Library/Java/JavaVirtualMachines/jdk1.8.0_211.jdk/Contents/Home/jre/lib/security/cacerts

Default password is changeit
Truststore type: JKS
TLS protocol: TLS



See:


https://stackoverflow.com/questions/11936685/how-to-obtain-the-location-of-cacerts-of-the-default-java-installation

On Mac Get the Directory

/usr/libexec/java_home

On Linux

ps -ef | grep -i java
readlink -f /usr/java/default/bin/java

/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/jre/lib/security/cacerts 

Walk back from


 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64/bin/java
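That walk back can also be scripted. Here is a small sketch that resolves the java binary on the PATH and checks the usual cacerts locations (JDK 8 keeps it under jre/lib/security, newer JDKs under lib/security); treat it as a starting point rather than a rule:

import os
import shutil

# Resolve the real path of the java binary on the PATH (assumes java is installed)
java = os.path.realpath(shutil.which("java"))

# Walk back from .../bin/java to the JDK/JRE home directory
home = os.path.dirname(os.path.dirname(java))

# cacerts lives under jre/lib/security on JDK 8, lib/security on newer JDKs
for candidate in (os.path.join(home, "jre", "lib", "security", "cacerts"),
                  os.path.join(home, "lib", "security", "cacerts")):
    if os.path.exists(candidate):
        print(candidate)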

Migrating Apache Flume Flows to Apache NiFi: Log to HDFS






I am moving some Apache Flume flows over to Apache NiFi; this is the first one.    I am grabbing log files from a directory and pushing them over to HDFS.   We have added some features beyond what was available in the original Flume flow.

We can also see it running visually and get access to real-time metrics and stats via Apache NiFi's REST API.



Step-By-Step

  1. Enter your directory holding your logs or files.   Check permissions and set the file regular expression.
  2. Update FetchFile properties.
  3. UpdateAttribute to change the file name to what you would like.
  4. PutHDFS to store your file to HDFS.
  5. Add notifications, alerts and failure handling.



ListFile



Fetch the files from the directory and then delete them when done.


Other Attributes for ListFile

View the State of Files as They Are Read


Replace the FileName for HDFS Safe Saves

You can change the HDFS  Owner


It will create new directories



Configure Your HDFS Directory to Store To (can be many directories/subdirectories programmatically)



Set The Size of an HDFS Block


Choose Your Compression


Choose How You Want to Write Data (Replace, Ignore, Fail, Append)



NiFi File Filter / Reg Ex To Match Input Name Patterns:   (wifi).*log

You can match any subset of files and directories that you may want to grab.


Also remove invalid characters from HDFS filename output
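Outside of NiFi, the same filter and clean-up is easy to picture. The small sketch below shows the file filter pattern and the character stripping (here I only remove the colons visible in the sample filenames; it illustrates the idea rather than NiFi's exact expression):

import re

filenames = ["wifi-08-12-2019__10:08:53.107.log", "syslog.log"]

for name in filenames:
    # Same pattern as the ListFile file filter: (wifi).*log
    if re.match(r"(wifi).*log", name):
        # Strip characters such as ':' that we do not want in HDFS filenames
        safe = re.sub(r"[:]", "", name)
        print(name, "->", safe)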


NiFi Output:

Attribute Values

absolute.path          /var/log/
file.creationTime      2019-08-12T10:08:53-0400
file.group             wheel
file.lastAccessTime    2019-08-12T10:08:53-0400
file.lastModifiedTime  2019-08-12T10:08:53-0400
file.owner             root
file.permissions       rw-r--r--
file.size              271413
filename               wifi-08-12-2019__10:08:53.107.log
path                   ./
uuid                   6693546e-1241-4625-bbd8-b9dd5da9281a

Files Migrated



hdfs dfs -ls /flume
Found 7 items
-rw-r--r--   1 tspann hdfs     120824 2019-08-13 19:49 /flume/wifi-08-12-2019__100647.406.log
-rw-r--r--   1 tspann hdfs     125888 2019-08-13 19:49 /flume/wifi-08-12-2019__100651.863.log
-rw-r--r--   1 tspann hdfs     252917 2019-08-13 19:49 /flume/wifi-08-12-2019__100839.901.log
-rw-r--r--   1 tspann hdfs     255873 2019-08-13 19:49 /flume/wifi-08-12-2019__100845.958.log
-rw-r--r--   1 tspann hdfs     271315 2019-08-13 19:49 /flume/wifi-08-12-2019__100848.189.log
-rw-r--r--   1 tspann hdfs     271413 2019-08-13 19:49 /flume/wifi-08-12-2019__100853.107.log

-rw-r--r--   1 tspann hdfs      17078 2019-08-13 19:49 /flume/wifi.log


Source:

https://github.com/tspannhw/flume-to-nifi

Rapid IoT Development with Cloudera Edge Flow Manager - Using SenseHat, Intel Movidius, WebCameras and Electric Monitoring





I wanted to upgrade and change flows on my Movidius device, so the best option was to update to the MiNiFi 0.6.0 Java agent.   This allowed me to write a new flow graphically with ease.  I also added a pip-installed Python library to connect to the HS-110 electric energy monitor.   So we are grabbing sensor data from the SenseHat on the RPI, calling the HS-110 sensor over TCP/IP, grabbing an image from our web camera and calling Apache MXNet and Caffe to run deep learning analytics against it.


So I installed the 0.6.0 Java MiNiFi agent on my RPI with SenseHat.   I enabled C2 in the configuration file and pointed it to one of my EFM servers.


In our new flow, I execute the Python script that runs a Caffe and an MXNet model against a newly captured web camera image.

I also list and fetch all the images (after a minute) that have been created by the webcamera.

Finally I execute a Python script to read from a local energy meter over a Python API.
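For the energy meter read, something along these lines works; this is a minimal sketch assuming the pyHS100 library is the pip-installed one in play here (my assumption, not confirmed by the post), with a placeholder IP address for the HS-110 plug:

import json
from pyHS100 import SmartPlug

# Placeholder IP address of the TP-Link HS-110 smart plug on the local network
plug = SmartPlug("192.168.1.200")

# Read the real-time energy counters (power, voltage, current, totals)
reading = plug.get_emeter_realtime()

# Emit JSON so MiNiFi / NiFi can route and query it as a record
print(json.dumps(dict(reading)))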



All of this data (an image and two types of JSON files) is sent to a NiFi cluster via S2S.   I route based on the type and process accordingly.   Energy monitoring data is queried via QueryRecord and sent to Slack if it looks good.   The image is sent to my standard image processor group to be forwarded to CDSW for YOLO SSD analysis, as well as to local TensorFlow and MXNet NiFi processors.   Finally, the JSON sensor/DL results are sent to Slack as a notification, seen below.   This took less than an hour.  Easy AI - IoT / Edge 2 AI!



Example Web Camera Image


Technology:

  • Apache NiFi
  • Apache NiFi - MiNiFi
  • Cloudera Edge Flow Manager
  • Apache Kafka
  • Apache MXNet
  • Apache MXNet GluonCV
  • TensorFlow
  • Intel Movidius
  • Caffe
  • Python 2/3
  • PiP
  • TensorFlow Model Zoo
  • TP-Link-HS110
  • SenseHat Sensors
  • Raspberry Pi 3B+
  • USB Webcamera
  • Cloudera Data Science Workbench

Source:

References:

Creating Apache Kafka Topics Dynamically As Part of a DataFlow



Sometimes when you are ingesting data at scale, whether it is from a Data Warehouse, Logs, REST API, IoT, Social Media or other sources, you may need to create new Apache Kafka topics depending on the type, variations, newness, schema, schema version or other changes.

Instead of having to manually create an Apache Kafka topic with Cloudera Streams Messaging Manager or the Apache Kafka command line (kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test), I would like to create it mid-stream based on names that are relevant to arriving data.   This could be the name of the schema from the data, the table name of the origin data, some unique name generated with the data or another source.   For my example, I am generating a unique name via Apache NiFi Expression Language:

nifi${now():format('yyyyMMddmmss')}${UUID()}
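For illustration only, the same kind of unique name could be generated in plain Python; this sketch simply mirrors the Expression Language pattern above:

import uuid
from datetime import datetime

# Mirrors nifi${now():format('yyyyMMddmmss')}${UUID()}
topic_name = "nifi" + datetime.now().strftime("%Y%m%d%M%S") + str(uuid.uuid4())
print(topic_name)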

This is a proof of concept; there are more features I would add if I wanted this for production use cases, such as adding fields for the number of partitions and the number of replicas.

Example Run



The processor is very easy to use: you merely enter your Kafka Broker URL, such as demo.hortonworks.com:6667, and then the name of your Kafka topic.   The processor will validate the name, which should be alphanumeric with only the addition of periods, dashes and underscores.  It will run quickly, and when it completes you can check out the results.  Your flowfile will be unchanged, but you will get new attributes as seen below.
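That validation boils down to a simple pattern: Kafka topic names may contain only letters, digits, periods, dashes and underscores (Kafka also caps names at 249 characters). A small sketch of the check, separate from the processor's own Java code:

import re

def is_valid_topic_name(name: str) -> bool:
    # Alphanumeric plus periods, dashes and underscores, up to 249 characters
    return bool(re.fullmatch(r"[A-Za-z0-9._-]{1,249}", name))

print(is_valid_topic_name("nifi201908224205_12345"))  # True
print(is_valid_topic_name("bad topic name!"))         # False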






You will get kafka.bootstrap (your broker URL), kafka.client.id (a generated one-time-use client id), kafka.topic.<TOPIC_NAME> (one for each Kafka topic that exists), kafka.topic.creation.success (a status flag), kafka.topic.message (a message) and a kafka.topic attribute for your newly named topic.






In IntelliJ I quickly developed this program using the Apache Kafka Admin API and some JUnit tests.

For a production use case I would probably just use the Cloudera SMM REST API to create topics.




It is trivial to call a REST API from Apache NiFi, so I can use an Apache NiFi flow to orchestrate an entire Kafka lifecycle with management and monitoring for real-time interaction.

Source Code for Custom Apache NiFi Processor



Source Code for Apache Kafka Shell Scripts


Edge Data Processing with Jetson Nano Part 3 - AI Integration






Top Level NiFi Flow Receiving MiNiFi Agent Messages





Overview of our Apache NiFi Flow For Processing









We format a new flow file in JSON to send to the CDSW job environment.




We Run Apache MXNet 1.3.1 (Java) SSD Against the Web Camera Image


Extract The Values From the FlowFile to Send to the Spark Job



Our JSON Results From the Logs






Log data has successfully arrived; consistent JSON rows are grabbed as they are written to the file.



We can see the results of the Spark Job in Cloudera Data Science Workbench (CDSW)



We can also see the messages that we sent to Slack.