NiFi Toolkit - CLI - For NiFi 1.10

Along with the updated Apache NiFi server, the NiFi 1.10 release also brings new and updated features to the Command Line Interface (CLI). Let's check them out.

Cool Tools

S2S.sh - send data to Apache NiFi over Site-to-Site from the CLI.

The input data is formatted as a JSON array such as:
[{"attributes":{"key":"value"},"data":"stuff"}]
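A payload in that shape can be generated rather than typed by hand. The following is a minimal sketch, assuming each record is a pair of an attribute map and a data string; the helper name build_s2s_payload is hypothetical, and how the result is fed to S2S.sh is not shown here.

```python
import json


def build_s2s_payload(records):
    """Serialize (attributes, data) pairs into the JSON array format shown above.

    `records` is an iterable of (dict, str) tuples. This helper is an
    illustration only and is not part of the NiFi toolkit.
    """
    return json.dumps(
        [{"attributes": dict(attrs), "data": data} for attrs, data in records],
        separators=(",", ":"),  # compact output, no spaces after separators
    )


payload = build_s2s_payload([({"key": "value"}, "stuff")])
print(payload)
```

Building the array with a JSON library avoids quoting mistakes when attribute values contain quotes or backslashes.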

Examples

registry import-flow-version


Get Into Interactive Mode

./cli.sh




Get Parameter Contexts (simple or json format)

nifi list-param-contexts -u http://localhost:8080 -ot simple



Export Parameter Context

nifi export-param-context -u http://localhost:8080 -verbose --paramContextId 8067d863-016e-1000-f0f7-265210d3e7dc 




Get Services

nifi get-services -u http://localhost:8080
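The commands above are typed at the interactive prompt. For scripting, a small helper can assemble the same commands as an argument list. This is only a sketch: it assumes cli.sh also accepts a command directly on its command line and that the script sits at ./cli.sh, so check both against your install. The helper name nifi_cli_args is hypothetical, and the sketch builds the arguments without running anything.

```python
import shlex


def nifi_cli_args(command, *options, base_url="http://localhost:8080", cli="./cli.sh"):
    """Build the argument list for one 'nifi' CLI command.

    Only flags that appear in the examples above (-u, -ot, -verbose,
    --paramContextId) are used here. The cli path and base_url defaults are
    assumptions about the local setup.
    """
    return [cli, "nifi", command, "-u", base_url, *options]


args = nifi_cli_args("list-param-contexts", "-ot", "simple")
print(shlex.join(args))

# To actually run it, something like subprocess.run(args, check=True) could be
# used from the toolkit's bin directory.
```

Keeping the arguments as a list rather than one string means no shell quoting is needed when an option value contains spaces.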


NiFi Dump

../bin/nifi.sh dump filedump.txt

NiFi home: /Users/tspann/Documents/nifi-1.10.0

Bootstrap Config File: /Users/tspann/Documents/nifi-1.10.0/conf/bootstrap.conf

2019-11-18 17:08:04,921 INFO [main] org.apache.nifi.bootstrap.Command Successfully wrote thread dump to /Users/tspann/Documents/nifi-1.10.0/filedump.txt

NiFi Diagnostics

../bin/nifi.sh diagnostics diag.txt

Java home:
NiFi home: /Users/tspann/Documents/nifi-1.10.0

Bootstrap Config File: /Users/tspann/Documents/nifi-1.10.0/conf/bootstrap.conf

2019-11-18 17:11:09,844 INFO [main] org.apache.nifi.bootstrap.Command Successfully wrote diagnostics information to /Users/tspann/Documents/nifi-1.10.0/diag.txt

2019-11-18 17:11:10,041 INFO [main] org.apache.nifi.bootstrap.Command gopherProxySet = false
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command awt.toolkit = sun.lwawt.macosx.LWCToolkit
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.specification.version = 11
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command sun.cpu.isalist =
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command sun.jnu.encoding = UTF-8
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.class.path = /Users/tspann/Documents/nifi-1.10.0/./conf:/Users/tspann/Documents/nifi-1.10.0/./lib/jetty-schemas-3.1.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/slf4j-api-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/stanford-english-corenlp-2018-02-27-models.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/jcl-over-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/javax.servlet-api-3.1.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/logback-classic-1.2.3.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-properties-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-nar-utils-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-api-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-framework-api-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/jul-to-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/logback-core-1.2.3.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/log4j-over-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-runtime-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/javax.annotation-api-1.3.2.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-core-2.3.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/javax.activation-api-1.2.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-impl-2.3.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-api-2.3.0.jar
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.vm.vendor = Amazon.com Inc.
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command javax.security.auth.useSubjectCredsOnly = true
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.arch.data.model = 64
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.font.fontmanager = sun.font.CFontManager
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.url = https://aws.amazon.com/corretto/
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.timezone = America/New_York
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command org.apache.nifi.bootstrap.config.log.dir = /Users/tspann/Documents/nifi-1.10.0/logs
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command os.name = Mac OS X
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.version = 11
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command nifi.properties.file.path = /Users/tspann/Documents/nifi-1.10.0/./conf/nifi.properties
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.java.launcher = SUN_STANDARD
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.country = US
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.boot.library.path = /Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command app = NiFi
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.java.command = org.apache.nifi.NiFi
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command jdk.debug = release
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command org.apache.jasper.compiler.disablejsr199 = true
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.cpu.endian = little
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.home = /Users/tspann
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command user.language = en
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.specification.vendor = Oracle Corporation
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.version.date = 2019-07-16
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.home = /Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command file.separator = /
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.vm.compressedOopsMode = Zero based
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command line.separator =

2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.specification.name = Java Platform API Specification
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.vendor = Oracle Corporation
2019-11-18 17:11:10,050 INFO [main] org.apache.nifi.bootstrap.Command javax.xml.xpath.XPathFactory:http://saxon.sf.net/jaxp/xpath/om = net.sf.saxon.xpath.XPathFactoryImpl
2019-11-18 17:11:10,050 INFO [main] org.apache.nifi.bootstrap.Command java.awt.graphicsenv = sun.awt.CGraphicsEnvironment
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.awt.headless = true
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.protocol.handler.pkgs = sun.net.www.protocol
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command sun.management.compiler = HotSpot 64-Bit Tiered Compilers
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.runtime.version = 11.0.4+11-LTS
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command user.name = tspann
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.net.preferIPv4Stack = true
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command path.separator = :
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.security.egd = file:/dev/urandom
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command org.jruby.embed.localvariable.behavior = persistent
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command os.version = 10.14.6
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.runtime.name = OpenJDK Runtime Environment
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command file.encoding = UTF-8
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command sun.net.http.allowRestrictedHeaders = true
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jnidispatch.path = /var/folders/t5/xz5j50wx2rl8kd3021lkbn800000gn/T/jna--864347536/jna3349001211756681540.tmp
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vm.name = OpenJDK 64-Bit Server VM
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jna.platform.library.path = /usr/lib:/usr/lib
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.version = Corretto-11.0.4.11.1
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jna.loaded = true
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.url.bug = https://github.com/corretto/corretto-11/issues/
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jetty.git.hash = afcf563148970e98786327af5e07c261fda175d3
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.io.tmpdir = /var/folders/t5/xz5j50wx2rl8kd3021lkbn800000gn/T/
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.version = 11.0.4
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command user.dir = /Users/tspann/Documents/nifi-1.10.0
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command os.arch = x86_64
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command nifi.bootstrap.listen.port = 55105
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.name = Java Virtual Machine Specification
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.awt.printerjob = sun.lwawt.macosx.CPrinterJob
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command sun.os.patch.level = unknown
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command bridj.quiet = true
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.library.path = /Users/tspann/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vendor = Amazon.com Inc.
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vm.info = mixed mode
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vm.version = 11.0.4+11-LTS
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command sun.io.unicode.encoding = UnicodeBig
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.class.version = 55.0
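The property dump above is easy to post-process. As a minimal sketch (the regular expression and the name parse_properties are assumptions based only on the log lines shown, not anything shipped with NiFi), the "key = value" entries can be collected into a dictionary:

```python
import re

# Matches lines such as:
# 2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.version = 11.0.4
PROPERTY_LINE = re.compile(
    r"^\S+ \S+ INFO \[main\] org\.apache\.nifi\.bootstrap\.Command "
    r"(?P<key>\S+) =\s?(?P<value>.*)$"
)


def parse_properties(lines):
    """Return a dict of the 'key = value' entries found in the bootstrap output.

    Lines that do not have that shape (for example the 'Successfully wrote ...'
    message) are skipped. Empty values are kept as empty strings.
    """
    props = {}
    for line in lines:
        match = PROPERTY_LINE.match(line.rstrip("\n"))
        if match:
            props[match.group("key")] = match.group("value")
    return props


sample = [
    "2019-11-18 17:11:09,844 INFO [main] org.apache.nifi.bootstrap.Command Successfully wrote diagnostics information to /Users/tspann/Documents/nifi-1.10.0/diag.txt",
    "2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.specification.version = 11",
    "2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command sun.cpu.isalist =",
    "2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.version = 11.0.4",
]
props = parse_properties(sample)
print(props["java.version"])
```

Note that the line.separator entry in the output spans two lines because its value is a newline; this sketch records it as an empty string.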


Resources

NiFi 1.10: PostSlack - Easy Image Upload


In Apache NiFi 1.10, we now have a nice PostSlack processor for sending images to Slack channels. If you have read my blog, you may remember a more limited processor that I wrote: https://www.datainmotion.dev/2019/03/posting-images-to-slack-from-apache.html.
Time to upgrade, my friend.

To use the new processor you will need a Slack account, Slack administrator permissions, and a Slack application, so that you can get an application token. You can grab that token here:
https://api.slack.com/methods/files.upload/test. Go to the Tester tab, pick your app (mine is Nifi), then click Test. At the bottom there will be a URL containing token=. Grab that token.

Notes:

You must also log in with the Slack admin role and give your app these permissions:

files:write
files:write:user

You can set these under YOURSLACK/apps/manage, then drill into your app. See also:

https://api.slack.com/messaging/files/setup

You can make the token a parameter so that your PostSlack processor can be used as a stateless module.



Our Example Flow:   Sending Webcam Images to Slack


First we need a source. It could be any picture source; I use my WebCam processor, which grabs images from a webcam attached to the NiFi server.

Then I add a mime.type attribute. Some sources will already set this for you; in my case I manually set it to image/jpeg.
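If the image source does not set mime.type, the value can usually be derived from the file name. The snippet below is a sketch of that idea in plain Python, outside of NiFi. The function name mime_type_for and the file name webcam.jpg are hypothetical; inside a flow the attribute would still be set on the FlowFile itself.

```python
import mimetypes


def mime_type_for(filename, default="application/octet-stream"):
    """Guess a MIME type from the file extension, falling back to a default."""
    guessed, _encoding = mimetypes.guess_type(filename)
    return guessed or default


# Attribute map as it might be attached to a FlowFile (illustrative only).
attributes = {"mime.type": mime_type_for("webcam.jpg")}
print(attributes)
```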


Posting to Slack is easy: just add the channel and the token we grabbed from the Slack admin site.


The webcam image posted to Slack.




That's it: a new processor is available for you.


In a future article I will talk about a new Rules Feature:







Ingest Salesforce Data into Hive Using Apache NiFi

TOOLS AND ACCOUNT USED:

Salesforce JDBC driver : https://www.progress.com/jdbc/salesforce
Java : 1.8.0_221
OS : macOS Mojave 10.14.5
SFDC Developer account : https://developer.salesforce.com/signup - 15 day trial


Overall Flow :

Install Hive - Thrift, Metastore, and a Hive table schema that matches the SFDC Opportunity table.
Install NiFi - Add the PutHive3Streaming NAR.
Install and Configure Drivers : SalesforceConnect and AvroReader
Add Processors - QueryDatabaseTable and PutHive3Streaming


Install Salesforce JDBC and Hive drivers

  • Download DataDirect Salesforce JDBC driver from here.
  • Download Cloudera Hive driver from here.
  • To install the driver, execute the .jar package by running the following command in the terminal or just by double clicking on the jar package.


java -jar PROGRESS_DATADIRECT_JDBC_SF_ALL.jar
java -jar PROGRESS_DATADIRECT_JDBC_INSTALL.jar




Add Drivers to the Apache NiFi Classpath

Copy drivers

From - /home/user/Progress/DataDirect/Connect_for_JDBC_51/{sforce.jar,hive.jar}
To - /path/to/nifi/lib (for example, /usr/local/Cellar/nifi/1.9.2/libexec/lib/)

Restart Apache NiFi



Build the Apache NiFi Flow

Launch Apache NiFi: http://localhost:8080/nifi

Add and Configure the JDBC drivers
DBCPConnectionPool
AvroReader
Add DBCPConnectionPool

Choose DBCPConnectionPool as your controller service

Add the properties below:

  • Database Connection URL : jdbc:datadirect:sforce://login.salesforce.com;SecurityToken=**** (see the note on the security token under NOTES below)
  • Database Driver Class Name : com.ddtek.jdbc.sforce.SForceDriver
  • Database User : the Developer account username
  • Password : the password for the Developer account
  • Name : SalesforceConnect
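To avoid pasting the security token into notes or scripts, the connection URL can be assembled from an environment variable. This is a sketch only: the variable name SF_SECURITY_TOKEN and the helper salesforce_jdbc_url are hypothetical, and the URL layout simply mirrors the one shown above.

```python
import os


def salesforce_jdbc_url(security_token, host="login.salesforce.com"):
    """Build the DataDirect Salesforce JDBC URL in the form used above."""
    if not security_token:
        raise ValueError("a Salesforce security token is required")
    return f"jdbc:datadirect:sforce://{host};SecurityToken={security_token}"


# SF_SECURITY_TOKEN is an assumed variable name; "****" is only a placeholder.
token = os.environ.get("SF_SECURITY_TOKEN", "****")
print(salesforce_jdbc_url(token))
```

The resulting string is what goes into the Database Connection URL property of the DBCPConnectionPool.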


Add the AvroReader controller service:
This controller service is used by the downstream processor (PutHive3Streaming) to read the Avro-format data
produced by the upstream processor (QueryDatabaseTable).



Add and Configure Processors


QueryDatabaseTable
PutHive3Streaming


QueryDatabaseTable

To read the data from Salesforce, use the QueryDatabaseTable processor, which supports
incremental data pulls.

Choose QueryDatabaseTable and add it to the canvas.





Configure the QueryDatabaseTable processor:

Right-click the QueryDatabaseTable processor and choose a scheduling type (manual or cron).

Under the Properties tab, add the following properties:


Database Connection Pooling Service : SalesforceConnect

Database Type : Generic

Table Name : Opportunity

Apply and save the processor configuration.


PutHive3Streaming 


NOTE: PutHiveStreaming was used at first, but because the downstream Hive version is 3.x,
@Matthew Burgess recommended using the PutHive3Streaming processor instead.
PutHiveStreaming kept throwing an error saying it was not able to connect to the Hive Metastore URI.
After adding the PutHive3Streaming NAR, restart NiFi and PutHive3Streaming should be available for use.




Configure PutHive3Streaming Processor 

Drag another processor from the menu and choose PutHive3Streaming from the list.
Under the Properties tab, add the following properties:

Record Reader = AvroReader
Hive Metastore URI = thrift://localhost:9083
Hive Configuration Resources = /usr/local/Cellar/hive/3.1.2/libexec/conf/hive-site.xml
Database Name = default 
Table Name = opportunity


             Properties: 


              Relationship termination settings:



Connect the QueryDatabaseTable processor to PutHive3Streaming.

Make sure to check the success relationship, then click Add/Apply.




Opportunity table schema in Hive:


create table opportunity2
(ID string,
ISDELETED boolean,
ACCOUNTID string,
ISPRIVATE boolean,
NAME string,
DESCRIPTION string,
STAGENAME string,
AMOUNT string,
PROBABILITY string,
EXPECTEDREVENUE string,
TOTALOPPORTUNITYQUANTITY double,
CLOSEDATE string,
TYPE string,
NEXTSTEP string,
LEADSOURCE string,
ISCLOSED boolean,
ISWON boolean,
FORECASTCATEGORY string,
FORECASTCATEGORYNAME string,
CAMPAIGNID string,
HASOPPORTUNITYLINEITEM boolean,
PRICEBOOK2ID string,
OWNERID string,
CREATEDDATE string,
CREATEDBYID string,
LASTMODIFIEDDATE string,
LASTMODIFIEDBYID string,
SYSTEMMODSTAMP string,
LASTACTIVITYDATE string,
FISCALQUARTER int,
FISCALYEAR int,
FISCAL string,
LASTVIEWEDDATE string,
LASTREFERENCEDDATE string,
HASOPENACTIVITY boolean,
HASOVERDUETASK boolean,
DELIVERYINSTALLATIONSTATUS__C string,
TRACKINGNUMBER__C string,
ORDERNUMBER__C string,
CURRENTGENERATORS__C string,
MAINCOMPETITORS__C string )
STORED AS ORC TBLPROPERTIES ('transactional'='true');

NOTES: 


SFDC Security Token :


This token is needed to connect to SFDC via JDBC.
Log in to the SFDC dev environment and reset the token; the new token will be sent to the email address attached to the account.



Error while connecting to SFDC without the security token.


Hive 3 ACID table issues:


The Hive version used was 3.1.2. By default, the table that was created was not transactional, which led to the error below.


Error
2019-11-07 13:25:39,730 ERROR [Timer-Driven Process Thread-3] o.a.h.streaming.HiveStreamingConnection HiveEndPoint { metaStoreUri: thrift://localhost:9083, database: default, table: opportunity2 } must use an acid table. 
2019-11-07 13:25:39,731 ERROR [Timer-Driven Process Thread-3] o.a.n.processors.hive.PutHive3Streaming PutHive3Streaming[id=46a1d083-016e-1000-280f-5a026f1ceef7] PutHive3Streaming[id=46a1d083-016e-1000-280f-5a026f1ceef7] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException 


Note: { metaStoreUri: thrift://localhost:9083, database: default, table: opportunity2 } must use an acid table.


Hive Command line properties set:

set hive.support.concurrency = true;
set hive.enforce.bucketing = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 1;


Only after setting these properties did creating the table with TBLPROPERTIES ('transactional'='true') succeed.
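Because the "must use an acid table" error only shows up once the flow is running, it can help to check the DDL before creating the table. The sketch below is a plain text check, not a Hive API. It assumes the same form as the opportunity2 DDL above (STORED AS ORC plus 'transactional'='true'), and the function name is_acid_ddl is hypothetical.

```python
import re


def is_acid_ddl(ddl):
    """Return True if a CREATE TABLE statement is stored as ORC and marked transactional.

    This only inspects the text of the statement; it does not talk to Hive,
    so it cannot tell whether the session properties listed above are set.
    """
    text = " ".join(ddl.lower().split())  # normalize case and whitespace
    stored_as_orc = "stored as orc" in text
    transactional = re.search(r"'transactional'\s*=\s*'true'", text) is not None
    return stored_as_orc and transactional


good = "create table opportunity2 (ID string) STORED AS ORC TBLPROPERTIES ('transactional'='true');"
bad = "create table opportunity2 (ID string) STORED AS ORC;"
print(is_acid_ddl(good), is_acid_ddl(bad))
```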

END PRODUCT


Apache NiFi Flow:



HIVE Result: 

Beeline >  !connect jdbc:hive2://localhost:10000;
0: jdbc:hive2://localhost:10000> select opportunity2.id, opportunity2.accountid,
opportunity2.name, opportunity2.stagename, opportunity2.amount 
from opportunity2 limit 10;
