Creating Apache Kafka Topics Dynamically As Part of a DataFlow


When you are ingesting data at scale, whether from a data warehouse, logs, REST APIs, IoT devices, social media or other sources, you may need to create new Apache Kafka topics based on the type, variations, newness, schema, schema version or other changes in the data.

Instead of manually creating an Apache Kafka topic with Cloudera Streams Messaging Manager or the Apache Kafka command line (kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test), I want to create topics mid-stream, with names that are relevant to the arriving data. This could be the name of the schema from the data, the table name of the origin data, a unique name generated with the data, or a name from another source. For my example, I generate a unique name with Apache NiFi Expression Language:

nifi${now():format('yyyyMMddmmss')}${UUID()}
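To make the shape of the generated name concrete, here is a minimal Python sketch that approximates the expression above. It assumes NiFi's UUID() yields a random version-4 UUID string; the function name generate_topic_name is hypothetical. Note that the Java date pattern yyyyMMddmmss contains minutes (mm) and seconds (ss) but no hour (HH), so the UUID is what actually keeps names unique.

```python
import uuid
from datetime import datetime


def generate_topic_name(now=None, uid=None):
    """Approximate nifi${now():format('yyyyMMddmmss')}${UUID()} in Python.

    yyyyMMddmmss is year, month, day, MINUTE, second: there is no hour
    field, so only the UUID part guarantees uniqueness.
    """
    if now is None:
        now = datetime.now()
    if uid is None:
        uid = uuid.uuid4()  # assumption: UUID() behaves like a random v4 UUID
    return "nifi" + now.strftime("%Y%m%d%M%S") + str(uid)


name = generate_topic_name()
print(name, len(name))  # 4 + 12 + 36 = 52 characters
```

The result is always 52 characters made of digits, lowercase hex letters and dashes, which keeps it well inside Kafka's topic-name rules.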

This is a proof of concept. For production use cases I would add more features, such as properties for the number of partitions and the number of replicas.

Example Run



The processor is easy to use: enter your Kafka broker URL, such as demo.hortonworks.com:6667, and then the name of your Kafka topic. The processor validates the name, which must be alphanumeric, with periods, dashes and underscores as the only other allowed characters. It runs quickly, and when it completes you can check the results. Your flowfile is unchanged, but it gains new attributes.
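The processor's own validation code is not shown here. As a sketch, this is the same check in Python following the rules Kafka itself enforces for topic names: only the characters [a-zA-Z0-9._-], at most 249 characters, and not "." or "..". The names is_valid_topic_name and MAX_TOPIC_LENGTH are illustrative.

```python
import re

# Kafka's topic-name rules: 1 to 249 characters from [a-zA-Z0-9._-],
# and the names "." and ".." are reserved.
LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
MAX_TOPIC_LENGTH = 249


def is_valid_topic_name(name: str) -> bool:
    """Return True if Kafka would accept name as a topic name."""
    if name in (".", ".."):
        return False
    if len(name) > MAX_TOPIC_LENGTH:
        return False
    # fullmatch rejects the empty string and any illegal character
    return LEGAL_TOPIC.fullmatch(name) is not None


print(is_valid_topic_name("sensor.readings_v2-raw"))  # True
print(is_valid_topic_name("bad topic"))               # False (space)
```

Kafka also warns about names that differ only by "." versus "_" (for example my.topic and my_topic), because the two characters collide in metric names; a stricter processor could flag that case too.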






You will get the following attributes:
kafka.bootstrap: your broker URL
kafka.client.id: a generated, one-time-use client ID
kafka.topic.<TOPIC_NAME>: one attribute for each Kafka topic that exists, including your newly created topic
kafka.topic.creation.success: a status flag
kafka.topic.message: a status message
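As an illustration of that attribute layout, here is a hypothetical Python helper that assembles the same keys from a list of existing topics. The value the real processor stores under each kafka.topic.<TOPIC_NAME> key is not described, so this sketch simply stores the topic name there as a placeholder; the client ID format is also an assumption.

```python
import uuid


def build_result_attributes(bootstrap, existing_topics, success, message):
    """Assemble flowfile attributes in the layout described above (sketch)."""
    attrs = {
        "kafka.bootstrap": bootstrap,
        # Hypothetical format: a one-time client id from a random UUID
        "kafka.client.id": "client-" + str(uuid.uuid4()),
        "kafka.topic.creation.success": str(success).lower(),
        "kafka.topic.message": message,
    }
    for topic in existing_topics:
        # Placeholder value: the real processor's value is not documented here
        attrs["kafka.topic." + topic] = topic
    return attrs


attrs = build_result_attributes("demo.hortonworks.com:6667", ["test"], True, "created")
print(sorted(attrs))
```

One consequence of this naming scheme: a topic literally named message or creation.success maps to the same key as a status attribute, so an implementation may want a distinct prefix for the per-topic entries.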






In IntelliJ, I quickly developed this processor using the Apache Kafka Admin API, along with some JUnit tests.

For a production use case I would probably just use the Cloudera SMM REST API to create topics.




It is trivial to call a REST API from Apache NiFi, so I can use an Apache NiFi flow to orchestrate the entire Kafka lifecycle, including management and monitoring, for real-time interaction.

Source Code for Custom Apache NiFi Processor



Source Code for Apache Kafka Shell Scripts


Edge Processing with Jetson Nano Part 3 - AI Integration

Edge Data Processing with Jetson Nano Part 3 - AI Integration






Top Level NiFi Flow Receiving MiNiFi Agent Messages





Overview of our Apache NiFi Flow For Processing









We format a new JSON flow file to send to the CDSW job environment




We Run Apache MXNet 1.3.1 (Java) SSD Against the Web Camera Image


Extract The Values From the FlowFile to Send to the Spark Job



Our JSON Results From the Logs






Log data has arrived successfully; complete JSON rows are picked up as they are written to the file
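The tailing step in the MiNiFi flow takes care of this for us. The underlying idea of only picking up complete rows can be sketched in Python: buffer the appended text and emit a JSON record only once its terminating newline has arrived. This illustrates the technique; it is not MiNiFi's implementation, and the class name JsonLineTailer is hypothetical.

```python
import json


class JsonLineTailer:
    """Turn appended chunks of a log file into complete JSON records.

    A row still being written (no trailing newline yet) stays in the
    buffer until a later chunk completes it.
    """

    def __init__(self):
        self._buffer = ""

    def feed(self, chunk: str) -> list:
        self._buffer += chunk
        # Everything before the last newline is complete; the rest waits
        *complete, self._buffer = self._buffer.split("\n")
        records = []
        for line in complete:
            line = line.strip()
            if line:
                records.append(json.loads(line))
        return records


tailer = JsonLineTailer()
print(tailer.feed('{"a": 1}\n{"b"'))  # [{'a': 1}]
print(tailer.feed(': 2}\n'))          # [{'b': 2}]
```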
 



We can see the results of the Spark Job in Cloudera Data Science Workbench (CDSW)



We can also see the messages that we sent to Slack






Edge Processing with Jetson Nano Part 2 - Apache NiFi Flow

Edge Data Processing with Jetson Nano Part 2 - Apache NiFi - Process, Route, Transform, Store





Apache NiFi Flow to Process Data



We route images from the web cameras, logs from the runs and JSON sensor readings to the appropriate processors. We also convert JSON to Avro for storage in Hadoop or S3, while running queries on the data to check the device's temperatures. TensorFlow and Apache MXNet run on the images in-stream as they pass through Apache NiFi.
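In NiFi this kind of split is typically done with RouteOnAttribute, matching on the filename or mime.type attribute. The Python sketch below shows the same decision based on file extension; the route names and the extension table are hypothetical, since the flow's actual routing rules are not shown.

```python
import os

# Hypothetical route names; the real flow's relationships are not shown.
ROUTES = {
    ".jpg": "images",
    ".png": "images",
    ".log": "logs",
    ".json": "sensor",
}


def route(filename: str) -> str:
    """Pick a downstream route from the file extension (case-insensitive)."""
    _, ext = os.path.splitext(filename.lower())
    return ROUTES.get(ext, "unmatched")


print(route("/opt/demo/images/image_bei_20190719182103.jpg"))  # images
print(route("readme.txt"))                                     # unmatched
```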

Example Device and Deep Learning Data



Logs Returned From the Device



Push Some Results to Slack





Edge Data Processing with Jetson Nano Part 1 - Deploy, Setup and Ingest





















Configuring Execution of the Image Capture and Jetson Nano Classify Python Script



Configuring Tailing JSON Log



Configuring Acquiring Images from File Directory




Configuring the Remote Connection to NiFi






Example CEM Events





Simple NiFi Flow to Receive Remote Events


The Apache NiFi server receives annotated images as well as JSON packets.


JSON Data Packet Example

{"uuid": "nano_uuid_kwo_20190719182103", "ipaddress": "192.168.1.254", "top1pct": 32.6171875, "top1": "desktop computer", "cputemp": "32.5", "gputemp": "31.5", "gputempf": "89", "cputempf": "90", "runtime": "5", "host": "jetsonnano", "filename": "/opt/demo/images/image_bei_20190719182103.jpg", "imageinput": "/opt/demo/images/2019-07-19_1421.jpg", "host_name": "jetsonnano", "macaddress": "de:07:5a:27:1e:7f", "end": "1563560468.7867181", "te": "4.806252717971802", "systemtime": "07/19/2019 14:21:08", "cpu": 55.8, "diskusage": "5225.1 MB", "memory": 57.5, "id": "20190719182103_fcaa94d4-7629-423a-b76e-714168e64677"}
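Several numeric values in this packet (cputemp, gputemp, runtime and others) arrive as strings, while cpu and memory are real numbers. Before querying temperatures, a consumer would typically coerce them. A minimal Python sketch, with a hypothetical 50 °C alert threshold and an illustrative list of fields to convert:

```python
import json

# Fields that carry numbers, some encoded as strings in the packet
NUMERIC_FIELDS = ("top1pct", "cputemp", "gputemp", "cputempf",
                  "gputempf", "runtime", "cpu", "memory")


def parse_packet(raw: str) -> dict:
    """Parse a Jetson Nano JSON packet, converting numeric fields to float."""
    packet = json.loads(raw)
    for field in NUMERIC_FIELDS:
        if field in packet:
            packet[field] = float(packet[field])
    return packet


def is_too_hot(packet: dict, limit_c: float = 50.0) -> bool:
    """Hypothetical alert rule: CPU or GPU above limit_c degrees Celsius."""
    return max(packet["cputemp"], packet["gputemp"]) > limit_c


raw = '{"cputemp": "32.5", "gputemp": "31.5", "cputempf": "90", "cpu": 55.8}'
packet = parse_packet(raw)
print(packet["cputemp"], is_too_hot(packet))  # 32.5 False
```

In the example packet, cputempf appears to be cputemp converted to Fahrenheit and rounded: 32.5 × 9/5 + 32 = 90.5.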


Notes

It was very easy to set up a simple flow to execute our deep learning classification and data acquisition with Apache NiFi, MiNiFi and Cloudera EFM. We can now do something with the data, like push it to the cloud.

 Source:

Philadelphia Open Crime Data on Phoenix / HBase

This is an update to a previous article on accessing Philadelphia Open Crime Data and storing it in Apache Phoenix on HBase.

Updates to Spring Boot, Phoenix and Zeppelin make for a cleaner experience.

I also added a way to grab years of historical policing data.

All NiFi flows, Zeppelin notebooks and source code are here:   https://github.com/tspannhw/phillycrime-springboot-phoenix




Part 1: https://community.hortonworks.com/articles/54947/reading-opendata-json-and-storing-into-phoenix-tab.html












We convert JSON to Phoenix upserts.
We push JSON records to HBase with PutHBaseRecord.


Query Phoenix at the Command Line, Super Fast SQL



Resources




Example Data
{
"dc_dist":"18",
"dc_key":"200918067518",
"dispatch_date":"2009-10-02",
"dispatch_date_time":"2009-10-02T14:24:00.000",
"dispatch_time":"14:24:00",
"hour":"14",
"location_block":"S 38TH ST / MARKETUT ST",
"psa":"3",
"text_general_code":"Other Assaults",
"ucr_general":"800"}


Create a Phoenix Table

/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

CREATE TABLE phillycrime (
  dc_dist varchar,
  dc_key varchar not null primary key,
  dispatch_date varchar,
  dispatch_date_time varchar,
  dispatch_time varchar,
  hour varchar,
  location_block varchar,
  psa varchar,
  text_general_code varchar,
  ucr_general varchar);
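The flow converts JSON records like the example data into Phoenix upserts. As an illustration of what such a statement looks like for this table, here is a Python sketch that builds a parameterized UPSERT (Phoenix accepts ? placeholders over JDBC) from one record, ignoring JSON keys that are not table columns. The function name to_upsert is hypothetical, and this is not how NiFi performs the conversion internally.

```python
import json

# Column order of the phillycrime table defined above
COLUMNS = [
    "dc_dist", "dc_key", "dispatch_date", "dispatch_date_time",
    "dispatch_time", "hour", "location_block", "psa",
    "text_general_code", "ucr_general",
]


def to_upsert(record, table="phillycrime"):
    """Return (sql, params) for a parameterized Phoenix UPSERT of one record.

    Keys that are not table columns are ignored; dc_key is required
    because it is the primary key.
    """
    if "dc_key" not in record:
        raise ValueError("record has no dc_key (the primary key)")
    cols = [c for c in COLUMNS if c in record]
    placeholders = ", ".join("?" for _ in cols)
    sql = f"UPSERT INTO {table} ({', '.join(cols)}) VALUES ({placeholders})"
    return sql, [record[c] for c in cols]


example = json.loads('{"dc_key": "200918067518", "dc_dist": "18", "psa": "3"}')
sql, params = to_upsert(example)
print(sql)     # UPSERT INTO phillycrime (dc_dist, dc_key, psa) VALUES (?, ?, ?)
print(params)  # ['18', '200918067518', '3']
```

Binding values as parameters rather than splicing them into the SQL string keeps quotes in fields such as location_block from breaking the statement.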


Add NiFi / Spring Boot Connectivity to Phoenix
Driver class: org.apache.phoenix.jdbc.PhoenixDriver
JDBC URL: jdbc:phoenix:localhost:2181:/hbase-unsecure
Driver JARs: /usr/hdp/3.1/phoenix/phoenix-client.jar, /usr/hdp/3.1/hbase/lib/hbase-client.jar
HBase configuration: /etc/hbase/conf/hbase-site.xml



Run spring boot …

Powering Edge AI with the Powerful Jetson Nano

NVIDIA Jetson Nano Deep Learning Edge Device


Nano The Cat





Hardware:
The Jetson Nano developer kit is built around a 128-core Maxwell GPU and a quad-core ARM A57 CPU running at 1.43 GHz, coupled with 4 GB of LPDDR4 memory. This is power at the edge, and I now have a favorite new device.

You need to add a USB WiFi adapter if you are not hardwired to Ethernet. This is cheap and easy: I added a tiny $15 WiFi adapter and was off to the races.


Operating System:
Ubuntu 18.04

Library Setup:


# System packages (python3-pip is installed before any pip3 command runs)
sudo apt-get update -y
sudo apt-get install git cmake -y
sudo apt-get install libatlas-base-dev gfortran -y
sudo apt-get install libhdf5-serial-dev hdf5-tools libhdf5-dev zlib1g-dev zip libjpeg8-dev -y
sudo apt-get install python3-dev python3-pip -y
sudo apt-get install libcv-dev libopencv-dev -y
sudo apt-get install fswebcam -y
sudo apt-get install libv4l-dev -y
sudo apt-get install python-opencv -y

# Python packages
pip3 install psutil
pip2 install psutil
pip3.6 install easydict -U
pip3.6 install scikit-learn -U
pip3.6 install opencv-python -U --user
pip3.6 install numpy -U
pip3.6 install mxnet -U
pip3.6 install mxnet-mkl -U
pip3.6 install gluoncv --upgrade
sudo pip3 install -U pip

# TensorFlow GPU build from NVIDIA's JetPack 4.2 wheel index, then Keras
sudo pip3 install --pre --extra-index-url https://developer.download.nvidia.com/compute/redist/jp/v42 tensorflow-gpu
pip3 install keras

# Show the current power mode
sudo nvpmodel -q --verbose

# jetson-inference sources
git clone https://github.com/dusty-nv/jetson-inference
cd jetson-inference
git submodule update --init

# Monitoring: tegrastats streams CPU/GPU/memory stats (Ctrl+C to stop); jetson-stats adds jtop
tegrastats
pip3 install -U jetson-stats

Source:
https://github.com/tspannhw/iot-device-install
https://github.com/tspannhw/minifi-jetson-nano

IoT Setup

Download the MiNiFi 0.6.0 source from Cloudera and build it.
Download the MiNiFi Java agent (binary) and unzip it.

Follow these instructions.

On a Server

We want to hook up to EFM to make flow development, deployment, management and monitoring of MiNiFi agents trivial. Download NiFi Registry. You will also need Apache NiFi.

For a good walkthrough and hands-on demonstration see this workshop.

See these cool Jetson Nano Projects:  https://developer.nvidia.com/embedded/community/jetson-projects

Monitor Status
https://github.com/rbonghi/jetson_stats

Example Flow

It's easy to add MiNiFi Java or CPP agents to the Jetson Nano. I did a custom MiNiFi CPP 0.6.0 build for the Jetson. I built a quick flow that runs the jetson-inference imagenet-console CPP binary on an image captured with fswebcam from a compatible Logitech USB webcam. As a proof of concept, I store the images in /opt/demo/images and pass each one to the console on the command line.

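#!/bin/bash
# Capture one webcam frame and classify it with jetson-inference's imagenet-console

DATE=$(date +"%Y-%m-%d_%H%M")

# Grab a 1280x720 frame, quietly, without the fswebcam banner
fswebcam -q -r 1280x720 --no-banner "/opt/demo/images/${DATE}.jpg"

# Classify the frame and write an annotated copy to out_<DATE>.jpg
/opt/demo/jetson-inference/build/aarch64/bin/imagenet-console "/opt/demo/images/${DATE}.jpg" "/opt/demo/images/out_${DATE}.jpg"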
Example output:
imagenet-console
  args (3):  0 [/opt/demo/jetson-inference/build/aarch64/bin/imagenet-console]  1 [/opt/demo/images/2019-07-01_1405.jpg]  2 [/opt/demo/images/out_2019-07-01_1405.jpg]


imageNet -- loading classification network model from:
         -- prototxt     networks/googlenet.prototxt
         -- model        networks/bvlc_googlenet.caffemodel
         -- class_labels networks/ilsvrc12_synset_words.txt
         -- input_blob   'data'
         -- output_blob  'prob'
         -- batch_size   2

[TRT]  TensorRT version 5.0.6
[TRT]  detected model format - caffe  (extension '.caffemodel')
[TRT]  desired precision specified for GPU: FASTEST
[TRT]  requested fasted precision for device GPU without providing valid calibrator, disabling INT8
[TRT]  native precisions detected for GPU:  FP32, FP16
[TRT]  selecting fastest native precision for GPU:  FP16
[TRT]  attempting to open engine cache file /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel.2.1.GPU.FP16.engine
[TRT]  loading network profile from engine cache... /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel.2.1.GPU.FP16.engine
[TRT]  device GPU, /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel loaded
[TRT]  device GPU, CUDA engine context initialized with 2 bindings
[TRT]  binding -- index   0
               -- name    'data'
               -- type    FP32
               -- in/out  INPUT
               -- # dims  3
               -- dim #0  3 (CHANNEL)
               -- dim #1  224 (SPATIAL)
               -- dim #2  224 (SPATIAL)
[TRT]  binding -- index   1
               -- name    'prob'
               -- type    FP32
               -- in/out  OUTPUT
               -- # dims  3
               -- dim #0  1000 (CHANNEL)
               -- dim #1  1 (SPATIAL)
               -- dim #2  1 (SPATIAL)
[TRT]  binding to input 0 data  binding index:  0
[TRT]  binding to input 0 data  dims (b=2 c=3 h=224 w=224) size=1204224
[cuda]  cudaAllocMapped 1204224 bytes, CPU 0x100e30000 GPU 0x100e30000
[TRT]  binding to output 0 prob  binding index:  1
[TRT]  binding to output 0 prob  dims (b=2 c=1000 h=1 w=1) size=8000
[cuda]  cudaAllocMapped 8000 bytes, CPU 0x100f60000 GPU 0x100f60000
device GPU, /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel initialized.
[TRT]  networks/bvlc_googlenet.caffemodel loaded
imageNet -- loaded 1000 class info entries
networks/bvlc_googlenet.caffemodel initialized.
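The buffer sizes in this log follow directly from the binding shapes: the engine was built with a batch size of 2, and the log reports both bindings as FP32 (4 bytes per value) even though FP16 was selected as the network's precision. A quick check in Python (binding_bytes is just an illustrative helper):

```python
FP32_BYTES = 4  # both bindings are reported as FP32 in the log


def binding_bytes(batch, channels, height, width, dtype_bytes=FP32_BYTES):
    """Size of a binding buffer: batch * C * H * W * bytes per element."""
    return batch * channels * height * width * dtype_bytes


print(binding_bytes(2, 3, 224, 224))  # input 'data'  -> 1204224
print(binding_bytes(2, 1000, 1, 1))   # output 'prob' -> 8000
```

Both values match the cudaAllocMapped sizes printed above.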








Reference: