Monitoring Cloudera Edge Flow Manager and Cloudera Streams Messaging Manager with Apache NiFi


Monitoring Cloudera Edge Flow Manager and Cloudera Streams Messaging Manager with Apache NiFi


http://SERVER:10080/efm/actuator/health

{"status":{"code":"UP","description":""},"details":{"db":{"status":{"code":"UP","description":""},"details":{"database":"MySQL","hello":1}},"diskSpace":{"status":{"code":"UP","description":""},"details":{"total":1073729220608,"free":1023731712000,"threshold":10485760}}}}




http://SERVER:10080/efm/actuator/heapdump


http://SERVER:10080/efm/actuator/env


http://SERVER:10080/efm/actuator/httptrace 


Check REST API Made Available During EFM Startup

2019-08-21 22:30:25.045  INFO 100056 --- [           main] o.e.jetty.server.AbstractConnector       : Started ServerConnector@747d1932{HTTP/1.1,[http/1.1]}{cloudera:10080}
2019-08-21 22:30:25.047  INFO 100056 --- [           main] o.s.b.web.embedded.jetty.JettyWebServer  : Jetty started on port(s) 10080 (http/1.1) with context path '/efm'
2019-08-21 22:30:25.050  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : Started C2Application in 10.102 seconds (JVM running for 10.741)
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : The Edge Flow Manager has started. Services available at the following URLs:
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Access User Interface: http://cloudera:10080/efm/ui
2019-08-21 22:30:25.056  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Base URL for REST API: http://cloudera:10080/efm/api
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Swagger REST API docs: http://cloudera:10080/efm/swagger
2019-08-21 22:30:25.057  INFO 100056 --- [           main] com.cloudera.cem.efm.C2Application       : >>> Status and management: http:/cloudera:10080/efm/actuator

Agent Classes
http://SERVER:10080/efm/api/agent-classes

[
  {
    "name": "iot-1",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "nanojetsonjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "raspianjava",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi3javamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpi4java",
    "agentManifests": [
      "agent-manifest-id"
    ]
  },
  {
    "name": "rpijavamovidiussensehat",
    "agentManifests": [
      "agent-manifest-id"
    ]
  }
]

EFM Agent Manifests
http://server:10080/efm/api/agent-manifests

EFM Agents
http://server:10080/efm/api/agents

EFM C2 Configuration - NiFi Registry
http://server:10080/efm/api/c2-configuration/nifi-registry

EFM Designer Flows
http://server:10080/efm/api/designer/flows

EFM API FLOWS
http://server:10080/efm/api/flows


Monitoring SMM Metrics with NiFi

Aggregated Topics for Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/topics?duration=LAST_ONE_HOUR&state=all

Aggregated Brokers for the Last Hour

http://server:9991/api/v1/admin/metrics/aggregated/brokers?duration=LAST_ONE_HOUR





EFM Events
http://server:10080/efm/api/events
EFM Event by Event ID
http://server:10080/efm/api/events/9db708ca-3b7e-42bf-941a-a945fefa6fa6
Get a Heartbeat from a Device by HBDI
http://server:10080/efm/api/heartbeats/HBID
For Auto-configuring your processor, list of fields available

EFM Events / Fields
http://server:10080/efm/api/events/fields
What Flows available
http://server:10080/efm/api/designer/flows
http://server:10080/efm/api/designer/flows/summaries
Get One Flow

GET /designer/flows/{flowId}

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044
Get That Flows Events

GET /designer/flows/{flowId}/events

http://server:10080/efm/api/designer/flows/46cac951-217d-41f7-9442-086e9199c044/events
Get All Flows and Buckets
http://server:10080/efm/api/flows
Agent Classes
http://server:10080/efm/api/agent-classes
Agents
http://server:10080/efm/api/agents
Agent Manifests
http://server:10080/efm/api/agent-manifests
What NiFi Registry
http://server:10080/efm/api/c2-configuration/nifi-registry
What EFM Server
http://server:10080/efm/api/c2-configuration
SMM API
EFM Flow Designer
http://server:10080/efm/ui/#/flow-designer/flow/4ae72206-372d-4f3e-916a-d7c1faf09811
For a Great Real World Usage Example
https://github.com/asdaraujo/edge2ai-workshop#lab_1
http://hostname:10080/efm/api/agent-classes
http://hostname:10080/efm/api/agent-manifests?class=
http://hostname:10080/efm/swagger/


