Using Raspberry Pi 3B+ with Apache NiFi MiNiFi and Google Coral Accelerator and Pimoroni Inky Phat



Architecture



Introduction

First we need to unbox our new goodies. The Inky pHAT is an awesome E-Ink display with low power usage that keeps its image on screen even after shutdown!

Next I added a new Google Coral Edge TPU ML Accelerator USB coprocessor to a new Raspberry Pi 3B+. This was easy to integrate and get up and running.

Let's unbox this beautiful device (but be careful: it can get really hot when it runs, and there is a warning in the instructions). I run mine on top of an aluminum case with a big fan on it.







Pimoroni Inky Phat

It is pretty easy to set up, and it provides a robust Python library for writing to our E-Ink display. You can see an example screen here; a minimal drawing sketch follows the links below.

https://github.com/pimoroni/inky
Pimoroni Inky pHAT ePaper eInk Display in Red


Pimoroni Inky Phat (Red)


https://shop.pimoroni.com/products/inky-phat
https://github.com/pimoroni/inky
https://pillow.readthedocs.io/en/stable/reference/ImageDraw.html
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-inky-phat
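
Here is a minimal drawing sketch, assuming the inky, Pillow, and font_fredoka_one packages from the install list in the next section; the text and layout values are just illustrative, not the exact script used in the flow.

# Minimal sketch: push one line of text to a red Inky pHAT.
# Assumes the inky, Pillow, and font_fredoka_one packages listed in the next section.
from inky import InkyPHAT
from font_fredoka_one import FredokaOne
from PIL import Image, ImageDraw, ImageFont

display = InkyPHAT("red")              # "red", "yellow", or "black" depending on your panel
display.set_border(display.WHITE)

img = Image.new("P", (display.WIDTH, display.HEIGHT))
draw = ImageDraw.Draw(img)
font = ImageFont.truetype(FredokaOne, 22)

draw.text((10, 10), "Hello from MiNiFi", display.RED, font)

display.set_image(img)
display.show()                         # the image stays on the panel even after power-off

The hourly results update later in the article follows the same pattern: draw the date, IP, run time, and top label onto a PIL image and call show().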


Install Some Python Libraries and Debian Packages for the Inky pHAT and Coral

pip3 install font_fredoka_one
pip3 install geocoder
sudo apt-get install fswebcam
sudo apt-get install fe
pip3 install psutil
pip3 install font_hanken_grotesk
pip3 install font_intuitive
wget http://storage.googleapis.com/cloud-iot-edge-pretrained-models/edgetpu_api.tar.gz
These libraries are for the Inky pHAT, which needs fonts to write to the display. The last tarball is the Edge TPU API for the Coral device; it is a fast install that Google documents well.

Download Apache NiFi - MiNiFi Java Agent

https://nifi.apache.org/minifi/download.html

Next up, the most important piece. You will need JDK 8 installed on your device if you are using the Java agent. You can also use the MiNiFi C++ agent, but that may require building it for your OS/platform; it has some interesting Python-execution abilities.


Google Coral Documentation - Google Edge TPU
  • Google Edge TPU ML accelerator coprocessor
  • USB 3.0 Type-C socket
  • Supports Debian Linux on host CPU
  • ASIC designed by Google that provides high performance ML inferencing for TensorFlow Lite models


Using a Pretrained TensorFlow Lite Model:

  • Model: Inception V4 (ImageNet)
  • Recognizes 1,000 types of objects
  • Dataset: ImageNet
  • Input size: 299x299
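
Here is a hedged sketch of how a single image can be classified with the Edge TPU Python API from the tarball above. The ClassificationEngine usage follows Google's 2019 Python API; the model, label, and image paths are placeholders, not the exact paths from my script.

# Minimal sketch: classify one captured image on the Coral Edge TPU.
# Paths are placeholders; the model is an Edge TPU-compiled Inception V4 from Google's model zoo.
from edgetpu.classification.engine import ClassificationEngine
from PIL import Image

MODEL = "/home/pi/models/inception_v4_299_quant_edgetpu.tflite"
LABELS = "/home/pi/models/imagenet_labels.txt"

def load_labels(path):
    # Label files are "<numeric id> <label name>" per line.
    labels = {}
    with open(path) as f:
        for line in f:
            ident, name = line.strip().split(" ", 1)
            labels[int(ident)] = name
    return labels

engine = ClassificationEngine(MODEL)
labels = load_labels(LABELS)
image = Image.open("/opt/demo/images/latest.jpg")   # picture grabbed by the webcam

# Returns (label id, confidence) pairs for the best matches.
for label_id, score in engine.ClassifyWithImage(image, top_k=2):
    print(labels[label_id], round(float(score), 2))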

Let's run a flow!

I can run this Python 3 script every 10 seconds without issues. It captures the picture, runs it through classification with the model, grabs network and device stats, forms a JSON record, and completes in under 5 seconds. Our MiNiFi agent is scheduled to call the script every 10 seconds and grab the images after 60 seconds.
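
As a rough sketch of the non-ML half of that script (assuming psutil for the device stats; field names mirror the example JSON record shown later), something like this produces the record that MiNiFi picks up:

# Rough sketch of the device stats and JSON record emitted each run.
# The fswebcam capture and Edge TPU classification would run between start and end.
import json
import socket
import time
import uuid
from datetime import datetime

import psutil

start = time.time()
# ... grab an image with fswebcam and classify it on the Coral here ...
end = time.time()

temps = psutil.sensors_temperatures()
cputemp = temps["cpu-thermal"][0].current if "cpu-thermal" in temps else 0  # sensor name varies by Pi image

record = {
    "uuid": datetime.now().strftime("%Y%m%d%H%M%S_") + str(uuid.uuid4()),
    "host": socket.gethostname(),
    "ipaddress": socket.gethostbyname(socket.gethostname()),
    "cputemp": str(round(cputemp)),
    "memory": str(psutil.virtual_memory().percent),
    "diskusage": str(round(psutil.disk_usage("/").free / (1024 * 1024), 1)),
    "starttime": datetime.fromtimestamp(start).strftime("%m/%d/%Y %H:%M:%S"),
    "endtime": str(round(end, 2)),
    "runtime": str(round(end - start, 2)),
    "systemtime": datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
}
print(json.dumps(record))   # MiNiFi grabs this output (or a file written with it)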


MiNiFi Flow



Flow Overview



Apache NiFi Flow





Results (Once an hour we update our E-Ink Display with Date, IP, Run Time, Label 1)





Example JSON Data

{
  "endtime": "1552164369.27",
  "memory": "19.1",
  "cputemp": "32",
  "ipaddress": "192.168.1.183",
  "diskusage": "50336.5",
  "score_2": "0.14",
  "score_1": "0.68",
  "runtime": "4.74",
  "host": "mv2",
  "starttime": "03/09/2019 15:46:04",
  "label_1": "hard disc, hard disk, fixed disk",
  "uuid": "20190309204609_05c9a240-d801-4bac-b029-e5bf38c02d40",
  "label_2": "buckle",
  "systemtime": "03/09/2019 15:46:09"
}

Example Slack Alert


PS3 Eye USB Camera Capturing an Image


Image It Captured




Source Code

https://github.com/tspannhw/nifi-minifi-coral

Convert Your Flow to config.yml for MiNiFi (look for a major innovation here soon).

 ./config.sh transform Coral_MiniFi_Agent_Flow.xml config.yml
config.sh: JAVA_HOME not set; results may vary

Java home: 
MiNiFi Toolkit home: /Volumes/TSPANN/2019/apps/minifi-toolkit-0.5.0



No validation errors found in converted configuration.


Example Call From MiNiFi 0.5.0 Java Agent to Apache NiFi 1.9.0 Server


2019-03-09 16:21:01,877 INFO [Timer-Driven Process Thread-10] o.a.nifi.remote.StandardRemoteGroupPort RemoteGroupPort[name=Coral Input,targets=http://hw13125.local:8080/nifi] Successfully sent [StandardFlowFileRecord[uuid=eab17784-2e76-4438-a60a-fd67df37a102,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=362347, length=685083],offset=0,name=d74bc911bfd167fe79d5a3aa780004fd66fa6d,size=685083], StandardFlowFileRecord[uuid=eb979d09-a936-4b2d-82ff-d204f9d768eb,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1047430, length=361022],offset=0,name=2019-03-09_1541.jpg,size=361022], StandardFlowFileRecord[uuid=343a4c91-b863-440e-ac81-1f68d6210792,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1408452, length=668],offset=0,name=3026822c780724b39e826230bdef43f8ed9786,size=668], StandardFlowFileRecord[uuid=97df9d3a-dc3c-4d03-b533-7b75c3180032,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1552166446123-3, container=default, section=3], offset=1409120, length=2133417],offset=0,name=abb6feaac5bda3c6d3660e7593cc4ef2e1cfce,size=2133417]] (3.03 MB) to http://hw13125.local:8080/nifi-api in 1416 milliseconds at a rate of 2.14 MB/sec


References







Apache NiFi 101

Let's get learning Apache NiFi now!

General

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/apache-nifi-overview/content/what-is-apache-nifi.html

Using Record Path

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/record-path-guide/content/overview.html

Using Expression Language

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/expression-language-guide/content/overview.html

Apache NiFi Configuration

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/nifi-configuration-best-practices/content/configuration-best-practices.html

Crash Courses

https://www.slideshare.net/Hadoop_Summit/apache-nifi-crash-course-131483547

https://www.youtube.com/watch?v=fblkgr1PJ0o&t=3961s

DataWorks Summit 

https://www.youtube.com/user/HadoopSummit/videos

High Level

https://www.slideshare.net/hortonworks/hortonworks-dataflow-hdf-33-taking-stream-processing-to-the-next-level-126209768

https://www.slideshare.net/HadoopSummit/dataflow-with-apache-nifi 

https://www.slideshare.net/cloudera/introducing-cloudera-dataflow-cdf-21319

Apache NiFi Integration with Apache Spark

https://community.hortonworks.com/articles/171787/hdf-31-executing-apache-spark-via-executesparkinte.html

https://community.hortonworks.com/articles/171893/hdf-31-executing-apache-spark-via-executesparkinte-1.html

DevOps

https://community.hortonworks.com/articles/207858/more-devops-for-hdf-apache-nifi-and-friends.html

https://community.hortonworks.com/articles/167187/provenance-site-to-site-reporting.html


MiNiFi

https://www.slideshare.net/bunkertor/handson-deep-dive-with-minifi-and-apache-mxnet



Useful Learning Materials

https://pierrevillard.com/best-of-nifi/ 

https://pierrevillard.com/category/apache-nifi/

https://dzone.com/articles/integration-of-apache-nifi-and-cloudera-data-scien

https://community.hortonworks.com/articles/163776/parsing-any-document-with-apache-nifi-15-with-apac.html

https://community.hortonworks.com/articles/177370/extracting-html-from-pdf-excel-and-word-documents.html

https://community.hortonworks.com/articles/189213/etl-with-lookups-with-apache-hbase-and-apache-nifi.html

https://community.hortonworks.com/articles/222605/converting-powerpoint-presentations-into-french-fr.html

https://dzone.com/articles/sochain-bitcoin-dogecoin-and-litecoin-data-rest-ap

https://dzone.com/articles/dataworks-summit-2018-berlin-apache-nifi-wrapup

https://dzone.com/articles/real-time-stock-processing-with-apache-nifi-and-ap

https://dzone.com/articles/using-websockets-with-apache-nifi


Example Custom Processors

See the example processor repositories listed under "Getting Started With Custom Processor for Apache NiFi" below.

Example Videos

https://www.youtube.com/watch?v=Q4dSGPvqXSA

https://www.youtube.com/watch?v=ksDKNp6Z4BE

https://www.youtube.com/watch?v=5w6rV7562xM

Best Practices

https://www.youtube.com/watch?v=rF7FV8cCYIc


Apache Kafka Best Practices

https://www.slideshare.net/HadoopSummit/apache-kafka-best-practices

Populating Apache Phoenix HBase Tables and Apache Hive Tables from RDBMS in real-time with streaming from Apache NiFi


Article

INGESTING RDBMS DATA
I previously posted an article on ingesting and converting data (https://community.hortonworks.com/articles/64069/converting-a-large-json-file-into-csv.html). Once you have a SQL database loaded, you will eventually need to store your data in a unified data lake. This is quite simple with NiFi. If you have a specialized tool that reads your RDBMS logs and sends them to Kafka or JMS, that would be easy to ingest as well. For those wishing to stay open source, NiFi works great. If you don't have a good increasing key to use, you can add an artificial one that increases on every insert. Almost every database supports this, from MariaDB to Oracle.
ALTER TABLE `useraccount` ADD COLUMN `id` INT AUTO_INCREMENT UNIQUE FIRST;
For mine, I just added an autoincrement id column to be my trigger.
For Apache NiFi, you will need connections to all your sources and sinks. So I need a DB Connection Pool for Apache Phoenix and MySQL (DBCPConnectionPool) as well as Hive (HiveConnectionPool).
Tools Required:
  • RDBMS (I am using MySQL)
  • HDF 2.0 (NiFi 1.0.0+)
  • HDP 2.4+ (I am using HDP 2.5) with HBase and Phoenix enabled, and HDFS, YARN, and Hive running
  • Optional: Apache Zeppelin for quick data analysis and validation
To build a SQL database, I needed a source of interesting and plentiful data, so I used the excellent free API at https://api.randomuser.me/. It's easy to have this URL return up to 5,000 formatted JSON results by adding extra parameters, for example ?results=3&format=pretty for three pretty-printed records.
This API returns JSON in the following format, which requires some basic transformation (easily done in NiFi); a small Python fetch sketch follows the sample payload:
{"results":[
{"gender":"male",
"name":{"title":"monsieur","first":"lohan","last":"marchand"},
"location":{"street":"6684 rue jean-baldassini","city":"auboranges","state":"schwyz","postcode":9591},
"email":"lohan.marchand@example.com",
"login":{"username":"biggoose202","password":"esther","salt":"QIU1HBsr","md5":"9e60da6d4490cd6d102e8010ac98f283","sha1":"3de3ea419da1afe5c83518f8b46f157895266d17","sha256":"c6750c1a5bd18cac01c63d9e58a57d75520861733666ddb7ea6e767a7460479b"},
"dob":"1965-01-28 03:56:58",
"registered":"2014-07-26 11:06:46",
"phone":"(849)-890-5523",
"cell":"(395)-127-9369",
"id":{"name":"AVS","value":"756.OUVK.GFAB.51"},
"picture":{"large":"https://randomuser.me/api/portraits/men/69.jpg","medium":"https://randomuser.me/api/portraits/med/men/69.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/men/69.jpg"},"nat":"CH"}]}
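
Before wiring this into NiFi, it can help to poke at the API by hand. Here is a small Python sketch (using the requests package, an assumption; any HTTP client works) that pulls a few users and flattens the nested fields into the flat column layout used by the tables below:

# Pull a few random users and flatten the nested JSON into flat column names.
# Field access is based on the payload format shown above.
import requests

resp = requests.get("https://api.randomuser.me/",
                    params={"results": 3, "format": "json"})
resp.raise_for_status()

for person in resp.json()["results"]:
    row = {
        "gender": person["gender"],
        "title": person["name"]["title"],
        "first": person["name"]["first"],
        "last": person["name"]["last"],
        "street": person["location"]["street"],
        "city": person["location"]["city"],
        "state": person["location"]["state"],
        "postcode": str(person["location"]["postcode"]),
        "email": person["email"],
        "username": person["login"]["username"],
        "md5": person["login"]["md5"],
        "dob": person["dob"],
        "nat": person["nat"],
    }
    print(row)   # only a subset of the table columns is shown here
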
Then I created a MySQL table to populate with JSON data.
drop table useraccount;

create table useraccount(
gender varchar(200),
title varchar(200),
first varchar(200),
last varchar(200),
street varchar(200),
city varchar(200),
state varchar(200),
postcode varchar(200),
email varchar(200),
username varchar(200),
password varchar(200),
salt varchar(200),
md5 varchar(200),
sha1 varchar(200),
sha256 varchar(200),
dob varchar(200),
registered varchar(200),
phone varchar(200),
cell varchar(200),
name varchar(200),
value varchar(200),
large varchar(200),
medium varchar(200),
thumbnail varchar(200),
nat varchar(200));
I created a Phoenix table on top of HBase to hold the data:
create table useraccount(
gender varchar,
title varchar,
firstname varchar,
lastname varchar,
street varchar,
city varchar,
state varchar,
postcode varchar,
email varchar,
username varchar,
password varchar,
salt varchar,
md5 varchar not null primary key,
sha1 varchar,
sha256 varchar,
dob varchar,
registered varchar,
phone varchar,
cell varchar,
name varchar,
value2 varchar,
large varchar,
medium varchar,
thumbnail varchar,
nat varchar);
Step 1: QueryDatabaseTable
Reads from the MySQL table. This processor just needs the MySQL connection, the table name (useraccount), and the column (id).
We have two forks from this query table.
Fork 1
Step 2: ConvertAvroToJSON
Set the JSON container option to Array.
You will get arrays of JSON that look like this:
{
"id" : 656949,
"gender" : "female",
"title" : "madame",
"first" : "amandine",
"last" : "sanchez",
"street" : "8604 place paul-duquaire",
"city" : "savigny",
"state" : "genève",
"postcode" : "5909",
"email" : "amandine.sanchez@example.com",
"username" : "ticklishmeercat183",
"password" : "hillary",
"salt" : "Sgq7HHP1",
"md5" : "d82d6c3524f3a1118399113e6c43ed31",
"sha1" : "23ce2b372f94d39fb949d95e81e82bece1e06a4a",
"sha256" : "49d7e92a2815df1d5fd991ce9ebbbcdffee4e0e7fe398bc32f0331894cae1154",
"dob" : "1983-05-22 15:16:49",
"registered" : "2011-02-06 22:03:37",
"phone" : "(518)-683-8709",
"cell" : "(816)-306-5232",
"name" : "AVS",
"value" : "756.IYWK.GJBH.35",
"large" : "https://randomuser.me/api/portraits/women/50.jpg",
"medium" : "https://randomuser.me/api/portraits/med/women/50.jpg",
"thumbnail" : "https://randomuser.me/api/portraits/thumb/women/50.jpg",
"nat" : "CH"
}
Step 3: SplitJSON
Use: $.* to split all the arrays into individual JSON records.
Step 4: EvaluateJSONPath
You need to pull out each attribute you want and name it, for example cell for $.cell.
See the guide to JSONPath with a testing tool here.
Step 5: ReplaceText
Here we format the SQL upsert from the attributes we just parsed from the JSON (a rough Python illustration of this substitution follows the statement):
upsert into useraccount (gender,title,firstname,lastname,street,city,state,postcode,email,
username,password,salt,md5,sha1,sha256,dob,registered,phone,cell,name,value2,large,medium,thumbnail,nat)
values ('${'gender'}','${'title'}','${'first'}','${'last'}','${'street'}','${'city'}','${'state'}','${'postcode'}',
'${'email'}','${'username'}','${'password'}','${'salt'}','${'md5'}','${'sha1'}','${'sha256'}','${'dob'}',
'${'registered'}','${'phone'}','${'cell'}','${'name'}','${'value'}','${'large'}','${'medium'}','${'thumbnail'}','${'nat'}' )
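
To make the substitution concrete, here is a rough Python illustration (not part of the flow itself) of the same string ReplaceText builds, given a dictionary whose keys are already mapped to the Phoenix column names from the attributes parsed in Step 4:

# Illustration only: build the upsert statement that ReplaceText emits.
# 'record' maps Phoenix column names to the attribute values parsed by EvaluateJSONPath.
COLUMNS = ["gender", "title", "firstname", "lastname", "street", "city", "state",
           "postcode", "email", "username", "password", "salt", "md5", "sha1",
           "sha256", "dob", "registered", "phone", "cell", "name", "value2",
           "large", "medium", "thumbnail", "nat"]

def build_upsert(record):
    # Naive single-quote escaping, purely for illustration.
    values = ",".join("'" + str(record.get(col, "")).replace("'", "''") + "'"
                      for col in COLUMNS)
    return "upsert into useraccount (" + ",".join(COLUMNS) + ") values (" + values + ")"

print(build_upsert({"gender": "female", "firstname": "amandine", "lastname": "sanchez",
                    "md5": "d82d6c3524f3a1118399113e6c43ed31"}))
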
Step 6: PutSQL
With an example Batch Size of 100, we connect to our Phoenix DB Connection Pool.
Fork 2
Step 2: UpdateAttribute
We set orc.table to useraccount
Step 3: ConvertAvroToORC
We set our Hive configuration file (/etc/hive/conf/hive-site.xml), a 64 MB stripe size, and, importantly, the Hive Table Name to ${orc.table}.
Step 4: PutHDFS
Set the configuration to /etc/hadoop/conf/core-site.xml and pick a directory you have write access to for storing the ORC files.
Step 5: ReplaceText
Search Value: (?s:^.*$)
Replacement Value: ${hive.ddl} LOCATION '${absolute.hdfs.path}'
Set the Replacement Strategy to Always Replace and the Evaluation Mode to Entire text.
Step 6: PutHiveQL
You need to connect to your Hive connection pool.
You will see the resulting ORC files in your HDFS directory:
[root@tspanndev12 demo]# hdfs dfs -ls /orcdata
Found 2 items
-rw-r--r-- 3 root hdfs 246806 2016-10-29 01:24 /orcdata/2795061363634412.orc
-rw-r--r-- 3 root hdfs 246829 2016-10-29 17:25 /orcdata/2852682816977877.orc
After my first few batches of data are ingested, I check them in Apache Zeppelin. Looks good.
The data has also been loaded into Apache Hive.

Getting Started With Custom Processor for Apache NiFi

Writing Custom Processors for Apache NiFi is easy.   I recommend you start with Apache NiFi 1.20.

You can download from Cloudera or the Apache NiFi site.

You will also need JDK 17 and Maven. An IDE like Eclipse or IntelliJ IDEA is helpful as well. I recommend a Linux or macOS machine to develop on.

Just build a new processor shell using:  https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions




https://github.com/tspannhw/nifi-mxnetinference-processor
https://github.com/tspannhw/nifi-extracttext-processor
https://github.com/tspannhw/nifi-langdetect-processor
https://github.com/tspannhw/nifi-attributecleaner-processor
https://github.com/tspannhw/nifi-convertjsontoddl-processor
https://github.com/tspannhw/nifi-postimage-processor
https://github.com/tspannhw/GetWebCamera
https://github.com/tspannhw/nifi-imageextractor-processor
https://github.com/tspannhw/nifi-puttwitter-processor
https://github.com/tspannhw/nifi-tensorflow-processor

Good Articles:

https://medium.com/hashmapinc/creating-custom-processors-and-controllers-in-apache-nifi-e14148740ea

https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/


References:

Apache NiFi Developers Guide
http://nifi.apache.org/docs.html