Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HTTP REST Sink and HTTP REST Source to Kafka Sink

Migrating Apache Flume Flows to Apache NiFi:  Kafka Source to HTTP REST Sink and HTTP REST Source to Kafka Sink


This is a simple use case of being a gateway between REST API and Kafka.   We can do a lot more than that in NiFi.  We can be a Kafka Consumer and Producer as well as POST REST calls and receive any REST calls on configurable ports.  All with No Code.

NiFi can act as a listener for HTTP Requests and provide HTTP Responses in a scriptable full Web Server mechanism with JETTY.   Or it can listen for HTTP REST calls on a port and route those files anywhere. https://community.cloudera.com/t5/Community-Articles/Parsing-Web-Pages-for-Images-with-Apache-NiFi/ta-p/248415 .  We can also do websockets https://community.cloudera.com/t5/Community-Articles/An-Example-WebSocket-Application-in-Apache-NiFi-1-1/ta-p/248598.  https://community.cloudera.com/t5/Community-Articles/Accessing-Feeds-from-EtherDelta-on-Trades-Funds-Buys-and/ta-p/248316

It is extremely easy to do this in NiFi.




Kafka Consumer to REST POST



HTTP REST to Kafka Producer




Full Monitoring on Apache NiFi






A Very Common Use Case:  Ingesting Stock Feeds From REST to Kafka



References

Migrating Apache Flume Flows to Apache NiFi: SYSLOG to KAFKA

Migrating Apache Flume Flows to Apache NiFi:  SYSLOG  to KAFKA




This is a simple use case of being a smart gateway/proxy between SYSLOGand Kafka.   We can do a lot more than that in NiFi.  We can be a Kafka Consumer and Producer as well as read and parse all types of logs including SYSLOG.  We have a GrokReader for converting semistructured logs into manageable tabular style data with schemas.   Log->JSON/CSV/AVRO/PARQUET.  All with No Code.

It is extremely easy to do this in NiFi.

See:  Log Parsing:   https://www.datainmotion.dev/2019/08/migrating-apache-flume-flows-to-apache.html

Detailed Tutorials

https://blog.davidvassallo.me/2018/09/19/apache-nifi-from-syslog-to-elasticsearch/
https://community.cloudera.com/t5/Community-Articles/NiFi-Send-to-syslog/ta-p/248638
http://www.youritgoeslinux.com/impl/bigdata/nifi/syslog

Processors

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ListenSyslog/

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ParseSyslog/index.html



References

Migrating Apache Flume Flows to Apache NiFi: Twitter Source to Kafka Sink

Migrating Apache Flume Flows to Apache NiFi:  Twitter Source to Kafka Sink


Article 4 -  This.

This is a simple use case of pushing Tweets to Kafka.   We can do a lot more than that in NiFi.   If you see the referenced article I can do Deep Learning on Tweet Images, Run Sentiment Analysis, Query the Tweets in Stream, Send messages to email / slack based on certain criteria and retweet automagically.  All with No Code.

It is extremely easy to do this in NiFi.





Create a Kafka Topic To Send Tweets To


Example Tweet



Configure Your Connection to Twitter



Configure Kafka Producer



NiFi Flow to Send Two Twitter Sources To Kafka


Additional Apache NiFi Flow Details For Other Processing







Kafka Messages in Topic Sent From NiFi Producer 



Source Code


References

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to Apache Parquet on HDFS

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to Apache Parquet on HDFS


Article 3 - This

This is one possible simple, fast replacement for "Flafka".   I can read any/all Kafka topics, route and transform them with SQL and store them in Apache ORC, Apache Avro, Apache Parquet, Apache Kudu, Apache HBase, JSON, CSV, XML or compressed files of many types in S3, Apache HDFS, File Systems or anywhere you want to stream this data in Real-time.   Also with a fast easy to use Web UI.   Everything you liked doing in Flume but now easier and with more Source and Sink options.







Consume Kafka And Store to Apache Parquet


Kafka to Kudu, ORC, AVRO and Parquet 


With Apache 1.10 I can send those Parquet files anywhere not only HDFS.


JSON (or CSV or AVRO or ...) and Parquet Out

In Apache 1.10, Parquet has a dedicated reader and writer


Or I can use PutParquet



Create A Parquet Table and Query It









References

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / Hive

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / Hive


This is one possible simple, fast replacement for "Flafka".




Consume / Publish Kafka And Store to Files, HDFS, Hive 3.1, Kudu



Consume Kafka Flow 



 Merge Records And Store As AVRO or ORC


Consume Kafka, Update Records via Machine Learning Models In CDSW And Store to Kudu



Source:  Apache Kafka Topics


You enter a few parameters and start ingesting data with or without schemas.   Apache Flume had no Schema support.   Flume did not support transactions.



Sink:   Files




Storing to files in files systems, object stores, SFTP or elsewhere could not be easier.  Choose S3, Local File System, SFTP, HDFS or wherever.

Sink:   Apache Kudu / Apache Impala



Storing to Kudu/Impala (or Parquet for that manner could not be easier with Apache NiFi).


