Consuming Streaming Stocks Data with Python, Websockets and Pulsar

 https://medium.com/@tspann/lets-check-our-stocks-from-finnhub-and-do-some-real-time-analytics-1b7963008e19


Let’s Check Our Stocks From FinnHub and Do Some Real-Time Analytics

Codehttps://github.com/tspannhw/FLiPN-Py-Stocks

The easiest application to build is a simple Python application since finnhub includes the basics in their documentation. We are going to use their free WEBSOCKET interface to Trades so we can get real-time events as they happen. We will get JSON data for each trade triggered.

Python App

Python application receives websocket stream of JSON arrays and sends individual JSON messages with a JSON schema.

architecture

Raw Data

{"data":[{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887195,"v":1},{"c":["1","8","24","12"],"p":122.09,"s":"TSLA","t":1672348887196,"v":4},{"c":["1","8","24","12"],"p":122.09,"s":"TSLA","t":1672348887196,"v":10},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":1},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":2},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887196,"v":10},{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887198,"v":79},{"c":["1","24","12"],"p":129.58,"s":"AAPL","t":1672348887666,"v":1},{"c":["1","24","12"],"p":129.575,"s":"AAPL","t":1672348887785,"v":1}],"type":"trade"}
{"c":["1","8","24","12"],"p":122.1,"s":"TSLA","t":1672348887195,"v":1}

Data Description

data
List of trades or price updates.
s
Symbol.
p
Last price.
t
UNIX milliseconds timestamp.
v
Volume.
c
List of trade conditions. A comprehensive list of trade conditions code can be found here

Let’s Build a Schema for our JSON data. Once we have a class definied for it in Python, we can send that to an Apache Pulsar cluster and it will generate the first version of our schema for us. When we have a schema it lets us treat that data as a table in Trino, Spark SQL and Flink SQL. So this is awesome.

By defining our data and making it fully structured with a schema even though it is still semi-structured JSON, it makes it very easy to work with. We know what we are getting. This will make it easier to stream into Apache Pinot, Apache Iceberg, Delta Lake or another analytics system.

class Stock (Record):
symbol = String()
ts = Float()
currentts = Float()
volume = Float()
price = Float()
tradeconditions = String()
uuid = String()

We then connect to our Pulsar cluster, very easy in Python.

client = pulsar.Client(‘pulsar://localhost:6650’)
producer = client.create_producer(topic=’persistent://public/default/stocks’ ,schema=JsonSchema(Stock),properties={“producer-name”: “py-stocks”,”producer-id”: “pystocks1” })

If we have never used this topic before, Pulsar will create it for you. For best practices, build your tenant, namespace and topic before your application while you are defining schemas and data contracts.

For more information on the Python interface for Pulsar, check out this link.

NEWBIE HINT:

For a free cluster and training, check out this training academy.

Example Python Projects

For all the real newbies, here is the real getting started.

Consume Pulsar Data

bin/pulsar-client consume "persistent://public/default/stocks" -s stocks-reader -n 0
----- got message -----
key:[20221230191756_42a4752d-5f66-4245-8153-a5ec8478f738], properties:[], content:{
"symbol": "AAPL",
"ts": 1672427874976.0,
"currentts": 20221230191756.0,
"volume": 10.0,
"price": 128.055,
"tradeconditions": "1 12",
"uuid": "20221230191756_42a4752d-5f66-4245-8153-a5ec8478f738"
}
----- got message -----
key:[20221230191756_a560a594-7c12-42e7-a76d-6650a48533e0], properties:[], content:{
"symbol": "TSLA",
"ts": 1672427874974.0,
"currentts": 20221230191756.0,
"volume": 100.0,
"price": 120.94,
"tradeconditions": "",
"uuid": "20221230191756_a560a594-7c12-42e7-a76d-6650a48533e0"
}

References

Cloud Native Apache Pulsar Development 101 with Python | Tim Spann | Con...

building-real-time-weather-dashboards-with-apache

Ten Tips for Building Scalable and Performant Data Pipelines with Apache Pulsar

10 Tips for Building Scalable and Performant Data Pipelines with Apache Pulsar

by Mr. Pulsar's Friend - ChatGPT

Data pipelines are crucial for modern organizations, allowing them to extract, transform, and load large volumes of data from various sources for analysis and reporting. Choosing the right technology to power these pipelines is key to ensuring they are scalable, performant, and able to handle the growing volumes of data.

Apache Pulsar, a distributed publish-subscribe messaging system, is a popular choice for building data pipelines. With its ability to handle high volumes of streaming data in real-time, Pulsar can be used to build scalable and fault-tolerant data pipelines that can grow with your organization's needs.

Here are 10 tips for building scalable and performant data pipelines with Apache Pulsar:

Choose the right data sources for your pipeline. Pulsar can be used to stream data from various sources, including social media feeds, IoT devices, and more. Consider the volume and frequency of the data, as well as the required processing and storage capabilities, when selecting your data sources.

Use Pulsar's publish-subscribe messaging model to your advantage. Pulsar allows you to ingest and process data streams in real-time, using a publish-subscribe messaging model. This allows you to easily scale your pipeline and add new data sources as needed.

Utilize Pulsar's real-time processing capabilities. Pulsar allows you to perform real-time transformations and enrichments on your data streams, making it a powerful tool for building data pipelines that need to process and analyze data in near real-time.

Integrate Pulsar with data warehouses like Snowflake. Pulsar can be integrated with data warehouses like Snowflake, providing fast and efficient data ingestion and allowing you to perform advanced analytics on your data.

Take advantage of Pulsar's scalability and fault-tolerance. Pulsar is designed to be scalable and fault-tolerant, allowing you to handle large volumes of data and maintain high availability even in the face of failures.

Use Pulsar's built-in security features. Pulsar offers a range of security features, including encryption, authentication, and authorization, to help protect your data and ensure compliance.

Optimize your pipeline for high throughput and low latency. Pulsar is optimized for high throughput and low latency, making it ideal for building data pipelines that need to handle high volumes of data with minimal delays.

Monitor and manage your pipeline with Pulsar's management tools. Pulsar offers a range of management tools, including monitoring and alerting, to help you manage and optimize your pipeline.

Stay up-to-date with the latest Pulsar features and best practices. Pulsar is an actively developed and supported open-source project, with new features and best practices being added regularly. Make sure to stay up-to-date with the latest developments to ensure your pipeline is running at its best.

Join the Pulsar community and seek out resources and support. The Pulsar community is a wealth of knowledge and resources, with a vibrant user and developer community, documentation, and support resources available. Don't hesitate to reach out and ask for help or share your experiences with others.

In conclusion, Apache Pulsar is a powerful tool for building scalable and performant data pipelines. By following these tips, you can build data pipelines that can handle large volumes of data, scale with your organization's needs, and provide fast and efficient data ingestion and processing.

10 Tips for Building Scalable and Performant Data Pipelines with Apache Pulsar

10 Tips for Building Scalable and Performant Data Pipelines with Apache Pulsar

Introduction:



1) Data pipelines play a crucial role in modern organizations, allowing them to efficiently move and process large volumes of data from various sources to a central repository for analysis and reporting. Choosing the right technology to build these pipelines is crucial for ensuring their scalability, performance, and reliability.

2) Apache Pulsar, the distributed publish-subscribe messaging system, is a popular choice for building data pipelines due to its ability to handle high volumes of streaming data in real-time and its scalability and fault-tolerance. In this article, we'll provide you with 10 tips for building scalable and performant data pipelines with Apache Pulsar.

3) Choose the right data sources for your pipeline: The first step in building a data pipeline is identifying the sources of data that you want to ingest. Pulsar can stream data from a variety of sources, including social media feeds, IoT devices, and more. Choose sources that align with your organization's goals and needs.

4) Use Pulsar's publish-subscribe messaging model to your advantage: Pulsar uses a publish-subscribe messaging model, which allows you to easily ingest and process data streams in real-time. Leverage this model to your advantage by using Pulsar to stream data from multiple sources and process it as it arrives.

5) Utilize Pulsar's real-time processing capabilities: One of the key benefits of Pulsar is its ability to process data streams in real-time. Use this capability to your advantage by building pipelines that can handle high volumes of data and provide near-instantaneous insights.

6) Integrate Pulsar with data warehouses like Snowflake: Pulsar can be integrated with data warehouses like Snowflake to provide fast and efficient data ingestion. This allows you to store and query large volumes of data, enabling real-time analytics and decision making.

7) Take advantage of Pulsar's scalability and fault-tolerance: Pulsar is designed to be scalable and fault-tolerant, allowing it to handle large volumes of data without performance degradation. Use Pulsar's built-in features, such as partitioning and load balancing, to ensure that your pipeline can scale with your organization's needs.

8) Use Pulsar's built-in security features: Pulsar provides a variety of built-in security features, including encryption and authentication, to help protect your data. Make sure to utilize these features to ensure the security and privacy of your data.

9) Optimize your pipeline for high throughput and low latency: Pulsar is designed to provide high throughput and low latency, making it well-suited for real-time streaming applications. Optimize your pipeline by choosing the right hardware and tuning your configuration to maximize throughput and minimize latency.

10) Monitor and manage your pipeline with Pulsar's management tools: Pulsar provides a variety of management tools, such as the Pulsar Manager and Prometheus, to help you monitor and manage your pipeline. Use these tools to ensure that your pipeline is running smoothly and to identify and resolve any issues.

Stay up-to-date with the latest Pulsar features and best practices: Pulsar is an active open-source project with a strong community. Stay up-to-date with the latest features and best practices by following the Pulsar mailing list, joining the Pulsar Slack community, and attending Pulsar meetups and conferences.

Join the Pulsar community and seek out resources and support:

StreamNative


Updated Youtube Channel for Streaming

 https://www.youtube.com/@FLiP-Stack



Building Modern Data Streaming Apps - Pulsar Summit Asia 2022

Princeton, NJ - Apache Pulsar Meetup - 15-Dec-2022 - Flink + Pulsar + NiFi

Building Real-Time Requires a Team: Tim Spann, Developer Advocate @ Stre...

Timothy Spann: Apache Pulsar for ML

Pulsar Encryption

Last FLiP Stack Weekly of 2022

 

Last FLiP Stack Weekly of 2022

31-Dec-2022

FLiP Stack Weekly

New Stuff

PODCAST

CODE + COMMUNITY

ARTICLES

CODE

FREE TRAINING

VIDEOS

TOOLS

Schedule 2023