Using Apache NiFi with Apache MXNet GluonCV for YOLO 3 Deep Learning Workflows

Using GluonCV 0.3 with Apache MXNet 1.3
source code:
Captured and Processed Image Available for Viewing in Stream in Apache NiFi 1.7.x
use case:
I need to easily monitor the contents of my security vault. The vault contains a fixed number of known items.
In the real world we would need a good camera or several (maybe four to eight, depending on the angles of the room), a device like an NVIDIA Jetson TX2, a MiniFi 0.5 Java agent, JDK 8, Apache MXNet, GluonCV, a number of Python libraries, a network connection and a simple workflow. Outside of my vault, I will need a server or cluster to do the more advanced processing, though I could run it all on the local box. If the number of items changes, or certain items I am watching are no longer on screen, we should send an immediate alert. That could go to SMS, email, Slack, an alerting system or other means. We have most of that implemented below; if anyone wants to build out the complete use case, I can assist.
demo implementation:
I wanted to use the new YOLO v3 model, which is part of the new 0.3 release stream, so I installed a 0.3 beta build. The release may be final by the time you read this; you can try a regular pip3.6 install -U gluoncv and see what you get.
  1. pip3.6 install -U gluoncv==0.3.0b20180924
YOLO v3 is a great pretrained model to use for object detection.
See: https://gluon-cv.mxnet.io/build/examples_detection/demo_yolo.html
The GluonCV Model Zoo is very rich and incredibly easy to use. We just grab the model "yolo3_darknet53_voc" with an automatic one-time download and we are ready to go. The project provides easy-to-customize code to start from. I write my processed image and JSON results out for ingest by Apache NiFi. You will notice this is similar to what we did for the Open Computer Vision talks: https://community.hortonworks.com/articles/198939/using-apache-mxnet-gluoncv-with-apache-nifi-for-de.html
This is updated and even easier. I dropped the MQTT and just output image files and some JSON to read.
GluonCV makes working with Computer Vision extremely clean and easy.
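The model-zoo call itself is only a few lines. Below is a minimal sketch of the inference step and of building a JSON record for NiFi to ingest; the `build_record` helper is my own illustration (its field names mirror the example output later in this article, but the exact fields you emit are up to you).

```python
import json
import time
import uuid
from datetime import datetime

def detect(image_path):
    """Run the pretrained YOLO v3 model from the GluonCV Model Zoo.
    Requires mxnet and gluoncv (pip3.6 install -U gluoncv==0.3.0b20180924)."""
    from gluoncv import model_zoo, data  # imported here so the sketch loads without mxnet
    net = model_zoo.get_model('yolo3_darknet53_voc', pretrained=True)  # one-time download
    x, img = data.transforms.presets.yolo.load_test(image_path, short=512)
    class_ids, scores, bboxes = net(x)
    return net.classes, class_ids, scores, bboxes

def build_record(class_name, score_pct, image_name):
    """Format the top detection as a JSON record for Apache NiFi to ingest."""
    uid = datetime.now().strftime("%Y%m%d%H%M%S") + "_" + str(uuid.uuid4())
    return json.dumps({"imgname": image_name,
                       "class1": class_name,
                       "pct1": str(score_pct),
                       "end": str(time.time()),
                       "id": uid})
```

The record then lands in a file (or stream) that NiFi picks up, exactly as in the flow below.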
why Apache NiFi For Deep Learning Workflows
Let me count the top five ways:
#1 Provenance - lets me see everything, everywhere, all the time with the data and the metadata.
#2 Configurable Queues - queues are everywhere and they are extremely configurable for size and priority. There's always backpressure and safety between every step. Sinks, sources and steps can go offline, as happens on the real-world internet. Offline or online, wherever, I can recover and have full visibility into my flows as they spread between devices, servers, networks, clouds and nation-states.
#3 Security - secure at every level, from SSL to data encryption, with integration with leading-edge tools including Apache Knox, Apache Ranger and Apache Atlas. See: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_security/content/ch_enabling-knox-for-nifi.html
#4 UI - a simple UI to develop, monitor and manage incredibly complex flows including IoT, Deep Learning, Logs and every data source you can throw at it.
#5 Agents - MiniFi gives me two different agents for my devices or systems to stream data headless.
running gluoncv yolo3 model
I wrap my Python script in a shell script to throw away warnings and other noise:
  1. cd /Volumes/TSPANN/2018/talks/ApacheDeepLearning101/nifi-gluoncv-yolo3
  2. python3.6 -W ignore /Volumes/TSPANN/2018/talks/ApacheDeepLearning101/nifi-gluoncv-yolo3/yolonifi.py 2>/dev/null
List of Possible Objects We Can Detect
  1. ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
  2. "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train",
  3. "tvmonitor"]
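For the vault use case described above, the alert check reduces to a set difference: which watched classes are no longer detected with reasonable confidence? A minimal sketch (the watch list and confidence threshold here are hypothetical):

```python
# Hypothetical items we expect the camera to always see in the vault
VAULT_WATCH_LIST = {"bottle", "tvmonitor", "chair"}

def missing_items(detections, watch_list=VAULT_WATCH_LIST, min_confidence=0.5):
    """Return the watched classes absent from this frame's detections.
    detections is a list of (class_name, score) pairs from the model."""
    seen = {name for name, score in detections if score >= min_confidence}
    return sorted(watch_list - seen)
```

If `missing_items` returns a non-empty list, the flow would raise the SMS/email/Slack alert.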
I am going to train this with my own data for the upcoming INTERNET OF BEER; for the vault use case, we would need pictures of your vault's contents.
See: https://gluon-cv.mxnet.io/build/examples_datasets/detection_custom.html#sphx-glr-build-examples-datasets-detection-custom-py
Example Output in JSON
  1. {"imgname": "images/gluoncv_image_20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602.jpg", "imgnamep": "images/gluoncv_image_p_20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602.jpg", "class1": "tvmonitor", "pct1": "49.070724999999996", "host": "HW13125.local", "shape": "(1, 3, 512, 896)", "end": "1537815855.105193", "te": "4.199203014373779", "battery": 100, "systemtime": "09/24/2018 15:04:15", "cpu": 33.7, "diskusage": "49939.2 MB", "memory": 60.1, "id": "20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602"}
Example Processed Image Output
It found one generic person; we could train against a known set of humans that are allowed in an area or are known users.
nifi flows

Gateway Server (We could skip this, but aggregating multiple camera agents is useful)
Send the Flow to the Cloud
Cloud Server Site-to-Site
After we infer the schema of the data once, we don't need to do it again. We could derive the schema manually or with another tool, but this is easy. Once you are done, you can delete the InferAvroSchema processor from your flow. I left mine in, in case you wish to start from the flow attached at the end of the article.
flow steps
Route when there is no error to MergeRecord, then convert those aggregated Apache Avro records into one Apache ORC file.
Then store it in an HDFS directory. Once complete, there will be a DDL statement added to the flow file's metadata, which you can send to a PutHiveQL processor or use to create the table manually in Beeline, Zeppelin or Hortonworks Data Analytics Studio (https://hortonworks.com/products/dataplane/data-analytics-studio/).
schema: gluoncvyolo
  1. { "type" : "record", "name" : "gluoncvyolo", "fields" : [ { "name" : "imgname", "type" : "string", "doc" : "Type inferred from '\"images/gluoncv_image_20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c.jpg\"'" }, { "name" : "imgnamep", "type" : "string", "doc" : "Type inferred from '\"images/gluoncv_image_p_20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c.jpg\"'" }, { "name" : "class1", "type" : "string", "doc" : "Type inferred from '\"tvmonitor\"'" }, { "name" : "pct1", "type" : "string", "doc" : "Type inferred from '\"95.71207000000001\"'" }, { "name" : "host", "type" : "string", "doc" : "Type inferred from '\"HW13125.local\"'" }, { "name" : "shape", "type" : "string", "doc" : "Type inferred from '\"(1, 3, 512, 896)\"'" }, { "name" : "end", "type" : "string", "doc" : "Type inferred from '\"1537823458.559896\"'" }, { "name" : "te", "type" : "string", "doc" : "Type inferred from '\"3.580893039703369\"'" }, { "name" : "battery", "type" : "int", "doc" : "Type inferred from '100'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"09/24/2018 17:10:58\"'" }, { "name" : "cpu", "type" : "double", "doc" : "Type inferred from '12.0'" }, { "name" : "diskusage", "type" : "string", "doc" : "Type inferred from '\"48082.7 MB\"'" }, { "name" : "memory", "type" : "double", "doc" : "Type inferred from '70.6'" }, { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c\"'" } ] }
Tabular data has fields with types and properties. Let's specify those for automated analysis, conversion and live streaming SQL.
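To illustrate what that field and type information buys us, here is a small sketch that reads an Avro schema like the one above and type-checks an incoming record against it (plain Python with no Avro library, just for illustration; in the real flow the record processors do this for us):

```python
import json

# Map the Avro primitive types used in this schema to Python types
AVRO_TO_PYTHON = {"string": str, "int": int, "double": float}

def field_types(schema_text):
    """Map each field name in an Avro record schema to its Python type."""
    schema = json.loads(schema_text)
    return {f["name"]: AVRO_TO_PYTHON[f["type"]] for f in schema["fields"]}

def bad_fields(record, types):
    """Names of fields that are missing or have the wrong type."""
    return sorted(n for n, t in types.items() if not isinstance(record.get(n), t))
```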
hive table schema: gluoncvyolo
  1. CREATE EXTERNAL TABLE IF NOT EXISTS gluoncvyolo (imgname STRING, imgnamep STRING, class1 STRING, pct1 STRING, host STRING, shape STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING) STORED AS ORC;
  2.  
Apache NiFi generates tables for me in Apache Hive 3.x as Apache ORC files for fast performance.

hive acid table schema: gluoncvyoloacid
  1. CREATE TABLE gluoncvyoloacid
  2. (imgname STRING, imgnamep STRING, class1 STRING, pct1 STRING, host STRING, shape STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING)
  3. STORED AS ORC TBLPROPERTIES ('transactional'='true');
I can just as easily insert or update data into Hive 3.x ACID 2 tables.
We have data; now query it. Easy, no-install analytics with tables, Leaflet.js, AngularJS, graphs, maps and charts.
nifi flow registry
To manage version control I am using the NiFi Registry, which is great. In the newest version, 0.2, there is the ability to back it up with GitHub! It's easy. Everything you need to know is in the documentation and Bryan Bende's excellent post on the subject.
There were a few gotchas for me.
  • Use your own new GitHub project with permissions and then clone it locally: git clone https://github.com/tspannhw/nifi-registry-github.git
  • Make sure the GitHub directory has the right permissions and is empty (no README or other files)
  • Make sure you put in the full directory path
  • Update your config like below:
  1. <flowPersistenceProvider>
  2. <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
  3. <property name="Flow Storage Directory">/Users/tspann/Documents/nifi-registry-0.2.0/conf/nifi-registry-github</property>
  4. <property name="Remote To Push">origin</property>
  5. <property name="Remote Access User">tspannhw</property>
  6. <property name="Remote Access Password">generatethis</property>
  7. </flowPersistenceProvider>
This is my github directory to hold versions: https://github.com/tspannhw/nifi-registry-github
resources:

Properties File Lookup Augmentation of Data Flow in Apache NiFi

A really cool technologist contacted me on LinkedIn and asked an interesting question:
Tim,
How do I read values from a properties file and use them in my flow. I want to update/inject an attribute with this value.
Suppose you don't want to use the Variable Registry, but want to inject a value from a properties file. You could run a REST server and read it, or resort to some file-reading hack, but we have a great service to do this very easily!
In my UpdateAttribute processor (or among your regular attributes already), I have an attribute named keytofind. This contains a lookup key, such as an integer or a string key. We will find that value in the properties file and return it in an attribute of your choosing.
We have a Controller Service to handle this for you. It reads from your specified properties file. Make sure Apache NiFi has permissions to that path and can read the file.

PropertiesFileLookupService
We look up the key specified in the “keytofind” attribute. It returns a value that you specify as an extra attribute; mine is “updatedvalue”.
This is my properties file:
  1. -rwxrwxrwx 1 tspann staff 67 Oct 4 09:15 lookup.properties
  2.  
  3. stuff1=value1
  4. stuff2=value2
  5. stuff3=value other
  6. tim=spann
  7. nifi=cool
In this example, we are using the LookupAttribute processor. You can also use the LookupRecord processor depending on your needs.
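Under the hood, the controller service is doing a simple key-to-value lookup over a Java-style properties file. A sketch of the equivalent logic in plain Python (the file path would be whatever you configured on the service):

```python
def load_properties(path):
    """Parse a simple key=value properties file into a dict."""
    props = {}
    with open(path) as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blanks and comments
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

def lookup(props, keytofind, default=None):
    """What LookupAttribute does: resolve keytofind to its value."""
    return props.get(keytofind, default)
```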
Resources:

Scanning Documents into Data Lakes via Tesseract, MQTT, Python, JSON, Records, TensorFlow, OpenCV and Apache NiFi

There are many awesome open source tools available to integrate with your Big Data Streaming flows.
Take a look at these articles for installation and why the new version of Tesseract is different.
I am officially recommending Python 3.6 or newer. Please don't use Python 2.7 if you don't have to. Friends don't let friends use old Python.
Tesseract 4 with Deep Learning
For installation on a Mac Laptop:
  1. brew install tesseract --HEAD
  2.  
  3. pip3.6 install pytesseract
  4.  
  5. brew install leptonica
Note: if you already have tesseract installed, you may need to uninstall and unlink it first with brew. If you don't use brew, you can install it another way.
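Once installed, the Python side is tiny. The sketch below wraps pytesseract (imported inside the function so the rest of the script runs without it) plus a small cleanup helper for the whitespace-heavy text Tesseract emits:

```python
import re

def ocr_image(image_path):
    """Extract text from an image with Tesseract 4 via pytesseract.
    Requires the tesseract binary plus: pip3.6 install pytesseract pillow"""
    import pytesseract
    from PIL import Image
    return pytesseract.image_to_string(Image.open(image_path))

def clean_ocr_text(text):
    """Collapse the runs of spaces and newlines in raw OCR output."""
    return re.sub(r"\s+", " ", text).strip()
```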
Summary
  1. Execute the run.sh script (https://github.com/tspannhw/nifi-tesseract-python/blob/master/pytesstest.py).
  2. It will send an MQTT message containing the text and some other attributes in JSON format to the tesseract topic on the specified MQTT broker.
  3. Apache NiFi reads from this topic via ConsumeMQTT.
  4. The flow checks whether it's valid JSON via RouteOnContent.
  5. We run MergeRecord to combine many JSON records into one big Apache Avro file.
  6. Then we run ConvertAvroToORC to make a superfast Apache ORC file for storage.
  7. Then we store it in HDFS via PutHDFS.
Running The Python Script
You could also hook this up to a scanner or point it at a directory, and you could schedule it to run every 30 seconds or so. I had it hooked up to a local Apache NiFi instance to schedule runs. It can also be run by the MiniFi Java agent or MiniFi C++ agent, or on demand if you wish.
Sending MQTT Messages From Python
  1. # MQTT - requires: pip3 install paho-mqtt
  2. import paho.mqtt.client as mqtt
  3. client = mqtt.Client()
  4. client.username_pw_set("user", "pass")
  5. client.connect("server.server.com", 17769, 60)
  6. client.publish("tesseract", payload=json_string, qos=0, retain=True)
You will need to run: pip3 install paho-mqtt
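The json_string above is just a Python dict serialized with the standard library. Here is a sketch of building a payload with the same field names as the schema later in this section (the host and battery values are purely illustrative):

```python
import json
import time
import uuid
from datetime import datetime

def build_payload(text, host="HW13125.local", battery=100):
    """Build the JSON message published to the tesseract MQTT topic."""
    now = datetime.now()
    return json.dumps({
        "text": text,
        "host": host,
        "battery": battery,
        "end": str(time.time()),
        "systemtime": now.strftime("%m/%d/%Y %H:%M:%S"),
        "id": now.strftime("%Y%m%d%H%M%S") + "_" + str(uuid.uuid4()),
    })
```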
Create the HDFS Directory
  1. hdfs dfs -mkdir -p /tesseract

Create the External Hive Table (DDL Built by NiFi)
  1. CREATE EXTERNAL TABLE IF NOT EXISTS tesseract (`text` STRING, imgname STRING, host STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING) STORED AS ORC
  2. LOCATION '/tesseract';

This DDL is a side effect: it's built by our ORC conversion and HDFS storage processors.
You could run that create script in Hive View 2, Beeline or another Apache Hive JDBC/ODBC tool. I used Apache Zeppelin since I am going to be doing queries there anyway.

Let's Ingest Our Captured Images, Process Them with Apache Tika and TensorFlow, and Grab the Metadata
Consume MQTT Records and Store in Apache Hive
Let's look at other fields in Zeppelin
Let's Look at Our Records in Apache Zeppelin via a SQL Query (SELECT * FROM tesseract)
ConsumeMQTT: Give me all the records from the tesseract topic on our MQTT broker. This isolates us from our ingest clients, which could be 100,000 devices.
MergeRecord: Merge all the JSON files sent via MQTT into one big Avro file.
ConvertAvroToORC: Convert our merged Avro file.
PutHDFS
Tesseract Example Schema in Hortonworks Schema Registry
TIP: You can generate your schema with InferAvroSchema. Do that once, copy it and paste into Schema Registry. Then you can remove that step from your flow.
The Schema Text
  1. {
  2. "type": "record",
  3. "name": "tesseract",
  4. "fields": [
  5. {
  6. "name": "text",
  7. "type": "string",
  8. "doc": "Type inferred from '\"cgi cctong aiternacrety, pou can acces the complete Pro\\nLance repesiiry from eh Provenance mens: The Provenance\\n‘emu inchades the Date/Time, Actontype, the Unsque Fowie\\nTD and other sata. Om the ar it is smal exci i oe:\\n‘ick chs icon, and you get the flowin On the right, war\\n‘cots like three inthe cic soemecaed gether Liege:\\n\\nLineage ts visualined as « lange direcnad sqycie graph (DAG) char\\nSrones the seeps 1m she Gow where modifications oF routing ‘oot\\nplace on the Aewiike. Righe-iieit « step lp the Lineage s view\\nSetusls aboot the fowtle at that step ar expand the ow to ander:\\nScand where & was potentially domed frum. Af the very bottom\\nleft of the Lineage Oi a slider wath a play button to play the pro\\n“sing flow (with scaled ame} and understand where tbe owtise\\nSpent the meat Game of at whch PORN get muted\\n\\naide the Bowtie dealin, you cam: finn deed analy of box\\n\\ntern\\n=\"'"
  9. },
  10. {
  11. "name": "imgname",
  12. "type": "string",
  13. "doc": "Type inferred from '\"images/tesseract_image_20180613205132_c14779b8-1546-433e-8976-ddb5bfc5f978.jpg\"'"
  14. },
  15. {
  16. "name": "host",
  17. "type": "string",
  18. "doc": "Type inferred from '\"HW13125.local\"'"
  19. },
  20. {
  21. "name": "end",
  22. "type": "string",
  23. "doc": "Type inferred from '\"1528923095.3205361\"'"
  24. },
  25. {
  26. "name": "te",
  27. "type": "string",
  28. "doc": "Type inferred from '\"3.7366552352905273\"'"
  29. },
  30. {
  31. "name": "battery",
  32. "type": "int",
  33. "doc": "Type inferred from '100'"
  34. },
  35. {
  36. "name": "systemtime",
  37. "type": "string",
  38. "doc": "Type inferred from '\"06/13/2018 16:51:35\"'"
  39. },
  40. {
  41. "name": "cpu",
  42. "type": "double",
  43. "doc": "Type inferred from '22.8'"
  44. },
  45. {
  46. "name": "diskusage",
  47. "type": "string",
  48. "doc": "Type inferred from '\"113759.7 MB\"'"
  49. },
  50. {
  51. "name": "memory",
  52. "type": "double",
  53. "doc": "Type inferred from '69.4'"
  54. },
  55. {
  56. "name": "id",
  57. "type": "string",
  58. "doc": "Type inferred from '\"20180613205132_c14779b8-1546-433e-8976-ddb5bfc5f978\"'"
  59. }
  60. ]
  61. }
The above schema was generated by InferAvroSchema in Apache NiFi.
Image Analytics Results
  1. {
  2. "tiffImageWidth" : "1280",
  3. "ContentType" : "image/jpeg",
  4. "JPEGImageWidth" : "1280 pixels",
  5. "FileTypeDetectedFileTypeName" : "JPEG",
  6. "tiffBitsPerSample" : "8",
  7. "ThumbnailHeightPixels" : "0",
  8. "label4" : "book jacket",
  9. "YResolution" : "1 dot",
  10. "label5" : "pill bottle",
  11. "ImageWidth" : "1280 pixels",
  12. "JFIFYResolution" : "1 dot",
  13. "JPEGImageHeight" : "720 pixels",
  14. "filecreationTime" : "2018-06-13T17:24:07-0400",
  15. "JFIFThumbnailHeightPixels" : "0",
  16. "DataPrecision" : "8 bits",
  17. "XResolution" : "1 dot",
  18. "ImageHeight" : "720 pixels",
  19. "JPEGNumberofComponents" : "3",
  20. "JFIFXResolution" : "1 dot",
  21. "FileTypeExpectedFileNameExtension" : "jpg",
  22. "JPEGDataPrecision" : "8 bits",
  23. "FileSize" : "223716 bytes",
  24. "probability4" : "1.74%",
  25. "tiffImageLength" : "720",
  26. "probability3" : "3.29%",
  27. "probability2" : "6.13%",
  28. "probability1" : "81.23%",
  29. "FileName" : "apache-tika-2858986094088526803.tmp",
  30. "filelastAccessTime" : "2018-06-13T17:24:07-0400",
  31. "JFIFThumbnailWidthPixels" : "0",
  32. "JPEGCompressionType" : "Baseline",
  33. "JFIFVersion" : "1.1",
  34. "filesize" : "223716",
  35. "FileModifiedDate" : "Wed Jun 13 17:24:27 -04:00 2018",
  36. "Component3" : "Cr component: Quantization table 1, Sampling factors 1 horiz/1 vert",
  37. "Component1" : "Y component: Quantization table 0, Sampling factors 2 horiz/2 vert",
  38. "Component2" : "Cb component: Quantization table 1, Sampling factors 1 horiz/1 vert",
  39. "NumberofTables" : "4 Huffman tables",
  40. "FileTypeDetectedFileTypeLongName" : "Joint Photographic Experts Group",
  41. "fileowner" : "tspann",
  42. "filepermissions" : "rw-r--r--",
  43. "JPEGComponent3" : "Cr component: Quantization table 1, Sampling factors 1 horiz/1 vert",
  44. "JPEGComponent2" : "Cb component: Quantization table 1, Sampling factors 1 horiz/1 vert",
  45. "JPEGComponent1" : "Y component: Quantization table 0, Sampling factors 2 horiz/2 vert",
  46. "FileTypeDetectedMIMEType" : "image/jpeg",
  47. "NumberofComponents" : "3",
  48. "HuffmanNumberofTables" : "4 Huffman tables",
  49. "label1" : "menu",
  50. "XParsedBy" : "org.apache.tika.parser.DefaultParser, org.apache.tika.parser.ocr.TesseractOCRParser, org.apache.tika.parser.jpeg.JpegParser",
  51. "label2" : "web site",
  52. "label3" : "crossword puzzle",
  53. "absolutepath" : "/Volumes/seagate/opensourcecomputervision/images/",
  54. "filelastModifiedTime" : "2018-06-13T17:24:07-0400",
  55. "ThumbnailWidthPixels" : "0",
  56. "filegroup" : "staff",
  57. "ResolutionUnits" : "none",
  58. "JFIFResolutionUnits" : "none",
  59. "CompressionType" : "Baseline",
  60. "probability5" : "1.12%"
  61. }
This is built using a combination of Apache Tika, TensorFlow and other metadata analysis processors.
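The labelN/probabilityN pairs in that metadata are easy to post-process downstream. A sketch that extracts the top TensorFlow labels from the flattened Tika output:

```python
def top_labels(metadata, n=3):
    """Pair up labelN / probabilityN keys and return the n best guesses."""
    pairs = []
    for i in range(1, 6):  # the flow emits label1..label5
        label = metadata.get("label%d" % i)
        prob = metadata.get("probability%d" % i)
        if label is not None and prob is not None:
            pairs.append((label, float(prob.rstrip("%"))))
    pairs.sort(key=lambda p: p[1], reverse=True)
    return pairs[:n]
```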