
Real-Time Irish Transit Analytics

 



Apache NiFi, PostgreSQL, GenAI, Apache Kafka, Apache Flink, JSON, GTFS

Let’s hop on a bus in Ireland!

We need to load static (rarely changing) lookup data. We can do this very easily with NiFi, building the records and inserting them into new PostgreSQL tables.


ChatGPT Authored Introduction:

Unlocking the Future of Transportation: Real-Time Irish Transit Analytics

In the bustling landscape of modern transportation, the ability to harness real-time data is not just a competitive advantage; it’s a necessity. In Ireland, where efficient transit systems are the lifeblood of daily commutes and city connectivity, the fusion of cutting-edge technologies is revolutionizing how we understand and optimize public transportation. This article delves into the world of Real-Time Irish Transit Analytics, where Apache NiFi, PostgreSQL, GenAI, Apache Kafka, Apache Flink, JSON, and GTFS converge to create a dynamic and responsive ecosystem.

Every day, thousands of passengers rely on Ireland’s public transit systems to navigate cities, reach work, or simply explore the beauty of the countryside. Yet, behind the scenes of this seemingly seamless operation lies a complex network of data streams, from vehicle locations to passenger counts, schedules to service updates. Here, Apache NiFi emerges as a pivotal tool, seamlessly orchestrating the flow of data from various sources into a unified pipeline.

PostgreSQL steps in as the reliable database backbone, providing a robust foundation for storing and querying vast amounts of transit data. With the power of GenAI, machine learning algorithms sift through this data trove, uncovering valuable insights into passenger behaviors, traffic patterns, and optimal routes.

But data is only as valuable as its timeliness, and this is where Apache Kafka and Apache Flink shine. Kafka acts as the real-time messaging hub, ensuring that updates from buses, trains, and stations are instantly propagated through the system. Flink’s stream processing capabilities then come into play, analyzing incoming data on the fly to generate actionable intelligence.

In the realm of data interchange, JSON (JavaScript Object Notation) emerges as the lingua franca, facilitating seamless communication between different components of the analytics ecosystem. And anchoring it all is the General Transit Feed Specification (GTFS), a standardized format for public transit schedules and geographic information, ensuring interoperability and accuracy across the board.

Join us on a journey through the intricacies of Real-Time Irish Transit Analytics, where these technologies converge to enhance efficiency, improve passenger experiences, and pave the way for the future of smart transportation.

An important source of data is the set of static GTFS lookup tables, provided as a zip file of CSV files. We can download and parse this automagically in NiFi. There is no need to know the schemas or precreate tables; NiFi will determine the fields for you.

https://www.transportforireland.ie/transitData/Data/GTFS_Realtime.zip

GTFS Static Data Load

Skip shapes.txt as we aren’t loading those
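
For readers who want to see the idea outside of NiFi, here is a minimal Python sketch of the same step: open the GTFS zip, skip shapes.txt, and let the CSV header row determine the fields. The function name and the tiny in-memory demo zip are made up for illustration; in the real flow NiFi processors do this work.

```python
import csv
import io
import zipfile

SKIPPED_FILES = {"shapes.txt"}  # the flow above skips shapes.txt


def read_gtfs_tables(zip_bytes, skipped=SKIPPED_FILES):
    """Return {table_name: list of row dicts} for each CSV file in a GTFS zip."""
    tables = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for name in archive.namelist():
            if name in skipped or not name.endswith(".txt"):
                continue
            with archive.open(name) as raw:
                # utf-8-sig drops a byte-order mark if the file has one
                text = io.TextIOWrapper(raw, encoding="utf-8-sig", newline="")
                # DictReader takes field names from the header row,
                # so no schema has to be declared up front
                tables[name[:-4]] = list(csv.DictReader(text))
    return tables


def _demo_zip():
    """Build a tiny in-memory zip (hypothetical data) so no download is needed."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("routes.txt", "route_id,route_long_name\nr1,Dublin - Cavan\n")
        archive.writestr("shapes.txt", "shape_id,shape_pt_lat\ns1,53.3\n")
    return buffer.getvalue()


demo_tables = read_gtfs_tables(_demo_zip())
print(sorted(demo_tables))
```

To run it against the real feed, pass the bytes of the downloaded zip to read_gtfs_tables instead of the demo zip.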

Set a Default Primary Key

Setting All the Correct Primary Keys for all the Static Files/Tables

Split Up Tables into 1,000 Row Chunks to Make it Easier for Postgresql

We converted CSV to JSON and split it up in one step.
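
As a rough illustration of that convert-and-split step, the sketch below turns CSV text into JSON arrays of at most 1,000 rows each. It is plain Python rather than the NiFi processor, and the generated sample rows are hypothetical.

```python
import csv
import io
import json


def csv_to_json_chunks(csv_text, chunk_size=1000):
    """Yield JSON array strings, each holding at most chunk_size rows."""
    chunk = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield json.dumps(chunk)
            chunk = []
    if chunk:  # emit the final, shorter chunk
        yield json.dumps(chunk)


# Hypothetical sample: 2,500 generated rows shaped like trips.txt
sample = "trip_id,route_id\n" + "".join(f"t{i},r{i % 7}\n" for i in range(2500))
chunks = list(csv_to_json_chunks(sample))
sizes = [len(json.loads(c)) for c in chunks]
print(sizes)
```

Smaller chunks keep each database batch short, which is the point of the 1,000-row split.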
Loaded Results

Update the SQL Automagically

We do not set field names manually, and there is no SQL injection here.

Send this SQL to the Database
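
To make the idea concrete, here is a small sketch of generating the SQL from the record itself. It assumes an upsert (INSERT ... ON CONFLICT) keyed on the table's primary key, which is my guess at a reasonable statement and not necessarily what the NiFi flow emits. The helper names are hypothetical, and the %s placeholders follow the style used by Python PostgreSQL drivers such as psycopg.

```python
import re

IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name):
    """Reject anything that is not a plain identifier, then double-quote it."""
    if not IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return f'"{name}"'


def build_upsert(table, record, key_columns):
    """Return (sql, params) for an INSERT ... ON CONFLICT upsert.

    Column names come from the record; values travel as bound
    parameters and are never pasted into the SQL text.
    """
    columns = list(record)
    col_sql = ", ".join(quote_identifier(c) for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    key_sql = ", ".join(quote_identifier(k) for k in key_columns)
    updates = ", ".join(
        f"{quote_identifier(c)} = EXCLUDED.{quote_identifier(c)}"
        for c in columns
        if c not in key_columns
    )
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    sql = (
        f"INSERT INTO {quote_identifier(table)} ({col_sql}) "
        f"VALUES ({placeholders}) ON CONFLICT ({key_sql}) {action}"
    )
    return sql, [record[c] for c in columns]


row = {"trip_id": "3950_45558", "stop_sequence": "10", "stop_id": "8530B1520901"}
sql, params = build_upsert("stop_times", row, ["trip_id", "stop_sequence"])
print(sql)
```

Because the field names are validated and the values are bound separately, a hostile value in the CSV cannot change the statement.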

A list of Ireland Lookup Trips loaded from trips.txt

Let’s parse the real time transit information for Ireland.

GTFS Real-Time

Vehicle Positions is the primary API to get where the buses are.

API REST TEST

GET https://api.nationaltransport.ie/gtfsr/v2/gtfsr?format=json HTTP/1.1
Cache-Control: no-cache
x-api-key: dddddd

Unlike most transit systems we have seen with GTFS and GTFS-R feeds, this one has only two of the three feed types. It is missing alerts.

[ Trip Updates, Vehicle Positions]

The GTFS-R API contains real-time updates for services provided by Dublin Bus, Bus Éireann, and Go-Ahead Ireland.

You have to sign up and subscribe to the API to use this.

x-api-key is the header for our private key
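
If you want to try the call outside of NiFi, a minimal sketch with the Python standard library looks like this. The function names are mine, and the key shown is the same placeholder as in the REST test above; you need your own subscription key for fetch_feed to succeed.

```python
import json
import urllib.request

GTFSR_URL = "https://api.nationaltransport.ie/gtfsr/v2/gtfsr?format=json"


def build_gtfsr_request(api_key, url=GTFSR_URL):
    """Build the GET request shown above; nothing is sent until urlopen is called."""
    return urllib.request.Request(
        url,
        headers={"Cache-Control": "no-cache", "x-api-key": api_key},
        method="GET",
    )


def fetch_feed(api_key, timeout=30):
    """Send the request and decode the JSON body (needs a valid subscription key)."""
    with urllib.request.urlopen(build_gtfsr_request(api_key), timeout=timeout) as resp:
        return json.load(resp)


request = build_gtfsr_request("dddddd")  # placeholder key, not a real one
print(request.get_method(), request.full_url)
```

Building the request separately from sending it makes it easy to check the headers before spending any of your API quota.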

Example Vehicle Position as JSON

[ {
"recordid" : "V56",
"route_id" : "3924_62692",
"directionid" : "0",
"latitude" : "53.3537788",
"tripid" : "3924_16321",
"starttime" : "22:50:00",
"vehicleid" : "274",
"startdate" : "20240322",
"uuid" : "8a50c084-0aea-496e-b4c3-dbed373e812e",
"longitude" : "-6.40118694",
"timestamp" : "1711150967",
"ts" : "1711167213555"
} ]
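
Every value in that record is a string, so downstream code usually wants typed fields. Here is a small sketch of one way to do that. The class and field names are my own, and I am assuming that "timestamp" is epoch seconds from the feed and "ts" is an epoch-milliseconds value added by the flow.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class VehiclePosition:
    vehicle_id: str
    route_id: str
    trip_id: str
    latitude: float
    longitude: float
    reported_at: datetime  # from "timestamp", assumed epoch seconds
    ingested_at: datetime  # from "ts", assumed epoch milliseconds


def parse_vehicle_position(record):
    """Convert the all-string record into typed fields."""
    return VehiclePosition(
        vehicle_id=record["vehicleid"],
        route_id=record["route_id"],
        trip_id=record["tripid"],
        latitude=float(record["latitude"]),
        longitude=float(record["longitude"]),
        reported_at=datetime.fromtimestamp(int(record["timestamp"]), tz=timezone.utc),
        ingested_at=datetime.fromtimestamp(int(record["ts"]) / 1000, tz=timezone.utc),
    )


sample = {
    "vehicleid": "274",
    "route_id": "3924_62692",
    "tripid": "3924_16321",
    "latitude": "53.3537788",
    "longitude": "-6.40118694",
    "timestamp": "1711150967",
    "ts": "1711167213555",
}
position = parse_vehicle_position(sample)
print(position.reported_at.isoformat())
```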

Vehicle Position Slack Message

Irish Transit Tracking
Direction ${directionid}
Request ${invokehttp.request.url} ${invokehttp.status.message} ${invokehttp.tx.id}
Lat/Long ${latitude}/${longitude}
Vehicle ${vehicleid}
Route ${route_id}
Scheduled? ${scheduled}
Start Date/Time/TS ${startdate} / ${starttime} / ${timestamp}
IDs ${uuid} ${recordid} TripID ${tripid}
Scheduled: ${scheduled}
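
The ${...} placeholders are filled from flow file attributes by NiFi. To show how such a template expands, here is a rough Python imitation using string.Template. It is not NiFi Expression Language, the subclass name is made up, and the attribute values are sample values.

```python
from string import Template


class AttributeTemplate(Template):
    # Allow dots inside ${...} so names like invokehttp.status.message resolve
    braceidpattern = r"[_a-z][_a-z0-9.]*"


MESSAGE = AttributeTemplate(
    "Irish Transit Tracking\n"
    "Direction ${directionid}\n"
    "Request ${invokehttp.request.url} ${invokehttp.status.message}\n"
    "Lat/Long ${latitude}/${longitude}\n"
    "Vehicle ${vehicleid}\n"
    "Route ${route_id}"
)

attributes = {
    "directionid": "0",
    "invokehttp.request.url": "https://api.nationaltransport.ie/gtfsr/v2/gtfsr?format=json",
    "invokehttp.status.message": "OK",
    "latitude": "53.3537788",
    "longitude": "-6.40118694",
    "vehicleid": "274",
    "route_id": "3924_62692",
}

text = MESSAGE.substitute(attributes)
print(text)
```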

Trip Updates

Example Trip Update as JSON

{
"triptimestamp" : "1711415067",
"stopsequence" : "10",
"schedulerelationship" : "SCHEDULED",
"tripstarttime" : "21:30:00",
"stopid" : "8530B1520901",
"departuredelay" : "-104",
"tripid" : "3950_45558",
"tripschedulerelationship" : "SCHEDULED",
"tripstartdate" : "20240325",
"uuid" : "46595e37-4fdd-48db-8431-216bcabe4dd7",
"departuretime" : "",
"tripdirectionid" : "0",
"arrivaltime" : "",
"arrivaldelay" : "-104",
"triprouteid" : "3950_62756",
"ts" : "1711476673867",
"route_long_name" : "Dublin - Airport - Cavan - Donegal",
"stop_name" : "Topaz Belleek"
}
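
In GTFS-Realtime, delays are expressed in seconds, and a negative value means the vehicle is ahead of schedule. So the -104 above is a bus running early. A small helper (the name is mine) makes that readable, and also copes with the empty strings seen in the time fields:

```python
def describe_delay(delay_seconds):
    """Turn a GTFS-Realtime delay (seconds; negative means early) into text."""
    if delay_seconds is None or delay_seconds == "":
        return "no delay reported"
    seconds = int(delay_seconds)
    if seconds == 0:
        return "on time"
    minutes, remainder = divmod(abs(seconds), 60)
    amount = f"{minutes}m {remainder:02d}s" if minutes else f"{remainder}s"
    return f"{amount} {'late' if seconds > 0 else 'early'}"


print(describe_delay("-104"))
```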

Trip Update Slack Message

Irish Transit Tracking Trip Updates
Request ${invokehttp.request.url} ${invokehttp.status.message} ${invokehttp.tx.id}
IDs ${uuid}
Arrival Delay / Time: ${arrivaldelay} / ${arrivaltime}
Departure Delay / Time: ${departuredelay} / ${departuretime}
Schedule: ${schedulerelationship} ${tripschedulerelationship}
Stop ID/Sequence: ${stopid} / ${stopsequence}
Trip Direction: ${tripdirectionid} ${tripid}
Trip Route: ${triprouteid}
Trip Start Date / Time / TS: ${tripstartdate} / ${tripstarttime} / ${triptimestamp}

Create Table in Flink

Query Kafka Topic — Flink SQL Table in SSB

Send Messages

Lookups From Postgresql Table

Finally Send Messages to Slack

NATIONAL ROADS WEATHER STATION

PUBLIC TRANSPORT DATA

LOOKUP DATA FROM GTFS

stop_id: Unique ID

Primary key (trip_id, stop_sequence)

DUBLIN BIKES

RAILROAD

IRISH STATIONS

{"StationDesc":"Millstreet","StationAlias":null,
"StationLatitude":52.0776,"StationLongitude":-9.06973,
"StationCode":"MLSRT","StationId":24,"ts":"1711496919762",
"uuid":"f6e71a76-41cc-4a8e-8795-323c3b43d62f"}

IRISH TRAIN RECORD

{
"TrainStatus":"R","TrainLatitude":53.4169,
"TrainLongitude":-6.1512,"TrainCode":"P617",
"TrainDate":"27 Mar 2024",
"PublicMessage":"P617\\n16:02 - Drogheda to Dublin Pearse (1 mins late)\\nDeparted Portmarnock next stop Dublin Connolly",
"Direction":"Southbound","ts":"1711557932947",
"uuid":"b485cefb-67e8-482d-86ba-1ca43e0b523a"
}
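
The PublicMessage field packs several facts into one string, with line breaks escaped as a backslash followed by n. Here is a hedged sketch of pulling it apart. It assumes every message has the same three-part shape as this single example, which may not hold for other train statuses, so it returns None when the shape differs. The function and key names are my own.

```python
import json
import re

JOURNEY = re.compile(r"^(\d{2}:\d{2}) - (.+?) to (.+) \((-?\d+) mins? late\)$")


def parse_public_message(message):
    """Split a PublicMessage into its parts, or return None if the shape differs."""
    # After JSON decoding, the separator is the two characters backslash + n
    parts = message.split("\\n")
    if len(parts) != 3:
        return None
    match = JOURNEY.match(parts[1])
    if match is None:
        return None
    departure, origin, destination, minutes = match.groups()
    return {
        "train_code": parts[0],
        "departure": departure,
        "origin": origin,
        "destination": destination,
        "minutes_late": int(minutes),
        "status": parts[2],
    }


record = json.loads(
    r'{"TrainCode":"P617","PublicMessage":"P617\\n16:02 - Drogheda to '
    r'Dublin Pearse (1 mins late)\\nDeparted Portmarnock next stop Dublin Connolly"}'
)
parsed = parse_public_message(record["PublicMessage"])
print(parsed)
```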

IRISH STATION RECORD

{
"StationDesc":"Midleton",
"StationAlias":null,
"StationLatitude":51.9212,
"StationLongitude":-8.17579,
"StationCode":"MDLTN",
"StationId":68,
"ts":"1711558009615",
"uuid":"1f5ae394-4726-4f3e-8e53-7f50f95ae05e"
}

SOURCE CODE

FLINK SQL KAFKA TABLE

CREATE TABLE `ssb`.`Meetups`.`irelandvehicle` (
  -- Fields from the vehicle position JSON, all carried as strings
  `recordid` VARCHAR(2147483647),
  `route_id` VARCHAR(2147483647),
  `directionid` VARCHAR(2147483647),
  `latitude` VARCHAR(2147483647),
  `tripid` VARCHAR(2147483647),
  `starttime` VARCHAR(2147483647),
  `vehicleid` VARCHAR(2147483647),
  `startdate` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `longitude` VARCHAR(2147483647),
  `timestamp` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  -- Fields added from the GTFS lookup tables
  `route_long_name` VARCHAR(2147483647),
  `trip_short_name` VARCHAR(2147483647),
  `trip_headsign` VARCHAR(2147483647),
  -- Event time comes from the Kafka record timestamp metadata
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  -- Tolerate events arriving up to 3 seconds out of order
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'scan.startup.mode' = 'group-offsets',
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'properties.auto.offset.reset' = 'earliest',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'irelandvehicle',
  'properties.group.id' = 'irelandconsumersbb1'
)



Streaming Data with Cloudera Data Flow (CDF) Into Public Cloud (CDP)







At Cloudera Now NYC, I showed a demo of streaming data from MQTT sensors and Twitter running in AWS. Today I am going to walk you through some of the details and give you the tools to build your own streaming demos in CDP Public Cloud. If you missed that event, you can watch a recording here.



Let's get streaming!



Let's log in. I use Okta for single sign-on (SSO), which makes this so easy. Cloudera Flow Management (CFM) with Apache NiFi is officially available in CDP Public Cloud, so get started here. We will be following the guide (https://docs.cloudera.com/cdf-datahub/7.1.0/howto-data-ingest.html). We are running CDF DataHub on CDP 7.1.0.

There are a lot of data engineering and streaming tasks I can accomplish with a few clicks. I can bring up a virtual data warehouse and use tools like Apache Hue and Data Analytics Studio to examine databases and tables and run queries.



We go to Data Hub Clusters and can see the latest Apache NiFi in there. You can see we have Data Engineering, Kafka and NiFi clusters already built and ready to go. It only takes a click, a few drop-down settings and a name to build and deploy in minutes. This saves me and my team so much time. Thanks, Cloud Team!



Kafka and NiFi Data Hub Clusters


Provision a New Data Hub - Op Db


Provision a New Data Hub - NiFi



Once built, the Kafka Data Hub is our launching place for Cloudera Manager, Schema Registry and SMM.



Provision a New Data Hub - Real-Time Data Mart



Data Engineering on AWS Details


Display Environments For Your Clouds



From the DataHub cluster that we built for CFM (Apache NiFi) or for Apache Kafka, I can access Cloudera Manager for monitoring, management and other tasks that Cloudera administrators are used to, like searching logs.


Let's jump into the Apache NiFi UI from CDP Data Hub.


Once I've logged into Flow Management, as an administrator I can see some of the server monitoring, metrics and administrative features.





Our module for Twitter ingest on CDP Data Hub.





We can download our flow immediately and quickly send our code to version control.




We consume messages from my IoT gateway, which forwards messages from multiple devices via MQTT.



Using parameters that can be set via DevOps or via Apache NiFi, we set up a reusable component to read from any MQTT broker. Username, password, broker URI and topic are parameters that we set and can change as needed.





Ingesting from Twitter is just as easy as reading from MQTT.


We can also parameterize our Twitter ingest for easy reuse. For this Twitter ingest, we have some sensitive values that are protected, as well as some query terms that limit our data to airline data.




Editing parameters from the NiFi UI is super easy.





All the data passing through the nodes of my cluster.



Apache NiFi has a ton of tabs for visualizing any of the metrics of interest.

















We are setting a JSON reader for inferring any JSON data.



To write to Kafka, we have some parameters for brokers and a reader/writer for records.  We use the prebuilt "Default NiFi SSL Context Service" for SSL security.   We also need to specify:  SASL_SSL, PLAIN, your username for CDP, your password for CDP.
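
For a client outside of NiFi, the same four settings map onto standard Kafka client properties. The sketch below just assembles them into a dictionary using the Java client's property names. The function name, broker address and credentials are placeholders I made up, not values from this cluster.

```python
def kafka_client_properties(brokers, username, password):
    """Return Java-client style properties for SASL_SSL with the PLAIN mechanism."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "bootstrap.servers": brokers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


# Placeholder values; substitute your own brokers and CDP workload credentials
props = kafka_client_properties("broker1.example.com:9093", "my_cdp_user", "my_cdp_password")
for key, value in props.items():
    print(f"{key}={value}")
```

Keep real credentials out of source control; in NiFi they live in sensitive parameters for the same reason.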




On a local edge server, we are publishing sensor data to MQTT.




PutHDFS Configuration (Store GZIPPED JSON Files)


Put To Hive Streaming Table


PutORC Files to CNOW3, autoconverted JSON to ORC






When we push to PutORC it will build us the DDL for an external table automatically, just grab it from data provenance.




For storing to Apache Hive 3 tables, we have to set some parameters for Hive Configuration and the metastore from our data store.





In Apache NiFi, Ranger controls the permission policies for NiFi. CDP creates one for NiFi Administrators, of which I am a member.

Version Control is preconfigured for CDP Data Hub NiFi users with the same single sign on.   Apache NiFi Registry will have all our modules and their versions.


Before we push NiFi changes to version control, we get a list of the changes we made.



We can see data as it travels through Apache NiFi in its built-in data provenance (lineage).


Let's check out our new data in Amazon S3.



We want to look at our data in Kafka, so we can use Cloudera Streams Messaging Manager (SMM) to view, edit, monitor and manage everything Kafka.



We can build alerts for any piece of the Kafka infrastructure (brokers, topics, etc.).


I want to look at the lineage, provenance and metadata for my flow from data birth to storage.   Atlas is easy to use and integrated with CDP.   Thanks to the automagic configuration done in Cloudera Enterprise Data Cloud - NiFi, Kafka, HDFS, S3, Hive, HBase and more are providing data that comes together in one easy to follow diagram powered by Graphs.



The connection to Atlas is prebuilt for you in Apache NiFi; you can take a look and see.









Using Apache Hue, I can search our tables and produce simple charts.



We push our merged ORC files to the /tmp/cnow3 directory in S3, which is controlled by HDFS with full security, for use by an external Hive table.




It becomes trivial to push data to S3, whether it's compressed JSON files or internal ORC files used in Hive 3 tables.




As part of our output we push sensor readings to Slack for a sampling of current device status.




We can quickly access Cloudera SMM from CDP Data Hub with a single click thanks to Single Sign On.   Once in SMM, we can build alerts based on conditions within clusters, brokers, topics, consumers, producers, replication and more.


We can look at topics and see alerts in one UI.




We can view our alert messages from our history display.


After alerts are triggered, we can see a history of them in the UI for a single pane of glass view of the system.


The Brokers screen shows me totals for all the brokers and individual broker data.


I can browse inside a topic like this one for our sensor data. I can view the key, offset, timestamp and data. I can view text, byte, JSON and Avro formatted data. There is also a connection to the schema it used from the Cloudera Schema Registry.



Below is an example email sent via Cloudera SMM for an alert condition on Kafka.


Before we can query the ORC files that we have stored in HDFS, we'll need to create an external Hive table.
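
As a sketch of what such a statement looks like, the helper below assembles a CREATE EXTERNAL TABLE for ORC files. The helper name, the table name and the column list are hypothetical examples, since the real sensor schema is not shown here. Only the /tmp/cnow3 location comes from this walkthrough.

```python
def external_orc_table_ddl(table, columns, location):
    """Build a Hive CREATE EXTERNAL TABLE statement for ORC files at a location."""
    cols = ",\n  ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS `{table}` (\n  {cols}\n)\n"
        f"STORED AS ORC\n"
        f"LOCATION '{location}'"
    )


# Hypothetical columns; use the fields your own sensor records carry
ddl = external_orc_table_ddl(
    "cnow3",
    [("uuid", "STRING"), ("temperature", "DOUBLE"), ("ts", "BIGINT")],
    "/tmp/cnow3",
)
print(ddl)
```

Because the table is external, dropping it in Hive removes only the metadata and leaves the ORC files in place.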


We can use Apache Hue or Data Analytics Studio to query our new table.



If you need to connect to a machine, you can SSH into an instance.  


If you need more information, join us in the Cloud, in the Community or up close in virtual Meetups.

Additional Resources