FLaNK: Using Apache Kudu As a Cache For FDA Updates
InvokeHTTP: We invoke the RSS feed to get our data.
QueryRecord: Convert RSS to JSON (see the sketch after this list).
SplitJson: Split one file into individual records. (Should refactor to ForkRecord)
EvaluateJsonPath: Extract the attributes you need.
ProcessGroup for SPL Processing
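To make the QueryRecord conversion concrete, here is a minimal sketch: with an XMLReader configured for the feed and a JsonRecordSetWriter, the dynamic query property can simply pass every record through and let the writer handle the format change (this query is illustrative, not copied from the actual flow).

SELECT *
FROM FLOWFILE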
We call and check the RSS feed frequently, parse the records out of the RSS (XML) feed, and check them against our cache. We use Cloudera's Real-Time Datahub with Apache Impala/Apache Kudu as our cache to see if we have already received a record for that item. If it hasn't arrived yet, it is added to the Kudu cache and processed.
SPL Processing
We use the extracted SPL to grab a detailed record for that SPL using InvokeHTTP.
We have created an Impala/Kudu table in Apache Hue to cache this information.
We use LookupRecord to read from our cache.
If we don't have that value yet, we send it to the table for an UPSERT (a sketch of this table and UPSERT is shown below).
We send our raw data as XML/RSS to HDFS for archival use and audits.
We can see the results of our flow in Apache Atlas, giving us full governance of our solution.
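As a rough sketch of the cache itself (the table and column names here are hypothetical, not taken from the original flow), the Impala/Kudu cache table and the UPSERT used to record a newly seen SPL might look like this:

CREATE TABLE spl_cache
(
  set_id STRING,
  title STRING,
  published_date STRING,
  PRIMARY KEY (set_id)
)
PARTITION BY HASH (set_id) PARTITIONS 4
STORED AS KUDU;

-- a record that fails the LookupRecord check gets upserted into the cache
UPSERT INTO spl_cache VALUES ('example-set-id', 'Example Drug Label', '2020-10-01');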
So with a simple NiFi flow, we can ingest all the new updates to DailyMed without reprocessing anything we already have.
Simple Change Data Capture (CDC) with SQL Selects via Apache NiFi (FLaNK)
Sometimes you need real CDC: you have access to transaction change logs and use a tool like Qlik Replicate or GoldenGate to pump records out to Kafka, and then Flink SQL or NiFi can read and process them.
Other times you need something easier: you just want basic inserts and changes to a few tables you are interested in, delivered as events. Apache NiFi can do this easily for you with QueryDatabaseTableRecord; you don't need to know anything but the database connection information, the table name, and which field may change. NiFi will query, track state, and give you only the new records. Nothing is hardcoded: parameterize those values and you have a generic any-RDBMS-to-any-other-store data pipeline. We read the data as records, which means each FlowFile in NiFi can hold thousands of records whose fields, types, and schema-related information we know. The schema can be one NiFi infers or one we use from a schema registry like Cloudera's amazing open source Schema Registry.
Let's see what data is in our PostgreSQL table.
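For reference, here is a minimal sketch of what the source table might look like (the column names follow the ORC DDL shown later in this post; the types are assumptions, not taken from the actual database):

CREATE TABLE prices
(
  item_id    BIGSERIAL PRIMARY KEY,
  price      NUMERIC(10,2),
  created_on TIMESTAMP NOT NULL DEFAULT now(),
  updated_on TIMESTAMP NOT NULL DEFAULT now()
);

SELECT * FROM prices ORDER BY updated_on DESC LIMIT 5;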
How to
QueryDatabaseTableRecord (we will output Json records, but could have done Parquet, XML, CSV or AVRO)
UpdateAttribute - optional - set a table and schema name; this can also be done with parameters.
MergeRecord - optional - let's batch these up.
PutORC - let's send these records to HDFS (which could be on bare-metal disks, GCS, S3, Azure, or ADLS). This will build us an external Hive table.
PutORC
As you can see, we are looking at the "prices" table and checking maximum values to increment on the updated_on date and the item_id sequential key. We then output JSON records.
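Under the hood, QueryDatabaseTableRecord keeps the highest updated_on and item_id values it has seen in processor state and, on each run, effectively issues a query along these lines (illustrative only; the exact SQL is generated by the processor):

SELECT *
FROM prices
WHERE updated_on > '<last max updated_on>'
  AND item_id > <last max item_id>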
We could then:
Add-On Examples
PutKudu
PutHDFS (send as JSON, CSV, Parquet) and build an Impala or Hive table on top as external
PutHive3Streaming (Hive 3 ACID Tables)
PutS3
PutAzureDataLakeStorage
PutHBaseRecord
PublishKafkaRecord_2_* - send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc...
PutBigQueryStreaming (Google)
PutCassandraRecord
PutDatabaseRecord - let's send to another JDBC Datastore
PutDruidRecord - Druid is a cool datastore, check it out on CDP Public Cloud
PutElasticSearchRecord
PutMongoRecord
PutSolrRecord
PutRecord (to many RecordSinkServices like Databases, Kafka, Prometheus, Scripted and Site-to-Site)
PutParquet (store to HDFS as Parquet files)
You can do any number of these, or all of them, or send multiple copies of each to other clouds or clusters. You can also add enrichment, transformation, alerts, queries, or routing.
These records can also be manipulated ETL/ELT style with record processing in stream, using options such as:
QueryRecord (use Calcite ANSI SQL to query and transform records and also change the output type; see the example after this list)
JoltTransformRecord (use JOLT against any record not just JSON)
LookupRecord (to match against Lookup services like caches, Kudu, REST services, ML models, HBase and more)
PartitionRecord (to break up into like groups)
SplitRecord (to break up record groups into records)
UpdateRecord (update values in fields, often paired with LookupRecord)
ValidateRecord (check against a schema and check for extra fields)
GeoEnrichIPRecord
ConvertRecord (change between types like JSON to CSV)
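For example, a QueryRecord dynamic property could hold Calcite SQL such as the following (field names borrowed from the prices table above; the query itself is just an illustration):

SELECT item_id, price, updated_on
FROM FLOWFILE
WHERE price > 100
ORDER BY price DESC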
When you use PutORC, it gives you the details for building your external table. You could use PutHiveQL to auto-build this table, but most companies want this done by a DBA.
CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc`
(`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'
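Once that table exists, the ORC files NiFi writes can be queried directly from Hive or Impala, for example:

SELECT item_id, price, updated_on
FROM pricesorc
ORDER BY updated_on DESC
LIMIT 10;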
Part 2
REST to Database
Let's reverse this now. Sometimes you want to take data, say from a REST service, and store it to a JDBC datastore.
InvokeHTTP (read from a REST endpoint)
PutDatabaseRecord (put JSON to our JDBC store).
That's all it takes to store data to a database. We could add some of the ETL/ELT enrichments mentioned above, or others that manipulate content.
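As a minimal sketch (the table and column names here are purely hypothetical, since the REST payload isn't shown), PutDatabaseRecord matches record field names to column names and builds the SQL itself, so the only statement you write is the target table:

CREATE TABLE rest_events
(
  event_id   BIGINT PRIMARY KEY,
  event_name VARCHAR(255),
  event_time TIMESTAMP
);

-- with Statement Type set to INSERT, PutDatabaseRecord effectively issues:
-- INSERT INTO rest_events (event_id, event_name, event_time) VALUES (?, ?, ?)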
REST Output
Database Connection Pool
Get the REST Data
PutDatabaseRecord
From ApacheCon 2020, John Kuchmek has a great talk on Incrementally Streaming RDBMS Data.
Ingesting Websocket Data for Live Stock Streams with Cloudera Flow Management Powered by Apache NiFi
The stocks I follow have a lot of trades and changes throughout the day, and I would like to capture all of this data and make it available to my colleagues. I will push it to Kafka and make it available via a topic, and I may also push it to Slack, Discord, a webpage, or a Cloudera Visual App dashboard. We'll see what people request.
We will read websockets from wss://ws.finnhub.io?token=YOURTOKEN. You will need to sign up for a finnhub.io account to get this data. The API is well documented and very easy to use with Apache NiFi.
As updates happen, we receive websocket messages and send them to Kafka for use in Flink SQL, Kafka Connect, Spark Streaming, Kafka Streams, Python, Java Spring Boot apps, .NET apps, and NiFi.
Definition of Fields
s - Symbol.
p - Last price.
t - UNIX timestamp in milliseconds.
v - Volume.
c - List of trade conditions. A comprehensive list of trade condition codes can be found here.
Incoming Websocket Text Message Processing
We parse out the fields we want, then rename them to something readable. We then build a new JSON record that matches our trades schema and push it to Kafka.
As a first step, we need to set up a controller service to connect to Finnhub's websocket API.
We can see data in flight via NiFi Provenance.
The detailed steps and settings for converting raw websocket text messages into the final messages sent to Kafka.
Raw Data From Websockets Text Message
Formatted JSON Data Before Converting and Sending to Kafka Topic (trades)
We can view the final clean data in Kafka via Cloudera Streams Messaging Manager (SMM).
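To close the loop on the FLaNK side, a downstream Flink SQL job can read the trades topic. Here is a minimal sketch, assuming the renamed fields are symbol, price, trade_time, volume, and conditions (the actual schema field names aren't shown above) and that the Kafka connector is available:

CREATE TABLE trades
(
  symbol     STRING,
  price      DOUBLE,
  trade_time BIGINT,
  volume     DOUBLE,
  conditions ARRAY<STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'trades',
  'properties.bootstrap.servers' = 'kafkabroker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'latest-offset'
);

-- continuously watch for large trades as they stream in
SELECT symbol, price, trade_time
FROM trades
WHERE price > 500;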