Resizing AWS ESB


   11  lsblk
   12  df -H
   13  lsblk
   14  sudo growpart /dev/xvda 0
   15  sudo resize2fs /dev/xvda1
   16  lsblk
   17   sudo growpart /dev/xvda 2
   18  lsblk
   19  df -H
   24  lsblk
   25  xfs_growfs /dev/xvda2 


https://hackernoon.com/tutorial-how-to-extend-aws-ebs-volumes-with-no-downtime-ec7d9e82426e

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/recognize-expanded-volume-linux.html


growpart /dev/xvda 2
CHANGED: partition=2 start=4096 old: size=20967391 end=20971487 new: size=2097147870 end=2097151966

lsblk
NAME    MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
xvda    202:0    0 1000G  0 disk 
├─xvda1 202:1    0    1M  0 part 
└─xvda2 202:2    0 1000G  0 part /

df -H
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         34G     0   34G   0% /dev
tmpfs            34G     0   34G   0% /dev/shm
tmpfs            34G   27M   34G   1% /run
tmpfs            34G     0   34G   0% /sys/fs/cgroup
/dev/xvda2       11G  2.3G  8.5G  22% /
tmpfs           6.8G     0  6.8G   0% /run/user/1000
tmpfs           6.8G     0  6.8G   0% /run/user/0

xfs_growfs /dev/xvda2 
meta-data=/dev/xvda2             isize=512    agcount=7, agsize=393216 blks
         =                       sectsz=512   attr=2, projid32bit=1
         =                       crc=1        finobt=0 spinodes=0
data     =                       bsize=4096   blocks=2620923, imaxpct=25
         =                       sunit=0      swidth=0 blks
naming   =version 2              bsize=4096   ascii-ci=0 ftype=1
log      =internal               bsize=4096   blocks=2560, version=2
         =                       sectsz=512   sunit=0 blks, lazy-count=1
realtime =none                   extsz=4096   blocks=0, rtextents=0
data blocks changed from 2620923 to 262143483
[root@ip-10-0-1-136 scripts]# lsblk
NAME    MAJ:MIN RM  SIZE RO TYPE MOUNTPOINT
xvda    202:0    0 1000G  0 disk 
├─xvda1 202:1    0    1M  0 part 
└─xvda2 202:2    0 1000G  0 part /

df -H
Filesystem      Size  Used Avail Use% Mounted on
devtmpfs         34G     0   34G   0% /dev
tmpfs            34G     0   34G   0% /dev/shm
tmpfs            34G   27M   34G   1% /run
tmpfs            34G     0   34G   0% /sys/fs/cgroup
/dev/xvda2      1.1T  2.4G  1.1T   1% /
tmpfs           6.8G     0  6.8G   0% /run/user/1000
tmpfs           6.8G     0  6.8G   0% /run/user/0

Powering Edge AI for Sensor Reading with RPI and Breakout Garden (EFM, NiFi, MiNiFi Agents)


Powering Edge AI for Sensor Reading with RPI and Breakout Garden (EFM, NiFi, MiNiFi Agents)







Hardware Component List:

  • Raspberry Pi 3B+
  • BMP-280 Temperature, Pressure and Altitude
  • ST7735 0.96 SPI Colour LCD 160x80
  • MAX-30105 Oximeter and Smoke Sensor
  • Sony Playstation 3 EYE USB Web Camera

Software Component List:

  • Raspian
  • Python 3.5
  • JDK 8 Java (Soon Upgrading to JDK 11)
  • Apache NiFi 1.9.2
  • MiniFi Java Agent 0.6.0
  • Cloudera Edge Flow Manager
  • Apache Kafka 2.2

Summary

96410-breakoutgardenarchitecture.jpg

Potential Use Cases:   Tracking Environment in a facility that includes webcam detection, temperature, pressure and smoke.


Our Raspberry Pi 3B+ has a Breakout Garden Hat with 2 sensors and one small display. The display is showing the capture image and is constantly updating. 
We currently run via nohup, but when we go into constant use I will switch to a Linux Service to run on startup.
The Python script initializes the connections to all of the sensors and then goes into an infinite loop of reading those values and building a JSON packet that we send via MQTT over port 1883 to a MQTT Mosquitto broker. MiniFi 0.6.0 Java Agent is using ConsumeMQTT on that port to capture these messages and filter them based on alarm values. If outside of the checked parameters we send them via S2S/HTTP(s) to an Apache NiFi server.
We also have a USB WebCam (Sony Playstation 3 EYE) that is capturing images and we read those with MiniFi and send them to NiFi as well.  We will incorporate TensorFlow lite models into our analysis.
The first thing we need to do is pretty easy. We need to plug in our Pimoroni Breakout Garden Hat and our 3 plugs.
You have to do the standard installation of Python 3, Java 8, MiniFi and I recommend OpenCV. Make sure you have everything plugged in securely and the correct direction before you power on the Raspberry Pi.
Install Python PIP 
Install Breakout Garden Library 
unzip master.zip
cd breakout-garden-master
sudo ./install.sh

NiFi Flow


We Can Query IoT Events As They Stream In



Add ExecuteProcess to Run Our Shell/Python




IoT JSON Data



IoT User



Cloudera Edge Management - Monitoring IoT Events From MiNiFi Agents on Devices


Building an IoT Flow Graphically Is Easy!   It follows the Hadoop Philosophy to stitch these things together.  



Configure Connection to MQTT Broker



MQTT Configuration 2




Cloudera Edge Flow Manager REST API





 Let's Examine those MQTT Messages from Devices









As you see we follow the Hadoop Philosophy of keeping things open, extensible, modular, flexible, transparent, composable, using open data standards, open source, diverse and cloud friendly.  In this way we can always bend to the needs of the user and adapt to any environment, any data, any cloud at any time.   If we need to windowing we could easily add Storm or Flink.   For other streaming use cases we can connect our Kafka topics to Spark Structured Streaming or Kafka Streams for additional processing as needed.    We can public our public schemas from our schema registry as open data standards within our enterprise, our ecosystem or world wide.   Data is meant for sharing and utilizing to build knowledge.   Let's make it happen, from any Edge to any data store to any data cloud to any AI / ML / DS / DL model.

Source:
https://github.com/tspannhw/breakoutgardenhat-spi-minifi

Resources:

Breakout Garden Hat
https://github.com/pimoroni/breakout-garden/tree/master/examples/heartbeat
https://shop.pimoroni.com/products/0-96-spi-colour-lcd-160x80-breakout
https://datasheets.maximintegrated.com/en/ds/MAX30105.pdf
https://shop.pimoroni.com/products/max30105-breakout-heart-rate-oximeter-smoke-sensor
https://github.com/pimoroni/max30105-python
https://github.com/tspannhw/minifi-breakoutgarden
https://shop.pimoroni.com/products/0-96-spi-colour-lcd-160x80-breakout
curl https://get.pimoroni.com/st7735 | bash
https://github.com/pimoroni/st7735-python

Using a Different Configuration of Breakout Garden Sensors
https://community.cloudera.com/t5/Community-Articles/IoT-Series-Sensors-Utilizing-Breakout-Garden-Hat-Part-1/ta-p/249262


TensorFlow
https://github.com/tensorflow/examples/blob/master/lite/examples/image_classification/raspberry_pi/README.md
https://www.tensorflow.org/lite/guide/python
https://github.com/PINTO0309/Bazel_bin
https://github.com/PINTO0309/Tensorflow-bin

sudo apt-get install libatlas-base-dev 

wget https://dl.google.com/coral/python/tflite_runtime-1.14.0-cp35-cp35m-linux_armv7l.whl
pip3 install tflite_runtime-1.14.0-cp35-cp35m-linux_armv7l.whl

OpenSSL / SSL Hosting in NiFi



1. Host a Web Page (index.html) via HTTP GET with 200 OK Status
2. Receive POST from that page via AJAX with browser data
3. Extract Content and Attributes
4. Build a JSON file of HTTP data
5. Store it

To accept location in a phone or modern browser you must be running SSL.
So I added that for this HTTP Request.

Use openssl to create your 2048 RSA X509, PKCS12, JKS Keystore, Import Trust Store and import in browser




Your web page can be any web page, just POST back via AJAX or Form Submit.


