Ingesting Multiple IoT Devices with Apache NiFi 1.7 and MiniFi 0.5 (GPS, Temperature, Humidity, DL Results, Images)

Part 1: Multiple Devices with Data
Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest
In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types and more actions.
I have three streams coming from each device including web camera images.
When we are sending data from a MiniFi agent we need to define a port on an Apache NiFi server/cluster to receive it.
So I design my MiniFi flow in the Apache NiFi UI (a dedicated designer for this is coming soon). You then highlight everything in the flow and hit Create Template. You can then export the template and convert it to a config.yml with the MiniFi Toolkit. Again, this process will soon be automated and connected with the NiFi Registry so it requires fewer clicks.
This is an example. The flow you design in the Apache NiFi UI will connect to this port on the Remote Process Group. If you are manually editing a config.yml (okay, never do this, but sometimes I have to), you can copy the ID from the Port Details dialog and paste it into the file.
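For orientation, this is a minimal sketch of the part of a MiniFi config.yml that points at that port. Every ID and URL below is a placeholder (use your own server and the ID from Port Details), and key names can vary slightly between MiniFi versions:

```yaml
Remote Process Groups:
  - name: NiFi
    url: http://your-nifi-server:8080/nifi    # placeholder URL
    timeout: 30 secs
    yield period: 10 sec
    Input Ports:
      - id: 00000000-0000-1000-0000-000000000000    # paste the ID from Port Details here
        name: From Movidius
        max concurrent tasks: 1
        use compression: false
```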
Once MiniFi has its config.yml and is started, we will start getting messages on that Port.
You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.
Welcome to Apache NiFi Registry v0.2.0. It works just as well as the previous release and is very stable, but adds some new magic: you can now connect it to Git and GitHub!
We have structured JSON, so let's infer a schema, clean it up and store it in the Hortonworks Schema Registry. That makes it versioned and REST enabled. I add one schema for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.
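The inference step produces the "Type inferred from ..." docs you see in the schemas below. As a rough sketch of what that inference does, here is a tiny Python version (deliberately simplified: no nested records, nulls or unions):

```python
import json

# Map a sample JSON value to an Avro primitive type -- a simplified
# stand-in for the schema inference NiFi performs.
def avro_type(value):
    if isinstance(value, bool):      # bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, float):
        return "double"
    if isinstance(value, int):
        return "long"
    return "string"

def infer_schema(name, sample):
    """Build a minimal Avro record schema from one sample JSON object."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": key,
             "type": avro_type(value),
             "doc": "Type inferred from '%s'" % json.dumps(value)}
            for key, value in sample.items()
        ],
    }

sample = json.loads('{"tempf": 84.15, "host": "rainbow", "cputemp": 53.0}')
print(json.dumps(infer_schema("rainbow", sample), indent=2))
```

In practice you let NiFi infer once, then clean up the result by hand (tighten types, add defaults) before registering it in the Schema Registry.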
The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering and analysis.
I route based on the two types of files, apply their schemas, do a simple filter via SQL and send the converted Avro-formatted files to my cloud-hosted cluster.
Once I get the data I send it from my edge server to my cloud HDF 3.2 cluster. For images, I send them to my existing image storage processor group. For my other two types of files I convert them to Apache ORC and store in HDFS as Apache Hive tables.
Server Dashboard
Rainbow Processing
Routing is Easy
On High Humidity, Send a Slack Message (Query on humidity value)
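That humidity check is just a QueryRecord property holding Calcite SQL that runs against the flowfile's records; a sketch of what it could look like (the property name and the threshold of 60 are my own example values, not from the screenshots):

```sql
-- Example QueryRecord property (e.g. "HighHumidity"); matching records
-- are routed down the branch that posts to Slack.
SELECT *
FROM FLOWFILE
WHERE humidity > 60
```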
We can dive into any flowfile as it travels through the system and examine its data and metadata.
Now that my data is saved in HDFS with Hive tables on top I can use the latest version of Apache Zeppelin to analyze the data.
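Any of the tables defined at the bottom of the article can then be queried straight from a Zeppelin note. A sketch against the rainbow table (the column names come from its schema; the aggregation is just an example):

```sql
%sql
SELECT host, avg(tempf) AS avg_tempf, max(pressure) AS max_pressure
FROM rainbow
GROUP BY host
```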
I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.
I found a bunch of new chart types; this one could be insightful.

So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run some Dockerized TensorFlow applications in my HDP 3.0 cluster.
Python Scripts
Schemas

rainbow
{
  "type": "record",
  "name": "rainbow",
  "fields": [
    { "name": "tempf", "type": "double", "doc": "Type inferred from '84.15'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '53.0'" },
    { "name": "pressure", "type": "double", "doc": "Type inferred from '101028.56'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "temp", "type": "double", "doc": "Type inferred from '38.58'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4831.2 MB\"'" },
    { "name": "altitude", "type": "double", "doc": "Type inferred from '80.65'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:42:22\"'" },
    { "name": "tempf2", "type": "double", "doc": "Type inferred from '28.97'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" }
  ]
}
gps
{
  "type": "record",
  "name": "gps",
  "fields": [
    { "name": "speed", "type": "string", "doc": "Type inferred from '\"0.066\"'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4830.3 MB\"'" },
    { "name": "altitude", "type": "string", "doc": "Type inferred from '\"43.0\"'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:46:39\"'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '54.0'" },
    { "name": "latitude", "type": "string", "doc": "Type inferred from '\"40.2681555\"'" },
    { "name": "track", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"gps_uuid_20180718234640\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "epd", "type": "string", "doc": "Type inferred from '\"nan\"'" },
    { "name": "utc", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "epx", "type": "string", "doc": "Type inferred from '\"40.135\"'" },
    { "name": "epy", "type": "string", "doc": "Type inferred from '\"42.783\"'" },
    { "name": "epv", "type": "string", "doc": "Type inferred from '\"171.35\"'" },
    { "name": "ept", "type": "string", "doc": "Type inferred from '\"0.005\"'" },
    { "name": "eps", "type": "string", "doc": "Type inferred from '\"85.57\"'" },
    { "name": "longitude", "type": "string", "doc": "Type inferred from '\"-74.529094\"'" },
    { "name": "mode", "type": "string", "doc": "Type inferred from '\"3\"'" },
    { "name": "time", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "climb", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "epc", "type": "string", "doc": "Type inferred from '\"nan\"'" }
  ]
}
SQL
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'
