
November 15-19, 2022 FLiP Stack Weekly




#### More talks coming this month! Join Tim around the virtual world at various conferences.



![MerryChristmas](https://github.com/tspannhw/FLiPStackWeekly/raw/main/images/merrychristmasflipstack.jpg)



### Podcast


Take a look at recent podcasts in audio or video format.


[https://www.buzzsprout.com/2062659/11463086-messaging-streaming-and-events-101-episode-1-of-crossing-the-streams](https://www.buzzsprout.com/2062659/11463086-messaging-streaming-and-events-101-episode-1-of-crossing-the-streams)


[https://www.youtube.com/watch?v=U8aPBhlvDHU&feature=emb_imp_woyt](https://www.youtube.com/watch?v=U8aPBhlvDHU&feature=emb_imp_woyt)


### CODE + COMMUNITY



Join my meetup group (NJ/NYC/Philly/Virtual). We will have a hybrid event on December 8th.


[https://www.meetup.com/new-york-city-apache-pulsar-meetup/](https://www.meetup.com/new-york-city-apache-pulsar-meetup/)


**This is Issue #58!!**


[https://github.com/tspannhw/FLiPStackWeekly](https://github.com/tspannhw/FLiPStackWeekly)


[https://www.linkedin.com/pulse/2022-schedule-tim-spann](https://www.linkedin.com/pulse/2022-schedule-tim-spann)



#### Upcoming Events


I’ll be speaking at #PulsarSummit Asia 2022! Join the #ApachePulsar community and RSVP for the webinar: https://streamnative.zoom.us/webinar/register/8516668631400/WN_qKibcbEFTxKv6-MszyFeAg.


![Pulsar 101](https://github.com/tspannhw/FLiPStackWeekly/raw/main/images/Timothy%20Spann%20101.jpg)




![ModernApps](https://github.com/tspannhw/FLiPStackWeekly/raw/main/images/Timothy%20Spann.jpg)




#### Videos


[https://www.youtube.com/watch?v=ehlK_d-ItG0](https://www.youtube.com/watch?v=ehlK_d-ItG0)


<iframe width="560" height="315" src="https://www.youtube.com/embed/ehlK_d-ItG0" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>



[https://www.youtube.com/watch?v=XH0nsTttSY0](https://www.youtube.com/watch?v=XH0nsTttSY0)



[https://www.youtube.com/watch?v=K-I2DJYIkTg&t=6s](https://www.youtube.com/watch?v=K-I2DJYIkTg&t=6s)


<iframe width="560" height="315" src="https://www.youtube.com/embed/K-I2DJYIkTg" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>



#### Spring for Pulsar on Secure Cloud


[https://github.com/spring-projects-experimental/spring-pulsar/wiki/Stream-Native-Cloud](https://github.com/spring-projects-experimental/spring-pulsar/wiki/Stream-Native-Cloud)


[https://spring.io/blog/2022/11/16/spring-framework-6-0-goes-ga](https://spring.io/blog/2022/11/16/spring-framework-6-0-goes-ga)



### CODE


[https://pulsar.apache.org/release-notes/versioned/client-cpp-3.0.0/](https://pulsar.apache.org/release-notes/versioned/client-cpp-3.0.0/)


[https://github.com/tspannhw/spring-pulsar-gtfsrealtime](https://github.com/tspannhw/spring-pulsar-gtfsrealtime)


[https://github.com/tspannhw/FLiPN-FLaNK-KafkaConnectToMoP](https://github.com/tspannhw/FLiPN-FLaNK-KafkaConnectToMoP)


[https://github.com/tspannhw/pulsar-adsb-cockroachdb](https://github.com/tspannhw/pulsar-adsb-cockroachdb)


[https://github.com/tspannhw/pulsar-adsb-function](https://github.com/tspannhw/pulsar-adsb-function)



### ARTICLES


[https://www.mparticle.com/blog/apache-pulsar-migration/](https://www.mparticle.com/blog/apache-pulsar-migration/)


[https://streamnative.io/blog/case/2022-11-15-how-proxima-beta-implemented-cqrs-and-event-sourcing-on-top-of-apache-pulsar-and-scylladb/](https://streamnative.io/blog/case/2022-11-15-how-proxima-beta-implemented-cqrs-and-event-sourcing-on-top-of-apache-pulsar-and-scylladb/)


[https://medium.com/@tspann/gtfs-real-time-feed-ingest-with-java-67e0d324cfc4](https://medium.com/@tspann/gtfs-real-time-feed-ingest-with-java-67e0d324cfc4)


[https://www.immerok.io/blog/apache-flink-newsletter-nov-2022](https://www.immerok.io/blog/apache-flink-newsletter-nov-2022)


[https://nvidianews.nvidia.com/news/nvidia-microsoft-accelerate-cloud-enterprise-ai](https://nvidianews.nvidia.com/news/nvidia-microsoft-accelerate-cloud-enterprise-ai)


[https://verraes.net/2019/05/patterns-for-decoupling-distsys-passage-of-time-event/](https://verraes.net/2019/05/patterns-for-decoupling-distsys-passage-of-time-event/)


[https://streamnative.io/blog/community/2022-11-04-announcing-pulsar-summit-asia-2022-conference-schedule/](https://streamnative.io/blog/community/2022-11-04-announcing-pulsar-summit-asia-2022-conference-schedule/)


[https://dl.acm.org/doi/fullHtml/10.1145/3531146.3533231](https://dl.acm.org/doi/fullHtml/10.1145/3531146.3533231)



### TRAINING


Dates are Dec 5 - Dec 8, 2022


Link to register: [https://www.eventbrite.com/e/463731161387](https://www.eventbrite.com/e/463731161387)



Dates are January 17 - 19, 2023 from 2pm - 5pm CET / 8am - 12pm EST

Link to register: [https://www.eventbrite.com/e/465055021087](https://www.eventbrite.com/e/465055021087)


[https://streamnative.io/training/](https://streamnative.io/training/)




### EVENTS



Nov 20, 2022 9:00 pm. Pulsar Summit Asia. Virtual.


[https://pulsar-summit.org/event/asia-2022/schedule](https://pulsar-summit.org/event/asia-2022/schedule)



Nov 23, 2022: Big Data EU. Virtual.


[https://bigdataconference.eu/timothy-spann/](https://bigdataconference.eu/timothy-spann/)



Nov 29, 2022: Machine Intelligence Guild Speaker of the Month. Virtual.


Coming soon.


Dec 14, 2022: Manhattan, NYC: Pulsar + Pinot Meetup



Dec 15, 2022: TigerLabs, Princeton, NJ: Pulsar + NiFi + Flink Meetup


[https://www.meetup.com/technologysolutionshub/events/289756167/](https://www.meetup.com/technologysolutionshub/events/289756167/)


**HTAP Summit**


Coming soon.


**Pinot / Pulsar Meetup**


Coming soon.


**CockroachDB NYC Meetup**



**Hazelcast Event**


[https://docs.hazelcast.com/hazelcast/5.1/integrate/pulsar-connector](https://docs.hazelcast.com/hazelcast/5.1/integrate/pulsar-connector)



### TOOLS


* [https://github-business-card.vercel.app/](https://github-business-card.vercel.app/)


* [https://github.com/Eugeny/tabby/releases/tag/v1.0.186](https://github.com/Eugeny/tabby/releases/tag/v1.0.186)

 

* [https://tabby.sh/](https://tabby.sh/)

 

* [https://github.com/aleixrodriala/wa-tunnel](https://github.com/aleixrodriala/wa-tunnel)

 

* [https://github.com/awslabs/deequ](https://github.com/awslabs/deequ)

 

* [https://github.com/MonitorControl/MonitorControl](https://github.com/MonitorControl/MonitorControl)


* [https://github.com/sibprogrammer/xq](https://github.com/sibprogrammer/xq)


* [https://github.com/danielgatis/rembg](https://github.com/danielgatis/rembg)


* [https://pomochat.com/](https://pomochat.com/)


* [https://github.com/enso-org/enso](https://github.com/enso-org/enso)


* [https://github.com/streamnative/pulsar-io-cloud-storage](https://github.com/streamnative/pulsar-io-cloud-storage)


* [https://github.com/FusionAuth/java-http](https://github.com/FusionAuth/java-http)


* [https://github.com/containers/podman-desktop](https://github.com/containers/podman-desktop)


* [https://github.com/cachix/devenv](https://github.com/cachix/devenv)


* [https://devenv.sh/](https://devenv.sh/)


* [https://teropa.info/musicmouse/](https://teropa.info/musicmouse/)



#### Blast from the Past


[https://www.youtube.com/watch?v=tnWq8opMI6s](https://www.youtube.com/watch?v=tnWq8opMI6s)




#### QUICK SCRIPTS



curl http://localhost:8080/admin/v2/clusters/standalone
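The same admin REST call can be made from Python using only the standard library. This is a minimal sketch that assumes a standalone broker whose admin API listens on http://localhost:8080, as in the curl command. The helper names are hypothetical, and the exact JSON fields returned depend on your Pulsar version.

```python
import json
import urllib.parse
import urllib.request


def cluster_admin_url(admin_url: str, cluster: str) -> str:
    """Build the admin REST URL for one cluster, e.g. .../admin/v2/clusters/standalone."""
    return f"{admin_url.rstrip('/')}/admin/v2/clusters/{urllib.parse.quote(cluster)}"


def get_cluster(admin_url: str = "http://localhost:8080", cluster: str = "standalone") -> dict:
    """Fetch the cluster configuration as a dict (the same data the curl command prints)."""
    # Requires a running broker; raises urllib.error.URLError if nothing is listening.
    with urllib.request.urlopen(cluster_admin_url(admin_url, cluster), timeout=10) as resp:
        return json.load(resp)
```

With a standalone broker running, `print(json.dumps(get_cluster(), indent=2))` pretty-prints the response.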




FLiP-Py-Pi-GasThermal: Building an IoT Edge Application with Apache Pulsar and Python for TVOC and CO2 Ingest



tags:  Apache Pulsar, Python, Raspberry Pi, Gas Sensor + Thermal Camera Sensors, Apache Flink, Trino/Presto SQL


Sensors

  • Pimoroni BreakoutGarden: SGP30
    • Sensirion SGP30 TVOC and eCO2 sensor (datasheet)
    • TVOC sensing from 0-60,000 ppb (parts per billion)
    • eCO2 sensing from 400 to 60,000 ppm (parts per million)
  • Pimoroni BreakoutGarden: MLX90640 Thermal Camera
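Before publishing, an SGP30 reading can be sanity-checked against the measurement ranges listed above. This is a hypothetical helper, not part of the original project. It assumes the readings have already been converted to integers.

```python
# Measurement ranges as listed above for the Sensirion SGP30.
TVOC_RANGE_PPB = (0, 60_000)
ECO2_RANGE_PPM = (400, 60_000)


def in_range(value: int, bounds: tuple[int, int]) -> bool:
    low, high = bounds
    return low <= value <= high


def check_sgp30(tvoc_ppb: int, eco2_ppm: int) -> list[str]:
    """Return a list of problems; an empty list means both readings are within range."""
    problems = []
    if not in_range(tvoc_ppb, TVOC_RANGE_PPB):
        problems.append(f"TVOC {tvoc_ppb} ppb outside {TVOC_RANGE_PPB}")
    if not in_range(eco2_ppm, ECO2_RANGE_PPM):
        problems.append(f"eCO2 {eco2_ppm} ppm outside {ECO2_RANGE_PPM}")
    return problems
```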

Hardware

Architecture


Build

bin/pulsar-admin topics create persistent://public/default/garden3

bin/pulsar-client consume "persistent://public/default/garden3" -s "garden3reader" -n 0

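from pulsar.schema import Record, String, Integer, Float

# Schema for the garden3 topic (Pulsar Python client schema classes)
class Garden(Record):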
    cpu = Float()
    diskusage = String()
    endtime = String()
    equivalentco2ppm = String()
    host = String()
    hostname = String()
    ipaddress = String()
    macaddress = String()
    memory = Float()
    rowid = String()
    runtime = Integer()
    starttime = String()
    systemtime = String()
    totalvocppb = String()
    ts = Integer()
    uuid = String()

----- got message -----
key:[garden3_uuid_yvs_20220306191528], properties:[], content:{
 "cpu": 0.0,
 "diskusage": "103496.6 MB",
 "endtime": "1646594128.2460103",
 "equivalentco2ppm": "  413",
 "host": "garden3",
 "hostname": "garden3",
 "ipaddress": "192.168.1.198",
 "macaddress": "dc:a6:32:32:98:20",
 "memory": 9.2,
 "rowid": "20220306191528_707b34d4-7299-4233-a495-d2d97393e834",
 "runtime": 0,
 "starttime": "03/06/2022 14:15:28",
 "systemtime": "03/06/2022 14:15:29",
 "totalvocppb": "    5",
 "ts": 1646594129,
 "uuid": "garden3_uuid_yvs_20220306191528"
}
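In the message above, equivalentco2ppm and totalvocppb arrive as left-padded strings. The endtime field is a Unix epoch in seconds stored as a string. Downstream code therefore has to convert these fields before doing any math. Here is a hypothetical helper (not from the original project) that performs the conversion, assuming the message has already been parsed into a dict.

```python
from datetime import datetime, timezone


def normalize_garden(message: dict) -> dict:
    """Convert the padded readings and the epoch-string endtime of a garden3 message."""
    out = dict(message)
    # Readings look like "  413" and "    5"; int() ignores the surrounding whitespace.
    out["equivalentco2ppm"] = int(message["equivalentco2ppm"])
    out["totalvocppb"] = int(message["totalvocppb"])
    # endtime is seconds since the Unix epoch, stored as a string.
    out["endtime"] = datetime.fromtimestamp(float(message["endtime"]), tz=timezone.utc)
    return out
```

Converting the values this way also avoids comparing the readings as text, for example when looking for a maximum.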

presto> select * from pulsar."public/default"."garden3";
 cpu |  diskusage  |      endtime       | equivalentco2ppm |  host   | hostname |   ipaddress   |    macaddress     | memory |                        rowid                        | runtime |      
-----+-------------+--------------------+------------------+---------+----------+---------------+-------------------+--------+-----------------------------------------------------+---------+------
 6.5 | 103496.5 MB | 1646594650.7116666 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192410_197b6b0c-b86c-4191-9e9c-11777767825e |       0 | 03/06
 6.7 | 103496.5 MB | 1646594651.7441382 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192411_4dab3212-423b-46a9-ae39-b10eb363336d |       0 | 03/06
 1.3 | 103496.5 MB | 1646594652.7764313 |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192412_d24a819b-4ca1-489a-9683-da48bc37185c |       0 | 03/06
 0.2 | 103496.5 MB | 1646594653.810233  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192413_f4e43177-2486-4a36-b27a-903028d6aacf |       0 | 03/06
 0.0 | 103496.5 MB | 1646594654.8467774 |   416            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192414_558d3db8-725a-46ec-ad1f-89f8067142f8 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594655.880628  |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192415_1eeea533-7f7e-4945-a089-d4b2f8681e14 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594656.9145741 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192416_d7d8c6ea-2adf-4704-8f49-3108a7328f26 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594657.9489982 |   425            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192417_c71cbb6e-c59f-4855-84ae-b796fd3d7a76 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594658.9828157 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192418_c14204dd-4c1c-4e76-a272-8ddecd11a97c |       0 | 03/06
 0.0 | 103496.5 MB | 1646594660.0187812 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192419_c7583063-06e0-4601-806a-96619b6bb136 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594661.0531507 |   428            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192421_57e4a8ed-af6f-4dd0-b143-f29f1a211fdb |       0 | 03/06
 0.0 | 103496.5 MB | 1646594662.087301  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192422_cf2ce36e-c996-41ac-baec-db292cffdd37 |       0 | 03/06
 6.2 | 103496.5 MB | 1646594663.1214898 |   415            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192423_30b13428-ac2e-4929-b9f5-c4c3fbc3312a |       0 | 03/06
 6.5 | 103496.5 MB | 1646594664.1541135 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192424_d1517315-85fa-4923-b634-9b673c5b20ba |       0 | 03/06
 3.6 | 103496.5 MB | 1646594665.1890867 |   417            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192425_3984b0bc-a92e-4458-834c-e65259bb7a4d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594666.221572  |   422            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192426_e0debd83-47e8-43f7-8ec5-cf0dbc27b87e |       0 | 03/06
 0.0 | 103496.5 MB | 1646594667.2555919 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192427_6ec920e7-56d2-4730-bddc-e18592cf1210 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594668.2893167 |   428            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192428_434b4458-6f89-4161-b967-f29796d8bf5d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594669.3234618 |   426            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192429_5bb29c2e-d078-405d-b8f6-967ea4753b3f |       0 | 03/06
 0.0 | 103496.5 MB | 1646594670.359024  |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192430_05b86f6b-4d28-497c-acd6-d14f7ce27157 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594671.392967  |   432            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192431_b7a2955a-6f39-4439-8eb9-27d7a2633836 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594672.4271743 |   426            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192432_29861384-fc30-4deb-bc1d-b7f2d27ef89d |       0 | 03/06
 0.0 | 103496.5 MB | 1646594673.4611707 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192433_25435335-39fe-4294-b913-95c63537a743 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594674.4951062 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192434_a52a90e8-b11c-467b-b87f-936432bc8988 |       0 | 03/06
 3.3 | 103496.5 MB | 1646594675.531778  |   436            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192435_48bab6a9-9f2e-443b-9fed-f096f6c24ebd |       0 | 03/06
 6.5 | 103496.5 MB | 1646594676.5642908 |   433            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192436_761f79dc-06f6-4ed3-8062-5227b6842b77 |       0 | 03/06
 6.2 | 103496.5 MB | 1646594677.5965276 |   421            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192437_6d8d49c9-0519-4825-b6b7-ed435e9fe747 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594678.6290672 |   423            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192438_a920872d-f152-4b18-94a7-b4dfe5641482 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594679.6629703 |   418            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192439_032ce574-10f6-4b50-b5e8-102b466af65a |       0 | 03/06
 0.0 | 103496.5 MB | 1646594680.699419  |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192440_cde6f78a-d028-4f87-a95b-156bac0ee0c2 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594681.7338123 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192441_7400689a-6974-4ff2-a688-96c317d45d03 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594682.767776  |   417            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192442_46696fa7-295c-4fe6-bf49-8d8405e21cf5 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594683.8017883 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192443_4de9f79c-0169-49d6-b337-fb57dcb5cf7e |       0 | 03/06
 0.0 | 103496.5 MB | 1646594684.8360054 |   422            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192444_63081ac4-2c87-462d-bc16-ab506e9c6db2 |       0 | 03/06
 0.0 | 103496.5 MB | 1646594685.8720167 |   420            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192445_fee13e36-26c6-4f06-894c-8b96cb0f3bae |       0 | 03/06
 0.0 | 103496.5 MB | 1646594686.90594   |   431            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192446_8aaa423d-bfe3-43e8-8653-d554c2afb8d6 |       0 | 03/06
 0.8 | 103496.4 MB | 1646594687.939747  |   416            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192447_bfe339cd-3fb4-4d2a-b612-3a2363e1b83b |       0 | 03/06
 6.5 | 103496.4 MB | 1646594688.9728699 |   424            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192448_8809fc3e-75f8-4aca-a141-6e5d4872b0de |       0 | 03/06
 6.5 | 103496.4 MB | 1646594690.0051649 |   412            | garden3 | garden3  | 192.168.1.198 | dc:a6:32:32:98:20 |    9.2 | 20220306192449_0d0b4b62-0321-4bcd-952b-14607333d9f5 |       0 | 03/06

presto> desc pulsar."public/default"."garden3";
      Column       |   Type    | Extra |                                   Comment                                   
-------------------+-----------+-------+-----------------------------------------------------------------------------
 cpu               | real      |       | ["null","float"]                                                            
 diskusage         | varchar   |       | ["null","string"]                                                           
 endtime           | varchar   |       | ["null","string"]                                                           
 equivalentco2ppm  | varchar   |       | ["null","string"]                                                           
 host              | varchar   |       | ["null","string"]                                                           
 hostname          | varchar   |       | ["null","string"]                                                           
 ipaddress         | varchar   |       | ["null","string"]                                                           
 macaddress        | varchar   |       | ["null","string"]                                                           
 memory            | real      |       | ["null","float"]                                                            
 rowid             | varchar   |       | ["null","string"]                                                           
 runtime           | integer   |       | ["null","int"]                                                              
 starttime         | varchar   |       | ["null","string"]                                                           
 systemtime        | varchar   |       | ["null","string"]                                                           
 totalvocppb       | varchar   |       | ["null","string"]                                                           
 ts                | integer   |       | ["null","int"]                                                              
 uuid              | varchar   |       | ["null","string"]                                                           
 __partition__     | integer   |       | The partition number which the message belongs to                           
 __event_time__    | timestamp |       | Application defined timestamp in milliseconds of when the event occurred    
 __publish_time__  | timestamp |       | The timestamp in milliseconds of when event as published                    
 __message_id__    | varchar   |       | The message ID of the message used to generate this row                     
 __sequence_id__   | bigint    |       | The sequence ID of the message used to generate this row                    
 __producer_name__ | varchar   |       | The name of the producer that publish the message used to generate this row 
 __key__           | varchar   |       | The partition key for the topic                                             
 __properties__    | varchar   |       | User defined properties    
 

Presto/Trino gives us access to all that tasty metadata for each message (event, row, record, call it what you like). The metadata columns have special names, wrapped in double underscores, to help prevent collisions with your own fields.

Metadata

  • partition - if we have a partitioned topic, which partition does this message belong to?
  • event_time - you know what time it is! Timestamp in ms for event action.
  • publish_time - when was this message published to the topic?
  • message_id - unique id for this message
  • sequence_id - ordering information for this message
  • producer_name - who sent me this?
  • key - did you assign a key like I asked you to?
  • properties - all those extra fields you added around the payload
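Some of these columns are filled in by the producer. The sketch below assumes the Pulsar Python client, whose Producer.send accepts partition_key, properties and event_timestamp (in milliseconds) keyword arguments. The helper name and the "source" property are hypothetical. The function only needs an object with a send method, so it can be exercised without a running broker.

```python
def send_with_metadata(producer, record, ts_seconds: int, source: str = "garden3"):
    """Send a record so that __key__, __properties__ and __event_time__ are populated."""
    return producer.send(
        record,
        partition_key=record.uuid,          # surfaces as __key__
        properties={"source": source},      # surfaces in __properties__
        event_timestamp=ts_seconds * 1000,  # surfaces as __event_time__ (milliseconds)
    )
```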

Spark Structured Streaming

// Subscribe to the garden3 topic as a streaming DataFrame
val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://localhost:6650").option("admin.url", "http://localhost:8080").option("topic", "persistent://public/default/garden3").load()
dfPulsar.printSchema()
// Stream every column to Parquet files; the format can also be "orc", "json", "csv", etc.
val pQuery = dfPulsar.selectExpr("*").writeStream.format("parquet").option("checkpointLocation", "/tmp/checkpoint").option("path", "/opt/demo/gasthermal").start()

pQuery.explain()
// Blocks until the streaming query stops or fails
pQuery.awaitTermination()
pQuery.stop()


Show Me The Data

We can visualize data from Apache Pulsar by consuming it through the WebSocket interface in a simple jQuery single-page web application.
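The browser page itself is not reproduced here, but the WebSocket side of it can be sketched in Python. This assumes Pulsar's WebSocket API on the broker's web port (typically 8080 in standalone). It also assumes each consumer frame is JSON carrying a base64-encoded payload and a messageId, which the client sends back to acknowledge the message. The subscription name is hypothetical. Actually opening the socket would need a WebSocket library such as websocket-client, which is not shown.

```python
import base64
import json
from urllib.parse import quote


def consumer_ws_url(host: str, topic: str, subscription: str, port: int = 8080) -> str:
    """Build a WebSocket consumer URL for a persistent topic given as tenant/namespace/topic."""
    return f"ws://{host}:{port}/ws/v2/consumer/persistent/{topic}/{quote(subscription)}"


def handle_frame(frame: str) -> tuple[dict, str]:
    """Decode one consumer frame; return the JSON payload and the ack to send back."""
    msg = json.loads(frame)
    payload = json.loads(base64.b64decode(msg["payload"]))
    ack = json.dumps({"messageId": msg["messageId"]})
    return payload, ack
```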


Example Parquet Files

pip3 install parquet-tools -U

parquet-tools inspect part-00000-b7e1f8dc-956d-4130-bc59-7b1435e41391-c000.snappy.parquet

############ file meta data ############
created_by: parquet-mr version 1.12.1 (build 2a5c06c58fa987f85aa22170be14d927d5ff6e7d)
num_columns: 23
num_rows: 1
num_row_groups: 1
format_version: 1.0
serialized_size: 5071


############ Columns ############
cpu
diskusage
endtime
equivalentco2ppm
host
hostname
ipaddress
macaddress
memory
rowid
runtime
starttime
systemtime
totalvocppb
ts
uuid
__key
__topic
__messageId
__publishTime
__eventTime
key
value

############ Column(cpu) ############
name: cpu
path: cpu
max_definition_level: 1
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(diskusage) ############
name: diskusage
path: diskusage
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(endtime) ############
name: endtime
path: endtime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(equivalentco2ppm) ############
name: equivalentco2ppm
path: equivalentco2ppm
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(host) ############
name: host
path: host
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(hostname) ############
name: hostname
path: hostname
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ipaddress) ############
name: ipaddress
path: ipaddress
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(macaddress) ############
name: macaddress
path: macaddress
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(memory) ############
name: memory
path: memory
max_definition_level: 1
max_repetition_level: 0
physical_type: FLOAT
logical_type: None
converted_type (legacy): NONE

############ Column(rowid) ############
name: rowid
path: rowid
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(runtime) ############
name: runtime
path: runtime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(starttime) ############
name: starttime
path: starttime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(systemtime) ############
name: systemtime
path: systemtime
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(totalvocppb) ############
name: totalvocppb
path: totalvocppb
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(ts) ############
name: ts
path: ts
max_definition_level: 1
max_repetition_level: 0
physical_type: INT32
logical_type: None
converted_type (legacy): NONE

############ Column(uuid) ############
name: uuid
path: uuid
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(__key) ############
name: __key
path: __key
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE

############ Column(__topic) ############
name: __topic
path: __topic
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(__messageId) ############
name: __messageId
path: __messageId
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: None
converted_type (legacy): NONE

############ Column(__publishTime) ############
name: __publishTime
path: __publishTime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE

############ Column(__eventTime) ############
name: __eventTime
path: __eventTime
max_definition_level: 1
max_repetition_level: 0
physical_type: INT96
logical_type: None
converted_type (legacy): NONE

############ Column(key) ############
name: key
path: __messageProperties.key_value.key
max_definition_level: 2
max_repetition_level: 1
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

############ Column(value) ############
name: value
path: __messageProperties.key_value.value
max_definition_level: 3
max_repetition_level: 1
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8

Flink

CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;

Flink SQL> describe garden3;
+------------------+--------+------+-----+--------+-----------+
|             name |   type | null | key | extras | watermark |
+------------------+--------+------+-----+--------+-----------+
|              cpu |  FLOAT | true |     |        |           |
|        diskusage | STRING | true |     |        |           |
|          endtime | STRING | true |     |        |           |
| equivalentco2ppm | STRING | true |     |        |           |
|             host | STRING | true |     |        |           |
|         hostname | STRING | true |     |        |           |
|        ipaddress | STRING | true |     |        |           |
|       macaddress | STRING | true |     |        |           |
|           memory |  FLOAT | true |     |        |           |
|            rowid | STRING | true |     |        |           |
|          runtime |    INT | true |     |        |           |
|        starttime | STRING | true |     |        |           |
|       systemtime | STRING | true |     |        |           |
|      totalvocppb | STRING | true |     |        |           |
|               ts |    INT | true |     |        |           |
|             uuid | STRING | true |     |        |           |
+------------------+--------+------+-----+--------+-----------+
16 rows in set

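select equivalentco2ppm, totalvocppb, cpu, starttime, systemtime, ts, diskusage, endtime, memory, uuid from garden3;

-- equivalentco2ppm and totalvocppb are STRING columns, so trim and cast them before taking the max
select max(CAST(TRIM(BOTH ' ' FROM equivalentco2ppm) AS INT)) as MaxCO2, max(CAST(TRIM(BOTH ' ' FROM totalvocppb) AS INT)) as MaxVocPPB from garden3;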


// TODO

Add your own table

  publishTime TIMESTAMP(3) METADATA,
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
  


Apache NiFi (FLiPN)

  • Choose a processor from the palette


  • Consume messages from Apache Pulsar


  • Full Apache NiFi Data Flow with Apache Pulsar and MongoDB


MongoDB

mongo -u username1 -p password1 --authenticationDatabase admin pulsar1:27017/inventory

show databases

db.createCollection("garden3")

show collections

db.garden3.find().pretty()

{
        "_id" : ObjectId("622f7315f99b9a338d60592f"),
        "cpu" : 0,
        "diskusage" : "101615.9 MB",
        "endtime" : "1647276083.2033532",
        "equivalentco2ppm" : "  407",
        "host" : "garden3",
        "hostname" : "garden3",
        "ipaddress" : "192.168.1.199",
        "macaddress" : "dc:a6:32:32:98:20",
        "memory" : 8.8,
        "rowid" : "20220314164123_0e19c5e6-45f5-405e-bd93-9aed05b37630",
        "runtime" : 0,
        "starttime" : "03/14/2022 12:41:23",
        "systemtime" : "03/14/2022 12:41:24",
        "totalvocppb" : "  5",
        "ts" : 1647276084,
        "uuid" : "garden3_uuid_xrl_20220314164123"
}


References

Additional Heat Images


Five Sensors Real-Time with Pulsar and Python on a Pi (FLiP-Py-Pi-BreakoutGarden)


FLiP-Py

The easy way to build Python streaming applications from the edge to cloud.

Gear / Hardware

  • Raspberry Pi 3 Model B Rev 1.2, Bullseye Raspbian, armv7l
  • Pimoroni Breakout Garden Hat
  • 1.12" Mono OLED Breakout 128x128 White/Black Screen
  • BME680 Air Quality, Temperature, Pressure, Humidity Sensor
  • LSM303D 6DoF Motion Sensor (X, Y, Z Axes)
  • BH1745 Luminance and Color Sensor
  • LTR-559 Light and Proximity Sensor 0.01 lux to 64,000 lux
  • VL53L1X Time of Flight (TOF) Sensor


Software / Libraries

  • Python 3.9
  • Pulsar Python Client 2.10 with Avro support: pip3 install pulsar-client[avro]
  • Python Breakout Garden
  • Python psutil: https://pypi.org/project/psutil/
  • Python luma.oled: pip3 install --upgrade luma.oled
  • System libraries: sudo apt-get install python3 python3-pip python3-pil libjpeg-dev zlib1g-dev libfreetype6-dev liblcms2-dev libopenjp2-7 libtiff5 -y

Architecture


bin/pulsar-admin topics create "persistent://public/default/pi-sensors"

Device Running

VL53L0X_GetDeviceInfo:
Device Name : VL53L1 cut1.1
Device Type : VL53L1
Device ID : 
ProductRevisionMajor : 1
ProductRevisionMinor : 15
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'snr_20220323200032', 'ipaddress': '192.168.1.229', 'cputempf': 99, 'runtime': 154, 'host': 'piups', 'hostname': 'piups', 'macaddress': 'b8:27:eb:4a:4b:61', 'endtime': '1648065632.645613', 'te': '154.00473523139954', 'cpu': 0.0, 'diskusage': '3895.3 MB', 'memory': 21.5, 'rowid': '20220323200032_6a66f9ea-1273-4e5d-b150-9300f6272482', 'systemtime': '03/23/2022 16:00:33', 'ts': 1648065633, 'starttime': '03/23/2022 15:57:58', 'BH1745_red': 112.2, 'BH1745_green': 82.0, 'BH1745_blue': 63.0, 'BH1745_clear': 110.0, 'VL53L1X_distance_in_mm': -1185.0, 'ltr559_lux': 6.65, 'ltr559_prox': 0.0, 'bme680_tempc': 23.6, 'bme680_tempf': 74.48, 'bme680_pressure': 1017.48, 'bme680_humidity': 33.931, 'lsm303d_accelerometer': '-00.08g : -01.00g : +00.01g', 'lsm303d_magnetometer': '+00.06 : +00.30 : +00.07'}
VL53L1X Start Ranging Address 0x29
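The printed record appears to be an Avro record object from the Pulsar Python client (hence the _required_default and _default bookkeeping entries). A few of its fields look derived rather than read from a sensor. The sketch below is an assumption reconstructed from the sample values, not the author's code: bme680_tempf matches bme680_tempc converted to Fahrenheit (23.6 °C gives 74.48 °F), uuid matches snr_ plus a UTC timestamp (ts 1648065633 is 20:00:33 UTC), and rowid appends a random UUID to the same timestamp. c_to_f and make_ids are hypothetical names.

```python
import uuid
from datetime import datetime, timezone
from typing import Tuple


def c_to_f(celsius: float) -> float:
    """Celsius to Fahrenheit, rounded to two decimals like bme680_tempf."""
    return round(celsius * 9 / 5 + 32, 2)


def make_ids(now: datetime) -> Tuple[str, str]:
    """Hypothetical: build (uuid, rowid) in the shape the sample values suggest."""
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
    return f"snr_{stamp}", f"{stamp}_{uuid.uuid4()}"


reading_id, row_id = make_ids(datetime.now(timezone.utc))
print(reading_id, row_id)
print(c_to_f(23.6))  # 74.48
```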

Consumer


bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisnsrgrdnrdr" -n 0
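In the command above, -s names the subscription and -n 0 keeps the consumer receiving messages until you stop it. Consumers receive the LSM303D accelerometer and magnetometer values as single strings such as '-00.08g : -01.00g : +00.01g' (see the record above), so a downstream Python consumer will usually want to split them into numbers. parse_axes is a hypothetical helper, and the string format is assumed from the sample output only.

```python
import re
from typing import Tuple

# Signed decimal numbers; the optional "g" unit suffix is ignored
_NUMBER = re.compile(r"[+-]?\d+(?:\.\d+)?")


def parse_axes(reading: str) -> Tuple[float, float, float]:
    """Turn a reading like '-00.08g : -01.00g : +00.01g' into (x, y, z)."""
    values = [float(v) for v in _NUMBER.findall(reading)]
    if len(values) != 3:
        raise ValueError(f"expected three axis values, got {reading!r}")
    x, y, z = values
    return x, y, z


print(parse_axes("-00.08g : -01.00g : +00.01g"))  # (-0.08, -1.0, 0.01)
print(parse_axes("+00.06 : +00.30 : +00.07"))     # (0.06, 0.3, 0.07)
```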


SQL Consumers

Pulsar SQL (Presto/Trino)


desc pulsar."public/default"."pi-sensors";

         Column         |   Type    | Extra |                                   Comment                                   
------------------------+-----------+-------+-----------------------------------------------------------------------------
 uuid                   | varchar   |       | ["null","string"]                                                           
 ipaddress              | varchar   |       | ["null","string"]                                                           
 cputempf               | integer   |       | ["null","int"]                                                              
 runtime                | integer   |       | ["null","int"]                                                              
 host                   | varchar   |       | ["null","string"]                                                           
 hostname               | varchar   |       | ["null","string"]                                                           
 macaddress             | varchar   |       | ["null","string"]                                                           
 endtime                | varchar   |       | ["null","string"]                                                           
 te                     | varchar   |       | ["null","string"]                                                           
 cpu                    | real      |       | ["null","float"]                                                            
 diskusage              | varchar   |       | ["null","string"]                                                           
 memory                 | real      |       | ["null","float"]                                                            
 rowid                  | varchar   |       | ["null","string"]                                                           
 systemtime             | varchar   |       | ["null","string"]                                                           
 ts                     | integer   |       | ["null","int"]                                                              
 starttime              | varchar   |       | ["null","string"]                                                           
 bh1745_red             | real      |       | ["null","float"]                                                            
 bh1745_green           | real      |       | ["null","float"]                                                            
 bh1745_blue            | real      |       | ["null","float"]                                                            
 bh1745_clear           | real      |       | ["null","float"]                                                            
 vl53l1x_distance_in_mm | real      |       | ["null","float"]                                                            
 ltr559_lux             | real      |       | ["null","float"]                                                            
 ltr559_prox            | real      |       | ["null","float"]                                                            
 bme680_tempc           | real      |       | ["null","float"]                                                            
 bme680_tempf           | real      |       | ["null","float"]                                                            
 bme680_pressure        | real      |       | ["null","float"]                                                            
 bme680_humidity        | real      |       | ["null","float"]                                                            
 lsm303d_accelerometer  | varchar   |       | ["null","string"]                                                           
 lsm303d_magnetometer   | varchar   |       | ["null","string"]                                                           
 __partition__          | integer   |       | The partition number which the message belongs to                           
 __event_time__         | timestamp |       | Application defined timestamp in milliseconds of when the event occurred    
 __publish_time__       | timestamp |       | The timestamp in milliseconds of when event as published                    
 __message_id__         | varchar   |       | The message ID of the message used to generate this row                     
 __sequence_id__        | bigint    |       | The sequence ID of the message used to generate this row                    
 __producer_name__      | varchar   |       | The name of the producer that publish the message used to generate this row 
 __key__                | varchar   |       | The partition key for the topic                                             
 __properties__         | varchar   |       | User defined properties                                                     
(37 rows)

presto> select * from pulsar."public/default"."pi-sensors";
        uuid        |   ipaddress   | cputempf | runtime | host  | hostname |    macaddress     |      endtime       |         te         | cpu | disk
--------------------+---------------+----------+---------+-------+----------+-------------------+--------------------+--------------------+-----+-----
 snr_20220323180318 | 192.168.1.229 |       99 |       4 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058598.8543017 | 4.47935152053833   | 0.2 | 3895
 snr_20220323180324 | 192.168.1.229 |       99 |      10 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058604.4054732 | 10.03052306175232  | 0.0 | 3895
 snr_20220323180329 | 192.168.1.229 |       99 |      16 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058609.8929565 | 15.518006324768066 | 6.5 | 3895
 snr_20220323180335 | 192.168.1.229 |       99 |      21 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058615.3783045 | 21.00335431098938  | 0.2 | 3895
 snr_20220323180340 | 192.168.1.229 |       99 |      26 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058620.8675282 | 26.49257802963257  | 4.6 | 3895
 snr_20220323180346 | 192.168.1.229 |       99 |      32 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058626.3639522 | 31.989001989364624 | 0.0 | 3895
 snr_20220323180351 | 192.168.1.229 |       99 |      38 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058631.8793604 | 37.50441026687622  | 0.0 | 3895
 snr_20220323180357 | 192.168.1.229 |      100 |      43 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058637.38347   | 43.008519887924194 | 0.0 | 3895
 snr_20220323180402 | 192.168.1.229 |       99 |      49 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058642.8820572 | 48.50710701942444  | 0.0 | 3895
 snr_20220323180408 | 192.168.1.229 |       99 |      54 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058648.3795574 | 54.00460720062256  | 6.2 | 3895
 snr_20220323180413 | 192.168.1.229 |       99 |      59 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058653.8280468 | 59.45309662818909  | 0.0 | 3895
 snr_20220323180419 | 192.168.1.229 |       99 |      65 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058659.3180714 | 64.94312119483948  | 4.9 | 3895
 snr_20220323180424 | 192.168.1.229 |       99 |      70 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058664.8023574 | 70.42740726470947  | 0.0 | 3895
 snr_20220323180430 | 192.168.1.229 |       99 |      76 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058670.286937  | 75.91198682785034  | 0.0 | 3895
 snr_20220323180435 | 192.168.1.229 |       97 |      81 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058675.7804654 | 81.40551519393921  | 0.0 | 3895
 snr_20220323180441 | 192.168.1.229 |       99 |      87 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058681.2751634 | 86.90021324157715  | 0.0 | 3895
 snr_20220323180446 | 192.168.1.229 |       99 |      92 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058686.7713509 | 92.39640069007874  | 5.9 | 3895
 snr_20220323180452 | 192.168.1.229 |       99 |      98 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058692.2672575 | 97.89230728149414  | 0.3 | 3895
 snr_20220323180457 | 192.168.1.229 |       99 |     103 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058697.7704427 | 103.39549255371094 | 5.4 | 3895
 snr_20220323180503 | 192.168.1.229 |       99 |     109 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058703.21333   | 108.83837985992432 | 0.3 | 3895
 snr_20220323180508 | 192.168.1.229 |       99 |     114 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058708.6879904 | 114.31304025650024 | 0.0 | 3895
 snr_20220323180514 | 192.168.1.229 |       99 |     120 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058714.1396198 | 119.76466965675354 | 0.3 | 3895
 snr_20220323180519 | 192.168.1.229 |       99 |     125 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058719.6158638 | 125.24091362953186 | 0.0 | 3895
 snr_20220323180525 | 192.168.1.229 |      100 |     131 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058725.0950723 | 130.72012209892273 | 6.5 | 3895
 snr_20220323180530 | 192.168.1.229 |       99 |     136 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058730.57256   | 136.19760990142822 | 0.0 | 3895
(25 rows)

Query 20220323_184946_00003_p66fs, FINISHED, 1 node
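The endtime column holds a Unix epoch in seconds, stored as a string. Differencing consecutive rows gives the device's publishing interval; for the first five rows of the result above it comes out at about 5.5 seconds. A small sketch using only those copied values:

```python
from statistics import mean

# endtime values copied from the first five rows of the query result above
endtimes = [
    "1648058598.8543017",
    "1648058604.4054732",
    "1648058609.8929565",
    "1648058615.3783045",
    "1648058620.8675282",
]

stamps = [float(t) for t in endtimes]
# Gap in seconds between each pair of consecutive readings
intervals = [later - earlier for earlier, later in zip(stamps, stamps[1:])]

print([round(i, 2) for i in intervals])  # [5.55, 5.49, 5.49, 5.49]
print(f"mean interval: {mean(intervals):.2f} s")  # mean interval: 5.50 s
```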

Spark SQL

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()

scala> dfPulsar.printSchema()
root
 |-- uuid: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- cputempf: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- host: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- macaddress: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- te: string (nullable = true)
 |-- cpu: float (nullable = true)
 |-- diskusage: string (nullable = true)
 |-- memory: float (nullable = true)
 |-- rowid: string (nullable = true)
 |-- systemtime: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- BH1745_red: float (nullable = true)
 |-- BH1745_green: float (nullable = true)
 |-- BH1745_blue: float (nullable = true)
 |-- BH1745_clear: float (nullable = true)
 |-- VL53L1X_distance_in_mm: float (nullable = true)
 |-- ltr559_lux: float (nullable = true)
 |-- ltr559_prox: float (nullable = true)
 |-- bme680_tempc: float (nullable = true)
 |-- bme680_tempf: float (nullable = true)
 |-- bme680_pressure: float (nullable = true)
 |-- bme680_humidity: float (nullable = true)
 |-- lsm303d_accelerometer: string (nullable = true)
 |-- lsm303d_magnetometer: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
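Besides the Avro fields, both connectors add message metadata columns but name them differently: Pulsar SQL uses names such as __publish_time__, while the Spark connector uses __publishTime. Pulsar SQL also lowercases the Avro field names (BH1745_red becomes bh1745_red); Spark keeps the original case but resolves column names case-insensitively by default. The lookup below is copied from the two schemas shown in this post; PRESTO_TO_SPARK and to_spark_column are hypothetical names.

```python
from typing import Optional

# Metadata columns from `desc` in Pulsar SQL and printSchema() in Spark above.
# None means the column does not appear in the Spark schema shown.
PRESTO_TO_SPARK = {
    "__partition__": None,
    "__event_time__": "__eventTime",
    "__publish_time__": "__publishTime",
    "__message_id__": "__messageId",            # varchar vs binary
    "__sequence_id__": None,
    "__producer_name__": None,
    "__key__": "__key",                         # varchar vs binary
    "__properties__": "__messageProperties",    # varchar vs map<string,string>
}


def to_spark_column(presto_column: str) -> Optional[str]:
    """Map a Pulsar SQL column to its Spark name; Avro fields pass through."""
    if presto_column in PRESTO_TO_SPARK:
        return PRESTO_TO_SPARK[presto_column]
    return presto_column


print(to_spark_column("__publish_time__"))  # __publishTime
```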


Example Queries

// Paste multi-line statements with :paste in spark-shell so lines starting with "." are joined
import spark.implicits._  // needed for .as[...]; spark-shell already imports it

// Stream every column to the console without truncating long values
val consoleQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()

// Stream selected columns to CSV files; the file sink also accepts "parquet", "json" and "orc"
val pQuery = dfPulsar.selectExpr("CAST(__key AS STRING)",
                                 "CAST(uuid AS STRING)",
                                 "CAST(ipaddress AS STRING)",
                                 "CAST(cputempf AS STRING)",
                                 "CAST(host AS STRING)",
                                 "CAST(cpu AS STRING)",
                                 "CAST(diskusage AS STRING)",
                                 "CAST(memory AS STRING)",
                                 "CAST(systemtime AS STRING)",
                                 "CAST(BH1745_red AS STRING)",
                                 "CAST(BH1745_green AS STRING)",
                                 "CAST(BH1745_blue AS STRING)",
                                 "CAST(BH1745_clear AS STRING)",
                                 "CAST(VL53L1X_distance_in_mm AS STRING)",
                                 "CAST(ltr559_lux AS STRING)",
                                 "CAST(bme680_tempf AS STRING)",
                                 "CAST(bme680_pressure AS STRING)",
                                 "CAST(bme680_humidity AS STRING)")
                                 .as[(String, String, String, String, String, String, String, String,
                                 String, String, String, String, String, String, String, String, String, String)]
            .writeStream.format("csv")
            .option("header", true)
            .option("path", "/opt/demo/pisensordata")
            .option("checkpointLocation", "/tmp/checkpoint")
            .start()

pQuery.explain()
pQuery.awaitTermination()  // blocks until the query is stopped or fails
pQuery.stop()

Example Spark ETL CSV Output

/opt/demo/pisensordata# cat part-00000-0425bfc8-5d25-4143-818c-bc7af5e1d82c-c000.csv
__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity
snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,3887.5 MB,20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,70.66,1006.11,44.737
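Because the query casts every column to a string, the CSV files contain only text, and readers must convert numeric fields back themselves. Note also that __key holds the same value as uuid, so the producer appears to use the uuid as the message key. A minimal stdlib sketch that reads the sample part file shown above; the set of columns converted is an assumption, and diskusage stays a string because it carries an " MB" suffix.

```python
import csv
import io

# Header and first row copied from the part-00000-....csv file above
SAMPLE = (
    "__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,"
    "BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,"
    "ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity\n"
    "snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,3887.5 MB,"
    "20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,70.66,1006.11,44.737\n"
)

# Columns that hold numbers once the string casts are undone
NUMERIC = {
    "cputempf", "cpu", "memory", "BH1745_red", "BH1745_green", "BH1745_blue",
    "BH1745_clear", "VL53L1X_distance_in_mm", "ltr559_lux", "bme680_tempf",
    "bme680_pressure", "bme680_humidity",
}


def read_rows(text: str):
    """Yield one dict per CSV row, converting numeric columns (empty = null)."""
    for row in csv.DictReader(io.StringIO(text)):
        yield {k: float(v) if k in NUMERIC and v != "" else v for k, v in row.items()}


for row in read_rows(SAMPLE):
    print(row["uuid"], row["bme680_tempf"], row["diskusage"])
    # snr_20220324215723 70.66 3887.5 MB
```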

Flink SQL

CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;


describe `pi-sensors`;
+------------------------+--------+------+-----+--------+-----------+
|                   name |   type | null | key | extras | watermark |
+------------------------+--------+------+-----+--------+-----------+
|                   uuid | STRING | true |     |        |           |
|              ipaddress | STRING | true |     |        |           |
|               cputempf |    INT | true |     |        |           |
|                runtime |    INT | true |     |        |           |
|                   host | STRING | true |     |        |           |
|               hostname | STRING | true |     |        |           |
|             macaddress | STRING | true |     |        |           |
|                endtime | STRING | true |     |        |           |
|                     te | STRING | true |     |        |           |
|                    cpu |  FLOAT | true |     |        |           |
|              diskusage | STRING | true |     |        |           |
|                 memory |  FLOAT | true |     |        |           |
|                  rowid | STRING | true |     |        |           |
|             systemtime | STRING | true |     |        |           |
|                     ts |    INT | true |     |        |           |
|              starttime | STRING | true |     |        |           |
|             BH1745_red |  FLOAT | true |     |        |           |
|           BH1745_green |  FLOAT | true |     |        |           |
|            BH1745_blue |  FLOAT | true |     |        |           |
|           BH1745_clear |  FLOAT | true |     |        |           |
| VL53L1X_distance_in_mm |  FLOAT | true |     |        |           |
|             ltr559_lux |  FLOAT | true |     |        |           |
|            ltr559_prox |  FLOAT | true |     |        |           |
|           bme680_tempc |  FLOAT | true |     |        |           |
|           bme680_tempf |  FLOAT | true |     |        |           |
|        bme680_pressure |  FLOAT | true |     |        |           |
|        bme680_humidity |  FLOAT | true |     |        |           |
|  lsm303d_accelerometer | STRING | true |     |        |           |
|   lsm303d_magnetometer | STRING | true |     |        |           |
+------------------------+--------+------+-----+--------+-----------+
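The describe and printSchema outputs in this post show how one Avro schema surfaces in each engine: every field is an Avro union with null, so every column is nullable, and the primitive types map as below. This lookup covers only the three Avro types that appear in the pi-sensors schema; TYPE_MAP and column_type are hypothetical names.

```python
from typing import Dict, List, Tuple, Union

# Avro primitive -> (Pulsar SQL/Presto, Spark, Flink SQL), taken from the outputs above
TYPE_MAP: Dict[str, Tuple[str, str, str]] = {
    "string": ("varchar", "string", "STRING"),
    "int": ("integer", "integer", "INT"),
    "float": ("real", "float", "FLOAT"),
}
ENGINES = ("presto", "spark", "flink")


def column_type(avro_type: Union[str, List[str]], engine: str) -> Tuple[str, bool]:
    """Return (engine type, nullable) for an Avro type such as ["null", "float"]."""
    branches = [avro_type] if isinstance(avro_type, str) else list(avro_type)
    nullable = "null" in branches
    non_null = [b for b in branches if b != "null"]
    if len(non_null) != 1:
        raise ValueError(f"only [null, T] unions are handled here: {avro_type!r}")
    return TYPE_MAP[non_null[0]][ENGINES.index(engine)], nullable


print(column_type(["null", "float"], "presto"))  # ('real', True)
print(column_type(["null", "int"], "flink"))     # ('INT', True)
```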

select max(bme680_pressure) as maxpressure, max(bme680_tempf) as maxtemp, max(ltr559_lux) as maxlux, avg(BH1745_red) as avgred,
       max(VL53L1X_distance_in_mm) as maxdistance
from `pi-sensors`;

select * from `pi-sensors`;
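Against a streaming source, the aggregate query above never finishes: Flink keeps the max and avg values as state and emits an updated result row each time a new reading arrives. The plain-Python sketch below imitates that behaviour for three of the columns, using three readings that appear in this post (the device output, the CSV row and the MongoDB document). It illustrates the idea only and is not how Flink implements it; RunningAggregates is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunningAggregates:
    """Incrementally maintained max/avg, updated once per incoming reading."""
    max_pressure: Optional[float] = None
    max_tempf: Optional[float] = None
    red_sum: float = 0.0
    count: int = 0

    def update(self, pressure: float, tempf: float, red: float) -> dict:
        self.max_pressure = pressure if self.max_pressure is None else max(self.max_pressure, pressure)
        self.max_tempf = tempf if self.max_tempf is None else max(self.max_tempf, tempf)
        self.red_sum += red
        self.count += 1
        # Each call yields a new version of the single result row
        return {"maxpressure": self.max_pressure, "maxtemp": self.max_tempf,
                "avgred": self.red_sum / self.count}


# (bme680_pressure, bme680_tempf, BH1745_red) from readings shown in this post
readings = [(1017.48, 74.48, 112.2), (1006.11, 70.66, 134.2), (1017.71, 74.25, 112.2)]

agg = RunningAggregates()
for pressure, tempf, red in readings:
    print(agg.update(pressure, tempf, red))
```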

Apache NiFi: consume from Pulsar and write to MongoDB.

Data Store - MongoDB


mongo -u debezium -p dbz --authenticationDatabase admin pulsar1:27017/inventory

show databases

db.createCollection("pisensors")

show collections

db.pisensors.find().pretty()
{
        "_id" : ObjectId("623b812e5dae8913d42a93ee"),
        "uuid" : "snr_20220323194514",
        "ipaddress" : "192.168.1.229",
        "cputempf" : 100,
        "runtime" : 9,
        "host" : "piups",
        "hostname" : "piups",
        "macaddress" : "b8:27:eb:4a:4b:61",
        "endtime" : "1648064714.7820184",
        "te" : "9.371636629104614",
        "cpu" : 6.5,
        "diskusage" : "3895.4 MB",
        "memory" : 21.4,
        "rowid" : "20220323194514_c9ec900f-05c2-49c4-985f-ddd83e8b15c0",
        "systemtime" : "03/23/2022 15:45:15",
        "ts" : 1648064715,
        "starttime" : "03/23/2022 15:45:05",
        "BH1745_red" : 112.2,
        "BH1745_green" : 83,
        "BH1745_blue" : 64.8,
        "BH1745_clear" : 110,
        "VL53L1X_distance_in_mm" : 31,
        "ltr559_lux" : 6.65,
        "ltr559_prox" : 0,
        "bme680_tempc" : 23.47,
        "bme680_tempf" : 74.25,
        "bme680_pressure" : 1017.71,
        "bme680_humidity" : 34.432,
        "lsm303d_accelerometer" : "-00.08g : -01.01g : +00.01g",
        "lsm303d_magnetometer" : "+00.06 : +00.30 : +00.07"
}
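A MongoDB ObjectId begins with a 4-byte timestamp (seconds since the Unix epoch), so you can recover roughly when each document was written and compare it with the reading's own ts field to gauge end-to-end delay. For the document above, the ObjectId time is 2022-03-23 20:21:02 UTC while ts (1648064715) is 19:45:15 UTC. objectid_time is a hypothetical helper; with PyMongo installed, bson.ObjectId(...).generation_time returns the same moment.

```python
from datetime import datetime, timezone


def objectid_time(oid: str) -> datetime:
    """Decode the creation time stored in the first 4 bytes of an ObjectId hex string."""
    if len(oid) != 24:
        raise ValueError(f"an ObjectId hex string has 24 characters: {oid!r}")
    seconds = int(oid[:8], 16)  # big-endian seconds since 1970-01-01 UTC
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


written = objectid_time("623b812e5dae8913d42a93ee")
reading = datetime.fromtimestamp(1648064715, tz=timezone.utc)  # the document's ts field
print(written.isoformat())   # 2022-03-23T20:21:02+00:00
print(reading.isoformat())   # 2022-03-23T19:45:15+00:00
print(written - reading)     # 0:35:47
```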

Monitor everything! Let me see what's going on!

References