<html>
<head>
<title>NiFi Browser Data Acquisition</title>
<body>
<script>
// Usage
window.onload = function() {
      
navigator.getBattery().then(function(battery) {
  console.log(battery.level);
  battery.addEventListener('levelchange', function() {
    console.log(this.level);
  });
});




};


////////////// print these


  var latitude = "";
  var longitude = "";
  var ips = "";
  var batteryInfo = "";
  var screenInfo = screen.width +","+ screen.height + "," +
                   screen.availWidth +","+ screen.availHeight + "," +
         screen.colorDepth + "," + screen.pixelDepth;
  var pluginsInfo = "";
  var coresInfo = "";
  
/////////////


////// Set Plugins
 for (var i = 0; i < 12; i++) {
   if ( typeof window.navigator.plugins[i] !== 'undefined' ) { 
         pluginsInfo += window.navigator.plugins[i].name + ', ';                 
        }
 }
 


////// Set Cores
if ( window.navigator.hardwareConcurrency > 0  ) {
 coresInfo = window.navigator.hardwareConcurrency + " cores";
}




/////////////
/// send the information to the server
function loadDoc() {
  var xhttp = new XMLHttpRequest();
  xhttp.onreadystatechange = function() {
    if (this.readyState == 4 && this.status == 200) {
      document.getElementById("demo").innerHTML = 'Sent.';
    }
  };
  // /send
  xhttp.open("POST", "/send", true);
  xhttp.setRequestHeader("Content-type", "application/json");
  xhttp.send('{"plugins":"' + pluginsInfo +
               '", "screen":"' + screenInfo + 
               '", "cores":"' + coresInfo + 
               '", "battery":"' + batteryInfo + 
               '", "ip":"' + ips + 
               '", "lat":"' + latitude + '", "lng":"' + longitude + '"}')
}


////////////
function geoFindMe() {
  var output = document.getElementById("out");


  if (!navigator.geolocation){
    output.innerHTML = "<p>Geolocation is not supported by your browser</p>";
    return;
  }


  function success(position) {
     latitude  = position.coords.latitude;
     longitude = position.coords.longitude;


    output.innerHTML = '<p>Latitude is ' + latitude + '° <br>Longitude is ' + longitude + '°</p>';


    var img = new Image();
    img.src="https://maps.googleapis.com/maps/api/staticmap?center=" + latitude + "," + longitude + "&zoom=13&size=300x300&sensor=false";


    output.appendChild(img);
  }


  function error() {
    output.innerHTML = "Unable to retrieve your location";
  }


  output.innerHTML = "<p>Locating…</p>";


  navigator.geolocation.getCurrentPosition(success, error);
}






//get the IP addresses associated with an account
function getIPs(callback){
    var ip_dups = {};


    //compatibility for firefox and chrome
    var RTCPeerConnection = window.RTCPeerConnection
        || window.mozRTCPeerConnection
        || window.webkitRTCPeerConnection;
    var useWebKit = !!window.webkitRTCPeerConnection;


    //bypass naive webrtc blocking using an iframe
    if(!RTCPeerConnection){
        //NOTE: you need to have an iframe in the page right above the script tag
        //
        //<iframe id="iframe" sandbox="allow-same-origin" style="display: none"></iframe>
        //<script>...getIPs called in here...
        //
        var win = iframe.contentWindow;
        RTCPeerConnection = win.RTCPeerConnection
            || win.mozRTCPeerConnection
            || win.webkitRTCPeerConnection;
        useWebKit = !!win.webkitRTCPeerConnection;
    }


    //minimal requirements for data connection
    var mediaConstraints = {
        optional: [{RtpDataChannels: true}]
    };


    var servers = {iceServers: [{urls: "stun:stun.services.mozilla.com"}]};


    //construct a new RTCPeerConnection
    var pc = new RTCPeerConnection(servers, mediaConstraints);


    function handleCandidate(candidate){
        //match just the IP address
        var ip_regex = /([0-9]{1,3}(\.[0-9]{1,3}){3}|[a-f0-9]{1,4}(:[a-f0-9]{1,4}){7})/
        var ip_addr = ip_regex.exec(candidate)[1];


        //remove duplicates
        if(ip_dups[ip_addr] === undefined)
            callback(ip_addr);


        ip_dups[ip_addr] = true;
    }


    //listen for candidate events
    pc.onicecandidate = function(ice){


        //skip non-candidate events
        if(ice.candidate)
            handleCandidate(ice.candidate.candidate);
    };


    //create a bogus data channel
    pc.createDataChannel("");


    //create an offer sdp
    pc.createOffer(function(result){


        //trigger the stun server request
        pc.setLocalDescription(result, function(){}, function(){});


    }, function(){});


    //wait for a while to let everything done
    setTimeout(function(){
        //read candidate info from local description
        var lines = pc.localDescription.sdp.split('\n');


        lines.forEach(function(line){
            if(line.indexOf('a=candidate:') === 0)
                handleCandidate(line);
        });
    }, 1000);
}




