Discord Integration with Apache NiFi




Sending messages to Slack from NiFi is easy and built in, but as I was recently reminded, sometimes you want to send messages elsewhere.   Thanks to Brian Stitt for the starting information.

I created a free Discord Server in a few seconds and was ready to send messages.


You then copy the webhook URL and you are ready to call it.   It's a long HTTPS link; just paste it into the InvokeHTTP processor.

In our flow we need to create an SSL Context Service for NiFi.   If you are using NiFi in a CDP DataHub, there's one already configured for you!

The output will not have a body but there will be cool headers if you want to save them or use them for processing.   Otherwise wrap this in a process group and you have an alert system of your own.


Before we call the InvokeHTTP we need to set the username of the bot.


Your message must be encoded as JSON like so:

{
"content":  "This is my message"
}

Let's call the hook with our data. Every field is important.

HTTP Method:   POST
Remote URL:  https://discord.com ... our copied webhooks link
SSL Context:  Create or use a StandardSSLContextService
Date Header:  False
Follow Redirects:  True
Use Digest: False
Always Output Response:  True
Add Response Headers to Request:  False
Content-Type:   application/json
Send Message Body: true
Chunk Encoding:  False
Ignore responses content: false
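
If you want to try the webhook outside of NiFi first, here is a minimal Python sketch of the same POST. It only builds the request and does not send it. The URL is a placeholder, and I am assuming the bot name travels in the payload's `username` field next to `content`.

```python
import json
import urllib.request

# Placeholder only: substitute the webhook URL you copied from Discord.
WEBHOOK_URL = "https://example.invalid/your-copied-webhook-url"


def build_discord_request(url, content, username=None):
    """Build (but do not send) a POST like the one InvokeHTTP issues."""
    payload = {"content": content}
    if username is not None:
        # Assumption: the bot name is sent in the payload's "username" field.
        payload["username"] = username
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


request = build_discord_request(WEBHOOK_URL, "This is my message", "Spidey Bot")
print(request.get_method())
print(request.data.decode("utf-8"))
# To actually send it: urllib.request.urlopen(request)
```

Keeping the build step separate from the send step makes it easy to inspect the JSON body before anything reaches your Discord server.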




Here are the results:

We can see the output of the call to Discord Webhook HTTPS REST Endpoint:




We can see our bot, Spidey Bot, has posted some exciting test messages from NiFi.



No More Spaghetti Flows

Spaghetti Flows




You may have heard of spaghetti code (https://en.wikipedia.org/wiki/Spaghetti_code).   I have seen the Apache NiFi equivalent (and have built a few myself in the past). I call them Spaghetti Flows.


Let's avoid them.   When you are first building a flow, it often meanders and picks up lots of extra steps, extra UpdateAttributes and random routes. This applies whether you are running on-premise, in CDP or in other stateful NiFi clusters (or single nodes). The following video from Mark Payne is a must-watch before you write any NiFi flows.


Apache NiFi Anti-Patterns with Mark Payne


https://www.youtube.com/watch?v=RjWstt7nRVY

https://www.youtube.com/watch?v=v1CoQk730qs

https://www.youtube.com/watch?v=JbUjYr6Kd3I

https://github.com/tspannhw/EverythingApacheNiFi 



Do Not:

  • Do not put 1,000 flows on one workspace.

  • If your flow has hundreds of steps, this is a Flow Smell.   Investigate why.

  • Do not use ExecuteProcess, ExecuteScript or a lot of Groovy scripts as a default; look for existing processors first.

  • Do not use random custom processors you find that have no documentation or are unknown.

  • Do not forget to upgrade. If you are running anything before Apache NiFi 1.10, upgrade now!

  • Do not run on default 512M RAM.

  • Do not run one node and think you have a highly available cluster.

  • Do not split a file with millions of records to individual records in one shot without checking available space/memory and back pressure.

  • Use Split processors only as an absolute last resort. Many processors are designed to work on FlowFiles that contain many records or many lines of text. Keeping the FlowFiles together instead of splitting them apart can often yield performance that is improved by 1-2 orders of magnitude.


Do:

  • Reduce, Reuse, Recycle.    Use Parameters to reuse common modules.

  • Put flows, reusable chunks (write to Slack, Database, Kafka) into separate Process Groups.

  • Write custom processors if you need new or specialized features

  • Use Cloudera supported NiFi Processors

  • Use Record processors everywhere

  • Read the Docs!

  • Use the NiFi Registry for version control.

  • Use NiFi CLI and DevOps for Migrations.

  • Run a CDP NiFi DataHub or a CFM-managed cluster of three or more nodes.

  • Walk through your flow and make sure you understand every step and it’s easy to read and follow.   Is every processor used?   Are there dead ends?

  • Do run Zookeeper on different nodes from Apache NiFi.

  • For Cloud Hosted Apache NiFi - go with the "high cpu" instances, such as 8 cores, 7 GB ram.

  • Use routing based on content and attributes so that one flow can handle multiple nearly identical cases. This is better than having the same flow 'templatized' and deployed many, many times with tweaked parameters in the same instance or cluster.

  • Use the correct driver for your database.   There are usually a couple of different JDBC drivers.

  • Make sure you match your Hive version to the NiFi processor for it.   There are ones out there for Hive 1 and Hive 3!   HiveStreaming needs Hive3 with ACID, ORC.  https://community.cloudera.com/t5/Support-Questions/how-to-use-puthivestreaming/td-p/108430


Let's revisit some Best Practices:


https://medium.com/@abdelkrim.hadjidj/best-practices-for-using-apache-nifi-in-real-world-projects-3-takeaways-1fe6912101db


Get your Apache NiFi for Dummies.   My own NiFi 101.


Here are a few things you should have read and tried before building your first Apache NiFi flow:

Also, when in doubt, use Records!  Use Record processors with pre-defined schemas; your flows will be easier to develop, cleaner and more performant. Easier, Faster, Better!!!


There are record processors for Logs (Grok), JSON, AVRO, XML, CSV, Parquet and more.


Look for a processor that has “Record” in the name like PutDatabaseRecord or QueryRecord.


Use the best DevOps processes, testing and tools.

Some newer features in 1.8, 1.9, 1.10, 1.11 that you need to use.

Advanced Articles:

Spaghetti is for eating, not for real-time data streams.   Let's keep it that way.


If you are not sure what to do, check out the Cloudera Community, NiFi Slack or the NiFi docs.   Also I may have a helpful article here. Join me and my NiFi friends at virtual meetups for more in-depth NiFi, Flink, Kafka and more. We keep it interactive, so feel free to ask questions.


Note:   In this picture I am in Italy doing spaghetti research.


Commonly Used TCP/IP Ports in Streaming

Cloudera CDF and HDF Ports
NiFi and Friends
FLaNK Extended Stack


Note: 

All of these ports can be changed by administrators or in version updates.   Also, if you are running Apache Knox, as in Cloudera Data Platform Public Cloud, these ports may be changed or hidden.   This list is just based on the version of CDF I am running and its defaults.   It does not include standard Cloudera ports for Cloudera Manager, Hadoop, Atlas, Ranger and other necessary and fun services.


Cloudera Flow Management (CFM Powered by Apache NiFi)
  • Cloudera NiFi HTTP:    8080 or 9090
  • Cloudera NiFi HTTPS:  8443 or 9443
  • Cloudera NiFi RIP Socket: 10443 or 50999
  • Cloudera NiFi Node Protocol: 11443
  • Cloudera NiFi Load Balancing:  6342
  • Cloudera NiFi Registry: 18080
  • Cloudera NiFi Registry SSL: 18433
  • Cloudera NiFi Certificate Authority:  10443

Cloudera Edge Flow Management (CEM Powered by Apache NiFi - MiNiFi)

  • Cloudera EFM HTTP:  10080
  • Cloudera EFM CoAP:  8989

Cloudera Stream Processing (CSP Powered by Apache Kafka)
  • Cloudera Kafka: 9092
  • Cloudera Kafka SSL:  9093
  • Cloudera Kafka Connect:  38083
  • Cloudera Kafka Connect SSL:  38085
  • Cloudera Kafka Jetty Metrics: 38084
  • Cloudera Kafka JMX: 9393
  • Cloudera Kafka MirrorMaker JMX: 9394
  • Cloudera Kafka HTTP Metric: 24042
  • Cloudera Schema Registry Registry: 7788
  • Cloudera Schema Registry Admin: 7789
  • Cloudera Schema Registry SSL:  7790
  • Cloudera Schema Registry Admin SSL:  7791
  • Cloudera Schema Registry Database (Postgresql):  5432
  • Cloudera SRM:  6669
  • Cloudera RPC: 8081
  • Cloudera SRM Rest: 6670
  • Cloudera SRM Rest SSL:  6671
  • Cloudera SMM Rest / UI: 9991
  • Cloudera SMM Manager:  8585
  • Cloudera SMM Manager SSL:  8587
  • Cloudera SMM Manager Admin:  8586
  • Cloudera SMM Manager Admin SSL: 8588
  • Cloudera SMM Service Monitor:  9997
  • Cloudera SMM Kafka Connect:  38083
  • Cloudera SMM Database (Postgresql):  5432

Cloudera Streaming Analytics (CSA Powered by Apache Flink)
  • Cloudera Flink Dashboard:  8082
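
To see which of these defaults actually answer in your environment, a quick TCP check can help. This is a minimal sketch, not an official tool: the handful of ports is copied from the list above, the host name is a placeholder, and a closed result may just mean a firewall or Knox is in the way.

```python
import socket

# A few defaults copied from the list above; adjust to your deployment.
DEFAULT_PORTS = {
    "NiFi HTTP": 8080,
    "NiFi Registry": 18080,
    "Kafka": 9092,
    "Schema Registry": 7788,
    "Flink Dashboard": 8082,
}


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def scan(host, ports=DEFAULT_PORTS):
    """Map each service name to whether its port accepted a connection."""
    return {name: port_is_open(host, port) for name, port in ports.items()}


if __name__ == "__main__":
    # "localhost" is a placeholder host for illustration.
    for name, is_open in scan("localhost").items():
        print(f"{name}: {'open' if is_open else 'closed or filtered'}")
```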



References



Cloudera Edge Management 1.1.0 Release

Let's Query Kafka with Hive



I can hop into beeline and build an external Hive table to access my Cloudera CDF Kafka cluster whether it is in the public cloud in CDP DataHub, on-premise in HDF or CDF or in CDP-DC.

I just have to set my KafkaStorageHandler, Kafka Topic Name and my bootstrap servers (usually port 9092).   Now I can use that table to do ETL/ELT for populating Hive tables or populating Kafka topics from Hive tables.   This is a nice, quick and easy way to do data engineering.

This is a good way to augment CDP Data Engineering with Spark, CDP DataHub with NiFi, CDP DataHub with Kafka and KafkaStreams and various Sqoop or Python utilities you may have in your environment.

For real-time continuous queries on Kafka with SQL, you can use Flink SQL.  https://www.datainmotion.dev/2020/05/flank-low-code-streaming-populating.html



Example Table Create

CREATE EXTERNAL TABLE <tableName>
  (`uuid` STRING, `systemtime` STRING , `temperaturef` STRING , `pressure` DOUBLE,`humidity` DOUBLE, `lux` DOUBLE, `proximity` int, `oxidising` DOUBLE , `reducing` DOUBLE, `nh3` DOUBLE , `gasko` STRING,`current` INT, `voltage` INT ,`power` INT, `total` INT,`fanstatus` STRING)
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES
  ("kafka.topic" = "<TopicName>", 
  "kafka.bootstrap.servers"="<ServerName>:9092");

show tables;

describe extended kafka_table;

select *
from kafka_table;
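
If you create many of these tables, you could template the DDL. Here is a minimal Python sketch that renders the same statement from a table name, topic, bootstrap servers and column list. The function name and the sample values are hypothetical, and it does no escaping, so only feed it names you trust.

```python
def kafka_table_ddl(table, topic, bootstrap_servers, columns):
    """Render Hive DDL for an external table backed by a Kafka topic.

    columns is a list of (name, hive_type) pairs.
    """
    column_sql = ", ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {table}\n"
        f"  ({column_sql})\n"
        "  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'\n"
        "  TBLPROPERTIES\n"
        f'  ("kafka.topic" = "{topic}",\n'
        f'  "kafka.bootstrap.servers" = "{bootstrap_servers}");'
    )


# Hypothetical names for illustration only.
print(kafka_table_ddl(
    "kafka_table",
    "sensors",
    "broker1.example.com:9092",
    [("uuid", "STRING"), ("temperaturef", "STRING"), ("humidity", "DOUBLE")],
))
```

You can paste the printed statement straight into beeline.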

I can browse my Kafka topics with Cloudera SMM to see what the data is and what I want or need to load.



For more information take a look at the documentation for Integrating Hive and Kafka at Cloudera below:



Cloudera Flow Management 101: Let's Build a Simple REST Ingest to Cloud Datawarehouse With LowCode. Powered by Apache NiFi




Use NiFi to call REST API, transform, route and store the data


Pick any REST API of your choice, but I have walked through this one, which grabs reports from a number of weather stations.  Weather or not we have good weather, we can query it anyway.



We are going to build a GenerateFlowFile to feed our REST calls.


[
{"url":"http://weather.gov/xml/current_obs/CWAV.xml"},
{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDK.xml"},
{"url":"http://weather.gov/xml/current_obs/CWDZ.xml"},
{"url":"http://weather.gov/xml/current_obs/CWFJ.xml"},
{"url":"http://weather.gov/xml/current_obs/PAEC.xml"},
{"url":"http://weather.gov/xml/current_obs/PAYA.xml"},
{"url":"http://weather.gov/xml/current_obs/PARY.xml"},
{"url":"http://weather.gov/xml/current_obs/K1R7.xml"},
{"url":"http://weather.gov/xml/current_obs/KFST.xml"},
{"url":"http://weather.gov/xml/current_obs/KSSF.xml"},
{"url":"http://weather.gov/xml/current_obs/KTFP.xml"},
{"url":"http://weather.gov/xml/current_obs/CYXY.xml"},
{"url":"http://weather.gov/xml/current_obs/KJFK.xml"},
{"url":"http://weather.gov/xml/current_obs/KISP.xml"},
{"url":"http://weather.gov/xml/current_obs/KLGA.xml"},
{"url":"http://weather.gov/xml/current_obs/KNYC.xml"},
{"url":"http://weather.gov/xml/current_obs/KJRB.xml"}
]

So we are using ${url} which will be one of these. Feel free to pick your favorite airports or locations near you. https://w1.weather.gov/xml/current_obs/index.xml

If you wish to choose your own data adventure, you can pick one of these others. You will have to build your own table if you wish to store it. They return CSV, JSON or XML; since we have record processors, we don’t care which. Just know which one you pick.

Then we will use SplitJson to split the JSON array into single records.


Then use EvaluateJsonPath to extract the URL into an attribute.
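
To picture what these two steps do to the data, here is a rough Python equivalent. It is only a sketch: the function names are mine, the URL list is shortened, and I am assuming the extraction path is `$.url` with the result stored as a `url` attribute.

```python
import json

# The GenerateFlowFile content: a JSON array of objects, shortened here.
flowfile_content = """
[
{"url":"http://weather.gov/xml/current_obs/CWAV.xml"},
{"url":"http://weather.gov/xml/current_obs/KTTN.xml"},
{"url":"http://weather.gov/xml/current_obs/KEWR.xml"}
]
"""


def split_json(content):
    """Split the top-level array into one JSON document per element."""
    return [json.dumps(record) for record in json.loads(content)]


def evaluate_url(record_json):
    """Pull the url field out of one record (assumed path: $.url)."""
    return {"url": json.loads(record_json)["url"]}


for record in split_json(flowfile_content):
    attributes = evaluate_url(record)
    print(attributes["url"])
```

Each printed value is what `${url}` resolves to for one FlowFile.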


Now we are going to call those REST URLs with InvokeHTTP.

You will need to create a StandardSSLContextService controller service.



This is the default JDK JVM on Mac or some CentOS 7.   You may have a real password; if so, you are awesome.   If you don't know it, that's rough.   You can build a new one with SSL.

For more cloud ingest fun, https://docs.cloudera.com/cdf-datahub/7.1.0/howto-data-ingest.html.

SSL Defaults (In CDP Datahub, one is built for you automagically, thanks Michael).

Truststore filename: /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts 

Truststore password: changeit 

Truststore type: JKS 

TLS Protocol: TLS


StandardSSLContextService for Your GET ${url}




We can tweak these defaults.

Then we are going to run a query to convert these and route based on our queries.

Here is an example query on the current NOAA weather observations that looks for temperatures in Fahrenheit at or below 60 degrees. You can make a query with any of the fields in the WHERE clause. Give it a try!



You will need to set the Record Writer and Record Reader:

Record Reader: XML 

Record Writer: JSON


Query for the cold path:

SELECT * FROM FLOWFILE
WHERE temp_f <= 60

Query for the 'all' path:

SELECT * FROM FLOWFILE
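
If it helps to see the routing outside of NiFi, here is a small Python sketch of the same idea: read XML, apply both queries, write JSON. The two sample observations are made up and only shaped like the NOAA feed, and the function names are mine, not anything NiFi provides.

```python
import json
import xml.etree.ElementTree as ET

# Made-up observations shaped like the NOAA current_obs XML; values are illustrative.
SAMPLE_XML = [
    "<current_observation><station_id>TEST1</station_id><temp_f>48.0</temp_f></current_observation>",
    "<current_observation><station_id>TEST2</station_id><temp_f>71.5</temp_f></current_observation>",
]


def read_record(xml_text):
    """XML reader stand-in: flatten the child elements into a dict."""
    root = ET.fromstring(xml_text)
    record = {child.tag: child.text for child in root}
    record["temp_f"] = float(record["temp_f"])
    return record


def route(records):
    """Apply both queries: 'all' keeps every record, 'cold' keeps temp_f <= 60."""
    return {
        "all": list(records),
        "cold": [r for r in records if r["temp_f"] <= 60],
    }


routed = route([read_record(x) for x in SAMPLE_XML])
for name, rows in routed.items():
    print(name, json.dumps(rows))  # JSON writer stand-in
```

Every record lands in the 'all' output, and only the chilly ones also land in the cold output, which is why a single processor can feed more than one path.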

Now we are splitting into three concurrent paths. This shows the power of Apache NiFi. We will write to Kudu, HDFS and Kafka.

For the results of our cold path (temp_f <= 60), we will write to a Kudu table.



Kudu Masters: edge2ai-1.dim.local:7051
Table Name: impala::default.weatherkudu
Record Reader: Infer Json Tree Reader
Kudu Operation Type: UPSERT

Before you run this, go to Hue and build the table.


CREATE TABLE weatherkudu
(`location` STRING,`observation_time` STRING, `credit` STRING, `credit_url` STRING, `image` STRING, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE,  `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING,
PRIMARY KEY (`location`, `observation_time`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');

Let it run and query it.   Kudu table queried via Impala, try it in Hue.



The second fork is to Kafka; this will be for the 'all' path.




Kafka Brokers: edge2ai-1.dim.local:9092
Topic: weather
Reader & Writer: reuse the JSON ones

The third and final fork is to HDFS (could be on top of S3 or Blob Storage) as Apache ORC files. This will also autogenerate the DDL for an external Hive table as an attribute; check your provenance after running.



JSON in and out for record readers/writers, you can adjust the time and size of your batch or use defaults.



Hadoop Config: /etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml
Record Reader: Infer Json
Directory: /tmp/weather
Table Name: weather

Before we run, build the /tmp/weather directory in HDFS and give it 777 permissions. We can do this with Apache Hue.



Once we run we can get the table DDL and location:



Go to Hue to create your table.


CREATE EXTERNAL TABLE IF NOT EXISTS `weather`
(`credit` STRING, `credit_url` STRING, `image` STRUCT<`url`:STRING, `title`:STRING, `link`:STRING>, `suggested_pickup` STRING, `suggested_pickup_period` BIGINT,
`location` STRING, `station_id` STRING, `latitude` DOUBLE, `longitude` DOUBLE, `observation_time` STRING, `observation_time_rfc822` STRING, `weather` STRING, `temperature_string` STRING,
`temp_f` DOUBLE, `temp_c` DOUBLE, `relative_humidity` BIGINT, `wind_string` STRING, `wind_dir` STRING, `wind_degrees` BIGINT, `wind_mph` DOUBLE, `wind_gust_mph` DOUBLE, `wind_kt` BIGINT,
`wind_gust_kt` BIGINT, `pressure_string` STRING, `pressure_mb` DOUBLE, `pressure_in` DOUBLE, `dewpoint_string` STRING, `dewpoint_f` DOUBLE, `dewpoint_c` DOUBLE, `windchill_string` STRING,
`windchill_f` BIGINT, `windchill_c` BIGINT, `visibility_mi` DOUBLE, `icon_url_base` STRING, `two_day_history_url` STRING, `icon_url_name` STRING, `ob_url` STRING, `disclaimer_url` STRING,
`copyright_url` STRING, `privacy_policy_url` STRING)
STORED AS ORC
LOCATION '/tmp/weather'

You can now use Apache Hue to query your tables and do some weather analytics. When we are upserting into Kudu we are ensuring no duplicate reports for a weather station and observation time.

select `location`, weather, temp_f, wind_string, dewpoint_string, latitude, longitude, observation_time
from weatherkudu
order by observation_time desc, station_id asc;

select *
from weather;
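
The de-duplication we get from upserting into Kudu can be pictured with a plain dictionary keyed on the table's primary key (`location`, `observation_time`). This is just a sketch of the idea with made-up reports; it is not how Kudu is implemented.

```python
def upsert(table, record, key_fields=("location", "observation_time")):
    """Insert the record, or overwrite the existing row with the same primary key."""
    key = tuple(record[field] for field in key_fields)
    table[key] = record
    return table


# Made-up reports for illustration; the second repeats the key of the first.
reports = [
    {"location": "Station A", "observation_time": "Jun 1, 10:00 am", "temp_f": 55.0},
    {"location": "Station A", "observation_time": "Jun 1, 10:00 am", "temp_f": 55.0},
    {"location": "Station A", "observation_time": "Jun 1, 11:00 am", "temp_f": 57.0},
]

weatherkudu = {}
for report in reports:
    upsert(weatherkudu, report)

print(len(weatherkudu))  # 2 rows: the repeated report collapsed into one
```

Polling the same station twice before it publishes a new observation therefore never produces a second row.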


In Atlas, we can see the flow.
