New Features of Apache NiFi 1.13.2

New Features
  • ListenFTP
  • UpdateHiveTable - Hive DDL changes: Hive schema updates, i.e. data drift / Hive schema migration!
  • SampleRecord - different sampling approaches to records (Interval Sampling, Probabilistic Sampling, Reservoir Sampling)
  • CDC Updates
  • Kudu updates
  • AMQP and MQTT Integration Upgrades
  • ConsumeMQTT - record readers and writers added
  • HTTP access to NiFi is now configured by default to accept connections from 127.0.0.1/localhost only.  If you want to allow broader access over HTTP for some reason, and you understand the security implications, you can still control that by changing the 'nifi.web.http.host' property in nifi.properties.  That said, please take the time to configure proper HTTPS; detailed instructions and tooling are available to assist (see the sketch after this list).
  • The ability to run NiFi with no GUI continues to develop as the MiNiFi and NiFi code bases are combined.
  • Support for Kudu Dates (https://kudu.apache.org/releases/1.12.0/docs/release_notes.html)
  • Updated GRPC versions
  • Apache Calcite update
  • PutDatabaseRecord update
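
A minimal sketch of the relevant nifi.properties entries for the new default; the property names are standard NiFi configuration, but the install path shown is a placeholder for your own environment.

# Inspect the current web host/port settings (path is an example; adjust to your install)
grep '^nifi.web' /opt/nifi/conf/nifi.properties

# Defaults in 1.13.x bind HTTP to localhost only:
#   nifi.web.http.host=127.0.0.1
#   nifi.web.http.port=8080
# To allow broader HTTP access (understand the security implications first), set
# nifi.web.http.host to a routable hostname, or leave it blank to bind all interfaces.
# Better: configure HTTPS via nifi.web.https.host / nifi.web.https.port, using the
# nifi-toolkit tls-toolkit to generate keys and certificates.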

Here is an example NiFi ETL Flow:


Example NiFi 1.13.2 Flow:
  • ConsumeMQTT:   now with readers
  • UpdateAttribute:   set record.sink.name to kafka and recordreader.name to json.
  • SampleRecord:   sample a few of the records
  • PutRecord:   use a record reader and a destination record sink service
  • UpdateHiveTable:   new sink
This flow consumes from MQTT and reads and writes records along the way.

Some example attributes from a running flow:

Connection Pools for DatabaseRecordSinks can be JDBC, Hadoop and Hive.



FreeFormTextRecordSetWriter is great for writing any format.



For the RecordSinkService, we will pick Kafka as our destination.



KafkaRecordSink from PutRecord


The reader will pick JSON in our example based on our UpdateAttribute; we can change this dynamically as data streams in.

ReaderLookup  - lets you pick a reader based on an attribute.



We have defined readers for Parquet, JSON, Avro, XML and CSV, so no matter the type, we can automagically read it.    Great for reusing code, and great for cases like our new ListenFTP, where you may be sent tons of different files to process.   Use one flow!


RecordSinkService can help you make all of your flows generic, so you can drop in different sinks/destinations for your writers based on the incoming data.   This is revolutionary for code reuse.


We can write our output in a custom format that could look like a document, HTML, fixed width, a form letter, weird delimiter or whatever you need.


Sample records using different methods.


We use the RecordSinkServiceLookup to change our sink location dynamically; we pass in an attribute to choose Kafka.


We have pushed our data to Kafka via the KafkaRecordSink.  We can see our data easily in Streams Messaging Manager (SMM).


With a RecordReaderFactory, you can pick readers like the new WindowsEventLogReader.



As another output, we can use UpdateHiveTable to write our data and change the table as needed.
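
To spot-check that the schema change actually landed in Hive, you can describe the table from the command line. A quick sketch; the HiveServer2 URL and table name below are placeholders, not values from this flow.

# Describe the target table to confirm new columns were added (URL and table are placeholders)
beeline -u "jdbc:hive2://hiveserver:10000/default" -e "DESCRIBE FORMATTED my_drifting_table;"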



Straight from the Release Notes:  New Features
  • [NIFI-7386] - AzureStorageCredentialsControllerService should also connect to storage emulator
  • [NIFI-7429] - Add Status History capabilities for system level metrics
  • [NIFI-7549] - Adding Hazelcast based implementation for DistributedMapCacheClient
  • [NIFI-7624] - Build a ListenFTP processor
  • [NIFI-7745] - Add a SampleRecord processor
  • [NIFI-7796] - Add Prometheus metrics for total bytes received and bytes sent for components
  • [NIFI-7801] - Add acknowledgement check to Splunk
  • [NIFI-7821] - Create a Cassandra implementation of DistributedMapCacheClient
  • [NIFI-7879] - Create record path function for UUID v5
  • [NIFI-7906] - Add graph processor with flexibility to query graph database conditioned on flowfile content and attributes
  • [NIFI-7989] - Add Hive "data drift" processor
  • [NIFI-8136] - Allow State Management to be tied to Process Session
  • [NIFI-8142] - Add "on conflict do nothing" feature to PutDatabaseRecord
  • [NIFI-8146] - Allow RecordPath to be used for specifying operation type and data fields when using PutDatabaseRecord
  • [NIFI-8175] - Add a WindowsEventLogReader


An update to Cloudera Flow Management!

Cloudera Flow Management on DataHub Public Cloud 


This minor release includes some Schema Registry and Atlas integration updates.



If that wasn't enough, there is a new version of the MiNiFi C++ Agent!

Cloudera Edge Manager 1.2.2 Release


February 15, 2021

CEM MiNiFi C++ Agent - 1.21.01 release includes:
  • Support for JSON output in the ConsumeWindowsEventLog processor
  • Full Expression Language support on Windows
  • Full S3 support (List, Fetch, Get, Put)


Remember when you are done.



Drivers to use with NiFi

Cloudera JDBC 2.6.20 driver for Apache Impala.  

This release has the following enhancements and fixes:

Enhancements and New Features

Ingest Into the Cloud





Ingesting Data into Apache Kafka in CDP Public Cloud
Describes how to use an Apache NiFi data flow to ingest data into Apache Kafka in CDP Public Cloud.
Ingesting Data into Apache Hive in CDP Public Cloud
Describes how to use an Apache NiFi data flow to ingest data into Apache Hive in CDP Public Cloud.
Ingesting Data into Apache HBase in CDP Public Cloud
Describes how to use an Apache NiFi data flow to ingest data into Apache HBase in CDP Public Cloud.
Ingesting Data into Apache Kudu in CDP Public Cloud
Describes how to use an Apache NiFi data flow to ingest data into Apache Kudu in CDP Public Cloud.
Ingesting Data into Amazon S3 Buckets
Describes how to use an Apache NiFi data flow to ingest data into Amazon S3 buckets in CDP Public Cloud.
Ingesting Data into Azure Data Lake Storage
Describes how to use an Apache NiFi data flow to ingest data into Azure Data Lake Storage in CDP Public Cloud.
Ingesting Data into Google Cloud Storage
Describes how to use an Apache NiFi data flow to ingest data into Google Cloud Storage in CDP Public Cloud.



Other resources:

Cloud Sources and Sinks
Apache NiFi - Apache Kafka - Apache Hive - Apache HBase - Apache Kudu - Apache Impala - Apache Hadoop - Apache HDFS - AWS S3 - Azure ADLS - Google GCS - Files - REST

Using Apache NiFi in OpenShift and Anywhere Else to Act as Your Global Integration Gateway



What does it look like?



Where Can I Run This Magic Engine:

Private Cloud, Public Cloud, Hybrid Cloud, VM, Bare Metal, Single Node, Laptop, Raspberry Pi or anywhere you have 1 GB of RAM and some CPU is a good place to run this powerful graphical integration and dataflow engine.   You can also run MiNiFi C++ or Java agents if you want something even smaller.


Sounds Too Powerful and Expensive:

Apache NiFi is Open Source and can be run freely anywhere.


For What Use Cases:

Microservices, Images, Deep Learning and Machine Learning Models, Structured Data, Unstructured Data, NLP, Sentiment Analysis, Semistructured Data, Hive, Hadoop, MongoDB, ElasticSearch, SOLR, ETL/ELT, MySQL CDC, MySQL Insert/Update/Delete/Query, Hosting Unlimited REST Services, Interactive with Websockets, Ingesting Any REST API, Natively Converting JSON/XML/CSV/TSV/Logs/Avro/Parquet, Excel, PDF, Word Documents, Syslog, Kafka, JMS, MQTT, TCP/IP, UDP, FTP, sFTP, Files, Directories, Google Forms, Object Stores, NoSQL, Lookups, Hosting Web sites, Updates and live SQL on data streams.



MySQL/REST/MQTT/JMS/Files/S3/Object Stores.   You also have an expert available on NiFi here:  https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_9.html

This makes these tasks much easier to develop, deploy, manage and control.  A single data engineer can now build, deploy and manage thousands of data streams across batch, microbatch and streaming.





How about version control?

NiFi Registry provides easy-to-integrate version control with a full REST API, and can export your flows to a Git repository like GitHub.
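
A rough sketch of browsing and exporting versioned flows with the NiFi Toolkit CLI; the registry URL, bucket and flow IDs are placeholders, and the exact commands and flags can vary by toolkit version.

# List buckets and flows stored in a NiFi Registry instance (URL is a placeholder)
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh registry list-buckets -u http://registry-host:18080
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh registry list-flows -u http://registry-host:18080 -b <bucketId>

# Export a flow version to JSON, for example to commit it to Git
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh registry export-flow-version -u http://registry-host:18080 -f <flowId> -o myflow-v1.json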




DevOps?



How about deployment?

Apache NiFi can run anywhere!

You can run Apache NiFi on a single VM, on localhost or on a laptop:

https://nifi.apache.org/download.html
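
A quick sketch of getting a local instance running from a downloaded binary; the version, mirror URL and paths are examples.

# Download and unpack a NiFi release (version and URL are examples)
wget https://archive.apache.org/dist/nifi/1.13.2/nifi-1.13.2-bin.tar.gz
tar -xzf nifi-1.13.2-bin.tar.gz
cd nifi-1.13.2

# Start NiFi and watch the logs; by default 1.13.x listens on http://127.0.0.1:8080/nifi
./bin/nifi.sh start
tail -f logs/nifi-app.log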


On OpenShift

https://catalog.redhat.com/software/containers/cdt-common-rns/nifi/6026bb6c2937380b51711b73

https://github.com/rromannissen/nifi-openshift


Apache NiFi Stateless can run entirely in RAM, one event at a time, like a job or a Function-as-a-Service:

https://github.com/SamHjelmfelt/OpenWhisk-YarnDeployment

https://github.com/apache/nifi/blob/7d76bcd5202a8680c952d3a19072087a971d0b69/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/README.md


Docker

https://hub.docker.com/r/apache/nifi
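
A minimal sketch for the official image; the tag and port mapping are examples, and the image's defaults have changed across releases.

# Pull and run the official Apache NiFi image (tag is an example)
docker pull apache/nifi:1.13.2
docker run --name nifi -p 8080:8080 -d apache/nifi:1.13.2

# Follow the logs until the UI is up, then browse to http://localhost:8080/nifi
docker logs -f nifi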




What if I don't like easy-to-use Web UIs?

You can code everything with REST calls:

https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
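
A few hedged examples against an unsecured local instance; the endpoint paths come from the standard NiFi REST API, but the host, port and lack of authentication are assumptions for a local dev setup.

# System diagnostics (heap, repository usage, ...)
curl -s http://localhost:8080/nifi-api/system-diagnostics

# Status of the root process group (queued counts, bytes in/out, ...)
curl -s http://localhost:8080/nifi-api/flow/process-groups/root/status

# Controller services visible to the root process group
curl -s http://localhost:8080/nifi-api/flow/process-groups/root/controller-services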

Okay, maybe not that low-level, what about a CLI?

You can install and run it from here:  https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI



Can I get more information:


I need support:

A partner of Amazon, Google, Microsoft, Oracle, IBM and thousands more, Cloudera can be trusted for enterprise cloud hosting, support and development.  Cloudera employs a majority of the developers working on Apache NiFi in open source.

https://www.cloudera.com/products/cdf.html






Automating Starting Services in Apache NiFi and Applying Parameters


Automate all the things!   You can call these commands interactively or script all of them with awesome devops tools.  Andre and Dan can tell you more about that.


Enable All NiFi Services on the Canvas

By running this three times, I catch any stubborn services or ones that needed another service running first.   This could be put into a loop that checks the status before trying again (see the sketch below).

nifi pg-list
nifi pg-status
nifi pg-get-services

 The NiFi CLI has interactive help available and also some good documentation:

https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI


/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  


/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  


/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services  -u http://edge2ai-1.dim.local:8080 --processGroupId root  
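
Instead of repeating the command by hand, a small loop works too; this sketch reuses the same toolkit path and URL as above.

# Retry enabling services a few times so dependent services get picked up
for i in 1 2 3; do
  /opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-enable-services -u http://edge2ai-1.dim.local:8080 --processGroupId root
  sleep 10
done

# Then check the process group status
/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-status -u http://edge2ai-1.dim.local:8080 --processGroupId root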


We could then start a process group if we wanted:


nifi pg-start -u http://edge2ai-1.dim.local:8080 -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62  


List all process groups


/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-list -u http://edge2ai-1.dim.local:8080   


List Parameters


/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi list-param-contexts -u http://edge2ai-1.dim.local:8080 -verbose  


Set the parameter context for a process group; you can loop to do them all (a sketch follows the example below).

  • pgid => process group id
  • pcid => parameter context id


I need to put this in a shell or Python script:

/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh nifi pg-set-param-context -u http://edge2ai-1.dim.local:8080 -verbose -pgid 2c1860b3-7f21-36f4-a0b8-b415c652fc62  -pcid 39f0f296-0177-1000-ffff-ffffdccb6d90
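
A sketch of that script; it assumes the CLI's JSON output option and uses jq for parsing, so treat the JSON field names as illustrative since they may differ slightly by toolkit version.

#!/bin/bash
# Apply one parameter context to every process group (illustrative sketch)
CLI=/opt/demo/nifi-toolkit-1.12.1/bin/cli.sh
URL=http://edge2ai-1.dim.local:8080
PCID=39f0f296-0177-1000-ffff-ffffdccb6d90

for PGID in $($CLI nifi pg-list -u "$URL" -ot json | jq -r '.[].id'); do
  echo "Setting parameter context $PCID on process group $PGID"
  $CLI nifi pg-set-param-context -u "$URL" -pgid "$PGID" -pcid "$PCID"
done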


Example

https://github.com/tspannhw/ApacheConAtHome2020/blob/main/scripts/setupnifi.sh


You could also use the NiFi REST API or Dan's awesome Python API (https://nipyapi.readthedocs.io/en/latest/).


Migrating from Apache Storm to Apache Flink


The first thing you need to do is not simply pick up and dump everything into a new system, but to see what can be reconfigured, refactored or reimagined.   For some routing, transformation or simple ingest applications, or parts of a solution, you may want to use Apache NiFi.

For others, Spark or Spark Streaming can quickly meet your needs.   For simple thing-to-Kafka or Kafka-to-thing flows, Kafka Connect is appropriate.   For pieces that need to run on individual devices, containers or pods, you may want to move a small application to NiFi Stateless.    Sometimes a simple Kafka Streams application will meet your needs.

For many use cases you can replace a compiled application with some solid Flink SQL code.   For some discussions, check this out.

For some really good information on how to migrate Storm solutions to Flink, Cloudera has a well-documented solution for you:

https://docs.cloudera.com/csa/1.2.0/stormflink-migration/topics/csa-stormflink-migration-process.html


Conceptual

https://docs.cloudera.com/csa/1.2.0/stormflink-migration/topics/csa-stormflink-concept.html



Architecture

https://docs.cloudera.com/csa/1.2.0/stormflink-migration/topics/csa-stormflink-architecture.html





Redistribution

https://docs.cloudera.com/csa/1.2.0/stormflink-migration/topics/csa-stormflink-redistribution.html




FLaNK: Real-Time Transit Information For NY/NJ/CT (TRANSCOM)


SOURCE: XML/RSS REST ENDPOINT 
FREQUENCY:  Every Minute
DESTINATIONS:   HDFS, Kudu/Impala, Cloud, Kafka




The main source of these real-time transit updates for New Jersey, New York and Connecticut is TRANSCOM.   I will read from this data source every minute to learn about real-time traffic events occurring on the roads and transportation systems near me.   We will read the feed, which is in XML/RSS format, and parse out the hundreds of events that come with each minute's update.

I want to store the raw XML/RSS file in S3/ADLSv2/HDFS or GCS; that's an easy step.   I will also parse and enrich this data for easier querying and tracking.

I will add a unique ID and a timestamp to every event as the data streams by.   I will store my data in Impala/Kudu for fast queries and upserts.   I can then build some graphs, charts and tables with Apache Hue and Cloudera Visual Applications.   I will also publish my data to Kafka as Avro enhanced with a schema so that I can use it from Spark, Kafka Connect, Kafka Streams and Flink SQL applications.



  1. GenerateFlowFile - optional scheduler
  2. InvokeHTTP - call RSS endpoint
  3. PutHDFS - store raw data to Object or File store on premise or in the cloud via HDFS / S3 / ADLSv2 / GCP / Ozone / ...
  4.  QueryRecord - convert XML to JSON
  5. SplitJSON - break out individual events


  1. UpdateAttribute - set schema name
  2. UpdateRecord - generate and add a unique ID and timestamp
  3. UpdateRecord - clean up the point field
  4. UpdateRecord - remove garbage whitespace



  1. PutKudu - upsert new data to our Impala / Kudu table.
  2. RetryFlowFile - retry if there is a network or other connectivity issue.

Send Messages to Kafka




Our flow has delivered many messages to our transcomevents topic as schema-attached, Apache Avro formatted messages.
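
To spot-check that messages are arriving, you can tail the topic from the command line; the broker address is a placeholder, and since the payload is Avro the console output will be mostly binary.

# Read a few messages from the topic (kafka-console-consumer.sh on a vanilla Kafka install)
kafka-console-consumer --bootstrap-server broker1:9092 --topic transcomevents --from-beginning --max-messages 5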






SMM links to the Schema Registry and the schema for this topic.


We use a schema for validation and as a contract between consumers and producers of these traffic events.


Since events are streaming into our Kafka topic and have a schema, we can query them with continuous SQL using Flink SQL.  We can then run some continuous ETL (a sketch follows below).
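
A rough sketch of what such a continuous query could look like in Flink SQL; the column names and connector options are illustrative placeholders, not the actual TRANSCOM schema.

# Write an illustrative Flink SQL script (columns and options are placeholders),
# then run it with your Flink SQL client of choice
cat > transcom.sql <<'SQL'
CREATE TABLE transcomevents (
  event_id   STRING,
  event_type STRING,
  latitude   DOUBLE,
  longitude  DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'transcomevents',
  'properties.bootstrap.servers' = 'broker1:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro'
);

-- Continuous ETL: running count of events by type
SELECT event_type, COUNT(*) AS events
FROM transcomevents
GROUP BY event_type;
SQL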



We could also consume this data with Spark Structured Streaming applications, Spring Boot apps, Kafka Streams, Stateless NiFi and Kafka Connect applications.

We also stored our data in Impala / Kudu for permanent storage, ad-hoc queries, analytics, Cloudera Visualizations, reports, applications and more.
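
For a quick ad-hoc check, impala-shell works from the command line; the daemon host and table name are placeholders.

# Count rows in the Kudu-backed table (host and table name are placeholders)
impala-shell -i impala-daemon:21000 -q "SELECT COUNT(*) FROM transcomevents;"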










It is very easy to have fast data against our agile Cloud Data Lakehouse.






FLaNK: Using Apache Kudu As a Cache For FDA Updates



  1. InvokeHTTP:   We invoke the RSS feed to get our data.
  2. QueryRecord:   Convert RSS to JSON
  3. SplitJson:  Split one file into individual records.   (Should refactor to ForkRecord)
  4. EvaluateJSONPath:  Extract attributes you need.
  5. ProcessGroup for SPL Processing




We call and check the RSS feed frequently, parse out the records from the RSS (XML) feed and check them against our cache.   We use Cloudera's Real-Time Datahub with Apache Impala/Apache Kudu as our cache to see if we have already received a record for that SPL.   If it hasn't arrived yet, it is added to the Kudu cache and processed.

SPL Processing

We use the extracted SPL to grab a detailed record for that SPL using InvokeHTTP.



We have created an Impala/Kudu table to cache this information with Apache Hue.
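
A sketch of what such a cache table could look like as an Impala/Kudu table; the column names are illustrative placeholders for the fields extracted from the feed.

# Create a Kudu-backed cache table via Impala (host, table and columns are placeholders)
impala-shell -i impala-daemon:21000 -q "
CREATE TABLE IF NOT EXISTS dailymed_cache (
  spl STRING,
  title STRING,
  updated STRING,
  PRIMARY KEY (spl)
)
PARTITION BY HASH (spl) PARTITIONS 4
STORED AS KUDU;"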




We use the LookupRecord to read from our cache.



If we don't have that value yet, we send it to the table for an UPSERT.


We send our raw data as XML/RSS to HDFS for archival use and audits.


We can see the results of our flow in Apache Atlas to see full governance of our solution.

So with a simple NiFi flow, we can ingest all the new updates to DailyMed and not reprocess anything that we already have.

Resources