Showing posts with label apache-nifi-1.11. Show all posts
Showing posts with label apache-nifi-1.11. Show all posts

Using NiFi CLI to Restore NiFi Flows From Backups

Using NiFi CLI to Restore NiFi Flows From Backups

Please note, Apache NiFi 1.11.4 is now available for download

https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.4

References:



#> registry list-buckets -u http://somesite.compute-1.amazonaws.com:18080

#   Name   Id                                     Description 
-   ----   ------------------------------------   ----------- 
1   IOT    45834964-d022-4f4c-891f-695898e1e5f0   (empty)     
2   IoT    250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9   (empty)     
3   dev    46b7bab7-400f-44ae-a0e6-7340ff19c96f   (empty)     
4   iot    c594d6bc-7413-4f6a-ba9a-50b8020eec37   (empty)     
5   prod   0bf59d2e-1dd5-4d24-8aa0-0614bf991dc9   (empty)     


#> registry create-flow -verbose -u http://somesite.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9 --flowName iotFlow


a5a4ac59-9aeb-416e-937f-e601ca8beba9


#> registry import-flow-version -verbose -u http://somesite.compute-1.amazonaws.com:18080 -f a5a4ac59-9aeb-416e-937f-e601ca8beba9 -i iot-1.json


#> registry list-flows  -u http://ec2-35-171-154-174.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9

#   Name      Id                                     Description 
-   -------   ------------------------------------   ----------- 
1   iotFlow   a5a4ac59-9aeb-416e-937f-e601ca8beba9   (empty)     




New and Improved: It's NiFi

Apache NiFi 1.11.3


http://nifi.apache.org/download.html

If you have downloaded anything after NiFi 1.10, please upgrade now.   This has some major improvements and some fixes.

Release note highlights can be found here:
https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.3


I am running this now in Anaheim, and it's no Mickey Mouse upgrade.   It's fast and nice.

Some of the more recent upgrades:

https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

For parameters and stateless, and ability to download a flow as JSON is worth the price of install.



See some more NiFi 1.11 features here:   https://www.datainmotion.dev/2020/02/edgeai-google-coral-with-coral.html


EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)

EdgeAI:   Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi


Building MiNiFi IoT Apps with the new Cloudera EFM 


It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.


Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!









NiFi Flow Receiving From MiNiFi Java Agent


In a cluster in my CDP-DC Cluster I consume Kafka messages sent from my remote NiFi gateway to publish alerts to Kafka and push records to Apache HBase and Apache Kudu.  We filter our data with Streaming SQL.


We can use SQL to route, create aggregates like averages, chose a subset of fields and limit data returned.   Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer against Record Data Types including CSV, XML, Avro, Parquet, JSON and Grokable text.   Read and write different formats and convert when your SQL is done.   Or just to SELECT * FROM FLOWFILE to get everything.  



We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.



We can search Atlas for Kafka Topics.



From coral Kafka topic to NiFi to Kudu.


Details on Coral Kafka Topic


Examining the Hive Metastore Data on the Coral Kudu Table


NiFi Flow Details in Atlas


Details on Alerts Topic
'


Statistics from Atlas





Example Web Camera Image



 Example JSON Record

[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]


Querying Kudu results in Hue


Pushing Alerts to Slack from NiFi





I am running on Apache NiFi 1.11.1 and wanted to point out a new feature.   Download flow:   Will download the highlighted flow/pgroup as JSON.




Looking at NiFi counters to monitor progress:

We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.



Shell (coralrun.sh)


#!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null


Kudu Table DDL

https://github.com/tspannhw/table-ddl


Python 3 (test.py)


import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import base64
import json
from time import gmtime, strftime
import math
import random, string
import time
import psutil
import uuid 
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
import os
import argparse
from edgetpu.classification.engine import ClassificationEngine

# Importing socket library 
import socket 

start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google Example Code
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()
    return(res.replace("temp=","").replace("'C\n",""))

# Get MAC address of a local interfaces
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    import psutil
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if i.family == psutil.AF_LINK:
                return i.address
# /opt/demo/examples-camera/all_models  
row = { }
try:
#i = 1
#while i == 1:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()
    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')

    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')

    # Run inference.
    img = Image.open(args.image)

    scores = {}
    kCount = 1

    # Iterate Inference Results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1    

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    cpuTemp=int(float(getCPUtemperature()))
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()
    row.update(scores)
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp,2)
    row['te'] = "{0:.2f}".format((end-start))
    row['starttime'] = starttf
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)    
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)
    msg = 'Temp: {0}'.format(row['temperature'])
    msg += 'IP: {0}'.format(row['ip'])
    update_display(enviro.display, msg)
#    i = 2
except:
    row['message'] = "Error"
print(json.dumps(row)) 

Source Code:



Sensors / Devices / Hardware:

  • Humdity-HDC2010 humidity sensor
  • Light-OPT3002 ambient light sensor
  • Barometric-BMP280 barometric pressure sensor
  • PS3 Eye Camera and Microphone USB
  • Raspberry Pi 3B+
  • Google Coral Environmental Sensor Board
  • Google Coral USB Accelerator TPU

References:



EdgeAI: Jetson Nano with MiNiFi C++ Agent

Build and Utilizing The Apache NiFi - MiNiFi C++ Agent For Jetson Nano

(EdgeAI:   Jetson Nano with MiNiFi C++ Agent)


source.hostname
jetsonnano

source.ipv4
192.168.1.217

GetUSBCamera

FPS: .5


Bootstrap and Build

/opt/demo/nifi-minifi-cpp-source/build

bootstrap.sh

Options:  Kafka, OpenCV, TensorFlow, USB Camera


****************************************
 Select MiNiFi C++ Features to toggle.
****************************************
A. Persistent Repositories .....Enabled
B. Lib Curl Features ...........Enabled
C. Lib Archive Features ........Enabled
D. Execute Script support ......Enabled
E. Expression Language support .Enabled
F. Kafka support ...............Enabled
G. PCAP support ................Disabled
H. USB Camera support ..........Enabled
I. GPS support .................Disabled
J. TensorFlow Support ..........Disabled
K. Bustache Support ............Disabled
L. MQTT Support ................Enabled
M. SQLite Support ..............Disabled
N. Python Support ..............Enabled
O. COAP Support ................Enabled
S. SFTP Support ................Enabled
V. AWS Support .................Disabled
T. OpenCV Support ..............Enabled
U. OPC-UA Support...............Enabled

****************************************

sudo apt-get install libcurl-dev libcurl4-openssl-dev -y
make


We can see when data arrives in NiFi from a MiNiFi Agent.



 We can publish to Kafka directly from our MiNiFi C++ agent.


If CEM/Edge Flow Manager is a mystery to you, check out the live Swagger REST Documentation.


With MiNiFi C++ I can add a USB Camera.




 In NiFi we can see the Host Information that MiNiFi attached.



Example Data



{"uuid": "nano_uuid_crr_20200218002610", "ipaddress": "192.168.1.217", "top1pct": 54.833984375, "top1": "cab, hack, taxi, taxicab", "cputemp": "45.5", "gputemp": "43.5", "gputempf": "110", "cputempf": "114", "runtime": "4", "host": "jetsonnano", "filename": "/opt/demo/images/image_esq_20200218002610.jpg", "imageinput": "/opt/demo/images/2020-02-17_1926.jpg", "host_name": "jetsonnano", "macaddress": "ec:08:6b:18:0d:7f", "end": "1581985574.6246474", "te": "4.158604383468628", "systemtime": "02/17/2020 19:26:14", "cpu": 51.8, "diskusage": "5479.7 MB", "memory": 71.4, "id": "20200218002610_8a12dd65-1038-41ac-b923-98fc907f5be0"}

Example Config.yml Section


  name: AppendHostInfo
  class: org.apache.nifi.minifi.processors.AppendHostInfo
  max concurrent tasks: 1
  scheduling strategy: TIMER_DRIVEN
  scheduling period: 1000 ms
  penalization period: 30000 ms
  yield period: 1000 ms
  run duration nanos: 0
  auto-terminated relationships list: []
  Properties:
    Hostname Attribute: source.hostname
    IP Attribute: source.ipv4
    Network Interface Name: wlan0

Example Output


[2020-02-11 19:35:09.116] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command /opt/demo/rundemo.sh 
[2020-02-11 19:35:11.275] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:13.742] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from ExecuteProcess - Python to relationship success
[2020-02-11 19:35:15.568] [org::apache::nifi::minifi::processors::ExecuteProcess] [info] Execute Command Complete /opt/demo/rundemo.sh status 0 pid 31004
[2020-02-11 19:35:15.569] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 899b5964-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:15.649] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site to Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b sent flow 1flow records, with total size 3581
[2020-02-11 19:35:15.785] [org::apache::nifi::minifi::sitetosite::HttpSiteToSiteClient] [info] Site to Site closed transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::sitetosite::SiteToSiteClient] [info] Site2Site transaction 4d0b460e-e4f6-4ca1-8c56-30d310a0712b peer finished transaction
[2020-02-11 19:35:15.841] [org::apache::nifi::minifi::io::HttpStream] [warning] Future status already cleared for http://ec2-35-171-154-174.compute-1.amazonaws.com:8080/nifi-api/data-transfer/input-ports/17979d5f-0170-1000-0000-000011f1cc00/transactions/4d0b460e-e4f6-4ca1-8c56-30d310a0712b/flow-files, continuing
[2020-02-11 19:35:16.236] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:16.263] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from TailFile to relationship success
[2020-02-11 19:35:16.264] [org::apache::nifi::minifi::processors::TailFile] [info] TailFile nano.log for 616 bytes
[2020-02-11 19:35:16.273] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from AppendHostInfo to relationship success
[2020-02-11 19:35:16.274] [org::apache::nifi::minifi::core::ProcessSession] [info] Transferring 8a05413a-4d2f-11ea-8b9a-6e260e221e3d from PublishKafka to relationship success
[2020-02-11 19:35:18.748] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers
[2020-02-11 19:35:21.260] [org::apache::nifi::minifi::c2::C2Agent] [info] Checking 0 triggers

Using Apache NiFi - MiNiFi C++ Agent Elsewhere

I am working on a Jetbot robot powered by NVidia Jetson Nano that will use the MiNiFi C++ agent.








References







Quick Tip: NiFi JSON Cleanup

From Vasilis Vagias:


evaluateJsonPath immediately after the InvokeHTTP and replace the flowfile content with the $.response then NiFi unescapes and removes the additional quotes auto magically.

This is helpful for occasions when CDSW returns JSON or other REST APIs which may double encode JSON files.