Oracle Golden Gate to Apache Kafka to Apache NiFi to JDBC Data Sink

Oracle -> GoldenGate -> Apache Kafka -> Apache NiFi / Hortonworks Schema Registry -> JDBC Database
Sometimes you need to process any number of table changes sent from tools via Apache Kafka. As long as they have proper header data and records in JSON, it's really easy in Apache NiFi.
Requirements:
  1. Process Each Partition Separately
  2. Process Records in Order as each message is an Insert, Update or Delete to an existing table in our receiving JDBC store.
  3. Re-process if data lost
For The Main Processor for Routing, It must only run on the Primary Node.
Enforcing Order
We use the Kafka.Offset to order the records, which makes sense in Apache Kafka topics.
After Insert, Update, Delete queries are built, let's confirm and enforce that strict ordering.
To further confirm processing in order, we make each connection in the flow FirstInFirstOutPrioritizer.

To Route, We Route Each Partition to A Different Processor Group (One Local, The Other Remote)
Let's Store Some Data in HDFS for each Table
Connect To Kafka and Grab From our Topic
Let's Connect to our JDBC Store
Let's do an Update (Table Name is Dynamic)
The Jolt Processor has an awesome tester for trying out Jolt

Make sure we connect our remote partitions

Routing From Routing Server (Primary Node)
For Processing Partition 0 (Run on the Routing Server)
We infer the schema with our InferAvroSchema, so we don't need to know the embedded table layouts before a record arrives. In production it makes sense to know all these in advance and do integration tests and versioning of schemas. This is where Hortonworks Scheme Registry is awesome. We name the avro record after the table dynamically. We can get and store permanent schema in the Hortonworks Schema Registry.
Process The Next Partition 1 .. (We can have one server or cluster per partition)
Process the Partition 1 Kafka Records from the Topic

This Flow Will Convert Our Embedded JSON Table Record into New SQL
Input: {"ID":2001,"GD":"F","DPTID":2,"FIRSTNAME":"Tim","LAST":"Spann"}
Output: INSERT INTO THETABLE (ID, GD, DPTID, FIRSTNAME, LAST) VALUES (?, ?, ?, ?, ?)
sql.args.5.value Spann
sql.table THETABLE
With all the field being parameters for a SQL Injection safe parameter based insert, update or delete based on control sent.
Golden Gate Messages
{"table": "SCHEMA1.TABLE7","op_type": "I","op_ts": "2017-11-01 04:31:56.000000","current_ts": "2017-11-01T04:32:04.754000","pos": "00000000310000020884","after": {"ID":1,"CODE": "B","NAME":"STUFF","DESCR" :"Department","ACTIVE":1}}
Using a simple EvaluateJsonPath we pull out these control fields, example: $.before.
The Table Name for ConvertJSONtoSQL${table:substringAfter('.')}. This is to remove all leading schema / tablespace name. From the drop down for each of the three we pick either UPDATEINSERT or DELETE based on the op_type.
We follow this with a PutSQL which will execute on our destination JDBC database sink.
After that I collect all the attributes convert them to a JSON flowfile and save that to HDFS for logging and reporting. This step could be skipped or could be in another format or sent elsewhere.
Control Fields
pos: position
table: table to update in the data warehouse
current_ts: time stamp
op_ts: time stamp
op_type: operation type (I – insert, U- update, D – delete)
Important Apache NiFi System Fields
kafka.offset
kafka.partition
kafka.topic
We can Route and process these for special handling.
To Create HDFS Directories for Changes
  1. su hdfs <br>hdfs dfs -mkdir -p /new/T1 <br>hdfs dfs -mkdir -p /new/T2 <br>hdfs dfs -mkdir -p /poc/T3
  2. hdfs dfs -chmod -R 777 /new <br>hdfs dfs -ls -R /new
To Create a Test Apache Kafka Topic
  1. ./bin/kafka-topics.sh --create \
  2. --zookeeper localhost:2181 \
  3. --replication-factor 1 \
  4. --partitions 2 \
  5. --topic goldengate

