Building a Real-Time IoT Application with Apache Pulsar and Apache Pinot

pulsar-thermal-pinot

Apache Pulsar — Apache Pinot — Thermal Sensor Data

Meetup December 2022

https://www.meetup.com/new-york-city-apache-pulsar-meetup/events/289817171/

Source Code

https://github.com/tspannhw/pulsar-thermal-pinot

Access Docker Container

docker exec -it pinot-controller /bin/bash

Build a Schema From Data

docker exec -it pinot-controller bin/pinot-admin.sh JsonToPinotSchema \
-timeColumnName ts \
-metrics "temperature,humidity,co2,totalvocppb,equivalentco2ppm,pressure,temperatureicp,cputempf" \
-dimensions "host,ipaddress" \
-pinotSchemaName=thermal \
-jsonFile=/data/thermal.json \
-outputDir=/config
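JsonToPinotSchema reads a sample record and writes a schema made of dimension, metric, and date-time field specs. To show roughly what that output looks like, here is a minimal Python sketch. It is not how pinot-admin.sh is implemented: the helper names (`pinot_type`, `build_pinot_schema`) are hypothetical, the type mapping is simplified, and it assumes any field not named as a metric or the time column becomes a dimension. The `1:SECONDS:EPOCH` format assumes `ts` holds epoch seconds, as in the sample record.

```python
import json

# A few fields from the sample thermal record (trimmed for brevity).
SAMPLE = json.loads("""
{"uuid": "thrml_qsx_20221121215610", "ipaddress": "192.168.1.179",
 "cputempf": 115, "host": "thermal", "ts": 1669067775,
 "temperature": 27.9069, "humidity": 24.89, "co2": 698.0}
""")

# Same metric list as the -metrics flag above.
METRICS = {"temperature", "humidity", "co2", "totalvocppb", "equivalentco2ppm",
           "pressure", "temperatureicp", "cputempf"}

def pinot_type(value):
    """Map a JSON value to a Pinot data type (simplified assumption)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "DOUBLE"
    return "STRING"

def build_pinot_schema(name, record, time_column, metrics):
    """Hypothetical helper: split a flat JSON record into Pinot field specs."""
    schema = {"schemaName": name, "dimensionFieldSpecs": [],
              "metricFieldSpecs": [], "dateTimeFieldSpecs": []}
    for field, value in record.items():
        if field == time_column:
            schema["dateTimeFieldSpecs"].append({
                "name": field, "dataType": "LONG",
                "format": "1:SECONDS:EPOCH",  # ts is seconds since the epoch
                "granularity": "1:SECONDS"})
        elif field in metrics:
            schema["metricFieldSpecs"].append(
                {"name": field, "dataType": pinot_type(value)})
        else:
            # Everything else is treated as a dimension in this sketch.
            schema["dimensionFieldSpecs"].append(
                {"name": field, "dataType": pinot_type(value)})
    return schema

if __name__ == "__main__":
    print(json.dumps(build_pinot_schema("thermal", SAMPLE, "ts", METRICS), indent=2))
```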

Consume Data in Pulsar

bin/pulsar-client consume "persistent://public/default/thermalsensors" -s "thrmlsnosconsumer" -n 0

DevOps Pulsar

curl http://localhost:8080/admin/v2/persistent/public/default
curl http://localhost:8080/admin/v2/persistent/public/default/thermalsensors-partition-0/stats
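The same stats can be pulled from a script. Here is a minimal sketch with Python's standard library, assuming the admin API on localhost:8080 as in the curl calls. `msgRateIn`, `msgRateOut`, `subscriptions`, and the per-subscription `msgBacklog` are fields Pulsar's topic stats normally include, but check them against your broker's actual output. The helper names are hypothetical.

```python
import json
import urllib.request

ADMIN = "http://localhost:8080/admin/v2"

def partition_stats_url(tenant, namespace, topic, partition):
    """Admin URL for one partition's stats, matching the second curl call."""
    return f"{ADMIN}/persistent/{tenant}/{namespace}/{topic}-partition-{partition}/stats"

def summarize(stats):
    """Keep message rates and per-subscription backlog from a topic stats document."""
    return {
        "msgRateIn": stats.get("msgRateIn", 0.0),
        "msgRateOut": stats.get("msgRateOut", 0.0),
        "backlog": {name: sub.get("msgBacklog", 0)
                    for name, sub in stats.get("subscriptions", {}).items()},
    }

def fetch_stats(url, timeout=5.0):
    """GET and decode the stats JSON from a running broker."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)

# Usage against a running broker:
# print(summarize(fetch_stats(partition_stats_url("public", "default", "thermalsensors", 0))))
```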

Data

{
"uuid": "thrml_qsx_20221121215610",
"ipaddress": "192.168.1.179",
"cputempf": 115,
"runtime": 0,
"host": "thermal",
"hostname": "thermal",
"macaddress": "e4:5f:01:7c:3f:34",
"endtime": "1669067770.6400402",
"te": "0.0005550384521484375",
"cpu": 4.5,
"diskusage": "102676.2 MB",
"memory": 9.7,
"rowid": "20221121215610_8e753591-cb7c-4e1c-886d-85cb3dba6c50",
"systemtime": "11/21/2022 16:56:15",
"ts": 1669067775,
"starttime": "11/21/2022 16:56:10",
"datetimestamp": "2022-11-21 21:56:14.404291+00:00",
"temperature": 27.9069,
"humidity": 24.89,
"co2": 698.0,
"totalvocppb": 0.0,
"equivalentco2ppm": 65535.0,
"pressure": 102048.65,
"temperatureicp": 82.0
}
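The record carries three timestamps: `ts` is epoch seconds and serves as Pinot's time column, `datetimestamp` is UTC with an explicit offset, and `systemtime` has no zone at all. A short Python check with the sample values shows how they line up. It assumes the sensor's local clock is UTC-5 (US Eastern in November), which is an inference from the data rather than something stated here.

```python
import json
from datetime import datetime, timedelta, timezone

record = json.loads('{"ts": 1669067775, "systemtime": "11/21/2022 16:56:15", '
                    '"datetimestamp": "2022-11-21 21:56:14.404291+00:00"}')

# ts is seconds since the Unix epoch.
ts_utc = datetime.fromtimestamp(record["ts"], tz=timezone.utc)

# datetimestamp includes a UTC offset, so fromisoformat can parse it directly.
dt_utc = datetime.fromisoformat(record["datetimestamp"])

# systemtime has no zone; ASSUMPTION: the sensor's local zone is UTC-5.
local = datetime.strptime(record["systemtime"], "%m/%d/%Y %H:%M:%S")
sys_utc = local.replace(tzinfo=timezone(timedelta(hours=-5))).astimezone(timezone.utc)

print(ts_utc.isoformat())                 # 2022-11-21T21:56:15+00:00
print(sys_utc == ts_utc)                  # True
print((ts_utc - dt_utc).total_seconds())  # 0.595709
```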

Continuous Analytics with Flink SQL (Pulsar-Flink 1.15+ Connector)

Reference: https://github.com/tspannhw/pulsar-transit-function

CREATE CATALOG pulsar WITH (
'type' = 'pulsar-catalog',
'catalog-service-url' = 'pulsar://localhost:6650',
'catalog-admin-url' = 'http://localhost:8080'
);
SHOW CURRENT DATABASE;
SHOW DATABASES;
USE CATALOG pulsar;
SET 'table.dynamic-table-options.enabled' = 'true';
SHOW DATABASES;
USE `public/default`;
SHOW TABLES;
DESCRIBE `thermalsensors`;
SHOW CREATE TABLE `thermalsensors`;
CREATE TABLE `pulsar`.`public/default`.`thermalsensors` (
`uuid` VARCHAR(2147483647) NOT NULL,
`ipaddress` VARCHAR(2147483647) NOT NULL,
`cputempf` INT NOT NULL,
`runtime` INT NOT NULL,
`host` VARCHAR(2147483647) NOT NULL,
`hostname` VARCHAR(2147483647) NOT NULL,
`macaddress` VARCHAR(2147483647) NOT NULL,
`endtime` VARCHAR(2147483647) NOT NULL,
`te` VARCHAR(2147483647) NOT NULL,
`cpu` FLOAT NOT NULL,
`diskusage` VARCHAR(2147483647) NOT NULL,
`memory` FLOAT NOT NULL,
`rowid` VARCHAR(2147483647) NOT NULL,
`systemtime` VARCHAR(2147483647) NOT NULL,
`ts` INT NOT NULL,
`starttime` VARCHAR(2147483647) NOT NULL,
`datetimestamp` VARCHAR(2147483647) NOT NULL,
`temperature` FLOAT NOT NULL,
`humidity` FLOAT NOT NULL,
`co2` FLOAT NOT NULL,
`totalvocppb` FLOAT NOT NULL,
`equivalentco2ppm` FLOAT NOT NULL,
`pressure` FLOAT NOT NULL,
`temperatureicp` FLOAT NOT NULL
) WITH (
'connector' = 'pulsar',
'topics' = 'persistent://public/default/thermalsensors',
'format' = 'json',
'admin-url' = 'http://localhost:8080',
'service-url' = 'pulsar://localhost:6650'
);
select * from thermalsensors;

Create PostgreSQL Table

CREATE TABLE "public"."thermalalerts" (
systemtime VARCHAR(256),
humidity FLOAT,
temperature FLOAT,
uuid VARCHAR(256),
co2 FLOAT,
datetimestamp VARCHAR(256),
rowid VARCHAR(256),
diskusage VARCHAR(256)
);
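This table keeps a subset of the sensor fields. The sketch below illustrates the column mapping from a thermalsensors record to a thermalalerts row. It swaps in Python's built-in `sqlite3` as a stand-in for PostgreSQL so it runs without a server; against PostgreSQL you would keep the "public" schema and, with a driver such as psycopg2, use `%s` placeholders. The alert criteria are not given here, so it simply inserts whatever record it is handed. `ALERT_COLUMNS`, `to_alert_row`, and `insert_alert` are hypothetical names.

```python
import json
import sqlite3

# Columns of the thermalalerts table, in the same order as the DDL above.
ALERT_COLUMNS = ("systemtime", "humidity", "temperature", "uuid",
                 "co2", "datetimestamp", "rowid", "diskusage")

def to_alert_row(record):
    """Project a thermalsensors record onto the thermalalerts columns."""
    return tuple(record[col] for col in ALERT_COLUMNS)

def insert_alert(conn, record):
    """Insert one projected record; sqlite3 uses ? placeholders (psycopg2 uses %s)."""
    placeholders = ", ".join("?" for _ in ALERT_COLUMNS)
    sql = (f"INSERT INTO thermalalerts ({', '.join(ALERT_COLUMNS)}) "
           f"VALUES ({placeholders})")
    conn.execute(sql, to_alert_row(record))

# In-memory SQLite stand-in for the PostgreSQL table (no "public" schema here).
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE thermalalerts (
    systemtime VARCHAR(256), humidity FLOAT, temperature FLOAT, uuid VARCHAR(256),
    co2 FLOAT, datetimestamp VARCHAR(256), rowid VARCHAR(256), diskusage VARCHAR(256))""")

# Fields from the sample record; extra fields such as cpu are ignored.
sample = json.loads("""{"uuid": "thrml_qsx_20221121215610",
  "systemtime": "11/21/2022 16:56:15",
  "datetimestamp": "2022-11-21 21:56:14.404291+00:00",
  "temperature": 27.9069, "humidity": 24.89, "co2": 698.0, "cpu": 4.5,
  "diskusage": "102676.2 MB",
  "rowid": "20221121215610_8e753591-cb7c-4e1c-886d-85cb3dba6c50"}""")
insert_alert(conn, sample)
print(conn.execute("SELECT uuid, co2 FROM thermalalerts").fetchall())
# [('thrml_qsx_20221121215610', 698.0)]
```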

Delete Table and Delete Schema

curl -X DELETE "http://localhost:9000/tables/thermal?type=realtime" -H "accept: application/json"
curl -X DELETE "http://localhost:9000/schemas/thermal" -H "accept: application/json"
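Scripting the reset makes it easy to repeat between runs. Here is a minimal sketch with `urllib.request`, assuming the controller at localhost:9000 as in the curl commands and a schema named after the table. It keeps the table-then-schema order of the commands above, since the controller may refuse to delete a schema that a table still uses. The helper names are hypothetical.

```python
import urllib.request

CONTROLLER = "http://localhost:9000"

def delete_requests(table, table_type="realtime", schema=None):
    """Build DELETE requests for a table and then its schema (defaults to the table name)."""
    headers = {"accept": "application/json"}
    return [
        urllib.request.Request(f"{CONTROLLER}/tables/{table}?type={table_type}",
                               method="DELETE", headers=headers),
        urllib.request.Request(f"{CONTROLLER}/schemas/{schema or table}",
                               method="DELETE", headers=headers),
    ]

def send(requests, timeout=10.0):
    """Send each request in order and collect the HTTP status codes."""
    statuses = []
    for req in requests:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            statuses.append(resp.status)
    return statuses

# Usage against a running controller: send(delete_requests("thermal"))
```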

Add Our Schema

docker exec -it pinot-controller bin/pinot-admin.sh AddSchema   \
-schemaFile /config/thermalschema.json \
-exec
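`AddSchema` uploads the schema file to the controller, and the controller's REST API (the same Swagger UI used for the table) offers a `POST /schemas` endpoint for the same job. This sketch assumes that endpoint accepts a JSON body on your Pinot version; some setups expect a multipart file upload instead, so check the Swagger UI first. The schema dict is a trimmed, illustrative stand-in for /config/thermalschema.json, whose contents are not shown here, and `add_schema_request` is a hypothetical helper.

```python
import json
import urllib.request

CONTROLLER = "http://localhost:9000"

def add_schema_request(schema):
    """Build a POST /schemas request carrying the schema as a JSON body."""
    body = json.dumps(schema).encode("utf-8")
    return urllib.request.Request(
        f"{CONTROLLER}/schemas", data=body, method="POST",
        headers={"Content-Type": "application/json"})

# Illustrative, trimmed schema; the real one lives in /config/thermalschema.json.
schema = {
    "schemaName": "thermal",
    "dimensionFieldSpecs": [{"name": "host", "dataType": "STRING"}],
    "metricFieldSpecs": [{"name": "temperature", "dataType": "DOUBLE"}],
    "dateTimeFieldSpecs": [{"name": "ts", "dataType": "LONG",
                            "format": "1:SECONDS:EPOCH", "granularity": "1:SECONDS"}],
}
req = add_schema_request(schema)
# To send it to a running controller: urllib.request.urlopen(req, timeout=10)
```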

Add Table Via Swagger UI / Curl

curl -X POST "http://localhost:9000/tables" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -d '{
  "tableName": "thermal",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "thermal",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "pulsar",
      "stream.pulsar.topic.name": "persistent://public/default/thermalsensors",
      "stream.pulsar.bootstrap.servers": "pulsar://Timothys-MBP:6650",
      "stream.pulsar.consumer.type": "lowlevel",
      "stream.pulsar.fetch.timeout.millis": "10000",
      "stream.pulsar.consumer.prop.auto.offset.reset": "largest",
      "stream.pulsar.consumer.factory.class.name": "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
      "stream.pulsar.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "1h",
      "realtime.segment.flush.threshold.segment.size": "5M"
    }
  },
  "tenants": {},
  "metadata": {}
}'

Defining Pulsar-Pinot Realtime Table

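The table config above points stream.pulsar.bootstrap.servers at a local hostname (pulsar://Timothys-MBP:6650); replace it with your own Pulsar service URL. It also sets stream.pulsar.consumer.prop.auto.offset.reset to largest, so Pinot starts with the newest messages. If you use smallest instead, Pinot goes back to the earliest message retained in the topic, which can be a lot of data.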

https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/apache-pulsar

This could be millions or billions of records.
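Here is a toy model of what the reset policy decides, with no Pinot or Pulsar code involved: a fresh consumer with `smallest` replays the whole retained backlog, while `largest` only sees messages published after it starts. As I understand Pinot's behavior, the setting only matters when consumption of a partition begins; later segments resume where the previous one stopped, which this hypothetical `messages_consumed` function ignores.

```python
def messages_consumed(backlog, new_messages, offset_reset):
    """Toy model: which messages a brand-new consumer reads under a reset policy.

    backlog: messages already retained in the topic when consumption starts
    new_messages: messages published after consumption starts
    """
    if offset_reset == "smallest":
        return list(backlog) + list(new_messages)  # start at the earliest retained message
    if offset_reset == "largest":
        return list(new_messages)                  # start at the end of the topic
    raise ValueError(f"unsupported offset reset: {offset_reset!r}")

backlog = [f"old-{i}" for i in range(10_000)]
new = ["new-0", "new-1"]
print(len(messages_consumed(backlog, new, "smallest")))  # 10002
print(len(messages_consumed(backlog, new, "largest")))   # 2
```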

Pinot Cluster

Query Console Table Schema

Query Console Table Information

Query Console SQL results

Adding a Realtime Table via REST API / Swagger Docs

Apache Pinot Query

select systemtime, totalvocppb, temperature, cputempf, humidity, co2, equivalentco2ppm,
pressure, temperatureicp, ts, datetimestamp, cpu, diskusage, memory, rowid
from thermal
order by ts desc
limit 200
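The Query Console is not the only way in; the broker also answers SQL over HTTP. This minimal sketch assumes a broker at localhost:8099 (Pinot's default broker port; your Docker setup may map it differently) and the `/query/sql` endpoint with a `{"sql": ...}` JSON body, whose response holds `resultTable.dataSchema.columnNames` and `resultTable.rows`. The query is a trimmed version of the one above, and the helper names are hypothetical.

```python
import json
import urllib.request

BROKER = "http://localhost:8099"

QUERY = """select systemtime, temperature, humidity, co2, ts
from thermal
order by ts desc
limit 200"""

def query_request(sql):
    """Build a POST to the broker's SQL endpoint."""
    return urllib.request.Request(
        f"{BROKER}/query/sql", data=json.dumps({"sql": sql}).encode("utf-8"),
        method="POST", headers={"Content-Type": "application/json"})

def rows_as_dicts(response):
    """Turn a broker response's resultTable into a list of {column: value} dicts."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]

def run_query(sql, timeout=10.0):
    """Send the query to a running broker and return rows as dicts."""
    with urllib.request.urlopen(query_request(sql), timeout=timeout) as resp:
        return rows_as_dicts(json.load(resp))

# Usage against a running broker: for row in run_query(QUERY): print(row)
```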

Superset + Pinot

Run this to initialize: https://github.com/kbastani/climate-change-analysis/blob/master/docker/docker-init.sh

Let’s Explore and Visualize Apache Pinot Data

Add a Database

Configure a Pinot dataset

Query and Validate the dataset

Create a new chart from that dataset

Save the chart to a dashboard

Build a dashboard of charts and markdown

Build more charts

Superset CSV Chart Extract

Video Preview

https://youtu.be/KMbTlmoDXXA

Pulsar to NiFi

NiFi to PostgreSQL

PostgreSQL Exploration

FLiP Stack Weekly for 15-Jan-2023

Welcome to the second newsletter of 2023. I was on vacation, so this issue is a little light. Next week will be super heavy.

Tim Spann @PaaSDev

PODCAST

Take a look at recent podcasts in audio or video format.

https://www.buzzsprout.com/2062659/11463086-messaging-streaming-and-events-101-episode-1-of-crossing-the-streams

https://www.youtube.com/watch?v=U8aPBhlvDHU&feature=embimpwoyt

CODE + COMMUNITY

Join my meetup group NJ/NYC/Philly/Virtual. We will have a hybrid event on December 8th.

https://www.meetup.com/new-york-city-apache-pulsar-meetup/

This is Issue #66!!

https://github.com/tspannhw/FLiPStackWeekly

https://www.linkedin.com/pulse/2022-schedule-tim-spann

News

Apache Pulsar 2.11 Released!

https://pulsar.apache.org/download/

https://pulsar.apache.org/release-notes/versioned/pulsar-2.11.0/

https://pulsar.apache.org/docs/2.11.x/administration-pulsar-shell/#install-pulsar-shell

  • Pulsar Shell (https://github.com/apache/pulsar/issues/16250)
  • Multi Cloud Sync
  • Pulsar Server JDK 17
  • Python 2 support removed
  • Performance Improvements
  • HTTP Sink Function Added
  • and hundreds more...

For Pulsar Node.js client release details and downloads, visit: https://www.npmjs.com/package/pulsar-client/v/1.8.0

Release Notes are at: https://pulsar.apache.org/release-notes/versioned/pulsar-client-node-1.8.0/

Videos

https://www.youtube.com/watch?v=RWasN8h3528

Articles

https://pulsar.apache.org/blog/2023/01/10/pulsar-2022-year-in-review/

Events

DevOps

Jan 26, 2023: DevOps 2023

https://www.conf42.com/DevOps2023TimSpannmoderndatastreaming_apps

Feb 15, 2023: Scylla Summit: Virtual

https://www.scylladb.com/scylladb-summit-2023/

Feb 28, 2023: Spring One: Virtual https://tanzu.vmware.com/developer/tv/

March 3, 2023: Spring One: Virtual https://tanzu.vmware.com/developer/tv/

April 4-6, 2023: DevNexus: Atlanta, GA https://devnexus.com/

https://www.linkedin.com/pulse/schedule-2023-tim-spann-/

Tools

FLiP Stack Weekly for 06-Jan-2023

Light week this week, stay tuned for talks on Spring, Apache Pinot, ScyllaDB, Apache Flink, Apache NiFi and more.

PODCAST

Take a look at recent podcasts in audio or video format.

https://www.buzzsprout.com/2062659/11463086-messaging-streaming-and-events-101-episode-1-of-crossing-the-streams

https://www.youtube.com/watch?v=U8aPBhlvDHU&feature=embimpwoyt

CODE + COMMUNITY

Join my meetup group NJ/NYC/Philly/Virtual. We will have a hybrid event on December 8th.

https://www.meetup.com/new-york-city-apache-pulsar-meetup/

This is Issue #65!!

https://github.com/tspannhw/FLiPStackWeekly

https://www.linkedin.com/pulse/2022-schedule-tim-spann

New Stuff

HTAP Virtual Summit

https://www.pingcap.com/htap-summit/auth/login/?next=/htap-summit/auth/watch/super-charging-real-time-analytics-at-scale

I am on PTO this week, but here are a few little tidbits.

https://medium.com/@tspann/2022-wrap-up-for-streaming-247cd21fd483?source=user_profile---------0----------------------------

https://medium.com/@tspann/building-real-time-schema-pipelines-from-messaging-topics-291a8d569130?source=user_profile---------2----------------------------

https://medium.com/@tspann/lets-check-our-stocks-from-finnhub-and-do-some-real-time-analytics-1b7963008e19?source=user_profile---------3----------------------------

https://medium.com/@tspann/predictions-for-streaming-in-2023-ad4d7395d714

New update on Pulsar 2.10.3

https://pulsar.apache.org/release-notes/

ARTICLES

CODE

VIDEOS

https://www.youtube.com/watch?v=MnErwxQ0q_k

TOOLS