Did the user really ask for Exactly Once? Fault Tolerance

Exactly Once Requirements

Exactly-once processing is very tricky to get right and can cause performance degradation; if your user can live with at-least-once delivery, always go with that. Having data sinks like Kudu, where you can do an upsert, makes exactly once less necessary, since replayed records simply overwrite the same row.

https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

Apache Flink, Apache NiFi Stateless and Apache Kafka can all participate in exactly-once processing.

For CDF Stream Processing and Analytics with Apache Flink 1.10 Streaming:

Both Kafka sources and sinks can be used with exactly once processing guarantees when checkpointing is enabled.


End-to-End Guaranteed Exactly-Once Record Delivery

The data source and data sink need to support exactly-once state semantics and take part in checkpointing.


Data Sources
  • Apache Kafka - must have exactly-once semantics selected, transactions enabled on the brokers (Kafka 0.11 or later), and the correct Kafka connector/client version.

Select: Semantic.EXACTLY_ONCE on the Kafka producer (a minimal sketch follows the Data Sinks list below).


Data Sinks
  • HDFS BucketingSink
  • Apache Kafka
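
Here is a minimal Flink 1.10 DataStream sketch of that setup, my own illustration rather than code from the Cloudera docs; the broker address, topic names, checkpoint interval and transaction timeout are placeholder assumptions.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be enabled for exactly-once; the Kafka source stores its offsets in checkpointed state.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder
        consumerProps.setProperty("group.id", "flink-exactly-once-demo"); // placeholder

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), consumerProps));

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder
        // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
        producerProps.setProperty("transaction.timeout.ms", "900000");

        // EXACTLY_ONCE makes the sink write with Kafka transactions committed when checkpoints complete;
        // downstream consumers should read with isolation.level=read_committed.
        FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
                "output-topic",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                        return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
                    }
                },
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        events.addSink(sink);
        env.execute("exactly-once-kafka-pipeline");
    }
}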




FLaNK in the Cloud!!!! Huge Cloudera Data Platform Public Cloud Updates - July 2020 - Data Flow Releases


With the promotion of Cloudera Runtime 7.2.1 to Public Cloud, the CDF team is pleased to announce three key updates that were also promoted to production today and are available to customers. These are:


Let's see what's new in 7.2.1!


Flow Management (GA) - 2.0.3 is optimized for public cloud!



Streams Messaging (GA) - now with Apache Kafka 2.5!


Streaming Analytics (TP) - v1.2.1, powered by Apache Flink 1.10, is in Technical Preview in CDP Public Cloud.


Start with Streaming Analytics Light Duty

Using Kudu with Flink

Using HBase with Flink

Using Kafka with Flink

General CSA Flink Docs

  • Data source reading from Kafka
  • Data sinks writing to Kafka, HBase and Kudu
  • Apache Atlas integration
  • SQL/Table API and SQL Client
  • Table connectors 
    • Kafka
    • Kudu
    • Hive (through catalog)
We can now run the FLaNK Stack in the Public Cloud automagically!

Sizing Your Apache NiFi Cluster For Production Workloads


Cloudera Flow Management provides an enterprise-supported edition of Apache NiFi managed by Cloudera Manager. The official documentation provides a great guide for sizing your cluster.

https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-nifi-sizing.html

If the use case fits, the NiFi Stateless Engine may be a better option and can perform better because it runs flows in memory without writing to the disk-backed repositories.

Check your heap usage and utilization; you may need to increase it. 24-32 gigabytes of RAM is a nice sweet spot for most instances.



Check how your nodes, threads and queues are doing. If queues are not draining quickly or the active thread count stays high, you may need more cores, RAM or nodes.



When you are managing your cluster in Cloudera Manager, make sure you increase the default JVM memory for Apache NiFi. 512 MB is not going to cut it for anything but single-user development.
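
As a rough sketch of where that setting lives: the heap is controlled by the java.arg.2 and java.arg.3 entries in NiFi's bootstrap.conf, which Cloudera Manager manages for you in CFM; the 24 GB values below are only an example matching the sweet spot above, not a recommendation for every workload.

# bootstrap.conf (written by Cloudera Manager in CFM deployments)
java.arg.2=-Xms24g
java.arg.3=-Xmx24g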



Do this correctly and you can process a billion events! https://blog.cloudera.com/benchmarking-nifi-performance-and-scalability/ - note the hardware and performance sections of that article.


General tips:

Make sure you use SSD for Provenance and other repositories.  Faster disk, happier user. https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-disk-configuration.html
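
For reference, each repository is pointed at its disk in nifi.properties (surfaced through Cloudera Manager in CFM); the paths below are placeholders, the point is to give each repository its own fast volume.

# nifi.properties - example layout with one SSD-backed volume per repository
nifi.flowfile.repository.directory=/data/ssd1/flowfile_repository
nifi.content.repository.directory.default=/data/ssd2/content_repository
nifi.provenance.repository.directory.default=/data/ssd3/provenance_repository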

Monitor your flows to see how many resources you actually need: https://www.datainmotion.dev/2020/07/report-on-this-apache-nifi-1114-monitor.html.


Use records; if the data is semi-structured, GrokReader can help: https://www.nifi.rocks/record-path-cheat-sheet/. If it's CSV, JSON, XML, Parquet or logs, use record readers and writers. They are much faster, easier and cleaner.



Minimize use of CPU- or memory-intensive processors (or make a note of them during sizing): https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-resource-intensive-processors.html

There are a few decisions to make on repositories; talk to your Cloudera friends. https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.1/nifi-configuration-best-practices/content/configuration-best-practices.html



Report on This: Apache NiFi 1.11.4 - Monitor All The Things

The easiest way to grab monitoring data is via the NiFi REST API. Everything in the NiFi UI is done through REST calls, which you can also make programmatically. Please read the NiFi docs; they are linked directly from your running NiFi application and available on the web. They are very thorough and have all the information you could want: https://nifi.apache.org/docs/nifi-docs/. If you are not running NiFi 1.11.4, I recommend you upgrade. It is supported by Cloudera on multiple platforms.

 

NiFi Rest API

https://nifi.apache.org/docs/nifi-docs/rest-api/
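
As a quick illustration, not from the original post, here is a minimal Java sketch that polls the controller-level status endpoint; the host, port and /nifi-api/flow/status path are assumptions to verify against the REST API docs above, and a secured cluster would also need a token or certificate.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NiFiStatusCheck {
    public static void main(String[] args) throws Exception {
        // Assumes a local, unsecured NiFi; adjust the host/port and add authentication for a real cluster.
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/nifi-api/flow/status"))
            .GET()
            .build();
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body carries controller status such as active thread count and queued FlowFile counts.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}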

 

There's also an awesome Python wrapper for that REST API:  https://pypi.org/project/nipyapi/

 

Also, in NiFi flow programming, every time you produce data to Kafka you get metadata back in FlowFile attributes. You can push those attributes directly to a Kafka topic if you want.

 

So after your PublishKafkaRecord_2_0 (1.11.4) processor, take the success relationship, read the attributes for the record count and other metadata, then use AttributesToJSON and publish the result to another topic. You may want a MergeRecord in there to aggregate a few of those events together first.

 

If you are interested in Kafka metrics, record counts or monitoring, then you must use Cloudera Streams Messaging Manager; it provides a full web UI, monitoring tooling, alerts, a REST API and everything you need for monitoring every producer, consumer, broker, cluster, topic, message, offset and Kafka component.

 

The best way to get NiFi stats is to use the NiFi Reporting Tasks; I like the SQL Reporting Task.

 

SQL Reporting Tasks are very powerful and use standard SELECT * FROM JVM_METRICS style queries; see my article:

https://www.datainmotion.dev/2020/04/sql-reporting-task-for-cloudera-flow.html

 

Monitoring Articles

 

https://www.datainmotion.dev/2019/04/monitoring-number-of-of-flow-files.html

https://www.datainmotion.dev/2019/03/apache-nifi-operations-and-monitoring.html

 

Other Resources

https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_9.html

https://www.datainmotion.dev/2019/08/using-cloudera-streams-messaging.html

https://dev.to/tspannhw/apache-nifi-and-nifi-registry-administration-3c92

https://dev.to/tspannhw/using-nifi-cli-to-restore-nifi-flows-from-backups-18p9

https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html

https://www.datainmotion.dev/p/links.html

https://www.tutorialspoint.com/apache_nifi/apache_nifi_monitoring.htm

https://community.cloudera.com/t5/Community-Articles/Building-a-Custom-Apache-NiFi-Operations-Dashboard-Part-1/ta-p/249060

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-metrics-reporting-nar/1.11.4/org.apache.nifi.metrics.reporting.task.MetricsReportingTask/

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.11.4/org.apache.nifi.reporting.script.ScriptedReportingTask/index.html

 

Ingesting All The Weather Data With Apache NiFi





Step By Step NiFi Flow

  1. GenerateFlowFile - build a schedule matching when NOAA updates the weather data
  2. InvokeHTTP - download the ZIP of all current observations (a standalone Java sketch of this download appears after this list)
  3. CompressContent - decompress the ZIP
  4. UnpackContent - extract the files from the ZIP
  5. *RouteOnAttribute - keep only the stations that are airports (${filename:startsWith('K')}). Optional.
  6. *QueryRecord - XMLReader to JsonRecordSetWriter. Query: SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'. This removes locations that are not identified. Optional.
  7. Send it somewhere for storage. This could be PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHBaseRecord, PutDatabaseRecord, PublishKafkaRecord_2_* or others.
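
If you want to sanity-check the NOAA endpoint outside of NiFi, here is a minimal Java sketch (my addition, not part of the original flow) that downloads the ZIP from the URL below and lists the airport station XML files it contains, mirroring the InvokeHTTP and RouteOnAttribute steps.

import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class WeatherZipCheck {
    public static void main(String[] args) throws Exception {
        // Same URL the InvokeHTTP processor uses (see "URL For All US Data" below).
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("https://w1.weather.gov/xml/current_obs/all_xml.zip"))
            .GET()
            .build();
        HttpResponse<InputStream> response =
            client.send(request, HttpResponse.BodyHandlers.ofInputStream());
        try (ZipInputStream zip = new ZipInputStream(response.body())) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                // Airport stations start with 'K', the same filter as the RouteOnAttribute step.
                if (entry.getName().startsWith("K")) {
                    System.out.println(entry.getName());
                }
            }
        }
    }
}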








URL For All US Data

invokehttp.request.url
https://w1.weather.gov/xml/current_obs/all_xml.zip



Example Record As Converted JSON

[ {
  "credit" : "NOAA's National Weather Service",
  "credit_URL" : "http://weather.gov/",
  "image" : {
    "url" : "http://weather.gov/images/xml_logo.gif",
    "title" : "NOAA's National Weather Service",
    "link" : "http://weather.gov"
  },
  "suggested_pickup" : "15 minutes after the hour",
  "suggested_pickup_period" : 60,
  "location" : "Stanley Municipal Airport, ND",
  "station_id" : "K08D",
  "latitude" : 48.3008,
  "longitude" : -102.4064,
  "observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",
  "observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",
  "weather" : "Fair",
  "temperature_string" : "66.0 F (19.0 C)",
  "temp_f" : 66.0,
  "temp_c" : 19.0,
  "relative_humidity" : 83,
  "wind_string" : "South at 6.9 MPH (6 KT)",
  "wind_dir" : "South",
  "wind_degrees" : 180,
  "wind_mph" : 6.9,
  "wind_kt" : 6,
  "pressure_in" : 30.03,
  "dewpoint_string" : "60.8 F (16.0 C)",
  "dewpoint_f" : 60.8,
  "dewpoint_c" : 16.0,
  "visibility_mi" : 10.0,
  "icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",
  "two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",
  "icon_url_name" : "skc.png",
  "ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",
  "disclaimer_url" : "http://weather.gov/disclaimer.html",
  "copyright_url" : "http://weather.gov/disclaimer.html",
  "privacy_policy_url" : "http://weather.gov/notice.html"
} ]


Source Code

Resources