window.addEventListener("load", function (ev) {
    "use strict";
    var log = document.getElementById("log");
    // https://dvcs.w3.org/hg/dap/raw-file/tip/sensor-api/Overview.html
    window.addEventListener("devicetemperature", function (ev) {
        log.textContent += "devicetemperature " + ev.value + "\n";
    }, false);
    window.addEventListener("devicepressure", function (ev) {
        log.textContent += "devicepressure " + ev.value + "\n";
    }, false);
    window.addEventListener("devicelight", function (ev) {
        log.textContent += "devicelight " + ev.value + "\n";
        // toy tric
        log.style.color = "rgb(" + (255 - 2*ev.value) + ",0,0)";
        log.style.backgroundColor = "rgb(0,0," + (2*ev.value) + ")";
    }, false);
    window.addEventListener("deviceproximity", function (ev) {
        log.textContent += "deviceproximity " + ev.value + "\n";
        // toy tric
        if (ev.value < 3) navigator.vibrate([300, 100, 100]);
    }, false);
    window.addEventListener("devicenoise", function (ev) {
        log.textContent += "devicenoise " + ev.value + "\n";
    }, false);
    window.addEventListener("devicehumidity", function (ev) {
        log.textContent += "devicehumidity " + ev.value + "\n";
    }, false);


    //https://wiki.mozilla.org/Magnetic_Field_Events
    window.addEventListener("devicemagneticfield", function (ev) {
        log.textContent += "devicemagneticfield " + [ev.x, ev.y, ev.x]+ "\n";
    }, false);


    // https://dvcs.w3.org/hg/dap/raw-file/default/pressure/Overview.html
    window.addEventListener("atmpressure", function (ev) {
        log.textContent += "atmpressure " + ev.value + "\n";
    }, false);
    
    // https://dvcs.w3.org/hg/dap/raw-file/tip/humidity/Overview.html
    window.addEventListener("humidity", function (ev) {
        log.textContent += "humidity " + ev.value + "\n";
    }, false);
    
    // https://dvcs.w3.org/hg/dap/raw-file/tip/temperature/Overview.html
    window.addEventListener("temperature", function (ev) {
        log.textContent += "temperature " + [ev.f, ev.c, ev.k, ev.value] + "\n";
    }, false);
    
    // https://dvcs.w3.org/hg/dap/raw-file/tip/battery/Overview.html
    try {
        if (typeof navigator.getBattery === "function") {
            navigator.getBattery().then(function (battery) {
                log.textContent += "battery.level " + battery.level + "\n";
                log.textContent += "battery.charging " + battery.charging + "\n";
                
                batteryInfo = "battery.level=" + battery.level + "," + 
                               "battery.charging=" + battery.charging;
                
                log.textContent += "battery.chargeTime " + battery.chargeTime + "\n";
                log.textContent += "battery.dischargeTime " + battery.dischargeTime + "\n";
                battery.addEventListener("levelcharge", function (ev) {
                    log.textContent += "change battery.level " + battery.level + "\n";
                }, false);
            }).catch(function (err) {
                log.textContent += err.toString() + "\n";
            });
        } else {
            log.textContent += "";
        }
    } catch (ex) {
        log.textContent += ex.toString() + "\n";
    }
}, false);




</script>


<p>
<br>
DEMO:  Send Data to HDF /  Apache NiFi via HandleHTTPRequest
<br>


<p><button onclick="geoFindMe()">Show my location</button></p>


<div id="out"></div>


<div id="demo"></div>


<pre id="log"></pre>
 
<button type="button" onclick="loadDoc()">Send data to Apache NiFi SSL Server</button>


