
Ingesting Multiple IoT Devices with Apache NiFi 1.7 and MiniFi 0.5 (GPS, Temperature, Humidity, DL Results, Images)

Part 1: Multiple Devices with Data
Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest
In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types and more actions.
I have three streams coming from each device including web camera images.
When we are sending data from a MiniFi agent we need to define a port on an Apache NiFi server/cluster to receive it.
So I design my MiniFi flow in the Apache NiFi UI (a dedicated designer for this is coming soon). You then highlight everything, hit Create Template, export the template, and convert it to a config.yml. This process will soon be automated and connected with the NiFi Registry to reduce the number of clicks.
This is an example. In the flow you design in the Apache NiFi UI, you connect to this port on the Remote Process Group. If you are manually editing a config (okay, never do this, but sometimes I have to), you can copy the ID from the Port Details and paste it into the file.
Once MiniFi has its config.yml and is started, we will start receiving messages on that port.
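The template-to-YAML conversion can be scripted with the MiNiFi Toolkit; a minimal sketch, assuming the 0.5.0 toolkit and agent are unpacked locally and the exported template is named minifi-flow.xml (the paths and template name are assumptions for this example):

```shell
# Convert an exported NiFi template to a MiniFi config.yml
./minifi-toolkit-0.5.0/bin/config.sh transform minifi-flow.xml config.yml

# Drop the result into the MiniFi conf directory and restart the agent
cp config.yml minifi-0.5.0/conf/config.yml
./minifi-0.5.0/bin/minifi.sh restart
```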
You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.
Welcome to Apache NiFi Registry v0.2.0. This version works just as well and is very stable, but with some new magic: you can now connect to Git and GitHub!
We have structured JSON, so let's infer a schema, clean it up and store it in the Hortonworks Schema Registry. That makes it versioned and REST-enabled. I add one for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.
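For reference, a single rainbow reading as the device sends it looks roughly like the JSON below (the values are illustrative); a quick sketch of checking one record against the field names in the rainbow schema shown at the bottom of this article:

```python
import json

# Illustrative sample of one rainbow sensor reading (values are made up)
sample = """{"tempf": 84.15, "cputemp": 53.0, "pressure": 101028.56,
 "host": "rainbow", "uniqueid": "rainbow_uuid_20180718234222",
 "ipaddress": "192.168.1.165", "temp": 38.58, "diskfree": "4831.2 MB",
 "altitude": 80.65, "ts": "2018-07-18 23:42:22", "tempf2": 28.97,
 "memory": 32.3}"""

# Field names from the rainbow Avro schema (see the Schemas section)
rainbow_fields = {"tempf", "cputemp", "pressure", "host", "uniqueid",
                  "ipaddress", "temp", "diskfree", "altitude", "ts",
                  "tempf2", "memory"}

record = json.loads(sample)
missing = rainbow_fields - record.keys()
print("missing fields:", missing)  # empty set when the record is complete
```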
The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering and analysis.
I route based on the two types of files, apply their schemas, do a simple filter via SQL and send the converted Avro-formatted files to my cloud-hosted cluster.
Once I get the data, I send it from my edge server to my cloud HDF 3.2 cluster. Images go to my existing image-storage processor group. The other two file types I convert to Apache ORC and store in HDFS as Apache Hive tables.
Server Dashboard
Rainbow Processing
Routing is Easy
On High Humidity, Send a Slack Message (Query on humidity value)
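The humidity check is a simple record-oriented SQL statement of the kind you would put in a QueryRecord processor; a sketch, assuming the Sense-HAT records carry a humidity field (as in the movidiussense table below) and a hypothetical threshold of 60%:

```sql
-- Route records with high humidity to a relationship wired to PutSlack
-- (the 60.0 threshold is an assumption for this example)
SELECT * FROM FLOWFILE WHERE humidity > 60.0
```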
We can dive into any flowfile as it travels through the system and examine its data and metadata.
Now that my data is saved in HDFS with Hive tables on top I can use the latest version of Apache Zeppelin to analyze the data.
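In Zeppelin, the analysis is plain Hive SQL against the tables defined at the bottom of this article; a sketch, with a hypothetical temperature cutoff:

```sql
%sql
SELECT ts, host, tempf, temp, memory
FROM rainbow
WHERE tempf > 80.0
ORDER BY ts DESC
```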
I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.
I found a bunch of new chart types; this one could be insightful.

So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run some Dockerized TensorFlow applications in my HDP 3.0 cluster.
Python Scripts
Schemas

rainbow
{
  "type": "record",
  "name": "rainbow",
  "fields": [
    { "name": "tempf", "type": "double", "doc": "Type inferred from '84.15'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '53.0'" },
    { "name": "pressure", "type": "double", "doc": "Type inferred from '101028.56'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "temp", "type": "double", "doc": "Type inferred from '38.58'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4831.2 MB\"'" },
    { "name": "altitude", "type": "double", "doc": "Type inferred from '80.65'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:42:22\"'" },
    { "name": "tempf2", "type": "double", "doc": "Type inferred from '28.97'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" }
  ]
}
gps
{
  "type": "record",
  "name": "gps",
  "fields": [
    { "name": "speed", "type": "string", "doc": "Type inferred from '\"0.066\"'" },
    { "name": "diskfree", "type": "string", "doc": "Type inferred from '\"4830.3 MB\"'" },
    { "name": "altitude", "type": "string", "doc": "Type inferred from '\"43.0\"'" },
    { "name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:46:39\"'" },
    { "name": "cputemp", "type": "double", "doc": "Type inferred from '54.0'" },
    { "name": "latitude", "type": "string", "doc": "Type inferred from '\"40.2681555\"'" },
    { "name": "track", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "memory", "type": "double", "doc": "Type inferred from '32.3'" },
    { "name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'" },
    { "name": "uniqueid", "type": "string", "doc": "Type inferred from '\"gps_uuid_20180718234640\"'" },
    { "name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'" },
    { "name": "epd", "type": "string", "doc": "Type inferred from '\"nan\"'" },
    { "name": "utc", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "epx", "type": "string", "doc": "Type inferred from '\"40.135\"'" },
    { "name": "epy", "type": "string", "doc": "Type inferred from '\"42.783\"'" },
    { "name": "epv", "type": "string", "doc": "Type inferred from '\"171.35\"'" },
    { "name": "ept", "type": "string", "doc": "Type inferred from '\"0.005\"'" },
    { "name": "eps", "type": "string", "doc": "Type inferred from '\"85.57\"'" },
    { "name": "longitude", "type": "string", "doc": "Type inferred from '\"-74.529094\"'" },
    { "name": "mode", "type": "string", "doc": "Type inferred from '\"3\"'" },
    { "name": "time", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'" },
    { "name": "climb", "type": "string", "doc": "Type inferred from '\"0.0\"'" },
    { "name": "epc", "type": "string", "doc": "Type inferred from '\"nan\"'" }
  ]
}
SQL
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'
