Skip to main content

Simple Change Data Capture (CDC) with SQL Selects via Apache NiFi (FLaNK)

 Simple Change Data Capture (CDC) with SQL Selects via Apache NiFi (FLaNK)

Sometimes you need real CDC and you have access to transaction change logs and you use a tool like QLIK REPLICATE or GoldenGate to pump out records to Kafka and then Flink SQL or NiFi can read them and process them.

Other times you need something easier for just some basic changes and inserts to some tables you are interested in receiving new data as events.   Apache NiFi can do this easily for you with QueryDatabaseTableRecord, you don't need to know anything but the database connection information, table name and what field may change.  NiFi will query, watch state and give you new records.   Nothing is hardcoded, parameterize those values and you have a generic Any RDBMS to Any Other Store data pipeline.   We are reading as records which means each FlowFile in NiFi can have thousands of records that we know all the fields, types and schema related information for.   This can be ones that NiFi infers the schema or ones we use from a Schema Registry like Cloudera's amazing Open Source Schema Registry.

Let's see what data is in our Postgresql table?

How to 

  1. QueryDatabaseTableRecord (we will output Json records, but could have done Parquet, XML, CSV or AVRO)
  2. UpdateAttribute - optional - set a table and schema name, can do with parameters as well.
  3. MergeRecord - optional - let's batch these up.
  4. PutORC - let's send these records to HDFS (which could be on bare metal disks, GCS, S3, Azure or ADLS).   This will build us an external hive table.


As you can see we are looking at the "prices" table and checking maximum values to increment on the updated_on date and the item_id sequential key.  We then output JSON records.

We could then:

Add-Ons Examples

  1. PutKudu
  2. PutHDFS (send as JSON, CSV, Parquet) and build an Impala or Hive table on top as external
  3. PutHive3Streaming (Hive 3 ACID Tables)
  4. PutS3
  5. PutAzureDataLakeStorage
  6. PutHBaseRecord
  7. PublishKafkaRecord_2_* - send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc...
  8. PutBigQueryStreaming (Google)
  9. PutCassandraRecord
  10. PutDatabaseRecord - let's send to another JDBC Datastore
  11. PutDruidRecord - Druid is a cool datastore, check it out on CDP Public Cloud
  12. PutElasticSearchRecord
  13. PutMongoRecord
  14. PutSolrRecord
  15. PutRecord (to many RecordSinkServices like Databases, Kafka, Prometheus, Scripted and Site-to-Site)
  16. PutParquet (store to HDFS as Parquet files)
You can do any number or all of these or multiple copies of each to other clouds or clusters.    You can also enrichment, transformation, alerts, queries or routing.

These records can be also manipulated ETL/ELT style with Record processing in stream with options such as:

  1. QueryRecord (use Calcite ANSI SQL to query and transform records and can also change output type)
  2. JoltTransformRecord (use JOLT against any record not just JSON)
  3. LookupRecord (to match against Lookup services like caches, Kudu, REST services, ML models, HBase and more)
  4. PartitionRecord (to break up into like groups)
  5. SplitRecord (to break up record groups into records)
  6. UpdateRecord (update values in fields, often paired with LookupRecord)
  7. ValidateRecord (check against a schema and check for extra fields)
  8. GeoEnrichIPRecord
  9.  ConvertRecord (change between types like JSON to CSV)  

When you use PutORC, it will give you the details on building your external table.   You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA.

CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc` (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)

Part 2

REST to Database

Let's reverse this now.   Sometimes you want to take data, say from a REST service and store it to a JDBC datastore.

  1. InvokeHTTP (read from a REST endpoint)
  2. PutDatabaseRecord (put JSON to our JDBC store).
That's it to store data to a database.  We could add some of the ETL/ELT enrichments mentioned above 
or others that manipulate content.

REST Output

Database Connection Pool

Get the REST Data


From ApacheCon 2020, John Kuchmek does a great talk on Incrementally Streaming RDBMS Data.


That's it.  Happy Holidays!

Popular posts from this blog

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice In Part 1, we will setup our drone, our communication environment, capture the data and do initial analysis. We will eventually grab live video stream for object detection, real-time flight control and real-time data ingest of photos, videos and sensor readings. We will have Apache NiFi react to live situations facing the drone and have it issue flight commands via UDP. In this initial section, we will control the drone with Python which can be triggered by NiFi. Apache NiFi will ingest log data that is stored as CSV files on a NiFi node connected to the drone's WiFi. This will eventually move to a dedicated embedded device running MiniFi. This is a small personal drone with less than 13 minutes of flight time per battery. This is not a commercial drone, but gives you an idea of the what you can do with drones. Drone Live Communications for Sensor Readings and Drone Control You must connect t

Using Apache NiFi in OpenShift and Anywhere Else to Act as Your Global Integration Gateway

Using Apache NiFi in OpenShift and Anywhere Else to Act as Your Global Integration Gateway What does it look like? Where Can I Run This Magic Engine: Private Cloud, Public Cloud, Hybrid Cloud, VM, Bare Metal, Single Node, Laptop, Raspberry Pi or anywhere you have a 1GB of RAM and some CPU is a good place to run a powerful graphical integration and dataflow engine.   You can also run MiNiFi C++ or Java agents if you want it even smaller. Sounds Too Powerful and Expensive: Apache NiFi is Open Source and can be run freely anywhere. For What Use Cases: Microservices, Images, Deep Learning and Machine Learning Models, Structured Data, Unstructured Data, NLP, Sentiment Analysis, Semistructured Data, Hive, Hadoop, MongoDB, ElasticSearch, SOLR, ETL/ELT, MySQL CDC, MySQL Insert/Update/Delete/Query, Hosting Unlimited REST Services, Interactive with Websockets, Ingesting Any REST API, Natively Converting JSON/XML/CSV/TSV/Logs/Avro/Parquet, Excel, PDF, Word Documents, Syslog, Kafka, JMS, MQTT, TCP

DevOps: Working with Parameter Contexts in Apache NiFi 1.11.4+

 DevOps:  Working with Parameter Contexts in Apache NiFi 1.11.4+ nifi list-param-contexts -u http://localhost:8080 -ot simple #   Id                                     Name             Description     -   ------------------------------------   --------------   -----------     1   3a801ff4-1f73-1836-b59c-b9fbc79ab030   backupregistry                   2   7184b9f4-0171-1000-4627-967e118f3037   health                           3   3a801faf-1f87-1836-54ba-3d913fa223ad   retail                           4   3a801fde-1f73-1836-957b-a9f4d2c9b73d   sensors                         #> nifi export-param-context -u http://localhost:8080 -verbose --paramContextId 3a801faf-1f87-1836-54ba-3d913fa223ad {   "name" : "retail",   "description" : "",   "parameters" : [ {     "parameter" : {       "name" : "allquery",       "description" : "",       "sensitive" : false,       "value"