Creating Apache Kafka Topics Dynamically As Part of a DataFlow

Sometimes when you are ingesting data at scale, whether from a data warehouse, logs, REST APIs, IoT, social media, or other sources, you may need to create new Apache Kafka topics depending on the type, variations, newness, schema, schema version, or other changes in the arriving data.

Instead of having to manually create an Apache Kafka topic with Cloudera Streams Messaging Manager or the Apache Kafka command line (kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test), I would like to create it mid-stream based on names that are relevant to the arriving data. This could be the name of the schema from the data, the table name of the origin data, a unique name generated with the data, or another source. For my example, I am generating a unique name via Apache NiFi Expression Language:
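For instance, a topic-name property value along these lines would work (the kafkatopic_ prefix is purely illustrative; now(), format(), and UUID() are standard NiFi Expression Language functions):

```
kafkatopic_${now():format('yyyyMMddHHmmss')}_${UUID()}
```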


This is a proof of concept; there are more features I would add for production use cases, such as fields for Number of Partitions and Number of Replicas.

Example Run

The processor is very easy to use: you merely enter your Kafka Broker URL and the name of your Kafka topic. The processor will validate the name, which must be alphanumeric with only the addition of periods, dashes, and underscores. It will run quickly, and when it completes you can check out the results. Your flowfile will be unchanged, but you will get new attributes as seen below.
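The name check can be sketched in a few lines of Python (the processor itself is Java; the 249-character cap mirrors Kafka's own topic-name limit):

```python
import re

# Kafka topic names: alphanumerics plus periods, dashes, and underscores,
# non-empty and at most 249 characters (Kafka's own limit).
TOPIC_NAME_PATTERN = re.compile(r"^[A-Za-z0-9._-]+$")

def is_valid_topic_name(name: str) -> bool:
    """Mirror the processor's validation of a proposed Kafka topic name."""
    return 0 < len(name) <= 249 and TOPIC_NAME_PATTERN.match(name) is not None
```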

You will get kafka.bootstrap (your broker URL), a generated one-time-use client id, kafka.topic.<TOPIC_NAME> (one attribute for each Kafka topic that exists), kafka.topic.creation.success (a status flag), kafka.topic.message (a status message), and an attribute for your newly created topic.

In IntelliJ I quickly developed this program using the Apache Kafka Admin API and some JUnit tests.

For a production use case I would probably just use the Cloudera SMM REST API to create topics.

It is trivial to call a REST API from Apache NiFi, so I can use an Apache NiFi flow to orchestrate an entire Kafka lifecycle, with management and monitoring for real-time interaction.

Source Code for Custom Apache NiFi Processor

Source Code for Apache Kafka Shell Scripts

Edge Data Processing with Jetson Nano Part 3 - AI Integration

Top Level NiFi Flow Receiving MiNiFi Agent Messages

Overview of our Apache NiFi Flow For Processing

We format a new flowfile as JSON to send to the CDSW Job Environment

We Run Apache MXNet 1.3.1 (Java) SSD Against the Web Camera Image

Extract The Values From the FlowFile to Send to the Spark Job

Our JSON Results From the Logs

Log data has successfully arrived; consistent JSON rows are grabbed as they are written to the file

We can see the results of the Spark Job in Cloudera Data Science Workbench (CDSW)

We can also see messages that we sent to slack

Edge Data Processing with Jetson Nano Part 2 - Apache NiFi - Process, Route, Transform, Store

Apache NiFi Flow to Process Data

We route images from the web cameras, logs from the runs, and JSON sensor readings to the appropriate processors. We also convert JSON to Avro for storage in Hadoop or S3, while running queries on the data to check the temperature of the device. TensorFlow and Apache MXNet are run on the images in-stream as they pass through Apache NiFi.

Example Device and Deep Learning Data

Logs Returned From the Device

Push Some Results to Slack

Edge Data Processing with Jetson Nano Part 1 - Deploy, Setup and Ingest

Configuring Execution of the Image Capture and Jetson Nano Classification Python Script

Configuring Tailing JSON Log

Configuring Acquiring Images from File Directory

Configuring the Remote Connection to NiFi

Example CEM Events

Simple NiFi Flow to Receive Remote Events

The Apache NiFi server receives the annotated images as well as JSON packets.

JSON Data Packet Example

{"uuid": "nano_uuid_kwo_20190719182103", "ipaddress": "", "top1pct": 32.6171875, "top1": "desktop computer", "cputemp": "32.5", "gputemp": "31.5", "gputempf": "89", "cputempf": "90", "runtime": "5", "host": "jetsonnano", "filename": "/opt/demo/images/image_bei_20190719182103.jpg", "imageinput": "/opt/demo/images/2019-07-19_1421.jpg", "host_name": "jetsonnano", "macaddress": "de:07:5a:27:1e:7f", "end": "1563560468.7867181", "te": "4.806252717971802", "systemtime": "07/19/2019 14:21:08", "cpu": 55.8, "diskusage": "5225.1 MB", "memory": 57.5, "id": "20190719182103_fcaa94d4-7629-423a-b76e-714168e64677"}
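Downstream, a packet like this is trivial to work with; a minimal Python sketch using a few of the fields above (the 60 °C alert threshold is my own assumption, not part of the flow):

```python
import json

# A trimmed version of the JSON packet emitted by the Jetson Nano agent.
packet = ('{"uuid": "nano_uuid_kwo_20190719182103", "top1pct": 32.6171875, '
          '"top1": "desktop computer", "cputemp": "32.5", "gputemp": "31.5"}')

reading = json.loads(packet)

# The temperature fields arrive as strings, so cast before comparing.
too_hot = float(reading["cputemp"]) > 60.0                 # flag an overheating device
label = f'{reading["top1"]} ({reading["top1pct"]:.1f}%)'   # human-readable top-1 result
```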


It was very easy to set up a simple flow to execute our deep learning classification and data acquisition with Apache NiFi, MiNiFi and Cloudera EFM. We can now do something with the data, like push it to the cloud.


Philadelphia Open Crime Data on Phoenix / HBase

This is an update to a previous article on accessing Philadelphia Open Crime Data and storing it in Apache Phoenix on HBase.

It seems an update to Spring Boot, Phoenix and Zeppelin makes for a cleaner experience.

I also added a way to grab years of historical Policing data.

All NiFi, Zeppelin and Source is here:

Part 1:

We convert JSON to Phoenix upserts.
We push JSON records to HBase with PutHBaseRecord.
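The JSON-to-upsert step can be sketched in Python against the phillycrime table defined below (the sample record values are illustrative; a real flow should use parameterized statements rather than string building):

```python
def to_phoenix_upsert(record: dict) -> str:
    """Build a Phoenix UPSERT statement for the phillycrime table
    from one JSON crime record (missing columns become empty strings)."""
    columns = ["dc_dist", "dc_key", "dispatch_date", "dispatch_date_time",
               "dispatch_time", "hour", "location_block", "psa",
               "text_general_code", "ucr_general"]
    # Escape single quotes by doubling them, per SQL string-literal rules.
    values = ", ".join(
        "'" + str(record.get(col, "")).replace("'", "''") + "'" for col in columns)
    return f"UPSERT INTO phillycrime ({', '.join(columns)}) VALUES ({values})"

# Hypothetical dc_key, plus fields from the example data above.
stmt = to_phoenix_upsert({"dc_key": "201900000001",
                          "location_block": "S 38TH ST / MARKETUT ST",
                          "text_general_code": "Other Assaults"})
```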

Query Phoenix at the Command Line, Super Fast SQL


Example Data
"location_block":"S 38TH ST / MARKETUT ST",
"text_general_code":"Other Assaults",

Create a Phoenix Table

/usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure

CREATE TABLE phillycrime (
    dc_dist varchar,
    dc_key varchar not null primary key,
    dispatch_date varchar,
    dispatch_date_time varchar,
    dispatch_time varchar,
    hour varchar,
    location_block varchar,
    psa varchar,
    text_general_code varchar,
    ucr_general varchar
);

Add NiFi / Spring Boot Connectivity to Phoenix

Run spring boot …

Powering Edge AI with the Powerful Jetson Nano

NVidia Jetson Nano Deep Learning Edge Device

Nano The Cat

The Jetson Nano developer kit is built around a 128-core Maxwell GPU and a quad-core ARM A57 CPU running at 1.43 GHz, coupled with 4 GB of LPDDR4 memory. This is power at the edge. I now have a favorite new device.

You need to add some kind of USB WiFi adapter if you are not hardwired to Ethernet. This is cheap and easy; I added a tiny $15 WiFi adapter and was off to the races.

Operating System:
Ubuntu 18.04

Library Setup:

sudo apt-get update -y
sudo apt-get install git cmake -y
sudo apt-get install libatlas-base-dev gfortran -y
sudo apt-get install libhdf5-serial-dev hdf5-tools -y

sudo apt-get install python3-dev -y
sudo apt-get install libcv-dev libopencv-dev -y
sudo apt-get install fswebcam -y
sudo apt-get install libv4l-dev -y
sudo apt-get install python-opencv -y
pip3 install psutil
pip2 install psutil
pip3.6 install easydict -U
pip3.6 install scikit-learn -U
pip3.6 install opencv-python -U --user
pip3.6 install numpy -U
pip3.6 install mxnet -U
pip3.6 install mxnet-mkl -U
pip3.6 install gluoncv --upgrade
sudo apt-get install libhdf5-serial-dev hdf5-tools libhdf5-dev zlib1g-dev zip libjpeg8-dev -y
sudo apt-get install python3-pip
sudo pip3 install -U pip
sudo pip3 install --pre --extra-index-url tensorflow-gpu
sudo nvpmodel -q --verbose
pip3 install numpy
pip3 install keras
git clone
cd jetson-inference
git submodule update --init
pip3 install -U jetson-stats


IoT Setup

Download MiNiFi 0.6.0 Source from Cloudera and Build.
Download MiNiFi Java Agent (Binary)  and Unzip.

Follow these instructions.

On a Server

We want to hook up EFM to make flow development, deployment, management and monitoring of MiNiFi agents trivial. Download NiFi Registry. You will also need Apache NiFi.

For a good walkthrough and hands-on demonstration see this workshop.

See these cool Jetson Nano Projects:

Monitor Status

Example Flow

It's easy to add MiNiFi Java or C++ agents to the Jetson Nano. I did a custom MiNiFi C++ 0.6.0 build for Jetson. I built a quick flow to run the jetson-inference imagenet-console C++ binary on an image captured from a compatible Logitech USB webcam with fswebcam. I store the images in /opt/demo/images and pass the path on the command line to the C++ console as a proof of concept.


DATE=$(date +"%Y-%m-%d_%H%M")

fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg

/opt/demo/jetson-inference/build/aarch64/bin/imagenet-console  /opt/demo/images/$DATE.jpg  /opt/demo/images/out_$DATE.jpg
  args (3):  0 [/opt/demo/jetson-inference/build/aarch64/bin/imagenet-console]  1 [/opt/demo/images/2019-07-01_1405.jpg]  2 [/opt/demo/images/out_2019-07-01_1405.jpg]

imageNet -- loading classification network model from:
         -- prototxt     networks/googlenet.prototxt
         -- model        networks/bvlc_googlenet.caffemodel
         -- class_labels networks/ilsvrc12_synset_words.txt
         -- input_blob   'data'
         -- output_blob  'prob'
         -- batch_size   2

[TRT]  TensorRT version 5.0.6
[TRT]  detected model format - caffe  (extension '.caffemodel')
[TRT]  desired precision specified for GPU: FASTEST
[TRT]  requested fasted precision for device GPU without providing valid calibrator, disabling INT8
[TRT]  native precisions detected for GPU:  FP32, FP16
[TRT]  selecting fastest native precision for GPU:  FP16
[TRT]  attempting to open engine cache file /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel.2.1.GPU.FP16.engine
[TRT]  loading network profile from engine cache... /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel.2.1.GPU.FP16.engine
[TRT]  device GPU, /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel loaded
[TRT]  device GPU, CUDA engine context initialized with 2 bindings
[TRT]  binding -- index   0
               -- name    'data'
               -- type    FP32
               -- in/out  INPUT
               -- # dims  3
               -- dim #0  3 (CHANNEL)
               -- dim #1  224 (SPATIAL)
               -- dim #2  224 (SPATIAL)
[TRT]  binding -- index   1
               -- name    'prob'
               -- type    FP32
               -- in/out  OUTPUT
               -- # dims  3
               -- dim #0  1000 (CHANNEL)
               -- dim #1  1 (SPATIAL)
               -- dim #2  1 (SPATIAL)
[TRT]  binding to input 0 data  binding index:  0
[TRT]  binding to input 0 data  dims (b=2 c=3 h=224 w=224) size=1204224
[cuda]  cudaAllocMapped 1204224 bytes, CPU 0x100e30000 GPU 0x100e30000
[TRT]  binding to output 0 prob  binding index:  1
[TRT]  binding to output 0 prob  dims (b=2 c=1000 h=1 w=1) size=8000
[cuda]  cudaAllocMapped 8000 bytes, CPU 0x100f60000 GPU 0x100f60000
device GPU, /opt/demo/jetson-inference/build/aarch64/bin/networks/bvlc_googlenet.caffemodel initialized.
[TRT]  networks/bvlc_googlenet.caffemodel loaded
imageNet -- loaded 1000 class info entries
networks/bvlc_googlenet.caffemodel initialized.