
Cloudera Flow Management 101: Let's Build a Simple REST Ingest to Cloud Datawarehouse With LowCode. Powered by Apache NiFi




Use NiFi to call REST API, transform, route and store the data


Pick any REST API of your choice; I walk through this one, which grabs reports from a number of weather stations. Weather or not we have good weather, we can query it anyway.

workshopoverview


We are going to build a GenerateFlowFile processor to feed our REST calls. Its content is a JSON array of URLs:


generateflowfile
[
{"url":"http://weather.gov/xml/current_obs/CWAV.xml"},
{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDK.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDZ.xml"},
{"url":"http://weather.gov/xml/current_obs/CWFJ.xml"},
{"url":"http://weather.gov/xml/current_obs/PAEC.xml"},
{"url":"http://weather.gov/xml/current_obs/PAYA.xml"},
{"url":"http://weather.gov/xml/current_obs/PARY.xml"},
{"url":"http://weather.gov/xml/current_obs/K1R7.xml"},
{"url":"http://weather.gov/xml/current_obs/KFST.xml"},
{"url":"http://weather.gov/xml/current_obs/KSSF.xml"},
{"url":"http://weather.gov/xml/current_obs/KTFP.xml"},
{"url":"http://weather.gov/xml/current_obs/CYXY.xml"},
{"url":"http://weather.gov/xml/current_obs/KJFK.xml"},
{"url":"http://weather.gov/xml/current_obs/KISP.xml"},
{"url":"http://weather.gov/xml/current_obs/KLGA.xml"},
{"url":"http://weather.gov/xml/current_obs/KNYC.xml"},
{"url":"http://weather.gov/xml/current_obs/KJRB.xml"}
]

Later in the flow we reference ${url}, which will be one of these URLs. Feel free to pick your favorite airports or locations near you from the full station index: https://w1.weather.gov/xml/current_obs/index.xml
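If you want to generate that JSON array for your own stations rather than typing it by hand, here is a minimal Python sketch. The helper name `build_payload` and the example station list are my own assumptions, not part of the original flow; the URL pattern is the one used in the list above.

```python
import json

# URL pattern taken from the list above; {station} is the station ID.
BASE_URL = "http://weather.gov/xml/current_obs/{station}.xml"


def build_payload(station_ids):
    """Return JSON array text to paste into GenerateFlowFile (hypothetical helper)."""
    # dict.fromkeys drops repeated station IDs while keeping the original order.
    unique_ids = list(dict.fromkeys(station_ids))
    return json.dumps([{"url": BASE_URL.format(station=s)} for s in unique_ids])


payload = build_payload(["KTTN", "KEWR", "KEWR", "KJFK"])
print(payload)
```

Dropping repeated IDs avoids calling the same station twice per run.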

If you wish to choose your own data adventure, you can pick one of these others. You will have to build your own table if you wish to store it. They return CSV, JSON or XML; since we have record processors, the format doesn't matter. Just know which one you pick.

Then we will use SplitJson to split the JSON array into single records, one per FlowFile.

splitjson

Then use EvaluateJsonPath to extract the URL from each record into the url attribute.
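For readers new to these two processors, this is roughly what the split and extract steps do, written as a plain Python sketch. It is not NiFi code. The function names are made up, and I am assuming a split that emits each array element and an extraction equivalent to `$.url`, since the actual processor settings only appear in the screenshots.

```python
import json


def split_json(flowfile_content):
    """Mimic the split step: one output per element of the JSON array."""
    return [json.dumps(record) for record in json.loads(flowfile_content)]


def evaluate_json_path(flowfile_content):
    """Mimic the extract step: copy the record's url field (assumed $.url) into attributes."""
    return {"url": json.loads(flowfile_content)["url"]}


incoming = '[{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},{"url":"http://weather.gov/xml/current_obs/KJFK.xml"}]'
splits = split_json(incoming)
attributes = [evaluate_json_path(s) for s in splits]
print(attributes)
```

Each resulting attribute set is what a downstream processor sees when it references ${url}.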

evaluatejsonpath2

Now we are going to call those REST URLs with InvokeHTTP.

You will need to create a StandardSSLContextService controller service.

enablessl
standardSSL
sslcontext


The settings below point at the default JDK truststore, as found on Mac or some CentOS 7 installs. You may have a real password; if so, you are awesome. If you don't know it, that's rough, but you can build a new truststore.

For more cloud ingest fun, https://docs.cloudera.com/cdf-datahub/7.1.0/howto-data-ingest.html.

SSL Defaults (In CDP Datahub, one is built for you automagically, thanks Michael).

Truststore filename: /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts 

Truststore password: changeit 

Truststore type: JKS 

TLS Protocol: TLS


StandardSSLContextService for Your GET ${url}

invokehttp



We can tweak these defaults.
invokehttp2
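Outside of NiFi, the same GET can be sketched in a few lines of Python, which is handy for checking a URL before wiring it into the flow. This is only an illustration under my own assumptions: the function names are hypothetical, and `ssl.create_default_context()` uses the system CA bundle rather than the JKS truststore configured above.

```python
import ssl
import urllib.request


def build_request(url):
    """Build a GET request for one station URL (hypothetical helper)."""
    return urllib.request.Request(url, method="GET")


def fetch(url, timeout=30):
    """Send the request and return the response body as text."""
    # Assumption: the system CA bundle stands in for the NiFi truststore.
    context = ssl.create_default_context()
    with urllib.request.urlopen(build_request(url), timeout=timeout, context=context) as response:
        return response.read().decode("utf-8")


request = build_request("http://weather.gov/xml/current_obs/KTTN.xml")
print(request.get_method(), request.full_url)
```

Calling `fetch(request.full_url)` performs the actual network call, so it is left out of the example run.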

Then we are going to run a query to convert these and route based on our queries.

Here is an example query on the current NOAA weather observations that looks for temperatures at or below 60 degrees Fahrenheit. You can build a query with any of the fields in the WHERE clause. Give it a try!

queryRecord


You will need to set the Record Writer and Record Reader:

Record Reader: XML 

Record Writer: JSON


jsonwriter
Query for the cold path:

SELECT * FROM FLOWFILE
WHERE temp_f <= 60

Query for the 'all' path:

SELECT * FROM FLOWFILE
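To make the convert-and-route step concrete, here is a rough Python sketch of the same idea: read an XML observation, turn it into a JSON-style record, and decide which paths it belongs to. The sample XML is made up for illustration and is not real NOAA data, and the function names are my own. The route names follow the cold and 'all' paths described in this post.

```python
import json
import xml.etree.ElementTree as ET

# Made-up sample observation; the values are illustrative, not real NOAA data.
SAMPLE_XML = """<current_observation>
  <location>Example Airport</location>
  <station_id>XXXX</station_id>
  <observation_time>Example time</observation_time>
  <temp_f>41.0</temp_f>
  <image>
    <url>http://example.com/logo.png</url>
    <title>Example</title>
  </image>
</current_observation>"""


def element_to_value(elem):
    """Convert an XML element to a dict (if it has children) or to its text."""
    if len(elem):
        return {child.tag: element_to_value(child) for child in elem}
    return elem.text


def xml_to_record(xml_text):
    """Parse one observation document into a JSON-style record."""
    return element_to_value(ET.fromstring(xml_text))


def route(record):
    """Every record goes to 'all'; records with temp_f <= 60 also go to 'cold'."""
    routes = ["all"]
    temp_f = record.get("temp_f")
    if temp_f is not None and float(temp_f) <= 60:
        routes.append("cold")
    return routes


record = xml_to_record(SAMPLE_XML)
print(json.dumps(record))
print(route(record))
```

In NiFi the record reader and writer handle the parsing and typing for you; the sketch only shows the shape of the logic.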

Now we are splitting into three concurrent paths. This shows the power of Apache NiFi. We will write to Kudu, HDFS and Kafka.

For the results of our cold path (temp_f <= 60), we will write to a Kudu table.

putkudu


Kudu Masters: edge2ai-1.dim.local:7051

Table Name: impala::default.weatherkudu

Record Reader: Infer Json Tree Reader

Kudu Operation Type: UPSERT

Before you run this, go to Hue and build the table.


huechooseimpala
huecreateweatherkudu
CREATE TABLE weatherkudu
(`location` STRING,`observation_time` STRING, `credit` STRING, `credit_url` STRING, `image` STRING, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE,  `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING,
PRIMARY KEY (`location`, `observation_time`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

Let the flow run, then query the Kudu table via Impala. Try it in Hue.

huequeryweatherkudu


The second fork is to Kafka; this will be for the 'all' path.


publishKafka


Kafka Brokers: edge2ai-1.dim.local:9092

Topic: weather

Reader & Writer: reuse the JSON ones

The third and final fork is to HDFS (which could be on top of S3 or Blob Storage) as Apache ORC files. This will also autogenerate the DDL for an external Hive table as an attribute; check your provenance after running.

mergerecord


Use JSON for both the record reader and the record writer. You can adjust the time and size of your batch or use the defaults.
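The batching idea behind this merge step can be sketched in Python as follows. This is a simplified illustration that only groups by record count; the real processor also considers batch age and size, and the function name and the limit of 2 are assumptions for the example.

```python
def merge_records(records, max_records):
    """Group records into batches of at most max_records (count-based only)."""
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) == max_records:
            yield batch
            batch = []
    if batch:
        # Emit the final, partially filled batch.
        yield batch


observations = [{"station_id": s} for s in ["KTTN", "KEWR", "KJFK", "KLGA", "KNYC"]]
batches = list(merge_records(observations, max_records=2))
print([len(b) for b in batches])
```

Larger batches mean fewer, bigger ORC files in HDFS, which is usually what you want.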

putorc
putorc1
putorc2


Hadoop Config: /etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml

Record Reader: Infer Json

Directory: /tmp/weather

Table Name: weather

Before we run, create the /tmp/weather directory in HDFS and give it 777 permissions. We can do this with Hue.


createhdfsdir
changepermissionshdfsdir

Once we run we can get the table DDL and location:

putOrcProvenanceWeather


Go to Hue to create your table.


huetohive
CREATE EXTERNAL TABLE IF NOT EXISTS `weather`
(`credit` STRING, `credit_url` STRING, `image` STRUCT<`url`:STRING, `title`:STRING, `link`:STRING>, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`location` STRING, `station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE, `observation_time` STRING, `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING)
STORED AS ORC
LOCATION '/tmp/weather'
weatherhdfslist

You can now use Hue to query your tables and do some weather analytics. Because we upsert into Kudu with a primary key of location and observation_time, we ensure there are no duplicate reports for a given location and observation time.

select `location`, weather, temp_f, wind_string, dewpoint_string, latitude, longitude, observation_time
from weatherkudu
order by observation_time desc, station_id asc;

select *
from weather;
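The effect of upserting on the Kudu primary key (`location`, `observation_time`) can be illustrated with a small Python sketch. A dictionary stands in for the table here; the sample records are made up and the `upsert` function is hypothetical, not a Kudu API.

```python
def upsert(table, record):
    """Insert the record, or overwrite the row that has the same primary key."""
    key = (record["location"], record["observation_time"])
    table[key] = record


table = {}
# Made-up records: the first two share a primary key, so the second replaces the first.
upsert(table, {"location": "Example Airport", "observation_time": "t1", "temp_f": 41.0})
upsert(table, {"location": "Example Airport", "observation_time": "t1", "temp_f": 42.0})
upsert(table, {"location": "Example Airport", "observation_time": "t2", "temp_f": 43.0})
print(len(table))
```

Fetching the same report twice therefore leaves a single row for that observation rather than a duplicate.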
lab3flow


In Atlas, we can see the flow.

atlasTopic
