DevOps: Working with Parameter Contexts in Apache NiFi 1.11.4+


nifi list-param-contexts -u http://localhost:8080 -ot simple


#   Id                                     Name             Description
-   ------------------------------------   --------------   -----------
1   3a801ff4-1f73-1836-b59c-b9fbc79ab030   backupregistry
2   7184b9f4-0171-1000-4627-967e118f3037   health
3   3a801faf-1f87-1836-54ba-3d913fa223ad   retail
4   3a801fde-1f73-1836-957b-a9f4d2c9b73d   sensors


#> nifi export-param-context -u http://localhost:8080 -verbose --paramContextId 3a801faf-1f87-1836-54ba-3d913fa223ad


{
  "name" : "retail",
  "description" : "",
  "parameters" : [ {
    "parameter" : {
      "name" : "allquery",
      "description" : "",
      "sensitive" : false,
      "value" : "SELECT * FROM FLOWFILE"
    }
  }, {
    "parameter" : {
      "name" : "allrecordssql",
      "description" : "",
      "sensitive" : false,
      "value" : "SELECT * FROM FLOWFILE"
    }
  }, {
    "parameter" : {
      "name" : "energytopic",
      "description" : "",
      "sensitive" : false,
      "value" : "energy"
    }
  }, {
    "parameter" : {
      "name" : "importantsql",
      "description" : "",
      "sensitive" : false,
      "value" : "SELECT * FROM FLOWFILE\nWHERE kernel_logs like '%SIGKILL%'"
    }
  }, {
    "parameter" : {
      "name" : "itempricetable",
      "description" : "",
      "sensitive" : false,
      "value" : "impala::default.itemprice"
    }
  }, {
    "parameter" : {
      "name" : "itsgettingHotInHere",
      "description" : "",
      "sensitive" : false,
      "value" : "SELECT * FROM\nFLOWFILE\nWHERE CAST (temp_f as DOUBLE) > 80\nAND UPPER(location) LIKE '%NJ%'"
    }
  },





You can now move that JSON to another server and import it with nifi import-param-context.
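For example, on the target server (a sketch; the -i/--input flag is assumed from the toolkit CLI help, and the hostname is a placeholder):

# Import the exported context into another NiFi instance
bin/cli.sh nifi import-param-context -u http://otherserver:8080 -i retail.json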


 bin/cli.sh nifi list-param-contexts -u http://localhost:8080 -ot json

Use -ot simple for a simple table listing.

 bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId 3a801ff4-1f73-1836-b59c-b9fbc79ab030 -ot json -o backupregistry.json

Example Shell Script

/Users/tspann/Downloads/nifi-toolkit-1.12.0/bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId a13e3764-134c-16f0-7c35-312b7ee4b182 -ot json -o financial.json
/Users/tspann/Downloads/nifi-toolkit-1.12.0/bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId 7184b9f4-0171-1000-4627-967e118f3037 -ot json -o health.json
/Users/tspann/Downloads/nifi-toolkit-1.12.0/bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId 3a801faf-1f87-1836-54ba-3d913fa223ad -ot json -o retail.json
/Users/tspann/Downloads/nifi-toolkit-1.12.0/bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId 3a801fde-1f73-1836-957b-a9f4d2c9b73d -ot json -o sensors.json
/Users/tspann/Downloads/nifi-toolkit-1.12.0/bin/cli.sh nifi export-param-context -u http://localhost:8080  --paramContextId 3a801ff4-1f73-1836-b59c-b9fbc79ab030 -ot json -o backupregistry.json

Reference

http://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI


https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.1/versioning-a-dataflow/content/parameters-in-versioned-flows.html


Using Google Forms as a Data Source for NiFi Flows


Set up a Google Developers Account



Use or Create an API Key For Sheets at Developer Console


For your Google Sheet, if you are not using OAuth, you need to make the sheet visible via URL, or you will face PERMISSION_DENIED errors.


Enable Google Sheets API

https://console.developers.google.com/apis/api/sheets.googleapis.com/overview?project=YOURPROJECTID


View Metrics

https://console.developers.google.com/apis/api/sheets.googleapis.com/overview?project=YOURPROJECTISCOOL

Access the Data via NiFi

https://sheets.googleapis.com/v4/spreadsheets/YOURGOOGLESHEET?includeGridData=true&key=YOURKEY
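You can test the endpoint outside NiFi first (a sketch using the same placeholder sheet ID and API key; quote the URL so the shell does not split on the & parameters):

# Fetch the sheet as JSON
curl "https://sheets.googleapis.com/v4/spreadsheets/YOURGOOGLESHEET?includeGridData=true&key=YOURKEY"

In NiFi itself, an InvokeHTTP processor pointed at the same URL can do the same fetch.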

References:

https://community.cloudera.com/t5/Community-Articles/Streaming-Ingest-of-Google-Sheets-with-HDF-2-0/ta-p/247764


Using DJL.AI For Deep Learning BERT Q&A in NiFi DataFlows

 



Introduction:

I will be talking about this processor at ApacheCon @Home 2020 in my "Apache Deep Learning 301" talk with Dr. Ian Brooks.

Sometimes you want your deep learning easy and in Java, so let's do that with DJL in a custom Apache NiFi processor running in CDP Data Hub. This one does BERT QA.


To use the processor, feed in a paragraph to analyze via the paragraph parameter in the NiFi processor. Also feed in a question, like "Why?" or something very specific, such as asking for a date or an event.


The pretrained model is a BERT QA model built with PyTorch. NiFi processor source:

https://github.com/tspannhw/nifi-djlqa-processor


Grab the recent release NAR and install it into your NiFi lib directory:

https://github.com/tspannhw/nifi-djlqa-processor/releases/tag/1.2
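A minimal install sketch (the NAR file name is an assumption based on the release tag; adjust the version, paths, and NiFi home to your install):

# Download the release NAR, drop it into the NiFi lib directory, then restart NiFi
wget https://github.com/tspannhw/nifi-djlqa-processor/releases/download/1.2/nifi-djlqa-processor-1.2.nar
cp nifi-djlqa-processor-1.2.nar /opt/nifi/lib/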


Example Run





Demo Data Source

https://newsapi.org/v2/everything?q=cloudera&apiKey=REGISTERFORAKEY
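To pull the demo feed from the command line (same URL as above; register for your own API key first):

curl "https://newsapi.org/v2/everything?q=cloudera&apiKey=REGISTERFORAKEY"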






Deep Learning Note:   

BERT QA Model


Tip


Make sure you have an extra 1-2 GB of RAM in your NiFi instance for each running DJL processor. If you have a lot of text, add more nodes and/or RAM. Make sure you have at least 8 cores per deep learning process. I prefer JDK 11 for this.


See Also:   https://www.datainmotion.dev/2019/12/easy-deep-learning-in-apache-nifi-with.html



Using DJL.AI For Deep Learning Based Sentiment Analysis in NiFi DataFlow



Introduction:

I will be talking about this processor at ApacheCon @Home 2020 in my "Apache Deep Learning 301" talk with Dr. Ian Brooks.

Sometimes you want your deep learning easy and in Java, so let's do that with DJL in a custom Apache NiFi processor running in CDP Data Hub.

Grab the Source:

https://github.com/tspannhw/nifi-djlsentimentanalysis-processor

Grab the recent release NAR and install it into your NiFi lib directory:

https://github.com/tspannhw/nifi-djlsentimentanalysis-processor/releases/tag/1.2

Example Run

probnegative        0.99
probnegativeperc    99.44
probpositive        0.01
probpositiveperc    0.56
rawclassification   [class: "Negative", probability: 0.99440, class: "Positive", probability: 0.00559]

Demo Data Source

https://newsapi.org/v2/everything?q=cloudera&apiKey=REGISTERFORAKEY





Deep Learning Note:   

The pretrained model is a DistilBERT model trained by HuggingFace using PyTorch.


Tip


Make sure you have an extra 1-2 GB of RAM in your NiFi instance for each running DJL processor. If you have a lot of text, add more nodes and/or RAM. Make sure you have at least 8 cores per deep learning process. I prefer JDK 11 for this.


See Also:   https://www.datainmotion.dev/2019/12/easy-deep-learning-in-apache-nifi-with.html



Cloudera Streams Messaging Manager Swagger Docs (For Kafka Monitoring, Management, Kafka Connect)




Note that the port is 8585, not the SMM port, which is often 9991.

YOURSERVER:8585/swagger

See:

https://docs.cloudera.com/smm/2.0.0/rest-api-reference/index.html#/Application_context_related_operations


Apache Flink REST and Metrics





After seeing Caito Scherr's amazing talk, I want to build up some useful dashboards. My first step is exploring all the available APIs in my CSA/Flink environment. The easiest way to discover them was to turn on the Developer Console in Chrome while using the Flink Dashboard, which is a great dashboard in its own right, but it is not focused on the key metrics some customers are asking about in an easy-to-read format for end users.


Some URLs I have been using:

Overview of Flink Cluster

http://FLINKCLUSTER:8078/overview

{"taskmanagers":1,"slots-total":2,"slots-available":1,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":3,"jobs-failed":0,"flink-version":"1.10.0-csa1.2.0.0","flink-commit":"664a5f5"}

Overview of All Flink Jobs

http://FLINKCLUSTER:8078/jobs/overview

{"jobs":[{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"last-modification":1599576486876,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"faeb308856db337ce628af5fea24b895","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599674296089,"end-time":1599766705456,"duration":92409367,"last-modification":1599766705456,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"5d6ae4f72ab9fca3cea28ba6d4905ca7","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599576795487,"end-time":1599579517485,"duration":2721998,"last-modification":1599579517485,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"ec80a32b6ab59d96f649f5b3e493ec67","name":"Streaming WordCount","state":"FINISHED","start-time":1599571302659,"end-time":1599571318768,"duration":16109,"last-modification":1599571318768,"cluster":null,"tasks":{"total":5,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":5,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"ad949e727a8c0267c9f2550c6a9b6000","name":"default: select * from itemprice","state":"CANCELED","start-time":1599676984684,"end-time":1599677004620,"duration":19936,"last-modification":1599677004620,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"8c1a6903b81e7b926b7105720e24aee8","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599576540243,"end-time":1599582887998,"duration":6347755,"last-modification":1599582887998,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"4cb8a5983b0bd3a14fe90618e17e2488","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599674323425,"end-time":1599676592438,"duration":2269013,"last-modification":1599676592438,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"01988557ccd71cbab899ded9babab606","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 
'kroger'","state":"RUNNING","start-time":1599673893701,"end-time":-1,"duration":103791030,"last-modification":1599673903811,"cluster":{"url":"http://ec2-3-86-165-80.compute-1.amazonaws.com:8088/proxy/application_1599570933443_0003/","originalUrl":"http://ec2-3-86-165-80.compute-1.amazonaws.com:35981","id":"application_1599570933443_0003","hostAndPort":"ec2-3-86-165-80.compute-1.amazonaws.com:35981"},"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":1,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"7c7932678f193f51c32cd3a2ebff6d59","name":"default: select * from itemprice","state":"CANCELED","start-time":1599573232967,"end-time":1599576425840,"duration":3192873,"last-modification":1599576425840,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}}]}

 

Flink Job Details

http://FLINKCLUSTER:8078/jobs/7c01884b74ff981a896307c4a06f2b15

{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","isStoppable":false,"state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"now":1599576486888,"timestamps":{"FAILING":0,"CANCELED":1599576486876,"RECONCILING":0,"SUSPENDED":0,"CREATED":1599576455857,"CANCELLING":1599576486855,"FAILED":0,"RESTARTING":0,"FINISHED":0,"RUNNING":1599576455905},"vertices":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","name":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","parallelism":1,"status":"CANCELED","start-time":1599576455941,"end-time":1599576486876,"duration":30935,"tasks":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"metrics":{"read-bytes":0,"read-bytes-complete":true,"write-bytes":0,"write-bytes-complete":true,"read-records":0,"read-records-complete":true,"write-records":0,"write-records-complete":true}}],"status-counts":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"plan":{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","nodes":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","optimizer_properties":{}}]}}

Flink Cluster Configuration
http://FLINKCLUSTER:8078/config
{"refresh-interval":10000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"1.10.0-csa1.2.0.0","flink-revision":"664a5f5 @ 29.04.2020 @ 14:13:26 UTC","features":{"web-submit":false,"global-dashboard":true}}

Jobs - Specific Job - Checkpoints
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobs/01988557ccd71cbab899ded9babab606/checkpoints
{"counts":{"restored":0,"total":0,"in_progress":0,"completed":0,"failed":0},"summary":{"state_size":{"min":0,"max":0,"avg":0},"end_to_end_duration":{"min":0,"max":0,"avg":0},"alignment_buffered":{"min":0,"max":0,"avg":0}},"latest":{"completed":null,"savepoint":null,"failed":null,"restored":null},"history":[]}

Job Manager Configuration
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/config
[{"key":"yarn.flink-dist-jar","value":"file:/opt/cloudera/parcels/FLINK-1.10.0-csa1.2.0.0-cdh7.1.1.0-326-2901802/lib/flink/lib/flink-dist_2.11-1.10.0-csa1.2.0.0.jar"},{"key":"high-availability.cluster-id","value":"application_1599570933443_0003"},{"key":"jobmanager.rpc.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"high-availability.storageDir","value":"hdfs:///user/flink/ha"},{"key":"state.backend.rocksdb.timer-service.factory","value":"ROCKSDB"},{"key":"io.tmp.dirs","value":"/yarn/nm/usercache/root/appcache/application_1599570933443_0003"},{"key":"historyserver.cli.fallback","value":"true"},{"key":"parallelism.default","value":"1"},{"key":"execution.buffer-timeout","value":"100"},{"key":"jobmanager.heap.size","value":"1073741824"},{"key":"execution.checkpointing.mode","value":"EXACTLY_ONCE"},{"key":"taskmanager.memory.process.size","value":"2 gb"},{"key":"web.port","value":"0"},{"key":"state.backend.local-recovery","value":"true"},{"key":"state.backend.rocksdb.memory.managed","value":"true"},{"key":"yarn.tags","value":"flink"},{"key":"state.backend.incremental","value":"true"},{"key":"taskmanager.memory.network.fraction","value":"0.1"},{"key":"yarn.container-start-command-template","value":"%java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%"},{"key":"web.tmpdir","value":"/tmp/flink-web-37be6c08-f457-468a-9c4b-5fde92c042b1"},{"key":"jobmanager.rpc.port","value":"33566"},{"key":"execution.checkpointing.timeout","value":"60000"},{"key":"high-availability.zookeeper.quorum","value":"ec2-3-86-165-80.compute-1.amazonaws.com:2181"},{"key":"taskmanager.memory.managed.fraction","value":"0.4"},{"key":"rest.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.backend","value":"FILESYSTEM"},{"key":"logging.configuration.file","value":"/etc/flink/conf/log4j.properties"},{"key":"execution.checkpointing.max-concurrent-checkpoints","value":"1"},{"key":"high-availability.zookeeper.client.acl","value":"open"},{"key":"historyserver.web.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.checkpoints.num-retained","value":"3"},{"key":"historyserver.web.port","value":"8078"},{"key":"pipeline.auto-watermark-interval","value":"200"},{"key":"state.savepoints.dir","value":"hdfs:///user/flink/savepoints"},{"key":"pipeline.generic-types","value":"true"},{"key":"yarn.maximum-failed-containers","value":"100"},{"key":"yarn.application-attempts","value":"5"},{"key":"taskmanager.numberOfTaskSlots","value":"2"},{"key":"state.backend.rocksdb.memory.write-buffer-ratio","value":"0.5"},{"key":"jobmanager.archive.fs.dir","value":"hdfs:///user/flink/applicationHistory"},{"key":"execution.target","value":"yarn-per-job"},{"key":"pipeline.object-reuse","value":"false"},{"key":"internal.io.tmpdirs.use-local-default","value":"true"},{"key":"state.backend.rocksdb.memory.high-prio-pool-ratio","value":"0.1"},{"key":"taskmanager.memory.network.max","value":"2147483648"},{"key":"execution.attached","value":"false"},{"key":"internal.cluster.execution-mode","value":"NORMAL"},{"key":"execution.checkpointing.externalized-checkpoint-retention","value":"RETAIN_ON_CANCELLATION"},{"key":"high-availability","value":"ZOOKEEPER"},{"key":"execution.checkpointing.min-pause","value":"0"},{"key":"execution.checkpointing.snapshot-compression","value":"false"},{"key":"state.checkpoints.dir","value":"hdfs:///user/flink/checkpoints"}]

Job Manager - Log
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/log
2020-09-11 13:37:42,586 INFO  org.apache.flink.yarn.YarnResourceManager                     - Disconnect job manager 9c2e3f25dae1d548dc730941d6484cbb@akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:33566/user/jobmanager_7 for job 67a2de8fb291333bbd90b334f8f83def from the resource manager.
2020-09-11 13:37:42,586 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/67a2de8fb291333bbd90b334f8f83def/job_manager_lock'}.
2020-09-11 13:37:42,591 INFO  org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore    - Removed job graph 67a2de8fb291333bbd90b334f8f83def from ZooKeeper.

Job Manager - Standard Out
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/stdout

Task Managers
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers
{"taskmanagers":[{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832789681,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939}}]}

Task Managers - Container
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002

{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832809720,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939},"metrics":{"heapUsed":200512944,"heapCommitted":664272896,"heapMax":664272896,"nonHeapUsed":145389616,"nonHeapCommitted":149569536,"nonHeapMax":780140544,"directCount":5111,"directUsed":168414439,"directMax":168414435,"mappedCount":0,"mappedUsed":0,"mappedMax":0,"memorySegmentsAvailable":5079,"memorySegmentsTotal":5079,"garbageCollectors":[{"name":"PS_Scavenge","count":91,"time":453},{"name":"PS_MarkSweep","count":4,"time":358}]}}


Task Managers - Container - Log

http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/log


2020-09-09 17:58:17,456 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer  - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic krogerprices
2020-09-09 17:58:17,465 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 has no restore state.
2020-09-09 17:58:17,475 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig  - ConsumerConfig values: 

Task Managers - Container - Standard Out
http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/stdout



Talks

For the definitive way to read and use these, see Caito's awesome talk here:

https://www.youtube.com/watch?v=MQQ7qaKKKc8


Command Line (https://docs.cloudera.com/csa/1.2.0/job-lifecycle/topics/csa-supported-cli.html)


flink list

20/09/11 13:45:34 INFO cli.CliFrontend: Successfully retrieved list of jobs
------------------ Running/Restarting Jobs -------------------
09.09.2020 17:51:33 : 01988557ccd71cbab899ded9babab606 : default: insert into krogerprices
select upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid
from itemprice
where originstore = 'kroger' (RUNNING)
--------------------------------------------------------------









Apache NiFi 1.12 Released! 18-August-2020


Release Notes

https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.12.0

Issues

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12346778

Major Feature List

https://twitter.com/pvillard31/status/1296469452180119553


Release Date: August 18, 2020. 

Major Features:

  • New processor to write scripted record transforms live in the flow (ScriptedTransformRecord)
  • Expose a REST Endpoint for easy metric scraping by Prometheus (see the sketch after this list)
  • Ability to specify group level flow file concurrency - for instance run a single flow file end to end for traditional job handling
  • Improved several capabilities related to Azure service interaction including ADLS Gen2 
  • Improved AMQP and MQTT support as well as JMS improvements
  • Support for latest Kafka 2.6 clients
  • Search UI Improvements
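A quick way to try the new Prometheus endpoint (a sketch; path taken from the 1.12 REST API, assuming an unsecured local instance):

# Scrape NiFi metrics in Prometheus exposition format
curl http://localhost:8080/nifi-api/flow/metrics/prometheus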

I will be posting a few demos and test drives soon.


ScriptedTransformRecord


https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.12.0/org.apache.nifi.processors.script.ScriptedTransformRecord/index.html


Deleting Schemas From Cloudera Schema Registry



It is very easy to delete schemas from Cloudera Schema Registry if you need to do so.   I recommend downloading them and having a backup first.
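For example, you can save a copy first (a sketch; the GET endpoint is assumed from the same Schema Registry REST API as the DELETE shown below):

# Back up the latest version of the "junk" schema before deleting it
curl -X GET "http://MYSERVERHASACOOLNAME.DEV:7788/api/v1/schemaregistry/schemas/junk/versions/latest" -H "accept: application/json" > junk-backup.json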

Let's look at our schema


Well, let's get rid of that junk.

Here is the documentation for CDF Data Hub in CDP Public Cloud.



Example

curl -X DELETE "http://MYSERVERHASACOOLNAME.DEV:7788/api/v1/schemaregistry/schemas/junk" -H "accept: application/json"

Where junk is the name of my schema.





You can call this REST API from NiFi, a DevOps tool, or just a simple curl as shown above.

Knox and other security may apply.


Did the user really ask for Exactly Once? Fault Tolerance

Exactly Once Requirements

Exactly-once is very tricky and can cause performance degradation; if your user can live with at-least-once, always go with that. Having data sinks like Kudu, where you can do an upsert, makes exactly-once less necessary.

https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

Apache Flink, Apache NiFi Stateless and Apache Kafka can participate in that.

For CDF Stream Processing and Analytics with Apache Flink 1.10 Streaming:

Both Kafka sources and sinks can be used with exactly-once processing guarantees when checkpointing is enabled (see the configuration sketch after the lists below).


End-to-End Guaranteed Exactly-Once Record Delivery

The data source and data sink need to support exactly-once state semantics and take part in checkpointing.


Data Sources
  • Apache Kafka - must have Exactly-Once selected, transactions enabled, and the correct driver.

Select:  Semantic.EXACTLY_ONCE


Data Sinks
  • HDFS BucketingSink
  • Apache Kafka
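Checkpointing itself is configured in flink-conf.yaml; here is a minimal sketch using keys visible in the Job Manager configuration shown in the Flink REST section above (values are illustrative, not recommendations):

# Exactly-once checkpointing plus a durable state backend
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 60000
state.backend: FILESYSTEM
state.checkpoints.dir: hdfs:///user/flink/checkpoints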





FLaNK in the Cloud!!!! Huge Cloudera Data Platform Public Cloud Updates - July 2020 - Data Flow Releases



With the promotion of Cloudera Runtime 7.2.1 to Public Cloud, the CDF team is pleased to announce three key updates that were also promoted to production today and are available to customers:


Let's see what's new in 7.2.1!


Flow (GA) - 2.0.3 is optimized for public cloud!

Streams (GA) - Now with Apache Kafka v2.5!

Streaming Analytics (TP) - v1.2.1, powered by Apache Flink 1.10, is in Technical Preview in CDP Public Cloud.


Start with Streaming Analytics Light Duty

Using Kudu with Flink

Using HBase with Flink

Using Kafka with Flink

General CSA Flink Docs

  • Data source reading from Kafka
  • Data sinks writing to Kafka, HBase and Kudu
  • Apache Atlas integration
  • SQL/Table API and SQL Client
  • Table connectors 
    • Kafka
    • Kudu
    • Hive (through catalog)
We can now run the FLaNK Stack in the Public Cloud automagically!

Sizing Your Apache NiFi Cluster For Production Workloads


Cloudera Flow Management provides an enterprise-supported edition of Apache NiFi, managed by Cloudera Manager. The official documentation provides a great guide for sizing your cluster.

https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-nifi-sizing.html

If the use case fits, the NiFi Stateless Engine may be a match and can perform better since it uses no disk.

Check your heap usage and utilization; you may need to increase it. 24-32 gigabytes of RAM is a nice sweet spot for most instances.



Check out how your nodes, threads, and queues are doing. If queues are not processing quickly or thread counts are high, you may need more cores, RAM, or nodes.



When you are managing your cluster in Cloudera Manager, make sure you increase the default JVM memory for Apache NiFi. 512 MB is not going to cut it for anything but single-user development.
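On a plain Apache NiFi install, the equivalent knob is the heap setting in conf/bootstrap.conf (a sketch; size it to your hardware, per the 24-32 GB guidance above):

# conf/bootstrap.conf - raise the default 512 MB heap
java.arg.2=-Xms24g
java.arg.3=-Xmx24g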



Do this correctly and process a billion events! https://blog.cloudera.com/benchmarking-nifi-performance-and-scalability/ - note the hardware and performance sections of that article.


General tips:

Make sure you use SSDs for provenance and other repositories. Faster disk, happier user. https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-disk-configuration.html

Monitor your flows to see how many resources you need: https://www.datainmotion.dev/2020/07/report-on-this-apache-nifi-1114-monitor.html


Use records; if the data is semi-structured, GrokReader can help: https://www.nifi.rocks/record-path-cheat-sheet/. If it's CSV, JSON, XML, Parquet, or logs, use record readers and writers. They are much faster, easier, and cleaner.



Minimize use of CPU- or memory-intensive processors (or make a note of them during sizing): https://docs.cloudera.com/cfm/2.0.1/nifi-sizing/topics/cfm-sizing-resource-intensive-processors.html

There are a few decisions to make on repositories, talk to your Cloudera friends.    https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.1/nifi-configuration-best-practices/content/configuration-best-practices.html