FLaNK: Real-Time Transit Information For NY/NJ/CT (TRANSCOM)


SOURCE: XML/RSS REST ENDPOINT 
FREQUENCY:  Every Minute
DESTINATIONS:   HDFS, Kudu/Impala, Cloud, Kafka




The main source of real-time transit updates for New Jersey, New York and Connecticut is TRANSCOM. I will read from this data source every minute to learn about real-time traffic events occurring on the roads and transportation systems near me. The feed is in XML/RSS format, and I will parse out the hundreds of events that come with each minute's update.

I want to store the raw XML/RSS file in S3/ADLS2/HDFS or GCS, which is an easy step. I will also parse and enhance this data for easier querying and tracking.

I will add a unique ID and a timestamp to every event as the data streams by. I will store my data in Impala/Kudu for fast queries and upserts. I can then build some graphs, charts and tables with Apache Hue and Cloudera Visual Applications. I will also publish my data to Kafka as Avro with an attached schema so that I can use it from Spark, Kafka Connect, Kafka Streams and Flink SQL applications.



  1. GenerateFlowFile - optional scheduler
  2. InvokeHTTP - call RSS endpoint
  3. PutHDFS - store raw data to Object or File store on premise or in the cloud via HDFS / S3 / ADLSv2 / GCP / Ozone / ...
  4. QueryRecord - convert XML to JSON
  5. SplitJson - break out individual events
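
Outside of NiFi, the convert-and-split steps above can be approximated in a few lines of Python. This is only a minimal sketch: the sample feed and its element names are made up for illustration, and the real TRANSCOM feed may be structured differently.

```python
import json
import xml.etree.ElementTree as ET

# Hypothetical sample; the real TRANSCOM feed's element names may differ.
SAMPLE_RSS = """<?xml version="1.0"?>
<rss version="2.0">
  <channel>
    <title>Sample traffic feed</title>
    <item>
      <title>Accident on Route 1</title>
      <description>Right lane blocked</description>
      <pubDate>Mon, 14 Dec 2020 10:00:00 GMT</pubDate>
    </item>
    <item>
      <title>Construction on I-95</title>
      <description>Expect delays</description>
      <pubDate>Mon, 14 Dec 2020 10:01:00 GMT</pubDate>
    </item>
  </channel>
</rss>"""


def rss_to_events(rss_text):
    """Return one dict per <item>, keyed by child element name."""
    root = ET.fromstring(rss_text)
    events = []
    for item in root.iter("item"):
        events.append({child.tag: (child.text or "") for child in item})
    return events


events = rss_to_events(SAMPLE_RSS)
for event in events:
    # One JSON document per event, similar in spirit to splitting the feed.
    print(json.dumps(event))
```

Each printed line corresponds to what one split-out event might look like once it is JSON.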


  1. UpdateAttribute - set schema name
  2. UpdateRecord - generate and add a unique ID and timestamp
  3. UpdateRecord - clean up the point field
  4. UpdateRecord - remove garbage whitespace
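
As a rough illustration of what these enrichment steps do to a single event, here is a small Python sketch. The field names `uuid` and `ts` are assumptions for the example, and the exact cleanup applied to the point field in the flow is not shown in this post, so the sketch only normalizes whitespace.

```python
import time
import uuid


def enrich_event(event):
    """Return a copy of the event with an ID, a timestamp and tidy strings."""
    enriched = {}
    for key, value in event.items():
        if isinstance(value, str):
            # Collapse runs of whitespace (including newlines) to one space.
            value = " ".join(value.split())
        enriched[key] = value
    # Field names "uuid" and "ts" are assumptions for this sketch.
    enriched["uuid"] = str(uuid.uuid4())
    enriched["ts"] = int(time.time() * 1000)  # epoch milliseconds
    return enriched


raw = {"title": "  Accident on\n Route 1  ", "point": " 40.7,-74.0 "}
clean = enrich_event(raw)
print(clean)
```

The original event is left untouched, which makes the step easy to retry.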



  1. PutKudu - upsert new data to our Impala / Kudu table.
  2. RetryFlowFile - retry if network or other connectivity issue.
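
The upsert-then-retry pattern can be sketched in plain Python. A dictionary stands in for the Kudu table here and the failure is simulated; none of this is the Kudu client API, just an illustration of the idea.

```python
import time


def upsert(table, row, key="uuid"):
    """Insert the row, or overwrite the existing row with the same key."""
    table[row[key]] = row


def with_retry(action, attempts=3, delay_seconds=0.0):
    """Call action(); on ConnectionError wait and retry up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except ConnectionError:
            if attempt == attempts:
                raise
            time.sleep(delay_seconds)


table = {}  # stand-in for the Impala / Kudu table
calls = {"count": 0}


def flaky_upsert():
    # Simulate one transient network failure before succeeding.
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("simulated outage")
    upsert(table, {"uuid": "a1", "title": "Accident on Route 1"})


with_retry(flaky_upsert)
upsert(table, {"uuid": "a1", "title": "Accident cleared"})  # same key: update
print(table)
```

Because the write is an upsert, retrying after a partial failure cannot create duplicate rows for the same key.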

Send Messages to Kafka




Our flow has delivered many messages to our transcomevents topic as Apache Avro messages with the schema attached.






SMM links into the Schema Registry and schema for this topic.


We use a schema for validation and as a contract between consumers and producers of these traffic events.
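
To make the "contract" idea concrete, here is a minimal Python sketch of checking an event against a schema before it is accepted. The schema below is hypothetical and deliberately simplified to field names and Python types; a real deployment would rely on the Avro schema held in the Schema Registry.

```python
# Hypothetical schema: field name -> required Python type.
EVENT_SCHEMA = {"uuid": str, "ts": int, "title": str, "description": str}


def validate(event, schema=EVENT_SCHEMA):
    """Return a list of problems; an empty list means the event conforms."""
    problems = []
    for field, expected in schema.items():
        if field not in event:
            problems.append(f"missing field: {field}")
        elif not isinstance(event[field], expected):
            problems.append(f"wrong type for {field}")
    for field in event:
        if field not in schema:
            problems.append(f"unexpected field: {field}")
    return problems


good = {"uuid": "a1", "ts": 1607940000000, "title": "Accident", "description": "Lane blocked"}
bad = {"uuid": "a2", "ts": "not-a-number", "title": "Delay", "extra": 1}
print(validate(good))
print(validate(bad))
```

A producer that runs this kind of check before publishing gives consumers a guarantee about what each message contains.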


Since events are streaming into our Kafka topic and have a schema, we can query them with continuous SQL using Flink SQL. We can then run continuous ETL on them.



We could also consume this data with Spark Structured Streaming applications, Spring Boot apps, Kafka Streams, Stateless NiFi and Kafka Connect applications.

We also stored our data in Impala / Kudu for permanent storage, ad-hoc queries, analytics, Cloudera Visualizations, reports, applications and more.










It is very easy to have fast data against our agile Cloud Data Lakehouse.



FLaNK: Using Apache Kudu As a Cache For FDA Updates



  1. InvokeHTTP:   We invoke the RSS feed to get our data.
  2. QueryRecord:   Convert RSS to JSON
  3. SplitJson:  Split one file into individual records.   (Should refactor to ForkRecord)
  4. EvaluateJsonPath:  Extract attributes you need.
  5. ProcessGroup for SPL Processing




We call the RSS feed frequently, parse the records out of the RSS (XML) feed and check each one against our cache. We use Cloudera's Real-Time Datahub with Apache Impala/Apache Kudu as our cache to see if we have already received that record. If it hasn't arrived yet, it is added to the Kudu cache and processed.

SPL Processing

We use the extracted SPL to grab a detailed record for that SPL using InvokeHTTP.



We have created an Impala/Kudu table to cache this information with Apache Hue.




We use LookupRecord to read from our cache.



If we don't have that value yet, we send it to the table for an UPSERT.
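
The lookup-then-upsert logic is simple enough to sketch in Python. A dictionary stands in for the Kudu cache table, and the key name `spl_id` is a made-up field for illustration.

```python
def process_new_records(records, cache, key="spl_id"):
    """Return records not yet in the cache and add them to the cache."""
    fresh = []
    for record in records:
        record_key = record[key]
        if record_key in cache:
            continue  # already processed on an earlier poll
        cache[record_key] = record  # stand-in for the Kudu upsert
        fresh.append(record)
    return fresh


cache = {}
poll_1 = [{"spl_id": "A", "title": "first"}, {"spl_id": "B", "title": "second"}]
poll_2 = [{"spl_id": "B", "title": "second"}, {"spl_id": "C", "title": "third"}]

new_1 = process_new_records(poll_1, cache)
new_2 = process_new_records(poll_2, cache)
print([r["spl_id"] for r in new_1], [r["spl_id"] for r in new_2])
```

Only the records returned by each call would continue down the flow for detailed processing.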


We send our raw data as XML/RSS to HDFS for archival use and audits.


We can view the results of our flow in Apache Atlas for full governance of our solution.

So with a simple NiFi flow, we can ingest all the new updates to DailyMed without reprocessing anything that we already have.




My Year in Review (2020)


 


Last year, I thought a few things would be coming in 2020.   

What's Coming in 2020

  • Cloud Enterprise Data Platforms
  • Hybrid Cloud
  • Streaming with Flink, Kafka, NiFi
  • AI at the Edge with Microcontrollers and Small Devices
  • Voice Data In Queries
  • Event Handler as a Service (Automatic Kafka Message Reading)
  • More Powerful Parameter Based Modular Streaming 
  • Cloud First For Big Data
  • Log Handling Moves to MiNiFi
  • Full AI At The Edge with Deployable Models
  • More Powerful Edge TPU/GPU/VPU
  • Kafka is everywhere
  • Open Source UI Driven Event Engines
  • FLaNK Stack gains popularity
  • FLINK Everywhere

Some of this was deferred due to the global pandemic. I had a big miss with Voice Data, and some advances in streaming were delayed.

Here's a little bit of news for 2020: SRM was added to Kafka DataHub in the Public Cloud.

I also just did a best of 2020 video you can check out:




Talks Around the World (Virtual)


June 11, 2020 - INRHYTHM Lightning Talk




August 13, 2020 - Real-Time Analytics



August 28, 2020 - Apache Beam Summit





September 10, 2020 - Future of Data Princeton Meetup - Flink Flank Flink Fest



September 29 - October 1, 2020 - ApacheCon








September 29, 2020 - Apache Streaming Meetup



October 9, 2020 - DevOps Stage




October 22, 2020 - Flink Forward



October 26, 2020 - Open Source Summit Europe


October 27, 2020 - AI Dev World




October 30, 2020 - Nethope Conference with Cloudera Foundation

November 10, 2020 - Variis Talk


November 25, 2020 - Big Data Conference


December 14, 2020 - Apache MXNet Day

https://www.eventbrite.com/e/apache-mxnet-day-dec-14th-900-to-500-pm-pst-tickets-127767842055#



This Year's Devices

  • NVIDIA Jetson Xavier NX
  • NVIDIA Jetson Nano 2GB
  • Breakout Garden for Raspberry Pi (I2C + SPI)
  • SGP30 Air Quality Sensor Breakout
  • MLX90640 Thermal Camera Breakout – Wide angle (110°)
  • Raspberry Pi 4 - 8G RAM!

What's Coming in 2021

  • AI Driven Streaming
  • Cloudera Data Platform on GCP
  • OpenShift Powered by CDP
  • More Hybrid Cloud
  • SQL Stream Builder for Flink SQL
  • More powerful Edge AI
  • More TPUs, GPUs and specialized chips
  • More Streaming
  • More COVID datasets
  • More Robots
  • More Automation
  • More Serverless
  • Voice Data In Queries
  • K8 Everywhere... 
  • K8 Backlash






Simple Change Data Capture (CDC) with SQL Selects via Apache NiFi (FLaNK)



Sometimes you need real CDC: you have access to transaction change logs and use a tool like Qlik Replicate or GoldenGate to pump records out to Kafka, where Flink SQL or NiFi can read and process them.

Other times you need something simpler: you just want basic changes and inserts to a few tables delivered as events. Apache NiFi can do this easily with QueryDatabaseTableRecord. You don't need to know anything but the database connection information, the table name and which field may change. NiFi will query, track state and give you new records. Nothing is hardcoded; parameterize those values and you have a generic Any RDBMS to Any Other Store data pipeline. We are reading as records, which means each FlowFile in NiFi can hold thousands of records for which we know all the fields, types and schema-related information. The schema can be one that NiFi infers or one we pull from a Schema Registry like Cloudera's amazing Open Source Schema Registry.

Let's see what data is in our PostgreSQL table.

How to 

  1. QueryDatabaseTableRecord (we will output JSON records, but could have done Parquet, XML, CSV or Avro)
  2. UpdateAttribute - optional - set a table and schema name, can do with parameters as well.
  3. MergeRecord - optional - let's batch these up.
  4. PutORC - let's send these records to HDFS (which could be on bare metal disks, GCS, S3, Azure or ADLS).   This will build us an external Hive table.




PutORC


As you can see we are looking at the "prices" table and checking maximum values to increment on the updated_on date and the item_id sequential key.  We then output JSON records.
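
The idea behind polling on a maximum value can be shown with a small Python sketch against an in-memory SQLite database. This is not how NiFi is implemented; it only illustrates the technique of remembering the highest key seen and asking for rows above it. For simplicity the sketch tracks `item_id` only and does not model how NiFi combines several maximum-value columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prices (item_id INTEGER PRIMARY KEY, price REAL, "
    "created_on INTEGER, updated_on INTEGER)"
)


def fetch_new_rows(conn, state):
    """Return rows with item_id above the stored maximum, then advance it."""
    rows = conn.execute(
        "SELECT item_id, price, created_on, updated_on FROM prices "
        "WHERE item_id > ? ORDER BY item_id",
        (state["max_item_id"],),
    ).fetchall()
    if rows:
        state["max_item_id"] = rows[-1][0]
    return rows


state = {"max_item_id": 0}  # stand-in for the processor's stored state
conn.executemany(
    "INSERT INTO prices VALUES (?, ?, ?, ?)",
    [(1, 9.99, 100, 100), (2, 4.5, 101, 101)],
)
first = fetch_new_rows(conn, state)   # both existing rows
conn.execute("INSERT INTO prices VALUES (3, 7.25, 102, 102)")
second = fetch_new_rows(conn, state)  # only the newly inserted row
third = fetch_new_rows(conn, state)   # nothing new since the last poll
print(first, second, third)
```

Tracking a sequential key this way catches inserts; catching updates to existing rows is why a column like updated_on is also watched.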




We could then:

Add-Ons Examples

  1. PutKudu
  2. PutHDFS (send as JSON, CSV, Parquet) and build an Impala or Hive table on top as external
  3. PutHive3Streaming (Hive 3 ACID Tables)
  4. PutS3
  5. PutAzureDataLakeStorage
  6. PutHBaseRecord
  7. PublishKafkaRecord_2_* - send a copy to Kafka for Flink SQL, Spark Streaming, Spring, etc...
  8. PutBigQueryStreaming (Google)
  9. PutCassandraRecord
  10. PutDatabaseRecord - let's send to another JDBC Datastore
  11. PutDruidRecord - Druid is a cool datastore, check it out on CDP Public Cloud
  12. PutElasticSearchRecord
  13. PutMongoRecord
  14. PutSolrRecord
  15. PutRecord (to many RecordSinkServices like Databases, Kafka, Prometheus, Scripted and Site-to-Site)
  16. PutParquet (store to HDFS as Parquet files)

You can do any number or all of these, or send multiple copies of each to other clouds or clusters. You can also add enrichment, transformation, alerts, queries or routing.

These records can also be manipulated ETL/ELT style with in-stream record processing, using options such as:

  1. QueryRecord (use Calcite ANSI SQL to query and transform records and can also change output type)
  2. JoltTransformRecord (use JOLT against any record not just JSON)
  3. LookupRecord (to match against Lookup services like caches, Kudu, REST services, ML models, HBase and more)
  4. PartitionRecord (to break up into like groups)
  5. SplitRecord (to break up record groups into records)
  6. UpdateRecord (update values in fields, often paired with LookupRecord)
  7. ValidateRecord (check against a schema and check for extra fields)
  8. GeoEnrichIPRecord
  9. ConvertRecord (change between types like JSON to CSV)

When you use PutORC, it will give you the details on building your external table.   You can do a PutHiveQL to auto-build this table, but most companies want this done by a DBA.

CREATE EXTERNAL TABLE IF NOT EXISTS `pricesorc` (`item_id` BIGINT, `price` DOUBLE, `created_on` BIGINT, `updated_on` BIGINT)
STORED AS ORC
LOCATION '/user/tspann/prices'


Part 2

REST to Database

Let's reverse this now.   Sometimes you want to take data, say from a REST service and store it to a JDBC datastore.

  1. InvokeHTTP (read from a REST endpoint)
  2. PutDatabaseRecord (put JSON to our JDBC store).

That's all it takes to store data to a database. We could add some of the ETL/ELT enrichments mentioned above or others that manipulate content.
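
For comparison, the same REST-to-database idea looks roughly like this in Python. The response body, table and column names are hypothetical, the HTTP call is replaced by a hard-coded string so the sketch runs offline, and SQLite stands in for whatever JDBC datastore you are targeting.

```python
import json
import sqlite3

# Hypothetical REST response body; a real flow would fetch this over HTTP.
RESPONSE_BODY = '[{"item_id": 1, "price": 9.99}, {"item_id": 2, "price": 4.5}]'


def store_records(conn, table, records):
    """Insert each record, mapping JSON keys to column names."""
    for record in records:
        columns = sorted(record)
        placeholders = ", ".join("?" for _ in columns)
        # The table and column names are trusted here; values are bound safely.
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        conn.execute(sql, [record[c] for c in columns])
    conn.commit()


conn_out = sqlite3.connect(":memory:")
conn_out.execute("CREATE TABLE rest_prices (item_id INTEGER, price REAL)")
store_records(conn_out, "rest_prices", json.loads(RESPONSE_BODY))
stored = conn_out.execute(
    "SELECT item_id, price FROM rest_prices ORDER BY item_id"
).fetchall()
print(stored)
```

The mapping from JSON keys to column names is the part a record-oriented processor handles for you from the schema.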




REST Output



Database Connection Pool

Get the REST Data

PutDatabaseRecord



At ApacheCon 2020, John Kuchmek gave a great talk on Incrementally Streaming RDBMS Data.






That's it.  Happy Holidays!