Sink:   HDFS for Apache ORC Files


When completes, the ConvertAvroToORC and PutHDFS build the Hive DDL for you!  You can build the tables automagically with Apache NiFi if you wish.

CREATE EXTERNAL TABLE IF NOT EXISTS iotsensors
(sensor_id BIGINT, sensor_ts BIGINT, is_healthy STRING, response STRING, sensor_0 BIGINT, sensor_1 BIGINT,
sensor_2 BIGINT, sensor_3 BIGINT, sensor_4 BIGINT, sensor_5 BIGINT, sensor_6 BIGINT, sensor_7 BIGINT, sensor_8 BIGINT,
sensor_9 BIGINT, sensor_10 BIGINT, sensor_11 BIGINT)
STORED AS ORC
LOCATION '/tmp/iotsensors'





Sink: Kafka

Publishing to Kafka is just as easy!  Push records with schema references or raw data.  AVRO or JSON, whatever makes sense for your enterprise.

Write to data easily with no coding and no changes or redeploys for schema or schema version changes.
 Pick a Topic and Stream Data While Converting Types


Clean UI and REST API to Manage, Monitor, Configure and Notify on Kafka




Other Reasons to Use Apache NiFi Over Apache Flume

DevOps with REST API, CLI, Python API

Schemas!   We not only work with semi-structured, structured and unstructured data.  We are schema and schema version aware for CSV, JSON, AVRO, XML, Grokked Text Files and more. https://community.cloudera.com/t5/Community-Articles/Big-Data-DevOps-Apache-NiFi-HWX-Schema-Registry-Schema/ta-p/247963

Flume Replacement Use Cases Implemented in Apache NiFi

Sink/Source:   JMS

Source:   Files/PDF/PowerPoint/Excel/Word  Sink:  Files

Source:  Files/CSV  Sink:   HDFS/Hive/Apache ORC

Source:  REST/Files/Simulator   Sink:  HBase, Files, HDFS.    ETL with Lookups.

Flume Replacement - Lightweight Open Source Agents


If you need to replace local Log to Kafka agents or anything to Kafka or anything to anything with routing, transformation and manipulation.   You can use Edge Flow Manager deployed MiNiFi Agents available in Java and C++ versions.

References

Tracking Air Quality with Apache NiFi, Cloudera Data Science Workbench, Pyspark and Parquet

Tracking Air Quality 

Indoor vs Outdoor

Using a few sensors on a MiniFi node we are able to generate some air quality sensor readings.
Data:
row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
row['bme680_gas'] = '{0:.2f}'.format(gas)
row['bme680_humidity'] = '{0:.2f}'.format(hum)
row['bme680_air_quality_score'] = '{0:.2f}'.format(air_quality_score)
row['bme680_gas_baseline'] = '{0:.2f}'.format(gas_baseline)
row['bme680_hum_baseline'] = '{0:.2f}'.format(hum_baseline)
See Part 1:
Newark / NYC Hazecam
Example
{"bme680_air_quality_score": "82.45", "uuid": "20190131191921_59c5441c-47b4-4f6f-a6d6-b3943bc9cf2b", "ipaddress": "192.168.1.166", "bme680_gas_baseline": 367283.28, "bme680_pressure": "1024.51", "bme680_hum_baseline": 40.0, "memory": 11.7, "end": "1548962361.4146328", "cputemp": 47, "host": "piups", "diskusage": "9992.7", "bme680_tempf": "87.53", "te": "761.2184100151062", "starttime": "01/31/2019 14:06:40", "systemtime": "01/31/2019 14:19:21", "bme680_humidity": "13.22", "bme680_tempc": "30.85", "bme680_gas": "363274.92"}
{
"end" : "1548967753.7064438",
"host" : "piups",
"diskusage" : "9990.4",
"cputemp" : 47,
"starttime" : "01/31/2019 15:44:11",
"bme680_hum_baseline" : "40.00",
"bme680_humidity" : "13.23",
"ipaddress" : "192.168.1.166",
"bme680_tempc" : "30.93",
"te" : "301.96490716934204",
"bme680_air_quality_score" : "83.27",
"systemtime" : "01/31/2019 15:49:13",
"bme680_tempf" : "87.67",
"bme680_gas_baseline" : "334942.60",
"uuid" : "20190131204913_4984a635-8dcd-408a-ba23-c0d225ba2d86",
"bme680_pressure" : "1024.69",
"memory" : 12.6,
"bme680_gas" : "336547.19"
}
Outdoor air quality
https://community.cloudera.com/t5/Community-Articles/Tracking-Air-Quality-with-HDP-and-HDF-Part-1-Apache-NiFi/ta-p/248265

https://openweathermap.org/api/pollution/co

https://airquality.weather.gov/probe_aq_data.php?city=hightstown&state=NJ&Submit=Get+Guidance

http://feeds.enviroflash.info/rss/realtime/445.xml

http://feeds.enviroflash.info/cap/aggregate.xml

http://www.airnowapi.org/aq/forecast/zipCode/?format=application/json&zipCode=08520&date=2019-09-05&distance=25&API_KEY=code

https://docs.airnowapi.org/webservices

http://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&zipCode=08520&distance=50&API_KEY=

code


https://api.openaq.org/v1/measurements?country=US&date_from=2018-05-04

https://api.openaq.org/v1/latest?country=US

http://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&zipCode=08520&distance=25&API_KEY=code









Flight Data

https://community.cloudera.com/t5/Community-Articles/Ingesting-Flight-Data-ADS-B-USB-Receiver-with-Apache-NiFi-1/ta-p/247940

Air Traffic Overhead

https://opensky-network.org/api/states/all?lamin=40.270599&lomin=-74.522430&lamax=40.270599&lomax=-74.522430

http://scorecard.goodguide.com/about/txt/data.html

https://www.epa.gov/visibility

https://www.airnow.gov/

https://www.state.nj.us/dep/daq/

http://www.nynjpollen.com/

http://www.njaqinow.net/

https://www.fsvisimages.com/descriptions.aspx

https://www.datainmotion.dev/2019/03/iot-series-sensors-utilizing-breakout_74.html

https://github.com/tspannhw/minifi-breakoutgarden/blob/master/aqminifi.py





Monitoring Cloudera Edge Flow Manager and Cloudera Streams Messaging Manager with Apache NiFi


Monitoring Cloudera Edge Flow Manager and Cloudera Streams Messaging Manager with Apache NiFi


http://SERVER:10080/efm/actuator/health

{"status":{"code":"UP","description":""},"details":{"db":{"status":{"code":"UP","description":""},"details":{"database":"MySQL","hello":1}},"diskSpace":{"status":{"code":"UP","description":""},"details":{"total":1073729220608,"free":1023731712000,"threshold":10485760}}}}




http://SERVER:10080/efm/actuator/heapdump


http://SERVER:10080/efm/actuator/env


http://SERVER:10080/efm/actuator/httptrace 


Check REST API Made Available During EFM Startup

2019-08-21 22:30:25.045  INFO 100056 --- [           main] o.e.jetty.server.AbstractConnector       : Started ServerConnector@747d1932{HTTP/1.1,[http/1.1]}{cloudera:10080}
2019-08-21 22:30:25.047  INFO 100056 --- [           main] o.s.b.web.embedded.jetty.JettyWebServer  : Jetty started on port(s) 10080 (http/1.1) with context path '/efm'
2019-08-21 22:30:25.050  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : Started C2Application in 10.102 seconds (JVM running for 10.741)
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : The Edge Flow Manager has started. Services available at the following URLs:
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Access User Interface: http://cloudera:10080/efm/ui
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Base URL for REST API: http://cloudera:10080/efm/api
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Swagger REST API docs: http://cloudera:10080/efm/swagger
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Status and management: http:/cloudera:10080/efm/actuator

Agent Classes
http://SERVER:10080/efm/api/agent-classes

[
  {
    "name": "iot-1",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "nanojetsonjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "raspianjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi3javamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi4java",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpijavamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  }
]

EFM Agent Manifests
http://server:10080/efm/api/agent-manifests

EFM Agents
http://server:10080/efm/api/agents

EFM C2 Configuration - NiFi Registry
http://server:10080/efm/api/c2-configuration/nifi-registry

EFM Designer Flows
http://server:10080/efm/api/designer/flows

EFM API FLOWS
http://server:10080/efm/api/flows


Monitoring SMM Metrics with NiFi

Aggregated Topics for Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/topics?duration=LAST_ONE_HOUR&state=all

Aggregated Brokers for the Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/brokers?duration=LAST_ONE_HOUR





EFM Events
http://server:10080/efm/api/events
EFM Event by Event ID
http://server:10080/efm/api/events/9db708ca-3b7e-42bf-941a-a945fefa6fa6
Get a Heartbeat from a Device by HBDI
http://server:10080/efm/api/heartbeats/HBID
For Auto-configuring your processor, list of fields available

EFM Events / Fields
http://server:10080/efm/api/events/fields
What Flows available
http://server:10080/efm/api/designer/flows
http://server:10080/efm/api/designer/flows/summaries
Get One Flow

GET /designer/flows/{flowId}

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044
Get That Flows Events

GET /designer/flows/{flowId}/events

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044/events
Get All Flows and Buckets
http://server:10080/efm/api/flows
Agent Classes
http://server:10080/efm/api/agent-classes
Agents
http://server:10080/efm/api/agents
Agent Manifests
http://server:10080/efm/api/agent-manifests
What NiFi Registry
http://server:10080/efm/api/c2-configuration/nifi-registry
What EFM Server
http://server:10080/efm/api/c2-configuration
SMM API
EFM Flow Designer
http://server:10080/efm/ui/#/flow-designer/flow/4ae72206-372d-4f3e-916a-d7c1faf09811
For a Great Real World Usage Example
https://github.com/asdaraujo/edge2ai-workshop#lab_1
http://hostname:10080/efm/api/agent-classes
http://hostname:10080/efm/api/agent-manifests?class=
http://hostname:10080/efm/swagger/