Using DJL.AI For Deep Learning BERT Q&A in NiFi DataFlows
Introduction:
I will be talking about this processor at Apache Con @ Home 2020 in my "Apache Deep Learning 301" talk with Dr. Ian Brooks.
Sometimes you want your Deep Learning Easy and in Java, so let's do that with DJL in a custom Apache NiFi processor running in CDP Data Hubs. This one does BERT QA.
To use the processor feed in a paragraph to analyze via the paragraph parameter in the NiFi processor. Also feed in a question, like Why? or something very specific like asking the date or an event.
The pretrained model is BERT QA model using PyTorch. the NiFi Processor Source:
Make sure you have 1-2 GB of RAM extra for your NiFi instance for running each DJL processor. If you have a lot of text, run more nodes and/or RAM. Make sure you have at least 8 cores per Deep Learning process. I prefer JDK 11 for this.
The pretrained model is DistilBERT model trained by HuggingFace using PyTorch.
Tip
Make sure you have 1-2 GB of RAM extra for your NiFi instance for running each DJL processor. If you have a lot of text, run more nodes and/or RAM. Make sure you have at least 8 cores per Deep Learning process. I prefer JDK 11 for this.
After seeing Caito Scherr's amazing talk, I want to build up some useful dashboards. My first step is exploring all the available APIs in my CSA/Flink environment. The easiest way to discover them was I turned on Developer Console in Chrome while using the Flink Dashboard which is a great dashboard in it's own right. But it is not focused on some key metrics that some customers are asking about in a very easy to read format for end-users.
{"jobs":[{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"last-modification":1599576486876,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"faeb308856db337ce628af5fea24b895","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599674296089,"end-time":1599766705456,"duration":92409367,"last-modification":1599766705456,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"5d6ae4f72ab9fca3cea28ba6d4905ca7","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599576795487,"end-time":1599579517485,"duration":2721998,"last-modification":1599579517485,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"ec80a32b6ab59d96f649f5b3e493ec67","name":"Streaming WordCount","state":"FINISHED","start-time":1599571302659,"end-time":1599571318768,"duration":16109,"last-modification":1599571318768,"cluster":null,"tasks":{"total":5,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":5,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"ad949e727a8c0267c9f2550c6a9b6000","name":"default: select * from itemprice","state":"CANCELED","start-time":1599676984684,"end-time":1599677004620,"duration":19936,"last-modification":1599677004620,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"8c1a6903b81e7b926b7105720e24aee8","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599576540243,"end-time":1599582887998,"duration":6347755,"last-modification":1599582887998,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"4cb8a5983b0bd3a14fe90618e17e2488","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599674323425,"end-time":1599676592438,"duration":2269013,"last-modification":1599676592438,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"01988557ccd71cbab899ded9babab606","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"RUNNING","start-time":1599673893701,"end-time":-1,"duration":103791030,"last-modification":1599673903811,"cluster":{"url":"http://ec2-3-86-165-80.compute-1.amazonaws.com:8088/proxy/application_1599570933443_0003/","originalUrl":"http://ec2-3-86-165-80.compute-1.amazonaws.com:35981","id":"application_1599570933443_0003","hostAndPort":"ec2-3-86-165-80.compute-1.amazonaws.com:35981"},"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":1,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"7c7932678f193f51c32cd3a2ebff6d59","name":"default: select * from itemprice","state":"CANCELED","start-time":1599573232967,"end-time":1599576425840,"duration":3192873,"last-modification":1599576425840,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}}]}
2020-09-11 13:37:42,586 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 9c2e3f25dae1d548dc730941d6484cbb@akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:33566/user/jobmanager_7 for job 67a2de8fb291333bbd90b334f8f83def from the resource manager.
2020-09-11 13:37:42,586 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/67a2de8fb291333bbd90b334f8f83def/job_manager_lock'}.
2020-09-11 13:37:42,591 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore - Removed job graph 67a2de8fb291333bbd90b334f8f83def from ZooKeeper.
2020-09-09 17:58:17,456 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic krogerprices
2020-09-09 17:58:17,465 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.
2020-09-09 17:58:17,475 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
It is very tricky and can cause performance degradation, if your user could just use at least once, then always go with that. Having data sinks like Kudu where you can do an upsert makes exactly once less needed.
With the promotion of Cloud Runtime 7.2.1 to Public Cloud, the CDF team is pleased to announce three key and very important updates that were also promoted to production today and available to customers. These are:
Sizing Your Apache NiFi Cluster For Production Workloads
Cloudera Flow Management provides an enterprise edition of support Apache NiFi managed by Cloudera Manager. The official documentation provides a great guide for sizing your cluster.
If the use case fits, NiFi Stateless Engine may fit and perform better utilizing no disk.
Check out that heap usage and utilization, you may need to increase. 24-32 Gigabytes of RAM is a nice sweet spot for most instances.
Check out how your nodes, threads and queues are doing. If queue is not processing fast or thread count is high, you may need more cores, RAM or nodes.
When you are managing your cluster in Cloudera Manager, make sure you increase the default JVM memory for Apache NiFi. 512MB is not going to cut it for anything but single user development.
Use Records, if it's semistructured GrokReader can help. https://www.nifi.rocks/record-path-cheat-sheet/ If it's CSV, JSON, XML, Parquet, Logs then use Readers and writers. They are much faster, easier and cleaner.
The easiest way to grab monitoring data is via the NiFi REST API. Also everything in the NiFi UI is done through REST calls which you can call programmatically. Please read the NiFi docs they are linked directly from your running NiFi application or on the web. They are very thorough and have all the information you could want: https://nifi.apache.org/docs/nifi-docs/. If you are not running NiFi 1.11.4, I recommend you please upgrade. This is supported by Cloudera on multiple platforms.
Also in NiFi flow programming, every time you produce data to Kafka you get metadata back in FlowFile Attributes. You can push those attributes directly to a kafka topic if you want.
So after your PublishKafkaRecord_2_0 1.11.4 so for success read the attributes on # of record and other data then AttributesToJson and push to another topic. you may want a mergerecord in there to aggregate a few of those together.
If you are interested in Kafka metrics/record counts/monitoring then you must use Cloudera Streams Messaging Manager, it provides a full Web UI, Monitoring Tool, Alerts, REST API and everything you need for monitoring every producer, consumer, broker, cluster, topic, message, offset and Kafka component.
The best way to get NiFi stats is to use the NiFi Reporting Tasks, I like the SQL Reporting task.
SQL Reporting Tasks are very powerful and use standard SELECT * FROM JVM_METRICS style reporting, see my article:
GenerateFlowFile - build a schedule matching when NOAA updates weather
InvokeHTTP - download all weather ZIP
CompressContent - decompress ZIP
UnpackContent - extract files from ZIP
*RouteOnAttribute - just give us ones that are airports (${filename:startsWith('K')}). optional.
*QueryRecord - XMLReader to JsonRecordSetWriter. Query: SELECT * FROM FLOWFILE WHERE NOT location LIKE '%Unknown%'. This is to remove some locations that are not identified. optional.
Send it somewhere for storage. Could put PutKudu, PutORC, PutHDFS, PutHiveStreaming, PutHbaseRecord, PutDatabaseRecord, PublishKafkaRecord2* or others.