Showing posts with label nifi. Show all posts

Predicting Sensor Readings with Time Series Machine Learning







Sensors:

Sensor Unit (https://shop.pimoroni.com/products/enviro?variant=31155658457171)
  • BME280 temperature, pressure, humidity sensor
  • LTR-559 light and proximity sensor
  • MICS6814 analog gas sensor
  • ADS1015 ADC with spare channel for adding another analog sensor
  • MEMS microphone
  • 0.96-inch, 160 x 80 color LCD
Unit
  • Raspberry Pi 4
  • Intel Movidius 2
  • JDK 8
  • MiNiFi Java Agent 0.6.0
  • Python 3




Example Data

{"uuid": "rpi4_uuid_omi_20200417211935", "amplitude100": 0.3, "amplitude500": 0.1, "amplitude1000": 0.1, "lownoise": 0.1, "midnoise": 0.1, "highnoise": 0.1, "amps": 0.3, "ipaddress": "192.168.1.243", "host": "rp4", "host_name": "rp4", "macaddress": "dc:a6:32:03:a6:e9", "systemtime": "04/17/2020 17:19:36", "endtime": "1587158376.22", "runtime": "36.47", "starttime": "04/17/2020 17:18:58", "cpu": 0.0, "cpu_temp": "59.0", "diskusage": "46651.6 MB", "memory": 6.3, "id": "20200417211935_7b7ae5da-905b-418b-94f1-270a15dbc1df", "temperature": "38.7", "adjtemp": "29.7", "adjtempf": "65.5", "temperaturef": "81.7", "pressure": 1015.6, "humidity": 6.8, "lux": 1.2, "proximity": 0, "oxidising": 8.3, "reducing": 306.4, "nh3": 129.5, "gasKO": "Oxidising: 8300.63 Ohms\nReducing: 306352.94 Ohms\nNH3: 129542.17 Ohms"}
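Before a record like this can feed a time series model, its fields need consistent types: several numeric readings (for example temperature and cpu_temp) arrive as strings, and systemtime is a formatted date string. The following is a minimal sketch of that normalization step, assuming Python 3 and a shortened copy of the record above; the function name parse_reading and the choice of fields are illustrative and not part of the original flow.

```python
import json
from datetime import datetime

# Shortened copy of the example record above; only a few fields are kept.
RAW = (
    '{"uuid": "rpi4_uuid_omi_20200417211935", '
    '"systemtime": "04/17/2020 17:19:36", "cpu_temp": "59.0", '
    '"temperature": "38.7", "adjtemp": "29.7", '
    '"pressure": 1015.6, "humidity": 6.8, "lux": 1.2}'
)

# Fields that arrive as strings but hold numbers (taken from the example record).
NUMERIC_STRING_FIELDS = ("cpu_temp", "temperature", "adjtemp")


def parse_reading(raw: str) -> dict:
    """Parse one JSON sensor record and normalize its types (illustrative helper)."""
    record = json.loads(raw)
    for field in NUMERIC_STRING_FIELDS:
        if field in record:
            record[field] = float(record[field])
    # systemtime is formatted as MM/DD/YYYY HH:MM:SS in the example record.
    record["systemtime"] = datetime.strptime(
        record["systemtime"], "%m/%d/%Y %H:%M:%S"
    )
    return record


reading = parse_reading(RAW)
print(reading["systemtime"].isoformat(), reading["temperature"], reading["pressure"])
```

Records normalized this way can be sorted by systemtime to form the ordered series that a forecasting model expects.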

Updating Machine Learning Models At The Edge With Apache NiFi and MiNiFi


Yes, we have bidirectional communication between Apache NiFi and MiNiFi agents via Site-to-Site (S2S) over HTTPS. This means I can push anything I want to an agent, including commands, files and updates.

I can also transmit data to edge agents via MQTT, REST and Kafka amongst other options.


NiFi Ready To Send and Receive Messages From Other NiFi Nodes, Clusters and MiNiFi Agents


Our NiFi flow consumes Kafka and MQTT messages, reads updated model files and generates integration test sensor data.



MiNiFi Agents Have Downloaded the Model and Anything Else We Send to Them



It's easy to configure MQTT message consumption in CEM: we just need the broker (with port) and, optionally, a topic to filter on.




To listen for files or models, you can easily add a REST endpoint to proxy in data of your choice, with or without SSL.


Here's an example curl command to test that REST API:

curl -d '{"key1":"value1", "key2":"value2"}' -H "Content-Type: application/json" -X POST http://ec2-3-85-54-189.compute-1.amazonaws.com:8899/upload
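The same test can be scripted from Python on the Raspberry Pi. This is a minimal sketch using only the standard library; it reuses the endpoint from the curl example, and the helper name build_upload_request is an assumption made for illustration. The actual send is left commented out because the endpoint may no longer be reachable.

```python
import json
import urllib.request

# Same endpoint as the curl example above.
UPLOAD_URL = "http://ec2-3-85-54-189.compute-1.amazonaws.com:8899/upload"


def build_upload_request(url: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request equivalent to the curl call (illustrative helper)."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_upload_request(UPLOAD_URL, {"key1": "value1", "key2": "value2"})
print(request.get_method(), request.full_url)

# Uncomment to actually send the request:
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status)
```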


We can easily generate JSON IoT-style data for integration tests using GenerateFlowFile.
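Outside of NiFi, comparable test records can be produced with a few lines of Python. The sketch below is not the GenerateFlowFile configuration used in the flow; it is a stand-in that emits a record with a subset of the field names from the example sensor data. The value ranges and the helper name make_test_reading are assumptions chosen for illustration.

```python
import json
import random
import uuid
from datetime import datetime


def make_test_reading(rng: random.Random, now: datetime) -> dict:
    """Build one synthetic sensor record (illustrative helper, assumed ranges)."""
    stamp = now.strftime("%Y%m%d%H%M%S")
    return {
        "uuid": f"test_uuid_{stamp}",
        "id": f"{stamp}_{uuid.UUID(int=rng.getrandbits(128))}",
        "systemtime": now.strftime("%m/%d/%Y %H:%M:%S"),
        # temperature is a string in the real records, so it is kept as one here.
        "temperature": str(round(rng.uniform(20.0, 40.0), 1)),
        "pressure": round(rng.uniform(990.0, 1030.0), 1),
        "humidity": round(rng.uniform(0.0, 100.0), 1),
        "lux": round(rng.uniform(0.0, 500.0), 1),
    }


# A seeded generator keeps integration test data reproducible between runs.
rng = random.Random(42)
sample = make_test_reading(rng, datetime(2020, 4, 17, 17, 19, 36))
print(json.dumps(sample))
```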


Let's grab updated models from my Data Science server whenever they change.
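One simple way to decide whether a model file has changed is to compare content digests between checks. The sketch below illustrates that idea in plain Python; it is not how the NiFi flow detects changes, and the file name model.tflite and the helper names are hypothetical.

```python
import hashlib
import tempfile
from pathlib import Path
from typing import Optional, Tuple


def file_digest(path: Path) -> str:
    """Return the SHA-256 hex digest of a file's contents."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def model_changed(path: Path, last_digest: Optional[str]) -> Tuple[bool, str]:
    """Report whether the file differs from the last seen digest, plus the new digest."""
    current = file_digest(path)
    return current != last_digest, current


with tempfile.TemporaryDirectory() as tmp:
    model = Path(tmp) / "model.tflite"  # hypothetical file name
    model.write_bytes(b"version 1")
    changed_first, digest = model_changed(model, None)
    changed_again, digest = model_changed(model, digest)
    model.write_bytes(b"version 2")
    changed_after_update, digest = model_changed(model, digest)

print(changed_first, changed_again, changed_after_update)  # True False True
```

Only a file whose digest differs from the last one seen would need to be pushed to the agents again.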


I can read Kafka messages and send them to MiNiFi agents as well.




I pushed a TensorFlow Lite (TFLite) model, but ONNX, PMML, Docker or Pickle are all options.


GTFS Real-time Streaming with Apache NiFi




To make GTFS Realtime data easier to ingest, I have added a processor that converts GTFS (General Transit Feed Specification) Realtime data from Protocol Buffers format into JSON. It uses the standard Google APIs to do this. You can see the Protocol Buffers schema at the GTFS Realtime link below.

We will be able to get trip updates (including delays), service alerts (including changed routes) and vehicle positions (which can include location and congestion information). This is the same information that these public transit systems feed to Google Maps.

https://developers.google.com/transit/gtfs-realtime/


An Example NiFi Flow For Accessing GTFS Data




First we add my new NiFi processor. You can get the source and build the NAR yourself, or download one of the release builds of the NAR. This is alpha-level code written very quickly. If you find bugs, please report them and suggest updates.





Once it is added to your canvas, you can change the name and scheduling.



Currently the only setting is the URL of the GTFS resource (some transportation APIs may require a key).




Once it runs, it quickly returns a flow file containing a JSON-formatted version of the GTFS result, along with some attributes holding GTFS information.






The JSON has a lot of arrays and nested subsections, so it is not very flat. We could dump it raw into Hive and query it as a JSON document, or we can parse it with FlattenJson, EvaluateJsonPath, SplitJson or various other options.
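To show what flattening does to a nested document like this, here is a minimal Python sketch. It is not the NiFi processor's implementation, and the sample document is a hypothetical, heavily trimmed structure: its field names follow the camelCase JSON form of the GTFS Realtime schema, which is an assumption about the processor's output.

```python
# Hypothetical, heavily trimmed sample shaped like a GTFS Realtime feed in JSON.
SAMPLE = {
    "header": {"gtfsRealtimeVersion": "2.0", "timestamp": "1587158376"},
    "entity": [
        {
            "id": "1",
            "vehicle": {
                "trip": {"tripId": "T1", "routeId": "R10"},
                "position": {"latitude": 40.0, "longitude": -75.0},
            },
        }
    ],
}


def flatten(value, prefix=""):
    """Collapse nested dicts and lists into a single dict keyed by dotted paths."""
    flat = {}
    if isinstance(value, dict):
        for key, child in value.items():
            flat.update(flatten(child, f"{prefix}.{key}" if prefix else key))
    elif isinstance(value, list):
        for index, child in enumerate(value):
            flat.update(flatten(child, f"{prefix}[{index}]"))
    else:
        # Leaf value: store it under the accumulated path.
        flat[prefix] = value
    return flat


flat = flatten(SAMPLE)
for key in sorted(flat):
    print(key, "=", flat[key])
```

Each leaf ends up under a path such as entity[0].vehicle.trip.routeId, which maps naturally onto table columns or flow file attributes.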

As mentioned before, the main sections are tripUpdate, vehicle and alerts. I will parse these and use them for real-time Slack, Kafka, HBase and Kudu messaging. This data can become critical to companies that have trucks or people trying to navigate transit systems.

My first use case is to see whether I can get this data in real time during emergencies or events to help communities. Nonprofit agents will be able to get real-time status reports on what transportation is available. Data has to be ingested, cleansed and transformed before it becomes knowledge, and that knowledge can empower the community.

Join me in Puerto Rico at the NetHope Global Summit.

Source

https://github.com/tspannhw/gtfs/

NAR Release to Install in Apache NiFi 1.9.2


https://github.com/tspannhw/gtfs/releases

Resources