Mastodon Streaming to Apache Pulsar via Python

https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023



Python, Pulsar, Mastodon, Streaming, Real-Time, Events, Filtering

Source: https://github.com/tspannhw/pulsar-mastodon-sink

Ideally, the next step is either Apache Flink SQL or Apache Pinot, followed by some analytics with, say, Apache Superset. I am still debating what to do with this data. I may add a full data pipeline with multiple Pulsar Functions, or run some ML in Flink. I could also stream it to a Pulsar sink that stores it in Delta Lake or Apache Iceberg. It would be interesting to run some NLP or ML on it, or even feed it to ChatGPT.

I asked ChatGPT what I should do.

How should I process Mastodon social feeds with Apache Pulsar?

Can I stream Mastodon streaming data to Pulsar and then to Flink SQL for real-time analytics? If so, can you build me a SQL application that does that and joins it with another stream that is relevant.

write an application that streams mastodon streaming events to Apache Pulsar and aggregates # of toots with Flink SQL

I’m sorry, I am not able to write a full-fledged application as it’s beyond the scope of my abilities and it would require a lot of resources and knowledge of your specific use-case and environment. However, I can provide you with a rough skeleton of how such an application could be built.

Here is a skeleton of a Flink job that streams Mastodon data from Pulsar and aggregates the number of toots using Flink SQL:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala._

object MastodonTootCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)
    // Connect to the Pulsar source, consume data from the topic and convert it to a DataStream
    val mastodonStream: DataStream[MastodonEvent] = env.addSource(new PulsarSource[MastodonEvent](...))
    // Convert the DataStream to a Table
    val mastodonTable: Table = tEnv.fromDataStream(mastodonStream)
    // Register the table in the TableEnvironment
    tEnv.registerTable("mastodon", mastodonTable)
    // Define the SQL query to aggregate the number of toots
    val tootCount: Table = tEnv.sqlQuery("SELECT COUNT(*) FROM mastodon WHERE event_type = 'toot'")
    // Convert the result to a DataStream and print it
    val tootCountStream: DataStream[Long] = tEnv.toRetractStream[Long](tootCount).map(_._2)
    tootCountStream.print()
    env.execute("Mastodon Toot Count")
  }
}

This is just a skeleton, you would need to fill in the details such as Pulsar configuration, data mapping and the specific logic for your use case. It’s important to note that this example uses the Flink Table and SQL API and assumes that the data received from Mastodon is of type MastodonEvent and has an event_type field, also it only considers toots and doesn’t cover other possible events. It would be beneficial to have a more detailed understanding of Mastodon’s streaming API, Pulsar’s features, and Flink’s SQL capabilities before proceeding.
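ChatGPT's query keeps one running total over the whole stream. In practice you would usually count per time window instead (Flink SQL has TUMBLE windows for this). Purely as a Flink-free illustration of what a one-minute tumbling count produces, here is a small Python sketch. It assumes each record is one toot carrying a created_at string in the same format as the example message later in this post; toots_per_minute is a hypothetical helper, not Flink code.

```python
from collections import Counter
from datetime import datetime


def toots_per_minute(records):
    """Count records per one-minute tumbling window, keyed by window start.

    Assumes each record is a dict with a created_at string such as
    "2023-01-17 22:37:25.347000+00:00" (str() of an aware datetime).
    """
    counts = Counter()
    for rec in records:
        created = datetime.fromisoformat(rec["created_at"])
        # Truncate to the minute: every toot in the same minute lands in one window
        window_start = created.replace(second=0, microsecond=0)
        counts[window_start] += 1
    # Return windows in time order
    return dict(sorted(counts.items()))
```

In Flink the same idea runs continuously and emits each window when it closes; this version just groups a finished list.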

For now, I am happy to capture the data and stream it as it flows through mastodon.social. As a test, I am publishing posts that match my simple keyword filter; they appear at https://mastodon.social/@PaaSDev

I saw an example from Aiven that uses Node.js to stream Mastodon data to Kafka (https://aiven.io/blog/mastodon-kafka-js) and thought it looked fun. I wanted to do this in Python, since it is a good language for data processing, machine learning, Pulsar, Kafka, Spark and more. Perhaps next time I will use JavaScript, Go, C# or Java. If you have a suggestion, drop it in a comment.

Let’s look at the Python JSON schema I set up for Pulsar.

from pulsar.schema import Record, String, Float, Integer

class mastodondata(Record):
    language = String()
    created_at = String()
    ts = Float()
    uuid = String()
    uri = String()
    url = String()
    favourites_count = Integer()
    replies_count = Integer()
    reblogs_count = Integer()
    content = String()
    username = String()
    accountname = String()
    displayname = String()
    note = String()
    followers_count = Integer()
    statuses_count = Integer()
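If you want to test downstream consumers without the Pulsar client installed, a plain dataclass with the same field names produces JSON of the same shape as the message shown next. This is a sketch only: MastodonData, to_json and from_json are hypothetical names, and this is not how the Pulsar client itself serializes a Record.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class MastodonData:
    # Same field names and rough types as the mastodondata Record above
    language: str = ""
    created_at: str = ""
    ts: float = 0.0
    uuid: str = ""
    uri: str = ""
    url: str = ""
    favourites_count: int = 0
    replies_count: int = 0
    reblogs_count: int = 0
    content: str = ""
    username: str = ""
    accountname: str = ""
    displayname: str = ""
    note: str = ""
    followers_count: int = 0
    statuses_count: int = 0

    def to_json(self) -> str:
        """Serialize every field to a flat JSON object."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, text: str) -> "MastodonData":
        """Rebuild a record from a flat JSON object with the same keys."""
        return cls(**json.loads(text))
```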

Here is an example message consumed with the Pulsar command-line client (pulsar-client consume). Next time I should use Pulsar Shell.

----- got message -----
key:[20230117223725_f68a866e-1df1-4d5d-b977-eb151535f240], properties:[], content:{
"language": "en",
"created_at": "2023-01-17 22:37:25.347000+00:00",
"ts": 20230117223725.0,
"uuid": "20230117223725_f68a866e-1df1-4d5d-b977-eb151535f240",
"uri": "https://mastodon.social/users/PaaSDev/statuses/109706939291797415",
"url": "https://mastodon.social/@PaaSDev/109706939291797415",
"favourites_count": 0,
"replies_count": 0,
"reblogs_count": 0,
"content": "<p>I am working on an Apache Pulsar streaming application in Python to ingest mastodon messages. <a href=\"https://github.com/tspannhw/pulsar-mastodon-sink\" target=\"_blank\" rel=\"nofollow noopener noreferrer\"><span class=\"invisible\">https://</span><span class=\"ellipsis\">github.com/tspannhw/pulsar-mas</span><span class=\"invisible\">todon-sink</span></a></p>",
"username": "PaaSDev",
"accountname": "PaaSDev",
"displayname": "",
"note": "",
"followers_count": 0,
"statuses_count": 1
}
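The content field is raw HTML, including the invisible and ellipsis spans Mastodon uses to shorten links. stream.py imports BeautifulSoup but never uses it, and for NLP or sentiment work you will probably want plain text. A minimal sketch using only the standard library's html.parser as a stand-in for BeautifulSoup; html_to_text is a hypothetical helper name.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collects text nodes; treats <p> and <br> as whitespace."""

    def __init__(self):
        super().__init__()  # convert_charrefs=True, so &amp; etc. arrive decoded
        self.parts = []

    def handle_starttag(self, tag, attrs):
        # Mastodon wraps paragraphs in <p> and uses <br> for line breaks
        if tag in ("p", "br"):
            self.parts.append(" ")

    def handle_data(self, data):
        self.parts.append(data)


def html_to_text(html: str) -> str:
    """Strip tags from a Mastodon status and collapse runs of whitespace."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()  # flush any buffered text
    return " ".join("".join(parser.parts).split())
```

Because the hidden spans are kept as text, the shortened link comes back whole (https://github.com/tspannhw/pulsar-mastodon-sink).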

Let’s run the final application.

python3 stream.py

2023-01-17 17:28:17.197 INFO [0x16ec2b000] HandlerBase:72 | [persistent://public/default/mastodon-partition-0, ] Getting connection from pool
2023-01-17 17:28:17.200 INFO [0x16ec2b000] ProducerImpl:190 | [persistent://public/default/mastodon-partition-0, ] Created producer on broker [127.0.0.1:56776 -> 127.0.0.1:6650]
20230117222817.0
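The number printed above is the record's ts field. As the code below shows, it is not epoch time: it is the UTC time packed into digits as YYYYMMDDHHMMSS, so 20230117222817.0 is 2023-01-17 22:28:17 UTC (consistent with the 17:28:17 log timestamp on a machine five hours behind UTC). If a downstream system such as Pinot or Flink expects a real timestamp, a sketch like this converts it; the helper names are hypothetical.

```python
from datetime import datetime, timezone


def packed_ts_to_datetime(ts: float) -> datetime:
    """Decode a YYYYMMDDHHMMSS float (as stream.py produces) into an aware UTC datetime."""
    # Zero-pad to 14 digits, then parse the packed fields
    return datetime.strptime(f"{int(ts):014d}", "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)


def packed_ts_to_epoch(ts: float) -> int:
    """Same instant as Unix epoch seconds."""
    return int(packed_ts_to_datetime(ts).timestamp())
```

Values of this size (about 2e13) are well below 2**53, so the float stores them exactly.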

The Python code is below.

import re
import uuid
from time import gmtime, strftime

import mastodon
from mastodon import Mastodon
import pulsar
from pulsar.schema import Record, String, Float, Integer, JsonSchema


#### Apache Pulsar
pulsarClient = pulsar.Client('pulsar://localhost:6650')

#### Schema Record
class mastodondata(Record):
    language = String()
    created_at = String()
    ts = Float()
    uuid = String()
    uri = String()
    url = String()
    favourites_count = Integer()
    replies_count = Integer()
    reblogs_count = Integer()
    content = String()
    username = String()
    accountname = String()
    displayname = String()
    note = String()
    followers_count = Integer()
    statuses_count = Integer()

#### Producer: create it once and reuse it for every matching status
#### instead of opening a new producer per message
pulsarProducer = pulsarClient.create_producer(
    topic='persistent://public/default/mastodon',
    schema=JsonSchema(mastodondata),
    properties={"producer-name": "mastodon-py-stream", "producer-id": "mastodon-producer"})

#### Keywords to match
keywordList = ['apache spark', 'Apache Spark', 'Apache Pinot', 'flink', 'Flink', 'Apache Flink',
               'kafka', 'Kafka', 'Apache Kafka', 'pulsar', 'Pulsar', 'datapipeline', 'real-time',
               'real-time streaming', 'StreamNative', 'Confluent', 'RedPandaData', 'Apache Pulsar',
               'streaming', 'Streaming', 'big data', 'Big Data']

#### Build our Regex
words_re = re.compile("|".join(keywordList))

#### Listener for Mastodon events

class Listener(mastodon.StreamListener):

    def on_update(self, status):
        # Forward only statuses whose (HTML) content mentions a keyword
        if words_re.search(status.content):
            mastodonRec = mastodondata()
            # One UTC reading for both the key and ts, formatted as YYYYMMDDHHMMSS
            now = strftime("%Y%m%d%H%M%S", gmtime())
            uuid_key = '{0}_{1}'.format(now, uuid.uuid4())
            mastodonRec.language = status.language
            mastodonRec.created_at = str(status.created_at)
            mastodonRec.ts = float(now)
            mastodonRec.uuid = uuid_key
            mastodonRec.uri = status.uri
            mastodonRec.url = status.url
            mastodonRec.favourites_count = status.favourites_count
            mastodonRec.replies_count = status.replies_count
            mastodonRec.reblogs_count = status.reblogs_count
            mastodonRec.content = status.content
            # Author details come from the nested account object
            mastodonRec.username = status.account.username
            mastodonRec.accountname = status.account.acct
            mastodonRec.displayname = status.account.display_name
            mastodonRec.note = status.account.note
            mastodonRec.followers_count = status.account.followers_count
            mastodonRec.statuses_count = status.account.statuses_count
            print(mastodonRec.ts)
            pulsarProducer.send(mastodonRec, partition_key=str(uuid_key))
            pulsarProducer.flush()

    def on_notification(self, notification):
        print("notification")

#### Register an app on the instance (returns client id/secret, not used below)
Mastodon.create_app(
    'streamreader',
    api_base_url='https://mastodon.social'
)

#### Named mastodon_client so it does not shadow the mastodon module
mastodon_client = Mastodon(api_base_url='https://mastodon.social')
try:
    # Blocks and calls Listener.on_update for every public status
    mastodon_client.stream_public(Listener())
finally:
    pulsarProducer.close()
    pulsarClient.close()
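The words_re filter in stream.py is case-sensitive (which is why the list carries both 'kafka' and 'Kafka'), matches substrings (so 'streaming' also hits 'livestreaming'), and runs against the raw HTML, so a link URL containing a keyword also counts. If you want whole-word, case-insensitive matching, a sketch like the following could replace it; build_keyword_regex is a hypothetical name, and stripping HTML first is still worth considering.

```python
import re


def build_keyword_regex(keywords):
    """Compile one case-insensitive pattern matching any keyword as a whole word or phrase."""
    # Deduplicate ignoring case, and try longer phrases first so
    # 'Apache Kafka' wins over plain 'kafka' at the same position
    unique = sorted({k.lower() for k in keywords}, key=len, reverse=True)
    alternation = "|".join(re.escape(k) for k in unique)
    # \b on both sides keeps 'flink' from matching inside 'flinky'
    return re.compile(rf"\b(?:{alternation})\b", re.IGNORECASE)
```

Usage: words_re = build_keyword_regex(keywordList), then words_re.search(text) as before.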

In a follow-up, I will decide what to do with this data, where it should go and whether it needs additional fields. I may add some TCP/IP status data and perhaps join in other streams in Flink. Depending on the results of NLP and sentiment analysis, we could also add other data lookups dynamically.

Pulsar Commands


pulsar-admin topics create persistent://public/default/mastodon

pulsar-admin schemas get persistent://public/default/mastodon

pulsar-client consume "persistent://public/default/mastodon" -s "mreader2" -n 0

References

Building a Real-Time IoT Application with Apache Pulsar and Apache Pinot


pulsar-thermal-pinot

Apache Pulsar — Apache Pinot — Thermal Sensor Data

Meetup December 2022

https://www.meetup.com/new-york-city-apache-pulsar-meetup/events/289817171/

Source Code

https://github.com/tspannhw/pulsar-thermal-pinot

Access Docker Container

docker exec -it pinot-controller /bin/bash

Build a Schema From Data

docker exec -it pinot-controller bin/pinot-admin.sh JsonToPinotSchema \
-timeColumnName ts \
-metrics "temperature,humidity,co2,totalvocppb,equivalentco2ppm,pressure,temperatureicp,cputempf" \
-dimensions "host,ipaddress" \
-pinotSchemaName=thermal \
-jsonFile=/data/thermal.json \
-outputDir=/config

Consume Data in Pulsar

bin/pulsar-client consume "persistent://public/default/thermalsensors" -s "thrmlsnosconsumer" -n 0

DevOps Pulsar

curl http://localhost:8080/admin/v2/persistent/public/default
curl http://localhost:8080/admin/v2/persistent/public/default/thermalsensors-partition-0/stats
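The same admin REST calls can be made from Python with only the standard library. A sketch: the helper names are hypothetical, and it assumes the admin endpoint at http://localhost:8080 used throughout this post.

```python
import json
from urllib.request import urlopen

ADMIN_URL = "http://localhost:8080"  # same admin endpoint as the curl commands above


def namespace_url(tenant, namespace, admin_url=ADMIN_URL):
    """URL that lists the persistent topics in a namespace."""
    return f"{admin_url}/admin/v2/persistent/{tenant}/{namespace}"


def topic_stats_url(tenant, namespace, topic, admin_url=ADMIN_URL):
    """URL for the stats of one topic (or one partition, e.g. 'thermalsensors-partition-0')."""
    return f"{namespace_url(tenant, namespace, admin_url)}/{topic}/stats"


def get_json(url, timeout=5.0):
    """GET a URL and decode the JSON body."""
    with urlopen(url, timeout=timeout) as resp:
        return json.load(resp)
```

With a broker running, get_json(namespace_url("public", "default")) returns the topic list, and get_json(topic_stats_url("public", "default", "thermalsensors-partition-0")) returns the stats for that partition.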

Data

{
"uuid": "thrml_qsx_20221121215610",
"ipaddress": "192.168.1.179",
"cputempf": 115,
"runtime": 0,
"host": "thermal",
"hostname": "thermal",
"macaddress": "e4:5f:01:7c:3f:34",
"endtime": "1669067770.6400402",
"te": "0.0005550384521484375",
"cpu": 4.5,
"diskusage": "102676.2 MB",
"memory": 9.7,
"rowid": "20221121215610_8e753591-cb7c-4e1c-886d-85cb3dba6c50",
"systemtime": "11/21/2022 16:56:15",
"ts": 1669067775,
"starttime": "11/21/2022 16:56:10",
"datetimestamp": "2022-11-21 21:56:14.404291+00:00",
"temperature": 27.9069,
"humidity": 24.89,
"co2": 698.0,
"totalvocppb": 0.0,
"equivalentco2ppm": 65535.0,
"pressure": 102048.65,
"temperatureicp": 82.0
}

Continuous Analytics with Flink SQL (Pulsar-Flink 1.15+ Connector)

Reference: https://github.com/tspannhw/pulsar-transit-function

CREATE CATALOG pulsar WITH (
'type' = 'pulsar-catalog',
'catalog-service-url' = 'pulsar://localhost:6650',
'catalog-admin-url' = 'http://localhost:8080'
);
SHOW CURRENT DATABASE;
SHOW DATABASES;
USE CATALOG pulsar;
SET 'table.dynamic-table-options.enabled' = 'true';
SHOW DATABASES;
USE `public/default`;
SHOW TABLES;
DESCRIBE `thermalsensors`;
SHOW CREATE TABLE `thermalsensors`;
CREATE TABLE `pulsar`.`public/default`.`thermalsensors` (
`uuid` VARCHAR(2147483647) NOT NULL,
`ipaddress` VARCHAR(2147483647) NOT NULL,
`cputempf` INT NOT NULL,
`runtime` INT NOT NULL,
`host` VARCHAR(2147483647) NOT NULL,
`hostname` VARCHAR(2147483647) NOT NULL,
`macaddress` VARCHAR(2147483647) NOT NULL,
`endtime` VARCHAR(2147483647) NOT NULL,
`te` VARCHAR(2147483647) NOT NULL,
`cpu` FLOAT NOT NULL,
`diskusage` VARCHAR(2147483647) NOT NULL,
`memory` FLOAT NOT NULL,
`rowid` VARCHAR(2147483647) NOT NULL,
`systemtime` VARCHAR(2147483647) NOT NULL,
`ts` INT NOT NULL,
`starttime` VARCHAR(2147483647) NOT NULL,
`datetimestamp` VARCHAR(2147483647) NOT NULL,
`temperature` FLOAT NOT NULL,
`humidity` FLOAT NOT NULL,
`co2` FLOAT NOT NULL,
`totalvocppb` FLOAT NOT NULL,
`equivalentco2ppm` FLOAT NOT NULL,
`pressure` FLOAT NOT NULL,
`temperatureicp` FLOAT NOT NULL
) WITH (
'connector' = 'pulsar',
'topics' = 'persistent://public/default/thermalsensors',
'format' = 'json',
'admin-url' = 'http://localhost:8080',
'service-url' = 'pulsar://localhost:6650'
);
select * from thermalsensors;

Create PostgreSQL Table

CREATE TABLE "public"."thermalalerts" (
systemtime VARCHAR(256),
humidity FLOAT,
temperature FLOAT,
uuid VARCHAR(256),
co2 FLOAT,
datetimestamp VARCHAR(256),
rowid VARCHAR(256),
diskusage VARCHAR(256)
);
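The post does not show how rows reach this table (the later captions suggest NiFi handles Pulsar to PostgreSQL). If you wrote them from Python instead, a DB-API driver such as psycopg2 takes %s placeholders; this sketch just projects a thermal record onto the thermalalerts columns and builds a parameterized INSERT. thermal_alert_insert is a hypothetical helper, and it applies no alert threshold because none is given here.

```python
ALERT_COLUMNS = ("systemtime", "humidity", "temperature", "uuid", "co2",
                 "datetimestamp", "rowid", "diskusage")


def thermal_alert_insert(record, table='"public"."thermalalerts"', placeholder="%s"):
    """Return (sql, params) that insert the thermalalerts columns from one thermal record.

    Extra fields in the record (cpu, memory, ...) are ignored; a missing
    column raises KeyError rather than silently inserting NULL.
    """
    cols = ", ".join(ALERT_COLUMNS)
    marks = ", ".join([placeholder] * len(ALERT_COLUMNS))
    sql = f"INSERT INTO {table} ({cols}) VALUES ({marks})"
    params = tuple(record[c] for c in ALERT_COLUMNS)
    return sql, params
```

With psycopg2 you would then call cursor.execute(sql, params); the values travel separately from the SQL text, so no manual quoting is needed.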

Delete Table and Delete Schema

curl -X DELETE "http://localhost:9000/tables/thermal?type=realtime" -H "accept: application/json"
curl -X DELETE "http://localhost:9000/schemas/thermal" -H "accept: application/json"

Add our schema

docker exec -it pinot-controller bin/pinot-admin.sh AddSchema   \
-schemaFile /config/thermalschema.json \
-exec

Add Table Via Swagger UI / Curl

curl -X POST "http://localhost:9000/tables" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"tableName\": \"thermal\", \"tableType\": \"REALTIME\", \"segmentsConfig\": { \"timeColumnName\": \"ts\", \"schemaName\": \"thermal\", \"replication\": \"1\", \"replicasPerPartition\": \"1\" }, \"ingestionConfig\": { \"batchIngestionConfig\": { \"segmentIngestionType\": \"APPEND\", \"segmentIngestionFrequency\": \"DAILY\" } }, \"tableIndexConfig\": { \"loadMode\": \"MMAP\", \"streamConfigs\": { \"streamType\": \"pulsar\", \"stream.pulsar.topic.name\": \"persistent://public/default/thermalsensors\", \"stream.pulsar.bootstrap.servers\": \"pulsar://Timothys-MBP:6650\", \"stream.pulsar.consumer.type\": \"lowlevel\", \"stream.pulsar.fetch.timeout.millis\": \"10000\", \"stream.pulsar.consumer.prop.auto.offset.reset\": \"largest\", \"stream.pulsar.consumer.factory.class.name\": \"org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory\", \"stream.pulsar.decoder.class.name\": \"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder\", \"realtime.segment.flush.threshold.rows\": \"0\", \"realtime.segment.flush.threshold.time\": \"1h\", \"realtime.segment.flush.threshold.segment.size\": \"5M\" } }, \"tenants\": {}, \"metadata\": {}}"
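The one-line curl payload is hard to read. Here is the same REALTIME table config as a Python dict, plus a standard-library request that posts it to the controller. A sketch: the helper names are hypothetical, the default service URL pulsar://localhost:6650 is an assumption (the original points at the author's host, Timothys-MBP), and the controller URL is the http://localhost:9000 used above.

```python
import json
from urllib.request import Request


def thermal_realtime_table(pulsar_service_url="pulsar://localhost:6650"):
    """Same REALTIME table config as the curl command above, as a Python dict."""
    return {
        "tableName": "thermal",
        "tableType": "REALTIME",
        "segmentsConfig": {
            "timeColumnName": "ts",
            "schemaName": "thermal",
            "replication": "1",
            "replicasPerPartition": "1",
        },
        "ingestionConfig": {
            "batchIngestionConfig": {
                "segmentIngestionType": "APPEND",
                "segmentIngestionFrequency": "DAILY",
            }
        },
        "tableIndexConfig": {
            "loadMode": "MMAP",
            "streamConfigs": {
                "streamType": "pulsar",
                "stream.pulsar.topic.name": "persistent://public/default/thermalsensors",
                "stream.pulsar.bootstrap.servers": pulsar_service_url,
                "stream.pulsar.consumer.type": "lowlevel",
                "stream.pulsar.fetch.timeout.millis": "10000",
                # "largest" = start from new messages only
                "stream.pulsar.consumer.prop.auto.offset.reset": "largest",
                "stream.pulsar.consumer.factory.class.name":
                    "org.apache.pinot.plugin.stream.pulsar.PulsarConsumerFactory",
                "stream.pulsar.decoder.class.name":
                    "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
                # rows = 0 lets the segment size below decide when to flush
                "realtime.segment.flush.threshold.rows": "0",
                "realtime.segment.flush.threshold.time": "1h",
                "realtime.segment.flush.threshold.segment.size": "5M",
            },
        },
        "tenants": {},
        "metadata": {},
    }


def add_table_request(config, controller_url="http://localhost:9000"):
    """Build (but do not send) the POST /tables request."""
    return Request(
        f"{controller_url}/tables",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "accept": "application/json"},
        method="POST",
    )
```

To send it: with urllib.request.urlopen(add_table_request(thermal_realtime_table())) as resp: print(resp.read().decode()).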

Defining Pulsar-Pinot Realtime Table

If you set stream.pulsar.consumer.prop.auto.offset.reset=smallest instead, Pinot starts consuming from the earliest message in the topic, which could be millions or billions of records.

https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/apache-pulsar

Pinot Cluster

Query Console Table Schema

Query Console Table Information

Query Console SQL results

Adding a Realtime Table via REST API / Swagger Docs

Apache Pinot Query

select systemtime, totalvocppb, temperature, cputempf, humidity, co2, equivalentco2ppm,
pressure, temperatureicp, ts, datetimestamp, cpu, diskusage, memory, rowid
from thermal
order by ts desc
limit 200
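The same query can be run from Python instead of the Query Console. A standard-library sketch, assuming the broker's default port 8099 and its /query/sql endpoint (which takes {"sql": ...} and answers with a resultTable); check both against your deployment. The helper names are hypothetical.

```python
import json
from urllib.request import Request, urlopen

THERMAL_SQL = """select systemtime, totalvocppb, temperature, cputempf, humidity, co2, equivalentco2ppm,
pressure, temperatureicp, ts, datetimestamp, cpu, diskusage, memory, rowid
from thermal
order by ts desc
limit 200"""


def pinot_query_request(sql, broker_url="http://localhost:8099"):
    """Build (but do not send) a POST to the broker's SQL endpoint."""
    return Request(f"{broker_url}/query/sql",
                   data=json.dumps({"sql": sql}).encode("utf-8"),
                   headers={"Content-Type": "application/json"},
                   method="POST")


def rows_as_dicts(response):
    """Turn a broker response's resultTable into a list of {column: value} dicts."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]


def run_query(sql, broker_url="http://localhost:8099", timeout=10.0):
    """Send the query and return the rows as dicts."""
    with urlopen(pinot_query_request(sql, broker_url), timeout=timeout) as resp:
        return rows_as_dicts(json.load(resp))
```

With the cluster up, run_query(THERMAL_SQL) returns the latest 200 readings as dicts.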

Superset + Pinot

Run this to initialize: https://github.com/kbastani/climate-change-analysis/blob/master/docker/docker-init.sh

Let’s Explore and Visualize Apache Pinot Data

Add a Database

Configure a Pinot dataset

Query and Validate the dataset

Create a new chart from that dataset

Save the chart to a dashboard

Build a dashboard of charts and markdown

Build more charts

Superset CSV Chart Extract

Video Preview

https://youtu.be/KMbTlmoDXXA

Pulsar to NiFi

NiFi to PostgreSQL

PostgreSQL Exploration
