Cloudera Flow Management 101: Let's Build a Simple REST Ingest to a Cloud Data Warehouse With Low Code. Powered by Apache NiFi




Use NiFi to call a REST API, then transform, route, and store the data.


Pick any REST API of your choice, but I have walked through this one to grab a number of weather station reports.  Weather or not we have good weather, we can query it anyway.

workshopoverview


We are going to build a GenerateFlowFile to feed our REST calls.


generateflowfile
[
{"url":"http://weather.gov/xml/current_obs/CWAV.xml"},
{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDK.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDZ.xml"},
{"url":"http://weather.gov/xml/current_obs/CWFJ.xml"},
{"url":"http://weather.gov/xml/current_obs/PAEC.xml"},
{"url":"http://weather.gov/xml/current_obs/PAYA.xml"},
{"url":"http://weather.gov/xml/current_obs/PARY.xml"},
{"url":"http://weather.gov/xml/current_obs/K1R7.xml"},
{"url":"http://weather.gov/xml/current_obs/KFST.xml"},
{"url":"http://weather.gov/xml/current_obs/KSSF.xml"},
{"url":"http://weather.gov/xml/current_obs/KTFP.xml"},
{"url":"http://weather.gov/xml/current_obs/CYXY.xml"},
{"url":"http://weather.gov/xml/current_obs/KJFK.xml"},
{"url":"http://weather.gov/xml/current_obs/KISP.xml"},
{"url":"http://weather.gov/xml/current_obs/KLGA.xml"},
{"url":"http://weather.gov/xml/current_obs/KNYC.xml"},
{"url":"http://weather.gov/xml/current_obs/KJRB.xml"}
]
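
A rough sketch of the GenerateFlowFile settings for this (the scheduling value is just an example; pick whatever polling interval you like):

Custom Text: the JSON array above
Run Schedule: 600 sec (Scheduling tab)

Each run emits one flow file containing the whole array, which we split next.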

So we are using ${url}, which will be one of these. Feel free to pick your favorite airports or locations near you: https://w1.weather.gov/xml/current_obs/index.xml

If you wish to choose your own data adventure, you can pick one of these other APIs. You will have to build your own table if you wish to store the results. They return CSV, JSON, or XML; since we have record processors, we don't care. Just know which one you pick.

Then we will use SplitJson to split the JSON array into individual records.

splitjson
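
SplitJson just needs a JsonPath expression that points at the elements of the array; for the array above, this is the only property that really matters:

JsonPath Expression: $.*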

Then use EvaluateJsonPath to extract the URL into a flow file attribute.

evaluatejsonpath2
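
A minimal sketch of the EvaluateJsonPath settings, assuming we name the attribute url so it lines up with the ${url} used later:

Destination: flowfile-attribute
url: $.url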

Now we are going to call those REST URLs with InvokeHTTP.

You will need to create a StandardSSLContextService controller service.

enablessl
standardSSL
sslcontext


These are the defaults for the JDK JVM on a Mac or on CentOS 7.   You may have a real truststore password; if so, you are awesome.   If you don't know it, that's rough, but you can always build a new truststore with your own SSL certificates.

For more cloud ingest fun, see https://docs.cloudera.com/cdf-datahub/7.1.0/howto-data-ingest.html.

SSL Defaults (in CDP Data Hub, one is built for you automagically - thanks, Michael).

Truststore filename: /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts 

Truststore password: changeit 

Truststore type: JKS 

TLS Protocol: TLS


StandardSSLContextService for Your GET ${url}

invokehttp



We can tweak these defaults.
invokehttp2
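
Roughly, the InvokeHTTP properties that matter here (names may vary slightly by NiFi version):

HTTP Method: GET
Remote URL: ${url}
SSL Context Service: StandardSSLContextService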

Then we are going to use QueryRecord to convert the records and route them based on our queries.

Here is an example query on the current NOAA weather observations to look for temperatures in Fahrenheit at or below 60 degrees. You can use any of the fields in the WHERE clause. Give it a try!

queryRecord


You will need to set the Record Writer and Record Reader:

Record Reader: XML 

Record Writer: JSON


jsonwriter
SELECT * FROM FLOWFILE
WHERE temp_f <= 60

SELECT * FROM FLOWFILE
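
In QueryRecord, each query is added as a dynamic property, and the property name becomes an outbound relationship you can route on. A minimal sketch, assuming we name the relationships cold and all (the names are up to you):

cold: SELECT * FROM FLOWFILE WHERE temp_f <= 60
all: SELECT * FROM FLOWFILE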

Now we are splitting into three concurrent paths. This shows the power of Apache NiFi. We will write to Kudu, HDFS and Kafka.

For the results of our cold path (temp_f <= 60), we will write to a Kudu table.

putkudu


Kudu Masters: edge2ai-1.dim.local:7051
Table Name: impala::default.weatherkudu
Record Reader: Infer Json Tree Reader
Kudu Operation Type: UPSERT

Before you run this, go to Hue and build the table.


huechooseimpala
huecreateweatherkudu
CREATE TABLE weatherkudu
(`location` STRING,`observation_time` STRING, `credit` STRING, `credit_url` STRING, `image` STRING, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE,  `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING,
PRIMARY KEY (`location`, `observation_time`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

Let it run, then query it.   The Kudu table is queried via Impala; try it in Hue.

huequeryweatherkudu


The second fork is to Kafka; this will be for the 'all' path.


publishKafka


Kafka Brokers: edge2ai-1.dim.local:9092
Topic: weather
Reader & Writer: reuse the JSON ones

The third and final fork is to HDFS (which could be on top of S3 or Blob Storage) as Apache ORC files. This will also autogenerate the DDL for an external Hive table as an attribute; check your provenance after running.

mergerecord


Use JSON in and out for the record reader/writer; you can adjust the time and size of your batch or use the defaults.

putorc
putorc1
putorc2


Hadoop Config: /etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml
Record Reader: Infer Json
Directory: /tmp/weather
Table Name: weather

Before we run, build the /tmp/weather directory in HDFS and give it 777 permissions. We can do this with Apache Hue.


createhdfsdir
changepermissionshdfsdir
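
If you prefer a shell over Hue, something like this does the same thing (assuming you are a user with rights to create directories in HDFS):

hdfs dfs -mkdir -p /tmp/weather
hdfs dfs -chmod 777 /tmp/weather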

Once we run we can get the table DDL and location:

putOrcProvenanceWeather


Go to Hue to create your table.


huetohive
CREATE EXTERNAL TABLE IF NOT EXISTS `weather`
(`credit` STRING, `credit_url` STRING, `image` STRUCT<`url`:STRING, `title`:STRING, `link`:STRING>, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`location` STRING, `station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE, `observation_time` STRING, `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING)
STORED AS ORC
LOCATION '/tmp/weather';
weatherhdfslist

You can now use Apache Hue to query your tables and do some weather analytics. When we upsert into Kudu, we ensure there are no duplicate reports for a given weather station and observation time.

select `location`, weather, temp_f, wind_string, dewpoint_string, latitude, longitude, observation_time
from weatherkudu
order by observation_time desc, station_id asc

select *
from weather
lab3flow


In Atlas, we can see the flow.

atlasTopic

One Minute NiFi Tip: Calcite SQL Notes

NiFi Quick Tip on SQL


You sometimes have to cast, as fields aren't what you think they are.   I have some temperatures that are stored as strings; yeah, I know, let's yell at whoever did that.   Maybe it was some lazy developer (me?!?!).    Let's just cast to a type that makes sense for math and comparisons.   CAST is my friend.

SELECT * 
FROM FLOWFILE
WHERE CAST(temperaturef as FLOAT) > 60


Apache NiFi (and lots of other awesome projects) use Apache Calcite for queries.   So if you need some SQL help, always look here:   https://calcite.apache.org/docs/reference.html

You can also include variables in your QueryRecord queries.

SELECT * 
FROM FLOWFILE
WHERE CAST(temperaturef as FLOAT) >= (CAST(${predictedTemperature} as FLOAT) - 5)


There are wildcard characters that you may need to watch.

Underscore has special meaning in LIKE patterns.  There are also often column names that are reserved words.   I get a lot of columns coming from IoT with names like timestamp, start, end, and other words used by SQL.   Just put backticks around them, like `start`.
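
For example, sticking with the backtick quoting described above (these column names are just illustrative):

SELECT `start`, `end`, `timestamp`
FROM FLOWFILE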

Watch those wildcards.

select * from flowfile where internal = false
and name not like '@_@_%' ESCAPE '@'

FLaNK: Low Code Streaming: Populating Kafka Topics with FlinkSQL Joins in Real-Time

FLaNK




Then I can create my three tables: two are the sources to join and the third is the destination for my insert (the full DDL for all three appears later in this post). The join and insert look like this:



INSERT INTO global_sensor_events
SELECT
    scada.uuid,
    scada.systemtime,
    scada.temperaturef,
    scada.pressure,
    scada.humidity,
    scada.lux,
    scada.proximity,
    scada.oxidising,
    scada.reducing,
    scada.nh3,
    scada.gasko,
    energy.`current`,
    energy.voltage,
    energy.`power`,
    energy.`total`,
    energy.fanstatus
FROM energy,
     scada
WHERE
    scada.systemtime = energy.systemtime;

Examples

Assets / Scripts / DDL / SQL

Flink Guide to SQL Joins
https://www.youtube.com/watch?v=5AuBlVRKQuo

Slides

Article on Joins

Resources

Time Series Analysis - Dataflow






In a first, we joined together the forces of NYC, New Jersey, and Philly to power this meetup.   A huge thanks to John Kuchmek, Amol Thacker, and Paul Vidal for promoting and cross-running a sweet meetup.   John was an amazing meetup lead and made sure we kept moving.  A giant thanks to Cloudera marketing for helping with logistics and some awesome giveaways!   Hopefully next year we can do a Cinco de Mayo Taco Feast!  Bill Brooks and Robert Hryniewicz were a great help!   And thanks to Cloudera for providing CDP Public Cloud on AWS and CDP-DC on OpenStack for demos, development, and general data fun.   And thanks to Bethann Noble and her awesome machine learning people for the initial meetup suggestion and speaker.



Philly - NJ - NYC


To quote John Kuchmek:

The Internet of Things (IoT) is growing in popularity but it isn’t new. Connected devices have existed in manufacturing and utilities with Supervisory Control and Data Acquisition (SCADA) systems. Time series data has been looked at for sometime in these industries as well as the stock market. Time series analysis can bring valuable insight to businesses and individuals with smart homes. There are many parts and components to be able to collect data at the edge, store in a central location for initial analysis, model build, train and eventually deploy. Time series forecasting is one of the more challenging problems to solve in data science. Important factors in time series analysis and forecasting are seasonality, stationary nature of data and autocorrelation of target variables. We show you a platform, built on open source technology, that has this potential. Sensor data will be collected at the edge, off a Raspberry Pi, using Cloudera’s Edge Flow Manager (powered by MiNiFi). The data will then be pushed to a cluster containing Cloudera Flow Manager (powered by NiFi) so it can be manipulated, routed, and then be stored in Kudu on Cloudera’s Data Platform. Initial inspection can be done in Hue using Impala. The time series data will be analyzed with potential forecasting using an ARIMA model in CML (Cloudera Machine Learning). Time series analysis and forecasting can be applied to but not limited to stock market analysis, forecasting electricity loads, inventory studies, weather conditions, census analysis and sales forecasting.


The main portion of our meetup was an amazing talk by data scientist Victor Dibia.

Analyzing Time Series Data with an ARIMA model


His talk comes right after mine and is about an hour of in-depth data science, with many hard questions answered.   Also a cool demo.   Thanks again, Victor.

We also had some really great attendees who asked some tough questions.  My favorite question was from a Flink expert who joined from the West Coast and asked for a FLaNK sticker.



Time Series Analysis - Dataflow

For my small part I did a demo of ingesting data from MiNiFi to NiFi to CML and Kafka.   Flink reads from two Kafka topics, joins them and inserts into a third Kafka topic.   We call the ML model for classification as part of our ingest flow.   This is an example of my FLaNK Stack.

MiNiFi reads the data from sensors and a camera and sends it to a local NiFi gateway.   That NiFi gateway sends a stream to my CDP-hosted CFM NiFi cluster for processing.  This cluster splits the data based on which set of sensors it came from (energy or scada) and then publishes to Kafka topics and populates Kudu tables with an UPSERT.




We have great options for monitoring, querying and analyzing our data with the tools from CDP and CDP-DC.   These include Cloudera DAS, Apache Hue, Cloudera SMM for Kafka, Flink SQL console, Flink Dashboard, CML Notebooks, Jupyter Notebooks from CML and Apache Zeppelin.















As a separate way to investigate Kafka, I have created a Hive external table in beeline and connected it to a Kafka topic.  I can now query the current state of that topic.
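
A rough sketch of that kind of table using the Hive Kafka storage handler - the table name and column list here are illustrative, and you would swap in your own topic and broker:

CREATE EXTERNAL TABLE scada_kafka
(`uuid` STRING, `systemtime` STRING, `temperaturef` STRING, `humidity` DOUBLE)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
("kafka.topic" = "scada",
 "kafka.bootstrap.servers" = "tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092");

The storage handler also exposes Kafka metadata columns such as __offset and __timestamp, which are handy for checking how current the topic is.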







Video Walkthrough of FlinkSQL Application (and awesome Machine Learning Talk on Time Series)



Slides From Talk


Related Articles

Flink SQL Preview

FLaNK:  Flink SQL Preview







From our Web Flink Dashboard, we can see how our insert is doing and view the joins and records passing quickly through our tiny cluster.










As part of the May 7th, 2020 virtual meetup, I was doing some work with Flink SQL to show as a quick demo for the introduction to the meetup, and I found out how easy it was to do some cool stuff.   This was inspired by my streaming hero, Abdelkrim, who wrote this amazing article on Flink SQL use cases:   https://towardsdatascience.com/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9

As part of our time series meetup, I have a few streams of data coming from one device from a MiNiFi Java agent to NiFi for some transformation, routing and processing and then sent to Apache Flink for final processing.   I decided to join Kafka topics with Flink SQL.   


Let's create Flink Tables:

This table will be used to insert the joined events from both source Kafka topics.

CREATE TABLE global_sensor_events (
  uuid STRING,
  systemtime STRING,
  temperaturef STRING,
  pressure DOUBLE,
  humidity DOUBLE,
  lux DOUBLE,
  proximity INT,
  oxidising DOUBLE,
  reducing DOUBLE,
  nh3 DOUBLE,
  gasko STRING,
  `current` INT,
  voltage INT,
  `power` INT,
  `total` INT,
  fanstatus STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'global_sensor_events',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
  'connector.properties.group.id' = 'flink-sql-global-sensor_join',
  'format.type' = 'json'
);


This table will hold Kafka topic messages from our energy reader.

CREATE TABLE energy (
  uuid STRING,
  systemtime STRING,
  `current` INT,
  voltage INT,
  `power` INT,
  `total` INT,
  swver STRING,
  hwver STRING,
  type STRING,
  model STRING,
  mac STRING,
  deviceId STRING,
  hwId STRING,
  fwId STRING,
  oemId STRING,
  alias STRING,
  devname STRING,
  iconhash STRING,
  relaystate INT,
  ontime INT,
  activemode STRING,
  feature STRING,
  updating INT,
  rssi INT,
  ledoff INT,
  latitude INT,
  longitude INT,
  `day` INT,
  `index` INT,
  zonestr STRING,
  tzstr STRING,
  dstoffset INT,
  host STRING,
  currentconsumption INT,
  devicetime STRING,
  ledon STRING,
  fanstatus STRING,
  `end` STRING,
  te STRING,
  cpu INT,
  memory INT,
  diskusage STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'energy',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
  'connector.properties.group.id' = 'flink-sql-energy-consumer',
  'format.type' = 'json'
);


The scada table holds events from our sensors.

CREATE TABLE scada (
  uuid STRING,
  systemtime STRING,
  amplitude100 DOUBLE,
  amplitude500 DOUBLE,
  amplitude1000 DOUBLE,
  lownoise DOUBLE,
  midnoise DOUBLE,
  highnoise DOUBLE,
  amps DOUBLE,
  ipaddress STRING,
  host STRING,
  host_name STRING,
  macaddress STRING,
  endtime STRING,
  runtime STRING,
  starttime STRING,
  cpu DOUBLE,
  cpu_temp STRING,
  diskusage STRING,
  memory DOUBLE,
  id STRING,
  temperature STRING,
  adjtemp STRING,
  adjtempf STRING,
  temperaturef STRING,
  pressure DOUBLE,
  humidity DOUBLE,
  lux DOUBLE,
  proximity INT,
  oxidising DOUBLE,
  reducing DOUBLE,
  nh3 DOUBLE,
  gasko STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'scada',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.bootstrap.servers' = 'tspann-princeton0-cluster-0.general.fuse.l42.cloudera.com:9092',
  'connector.properties.group.id' = 'flink-sql-scada-consumer',
  'format.type' = 'json'
);


This is the magic part:

INSERT INTO global_sensor_events
SELECT
    scada.uuid,
    scada.systemtime,
    scada.temperaturef,
    scada.pressure,
    scada.humidity,
    scada.lux,
    scada.proximity,
    scada.oxidising,
    scada.reducing,
    scada.nh3,
    scada.gasko,
    energy.`current`,
    energy.voltage,
    energy.`power`,
    energy.`total`,
    energy.fanstatus
FROM energy,
     scada
WHERE
    scada.systemtime = energy.systemtime;

So we join two Kafka topics and use some of their fields to populate a third Kafka topic that we defined above.
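
To sanity-check the join from the Flink SQL console, you can simply select from the sink table, for example:

SELECT uuid, systemtime, temperaturef, `current`, voltage
FROM global_sensor_events;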

With Cloudera, it is so easy to monitor our streaming Kafka events with SMM.


For context, this is where the data comes from: