
Using Raspberry Pi 3B+ with Apache NiFi MiNiFi and Google Coral Accelerator and Pimoroni Inky Phat

Architecture



Introduction

First we need to unbox our new goodies. The Inky pHAT is an awesome e-ink display with low power usage, and whatever it shows stays on the screen even after shutdown!

Next I added a new Google Coral Edge TPU ML Accelerator USB coprocessor to a new Raspberry Pi 3B+. This was easy to integrate and get up and running.

Let's unbox this beautiful device, but be careful: when it runs it can get really hot, and there is a warning about that in the instructions. I run mine on top of an aluminum case with a big fan on it.







Pimoroni Inky Phat

It is pretty easy to set up, and it provides a robust Python library for writing to our e-ink display. You can see an example screen here.

https://github.com/pimoroni/inky
Pimoroni Inky pHAT ePaper eInk Display in Red


Pimoroni Inky Phat (Red)


https://shop.pimoroni.com/products/inky-phat
https://github.com/pimoroni/inky
https://pillow.readthedocs.io/en/stable/reference/ImageDraw.html
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-inky-phat
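
If you have not used the Inky pHAT before, here is a minimal sketch of writing to it with Pimoroni's inky Python library, assuming the red variant and one of the fonts installed below; the message text and coordinates are just placeholders.

from PIL import Image, ImageDraw, ImageFont
from font_fredoka_one import FredokaOne
from inky import InkyPHAT

# Create the display object for the red Inky pHAT variant
inky_display = InkyPHAT("red")
inky_display.set_border(inky_display.WHITE)

# Build a palette-mode image the same size as the display
img = Image.new("P", (inky_display.WIDTH, inky_display.HEIGHT))
draw = ImageDraw.Draw(img)

# Draw placeholder text with the Fredoka One font
font = ImageFont.truetype(FredokaOne, 22)
draw.text((10, 10), "Hello from MiNiFi", inky_display.BLACK, font)

# Push the image to the e-ink panel; it stays visible even after power off
inky_display.set_image(img)
inky_display.show()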


Install Some Python Libraries and Debian Packages for the Inky pHAT and Coral

pip3 install font_fredoka_one
pip3 install geocoder
sudo apt-get install fswebcam
pip3 install psutil
pip3 install font_hanken_grotesk
pip3 install font_intuitive
wget http://storage.googleapis.com/cloud-iot-edge-pretrained-models/edgetpu_api.tar.gz
These libraries are for the Inky pHAT, which needs fonts for writing text. The last tarball is for the Edge TPU and is a quick install that Google documents well.

Download Apache NiFi - MiNiFi Java Agent

https://nifi.apache.org/minifi/download.html

Next up, the most important piece. You will need JDK 8 installed on your device if you are using the Java agent. You can also use the MiNiFi C++ agent, but that may require building it for your OS/platform; it has some interesting abilities for running Python.


Google Coral Documentation - Google Edge TPU
  • Google Edge TPU ML accelerator coprocessor
  • USB 3.0 Type-C socket
  • Supports Debian Linux on host CPU
  • ASIC designed by Google that provides high performance ML inferencing for TensorFlow Lite models


Using a Pretrained TensorFlow Lite Model

Inception V4 (ImageNet)
  • Recognizes 1,000 types of objects
  • Dataset: ImageNet
  • Input size: 299x299
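
To see what the classification step looks like in code, here is a minimal sketch using the ClassificationEngine from the edgetpu Python API installed above; the model and label file paths are placeholders and depend on which pretrained Edge TPU package you downloaded.

from edgetpu.classification.engine import ClassificationEngine
from PIL import Image

# Placeholder paths; point these at your Inception V4 Edge TPU model and ImageNet labels
MODEL = "/opt/demo/inception_v4_299_quant_edgetpu.tflite"
LABELS = "/opt/demo/imagenet_labels.txt"

# Load the ImageNet labels into a dict of id -> name
labels = {}
with open(LABELS) as f:
    for line in f:
        num, name = line.strip().split(maxsplit=1)
        labels[int(num)] = name

# Classify a captured frame on the Edge TPU and print the top two results
engine = ClassificationEngine(MODEL)
image = Image.open("/opt/demo/images/capture.jpg")
for label_id, score in engine.ClassifyWithImage(image, top_k=2):
    print(labels.get(label_id, "unknown"), round(score, 2))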

Let's run a flow!

I can run this Python 3 script every 10 seconds without issues: it captures the picture, runs it through classification with the model, grabs network and device stats, forms a JSON file, and completes in under 5 seconds. Our MiNiFi agent is scheduled to call the script every 10 seconds and to grab the images after 60 seconds.


MiNiFi Flow



Flow Overview



Apache NiFi Flow





Results (Once an hour we update our E-Ink Display with Date, IP, Run Time, Label 1)





Example JSON Data

{"endtime": "1552164369.27", "memory": "19.1", "cputemp": "32", "ipaddress": "192.168.1.183", "diskusage": "50336.5", "score_2": "0.14", "score_1": "0.68", "runtime": "4.74", "host": "mv2", "starttime": "03/09/2019 15:46:04", "label_1": "hard disc, hard disk, fixed disk", "uuid": "20190309204609_05c9a240-d801-4bac-b029-e5bf38c02d40", "label_2": "buckle", "systemtime": "03/09/2019 15:46:09"}

Example Slack Alert


PS3 Eye USB Camera Capturing an Image


Image It Captured




Source Code

https://github.com/tspannhw/nifi-minifi-coral

Convert Your Flow to config.yml for MiNiFi (look for a major innovation here soon).

 ./config.sh transform Coral_MiniFi_Agent_Flow.xml config.yml
config.sh: JAVA_HOME not set; results may vary

Java home: 
MiNiFi Toolkit home: /Volumes/TSPANN/2019/apps/minifi-toolkit-0.5.0



No validation errors found in converted configuration.


Example Call From MiNiFi 0.5.0 Java Agent to Apache NiFi 1.9.0 Server


2019-03-09 16:21:01,877 INFO [Timer-Driven Process Thread-10] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=Coral Input,targets=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=eab17784-2e76-4438-a60a-fd67df37a102,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=362347, length=685083],offset=0,name=d74bc911bfd167fe79d5a3aa780004fd66fa6d,size=685083], StandardFlowFileRecord[uuid=eb979d09-a936-4b2d-82ff-d204f9d768eb,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1047430, length=361022],offset=0,name=2019-03-09_1541.jpg,size=361022], StandardFlowFileRecord[uuid=343a4c91-b863-440e-ac81-1f68d6210792,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1408452, length=668],offset=0,name=3026822c780724b39e826230bdef43f8ed9786,size=668], StandardFlowFileRecord[uuid=97df9d3a-dc3c-4d03-b533-7b75c3180032,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1409120, length=2133417],offset=0,name=abb6feaac5bda3c6d3660e7593cc4ef2e1cfce,size=2133417]] (3.03 MB) to http://hw13125.local:8080/nifi-api in 1416 milliseconds at a rate of 2.14 MB/sec









Edge to AI: Apache Spark, Apache NiFi, Apache NiFi MiNiFi, Cloudera Data Science Workbench Example



Use Case
IoT Devices with Sensors, Cameras
Overview
In this, the third article of the CDSW series, we build on using CDSW to classify images with a Python Apache MXNet model. In this use case we are receiving edge data from devices running MiNiFi agents that are collecting sensor data and images, and also running edge analytics with TensorFlow. An Apache NiFi server collects this data with full data lineage via HTTP calls from the device(s). We then filter, transform, merge and route the sensor data, image data, deep learning analytics data and metadata to different data stores. As part of the flow we upload our images to a cloud-hosted FTP server (it could be S3 or any media store anywhere) and call a CDSW model from Apache NiFi via REST, getting the model results back as JSON. We also store our sensor data in Parquet files in HDFS. We then trigger a PySpark job in CDSW via its API from Apache NiFi and check the status of that run. We store the status result data in Parquet as well for PySpark SQL analysis.
For additional steps we can join together the image and sensor data via image name and do additional queries, reports and dashboards.
We can also route this data to Apache Kafka for downstream analysis in Kafka Streams, Storm, Spark Streaming or SAM.
Summary
MiNiFi Java agents read sensor values and feed them to Apache NiFi via HTTPS with full data provenance and lineage. Apache NiFi acts as the master orchestrator: conducting, filtering, transforming, converting, querying, aggregating, routing and cleansing the streams. As part of the flow we call Cloudera Data Science Workbench via REST API to classify ingested images with an Apache MXNet Python GluonCV YOLO model. We also call a Spark job to process ingested Parquet files stored in HDFS, loaded from the related sensor data and metadata. The PySpark jobs are triggered from Apache NiFi via REST API calls to Cloudera Data Science Workbench's Jobs API.
For this particular integration I am using a self-built Apache NiFi 1.9, the Apache NiFi MiNiFi Java agent 0.5.0, Cloudera Data Science Workbench 1.5 for HDP, HDFS, Apache Spark 2, Python 3, PySpark and Parquet.
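
The CDSW model call mentioned above is just an HTTP POST from NiFi's InvokeHTTP processor. A rough Python equivalent is sketched below; the host, access key and request payload are placeholders, and the exact endpoint path and request fields depend on how the model function was deployed in your CDSW instance.

import requests

# Placeholders: your CDSW host, the model's access key, and whatever input the model function expects
CDSW_MODEL_URL = "http://cdsw-hdp-3/api/altus-ds-1/models/call-model"
payload = {
    "accessKey": "your-model-access-key",
    "request": {"image": "ftp://server/images/bog_image_20190213043439.jpg"}
}

# CDSW model endpoints take a JSON body and return the model's response as JSON
response = requests.post(CDSW_MODEL_URL, json=payload)
print(response.json())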
Overall Apache NiFi Flow
Workflow walk-thru
For Images, we transmit the images to an FTP server, run them through an Inception classifier (TensorFlow NiFi Processor) and extract those results plus metadata for future uses.
For Sensor Data, we merge it, convert it to Parquet and store the files. We also store it in HBase and send alerts to a Slack channel. When that is complete we trigger an Apache Spark PySpark SQL job via CDSW. This job can email us a report and has nice dashboards for watching your job run. We also clean up, filter and flatten the JSON status and merge it into Parquet files for future analysis with PySpark SQL.
We must set the Content-Type to application/json, send an empty message body, disable chunked encoding, and you can turn on Always Output Response.
We need to clean up and remove some fields from the returned status; Jolt works magic on JSON.
Setting up FTP is easy.
Here is what some of the sensor data looks like while in motion.
We setup a job in CDSW very easily from an existing Python file.
After we have run the job a few times we get a nice graph of run duration for our Job history.
You can see details of the run including the session and the results.
When the job is running you can see it in process and all the completed runs.
We can query our data with PySpark DataFrames for simple output.
We can display the schema.
We can use Pandas for a nicer table display of the data.
Load Data Manually
We can have Apache NiFi push to HDFS directly for us. To load data manually in Cloudera DSW after uploading the files to a directory in CDSW:

# To load data created by NiFi
!hdfs dfs -mkdir /tmp/status
!hdfs dfs -put status/*.parquet /tmp/status
!hdfs dfs -ls /tmp/status

!hdfs dfs -mkdir /tmp/sensors
!hdfs dfs -put sensors/*.parquet /tmp/sensors
!hdfs dfs -ls /tmp/sensors


Source Code
Jolt To Cleanup CDSW Status JSON

[
  {
    "operation": "shift",
    "spec": {
      "*.*": "&(0,1)_&(0,2)",
      "*.*.*": "&(0,1)_&(0,2)_&(0,3)",
      "*.*.*.*": "&(0,1)_&(0,2)_&(0,3)_&(0,4)",
      "*": "&"
    }
  },
  {
    "operation": "remove",
    "spec": {
      "environment": "",
      "environment*": "",
      "latest_k8s": "",
      "report_attachments": ""
    }
  }
]


We remove the arrays, remove some unwanted fields and flatten the data for easy querying. We then convert to Apache Avro and store as Apache Parquet files for querying with PySpark.
URL to Start a Cloudera Data Science Workbench Job
As per the Cloudera Data Science Workbench Jobs API documentation, the job is started with an HTTP POST to the job's start endpoint.
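
The same call NiFi makes can be reproduced in a few lines of Python; this is a sketch against the CDSW v1 Jobs API, and the host, project path, job id and API key are placeholders taken from the transformed job status data shown below.

import requests

# Placeholders: CDSW host, project, job id and a personal API key for basic auth
JOB_START_URL = "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4/start"
API_KEY = "your-api-key"

# CDSW expects the API key as the basic-auth user name with a blank password,
# Content-Type application/json and an (optionally empty) JSON body
response = requests.post(JOB_START_URL, json={}, auth=(API_KEY, ""))
print(response.status_code)
print(response.json())  # the job status document that the Jolt spec above cleans up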
What Does the IoT Data Look Like?
{ "uuid" : "20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125", "BH1745_clear" : "0.0", "te" : "601.1575453281403", "host" : "piups", "BH1745_blue" : "0.0", "imgname" : "/opt/demo/images/bog_image_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "lsm303d_accelerometer" : "+00.08g : -01.01g : +00.09g", "cputemp" : 44, "systemtime" : "02/12/2019 23:34:39", "memory" : 45.7, "bme680_tempc" : "23.97", "imgnamep" : "/opt/demo/images/bog_image_p_20190213043439_e58bee05-142b-4b7e-a28b-fec0305ab125.jpg", "bme680_pressure" : "1000.91", "BH1745_red" : "0.0", "bme680_tempf" : "75.15", "diskusage" : "9622.5", "ltr559_lux" : "000.00", "bme680_humidity" : "24.678", "lsm303d_magnetometer" : "+00.03 : +00.42 : -00.11", "BH1745_green" : "0.0", "ipaddress" : "192.168.1.166", "starttime" : "02/12/2019 23:24:38", "ltr559_prox" : "0000", "VL53L1X_distance_in_mm" : 553, "end" : "1550032479.3900714" }
What Does the TensorFlow Image Analytics Data Look Like?
{"probability_4":"2.00%","file.group":"root", "s2s.address":"192.168.1.166:60966", "probability_5":"1.90%","file.lastModifiedTime":"2019-02-12T18:02:21-0500", "probability_2":"3.14%","probability_3":"2.35%","probability_1":"3.40%", "file.permissions":"rw-r--r--","uuid":"0596aa5f-325b-4bd2-ae80-6c7561c8c056", "absolute.path":"/opt/demo/images/","path":"/","label_5":"fountain", "label_4":"lampshade","filename":"bog_image_20190212230221_00c846a7-b8d2-4192-b8eb-f6f13268483c.jpg", "label_3":"breastplate","s2s.host":"192.168.1.166","file.creationTime":"2019-02-12T18:02:21-0500", "file.lastAccessTime":"2019-02-12T18:02:21-0500", "file.owner":"root", "label_2":"spotlight", "label_1":"coffeepot", "RouteOnAttribute.Route":"isImage"}
Transformed Job Status Data
{ "id" : 4, "name" : "Pyspark SQL Job", "script" : "pysparksqljob.py", "cpu" : 2, "memory" : 4, "nvidia_gpu" : 0, "engine_image_id" : 7, "kernel" : "python3", "englishSchedule" : "", "timezone" : "America/New_York", "total_runs" : 108, "total_failures" : 0, "paused" : false, "type" : "manual", "creator_id" : 19, "creator_username" : "tspann", "creator_name" : "Timothy Spann", "creator_email" : "tspann@EmailIsland.Space", "creator_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "creator_html_url" : "http://cdsw-hdp-3/tspann", "project_id" : 30, "project_slug" : "tspann/future-of-data-meetup-princeton-12-feb-2019", "project_name" : "Future of Data Meetup Princeton 12 Feb 2019", "project_owner_id" : 19, "project_owner_username" : "tspann", "project_owner_email" : "tspann@email.tu", "project_owner_name" : "Timothy Spann", "project_owner_url" : "http://cdsw-hdp-3/api/v1/users/tspann", "project_owner_html_url" : "http://cdsw-hdp/tspann", "project_url" : "http://cdsw-hdp-3/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019", "project_html_url" : "http://cdsw-hdp-3/tspann/future-of-data-meetup-princeton-12-feb-2019", "latest_id" : "jq47droa9zv9ou0j", "latest_batch" : true, "latest_job_id" : 4, "latest_status" : "scheduling", "latest_oomKilled" : false, "latest_created_at" : "2019-02-13T13:04:28.961Z", "latest_scheduling_at" : "2019-02-13T13:04:28.961Z", "latest_url" : "http://server/api/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/dashboards/jq47droa9zv9ou0j", "latest_html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/engines/jq47droa9zv9ou0j", "latest_shared_view_visibility" : "private", "report_include_logs" : true, "report_send_from_creator" : false, "timeout" : 30, "timeout_kill" : false, "created_at" : "2019-02-13T04:46:26.597Z", "updated_at" : "2019-02-13T04:46:26.597Z", "shared_view_visibility" : "private", "url" : "http://serverapi/v1/projects/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", "html_url" : "http://server/tspann/future-of-data-meetup-princeton-12-feb-2019/jobs/4", "engine_id" : "jq47droa9zv9ou0j" }
PySpark Sensor Spark SQL for Data Analysis

from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Sensors")\
    .getOrCreate()

# Access the parquet
sensor = spark.read.parquet("/tmp/sensors/*.parquet")
data = sensor.toPandas()
pd.DataFrame(data)

spark.stop()


PySpark Status Spark SQL for Data Analysis


from __future__ import print_function
import pandas as pd
import sys, re
from operator import add
from pyspark.sql import SparkSession

pd.options.display.html.table_schema = True

spark = SparkSession\
    .builder\
    .appName("Status")\
    .getOrCreate()

# Access the parquet
sensor = spark.read.parquet("/tmp/status/*.parquet")

# show content
sensor.show()

# query
sensor.select(sensor['bme680_humidity'], sensor['bme680_tempf'], sensor['lsm303d_magnetometer']).show()

sensor.printSchema()
sensor.count()

data = sensor.toPandas()
pd.DataFrame(data)

spark.stop()



Status Schema (jobstatus)

{
  "type": "record",
  "name": "jobstatus",
  "fields": [
    {"name": "id", "type": ["int", "null"]},
    {"name": "name", "type": ["string", "null"]},
    {"name": "script", "type": ["string", "null"]},
    {"name": "cpu", "type": ["int", "null"]},
    {"name": "memory", "type": ["int", "null"]},
    {"name": "nvidia_gpu", "type": ["int", "null"]},
    {"name": "engine_image_id", "type": ["int", "null"]},
    {"name": "kernel", "type": ["string", "null"]},
    {"name": "englishSchedule", "type": ["string", "null"]},
    {"name": "timezone", "type": ["string", "null"]},
    {"name": "total_runs", "type": ["int", "null"]},
    {"name": "total_failures", "type": ["int", "null"], "doc": "Type inferred from '0'"},
    {"name": "paused", "type": ["boolean", "null"], "doc": "Type inferred from 'false'"},
    {"name": "type", "type": ["string", "null"], "doc": "Type inferred from '\"manual\"'"},
    {"name": "creator_id", "type": ["int", "null"], "doc": "Type inferred from '19'"},
    {"name": "creator_username", "type": ["string", "null"]},
    {"name": "creator_name", "type": ["string", "null"]},
    {"name": "creator_email", "type": ["string", "null"]},
    {"name": "creator_url", "type": ["string", "null"]},
    {"name": "creator_html_url", "type": ["string", "null"]},
    {"name": "project_id", "type": ["int", "null"]},
    {"name": "project_slug", "type": ["string", "null"]},
    {"name": "project_name", "type": ["string", "null"]},
    {"name": "project_owner_id", "type": ["int", "null"]},
    {"name": "project_owner_username", "type": ["string", "null"]},
    {"name": "project_owner_email", "type": ["string", "null"]},
    {"name": "project_owner_name", "type": ["string", "null"]},
    {"name": "project_owner_url", "type": ["string", "null"]},
    {"name": "project_owner_html_url", "type": ["string", "null"]},
    {"name": "project_url", "type": ["string", "null"]},
    {"name": "project_html_url", "type": ["string", "null"]},
    {"name": "latest_id", "type": ["string", "null"]},
    {"name": "latest_batch", "type": ["boolean", "null"]},
    {"name": "latest_job_id", "type": ["int", "null"]},
    {"name": "latest_status", "type": ["string", "null"]},
    {"name": "latest_oomKilled", "type": ["boolean", "null"]},
    {"name": "latest_created_at", "type": ["string", "null"]},
    {"name": "latest_scheduling_at", "type": ["string", "null"]},
    {"name": "latest_url", "type": ["string", "null"]},
    {"name": "latest_html_url", "type": ["string", "null"]},
    {"name": "latest_shared_view_visibility", "type": ["string", "null"]},
    {"name": "report_include_logs", "type": ["boolean", "null"]},
    {"name": "report_send_from_creator", "type": ["boolean", "null"]},
    {"name": "timeout", "type": ["int", "null"]},
    {"name": "timeout_kill", "type": ["boolean", "null"]},
    {"name": "created_at", "type": ["string", "null"]},
    {"name": "updated_at", "type": ["string", "null"]},
    {"name": "shared_view_visibility", "type": ["string", "null"]},
    {"name": "url", "type": ["string", "null"]},
    {"name": "html_url", "type": ["string", "null"]},
    {"name": "engine_id", "type": ["string", "null"]}
  ]
}


Join me in March at DataWorks Summit in Barcelona.