Apache NiFi Operations and Monitoring 101

NiFi Operations






https://community.hortonworks.com/articles/183217/devops-backing-up-apache-nifi-registry-flows.html

Real-Time Energy Monitoring With Apache NiFi

Monitoring Energy




A simple example of using Apache NiFi to receive and process electrical energy data stored as JSON. We add SQL-based alerting and storage to Parquet files with no code.



Cloudera Data Science Workbench



Using PySpark SQL to analyze the Parquet files built from the energy data stored to HDFS via Apache NiFi.
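A minimal PySpark sketch of that analysis from a CDSW session, assuming the Parquet files land under an HDFS path like /energy and carry a numeric power column plus a day column (the path and column names are illustrative):

from pyspark.sql import SparkSession

# Start (or reuse) a Spark session from the CDSW session
spark = SparkSession.builder.appName("energy-parquet-analysis").getOrCreate()

# Read the Parquet files that NiFi wrote to HDFS (path is an assumption)
energy = spark.read.parquet("/energy")
energy.createOrReplaceTempView("energy")

# Simple aggregation over the readings; column names are illustrative
spark.sql("""
    SELECT day, AVG(power) AS avg_power, MAX(power) AS peak_power
    FROM energy
    GROUP BY day
    ORDER BY day
""").show()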


 We can display alerts easily with Slack.



If you want to know how something works in the Apache NiFi UI, you can analyze the REST calls it makes; the requests and responses are JSON.
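For example, here is a minimal Python sketch that calls the same REST API the UI uses, assuming an unsecured NiFi instance on localhost:8080 (adjust the host, port, and authentication for your environment):

import json
import requests

# Base URL of the NiFi REST API (unsecured local instance assumed)
NIFI_API = "http://localhost:8080/nifi-api"

# System diagnostics: heap usage, repository usage, processor load, etc.
response = requests.get(NIFI_API + "/system-diagnostics", timeout=10)
response.raise_for_status()

# The interesting numbers live under systemDiagnostics/aggregateSnapshot
snapshot = response.json().get("systemDiagnostics", {}).get("aggregateSnapshot", {})
print(json.dumps(snapshot, indent=2))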






Python Libraries
pip3 install pyhs100
pip3 install psutil 
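As a sketch of how these libraries fit the energy flow above, the script below polls a TP-Link HS110 plug and adds host metrics, writing one JSON record that NiFi can pick up; the plug address, field names, and output path are illustrative assumptions:

import json
from datetime import datetime

import psutil
from pyHS100 import SmartPlug

# TP-Link HS110 energy meter on the local network (address is an assumption)
plug = SmartPlug("192.168.1.200")

# Current power/voltage/current readings from the plug's energy meter
reading = plug.get_emeter_realtime()

# Add host metrics and a timestamp so NiFi can route and alert on them
record = {
    "ts": datetime.now().isoformat(),
    "cpu_percent": psutil.cpu_percent(interval=1),
    "memory_percent": psutil.virtual_memory().percent,
    "energy": dict(reading),
}

# One JSON record per run; a NiFi GetFile/ListFile processor can ingest these
with open("/tmp/energy.json", "w") as out:
    json.dump(record, out)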

Github Repos
Hardware Meter

  • https://www.amazon.com/TP-Link-HS110-Monitoring-Required-Assistant/dp/B0178IC5ZY


Resources




Barcelona DataWorks Summit March 2019

I just returned from this awesome event. Not even a rough plane trip could dampen my spirits after seeing all the amazing things we got to do this year. It was nice to see familiar faces among the attendees from 2017 and 2018, including my friends from Prague and Germany!

Thanks to Andy LoPresto, George Vetticaden, Dinesh Chandrasekhar, Purnima, Nathan, and Dan Chaffelson for the great pictures, talks, and support, and for being an amazing Data in Motion team.





Meetup

The meetup was great and was held in the same hall as several other amazing meetups running at the same time. It was a great experience for those who arrived at the Summit early (and it was open to everyone for free).





Highlight: Dan spinning up NiFi at scale, live from the audience, on Google Cloud on Kubernetes with ease!

Highlight: Andy crushing it with his MiNiFi and NiFi presentation! I think he has too many Raspberry Pis!




Demopalooza








Edge to AI






Apache Deep Learning 201


All the Githubs

All the great Apache NiFi content from Andy LoPresto is here including custom processors:
https://github.com/alopresto/slides/blob/master/dws_barcelona_2019/
https://github.com/alopresto/slides

Exporting and Importing Data from MongoDB in the Cloud with Apache NiFi

We have data stored in MongoDB by a third-party application in Amazon.
Export from MongoDB to Parquet.
Moving data from a single-purpose data silo to your Enterprise Data Lake is a common use case. Using Apache NiFi we can easily pull your data from this remote silo and stream it into your analytics store for machine learning and deep analytics with Impala, Hive and Spark. It doesn't matter which cloud you are coming from or going to, or whether you are moving from cloud to on-premises or some hybrid of the two. Apache NiFi works in all of these situations, with full data lineage and provenance recording what it did and when.
I have created a mock dataset with Mockaroo. It's all about yummy South Jersey sandwiches.
Our easy MongoDB flows: one to ingest Mongo data into our Data Lake and another flow to load MongoDB.
In our test, we loaded all the data from our mock REST API into a MongoDB instance in the cloud. In the real world, an application populated that dataset, and now we need to bring it into our central data lake for analytics.
We use Jolt to replace the non-Hadoop-friendly built-in MongoDB _id field with a friendlier name, mongo_id.
Storing to Parquet on HDFS is Easy (Let's compress with Snappy)
Connecting to MongoDB is easy: set up a controller service and specify the database and collection.
Our MongoDB Connection Service: just enter your URI with username:password@server.
GetHTTP URL
https://my.api.mockaroo.com/hoagie.json

GetHTTP Filename
${filename:append('hoagie.'):append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}

JSON Path Expression
$.*

JOLT Chain
[{
"operation": "shift",
"spec": {
"_id": "mongo_id",
"*": "&"
}
}]
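For illustration with a hypothetical record from the mock hoagie dataset, this shift renames _id and passes every other field through unchanged:

Input:  {"_id": "5b9f3a2e", "name": "Italian Hoagie", "price": "7.50"}
Output: {"mongo_id": "5b9f3a2e", "name": "Italian Hoagie", "price": "7.50"}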

Mongo URI
mongodb://user:userpassword@server.cloud.com:13916/nifi
Many files stored in HDFS as Parquet

Ingesting Multiple IoT Devices with Apache NiFi 1.7 and MiNiFi 0.5 (GPS, Temperature, Humidity, DL Results, Images)

Part 1: Multiple Devices with Data
Keywords: Deep Learning On The Edge, GPS Ingestion, Sense-Hat and Rainbow Hat Sensor Ingest, WebCam Image Ingest
In preparation for my talk at Strata in NYC, I am updating my IoT demos for more devices, more data types and more actions.
I have three streams coming from each device including web camera images.
When we are sending data from a MiNiFi agent, we need to define an input port on an Apache NiFi server/cluster to receive it.
So I design my MiNiFi flow in the Apache NiFi UI (pretty soon there will be a dedicated designer for this). You then highlight everything, hit Create Template, export the template, and convert it to config.yml with the MiNiFi Toolkit. Again, this process will soon be automated and connected with the NiFi Registry so it takes fewer clicks.
This is an example. In the flow you design in the Apache NiFi UI, you connect to this port on the Remote Process Group. If you are manually editing a config.yml (okay, never do this, but sometimes I have to), you can copy the ID from the Port Details and paste it into the file.
Once MiNiFi has its config.yml and has been started, we will start getting messages on that port.
You can see I have two inputs, one for Movidius and one for Rainbow. I could just have one and route to what I want. It's up to you how you want to segment these flows.
Welcome to Apache NiFi Registry v0.2.0; this one works just as well. Very stable, but with some new magic: you can now connect to Git and GitHub!
We have structured JSON, so let's infer a schema, clean it up, and store it in the Hortonworks Schema Registry. That will make it versioned and REST-enabled. I add one for each of the two JSON file types I am sending from the rainbow device. You can see the schemas in full at the bottom of the article.
The data is received from MiniFi on my local NiFi edge server for simple event processing, filtering and analysis.
I route based on the two types of files, apply their schemas, do a simple filter via SQL, and send the converted Avro-formatted files to my cloud-hosted cluster.
Once I get the data I send it from my edge server to my cloud HDF 3.2 cluster. For images, I send them to my existing image storage processor group. For my other two types of files I convert them to Apache ORC and store in HDFS as Apache Hive tables.
Server Dashboard
Rainbow Processing
Routing is Easy
On High Humidity, Send a Slack Message (Query on humidity value)
We can dive into any flowfile as it travels through the system and examine its data and metadata.
Now that my data is saved in HDFS with Hive tables on top I can use the latest version of Apache Zeppelin to analyze the data.
I added some maps to Zeppelin via Helium, which is now available in HDP 3.0.
I found a bunch of new chart types; this one could be insightful.

So with the latest NiFi 1.7.1 and HDP 3.0 I can do a lot of interesting things. Next up, let's run some Dockerized TensorFlow applications in my HDP 3.0 cluster.
Python Scripts
Schemas

rainbow
{
  "type": "record",
  "name": "rainbow",
  "fields": [
    {"name": "tempf", "type": "double", "doc": "Type inferred from '84.15'"},
    {"name": "cputemp", "type": "double", "doc": "Type inferred from '53.0'"},
    {"name": "pressure", "type": "double", "doc": "Type inferred from '101028.56'"},
    {"name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'"},
    {"name": "uniqueid", "type": "string", "doc": "Type inferred from '\"rainbow_uuid_20180718234222\"'"},
    {"name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'"},
    {"name": "temp", "type": "double", "doc": "Type inferred from '38.58'"},
    {"name": "diskfree", "type": "string", "doc": "Type inferred from '\"4831.2 MB\"'"},
    {"name": "altitude", "type": "double", "doc": "Type inferred from '80.65'"},
    {"name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:42:22\"'"},
    {"name": "tempf2", "type": "double", "doc": "Type inferred from '28.97'"},
    {"name": "memory", "type": "double", "doc": "Type inferred from '32.3'"}
  ]
}
gps
{
  "type": "record",
  "name": "gps",
  "fields": [
    {"name": "speed", "type": "string", "doc": "Type inferred from '\"0.066\"'"},
    {"name": "diskfree", "type": "string", "doc": "Type inferred from '\"4830.3 MB\"'"},
    {"name": "altitude", "type": "string", "doc": "Type inferred from '\"43.0\"'"},
    {"name": "ts", "type": "string", "doc": "Type inferred from '\"2018-07-18 23:46:39\"'"},
    {"name": "cputemp", "type": "double", "doc": "Type inferred from '54.0'"},
    {"name": "latitude", "type": "string", "doc": "Type inferred from '\"40.2681555\"'"},
    {"name": "track", "type": "string", "doc": "Type inferred from '\"0.0\"'"},
    {"name": "memory", "type": "double", "doc": "Type inferred from '32.3'"},
    {"name": "host", "type": "string", "doc": "Type inferred from '\"rainbow\"'"},
    {"name": "uniqueid", "type": "string", "doc": "Type inferred from '\"gps_uuid_20180718234640\"'"},
    {"name": "ipaddress", "type": "string", "doc": "Type inferred from '\"192.168.1.165\"'"},
    {"name": "epd", "type": "string", "doc": "Type inferred from '\"nan\"'"},
    {"name": "utc", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"},
    {"name": "epx", "type": "string", "doc": "Type inferred from '\"40.135\"'"},
    {"name": "epy", "type": "string", "doc": "Type inferred from '\"42.783\"'"},
    {"name": "epv", "type": "string", "doc": "Type inferred from '\"171.35\"'"},
    {"name": "ept", "type": "string", "doc": "Type inferred from '\"0.005\"'"},
    {"name": "eps", "type": "string", "doc": "Type inferred from '\"85.57\"'"},
    {"name": "longitude", "type": "string", "doc": "Type inferred from '\"-74.529094\"'"},
    {"name": "mode", "type": "string", "doc": "Type inferred from '\"3\"'"},
    {"name": "time", "type": "string", "doc": "Type inferred from '\"2018-07-18T23:46:40.000Z\"'"},
    {"name": "climb", "type": "string", "doc": "Type inferred from '\"0.0\"'"},
    {"name": "epc", "type": "string", "doc": "Type inferred from '\"nan\"'"}
  ]
}
SQL
%sql
CREATE EXTERNAL TABLE IF NOT EXISTS movidiussense (label5 STRING, runtime STRING, label1 STRING, diskfree STRING, top1 STRING, starttime STRING, label2 STRING, label3 STRING, top3pct STRING, host STRING, top5pct STRING, humidity DOUBLE, currenttime STRING, roll DOUBLE, uuid STRING, label4 STRING, tempf DOUBLE, y DOUBLE, top4pct STRING, cputemp2 DOUBLE, top5 STRING, top2pct STRING, ipaddress STRING, cputemp INT, pitch DOUBLE, x DOUBLE, z DOUBLE, yaw DOUBLE, pressure DOUBLE, top3 STRING, temp DOUBLE, memory DOUBLE, top4 STRING, imagefilename STRING, top1pct STRING, top2 STRING) STORED AS ORC LOCATION '/movidiussense'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS minitensorflow2 (image STRING, ts STRING, host STRING, score STRING, human_string STRING, node_id INT) STORED AS ORC LOCATION '/minifitensorflow2'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS gps (speed STRING, diskfree STRING, altitude STRING, ts STRING, cputemp DOUBLE, latitude STRING, track STRING, memory DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, epd STRING, utc STRING, epx STRING, epy STRING, epv STRING, ept STRING, eps STRING, longitude STRING, mode STRING, time STRING, climb STRING, epc STRING) STORED AS ORC LOCATION '/gps'

%sql
CREATE EXTERNAL TABLE IF NOT EXISTS rainbow (tempf DOUBLE, cputemp DOUBLE, pressure DOUBLE, host STRING, uniqueid STRING, ipaddress STRING, temp DOUBLE, diskfree STRING, altitude DOUBLE, ts STRING, tempf2 DOUBLE, memory DOUBLE) STORED AS ORC LOCATION '/rainbow'

References