<iframe id="iframe" sandbox="allow-same-origin" style="display: none"></iframe>
<script>


getIPs(function(ip){ips = ip;});


</script>
</body>
</html>
index.html : A web page to grab user information.
mobile-ingest-v3.xml : Apache NiFi 1.1.x template.
Note: Different browsers, devices, phones, tables and versions will send different values. Users should get a location request pop-up.
JSON Result File
{
  "http.request.uri" : "/send",
  "http.context.identifier" : "a4f9ae25-5f49-463e-97eb-c8a6bf3be8a7",
  "http.remote.host" : "192.168.1.151",
  "http.headers.Host" : "192.168.1.151:9178",
  "http.local.name" : "192.168.1.151",
  "http.headers.DNT" : "1",
  "plugins" : "Widevine Content Decryption Module, Shockwave Flash, Chrome PDF Viewer, Native Client, Chrome PDF Viewer, ",
  "latitude" : "40.2681799",
  "http.headers.Accept" : "*/*",
  "battery" : "battery.level=1,battery.charging=true",
  "uuid" : "a2f299ae-6ef6-480d-a359-1362d25abe76",
  "http.request.url" : "https://192.168.1.151:9178/send",
  "http.server.name" : "192.168.1.151",
  "http.character.encoding" : "UTF-8",
  "path" : "./",
  "cores" : "8 cores",
  "http.remote.addr" : "192.168.1.151",
  "http.headers.User-Agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36",
  "http.method" : "POST",
  "http.headers.Connection" : "keep-alive",
  "longitude" : "-74.5291745",
  "http.server.port" : "9178",
  "ip" : "192.168.1.151",
  "mime.type" : "application/json",
  "http.locale" : "en_US",
  "http.headers.Accept-Encoding" : "gzip, deflate, br",
  "http.headers.Origin" : "https://192.168.1.151:9178",
  "http.servlet.path" : "",
  "http.local.addr" : "192.168.1.151",
  "filename" : "1082639525534467",
  "http.headers.Referer" : "https://192.168.1.151:9178/",
  "http.headers.Accept-Language" : "en-US,en;q=0.8",
  "http.headers.Content-Length" : "253",
  "http.headers.Content-Type" : "application/json",
  "RouteOnAttribute.Route" : "isjsonpost"
}
References:





Resources

https://www.freecodecamp.org/news/openssl-command-cheatsheet-b441be1e8c4a/
https://www.ibm.com/support/knowledgecenter/en/SSMNED_5.0.0/com.ibm.apic.cmc.doc/task_apionprem_gernerate_self_signed_openSSL.html
https://blogs.oracle.com/blogbypuneeth/steps-to-create-a-self-signed-certificate-using-openssl
https://www.batchiq.com/nifi-configuring-ssl-auth.html
http://www.treselle.com/blog/apache-nifi-data-crawling-from-https-websites/
https://www.tomaszezula.com/2016/11/06/using-ssl-with-nifi/
https://community.cloudera.com/t5/Support-Questions/Nifi-WebSocket-Secure-wss/m-p/237209
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-ssl-context-service-nar/1.9.0/org.apache.nifi.ssl.StandardRestrictedSSLContextService/index.html



openssl req -newkey rsa:2048 -x509 -keyout cakey.pem -out cacert.pem -days 3650 
openssl pkcs12 -export -in cacert.pem -inkey cakey.pem -out identity.p12 -name "mykey" 
keytool -importkeystore -destkeystore identity.jks -deststorepass password -srckeystore identity.p12 -srcstoretype PKCS12 -srcstorepass password  
keytool -import -file cacert.pem -keystore trust.jks -storepass password
openssl x509 -in cacert.pem -noout -text
cat cakey.pem cacert.pem > server.pem  

http://www.gaudreault.ca/nifi-kerberize-ssl/




Backup and Restore NiFi Registry Templates

Backup and Restore NiFi Registry Templates



Example Backed Up NiFi Flow Excerpt

