New and Improved: It's NiFi

Apache NiFi 1.11.3


http://nifi.apache.org/download.html

If you downloaded any NiFi release after 1.10, please upgrade now. This release brings some major improvements and several fixes.

Release note highlights can be found here:
https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.3


I am running this now in Anaheim, and it's no Mickey Mouse upgrade.   It's fast and nice.

Some of the more recent upgrades:

https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

Parameters, stateless execution, and the ability to download a flow as JSON alone are worth the price of the install.



See some more NiFi 1.11 features here:   https://www.datainmotion.dev/2020/02/edgeai-google-coral-with-coral.html


EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)



Building MiNiFi IoT Apps with the new Cloudera EFM 


It is very easy to build a drag-and-drop EdgeAI application with EFM and then push it to all your MiNiFi agents.


Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!









NiFi Flow Receiving From MiNiFi Java Agent


In my CDP-DC cluster, I consume Kafka messages sent from my remote NiFi gateway, publish alerts to Kafka, and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.


We can use SQL to route, create aggregates like averages, choose a subset of fields, and limit the data returned. Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer for record data types including CSV, XML, Avro, Parquet, JSON, and Grok-parsable text. Read and write different formats, and convert between them when your SQL is done. Or just run SELECT * FROM FLOWFILE to get everything.
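To make the idea concrete outside of NiFi, here is a rough Python analogy that uses the standard library's sqlite3 instead of Calcite. Records are loaded into an in-memory table named FLOWFILE (the table name NiFi's record SQL uses) and then queried to route, aggregate, and select a subset of fields with LIMIT. The helper query_flowfile is hypothetical, the first record borrows values from the example JSON record, and the other two records are made up; SQL dialect details differ from Calcite.

```python
import sqlite3

# Illustrative records: the first borrows values from the example JSON record,
# the other two are made up for this sketch.
records = [
    {"host": "coralenv", "cputemp": 59, "temperature": 39.44, "label_1": "hair spray"},
    {"host": "coralenv", "cputemp": 61, "temperature": 40.10, "label_1": "syringe"},
    {"host": "coralenv2", "cputemp": 48, "temperature": 22.75, "label_1": "desk"},
]


def query_flowfile(records, sql):
    """Load records into an in-memory table named FLOWFILE and run SQL over it.

    Hypothetical helper: sqlite3 stands in for Calcite here.
    """
    conn = sqlite3.connect(":memory:")
    try:
        cols = list(records[0].keys())
        conn.execute("CREATE TABLE FLOWFILE ({})".format(", ".join(cols)))
        conn.executemany(
            "INSERT INTO FLOWFILE VALUES ({})".format(", ".join(["?"] * len(cols))),
            [tuple(r[c] for c in cols) for r in records],
        )
        cur = conn.execute(sql)
        names = [d[0] for d in cur.description]
        return [dict(zip(names, row)) for row in cur.fetchall()]
    finally:
        conn.close()


if __name__ == "__main__":
    # Route: keep only hot devices
    print(query_flowfile(records, "SELECT host, cputemp FROM FLOWFILE WHERE cputemp > 55"))
    # Aggregate: average CPU temperature per host
    print(query_flowfile(records, "SELECT host, AVG(cputemp) AS avg_cpu FROM FLOWFILE GROUP BY host"))
    # Subset of fields plus LIMIT
    print(query_flowfile(records, "SELECT label_1 FROM FLOWFILE LIMIT 1"))
```

In NiFi itself this kind of query runs inside a record-aware processor such as QueryRecord, with the reader and writer controller services handling the format conversion.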



We can see this flow in Atlas as we trace the data lineage and provenance from the Kafka topic.



We can search Atlas for Kafka Topics.
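The same search can be scripted against the Atlas REST API. This is a minimal sketch using the v2 basic search endpoint, assuming Basic authentication, the kafka_topic entity type from the Atlas Kafka model, and an Atlas server on port 31000 as in the atlas-server.properties example. The host name atlas-host and both function names are placeholders.

```python
import base64
import json
import urllib.parse
import urllib.request


def atlas_topic_search_url(atlas_url, topic_query):
    """Build a basic-search URL for kafka_topic entities matching topic_query."""
    params = urllib.parse.urlencode({"typeName": "kafka_topic", "query": topic_query})
    return atlas_url.rstrip("/") + "/api/atlas/v2/search/basic?" + params


def search_kafka_topics(atlas_url, topic_query, user, password):
    """Return the qualifiedName of each matching topic (assumes Basic auth)."""
    req = urllib.request.Request(atlas_topic_search_url(atlas_url, topic_query))
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    req.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(req) as resp:
        result = json.load(resp)
    # Each entity header in the result carries typeName, guid and attributes
    return [e.get("attributes", {}).get("qualifiedName") for e in result.get("entities", [])]


if __name__ == "__main__":
    print(atlas_topic_search_url("http://atlas-host:31000/", "coral"))
```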



From the coral Kafka topic through NiFi to Kudu.


Details on Coral Kafka Topic


Examining the Hive Metastore Data on the Coral Kudu Table


NiFi Flow Details in Atlas


Details on Alerts Topic


Statistics from Atlas





Example Web Camera Image



Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
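Notice that the script emits most sensor readings and scores as formatted strings (for example "temperature":"39.44") and appends " MB" to diskusage. If your Kudu table declares these as numeric columns, they need converting somewhere; in NiFi you would more likely handle that with a record schema. This Python sketch only shows the conversion itself, on a trimmed copy of the record above; the list of fields to convert is an assumption based on that record.

```python
import json

# Trimmed copy of the example record above
RAW = ('[{"cputemp":59,"temperature":"39.44","tempf":"102.34","humidity":"10.21",'
       '"pressure":"102.76","ambient_light":"329.92","score_1":"0.29","score_2":"0.14",'
       '"te":"5.10","diskusage":"50373.5 MB","host":"coralenv","label_1":"hair spray"}]')

# Fields the script writes as strings but that hold numbers (assumption)
NUMERIC_STRING_FIELDS = ("temperature", "tempf", "humidity", "pressure",
                         "ambient_light", "score_1", "score_2", "te")


def normalize(record):
    """Return a copy of record with numeric strings converted to floats."""
    out = dict(record)
    for field in NUMERIC_STRING_FIELDS:
        if field in out:
            out[field] = float(out[field])
    if "diskusage" in out:
        # "50373.5 MB" -> 50373.5 (megabytes)
        out["diskusage"] = float(out["diskusage"].split()[0])
    return out


records = [normalize(r) for r in json.loads(RAW)]

if __name__ == "__main__":
    print(records[0])
```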


Querying Kudu results in Hue


Pushing Alerts to Slack from NiFi
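Outside of NiFi, pushing an alert to Slack comes down to a JSON POST with a "text" field to a Slack incoming webhook. This sketch assumes a webhook URL of your own and a made-up CPU temperature threshold (CPU_TEMP_LIMIT); it is not the alert rule used in this flow, and the function names are hypothetical.

```python
import json
import urllib.request

CPU_TEMP_LIMIT = 70  # hypothetical threshold in degrees C, not from the original flow


def needs_alert(record, limit=CPU_TEMP_LIMIT):
    """True when the device's CPU temperature reaches the limit."""
    return int(record["cputemp"]) >= limit


def build_slack_payload(record):
    """Build an incoming-webhook payload from a Coral record."""
    text = ("Coral alert on {host}: CPU {cputemp}C, ambient {temperature}C, "
            "top label '{label_1}' ({score_1})").format(**record)
    return {"text": text}


def post_to_slack(webhook_url, payload):
    """POST the payload as JSON to the webhook and return the HTTP status."""
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(webhook_url, data=data,
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Keeping the payload builder separate from the HTTP call makes the message format easy to check without sending anything.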





I am running Apache NiFi 1.11.1 and want to point out a new feature: Download flow, which downloads the selected flow or process group as JSON.
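Once you have the JSON, it is easy to inspect with a script. This sketch walks a process group recursively and lists its processors. It assumes the download wraps the group in a "flowContents" object and that groups expose "processors" and "processGroups" lists with "name" and "type" on each processor; check those keys against your own file. The sample flow in the usage example is hypothetical.

```python
import json


def list_processors(group, path=""):
    """Recursively yield (group path, processor name, processor type)."""
    name = group.get("name", "")
    here = "{}/{}".format(path, name) if name else path
    for proc in group.get("processors", []):
        yield here, proc.get("name"), proc.get("type")
    for child in group.get("processGroups", []):
        yield from list_processors(child, here)


def load_flow(path):
    """Load a downloaded flow file and return its root process group."""
    with open(path) as f:
        doc = json.load(f)
    # Assumption: the group sits under "flowContents"; fall back to the whole document
    return doc.get("flowContents", doc)


if __name__ == "__main__":
    # Hypothetical flow shaped like a downloaded process group
    flow = {"name": "Coral", "processors": [
                {"name": "QueryRecord", "type": "org.apache.nifi.processors.standard.QueryRecord"}],
            "processGroups": [{"name": "Alerts", "processors": [
                {"name": "PutSlack", "type": "org.apache.nifi.processors.slack.PutSlack"}],
                "processGroups": []}]}
    for row in list_processors(flow):
        print(row)
```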




Looking at NiFi counters to monitor progress:
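Counters can also be polled over the NiFi REST API. This sketch assumes an unsecured NiFi (a secured cluster needs a token or client certificate) and a response from GET /nifi-api/counters shaped as counters → aggregateSnapshot → counters, where each counter has context, name, and a numeric valueCount. Treat the key names as assumptions to verify against your NiFi version; the sample context and counter name in the test are made up.

```python
import json
import urllib.request


def parse_counters(payload):
    """Map 'context / name' to the numeric counter value."""
    snapshot = payload.get("counters", {}).get("aggregateSnapshot", {})
    return {"{} / {}".format(c.get("context"), c.get("name")): c.get("valueCount")
            for c in snapshot.get("counters", [])}


def fetch_counters(nifi_url):
    """Fetch and parse counters from an unsecured NiFi instance."""
    with urllib.request.urlopen(nifi_url.rstrip("/") + "/nifi-api/counters") as resp:
        return parse_counters(json.load(resp))
```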

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.



Shell (coralrun.sh)


#!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null


Kudu Table DDL

https://github.com/tspannhw/table-ddl


Python 3 (test.py)


import argparse
import datetime
import json
import os
import socket
import time
import uuid
from time import gmtime, strftime

import psutil
from PIL import Image
from luma.core.render import canvas
from coral.enviro.board import EnviroBoard
from edgetpu.classification.engine import ClassificationEngine

start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    # Each line of the label file is "<id> <label>"; build an id -> label map
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        if len(pair) == 2:  # skip blank or malformed lines
            ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    # Raspberry Pi only: vcgencmd prints e.g. temp=59.1'C
    res = os.popen('vcgencmd measure_temp').readline()
    return res.replace("temp=", "").replace("'C\n", "")

# Get MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        for addr in nics[iface]:
            if addr.family == psutil.AF_LINK:
                return addr.address
    return None

# Models and labels live in /opt/demo/examples-camera/all_models
row = {}
try:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()
    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img = Image.open(args.image)

    # Keep the top 5 results as label_1/score_1 ... label_5/score_5
    scores = {}
    for kCount, result in enumerate(engine.ClassifyWithImage(img, top_k=5), start=1):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    cpuTemp = int(float(getCPUtemperature()))
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    # Read the temperature sensor once so temperature and tempf come from the same reading
    temp_c = enviro.temperature
    row.update(scores)
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = cpuTemp
    row['te'] = "{0:.2f}".format(end - start)
    row['starttime'] = starttf
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    # Despite the field name, this is the free space on "/" in MB
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = uuid2
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(temp_c)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((temp_c * 1.8) + 32)
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    # Show temperature and IP on the board's display, one per line
    update_display(enviro.display, 'Temp: {0}\nIP: {1}'.format(row['temperature'], row['ip']))
except Exception:
    row['message'] = "Error"
print(json.dumps(row))
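The inference loop is the only part of test.py that needs the Coral TPU. To check the label file parsing and the label_N/score_N mapping without the hardware, this sketch replaces ClassificationEngine.ClassifyWithImage with a plain list of (label_id, score) pairs, the shape the loop indexes as result[0] and result[1]. The helper names and the label IDs in the test are hypothetical.

```python
def parse_labels(lines):
    """Parse '<id> <label>' lines into a dict, skipping blank lines."""
    labels = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        if len(pair) == 2:
            labels[int(pair[0])] = pair[1].strip()
    return labels


def scores_to_fields(results, labels):
    """Turn ranked (label_id, score) pairs into label_N / score_N fields."""
    fields = {}
    for rank, (label_id, score) in enumerate(results, start=1):
        fields["label_" + str(rank)] = labels[label_id]
        fields["score_" + str(rank)] = "{:.2f}".format(score)
    return fields
```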

Source Code:



Sensors / Devices / Hardware:

  • Humidity: HDC2010 humidity sensor
  • Light: OPT3002 ambient light sensor
  • Barometric pressure: BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU

References:



Connecting Apache NiFi to Apache Atlas For Data Governance At Scale in Streaming




Once connected, you can see NiFi and Kafka lineage flowing into Atlas.

You must add the Atlas lineage reporting task to your NiFi cluster.



Add a ReportLineageToAtlas reporting task under Controller Settings / Reporting Tasks.
You must set the Atlas URL and the authentication method and, if you use Basic authentication, the username and password.





You also need to set the Atlas configuration directory, the NiFi URL to use, and the lineage strategy (Complete Path).


Another example with an AWS hosted NiFi and Atlas:



IMPORTANT NOTE: Keep your Atlas Default Cluster Name consistent with the other applications in your Cloudera cluster; the usual default, cm, is a good choice.




You can now see the lineage state:



Configure Atlas to Be Enabled and Have Kafka


Have Atlas Service enabled in NiFi configuration



Example Configuration

You must have access to the Atlas application properties, which live in:

/etc/atlas/conf


atlas-application.properties 

#Generated by Apache NiFi ReportLineageToAtlas ReportingTask at 2020-02-21T17:18:28.493Z
#Fri Feb 21 17:18:28 UTC 2020
atlas.kafka.bootstrap.servers=princeton0.field.hortonworks.com\:9092
atlas.enableTLS=false
atlas.kafka.client.id=ReportLineageToAtlas.687a48e2-0170-1000-0000-00000a0de4ea
atlas.cluster.name=Princeton0

atlas.kafka.security.protocol=PLAINTEXT
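Given the note about keeping the Atlas cluster name consistent, it can help to check the generated file automatically. This is a minimal sketch that assumes the simple key=value layout shown above; it is not a full java.util.Properties parser (no line continuations, ':' separators, or other escapes beyond '\:'). The expected name cm is only the example from the note.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        # Java properties escape ':' as '\:' (see the bootstrap servers entry)
        props[key.strip()] = value.strip().replace("\\:", ":")
    return props


def cluster_name_matches(props, expected):
    """True when atlas.cluster.name equals the expected cluster name."""
    return props.get("atlas.cluster.name") == expected


SAMPLE = r"""#Generated by Apache NiFi ReportLineageToAtlas ReportingTask
atlas.kafka.bootstrap.servers=princeton0.field.hortonworks.com\:9092
atlas.enableTLS=false
atlas.cluster.name=Princeton0

atlas.kafka.security.protocol=PLAINTEXT
"""

if __name__ == "__main__":
    props = parse_properties(SAMPLE)
    print(props["atlas.cluster.name"], cluster_name_matches(props, "cm"))
```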


atlas-server.properties 

princeton0.field.hortonworks.com:atlas.authentication.method.kerberos=false
princeton0.field.hortonworks.com:atlas.enableTLS=false
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.connection.timeout.ms=30000
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.session.timeout.ms=60000
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.sync.time.ms=20
princeton0.field.hortonworks.com:atlas.server.bind.address=0.0.0.0
princeton0.field.hortonworks.com:atlas.server.http.port=31000

princeton0.field.hortonworks.com:atlas.server.https.port=31443


Running Atlas





















See:   


Example SMM Notification Email




Notification id: 12f61ec2-11a3-45ba-b7bb-2416d8a1b076,
Root resource name: ANY,
Root resource type: CONSUMER,
Created timestamp: Tue Jan 07 21:13:45 UTC 2020 : 1578431625199,
Last updated timestamp: Mon Jan 13 13:09:38 UTC 2020 : 1578920978293,
State: RAISED,

Message:

Alert policy : "ALERT IF ( ANY CONSUMER MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE >= 1200 )" has been evaluated to true
Condition : "MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE>=1200" has been evaluated to true for following CONSUMERS
- CONSUMER = "tensorflow-nifi-aws-client" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 308208428
- CONSUMER = "atlas" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 596819269
- CONSUMER = "nifi-gassensor-aws-client" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 310692570
- CONSUMER = "NIFI-TEST-GROUP-1" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 231168806
- CONSUMER = "bme680-nifi-client" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 291113014
- CONSUMER = "JUNIT_GROUP_TEST" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 250840173
- CONSUMER = "__smm-app" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 9223372036854775807
- CONSUMER = "nifi-kafka-group" had following attribute values * MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = 291111173
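The policy fires when a consumer group has been inactive for at least 1200 ms, so every group in this message qualifies, and the raw millisecond values are hard to read at a glance. This small Python sketch extracts each consumer and converts its idle time to days. The regular expression assumes the exact wording of this message. Treating 9223372036854775807 (Java's Long.MAX_VALUE) as "no activity recorded" is an assumption, so the sketch reports it as None.

```python
import re

THRESHOLD_MS = 1200        # from the alert policy above
LONG_MAX = 2**63 - 1       # 9223372036854775807, Java's Long.MAX_VALUE
MS_PER_DAY = 86_400_000

PATTERN = re.compile(r'CONSUMER = "([^"]+)" had following attribute values '
                     r'\* MILLISECONDS_LAPSED_SINCE_CONSUMER_WAS_ACTIVE = (\d+)')


def idle_consumers(message, threshold_ms=THRESHOLD_MS):
    """Map each consumer over the threshold to its idle time in days.

    Long.MAX_VALUE is reported as None (assumed to mean no recorded activity).
    """
    result = {}
    for name, ms in PATTERN.findall(message):
        ms = int(ms)
        if ms >= threshold_ms:
            result[name] = None if ms == LONG_MAX else round(ms / MS_PER_DAY, 2)
    return result
```

For example, the tensorflow-nifi-aws-client value of 308208428 ms works out to about 3.57 days.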

Apache Atlas for Monitoring Edge2AI IoT Flows
