Real-time stream processing with Hazelcast and Pulsar



Introduction

One of the most useful aspects of real-time stream processing is the ability to combine the strengths of several technologies, giving developers a productive experience and an efficient way to process data in real time at scale. Hazelcast is a real-time distributed computation and storage platform for consistently low-latency queries, aggregation and stateful computation against real-time event streams and traditional data sources. Apache Pulsar is a real-time, multitenant, geo-replicated, distributed pub-sub messaging and streaming platform for real-time workloads handling millions of events per hour.



However, real-time stream processing is not an easy task, especially when combining multiple live streams with large volumes of data stored in external data stores to provide context and instant results. Hazelcast can be used for stateful data processing over real-time streaming data, data at rest or a combination of both; for querying streaming and batch data sources directly using SQL; for distributed coordination of microservices; and for replicating data from one region to another or between data centres in the same region.

 

Apache Pulsar, meanwhile, can be used for both messaging and streaming use cases, taking the place of multiple products and providing a superset of their features. Apache Pulsar is a cloud-native, multitenant, unified messaging platform that can replace Apache Kafka, RabbitMQ, MQTT and legacy messaging platforms. Apache Pulsar provides an infinite message bus that Hazelcast can use as an instant source and sink for any and all data sources.



Prerequisites

We’re building an application where we ingest data from Apache Pulsar into Hazelcast and then process it in real-time. To run this application, make sure your system has the following components:

 

-   Hazelcast installed on your system: we’re using CLI

-   Pulsar installed on your system: we’re using Docker

 

If you have macOS & Homebrew, you can install Hazelcast using the following command:

brew tap hazelcast/hz

brew install hazelcast@5.2.1

 

Check if Hazelcast is installed:

hz -V

 

 

Then start a local cluster:

hz start

 

You should see the following in the console:

INFO: [192.168.1.164]:5701 [dev] [5.2.1]

 

Members {size:1, ver:1} [

  Member [192.168.1.164]:5701 - 4221d540-e34e-4ff2-8ad3-41e060b895ce this

]

 

You can start Pulsar in Docker using the following command:

 

docker run -it -p 6650:6650 -p 8080:8080 \
    --mount source=pulsardata,target=/pulsar/data \
    --mount source=pulsarconf,target=/pulsar/conf \
    apachepulsar/pulsar:2.11.0 bin/pulsar standalone


To install Management Center on macOS with Homebrew, use the following commands:

 

brew tap hazelcast/hz

brew install hazelcast-management-center@5.2.1

 

Check that Management Center is installed:

hz-mc -V


Data collection


For our application, we wish to ingest air quality readings from around the United States via the AirNow data provider. If you wish to learn more about air quality, check out the information at AirNow.


Source: https://docs.airnowapi.org/


With a simple Java application, we make REST calls to the AirNow API, which provides air quality readings for major zip codes around the United States. The Java application sends the JSON-encoded AirNow data to the “airquality” Pulsar topic. From this point, a Hazelcast application can read it.
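As a rough illustration of the REST call described above, here is a minimal sketch using only the JDK's built-in HTTP client. The endpoint path, the query parameter names, the 25-mile distance and the AIRNOW_API_KEY environment variable are all assumptions made for this example and should be checked against the AirNow documentation; the real application linked below may do this differently, and publishing the response to Pulsar is left out.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class AirNowRequest {
    // Assumed endpoint for "current observations by zip code";
    // confirm it against https://docs.airnowapi.org/ before relying on it.
    static final String BASE = "https://www.airnowapi.org/aq/observation/zipCode/current/";

    // Builds the request URI; parameter names are assumptions for this sketch.
    static URI currentObservationUri(String zipCode, int distanceMiles, String apiKey) {
        String query = "format=" + enc("application/json")
                + "&zipCode=" + enc(zipCode)
                + "&distance=" + distanceMiles
                + "&API_KEY=" + enc(apiKey);
        return URI.create(BASE + "?" + query);
    }

    private static String enc(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical environment variable holding your AirNow API key.
        String apiKey = System.getenv("AIRNOW_API_KEY");
        if (apiKey == null) {
            System.out.println("Set AIRNOW_API_KEY to call the API");
            return;
        }
        HttpRequest request = HttpRequest
                .newBuilder(currentObservationUri("19103", 25, apiKey))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body is what would be sent on to the "airquality" topic.
        System.out.println(response.body());
    }
}
```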


Source:   https://github.com/tspannhw/spring-pulsar-airquality 


We also have a Java Pulsar function that receives each event from the “airquality” topic and routes it to a different topic based on the type of air quality reading: PM2.5, PM10 or Ozone.
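The routing decision itself is simple enough to sketch in plain Java. The snippet below only shows the mapping from the parameterName field to a destination topic; the topic names are hypothetical, and the actual function in the repository below uses the Pulsar Functions API to receive and publish messages.

```java
import java.util.Locale;
import java.util.Map;

final class AirQualityRouter {
    // Hypothetical destination topics, one per reading type.
    private static final Map<String, String> TOPICS = Map.of(
            "PM2.5", "persistent://public/default/aq-pm25",
            "PM10", "persistent://public/default/aq-pm10",
            "O3", "persistent://public/default/aq-ozone");

    // Hypothetical catch-all topic for readings we do not recognise.
    static final String FALLBACK = "persistent://public/default/aq-other";

    // Picks the destination topic from the parameterName field of a reading.
    static String topicFor(String parameterName) {
        if (parameterName == null) {
            return FALLBACK;
        }
        String key = parameterName.trim().toUpperCase(Locale.ROOT);
        return TOPICS.getOrDefault(key, FALLBACK);
    }

    public static void main(String[] args) {
        for (String name : new String[] {"PM2.5", "PM10", "O3", "CO"}) {
            System.out.println(name + " -> " + topicFor(name));
        }
    }
}
```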


Source: https://github.com/tspannhw/pulsar-airquality-function 


Example AirQuality Data


{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","latitude":39.95,"longitude":-75.151,"parameterName":"PM10","aqi":19,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}


Example Ozone Data


{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"O3","latitude":39.95,"longitude":-75.151,"aqi":8}


Example PM10 Data


{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM10","latitude":39.95,"longitude":-75.151,"aqi":19}


Example PM2.5 Data

{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM2.5","latitude":39.95,"longitude":-75.151,"aqi":54}
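The first example above carries a category (number 1, "Good") next to the aqi value, while the per-pollutant examples carry only the aqi. If you need the category downstream, it can be derived from the value. The sketch below assumes the standard US AQI bands (0-50 Good, 51-100 Moderate, and so on); the class and method names are made up for this example.

```java
final class AqiCategory {
    private static final String[] NAMES = {
        "Good", "Moderate", "Unhealthy for Sensitive Groups",
        "Unhealthy", "Very Unhealthy", "Hazardous"
    };

    // Returns the category number (1 to 6) for an AQI value.
    static int number(int aqi) {
        if (aqi < 0) {
            throw new IllegalArgumentException("AQI must be non-negative: " + aqi);
        }
        if (aqi <= 50) return 1;
        if (aqi <= 100) return 2;
        if (aqi <= 150) return 3;
        if (aqi <= 200) return 4;
        if (aqi <= 300) return 5;
        return 6;
    }

    // Returns the category name for an AQI value.
    static String name(int aqi) {
        return NAMES[number(aqi) - 1];
    }

    public static void main(String[] args) {
        // AQI values taken from the example readings above.
        for (int aqi : new int[] {8, 19, 54}) {
            System.out.println(aqi + " -> " + number(aqi) + " (" + name(aqi) + ")");
        }
    }
}
```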


Data processing

In order to process the collected data, we used the Hazelcast Pulsar connector module to ingest data from Pulsar topics (note: you can use the same connector to write to Pulsar topics). Using Hazelcast allows us to compute various aggregation functions (sum, avg, etc.) in real time on a specified window of stream items. The Pulsar connector uses the Pulsar client library, which offers two ways of reading messages from a Pulsar topic: the Consumer API and the Reader API. Both use the builder pattern (for more information, see the Pulsar client documentation).

 

In your pom file, import the following dependencies.

 

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>5.1.4</version>
</dependency>
<dependency>
    <groupId>com.hazelcast.jet.contrib</groupId>
    <artifactId>pulsar</artifactId>
    <version>0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.1</version>
</dependency>

 

We create a source with PulsarSources.pulsarReaderBuilder to connect to the previously started Pulsar cluster located at pulsar://localhost:6650:

 

StreamSource<Event> source = PulsarSources.pulsarReaderBuilder(
     topicName,
     () -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
     () -> Schema.JSON(Event.class),
     Message::getValue).build();

 

We then create a pipeline that reads from the source, applies a sliding window and aggregates a count, before writing the results to the logger:

 

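// Assumes static imports of WindowDefinition.sliding, AggregateOperations.counting
// and TimeUnit.SECONDS, and a DateTimeFormatter constant named TIME_FORMATTER.
Pipeline p = Pipeline.create();
p.readFrom(source)
 // Use the timestamps supplied by the source, with zero allowed lag
 .withNativeTimestamps(0)
 .groupingKey(Event::getUser)
 // 60-second window that slides forward every 30 seconds
 .window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
 .aggregate(counting())
 .writeTo(Sinks.logger(wr -> String.format(
      "At %s Pulsar got %,d messages in the previous minute from %s.",
      TIME_FORMATTER.format(LocalDateTime.ofInstant(
              Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
      wr.result(), wr.key())));

// Snapshot the job state every second to support exactly-once processing
JobConfig cfg = new JobConfig()
     .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
     .setSnapshotIntervalMillis(SECONDS.toMillis(1))
     .setName("pulsar-airquality-counter");

// Submit the pipeline as a named job
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, cfg);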
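To make the windowing in this pipeline more concrete, here is a plain-Java sketch (no Hazelcast involved) of what a keyed sliding-window count computes. It assumes windows cover the half-open range [end - size, end) and that window ends fall on multiples of the slide; the class, method names and sample events are made up for illustration, and this is not how Hazelcast implements the aggregation internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SlidingWindowCount {
    // End timestamps of every window [end - size, end) containing the timestamp,
    // assuming window ends are aligned to multiples of the slide.
    static List<Long> windowEnds(long timestamp, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        // Smallest multiple of slide that is strictly greater than the timestamp.
        long firstEnd = Math.floorDiv(timestamp, slide) * slide + slide;
        for (long end = firstEnd; end - size <= timestamp; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    // Counts events per window end and per key; each event is (key, timestamp).
    static Map<Long, Map<String, Long>> count(
            List<Map.Entry<String, Long>> events, long size, long slide) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Map.Entry<String, Long> event : events) {
            for (long end : windowEnds(event.getValue(), size, slide)) {
                result.computeIfAbsent(end, k -> new TreeMap<>())
                      .merge(event.getKey(), 1L, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical events: a grouping key and an event time in milliseconds.
        List<Map.Entry<String, Long>> events = List.of(
                Map.entry("Philadelphia", 5_000L),
                Map.entry("Philadelphia", 45_000L),
                Map.entry("Boston", 50_000L));
        // Same window shape as the pipeline: 60 s long, sliding every 30 s.
        count(events, 60_000L, 30_000L).forEach((end, counts) ->
                System.out.println("window ending at " + end + " ms: " + counts));
    }
}
```

Because the window is twice as long as the slide, every event is counted in two overlapping windows, which is why the pipeline's log output reports on "the previous minute" every 30 seconds.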

 

 

 

You can run the previous code from your IDE (in this case, it will create its own Hazelcast member and run the job on it), or you can run this on the previously started Hazelcast member (in this case, you need to create a runnable JAR including all dependencies required to run it): 

mvn package

bin/hz-cli submit target/pulsar-example-1.0-SNAPSHOT.jar

 

To cancel the job and shut down the Hazelcast cluster:

bin/hz-cli cancel pulsar-airquality-counter

hz-stop

Conclusion

In this blog post, we demonstrated how you can combine the strengths and advantages of various technologies to provide a unique developer experience and an efficient way of processing data in real time at scale. We streamed air quality data from Apache Pulsar into Hazelcast, where we processed it in real time. The rising trend in cloud technologies, the need for real-time intelligent applications and the urgency to process data at scale have brought us to a new chapter of real-time stream processing, where latencies are measured not in minutes but in milliseconds and sub-milliseconds.


Hazelcast allows you to quickly build resource-efficient, real-time applications. You can deploy it at any scale, from small edge devices to a large cluster of cloud instances. A cluster of Hazelcast nodes shares both the data storage and the computational load, and can dynamically scale up and down. When you add new nodes to the cluster, the data is automatically rebalanced across the cluster, and currently running computational tasks (known as jobs) snapshot their state and scale with processing guarantees.

Pulsar allows you to use your choice of messaging protocols to quickly distribute events between multiple types of consumers and producers, and to act as a universal message hub. Pulsar separates compute from storage, allowing for dynamic scaling and efficient handling of fast data.

StreamNative is the company made up of the original creators of Apache Pulsar and Apache BookKeeper. StreamNative provides a full enterprise experience for Apache Pulsar in the cloud and on premises.



More on Hazelcast

  • Learn the Hazelcast Fundamentals: Start a Local Cluster with the CLI or Docker.

  • Start a Viridian Serverless Cluster: Serverless is a managed cloud service that offers a pay-as-you-go pricing model. Serverless clusters auto-scale to provide the resources that your application needs. You pay only for the resources that your application consumes.

  • Join the Hazelcast Slack and Hazelcast GitHub repository.

More on Apache Pulsar

  • Learn the Pulsar Fundamentals: While this blog did not cover the Pulsar fundamentals, there are great resources available to help you learn more. If you are new to Pulsar, we recommend taking the self-paced Pulsar courses or instructor-led Pulsar training developed by some of the original creators of Pulsar. This will get you started with Pulsar and accelerate your streaming immediately.

  • Spin up a Pulsar Cluster in Minutes: If you want to try building microservices without having to set up a Pulsar cluster yourself, sign up for StreamNative Cloud today. StreamNative Cloud is a simple, fast, and cost-effective way to run Pulsar in the public cloud.

  • Join the Apache Pulsar Slack

https://github.com/tspannhw/pulsar-hazelcast-airquality

https://github.com/tspannhw/spring-pulsar-airquality

https://github.com/tspannhw/pulsar-airquality-function 

More on the Authors

Tim Spann

Developer Advocate at StreamNative

https://twitter.com/paasdev 

https://github.com/tspannhw/SpeakerProfile/blob/main/README.md



Fawaz Ghali

Principal Developer Advocate at Hazelcast

https://www.linkedin.com/in/fawazghali/

https://twitter.com/FawazGhali








Pulsar DevOps with Pulsar Shell and CLI Tools

 


Using Pulsar admin tools

Pulsar Shell

bin/pulsar-shell
Using directory: /Users/tspann/.pulsar-shell
Welcome to Pulsar shell!
Service URL: pulsar://localhost:6650/
Admin URL: http://localhost:8080/

Type help to get started or try the autocompletion (TAB button).
Type exit or quit to end the shell session.

For DevOps and repeatable processes, you can put the shell commands in a file and run them non-interactively. This is a good example.

echo "
# Configure the pulsar1 cluster
config use pulsar1
# Create a tenant
admin tenants create chatgpt
# Create a new namespace
admin namespaces create chatgpt/articles
# Build a topic
admin topics create persistent://chatgpt/articles/genitems
# List topics
admin topics list public/default
" > setup-shell.txt

./bin/pulsar-shell -f ./setup-shell.txt --fail-on-error

If you aren’t sure what you want to do, just hit “tab”.

If you aren’t sure of the full parameters required, just enter the command without any arguments to see its usage.

admin topics create
Main parameters are required ("persistent://tenant/namespace/topic")

Create a non-partitioned topic.
Usage: create [options] persistent://tenant/namespace/topic
Options:
--metadata, -m
key value pair properties(a=a,b=b,c=c)

As you can see, these are just simple versions of pulsar-client, pulsar-admin and config.
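The usage text above expects topic names in the form persistent://tenant/namespace/topic. As a small illustration of that structure, here is a standalone Java sketch that splits such a name into its parts. The class name is made up for this example; it is not the Pulsar client's own topic-name class, and it only accepts the fully qualified form.

```java
final class TopicNameSketch {
    final String domain;
    final String tenant;
    final String namespace;
    final String topic;

    private TopicNameSketch(String domain, String tenant, String namespace, String topic) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    // Parses a fully qualified name such as persistent://tenant/namespace/topic.
    static TopicNameSketch parse(String fullName) {
        int separator = fullName.indexOf("://");
        if (separator < 0) {
            throw new IllegalArgumentException("Missing domain prefix: " + fullName);
        }
        String domain = fullName.substring(0, separator);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // Limit of 3 keeps any further slashes inside the topic part.
        String[] parts = fullName.substring(separator + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + fullName);
        }
        return new TopicNameSketch(domain, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameSketch name = parse("persistent://chatgpt/articles/genitems");
        System.out.println("domain=" + name.domain + " tenant=" + name.tenant
                + " namespace=" + name.namespace + " topic=" + name.topic);
    }
}
```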

Pulsar is a distributed messaging system that allows for the efficient exchange of data between different components of a system. Pulsar Shell is a command-line interface (CLI) tool that can be used to interact with a Pulsar cluster and perform various operations such as creating topics, publishing and consuming messages, and managing the overall cluster.

To start using Pulsar Shell, you will first need to have a Pulsar cluster set up and running. This can be done by following the instructions on the Pulsar website or by using a managed Pulsar service hosted on a cloud provider such as Amazon Web Services (AWS).

You can connect to a machine running Pulsar (an EC2 instance, a Docker container, a Kubernetes pod or your local machine) and use the commands there.

Otherwise, download Pulsar Shell to your laptop.

wget https://archive.apache.org/dist/pulsar/pulsar-2.11.0/apache-pulsar-shell-2.11.0-bin.tar.gz
tar xzvf apache-pulsar-shell-2.11.0-bin.tar.gz
cd apache-pulsar-shell-2.11.0

Once you have a Pulsar cluster up and running, you can start using Pulsar Shell by connecting to the cluster using the pulsar-shell command.

Pulsar CLI Tools

Below is how to use the other Pulsar tools. This section was generated by ChatGPT, which incorrectly assumed that pulsar-shell is the same as the pulsar-admin and pulsar-client command-line tools.

Once you have a Pulsar cluster up and running, you can start using the Pulsar CLI tools, pulsar-admin and pulsar-client. The pulsar-admin command talks to the cluster's HTTP admin endpoint and takes the following format:

pulsar-admin --admin-url http://<pulsar-broker-address>:<pulsar-admin-port> <command>

Replace <pulsar-broker-address> and <pulsar-admin-port> with the address and HTTP admin port of your Pulsar cluster. For example, if your Pulsar cluster is running on localhost with the admin service on port 8080 (port 6650 is the binary service port used by pulsar-client), you would use the following command to list the tenants:

pulsar-admin --admin-url http://localhost:8080 tenants list

With the cluster reachable, you can use various commands to interact with it. Some of the most commonly used commands include:

  • pulsar-admin topics create: Creates a new non-partitioned topic.
  • pulsar-admin topics list: Lists the topics in a namespace.
  • pulsar-admin topics delete: Deletes a topic.
  • pulsar-client produce: Publishes a message to a topic.
  • pulsar-client consume: Consumes messages from a topic.
  • pulsar-admin clusters list: Lists the clusters known to the Pulsar instance.
  • pulsar-admin namespaces unload: Unloads a namespace from the broker currently serving it.
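The admin commands are thin wrappers around Pulsar's admin REST API, so the same information can be fetched from Java with the JDK's HTTP client. The sketch below builds the URLs for listing tenants, namespaces and persistent topics, assuming the v2 admin paths and an unauthenticated standalone cluster at http://localhost:8080; the class and method names are made up for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

final class PulsarAdminRest {
    private final String adminUrl;

    PulsarAdminRest(String adminUrl) {
        // Strip a trailing slash so path concatenation stays clean.
        this.adminUrl = adminUrl.endsWith("/")
                ? adminUrl.substring(0, adminUrl.length() - 1)
                : adminUrl;
    }

    // Roughly equivalent to: pulsar-admin tenants list
    URI tenants() {
        return URI.create(adminUrl + "/admin/v2/tenants");
    }

    // Roughly equivalent to: pulsar-admin namespaces list <tenant>
    URI namespaces(String tenant) {
        return URI.create(adminUrl + "/admin/v2/namespaces/" + tenant);
    }

    // Roughly equivalent to: pulsar-admin topics list <tenant>/<namespace>
    // (persistent topics only)
    URI persistentTopics(String tenant, String namespace) {
        return URI.create(adminUrl + "/admin/v2/persistent/" + tenant + "/" + namespace);
    }

    // Issues a GET request and returns the JSON response body as text.
    static String get(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws InterruptedException {
        PulsarAdminRest admin = new PulsarAdminRest("http://localhost:8080/");
        URI uri = admin.persistentTopics("public", "default");
        try {
            System.out.println(uri + " -> " + get(uri));
        } catch (IOException e) {
            System.out.println("Could not reach " + uri + ": " + e.getMessage());
        }
    }
}
```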

In addition to these basic commands, Pulsar CLI also provides a number of advanced features such as the ability to manage subscriptions, configure authentication and authorization, and collect metrics and statistics on the cluster.

Overall, Pulsar CLI is a powerful tool for managing and interacting with a Pulsar cluster. By following the steps outlined in this article, you should be able to start using Pulsar CLI to create topics, publish and consume messages, and manage your cluster effectively.

Another important feature of Pulsar CLI is the ability to manage the schema for topics. Schemas are used to define the structure of messages that are published and consumed on a topic. Pulsar supports different types of schemas such as Avro, JSON, and Protobuf.

To attach a schema to a topic, you can use the schemas upload command and point it at a schema definition file. For example, the following command uploads a schema for a topic named my-topic:

pulsar-admin schemas upload my-topic --filename my-schema.json

Where my-schema.json is a JSON file containing the schema type (for example, AVRO) and the schema definition.

Once the topic is created, you can use the pulsar-client produce command to publish messages to the topic and the pulsar-client consume command to consume messages from it. When publishing, you provide the topic and the message payload; when consuming, you provide the topic and a subscription name, and you receive the message payloads.

It’s worth noting that Pulsar CLI also allows you to manage permissions and collect statistics on the cluster.

For example, you can use the pulsar-admin topics grant-permission command to grant a role a set of permissions (such as produce and consume) on a specific topic. And you can use the pulsar-admin topics stats command to collect statistics about a topic, such as its message rates and the state of its subscriptions.

In conclusion, Pulsar CLI is an essential tool for managing and interacting with a Pulsar cluster. It provides a wide range of functionality that allows you to create, publish, and consume messages, manage schemas, configure authentication and authorization, and collect metrics and statistics. By learning how to use Pulsar CLI, you can become proficient in managing and monitoring your Pulsar cluster and make sure that it runs smoothly.

Author of Part 2: ChatGPT

Prompt: Write me a full length article on how to start using Pulsar shell.

Prompt 2: continue