Performance Testing Apache NiFi - Part 1 - Loading Directories of CSV

I am running a number of different flows on different Apache NiFi configurations to gather performance numbers for a variety of situations.

One scenario I wanted to cover was accessing directories of CSV files over HTTP.  Fortunately there's some really nice data available from NOAA (https://www.ncei.noaa.gov/data/global-hourly/access/2019/).

Example Flow:  NOAA





In this example performance-testing flow I use my LinkProcessor to grab all of the links to CSV files on the HTTP download site.  I then split this JSON list into individual records and pull out the URL.   If it's a valid URL ending in .csv, I call InvokeHTTP to download the CSV.   I then query the CSV for all the records (SELECT *) and for a count (SELECT COUNT(*)).   As part of this, the records are written out as JSON.
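Outside NiFi, the link-scraping and filtering step can be sketched with the Python standard library. The function name extract_csv_links and the sample HTML below are illustrative stand-ins, not the actual LinkProcessor code.

```python
# Sketch of scraping CSV links from a directory-style HTML index page,
# like the NOAA download site. Illustrative only.
from html.parser import HTMLParser


class LinkExtractor(HTMLParser):
    """Collects href values from anchor tags."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            for name, value in attrs:
                if name == "href" and value:
                    self.links.append(value)


def extract_csv_links(html, base_url):
    """Return absolute URLs for every link ending in .csv."""
    parser = LinkExtractor()
    parser.feed(html)
    return [base_url + href for href in parser.links
            if href.lower().endswith(".csv")]


index_html = '<a href="16541099999.csv">x</a><a href="readme.txt">y</a>'
urls = extract_csv_links(
    index_html,
    "https://www.ncei.noaa.gov/data/global-hourly/access/2019/")
```

Each resulting URL would then be handed to an HTTP download step, as InvokeHTTP does in the flow.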



In this example we grab a specific CSV file and get 739 records.


 This CSVReader uses the Jackson CSV parser and infers the fields from the header line.



I pull out the URL returned from the Link Processor.



This is my JSON RecordSetWriter; it doesn't include a schema since I never built one.



I am looking at some performance stats for my NiFi instance, which has 31 GB of JVM heap.  Going to 32 GB causes issues, because at heaps of 32 GB and above the JVM loses compressed object pointers (compressed oops).









In this flow I generate unique JSON files in mass quantities, each around 250 bytes, merge them together, compress them, and then push them to a file system.   This is to see how many records I can push through.
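A rough standalone equivalent of that generate -> merge -> compress pipeline, using only the standard library; the record shape and padding below are illustrative, chosen just to land near 250 bytes per record.

```python
# Generate many small unique JSON records, merge them into one payload,
# then gzip-compress the result -- mirroring the NiFi flow's
# generate / merge / compress steps.
import gzip
import json
import uuid


def make_record():
    # Pad with a fixed field so each record lands near 250 bytes.
    return {"id": str(uuid.uuid4()), "payload": "x" * 180}


records = [make_record() for _ in range(1000)]
merged = "\n".join(json.dumps(r) for r in records).encode("utf-8")
compressed = gzip.compress(merged)

print(len(merged), len(compressed))
```

With highly repetitive padding like this, the compressed payload comes out far smaller than the merged one, which is why compressing before the file-system push pays off.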




QueryRecord handles CSV files easily even with no predefined schema.
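A standard-library sketch of what that amounts to: csv.DictReader plays the role of the header-driven CSVReader, and the two queries reduce to a list and a count. The sample rows below are illustrative.

```python
# Header-driven CSV parsing plus the equivalents of QueryRecord's
# "SELECT *" and "SELECT COUNT(*)", with the result written as JSON.
import csv
import io
import json

sample = ("STATION,DATE,TMP\n"
          "16541099999,2019-01-07T05:55:00,+0030\n"
          "16541099999,2019-01-07T06:55:00,+0030\n")

rows = list(csv.DictReader(io.StringIO(sample)))   # SELECT *
record_count = len(rows)                           # SELECT COUNT(*)
as_json = json.dumps(rows)                         # JSON RecordSetWriter
```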



The Results of the recordCount query:


I can also test with really fast multithreaded calls to the popular btc.com Bitcoin REST API.
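The multithreaded-call pattern looks roughly like this; fetch() here is a stub standing in for the real HTTP GET against the REST API, so the sketch stays self-contained.

```python
# Fan out many concurrent requests with a thread pool, the same shape
# as hammering a REST API from multiple threads. fetch() is a stub.
from concurrent.futures import ThreadPoolExecutor


def fetch(i):
    # Stand-in for an HTTP GET against the REST endpoint.
    return {"request": i, "status": 200}


with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(fetch, range(100)))
```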


Even encrypting and compressing won't slow me down.






Example Translated Data Segment
[{"STATION":"16541099999","DATE":"2019-01-07T05:55:00","SOURCE":"4","LATITUDE":"39.6666667","LONGITUDE":"9.4333333","ELEVATION":"645.0","NAME":"PERDASDEFOGU, IT","REPORT_TYPE":"FM-15","CALL_SIGN":"99999","QUALITY_CONTROL":"V020","WND":"330,1,N,0010,1","CIG":"99999,9,9,Y","VIS":"999999,9,9,9","TMP":"+0030,1","DEW":"+0020,1","SLP":"99999,9","MA1":null,"MD1":null,"REM":null},{"STATION":"16541099999","DATE":"2019-01-07T06:55:00","SOURCE":"4","LATITUDE":"39.6666667","LONGITUDE":"9.4333333","ELEVATION":"645.0","NAME":"PERDASDEFOGU, IT","REPORT_TYPE":"FM-15","CALL_SIGN":"99999","QUALITY_CONTROL":"V020","WND":"330,1,N,0010,1","CIG":"99999,9,9,Y","VIS":"999999,9,9,9","TMP":"+0030,1","DEW":"+0030,1","SLP":"99999,9","MA1":null,"MD1":null,"REM":null},{"STATION":"16541099999","DATE":"2019-01-07T07:55:00","SOURCE":"4","LATITUDE":"39.6666667","LONGITUDE":"9.4333333","ELEVATION":"645.0","NAME":"PERDASDEFOGU, IT","REPORT_TYPE":"FM-15","CALL_SIGN":"99999","QUALITY_CONTROL":"V020","WND":"300,1,N,0010,1","CIG":"99999,9,9,Y","VIS":"999999,9,9,9","TMP":"+0030,1","DEW":"+0020,1","SLP":"99999,9","MA1":null,"MD1":null,"REM":null},{"STATION":"16541099999","DATE":"2019-01-07T09:55:00","SOURCE":"4","LATITUDE":"39.6666667","LONGITUDE":"9.4333333","ELEVATION":"645.0","NAME":"PERDASDEFOGU, IT","REPORT_TYPE":"FM-15","CALL_SIGN":"99999","QUALITY_CONTROL":"V020","WND":"280,1,N,0026,1","CIG":"99999,9,9,Y","VIS":"999999,9,9,9","TMP":"+0070,1","DEW":"+0050,1","SLP":"99999,9","MA1":null,"MD1":null,"REM":null},{"STATION":"16541099999","DATE":"2019-01-07T10:55:00","SOURCE":"4","LATITUDE":"39.6666667","LONGITUDE":"9.4333333","ELEVATION":"645.0","NAME":"PERDASDEFOGU, IT","REPORT_TYPE":"FM-15","CALL_SIGN":"99999","QUALITY_CONTROL":"V020","WND":"260,1,N,0046,1","CIG":"99999,9,9,Y","VIS":"999999,9,9,9","TMP":"+0080,1

DataWorks Summit DC 2019 Report


While some lucky people were in DataWorks Summit training, others of us were at NoSQL Day.





After NoSQL Day's closing party, it was time for meetups, including Apache NiFi and Apache Kafka sessions!    The Apache NiFi meetup was packed and had most of the Apache NiFi team on-site.















Tuesday May 21, 2019


NoSQL Day

Tracking Crime ... Phoenix/HBase/NiFi


Wednesday May 22, 2019

1:35 pm - Expo Theatre 20 minute talk - Apache Deep Learning 202


Thursday May 23, 2019

Cold Supply Chain Logistics using Sensors, Apache NiFi and the Hyperledger Fabric Blockchain Platform



1:35 - Expo Theatre 20 minute talk - Introduction to Apache NiFi



Edge to AI:  Analytics from Edge to Cloud with Efficient Movement of Machine Data



Reading OpenData JSON and Storing into Apache HBase / Phoenix Tables - Part 1

JSON Batch to Single Row Phoenix
I grabbed crime data from Philly's Open Data portal (https://www.opendataphilly.org/dataset/crime-incidents); after a free sign-up you get access to JSON crime data (https://data.phila.gov/resource/sspu-uyfa.json). You can grab individual dates or ranges covering thousands of records. I wanted to spool each JSON record into a separate HBase row. With the flexibility of Apache NiFi 1.0.0, I can specify run times via cron or other familiar scheduling. This is my master flow.
First I use GetHTTP to retrieve the JSON messages over SSL. I split the records up and store them as raw JSON in HDFS, send some of them via email, and format them for Phoenix SQL to store in Phoenix/HBase. All with no coding, in a simple flow. For extra output, I can send them to a Riemann server for monitoring.
Setting up SSL for accessing HTTPS data like the Philly crime feed requires a little configuration and knowing which Java JRE you are using to run NiFi. You can run service nifi status to quickly find out which JRE.
Split the Records
The Open Data set has many rows of data, let's split them up and pull out the attributes we want from the JSON.
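Outside NiFi, the split-and-extract step might look like this; the field subset below is illustrative, drawn from the Philly crime columns used in the table later on.

```python
# Break a JSON array of crime records into individual records and keep
# only the attributes we want, dropping anything else.
import json

raw = json.dumps([
    {"dc_dist": "18", "dc_key": "200918067518",
     "dispatch_date": "2009-10-02",
     "text_general_code": "Other Assaults",
     "extra_field": "dropped"},
])

wanted = ("dc_dist", "dc_key", "dispatch_date", "text_general_code")
records = [{k: rec.get(k) for k in wanted} for rec in json.loads(raw)]
```

Each dict in records then maps onto one row, with dc_key serving as the unique key.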
Phoenix
Another part that requires specific formatting is setting up the Phoenix connection. Make sure you point to the correct driver and if you have security make sure that is set.
Load the Data (Upsert)
Once your data is loaded you can check quickly with /usr/hdp/current/phoenix-client/bin/sqlline.py localhost:2181:/hbase-unsecure
The SQL for this data set is pretty straight forward.
CREATE TABLE phillycrime (dc_dist varchar,
dc_key varchar not null primary key, dispatch_date varchar, dispatch_date_time varchar, dispatch_time varchar, hour varchar, location_block varchar, psa varchar,
text_general_code varchar, ucr_general varchar);

{"dc_dist":"18","dc_key":"200918067518","dispatch_date":"2009-10-02","dispatch_date_time":"2009-10-02T14:24:00.000","dispatch_time":"14:24:00","hour":"14","location_block":"S 38TH ST / MARKETUT ST","psa":"3","text_general_code":"Other Assaults","ucr_general":"800"}

upsert into phillycrime values ('18', '200918067518', '2009-10-02','2009-10-02T14:24:00.000','14:24:00','14', 'S 38TH ST / MARKETUT ST','3','Other Assaults','800');
!tables
!describe phillycrime
The DC_KEY is unique, so I used that as the Phoenix primary key. Now all the data I pull will be added, and any repeats will safely update. Sometimes we may re-fetch some of the same data; that's okay, it will just upsert to the same value.

Cloudera Edge Management Introduction


Using CEM - Adding a Processor to a Flow

Looking at Events From CEM

Designing a Java Flow

Configure A Stream Execution


Event Details



Example Apache NiFi Receiver 

CEM Design - Open Flow Screen


Configure a PutFile Processor




If you want to revert your current changes to a previous version


An Example Java Agent Flow




An Example CPP Flow




Example of Data received in NiFi from CPP Agent


How to simulate data with GenerateFlowFile



Receiving Agent Data


Agent Logs Showing C2 Activities



Publish Flow to Agents



CEM

You can download CEM and NiFi Registry from Cloudera.   You need the Registry to be able to save and version the flows you will be deploying.

For a simple proof of concept or development test, you can set up both without needing a full-fledged database; the embedded H2 database is fine for learning how to use the system.

I installed CEM on a few versions of Ubuntu and on CentOS 7.

The first thing you need to do is install NiFi Registry, run it, and create a bucket for EFM to use.

CEM Configuration Basics

conf/efm.properties   - turn on nifi registry
Create a bucket


EFM Settings
# Web Server Properties
#  address: the hostname or ip address of the interface to bind to; to bind to all, use 0.0.0.0
efm.server.address=0.0.0.0
efm.server.port=10080

efm.server.servlet.contextPath=/efm



New Features in MiniFi 0.6.0 C++ Agent

Python Processors

These are great, but first you will need to make sure you have Python installed and know where your Python modules are:

python -c "import site; print(site.getsitepackages())"
python -m site
python -m site --user-site


You will need a precompiled C++ agent for your environment, or you can build it yourself.   You can also choose the Java agent if you do not wish to compile C++.   The C++ agent has a smaller footprint.

Configuring a MiNiFi Java Agent to Talk to EFM
(bootstrap.conf)

# MiNiFi Command & Control Configuration
# C2 Properties
# Enabling C2 Uncomment each of the following options
# define those with missing options
nifi.c2.enable=true
## define protocol parameters
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
## heartbeat in milliseconds.  defaults to once a second
nifi.c2.agent.heartbeat.period=1000
## define parameters about your agent
nifi.c2.agent.class=centos7java
# Optional.  Defaults to a hardware based unique identifier
nifi.c2.agent.identifier=princeton0java
## Define TLS security properties for C2 communications

Configuring a MiNiFi C++ Agent to Talk to EFM
(minifi.properties)

nifi.c2.enable=true
nifi.c2.agent.protocol.class=RESTSender
nifi.c2.rest.url=http://server:10080/efm/api/c2-protocol/heartbeat
nifi.c2.rest.url.ack=http://server:10080/efm/api/c2-protocol/acknowledge
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
nifi.c2.agent.heartbeat.period=2000
nifi.c2.agent.class=centos7cpp
nifi.c2.agent.identifier=princeton0cpp

Code:



EFM Ports

EFM Server UI 10080
NiFi Registry 18080
CoAP 8989


EFM REST API

http://server:10080/efm/api/events

{"elements":[],"links":{"last":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=-1","rel":"last"},"first":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"first"},"new":{"href":"events?filter=created%3A-lte%3A1556648075461","rel":"new"},"self":{"href":"events?filter=created%3Alte%3A1556648075461&pageNum=0","rel":"self"}},"page":{"size":0,"number":0,"totalElements":0,"totalPages":0}}
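A quick sketch of consuming that paged response; the body below is a trimmed version of the sample above, keeping the same structure.

```python
# Parse the paged /efm/api/events response: pull out the paging links
# and the page metadata.
import json

body = '''{"elements":[],
 "links":{"first":{"href":"events?pageNum=0","rel":"first"},
          "self":{"href":"events?pageNum=0","rel":"self"}},
 "page":{"size":0,"number":0,"totalElements":0,"totalPages":0}}'''

events = json.loads(body)
total = events["page"]["totalElements"]
link_rels = {link["rel"]: link["href"]
             for link in events["links"].values()}
```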

http://server:10080/efm/api/events/fields

http://server:10080/efm/api/access

http://server:10080/efm/api/agent-classes

[{"name":"centos7java","agentManifests":["agent-manifest-id"]},{"name":"macjava","agentManifests":["agent-manifest-id"]},{"name":"centos7cpp","agentManifests":["UWcV4yk6ooO5CMMnSGcu7ift"]}]


http://server:10080/efm/api/c2-configuration

http://server:10080/efm/api/c2-configuration/nifi-registry

http://server:10080/efm/api/agents

[{"identifier":"princeton0java","agentClass":"centos7java","agentManifestId":"agent-manifest-id","status":{"uptime":1555621345767},"firstSeen":1555515050675,"lastSeen":1555621345821},{"identifier":"hw13125.local","agentClass":"macjava","agentManifestId":"agent-manifest-id","status":{"uptime":1555677981910},"firstSeen":1555535371415,"lastSeen":1555677983254},{"identifier":"princeton0cpp","agentClass":"centos7cpp","agentManifestId":"UWcV4yk6ooO5CMMnSGcu7ift","status":{"uptime":205159,"repositories":{"flowfile":{"size":0},"provenance":{"size":0}},"components":{"FlowController":{"running":false},"ListenHTTP":{"running":false},"SentimentAnalysis":{"running":false},"AppendHostInfo":{"running":false},"35ad349d-016a-1000-6b25-04742c52dff2":{"running":false}}},"firstSeen":1555678448409,"lastSeen":1555683502395}]

http://server:10080/efm/api/agent-manifests

http://server:10080/efm/api/designer/flows

http://server:10080/efm/api/designer/client-id

http://server:10080/efm/api/designer/flows/summaries

http://server:10080/efm/api/flow-mappings

http://server:10080/efm/api/flows

http://server:10080/efm/api/operations


You will also want an Apache NiFi 1.9.x server to receive calls from the MiNiFi Agents.

References: