Monitoring the Number of Flow Files Queued in Apache NiFi via PyPortal

When I am developing in NiFi, it is sometimes helpful to have an information radiator that shows me things of immediate interest without opening the NiFi Summary screen.   So I made a quick information radiator with Adafruit's awesome PyPortal.   This device, with its built-in screen, connects to the WiFi network shared with my Apache NiFi node and reads the REST API to ingest the current total number of flow files queued in the system.

I saw mine had almost 300,000 flow files waiting, so I went into the queues and emptied out the ones I did not need to process.   Nothing was broken, but a few sinks were no longer valid, so I emptied those out.

NiFi Summary







System Statistics



When I first checked the Flow Files in the Queue, there were a lot of them.  






As a Typical Business User, You Will Immediately Know the Queue Count




Source:
https://github.com/tspannhw/pyportal-nifi-monitoring/tree/master

Every Minute, Let's Grab the NiFi Flow Files in Queue via the REST API


"""
grab data from Apache NiFi REST API
If you can find something that spits out JSON data, we can display it!
https://nifi.apache.org/docs/nifi-docs/rest-api/
"""
import time
import board
from adafruit_pyportal import PyPortal
# Set up where we'll be fetching data from
DATA_SOURCE = "http://hw13125.local:8080/nifi-api/flow/status"
DATA_LOCATION = ["controllerStatus", "flowFilesQueued"]
def text_transform(val):
format_str = "FlowFilesQueued in NiFi = {:d}"
return format_str.format(val)
# the current working directory (where this file is)
cwd = ("/"+__file__).rsplit('/', 1)[0]
pyportal = PyPortal(url=DATA_SOURCE, json_path=DATA_LOCATION,
status_neopixel=board.NEOPIXEL,
default_bg=cwd+"/cloudera.bmp",
text_font=cwd+"/fonts/Arial-ItalicMT-17.bdf",
text_position=(20, 20),
text_color=0xFFFFFF,
text_transform=text_transform)
pyportal.preload_font(b'$012345789') # preload numbers
while True:
try:
value = pyportal.fetch()
print("Response is", value)
except (ValueError, RuntimeError) as e:
print("Some error occured, retrying! -", e)
time.sleep(60) #
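Before flashing this onto the PyPortal, it can be handy to verify the endpoint and JSON path from a regular desktop Python session. This is a minimal sketch using only the standard library; the hostname is the one from my setup, so substitute your own NiFi host and port:

```python
import json
import urllib.request

NIFI_STATUS_URL = "http://hw13125.local:8080/nifi-api/flow/status"


def extract_queued(status):
    # Walk the same JSON path the PyPortal uses:
    # controllerStatus -> flowFilesQueued
    return status["controllerStatus"]["flowFilesQueued"]


def flow_files_queued(url=NIFI_STATUS_URL, timeout=5):
    """Fetch /nifi-api/flow/status and return the total queued flow files."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return extract_queued(json.load(resp))
```

If `extract_queued` returns the number you see on the NiFi Summary screen, the same URL and JSON path will work in the PyPortal sketch.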



Moving data out of MongoDB Data Stores


Patient Monitoring Example Architecture

Stack
Apache Kafka
Cloudera Streaming Messaging Manager for Apache Kafka
Apache NiFi
Cloudera Schema Registry
Apache Hive
Apache Druid
Apache Spark
Apache Kafka Streams (KStreams)
Apache Ranger
Apache Atlas
Apache Ambari
Apache NiFi - MiNiFi
Apache Hadoop with Apache YARN and Apache HDFS

For Upcoming Projects

Cloudera Edge Manager

Using Sensors At Scale on the Edge Part 1 : Apache NiFi : MiNiFi : CEM


In the next release I will be adding CEM.

This is an update to an existing flow that adds new features based on Apache NiFi 1.9.1.   We will also be migrating this to MiNiFi 0.6.0 C++ with Cloudera CEM.



*Will be updated with CEM and the MiNiFi C++ Agent 0.6.0



Advanced XML Processing with Apache NiFi 1.9.1



With the latest version of Apache NiFi, you can now directly convert XML to JSON, Apache Avro, CSV, or any other format supported by a RecordWriter.   This is a great advancement.  To make it even easier, you don't even need to know the schema beforehand: there is a built-in option to infer the schema.
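As an aside, the core of that idea, walking an XML tree and emitting JSON without a predeclared schema, can be sketched in a few lines of plain Python. This is an illustration only, not how NiFi's XMLReader and schema inference actually work internally (they handle types, arrays, and attributes far more carefully):

```python
import json
import xml.etree.ElementTree as ET


def element_to_dict(elem):
    """Naively convert an XML element tree into nested dicts."""
    children = list(elem)
    if not children:
        return elem.text
    out = {}
    for child in children:
        # Note: repeated sibling tags would overwrite each other here;
        # a real converter would collect them into a list.
        out[child.tag] = element_to_dict(child)
    return out


# A tiny stand-in for a NOAA-style observation record (fields are illustrative).
xml_record = """<current_observation>
  <station_id>KJFK</station_id>
  <temp_f>58.0</temp_f>
</current_observation>"""

record = element_to_dict(ET.fromstring(xml_record))
print(json.dumps(record))  # {"station_id": "KJFK", "temp_f": "58.0"}
```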



The results of an RSS (XML) feed converted to JSON and displayed in a Slack channel.



Besides RSS feeds, we can grab regular XML data, including XML data wrapped in a ZIP file (or even a ZIP file in an email, on an SFTP server, or in Google Docs).


Get the Hourly Weather Observation for the United States


Decompress That ZIP


Unpack That ZIP into Files


One ZIP becomes many XML files of data.



An example XML record from a NOAA weather station.



Converted to JSON Automagically




Let's Read Those Records With A Query and Convert the results to JSON Records



Read RSS Feed of Travel Warnings




In this one simple example, we ingest the observed weather from every observation station in the United States via one downloaded ZIP file containing all of the hourly XML files.  Apache NiFi can easily acquire this file, decompress it, and unpack the XML files from the ZIP.   We can then convert all of them to records by inferring their schema and building new records in the output format of our choice.   From there we can pull apart the values we need, or push this new, cleaned format to one or more storage options including HBase, HDFS, Hive, a SQL database, MongoDB, or elsewhere.  We can also send this data on via Apache Kafka to a streaming engine such as Kafka Streams or Spark Structured Streaming for more processing or joining with other datasets.
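The unpack-and-parse portion of that flow can be sketched in plain Python. This is just a model of what UnpackContent followed by a record reader does; here we build a tiny two-station ZIP in memory to stand in for the NOAA download (filenames and fields are illustrative):

```python
import io
import xml.etree.ElementTree as ET
import zipfile


def xml_records_from_zip(zip_bytes):
    """Yield (filename, parsed XML root) for every .xml entry in a ZIP."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if name.endswith(".xml"):
                yield name, ET.fromstring(zf.read(name))


# Build a small in-memory ZIP: one XML file per observation station.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("KJFK.xml", "<obs><station_id>KJFK</station_id></obs>")
    zf.writestr("KLAX.xml", "<obs><station_id>KLAX</station_id></obs>")

stations = [root.findtext("station_id")
            for _, root in xml_records_from_zip(buf.getvalue())]
print(stations)  # one record per XML file in the ZIP
```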








QueryRecord allows us to write a SQL query such as SELECT * FROM FLOWFILE, reading XML records and producing JSON records as a result.  We can change fields or add aggregates such as SUMs and AVGs.
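To get a feel for that style of query outside NiFi, we can simulate it with an in-memory SQLite table playing the role of FLOWFILE. The column names below are hypothetical weather fields, not NiFi's actual inferred schema:

```python
import sqlite3

# Stand-in rows for converted XML weather records (fields are illustrative).
rows = [("KJFK", 58.0), ("KJFK", 60.0), ("KLAX", 71.0)]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (station_id TEXT, temp_f REAL)")
conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?)", rows)

# A QueryRecord-style query: aggregate while converting records.
query = ("SELECT station_id, AVG(temp_f) AS avg_temp_f "
         "FROM FLOWFILE GROUP BY station_id ORDER BY station_id")
results = conn.execute(query).fetchall()
print(results)  # [('KJFK', 59.0), ('KLAX', 71.0)]
```

In NiFi the same SELECT would go into a QueryRecord processor property, with the record reader and writer controlling the input and output formats.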


References:


Hourly Update