Apache Flink REST and Metrics


After seeing Caito Scherr's amazing talk, I want to build some useful dashboards. My first step is exploring all the available APIs in my CSA/Flink environment. The easiest way to discover them was to turn on the Developer Console in Chrome while using the Flink Dashboard, which is a great dashboard in its own right. However, it is not focused on some of the key metrics customers are asking about, in a format that is easy for end users to read.


Some URLs I have been using:

Overview of Flink Cluster

http://FLINKCLUSTER:8078/overview

{"taskmanagers":1,"slots-total":2,"slots-available":1,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":3,"jobs-failed":0,"flink-version":"1.10.0-csa1.2.0.0","flink-commit":"664a5f5"}
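Since the goal is a dashboard, here is a minimal Python sketch (standard library only) that turns the /overview payload into a few dashboard tiles. The field names come from the sample response above. The summary keys, such as slot_utilization, are my own hypothetical choices and are not part of Flink's API.

```python
import json
import urllib.request


def fetch_json(url: str, timeout: float = 10.0):
    """GET a Flink REST endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)


def summarize_overview(overview: dict) -> dict:
    """Turn the /overview payload into dashboard-friendly numbers."""
    total = overview["slots-total"]
    used = total - overview["slots-available"]
    return {
        "taskmanagers": overview["taskmanagers"],
        "slots_used": used,
        "slots_total": total,
        # Guard against a cluster with no registered TaskManagers yet
        "slot_utilization": used / total if total else 0.0,
        "jobs_running": overview["jobs-running"],
        "jobs_failed": overview["jobs-failed"],
    }


# Usage (FLINKCLUSTER:8078 is the placeholder host used in this post):
#   summarize_overview(fetch_json("http://FLINKCLUSTER:8078/overview"))
```

With the sample response above, this reports 1 of 2 slots used (0.5 utilization) and one running job.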

Overview of All Flink Jobs

http://FLINKCLUSTER:8078/jobs/overview

{"jobs":[
{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"last-modification":1599576486876,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"faeb308856db337ce628af5fea24b895","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599674296089,"end-time":1599766705456,"duration":92409367,"last-modification":1599766705456,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"5d6ae4f72ab9fca3cea28ba6d4905ca7","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599576795487,"end-time":1599579517485,"duration":2721998,"last-modification":1599579517485,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"ec80a32b6ab59d96f649f5b3e493ec67","name":"Streaming WordCount","state":"FINISHED","start-time":1599571302659,"end-time":1599571318768,"duration":16109,"last-modification":1599571318768,"cluster":null,"tasks":{"total":5,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":5,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},
{"jid":"ad949e727a8c0267c9f2550c6a9b6000","name":"default: select * from itemprice","state":"CANCELED","start-time":1599676984684,"end-time":1599677004620,"duration":19936,"last-modification":1599677004620,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"8c1a6903b81e7b926b7105720e24aee8","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599576540243,"end-time":1599582887998,"duration":6347755,"last-modification":1599582887998,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"4cb8a5983b0bd3a14fe90618e17e2488","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599674323425,"end-time":1599676592438,"duration":2269013,"last-modification":1599676592438,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},
{"jid":"01988557ccd71cbab899ded9babab606","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"RUNNING","start-time":1599673893701,"end-time":-1,"duration":103791030,"last-modification":1599673903811,"cluster":{"url":"http://ec2-3-86-165-80.compute-1.amazonaws.com:8088/proxy/application_1599570933443_0003/","originalUrl":"http://ec2-3-86-165-80.compute-1.amazonaws.com:35981","id":"application_1599570933443_0003","hostAndPort":"ec2-3-86-165-80.compute-1.amazonaws.com:35981"},"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":1,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},
{"jid":"7c7932678f193f51c32cd3a2ebff6d59","name":"default: select * from itemprice","state":"CANCELED","start-time":1599573232967,"end-time":1599576425840,"duration":3192873,"last-modification":1599576425840,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}}
]}
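Here is a small sketch for a jobs tile, assuming the /jobs/overview shape shown above. It counts jobs per state and lists the running ones, converting their duration from milliseconds to hours. Once the JSON is decoded, the \n in SQL Client job names becomes a real newline, so the sketch keeps only the first line as a title. The function names are hypothetical.

```python
from collections import Counter


def job_state_counts(jobs_overview: dict) -> Counter:
    """Count jobs per state (RUNNING, CANCELED, FINISHED, ...)."""
    return Counter(job["state"] for job in jobs_overview["jobs"])


def running_jobs(jobs_overview: dict) -> list:
    """Return (jid, title, hours) tuples for jobs that are still RUNNING."""
    rows = []
    for job in jobs_overview["jobs"]:
        if job["state"] != "RUNNING":
            continue
        # SQL Client jobs are named after the full statement; keep its first line
        title = job["name"].splitlines()[0]
        # "duration" is reported in milliseconds
        rows.append((job["jid"], title, job["duration"] / 3_600_000))
    return rows
```

Against the sample above, job_state_counts gives 7 CANCELED, 1 FINISHED and 1 RUNNING. The running insert into krogerprices job shows roughly 28.8 hours.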

 

Flink Job Details

http://FLINKCLUSTER:8078/jobs/7c01884b74ff981a896307c4a06f2b15

{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","isStoppable":false,"state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"now":1599576486888,"timestamps":{"FAILING":0,"CANCELED":1599576486876,"RECONCILING":0,"SUSPENDED":0,"CREATED":1599576455857,"CANCELLING":1599576486855,"FAILED":0,"RESTARTING":0,"FINISHED":0,"RUNNING":1599576455905},"vertices":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","name":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","parallelism":1,"status":"CANCELED","start-time":1599576455941,"end-time":1599576486876,"duration":30935,"tasks":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"metrics":{"read-bytes":0,"read-bytes-complete":true,"write-bytes":0,"write-bytes-complete":true,"read-records":0,"read-records-complete":true,"write-records":0,"write-records-complete":true}}],"status-counts":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"plan":{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","nodes":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","optimizer_properties":{}}]}}
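The vertices array holds the per-operator numbers. This minimal sketch, assuming the field names in the sample above, flattens it into one row per vertex for a table widget. Trimming the chain name at the first " -> " is just my display choice. My understanding is that the *-complete flags indicate whether every subtask reported the metric.

```python
def vertex_summary(job_details: dict) -> list:
    """Flatten the vertices of a /jobs/<jid> payload into dashboard rows."""
    rows = []
    for v in job_details["vertices"]:
        m = v["metrics"]
        rows.append({
            # Operator chains are long; keep only the first operator in the chain
            "operator": v["name"].split(" -> ")[0],
            "status": v["status"],
            "parallelism": v["parallelism"],
            "read_records": m["read-records"],
            "write_records": m["write-records"],
            # True only if both record counters are flagged as complete
            "complete": m["read-records-complete"] and m["write-records-complete"],
        })
    return rows
```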

Flink Cluster Configuration
http://FLINKCLUSTER:8078/config
{"refresh-interval":10000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"1.10.0-csa1.2.0.0","flink-revision":"664a5f5 @ 29.04.2020 @ 14:13:26 UTC","features":{"web-submit":false,"global-dashboard":true}}

Jobs - Specific Job - Checkpoints
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobs/01988557ccd71cbab899ded9babab606/checkpoints
{"counts":{"restored":0,"total":0,"in_progress":0,"completed":0,"failed":0},"summary":{"state_size":{"min":0,"max":0,"avg":0},"end_to_end_duration":{"min":0,"max":0,"avg":0},"alignment_buffered":{"min":0,"max":0,"avg":0}},"latest":{"completed":null,"savepoint":null,"failed":null,"restored":null},"history":[]}
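The running job in this sample reports zero checkpoints of any kind. This minimal sketch turns the checkpoints payload into a health tile. The needs_attention rule (no completed checkpoints, or any failed ones) is my own hypothetical heuristic and is not something Flink defines.

```python
def checkpoint_health(stats: dict) -> dict:
    """Summarize a /jobs/<jid>/checkpoints payload for a dashboard tile."""
    counts = stats["counts"]
    latest = stats["latest"]["completed"]  # null until a checkpoint completes
    return {
        "completed": counts["completed"],
        "failed": counts["failed"],
        "in_progress": counts["in_progress"],
        "max_end_to_end_ms": stats["summary"]["end_to_end_duration"]["max"],
        "latest_completed_id": latest["id"] if latest else None,
        # Hypothetical rule: nothing to recover from, or checkpoints are failing
        "needs_attention": counts["completed"] == 0 or counts["failed"] > 0,
    }
```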

Job Manager Configuration
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/config
[{"key":"yarn.flink-dist-jar","value":"file:/opt/cloudera/parcels/FLINK-1.10.0-csa1.2.0.0-cdh7.1.1.0-326-2901802/lib/flink/lib/flink-dist_2.11-1.10.0-csa1.2.0.0.jar"},{"key":"high-availability.cluster-id","value":"application_1599570933443_0003"},{"key":"jobmanager.rpc.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"high-availability.storageDir","value":"hdfs:///user/flink/ha"},{"key":"state.backend.rocksdb.timer-service.factory","value":"ROCKSDB"},{"key":"io.tmp.dirs","value":"/yarn/nm/usercache/root/appcache/application_1599570933443_0003"},{"key":"historyserver.cli.fallback","value":"true"},{"key":"parallelism.default","value":"1"},{"key":"execution.buffer-timeout","value":"100"},{"key":"jobmanager.heap.size","value":"1073741824"},{"key":"execution.checkpointing.mode","value":"EXACTLY_ONCE"},{"key":"taskmanager.memory.process.size","value":"2 gb"},{"key":"web.port","value":"0"},{"key":"state.backend.local-recovery","value":"true"},{"key":"state.backend.rocksdb.memory.managed","value":"true"},{"key":"yarn.tags","value":"flink"},{"key":"state.backend.incremental","value":"true"},{"key":"taskmanager.memory.network.fraction","value":"0.1"},{"key":"yarn.container-start-command-template","value":"%java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%"},{"key":"web.tmpdir","value":"/tmp/flink-web-37be6c08-f457-468a-9c4b-5fde92c042b1"},{"key":"jobmanager.rpc.port","value":"33566"},{"key":"execution.checkpointing.timeout","value":"60000"},{"key":"high-availability.zookeeper.quorum","value":"ec2-3-86-165-80.compute-1.amazonaws.com:2181"},{"key":"taskmanager.memory.managed.fraction","value":"0.4"},{"key":"rest.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.backend","value":"FILESYSTEM"},{"key":"logging.configuration.file","value":"/etc/flink/conf/log4j.properties"},{"key":"execution.checkpointing.max-concurrent-checkpoints","value":"1"},{"key":"high-availability.zookeeper.client.acl","value":"open"},{"key":"historyserver.web.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.checkpoints.num-retained","value":"3"},{"key":"historyserver.web.port","value":"8078"},{"key":"pipeline.auto-watermark-interval","value":"200"},{"key":"state.savepoints.dir","value":"hdfs:///user/flink/savepoints"},{"key":"pipeline.generic-types","value":"true"},{"key":"yarn.maximum-failed-containers","value":"100"},{"key":"yarn.application-attempts","value":"5"},{"key":"taskmanager.numberOfTaskSlots","value":"2"},{"key":"state.backend.rocksdb.memory.write-buffer-ratio","value":"0.5"},{"key":"jobmanager.archive.fs.dir","value":"hdfs:///user/flink/applicationHistory"},{"key":"execution.target","value":"yarn-per-job"},{"key":"pipeline.object-reuse","value":"false"},{"key":"internal.io.tmpdirs.use-local-default","value":"true"},{"key":"state.backend.rocksdb.memory.high-prio-pool-ratio","value":"0.1"},{"key":"taskmanager.memory.network.max","value":"2147483648"},{"key":"execution.attached","value":"false"},{"key":"internal.cluster.execution-mode","value":"NORMAL"},{"key":"execution.checkpointing.externalized-checkpoint-retention","value":"RETAIN_ON_CANCELLATION"},{"key":"high-availability","value":"ZOOKEEPER"},{"key":"execution.checkpointing.min-pause","value":"0"},{"key":"execution.checkpointing.snapshot-compression","value":"false"},{"key":"state.checkpoints.dir","value":"hdfs:///user/flink/checkpoints"}]
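The JobManager configuration comes back as a list of key/value pairs, which is awkward to query. This sketch converts the list to a dict and picks out the checkpoint-related settings. In the sample above, execution.checkpointing.mode is set to EXACTLY_ONCE, but there is no execution.checkpointing.interval entry. That may explain why the checkpoint counts are all zero, though a job can also enable checkpointing in code, so treat it as a hint rather than proof. The CHECKPOINT_KEYS selection is my own.

```python
CHECKPOINT_KEYS = (
    "execution.checkpointing.mode",
    "execution.checkpointing.interval",
    "execution.checkpointing.timeout",
    "state.backend",
    "state.checkpoints.dir",
)


def config_to_dict(entries: list) -> dict:
    """Turn the [{"key": ..., "value": ...}] list from /jobmanager/config into a dict."""
    return {e["key"]: e["value"] for e in entries}


def checkpoint_settings(entries: list) -> dict:
    """Pick the checkpoint-related settings; missing keys come back as None."""
    conf = config_to_dict(entries)
    return {k: conf.get(k) for k in CHECKPOINT_KEYS}
```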

Job Manager - Log
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/log
2020-09-11 13:37:42,586 INFO  org.apache.flink.yarn.YarnResourceManager                     - Disconnect job manager 9c2e3f25dae1d548dc730941d6484cbb@akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:33566/user/jobmanager_7 for job 67a2de8fb291333bbd90b334f8f83def from the resource manager.
2020-09-11 13:37:42,586 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/67a2de8fb291333bbd90b334f8f83def/job_manager_lock'}.
2020-09-11 13:37:42,591 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore    - Removed job graph 67a2de8fb291333bbd90b334f8f83def from ZooKeeper.

Job Manager - Standard Out
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/stdout

Task Managers
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers
{"taskmanagers":[{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832789681,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939}}]}

Task Managers - Container
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002

{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832809720,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939},"metrics":{"heapUsed":200512944,"heapCommitted":664272896,"heapMax":664272896,"nonHeapUsed":145389616,"nonHeapCommitted":149569536,"nonHeapMax":780140544,"directCount":5111,"directUsed":168414439,"directMax":168414435,"mappedCount":0,"mappedUsed":0,"mappedMax":0,"memorySegmentsAvailable":5079,"memorySegmentsTotal":5079,"garbageCollectors":[{"name":"PS_Scavenge","count":91,"time":453},{"name":"PS_MarkSweep","count":4,"time":358}]}}
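For a per-TaskManager memory tile, this sketch derives heap usage, total garbage collection count and time, and network buffer segments in use from the metrics object above. It assumes that GC times are in milliseconds and that memorySegmentsAvailable equal to memorySegmentsTotal means no network buffers are in use. Check the Flink REST API docs for your version.

```python
def taskmanager_health(tm: dict) -> dict:
    """Compute memory and GC figures from a /taskmanagers/<id> payload."""
    m = tm["metrics"]
    gcs = m["garbageCollectors"]
    return {
        "id": tm["id"],
        "free_slots": tm["freeSlots"],
        "heap_used_ratio": m["heapUsed"] / m["heapMax"],
        "gc_count": sum(gc["count"] for gc in gcs),
        # Assumed to be milliseconds summed across all collectors
        "gc_time_ms": sum(gc["time"] for gc in gcs),
        # All segments available would mean no network buffers are in use
        "network_segments_in_use": m["memorySegmentsTotal"] - m["memorySegmentsAvailable"],
    }
```

For the container above, that works out to about 30% of heap in use, 95 collections totalling 811 ms, and 0 segments in use.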


Task Managers - Container - Log

http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/log


2020-09-09 17:58:17,456 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic krogerprices
2020-09-09 17:58:17,465 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 has no restore state.
2020-09-09 17:58:17,475 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 

Task Managers - Container - Standard Out
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/stdout



Talks

For the definitive way to read and use these, see Caito's awesome talk here:

https://www.youtube.com/watch?v=MQQ7qaKKKc8


Command Line (https://docs.cloudera.com/csa/1.2.0/job-lifecycle/topics/csa-supported-cli.html)


flink list

20/09/11 13:45:34 INFO cli.CliFrontend: Successfully retrieved list of jobs

------------------ Running/Restarting Jobs -------------------
09.09.2020 17:51:33 : 01988557ccd71cbab899ded9babab606 : default: insert into krogerprices
select upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid
from itemprice
where originstore = 'kroger' (RUNNING)
--------------------------------------------------------------









Apache NiFi 1.12 Released! 18-August-2020

Release Notes

https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.12.0

Issues

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12346778

Major Feature List

https://twitter.com/pvillard31/status/1296469452180119553


Release Date: August 18, 2020. 

Major Features:

  • New processor to write scripted record transforms live in the flow (ScriptedTransformRecord)
  • Expose a REST Endpoint for easy metric scraping by Prometheus
  • Ability to specify group-level FlowFile concurrency, for instance to run a single FlowFile end to end for traditional job handling
  • Improved several capabilities related to Azure service interaction including ADLS Gen2 
  • Improved AMQP and MQTT support as well as JMS improvements
  • Support for latest Kafka 2.6 clients
  • Search UI Improvements
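For the Prometheus endpoint, I believe NiFi 1.12 serves the text exposition format at /nifi-api/flow/metrics/prometheus. Verify the path against the REST API docs for your version. If you want to look at the output without a Prometheus server, a minimal parser like this sketch is enough. It handles only the common name{labels} value lines and does not support braces inside label values, so treat it as a debugging aid rather than a full implementation.

```python
import re

# One sample line of the Prometheus text format:  name{label="value",...} 42.0
_SAMPLE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(\{[^}]*\})?\s+(\S+)')
_LABEL = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_prometheus(text: str) -> list:
    """Return (name, labels, value) tuples, skipping # HELP / # TYPE comments."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = _SAMPLE.match(line)
        if not m:
            continue  # ignore anything this simplified parser does not understand
        name, raw_labels, value = m.groups()
        labels = dict(_LABEL.findall(raw_labels or ""))
        samples.append((name, labels, float(value)))
    return samples
```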

I will be posting a few demos and test drives soon.


ScriptedTransformRecord


https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.12.0/org.apache.nifi.processors.script.ScriptedTransformRecord/index.html


Deleting Schemas From Cloudera Schema Registry


It is very easy to delete schemas from Cloudera Schema Registry if you need to. I recommend downloading them first so you have a backup.

Let's look at our schema


Well let's get rid of that junk.

Here is the documentation for CDF Data Hub in CDP Public Cloud.



Example

curl -X DELETE "http://MYSERVERHASACOOLNAME.DEV:7788/api/v1/schemaregistry/schemas/junk" -H "accept: application/json"

Where junk is the name of my schema.
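Following my own advice to back up first, here is a minimal Python sketch that saves every version of a schema to a local JSON file and then sends the same DELETE as the curl command. It assumes that GET .../schemas/{name}/versions returns all versions of the schema. Verify that path in your Schema Registry's API documentation, and add Knox or other authentication where your cluster requires it. The helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request


def schema_url(base_url: str, schema_name: str, suffix: str = "") -> str:
    """Build a Schema Registry v1 URL for one schema, quoting the name safely."""
    name = urllib.parse.quote(schema_name, safe="")
    return f"{base_url.rstrip('/')}/api/v1/schemaregistry/schemas/{name}{suffix}"


def backup_request(base_url: str, schema_name: str) -> urllib.request.Request:
    # Assumption: GET .../schemas/{name}/versions lists every version of the schema
    return urllib.request.Request(schema_url(base_url, schema_name, "/versions"),
                                  headers={"Accept": "application/json"}, method="GET")


def delete_request(base_url: str, schema_name: str) -> urllib.request.Request:
    # Same call as the curl example: DELETE .../schemas/{name}
    return urllib.request.Request(schema_url(base_url, schema_name),
                                  headers={"Accept": "application/json"}, method="DELETE")


def backup_then_delete(base_url: str, schema_name: str, backup_path: str) -> int:
    """Save all versions to a local JSON file, then delete the schema."""
    with urllib.request.urlopen(backup_request(base_url, schema_name)) as resp:
        versions = json.load(resp)
    with open(backup_path, "w", encoding="utf-8") as f:
        json.dump(versions, f, indent=2)
    # Only reached if the backup succeeded (urlopen raises on HTTP errors)
    with urllib.request.urlopen(delete_request(base_url, schema_name)) as resp:
        return resp.status
```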





You could call this REST API from NiFi, a DevOps tool or just a simple curl command like the one above.

Knox and other security layers may apply.


Did the user really ask for Exactly Once? Fault Tolerance

Exactly Once Requirements

Exactly-once is very tricky and can cause performance degradation. If your users can live with at-least-once, always go with that. Having data sinks like Kudu, where you can do an upsert, makes exactly-once less necessary, since a replayed record just overwrites the row it already wrote.
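To see why upserts reduce the need for exactly-once, here is a toy Python simulation (not Kudu or Flink code). A failure causes some records to be delivered twice. An append-only sink ends up with duplicates, while a sink that upserts by primary key ends up with the same rows as a clean run. The upc key and the sample rows are hypothetical.

```python
def at_least_once(records: list, replay_from: int) -> list:
    """Simulate at-least-once delivery: a failure replays records from an earlier offset."""
    return records + records[replay_from:]


def append_sink(deliveries: list) -> list:
    """Append-only sink: every delivery becomes a row, so replays duplicate data."""
    return list(deliveries)


def upsert_sink(deliveries: list, key: str) -> dict:
    """Upsert sink (Kudu-style UPSERT): rows are keyed, so a replay overwrites itself."""
    table = {}
    for row in deliveries:
        table[row[key]] = row
    return table
```

This relies on a stable primary key and deterministic values. A key generated per attempt, such as a fresh random UUID, would give every replayed record a new row.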

https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

Apache Flink, Apache NiFi Stateless and Apache Kafka can all participate in exactly-once processing.

For CDF Stream Processing and Analytics with Apache Flink 1.10 Streaming:

Both Kafka sources and sinks can be used with exactly once processing guarantees when checkpointing is enabled.


End-to-End Guaranteed Exactly-Once Record Delivery

The data source and data sink both need to support exactly-once semantics and take part in checkpointing.


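Data Sources
  • Apache Kafka - consumer offsets are stored in Flink's checkpointed state, so checkpointing must be enabled.


Data Sinks
  • HDFS BucketingSink
  • Apache Kafka - must have exactly-once selected (Semantic.EXACTLY_ONCE on the FlinkKafkaProducer), transactions enabled and a Kafka client that supports transactions (0.11 or later).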
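To make "take part in checkpointing" concrete, here is a toy Python model of the two-phase commit idea that Flink's TwoPhaseCommitSinkFunction implements and that FlinkKafkaProducer uses for Semantic.EXACTLY_ONCE. It is a simplified sketch, not Flink's API. Writes go into an open transaction, which is pre-committed when a checkpoint barrier arrives and committed when the checkpoint completes. On failure, uncommitted data is aborted and the source rewinds to the offset stored in the last checkpoint. The class and function names are hypothetical.

```python
class TransactionalSink:
    """Toy sink: records become visible only when their checkpoint commits."""

    def __init__(self):
        self.pending = []    # open transaction since the last checkpoint barrier
        self.staged = {}     # checkpoint id -> pre-committed records
        self.committed = []  # what downstream readers can see

    def write(self, record):
        self.pending.append(record)

    def pre_commit(self, checkpoint_id):
        # Barrier reached: close the open transaction and start a new one
        self.staged[checkpoint_id] = self.pending
        self.pending = []

    def commit(self, checkpoint_id):
        # Checkpoint confirmed: expose the records it covers
        self.committed.extend(self.staged.pop(checkpoint_id))

    def abort(self):
        # Failure: everything not yet committed is thrown away
        self.pending = []
        self.staged.clear()


def run(records, every=2, fail_at=None):
    """Checkpoint every `every` records; optionally fail once after `fail_at` records."""
    sink = TransactionalSink()
    restored_offset = 0  # source offset saved in the last completed checkpoint
    checkpoint_id = 0
    pos = 0
    failed = False
    while pos < len(records):
        sink.write(records[pos])
        pos += 1
        if fail_at is not None and not failed and pos == fail_at:
            failed = True
            sink.abort()
            pos = restored_offset  # rewind the source and replay
            continue
        if pos % every == 0 or pos == len(records):
            checkpoint_id += 1
            sink.pre_commit(checkpoint_id)
            restored_offset = pos  # offset and transaction belong to the same checkpoint
            # In Flink the commit waits for the checkpoint-complete notification;
            # here it follows immediately to keep the toy short.
            sink.commit(checkpoint_id)
    return sink.committed
```

Even with the failure, each record is committed exactly once. If abort() kept the pending records, the replay after the failure in run(list("abcde"), fail_at=3) would commit c twice. Downstream Kafka consumers only get this benefit when they read with isolation.level=read_committed.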



Reference


FLaNK in the Cloud!!!! Huge Cloudera Data Platform Public Cloud Updates - July 2020 - Data Flow Releases



With the promotion of Cloud Runtime 7.2.1 to Public Cloud, the CDF team is pleased to announce three key updates that were also promoted to production today and are available to customers. These are:


Let's see what's new in 7.2.1!


Flow (GA) - 2.0.3 is optimized for public cloud!



Streams (GA) - Now with Apache Kafka v2.5!


Streaming Analytics (TP) - v1.2.1, powered by Apache Flink 1.10, is in Technical Preview in CDP Public Cloud.


Start with Streaming Analytics Light Duty

Using Kudu with Flink

Using HBase with Flink

Using Kafka with Flink

General CSA Flink Docs

  • Data source reading from Kafka
  • Data sinks writing to Kafka, HBase and Kudu
  • Apache Atlas integration
  • SQL/Table API and SQL Client
  • Table connectors 
    • Kafka
    • Kudu
    • Hive (through catalog)
We can now run the FLaNK Stack in the Public Cloud automagically!

Sizing Your Apache NiFi Cluster For Production Workloads

Cloudera Flow Management provides an enterprise-supported edition of Apache NiFi managed by Cloudera Manager. The official documentation provides a great guide for sizing your cluster.

https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-nifi-sizing.html

If the use case fits, the NiFi Stateless engine may perform better, since it runs without disk-based repositories.

Check your heap usage and utilization; you may need to increase the heap. 24-32 GB of RAM is a nice sweet spot for most instances.



Check how your nodes, threads and queues are doing. If queues are not draining fast enough or thread counts are high, you may need more cores, RAM or nodes.
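One way to keep an eye on threads and queues is the controller status in the NiFi REST API. This sketch assumes that GET /nifi-api/flow/status returns a controllerStatus object with activeThreadCount, flowFilesQueued and bytesQueued; check the REST API docs for your version. Pass your Maximum Timer Driven Thread Count setting as max_threads. The queue_alert threshold is hypothetical and is something you would tune yourself.

```python
import json
import urllib.request


def fetch_controller_status(nifi_url: str, token: str = "") -> dict:
    """GET /nifi-api/flow/status; pass a bearer token on secured clusters."""
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    req = urllib.request.Request(f"{nifi_url.rstrip('/')}/nifi-api/flow/status", headers=headers)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def controller_summary(status: dict, max_threads: int, queue_alert: int) -> dict:
    """Reduce the controller status to the numbers worth watching when sizing."""
    cs = status["controllerStatus"]
    active = cs["activeThreadCount"]
    queued = cs["flowFilesQueued"]
    return {
        "active_threads": active,
        # Compare against your Maximum Timer Driven Thread Count setting
        "thread_utilization": active / max_threads if max_threads else 0.0,
        "flowfiles_queued": queued,
        "bytes_queued": cs["bytesQueued"],
        # queue_alert is a threshold you choose; there is no universal value
        "queue_alert": queued >= queue_alert,
    }
```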



When you are managing your cluster in Cloudera Manager, make sure you increase the default JVM memory for Apache NiFi.  512MB is not going to cut it for anything but single user development.



Do this correctly and you can process a billion events!!! See https://blog.cloudera.com/benchmarking-nifi-performance-and-scalability/ and notice the hardware and performance sections of that article.


General tips:

Make sure you use SSD for Provenance and other repositories.  Faster disk, happier user. https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-disk-configuration.html

Monitor your flows to see how many resources you need: https://www.datainmotion.dev/2020/07/report-on-this-apache-nifi-1114-monitor.html.


Use Records. If your data is semi-structured, GrokReader can help: https://www.nifi.rocks/record-path-cheat-sheet/. If it's CSV, JSON, XML, Parquet or logs, use record readers and writers. They are much faster, easier and cleaner.



Minimize use of CPU- or memory-intensive processors (or make a note of them during sizing): https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-resource-intensive-processors.html

There are a few decisions to make about repositories; talk to your Cloudera friends. https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.1/nifi-configuration-best-practices/content/configuration-best-practices.html



Report on This: Apache NiFi 1.11.4 - Monitor All The Things

The easiest way to grab monitoring data is via the NiFi REST API. Everything in the NiFi UI is done through REST calls, which you can also make programmatically. Please read the NiFi docs, which are linked directly from your running NiFi application and are also available on the web. They are very thorough and have all the information you could want: https://nifi.apache.org/docs/nifi-docs/. If you are not running NiFi 1.11.4, I recommend upgrading. It is supported by Cloudera on multiple platforms.

 

NiFi Rest API

https://nifi.apache.org/docs/nifi-docs/rest-api/

 

There's also an awesome Python wrapper for that REST API:  https://pypi.org/project/nipyapi/

 

Also, in NiFi flow programming, every time you produce data to Kafka you get metadata back in FlowFile attributes. You can push those attributes directly to a Kafka topic if you want.

 

So after PublishKafkaRecord_2_0 (1.11.4), take the success relationship, read the attributes for the number of records and other metadata, convert them with AttributesToJSON and push them to another topic. You may want a MergeRecord in there to aggregate a few of those together.
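Here is a rough Python illustration of that flow, not NiFi code. It keeps a few attributes as a flat JSON document, which is what AttributesToJSON does with an Attributes List, and then rolls a merged batch up into totals. uuid and filename are core FlowFile attributes. msg.count is the attribute I understand PublishKafkaRecord writes on success, so check the processor documentation for your version. The function names are hypothetical.

```python
import json

# uuid and filename are core FlowFile attributes; msg.count is assumed to be
# written by PublishKafkaRecord on success (verify for your processor version).
KEEP = ("msg.count", "filename", "uuid")


def attributes_to_json(attributes: dict, keep=KEEP) -> str:
    """Rough stand-in for AttributesToJSON with an Attributes List: a flat JSON object."""
    return json.dumps({k: attributes[k] for k in keep if k in attributes}, sort_keys=True)


def summarize_batch(json_docs: list) -> dict:
    """Roll up a merged batch of those documents into totals for a dashboard."""
    docs = [json.loads(d) for d in json_docs]
    return {
        "flowfiles": len(docs),
        # Attribute values are strings, so convert before summing
        "messages": sum(int(d.get("msg.count", 0)) for d in docs),
    }
```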

 

If you are interested in Kafka metrics, record counts or monitoring, then you must use Cloudera Streams Messaging Manager. It provides a full web UI, monitoring tool, alerts, REST API and everything you need for monitoring every producer, consumer, broker, cluster, topic, message, offset and Kafka component.

 

The best way to get NiFi stats is to use NiFi Reporting Tasks; I like the SQL Reporting Task.

 

SQL Reporting Tasks are very powerful and use standard SELECT * FROM JVM_METRICS style queries; see my article:

https://www.datainmotion.dev/2020/04/sql-reporting-task-for-cloudera-flow.html

 

Monitoring Articles

 

https://www.datainmotion.dev/2019/04/monitoring-number-of-of-flow-files.html

https://www.datainmotion.dev/2019/03/apache-nifi-operations-and-monitoring.html

 

Other Resources

https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_9.html

https://www.datainmotion.dev/2019/08/using-cloudera-streams-messaging.html

https://dev.to/tspannhw/apache-nifi-and-nifi-registry-administration-3c92

https://dev.to/tspannhw/using-nifi-cli-to-restore-nifi-flows-from-backups-18p9

https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html

https://www.datainmotion.dev/p/links.html

https://www.tutorialspoint.com/apache_nifi/apache_nifi_monitoring.htm

https://community.cloudera.com/t5/Community-Articles/Building-a-Custom-Apache-NiFi-Operations-Dashboard-Part-1/ta-p/249060

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-metrics-reporting-nar/1.11.4/org.apache.nifi.metrics.reporting.task.MetricsReportingTask/

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.11.4/org.apache.nifi.reporting.script.ScriptedReportingTask/index.html

 

Ingesting All The Weather Data With Apache NiFi




Step By Step NiFi Flow

  1. GenerateFlowFile - build a schedule matching when NOAA updates weather
  2. InvokeHTTP - download all weather ZIP
  3. CompressContent - decompress ZIP
  4. UnpackContent - extract files from ZIP
  5. *RouteOnAttribute - just give us the ones that are airports (${filename:startsWith('K')}). Optional.
  6. *QueryRecord - XMLReader to JsonRecordSetWriter. Query: SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'. This removes some locations that are not identified. Optional.
  7. Send it somewhere for storage. You could use PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHBaseRecord, PutDatabaseRecord, PublishKafkaRecord2* or others.
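To test outside NiFi, the same steps can be sketched in Python using only the standard library. This is an illustration, not the flow itself. zipfile covers both the decompress and unpack steps, the .xml filename check is my addition, and values stay as strings, whereas NiFi's XMLReader with an inferred schema produces the typed numbers shown in the sample record below. The function names are hypothetical.

```python
import io
import json
import urllib.request
import xml.etree.ElementTree as ET
import zipfile

ALL_OBS_URL = "https://w1.weather.gov/xml/current_obs/all_xml.zip"


def download(url: str = ALL_OBS_URL) -> bytes:
    """Step 2 (InvokeHTTP): fetch the ZIP with every current observation."""
    with urllib.request.urlopen(url, timeout=120) as resp:
        return resp.read()


def element_to_dict(elem: ET.Element):
    """Convert XML to nested dicts; leaf elements become their text."""
    children = list(elem)
    if not children:
        return (elem.text or "").strip()
    return {child.tag: element_to_dict(child) for child in children}


def airport_observations(zip_bytes: bytes) -> list:
    """Steps 3-6: unpack, keep K* files, parse the XML, drop Unknown locations."""
    records = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            base = name.rsplit("/", 1)[-1]
            # RouteOnAttribute: ${filename:startsWith('K')} (the .xml check is an addition)
            if not (base.startswith("K") and base.endswith(".xml")):
                continue
            record = element_to_dict(ET.fromstring(zf.read(name)))
            # QueryRecord: WHERE NOT location LIKE '%Unknown%'
            if not isinstance(record, dict) or "Unknown" in record.get("location", ""):
                continue
            records.append(record)
    return records


def to_json(records: list) -> str:
    """Step 7 stand-in: serialize the records as a JSON array (values stay strings)."""
    return json.dumps(records, indent=2)


# Usage: print(to_json(airport_observations(download())))
```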








URL For All US Data

invokehttp.request.url
https://w1.weather.gov/xml/current_obs/all_xml.zip



Example Record As Converted JSON

[ {
  "credit" : "NOAA's National Weather Service",
  "credit_URL" : "http://weather.gov/",
  "image" : {
    "url" : "http://weather.gov/images/xml_logo.gif",
    "title" : "NOAA's National Weather Service",
    "link" : "http://weather.gov"
  },
  "suggested_pickup" : "15 minutes after the hour",
  "suggested_pickup_period" : 60,
  "location" : "Stanley Municipal Airport, ND",
  "station_id" : "K08D",
  "latitude" : 48.3008,
  "longitude" : -102.4064,
  "observation_time" : "Last Updated on Jul 10 2020, 9:55 am CDT",
  "observation_time_rfc822" : "Fri, 10 Jul 2020 09:55:00 -0500",
  "weather" : "Fair",
  "temperature_string" : "66.0 F (19.0 C)",
  "temp_f" : 66.0,
  "temp_c" : 19.0,
  "relative_humidity" : 83,
  "wind_string" : "South at 6.9 MPH (6 KT)",
  "wind_dir" : "South",
  "wind_degrees" : 180,
  "wind_mph" : 6.9,
  "wind_kt" : 6,
  "pressure_in" : 30.03,
  "dewpoint_string" : "60.8 F (16.0 C)",
  "dewpoint_f" : 60.8,
  "dewpoint_c" : 16.0,
  "visibility_mi" : 10.0,
  "icon_url_base" : "http://forecast.weather.gov/images/wtf/small/",
  "two_day_history_url" : "http://www.weather.gov/data/obhistory/K08D.html",
  "icon_url_name" : "skc.png",
  "ob_url" : "http://www.weather.gov/data/METAR/K08D.1.txt",
  "disclaimer_url" : "http://weather.gov/disclaimer.html",
  "copyright_url" : "http://weather.gov/disclaimer.html",
  "privacy_policy_url" : "http://weather.gov/notice.html"
} ]


Source Code

Resources

Using Cloudera Data Platform with Flow Management and Streams on Azure


Today I am going to be walking you through using Cloudera Data Platform (CDP) with Flow Management and Streams on Azure Cloud.  To see a streaming demo video, please join my webinar (or see it on demand) at Streaming Data Pipelines with CDF in Azure.  I'll share some additional how-to videos on using Apache NiFi and Apache Kafka in Azure very soon.   



Apache NiFi on Azure CDP Data Hub
Sensors to ADLS/HDFS and Kafka




In the process group above, we use QueryRecord to segment JSON records and pick only the ones where the temperature in Fahrenheit is over 80 degrees. Then we pick out a few attributes from the record to display and send them to a Slack channel.
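The QueryRecord filter and field selection can be sketched in Python for quick testing. The query itself isn't shown in this post, so the temp_f field name, the 80-degree default and the fields posted to Slack are assumptions. In NiFi, the equivalent would be a query along the lines of SELECT * FROM FLOWFILE WHERE temp_f > 80. The function names are hypothetical.

```python
def hot_readings(records: list, threshold_f: float = 80.0) -> list:
    """Keep readings hotter than the threshold; records without temp_f are skipped."""
    # float("nan") > threshold is always False, so missing values drop out
    return [r for r in records if float(r.get("temp_f", "nan")) > threshold_f]


def slack_text(record: dict, fields=("location", "temp_f", "relative_humidity")) -> str:
    """Pick a few fields to show in the Slack channel, skipping absent ones."""
    return ", ".join(f"{f}={record[f]}" for f in fields if f in record)
```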

To become a Kafka producer, you set a Record Reader for the incoming type (JSON in my case) and then a Record Writer for the type to send to the sensors topic. In this case we kept it as JSON, but we could convert to Avro. I usually do that if I am going to be reading it with Cloudera Kafka Connect.



Our security is automagic and requires little for you to do in NiFi. I put in my username and password from CDP. The SSL context is set up for me when I create my Data Hub.


When I am writing to our Real-Time Data Mart (Apache Kudu), I enter the Kudu servers that I copied from the Kudu Data Mart Hardware page, and put in my table name and my login info. I recommend the UPSERT operation with a JSON Record Reader.


For real use cases, you will need to spin up:

Public Cloud Data Hubs:
  • Streams Messaging Heavy Duty for AWS
  • Streams Messaging Heavy Duty for Azure
  • Flow Management Heavy Duty for AWS
  • Flow Management Heavy Duty for Azure
Software:
  • Apache Kafka 2.4.1
  • Cloudera Schema Registry 0.8.1
  • Cloudera Streams Messaging Manager 2.1.0
  • Apache NiFi 1.11.4
  • Apache NiFi Registry 0.5.0
Demo Source Code:


Let's configure our Data Hubs in CDP in an Azure environment. It takes a few clicks and some naming, and then it builds.












Under the Azure Portal


In Azure, we can examine the files we uploaded to the Azure object store.





Under the Data Lake SDX


NiFi and Kafka are autoconfigured to work with Apache Atlas under our environment's Data Lake SDX. We can browse through the lineage for all the Kafka topics we use.






We can also see the flow for NiFi, HDFS and Kudu.

SMM

We can examine all of our Kafka infrastructure from Kafka Brokers, Topics, Consumers, Producers, Latency and Messages.  We can also create and update topics.




Cloudera Manager

We still have access to all of our traditional items like Cloudera Manager to manage configuration of servers.



Under Real-Time Data Mart

We can view tables, create tables and query our tables. Apache Hue is a great tool for accessing data in my Real-Time Data Mart in a Data Hub.



We can also look at table details in the Impala UI.


References
©2020 Timothy Spann