Building a Real-Time IoT Application with Apache Pulsar and Apache Pinot

pulsar-thermal-pinot

Apache Pulsar — Apache Pinot — Thermal Sensor Data

Meetup December 2022

https://www.meetup.com/new-york-city-apache-pulsar-meetup/events/289817171/

Source Code

https://github.com/tspannhw/pulsar-thermal-pinot

Access Docker Container

docker exec -it pinot-controller /bin/bash

Build a Schema From Data

docker exec -it pinot-controller bin/pinot-admin.sh JsonToPinotSchema \
-timeColumnName ts \
-metrics "temperature,humidity,co2,totalvocppb,equivalentco2ppm,pressure,temperatureicp,cputempf" \
-dimensions "host,ipaddress" \
-pinotSchemaName=thermal \
-jsonFile=/data/thermal.json \
-outputDir=/config
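
To make the generated file easier to picture, here is a minimal Python sketch of the kind of schema that command is expected to write. The helper names (`pinot_type`, `build_schema`), the type mapping and the epoch-seconds time format are assumptions for illustration; the file the tool actually produces may differ, for example by listing every remaining JSON field as a dimension.

```python
import json

# Column roles copied from the JsonToPinotSchema command in this article.
DIMENSIONS = ["host", "ipaddress"]
METRICS = ["temperature", "humidity", "co2", "totalvocppb",
           "equivalentco2ppm", "pressure", "temperatureicp", "cputempf"]

# Trimmed copy of the sample record from the Data section.
SAMPLE = {
    "host": "thermal", "ipaddress": "192.168.1.179", "ts": 1669067775,
    "temperature": 27.9069, "humidity": 24.89, "co2": 698.0,
    "totalvocppb": 0.0, "equivalentco2ppm": 65535.0,
    "pressure": 102048.65, "temperatureicp": 82.0, "cputempf": 115,
}


def pinot_type(value):
    """Map a JSON value to a Pinot data type (assumed mapping)."""
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "LONG"
    if isinstance(value, float):
        return "DOUBLE"
    return "STRING"


def build_schema(sample, name="thermal", time_column="ts"):
    """Build a Pinot schema dict from one sample record."""
    return {
        "schemaName": name,
        "dimensionFieldSpecs": [
            {"name": c, "dataType": pinot_type(sample[c])} for c in DIMENSIONS
        ],
        "metricFieldSpecs": [
            {"name": c, "dataType": pinot_type(sample[c])} for c in METRICS
        ],
        # Assumes ts holds epoch seconds, as in the sample record.
        "dateTimeFieldSpecs": [{
            "name": time_column,
            "dataType": "LONG",
            "format": "1:SECONDS:EPOCH",
            "granularity": "1:SECONDS",
        }],
    }


print(json.dumps(build_schema(SAMPLE), indent=2))
```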

Consume Data in Pulsar

bin/pulsar-client consume "persistent://public/default/thermalsensors" -s "thrmlsnosconsumer" -n 0
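
The same subscription can be opened from code. The sketch below assumes the `pulsar-client` Python package is installed, that the broker listens on `pulsar://localhost:6650`, and that each message body is a UTF-8 JSON document like the sample record in this article; `decode_reading` and `consume_forever` are illustrative names, not part of the project.

```python
import json

TOPIC = "persistent://public/default/thermalsensors"
SUBSCRIPTION = "thrmlsnosconsumer"


def decode_reading(payload: bytes) -> dict:
    """Decode one message body, assumed to be UTF-8 encoded JSON."""
    return json.loads(payload.decode("utf-8"))


def consume_forever(service_url="pulsar://localhost:6650"):
    """Print each sensor reading as it arrives, then acknowledge it."""
    import pulsar  # third-party: pip install pulsar-client

    client = pulsar.Client(service_url)
    consumer = client.subscribe(TOPIC, subscription_name=SUBSCRIPTION)
    try:
        while True:
            msg = consumer.receive()
            reading = decode_reading(msg.data())
            print(reading["uuid"], reading["temperature"])
            consumer.acknowledge(msg)
    finally:
        client.close()
```

Calling `consume_forever()` blocks until interrupted, much like `-n 0` on the command line.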

DevOps Pulsar

curl http://localhost:8080/admin/v2/persistent/public/default
curl http://localhost:8080/admin/v2/persistent/public/default/thermalsensors-partition-0/stats
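
For scripted checks, the same admin endpoints can be called from Python's standard library. This is a sketch under the assumption that the admin API is reachable at `http://localhost:8080`; the helper names are made up for illustration, and `summarize` only picks a few fields that the stats document normally carries, returning `None` for any that are absent.

```python
import json
from urllib.request import urlopen

ADMIN_URL = "http://localhost:8080"  # assumed local broker


def stats_url(topic, partition=None, tenant="public", namespace="default"):
    """Build the stats URL for a topic or for one of its partitions."""
    name = topic if partition is None else f"{topic}-partition-{partition}"
    return f"{ADMIN_URL}/admin/v2/persistent/{tenant}/{namespace}/{name}/stats"


def fetch_stats(url):
    """GET the stats document and parse it as JSON."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def summarize(stats):
    """Keep a handful of fields that are handy for a quick health check."""
    return {
        "msgRateIn": stats.get("msgRateIn"),
        "msgRateOut": stats.get("msgRateOut"),
        "storageSize": stats.get("storageSize"),
        "subscriptions": sorted(stats.get("subscriptions", {})),
    }
```

For example, `summarize(fetch_stats(stats_url("thermalsensors", 0)))` mirrors the second curl call.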

Data

{
"uuid": "thrml_qsx_20221121215610",
"ipaddress": "192.168.1.179",
"cputempf": 115,
"runtime": 0,
"host": "thermal",
"hostname": "thermal",
"macaddress": "e4:5f:01:7c:3f:34",
"endtime": "1669067770.6400402",
"te": "0.0005550384521484375",
"cpu": 4.5,
"diskusage": "102676.2 MB",
"memory": 9.7,
"rowid": "20221121215610_8e753591-cb7c-4e1c-886d-85cb3dba6c50",
"systemtime": "11/21/2022 16:56:15",
"ts": 1669067775,
"starttime": "11/21/2022 16:56:10",
"datetimestamp": "2022-11-21 21:56:14.404291+00:00",
"temperature": 27.9069,
"humidity": 24.89,
"co2": 698.0,
"totalvocppb": 0.0,
"equivalentco2ppm": 65535.0,
"pressure": 102048.65,
"temperatureicp": 82.0
}
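
Several fields in this record arrive as strings even though they hold numbers, and `ts` is an epoch value in seconds. The sketch below shows one way a downstream consumer might normalize a reading; the function name and the added keys `diskusage_mb` and `ts_utc` are illustrative assumptions, not part of the project.

```python
from datetime import datetime, timezone

# Only the fields touched below, copied from the sample record.
READING = {
    "endtime": "1669067770.6400402",
    "te": "0.0005550384521484375",
    "diskusage": "102676.2 MB",
    "ts": 1669067775,
}


def normalize(reading):
    """Return a copy with numeric strings parsed and ts as a UTC datetime."""
    out = dict(reading)
    out["endtime"] = float(reading["endtime"])
    out["te"] = float(reading["te"])

    # diskusage looks like "<number> MB"; reject any other unit.
    size, unit = reading["diskusage"].split()
    if unit != "MB":
        raise ValueError(f"unexpected disk usage unit: {unit}")
    out["diskusage_mb"] = float(size)

    # ts is treated as seconds since the Unix epoch.
    out["ts_utc"] = datetime.fromtimestamp(reading["ts"], tz=timezone.utc)
    return out


print(normalize(READING)["ts_utc"].isoformat())  # 2022-11-21T21:56:15+00:00
```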

Continuous Analytics with Flink SQL (Pulsar-Flink 1.15+ Connector)

Reference: https://github.com/tspannhw/pulsar-transit-function

CREATE CATALOG pulsar WITH (
'type' = 'pulsar-catalog',
'catalog-service-url' = 'pulsar://localhost:6650',
'catalog-admin-url' = 'http://localhost:8080'
);
SHOW CURRENT DATABASE;
SHOW DATABASES;
USE CATALOG pulsar;
set table.dynamic-table-options.enabled = true;
show databases;
use `public/default`;
SHOW TABLES;
describe `thermalsensors`;
show create table `thermalsensors`;
CREATE TABLE `pulsar`.`public/default`.`thermalsensors` (
`uuid` VARCHAR(2147483647) NOT NULL,
`ipaddress` VARCHAR(2147483647) NOT NULL,
`cputempf` INT NOT NULL,
`runtime` INT NOT NULL,
`host` VARCHAR(2147483647) NOT NULL,
`hostname` VARCHAR(2147483647) NOT NULL,
`macaddress` VARCHAR(2147483647) NOT NULL,
`endtime` VARCHAR(2147483647) NOT NULL,
`te` VARCHAR(2147483647) NOT NULL,
`cpu` FLOAT NOT NULL,
`diskusage` VARCHAR(2147483647) NOT NULL,
`memory` FLOAT NOT NULL,
`rowid` VARCHAR(2147483647) NOT NULL,
`systemtime` VARCHAR(2147483647) NOT NULL,
`ts` INT NOT NULL,
`starttime` VARCHAR(2147483647) NOT NULL,
`datetimestamp` VARCHAR(2147483647) NOT NULL,
`temperature` FLOAT NOT NULL,
`humidity` FLOAT NOT NULL,
`co2` FLOAT NOT NULL,
`totalvocppb` FLOAT NOT NULL,
`equivalentco2ppm` FLOAT NOT NULL,
`pressure` FLOAT NOT NULL,
`temperatureicp` FLOAT NOT NULL
) WITH (
'connector' = 'pulsar',
'topics' = 'persistent://public/default/thermalsensors',
'format' = 'json',
'admin-url' = 'http://localhost:8080',
'service-url' = 'pulsar://localhost:6650'
);
select * from thermalsensors;

Create PostgreSQL Table

CREATE TABLE "public"."thermalalerts" (
systemtime VARCHAR(256),
humidity FLOAT,
temperature FLOAT,
uuid VARCHAR(256),
co2 FLOAT,
datetimestamp VARCHAR(256),
rowid VARCHAR(256),
diskusage VARCHAR(256)
);
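
As a rough illustration of how a reading maps onto this table, the sketch below builds a parameterized INSERT from a record. The `%s` placeholder style is the one used by psycopg2, which is an assumption here; the article itself does not say which client writes the rows, and `build_insert` is a hypothetical helper.

```python
# Column order matches the CREATE TABLE statement for thermalalerts.
ALERT_COLUMNS = ["systemtime", "humidity", "temperature", "uuid",
                 "co2", "datetimestamp", "rowid", "diskusage"]


def build_insert(reading):
    """Return (sql, params) for inserting one reading into thermalalerts."""
    cols = ", ".join(ALERT_COLUMNS)
    placeholders = ", ".join(["%s"] * len(ALERT_COLUMNS))
    sql = (f'INSERT INTO "public"."thermalalerts" ({cols}) '
           f"VALUES ({placeholders})")
    # Extra keys in the reading are ignored; missing ones raise KeyError.
    params = tuple(reading[c] for c in ALERT_COLUMNS)
    return sql, params
```

With a psycopg2 cursor, the pair would be passed as `cursor.execute(sql, params)`, which keeps values out of the SQL string.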

Delete Table and Delete Schema

curl -X DELETE "http://localhost:9000/tables/thermal?type=realtime" -H "accept: application/json"
curl -X DELETE "http://localhost:9000/schemas/thermal" -H "accept: application/json"
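
When resetting the demo repeatedly, the two calls can be scripted. This sketch uses only the Python standard library and assumes the controller is at `http://localhost:9000`; the function names are illustrative. The table is removed before the schema, in the same order as the curl commands.

```python
from urllib.request import Request, urlopen

CONTROLLER = "http://localhost:9000"  # assumed local Pinot controller


def delete_requests(name="thermal"):
    """Build the DELETE requests: realtime table first, then its schema."""
    headers = {"accept": "application/json"}
    return [
        Request(f"{CONTROLLER}/tables/{name}?type=realtime",
                headers=headers, method="DELETE"),
        Request(f"{CONTROLLER}/schemas/{name}",
                headers=headers, method="DELETE"),
    ]


def run(requests):
    """Send each request in order and print the HTTP status."""
    for req in requests:
        with urlopen(req, timeout=30) as resp:
            print(req.full_url, resp.status)
```

Running `run(delete_requests())` issues both calls; `urlopen` raises `HTTPError` if the controller rejects one of them.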

Add our schema

docker exec -it pinot-controller bin/pinot-admin.sh AddSchema \
-schemaFile /config/thermalschema.json \
-exec

Add Table Via Swagger UI / Curl

curl -X POST "http://localhost:9000/tables" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"tableName\": \"thermal\", \"tableType\": \"REALTIME\", \"segmentsConfig\": { \"timeColumnName\": \"ts\", \"schemaName\": \"thermal\", \"replication\": \"1\", \"replicasPerPartition\": \"1\" }, \"ingestionConfig\": { \"batchIngestionConfig\": { \"segmentIngestionType\": \"APPEND\", \"segmentIngestionFrequency\": \"DAILY\" } }, \"tableIndexConfig\": { \"loadMode\": \"MMAP\", \"streamConfigs\": { \"streamType\": \"pulsar\", \"stream.pulsar.topic.name\": \"persistent://public/default/thermalsensors\", \"stream.pulsar.bootstrap.servers\": \"pulsar://Timothys-MBP:6650\", \"stream.pulsar.consumer.type\": \"lowlevel\", \"stream.pulsar.fetch.timeout.millis\": \"10000\", \"stream.pulsar.consumer.prop.auto.offset.reset\": \"largest\", \"stream.pulsar.consumer.factory.class.name\": \"org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory\", \"stream.pulsar.decoder.class.name\": \"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder\", \"realtime.segment.flush.threshold.rows\": \"0\", \"realtime.segment.flush.threshold.time\": \"1h\", \"realtime.segment.flush.threshold.segment.size\": \"5M\" } }, \"tenants\": {}, \"metadata\": {}}"
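
The escaped JSON in that curl command is hard to read, so here is the same table definition as a Python dictionary, with a small helper to post it. The values are copied from the command; the function names, and exposing the Pulsar URL and offset reset as parameters, are illustrative choices. `Timothys-MBP` is the author's host name and will need to be replaced.

```python
import json
from urllib.request import Request, urlopen


def thermal_table_config(pulsar_url="pulsar://Timothys-MBP:6650",
                         offset_reset="largest"):
    """Return the realtime table config used in this article as a dict."""
    stream = {
        "streamType": "pulsar",
        "stream.pulsar.topic.name": "persistent://public/default/thermalsensors",
        "stream.pulsar.bootstrap.servers": pulsar_url,
        "stream.pulsar.consumer.type": "lowlevel",
        "stream.pulsar.fetch.timeout.millis": "10000",
        "stream.pulsar.consumer.prop.auto.offset.reset": offset_reset,
        "stream.pulsar.consumer.factory.class.name":
            "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
        "stream.pulsar.decoder.class.name":
            "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "1h",
        "realtime.segment.flush.threshold.segment.size": "5M",
    }
    return {
        "tableName": "thermal",
        "tableType": "REALTIME",
        "segmentsConfig": {
            "timeColumnName": "ts",
            "schemaName": "thermal",
            "replication": "1",
            "replicasPerPartition": "1",
        },
        "ingestionConfig": {
            "batchIngestionConfig": {
                "segmentIngestionType": "APPEND",
                "segmentIngestionFrequency": "DAILY",
            }
        },
        "tableIndexConfig": {"loadMode": "MMAP", "streamConfigs": stream},
        "tenants": {},
        "metadata": {},
    }


def post_table(config, controller="http://localhost:9000"):
    """POST the table config to the Pinot controller and return the reply."""
    req = Request(
        f"{controller}/tables",
        data=json.dumps(config).encode("utf-8"),
        headers={"accept": "application/json",
                 "Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(req, timeout=30) as resp:
        return resp.read().decode("utf-8")
```

A call such as `post_table(thermal_table_config(pulsar_url="pulsar://localhost:6650"))` would create the table against a local broker.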

Defining Pulsar-Pinot Realtime Table

If you set stream.pulsar.consumer.prop.auto.offset.reset=smallest, then Pinot starts reading from the earliest message still available in the topic, which can be a lot of data.

https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/apache-pulsar

This could be millions or billions of records.

Pinot Cluster

Query Console Table Schema

Query Console Table Information

Query Console SQL results

Adding a Realtime Table via REST API / Swagger Docs

Apache Pinot Query

select systemtime, totalvocppb, temperature, cputempf, humidity, co2, equivalentco2ppm,
pressure, temperatureicp, ts, datetimestamp, cpu, diskusage, memory, rowid
from thermal
order by ts desc
limit 200
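
Outside the query console, the same SQL can be sent over HTTP. The sketch below assumes a Pinot broker at `http://localhost:8099` exposing the `/query/sql` endpoint, and that the response carries a `resultTable` with `dataSchema.columnNames` and `rows`; the host, port and helper names are assumptions for illustration.

```python
import json
from urllib.request import Request, urlopen

BROKER_SQL_URL = "http://localhost:8099/query/sql"  # assumed broker address

QUERY = """
select systemtime, temperature, humidity, co2, ts
from thermal
order by ts desc
limit 200
"""


def run_query(sql, url=BROKER_SQL_URL):
    """POST a SQL statement to the broker and return the parsed response."""
    req = Request(
        url,
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(req, timeout=30) as resp:
        return json.load(resp)


def rows_as_dicts(response):
    """Pair each result row with the column names from the data schema."""
    table = response["resultTable"]
    names = table["dataSchema"]["columnNames"]
    return [dict(zip(names, row)) for row in table["rows"]]
```

`rows_as_dicts(run_query(QUERY))` then yields one dictionary per reading, newest first.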

Superset + Pinot

Run this to initialize: https://github.com/kbastani/climate-change-analysis/blob/master/docker/docker-init.sh

Let’s Explore and Visualize Apache Pinot Data

Add a Database

Configure a Pinot dataset

Query and Validate the dataset

Create a new chart from that dataset

Save the chart to a dashboard

Build a dashboard of charts and markdown

Build more charts

Superset CSV Chart Extract

Video Preview

https://youtu.be/KMbTlmoDXXA

Pulsar to NiFi

NiFi to PostgreSQL

PostgreSQL Exploration

References