Ingesting Multiple IoT Devices with Apache NiFi 1.7 and MiniFi 0.5 (GPS, Temperature, Humidity, DL Results, Images)

Part 1: Multiple Devices with Data
Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest
In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types and more actions.
I have three streams coming from each device including web camera images.
When we are sending data from a MiniFi agent we need to define a port on an Apache NiFi server/cluster to receive it.
So I design my MiniFi flow in the Apache NiFi UI (pretty soon there will be a dedicated designer for this), highlight everything, and hit Create Template. You can then export the template and convert it to a config.yml. This process will soon be automated and connected with the NiFi Registry to cut down on the clicks.
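As an aside, that template-to-YAML conversion can be scripted with the MiniFi Toolkit's transform command; here is a minimal sketch, where the toolkit path and file names are assumptions for illustration:

import subprocess

# Assumed install location of the MiniFi Toolkit and example file names.
TOOLKIT = "/opt/minifi-toolkit-0.5.0/bin/config.sh"
TEMPLATE_XML = "rainbow-minifi-flow.xml"   # template exported from the NiFi UI
CONFIG_YML = "config.yml"                  # the file the MiniFi agent reads at startup

# config.sh transform converts a NiFi template into a MiniFi config.yml.
subprocess.run([TOOLKIT, "transform", TEMPLATE_XML, CONFIG_YML], check=True)
print("Wrote", CONFIG_YML, "- copy it into the MiniFi conf directory and restart the agent")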
The Port Details shown here are an example. In the flow you design in the Apache NiFi UI, you will connect to this port on the Remote Process Group. If you are manually editing a config.yml (okay, never do this, but sometimes I have to), you can copy the ID from the Port Details and paste it into the file.
Once MiniFi has its config.yml and is started, we will start getting messages on that port.
You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.
Welcome to Apache NiFi Registry v0.2.0. It works just as well as before: very stable, but with some new magic. You can now connect to Git and GitHub!
We have structured JSON, so let's infer a schema, clean it up and store it in the Hortonworks Schema Registry. That makes it versioned and REST enabled. I add one schema for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.
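Because the registry is REST enabled, other tools can pull the versioned schema as well; here is a minimal sketch of fetching the latest version with Python (the registry host is an assumption, and the endpoint path should be verified against your Schema Registry version):

import json
import requests

REGISTRY = "http://schemaregistry.example.com:7788/api/v1/schemaregistry"  # assumed host/port
SCHEMA_NAME = "rainbow"  # one of the two JSON types sent from the device

# Fetch the latest registered version of the schema over REST.
resp = requests.get(f"{REGISTRY}/schemas/{SCHEMA_NAME}/versions/latest")
resp.raise_for_status()
latest = resp.json()

print("Version:", latest.get("version"))
print(json.dumps(json.loads(latest["schemaText"]), indent=2))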
The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering and analysis.
I route based on the two types of files, apply their schemas, do a simple filter via SQL, and send the converted Avro-formatted files to my cloud-hosted cluster.
Once I get the data, I send it from my edge server to my cloud HDF 3.2 cluster. Images go to my existing image storage processor group. My other two types of files I convert to Apache ORC and store in HDFS as Apache Hive tables.
Server Dashboard
Rainbow Processing
Routing is Easy
On High Humidity, Send a Slack Message (Query on humidity value)
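The high-humidity alert is just a predicate on the humidity field (QueryRecord in NiFi) feeding a Slack notification; here is a hedged Python sketch of the equivalent logic, where the webhook URL and threshold are hypothetical and the field names follow the sensor JSON:

import json
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # hypothetical webhook URL
HUMIDITY_ALERT = 60.0                                           # hypothetical threshold

def check_humidity(record_json):
    """Post a Slack message when a sensor record reports high humidity."""
    record = json.loads(record_json)
    humidity = float(record.get("humidity", 0.0))
    if humidity > HUMIDITY_ALERT:
        text = f"High humidity {humidity}% on {record.get('host', 'unknown')} at {record.get('ts', '')}"
        requests.post(SLACK_WEBHOOK, json={"text": text})

check_humidity('{"host": "movidius", "humidity": 71.2, "ts": "2018-07-18 23:42:22"}')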
We can dive into any flowfile as it travels through the system and examine its data and metadata.
Now that my data is saved in HDFS with Hive tables on top I can use the latest version of Apache Zeppelin to analyze the data.
I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.
I found a bunch of new chart types; this one could be insightful.

So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run a Dockerized TensorFlow application in my HDP 3.0 cluster.
Python Scripts
Schemas

rainbow
{
  "type": "record",
  "name": "rainbow",
  "fields": [
    { "name": "tempf", "type": "double", "doc": "Type inferred from '84.15'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '53.0'" },
    { "name": "pressure", "type": "double", "doc": "Type inferred from '101028.56'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "temp", "type": "double", "doc": "Type inferred from '38.58'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4831.2 MB\"'" },
    { "name": "altitude", "type": "double", "doc": "Type inferred from '80.65'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:42:22\"'" },
    { "name": "tempf2", "type": "double", "doc": "Type inferred from '28.97'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" }
  ]
}
gps
{
  "type": "record",
  "name": "gps",
  "fields": [
    { "name": "speed", "type": "string", "doc": "Type inferred from '\"0.066\"'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4830.3 MB\"'" },
    { "name": "altitude", "type": "string", "doc": "Type inferred from '\"43.0\"'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:46:39\"'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '54.0'" },
    { "name": "latitude", "type": "string", "doc": "Type inferred from '\"40.2681555\"'" },
    { "name": "track", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"gps_uuid_20180718234640\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "epd", "type": "string", "doc": "Type inferred from '\"nan\"'" },
    { "name": "utc", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "epx", "type": "string", "doc": "Type inferred from '\"40.135\"'" },
    { "name": "epy", "type": "string", "doc": "Type inferred from '\"42.783\"'" },
    { "name": "epv", "type": "string", "doc": "Type inferred from '\"171.35\"'" },
    { "name": "ept", "type": "string", "doc": "Type inferred from '\"0.005\"'" },
    { "name": "eps", "type": "string", "doc": "Type inferred from '\"85.57\"'" },
    { "name": "longitude", "type": "string", "doc": "Type inferred from '\"-74.529094\"'" },
    { "name": "mode", "type": "string", "doc": "Type inferred from '\"3\"'" },
    { "name": "time", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "climb", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "epc", "type": "string", "doc": "Type inferred from '\"nan\"'" }
  ]
}
SQL
%sql

CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'

%sql

CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql

CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'

%sql

CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'

References

Using Apache NiFi with Apache MXNet GluonCV for YOLO 3 Deep Learning Workflows

Using GluonCV 0.3 with Apache MXNet 1.3
source code:
*Captured and Processed Image Available for Viewing in Stream in Apache NiFi 1.7.x
use case:
I need to easily monitor the contents of my security vault. It is a fixed number of known things.
What we need in the real world is a nice camera or several (maybe four to eight depending on the angles of the room), a device like an NVidia Jetson TX2, the MiniFi 0.5 Java agent, JDK 8, Apache MXNet, GluonCV, lots of Python libraries, a network connection and a simple workflow. Outside of my vault, I will need a server or cluster to do the more advanced processing, though I could run it all on the local box. If the number of items, or certain items I am watching, are no longer on screen, then we should send an immediate alert. That could go out via SMS, email, Slack, an alerting system or other means; a sketch of that inventory check follows. We have most of that implemented below. If anyone wants to build the complete use case I can assist.
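To make the alerting concrete, the check boils down to comparing the classes YOLO reports against a fixed inventory; this is a minimal sketch, assuming the detection results carry class names from the VOC label set (the expected-item set and alert function are placeholders):

# Expected contents of the vault -- a fixed set of known things (placeholder values,
# drawn from the VOC classes the pretrained model can detect).
EXPECTED_ITEMS = {"tvmonitor", "bottle", "chair"}

def missing_items(detected_classes):
    """Return the expected items that did not appear in this frame's detections."""
    return EXPECTED_ITEMS - set(detected_classes)

def alert(missing):
    # In the real flow this would go out via NiFi to SMS, email, Slack or an alerting system.
    print(f"ALERT: missing from vault: {', '.join(sorted(missing))}")

detections = ["tvmonitor", "person"]   # classes reported by the YOLO 3 model for one frame
gone = missing_items(detections)
if gone:
    alert(gone)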
demo implementation:
I wanted to use the new YOLO 3 model, which is part of the new 0.3 stream, so I installed a 0.3 pre-release build. It may be final by the time you read this; you can try a regular pip3.6 install -U gluoncv and see what you get.
pip3.6 install -U gluoncv==0.3.0b20180924
Yolo v3 is a great pretrained model to use for object detection.
See: https://gluon-cv.mxnet.io/build/examples_detection/demo_yolo.html
The GluonCV Model Zoo is very rich and incredibly easy to use. We just grab the model "yolo3_darknet53_voc" with an automatic one-time download and we are ready to go. They provide easy-to-customize code to start with. I write my processed image and JSON results out for ingest by Apache NiFi. You will notice this is similar to what we did for the Open Computer Vision talks: https://community.hortonworks.com/articles/198939/using-apache-mxnet-gluoncv-with-apache-nifi-for-de.html
This is updated and even easier. I dropped MQTT and just output image files and some JSON to read.
GluonCV makes working with Computer Vision extremely clean and easy.
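To give a feel for how little code this takes, here is a simplified sketch along the lines of the GluonCV 0.3 detection presets (the image paths are placeholders; my full yolonifi.py script also writes a JSON record alongside the processed image for NiFi to ingest):

from gluoncv import model_zoo, data, utils
import matplotlib.pyplot as plt

# One-time download of the pretrained YOLO 3 model from the GluonCV Model Zoo.
net = model_zoo.get_model('yolo3_darknet53_voc', pretrained=True)

# Load and resize a test image (path is a placeholder).
x, img = data.transforms.presets.yolo.load_test('images/vault.jpg', short=512)

# Run detection: class IDs, confidence scores and bounding boxes.
class_IDs, scores, bounding_boxes = net(x)

# Draw the annotated image; the real script saves it for MiniFi/NiFi to pick up.
ax = utils.viz.plot_bbox(img, bounding_boxes[0], scores[0], class_IDs[0], class_names=net.classes)
plt.savefig('images/vault_processed.jpg')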
why Apache NiFi For Deep Learning Workflows
Let me count the top five ways:
#1 Provenance - lets me see everything, everywhere, all the time with the data and the metadata.
#2 Configurable Queues - queues are everywhere and they are extremely configurable on size and priority. There's always backpressure and safety between every step. Sinks, Sources and steps can be offline as things happen in the real-world internet. Offline, online, wherever, I can recover and have full visibility into my flows as they spread between devices, servers, networks, clouds and nation-states.
#3 Security - secure at every level, from SSL to data encryption, and integrates with leading edge tools including Apache Knox, Apache Ranger and Apache Atlas. See: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_security/content/ch_enabling-knox-for-nifi.html
#4 UI - a simple UI to develop, monitor and manage incredibly complex flows including IoT, Deep Learning, Logs and every data source you can throw at it.
#5 Agents - MiniFi gives me two different agents (Java and C++) for my devices or systems to stream data headlessly.
running gluoncv yolo3 model
I wrap my Python script in a shell script to throw away warnings and junk:
cd /Volumes/TSPANN/2018/talks/ApacheDeepLearning101/nifi-gluoncv-yolo3
python3.6 -W ignore /Volumes/TSPANN/2018/talks/ApacheDeepLearning101/nifi-gluoncv-yolo3/yolonifi.py 2>/dev/null
List of Possible Objects We Can Detect
  1. ["aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat", "chair", "cow",
  2. "diningtable", "dog", "horse", "motorbike", "person", "pottedplant", "sheep", "sofa", "train",
  3. "tvmonitor"]
I am going to train this with my own data for the upcoming INTERNET OF BEER. For the vault use case, we would need pictures of your vault's contents.
See: https://gluon-cv.mxnet.io/build/examples_datasets/detection_custom.html#sphx-glr-build-examples-datasets-detection-custom-py
Example Output in JSON
  1. {"imgname": "images/gluoncv_image_20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602.jpg", "imgnamep": "images/gluoncv_image_p_20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602.jpg", "class1": "tvmonitor", "pct1": "49.070724999999996", "host": "HW13125.local", "shape": "(1, 3, 512, 896)", "end": "1537815855.105193", "te": "4.199203014373779", "battery": 100, "systemtime": "09/24/2018 15:04:15", "cpu": 33.7, "diskusage": "49939.2 MB", "memory": 60.1, "id": "20180924190411_b90c6ba4-bbc7-4bbf-9f8f-ee5a6a859602"}
Example Processed Image Output
It found one generic person; we could train against a known set of humans that are allowed in an area or are known users.
nifi flows

Gateway Server (We could skip this, but aggregating multiple camera agents is useful)
Send the Flow to the Cloud
Cloud Server Site-to-Site
After we infer the schema of the data once, we don't need to do it again. We could derive the schema manually or from another tool, but this is easy. Once you are done, you can delete the InferAvroSchema processor from your flow. I left mine in, in case you wish to start from the flow attached at the end of the article.
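If you want to see roughly what that inference is doing, here is a rough Python sketch that builds a flat Avro record schema from one sample JSON record (it only handles the simple string/double/int fields these sensor payloads contain, and is not the InferAvroSchema implementation):

import json

def infer_avro_schema(sample_json, name):
    """Build a flat Avro record schema from a single sample JSON record."""
    record = json.loads(sample_json)
    type_map = {str: "string", float: "double", int: "int", bool: "boolean"}
    fields = [{"name": k, "type": type_map.get(type(v), "string")} for k, v in record.items()]
    return {"type": "record", "name": name, "fields": fields}

sample = '{"tempf": 84.15, "host": "rainbow", "memory": 32.3, "ts": "2018-07-18 23:42:22"}'
print(json.dumps(infer_avro_schema(sample, "rainbow"), indent=2))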
flow steps
Route when there is no error to MergeRecord, then convert those aggregated Apache Avro records into one Apache ORC file.
Then store it in an HDFS directory. Once complete, there will be a DDL statement added to the flowfile metadata that you can send to PutHiveQL, or you can manually create the table in Beeline, Zeppelin or Hortonworks Data Analytics Studio (https://hortonworks.com/products/dataplane/data-analytics-studio/).
schema: gluoncvyolo
  1. { "type" : "record", "name" : "gluoncvyolo", "fields" : [ { "name" : "imgname", "type" : "string", "doc" : "Type inferred from '\"images/gluoncv_image_20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c.jpg\"'" }, { "name" : "imgnamep", "type" : "string", "doc" : "Type inferred from '\"images/gluoncv_image_p_20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c.jpg\"'" }, { "name" : "class1", "type" : "string", "doc" : "Type inferred from '\"tvmonitor\"'" }, { "name" : "pct1", "type" : "string", "doc" : "Type inferred from '\"95.71207000000001\"'" }, { "name" : "host", "type" : "string", "doc" : "Type inferred from '\"HW13125.local\"'" }, { "name" : "shape", "type" : "string", "doc" : "Type inferred from '\"(1, 3, 512, 896)\"'" }, { "name" : "end", "type" : "string", "doc" : "Type inferred from '\"1537823458.559896\"'" }, { "name" : "te", "type" : "string", "doc" : "Type inferred from '\"3.580893039703369\"'" }, { "name" : "battery", "type" : "int", "doc" : "Type inferred from '100'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"09/24/2018 17:10:58\"'" }, { "name" : "cpu", "type" : "double", "doc" : "Type inferred from '12.0'" }, { "name" : "diskusage", "type" : "string", "doc" : "Type inferred from '\"48082.7 MB\"'" }, { "name" : "memory", "type" : "double", "doc" : "Type inferred from '70.6'" }, { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"20180924211055_8f3b9dac-5645-49aa-94e7-ee5176c3f55c\"'" } ] }
Tabular data has fields with types and properties. Let's specify those for automated analysis, conversion and live stream SQL.
hive table schema: gluoncvyolo
CREATE EXTERNAL TABLE IF NOT EXISTS gluoncvyolo (imgname STRING, imgnamep STRING, class1 STRING, pct1 STRING, host STRING, shape STRING, end STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING) STORED AS ORC;
Apache NiFi generates tables for me in Apache Hive 3.x as Apache ORC files for fast performance.

hive acid table schema: gluoncvyoloacid
CREATE TABLE gluoncvyoloacid
(imgname STRING, imgnamep STRING, class1 STRING, pct1 STRING, host STRING, shape STRING, `end` STRING, te STRING, battery INT, systemtime STRING, cpu DOUBLE, diskusage STRING, memory DOUBLE, id STRING)
STORED AS ORC TBLPROPERTIES ('transactional'='true')
I can just as easily insert or update data into Hive 3.x ACID 2 tables.
We have data; now query it. Easy, no-install analytics with tables, Leaflet.js, AngularJS, graphs, maps and charts.
nifi flow registry
To manage version control I am using the NiFi Registry, which is great. In the newest version, 0.2.0, there is the ability to back it up with GitHub! It's easy. Everything you need to know is in the docs and Bryan Bende's excellent post on the subject.
There were a few gotchas for me.
  • Use your own new GitHub project with permissions and then clone it locally: git clone https://github.com/tspannhw/nifi-registry-github.git
  • Make sure the GitHub directory has the right permissions and is empty (no readme or other files)
  • Make sure you put in the full directory path
  • Update your config like below:
  1. <flowPersistenceProvider>
  2. <class>org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider</class>
  3. <property name="Flow Storage Directory">/Users/tspann/Documents/nifi-registry-0.2.0/conf/nifi-registry-github</property>
  4. <property name="Remote To Push">origin</property>
  5. <property name="Remote Access User">tspannhw</property>
  6. <property name="Remote Access Password">generatethis</property>
  7. </flowPersistenceProvider>
This is my github directory to hold versions: https://github.com/tspannhw/nifi-registry-github
resources:

Properties File Lookup Augmentation of Data Flow in Apache NiFi

A really cool technologist contacted me on LinkedIn and asked an interesting question:
Tim,
How do I read values from a properties file and use them in my flow? I want to update/inject an attribute with this value.
Suppose you don't want to use the Variable Registry but want to inject a value from a properties file. You could run a REST server and read from it, or resort to some file-reading hack, but we have a great controller service that does this very easily!
In my UpdateAttribute processor (or among your existing attributes), I have an attribute named keytofind. It contains a lookup key such as an integer or a string. We will find that key's value in the properties file and give it to you in an attribute of your choosing.
We have a Controller Service to handle this for you. It reads from your specified properties file. Make sure Apache NiFi has permissions to that path and can read the file.

PropertiesFileLookupService
We look up the key specified in “keytofind”. The value it returns is placed in an extra attribute that you specify; mine is “updatedvalue”.
This is my properties file:
-rwxrwxrwx 1 tspann staff 67 Oct 4 09:15 lookup.properties

stuff1=value1
stuff2=value2
stuff3=value other
tim=spann
nifi=cool
In this example, we are using the LookupAttribute processor. You can also use the LookupRecord processor depending on your needs.
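Under the covers this is nothing more than a key lookup against that file; here is a small Python sketch of the same behavior, using the attribute names and properties file from the example above:

def load_properties(path):
    """Parse a simple key=value properties file into a dict."""
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, value = line.split("=", 1)
                props[key.strip()] = value.strip()
    return props

# What LookupAttribute does: take the flowfile's "keytofind" attribute,
# look it up in the properties file, and write the result to "updatedvalue".
attributes = {"keytofind": "nifi"}
props = load_properties("lookup.properties")
attributes["updatedvalue"] = props.get(attributes["keytofind"], "")
print(attributes)   # {'keytofind': 'nifi', 'updatedvalue': 'cool'}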
Resources: