FLiP-Py-Pi-EnviroPlus - Using Apache Pulsar with Apache Spark and Apache Flink to process sensor data

 

FLiP-Py-Pi-EnviroPlus

FLiP-Py-Pi-EnviroPlus. Apache Flink, Apache Pulsar, Apache Spark, Python, Raspberry Pi, Enviro+ sensors. Tim streamnative

Code: https://github.com/tspannhw/FLiP-Py-Pi-EnviroPlus

# Connect to the Pulsar broker and create a producer that publishes
# `enviroplus` records to the sensor topic using a JSON schema.
client = pulsar.Client('pulsar://pulsar1:6650')
producer = client.create_producer(
    topic='persistent://public/default/rp4enviroplus',
    schema=JsonSchema(enviroplus),
    properties={"producer-name": "enviroplus-py-sensor", "producer-id": "enviroplus-sensor"},
)

# Build one record. Only `adjtemp` is populated in this excerpt; the other
# fields are presumably filled in by the full script -- see the linked repository.
enviroRec = enviroplus()
enviroRec.adjtemp = str(adjtemp)

# Append the record to the local log. The `with` block closes the file even
# if write() raises, which the manual open()/close() pair did not guarantee.
with open("/opt/demo/logs/envprec.log", "a+") as fa:
    fa.write(str(enviroRec) + "\n")

print(enviroRec)

# Publish the record, using the unique id as the message partition key.
producer.send(enviroRec,partition_key=str(uniqueid))

JSON Schema

# Record definition for one sensor reading published to the rp4enviroplus topic.
# It is passed to JsonSchema(...) when the producer is created, so the field
# names and types declared here define the message schema.
# NOTE(review): units are not stated in this file; the *f suffix presumably
# marks Fahrenheit variants of the matching temperature fields -- confirm.
class enviroplus(Record):
    # Temperature-style readings are declared as String, not Float.
    adjtemp = String()
    adjtempf = String()
    amplitude100 = Float()
    amplitude1000 = Float()
    amplitude500 = Float()
    amps = Float()
    cpu = Float()
    cputemp = String()
    cputempf = String()
    diskusage = String()
    endtime = String()
    # In the example data this holds a multi-line text summary of the
    # oxidising / reducing / NH3 readings, separated by "\n".
    gasko = String()
    highnoise = Float()
    host = String()
    hostname = String()
    humidity = Float()
    ipaddress = String()
    lownoise = Float()
    lux = Float()
    macaddress = String()
    memory = Float()
    midnoise = Float()
    nh3 = Float()
    oxidising = Float()
    pressure = Float()
    proximity = Integer()
    reducing = Float()
    rowid = String()
    runtime = Integer()
    starttime = String()
    systemtime = String()
    temperature = String()
    temperaturef = String()
    # Integer timestamp; the example values look like Unix epoch seconds.
    ts = Integer()
    uuid = String()

Example Data

{'adjtemp': '26.7', 'adjtempf': '60.1', 'amplitude100': 1.0, 'amplitude1000': 0.2, 'amplitude500': 0.3, 'amps': 0.3, 'cpu': 0.0, 'cputemp': '45.7', 'cputempf': '114', 'diskusage': '31435.2 MB', 'endtime': '1646156801.2777877', 'gasko': 'Oxidising: 10165.41 Ohms\nReducing: 87589.74 Ohms\nNH3: 15213.87 Ohms', 'highnoise': 0.1, 'host': 'rp4', 'hostname': 'rp4', 'humidity': 16.4, 'ipaddress': '192.168.1.209', 'lownoise': 0.5, 'lux': 55.9, 'macaddress': 'a2:3f:eb:35:a7:99', 'memory': 7.2, 'midnoise': 0.2, 'nh3': 15.2, 'oxidising': 10.2, 'pressure': 1015.5, 'proximity': 0, 'reducing': 87.6, 'rowid': '20220301174640_34f06310-caa3-4e96-9766-6e8da40ad516', 'runtime': 6, 'starttime': '03/01/2022 12:46:34', 'systemtime': '03/01/2022 12:46:42', 'temperature': '32.7', 'temperaturef': '70.9', 'ts': 1646156802, 'uuid': 'rpi4_uuid_shx_20220301174640'}

Topic

persistent://public/default/rp4enviroplus

Run

bin/pulsar-admin topics create persistent://public/default/rp4enviroplus

bin/pulsar-client consume "persistent://public/default/rp4enviroplus" -s "rp4enviroplusrdr" -n 0