{
  "bucket" : {
    "createdTimestamp" : 1566430253392,
    "identifier" : "aaa50be8-b994-46b5-a0bb-be245c34b928",
    "link" : {
      "href" : "buckets/aaa50be8-b994-46b5-a0bb-be245c34b928",
      "params" : {
        "rel" : "self"
      }
    },
    "name" : "IoT",
    "permissions" : {
      "canDelete" : true,
      "canRead" : true,
      "canWrite" : true
    }
  },
  "flow" : {
    "bucketIdentifier" : "aaa50be8-b994-46b5-a0bb-be245c34b928",
    "bucketName" : "IoT",
    "createdTimestamp" : 1566430269976,
    "description" : "Created by MiNiFi C2 Flow Designer",
    "identifier" : "56902ff0-62ac-4189-9684-765247c885b5",
    "link" : {
      "href" : "buckets/aaa50be8-b994-46b5-a0bb-be245c34b928/flows/56902ff0-62ac-4189-9684-765247c885b5",
      "params" : {
        "rel" : "self"
      }
    },
    "modifiedTimestamp" : 1567693783102,
    "name" : "raspianjava",
    "type" : "Flow",
    "versionCount" : 10

Restoring Backed Up NiFi Flows


http://nifi.apache.org/docs/nifi-registry-docs/rest-api/index.html

To restore an Apache NiFi registry flow file.

Create a flow

POST
/buckets/{bucketId}/flows
Creates a flow


Using bucket id, upload a flow via POST and get a new flow id

Create a version of a flow

POST with that bucket id and flow id

/buckets/{bucketId}/flows/{flowId}/versions
Creates the next version of a flow

Scripted NiFi Registry Backups for DevOps Processing


You can also backup and restore NiFi Registry using the NiFi Python API or REST API directly.




Google Coral TPU with Edge Devices and MiNiFi

Google Coral TPU with Edge Devices and MiNiFi 


Designing Our Edge AI Flow with Cloudera Edge Flow Manager.


Configure Your Remote Process Group to Send Data to Your NiFi Cluster


Monitor Your Agents From the Events Screen


Let's grab all the new images and then delete on completion



We have Input and Output Ports to have Bidirectional communication with 0-n MiNiFi agents


Our NiFi flow to process calls from MiNiFi Agents running Coral TPUs


We run a query to check the TensorFlow Lite classification results and send out a slack message.



Let's push JSON data to a Kafka Cluster in AWS





Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi

Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi

Yes, we have bidirectional communication with MiNiFi agents from Apache NiFi via Site-to-Site (S2S) over HTTPS.   This means I can push in anything I want to the agent, including commands, files and updates.

I can also transmit data to edge agents via MQTT, REST and Kafka amongst other options.


NiFi Ready To Send and Receive Messages From Other NiFi Nodes, Clusters and MiNiFi Agents


Our NiFi flow is consuming Kafka and MQTT Messages, as well as reading updated model files and generating integration test sensor data.



MiNiFi Agents Have Downloaded The Model and Anything Else We Send to It



It's Easy to Configure MQTT Message Consumption in CEM, we just need the broker (with port) and a topic to filter on if you wish.




To Listen For Files/Models You can easily add a REST End Point to Proxy in Data of Your Choice with or without SSL


Here's an example CURL script to test that REST API:

curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://ec2-3-85-54-189.compute-1.amazonaws.com:8899/upload


We can generate JSON IoT Style Data for Integration Tests with ease using GenerateFlowFile:


Let's grab updated models when they change from my Data Science server:


I can read Kafka messages and send them to MiNiFi agents as well.




So I pushed a TFLITE model, but ONNX, PMML, Docker or Pickle are all options.


Generating JSON Data in Apache NiFi



Example of a JSON IoT Generator


{
   "ip": ${ip()},
   "unique_id": "${UUID()}",
   "thread": "${thread()}",
   "hostname": "${hostname()}",
   "sensor_9": ${random():mod(100)},
   "sensor_id": ${random():mod(30)},
   "sensor_3": ${random():mod(50)},
   "sensor_2": ${random():mod(500)},
   "sensor_1": ${random():mod(110)},
   "sensor_0": ${random():mod(150)},
   "sensor_7": ${random():mod(255)},
   "sensor_6": ${random():mod(95)},
   "sensor_5": ${random():mod(80)},
   "sensor_ts": ${now():toNumber()},
   "sensor_8": ${random():mod(120)},
   "sensor_4": ${random():mod(60)},
   "sensor_11": ${random():mod(20)},
   "sensor_10": ${random():mod(10)}
}

Resources