FLaNK-TravelAdvisory

 


Travel Advisory - RSS Processing - Apache NiFi - Apache Kafka - Apache Flink - SQL

Overview


Final Flow


Adding Processors to the Designer

Here I list most of the processors available:

https://www.datainmotion.dev/2023/04/dataflow-processors.html

Flow Parameters

Go to Parameters and enter all of the parameters you will need for the flow.


You can add all of the parameters listed below.


Flow Walk Through

If you are loading my pre-built flow, when you enter it you will see the details for the process group in the configuration palette.

We add an InvokeHTTP processor and set its parameters.


Now we can add a parameter for the HTTP URL for Travel Advisories.


Connect InvokeHTTP to QueryRecord. Name your connection for monitoring later.


In QueryRecord we convert the XML (RSS) to JSON; you will need the RSSXMLReader and TravelJsonRecordSetWriter services.
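
The XML-to-JSON conversion is done by the reader and writer services, so the QueryRecord query itself can be a simple pass-through routed to a dynamic relationship (a minimal sketch, assuming no filtering is needed):

SELECT *
FROM FLOWFILE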


If there are no errors, connect QueryRecord to SplitJson.


In SplitJson we set the JsonPath Expression to $.*.*.item (the converted feed nests the items two wrapper levels deep, hence the two wildcards).


We then connect SplitJson to SplitRecord.


For SplitRecord we set the Record Reader to JSON_Reader_InferRoot, the Record Writer to TravelJsonRecordSetWriter and records per split to 1.


SplitRecord is connected to EvaluateJsonPath.


We set the Destination to flowfile-attribute, the Return Type to json, and add several new attributes (a sample source record is sketched after this list):

  • description - $.description
  • guid - $.guid
  • identifier - $.identifier
  • link - $.link
  • pubdate - $.pubDate
  • title - $.title
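
For reference, a hypothetical item record coming out of the RSS-to-JSON conversion might look like the following; the field names match the JsonPath expressions above (plus the category field used later), and all values are illustrative:

{
  "title" : "Example Country - Level 2: Exercise Increased Caution",
  "link" : "https://travel.state.gov/...",
  "description" : "Exercise increased caution in Example Country due to ...",
  "guid" : "https://travel.state.gov/...",
  "identifier" : "example-country-travel-advisory",
  "pubDate" : "Wed, 19 Apr 2023 12:00:00 -0400",
  "category" : "Travel Advisory"
}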


We connect EvaluateJsonPath to SplitJson.


For SplitJson we set the JsonPath Expression to $.category.


From SplitJson we connect to UpdateRecord.


In UpdateRecord, we set Record Reader to JSON_Reader_InferRoot and Record Writer to TravelJsonRecordSetWriter. We set Replacement Value Strategy to Literal Value.

We add new fields for our new record format (a sample output record is sketched after this list):

  • /advisoryId - ${filename}
  • /description - ${description}
  • /domain - ${identifier:trim()}
  • /guid - ${guid}
  • /link - ${link}
  • /pubdate - ${pubdate}
  • /title - ${title}
  • /ts - ${now():toNumber()}
  • /uuid - ${uuid}
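
A hypothetical output record after UpdateRecord could look like this; all values are illustrative, with advisoryId, ts and uuid filled in by NiFi at runtime:

{
  "advisoryId" : "a1b2c3d4-advisory",
  "description" : "Exercise increased caution in Example Country due to ...",
  "domain" : "example-country-travel-advisory",
  "guid" : "https://travel.state.gov/...",
  "link" : "https://travel.state.gov/...",
  "pubdate" : "Wed, 19 Apr 2023 12:00:00 -0400",
  "title" : "Example Country - Level 2: Exercise Increased Caution",
  "ts" : 1681915200000,
  "uuid" : "0a1b2c3d-4e5f-6789-abcd-ef0123456789"
}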


Next we connect UpdateRecord to our Slack Sub-Processor Group.


The other branch flows from UpdateRecord to Write to Kafka.


For PublishKafka2RecordCDP, there are a lot of parameters to set. This is why we recommend starting with a ReadyFlow.

There are a lot of parameters here:

  • Kafka Brokers
  • Destination Topic Name
  • Record Reader - JSON_Reader_InferRoot
  • Record Writer - AvroRecordSetWriterHWX
  • Transactions - off
  • Guarantee Replicated Delivery
  • Use Content as Record Value
  • Security Protocol - SASL_SSL with the PLAIN mechanism
  • Username - your login user id or machine user, plus the associated password
  • SSL Context Service - the built-in Default NiFi SSL Context Service
  • Message Key Field - uuid
  • client.id - a unique Kafka producer id


We also send messages about our travel advisories to Slack.


We only need one processor to send to Slack.


We connect the input to our PutSlack processor.


For PutSlack we need to set the Webhook URL to the one provided by your Slack group admin, set the Webhook Text to the text from the ingest, set the Channel to the channel mapped in the webhook, and set a Username for your bot.
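
A hypothetical Webhook Text value, assuming the title and link attributes extracted earlier are still available on the flowfile:

New Travel Advisory: ${title} - ${link}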

Flow Services


All of these controller services need to be configured and enabled.

© 2023 Tim Spann https://datainmotion.dev/

Cloud Tools Guidance: How To Build Data Assets: Create an Apache Iceberg Table



Authors: Michael Kohs, George Vetticaden, Timothy Spann

Date: 04/19/2023

Last Updated: 5/02/2023


Notice


This document assumes that you have registered for an account, activated it, and logged into the CDP Sandbox. It is intended only for authorized users who have attended the webinar and read the training materials.


A short guide and references are listed here. 


1.3 Create an Apache Iceberg Table


  1. Navigate to oss-kudu-demo from the Data Hubs list.






  2. Navigate to Hue from the Kudu Data Hub.



  3. Inside of Hue you can now create your table.



  4. Navigate to your database, which was created for you.



Info: The database name is your email address with all special characters replaced with underscores and _db appended, and a Ranger policy is created to limit access to just that user and members of the admin group. For example, a (hypothetical) user jane.doe@example.com would get the database jane_doe_example_com_db.






  5. Create your Apache Iceberg table. The table name must be prefixed with your Workload User Name (userid).




CREATE TABLE <<userid>>_syslog_critical_archive
(priority int, severity int, facility int, version int, event_timestamp bigint, hostname string,
body string, appName string, procid string, messageid string,
structureddata struct<sdid:struct<eventid:string,eventsource:string,iut:string>>)
STORED BY ICEBERG






  6. Your table is created in s3a://oss-uat2/iceberg/



  7. Once you have sent data to your table, you can query it.
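
A minimal example query (a sketch; replace <<userid>> with your Workload User Name and adjust the columns as needed):

SELECT hostname, severity, event_timestamp, body
FROM <<userid>>_syslog_critical_archive
ORDER BY event_timestamp DESC
LIMIT 10;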