Automating Starting Services in Apache NiFi and Applying Parameters

Automate all the things! You can call these commands interactively or script all of them with awesome DevOps tools. Andre and Dan can tell you more about that.

Enable All NiFi Services on the Canvas

Running the enable command three times picks up any stubborn services, or ones that needed another service running first. This could be put into a loop that checks the status before trying again.

nifi pg-list
nifi pg-status
nifi pg-get-services

The NiFi CLI has interactive help available and also some good documentation.

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  
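The "run it three times" approach can be wrapped in a small retry loop. The following is a minimal Python sketch, not a definitive implementation. The names `enable_services`, `cli`, `all_enabled` and `run` are hypothetical: `cli` is whatever command prefix launches the toolkit CLI on your machine, and `all_enabled` is an optional status check you supply yourself (for example, one built on `nifi pg-get-services`), since the CLI's output format is not assumed here.

```python
import subprocess
import time


def enable_services(cli, base_url, pg_id="root", attempts=3, delay=5.0,
                    all_enabled=None, run=subprocess.run):
    """Run `nifi pg-enable-services` up to `attempts` times.

    Returns the number of runs performed.
    """
    cmd = [*cli, "nifi", "pg-enable-services",
           "-u", base_url, "--processGroupId", pg_id]
    for attempt in range(1, attempts + 1):
        run(cmd, check=True)  # raises if the CLI itself exits non-zero
        # Stop early once the caller-supplied status check is satisfied.
        if all_enabled is not None and all_enabled():
            return attempt
        if attempt < attempts:
            time.sleep(delay)  # give dependent services time to come up
    return attempts
```

Without a status check, the function simply repeats the command `attempts` times, which mirrors the manual approach above.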

We could then start a process group if we wanted:

nifi pg-start -u http://edge2ai-1.dim.local:8080 -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62  

List all process groups

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi pg-list -u http://edge2ai-1.dim.local:8080   

List Parameters

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi list-param-contexts -u http://edge2ai-1.dim.local:8080 -verbose  

Set the parameter context for a process group. You can loop over all process groups to set it everywhere.

  • pgid => process group id
  • pcid => parameter context id

I need to put this in a shell or python script:

/opt/demo/nifi-toolkit-1.12.1/bin/ nifi pg-set-param-context -u http://edge2ai-1.dim.local:8080 -verbose -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62  -pcid 39f0f296-0177-1000-ffff-ffffdccb6d90
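As a rough idea of what that script could look like, here is a minimal Python sketch that applies one parameter context to a list of process groups. The function name `set_param_context` and the `cli` and `run` parameters are hypothetical; `cli` is the command prefix that launches the toolkit CLI, and the process group ids are assumed to be collected beforehand (for example, from `nifi pg-list`).

```python
import subprocess


def set_param_context(cli, base_url, pc_id, pg_ids, run=subprocess.run):
    """Assign one parameter context to each process group in `pg_ids`."""
    applied = []
    for pg_id in pg_ids:
        cmd = [*cli, "nifi", "pg-set-param-context", "-u", base_url,
               "-verbose", "-pgid", pg_id, "-pcid", pc_id]
        run(cmd, check=True)  # stop at the first failing process group
        applied.append(pg_id)
    return applied
```

Passing `run` as a parameter keeps the loop testable without a running NiFi instance.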


You could also use the NiFi REST API or Dan's awesome Python API.


Migrating from Apache Storm to Apache Flink

The first thing to do is not simply pick everything up and dump it onto a new system, but to see what can be reconfigured, refactored or reimagined. For routing, transformation or simple ingest-type applications or solution parts, you may want to use Apache NiFi.

For others, Spark or Spark Streaming can quickly meet your needs. For simple Thing-to-Kafka or Kafka-to-Thing flows, a flow with Kafka Connect is appropriate. For things that need to run in individual devices, containers or pods, you may want to move a small application to NiFi Stateless. Sometimes a simple Kafka Streams application will meet your needs.

For many use cases you can replace a compiled application with some solid Flink SQL code.   For some discussions, check this out.

For some really good information on how to migrate Storm solutions to Flink, Cloudera has a well-documented solution for you.

FLaNK: Real-Time Transit Information For NY/NJ/CT (TRANSCOM)

FREQUENCY:  Every Minute
DESTINATIONS:   HDFS, Kudu/Impala, Cloud, Kafka

The main source of real-time transit updates for New Jersey, New York and Connecticut is TRANSCOM. I will read from this data source every minute to learn about real-time traffic events occurring on the roads and transportation systems near me. We will read the feed, which is in XML/RSS format, and parse out the hundreds of events that come with each minute's update.

I want to store the raw XML/RSS file in S3/ADLS2/HDFS or GCS; that's an easy step. I will also parse and enhance this data for easier querying and tracking.

I will add a unique ID and a timestamp to every event as the data streams by. I will store my data in Impala/Kudu for fast queries and upserts. I can then build some graphs, charts and tables with Apache Hue and Cloudera Visual Applications. I will also publish my data as Avro, enhanced with a schema, to Kafka so that I can use it from Spark, Kafka Connect, Kafka Streams and Flink SQL applications.

  1. GenerateFlowFile - optional scheduler
  2. InvokeHTTP - call RSS endpoint
  3. PutHDFS - store raw data to Object or File store on premise or in the cloud via HDFS / S3 / ADLSv2 / GCP / Ozone / ...
  4. QueryRecord - convert XML to JSON
  5. SplitJson - break out individual events
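To make the convert-and-split steps concrete outside of NiFi, here is a minimal Python sketch that turns an RSS document into one dictionary per event. It is an illustration, not the flow itself: the `rss_items` function and the sample feed are hypothetical, and the real TRANSCOM feed may use different element names (the `georss:point` element is an assumption).

```python
import xml.etree.ElementTree as ET

# Hypothetical two-event feed; the real TRANSCOM fields may differ.
SAMPLE_RSS = """<rss version="2.0" xmlns:georss="http://www.georss.org/georss">
  <channel>
    <title>Example events</title>
    <item>
      <title>Crash on Route 1</title>
      <georss:point>40.1 -74.2</georss:point>
    </item>
    <item>
      <title>Delays on the bridge</title>
      <georss:point>40.7 -74.0</georss:point>
    </item>
  </channel>
</rss>"""


def rss_items(xml_text):
    """Return one dict per <item>, keyed by child element name."""
    root = ET.fromstring(xml_text)
    events = []
    for item in root.iter("item"):
        record = {}
        for child in item:
            tag = child.tag.split("}", 1)[-1]  # drop any XML namespace prefix
            record[tag] = child.text or ""
        events.append(record)
    return events
```

Calling `rss_items(SAMPLE_RSS)` yields two records, each with a `title` and a `point` key.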

  1. UpdateAttribute - set schema name
  2. UpdateRecord - generate and add a unique ID and timestamp
  3. UpdateRecord - clean up the point field
  4. UpdateRecord - remove garbage whitespace
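The enrichment steps above can be sketched in plain Python as follows. This is only an approximation of what the UpdateRecord processors do: the `enrich` function and the `uuid` and `ts` field names are hypothetical, and the specific cleanup of the point field is left out because its rules are not described here.

```python
import uuid
from datetime import datetime, timezone


def enrich(event, now=None):
    """Return a copy of `event` with tidy whitespace, a unique ID and a timestamp."""
    now = now or datetime.now(timezone.utc)
    # Collapse runs of whitespace (including newlines) in every string field.
    cleaned = {
        key: " ".join(value.split()) if isinstance(value, str) else value
        for key, value in event.items()
    }
    cleaned["uuid"] = str(uuid.uuid4())            # hypothetical field name
    cleaned["ts"] = int(now.timestamp() * 1000)    # epoch milliseconds
    return cleaned
```

Passing `now` explicitly makes the timestamp reproducible, which helps when testing the logic.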

  1. PutKudu - upsert new data to our Impala / Kudu table.
  2. RetryFlowFile - retry if network or other connectivity issue.

Send Messages to Kafka

Our flow has delivered many messages to our transcomevents topic as Apache Avro-formatted messages with the schema attached.

Streams Messaging Manager (SMM) links to the Schema Registry and the schema for this topic.

We use a schema for validation and as a contract between consumers and producers of these traffic events.
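To illustrate what "schema as a contract" means, here is a small Python sketch. The schema below is hypothetical (its record name and fields are illustrative, not the actual transcomevents schema), and `conforms` is a deliberately simplified stand-in that checks only field presence and primitive types. It is not full Avro validation, which a real Avro library and the Schema Registry would handle.

```python
import json

# Hypothetical schema: the name and fields are illustrative only.
EVENT_SCHEMA = {
    "type": "record",
    "name": "TransitEvent",
    "fields": [
        {"name": "uuid", "type": "string"},
        {"name": "ts", "type": "long"},
        {"name": "title", "type": ["null", "string"], "default": None},
    ],
}

# JSON text of the schema, the form in which Avro schemas are written.
SCHEMA_JSON = json.dumps(EVENT_SCHEMA, indent=2)

# Python types accepted for the few Avro primitives used above.
PY_TYPES = {"string": str, "long": int, "null": type(None)}


def conforms(record, schema=EVENT_SCHEMA):
    """Simplified check: required fields present, values of an allowed type."""
    for field in schema["fields"]:
        name, types = field["name"], field["type"]
        allowed = types if isinstance(types, list) else [types]
        if name not in record and "default" not in field:
            return False  # required field is missing
        value = record.get(name, field.get("default"))
        if not any(isinstance(value, PY_TYPES[t]) for t in allowed):
            return False  # value has the wrong type
    return True
```

A producer that sends a record failing this kind of check would be breaking the contract that consumers rely on.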

Since events are streaming into our Kafka topic and have a schema, we can query them with continuous SQL using Flink SQL. We can then run some continuous ETL.

We could also consume this data with Spark Structured Streaming applications, Spring Boot apps, Kafka Streams, Stateless NiFi and Kafka Connect applications.

We also stored our data in Impala / Kudu for permanent storage, ad-hoc queries, analytics, Cloudera Visualizations, reports, applications and more.

It is very easy to have fast data against our agile Cloud Data Lakehouse.

Source Code


FLaNK: Using Apache Kudu As a Cache For FDA Updates

  1. InvokeHTTP: We invoke the RSS feed to get our data.
  2. QueryRecord: Convert RSS to JSON.
  3. SplitJson: Split one file into individual records. (Should refactor to ForkRecord.)
  4. EvaluateJsonPath: Extract the attributes you need.
  5. ProcessGroup for SPL Processing

We call and check the RSS feed frequently, parse the records out of the RSS (XML) feed and check them against our cache. We use Cloudera's Real-Time Datahub with Apache Impala/Apache Kudu as our cache to see whether we have already received a given record. If it hasn't arrived yet, it is added to the Kudu cache and processed.

SPL Processing

We use the extracted SPL to grab a detailed record for that SPL using InvokeHTTP.

Using Apache Hue, we have created an Impala/Kudu table to cache this information.

We use LookupRecord to read from our cache.

If we don't have that value yet, we send it to the table for an UPSERT.
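The lookup-then-upsert logic can be sketched in a few lines of Python. This is an illustration of the pattern only: a plain dictionary stands in for the Impala/Kudu table, and the `process_updates` function and the `spl_id` key are hypothetical names.

```python
def process_updates(records, cache, key="spl_id"):
    """Return records not yet in `cache`, adding each one as it is seen."""
    new_records = []
    for record in records:
        record_id = record[key]
        if record_id in cache:
            continue  # already received; skip reprocessing
        cache[record_id] = record  # stands in for the Kudu UPSERT
        new_records.append(record)
    return new_records
```

Because the cache is updated as records are seen, a duplicate within the same batch is skipped as well, not just one from an earlier poll of the feed.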

We send our raw data as XML/RSS to HDFS for archival use and audits.

We can see the results of our flow in Apache Atlas to see full governance of our solution.

So with a simple NiFi flow, we can ingest all the new updates to DailyMed and not reprocess anything that we already have.