SQL Reporting Task for Cloudera Flow Management / HDF / Apache NiFi

Would you like reporting tasks that gather metrics and send them from NiFi to your database or Kafka, based on a query of NiFi provenance, bulletins, metrics, processor status, or other KPIs?

Now you can.   If you are using HDF 3.5.0, this reporting task NAR is preinstalled and ready to go.

Let's add some reporting tasks that use SQL: QueryNiFiReportingTask.



The first query that interested me was against provenance for a processor that consumes from a certain topic; I decided to run it every 10 seconds.   My query and some results are below.

So let's go to Controller Settings / Reporting Tasks and then add QueryNiFiReportingTask:


We add one task per item we want to monitor.   Each reporting task needs somewhere to send its records (a sink): a JDBC database (DatabaseRecordSink), Kafka (KafkaRecordSink), Prometheus (PrometheusRecordSink), a script (ScriptedRecordSink), or another NiFi instance (SiteToSiteReportingRecordSink).   I am going to use Kafka, but Prometheus, Database, and Site-to-Site are good options.   With Site-to-Site you can send the records to another NiFi cluster for processing, for that NiFi-ception processing route.


We have to write an Apache Calcite-compliant SQL query, set our sink, and decide whether to include zero-record results (false is a good idea).



One option is to query the BULLETINS table, which holds NiFi cluster bulletins (warnings/errors).
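A minimal sketch of such a query; the column names follow the reporting task's documented BULLETINS schema, so verify them against your NiFi version:

SELECT bulletinTimestamp, bulletinLevel, bulletinSourceName, bulletinMessage
FROM BULLETINS
WHERE bulletinLevel = 'ERROR'
ORDER BY bulletinTimestamp DESC
LIMIT 25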


Another option is the CONNECTION_STATUS table.
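For example, here is a sketch of a quick backpressure check, assuming the documented CONNECTION_STATUS columns:

SELECT sourceName, destinationName, queuedCount, queuedBytes, isBackPressureEnabled
FROM CONNECTION_STATUS
ORDER BY queuedBytes DESC
LIMIT 20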




How about NiFi JVM metrics?  The JVM_METRICS table has some good stuff in it.
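A hedged sketch; the jvm_-prefixed field names below come from the reporting task documentation, so check them against your release:

SELECT jvm_heap_used, jvm_heap_usage, jvm_thread_count, jvm_file_descriptor_usage, jvm_uptime
FROM JVM_METRICS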



Let's configure some Kafka record sinks.  I am using Kafka 2.3, so I'll use the Kafka 2 sink.   I set the brokers (default port 9092), create a new topic, and choose the record writer.  I chose JSON, but it could be CSV, Avro, Parquet, XML, or something else.   Stick with JSON or Avro for Kafka.


So data starts getting sent; by default, the schedule is every 5 minutes.   This can be adjusted.   For provenance, I set mine to every 10 seconds.

 

Let's look at the data as it passes through Kafka.   In the follow-up to this article we'll land it in Impala/Kudu/Hive/Druid/Phoenix/HBase or HDFS/S3 and query it.   For now, we can examine the data within Kafka via Cloudera Streams Messaging Manager (SMM).







We can examine any of the topics in Kafka with SMM.   We can then start consuming these records to build live metric-handling applications, or send them to live data marts and dashboards powered by Cloudera Data Platform (CDP).


We have a full set of SQL to use for these reporting tasks, including selecting columns, aggregates like MAX and AVG, ordering, grouping, WHERE clauses, and row limits.
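For instance, a sketch of an aggregate query over the same PROVENANCE table used below, computing per-component event durations:

SELECT componentName, AVG(durationMillis) AS avgDurationMillis, MAX(durationMillis) AS maxDurationMillis
FROM PROVENANCE
GROUP BY componentName
ORDER BY maxDurationMillis DESC
LIMIT 10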



Provenance Query

SELECT eventId, durationMillis, lineageStart, timestampMillis,
updatedAttributes, entitySize, details
FROM PROVENANCE
WHERE componentName = 'Consume Kafka iot messages'
LIMIT 25

Provenance Results


[{"eventId":2724,"durationMillis":69,"lineageStart":1586294707989,"timestampMillis":1586294708002,"updatedAttributes":"{path=./, schema.protocol.version=1, filename=be499074-c595-46f5-a03a-482607fb9c8c, schema.identifier=1, kafka.partition=7, kafka.offset=36, schema.name=SensorReading, kafka.timestamp=1586294707933, kafka.topic=iot, schema.version=1, mime.type=application/json, uuid=be499074-c595-46f5-a03a-482607fb9c8c}","entitySize":246,"details":null},{"eventId":2736,"durationMillis":20,"lineageStart":1586294708905,"timestampMillis":1586294708916,"updatedAttributes":"{path=./, schema.protocol.version=1, filename=74c9e28d-82a4-4cea-8331-92b7a4bee1b3, schema.identifier=1, kafka.partition=3, kafka.offset=36, schema.name=SensorReading, kafka.timestamp=1586294708898, kafka.topic=iot, schema.version=1, mime.type=application/json, uuid=74c9e28d-82a4-4cea-8331-92b7a4bee1b3}","entitySize":247,"details":null},..]

I have classes on using this technology and some webinars coming to the East Coast of the US and virtually at my meetup.

https://www.meetup.com/futureofdata-princeton/

Cloudera HDF 3.5.0 Is Now Available For Download

https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.5.0/release-notes/content/whats-new.html

https://www.cloudera.com/downloads/hdf.html

HDF 3.5.0 includes the following components:
  • Apache Ambari 2.7.5
  • Apache Kafka 2.3.1
  • Apache NiFi 1.11.1
  • NiFi Registry 0.5.0
  • Apache Ranger 1.2.0
  • Apache Storm 1.2.1
  • Apache ZooKeeper 3.4.6
  • Apache MiNiFi Java Agent 0.6.0
  • Apache MiNiFi C++ 0.6.0
  • Hortonworks Schema Registry 0.8.1
  • Hortonworks Streaming Analytics Manager 0.6.0
  • Apache Knox 1.0.0
  • SmartSense 1.5.0

SQL reporting task. The QueryNiFiReportingTask allows users to execute SQL queries against tables containing information on Connection Status, Processor Status, Bulletins, Process Group Status, JVM Metrics, Provenance and Connection Status Predictions. In combination with Site to Site, it is particularly useful to define fine-grained monitoring capabilities on top of the running workflows.
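As another sketch, a PROCESSOR_STATUS query; the column names here are assumptions based on the reporting task documentation, so verify them on your release:

SELECT name, processorType, bytesRead, bytesWritten, processingNanos
FROM PROCESSOR_STATUS
ORDER BY processingNanos DESC
LIMIT 10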

Using NiFi CLI to Restore NiFi Flows From Backups

Please note, Apache NiFi 1.11.4 is now available for download

https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.4

The restore workflow with the NiFi CLI: list your Registry buckets, create a flow in the target bucket, import the flow version from your JSON backup, then list the flows to verify.

#> registry list-buckets -u http://somesite.compute-1.amazonaws.com:18080

#   Name   Id                                     Description 
-   ----   ------------------------------------   ----------- 
1   IOT    45834964-d022-4f4c-891f-695898e1e5f0   (empty)     
2   IoT    250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9   (empty)     
3   dev    46b7bab7-400f-44ae-a0e6-7340ff19c96f   (empty)     
4   iot    c594d6bc-7413-4f6a-ba9a-50b8020eec37   (empty)     
5   prod   0bf59d2e-1dd5-4d24-8aa0-0614bf991dc9   (empty)     


#> registry create-flow -verbose -u http://somesite.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9 --flowName iotFlow

The create-flow command returns the new flow's ID:

a5a4ac59-9aeb-416e-937f-e601ca8beba9


#> registry import-flow-version -verbose -u http://somesite.compute-1.amazonaws.com:18080 -f a5a4ac59-9aeb-416e-937f-e601ca8beba9 -i iot-1.json


#> registry list-flows  -u http://ec2-35-171-154-174.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9

#   Name      Id                                     Description 
-   -------   ------------------------------------   ----------- 
1   iotFlow   a5a4ac59-9aeb-416e-937f-e601ca8beba9   (empty)     




Fixing Linux Webcams


  # List available video devices
  v4l2-ctl --list-devices

  # List all controls and their current values for a device
  v4l2-ctl -d /dev/video0 --list-ctrls

  # White balance: disable the auto control (a boolean), then set a temperature
  v4l2-ctl --get-ctrl=white_balance_temperature
  v4l2-ctl --set-ctrl=white_balance_temperature_auto=0
  v4l2-ctl --set-ctrl=white_balance_temperature=4000 -d /dev/video0

  # Exposure: on many UVC cameras exposure_auto 1 = manual and 3 = aperture-priority
  # auto; exposure_absolute only takes effect when auto exposure is off
  v4l2-ctl --set-ctrl=exposure_auto=3
  v4l2-ctl --set-ctrl=exposure_auto_priority=0
  v4l2-ctl --set-ctrl=exposure_absolute=250

  # Gain
  v4l2-ctl --set-ctrl=gain=0



This article is great:   https://www.kurokesu.com/main/2016/01/16/manual-usb-camera-settings-in-linux/


v4l2-ctl -d /dev/video0 --list-ctrls
                     brightness 0x00980900 (int)    : min=0 max=255 step=1 default=128 value=128
                       contrast 0x00980901 (int)    : min=0 max=255 step=1 default=128 value=128
                     saturation 0x00980902 (int)    : min=0 max=255 step=1 default=128 value=128
 white_balance_temperature_auto 0x0098090c (bool)   : default=1 value=0
                           gain 0x00980913 (int)    : min=0 max=255 step=1 default=0 value=0
           power_line_frequency 0x00980918 (menu)   : min=0 max=2 default=2 value=2
      white_balance_temperature 0x0098091a (int)    : min=2000 max=6500 step=1 default=4000 value=4000
                      sharpness 0x0098091b (int)    : min=0 max=255 step=1 default=128 value=128
         backlight_compensation 0x0098091c (int)    : min=0 max=1 step=1 default=0 value=0
                  exposure_auto 0x009a0901 (menu)   : min=0 max=3 default=3 value=3
              exposure_absolute 0x009a0902 (int)    : min=3 max=2047 step=1 default=250 value=83 flags=inactive
         exposure_auto_priority 0x009a0903 (bool)   : default=0 value=0
                   pan_absolute 0x009a0908 (int)    : min=-36000 max=36000 step=3600 default=0 value=0
                  tilt_absolute 0x009a0909 (int)    : min=-36000 max=36000 step=3600 default=0 value=0
                 focus_absolute 0x009a090a (int)    : min=0 max=250 step=5 default=0 value=0 flags=inactive
                     focus_auto 0x009a090c (bool)   : default=1 value=1
                  zoom_absolute 0x009a090d (int)    : min=100 max=500 step=1 default=100 value=100




v4l2-ctl --list-devices
HD Pro Webcam C920 (usb-70090000.xusb-2.2):
/dev/video0


ODPI's OpenDS4All - Open Source Data Science Content To Teach the World

OpenDS4All




Start learning now:


ODPI officially announced this recently and it looks great.

There is a ton of amazing material, including slides, notes, documentation, homework, exercises, and Jupyter notebooks covering data wrangling, data science, the basics, and Apache Spark.


This "starter set" of training materials can help you build a data science program for yourself, your company, your university, or your non-profit.    I am going to bring some of these to my meetups and hopefully help give back with new materials, updates, and suggestions.

These are college-level materials developed by the University of Pennsylvania and open sourced via ODPI, with IBM leading.   The code and slides look great.   I can see these helping the world add another million desperately needed data scientists, data engineers, and data-science-enabled professionals.

I have been running some of this via Cloudera Machine Learning on my CDP cluster in AWS and it works great.   It is really well made.   I am hoping to contribute a module on streaming data science.



New and Improved: It's NiFi

Apache NiFi 1.11.3


http://nifi.apache.org/download.html

If you have downloaded any release after NiFi 1.10, please upgrade now.   This one has some major improvements and fixes.

Release note highlights can be found here:
https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.3


I am running this now in Anaheim, and it's no Mickey Mouse upgrade.   It's fast and nice.

Some of the more recent upgrades:

https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

Parameters, stateless execution, and the ability to download a flow as JSON are worth the price of the install alone.



See some more NiFi 1.11 features here:   https://www.datainmotion.dev/2020/02/edgeai-google-coral-with-coral.html


EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)


Building MiNiFi IoT Apps with the new Cloudera EFM 


It is very easy to build a drag-and-drop EdgeAI application with EFM and then push it to all your MiNiFi agents.


Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!









NiFi Flow Receiving From MiNiFi Java Agent


In my CDP-DC cluster, I consume the Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu.  We filter our data with streaming SQL.


We can use SQL to route data, create aggregates like averages, choose a subset of fields, and limit the data returned.   Using the power of Apache Calcite, streaming SQL in NiFi is a game changer for record data types including CSV, XML, Avro, Parquet, JSON, and grokkable text.   Read and write different formats and convert when your SQL is done.   Or just SELECT * FROM FLOWFILE to get everything.
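As an illustrative sketch, a query in that style against the sensor records from this flow (field names taken from the example JSON record shown later in this post; the CAST is needed because temperature arrives as a string):

SELECT id, systemtime, temperature, humidity, cputemp
FROM FLOWFILE
WHERE CAST(temperature AS FLOAT) > 38.0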



We can see this flow in Atlas as we trace the data lineage and provenance from the Kafka topic.



We can search Atlas for Kafka Topics.



From coral Kafka topic to NiFi to Kudu.


Details on Coral Kafka Topic


Examining the Hive Metastore Data on the Coral Kudu Table


NiFi Flow Details in Atlas


Details on Alerts Topic


Statistics from Atlas





Example Web Camera Image



 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]


Querying Kudu results in Hue


Pushing Alerts to Slack from NiFi





I am running Apache NiFi 1.11.1 and wanted to point out a new feature, "Download flow," which downloads the highlighted flow/process group as JSON.




Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.



Shell (coralrun.sh)


#!/bin/bash
# Timestamp used to name the captured image
DATE=$(date +"%Y-%m-%d_%H%M%S")
# Grab one 1280x720 frame from the webcam
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
# Classify the image on the Coral TPU and print a JSON record
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null


Kudu Table DDL

https://github.com/tspannhw/table-ddl


Python 3 (test.py)


import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import json
import math
import random
import string
import socket
import argparse
from time import gmtime, strftime

import psutil
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
from edgetpu.classification.engine import ClassificationEngine

start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

# Read the Raspberry Pi SoC temperature via vcgencmd
def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()
    return res.replace("temp=", "").replace("'C\n", "")

# Get the MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if i.family == psutil.AF_LINK:
                return i.address
# Model and label files live in /opt/demo/examples-camera/all_models
row = {}
try:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()
    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img = Image.open(args.image)

    scores = {}
    kCount = 1

    # Iterate Inference Results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1    

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    cpuTemp=int(float(getCPUtemperature()))
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    row.update(scores)
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp,2)
    row['te'] = "{0:.2f}".format((end-start))
    row['starttime'] = starttf
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)    
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    msg = 'Temp: {0}'.format(row['temperature'])
    msg += ' IP: {0}'.format(row['ip'])
    # Show the latest reading on the Enviro board's OLED display
    update_display(enviro.display, msg)
except Exception:
    row['message'] = "Error"

# Emit the collected record as one JSON line
print(json.dumps(row))

Source Code:



Sensors / Devices / Hardware:

  • HDC2010 humidity sensor
  • OPT3002 ambient light sensor
  • BMP280 barometric pressure sensor
  • PS3 Eye USB camera and microphone
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU
