
Did the user really ask for Exactly Once? Fault Tolerance

Exactly Once Requirements

Exactly-once is very tricky and can cause performance degradation; if your user could just use at-least-once, then always go with that.   Having data sinks like Kudu, where you can do an upsert, makes exactly-once less necessary.

https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-kafka.html

Apache Flink, Apache NiFi Stateless and Apache Kafka can all participate in exactly-once delivery.

For CDF Stream Processing and Analytics with Apache Flink 1.10 Streaming:

Both Kafka sources and sinks can be used with exactly once processing guarantees when checkpointing is enabled.


End-to-End Guaranteed Exactly-Once Record Delivery

The data source and data sink need to support exactly-once state semantics and take part in checkpointing.


Data Sources
  • Apache Kafka - must have exactly-once selected, transactions enabled, and the correct driver.

Select:  Semantic.EXACTLY_ONCE


Data Sinks
  • HDFS BucketingSink
  • Apache Kafka
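
As a rough, hedged sketch (not official CDF/CSA code), wiring a Kafka source and an exactly-once Kafka sink together in the Flink 1.10 DataStream API looks roughly like this; the broker address, topics, group id and intervals below are placeholder values:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // exactly-once requires checkpointing to be enabled

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");    // placeholder broker
        props.setProperty("group.id", "flink-exactly-once-demo"); // placeholder group id
        // Kafka transactions must be able to outlive the checkpoint interval
        props.setProperty("transaction.timeout.ms", "900000");

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        // Write each record back out inside a Kafka transaction (two-phase commit sink)
        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

        events.addSink(new FlinkKafkaProducer<>(
                "output-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("Kafka exactly-once pipeline");
    }
}

Downstream consumers should read with isolation.level=read_committed so they only see messages from committed transactions.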



Reference


The Rise of the Mega Edge (FLaNK)

At one point edge devices were cheap, low-energy and low-powered.   They may have had some old WiFi and a single-core CPU running pretty slow.    Now substantial power, memory, GPUs and custom processors have come to the edge.

Sitting on my desk is the NVIDIA Jetson Xavier NX, a massively powerful machine that can easily be used for edge computing while sporting 8 GB of fast RAM, a GPU with 384 NVIDIA CUDA® cores and 48 Tensor Cores, and a 6-core 64-bit ARM CPU.   This edge device would make a great workstation and is now something that can be affordably deployed in trucks, plants, sensors and other edge and IoT applications.


Next to that titan is the inexpensive hobby device, the Raspberry Pi 4, which now sports 8 GB of LPDDR4 RAM and a 4-core 64-bit ARM CPU and is speedy!   It can also be augmented with a Google Coral TPU or an Intel Movidius Neural Compute Stick 2.


These boxes come with fast networking, Bluetooth and modern hardware in small edge devices that can now be deployed en masse, enabling edge computing, fast data capture, smart processing and integration with servers and cloud services.    By adding Apache NiFi's subproject MiNiFi C++ and Java agents, we can easily integrate these powerful devices into a streaming data pipeline.   We can now build very powerful flows from edge to cloud with Apache NiFi, Apache Flink, Apache Kafka (FLaNK) and Apache NiFi - MiNiFi.    I can run AI, deep learning and machine learning, including Apache MXNet, DJL, H2O, TensorFlow, Apache OpenNLP and more, at any and all parts of my data pipeline.   I can push models to my edge device, which now has a powerful GPU/TPU and adequate CPU, networking and RAM to do more than simple classification.    The NVIDIA Jetson Xavier NX will run multiple real-time inference streams at 60 fps on multiple cameras.

I can run live SQL against these events at every segment of the data pipeline and combine with machine learning, alert checks and flow programming.   It's now easy to build and deploy applications from edge to cloud.

I'll be posting some simple examples in my next article.

By next year, 12 or 16 GB of RAM may be common for an edge device, perhaps with two 8-core CPUs, multiple GPUs and large, fast SSD storage.   My edge swarm may be running much of my computing power as my flows, running elastically on public and private clouds, scale up and down based on demand in real time.


Explore Enterprise Apache Flink with Cloudera Streaming Analytics - CSA 1.2



What's New in Cloudera Streaming Analytics


Try out the tutorials now:   https://github.com/cloudera/flink-tutorials

So let's get our Apache Flink on. As part of my FLaNK Stack series, I'll show you some fun things we can do with Apache Flink + Apache Kafka + Apache NiFi.

We will look at some of the updates in Apache Flink 1.10, including the SQL Client and the Table API.

We are working with Apache Flink 1.10, Apache NiFi 1.11.4 and Apache Kafka 2.4.1.

The SQL features are strong, and we will take a look at what we can do; a short Table API sketch follows the connector and format lists below.


Table connectors
  • Kafka
  • Kudu
  • Hive (through catalog)

Data formats (Kafka)
  • JSON
  • Avro
  • CSV
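
As a minimal sketch of the Java Table API side (the table name, columns, topic and broker are made up for illustration; the connector and format options mirror the lists above and the DDL style used later on this page):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SensorTableJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register a Kafka-backed table using the JSON format (placeholder topic and broker).
        tableEnv.sqlUpdate(
            "CREATE TABLE sensors (" +
            "  uuid STRING," +
            "  temperaturef STRING," +
            "  pressure DOUBLE" +
            ") WITH (" +
            "  'connector.type' = 'kafka'," +
            "  'connector.version' = 'universal'," +
            "  'connector.topic' = 'sensors'," +
            "  'connector.startup-mode' = 'earliest-offset'," +
            "  'connector.properties.bootstrap.servers' = 'broker:9092'," +
            "  'connector.properties.group.id' = 'flink-sql-sample'," +
            "  'format.type' = 'json'" +
            ")");

        // Run a continuous query against the table and print the results.
        Table result = tableEnv.sqlQuery("SELECT uuid, pressure FROM sensors");
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute("Flink 1.10 Table API sample");
    }
}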



Building a DataStream Application in Flink

Build A Flink Project

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.10.0
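
Once the archetype has generated the project, the StreamingJob class can be filled in with something minimal like the sketch below; the socket source, port and checkpoint interval are placeholders standing in for a real Kafka or NiFi source:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // checkpoint every 10 seconds

        // A throwaway source for smoke-testing the build: run `nc -lk 9999` on localhost first.
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // Trivial transformation so there is something to watch in the Flink dashboard;
        // returns(...) helps Flink's type extraction with the method reference.
        lines.map(String::toUpperCase).returns(Types.STRING).print();

        env.execute("Minimal FLaNK DataStream job");
    }
}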

References:










Commonly Used TCP/IP Ports in Streaming

Cloudera CDF and HDF Ports
NiFi and Friends
FLaNK Extended Stack


Note: 

All of these ports can be changed by administrators or in version updates.   Also, if you are running Apache Knox, as in Cloudera Data Platform Public Cloud, these ports may be changed or hidden.   This list is based on the version of CDF I am running and its defaults.   It does not include the standard Cloudera ports for Cloudera Manager, Hadoop, Atlas, Ranger and other necessary and fun services.


Cloudera Flow Management (CFM Powered by Apache NiFi)
  • Cloudera NiFi HTTP:    8080 or 9090
  • Cloudera NiFi HTTPS:  8443 or 9443
  • Cloudera NiFi RIP Socket: 10443 or 50999
  • Cloudera NiFi Node Protocol: 11443
  • Cloudera NiFi Load Balancing:  6342
  • Cloudera NiFi Registry: 18080
  • Cloudera NiFi Registry SSL: 18433
  • Cloudera NiFi Certificate Authority:  10443

Cloudera Edge Flow Management (CEM Powered by Apache NiFi - MiNiFi)

  • Cloudera EFM HTTP:  10080
  • Cloudera EFM CoAP:  8989

Cloudera Stream Processing (CSP Powered by Apache Kafka)
  • Cloudera Kafka: 9092
  • Cloudera Kafka SSL:  9093
  • Cloudera Kafka Connect:  38083
  • Cloudera Kafka Connect SSL:  38085
  • Cloudera Kafka Jetty Metrics: 38084
  • Cloudera Kafka JMX: 9393
  • Cloudera Kafka MirrorMaker JMX: 9394
  • Cloudera Kafka HTTP Metric: 24042
  • Cloudera Schema Registry: 7788
  • Cloudera Schema Registry Admin: 7789
  • Cloudera Schema Registry SSL:  7790
  • Cloudera Schema Registry Admin SSL:  7791
  • Cloudera Schema Registry Database (Postgresql):  5432
  • Cloudera SRM:  6669
  • Cloudera RPC: 8081
  • Cloudera SRM Rest: 6670
  • Cloudera SRM Rest SSL:  6671
  • Cloudera SMM Rest / UI: 9991
  • Cloudera SMM Manager:  8585
  • Cloudera SMM Manager SSL:  8587
  • Cloudera SMM Manager Admin:  8586
  • Cloudera SMM Manager Admin SSL: 8588
  • Cloudera SMM Service Monitor:  9997
  • Cloudera SMM Kafka Connect:  38083
  • Cloudera SMM Database (Postgresql):  5432

Cloudera Streaming Analytics (CSA Powered by Apache Flink)
  • Cloudera Flink Dashboard:  8082



References



FLaNK: Low Code Streaming: Populating Kafka Topics with FlinkSQL Joins in Real-Time









FLaNK




Then I can create my three tables.   Two are the sources to join, and the third is the destination for my insert.



INSERT INTO global_sensor_events 
SELECT 
 scada.uuid, 
 scada.systemtime ,  
scada.temperaturef , 
scada.pressure , 
scada.humidity , 
scada.lux , 
scada.proximity , 
scada.oxidising , 
scada.reducing , 
scada.nh3 , 
scada.gasko,
energy.`current`, 
energy.voltage ,
energy.`power` ,
energy.`total`,
energy.fanstatus

FROM energy,
     scada
WHERE
    scada.systemtime = energy.systemtime;

Examples

Assets / Scripts / DDL / SQL

Flink Guide to SQL Joins
https://www.youtube.com/watch?v=5AuBlVRKQuo

Slides

Article on Joins

Resources

Time Series Analysis - Dataflow






In a first, we joined together the forces of NYC, New Jersey and Philly to power this meetup.   A huge thanks to John Kuchmek, Amol Thacker and Paul Vidal for promoting and cross-running a sweet meetup.   John was an amazing meetup lead and made sure we kept moving.  A giant thanks to Cloudera marketing for helping with logistics and some awesome giveaways!   Hopefully next year we can do a Cinco de Mayo taco feast!  Bill Brooks and Robert Hryniewicz were a great help!   And thanks to Cloudera for providing CDP Public Cloud on AWS and CDP-DC on OpenStack for demos, development and general data fun.   And thanks to Bethann Noble and her awesome machine learning people for the initial meetup suggestion and speaker.



Philly - NJ - NYC


To quote John Kuchmek:

The Internet of Things (IoT) is growing in popularity but it isn’t new. Connected devices have existed in manufacturing and utilities with Supervisory Control and Data Acquisition (SCADA) systems. Time series data has been looked at for sometime in these industries as well as the stock market. Time series analysis can bring valuable insight to businesses and individuals with smart homes. There are many parts and components to be able to collect data at the edge, store in a central location for initial analysis, model build, train and eventually deploy. Time series forecasting is one of the more challenging problems to solve in data science. Important factors in time series analysis and forecasting are seasonality, stationary nature of data and autocorrelation of target variables. We show you a platform, built on open source technology, that has this potential. Sensor data will be collected at the edge, off a Raspberry Pi, using Cloudera’s Edge Flow Manager (powered by MiNiFi). The data will then be pushed to a cluster containing Cloudera Flow Manager (powered by NiFi) so it can be manipulated, routed, and then be stored in Kudu on Cloudera’s Data Platform. Initial inspection can be done in Hue using Impala. The time series data will be analyzed with potential forecasting using an ARIMA model in CML (Cloudera Machine Learning). Time series analysis and forecasting can be applied to but not limited to stock market analysis, forecasting electricity loads, inventory studies, weather conditions, census analysis and sales forecasting.


The main portion of our meetup was an amazing talk by Data Scientist - Victor Dibia.

Analyzing Time Series Data with an ARIMA model


His talk comes right after mine and is about an hour of in-depth Data Science with many hard questions answered.   Also a cool demo.   Thanks again Victor.

We also had some really great attendees who asked some tough questions.  My favorite question was by a Flink expert who joined from the West Coast and asked for a FLaNK sticker.



Time Series Analysis - Dataflow

For my small part I did a demo of ingesting data from MiNiFi to NiFi to CML and Kafka.   Flink reads from two Kafka topics, joins them and inserts into a third Kafka topic.   We call the ML model for classification as part of our ingest flow.   This is an example of my FLaNK Stack.

MiNiFi reads data from sensors and a camera and sends it to a local NiFi gateway.   That NiFi gateway sends a stream to my CDP-hosted CFM NiFi cluster for processing.  This cluster splits the data based on which set of sensors it came from (energy or scada), then publishes to Kafka topics and populates Kudu tables with an UPSERT.




We have great options for monitoring, querying and analyzing our data with the tools from CDP and CDP-DC.   These include Cloudera DAS, Apache Hue, Cloudera SMM for Kafka, Flink SQL console, Flink Dashboard, CML Notebooks, Jupyter Notebooks from CML and Apache Zeppelin.















As a separate way to investigate Kafka, I have created a Hive external table in beeline and connected it to a Kafka topic.  I can now query the current state of that topic.







Video Walkthrough of FlinkSQL Application (and awesome Machine Learning Talk on Time Series)



Slides From Talk


Related Articles

Flink SQL Preview

FLaNK:  Flink SQL Preview







From our Web Flink Dashboard, we can see how our insert is doing and view the joins and records passing quickly through our tiny cluster.










As part of the May 7th, 2020 Virtual Meetup, I was doing some work with Flink SQL to show as a quick demo introducing the meetup, and I found out how easy it was to do some cool stuff.   This was inspired by my streaming hero, Abdelkrim, who wrote this amazing article on Flink SQL use cases:   https://towardsdatascience.com/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9

As part of our time series meetup, I have a few streams of data coming from one device through a MiNiFi Java agent to NiFi for some transformation, routing and processing, and then sent to Apache Flink for final processing.   I decided to join the Kafka topics with Flink SQL.


Let's create Flink Tables:

This table will be used to insert the joined events from both source Kafka topics.

CREATE TABLE global_sensor_events (
 uuid STRING, 
systemtime STRING ,  
temperaturef STRING , 
pressure DOUBLE, 
humidity DOUBLE, 
lux DOUBLE, 
proximity int, 
oxidising DOUBLE , 
reducing DOUBLE, 
nh3 DOUBLE , 
gasko STRING,
`current` INT, 
voltage INT ,
`power` INT,
`total` INT,
fanstatus STRING
) WITH (
'connector.type'    = 'kafka',
'connector.version' = 'universal',
'connector.topic'    = 'global_sensor_events',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
'connector.properties.group.id' = 'flink-sql-global-sensor_join',
'format.type' = 'json'
);


This table will hold Kafka topic messages from our energy reader.

CREATE TABLE energy (
uuid STRING, 
systemtime STRING,  
        `current` INT, 
voltage INT, 
`power` INT, 
`total` INT, 
swver STRING, 
hwver STRING,
type STRING, 
model STRING, 
mac STRING, 
deviceId STRING, 
hwId STRING, 
fwId STRING, 
oemId STRING,
alias STRING, 
devname STRING, 
iconhash STRING, 
relaystate INT, 
ontime INT, 
activemode STRING, 
feature STRING, 
updating INT, 
rssi INT, 
ledoff INT, 
latitude INT, 
longitude INT, 
`day` INT, 
`index` INT, 
zonestr STRING, 
tzstr STRING, 
dstoffset INT, 
host STRING, 
currentconsumption INT, 
devicetime STRING, 
ledon STRING, 
fanstatus STRING, 
`end` STRING, 
te STRING, 
cpu INT, 
memory INT, 
diskusage STRING
) WITH (
'connector.type'    = 'kafka',
'connector.version' = 'universal',
'connector.topic'    = 'energy',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
'connector.properties.group.id' = 'flink-sql-energy-consumer',
'format.type' = 'json'
);


The scada table holds events from our sensors.

CREATE TABLE scada (
uuid STRING, 
systemtime STRING,  
amplitude100 DOUBLE, 
        amplitude500 DOUBLE, 
amplitude1000 DOUBLE, 
lownoise DOUBLE, 
midnoise DOUBLE,
        highnoise DOUBLE, 
amps DOUBLE, 
ipaddress STRING, 
host STRING, 
host_name STRING,
        macaddress STRING, 
endtime STRING, 
runtime STRING, 
starttime STRING, 
        cpu DOUBLE, 
cpu_temp STRING, 
diskusage STRING, 
memory DOUBLE, 
id STRING, 
temperature STRING, 
adjtemp STRING, 
adjtempf STRING, 
temperaturef STRING, 
pressure DOUBLE, 
humidity DOUBLE, 
lux DOUBLE, 
proximity INT, 
oxidising DOUBLE, 
reducing DOUBLE, 
nh3 DOUBLE, 
gasko STRING
) WITH (
'connector.type'    = 'kafka',
'connector.version' = 'universal',
'connector.topic'    = 'scada',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
'connector.properties.group.id' = 'flink-sql-scada-consumer',
'format.type' = 'json'
);


This is the magic part:

INSERT INTO global_sensor_events 
SELECT 
scada.uuid, 
scada.systemtime ,  
scada.temperaturef , 
scada.pressure , 
scada.humidity , 
scada.lux , 
scada.proximity , 
scada.oxidising , 
scada.reducing , 
scada.nh3 , 
scada.gasko,
energy.`current`, 
energy.voltage ,
energy.`power` ,
energy.`total`,
energy.fanstatus

FROM energy,
     scada
WHERE
    scada.systemtime = energy.systemtime;

So we join two Kafka topics and use some of their fields to populate a third Kafka topic that we defined above.

With Cloudera, it is so easy to monitor our streaming Kafka events with SMM.


For context, this is where the data comes from:



Analyzing Wood Burning Stoves with FLaNK Stack Part 2 - Analytics


Part 1:  https://www.datainmotion.dev/2020/01/analyzing-wood-burning-stoves-with.html

See:   https://shop.pimoroni.com/products/sgp30-air-quality-sensor-breakout
  • Sensirion SGP30 TVOC and eCO2 sensor
  • TVOC sensing from 0-60,000 ppb (parts per billion)
  • CO2 sensing from 400 to 60,000 ppm (parts per million)
Running the fire, I can see I am getting higher CO2 production than normal.

Since I stored my data in Kudu tables, it's easy to analyze with Impala and Hue.


select equivalentco2ppm, totalvocppb, systemtime
from gassensors
order by equivalentco2ppm desc



select avg( cast(  equivalentco2ppm as double) ) CO2PPM
from gassensors

The average was 493.

Now that we have some time series data, I can start feeding it to some standard machine learning algorithms and have CML and a data scientist give me some analytics and help me determine where I may want an alert.


Up to 400 is considered normal.

400 to 1,000 is typical of occupied locations with air exchange.

Once you get over 1,000 you start getting drowsy and noticeable effects.

Over 2,000 you get headaches; this is a concern.   Over 5,000 you should remove yourself from the situation.

select appx_median(cast(equivalentco2ppm as double)) median, min(cast(equivalentco2ppm as double)) min, 
       max(cast(equivalentco2ppm as double)) max, avg(cast(equivalentco2ppm as double)) avg, 
stddev(cast(equivalentco2ppm as double)) standarddev,
stddev_pop(cast(equivalentco2ppm as double)) standardpop
from gassensors


Let's start setting alerts at various levels.

We can also look at the indoor air quality.


As a baseline for the sensor, in an empty ventilated room my numbers are:


{"uuid": "sgp30_uuid_glv_20200123132631", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785991.7173052", "te": "0.0261075496673584", "systemtime": "01/23/2020 08:26:31", "cpu": 53.5, "diskusage": "109138.7 MB", "memory": 46.5, "equivalentco2ppm": "  412", "totalvocppb": "    6", "id": "20200123132631_dec207f1-9234-4bee-ad38-a0256629c976"}
{"uuid": "sgp30_uuid_snt_20200123132633", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785993.7479923", "te": "0.02589273452758789", "systemtime": "01/23/2020 08:26:33", "cpu": 55.6, "diskusage": "109137.0 MB", "memory": 46.5, "equivalentco2ppm": "  403", "totalvocppb": "    5", "id": "20200123132633_3bd5fb39-d6b2-4f23-8904-0ada862ede2b"}
{"uuid": "sgp30_uuid_uha_20200123132635", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785995.7779448", "te": "0.025917768478393555", "systemtime": "01/23/2020 08:26:35", "cpu": 51.1, "diskusage": "109135.3 MB", "memory": 46.5, "equivalentco2ppm": "  406", "totalvocppb": "    3", "id": "20200123132635_0412f445-9b8c-43a8-b34a-a5466f914be7"}
{"uuid": "sgp30_uuid_wau_20200123132637", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785997.8079107", "te": "0.02591681480407715", "systemtime": "01/23/2020 08:26:37", "cpu": 58.7, "diskusage": "109133.5 MB", "memory": 47.1, "equivalentco2ppm": "  406", "totalvocppb": "   13", "id": "20200123132637_73f069d9-0beb-4d06-a638-2bd92e50ece7"}
{"uuid": "sgp30_uuid_lse_20200123132639", "ipaddress": "192.168.1.221", "runtime": "0", "host": "garden3", "host_name": "garden3", "macaddress": "dc:a6:32:32:98:20", "end": "1579785999.83777", "te": "0.025897502899169922", "systemtime": "01/23/2020 08:26:39", "cpu": 53.1, "diskusage": "109131.6 MB", "memory": 46.5, "equivalentco2ppm": "  410", "totalvocppb": "    1", "id": "20200123132639_1aa392fe-0eb7-4332-9631-83ac5838e153"}

Very low parts per billion, between 1 and 13, with nothing changing in the static room; it seems like that's about a 10 ppb margin of error. We can run some queries in Hue for better stats.

Let's look at some data over time for TVOC.

select appx_median(cast(totalvocppb as double)) median, min(cast(totalvocppb as double)) min, 
       max(cast(totalvocppb as double)) max, avg(cast(totalvocppb as double)) avg, 
stddev(cast(totalvocppb as double)) standarddev,
stddev_pop(cast(totalvocppb as double)) standardpop
from gassensors



So what's a good TVOC?   On average we are below the range of potential irritation of 120 - 1200 ppb.   We do have some variance due to sensor capabilities and the lack of professional calibration.   The median and average numbers look good.   The maximum is a bit disturbing, but it could be sensor error, warm-up time or other data quality issues.   We'll have to dive more into the numbers.

Next we can look at PM 2.5 values.

We need to crowdsource some science here.

We had 3,500+ records of data over 120 ppb.

select count(*)
from gassensors
where  cast(totalvocppb as double) > 120

I can see a number of records, and the data climbs as the fire burns and we add more cherry wood.

select systemtime, equivalentco2ppm, totalvocppb
from gassensors
where  cast(totalvocppb as double) > 120
order by systemtime asc

I should also note that the time series data is coming in every 2 seconds.

select to_timestamp(systemtime, 'MM/dd/yyyy HH:mm:ss'), EXTRACT(to_timestamp(systemtime, 'MM/dd/yyyy HH:mm:ss'), 
    'MINUTE') as minute , 
cast(totalvocppb as double) as TVOC, cast(equivalentco2ppm as double) CO2PPM

from gassensors
order by systemtime desc

Resources



Analyzing Wood Burning Stoves with FLaNK Stack: MiNiFi, Flink, NiFi, Kafka, Kudu



Winter has arrived, finally.   The 50-70 F days are over and it dropped below 30 F in Princeton, so it was time to light up the wood burning stove and burn some seasoned cherry wood (we get cherry wood from a local tree service that removes dead trees for people and then seasons the wood.  Recycle!).  It's great for camp fires, smoking meats and heating up our house.  Also, if you have not smelled cherry wood smoke, it is amazing.   I wanted to see if having a fire that raised my house's temperature from 67 F to 87 F would produce noticeable sensor readings.   Fortunately, I have a thermal camera sensor (Pimoroni rocks! Add another thing to my list of things I love from Britain: Dr. Who, Jelly Babies, Pimoroni and my awesome boss Dan).  I also have Raspberry Pi sensors for temperature, humidity, light and various gases.   Let's see what the numbers look like.    The temperatures and images start green and yellow and, as they heat up, turn red, purple and then pure white.   That's real hot.   Fortunately the Raspberry Pis didn't overheat, though we had to open a window when we got close to 90.   Yes, temperature regulation and maybe an automated wood feeder would be nice.
  




Inside the Stove


Cherry wood burning nicely in the stove; notice the fire on the Cloudera T-shirt

Four  USB PS3 Eye Cameras ($7!!!) attached to Raspberry Pi 3B+ and 4s.


A very organized professional assortment of Pis and sensors...





 It's very easy to spot check my sensor values as they stream through Apache Kafka with Cloudera SMM.


Some Sensor Readings:

{"bme280_tempf": "93.78", "uuid": "20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a", "systemtime": "01/17/2020 14:56:29", "bme280_altitude": "-41.31", "memory": 92.1, "max30105_value": "84.00", "end": "1579290989.4628081", "imgnamep": "images/bog_image_p_20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a.jpg", "max30105_temp": "34.56", "ipaddress": "192.168.1.251", "diskusage": "44726.6", "host": "garden2", "max30105timestamp": "20200117-145629-345697", "starttime": "01/17/2020 13:48:29", "bme280_altitude_feet": "-135.53", "max30105_delta": "0.00", "max30105_mean": "84.00", "max30105_detected": "False", "bme280_tempc": "34.32", "bme280_pressure": "1034.61", "cputemp": 59, "te": "4079.87322807312", "imgname": "images/bog_image_20200117195629_104c9f2a-b5a8-43d2-8386-57b7bd05f55a.jpg"}


[{"uuid":"sgp30_uuid_xyg_20200117185015","ipaddress":"192.168.1.221","runtime":"0","host":"garden3","host_name":"garden3","macaddress":"dc:a6:32:32:98:20","end":"1579287015.6653564","te":"0.025962352752685547","systemtime":"01/17/2020 13:50:15","cpu":55.0,"diskusage":"109290.8 MB","memory":29.7,"equivalentco2ppm":"  400","totalvocppb":"   37","id":"20200117185015_b8fbd9c1-fa30-4f70-b20d-e43a2c703b18"}]

{"uuid": "rpi4_uuid_kse_20200117222947", "ipaddress": "192.168.1.243", "host": "rp4", "host_name": "rp4", "macaddress": "dc:a6:32:03:a6:e9", "systemtime": "01/17/2020 17:29:47", "cpu": 50.8, "diskusage": "46208.0 MB", "memory": 18.2, "id": "20200117222947_e9299089-d56f-468b-8bac-897a2918307a", "temperature": "48.96355155197982", "pressure": "1035.4460084255888", "humidity": "0.0", "lux": "49.0753", "proximity": "0", "gas": "Oxidising: 30516.85 Ohms\nReducing: 194406.50 Ohms\nNH3: 104000.00 Ohms"}

{"host": "rp4", "cputemp": "72", "ipaddress": "192.168.1.243", "endtime": "1579293634.02", "runtime": "0.00", "systemtime": "01/17/2020 15:40:34", "starttime": "01/17/2020 15:40:34", "diskfree": "46322.7", "memory": "17.1", "uuid": "20200117204034_99f49e71-7444-4fd7-b82e-7e03720c4c39", "image_filename": "20200117204034_d9f811a3-8582-4b47-b4b4-cb6ec51cca04.jpg"}

The next step is to have NiFi load the data to Kudu, Hive, HBase or Phoenix tables for analysis with Cloudera Data Science Workbench and some machine learning analytics in Python 3 in either Zeppelin or Jupyter notebooks feeding CDSW.   Then I can host my final model on Kubernetes within CDSW for a real edge-to-AI application and solve the issue of how much fire in my house is too much.

This article is part of the FLaNK Stack series, highlighting using the FL(ink) Apache NiFi Kafka Kudu stack for big data streaming development with IoT and AI applications.

FLaNK Stack: NiFi Processor for Kafka Consumption on Demand - REST Proxy Example

Writing a Custom Kafka Rest Proxy in 4 Hours


A custom processor for using NiFi as a REST proxy to Kafka is very easy, so I made one in NiFi 1.10.   It's a simple Kafka smart client that accepts POSTs, GETs or any other HTTP request and returns a message from a Kafka topic; the topic can be set via variables, the HTTP request or however you choose.   To get one of the query parameters, you do so like this:   ${http.query.param.topic}.    I am using the plain old KafkaConsumer class.
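
The actual processor source is linked below; purely as an illustration of the idea (not the processor's real code), a plain KafkaConsumer doing that kind of on-demand read looks roughly like this, with placeholder broker, group id and default topic values:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaTopicReader {
    public static void main(String[] args) {
        // The topic would come from the HTTP request, e.g. ?topic=bme680
        String topic = args.length > 0 ? args[0] : "bme680";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "nifi-rest-proxy");           // group id keeps the offset between calls
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "10");                 // how many records to grab per request
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // The processor would write this value to the HTTP response flowfile
                System.out.println(record.value());
            }
        }
    }
}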



If you need data, just curl it or use your HTTP/REST client of choice in Java, Python, Go, Scala, Ruby, C#, VB.NET or whatever.


curl http://localhost:9089?topic=bme680

Download a pre-built NAR and install it.   Note:   this is a pre-release alpha that I quickly built and tested on a few clusters.   This is not an official project or product.   This is a POC for myself to see how hard it could be.   It's not!   Roll your own or join me in building out an open source project for one.   What requirements do you have?   This worked for me: send a curl, get a message.



Build a topic for Kafka with SMM in seconds


 Here's an entire Kafka REST Proxy in a few steps.

HandleHttpRequest (we could have hundreds of options here)
RouteOnAttribute


Provenance For An Example Kafka REST Call

To use the processor, we need to set some variables: Kafka broker, topic, offset reset, number of records to grab at a time, client id, group id (important for keeping your offset), auto commit, and the deserializers for your key and value types - String is usually good, maybe Byte.


Kafka to HBase is Easy.

Kafka to Kudu is Easy.




Kafka Proxy Processor For Message Consumption Source


https://github.com/tspannhw/kafkarest-processor

Pre-Built NAR

https://github.com/tspannhw/kafkarest-processor/releases/download/0.1/nifi-kafkarest-nar-1.0.nar

Other Kafka Articles



It's so easy, didn't wake the cat.