Creating a MYSQL Database As Recipient JDBC Server
  1. wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz
  2. mysql
  3. create database mydw;
  4. CREATE USER 'nifi'@'%' IDENTIFIED BY 'MyPassWordIsSoAwesome!!!!';
  5. GRANT ALL PRIVILEGES ON *.* TO 'nifi'@'%' WITH GRANT OPTION;
  6. commit;
  7. SHOW GRANTS FOR 'nifi'@'%';
  8.  
  9.  
  10. #Create some tables in the database for your records.
  11.  
  12.  
  13. create table ALOG (
  14. AID VARCHAR(1),
  15. TIMESEC INT,
  16. SOMEVAL VARCHAR(255),
  17. PRIMARY KEY (AID, TIMESEC)
  18. );
Jolt Filter
Attribute: afterJolt
${op_type:equalsIgnoreCase("D"):ifElse("none", "after")}
Attribute: beforeJolt
${op_type:equalsIgnoreCase("D"):ifElse("before", "none")}
Jolt Script to Transform JSON
  1. [ {
  2. "operation": "shift",
  3. "spec": {
  4. "${beforeJolt}": {
  5. "*": "&"
  6. },
  7. "${afterJolt}": {
  8. "*": "&"
  9. }
  10. }
  11. }, {
  12. "operation": "shift",
  13. "spec": {
  14. "*": "&"
  15. }
  16. } ]

Primary Node Flow Template

Partition X Node Flow Template

References:

Integration Testing for Apache NiFi Development

There are many ways to generate decent data to work with.

One good testing option is using REST APIs.

https://mockaroo.com/

https://randomuser.me/api

Another option is using a generator to generate CSV or JSON files.

https://www.generatedata.com/

There are also external data generators.

https://www.tomaszezula.com/2016/12/04/proxy-log-generator-to-load-test-nifi/

https://github.com/FINRAOS/DataGenerator#quick-start

There are also NiFi processors for generating data.

https://github.com/hashmapinc/nifi-simulator-bundle

The most common way to test your flows is with the GenerateFlowFile processor which lets you send valid flow files into your flow at a schedule or in rapid fire secession.

https://www.xenonstack.com/blog/test-driven-development-big-data/

For my example, I am using NiFi Expression Language to generate some data.




Example Expression Language in GenerateFlowFile

{"id": "${now():format("yyyyMMddHHmmss")}_${UUID()}_${thread()}",
"te": "0.${random():mod(100000):plus(1)}",
"diskusage": "${math("random")}.3 MB",
"memory": ${random():mod(95):plus(10)},
"cpu": ${nextInt()}.${random():mod(99):plus(1)},
"host": "${ip()}/${hostname(true)}",
"temperature": "${random():mod(60):plus(60)}",
"macaddress": "${UUID()}",
"end": "${random():mod(100000000000000):plus(1)}",
"systemtime": "${now():format("MM/dd/yyyy HH:mm:ss", "EST")}"}

Example JSON Produced

{"id": "20190425131936_f061de76-edaf-4d9e-a144-2aeff2b1576a_Timer-Driven Process Thread-3",
"te": "0.28235",
"diskusage": "0.05997607531046045.3 MB",
"memory": 58,
"cpu": 0.52,
"host": "192.168.1.193/192.168.1.193",
"temperature": "136",
"macaddress": "db00aef2-b242-4483-a552-223d74133aa5",
"end": "18296140941736",
"systemtime": "04/25/2019 12:19:36"}


References

https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-unit-tests-partI/

https://community.hortonworks.com/questions/151190/generate-sequence-number-in-apache-nifi.html

https://datamelt.weebly.com/blog/nifi-processor-generatedata

https://github.com/uwegeercken/nifi_processors

https://medium.com/hashmapinc/its-here-an-apache-nifi-simulator-for-generating-random-and-realistic-time-series-data-d7e463aa5e78

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.GenerateFlowFile/

https://github.com/tspannhw?tab=overview&from=2019-03-01&to=2019-03-31



Simple Apache NiFi Operations Dashboard

Simple Apache NiFi Operations Dashboard
This is an evolving work in progress, please get involved everything is open source. @milind pandit and I are working on a project to build something useful for teams to analyze their flows, current cluster state, start and stop flows and have a rich one look dashboard.
There's a lot of data provided by Apache NiFi and related tools to aggregate, sort, categorize, search and eventually do machine learning analytics on.
There are a lot of tools that come out of the box that solve parts of these problems. Ambari Metrics, Grafana and Log Search provide a ton of data and analysis abilities. You can find all your errors easily in Log Search and see nice graphs of what is going on in Ambari Metrics and Grafana.
What is cool with Apache NiFi is that is has SitetoSite tasks for sending all the provenance, analytics, metrics and operational data you need to wherever you want it. That includes to Apache NiFi! This is Monitoring Driven Development (MDD).

Monitoring Driven Development (MDD)
In this little proof of concept work, we grab some of these flows process them in Apache NiFi and then store them in Apache Hive 3 tables for analytics. We should probably push the data to HBase for aggregates and Druid for time series. We will see as this expands.
There are also other data access options including the NiFi REST API and the NiFi Python APIs.
Boostrap Notifier
Reporting Tasks
  • AmbariReportingTask (global, per process group)
  • MonitorDiskUsage(Flowfile, content, provenance repositories)
  • MonitorMemory
Monitor Disk Usage
MonitorActivity
See:
These are especially useful for doing things like purging connections.
Purge it!
  • nipyapi.canvas.purge_connection(con_id)
  • nipyapi.canvas.purge_process_group(process_group, stop=False)
  • nipyapi.canvas.delete_process_group(process_group, force=True, refresh=True)


Use Cases
Example Metrics Data
  1. [ {
  2. "appid" : "nifi",
  3. "instanceid" : "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  4. "hostname" : "princeton1.field.hortonworks.com",
  5. "timestamp" : 1539411679652,
  6. "loadAverage1min" : 0.93,
  7. "availableCores" : 16,
  8. "FlowFilesReceivedLast5Minutes" : 14,
  9. "BytesReceivedLast5Minutes" : 343779,
  10. "FlowFilesSentLast5Minutes" : 0,
  11. "BytesSentLast5Minutes" : 0,
  12. "FlowFilesQueued" : 59952,
  13. "BytesQueued" : 294693938,
  14. "BytesReadLast5Minutes" : 241681,
  15. "BytesWrittenLast5Minutes" : 398753,
  16. "ActiveThreads" : 2,
  17. "TotalTaskDurationSeconds" : 273,
  18. "TotalTaskDurationNanoSeconds" : 273242860763,
  19. "jvmuptime" : 224997,
  20. "jvmheap_used" : 5.15272616E8,
  21. "jvmheap_usage" : 0.9597700387239456,
  22. "jvmnon_heap_usage" : -5.1572632E8,
  23. "jvmthread_statesrunnable" : 11,
  24. "jvmthread_statesblocked" : 2,
  25. "jvmthread_statestimed_waiting" : 26,
  26. "jvmthread_statesterminated" : 0,
  27. "jvmthread_count" : 242,
  28. "jvmdaemon_thread_count" : 125,
  29. "jvmfile_descriptor_usage" : 0.0709,
  30. "jvmgcruns" : null,
  31. "jvmgctime" : null
  32. } ]
Example Status Data
  1. {
  2. "statusId" : "a63818fe-dbd2-44b8-af53-eaa27fd9ef05",
  3. "timestampMillis" : "2018-10-18T20:54:38.218Z",
  4. "timestamp" : "2018-10-18T20:54:38.218Z",
  5. "actorHostname" : "princeton1.field.hortonworks.com",
  6. "componentType" : "RootProcessGroup",
  7. "componentName" : "NiFi Flow",
  8. "parentId" : null,
  9. "platform" : "nifi",
  10. "application" : "NiFi Flow",
  11. "componentId" : "7c84501d-d10c-407c-b9f3-1d80e38fe36a",
  12. "activeThreadCount" : 1,
  13. "flowFilesReceived" : 1,
  14. "flowFilesSent" : 0,
  15. "bytesReceived" : 1661,
  16. "bytesSent" : 0,
  17. "queuedCount" : 18,
  18. "bytesRead" : 0,
  19. "bytesWritten" : 1661,
  20. "bytesTransferred" : 16610,
  21. "flowFilesTransferred" : 10,
  22. "inputContentSize" : 0,
  23. "outputContentSize" : 0,
  24. "queuedContentSize" : 623564,
  25. "activeRemotePortCount" : null,
  26. "inactiveRemotePortCount" : null,
  27. "receivedContentSize" : null,
  28. "receivedCount" : null,
  29. "sentContentSize" : null,
  30. "sentCount" : null,
  31. "averageLineageDuration" : null,
  32. "inputBytes" : null,
  33. "inputCount" : 0,
  34. "outputBytes" : null,
  35. "outputCount" : 0,
  36. "sourceId" : null,
  37. "sourceName" : null,
  38. "destinationId" : null,
  39. "destinationName" : null,
  40. "maxQueuedBytes" : null,
  41. "maxQueuedCount" : null,
  42. "queuedBytes" : null,
  43. "backPressureBytesThreshold" : null,
  44. "backPressureObjectThreshold" : null,
  45. "isBackPressureEnabled" : null,
  46. "processorType" : null,
  47. "averageLineageDurationMS" : null,
  48. "flowFilesRemoved" : null,
  49. "invocations" : null,
  50. "processingNanos" : null
  51. }

