
Edge to AI: Apache Spark, Apache NiFi, Apache NiFi MiNiFi, Cloudera Data Science Workbench Example



Use Case
IoT Devices with Sensors, Cameras
Overview
This is the third article in the CDSW series. It builds on the earlier use of CDSW to classify images with a Python Apache MXNet model. In this use case we receive edge data from devices running MiNiFi agents that collect sensor data and images and also run edge analytics with TensorFlow. An Apache NiFi server collects this data, with full data lineage, through HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow we upload our images to a cloud-hosted FTP server (it could be S3 or any media store anywhere), call a CDSW model from Apache NiFi via REST and get the model results back as JSON. We also store our sensor data as Parquet files in HDFS. We then trigger a PySpark job in CDSW from Apache NiFi through the CDSW API and check its status. We store the status results in Parquet as well, for PySpark SQL analysis.
For additional steps we can join together the image and sensor data via image name and do additional queries, reports and dashboards.
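The join on image name can be sketched in plain Python. This is a minimal illustration, assuming the sensor record's imgname field holds a full path while the image analytics record's filename field holds only the file name, as in the sample records in this article. The function name join_on_image_name and the shortened file names are hypothetical, and in practice the same join would more likely be written in PySpark SQL.

```python
import os


def join_on_image_name(sensor_rows, image_rows):
    """Inner-join sensor and image analytics records on the image file name."""
    # Index the image analytics rows by their bare file name
    images_by_name = {row["filename"]: row for row in image_rows}
    joined = []
    for sensor in sensor_rows:
        # Sensor rows carry a full path, so reduce it to the file name
        key = os.path.basename(sensor["imgname"])
        image = images_by_name.get(key)
        if image is None:
            continue  # no analytics for this image, skip it (inner join)
        merged = dict(sensor)
        merged["label_1"] = image.get("label_1")
        merged["probability_1"] = image.get("probability_1")
        joined.append(merged)
    return joined


# Hypothetical, shortened sample rows
sensors = [
    {"imgname": "/opt/demo/images/bog_image_a.jpg", "cputemp": 44},
    {"imgname": "/opt/demo/images/bog_image_b.jpg", "cputemp": 45},
]
images = [
    {"filename": "bog_image_a.jpg", "label_1": "coffeepot", "probability_1": "3.40%"},
]
result = join_on_image_name(sensors, images)
print(result)
```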
We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.
Summary
MiNiFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as the master orchestrator: conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow we call Cloudera Data Science Workbench via its REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process the ingested Parquet files stored in HDFS, which are loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's jobs API.
For this particular integration I am using a self-built Apache NiFi 1.9, Apache NiFi - MiNiFi Java Agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.
Overall Apache NiFi Flow
Workflow Walk-Through
For images, we transmit them to an FTP server, run them through an Inception classifier (TensorFlow NiFi Processor) and extract those results plus metadata for future use.
For sensor data, we merge it, convert it to Parquet and store the files. We also store it in HBase and send alerts to a Slack channel. When that is complete we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report, and CDSW has nice dashboards for viewing your job runs. We also clean up, filter and flatten the JSON status and merge it into Parquet files for future analysis with PySpark SQL.
To call the CDSW jobs API from NiFi, we must set Content-Type to application/json, send an empty message body and turn off chunked encoding. You can also turn on Always Output Response.
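As a rough illustration of those request settings outside NiFi, the sketch below builds an equivalent HTTP request with Python's standard library. It assumes the CDSW jobs API start endpoint has the form /api/v1/projects/<user>/<project>/jobs/<id>/start and that the API key is sent as the HTTP basic-auth user name with an empty password. The host cdsw.example.com, the key value and the helper name build_start_job_request are placeholders, so verify all of them against the documentation for your CDSW version.

```python
import base64
import urllib.request


def build_start_job_request(base_url, user, project, job_id, api_key):
    """Build (but do not send) a POST request that starts a CDSW job."""
    # Assumed endpoint layout, see the lead-in above
    url = "{}/api/v1/projects/{}/{}/jobs/{}/start".format(
        base_url.rstrip("/"), user, project, job_id
    )
    # Basic auth with the API key as user name and an empty password (assumption)
    token = base64.b64encode("{}:".format(api_key).encode("utf-8")).decode("ascii")
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token,
    }
    # data=b"" sends an empty message body with a fixed Content-Length
    return urllib.request.Request(url, data=b"", headers=headers, method="POST")


request = build_start_job_request(
    "http://cdsw.example.com",  # placeholder host
    "tspann",
    "future-of-data-meetup-princeton-12-feb-2019",
    4,
    "PLACEHOLDER_API_KEY",
)
print(request.get_method(), request.full_url)
```

To actually send the request, pass it to urllib.request.urlopen and read the JSON body of the response.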
We need to clean up the returned status and remove some fields from it. Jolt works magic on JSON.
Setting up FTP is easy.
Here is what some of the sensor data looks like while in motion.
We set up a job in CDSW very easily from an existing Python file.
After we have run the job a few times we get a nice graph of run duration for our Job history.
You can see details of the run including the session and the results.
When the job is running you can see it in process and all the completed runs.
We can query our data with PySpark DataFrames for simple output.
We can display the schema.
We can use Pandas for a nicer table display of the data.
Load Data Manually
We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:

# To load data created by NiFi
!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status
!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors


Source Code
Jolt To Cleanup CDSW Status JSON

[
  {
    "operation": "shift",
    "spec": {
      "*.*": "&(0,1)_&(0,2)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*": "&"
    }
  },
  {
    "operation": "remove",
    "spec": { "environment": "", "environment*": "", "latest_k8s": "", "report_attachments": "" }
  }
]


We remove the arrays, drop some unwanted fields and flatten the data for easy querying. We then convert the records to Apache Avro and store them as Apache Parquet files for querying with PySpark.
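To make the effect of that cleanup concrete, here is a small Python sketch of the same idea. It is not Jolt and does not reproduce the spec exactly: it assumes a nested status record, joins nested keys with underscores, skips array values and drops the same field names that the remove operation lists. The function name flatten_status and the sample record are made up for illustration.

```python
# Field names taken from the "remove" operation in the Jolt spec
DROP_EXACT = {"environment", "latest_k8s", "report_attachments"}
DROP_PREFIX = "environment"


def flatten_status(record, parent=""):
    """Flatten nested dicts into underscore-joined keys, dropping unwanted fields."""
    flat = {}
    for key, value in record.items():
        name = parent + "_" + key if parent else key
        if name in DROP_EXACT or name.startswith(DROP_PREFIX):
            continue  # unwanted field
        if isinstance(value, dict):
            flat.update(flatten_status(value, name))
        elif isinstance(value, list):
            continue  # arrays are removed rather than flattened
        else:
            flat[name] = value
    return flat


# Hypothetical nested status record
sample = {
    "id": 4,
    "creator": {"username": "tspann"},
    "latest": {"status": "scheduling", "k8s": {"pod": "x"}},
    "environment": {"A": "1"},
    "report": {"attachments": ["a.txt"], "include_logs": True},
}
flat = flatten_status(sample)
print(flat)
```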
URL to Start a Cloudera Data Science Workbench Job
as Per:
What Does the IoT Data Look Like?
{ "uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125", "BH1745_clear" : "0.0", "te" : "601.1575453281403", "host" : "piups", "BH1745_blue" : "0.0", "imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g", "cputemp" : 44, "systemtime" : "02/12/2019 23:34:39", "memory" : 45.7, "bme680_tempc" : "23.97", "imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "bme680_pressure" : "1000.91", "BH1745_red" : "0.0", "bme680_tempf" : "75.15", "diskusage" : "9622.5", "ltr559_lux" : "000.00", "bme680_humidity" : "24.678", "lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11", "BH1745_green" : "0.0", "ipaddress" : "192.168.1.166", "starttime" : "02/12/2019 23:24:38", "ltr559_prox" : "0000", "VL53L1X_distance_in_mm" : 553, "end" : "1550032479.3900714" }
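Most of the sensor readings arrive as strings, so downstream analysis usually needs a parsing step. A minimal sketch, assuming the three-axis fields keep the " : " separator and the optional "g" suffix seen in the record above. The helper names parse_axes and to_fahrenheit are hypothetical.

```python
def parse_axes(text):
    """Split a three-axis reading such as '+00.08g : -01.01g : +00.09g' into floats."""
    parts = [part.strip().rstrip("g") for part in text.split(":")]
    return tuple(float(part) for part in parts)


def to_fahrenheit(celsius):
    """Convert degrees Celsius to degrees Fahrenheit."""
    return celsius * 9.0 / 5.0 + 32.0


# Values copied from the sample sensor record
record = {
    "lsm303d_accelerometer": "+00.08g : -01.01g : +00.09g",
    "lsm303d_magnetometer": "+00.03 : +00.42 : -00.11",
    "bme680_tempc": "23.97",
    "bme680_tempf": "75.15",
}
accel = parse_axes(record["lsm303d_accelerometer"])
mag = parse_axes(record["lsm303d_magnetometer"])
# Cross-check the two temperature fields against each other
temp_f = round(to_fahrenheit(float(record["bme680_tempc"])), 2)
print(accel, mag, temp_f)
```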
What Does the TensorFlow Image Analytics Data Look Like?
{"probability_4":"2.00%","file.group":"root", "s2s.address":"192.168.1.166:60966", "probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500", "probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%", "file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056", "absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain", "label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg", "label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500", "file.lastAccessTime":"2019-02-12T18:02:21-0500", "file.owner":"root", "label_2":"spotlight", "label_1":"coffeepot", "RouteOnAttribute.Route":"isImage"}
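The classifier output is stored as flat label_N and probability_N attributes. Below is a short sketch of how those pairs could be gathered into a ranked list, assuming probabilities are always formatted as a percentage string such as "3.40%". The function name top_labels is hypothetical.

```python
import re


def top_labels(attributes):
    """Pair each label_N with probability_N and sort by probability, highest first."""
    results = []
    for key, label in attributes.items():
        match = re.fullmatch(r"label_(\d+)", key)
        if not match:
            continue  # not a label attribute
        prob_text = attributes.get("probability_" + match.group(1))
        if prob_text is None:
            continue  # label without a matching probability
        results.append((label, float(prob_text.rstrip("%"))))
    return sorted(results, key=lambda pair: pair[1], reverse=True)


# Label and probability values copied from the sample record
attrs = {
    "probability_4": "2.00%", "probability_5": "1.90%", "probability_2": "3.14%",
    "probability_3": "2.35%", "probability_1": "3.40%",
    "label_5": "fountain", "label_4": "lampshade", "label_3": "breastplate",
    "label_2": "spotlight", "label_1": "coffeepot",
    "RouteOnAttribute.Route": "isImage",
}
ranked = top_labels(attrs)
print(ranked)
```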
Transformed Job Status Data
{ "id" : 4, "name" : "Pyspark SQL Job", "script" : "pysparksqljob.py", "cpu" : 2, "memory" : 4, "nvidia_gpu" : 0, "engine_image_id" : 7, "kernel" : "python3", "englishSchedule" : "", "timezone" : "America/New_York", "total_runs" : 108, "total_failures" : 0, "paused" : false, "type" : "manual", "creator_id" : 19, "creator_username" : "tspann", "creator_name" : "Timothy Spann", "creator_email" : "tspann@EmailIsland.Space", "creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "creator_html_url" : "http://cdsw-hdp-3/tspann", "project_id" : 30, "project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019", "project_name" : "Future of Data Meetup Princeton 12 Feb 2019", "project_owner_id" : 19, "project_owner_username" : "tspann", "project_owner_email" : "tspann@email.tu", "project_owner_name" : "Timothy Spann", "project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "project_owner_html_url" : "http://cdsw-hdp/tspann", "project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019", "project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019", "latest_id" : "jq47droa9zv9ou0j", "latest_batch" : true, "latest_job_id" : 4, "latest_status" : "scheduling", "latest_oomKilled" : false, "latest_created_at" : "2019-02-13T13:04:28.961Z", "latest_scheduling_at" : "2019-02-13T13:04:28.961Z", "latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa9zv9ou0j", "latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j", "latest_shared_view_visibility" : "private", "report_include_logs" : true, "report_send_from_creator" : false, "timeout" : 30, "timeout_kill" : false, "created_at" : "2019-02-13T04:46:26.597Z", "updated_at" : "2019-02-13T04:46:26.597Z", "shared_view_visibility" : "private", "url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", 
"html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", "engine_id" : "jq47droa9zv9ou0j" }
PySpark Sensor Spark SQL for Data Analysis

from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

# Let pandas publish a table schema so the notebook can render a richer table
pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Sensors")\
    .getOrCreate()

# Access the Parquet files
sensor = spark.read.parquet("/tmp/sensors/*.parquet")

# Convert to pandas for a nicer table display
data = sensor.toPandas()
pd.DataFrame(data)

spark.stop()


PySpark Status Spark SQL for Data Analysis


from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

# Let pandas publish a table schema so the notebook can render a richer table
pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Status")\
    .getOrCreate()

# Access the Parquet files
status = spark.read.parquet("/tmp/status/*.parquet")

# Show content
status.show()

# Query a few columns from the job status schema
status.select(status['name'], status['latest_status'], status['total_runs']).show()

# Display the schema and the row count
status.printSchema()
status.count()

# Convert to pandas for a nicer table display
data = status.toPandas()
pd.DataFrame(data)

spark.stop()



Status Schema (jobstatus)

{
  "type": "record",
  "name": "jobstatus",
  "fields": [
    {"name": "id", "type": ["int", "null"]},
    {"name": "name", "type": ["string", "null"]},
    {"name": "script", "type": ["string", "null"]},
    {"name": "cpu", "type": ["int", "null"]},
    {"name": "memory", "type": ["int", "null"]},
    {"name": "nvidia_gpu", "type": ["int", "null"]},
    {"name": "engine_image_id", "type": ["int", "null"]},
    {"name": "kernel", "type": ["string", "null"]},
    {"name": "englishSchedule", "type": ["string", "null"]},
    {"name": "timezone", "type": ["string", "null"]},
    {"name": "total_runs", "type": ["int", "null"]},
    {"name": "total_failures", "type": ["int", "null"], "doc": "Type inferred from '0'"},
    {"name": "paused", "type": ["boolean", "null"], "doc": "Type inferred from 'false'"},
    {"name": "type", "type": ["string", "null"], "doc": "Type inferred from '\"manual\"'"},
    {"name": "creator_id", "type": ["int", "null"], "doc": "Type inferred from '19'"},
    {"name": "creator_username", "type": ["string", "null"]},
    {"name": "creator_name", "type": ["string", "null"]},
    {"name": "creator_email", "type": ["string", "null"]},
    {"name": "creator_url", "type": ["string", "null"]},
    {"name": "creator_html_url", "type": ["string", "null"]},
    {"name": "project_id", "type": ["int", "null"]},
    {"name": "project_slug", "type": ["string", "null"]},
    {"name": "project_name", "type": ["string", "null"]},
    {"name": "project_owner_id", "type": ["int", "null"]},
    {"name": "project_owner_username", "type": ["string", "null"]},
    {"name": "project_owner_email", "type": ["string", "null"]},
    {"name": "project_owner_name", "type": ["string", "null"]},
    {"name": "project_owner_url", "type": ["string", "null"]},
    {"name": "project_owner_html_url", "type": ["string", "null"]},
    {"name": "project_url", "type": ["string", "null"]},
    {"name": "project_html_url", "type": ["string", "null"]},
    {"name": "latest_id", "type": ["string", "null"]},
    {"name": "latest_batch", "type": ["boolean", "null"]},
    {"name": "latest_job_id", "type": ["int", "null"]},
    {"name": "latest_status", "type": ["string", "null"]},
    {"name": "latest_oomKilled", "type": ["boolean", "null"]},
    {"name": "latest_created_at", "type": ["string", "null"]},
    {"name": "latest_scheduling_at", "type": ["string", "null"]},
    {"name": "latest_url", "type": ["string", "null"]},
    {"name": "latest_html_url", "type": ["string", "null"]},
    {"name": "latest_shared_view_visibility", "type": ["string", "null"]},
    {"name": "report_include_logs", "type": ["boolean", "null"]},
    {"name": "report_send_from_creator", "type": ["boolean", "null"]},
    {"name": "timeout", "type": ["int", "null"]},
    {"name": "timeout_kill", "type": ["boolean", "null"]},
    {"name": "created_at", "type": ["string", "null"]},
    {"name": "updated_at", "type": ["string", "null"]},
    {"name": "shared_view_visibility", "type": ["string", "null"]},
    {"name": "url", "type": ["string", "null"]},
    {"name": "html_url", "type": ["string", "null"]},
    {"name": "engine_id", "type": ["string", "null"]}
  ]
}
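The doc entries such as "Type inferred from '0'" suggest that the schema was inferred from a sample record rather than written by hand. The sketch below shows one way such an inference could work in plain Python, assuming every field becomes a nullable union of a single primitive type. It is an illustration only, not the tool that produced the schema above, and infer_schema is a hypothetical name.

```python
import json


def avro_type(value):
    """Map a Python value to an Avro primitive type name."""
    # bool must be tested before int because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    # Everything else, including None, falls back to string in this sketch
    return "string"


def infer_schema(name, record):
    """Build an Avro record schema where every field is a nullable union."""
    fields = [
        {"name": key, "type": [avro_type(value), "null"]}
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}


# A few fields copied from the transformed job status record
sample = json.loads(
    '{"id": 4, "name": "Pyspark SQL Job", "paused": false, "total_runs": 108}'
)
schema = infer_schema("jobstatus", sample)
print(json.dumps(schema, indent=2))
```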


Documentation
References
Join me in March at DataWorks Summit in Barcelona.
