Showing posts with label apache-kafka. Show all posts

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HTTP REST Sink and HTTP REST Source to Kafka Sink



This is a simple use case: NiFi acts as a gateway between a REST API and Kafka. NiFi can do a lot more than that. It can be a Kafka consumer and producer, POST to REST endpoints, and receive REST calls on configurable ports, all with no code.

NiFi can act as a listener for HTTP requests and provide HTTP responses as a full, scriptable web server with Jetty. Or it can listen for HTTP REST calls on a port and route those files anywhere: https://community.cloudera.com/t5/Community-Articles/Parsing-Web-Pages-for-Images-with-Apache-NiFi/ta-p/248415 . We can also do WebSockets: https://community.cloudera.com/t5/Community-Articles/An-Example-WebSocket-Application-in-Apache-NiFi-1-1/ta-p/248598 and https://community.cloudera.com/t5/Community-Articles/Accessing-Feeds-from-EtherDelta-on-Trades-Funds-Buys-and/ta-p/248316

It is extremely easy to do this in NiFi.




Kafka Consumer to REST POST



HTTP REST to Kafka Producer
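To make the REST-to-Kafka direction concrete outside of NiFi, here is a minimal Python sketch of the same gateway idea using only the standard library. It is not how NiFi implements it. The `publish` callable is a hypothetical stand-in for a Kafka producer's send call, and the 202 response code and JSON validation are assumptions made for illustration.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def make_gateway_handler(publish):
    """Build a handler class that forwards each valid JSON POST body to `publish`.

    `publish` is a hypothetical stand-in for a Kafka producer's send call.
    """

    class GatewayHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length)
            try:
                json.loads(body)  # reject payloads that are not valid JSON
            except ValueError:
                self.send_error(400, "Body must be valid JSON")
                return
            publish(body)  # hand the raw bytes to the downstream sink
            self.send_response(202)  # accepted for delivery (assumed convention)
            self.send_header("Content-Length", "0")
            self.end_headers()

        def log_message(self, format, *args):
            pass  # keep the sketch quiet; remove to see access logs

    return GatewayHandler


def make_server(publish, host="127.0.0.1", port=0):
    """Create (but do not start) the gateway server; port 0 picks a free port."""
    return HTTPServer((host, port), make_gateway_handler(publish))
```

Calling `make_server(my_send).serve_forever()` would start listening. In NiFi the same roles are filled by processors on the canvas rather than by code.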




Full Monitoring on Apache NiFi






A Very Common Use Case:  Ingesting Stock Feeds From REST to Kafka



References

Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi


Yes, we have bidirectional communication between Apache NiFi and MiNiFi agents via Site-to-Site (S2S) over HTTPS. This means I can push anything I want to the agent, including commands, files and updates.

I can also transmit data to edge agents via MQTT, REST and Kafka amongst other options.


NiFi Ready To Send and Receive Messages From Other NiFi Nodes, Clusters and MiNiFi Agents


Our NiFi flow is consuming Kafka and MQTT Messages, as well as reading updated model files and generating integration test sensor data.



MiNiFi Agents Have Downloaded The Model and Anything Else We Send to It



It's easy to configure MQTT message consumption in CEM. We just need the broker (with port) and, optionally, a topic to filter on.




To listen for files or models, you can easily add a REST endpoint to proxy in data of your choice, with or without SSL.


Here's an example curl command to test that REST API:

curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://ec2-3-85-54-189.compute-1.amazonaws.com:8899/upload
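The same test call can be sketched in Python with the standard library, which is handy inside an integration test. This is a minimal sketch: the function names `build_upload_request` and `send` are made up for illustration, and only the port (8899), the path (/upload), the JSON body and the Content-Type header come from the curl command above.

```python
import json
import urllib.request


def build_upload_request(url, payload):
    """Build (but do not send) a JSON POST equivalent to the curl call."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=10):
    """Send the request and return (status code, response text)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status, response.read().decode("utf-8", errors="replace")
```

For example, `send(build_upload_request("http://localhost:8899/upload", {"key1": "value1", "key2": "value2"}))` would post to a listener on your own machine; the `localhost` host here is a placeholder, so substitute your own endpoint.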


We can generate JSON IoT Style Data for Integration Tests with ease using GenerateFlowFile:
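For comparison, here is a rough Python sketch of the kind of synthetic sensor record such a test generator might emit. Every field name and value range below is invented for illustration and is not taken from the flow shown in this post.

```python
import json
import random
import time
import uuid


def make_sensor_reading(rng=random):
    """Return one synthetic IoT-style reading; all field names are hypothetical."""
    return {
        "uuid": str(uuid.uuid4()),
        "ts": int(time.time() * 1000),  # epoch milliseconds
        "sensor_id": "sensor-%03d" % rng.randint(1, 10),
        "temperature_c": round(rng.uniform(15.0, 35.0), 2),
        "humidity_pct": round(rng.uniform(20.0, 90.0), 2),
    }


def make_batch(count, seed=None):
    """Return `count` readings as JSON strings; pass a seed for repeatable values."""
    rng = random.Random(seed)
    return [json.dumps(make_sensor_reading(rng)) for _ in range(count)]
```

Seeding the generator keeps the numeric fields repeatable between test runs, while the `uuid` and `ts` fields still change on every call.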


Let's grab updated models when they change from my Data Science server:
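The idea of picking up a model only when it changes can be sketched in plain Python by comparing content hashes between polls. This is an illustration of the concept, not a description of how the NiFi flow detects changes. The function names, the `*.tflite` pattern and the in-memory `seen` dictionary are all assumptions.

```python
import hashlib
from pathlib import Path


def file_digest(path, chunk_size=65536):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def changed_models(directory, seen, pattern="*.tflite"):
    """Return paths whose content differs from the last check.

    `seen` maps file name to last known digest and is updated in place,
    so a file is reported once per change.
    """
    changed = []
    for path in sorted(Path(directory).glob(pattern)):
        digest = file_digest(path)
        if seen.get(path.name) != digest:
            seen[path.name] = digest
            changed.append(path)
    return changed
```

Calling `changed_models` on a schedule with the same `seen` dictionary returns only new or modified model files, which are the ones worth pushing to agents.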


I can read Kafka messages and send them to MiNiFi agents as well.




I pushed a TFLite model, but ONNX, PMML, Docker or Pickle are all options.


Using Cloudera Streams Messaging Manager for Apache Kafka Monitoring, Management, Analytics and CRUD


SMM is a powerful tool for working with Apache Kafka. It provides monitoring, management, analytics and Kafka topic creation. You can monitor servers, brokers, consumers, producers, topics and messages, and easily build alerts based on events that occur with those entities.

From Cloudera Manager, we can now install and manage Kafka, SMM, NiFi and Hadoop services.


Let's create a Kafka topic, no command-line!





For a simple topic, we select the Low size, for a replication factor of one and a replica count of one. We also set a cleanup policy of delete.



Let's create an alert.


For this one, if the nifi-reader consumer group's lag reaches 100 or more, SMM sends me an email.


Let's browse our Kafka infrastructure in our AWS Cloudera Kafka cluster. It's easy to navigate.



You can dive into a topic and see individual messages, see offsets, keys, values, timestamps and more.


Zoom into one message in a topic.


Let's analyze a topic's configuration.



The result of the alert we built is an email sent to me with this data:


Example Alert Sent

Notification id: 56d35dcc-8fc0-4c59-b70a-ccbd1bb35681,
Root resource name: nifi-reader,
Root resource type: CONSUMER,
Created timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366,
Last updated timestamp: Thu Aug 22 18:42:41 UTC 2019 : 1566499361366, 
State: RAISED,

Message:
Alert policy : "ALERT IF ( CONSUMER (name="nifi-reader") CONSUMER_GROUP_LAG >= 100 )" has been evaluated to true
Condition : "CONSUMER_GROUP_LAG>=100" has been evaluated to true for following CONSUMERS
- CONSUMER = "nifi-reader" had following attribute values
* CONSUMER_GROUP_LAG = 139
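To show what a condition like CONSUMER_GROUP_LAG >= 100 measures, here is a small Python sketch. It assumes the common definition of consumer group lag (the sum over partitions of log end offset minus committed offset). Whether SMM computes it exactly this way is not stated in the alert. The function names and the sample offsets are invented, with the offsets chosen to reproduce the lag of 139 from the email.

```python
def consumer_group_lag(end_offsets, committed_offsets):
    """Sum per-partition lag: log end offset minus committed offset.

    Both arguments map partition number to offset. A partition with no
    committed offset is treated as committed at 0 (an assumption).
    """
    total = 0
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition, 0)
        total += max(end - committed, 0)  # never count negative lag
    return total


def should_alert(lag, threshold=100):
    """Mirror the policy condition CONSUMER_GROUP_LAG >= threshold."""
    return lag >= threshold


# Hypothetical offsets for a two-partition topic
lag = consumer_group_lag({0: 500, 1: 300}, {0: 420, 1: 241})
print(lag, should_alert(lag))
```

With these sample offsets the per-partition lags are 80 and 59, so the group lag is 139 and the alert condition holds.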



Software



  • CSP 2.1 
  • CDH 6.3.0
  • Cloudera Schema Registry 0.80
  • CFM 1.0.1.0
  • Apache NiFi Registry 0.3.0
  • Apache NiFi 1.9.0.1.0
  • JDK 1.8



Resources