Five Sensors Real-Time with Pulsar and Python on a Pi (FLiP-Py-Pi-BreakoutGarden)

FLiP-Pi-BreakoutGarden

FLiP-Py

The easy way to build Python streaming applications from the edge to cloud.

Gear / Hardware

  • Raspberry Pi 3 Model B Rev 1.2, Raspbian Bullseye, armv7l
  • Pimoroni Breakout Garden Hat
  • 1.12" Mono OLED Breakout 128x128 White/Black Screen
  • BME680 Air Quality, Temperature, Pressure, Humidity Sensor
  • LSM303D 6DoF Motion Sensor (X, Y, Z Axes)
  • BH1745 Luminance and Color Sensor
  • LTR-559 Light and Proximity Sensor 0.01 lux to 64,000 lux
  • VL53L1X Time of Flight (TOF) Sensor

Device

Software / Libraries

  • Python 3.9
  • Pulsar Python Client 2.10 (avro) pip3 install pulsar-client[avro]
  • Python Breakout Garden
  • Python PSUTIL https://pypi.org/project/psutil/
  • Python LUMA OLED pip3 install --upgrade luma.oled
  • Libraries sudo apt-get install python3 python3-pip python3-pil libjpeg-dev zlib1g-dev libfreetype6-dev liblcms2-dev libopenjp2-7 libtiff5 -y

Architecture

StreamOps

bin/pulsar-admin topics create "persistent://public/default/pi-sensors"

Device Running

VL53L0X_GetDeviceInfo:
Device Name : VL53L1 cut1.1
Device Type : VL53L1
Device ID : 
ProductRevisionMajor : 1
ProductRevisionMinor : 15
{'_required_default': False, '_default': None, '_required': False, 'uuid': 'snr_20220323200032', 'ipaddress': '192.168.1.229', 'cputempf': 99, 'runtime': 154, 'host': 'piups', 'hostname': 'piups', 'macaddress': 'b8:27:eb:4a:4b:61', 'endtime': '1648065632.645613', 'te': '154.00473523139954', 'cpu': 0.0, 'diskusage': '3895.3 MB', 'memory': 21.5, 'rowid': '20220323200032_6a66f9ea-1273-4e5d-b150-9300f6272482', 'systemtime': '03/23/2022 16:00:33', 'ts': 1648065633, 'starttime': '03/23/2022 15:57:58', 'BH1745_red': 112.2, 'BH1745_green': 82.0, 'BH1745_blue': 63.0, 'BH1745_clear': 110.0, 'VL53L1X_distance_in_mm': -1185.0, 'ltr559_lux': 6.65, 'ltr559_prox': 0.0, 'bme680_tempc': 23.6, 'bme680_tempf': 74.48, 'bme680_pressure': 1017.48, 'bme680_humidity': 33.931, 'lsm303d_accelerometer': '-00.08g : -01.00g : +00.01g', 'lsm303d_magnetometer': '+00.06 : +00.30 : +00.07'}
VL53L1X Start Ranging Address 0x29
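
The `lsm303d_accelerometer` and `lsm303d_magnetometer` fields in the record above arrive as colon-separated strings rather than numbers. Here is a minimal sketch of splitting them into floats downstream, assuming the 'x : y : z' layout shown holds for every message. `parse_axes` is a hypothetical helper, not part of the original code.

```python
from typing import Tuple


def parse_axes(reading: str) -> Tuple[float, float, float]:
    """Split an 'x : y : z' string into three floats.

    Assumption: accelerometer values carry a trailing 'g' unit and
    magnetometer values carry none, as in the sample record.
    """
    parts = [part.strip().rstrip("g") for part in reading.split(":")]
    if len(parts) != 3:
        raise ValueError(f"expected three axes, got {reading!r}")
    x, y, z = (float(part) for part in parts)
    return x, y, z


# Values copied from the sample record above.
accel = parse_axes("-00.08g : -01.00g : +00.01g")
mag = parse_axes("+00.06 : +00.30 : +00.07")
print(accel)
print(mag)
```

Keeping the parse on the consumer side leaves the published schema unchanged, since both fields stay strings in the topic.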

Consumer


bin/pulsar-client consume "persistent://public/default/pi-sensors" -s "pisnsrgrdnrdr" -n 0


** SQL Consumers **

Pulsar SQL / Presto/Trino


desc pulsar."public/default"."pi-sensors";

         Column         |   Type    | Extra |                                   Comment                                   
------------------------+-----------+-------+-----------------------------------------------------------------------------
 uuid                   | varchar   |       | ["null","string"]                                                           
 ipaddress              | varchar   |       | ["null","string"]                                                           
 cputempf               | integer   |       | ["null","int"]                                                              
 runtime                | integer   |       | ["null","int"]                                                              
 host                   | varchar   |       | ["null","string"]                                                           
 hostname               | varchar   |       | ["null","string"]                                                           
 macaddress             | varchar   |       | ["null","string"]                                                           
 endtime                | varchar   |       | ["null","string"]                                                           
 te                     | varchar   |       | ["null","string"]                                                           
 cpu                    | real      |       | ["null","float"]                                                            
 diskusage              | varchar   |       | ["null","string"]                                                           
 memory                 | real      |       | ["null","float"]                                                            
 rowid                  | varchar   |       | ["null","string"]                                                           
 systemtime             | varchar   |       | ["null","string"]                                                           
 ts                     | integer   |       | ["null","int"]                                                              
 starttime              | varchar   |       | ["null","string"]                                                           
 bh1745_red             | real      |       | ["null","float"]                                                            
 bh1745_green           | real      |       | ["null","float"]                                                            
 bh1745_blue            | real      |       | ["null","float"]                                                            
 bh1745_clear           | real      |       | ["null","float"]                                                            
 vl53l1x_distance_in_mm | real      |       | ["null","float"]                                                            
 ltr559_lux             | real      |       | ["null","float"]                                                            
 ltr559_prox            | real      |       | ["null","float"]                                                            
 bme680_tempc           | real      |       | ["null","float"]                                                            
 bme680_tempf           | real      |       | ["null","float"]                                                            
 bme680_pressure        | real      |       | ["null","float"]                                                            
 bme680_humidity        | real      |       | ["null","float"]                                                            
 lsm303d_accelerometer  | varchar   |       | ["null","string"]                                                           
 lsm303d_magnetometer   | varchar   |       | ["null","string"]                                                           
 __partition__          | integer   |       | The partition number which the message belongs to                           
 __event_time__         | timestamp |       | Application defined timestamp in milliseconds of when the event occurred    
 __publish_time__       | timestamp |       | The timestamp in milliseconds of when event as published                    
 __message_id__         | varchar   |       | The message ID of the message used to generate this row                     
 __sequence_id__        | bigint    |       | The sequence ID of the message used to generate this row                    
 __producer_name__      | varchar   |       | The name of the producer that publish the message used to generate this row 
 __key__                | varchar   |       | The partition key for the topic                                             
 __properties__         | varchar   |       | User defined properties                                                     
(37 rows)

presto> select * from pulsar."public/default"."pi-sensors";
        uuid        |   ipaddress   | cputempf | runtime | host  | hostname |    macaddress     |      endtime       |         te         | cpu | disk
--------------------+---------------+----------+---------+-------+----------+-------------------+--------------------+--------------------+-----+-----
 snr_20220323180318 | 192.168.1.229 |       99 |       4 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058598.8543017 | 4.47935152053833   | 0.2 | 3895
 snr_20220323180324 | 192.168.1.229 |       99 |      10 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058604.4054732 | 10.03052306175232  | 0.0 | 3895
 snr_20220323180329 | 192.168.1.229 |       99 |      16 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058609.8929565 | 15.518006324768066 | 6.5 | 3895
 snr_20220323180335 | 192.168.1.229 |       99 |      21 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058615.3783045 | 21.00335431098938  | 0.2 | 3895
 snr_20220323180340 | 192.168.1.229 |       99 |      26 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058620.8675282 | 26.49257802963257  | 4.6 | 3895
 snr_20220323180346 | 192.168.1.229 |       99 |      32 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058626.3639522 | 31.989001989364624 | 0.0 | 3895
 snr_20220323180351 | 192.168.1.229 |       99 |      38 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058631.8793604 | 37.50441026687622  | 0.0 | 3895
 snr_20220323180357 | 192.168.1.229 |      100 |      43 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058637.38347   | 43.008519887924194 | 0.0 | 3895
 snr_20220323180402 | 192.168.1.229 |       99 |      49 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058642.8820572 | 48.50710701942444  | 0.0 | 3895
 snr_20220323180408 | 192.168.1.229 |       99 |      54 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058648.3795574 | 54.00460720062256  | 6.2 | 3895
 snr_20220323180413 | 192.168.1.229 |       99 |      59 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058653.8280468 | 59.45309662818909  | 0.0 | 3895
 snr_20220323180419 | 192.168.1.229 |       99 |      65 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058659.3180714 | 64.94312119483948  | 4.9 | 3895
 snr_20220323180424 | 192.168.1.229 |       99 |      70 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058664.8023574 | 70.42740726470947  | 0.0 | 3895
 snr_20220323180430 | 192.168.1.229 |       99 |      76 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058670.286937  | 75.91198682785034  | 0.0 | 3895
 snr_20220323180435 | 192.168.1.229 |       97 |      81 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058675.7804654 | 81.40551519393921  | 0.0 | 3895
 snr_20220323180441 | 192.168.1.229 |       99 |      87 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058681.2751634 | 86.90021324157715  | 0.0 | 3895
 snr_20220323180446 | 192.168.1.229 |       99 |      92 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058686.7713509 | 92.39640069007874  | 5.9 | 3895
 snr_20220323180452 | 192.168.1.229 |       99 |      98 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058692.2672575 | 97.89230728149414  | 0.3 | 3895
 snr_20220323180457 | 192.168.1.229 |       99 |     103 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058697.7704427 | 103.39549255371094 | 5.4 | 3895
 snr_20220323180503 | 192.168.1.229 |       99 |     109 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058703.21333   | 108.83837985992432 | 0.3 | 3895
 snr_20220323180508 | 192.168.1.229 |       99 |     114 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058708.6879904 | 114.31304025650024 | 0.0 | 3895
 snr_20220323180514 | 192.168.1.229 |       99 |     120 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058714.1396198 | 119.76466965675354 | 0.3 | 3895
 snr_20220323180519 | 192.168.1.229 |       99 |     125 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058719.6158638 | 125.24091362953186 | 0.0 | 3895
 snr_20220323180525 | 192.168.1.229 |      100 |     131 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058725.0950723 | 130.72012209892273 | 6.5 | 3895
 snr_20220323180530 | 192.168.1.229 |       99 |     136 | piups | piups    | b8:27:eb:4a:4b:61 | 1648058730.57256   | 136.19760990142822 | 0.0 | 3895
(25 rows)

Query 20220323_184946_00003_p66fs, FINISHED, 1 node

Spark SQL

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", "http://pulsar1:8080").option("topic", "persistent://public/default/pi-sensors").load()

scala> dfPulsar.printSchema()
root
 |-- uuid: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- cputempf: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- host: string (nullable = true)
 |-- hostname: string (nullable = true)
 |-- macaddress: string (nullable = true)
 |-- endtime: string (nullable = true)
 |-- te: string (nullable = true)
 |-- cpu: float (nullable = true)
 |-- diskusage: string (nullable = true)
 |-- memory: float (nullable = true)
 |-- rowid: string (nullable = true)
 |-- systemtime: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- BH1745_red: float (nullable = true)
 |-- BH1745_green: float (nullable = true)
 |-- BH1745_blue: float (nullable = true)
 |-- BH1745_clear: float (nullable = true)
 |-- VL53L1X_distance_in_mm: float (nullable = true)
 |-- ltr559_lux: float (nullable = true)
 |-- ltr559_prox: float (nullable = true)
 |-- bme680_tempc: float (nullable = true)
 |-- bme680_tempf: float (nullable = true)
 |-- bme680_pressure: float (nullable = true)
 |-- bme680_humidity: float (nullable = true)
 |-- lsm303d_accelerometer: string (nullable = true)
 |-- lsm303d_magnetometer: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)


## Example Queries

val pQuery = dfPulsar.selectExpr("*").writeStream.format("console").option("truncate", false).start()

val pQuery = dfPulsar.selectExpr("CAST(__key AS STRING)", 
                                 "CAST(uuid AS STRING)",
                                 "CAST(ipaddress AS STRING)",
                                 "CAST(cputempf AS STRING)",
                                 "CAST(host AS STRING)",
                                 "CAST(cpu AS STRING)",
                                 "CAST(diskusage AS STRING)",
                                 "CAST(memory AS STRING)",
                                 "CAST(systemtime AS STRING)",
                                 "CAST(BH1745_red AS STRING)",
                                 "CAST(BH1745_green AS STRING)",
                                 "CAST(BH1745_blue AS STRING)",
                                 "CAST(BH1745_clear AS STRING)",
                                 "CAST(VL53L1X_distance_in_mm AS STRING)",
                                 "CAST(ltr559_lux AS STRING)",                                 
                                 "CAST(bme680_tempf AS STRING)",
                                 "CAST(bme680_pressure AS STRING)",
                                 "CAST(bme680_humidity AS STRING)")
                                 .as[(String, String, String, String, String, String, String, String,
                                 String, String, String, String, String, String, String, String, String, String)]
            .writeStream.format("csv")
            .option("truncate", "false")
            .option("header", true)
            .option("path", "/opt/demo/pisensordata")
            .option("checkpointLocation", "/tmp/checkpoint")
            .start()

// The sink format could be csv, parquet, json, or orc

pQuery.explain()
pQuery.awaitTermination()
pQuery.stop()

Example Spark ETL CSV Output

/opt/demo/pisensordata# cat part-00000-0425bfc8-5d25-4143-818c-bc7af5e1d82c-c000.csv
__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity
snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,3887.5 MB,20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,70.66,1006.11,44.737
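
Every column in the CSV output above was cast to a string by the Spark job, so a reader has to restore the numeric types. Here is a minimal sketch using only the Python standard library, with the header and row above embedded as sample data. `NUMERIC_COLUMNS` and `load_rows` are hypothetical names, and the choice of which columns to convert is an assumption based on the schema shown earlier.

```python
import csv
import io

# Header and single data row copied from the Spark CSV output above.
SAMPLE = (
    "__key,uuid,ipaddress,cputempf,host,cpu,diskusage,memory,systemtime,"
    "BH1745_red,BH1745_green,BH1745_blue,BH1745_clear,VL53L1X_distance_in_mm,"
    "ltr559_lux,bme680_tempf,bme680_pressure,bme680_humidity\n"
    "snr_20220324215723,snr_20220324215723,192.168.1.229,95,piups,0.0,"
    "3887.5 MB,20.6,03/24/2022 17:57:24,134.2,99.0,75.6,130.0,15.0,6.09,"
    "70.66,1006.11,44.737\n"
)

# Assumption: these columns are numeric in the topic schema.
NUMERIC_COLUMNS = (
    "cputempf", "cpu", "memory",
    "BH1745_red", "BH1745_green", "BH1745_blue", "BH1745_clear",
    "VL53L1X_distance_in_mm", "ltr559_lux",
    "bme680_tempf", "bme680_pressure", "bme680_humidity",
)


def load_rows(handle):
    """Read CSV rows into dicts and convert the numeric columns to float."""
    rows = []
    for raw in csv.DictReader(handle):
        row = dict(raw)
        for name in NUMERIC_COLUMNS:
            row[name] = float(row[name])
        rows.append(row)
    return rows


rows = load_rows(io.StringIO(SAMPLE))
print(rows[0]["bme680_tempf"], rows[0]["diskusage"])
```

To read a real part file, pass `open(path, newline="")` in place of the `io.StringIO` wrapper.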

Flink SQL

CREATE CATALOG pulsar WITH (
   'type' = 'pulsar',
   'service-url' = 'pulsar://pulsar1:6650',
   'admin-url' = 'http://pulsar1:8080',
   'format' = 'json'
);

USE CATALOG pulsar;

SHOW TABLES;


describe `pi-sensors`;
> 
+------------------------+--------+------+-----+--------+-----------+
|                   name |   type | null | key | extras | watermark |
+------------------------+--------+------+-----+--------+-----------+
|                   uuid | STRING | true |     |        |           |
|              ipaddress | STRING | true |     |        |           |
|               cputempf |    INT | true |     |        |           |
|                runtime |    INT | true |     |        |           |
|                   host | STRING | true |     |        |           |
|               hostname | STRING | true |     |        |           |
|             macaddress | STRING | true |     |        |           |
|                endtime | STRING | true |     |        |           |
|                     te | STRING | true |     |        |           |
|                    cpu |  FLOAT | true |     |        |           |
|              diskusage | STRING | true |     |        |           |
|                 memory |  FLOAT | true |     |        |           |
|                  rowid | STRING | true |     |        |           |
|             systemtime | STRING | true |     |        |           |
|                     ts |    INT | true |     |        |           |
|              starttime | STRING | true |     |        |           |
|             BH1745_red |  FLOAT | true |     |        |           |
|           BH1745_green |  FLOAT | true |     |        |           |
|            BH1745_blue |  FLOAT | true |     |        |           |
|           BH1745_clear |  FLOAT | true |     |        |           |
| VL53L1X_distance_in_mm |  FLOAT | true |     |        |           |
|             ltr559_lux |  FLOAT | true |     |        |           |
|            ltr559_prox |  FLOAT | true |     |        |           |
|           bme680_tempc |  FLOAT | true |     |        |           |
|           bme680_tempf |  FLOAT | true |     |        |           |
|        bme680_pressure |  FLOAT | true |     |        |           |
|        bme680_humidity |  FLOAT | true |     |        |           |
|  lsm303d_accelerometer | STRING | true |     |        |           |
|   lsm303d_magnetometer | STRING | true |     |        |           |
+------------------------+--------+------+-----+--------+-----------+

select max(bme680_pressure) as maxpressure, max(bme680_tempf) as maxtemp, max(ltr559_lux) as maxlux, avg(BH1745_red) as avgred,
       max(VL53L1X_distance_in_mm) as maxdistance
from `pi-sensors`;

select * from `pi-sensors`;

Apache NiFi - Pulsar Consumer. MongoDB Writer.

Data Store - MongoDB


mongo -u debezium -p dbz --authenticationDatabase admin pulsar1:27017/inventory

show databases

db.createCollection("pisensors")

show collections

db.pisensors.find().pretty()
{
        "_id" : ObjectId("623b812e5dae8913d42a93ee"),
        "uuid" : "snr_20220323194514",
        "ipaddress" : "192.168.1.229",
        "cputempf" : 100,
        "runtime" : 9,
        "host" : "piups",
        "hostname" : "piups",
        "macaddress" : "b8:27:eb:4a:4b:61",
        "endtime" : "1648064714.7820184",
        "te" : "9.371636629104614",
        "cpu" : 6.5,
        "diskusage" : "3895.4 MB",
        "memory" : 21.4,
        "rowid" : "20220323194514_c9ec900f-05c2-49c4-985f-ddd83e8b15c0",
        "systemtime" : "03/23/2022 15:45:15",
        "ts" : 1648064715,
        "starttime" : "03/23/2022 15:45:05",
        "BH1745_red" : 112.2,
        "BH1745_green" : 83,
        "BH1745_blue" : 64.8,
        "BH1745_clear" : 110,
        "VL53L1X_distance_in_mm" : 31,
        "ltr559_lux" : 6.65,
        "ltr559_prox" : 0,
        "bme680_tempc" : 23.47,
        "bme680_tempf" : 74.25,
        "bme680_pressure" : 1017.71,
        "bme680_humidity" : 34.432,
        "lsm303d_accelerometer" : "-00.08g : -01.01g : +00.01g",
        "lsm303d_magnetometer" : "+00.06 : +00.30 : +00.07"
}
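
The `ts` field in the document above appears to be a Unix epoch in seconds, and `bme680_tempf` matches the usual Celsius to Fahrenheit conversion of `bme680_tempc`. Here is a minimal sketch for sanity-checking a stored document with the standard library. Both helper names are hypothetical, and the 2-decimal rounding is an assumption inferred from the sample values.

```python
from datetime import datetime, timezone


def ts_to_utc(ts: int) -> datetime:
    """Convert an epoch-seconds value to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


def celsius_to_fahrenheit(temp_c: float) -> float:
    """Standard conversion, rounded to 2 decimals (assumed from the sample)."""
    return round(temp_c * 9 / 5 + 32, 2)


# Values copied from the MongoDB document above.
stamp = ts_to_utc(1648064715)
temp_f = celsius_to_fahrenheit(23.47)

print(stamp.isoformat())  # 2022-03-23T19:45:15+00:00
print(temp_f)             # 74.25
```

The converted timestamp is four hours ahead of the `systemtime` string in the same document, which suggests `systemtime` is recorded in the device's local time.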

Monitor Everything! Let me see what's going on!?!??!

GRAFANA

PULSARMAN
