Rapid IoT Development with Cloudera Edge Flow Manager - Using SenseHat, Intel Movidius, WebCameras and Electric Monitoring





I wanted to upgrade and change flows on my Movidius device, so the best option was to update to the MiNiFi 0.6.0 Java agent.  This let me build a new flow graphically with ease.  I also added the pip-installed Python library that connects to the HS-110 electric energy monitor.  So we are grabbing sensor data from the SenseHat on the RPI, calling the HS-110 sensor over TCP/IP, grabbing an image from our web camera, and calling Apache MXNet and Caffe to run deep learning analytics against it.
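As a rough sketch, the sensor-reading side of that flow can be as small as the script below; this assumes the sense_hat Python library on the Raspberry Pi and is only an illustration, not my exact script.

# Minimal SenseHat reader executed by MiNiFi (illustration only)
import json
from datetime import datetime
from sense_hat import SenseHat

sense = SenseHat()
reading = {
    "systemtime": datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
    "temperature": round(sense.get_temperature(), 2),  # Celsius
    "humidity": round(sense.get_humidity(), 2),        # percent
    "pressure": round(sense.get_pressure(), 2),        # millibars
}
print(json.dumps(reading))  # stdout becomes the flow file content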


So I installed the 0.6.0 Java MiNiFi agent on my RPI with SenseHat.  I enabled C2 in the configuration file and pointed it at one of my EFM servers.


In our new flow, I execute the Python script that runs a Caffe and an MXNet model against a newly captured web camera image.
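The capture step of that script can be sketched as follows; this assumes OpenCV (cv2) and an example output directory, and omits the Caffe/MXNet classification code that runs on the saved file.

# Grab one frame from the first USB web camera (sketch, not the full script)
import cv2
from datetime import datetime

cap = cv2.VideoCapture(0)
ret, frame = cap.read()
cap.release()
if ret:
    filename = "/opt/demo/images/image_{0}.jpg".format(
        datetime.now().strftime("%Y%m%d%H%M%S"))  # example path
    cv2.imwrite(filename, frame)
    print(filename)  # the path is then handed to the Caffe and MXNet models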

I also list and fetch all the images that the web camera has created (after a one-minute delay).

Finally, I execute a Python script that reads the local energy meter through its Python API.
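For reference, reading the HS-110 with the pyHS100 pip package looks roughly like this (the plug's IP address is just an example):

# Sketch: query the TP-Link HS-110 energy meter over the LAN
import json
from pyHS100 import SmartPlug

plug = SmartPlug("192.168.1.200")      # example address of the HS-110
emeter = plug.get_emeter_realtime()    # current, voltage, power, total
print(json.dumps(dict(emeter)))        # emit JSON for MiNiFi to pick up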



All of this data (an image and two types of JSON files) is sent to a NiFi cluster via S2S.  I route based on type and process each accordingly.  Energy monitoring data is queried via QueryRecord and sent to Slack if it passes the check.  The image is sent to my standard image processor group, which forwards it to CDSW for YOLO SSD analysis as well as to local TensorFlow and MXNet NiFi processors.  Finally, the JSON sensor/deep learning results are sent to Slack as the notification seen below.  This took less than an hour.  Easy AI - IoT / Edge 2 AI!



Example Web Camera Image


Technology:

  • Apache NiFi
  • Apache NiFi - MiNiFi
  • Cloudera Edge Flow Manager
  • Apache Kafka
  • Apache MXNet
  • Apache MXNet GluonCV
  • TensorFlow
  • Intel Movidius
  • Caffe
  • Python 2/3
  • PiP
  • TensorFlow Model Zoo
  • TP-Link-HS110
  • SenseHat Sensors
  • Raspberry Pi 3B+
  • USB Webcamera
  • Cloudera Data Science Workbench

Source:

References:

Creating Apache Kafka Topics Dynamically As Part of a DataFlow



Sometimes when you are ingesting data at scale, whether it is from a Data Warehouse, Logs, REST API, IoT, Social Media or other sources, you may need to create new Apache Kafka topics depending on the type, variations, newness, schema, schema version or other changes.

Instead of having to manually create an Apache Kafka topic with Cloudera Streams Messaging Manager or the Apache Kafka command line (kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test), I would like to create it mid-stream based on names that are relevant to the arriving data.   This could be the name of the schema from the data, the table name of the origin data, some unique name generated with the data or another source.   For my example, I am generating a unique name via Apache NiFi Expression Language:

nifi${now():format('yyyyMMddmmss')}${UUID()}
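For clarity, a plain-Python equivalent of that expression is shown below (illustration only; the flow itself uses the Expression Language version above).

# Mirrors nifi${now():format('yyyyMMddmmss')}${UUID()}
import uuid
from datetime import datetime

topic_name = "nifi{0}{1}".format(
    datetime.now().strftime("%Y%m%d%M%S"),  # yyyyMMddmmss (mm = minutes, as in the EL)
    uuid.uuid4())
print(topic_name)  # e.g. nifi<datetime><uuid>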

This is a proof of concept; for production use cases I would add more features, such as fields for Number of Partitions and Number of Replicas.

Example Run



The processor is very easy to use: enter your Kafka broker URL, such as demo.hortonworks.com:6667, and then the name of your Kafka topic.  The processor validates that the name is legal, which means alphanumeric characters with only the addition of periods, dashes and underscores.  It runs quickly, and when it completes you can check out the results.  Your flow file will be unchanged, but you will get new attributes as seen below.
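That name check can be expressed as a simple rule: alphanumeric characters plus periods, dashes and underscores (Kafka also limits topic names to 249 characters). A sketch:

# Sketch of the topic-name validation described above
import re

def is_valid_topic_name(name):
    return bool(re.fullmatch(r"[A-Za-z0-9._-]+", name)) and len(name) <= 249

print(is_valid_topic_name("nifi20190719182103-example"))  # True
print(is_valid_topic_name("bad topic!"))                  # False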






You will get the following new attributes:

  • kafka.bootstrap - your broker URL
  • kafka.client.id - a generated one-time-use client id
  • kafka.topic.<TOPIC_NAME> - one for each Kafka topic that already exists
  • kafka.topic.creation.success - a status flag
  • kafka.topic.message - a status message
  • kafka.topic.<YOUR_NEW_TOPIC_NAME> - for the newly created topic






In IntelliJ I quickly developed this program using the Apache Kafka Admin API and some JUnit tests.
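The processor itself is Java; purely to illustrate the same Admin API call, an equivalent sketch in Python with the kafka-python package would be:

# Create a Kafka topic programmatically (Python equivalent of the Java Admin API call)
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="demo.hortonworks.com:6667",
                         client_id="nifi-topic-creator")  # one-time client id
topic = NewTopic(name="nifi20190719182103-example",
                 num_partitions=1,
                 replication_factor=1)
admin.create_topics([topic])
admin.close()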

For a production use case I would probably just use the Cloudera SMM REST API to create topics.




It is trivial to call a REST API from Apache NiFi so I can use an Apache NiFi flow to orchestrate an entire Kafka lifecycle with management and monitoring for real-time interaction.
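As a rough sketch of that idea, a topic-creation call over REST might look like the snippet below; the host, port and path are placeholders rather than the documented SMM API, so consult the API reference for your SMM version.

# Hypothetical REST call to create a topic (placeholder endpoint, not the official SMM path)
import requests

SMM_BASE = "http://smm-host:8585"  # assumption: your SMM REST server
resp = requests.post(SMM_BASE + "/api/v1/admin/topics",
                     json={"name": "nifi-example",
                           "partitions": 1,
                           "replicationFactor": 1})
resp.raise_for_status()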

Source Code for Custom Apache NiFi Processor



Source Code for Apache Kafka Shell Scripts



Edge Data Processing with Jetson Nano Part 3 - AI Integration






Top Level NiFi Flow Receiving MiNiFi Agent Messages





Overview of our Apache NiFi Flow For Processing









We format a new flow file as JSON to send to the CDSW Job Environment
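A hedged sketch of that hand-off: the formatted JSON carries the image details, and CDSW's Jobs API can start a job with those values passed as environment variables. The host, project, job id and API key below are placeholders, not values from the flow.

# Sketch: trigger a CDSW job with the extracted image path (placeholder host/project/job/key)
import requests

payload = {"environment": {"IMAGE_PATH": "/opt/demo/images/image_example.jpg"}}
resp = requests.post(
    "http://cdsw.example.com/api/v1/projects/user/project/jobs/1/start",
    json=payload,
    auth=("YOUR_API_KEY", ""))  # CDSW API key as the basic-auth user
print(resp.status_code)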




We Run Apache MXNet 1.3.1 (Java) SSD Against the Web Camera Image


Extract The Values From the FlowFile to Send to the Spark Job



Our JSON Results From the Logs






Log data has successfully arrived; consistent JSON rows are grabbed as they are written to the file
 



We can see the results of the Spark Job in Cloudera Data Science Workbench (CDSW)



We can also see the messages that we sent to Slack







Edge Data Processing with Jetson Nano Part 2 - Apache NiFi - Process, Route, Transform, Store





Apache NiFi Flow to Process Data



We route images from the web cameras, logs from the runs and JSON sensor readings to the appropriate processors.  We also convert JSON to Avro for storage in Hadoop or S3, while running queries on the data to check the device temperatures.  TensorFlow and Apache MXNet are run on the images in-stream as they pass through Apache NiFi.
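The Avro conversion is handled in-flow by NiFi record processors; the sketch below only illustrates the same JSON-to-Avro step in Python with fastavro, using a trimmed schema of a few sensor fields and an assumed input file.

# Sketch: convert JSON sensor rows to Avro (simplified schema)
import json
from fastavro import parse_schema, writer

schema = parse_schema({
    "name": "nano_sensor", "type": "record",
    "fields": [
        {"name": "uuid", "type": "string"},
        {"name": "cputemp", "type": "string"},
        {"name": "gputemp", "type": "string"},
    ],
})

with open("sensors.jsonl") as src:
    rows = [json.loads(line) for line in src]
records = [{k: row[k] for k in ("uuid", "cputemp", "gputemp")} for row in rows]

with open("sensors.avro", "wb") as out:
    writer(out, schema, records)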

Example Device and Deep Learning Data



Logs Returned From the Device



Push Some Results to Slack





Edge Data Processing with Jetson Nano Part 1 - Deploy, Setup and Ingest





















Configuring Execution of the Image Capture and Jetson Nano Classification Python Script



Configuring Tailing JSON Log



Configuring Acquiring Images from File Directory




Configuring the Remote Connection to NiFi






Example CEM Events





Simple NiFi Flow to Receive Remote Events


The Apache NiFi server receives the annotated images as well as the JSON packets.


JSON Data Packet Example

{"uuid": "nano_uuid_kwo_20190719182103", "ipaddress": "192.168.1.254", "top1pct": 32.6171875, "top1": "desktop computer", "cputemp": "32.5", "gputemp": "31.5", "gputempf": "89", "cputempf": "90", "runtime": "5", "host": "jetsonnano", "filename": "/opt/demo/images/image_bei_20190719182103.jpg", "imageinput": "/opt/demo/images/2019-07-19_1421.jpg", "host_name": "jetsonnano", "macaddress": "de:07:5a:27:1e:7f", "end": "1563560468.7867181", "te": "4.806252717971802", "systemtime": "07/19/2019 14:21:08", "cpu": 55.8, "diskusage": "5225.1 MB", "memory": 57.5, "id": "20190719182103_fcaa94d4-7629-423a-b76e-714168e64677"}


Notes

It was very easy to set up a simple flow to execute our deep learning classification and data acquisition with Apache NiFi, MiNiFi and Cloudera EFM.  We can now do something with the data, like push it to the cloud.

Source:

Philadelphia Open Crime Data on Phoenix / HBase

This is an update to a previous article on accessing Philadelphia Open Crime Data and storing it in Apache Phoenix on HBase.

It seems an update to Spring Boot, Phoenix and Zeppelin makes for a cleaner experience.

I also added a way to grab years of historical Policing data.

All NiFi, Zeppelin and Source is here:   https://github.com/tspannhw/phillycrime-springboot-phoenix




Part 1: https://community.hortonworks.com/articles/54947/reading-opendata-json-and-storing-into-phoenix-tab.html












We convert JSON to Phoenix upserts.
We push JSON records to HBase with PutHBaseRecord.


Query Phoenix at the Command Line, Super Fast SQL



Resources




Example Data
"dc_dist":"18",
"dc_key":"200918067518",
"dispatch_date":"2009-10-02",
"dispatch_date_time":"2009-10-02T14:24:00.000",
"dispatch_time":"14:24:00",
"hour":"14",
"location_block":"S 38TH ST / MARKETUT ST",
"psa":"3",
"text_general_code":"Other Assaults",
"ucr_general":"800"}


Create a Phoenix Table

/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

CREATE TABLE phillycrime (
  dc_dist varchar,
  dc_key varchar not null primary key,
  dispatch_date varchar,
  dispatch_date_time varchar,
  dispatch_time varchar,
  hour varchar,
  location_block varchar,
  psa varchar,
  text_general_code varchar,
  ucr_general varchar
);
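With the table in place, an upsert of the example record can be sketched with the phoenixdb package; this assumes a Phoenix Query Server listening on port 8765 rather than the ZooKeeper URL that sqlline uses above.

# Sketch: upsert one example record through the Phoenix Query Server
import phoenixdb

conn = phoenixdb.connect("http://localhost:8765/", autocommit=True)
cur = conn.cursor()
cur.execute(
    "UPSERT INTO phillycrime (dc_dist, dc_key, dispatch_date, dispatch_time, "
    "location_block, text_general_code, ucr_general) VALUES (?, ?, ?, ?, ?, ?, ?)",
    ("18", "200918067518", "2009-10-02", "14:24:00",
     "S 38TH ST / MARKETUT ST", "Other Assaults", "800"))
conn.close()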


Add NiFi / Spring Boot Connectivity to Phoenix
org.apache.phoenix.jdbc.PhoenixDriver
jdbc:phoenix:localhost:2181:/hbase-unsecure
/usr/hdp/3.1/phoenix/phoenix-client.jar
/usr/hdp/3.1/hbase/lib/hbase-client.jar
/etc/hbase/conf/hbase-site.xml



Run spring boot …