Showing posts with label agent. Show all posts

IoT Series: MiNiFi Agent on Raspberry Pi 4 with Enviro+ Hat For Environmental Monitoring and Analytics


Summary:  Our edge device streams environmental sensor readings while also performing edge analytics with deep learning libraries and an enhanced edge VPU.   We can perform complex running calculations on sensor data locally on the box before making potentially expensive network calls.  We can also decide when to send out data based on heuristics, machine learning or simple if-then logic.

Use Case:   Monitor Environment.   Act Local, Report Global.

Stack:   FLaNK

Category:   AI, IoT, Edge2AI, Real-Time Streaming, Sensors, Cameras, Telemetry.

Hardware:  Intel Movidius Neural Compute Stick 2 (NCS 2) VPU, Pimoroni Enviro Plus pHAT, Raspberry Pi 4 (4GB Edition).

Software:  Python 3 + Java + MiNiFi Agents + Cloudera Edge Flow Manager (EFM/CEM) + Apache NiFi.   Using Mm... FLaNK Stack.

Keywords:  Edge2AI, CV, AI, Deep Learning, Cloudera, NiFi, Raspberry Pi, Sensors, IoT, IIoT, Devices, Java, Agents, FLaNK Stack, VPU, Movidius.
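The "simple if-then logic" mentioned in the summary can be as small as a running window and a threshold. This is only a minimal sketch of the idea, not the code running on my device: the class name, the window size and the 2.0 threshold are all made up for illustration.

```python
from collections import deque
from statistics import mean


class EdgeFilter:
    """Keep a running window of readings and decide locally when to report."""

    def __init__(self, window_size=5, threshold=2.0):
        self.window = deque(maxlen=window_size)  # most recent readings only
        self.threshold = threshold               # hypothetical drift worth reporting

    def should_send(self, reading):
        """Return True when the reading deviates from the running mean."""
        if not self.window:
            # Always report the first reading so the server has a baseline.
            self.window.append(reading)
            return True
        baseline = mean(self.window)
        self.window.append(reading)
        return abs(reading - baseline) >= self.threshold


edge_filter = EdgeFilter(window_size=5, threshold=2.0)
readings = [33.5, 33.6, 33.4, 36.2, 33.5]  # made-up temperatures
decisions = [edge_filter.should_send(r) for r in readings]
print(decisions)
```

Only the first reading and the 36.2 spike would trigger a network call here; everything else stays on the box.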

Open Source Assets:

I am running a Python script that streams sensor data continuously to MQTT to be picked up by MiNiFi agents or NiFi.   For development I am just running my Python script with a shell script and nohup.
python3 /opt/demo/

nohup ./ &

Example Enviro Plus pHAT Sensor Data

{
  "uuid" : "rpi4_uuid_xki_20191220215721",
  "ipaddress" : "",
  "host" : "rp4",
  "host_name" : "rp4",
  "macaddress" : "dc:a6:32:03:a6:e9",
  "systemtime" : "12/20/2019 16:57:21",
  "cpu" : 0.0,
  "diskusage" : "46958.1 MB",
  "memory" : 6.3,
  "id" : "20191220215721_938f2137-5adb-4c22-867d-cdfbce6431a8",
  "temperature" : "33.590520852237226",
  "pressure" : "1032.0433707836428",
  "humidity" : "7.793797584651376",
  "lux" : "0.0",
  "proximity" : "0",
  "gas" : "Oxidising: 3747.82 Ohms\nReducing: 479652.17 Ohms\nNH3: 60888.05 Ohms"
}

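The "gas" field in the record above arrives as one multi-line string, which is awkward to query downstream. A small sketch of how it could be split into numeric values; the function name and the lowercase keys are my own choices here, not part of the sensor library.

```python
import re


def parse_gas(gas):
    """Split the multi-line gas string into {name: ohms} floats."""
    values = {}
    for line in gas.splitlines():
        match = re.match(r"\s*([A-Za-z0-9]+):\s*([0-9.]+)\s*Ohms", line)
        if match:
            values[match.group(1).lower()] = float(match.group(2))
    return values


sample = "Oxidising: 3747.82 Ohms\nReducing: 479652.17 Ohms\nNH3: 60888.05 Ohms"
gas = parse_gas(sample)
print(gas)
```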

We are also running a standard MiNiFi Java Agent 0.6 that runs a Python application to read sensors, do edge AI with Intel's OpenVINO and perform some other analytics.


DATE=$(date +"%Y-%m-%d_%H%M")
source /opt/intel/openvino/bin/
fswebcam -q -r 1280x720 --no-banner /opt/demo/images/$DATE.jpg
python3 -W ignore /opt/intel/openvino/build/ /opt/demo/images/$DATE.jpg 2>/dev/null

Example OpenVINO Data

{"host": "rp4", "cputemp": "67", "ipaddress": "", "endtime": "1577194586.66", "runtime": "0.00", "systemtime": "12/24/2019 08:36:26", "starttime": "12/24/2019 08:36:26", "diskfree": "46889.0", "memory": "15.1", "uuid": "20191224133626_55157415-1354-4137-8472-424779645fbe", "image_filename": "20191224133626_9317880e-ee87-485a-8627-c7088df734fc.jpg"}

In our flow I convert the data to Apache Avro; as you can see, the Avro schema is embedded.

The flow is very simple: consume MQTT messages from our broker on the topic our field sensors publish to.   We also ingest MiNiFi events through standard Apache NiFi HTTP(S) Site-to-Site (S2S).   We route images to our image processor and sensor data straight to Kudu tables.

Now that the data is stored to Apache Kudu we can do our analytics.

Easy to Run an MQTT Broker


Demo Info:

Run Mosquitto MQTT on Local Machine (RPI, Mac, Win10, ...)

On OSX, brew install mosquitto


To have launchd start mosquitto now and restart at login:
  brew services start mosquitto

Or, if you don't want/need a background service you can just run:
  mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf

For Python, we need pip3 install paho-mqtt.

Run Sensors on Device that pushes to MQTT

Python pushes continuous stream of sensor data to MQTT

MiNiFi Agent Reads Broker

Send to Kafka and/or NiFi
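The "Python pushes continuous stream of sensor data to MQTT" step above can be sketched roughly like this. It is not the actual script from the demo: the topic name "enviroplus", the localhost broker and the helper names are assumptions, and only a few of the fields from the example record are filled in. It uses the publish.single helper from paho-mqtt.

```python
import json
import socket
import time
import uuid


def build_record(temperature, pressure, humidity):
    """Assemble one reading; field names mirror the example sensor data."""
    now = time.time()
    stamp = time.strftime("%Y%m%d%H%M%S", time.gmtime(now))
    return {
        "uuid": "{}_{}".format(stamp, uuid.uuid4()),
        "host": socket.gethostname(),
        "systemtime": time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(now)),
        "temperature": str(temperature),
        "pressure": str(pressure),
        "humidity": str(humidity),
    }


def publish_record(record, topic="enviroplus", hostname="localhost", port=1883):
    """Send one JSON message to the broker. Topic and host are placeholders."""
    # Imported lazily so the rest of the sketch runs without paho-mqtt installed.
    import paho.mqtt.publish as publish
    publish.single(topic, payload=json.dumps(record), hostname=hostname, port=port)


record = build_record(33.59, 1032.04, 7.79)
print(json.dumps(record))
# publish_record(record)  # uncomment once a Mosquitto broker is running
```

Wrapping build_record and publish_record in a loop with a short sleep gives the continuous stream that MiNiFi or NiFi then picks up from the broker.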

Example Image Grabbed From Webcam in Dark Office (It's Christmas Eve!)

 When ready we can push to a CDP Data Warehouse in AWS.

With CDP, it's very easy to have a data environment in many clouds to store my live sensor data.

 I can now use this data in Kudu tables from Cloudera Data Science Workbench for real Data Science, machine learning and insights.

What do we do with all of this data?   Check in soon for real-time analytics and dashboarding.

Migrating Apache Flume Flows to Apache NiFi: JMS To X and X to JMS

Migrating Apache Flume Flows to Apache NiFi:  JMS To/From Anywhere

This is a simple use case: NiFi acts as a gateway between JMS and other sources and sinks, though it can do a lot more than that.   NiFi can be a JMS consumer or producer, all with no code.   We can work with topics and queues and any message types you have.   We can turn tabular messages (JSON, CSV, XML, Avro, Parquet, grokable text) into records and process them at speed with schema-aware queries, updates and merging.   Because NiFi knows your fields and types, it can validate them for you while querying the data in real time with Apache Calcite SQL as it moves to and from JMS topics and queues.   We can store your schemas in Cloudera Schema Registry and expose them over a REST API, so they are accessible from Spark, Flink, Kafka, NiFi and more.

It is extremely easy to do this in NiFi.
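For readers who have not used record processing before, here is a rough illustration outside NiFi of what "schema aware" validation plus a query means. Nothing here is NiFi code: the schema, the sample messages and the prices are made up, and in NiFi the same thing is configured on record readers and a QueryRecord-style SQL statement rather than written by hand.

```python
import json

# Hypothetical schema: field name -> expected Python type.
SCHEMA = {"symbol": str, "price": float, "volume": int}


def validate(record, schema=SCHEMA):
    """Return a list of problems; an empty list means the record is valid."""
    problems = []
    for field, expected in schema.items():
        if field not in record:
            problems.append("missing field: " + field)
        elif not isinstance(record[field], expected):
            problems.append("wrong type for " + field)
    return problems


# Made-up messages standing in for what arrives on a JMS queue.
messages = [
    '{"symbol": "BTC", "price": 7250.5, "volume": 12}',
    '{"symbol": "ETH", "price": "n/a", "volume": 3}',
    '{"symbol": "LTC", "price": 41.2, "volume": 40}',
]
records = [json.loads(m) for m in messages]
valid = [r for r in records if not validate(r)]

# Equivalent in spirit to: SELECT * FROM FLOWFILE WHERE price > 100
expensive = [r["symbol"] for r in valid if r["price"] > 100]
print(expensive)
```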

In our example we are using Apache ActiveMQ 5.15 as our JMS broker.   We are grabbing sample data from a few different REST sources and pushing to and from the broker.

Simple NiFi Flow For Pushing JMS Data to Kudu

We can monitor our JMS Activity in Apache ActiveMQ's Web Console

With Apache NiFi We Ingest All the REST Feeds

These feeds include Coinbase

NYC Demographics and Live Subway GTFS Data

Transit Land Feeds and Operators

World Trading Data

Quandl REST Data

It is easy to Consume JMS messages from Topics or Queues

Consuming messages is a snap. We just need to set our Connection Factory Service, Destination and Topic/Queue.

JMS Connection Factory settings: just a Java class, a JAR path and a broker URI.   Yes, we support SSL!
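To make those three settings concrete, here is what they might look like for ActiveMQ, written as a plain Python dictionary. The keys are descriptive labels rather than the exact NiFi property names, and the JAR path is hypothetical. The class name and port 61616 are ActiveMQ's standard connection factory and default OpenWire port.

```python
from urllib.parse import urlparse

# Descriptive labels, not the exact NiFi property names.
jms_connection_factory = {
    "implementation_class": "org.apache.activemq.ActiveMQConnectionFactory",
    "client_library_path": "/opt/activemq/lib",  # hypothetical JAR location
    "broker_uri": "tcp://localhost:61616",       # use ssl://host:port for SSL
}


def describe_broker(settings):
    """Pull host, port and transport security out of the broker URI."""
    uri = urlparse(settings["broker_uri"])
    return {"secure": uri.scheme == "ssl", "host": uri.hostname, "port": uri.port}


print(describe_broker(jms_connection_factory))
```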

For JMS Queues, pick QUEUE and your QUEUE Name

Example JMS MetaData Produced including Delivery Mode, Expiration and Message ID

 Consume From a QUEUE

Consume From A TOPIC

Let's Push Any and All REST Feed to JMS Topics and Queues


Using GrovePi with Raspberry Pi and MiNiFi Agents for Data Ingest to Parquet, Kudu, ORC, Kafka, Hive and Impala

Using GrovePi with Raspberry Pi and MiNiFi Agents for Data Ingest

Source Code:

Acquiring sensor data from Grove sensors is easy using a GrovePi Hat and some compatible sensors.
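A minimal sketch of such a read, assuming a Grove DHT temperature and humidity sensor on digital port 4 (the port and the sensor type 0 are assumptions about the wiring, and the helper names are mine). It uses the grovepi library's dht call and emits JSON like the rest of the flow expects.

```python
import json
import time


def read_dht(port=4, sensor_type=0):
    """Read (temperature, humidity) from a Grove DHT sensor.

    Port 4 and type 0 are assumptions; adjust them to your wiring.
    """
    # Imported lazily: grovepi is only available on a Pi with GrovePi installed.
    import grovepi
    temperature, humidity = grovepi.dht(port, sensor_type)
    return temperature, humidity


def to_json(temperature, humidity):
    """Format one reading as a JSON string with string-typed values."""
    return json.dumps({
        "temperature": str(temperature),
        "humidity": str(humidity),
        "systemtime": time.strftime("%m/%d/%Y %H:%M:%S"),
    })


# On the device: print(to_json(*read_dht()))
print(to_json(22.0, 41.0))  # made-up values so the sketch runs anywhere
```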

Just before my talk at the Future of Data Meetup @ Bell Works in Holmdel, NJ, I thought I should ingest some data from a grove sensor interface.

It's so easy a sleeping cat could do it.

So what does this device look like?  

I have a temperature and humidity sensor on there.

The distance sonic sensor is in there too; that's for the next article.

Let's do this with minimal RAM.

That's a 64GB hard drive underneath in the white case with the RPI.

I need more data and BACON.

We design our MiNiFi Agent Flow in CEM/EFM.   Grab JSON data stream and run sensors.

Apache NiFi 1.9.2 / CFM 1.0 Received HTTPS S2S Events From MiNiFi Agent

A simple flow to query and convert our JSON data, then store it to Kudu and HDFS (ORC) as well as push it to Kafka with a schema.

Let's read that Kafka message and store it to Parquet. We will push to MQTT and JMS in the next article.   This is our universal proxy/gateway.

We could infer a schema and not save it.   But saving the schema to the schema registry makes SMM, Kafka, NiFi and others schema aware, so it is easy to automagically query and convert between CSV/JSON/XML/Avro/Parquet and more.
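To show what inferring a schema amounts to, here is a small sketch that derives an Avro-style record schema from one JSON reading. It is a simplification of what NiFi's readers do, not their implementation; the sample values are made up, and the name and namespace are taken from the Parquet message name shown at the end of this post.

```python
import json

# Map Python value types to Avro primitive type names.
AVRO_TYPES = {str: "string", bool: "boolean", int: "long", float: "double"}


def infer_schema(record, name="grove", namespace="org.apache.nifi"):
    """Build a nullable-field Avro record schema from one sample record."""
    fields = []
    for key, value in record.items():
        avro_type = AVRO_TYPES.get(type(value), "string")  # fall back to string
        fields.append({"name": key, "type": ["null", avro_type]})
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


sample = {"host": "rpi", "cpu": 0.0, "memory": 6.3, "temperature": "24.0"}
schema = infer_schema(sample)
print(json.dumps(schema, indent=2))
```

The printed JSON is the kind of document you would save to the schema registry so that every consumer agrees on the fields and types.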

Let's store the data in Parquet files on HDFS with an Impala table.   In Apache NiFi 1.10 there is a ParquetWriter

Before we push to Kafka, let's create a topic for it with Cloudera SMM

Let's build an Impala table for that Kudu data.

We can query our tables with ease as data is rapidly added.

Let's Examine the Parquet Files that NiFi Generated

 Let's query that parquet data with Impala in Hue

 Let's monitor that data in Kafka with Cloudera SMM

That was easy: from device to enterprise cloud data store(s) with enterprise messages, security, governance, lineage, data catalog, SDX, monitoring and more.   How easily can you ingest IoT data, query it mid-stream and store it in multiple data stores?   It took longer to write the article than to do the project and code.   All graphical, Single Sign On, multiple schemas/versions/data types/engines, multiple OSs, edge, cloud and laptop.   Easy.

Table DDL

(humidity STRING, uuid STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING, host STRING, `end` STRING, 
macaddress STRING, temperature STRING, diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING) 
LOCATION '/tmp/grovesensors'

CREATE TABLE grovesensors ( uuid STRING, `end` STRING, humidity STRING, systemtime STRING, runtime STRING, cpu DOUBLE, id STRING, te STRING,
host STRING,
macaddress STRING, temperature STRING, diskusage STRING, memory DOUBLE, ipaddress STRING, host_name STRING,
PRIMARY KEY (uuid, `end`) )
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1')

hdfs dfs -mkdir -p /tmp/grovesensors
hdfs dfs -mkdir -p /tmp/groveparquet

 diskusage STRING, 
  memory DOUBLE,  host_name STRING,
  systemtime STRING,
  macaddress STRING,
  temperature STRING,
  humidity STRING,
  cpu DOUBLE,
  uuid STRING,  ipaddress STRING,
  host STRING,
  `end` STRING,  te STRING,
  runtime STRING,
LOCATION '/tmp/groveparquet/'

Parquet Format

message org.apache.nifi.grove {
  optional binary diskusage (STRING);
  optional double memory;
  optional binary host_name (STRING);
  optional binary systemtime (STRING);
  optional binary macaddress (STRING);
  optional binary temperature (STRING);
  optional binary humidity (STRING);
  optional double cpu;
  optional binary uuid (STRING);
  optional binary ipaddress (STRING);
  optional binary host (STRING);
  optional binary end (STRING);
  optional binary te (STRING);
  optional binary runtime (STRING);
  optional binary id (STRING);
}
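The Parquet schema and the Impala column list line up one to one, so the column definitions can be generated instead of typed. A small sketch of that mapping follows. The helper name, the type map and the reserved-word list are my own and cover only what this schema needs; the three sample fields are copied from the schema above.

```python
import re

# Matches lines such as "optional binary host (STRING);" or "optional double cpu;"
FIELD = re.compile(r"optional\s+(\w+)\s+(\w+)(?:\s+\((\w+)\))?\s*;")

# Parquet physical type -> Impala column type (only a few common cases).
TYPE_MAP = {"double": "DOUBLE", "float": "FLOAT", "int32": "INT",
            "int64": "BIGINT", "boolean": "BOOLEAN"}

RESERVED = {"end"}  # column names that need backticks in Impala


def to_impala_columns(schema_text):
    """Turn a Parquet message schema into an Impala column list."""
    columns = []
    for physical, name, logical in FIELD.findall(schema_text):
        if physical == "binary" and logical == "STRING":
            sql_type = "STRING"
        else:
            sql_type = TYPE_MAP[physical]
        if name in RESERVED:
            name = "`{}`".format(name)
        columns.append("{} {}".format(name, sql_type))
    return ", ".join(columns)


sample = """
message org.apache.nifi.grove {
  optional binary diskusage (STRING);
  optional double memory;
  optional binary end (STRING);
}
"""
columns = to_impala_columns(sample)
print(columns)
```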
