DataWorks Summit DC 2019 Report


While some lucky people were in DataWorkSummit Training, others of us were in the NoSQL Day.





After NoSQL Day's end party, it was time for meetups includings Apache NiFi and Apache Kafka sessions!    The Apache NiFi meetup was packed and had most of the Apache NiFi team on-site.















Tuesday May 21, 2019


NoSQL Day

Tracking Crime ... Phoenix/HBase/NiFi


Wednesday May 22, 2019

Expo Theatre 20 minute talk 1:35 pm - 
Apache Deep Learning 202


Thursday May 23, 2019

Cold Supply Chain Logistics using Sensors, Apache NiFi and the Hyperledger Fabric Blockchain Platform



1:35 - Expo Theatre 20 minute talk - Introduction to Apache NiFi



Edge to AI:  Analytics from Edge to Cloud with Efficient Movement of Machine Data



Reading OpenData JSON and Storing into Apache HBase / Phoenix Tables - Part 1

JSON Batch to Single Row Phoenix
I grabbed open data on Crime from Philly's Open Data (https://www.opendataphilly.org/dataset/crime-incidents), after a free sign up you get access to JSON crime data (https://data.phila.gov/resource/sspu-uyfa.json) You can grab individual dates or ranges for thousands of records. I wanted to spool each JSON record as a separate HBase row. With the flexibility of Apache NiFi 1.0.0, I can specify run times via cron or other familiar setup. This is my master flow.
First I use GetHTTP to retrieve the SSL JSON messages, I split the records up and store them as RAW JSON in HDFS as well as send some of them via Email, format them for Phoenix SQL and store them in Phoenix/HBase. All with no coding and in a simple flow. For extra output, I can send them to Reimann server for monitoring.
Setting up SSL for accessing HTTPS data like Philly Crime, require a little configuration and knowing what Java JRE you are using to run NiFi. You can run service nifi status to quickly get which JRE.
Split the Records
The Open Data set has many rows of data, let's split them up and pull out the attributes we want from the JSON.
Phoenix
Another part that requires specific formatting is setting up the Phoenix connection. Make sure you point to the correct driver and if you have security make sure that is set.
Load the Data (Upsert)
Once your data is loaded you can check quickly with /usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure
The SQL for this data set is pretty straight forward.
  1. CREATE TABLE phillycrime (dc_dist varchar,
  2. dc_key varchar not null primary key,dispatch_date varchar,dispatch_date_time varchar,dispatch_time varchar,hour varchar,location_block varchar,psa varchar,
  3. text_general_code varchar,ucr_general varchar);
  4.  
  5.  
  6. {"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}
  7. upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
  8. !tables
  9. !describe phillycrime
The DC_KEY is unique so I used that as the Phoenix key. Now all the data I get will be added and any repeats will safely update. Sometimes during the data we may reget some of the same data, that's okay, it will just update to the same value.

Cloudera Edge Management Introduction


Using CEM - Adding a Processor to a Flow

Looking at Events From CEM

Designing a Java Flow

Configure A Stream Execution


Event Details



Example Apache NiFi Receiver 

CEM Design - Open Flow Screen


Configure a PutFile Processor




If you want to revert your current changes to a previous version


     An Example Flow Java Agent




An Example CPP Flow




Example of Data received in NiFi from CPP Agent


                          How to simulate data in GenerateDataFlow



Receiving Agent Data


Agent Logs Showing C2 Activities



Publish Flow to Agents



CEM

You can download CEM and NiFi Registry from Cloudera.   You need the Registry to be able to save and version the flows you will be deploying.

For a simple proof of concept, development test, you can setup both without needing a full fledged database.   You can use the H2 database for learning how to use the system.

I installed CEM on a few versions of Ubuntu and on Centos 7.

First thing you need to do is to install NiFi Registry, run it and create a bucket for EFM to use.

CEM Configuration Basics

conf/efm.properties   - turn on nifi registry
Create a bucket


EFM Settings
# Web Server Properties
#  address: the hostname or ip address of the interface to bind to; to bind to all, use 0.0.0.0
efm.server.address=0.0.0.0
efm.server.port=10080

efm.server.servlet.contextPath=/efm



New Features in MiniFi 0.6.0 C++ Agent

Python Processors

These are great, but first you will need to make sure you have Python installed and know where your Python modules are:

python -c "import site; print(site.getsitepackages())"python -m sitepython -m site --user-site


You will need a precompiled C++ agent for your environment or build it yourself.   You can also choose the Java agent if you do not wish to compile C++.   The C++ agent is smaller with a smaller footprint.

Configuring a MiNiFi Java Agent to Talk to EFM
(bootstrap.conf)

# MiNiFi Command & Control Configuration
# C2 Properties
# Enabling C2 Uncomment each of the following options
# define those with missing options
nifi.c2.enable=true
## define protocol parameters
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
## heartbeat in milliseconds.  defaults to once a second
nifi.c2.agent.heartbeat.period=1000
## define parameters about your agent
nifi.c2.agent.class=centos7java
# Optional.  Defaults to a hardware based unique identifier
nifi.c2.agent.identifier=princeton0java
## Define TLS security properties for C2 communications

Configuring a MiNiFi C++ Agent to Talk to EFM
(minifi.properties)

nifi.c2.enable=true
nifi.c2.agent.protocol.class=RESTSender
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
nifi.c2.agent.heartbeat.period=2000
nifi.c2.agent.class=centos7cpp
nifi.c2.agent.identifier=princeton0cpp

Code:



EFM Ports

EFM Server UI 10080
NiFi Registry 18080
CoAP 8989


EFM REST API

http://server:10080/efm/api/events

{"elements":[],"links":{"last":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=-1","rel":"last"},"first":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"first"},"new":{"href":"events?filter=created%3A-lte%3A1556648075461","rel":"new"},"self":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"self"}},"page":{"size":0,"number":0,"totalElements":0,"totalPages":0}}

http://server:10080/efm/api/events/fields

http://server:10080/efm/api/access

http://server:10080/efm/api/agent-classes

[{"name":"centos7java","agentManifests":["agent-manifest-id"]},{"name":"macjava","agentManifests":["agent-manifest-id"]},{"name":"centos7cpp","agentManifests":["UWcV4yk6ooO5CMMnSGcu7ift"]}]


http://server:10080/efm/api/c2-configuration

http://server:10080/efm/api/c2-configuration/nifi-registry

http://server:10080/efm/api/agents

[{"identifier":"princeton0java","agentClass":"centos7java","agentManifestId":"agent-manifest-id","status":{"uptime":1555621345767},"firstSeen":1555515050675,"lastSeen":1555621345821},{"identifier":"hw13125.local","agentClass":"macjava","agentManifestId":"agent-manifest-id","status":{"uptime":1555677981910},"firstSeen":1555535371415,"lastSeen":1555677983254},{"identifier":"princeton0cpp","agentClass":"centos7cpp","agentManifestId":"UWcV4yk6ooO5CMMnSGcu7ift","status":{"uptime":205159,"repositories":{"flowfile":{"size":0},"provenance":{"size":0}},"components":{"FlowController":{"running":false},"ListenHTTP":{"running":false},"SentimentAnalysis":{"running":false},"AppendHostInfo":{"running":false},"35ad349d-016a-1000-6b25-04742c52dff2":{"running":false}}},"firstSeen":1555678448409,"lastSeen":1555683502395}]

http://server:10080/efm/api/agent-manifests

http://server:10080/efm/api/designer/flows

http://server:10080/efm/api/designer/client-id

http://server:10080/efm/api/designer/flows/summaries

http://server:10080/efm/api/flow-mappings

http://server:10080/efm/api/flows

http://server:10080/efm/api/operations


You will also want an Apache NiFi 1.9.x server to receive calls from the MiNiFi Agents.

References:

Streams Messaging Manager (SMM) REST API

CALL SMM REST API from nifi






--- article , demo


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/aggregated/topics/syndicate-speed-event-avro?duration=LAST_ONE_HOUR&state=all" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/brokers/1003?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers?brokerIds=1004%2C1002%2C1003" -H "accept: application/json"


Kafka Cluster Details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/cluster" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers/1002" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups?state=all" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups/console-consumer-64800" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groupNames" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients/console-consumer-64800" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/consumers/group/kafka-streams-analytics-geo-event?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"




curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers?state=all&duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers/minifi-eu-i1?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


Details on one topic
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics/syndicate-speed-event-avro" -H "accept: application/json"


List of All Topics with details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics" -H "accept: application/json"


List of All Brokers with details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers" -H "accept: application/json"


Broker Details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers/1002" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004%2C%201005" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topics?topicNames=supply-chain%2C%20gateway-east-raw-sensors" -H "accept: application/json"


Partitions
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topicPartitions?partitions=P0%2CP1%2CP2%2CP3" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/topics?topicNames=gateway-europe-raw-sensors%2C%20syndicate-speed-event-json" -H "accept: application/json"


List Serdes / Types


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/serdes" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-speed-event-json/offsets" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json/partitions" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json/0?duration=LAST_THIRTY_DAYS" -H "accept: application/json"

Energy Monitoring via Apache NiFi MiNiFi 0.6.0 C++ Agent with Cloudera Edge Manager


Energy Monitoring via Apache NiFi MiNiFi 0.6.0 C++ Agent with Cloudera Edge Manager


With the advent of version 0.6.0 of the C++ Agent, we can have native fast Python Processors that can easily be added to your palate in Cloudera Edge Manager.


















Source:

https://github.com/tspannhw/minificpp-python-EnergyMonitoring