Example Failure Data
  1. [ {
  2. "objectId" : "34c3249c-4a42-41ce-b94e-3563409ad55b",
  3. "platform" : "nifi",
  4. "project" : null,
  5. "bulletinId" : 28321,
  6. "bulletinCategory" : "Log Message",
  7. "bulletinGroupId" : "0b69ea51-7afb-32dd-a7f4-d82b936b37f9",
  8. "bulletinGroupName" : "Monitoring",
  9. "bulletinLevel" : "ERROR",
  10. "bulletinMessage" : "QueryRecord[id=d0258284-69ae-34f6-97df-fa5c82402ef3] Unable to query StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40: org.apache.nifi.processor.exception.ProcessException: Failed to read next record in stream for StandardFlowFileRecord[uuid=cd305393-f55a-40f7-8839-876d35a2ace1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1539633295746-10, container=default, section=10], offset=95914, length=322846],offset=0,name=783936865185030,size=322846] due to -40",
  11. "bulletinNodeAddress" : null,
  12. "bulletinNodeId" : "91ab706b-5d92-454e-bc7a-6911d155fdca",
  13. "bulletinSourceId" : "d0258284-69ae-34f6-97df-fa5c82402ef3",
  14. "bulletinSourceName" : "QueryRecord",
  15. "bulletinSourceType" : "PROCESSOR",
  16. "bulletinTimestamp" : "2018-10-18T20:54:39.179Z"
  17. } ]

Apache Hive 3 Tables
  1. CREATE EXTERNAL TABLE IF NOT EXISTS failure (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
  2. LOCATION '/failure';
  3.  
  4. CREATE EXTERNAL TABLE IF NOT EXISTS bulletin (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
  5. LOCATION '/error';
  6.  
  7.  
  8. CREATE EXTERNAL TABLE IF NOT EXISTS memory (objectId STRING, platform STRING, project STRING, bulletinId BIGINT, bulletinCategory STRING, bulletinGroupId STRING, bulletinGroupName STRING, bulletinLevel STRING, bulletinMessage STRING, bulletinNodeAddress STRING, bulletinNodeId STRING, bulletinSourceId STRING, bulletinSourceName STRING, bulletinSourceType STRING, bulletinTimestamp STRING) STORED AS ORC
  9. LOCATION '/memory'
  10. ;
  11.  
  12.  
  13. // backpressure
  14. CREATE EXTERNAL TABLE IF NOT EXISTS status (statusId STRING, timestampMillis BIGINT, `timestamp` STRING, actorHostname STRING, componentType STRING, componentName STRING, parentId STRING, platform STRING, `application` STRING, componentId STRING, activeThreadCount BIGINT, flowFilesReceived BIGINT, flowFilesSent BIGINT, bytesReceived BIGINT, bytesSent BIGINT, queuedCount BIGINT, bytesRead BIGINT, bytesWritten BIGINT, bytesTransferred BIGINT, flowFilesTransferred BIGINT, inputContentSize BIGINT, outputContentSize BIGINT, queuedContentSize BIGINT, activeRemotePortCount BIGINT, inactiveRemotePortCount BIGINT, receivedContentSize BIGINT, receivedCount BIGINT, sentContentSize BIGINT, sentCount BIGINT, averageLineageDuration BIGINT, inputBytes BIGINT, inputCount BIGINT, outputBytes BIGINT, outputCount BIGINT, sourceId STRING, sourceName STRING, destinationId STRING, destinationName STRING, maxQueuedBytes BIGINT, maxQueuedCount BIGINT, queuedBytes BIGINT, backPressureBytesThreshold BIGINT, backPressureObjectThreshold BIGINT, isBackPressureEnabled STRING, processorType STRING, averageLineageDurationMS BIGINT, flowFilesRemoved BIGINT, invocations BIGINT, processingNanos BIGINT) STORED AS ORC
  15. LOCATION '/status';
  16.  
  17.  
  18.  
  19.  
  20.  
  21.  
  22.  
  23.