
Edge to AI: Apache Spark, Apache NiFi, Apache NiFi MiNiFi, Cloudera Data Science Workbench Example



Use Case
IoT Devices with Sensors, Cameras
Overview
In this third installment of the CDSW series, we build on using CDSW to classify images with a Python Apache MXNet model. In this use case, we receive edge data from devices running MiNiFi agents that collect sensor data and images and also run edge analytics with TensorFlow. An Apache NiFi server collects this data, with full data lineage, via HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow, we upload our images to a cloud-hosted FTP server (this could be S3 or any other media store) and call a CDSW model from Apache NiFi via REST, getting the model results back as JSON. We also store our sensor data as Parquet files in HDFS. We then trigger a PySpark job in CDSW from Apache NiFi via the API and check its status. We store the status result data in Parquet as well, for PySpark SQL analysis.
For additional steps we can join together the image and sensor data via image name and do additional queries, reports and dashboards.
We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.
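The join on image name mentioned above can be sketched in plain Python. This is only an illustration, not the flow's actual implementation: the helper name `join_by_image_name`, the `image_` prefix and the shortened sample file names are assumptions. It relies on the fact that the sensor record carries a full path in `imgname`, while the image attributes carry only the file name in `filename`.

```python
import posixpath


def join_by_image_name(sensor_rows, image_rows):
    """Inner-join sensor records to image attributes on the image file name."""
    # Index the image records by file name for constant-time lookups.
    images_by_name = {row["filename"]: row for row in image_rows}
    joined = []
    for sensor in sensor_rows:
        # "imgname" holds a full path; "filename" holds only the base name.
        key = posixpath.basename(sensor["imgname"])
        image = images_by_name.get(key)
        if image is not None:
            # Prefix image attributes so they cannot overwrite sensor fields.
            prefixed = {"image_" + k: v for k, v in image.items()}
            joined.append({**sensor, **prefixed})
    return joined


# Shortened, illustrative records (not real data from the flow).
sensor_rows = [{"uuid": "s1", "imgname": "/opt/demo/images/bog_image_a.jpg", "cputemp": 44}]
image_rows = [{"filename": "bog_image_a.jpg", "label_1": "coffeepot", "probability_1": "3.40%"}]

result = join_by_image_name(sensor_rows, image_rows)
print(result[0]["image_label_1"])
```

The same join could be expressed in PySpark SQL once both data sets are stored as Parquet; the dictionary version just makes the matching rule explicit.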
Summary
MiNiFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as master orchestrator, conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow, we call Cloudera Data Science Workbench via its REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process the Parquet files stored in HDFS, which are loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's Jobs API.
For this particular integration I am using a self-built Apache NiFi 1.9, Apache NiFi - MiNiFi Java Agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.
Overall Apache NiFi Flow
Workflow Walk-Through
For images, we transmit them to an FTP server, run them through an Inception classifier (TensorFlow NiFi Processor) and extract those results plus metadata for future use.
For sensor data, we merge it, convert it to Parquet and store the files. We also store it in HBase and send alerts to a Slack channel. When we are done, we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report, and CDSW has nice dashboards for viewing your job runs. We also clean up, filter, flatten and merge the JSON status and store it as Parquet files for future analysis with PySpark SQL.
For the REST call that starts the job, we must set Content-Type to application/json, send an empty message body and disable chunked encoding. You can also turn on Always Output Response.
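For readers who want to try the same call outside of NiFi, here is a minimal sketch that builds an equivalent request with Python's standard library. Several things in it are assumptions rather than details from this post: the host `cdsw.example.com`, the placeholder API key, the helper name `build_start_job_request`, the `/start` suffix on the job URL, and passing the API key as the basic-auth user name with an empty password. Check them against the CDSW Jobs API documentation for your version. The request is only constructed here, not sent.

```python
import base64
import urllib.request


def build_start_job_request(base_url, project_slug, job_id, api_key):
    """Build (but do not send) a POST request that starts a CDSW job."""
    # Assumed URL layout: <base>/api/v1/projects/<user>/<project>/jobs/<id>/start
    url = f"{base_url}/api/v1/projects/{project_slug}/jobs/{job_id}/start"
    # Assumed auth convention: API key as user name, empty password.
    token = base64.b64encode(f"{api_key}:".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=b"",  # empty message body
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )


request = build_start_job_request(
    "http://cdsw.example.com",  # hypothetical host
    "tspann/future-of-data-meetup-princeton-12-feb-2019",
    4,
    "MY_API_KEY",  # placeholder, not a real key
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Because the body is a fixed (empty) byte string, `urllib` sends a Content-Length header rather than using chunked transfer encoding, which matches the settings described above.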
We need to clean up the returned status and remove some fields from it. Jolt works magic on JSON.
Setting up FTP is easy.
Here is what some of the sensor data looks like while in motion.
We can set up a job in CDSW very easily from an existing Python file.
After we have run the job a few times we get a nice graph of run duration for our Job history.
You can see details of the run including the session and the results.
When the job is running, you can see it in progress along with all the completed runs.
We can query our data with PySpark DataFrames for simple output.
We can display the schema.
We can use Pandas for a nicer table display of the data.
Load Data Manually
We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:

# To load data created by NiFi
!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status
!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors


Source Code
Jolt to Clean Up CDSW Status JSON

[
  {
    "operation": "shift",
    "spec": {
      "*.*": "&(0,1)_&(0,2)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*": "&"
    }
  },
  {
    "operation": "remove",
    "spec": { "environment": "", "environment*": "", "latest_k8s": "", "report_attachments": "" }
  }
]


We remove the arrays, remove some unwanted fields and flatten the data for easy querying. We then convert to Apache Avro and store as Apache Parquet files for querying with PySpark.
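To make the effect of that cleanup easier to see, here is a rough Python sketch of the same idea: turn nested or dotted keys into underscore-joined keys, then drop the unwanted fields. It is not a port of the Jolt spec. The function names and the nested shape of the `raw` sample are assumptions for illustration, since only the transformed output is shown in this post.

```python
def flatten(record, parent_key="", sep="_"):
    """Flatten nested dicts (and dotted keys) into sep-joined top-level keys."""
    flat = {}
    for key, value in record.items():
        name = key.replace(".", sep)  # "creator.username" -> "creator_username"
        full_key = f"{parent_key}{sep}{name}" if parent_key else name
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def clean_status(record):
    """Flatten a status record, then drop the fields the flow does not keep."""
    flat = flatten(record)
    return {
        key: value
        for key, value in flat.items()
        if not key.startswith("environment")
        and key not in ("latest_k8s", "report_attachments")
    }


# Hypothetical raw status; the real API response has many more fields.
raw = {
    "id": 4,
    "creator": {"username": "tspann", "id": 19},
    "latest": {"status": "scheduling"},
    "environment": {},
    "report_attachments": [],
}
print(clean_status(raw))
```

The result uses the same key style as the transformed job status shown later in this post, such as `creator_username` and `latest_status`.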
URL to Start a Cloudera Data Science Workbench Job
as Per:
What Does the IoT Data Look Like?
{ "uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125", "BH1745_clear" : "0.0", "te" : "601.1575453281403", "host" : "piups", "BH1745_blue" : "0.0", "imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g", "cputemp" : 44, "systemtime" : "02/12/2019 23:34:39", "memory" : 45.7, "bme680_tempc" : "23.97", "imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "bme680_pressure" : "1000.91", "BH1745_red" : "0.0", "bme680_tempf" : "75.15", "diskusage" : "9622.5", "ltr559_lux" : "000.00", "bme680_humidity" : "24.678", "lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11", "BH1745_green" : "0.0", "ipaddress" : "192.168.1.166", "starttime" : "02/12/2019 23:24:38", "ltr559_prox" : "0000", "VL53L1X_distance_in_mm" : 553, "end" : "1550032479.3900714" }
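Most of the numeric readings in this record arrive as strings, and the accelerometer and magnetometer values pack three axes into one string. Before analysis it can help to convert them to numbers. The sketch below shows one way to do that; the helper names `parse_triplet` and `normalize` and the choice of fields to convert are assumptions, and the sample is a subset of the record above.

```python
import json

# Fields that arrive as numeric strings (an illustrative subset).
NUMERIC_FIELDS = ("bme680_tempc", "bme680_tempf", "bme680_humidity",
                  "bme680_pressure", "ltr559_lux")


def parse_triplet(text, unit=""):
    """Split a string like '+00.08g : -01.01g : +00.09g' into three floats."""
    parts = [part.strip() for part in text.split(":")]
    # Strip the trailing unit (for example "g") when one is given.
    return tuple(
        float(part[:-len(unit)] if unit and part.endswith(unit) else part)
        for part in parts
    )


def normalize(reading):
    """Return a copy of the reading with string values converted to numbers."""
    typed = dict(reading)
    for field in NUMERIC_FIELDS:
        if field in typed:
            typed[field] = float(typed[field])
    if "lsm303d_accelerometer" in typed:
        typed["lsm303d_accelerometer"] = parse_triplet(
            typed["lsm303d_accelerometer"], unit="g")
    if "lsm303d_magnetometer" in typed:
        typed["lsm303d_magnetometer"] = parse_triplet(typed["lsm303d_magnetometer"])
    return typed


sample = json.loads(
    '{"bme680_tempc": "23.97", "bme680_humidity": "24.678", '
    '"lsm303d_accelerometer": "+00.08g : -01.01g : +00.09g", '
    '"lsm303d_magnetometer": "+00.03 : +00.42 : -00.11", "cputemp": 44}'
)
reading = normalize(sample)
print(reading["bme680_tempc"], reading["lsm303d_accelerometer"])
```

In the actual flow this kind of conversion could also be handled by a NiFi record schema or later in PySpark; the Python version is just the quickest way to show the shape of the data.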
What Does the TensorFlow Image Analytics Data Look Like?
{"probability_4":"2.00%","file.group":"root", "s2s.address":"192.168.1.166:60966", "probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500", "probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%", "file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056", "absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain", "label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg", "label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500", "file.lastAccessTime":"2019-02-12T18:02:21-0500", "file.owner":"root", "label_2":"spotlight", "label_1":"coffeepot", "RouteOnAttribute.Route":"isImage"}
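The classifier output is spread across numbered attributes (`label_1` with `probability_1`, and so on). A small helper can pair them up into a ranked list. This is a sketch only: the function name `top_labels` is an assumption, and the sample copies just the label and probability attributes from the record above.

```python
def top_labels(attributes):
    """Pair label_N with probability_N and return them ordered by rank N."""
    results = []
    rank = 1
    while f"label_{rank}" in attributes and f"probability_{rank}" in attributes:
        # Probabilities arrive as strings such as "3.40%".
        probability = float(attributes[f"probability_{rank}"].rstrip("%"))
        results.append((attributes[f"label_{rank}"], probability))
        rank += 1
    return results


attributes = {
    "label_1": "coffeepot", "probability_1": "3.40%",
    "label_2": "spotlight", "probability_2": "3.14%",
    "label_3": "breastplate", "probability_3": "2.35%",
    "label_4": "lampshade", "probability_4": "2.00%",
    "label_5": "fountain", "probability_5": "1.90%",
}
labels = top_labels(attributes)
print(labels[0])
```

In this sample every probability is below 4%, so any reasonable confidence threshold applied to the list would discard all five labels.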
Transformed Job Status Data
{ "id" : 4, "name" : "Pyspark SQL Job", "script" : "pysparksqljob.py", "cpu" : 2, "memory" : 4, "nvidia_gpu" : 0, "engine_image_id" : 7, "kernel" : "python3", "englishSchedule" : "", "timezone" : "America/New_York", "total_runs" : 108, "total_failures" : 0, "paused" : false, "type" : "manual", "creator_id" : 19, "creator_username" : "tspann", "creator_name" : "Timothy Spann", "creator_email" : "tspann@EmailIsland.Space", "creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "creator_html_url" : "http://cdsw-hdp-3/tspann", "project_id" : 30, "project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019", "project_name" : "Future of Data Meetup Princeton 12 Feb 2019", "project_owner_id" : 19, "project_owner_username" : "tspann", "project_owner_email" : "tspann@email.tu", "project_owner_name" : "Timothy Spann", "project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "project_owner_html_url" : "http://cdsw-hdp/tspann", "project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019", "project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019", "latest_id" : "jq47droa9zv9ou0j", "latest_batch" : true, "latest_job_id" : 4, "latest_status" : "scheduling", "latest_oomKilled" : false, "latest_created_at" : "2019-02-13T13:04:28.961Z", "latest_scheduling_at" : "2019-02-13T13:04:28.961Z", "latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa9zv9ou0j", "latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j", "latest_shared_view_visibility" : "private", "report_include_logs" : true, "report_send_from_creator" : false, "timeout" : 30, "timeout_kill" : false, "created_at" : "2019-02-13T04:46:26.597Z", "updated_at" : "2019-02-13T04:46:26.597Z", "shared_view_visibility" : "private", "url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", 
"html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", "engine_id" : "jq47droa9zv9ou0j" }
PySpark Sensor Spark SQL for Data Analysis

from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Sensors")\
    .getOrCreate()

# Access the parquet
sensor = spark.read.parquet("/tmp/sensors/*.parquet")

# Convert to a Pandas DataFrame for a nicer table display
data = sensor.toPandas()
pd.DataFrame(data)

spark.stop()


PySpark Status Spark SQL for Data Analysis


from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Status")\
    .getOrCreate()

# Access the parquet
status = spark.read.parquet("/tmp/status/*.parquet")

# show content
status.show()

# query a few job status columns (fields from the jobstatus schema)
status.select(status['name'], status['latest_status'], status['total_runs']).show()

status.printSchema()
status.count()

# Convert to a Pandas DataFrame for a nicer table display
data = status.toPandas()
pd.DataFrame(data)

spark.stop()



Status Schema (jobstatus)

{
  "type": "record",
  "name": "jobstatus",
  "fields": [
    {"name": "id", "type": ["int", "null"]},
    {"name": "name", "type": ["string", "null"]},
    {"name": "script", "type": ["string", "null"]},
    {"name": "cpu", "type": ["int", "null"]},
    {"name": "memory", "type": ["int", "null"]},
    {"name": "nvidia_gpu", "type": ["int", "null"]},
    {"name": "engine_image_id", "type": ["int", "null"]},
    {"name": "kernel", "type": ["string", "null"]},
    {"name": "englishSchedule", "type": ["string", "null"]},
    {"name": "timezone", "type": ["string", "null"]},
    {"name": "total_runs", "type": ["int", "null"]},
    {"name": "total_failures", "type": ["int", "null"], "doc": "Type inferred from '0'"},
    {"name": "paused", "type": ["boolean", "null"], "doc": "Type inferred from 'false'"},
    {"name": "type", "type": ["string", "null"], "doc": "Type inferred from '\"manual\"'"},
    {"name": "creator_id", "type": ["int", "null"], "doc": "Type inferred from '19'"},
    {"name": "creator_username", "type": ["string", "null"]},
    {"name": "creator_name", "type": ["string", "null"]},
    {"name": "creator_email", "type": ["string", "null"]},
    {"name": "creator_url", "type": ["string", "null"]},
    {"name": "creator_html_url", "type": ["string", "null"]},
    {"name": "project_id", "type": ["int", "null"]},
    {"name": "project_slug", "type": ["string", "null"]},
    {"name": "project_name", "type": ["string", "null"]},
    {"name": "project_owner_id", "type": ["int", "null"]},
    {"name": "project_owner_username", "type": ["string", "null"]},
    {"name": "project_owner_email", "type": ["string", "null"]},
    {"name": "project_owner_name", "type": ["string", "null"]},
    {"name": "project_owner_url", "type": ["string", "null"]},
    {"name": "project_owner_html_url", "type": ["string", "null"]},
    {"name": "project_url", "type": ["string", "null"]},
    {"name": "project_html_url", "type": ["string", "null"]},
    {"name": "latest_id", "type": ["string", "null"]},
    {"name": "latest_batch", "type": ["boolean", "null"]},
    {"name": "latest_job_id", "type": ["int", "null"]},
    {"name": "latest_status", "type": ["string", "null"]},
    {"name": "latest_oomKilled", "type": ["boolean", "null"]},
    {"name": "latest_created_at", "type": ["string", "null"]},
    {"name": "latest_scheduling_at", "type": ["string", "null"]},
    {"name": "latest_url", "type": ["string", "null"]},
    {"name": "latest_html_url", "type": ["string", "null"]},
    {"name": "latest_shared_view_visibility", "type": ["string", "null"]},
    {"name": "report_include_logs", "type": ["boolean", "null"]},
    {"name": "report_send_from_creator", "type": ["boolean", "null"]},
    {"name": "timeout", "type": ["int", "null"]},
    {"name": "timeout_kill", "type": ["boolean", "null"]},
    {"name": "created_at", "type": ["string", "null"]},
    {"name": "updated_at", "type": ["string", "null"]},
    {"name": "shared_view_visibility", "type": ["string", "null"]},
    {"name": "url", "type": ["string", "null"]},
    {"name": "html_url", "type": ["string", "null"]},
    {"name": "engine_id", "type": ["string", "null"]}
  ]
}
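Every field in this schema is a union of one primitive type and null, so a record can be sanity-checked with a few lines of Python before it is converted. The sketch below is a simplified check, not an Avro implementation: the `validate` helper is an assumption, it handles only the primitive types that appear in this schema, it does not enforce Avro's 32-bit range for int, and it uses a three-field subset of the schema to stay short.

```python
import json

# Maps the Avro primitive names used above to Python checks.
# bool is excluded from "int" because isinstance(True, int) is True in Python.
CHECKS = {
    "int": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "string": lambda v: isinstance(v, str),
    "boolean": lambda v: isinstance(v, bool),
    "null": lambda v: v is None,
}


def validate(record, schema):
    """Return a list of (field, problem) pairs for values that break the schema."""
    problems = []
    for field in schema["fields"]:
        value = record.get(field["name"])  # a missing field is treated as null
        allowed = field["type"] if isinstance(field["type"], list) else [field["type"]]
        if not any(CHECKS[type_name](value) for type_name in allowed):
            problems.append(
                (field["name"],
                 f"expected one of {allowed}, got {type(value).__name__}"))
    return problems


# A three-field subset of the jobstatus schema, for brevity.
schema = json.loads("""
{"type": "record", "name": "jobstatus", "fields": [
  {"name": "id", "type": ["int", "null"]},
  {"name": "name", "type": ["string", "null"]},
  {"name": "paused", "type": ["boolean", "null"]}
]}
""")

good = {"id": 4, "name": "Pyspark SQL Job", "paused": False}
bad = {"id": "4", "name": "Pyspark SQL Job"}
print(validate(good, schema))
print(validate(bad, schema))
```

The second record fails only on `id`, because the string "4" is not an int; the missing `paused` field passes since null is allowed for every field.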


Join me in March at DataWorks Summit in Barcelona.