Easy Deep Learning in Apache NiFi with DJL


Custom Processor for Deep Learning



 Happy Mm.. FLaNK Day!


I have been experimenting with the awesome new Apache 2.0 licensed Java deep learning library, DJL.   In NiFi I was trying to figure out a quick use case and demo, so I used my web camera processor to grab a still shot from my PowerBook webcam and send it to the processor.   The results are sent to Slack.

Since it's the holidays, I thought of my favorite holiday movies:   The Matrix and Blade Runner.   So I figured a Voight-Kampff test would be fun.   Since I don't have a deep learning Q&A piece built yet, let's start by seeing if you look human; the model labels humans as 'person'.   I am testing to see if I am a replicant.  It's sometimes hard to tell.   Let's see if DJL thinks I am human.

See:   http://nautil.us/blog/-the-science-behind-blade-runners-voight_kampff-test



Okay, so at least it thinks I am a person.   The classification of a Christmas tree is vaguely accurate.






It did not identify my giant French bread.

Building A New Custom Processor for Deep Learning




The hardest part was writing a good NiFi integration test.   The DJL team provides some great examples, and it's really easy to plug into their models.

// Load a pre-trained SSD object detection model from the MXNet model zoo
ZooModel<BufferedImage, DetectedObjects> model =
        MxModelZoo.SSD.loadModel(criteria, new ProgressBar());
Predictor<BufferedImage, DetectedObjects> predictor = model.newPredictor();
DetectedObjects detection = predictor.predict(img);
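
For a fuller picture, here is a self-contained sketch along the lines of DJL's object-detection example from this release era. Treat it as a sketch: the criteria map values, the image path and the exact package locations are assumptions that may shift between DJL versions.

import java.awt.image.BufferedImage;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import ai.djl.inference.Predictor;
import ai.djl.modality.cv.DetectedObjects;
import ai.djl.modality.cv.util.BufferedImageUtils;
import ai.djl.mxnet.zoo.MxModelZoo;
import ai.djl.repository.zoo.ZooModel;
import ai.djl.training.util.ProgressBar;

public class SsdSketch {
    public static void main(String[] args) throws Exception {
        // Criteria narrow down which SSD variant the model zoo loads (assumed values)
        Map<String, String> criteria = new ConcurrentHashMap<>();
        criteria.put("backbone", "resnet50");

        // Load a test image from disk (placeholder path)
        BufferedImage img = BufferedImageUtils.fromFile(Paths.get("person.jpg"));

        // try-with-resources closes the model and predictor cleanly
        try (ZooModel<BufferedImage, DetectedObjects> model =
                     MxModelZoo.SSD.loadModel(criteria, new ProgressBar());
             Predictor<BufferedImage, DetectedObjects> predictor = model.newPredictor()) {
            DetectedObjects detection = predictor.predict(img);
            System.out.println(detection);
        }
    }
}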

All the source is on GitHub and references the DJL sites and repos below.

Using a New Custom Processor as part of a Real-time Holiday Flow

We first add the DeepLearningProcessor to our canvas.



An example flow:
  • GetWebCameraProcessor:  grab an image from an attached web camera
  • UpdateAttribute:  add a media type for the image
  • DeepLearningProcessor:  run our DJL deep learning model from a model zoo
  • PutSlack:  put the DJL results in a text message in Slack
  • PostSlack:  send our DJL-annotated image to Slack
  • Funnel:  send all failures to Valhalla



If we examine the provenance, we can see how long the run took and some other interesting attributes.


We place the results of our image analysis in attributes, while we return a new image with bounding boxes drawn around the detected object(s). A rough sketch of that logic follows below.
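
Here is a minimal, hypothetical sketch of what the core onTrigger logic of such a processor could look like. It is not the actual source (that lives in the repo linked below); the predictor field, relationship names and attribute keys are all assumptions.

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import javax.imageio.ImageIO;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import ai.djl.inference.Predictor;
import ai.djl.modality.Classifications;
import ai.djl.modality.cv.DetectedObjects;

public class DeepLearningProcessorSketch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    // Assume the predictor was created from the model zoo when the processor was scheduled
    private Predictor<BufferedImage, DetectedObjects> predictor;

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            // Read the incoming image bytes out of the flowfile content
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            session.exportTo(flowFile, bytes);
            BufferedImage img = ImageIO.read(new ByteArrayInputStream(bytes.toByteArray()));

            // Run DJL object detection on the image
            DetectedObjects detection = predictor.predict(img);

            // Surface the best detection as flowfile attributes
            // (drawing the bounding boxes back onto the image is omitted here)
            Classifications.Classification best = detection.best();
            flowFile = session.putAttribute(flowFile, "djl.class", best.getClassName());
            flowFile = session.putAttribute(flowFile, "djl.probability",
                    String.valueOf(best.getProbability()));

            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("DJL inference failed", e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}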




We now have a full classification workflow for real-time deep learning analysis on images. It could be used for Santa watching, security, memes and other important business purposes.


The initial release is available here:   https://github.com/tspannhw/nifi-djl-processor/releases/tag/v1.0
Using library and example code from the Deep Java Library (https://djl.ai/).



Source Code:   https://github.com/tspannhw/nifi-djl-processor/

And now for something completely different, Christmas Cats:

Princeton, New Jersey, USA - Meetup - 10 December 2019 - HBase, Flink, NiFi 1.10, MiNiFi, MXNet, Kafka, Kudu

10-Dec-2019 Meetup:  HBase, Cloud, Flink, NiFi, Kafka, IoT, AI


Come network, socialize, trade notes and learn about Cloudera’s new cloud offering for operational databases (powered by Apache HBase). Learn how developers are using Cloudera’s OpDB to support mission-critical applications that need very high availability, resiliency and scalability. Learn how easy it is becoming to do the same in the cloud and why it is uniquely situated to support cloud-native applications.



What you can expect:
6:00 – 6:45: Networking & Food
6:45 – 7:30: Presentations / Demo of Cloudera OpDB - HBase



7:30 – 7:45: Lightning Talk: Introduction to NiFi 1.10
7:45 – 8:15: Lightning Talk: Introduction to the Mm.. FLaNK Stack
https://www.datainmotion.dev/2019/11/introducing-mm-flank-apache-flink-stack.html



8:15 – 8:30: Ask Me Anything



What we will cover:
- An overview of the new Cloud offering and key capabilities of the database

- Delicious Food & Drinks

Hosted By PGA Fund at:
https://pga.fund/coworking-space/

Princeton Growth Accelerator
5 Independence Way, 4th Floor, Princeton, NJ

https://www.meetup.com/futureofdata-princeton/events/266496424/

See:  https://www.datainmotion.dev/2019/12/hbase-20-on-cdp-on-aws.html

For Code and Slides: 
https://github.com/tspannhw/HBase2
https://github.com/tspannhw/MmFLaNK
https://github.com/tspannhw/nifi-1.10-templates
https://www.slideshare.net/bunkertor/cloudera-operational-db-apache-hbase-apache-phoenix
https://github.com/tspannhw/stateless-examples
https://www.slideshare.net/bunkertor/mm-flank-stack-minifi-mxnet-flink-nifi-kudu-kafka
https://www.slideshare.net/bunkertor/introduction-to-apache-nifi-110


Introducing Mm FLaNK... An Apache Flink Stack for Rapid Streaming Development From Edge 2 AI






Source:   https://github.com/tspannhw/MmFLaNK

Stateless NiFi:   https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

To show an example of using the Mm FLaNK stack, we have an Apache NiFi flow that reads IoT data (JSON) and sends it to Apache Kafka.   An Apache Flink streaming application running in YARN reads it, validates the data and sends it to another Kafka topic.  We monitor and check the data with SMM (Cloudera Streams Messaging Manager).    The data from that second topic is read by Apache NiFi and pushed to Apache Kudu tables.

Mm FLaNK Stack (MXNet, MiNiFi, Flink, NiFi, Kafka, Kudu)



First, we rapidly ingest, route, transform, convert, query and process data with Apache NiFi.   Once we have transformed it into a clean, schema-validated, known data type, we can stream it to Kafka for additional processing.



Second, we read from that Kafka topic, iot, with a Java Apache Flink application.   We then filter out empty records and push the rest to another Kafka topic, iot-result.
// Drop null or empty records before they reach the result topic
public static class NotNullFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String string) throws Exception {
        return string != null && !string.trim().isEmpty();
    }
}

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "cloudera:9092");
properties.setProperty("group.id", "flinkKafkaGroup");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Read raw IoT JSON strings from the "iot" topic
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties);
DataStream<String> source = env.addSource(consumer).name("Flink IoT Kafka Source");
source.print();

// filter() returns a new stream, so the sink hangs off the filtered stream
source.filter(new NotNullFilter())
        .addSink(new FlinkKafkaProducer<>("cloudera:9092", "iot-result",
                new SimpleStringSchema()))
        .name("Flink Kafka IoT Result Sink");

env.execute("Mm FLaNK IoT");
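
One note on the design: Flink transformations such as filter() do not modify the source stream in place; they return a new DataStream. That is why the Kafka sink is attached to the stream returned by filter() rather than to source; otherwise the empty records would flow through to iot-result unfiltered.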


We compile our Java application with Maven and build a fat JAR that we push to YARN on our CDH 6.3 cluster with the Flink client.



Once the Flink application is running, we can see a lot of metrics, logs and information about our streaming service.  We can browse the logs via the YARN UI and the Flink UI.


From the Hadoop YARN UI, you can link to this application's Flink Dashboard.




You can easily see logs from your application, its containers and the underlying systems.



The Apache Flink Dashboard is a rich, easy-to-navigate interface for all the information related to your running job.



You can see and add charts to view various metrics about sources, sinks and a large number of other interesting real-time attributes.


There is also an older, less dynamic view that is still available.


You can also dive down into individual task managers with their attributes.


Logs are easily accessible:


Another Apache NiFi flow reads from the iot-result topic and does additional processing; it could run machine learning / deep learning and then store the results in a data store such as Apache Kudu.   If you have attended any of the CDF workshops, you will notice we could integrate machine learning via a LookupRecord processor calling a CDSW Python machine learning application via REST.   Data read from Kafka streams in with helpful metadata in our provenance data stream, including the Kafka offset, partition, topic and a unique ID for the data.



Since we are using Apache Kafka 2.x for a lot of streams, we really need to see what is going on in there.   Fortunately, Cloudera Streams Messaging Manager gives us full insight into everything.  We can browse brokers, topics, consumers and producers.   We can also set up alerts to notify someone via various methods (email, REST, etc.) if something happens, such as consumer lag.   From the screen below we can see that these two consumers are lagging.   This is because I have stopped them, so they cannot process the messages in their respective topics.


I restart my consumers and quickly the lag is gone.   Flink and NiFi process Kafka topics quickly!


This screen gives a nice overview of my FLaNK application:  current stats on the source and sink Kafka topics I am working with from NiFi and Flink.



I can also dive into a topic (if I have permissions) and see the content of the data.   There's also a link to the Schema Registry for this particular topic and the ability to view various types of Kafka data, including strings and Avro.   As you can see, our payloads are JSON IoT data.


Once our NiFi Kafka consumer has acquired the data, we use our PutKudu processor to auto-infer the schema of the JSON data and write to an existing Kudu table.  We can query the data via an interface such as Apache Hue.   We run a quick Apache Impala SQL query to get data back as it arrives.   Apache NiFi is inserting data as we stream it through Apache Kafka from our Apache Flink application.

That was easy!   Next up, we will expand our edge piece by utilizing MiNiFi deployed via Cloudera Edge Flow Manager, and add our second M, Apache MXNet, with my Apache MXNet NiFi processor, plus some Python GluonCV, GluonTS and more via Cloudera Data Science Workbench.  We will also use stateless Apache NiFi 1.10 for some FaaS-style Kafka consuming and producing.

More FLaNK to come!

Upcoming:  GluonTS for Forecasting Time Series https://gluon-ts.mxnet.io/examples/basic_forecasting_tutorial/tutorial.html

References: