Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / Hive


This is one possible simple, fast replacement for "Flafka".




Consume / Publish Kafka And Store to Files, HDFS, Hive 3.1, Kudu



Consume Kafka Flow 



 Merge Records And Store As AVRO or ORC


Consume Kafka, Update Records via Machine Learning Models In CDSW And Store to Kudu



Source:  Apache Kafka Topics


You enter a few parameters and start ingesting data with or without schemas.   Apache Flume had no schema support and no transaction support.



Sink:   Files




Storing to files in file systems, object stores, SFTP or elsewhere could not be easier.  Choose S3, a local file system, SFTP, HDFS or wherever.

Sink:   Apache Kudu / Apache Impala



Storing to Kudu/Impala (or Parquet, for that matter) could not be easier with Apache NiFi.


Sink:   HDFS for Apache ORC Files


When the conversion completes, ConvertAvroToORC and PutHDFS build the Hive DDL for you!  You can build the tables automagically with Apache NiFi if you wish.

CREATE EXTERNAL TABLE IF NOT EXISTS iotsensors
(sensor_id BIGINT, sensor_ts BIGINT, is_healthy STRING, response STRING, sensor_0 BIGINT, sensor_1 BIGINT,
sensor_2 BIGINT, sensor_3 BIGINT, sensor_4 BIGINT, sensor_5 BIGINT, sensor_6 BIGINT, sensor_7 BIGINT, sensor_8 BIGINT,
sensor_9 BIGINT, sensor_10 BIGINT, sensor_11 BIGINT)
STORED AS ORC
LOCATION '/tmp/iotsensors'
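That DDL can be generated mechanically from a field list. Below is a minimal Python sketch of the idea, purely illustrative (not NiFi's actual implementation); the table name, column types and location are taken from the example above.

```python
# Sketch: generate a Hive external-table DDL like the one ConvertAvroToORC
# emits in its hive.ddl attribute. Illustrative only, not NiFi source code.

def build_hive_ddl(table, fields, location):
    """fields: list of (name, hive_type) tuples."""
    cols = ", ".join(f"{name} {htype}" for name, htype in fields)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({cols}) "
        f"STORED AS ORC LOCATION '{location}'"
    )

# Field list from the iotsensors example above.
fields = [("sensor_id", "BIGINT"), ("sensor_ts", "BIGINT"),
          ("is_healthy", "STRING"), ("response", "STRING")] + [
          (f"sensor_{i}", "BIGINT") for i in range(12)]

ddl = build_hive_ddl("iotsensors", fields, "/tmp/iotsensors")
print(ddl)
```

In NiFi you never have to write this by hand; the processor attaches the generated DDL as a flowfile attribute you can feed straight to PutHiveQL.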





Sink: Kafka

Publishing to Kafka is just as easy!  Push records with schema references or raw data.  AVRO or JSON, whatever makes sense for your enterprise.

Write data easily with no coding and no changes or redeploys for schema or schema version changes.
 Pick a Topic and Stream Data While Converting Types


Clean UI and REST API to Manage, Monitor, Configure and Notify on Kafka




Other Reasons to Use Apache NiFi Over Apache Flume

DevOps with REST API, CLI, Python API

Schemas!   We not only work with semi-structured, structured and unstructured data; we are also schema and schema version aware for CSV, JSON, AVRO, XML, grokked text files and more. https://community.cloudera.com/t5/Community-Articles/Big-Data-DevOps-Apache-NiFi-HWX-Schema-Registry-Schema/ta-p/247963

Flume Replacement Use Cases Implemented in Apache NiFi

Sink/Source:   JMS

Source:   Files/PDF/PowerPoint/Excel/Word  Sink:  Files

Source:  Files/CSV  Sink:   HDFS/Hive/Apache ORC

Source:  REST/Files/Simulator   Sink:  HBase, Files, HDFS.    ETL with Lookups.

Flume Replacement - Lightweight Open Source Agents


If you need to replace local log-to-Kafka agents, or anything-to-Kafka, or anything-to-anything with routing, transformation and manipulation, you can use Edge Flow Manager-deployed MiNiFi agents, available in Java and C++ versions.

References

Tracking Air Quality with Apache NiFi, Cloudera Data Science Workbench, Pyspark and Parquet

Tracking Air Quality 

Indoor vs Outdoor

Using a few sensors on a MiniFi node we are able to generate some air quality sensor readings.
Data:
# Readings come from the Pimoroni bme680 library (sensor.data fields);
# each value is formatted to two decimal places as a string before sending.
row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
row['bme680_gas'] = '{0:.2f}'.format(gas)
row['bme680_humidity'] = '{0:.2f}'.format(hum)
row['bme680_air_quality_score'] = '{0:.2f}'.format(air_quality_score)
row['bme680_gas_baseline'] = '{0:.2f}'.format(gas_baseline)
row['bme680_hum_baseline'] = '{0:.2f}'.format(hum_baseline)
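The air quality score above blends the gas and humidity readings against their baselines. Here is a sketch modeled on Pimoroni's BME680 air-quality example, assuming its 25% humidity weighting; treat it as an approximation of the script's scoring, not a calibrated metric.

```python
# Sketch of the air-quality score, following the Pimoroni BME680 example:
# humidity contributes 25% of the score, gas resistance the remaining 75%.
# Readings below baseline pull the score down proportionally.

def air_quality_score(gas, gas_baseline, hum, hum_baseline, hum_weighting=0.25):
    hum_offset = hum - hum_baseline
    if hum_offset > 0:
        hum_score = ((100 - hum_baseline - hum_offset) / (100 - hum_baseline)
                     * (hum_weighting * 100))
    else:
        hum_score = ((hum_baseline + hum_offset) / hum_baseline
                     * (hum_weighting * 100))

    gas_offset = gas_baseline - gas
    if gas_offset > 0:
        gas_score = (gas / gas_baseline) * (100 - hum_weighting * 100)
    else:
        gas_score = 100 - hum_weighting * 100
    return hum_score + gas_score

# At baseline (values from the sample record), the score is a perfect 100.
score = air_quality_score(367283.28, 367283.28, 40.0, 40.0)
print('{0:.2f}'.format(score))
```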
See Part 1:
Newark / NYC Hazecam
Example
{"bme680_air_quality_score": "82.45", "uuid": "20190131191921_59c5441c-47b4-4f6f-a6d6-b3943bc9cf2b", "ipaddress": "192.168.1.166", "bme680_gas_baseline": 367283.28, "bme680_pressure": "1024.51", "bme680_hum_baseline": 40.0, "memory": 11.7, "end": "1548962361.4146328", "cputemp": 47, "host": "piups", "diskusage": "9992.7", "bme680_tempf": "87.53", "te": "761.2184100151062", "starttime": "01/31/2019 14:06:40", "systemtime": "01/31/2019 14:19:21", "bme680_humidity": "13.22", "bme680_tempc": "30.85", "bme680_gas": "363274.92"}
{
"end" : "1548967753.7064438",
"host" : "piups",
"diskusage" : "9990.4",
"cputemp" : 47,
"starttime" : "01/31/2019 15:44:11",
"bme680_hum_baseline" : "40.00",
"bme680_humidity" : "13.23",
"ipaddress" : "192.168.1.166",
"bme680_tempc" : "30.93",
"te" : "301.96490716934204",
"bme680_air_quality_score" : "83.27",
"systemtime" : "01/31/2019 15:49:13",
"bme680_tempf" : "87.67",
"bme680_gas_baseline" : "334942.60",
"uuid" : "20190131204913_4984a635-8dcd-408a-ba23-c0d225ba2d86",
"bme680_pressure" : "1024.69",
"memory" : 12.6,
"bme680_gas" : "336547.19"
}
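Notice that most readings arrive as two-decimal strings. Here is a small stdlib sketch of the kind of type coercion a schema-aware record reader performs downstream; the `coerce` function name is mine and purely illustrative.

```python
import json

# The agent formats most readings with '{0:.2f}'.format(...), so numbers
# arrive as strings; a schema-aware reader coerces them back downstream.
record = json.loads(
    '{"bme680_tempc": "30.85", "bme680_humidity": "13.22", '
    '"cputemp": 47, "host": "piups"}'
)

def coerce(rec):
    out = {}
    for key, value in rec.items():
        if isinstance(value, str):
            try:
                out[key] = float(value)
            except ValueError:
                out[key] = value          # leave non-numeric strings alone
        else:
            out[key] = value
    return out

typed = coerce(record)
print(typed["bme680_tempc"], typed["host"])
```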
Outdoor air quality
https://community.cloudera.com/t5/Community-Articles/Tracking-Air-Quality-with-HDP-and-HDF-Part-1-Apache-NiFi/ta-p/248265

https://openweathermap.org/api/pollution/co

https://airquality.weather.gov/probe_aq_data.php?city=hightstown&state=NJ&Submit=Get+Guidance

http://feeds.enviroflash.info/rss/realtime/445.xml

http://feeds.enviroflash.info/cap/aggregate.xml

http://www.airnowapi.org/aq/forecast/zipCode/?format=application/json&zipCode=08520&date=2019-09-05&distance=25&API_KEY=code

https://docs.airnowapi.org/webservices

http://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&zipCode=08520&distance=50&API_KEY=code


https://api.openaq.org/v1/measurements?country=US&date_from=2018-05-04

https://api.openaq.org/v1/latest?country=US

http://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&zipCode=08520&distance=25&API_KEY=code









Flight Data

https://community.cloudera.com/t5/Community-Articles/Ingesting-Flight-Data-ADS-B-USB-Receiver-with-Apache-NiFi-1/ta-p/247940

Air Traffic Overhead

https://opensky-network.org/api/states/all?lamin=40.270599&lomin=-74.522430&lamax=40.270599&lomax=-74.522430

http://scorecard.goodguide.com/about/txt/data.html

https://www.epa.gov/visibility

https://www.airnow.gov/

https://www.state.nj.us/dep/daq/

http://www.nynjpollen.com/

http://www.njaqinow.net/

https://www.fsvisimages.com/descriptions.aspx

https://www.datainmotion.dev/2019/03/iot-series-sensors-utilizing-breakout_74.html

https://github.com/tspannhw/minifi-breakoutgarden/blob/master/aqminifi.py





Monitoring Cloudera Edge Flow Manager and Cloudera Streams Messaging Manager with Apache NiFi




http://SERVER:10080/efm/actuator/health

{"status":{"code":"UP","description":""},"details":{"db":{"status":{"code":"UP","description":""},"details":{"database":"MySQL","hello":1}},"diskSpace":{"status":{"code":"UP","description":""},"details":{"total":1073729220608,"free":1023731712000,"threshold":10485760}}}}
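A quick Python sketch that parses that health payload and computes disk headroom; the field names are exactly those in the sample response above.

```python
import json

# Parse the EFM actuator health payload and pull out the overall status
# plus free-disk percentage.
payload = json.loads(
    '{"status":{"code":"UP","description":""},"details":{"db":{"status":'
    '{"code":"UP","description":""},"details":{"database":"MySQL","hello":1}},'
    '"diskSpace":{"status":{"code":"UP","description":""},"details":'
    '{"total":1073729220608,"free":1023731712000,"threshold":10485760}}}}'
)

status = payload["status"]["code"]
disk = payload["details"]["diskSpace"]["details"]
free_pct = 100.0 * disk["free"] / disk["total"]
print(status, round(free_pct, 1))
```

Polling this endpoint from NiFi (InvokeHTTP plus EvaluateJsonPath) gives you an easy EFM liveness alert.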




http://SERVER:10080/efm/actuator/heapdump


http://SERVER:10080/efm/actuator/env


http://SERVER:10080/efm/actuator/httptrace 


Check REST API Made Available During EFM Startup

2019-08-21 22:30:25.045  INFO 100056 --- [           main] o.e.jetty.server.AbstractConnector       : Started ServerConnector@747d1932{HTTP/1.1,[http/1.1]}{cloudera:10080}
2019-08-21 22:30:25.047  INFO 100056 --- [           main] o.s.b.web.embedded.jetty.JettyWebServer  : Jetty started on port(s) 10080 (http/1.1) with context path '/efm'
2019-08-21 22:30:25.050  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : Started C2Application in 10.102 seconds (JVM running for 10.741)
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : The Edge Flow Manager has started. Services available at the following URLs:
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Access User Interface: http://cloudera:10080/efm/ui
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Base URL for REST API: http://cloudera:10080/efm/api
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Swagger REST API docs: http://cloudera:10080/efm/swagger
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Status and management: http:/cloudera:10080/efm/actuator

Agent Classes
http://SERVER:10080/efm/api/agent-classes

[
  {
    "name": "iot-1",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "nanojetsonjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "raspianjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi3javamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi4java",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpijavamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  }
]
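A sketch of extracting the class names from that response; in practice you would GET the URL first (with urllib or requests), which is omitted here.

```python
import json

# Extract agent-class names from the /efm/api/agent-classes response above.
response = json.loads("""
[
  {"name": "iot-1", "agentManifests": ["agent-manifest-id"]},
  {"name": "nanojetsonjava", "agentManifests": ["agent-manifest-id"]},
  {"name": "raspianjava", "agentManifests": ["agent-manifest-id"]},
  {"name": "rpi3javamovidiussensehat", "agentManifests": ["agent-manifest-id"]},
  {"name": "rpi4java", "agentManifests": ["agent-manifest-id"]},
  {"name": "rpijavamovidiussensehat", "agentManifests": ["agent-manifest-id"]}
]
""")

names = [cls["name"] for cls in response]
print(names)
```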

EFM Agent Manifests
http://server:10080/efm/api/agent-manifests

EFM Agents
http://server:10080/efm/api/agents

EFM C2 Configuration - NiFi Registry
http://server:10080/efm/api/c2-configuration/nifi-registry

EFM Designer Flows
http://server:10080/efm/api/designer/flows

EFM API FLOWS
http://server:10080/efm/api/flows


Monitoring SMM Metrics with NiFi

Aggregated Topics for Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/topics?duration=LAST_ONE_HOUR&state=all

Aggregated Brokers for the Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/brokers?duration=LAST_ONE_HOUR
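These URLs are easy to compose programmatically. A small helper sketch follows; the server name, port and parameters are those shown above, while the helper function itself is mine.

```python
from urllib.parse import urlencode

# Compose SMM aggregated-metrics URLs like the ones above; port 9991 and the
# duration/state parameters come from the examples in this post.
def smm_metrics_url(server, entity, **params):
    base = f"http://{server}:9991/api/v1/admin/metrics/aggregated/{entity}"
    return f"{base}?{urlencode(params)}" if params else base

url = smm_metrics_url("server", "topics", duration="LAST_ONE_HOUR", state="all")
print(url)
```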





EFM Events
http://server:10080/efm/api/events
EFM Event by Event ID
http://server:10080/efm/api/events/9db708ca-3b7e-42bf-941a-a945fefa6fa6
Get a Heartbeat from a Device by HBID
http://server:10080/efm/api/heartbeats/HBID
For auto-configuring your processor, a list of available fields:

EFM Events / Fields
http://server:10080/efm/api/events/fields
What Flows Are Available
http://server:10080/efm/api/designer/flows
http://server:10080/efm/api/designer/flows/summaries
Get One Flow

GET /designer/flows/{flowId}

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044
Get That Flow's Events

GET /designer/flows/{flowId}/events

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044/events
Get All Flows and Buckets
http://server:10080/efm/api/flows
Agent Classes
http://server:10080/efm/api/agent-classes
Agents
http://server:10080/efm/api/agents
Agent Manifests
http://server:10080/efm/api/agent-manifests
Which NiFi Registry Is Configured
http://server:10080/efm/api/c2-configuration/nifi-registry
EFM Server Configuration
http://server:10080/efm/api/c2-configuration
SMM API
EFM Flow Designer
http://server:10080/efm/ui/#/flow-designer/flow/4ae72206-372d-4f3e-916a-d7c1faf09811
For a Great Real World Usage Example
https://github.com/asdaraujo/edge2ai-workshop#lab_1
http://hostname:10080/efm/api/agent-classes
http://hostname:10080/efm/api/agent-manifests?class=
http://hostname:10080/efm/swagger/


Resizing AWS EBS


   lsblk                        # identify the disk and its partitions
   df -H                        # check current filesystem sizes
   sudo growpart /dev/xvda 2    # grow partition 2 to use the new space
   sudo xfs_growfs /dev/xvda2   # grow the XFS filesystem (use resize2fs for ext4)
   lsblk
   df -H


https://hackernoon.com/tutorial-how-to-extend-aws-ebs-volumes-with-no-downtime-ec7d9e82426e

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/recognize-expanded-volume-linux.html


growpart /dev/xvda 2
CHANGED: partition=2 start=4096 old: size=20967391 end=20971487 new: size=2097147870 end=2097151966

lsblk
NAME    MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
xvda    202:0    0 1000G  0 disk 
├─xvda1 202:1    0    1M  0 part 
└─xvda2 202:2    0 1000G  0 part /

df -H
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         34G     0   34G   0% /dev
tmpfs            34G     0   34G   0% /dev/shm
tmpfs            34G   27M   34G   1% /run
tmpfs            34G     0   34G   0% /sys/fs/cgroup
/dev/xvda2       11G  2.3G  8.5G  22% /
tmpfs           6.8G     0  6.8G   0% /run/user/1000
tmpfs           6.8G     0  6.8G   0% /run/user/0

xfs_growfs /dev/xvda2 
meta-data=/dev/xvda2             isize=512    agcount=7, agsize=393216 blks
         =                       sectsz=512   attr=2, projid32bit=1
         =                       crc=1        finobt=0 spinodes=0
data     =                       bsize=4096   blocks=2620923, imaxpct=25
         =                       sunit=0      swidth=0 blks
naming   =version 2              bsize=4096   ascii-ci=0 ftype=1
log      =internal               bsize=4096   blocks=2560, version=2
         =                       sectsz=512   sunit=0 blks, lazy-count=1
realtime =none                   extsz=4096   blocks=0, rtextents=0
data blocks changed from 2620923 to 262143483
[root@ip-10-0-1-136 scripts]# lsblk
NAME    MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
xvda    202:0    0 1000G  0 disk 
├─xvda1 202:1    0    1M  0 part 
└─xvda2 202:2    0 1000G  0 part /

df -H
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         34G     0   34G   0% /dev
tmpfs            34G     0   34G   0% /dev/shm
tmpfs            34G   27M   34G   1% /run
tmpfs            34G     0   34G   0% /sys/fs/cgroup
/dev/xvda2      1.1T  2.4G  1.1T   1% /
tmpfs           6.8G     0  6.8G   0% /run/user/1000
tmpfs           6.8G     0  6.8G   0% /run/user/0

Powering Edge AI for Sensor Reading with RPI and Breakout Garden (EFM, NiFi, MiNiFi Agents)









Hardware Component List:

  • Raspberry Pi 3B+
  • BMP-280 Temperature, Pressure and Altitude
  • ST7735 0.96 SPI Colour LCD 160x80
  • MAX-30105 Oximeter and Smoke Sensor
  • Sony Playstation 3 EYE USB Web Camera

Software Component List:

  • Raspian
  • Python 3.5
  • JDK 8 Java (Soon Upgrading to JDK 11)
  • Apache NiFi 1.9.2
  • MiniFi Java Agent 0.6.0
  • Cloudera Edge Flow Manager
  • Apache Kafka 2.2

Summary

96410-breakoutgardenarchitecture.jpg

Potential Use Cases:   Tracking the environment in a facility, including webcam detection, temperature, pressure and smoke.


Our Raspberry Pi 3B+ has a Breakout Garden Hat with 2 sensors and one small display. The display shows the captured image and is constantly updating. 
We currently run via nohup, but when we go into constant use I will switch to a Linux service that runs on startup.
The Python script initializes the connections to all of the sensors and then goes into an infinite loop of reading those values and building a JSON packet that we send via MQTT over port 1883 to a MQTT Mosquitto broker. MiniFi 0.6.0 Java Agent is using ConsumeMQTT on that port to capture these messages and filter them based on alarm values. If outside of the checked parameters we send them via S2S/HTTP(s) to an Apache NiFi server.
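Here is a condensed sketch of building that JSON packet. The field names match the sample records earlier in this post; the actual MQTT publish (e.g. via paho-mqtt to port 1883) is only indicated in a comment, and the function itself is an illustration rather than the exact script.

```python
import datetime
import json
import uuid

# Sketch of the JSON packet the sensor script builds before publishing over
# MQTT. Field names follow the sample records shown earlier in this post.
def build_packet(tempc, pressure, humidity, host, ipaddress):
    now = datetime.datetime.now()
    return json.dumps({
        "uuid": now.strftime("%Y%m%d%H%M%S") + "_" + str(uuid.uuid4()),
        "systemtime": now.strftime("%m/%d/%Y %H:%M:%S"),
        "host": host,
        "ipaddress": ipaddress,
        "bme680_tempc": "{0:.2f}".format(tempc),
        "bme680_tempf": "{0:.2f}".format((tempc * 1.8) + 32),
        "bme680_pressure": "{0:.2f}".format(pressure),
        "bme680_humidity": "{0:.2f}".format(humidity),
    })

# The real script would then publish, e.g. with paho-mqtt:
#   client.publish("sensors", packet)   # broker on port 1883
packet = build_packet(30.85, 1024.51, 13.22, "piups", "192.168.1.166")
print(packet)
```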
We also have a USB WebCam (Sony Playstation 3 EYE) that is capturing images and we read those with MiniFi and send them to NiFi as well.  We will incorporate TensorFlow lite models into our analysis.
The first thing we need to do is pretty easy. We need to plug in our Pimoroni Breakout Garden Hat and our 3 plugs.
You have to do the standard installation of Python 3, Java 8 and MiniFi; I also recommend OpenCV. Make sure you have everything plugged in securely and in the correct direction before you power on the Raspberry Pi.
Install Python PIP 
Install Breakout Garden Library 
unzip master.zip
cd breakout-garden-master
sudo ./install.sh

NiFi Flow


We Can Query IoT Events As They Stream In



Add ExecuteProcess to Run Our Shell/Python




IoT JSON Data



IoT User



Cloudera Edge Management - Monitoring IoT Events From MiNiFi Agents on Devices


Building an IoT Flow Graphically Is Easy!   It follows the Hadoop philosophy of stitching these things together.



Configure Connection to MQTT Broker



MQTT Configuration 2




Cloudera Edge Flow Manager REST API





 Let's Examine those MQTT Messages from Devices









As you see, we follow the Hadoop philosophy of keeping things open, extensible, modular, flexible, transparent, composable, using open data standards, open source, diverse and cloud friendly.  In this way we can always bend to the needs of the user and adapt to any environment, any data, any cloud at any time.   If we need windowing we could easily add Storm or Flink.   For other streaming use cases we can connect our Kafka topics to Spark Structured Streaming or Kafka Streams for additional processing as needed.    We can publish our schemas from our schema registry as open data standards within our enterprise, our ecosystem or worldwide.   Data is meant for sharing and utilizing to build knowledge.   Let's make it happen, from any edge to any data store to any data cloud to any AI / ML / DS / DL model.

Source:
https://github.com/tspannhw/breakoutgardenhat-spi-minifi

Resources:

Breakout Garden Hat
https://github.com/pimoroni/breakout-garden/tree/master/examples/heartbeat
https://shop.pimoroni.com/products/0-96-spi-colour-lcd-160x80-breakout
https://datasheets.maximintegrated.com/en/ds/MAX30105.pdf
https://shop.pimoroni.com/products/max30105-breakout-heart-rate-oximeter-smoke-sensor
https://github.com/pimoroni/max30105-python
https://github.com/tspannhw/minifi-breakoutgarden
https://shop.pimoroni.com/products/0-96-spi-colour-lcd-160x80-breakout
curl https://get.pimoroni.com/st7735 | bash
https://github.com/pimoroni/st7735-python

Using a Different Configuration of Breakout Garden Sensors
https://community.cloudera.com/t5/Community-Articles/IoT-Series-Sensors-Utilizing-Breakout-Garden-Hat-Part-1/ta-p/249262


TensorFlow
https://github.com/tensorflow/examples/blob/master/lite/examples/image_classification/raspberry_pi/README.md
https://www.tensorflow.org/lite/guide/python
https://github.com/PINTO0309/Bazel_bin
https://github.com/PINTO0309/Tensorflow-bin

sudo apt-get install libatlas-base-dev 

wget https://dl.google.com/coral/python/tflite_runtime-1.14.0-cp35-cp35m-linux_armv7l.whl
pip3 install tflite_runtime-1.14.0-cp35-cp35m-linux_armv7l.whl