Reading OpenData JSON and Storing into Apache HBase / Phoenix Tables - Part 1

JSON Batch to Single Row Phoenix
I grabbed open data on crime from Philly's Open Data site (https://www.opendataphilly.org/dataset/crime-incidents). After a free sign-up you get access to JSON crime data (https://data.phila.gov/resource/sspu-uyfa.json). You can grab individual dates or ranges covering thousands of records. I wanted to spool each JSON record as a separate HBase row. With the flexibility of Apache NiFi 1.0.0, I can schedule runs via cron expressions or the other familiar scheduling options. This is my master flow.
First I use GetHTTP to retrieve the JSON messages over SSL. I split the records up and store them as raw JSON in HDFS, send some of them via email, format them for Phoenix SQL, and store them in Phoenix/HBase. All with no coding and in a simple flow. For extra output, I can send them to a Riemann server for monitoring.
Setting up SSL for accessing HTTPS data like Philly Crime requires a little configuration and knowing which Java JRE you are using to run NiFi. You can run service nifi status to quickly see which JRE is in use.
Split the Records
The Open Data set has many rows of data. Let's split them up and pull out the attributes we want from the JSON.
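Outside of NiFi, the same split-and-extract step can be sketched in a few lines of Python. This is only an illustration of the idea, not what the NiFi processors do internally. The field list is taken from the table definition later in this article, and the function name split_records is made up for the example.

```python
import json

# Fields to keep; these match the columns of the phillycrime table in this article.
FIELDS = ["dc_dist", "dc_key", "dispatch_date", "dispatch_date_time",
          "dispatch_time", "hour", "location_block", "psa",
          "text_general_code", "ucr_general"]

def split_records(payload):
    """Split a JSON array into one dict per record, keeping only FIELDS.

    Hypothetical helper for illustration; missing fields come back as None.
    """
    records = json.loads(payload)
    return [{name: rec.get(name) for name in FIELDS} for rec in records]

# A shortened sample record with one extra attribute we do not want.
sample = '[{"dc_dist":"18","dc_key":"200918067518","hour":"14","extra":"ignored"}]'
rows = split_records(sample)
print(rows[0]["dc_key"])
```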
Phoenix
Another part that requires specific formatting is setting up the Phoenix connection. Make sure you point to the correct driver and, if you have security enabled, make sure that is configured as well.
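As a rough guide to the connection string format, here is a small Python sketch that assembles a Phoenix JDBC URL. The host, port, and znode values are assumptions that mirror the sqlline.py command used in this article; a secured cluster would need a different znode plus principal and keytab settings, which are not shown here.

```python
# Standard Phoenix thick-client driver class name.
PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver"

def phoenix_jdbc_url(zk_quorum, zk_port=2181, znode="/hbase-unsecure"):
    """Build a Phoenix JDBC URL of the form jdbc:phoenix:<quorum>:<port>:<znode>.

    Hypothetical helper; the defaults are assumptions for an unsecured HDP sandbox.
    """
    return f"jdbc:phoenix:{zk_quorum}:{zk_port}:{znode}"

url = phoenix_jdbc_url("localhost")
print(PHOENIX_DRIVER)
print(url)
```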
Load the Data (Upsert)
Once your data is loaded you can check quickly with /usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure
The SQL for this data set is pretty straightforward.
  CREATE TABLE phillycrime (dc_dist varchar,
  dc_key varchar not null primary key,dispatch_date varchar,dispatch_date_time varchar,dispatch_time varchar,hour varchar,location_block varchar,psa varchar,
  text_general_code varchar,ucr_general varchar);

  {"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}
  upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
  !tables
  !describe phillycrime
The DC_KEY is unique, so I used that as the Phoenix primary key. Now all the data I get will be added and any repeats will safely update. Sometimes a later pull returns records we have already loaded. That's okay, the upsert just rewrites the row with the same values.
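To make the "format them for Phoenix SQL" step concrete, here is a minimal Python sketch that turns one JSON record into an UPSERT statement. It is an illustration under assumptions, not the flow's actual implementation: the helper names are invented, and real code would normally prefer parameterized statements over string building.

```python
import json

# Column order follows the CREATE TABLE statement in this article.
COLUMNS = ["dc_dist", "dc_key", "dispatch_date", "dispatch_date_time",
           "dispatch_time", "hour", "location_block", "psa",
           "text_general_code", "ucr_general"]

def sql_literal(value):
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    if value is None:
        return "NULL"
    return "'" + str(value).replace("'", "''") + "'"

def to_upsert(record, table="phillycrime"):
    """Build an UPSERT for one record (hypothetical helper for illustration)."""
    cols = ", ".join(COLUMNS)
    vals = ", ".join(sql_literal(record.get(c)) for c in COLUMNS)
    return f"UPSERT INTO {table} ({cols}) VALUES ({vals})"

record = json.loads(
    '{"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02",'
    '"dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00",'
    '"hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3",'
    '"text_general_code":"Other Assaults","ucr_general":"800"}'
)
statement = to_upsert(record)
print(statement)
```

Because dc_key is the primary key, running the same statement twice leaves a single row with the same values.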

Cloudera Edge Management Introduction


Using CEM - Adding a Processor to a Flow

Looking at Events From CEM

Designing a Java Flow

Configure A Stream Execution


Event Details



Example Apache NiFi Receiver 

CEM Design - Open Flow Screen


Configure a PutFile Processor




If you want to revert your current changes to a previous version


An Example Flow Java Agent




An Example CPP Flow




Example of Data received in NiFi from CPP Agent


How to simulate data in GenerateDataFlow



Receiving Agent Data


Agent Logs Showing C2 Activities



Publish Flow to Agents



CEM

You can download CEM and NiFi Registry from Cloudera.   You need the Registry to be able to save and version the flows you will be deploying.

For a simple proof of concept or development test, you can set up both without needing a full-fledged database. You can use the H2 database while learning how to use the system.

I installed CEM on a few versions of Ubuntu and on CentOS 7.

The first thing you need to do is install NiFi Registry, run it, and create a bucket for EFM to use.

CEM Configuration Basics

conf/efm.properties   - turn on NiFi Registry
Create a bucket


EFM Settings
# Web Server Properties
#  address: the hostname or ip address of the interface to bind to; to bind to all, use 0.0.0.0
efm.server.address=0.0.0.0
efm.server.port=10080

efm.server.servlet.contextPath=/efm



New Features in MiNiFi 0.6.0 C++ Agent

Python Processors

These are great, but first you will need to make sure you have Python installed and know where your Python modules are:

python -c "import site; print(site.getsitepackages())"
python -m site
python -m site --user-site
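If you would rather collect those locations from a script, the same information is available through the standard site module. This is a small sketch; the function name module_paths is made up, and the hasattr guard is there because some older virtualenv setups do not expose getsitepackages().

```python
import site

def module_paths():
    """Return the global and per-user site-packages directories."""
    paths = []
    # Guard for environments where getsitepackages() is unavailable.
    if hasattr(site, "getsitepackages"):
        paths.extend(site.getsitepackages())
    paths.append(site.getusersitepackages())
    return paths

paths = module_paths()
for path in paths:
    print(path)
```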


You will need a precompiled C++ agent for your environment, or you can build it yourself. You can also choose the Java agent if you do not wish to compile C++. The C++ agent is the smaller of the two, with a lighter footprint.

Configuring a MiNiFi Java Agent to Talk to EFM
(bootstrap.conf)

# MiNiFi Command & Control Configuration
# C2 Properties
# Enabling C2 Uncomment each of the following options
# define those with missing options
nifi.c2.enable=true
## define protocol parameters
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
## heartbeat in milliseconds.  defaults to once a second
nifi.c2.agent.heartbeat.period=1000
## define parameters about your agent
nifi.c2.agent.class=centos7java
# Optional.  Defaults to a hardware based unique identifier
nifi.c2.agent.identifier=princeton0java
## Define TLS security properties for C2 communications

Configuring a MiNiFi C++ Agent to Talk to EFM
(minifi.properties)

nifi.c2.enable=true
nifi.c2.agent.protocol.class=RESTSender
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
nifi.c2.agent.heartbeat.period=2000
nifi.c2.agent.class=centos7cpp
nifi.c2.agent.identifier=princeton0cpp
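
Before starting an agent, it can help to sanity-check that the C2 settings are actually present. The following Python sketch parses a properties file and reports missing keys. The REQUIRED list is my own assumption about a sensible minimum, not an official list from the MiNiFi documentation, and the helper names are hypothetical.

```python
def parse_properties(text):
    """Parse key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props

# Assumed minimum set of C2 keys for this check (not an official list).
REQUIRED = ["nifi.c2.enable", "nifi.c2.rest.url",
            "nifi.c2.rest.url.ack", "nifi.c2.agent.class"]

def missing_c2_settings(props):
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED if not props.get(key)]

sample = """
nifi.c2.enable=true
nifi.c2.agent.protocol.class=RESTSender
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
nifi.c2.agent.heartbeat.period=2000
nifi.c2.agent.class=centos7cpp
nifi.c2.agent.identifier=princeton0cpp
"""
props = parse_properties(sample)
print(missing_c2_settings(props))
```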




EFM Ports

EFM Server UI 10080
NiFi Registry 18080
CoAP 8989


EFM REST API

http://server:10080/efm/api/events

{"elements":[],"links":{"last":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=-1","rel":"last"},"first":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"first"},"new":{"href":"events?filter=created%3A-lte%3A1556648075461","rel":"new"},"self":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"self"}},"page":{"size":0,"number":0,"totalElements":0,"totalPages":0}}

http://server:10080/efm/api/events/fields

http://server:10080/efm/api/access

http://server:10080/efm/api/agent-classes

[{"name":"centos7java","agentManifests":["agent-manifest-id"]},{"name":"macjava","agentManifests":["agent-manifest-id"]},{"name":"centos7cpp","agentManifests":["UWcV4yk6ooO5CMMnSGcu7ift"]}]


http://server:10080/efm/api/c2-configuration

http://server:10080/efm/api/c2-configuration/nifi-registry

http://server:10080/efm/api/agents

[{"identifier":"princeton0java","agentClass":"centos7java","agentManifestId":"agent-manifest-id","status":{"uptime":1555621345767},"firstSeen":1555515050675,"lastSeen":1555621345821},{"identifier":"hw13125.local","agentClass":"macjava","agentManifestId":"agent-manifest-id","status":{"uptime":1555677981910},"firstSeen":1555535371415,"lastSeen":1555677983254},{"identifier":"princeton0cpp","agentClass":"centos7cpp","agentManifestId":"UWcV4yk6ooO5CMMnSGcu7ift","status":{"uptime":205159,"repositories":{"flowfile":{"size":0},"provenance":{"size":0}},"components":{"FlowController":{"running":false},"ListenHTTP":{"running":false},"SentimentAnalysis":{"running":false},"AppendHostInfo":{"running":false},"35ad349d-016a-1000-6b25-04742c52dff2":{"running":false}}},"firstSeen":1555678448409,"lastSeen":1555683502395}]

http://server:10080/efm/api/agent-manifests

http://server:10080/efm/api/designer/flows

http://server:10080/efm/api/designer/client-id

http://server:10080/efm/api/designer/flows/summaries

http://server:10080/efm/api/flow-mappings

http://server:10080/efm/api/flows

http://server:10080/efm/api/operations
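
The agents endpoint is handy for a quick health check. Here is a minimal Python sketch that summarizes its response. It assumes, based on the sample output above, that lastSeen is an epoch timestamp in milliseconds; fetch_agents and summarize_agents are hypothetical helper names, and the sample below is trimmed from the response shown earlier.

```python
import json
import urllib.request
from datetime import datetime, timezone

def fetch_agents(base_url):
    """GET <base_url>/efm/api/agents and decode the JSON body (not called below)."""
    with urllib.request.urlopen(f"{base_url}/efm/api/agents", timeout=10) as resp:
        return json.load(resp)

def summarize_agents(agents):
    """Return (identifier, agentClass, lastSeen as ISO-8601 UTC) per agent."""
    summary = []
    for agent in agents:
        # Assumption: lastSeen is epoch milliseconds; drop the millisecond part.
        seen = datetime.fromtimestamp(agent["lastSeen"] // 1000, tz=timezone.utc)
        summary.append((agent["identifier"], agent["agentClass"], seen.isoformat()))
    return summary

sample = ('[{"identifier":"princeton0java","agentClass":"centos7java",'
          '"firstSeen":1555515050675,"lastSeen":1555621345821}]')
rows = summarize_agents(json.loads(sample))
for row in rows:
    print(row)
```

Against a live server you would call summarize_agents(fetch_agents("http://server:10080")) instead of using the inline sample.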


You will also want an Apache NiFi 1.9.x server to receive calls from the MiNiFi Agents.


Streams Messaging Manager (SMM) REST API

Call the SMM REST API from NiFi








curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/aggregated/topics/syndicate-speed-event-avro?duration=LAST_ONE_HOUR&state=all" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/brokers/1003?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers?brokerIds=1004%2C1002%2C1003" -H "accept: application/json"


Kafka Cluster Details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/cluster" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers/1002" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups?state=all" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups/console-consumer-64800" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groupNames" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients/console-consumer-64800" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/consumers/group/kafka-streams-analytics-geo-event?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"




curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers?state=all&duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers/minifi-eu-i1?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


Details on one topic
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics/syndicate-speed-event-avro" -H "accept: application/json"


List of All Topics with details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics" -H "accept: application/json"


List of All Brokers with details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers" -H "accept: application/json"


Broker Details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers/1002" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004%2C%201005" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topics?topicNames=supply-chain%2C%20gateway-east-raw-sensors" -H "accept: application/json"


Partitions
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topicPartitions?partitions=P0%2CP1%2CP2%2CP3" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/topics?topicNames=gateway-europe-raw-sensors%2C%20syndicate-speed-event-json" -H "accept: application/json"


List Serdes / Types


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/serdes" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-speed-event-json/offsets" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json/partitions" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json/0?duration=LAST_THIRTY_DAYS" -H "accept: application/json"
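
The curl calls above all share the same base path and differ only in the resource and query string, so they are easy to script. Below is a hedged Python sketch of that pattern. The host name smm-host is a placeholder, and smm_url and fetch_json are hypothetical helpers rather than part of any SMM client library.

```python
import json
import urllib.request
from urllib.parse import urlencode

def smm_url(base, path, params=None):
    """Build an SMM admin API URL such as <base>/api/v1/admin/<path>?<query>."""
    url = f"{base.rstrip('/')}/api/v1/admin/{path.lstrip('/')}"
    if params:
        url += "?" + urlencode(params)  # also percent-encodes commas as %2C
    return url

def fetch_json(url):
    """GET a URL with an accept header and decode the JSON body (not called below)."""
    req = urllib.request.Request(url, headers={"accept": "application/json"})
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)

BASE = "http://smm-host:8585"  # placeholder host, replace with your SMM server
topic_metrics = smm_url(BASE, "metrics/topics/syndicate-geo-event-json",
                        {"duration": "LAST_ONE_HOUR", "from": -1, "to": -1})
brokers = smm_url(BASE, "brokers", {"brokerIds": "1004,1002,1003"})
print(topic_metrics)
print(brokers)
```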

Energy Monitoring via Apache NiFi MiNiFi 0.6.0 C++ Agent with Cloudera Edge Manager




With the advent of version 0.6.0 of the C++ Agent, we can have fast native Python processors that can easily be added to your palette in Cloudera Edge Manager.


















Source:

https://github.com/tspannhw/minificpp-python-EnergyMonitoring


Publishing and Consuming JMS Messages from Tibco Enterprise Message Service (JMS) with Apache NiFi



TIBCO Enterprise Message Service
I tested this against the most recent release of TIBCO Enterprise Message Service and their JMS driver, available via trial download. I followed the very easy install directions and downloaded it to a CentOS 7 server.
I expanded my download to TIB_ems-dev_8.4.0_linux_x86_64.
Then I made the installer executable and ran TIBCOUniversalInstaller-lnx-x86-64.bin --console.
I used all the defaults (I picked server and client) and then started the freshly installed server.
Running TIBCO EMS on CentOS 7
  cd /opt/tibco/ems/8.4/bin/
  ./tibemsd64 -config ~/TIBCO_HOME/tibco/cfgmgmt/ems/data/tibemsd.conf
Example JMS Queue Settings
  URL: tcp://servername:7222

  class: com.tibco.tibjms.TibjmsQueueConnectionFactory

  Directory: /opt/tibco/ems/8.4/lib/
I believe it just uses these files from that directory:
  • tibjms.jar
  • jms-2.0.jar
Once I have my server and port shown, it's easy to add those settings to Apache NiFi.
The settings I need to publish messages are below.
After you enter your username and queue, you need to create (or use) a controller service.
Then we use our settings for our server; mine are the default ones. Make sure you enter the lib directory containing your jars, that it is on the Apache NiFi server, and that the Apache NiFi user has permission to read them.
You can also use this same controller to Consume JMS messages from TIBCO EMS.
These are example metadata attributes that Apache NiFi provides to you on message receipt.
Example Run Log of my TIBCO EMS v8.4.0 Server running on Linux.
Example Flow
Example Data
  {
  "top1pct" : "43.1",
  "top5" : "n09428293 seashore, coast, seacoast, sea-coast",
  "top4" : "n04371774 swing",
  "top3" : "n02894605 breakwater, groin, groyne, mole, bulwark, seawall, jetty",
  "top2" : "n03933933 pier",
  "top1" : "n03216828 dock, dockage, docking facility",
  "top2pct" : "34.3",
  "imagefilename" : "/opt/demo/images/201817121004997.jpg",
  "top3pct" : "3.8",
  "uuid" : "mxnet_uuid_img_20180413140808",
  "top4pct" : "2.7",
  "top5pct" : "2.4",
  "runtime" : "1.0"
  }
This is example JSON data; we could use any text.
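For a downstream consumer of these messages, a small Python sketch shows how the topN and topNpct pairs in the example could be lined up into a ranked list. The function name ranked_labels is made up for illustration, and the sample is shortened to the first two entries of the message above.

```python
import json

def ranked_labels(message, top_n=5):
    """Pair each topN label with its topNpct score, in rank order."""
    ranked = []
    for i in range(1, top_n + 1):
        label = message.get(f"top{i}")
        pct = message.get(f"top{i}pct")
        # Skip ranks that are missing from the message.
        if label is not None and pct is not None:
            ranked.append((label, float(pct)))
    return ranked

sample = ('{"top1": "n03216828 dock, dockage, docking facility", "top1pct": "43.1", '
          '"top2": "n03933933 pier", "top2pct": "34.3"}')
ranked = ranked_labels(json.loads(sample))
for label, pct in ranked:
    print(pct, label)
```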
References