Pulsar DevOps with Pulsar Shell and CLI Tools


Using Pulsar admin tools

Pulsar Shell

Using directory: /Users/tspann/.pulsar-shell
Welcome to Pulsar shell!
Service URL: pulsar://localhost:6650/
Admin URL: http://localhost:8080/

Type help to get started or try the autocompletion (TAB button).
Type exit or quit to end the shell session.

For DevOps and repeatable processes, you can put your shell commands in a file and run them as a script. This is a good example.

echo "
# Configure the pulsar1 cluster
config use pulsar1
# Create a tenant
admin tenants create chatgpt
# Create a new namespace
admin namespaces create chatgpt/articles
# Build a topic
admin topics create persistent://chatgpt/articles/genitems
# List topics
admin topics list public/default
" > setup-shell.txt

./bin/pulsar-shell -f ./setup-shell.txt --fail-on-error
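
If you need the same setup for several tenants or environments, the script file can be generated instead of typed by hand. The following is a minimal sketch, assuming the same command sequence as above; the function name build_setup_script and its parameters are hypothetical, and unlike the example above it lists the topics of the namespace it just created rather than public/default.

```python
def build_setup_script(cluster: str, tenant: str, namespace: str, topic: str) -> str:
    """Return Pulsar Shell commands that create a tenant, namespace and topic."""
    full_namespace = f"{tenant}/{namespace}"
    lines = [
        f"# Configure the {cluster} cluster",
        f"config use {cluster}",
        "# Create a tenant",
        f"admin tenants create {tenant}",
        "# Create a new namespace",
        f"admin namespaces create {full_namespace}",
        "# Build a topic",
        f"admin topics create persistent://{full_namespace}/{topic}",
        "# List topics in the new namespace",
        f"admin topics list {full_namespace}",
    ]
    # One command per line, with a trailing newline so the file ends cleanly
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Print the script; redirect stdout to a file to feed it to pulsar-shell -f
    print(build_setup_script("pulsar1", "chatgpt", "articles", "genitems"), end="")
```

Redirecting the output to setup-shell.txt gives a file you can pass to ./bin/pulsar-shell -f as shown above.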

If you aren’t sure what you want to do, just press Tab to see the available commands.

If you aren’t sure which parameters a command requires, just enter the command with no arguments and the shell prints its usage.

admin topics create
Main parameters are required ("persistent://tenant/namespace/topic")

Create a non-partitioned topic.
Usage: create [options] persistent://tenant/namespace/topic
--metadata, -m
key value pair properties(a=a,b=b,c=c)

As you can see, the shell commands are simply the familiar pulsar-client and pulsar-admin tools (exposed as client and admin), plus config for managing connections.

Pulsar is a distributed messaging system that allows for the efficient exchange of data between different components of a system. Pulsar Shell is a command-line interface (CLI) tool that can be used to interact with a Pulsar cluster and perform various operations such as creating topics, publishing and consuming messages, and managing the overall cluster.

To start using Pulsar Shell, you will first need to have a Pulsar cluster set up and running. This can be done by following the instructions on the Pulsar website, by running Pulsar yourself on a cloud provider such as Amazon Web Services (AWS), or by using a managed Pulsar service.

You can connect to a machine where Pulsar is installed (an EC2 instance, a Docker container, a Kubernetes pod, or a local install) and run the commands there.

Otherwise, download Pulsar Shell to your laptop.

wget https://archive.apache.org/dist/pulsar/pulsar-2.11.0/apache-pulsar-shell-2.11.0-bin.tar.gz
tar xzvf apache-pulsar-shell-2.11.0-bin.tar.gz
cd apache-pulsar-shell-2.11.0

Once you have a Pulsar cluster up and running, you can start using Pulsar Shell by connecting to the cluster using the pulsar-shell command.

Pulsar CLI Tools

The section below covers the other Pulsar tools. ChatGPT generated it in response to a prompt about Pulsar Shell, because it did not know that pulsar-shell is not the same as the pulsar-admin and pulsar-client command-line tools.

Once you have a Pulsar cluster up and running, you can start using the Pulsar CLI by pointing the pulsar-admin and pulsar-client commands at the cluster. The pulsar-admin command takes the following format:

pulsar-admin --admin-url http://<pulsar-broker-address>:<pulsar-broker-port> <command>

Replace <pulsar-broker-address> and <pulsar-broker-port> with the address and HTTP (admin) port of your Pulsar cluster, and <command> with the operation to run. For example, if your Pulsar cluster is running on localhost with the admin service on port 8080, you would use the following command to list its tenants. (Port 6650 is the binary service port used by pulsar-client.)

pulsar-admin --admin-url http://localhost:8080 tenants list

Once connected to the Pulsar cluster, you can use various commands to interact with it. Some of the most commonly used commands include:

  • topics create: Creates a new non-partitioned topic (pulsar-admin).
  • topics list: Lists the topics in a namespace (pulsar-admin).
  • topics delete: Deletes a topic (pulsar-admin).
  • produce: Publishes messages to a topic (pulsar-client).
  • consume: Consumes messages from a topic (pulsar-client).
  • clusters list: Lists the clusters in the Pulsar instance (pulsar-admin).
  • namespaces unload: Unloads a namespace from the broker currently serving it (pulsar-admin).
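
For scripted use, it can help to build these command lines in one place so the admin URL and the service URL are never mixed up. The following is a minimal sketch, assuming a local cluster with the default ports shown in the Pulsar Shell banner; the helper names admin_cmd and client_cmd, and the topic and subscription names, are hypothetical.

```python
import shlex
from typing import List

ADMIN_URL = "http://localhost:8080"      # admin (HTTP) endpoint, used by pulsar-admin
SERVICE_URL = "pulsar://localhost:6650"  # binary protocol endpoint, used by pulsar-client


def admin_cmd(*args: str, admin_url: str = ADMIN_URL) -> List[str]:
    """Build an argv list for a pulsar-admin subcommand."""
    return ["pulsar-admin", "--admin-url", admin_url, *args]


def client_cmd(*args: str, service_url: str = SERVICE_URL) -> List[str]:
    """Build an argv list for a pulsar-client subcommand."""
    return ["pulsar-client", "--url", service_url, *args]


if __name__ == "__main__":
    commands = [
        admin_cmd("topics", "list", "public/default"),
        client_cmd("produce", "my-topic", "-m", "hello"),
        client_cmd("consume", "my-topic", "-s", "my-sub", "-n", "1"),
    ]
    # Print shell-quoted command lines; nothing is executed here
    for argv in commands:
        print(shlex.join(argv))
```

Each list can be handed to subprocess.run(argv, check=True) on a machine where the Pulsar binaries are on the PATH.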

In addition to these basic commands, Pulsar CLI also provides a number of advanced features such as the ability to manage subscriptions, configure authentication and authorization, and collect metrics and statistics on the cluster.

Overall, Pulsar CLI is a powerful tool for managing and interacting with a Pulsar cluster. By following the steps outlined in this article, you should be able to start using Pulsar CLI to create topics, publish and consume messages, and manage your cluster effectively.

Another important feature of Pulsar CLI is the ability to manage the schema for topics. Schemas are used to define the structure of messages that are published and consumed on a topic. Pulsar supports different types of schemas such as Avro, JSON, and Protobuf.

To attach a schema to a topic, you can use the schemas upload command and pass it a schema definition file. For example, the following command uploads an Avro schema for a topic named my-topic:

pulsar-admin schemas upload my-topic --filename my-schema.json

Where my-schema.json is a JSON file containing the schema type (AVRO) and the Avro schema definition.
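
Pulsar's admin CLI has a schemas upload subcommand that reads a JSON definition file rather than a raw .avsc file. The following is a minimal sketch of producing such a file, assuming the envelope layout (type, schema as a JSON-encoded string, properties) described in the Pulsar schema documentation; the Article record and the function name schema_definition are hypothetical, so check the layout against your Pulsar version.

```python
import json


def schema_definition(avro_schema: dict) -> str:
    """Wrap an Avro schema in the JSON envelope assumed for `schemas upload`."""
    envelope = {
        "type": "AVRO",
        # The Avro schema itself is embedded as a JSON-encoded string
        "schema": json.dumps(avro_schema),
        "properties": {},
    }
    return json.dumps(envelope, indent=2)


# Hypothetical record used only for illustration
article = {
    "type": "record",
    "name": "Article",
    "fields": [
        {"name": "title", "type": "string"},
        {"name": "body", "type": "string"},
    ],
}

if __name__ == "__main__":
    # Redirect this output to a file and pass it with --filename
    print(schema_definition(article))
```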

Once the topic is created, you can use the pulsar-client produce command to publish messages to the topic and the pulsar-client consume command to read them back. When publishing, you provide the message payload; when consuming, you receive the payload along with the message key and properties.

It’s worth noting that the Pulsar CLI also lets you manage permissions and collect statistics on the cluster.

For example, you can use the pulsar-admin namespaces grant-permission command to grant a role a set of actions (such as produce and consume) on a namespace. And you can use the pulsar-admin topics stats command to see per-topic statistics such as the message rates in and out and the current subscriptions.

In conclusion, Pulsar CLI is an essential tool for managing and interacting with a Pulsar cluster. It provides a wide range of functionality that allows you to create, publish, and consume messages, manage schemas, configure authentication and authorization, and collect metrics and statistics. By learning how to use Pulsar CLI, you can become proficient in managing and monitoring your Pulsar cluster and make sure that it runs smoothly.

Author of Part 2: ChatGPT

Prompt: Write me a full length article on how to start using Pulsar shell.

Prompt 2: continue

Mastodon Streaming to Apache Pulsar via Python


Mastodon Streaming to Pulsar via Python

Python, Pulsar, Mastodon, Streaming, Real-Time, Events, Filtering


Ideally, the next step is either Apache Flink SQL or Apache Pinot, followed by some analytics with, say, Apache Superset. I am debating what to do with this data. I may add a full data pipeline with multiple Pulsar functions, or I could run some ML in Flink. I could stream it to a Pulsar sink that stores it in Delta Lake or Apache Iceberg. It would be interesting to run some NLP or ML on it or even feed it to ChatGPT.

I asked ChatGPT what I should do.

How should I process Mastodon social feeds with Apache Pulsar?

Can I stream Mastodon streaming data to Pulsar and then to Flink SQL for real-time analytics? If so, can you build me a SQL application that does that and joins it with another stream that is relevant?

Write an application that streams Mastodon streaming events to Apache Pulsar and aggregates the number of toots with Flink SQL.

I’m sorry, I am not able to write a full-fledged application as it’s beyond the scope of my abilities and it would require a lot of resources and knowledge of your specific use-case and environment. However, I can provide you with a rough skeleton of how such an application could be built.

Here is a skeleton of a Flink job that streams Mastodon data from Pulsar and aggregates the number of toots using Flink SQL:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._

object MastodonTootCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)
    // Connect to the Pulsar source, consume data from the topic and convert it to a DataStream
    // (MastodonEvent and the PulsarSource arguments are placeholders to fill in)
    val mastodonStream: DataStream[MastodonEvent] = env.addSource(new PulsarSource[MastodonEvent](...))
    // Convert the DataStream to a Table
    val mastodonTable: Table = tEnv.fromDataStream(mastodonStream)
    // Register the table in the TableEnvironment
    tEnv.registerTable("mastodon", mastodonTable)
    // Define the SQL query to aggregate the number of toots
    val tootCount: Table = tEnv.sqlQuery("SELECT COUNT(*) FROM mastodon WHERE event_type = 'toot'")
    // Convert the result to a DataStream and print it
    val tootCountStream: DataStream[Long] = tEnv.toRetractStream[Long](tootCount).map(_._2)
    tootCountStream.print()
    env.execute("Mastodon Toot Count")
  }
}

This is just a skeleton; you would need to fill in the details such as Pulsar configuration, data mapping, and the specific logic for your use case. It’s important to note that this example uses the Flink Table and SQL API and assumes that the data received from Mastodon is of type MastodonEvent and has an event_type field. It also only considers toots and doesn’t cover other possible events. It would be beneficial to have a more detailed understanding of Mastodon’s streaming API, Pulsar’s features, and Flink’s SQL capabilities before proceeding.
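
To make the intent of that COUNT(*) query concrete, here is a plain-Python illustration of a continuously updated count: every matching event produces a new result. It is only a sketch of the semantics, not Flink code; the event_type field is the same assumption the skeleton makes, and the function name running_toot_count and the sample events are hypothetical.

```python
from typing import Dict, Iterable, Iterator


def running_toot_count(events: Iterable[Dict[str, str]]) -> Iterator[int]:
    """Yield the updated count each time a 'toot' event arrives."""
    count = 0
    for event in events:
        # Mirrors the WHERE event_type = 'toot' filter in the SQL query
        if event.get("event_type") == "toot":
            count += 1
            yield count


# Hypothetical events used only for illustration
sample = [
    {"event_type": "toot"},
    {"event_type": "delete"},
    {"event_type": "toot"},
]

if __name__ == "__main__":
    for value in running_toot_count(sample):
        print(value)
```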

For now, I am happy to get and stream the data as it streams through mastodon.social. As a test, I am publishing content that meets my simple filter. It will be posted here: https://mastodon.social/@PaaSDev

I saw an example from Aiven of using Node.js to stream Mastodon to Kafka and thought that looked fun: https://aiven.io/blog/mastodon-kafka-js. I wanted to do this in Python, as that is a good language for data processing, machine learning, Pulsar, Kafka, Spark and other things. Perhaps next time I will use JavaScript, Go, C# or Java. If you have a suggestion, drop it in a comment.

Let’s look at the Python JSON schema I set up for Pulsar.

class mastodondata(Record):
    language = String()
    created_at = String()
    ts = Float()
    uuid = String()
    uri = String()
    url = String()
    favourites_count = Integer()
    replies_count = Integer()
    reblogs_count = Integer()
    content = String()
    username = String()
    accountname = String()
    displayname = String()
    note = String()
    followers_count = Integer()
    statuses_count = Integer()

Here is an example result consumed with the command-line client in Apache Pulsar. I should use Pulsar Shell next time.

----- got message -----
key:[20230117223725_f68a866e-1df1-4d5d-b977-eb151535f240], properties:[], content:{
"language": "en",
"created_at": "2023-01-17 22:37:25.347000+00:00",
"ts": 20230117223725.0,
"uuid": "20230117223725_f68a866e-1df1-4d5d-b977-eb151535f240",
"uri": "https://mastodon.social/users/PaaSDev/statuses/109706939291797415",
"url": "https://mastodon.social/@PaaSDev/109706939291797415",
"favourites_count": 0,
"replies_count": 0,
"reblogs_count": 0,
"content": "<p>I am working on an Apache Pulsar streaming application in Python to ingest mastodon messages. <a href=\"https://github.com/tspannhw/pulsar-mastodon-sink\" target=\"_blank\" rel=\"nofollow noopener noreferrer\"><span class=\"invisible\">https://</span><span class=\"ellipsis\">github.com/tspannhw/pulsar-mas</span><span class=\"invisible\">todon-sink</span></a></p>",
"username": "PaaSDev",
"accountname": "PaaSDev",
"displayname": "",
"note": "",
"followers_count": 0,
"statuses_count": 1
}

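The content field arrives as HTML, so downstream text analysis will usually want plain text. The following is a minimal sketch using only the standard library, assuming a message shaped like the one above; the helper name strip_html and the shortened sample message are hypothetical.

```python
import json
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect only the text nodes of an HTML fragment."""

    def __init__(self) -> None:
        super().__init__()
        self.parts = []

    def handle_data(self, data: str) -> None:
        self.parts.append(data)


def strip_html(content: str) -> str:
    """Return the text of an HTML fragment with all tags removed."""
    parser = _TextExtractor()
    parser.feed(content)
    parser.close()  # flush any buffered text
    return "".join(parser.parts).strip()


# Hypothetical, shortened message used only for illustration
message = json.loads(
    '{"username": "PaaSDev", '
    '"content": "<p>Hello <a href=\\"https://example.org\\">'
    '<span class=\\"invisible\\">https://</span>example.org</a></p>"}'
)

if __name__ == "__main__":
    print(message["username"], "->", strip_html(message["content"]))
```
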
Let’s run the final application.

python3 stream.py

2023-01-17 17:28:17.197 INFO [0x16ec2b000] HandlerBase:72 | [persistent://public/default/mastodon-partition-0, ] Getting connection from pool
2023-01-17 17:28:17.200 INFO [0x16ec2b000] ProducerImpl:190 | [persistent://public/default/mastodon-partition-0, ] Created producer on broker [ ->]

The Python code is below.

import mastodon
from mastodon import Mastodon
from pprint import pprint
import requests
from bs4 import BeautifulSoup
import pulsar
from pulsar.schema import *
import time
import sys
import datetime
import subprocess
import os
from subprocess import PIPE, Popen
import traceback
import math
import base64
import json
from time import gmtime, strftime
import random, string
import psutil
import uuid
import socket
import logging
from jsonpath_ng import jsonpath, parse
import re

#### Apache Pulsar
pulsarClient = pulsar.Client('pulsar://localhost:6650')

#### Schema Record
class mastodondata(Record):
    language = String()
    created_at = String()
    ts = Float()
    uuid = String()
    uri = String()
    url = String()
    favourites_count = Integer()
    replies_count = Integer()
    reblogs_count = Integer()
    content = String()
    username = String()
    accountname = String()
    displayname = String()
    note = String()
    followers_count = Integer()
    statuses_count = Integer()

#### Keywords to match
keywordList = ['apache spark','Apache Spark', 'Apache Pinot','flink','Flink','Apache Flink','kafka', 'Kafka', 'Apache Kafka', 'pulsar', 'Pulsar', 'datapipeline', 'real-time', 'real-time streaming', 'StreamNative', 'Confluent', 'RedPandaData', 'Apache Pulsar', 'streaming', 'Streaming', 'big data', 'Big Data']

#### Build our Regex
words_re = re.compile("|".join(keywordList))

#### Listener for Mastodon events

class Listener(mastodon.StreamListener):

    def on_update(self, status):
        # Only forward statuses whose content matches one of the keywords
        if words_re.search(status.content):
            pulsarProducer = pulsarClient.create_producer(topic='persistent://public/default/mastodon',
                schema=JsonSchema(mastodondata), properties={"producer-name": "mastodon-py-strean","producer-id": "mastodon-producer" })
            mastodonRec = mastodondata()
            uuid_key = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
            mastodonRec.language = status.language
            mastodonRec.created_at = str(status.created_at)
            mastodonRec.ts = float(strftime("%Y%m%d%H%M%S",gmtime()))
            mastodonRec.uuid = uuid_key
            mastodonRec.uri = status.uri
            mastodonRec.url = status.url
            mastodonRec.favourites_count = status.favourites_count
            mastodonRec.replies_count = status.replies_count
            mastodonRec.reblogs_count = status.reblogs_count
            mastodonRec.content = status.content
            mastodonRec.username = status.account.username
            mastodonRec.accountname = status.account.acct
            mastodonRec.displayname = status.account.display_name
            mastodonRec.note = status.account.note
            mastodonRec.followers_count = status.account.followers_count
            mastodonRec.statuses_count = status.account.statuses_count
            # Send the record keyed by uuid_key, matching the key shown in the consumed message
            pulsarProducer.send(mastodonRec, partition_key=uuid_key)
            #producer.send('rp4-kafka-1', mastodonRec.encode('utf-8'))

    def on_notification(self, notification):
        # print(f"on_notification: {notification}")
        pass

api_base_url = 'https://mastodon.social'

mastodon = Mastodon(api_base_url=api_base_url)

# Assumed final step: stream the public timeline into the listener
mastodon.stream_public(Listener())
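
The filter and the message key can be tried on their own, without Mastodon or Pulsar running. The following is a minimal sketch of a variant of the keyword regex that escapes each keyword and ignores case, so one entry covers both 'Kafka' and 'kafka'; the shortened KEYWORDS list and the helper names matches and make_key are hypothetical. Note that this is substring matching, so a short keyword can also match inside a longer word.

```python
import re
import uuid
from time import gmtime, strftime

# Shortened, hypothetical keyword list for illustration
KEYWORDS = ["apache pulsar", "pulsar", "flink", "kafka", "real-time", "big data"]

# Escape each keyword and ignore case so one entry covers 'Kafka' and 'kafka'
KEYWORD_RE = re.compile("|".join(re.escape(k) for k in KEYWORDS), re.IGNORECASE)


def matches(content: str) -> bool:
    """Return True if any keyword occurs anywhere in the content."""
    return KEYWORD_RE.search(content) is not None


def make_key() -> str:
    """Build a key shaped like '<UTC timestamp>_<uuid4>', as in the listener."""
    return "{0}_{1}".format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())


if __name__ == "__main__":
    print(matches("Trying out Apache KAFKA today"))
    print(matches("Nothing relevant here"))
    print(make_key())
```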

In the follow-up I will decide what to do with this data, where it should go and if it needs additional data fields. I think I may add some TCP/IP status data and perhaps join in some other streams in Flink. I think depending on the result of our NLP and Sentiment analytics we could add other data lookups dynamically.

Pulsar Commands

pulsar-admin topics create persistent://public/default/mastodon

pulsar-admin schemas get persistent://public/default/mastodon

pulsar-client consume "persistent://public/default/mastodon" -s "mreader2" -n 0