Using NiFi CLI to Restore NiFi Flows From Backups
Please note, Apache NiFi 1.11.4 is now available for download.
https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.4
References:
- https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.1
- https://pierrevillard.com/2018/04/09/automate-workflow-deployment-in-apache-nifi-with-the-nifi-registry/comment-page-1/
- https://nifi.apache.org/docs/nifi-docs/html/toolkit-guide.html#nifi_CLI
#> registry list-buckets -u http://somesite.compute-1.amazonaws.com:18080
# Name Id Description
- ---- ------------------------------------ -----------
1 IOT 45834964-d022-4f4c-891f-695898e1e5f0 (empty)
2 IoT 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9 (empty)
3 dev 46b7bab7-400f-44ae-a0e6-7340ff19c96f (empty)
4 iot c594d6bc-7413-4f6a-ba9a-50b8020eec37 (empty)
5 prod 0bf59d2e-1dd5-4d24-8aa0-0614bf991dc9 (empty)
#> registry create-flow -verbose -u http://somesite.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9 --flowName iotFlow
a5a4ac59-9aeb-416e-937f-e601ca8beba9
#> registry import-flow-version -verbose -u http://somesite.compute-1.amazonaws.com:18080 -f a5a4ac59-9aeb-416e-937f-e601ca8beba9 -i iot-1.json
#> registry list-flows -u http://ec2-35-171-154-174.compute-1.amazonaws.com:18080 -b 250a5ae5-ced8-4f4e-8b3b-01eb9d47a0d9
# Name Id Description
- ------- ------------------------------------ -----------
1 iotFlow a5a4ac59-9aeb-416e-937f-e601ca8beba9 (empty)
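For reference, this is also how a backup file like iot-1.json gets created in the first place: export a versioned flow from the source Registry to JSON. A minimal sketch using the CLI's export-flow-version command, assuming the flow id and version reported by list-flows on that Registry:
#> registry export-flow-version -u http://somesite.compute-1.amazonaws.com:18080 -f a5a4ac59-9aeb-416e-937f-e601ca8beba9 -fv 1 -o iot-1.json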
Fixing Linux Webcams
# List available video devices
v4l2-ctl --list-devices

# Show all controls and current values for the first webcam
v4l2-ctl -d /dev/video0 --list-ctrls

# White balance: disable the auto control (a boolean), then set a manual temperature
v4l2-ctl --get-ctrl=white_balance_temperature
v4l2-ctl -d /dev/video0 --set-ctrl=white_balance_temperature_auto=0
v4l2-ctl -d /dev/video0 --set-ctrl=white_balance_temperature=4000

# Exposure: 3 = aperture priority (auto), 1 = manual; exposure_absolute only applies in manual mode
v4l2-ctl --set-ctrl=exposure_auto=3
v4l2-ctl --set-ctrl=exposure_auto_priority=0
v4l2-ctl --set-ctrl=exposure_absolute=250

# Gain
v4l2-ctl --set-ctrl=gain=0

# Verify the new values
v4l2-ctl -d /dev/video0 --list-ctrls
This article is great: https://www.kurokesu.com/main/2016/01/16/manual-usb-camera-settings-in-linux/
v4l2-ctl -d /dev/video0 --list-ctrls
brightness 0x00980900 (int) : min=0 max=255 step=1 default=128 value=128
contrast 0x00980901 (int) : min=0 max=255 step=1 default=128 value=128
saturation 0x00980902 (int) : min=0 max=255 step=1 default=128 value=128
white_balance_temperature_auto 0x0098090c (bool) : default=1 value=0
gain 0x00980913 (int) : min=0 max=255 step=1 default=0 value=0
power_line_frequency 0x00980918 (menu) : min=0 max=2 default=2 value=2
white_balance_temperature 0x0098091a (int) : min=2000 max=6500 step=1 default=4000 value=4000
sharpness 0x0098091b (int) : min=0 max=255 step=1 default=128 value=128
backlight_compensation 0x0098091c (int) : min=0 max=1 step=1 default=0 value=0
exposure_auto 0x009a0901 (menu) : min=0 max=3 default=3 value=3
exposure_absolute 0x009a0902 (int) : min=3 max=2047 step=1 default=250 value=83 flags=inactive
exposure_auto_priority 0x009a0903 (bool) : default=0 value=0
pan_absolute 0x009a0908 (int) : min=-36000 max=36000 step=3600 default=0 value=0
tilt_absolute 0x009a0909 (int) : min=-36000 max=36000 step=3600 default=0 value=0
focus_absolute 0x009a090a (int) : min=0 max=250 step=5 default=0 value=0 flags=inactive
focus_auto 0x009a090c (bool) : default=1 value=1
zoom_absolute 0x009a090d (int) : min=100 max=500 step=1 default=100 value=100
v4l2-ctl --list-devices
HD Pro Webcam C920 (usb-70090000.xusb-2.2):
/dev/video0
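These controls reset whenever the camera is re-plugged or the machine reboots, so it helps to keep them in a small script you can rerun. A minimal sketch (a hypothetical fix-webcam.sh reusing the values above; exposure_auto=1 is manual mode on UVC cameras, which is what lets exposure_absolute actually take effect):

#!/bin/bash
# fix-webcam.sh - reapply manual settings to a USB webcam (hypothetical helper)
DEVICE="${1:-/dev/video0}"

# Manual white balance
v4l2-ctl -d "$DEVICE" --set-ctrl=white_balance_temperature_auto=0
v4l2-ctl -d "$DEVICE" --set-ctrl=white_balance_temperature=4000

# Manual exposure and fixed gain
v4l2-ctl -d "$DEVICE" --set-ctrl=exposure_auto=1
v4l2-ctl -d "$DEVICE" --set-ctrl=exposure_absolute=250
v4l2-ctl -d "$DEVICE" --set-ctrl=gain=0

# Show what actually got applied
v4l2-ctl -d "$DEVICE" --list-ctrls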
ODPI's OpenDS4All - Open Source Data Science Content To Teach the World
OpenDS4All
Start learning now:
ODPI has officially announced this recently and it looks great.
There are tons of amazing materials including slides, notes, documentation, homework, exercises and Jupyter notebooks covering Data Wrangling, Data Science, the basics and Apache Spark.
This "starter set" of training materials can help you build a Data Science program for yourself, your company, your university or your non-profit. I am going to bring some of these to my meetups and hopefully can help give back with new materials, updates and suggestions.
These are college-level materials developed by the University of Pennsylvania and open sourced via the ODPI with IBM leading. The code and slides look great. I can see these helping the world add another million desperately needed Data Scientists, Data Engineers and Data Science-enabled professionals.
I have been running some of this via Cloudera Machine Learning in my CDP cluster in AWS and it works great. This is really well made. I am hoping to create a module on Streaming Data Science to contribute.
New and Improved: It's NiFi
Apache NiFi 1.11.3
http://nifi.apache.org/download.html
If you have downloaded anything after NiFi 1.10 (1.11.0 through 1.11.2), please upgrade now. This release has some major improvements and fixes.
Release note highlights can be found here:
https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.11.3
I am running this now in Anaheim, and it's no Mickey Mouse upgrade. It's fast and nice.
Some of the more recent upgrades:
https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html
Parameters, stateless execution, and the ability to download a flow as JSON are worth the price of the install alone.
See some more NiFi 1.11 features here: https://www.datainmotion.dev/2020/02/edgeai-google-coral-with-coral.html
EdgeAI: Google Coral with Coral Environmental Sensors and TPU With NiFi and MiNiFi (Updated EFM)
Building MiNiFi IoT Apps with the new Cloudera EFM
It is very easy to build a drag and drop EdgeAI application with EFM and then push to all your MiNiFi agents.
Cloudera Edge Management CEM-1.1.1
Download the newest CEM today!
NiFi Flow Receiving From MiNiFi Java Agent
In my CDP-DC cluster, I consume the Kafka messages sent from my remote NiFi gateway, publish alerts to Kafka and push records to Apache HBase and Apache Kudu. We filter our data with Streaming SQL.
We can use SQL to route, create aggregates like averages, choose a subset of fields and limit the data returned. Using the power of Apache Calcite, Streaming SQL in NiFi is a game changer for record data types including CSV, XML, Avro, Parquet, JSON and Grok-able text. Read and write different formats and convert when your SQL is done. Or just SELECT * FROM FLOWFILE to get everything.
We can see this flow from Atlas as we trace the data lineage and provenance from Kafka topic.
We can search Atlas for Kafka Topics.
From coral Kafka topic to NiFi to Kudu.
Details on Coral Kafka Topic
Examining the Hive Metastore Data on the Coral Kudu Table
NiFi Flow Details in Atlas
Details on Alerts Topic
Statistics from Atlas
Example Web Camera Image
Example JSON Record
[{"cputemp":59,"id":"20200221190718_2632409e-f635-48e7-9f32-aa1333f3b8f9","temperature":"39.44","memory":91.1,"score_1":"0.29","starttime":"02/21/2020 14:07:13","label_1":"hair spray","tempf":"102.34","diskusage":"50373.5 MB","message":"Success","ambient_light":"329.92","host":"coralenv","cpu":34.1,"macaddress":"b8:27:eb:99:64:6b","pressure":"102.76","score_2":"0.14","ip":"127.0.1.1","te":"5.10","systemtime":"02/21/2020 14:07:18","label_2":"syringe","humidity":"10.21"}]
Querying Kudu results in Hue
Pushing Alerts to Slack from NiFi
I am running Apache NiFi 1.11.1 and wanted to point out a new feature, Download flow, which downloads the highlighted flow/process group as JSON.
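The same download is exposed through the REST API, so it can be scripted as part of a backup job. A minimal sketch, assuming an unsecured NiFi on localhost and substituting your own process group id for the placeholder:
curl -o flow.json "http://localhost:8080/nifi-api/process-groups/<process-group-id>/download"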
Looking at NiFi counters to monitor progress:
We can see how easy it is to ingest IoT sensor data and run AI algorithms on Coral TPUs.
Shell (coralrun.sh)
#!/bin/bash
DATE=$(date +"%Y-%m-%d_%H%M%S")
fswebcam -q -r 1280x720 /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/demo/test.py --image /opt/demo/images/$DATE.jpg 2>/dev/null
Kudu Table DDL
https://github.com/tspannhw/table-ddl
Python 3 (test.py)
import time
import sys
import subprocess
import os
import base64
import uuid
import datetime
import traceback
import json
import math
import random, string
import psutil
import argparse
import socket
from time import gmtime, strftime
from getmac import get_mac_address
from coral.enviro.board import EnviroBoard
from luma.core.render import canvas
from PIL import Image, ImageDraw, ImageFont
from edgetpu.classification.engine import ClassificationEngine

start = time.time()
starttf = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

# Read the label file into a dict of {class_id: label}
def ReadLabelFile(file_path):
    with open(file_path, 'r') as f:
        lines = f.readlines()
    ret = {}
    for line in lines:
        pair = line.strip().split(maxsplit=1)
        ret[int(pair[0])] = pair[1].strip()
    return ret

# Google example code: write a message to the Enviro board display
def update_display(display, msg):
    with canvas(display) as draw:
        draw.text((0, 0), msg, fill='white')

def getCPUtemperature():
    res = os.popen('vcgencmd measure_temp').readline()
    return(res.replace("temp=", "").replace("'C\n", ""))

# Get MAC address of a local interface
def psutil_iface(iface):
    # type: (str) -> Optional[str]
    nics = psutil.net_if_addrs()
    if iface in nics:
        nic = nics[iface]
        for i in nic:
            if i.family == psutil.AF_LINK:
                return i.address

# Models live in /opt/demo/examples-camera/all_models
row = {}
try:
    parser = argparse.ArgumentParser()
    parser.add_argument('--image', help='File path of the image to be recognized.', required=True)
    args = parser.parse_args()

    # Prepare labels.
    labels = ReadLabelFile('/opt/demo/examples-camera/all_models/imagenet_labels.txt')
    # Initialize engine.
    engine = ClassificationEngine('/opt/demo/examples-camera/all_models/inception_v4_299_quant_edgetpu.tflite')
    # Run inference.
    img = Image.open(args.image)
    scores = {}
    kCount = 1
    # Iterate inference results
    for result in engine.ClassifyWithImage(img, top_k=5):
        scores['label_' + str(kCount)] = labels[result[0]]
        scores['score_' + str(kCount)] = "{:.2f}".format(result[1])
        kCount = kCount + 1

    enviro = EnviroBoard()
    host_name = socket.gethostname()
    host_ip = socket.gethostbyname(host_name)
    cpuTemp = int(float(getCPUtemperature()))
    uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S", gmtime()), uuid.uuid4())
    usage = psutil.disk_usage("/")
    end = time.time()

    row.update(scores)
    row['host'] = os.uname()[1]
    row['ip'] = host_ip
    row['macaddress'] = psutil_iface('wlan0')
    row['cputemp'] = round(cpuTemp, 2)
    row['te'] = "{0:.2f}".format((end - start))
    row['starttime'] = starttf
    row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
    row['cpu'] = psutil.cpu_percent(interval=1)
    row['diskusage'] = "{:.1f} MB".format(float(usage.free) / 1024 / 1024)
    row['memory'] = psutil.virtual_memory().percent
    row['id'] = str(uuid2)
    row['message'] = "Success"
    row['temperature'] = '{0:.2f}'.format(enviro.temperature)
    row['humidity'] = '{0:.2f}'.format(enviro.humidity)
    row['tempf'] = '{0:.2f}'.format((enviro.temperature * 1.8) + 32)
    row['ambient_light'] = '{0}'.format(enviro.ambient_light)
    row['pressure'] = '{0:.2f}'.format(enviro.pressure)

    msg = 'Temp: {0}'.format(row['temperature'])
    msg += 'IP: {0}'.format(row['ip'])
    update_display(enviro.display, msg)
except:
    row['message'] = "Error"

print(json.dumps(row))
Source Code:
Sensors / Devices / Hardware:
- Humidity - HDC2010 humidity sensor
- Light - OPT3002 ambient light sensor
- Barometric - BMP280 barometric pressure sensor
- PS3 Eye Camera and Microphone USB
- Raspberry Pi 3B+
- Google Coral Environmental Sensor Board
- Google Coral USB Accelerator TPU
References:
- https://coral.ai/docs/enviro-board/get-started/
- https://coral.ai/products/accelerator/
- https://coral.ai/docs/enviro-board/datasheet/
- https://github.com/tspannhw/nifi-minifi-coral-env
- https://github.com/tspannhw/nifi-minifi-coral
- https://www.datainmotion.dev/2019/08/google-coral-tpu-with-edge-devices-and.html
- https://github.com/tspannhw/minifi-grove-sensors
- https://github.com/google/mediapipe/tree/master/mediapipe/examples/coral
- https://github.com/tensorflow/examples/tree/master/lite/examples/image_classification/raspberry_pi
- https://github.com/google-coral/examples-camera
- https://github.com/google-coral/project-keyword-spotter
Connecting Apache NiFi to Apache Atlas For Data Governance At Scale in Streaming
Once connected, you can see NiFi and Kafka data flowing into Atlas.
You must add the Atlas reporting task to your NiFi cluster.
Add a ReportLineageToAtlas under Controller Settings / Reporting Tasks.
You must set the URL for Atlas, the authentication method and, if basic, a username/password.
You also need to set the Atlas configuration directory, the NiFi URL to use and the Lineage Strategy (Complete Path).
Another example with an AWS hosted NiFi and Atlas:
IMPORTANT NOTE: Keep your Atlas Default Cluster Name consistent with other applications on your Cloudera clusters; usually the name cm is a good option or default.
You can now see the lineage state:
Configure Atlas to Be Enabled and Have Kafka
Have Atlas Service enabled in NiFi configuration
Example Configuration
You must have access to Atlas Application Properties.
/etc/atlas/conf
atlas-application.properties
#Generated by Apache NiFi ReportLineageToAtlas ReportingTask at 2020-02-21T17:18:28.493Z
#Fri Feb 21 17:18:28 UTC 2020
atlas.kafka.bootstrap.servers=princeton0.field.hortonworks.com\:9092
atlas.enableTLS=false
atlas.kafka.client.id=ReportLineageToAtlas.687a48e2-0170-1000-0000-00000a0de4ea
atlas.cluster.name=Princeton0
atlas.kafka.security.protocol=PLAINTEXT
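Atlas receives these lineage notifications over Kafka (by default the ATLAS_HOOK and ATLAS_ENTITIES topics), so it is worth confirming that the broker in atlas.kafka.bootstrap.servers is reachable from the NiFi host and that the topics exist. A minimal sketch with the stock Kafka CLI (the script name and path vary by distribution, and older Kafka releases take --zookeeper instead of --bootstrap-server):
kafka-topics.sh --bootstrap-server princeton0.field.hortonworks.com:9092 --list | grep ATLAS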
atlas-server.properties
princeton0.field.hortonworks.com:atlas.authentication.method.kerberos=false
princeton0.field.hortonworks.com:atlas.enableTLS=false
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.connection.timeout.ms=30000
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.session.timeout.ms=60000
princeton0.field.hortonworks.com:atlas.kafka.zookeeper.sync.time.ms=20
princeton0.field.hortonworks.com:atlas.server.bind.address=0.0.0.0
princeton0.field.hortonworks.com:atlas.server.http.port=31000
princeton0.field.hortonworks.com:atlas.server.https.port=31443
Running Atlas
See:
- https://atlas.apache.org/#/
- https://medium.com/@kandalkarbhushan/integrating-nifi-with-atlas-in-secured-environment-c1b0f3cb7318
- https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.1.1/bk_installing-hdf-and-hdp/content/nifi-atlas.html
- https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.4.1.1/installing-hdf-and-hdp-ppc/content/configure_nifi_for_atlas_integration.html
- https://docs.cloudera.com/runtime/7.0.2/cdp-governance-overview/topics/atlas-overview.html
- https://www.datainmotion.dev/2020/02/apache-atlas-for-monitoring-edge2ai-iot.html
- https://docs.cloudera.com/cloudera-manager/7.0.3/reference/topics/cm_props_cr703_atlas.html
Example SMM Notification Email
Root resource name: ANY,
Root resource type: CONSUMER,
Created timestamp: Tue Jan 07 21:13:45 UTC 2020 : 1578431625199,
Last updated timestamp: Mon Jan 13 13:09:38 UTC 2020 : 1578920978293,
State: RAISED,
Message:
Alert policy : "ALERT IF ( ANY CONSUMER MILLISECONDS_LAPSED_SINCE_