----- got message -----
key:[rpi4_uuid_upn_20220301174920], properties:[], content:{
 "adjtemp": "26.8",
 "adjtempf": "60.2",
 "amplitude100": 1.0,
 "amplitude1000": 0.2,
 "amplitude500": 0.3,
 "amps": 0.3,
 "cpu": 0.0,
 "cputemp": "45.2",
 "cputempf": "113",
 "diskusage": "31435.2 MB",
 "endtime": "1646156961.7520766",
 "gasko": "Oxidising: 11618.00 Ohms\nReducing: 95351.35 Ohms\nNH3: 17596.18 Ohms",
 "highnoise": 0.0,
 "host": "rp4",
 "hostname": "rp4",
 "humidity": 16.3,
 "ipaddress": "192.168.1.209",
 "lownoise": 0.4,
 "lux": 55.9,
 "macaddress": "a2:3f:eb:35:a7:99",
 "memory": 7.2,
 "midnoise": 0.1,
 "nh3": 17.6,
 "oxidising": 11.6,
 "pressure": 1015.5,
 "proximity": 0,
 "reducing": 95.4,
 "rowid": "20220301174920_13b9c774-c221-4ebc-8e17-cb2054954f14",
 "runtime": 6,
 "starttime": "03/01/2022 12:49:15",
 "systemtime": "03/01/2022 12:49:22",
 "temperature": "32.7",
 "temperaturef": "70.9",
 "ts": 1646156962,
 "uuid": "rpi4_uuid_upn_20220301174920"
}

 

Flink SQL

FLiP

-- Register a Flink catalog named "pulsar" backed by the Pulsar cluster:
-- 'service-url' is the broker endpoint, 'admin-url' the HTTP admin endpoint,
-- and 'format' selects JSON for the message payloads.
CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

-- Make the Pulsar catalog the current catalog for the following statements.
USE CATALOG pulsar;

-- List the tables visible in the catalog; in the sample session output the
-- Pulsar topic names (e.g. rp4enviroplus) appear as table names.
SHOW TABLES;

    ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
   |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
   | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
   |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
   | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
   |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
          
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> CREATE CATALOG pulsar WITH (
>    'type' = 'pulsar',
>    'service-url' = 'pulsar://pulsar1:6650',
>    'admin-url' = 'http://pulsar1:8080',
>    'format' = 'json'
> );
[INFO] Execute statement succeed.

Flink SQL> USE CATALOG pulsar;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+---------------------------+
|                table name |
+---------------------------+
| ble-tempE0:17:54:C1:D8:4C |
|                  breakout |
|                      chat |
|                     chat2 |
|                 chatfiles |
|                  chatlog2 |
|                chatresult |
|               chatresult2 |
|                    crypto |
|            custom-routing |
|           delayed-message |
|                dotnettest |
|           dynamic-topic-0 |
|           dynamic-topic-1 |
|           dynamic-topic-2 |
|           dynamic-topic-3 |
|           dynamic-topic-4 |
|                    energy |
|             energy-influx |
|                 energylog |
|                 ex1-basic |
|                 flaky-DLQ |
|        funhouselightstate |
|             funhousestate |
|                hfptransit |
|               input-topic |
|                      iot3 |
|                iotelastic |
|             iotjetsonjson |
|            iotjetsonjson2 |
|              jetsoninflux |
|                   moptest |
|                    mqtt-2 |
|              nodejs-topic |
|             nvidia-sensor |
|                pi-thermal |
|           pi-thermal-avro |
|                pi-weather |
|           pi-weather-avro |
|                     rp400 |
|             rp4enviroplus |
|           scyllacdcsource |
|                   seeking |
|                   sensors |
|                    stocks |
|                   stocks2 |
|   telegraf%2Fhost01%2Fcpu |
|               telegrafcpu |
|               telegrafmem |
|                  transcom |
|                   weather |
+---------------------------+
51 rows in set

Flink SQL> describe rp4enviroplus;
+---------------+--------+------+-----+--------+-----------+
|          name |   type | null | key | extras | watermark |
+---------------+--------+------+-----+--------+-----------+
|       adjtemp | STRING | true |     |        |           |
|      adjtempf | STRING | true |     |        |           |
|  amplitude100 |  FLOAT | true |     |        |           |
| amplitude1000 |  FLOAT | true |     |        |           |
|  amplitude500 |  FLOAT | true |     |        |           |
|          amps |  FLOAT | true |     |        |           |
|           cpu |  FLOAT | true |     |        |           |
|       cputemp | STRING | true |     |        |           |
|      cputempf | STRING | true |     |        |           |
|     diskusage | STRING | true |     |        |           |
|       endtime | STRING | true |     |        |           |
|         gasko | STRING | true |     |        |           |
|     highnoise |  FLOAT | true |     |        |           |
|          host | STRING | true |     |        |           |
|      hostname | STRING | true |     |        |           |
|      humidity |  FLOAT | true |     |        |           |
|     ipaddress | STRING | true |     |        |           |
|      lownoise |  FLOAT | true |     |        |           |
|           lux |  FLOAT | true |     |        |           |
|    macaddress | STRING | true |     |        |           |
|        memory |  FLOAT | true |     |        |           |
|      midnoise |  FLOAT | true |     |        |           |
|           nh3 |  FLOAT | true |     |        |           |
|     oxidising |  FLOAT | true |     |        |           |
|      pressure |  FLOAT | true |     |        |           |
|     proximity |    INT | true |     |        |           |
|      reducing |  FLOAT | true |     |        |           |
|         rowid | STRING | true |     |        |           |
|       runtime |    INT | true |     |        |           |
|     starttime | STRING | true |     |        |           |
|    systemtime | STRING | true |     |        |           |
|   temperature | STRING | true |     |        |           |
|  temperaturef | STRING | true |     |        |           |
|            ts |    INT | true |     |        |           |
|          uuid | STRING | true |     |        |           |
+---------------+--------+------+-----+--------+-----------+
35 rows in set

select * from rp4enviroplus;

FLiPN FLiPN

Pulsar SQL

presto> select * from pulsar."public/default"."rp4enviroplus";

 adjtemp | adjtempf | amplitude100 | amplitude1000 | amplitude500 | amps | cpu | cputemp | cputempf | diskusage  |      endtime       |           gasko           | highnoise | host | hostname | humidity |   ipaddress   | lownoise | lux  |    macaddress     | memory | midnoise |  
---------+----------+--------------+---------------+--------------+------+-----+---------+----------+------------+--------------------+---------------------------+-----------+------+----------+----------+---------------+----------+------+-------------------+--------+----------+--
 26.7    | 60.1     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157120.7991426 | Oxidising: 19675.68 Ohms +|       0.2 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.1 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 119000.00 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 27355.89 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 26.9    | 60.4     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157122.9693346 | Oxidising: 20143.39 Ohms +|       0.2 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.6 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.4 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 120000.00 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 27923.71 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 26.5    | 59.7     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157125.1368313 | Oxidising: 20616.92 Ohms +|       0.2 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.2 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 120504.30 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 28383.56 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 26.6    | 59.9     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157127.3089767 | Oxidising: 21096.37 Ohms +|       0.2 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 121521.61 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 28965.52 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.3    | 61.1     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157129.478687  | Oxidising: 21581.86 Ohms +|       0.1 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.1 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 122034.68 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 29436.89 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.0    | 60.6     |          1.0 |           0.3 |          0.4 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157131.6498218 | Oxidising: 22073.51 Ohms +|       0.2 | rp4  | rp4      |     16.2 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 122550.72 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 29913.53 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.1    | 60.8     |          1.0 |           0.1 |          0.3 |  0.3 | 0.2 | 46.2    | 115      | 31435.2 MB | 1646157133.8205895 | Oxidising: 22471.34 Ohms +|       0.3 | rp4  | rp4      |     16.2 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 123591.84 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 30395.51 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.1    | 60.8     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157135.9879181 | Oxidising: 22873.24 Ohms +|       0.2 | rp4  | rp4      |     16.2 | 192.168.1.209 |      0.6 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.4 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 124116.96 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 30882.93 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.0    | 60.6     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157138.160729  | Oxidising: 23279.28 Ohms +|       0.2 | rp4  | rp4      |     16.1 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.2 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 124645.16 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 31252.12 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 26.9    | 60.4     |          1.1 |           0.4 |          0.4 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157140.3307676 | Oxidising: 23689.52 Ohms +|       0.0 | rp4  | rp4      |     16.1 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.1 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 125176.47 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 31749.29 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.1    | 60.8     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157142.5007892 | Oxidising: 24104.03 Ohms +|       0.4 | rp4  | rp4      |     16.0 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 125710.91 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 32125.89 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 26.9    | 60.4     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 47.2    | 117      | 31435.2 MB | 1646157144.669611  | Oxidising: 24522.88 Ohms +|       0.3 | rp4  | rp4      |     16.0 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 126248.52 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 32505.75 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.4    | 61.3     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646157146.839814  | Oxidising: 24839.90 Ohms +|       0.1 | rp4  | rp4      |     16.0 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.1 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 126248.52 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 32888.89 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.0    | 60.6     |          1.0 |           0.3 |          0.4 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157149.008095  | Oxidising: 25159.42 Ohms +|       0.2 | rp4  | rp4      |     16.1 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 126789.32 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 33275.36 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.0    | 60.6     |          1.1 |           0.4 |          0.5 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157151.1756244 | Oxidising: 25481.48 Ohms +|       0.2 | rp4  | rp4      |     16.0 | 192.168.1.209 |      0.5 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.3 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 127333.33 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 33665.21 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 27.4    | 61.3     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 46.2    | 115      | 31435.2 MB | 1646157153.365354  | Oxidising: 25806.11 Ohms +|       0.1 | rp4  | rp4      |     16.0 | 192.168.1.209 |      0.4 | 55.0 | a2:3f:eb:35:a7:99 |    7.3 |      0.2 |  
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 127333.33 Ohms +|           |      |          |          |               |          |      |                   |        |          |  
         |          |              |               |              |      |     |         |          |            |                    | NH3: 34058.48 Ohms        |           |      |          |          |               |          |      |                   |        |          |  
 9.0     | 28.2     |          1.0 |           0.2 |          0.3 |  0.3 | 0.0 | 45.7    | 114      | 31435.2 MB | 1646156957.410107  | Oxidising: 466033.90 Ohms+|       0.6 | rp4  | rp4      |     16.3 | 192.168.1.209 |      0.4 | 55.9 | a2:3f:eb:35:a7:99 |    7.2 |      0.3 | 2
         |          |              |               |              |      |     |         |          |            |                    | Reducing: 4682461.54 Ohms+|           |      |          |          |               |          |      |                   |        |          |  
 

image

image

image

image

image

Spark SQL

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8080").option("topic", "persistent://public/default/rp4enviroplus").load()
dfPulsar: org.apache.spark.sql.DataFrame = [adjtemp: string, adjtempf: string ... 39 more fields]

scala> dfPulsar.printSchema()
root
 |-- adjtemp: string (nullable = true)
 |-- adjtempf: string (nullable = true)
 |-- amplitude100: float (nullable = true)
 |-- amplitude1000: float (nullable = true)
 |-- amplitude500: float (nullable = true)
 |-- amps: float (nullable = true)
 |-- cpu: float (nullable = true)
 |-- cputemp: string (nullable = true)
 |-- cputempf: string (nullable = true)
 |-- diskusage: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- gasko: string (nullable = true)
 |-- highnoise: float (nullable = true)
 |-- host: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- humidity: float (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- lownoise: float (nullable = true)
 |-- lux: float (nullable = true)
 |-- macaddress: string (nullable = true)
 |-- memory: float (nullable = true)
 |-- midnoise: float (nullable = true)
 |-- nh3: float (nullable = true)
 |-- oxidising: float (nullable = true)
 |-- pressure: float (nullable = true)
 |-- proximity: integer (nullable = true)
 |-- reducing: float (nullable = true)
 |-- rowid: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- systemtime: string (nullable = true)
 |-- temperature: string (nullable = true)
 |-- temperaturef: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- uuid: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)


scala> 

scala> val pQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", "false").start()
22/03/01 13:36:05 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-347b8845-d46f-4feb-95f4-b92f159ab412. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/03/01 13:36:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
pQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6169de0c

scala> 

scala> 22/03/01 13:36:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-------+--------+------------+-------------+------------+----+---+-------+--------+---------+-------+-----+---------+----+--------+--------+---------+--------+---+----------+------+--------+---+---------+--------+---------+--------+-----+-------+---------+----------+-----------+------------+---+----+-----+-------+-----------+-------------+-----------+-------------------+
|adjtemp|adjtempf|amplitude100|amplitude1000|amplitude500|amps|cpu|cputemp|cputempf|diskusage|endtime|gasko|highnoise|host|hostname|humidity|ipaddress|lownoise|lux|macaddress|memory|midnoise|nh3|oxidising|pressure|proximity|reducing|rowid|runtime|starttime|systemtime|temperature|temperaturef|ts |uuid|__key|__topic|__messageId|__publishTime|__eventTime|__messageProperties|
+-------+--------+------------+-------------+------------+----+---+-------+--------+---------+-------+-----+---------+----+--------+--------+---------+--------+---+----------+------+--------+---+---------+--------+---------+--------+-----+-------+---------+----------+-----------+------------+---+----+-----+-------+-----------+-------------+-----------+-------------------+
+-------+--------+------------+-------------+------------+----+---+-------+--------+---------+-------+-----+---------+----+--------+--------+---------+--------+---+----------+------+--------+---+---------+--------+---------+--------+-----+-------+---------+----------+-----------+------------+---+----+-----+-------+-----------+-------------+-----------+-------------------+

pQuery.explain()
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@40dfee3b, org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$2950/1357418058@78d47078
+- *(1) Scan ExistingRDD pulsar[adjtemp#205,adjtempf#206,amplitude100#207,amplitude1000#208,amplitude500#209,amps#210,cpu#211,cputemp#212,cputempf#213,diskusage#214,endtime#215,gasko#216,highnoise#217,host#218,hostname#219,humidity#220,ipaddress#221,lownoise#222,lux#223,macaddress#224,memory#225,midnoise#226,nh3#227,oxidising#228,... 17 more fields]

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+-----------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+
|adjtemp|adjtempf|amplitude100|amplitude1000|amplitude500|amps|cpu|cputemp|cputempf|diskusage |endtime          |gasko                                                                 |highnoise|host|hostname|humidity|ipaddress    |lownoise|lux |macaddress       |memory|midnoise|nh3 |oxidising|pressure|proximity|reducing|rowid                                              |runtime|starttime          |systemtime         |temperature|temperaturef|ts        |uuid                        |__key                                                                                |__topic                                  |__messageId           |__publishTime          |__eventTime|__messageProperties|
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+-----------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+
|27.3   |61.1    |1.0         |0.3          |0.4         |0.3 |0.0|46.2   |115     |31434.2 MB|1646159783.708531|Oxidising: 64077.97 Ohms\nReducing: 121011.49 Ohms\nNH3: 53803.92 Ohms|0.1      |rp4 |rp4     |15.9    |192.168.1.209|0.4     |55.5|a2:3f:eb:35:a7:99|7.5   |0.2     |53.8|64.1     |1014.3  |0        |121.0   |20220301183622_d4c15574-0828-4505-bb4e-86b17b0919e0|8      |03/01/2022 13:36:14|03/01/2022 13:36:24|33.1       |71.6        |1646159784|rpi4_uuid_mfj_20220301183622|[72 70 69 34 5F 75 75 69 64 5F 6D 66 6A 5F 32 30 32 32 30 33 30 31 31 38 33 36 32 32]|persistent://public/default/rp4enviroplus|[08 A5 E5 08 10 DC 08]|2022-03-01 13:36:24.741|null       |{}                 |
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+-----------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+------------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+
|adjtemp|adjtempf|amplitude100|amplitude1000|amplitude500|amps|cpu|cputemp|cputempf|diskusage |endtime           |gasko                                                                 |highnoise|host|hostname|humidity|ipaddress    |lownoise|lux |macaddress       |memory|midnoise|nh3 |oxidising|pressure|proximity|reducing|rowid                                              |runtime|starttime          |systemtime         |temperature|temperaturef|ts        |uuid                        |__key                                                                                |__topic                                  |__messageId           |__publishTime          |__eventTime|__messageProperties|
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+------------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+
|27.1   |60.8    |1.0         |0.2          |0.3         |0.3 |0.0|46.2   |115     |31434.2 MB|1646159785.8810012|Oxidising: 63148.94 Ohms\nReducing: 125710.91 Ohms\nNH3: 53413.85 Ohms|0.1      |rp4 |rp4     |15.9    |192.168.1.209|0.5     |55.5|a2:3f:eb:35:a7:99|7.4   |0.2     |53.4|63.1     |1014.3  |0        |125.7   |20220301183624_5343a88c-f354-45f1-80a0-d30feac6ef5e|10     |03/01/2022 13:36:14|03/01/2022 13:36:26|33.1       |71.6        |1646159786|rpi4_uuid_apf_20220301183624|[72 70 69 34 5F 75 75 69 64 5F 61 70 66 5F 32 30 32 32 30 33 30 31 31 38 33 36 32 34]|persistent://public/default/rp4enviroplus|[08 A5 E5 08 10 DD 08]|2022-03-01 13:36:26.913|null       |{}                 |
+-------+--------+------------+-------------+------------+----+---+-------+--------+----------+------------------+----------------------------------------------------------------------+---------+----+--------+--------+-------------+--------+----+-----------------+------+--------+----+---------+--------+---------+--------+---------------------------------------------------+-------+-------------------+-------------------+-----------+------------+----------+----------------------------+-------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------------------+-----------+-------------------+

FLiPNS

References

FLiP-Py-Pi-GasThermal: Building an IoT Edge Application with Apache Pulsar and Python for TVOC and CO2 Ingest

 FLiP-Py-Pi-GasThermal


tags:  Apache Pulsar, Python, Raspberry Pi, Gas Sensor + Thermal Camera Sensors, Apache Flink, Trino/Presto SQL

ThermalCam

Sensors

  • Pimoroni BreakoutGarden: SGP30
    • Sensirion SGP30 TVOC and eCO2 sensor (datasheet)
    • TVOC sensing from 0-60,000 ppb (parts per billion)
    • CO2 sensing from 400 to 60,000 ppm (parts per million)
  • Pimoroni BreakoutGarden: MLX90640 Thermal Camera

HardWare

Architecture

designthis more sendmoredata

Build

# Create the garden3 topic in the public/default namespace.
bin/pulsar-admin topics create persistent://public/default/garden3

# Consume from garden3 using the "garden3reader" subscription (-s).
# NOTE(review): -n 0 presumably means "no message limit, keep consuming" — confirm against the pulsar-client docs.
bin/pulsar-client consume "persistent://public/default/garden3" -s "garden3reader" -n 0

# Pulsar schema record for one reading published to the garden3 topic.
# The example values in the comments below come from the sample consumed message
# shown after this class.
class Garden(Record):
    cpu = Float()                # e.g. 0.0 — presumably CPU utilisation; units not shown here, confirm
    diskusage = String()         # text with a unit suffix, e.g. "103496.6 MB"
    endtime = String()           # looks like epoch seconds rendered as text, e.g. "1646594128.2460103"
    equivalentco2ppm = String()  # eCO2 reading sent as whitespace-padded text, e.g. "  413"
    host = String()              # e.g. "garden3"
    hostname = String()          # e.g. "garden3" (same value as host in the sample)
    ipaddress = String()         # e.g. "192.168.1.198"
    macaddress = String()        # e.g. "dc:a6:32:32:98:20"
    memory = Float()             # e.g. 9.2 — presumably memory utilisation; units not shown here, confirm
    rowid = String()             # timestamp prefix + "_" + UUID, e.g. "20220306191528_707b34d4-..."
    runtime = Integer()          # e.g. 0 — units not shown here, confirm
    starttime = String()         # "MM/DD/YYYY HH:MM:SS" text, e.g. "03/06/2022 14:15:28"
    systemtime = String()        # "MM/DD/YYYY HH:MM:SS" text, e.g. "03/06/2022 14:15:29"
    totalvocppb = String()       # TVOC reading sent as whitespace-padded text, e.g. "    5"
    ts = Integer()               # integer timestamp, e.g. 1646594129 (looks like epoch seconds)
    uuid = String()              # e.g. "garden3_uuid_yvs_20220306191528"; the sample message key has the same value

----- got message -----
key:[garden3_uuid_yvs_20220306191528], properties:[], content:{
 "cpu": 0.0,
 "diskusage": "103496.6 MB",
 "endtime": "1646594128.2460103",
 "equivalentco2ppm": "  413",
 "host": "garden3",
 "hostname": "garden3",
 "ipaddress": "192.168.1.198",
 "macaddress": "dc:a6:32:32:98:20",
 "memory": 9.2,
 "rowid": "20220306191528_707b34d4-7299-4233-a495-d2d97393e834",
 "runtime": 0,
 "starttime": "03/06/2022 14:15:28",
 "systemtime": "03/06/2022 14:15:29",
 "totalvocppb": "    5",
 "ts": 1646594129,
 "uuid": "garden3_uuid_yvs_20220306191528"
}

presto> select * from pulsar."public/default"."garden3";
 cpu |  diskusage  |      endtime       | equivalentco2ppm |  host   | hostname |   ipaddress   |    macaddress     | memory |                        rowid                        | runtime |      
-----+-------------+--------------------+------------------+---------+----------+---------------+-------------------+--------+-----------------------------------------------------+---------+------
 6.5 | 103496.5 MB | 1646594650.7116666 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192410_197b6b0c-b86c-4191-9e9c-11777767825e |       0 | 03/06
 6.7 | 103496.5 MB | 1646594651.7441382 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192411_4dab3212-423b-46a9-ae39-b10eb363336d |       0 | 03/06
 1.3 | 103496.5 MB | 1646594652.7764313 |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192412_d24a819b-4ca1-489a-9683-da48bc37185c |       0 | 03/06
 0.2 | 103496.5 MB | 1646594653.810233  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192413_f4e43177-2486-4a36-b27a-903028d6aacf |       0 | 03/06
 0.0 | 103496.5 MB | 1646594654.8467774 |   416            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192414_558d3db8-725a-46ec-ad1f-89f8067142f8 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594655.880628  |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192415_1eeea533-7f7e-4945-a089-d4b2f8681e14 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594656.9145741 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192416_d7d8c6ea-2adf-4704-8f49-3108a7328f26 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594657.9489982 |   425            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192417_c71cbb6e-c59f-4855-84ae-b796fd3d7a76 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594658.9828157 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192418_c14204dd-4c1c-4e76-a272-8ddecd11a97c |       0 | 03/06
 0.0 | 103496.5 MB | 1646594660.0187812 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192419_c7583063-06e0-4601-806a-96619b6bb136 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594661.0531507 |   428            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192421_57e4a8ed-af6f-4dd0-b143-f29f1a211fdb |       0 | 03/06
 0.0 | 103496.5 MB | 1646594662.087301  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192422_cf2ce36e-c996-41ac-baec-db292cffdd37 |       0 | 03/06
 6.2 | 103496.5 MB | 1646594663.1214898 |   415            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192423_30b13428-ac2e-4929-b9f5-c4c3fbc3312a |       0 | 03/06
 6.5 | 103496.5 MB | 1646594664.1541135 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192424_d1517315-85fa-4923-b634-9b673c5b20ba |       0 | 03/06
 3.6 | 103496.5 MB | 1646594665.1890867 |   417            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192425_3984b0bc-a92e-4458-834c-e65259bb7a4d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594666.221572  |   422            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192426_e0debd83-47e8-43f7-8ec5-cf0dbc27b87e |       0 | 03/06
 0.0 | 103496.5 MB | 1646594667.2555919 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192427_6ec920e7-56d2-4730-bddc-e18592cf1210 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594668.2893167 |   428            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192428_434b4458-6f89-4161-b967-f29796d8bf5d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594669.3234618 |   426            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192429_5bb29c2e-d078-405d-b8f6-967ea4753b3f |       0 | 03/06
 0.0 | 103496.5 MB | 1646594670.359024  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192430_05b86f6b-4d28-497c-acd6-d14f7ce27157 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594671.392967  |   432            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192431_b7a2955a-6f39-4439-8eb9-27d7a2633836 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594672.4271743 |   426            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192432_29861384-fc30-4deb-bc1d-b7f2d27ef89d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594673.4611707 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192433_25435335-39fe-4294-b913-95c63537a743 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594674.4951062 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192434_a52a90e8-b11c-467b-b87f-936432bc8988 |       0 | 03/06
 3.3 | 103496.5 MB | 1646594675.531778  |   436            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192435_48bab6a9-9f2e-443b-9fed-f096f6c24ebd |       0 | 03/06
 6.5 | 103496.5 MB | 1646594676.5642908 |   433            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192436_761f79dc-06f6-4ed3-8062-5227b6842b77 |       0 | 03/06
 6.2 | 103496.5 MB | 1646594677.5965276 |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192437_6d8d49c9-0519-4825-b6b7-ed435e9fe747 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594678.6290672 |   423            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192438_a920872d-f152-4b18-94a7-b4dfe5641482 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594679.6629703 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192439_032ce574-10f6-4b50-b5e8-102b466af65a |       0 | 03/06
 0.0 | 103496.5 MB | 1646594680.699419  |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192440_cde6f78a-d028-4f87-a95b-156bac0ee0c2 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594681.7338123 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192441_7400689a-6974-4ff2-a688-96c317d45d03 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594682.767776  |   417            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192442_46696fa7-295c-4fe6-bf49-8d8405e21cf5 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594683.8017883 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192443_4de9f79c-0169-49d6-b337-fb57dcb5cf7e |       0 | 03/06
 0.0 | 103496.5 MB | 1646594684.8360054 |   422            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192444_63081ac4-2c87-462d-bc16-ab506e9c6db2 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594685.8720167 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192445_fee13e36-26c6-4f06-894c-8b96cb0f3bae |       0 | 03/06
 0.0 | 103496.5 MB | 1646594686.90594   |   431            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192446_8aaa423d-bfe3-43e8-8653-d554c2afb8d6 |       0 | 03/06
 0.8 | 103496.4 MB | 1646594687.939747  |   416            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192447_bfe339cd-3fb4-4d2a-b612-3a2363e1b83b |       0 | 03/06
 6.5 | 103496.4 MB | 1646594688.9728699 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192448_8809fc3e-75f8-4aca-a141-6e5d4872b0de |       0 | 03/06
 6.5 | 103496.4 MB | 1646594690.0051649 |   412            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192449_0d0b4b62-0321-4bcd-952b-14607333d9f5 |       0 | 03/06

presto> desc pulsar."public/default"."garden3";
      Column       |   Type    | Extra |                                   Comment                                   
-------------------+-----------+-------+-----------------------------------------------------------------------------
 cpu               | real      |       | ["null","float"]                                                            
 diskusage         | varchar   |       | ["null","string"]                                                           
 endtime           | varchar   |       | ["null","string"]                                                           
 equivalentco2ppm  | varchar   |       | ["null","string"]                                                           
 host              | varchar   |       | ["null","string"]                                                           
 hostname          | varchar   |       | ["null","string"]                                                           
 ipaddress         | varchar   |       | ["null","string"]                                                           
 macaddress        | varchar   |       | ["null","string"]                                                           
 memory            | real      |       | ["null","float"]                                                            
 rowid             | varchar   |       | ["null","string"]                                                           
 runtime           | integer   |       | ["null","int"]                                                              
 starttime         | varchar   |       | ["null","string"]                                                           
 systemtime        | varchar   |       | ["null","string"]                                                           
 totalvocppb       | varchar   |       | ["null","string"]                                                           
 ts                | integer   |       | ["null","int"]                                                              
 uuid              | varchar   |       | ["null","string"]                                                           
 __partition__     | integer   |       | The partition number which the message belongs to                           
 __event_time__    | timestamp |       | Application defined timestamp in milliseconds of when the event occurred    
 __publish_time__  | timestamp |       | The timestamp in milliseconds of when event as published                    
 __message_id__    | varchar   |       | The message ID of the message used to generate this row                     
 __sequence_id__   | bigint    |       | The sequence ID of the message used to generate this row                    
 __producer_name__ | varchar   |       | The name of the producer that publish the message used to generate this row 
 __key__           | varchar   |       | The partition key for the topic                                             
 __properties__    | varchar   |       | User defined properties    
 

Presto/Trino gives us access to all the metadata attached to each message (or event, row, record — whatever you prefer to call it). The metadata columns have special double-underscore names to help prevent collisions with the payload fields.

Meta Data

  • partition - if we have a partitioned topic, which partition does this message belong to.
  • event_time - you know what time it is! Timestamp in ms for event action.
  • publish_time - when was this message published to the topic?
  • message_id - unique id for this message
  • sequence_id - ordering information for this message
  • producer_name - who sent me this?
  • key - did you assign a key like I asked you to?
  • properties - all those extra fields you added around the payload

Spark Structured Streaming

// Stream the garden3 topic from Pulsar into Parquet files with Spark Structured Streaming.
// Each multi-line chain is wrapped in parentheses so it can be pasted into spark-shell as one expression.
val dfPulsar = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")
  .option("topic", "persistent://public/default/garden3")
  .load())
dfPulsar.printSchema()

// The sink format can also be "orc", "json", "csv", etc.
// No "truncate" option is set: that is a console-sink display setting and means nothing to a file sink.
// selectExpr("*") keeps every column; replace "*" to project a subset.
val pQuery = (dfPulsar.selectExpr("*")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", "/opt/demo/gasthermal")
  .start())

pQuery.explain()
// awaitTermination() blocks until the query terminates; stop() only runs after that.
pQuery.awaitTermination()
pQuery.stop()

Spark

Show Me The Data

We can visualize data from Apache Pulsar by consuming it through the WebSocket interface in a simple jQuery single-page web application like the one below.

JavaScript

Example Parquet Files

# Install the parquet-tools CLI (-U upgrades it if it is already installed).
pip3 install parquet-tools -U

# Print the file metadata and per-column details of one Parquet part file.
parquet-tools inspect part-00000-b7e1f8dc-956d-4130-bc59-7b1435e41391-c000.snappy.parquet

############ file meta data ############
created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)
num_columns: 23
num_rows: 1
num_row_groups: 1
format_version: 1.0
serialized_size: 5071


############ Columns ############
cpu
diskusage
endtime
equivalentco2ppm
host
hostname
ipaddress
macaddress
memory
rowid
runtime
starttime
systemtime
totalvocppb
ts
uuid
__key
__topic
__messageId
__publishTime
__eventTime
key
value

############ Column(cpu) ############
name: cpu
path: cpu
max_definition_level: 1
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(diskusage) ############
name: diskusage
path: diskusage
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(endtime) ############
name: endtime
path: endtime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(equivalentco2ppm) ############
name: equivalentco2ppm
path: equivalentco2ppm
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(host) ############
name: host
path: host
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(hostname) ############
name: hostname
path: hostname
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ipaddress) ############
name: ipaddress
path: ipaddress
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(macaddress) ############
name: macaddress
path: macaddress
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(memory) ############
name: memory
path: memory
max_definition_level: 1
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(rowid) ############
name: rowid
path: rowid
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(runtime) ############
name: runtime
path: runtime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(starttime) ############
name: starttime
path: starttime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(systemtime) ############
name: systemtime
path: systemtime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(totalvocppb) ############
name: totalvocppb
path: totalvocppb
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ts) ############
name: ts
path: ts
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(uuid) ############
name: uuid
path: uuid
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(__key) ############
name: __key
path: __key
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE

############ Column(__topic) ############
name: __topic
path: __topic
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(__messageId) ############
name: __messageId
path: __messageId
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE

############ Column(__publishTime) ############
name: __publishTime
path: __publishTime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE

############ Column(__eventTime) ############
name: __eventTime
path: __eventTime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE

############ Column(key) ############
name: key
path: __messageProperties.key_value.key
max_definition_level: 2
max_repetition_level: 1
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(value) ############
name: value
path: __messageProperties.key_value.value
max_definition_level: 3
max_repetition_level: 1
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

Flink

-- Register a catalog named "pulsar" of type 'pulsar', pointing at the pulsar1 broker and admin endpoints.
CREATE CATALOG pulsar WITH (
    'type'        = 'pulsar',
    'service-url' = 'pulsar://pulsar1:6650',
    'admin-url'   = 'http://pulsar1:8080',
    'format'      = 'json'
);

-- Switch the session to that catalog, then list the tables it exposes.
USE CATALOG pulsar;

SHOW TABLES;

Flink SQL> describe garden3;
+------------------+--------+------+-----+--------+-----------+
|             name |   type | null | key | extras | watermark |
+------------------+--------+------+-----+--------+-----------+
|              cpu |  FLOAT | true |     |        |           |
|        diskusage | STRING | true |     |        |           |
|          endtime | STRING | true |     |        |           |
| equivalentco2ppm | STRING | true |     |        |           |
|             host | STRING | true |     |        |           |
|         hostname | STRING | true |     |        |           |
|        ipaddress | STRING | true |     |        |           |
|       macaddress | STRING | true |     |        |           |
|           memory |  FLOAT | true |     |        |           |
|            rowid | STRING | true |     |        |           |
|          runtime |    INT | true |     |        |           |
|        starttime | STRING | true |     |        |           |
|       systemtime | STRING | true |     |        |           |
|      totalvocppb | STRING | true |     |        |           |
|               ts |    INT | true |     |        |           |
|             uuid | STRING | true |     |        |           |
+------------------+--------+------+-----+--------+-----------+
16 rows in set

-- Select the sensor readings and host metrics from garden3; each column is listed once.
SELECT
    equivalentco2ppm,
    totalvocppb,
    cpu,
    starttime,
    systemtime,
    ts,
    diskusage,
    endtime,
    memory,
    uuid
FROM garden3;

-- equivalentco2ppm and totalvocppb are whitespace-padded STRING columns, so trim
-- and cast them before aggregating; MAX on the raw strings compares text, not numbers.
SELECT
    MAX(CAST(TRIM(equivalentco2ppm) AS INT)) AS MaxCO2,
    MAX(CAST(TRIM(totalvocppb) AS INT)) AS MaxVocPPB
FROM garden3;


// TODO

Add your own table

  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
  

Flink Flink2 Flink3

Apache NiFi (FLiPN)

  • Choose a processor from the palette

ShowNiFi

  • Consume messages from Apache Pulsar

ConsumePulsar

  • Full Apache NiFi Data Flow with Apache Pulsar and MongoDB

Flow

MongoDB

# Open a mongo shell on the inventory database at pulsar1:27017, authenticating against the admin database.
mongo -u username1 -p password1 --authenticationDatabase admin pulsar1:27017/inventory

// List the databases on the server.
show databases

// Create the garden3 collection in the current database.
db.createCollection("garden3")

// List the collections in the current database.
show collections

// Pretty-print every document in the garden3 collection.
db.garden3.find().pretty()

{
        "_id" : ObjectId("622f7315f99b9a338d60592f"),
        "cpu" : 0,
        "diskusage" : "101615.9 MB",
        "endtime" : "1647276083.2033532",
        "equivalentco2ppm" : "  407",
        "host" : "garden3",
        "hostname" : "garden3",
        "ipaddress" : "192.168.1.199",
        "macaddress" : "dc:a6:32:32:98:20",
        "memory" : 8.8,
        "rowid" : "20220314164123_0e19c5e6-45f5-405e-bd93-9aed05b37630",
        "runtime" : 0,
        "starttime" : "03/14/2022 12:41:23",
        "systemtime" : "03/14/2022 12:41:24",
        "totalvocppb" : "  5",
        "ts" : 1647276084,
        "uuid" : "garden3_uuid_xrl_20220314164123"
}

mongo mongoquery

References

Additional Heat Images

hot hot2 more3 doesanyonereadthis4