Introducing Mm FLaNK... An Apache Flink Stack for Rapid Streaming Development From Edge 2 AI

Source:   https://github.com/tspannhw/MmFLaNK

Stateless NiFi:   https://www.datainmotion.dev/2019/11/exploring-apache-nifi-110-parameters.html

To show an example of using the Mm FLaNK stack, we have an Apache NiFi flow that reads IoT data (JSON) and sends it to Apache Kafka.   An Apache Flink streaming application running in YARN reads it, validates the data and sends it to another Kafka topic.  We monitor and check the data with SMM.    The data from that second topic is read by Apache NiFi and pushed to Apache Kudu tables.

Mm FLaNK Stack (MXNet, MiNiFi, Flink, NiFi, Kafka, Kudu)



First, we rapidly ingest, route, transform, convert, query and process data with Apache NiFi.   Once we have transformed it into a clean, schema-validated, known data type, we can stream it to Kafka for additional processing.



Second, we read from that Kafka topic, iot, with a Java Apache Flink application.   We filter out empty records and then push the results to another Kafka topic, iot-result.
// Drop null or empty records before they reach the result topic
public static class NotNullFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value != null && !value.trim().isEmpty();
    }
}

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "cloudera:9092");
properties.setProperty("group.id", "flinkKafkaGroup");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("iot", new SimpleStringSchema(), properties);
DataStream<String> source = env.addSource(consumer).name("Flink IoT Kafka Source");
source.print();

// Chain the filter into the sink so the filtered stream is what gets written out
source.filter(new NotNullFilter())
      .addSink(new FlinkKafkaProducer<>("cloudera:9092", "iot-result", new SimpleStringSchema()))
      .name("Flink Kafka IoT Result Sink");

env.execute();


We compile our Java application with Maven and build a fat JAR, which we use the Flink client to push to YARN on our CDH 6.3 cluster.



Once the Flink application is running, we can see a lot of metrics, logs and information on our streaming service.  We can browse the logs via the YARN UI and the Flink UI.
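
The same information is also exposed over Flink's REST API, which is handy when you want to script a quick health check instead of clicking through the UIs. A minimal sketch, assuming the JobManager REST endpoint is reachable at a placeholder address (http://jobmanager-host:8081) and using Java 11's built-in HttpClient:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FlinkJobsOverview {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // /jobs/overview lists the running and finished jobs with their basic status
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://jobmanager-host:8081/jobs/overview"))
                .GET()
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}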


From the Hadoop YARN UI, you can link to this application's Flink Dashboard.




You can easily see logs from various components, your application, containers and various systems.



The Apache Flink Dashboard is a rich, easy-to-navigate interface for all the information related to your running job.



You can see and add charts to view various metrics about sources, sinks and a large number of other interesting real-time attributes.


There is also an older view that is still available to show a less dynamic interface.


You can also dive down into individual Task Managers and their attributes.


Logs are easily accessible:


Another Apache NiFi flow reads from the iot-result topic, does additional processing (it could also apply machine learning / deep learning), and then stores the data to a data store such as Apache Kudu.   If you have attended any of the CDF workshops, you will notice we could integrate machine learning via a LookupRecord processor calling a CDSW Python machine learning application via REST.   Data read from Kafka streams in with helpful metadata into our provenance data stream, including the Kafka offset, partition, topic and a unique id on the data.



Since we are using Apache Kafka 2.x for a lot of streams, we really need to see what is going on in there.   Fortunately, Cloudera Streams Messaging Manager gives us full insight into everything.  We can browse brokers, topics, consumers and producers.   We can also set up alerts to notify someone via various methods (email, REST, etc.) if something happens, such as consumer lag.   From the screen below we can see that these two consumers are lagging.   This is because I have stopped them, so they cannot process the messages in their respective topics.
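
SMM surfaces this lag for you, but it helps to know what the number means: lag is simply the end offset of each partition minus the group's committed offset. A rough sketch of that calculation using the plain Kafka AdminClient, assuming the broker and consumer group from the Flink job above (cloudera:9092, flinkKafkaGroup) and not part of the original flow:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Map;
import java.util.Properties;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "cloudera:9092");

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // Offsets the Flink consumer group has committed so far
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets("flinkKafkaGroup")
                         .partitionsToOffsetAndMetadata()
                         .get();

            // A throwaway consumer just to look up the current end offsets
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "cloudera:9092");
            consumerProps.put("group.id", "lag-check");
            consumerProps.put("key.deserializer", StringDeserializer.class.getName());
            consumerProps.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                committed.forEach((partition, offsetAndMetadata) ->
                        System.out.printf("%s lag=%d%n",
                                partition, endOffsets.get(partition) - offsetAndMetadata.offset()));
            }
        }
    }
}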


I restart my consumers and quickly the lag is gone.   Flink and NiFi process Kafka topics quickly!


We can see a nice overview of my FLaNK application via this screen.   I see my current stats on the source and sink Kafka topics I am working with from NiFi and Flink.



I can also dive into a topic (if I have permissions) and see the content of the data.   There's also a link to the Schema Registry for this particular topic and the ability to view various types of Kafka data including Strings and Avro.   As you can see our payloads are JSON IoT data.


Once our NiFi Kafka consumer has acquired the data, we use our PutKudu processor to auto-infer the schema of the JSON data and write to an existing Kudu table.  We can query the data via an interface such as Apache Hue.   We run a quick Apache Impala SQL query to get back data as it arrives.   Apache NiFi is inserting data as we stream it through Apache Kafka from our Apache Flink application.
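
Impala in Hue is the easiest way to check the table, but you can also read it directly with the Kudu Java client. A minimal sketch, assuming a Kudu master at cloudera:7051 and a hypothetical table name of impala::default.iot (the real name depends on how the table was created):

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public class KuduReadCheck {
    public static void main(String[] args) throws Exception {
        // Master address and table name are assumptions for this sketch
        try (KuduClient client = new KuduClient.KuduClientBuilder("cloudera:7051").build()) {
            KuduTable table = client.openTable("impala::default.iot");
            KuduScanner scanner = client.newScannerBuilder(table).limit(10).build();

            while (scanner.hasMoreRows()) {
                RowResultIterator rows = scanner.nextRows();
                for (RowResult row : rows) {
                    System.out.println(row.rowToString());
                }
            }
        }
    }
}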







That was easy!   Next up we will expand our edge piece by utilizing MiNiFi deployed by Cloudera Edge Flow Manager, add our second M with Apache MXNet (using my Apache MXNet NiFi processor plus some Python GluonCV, GluonTS and more via Cloudera Data Science Workbench), and use Stateless Apache NiFi 1.10 for some FaaS Kafka consuming and producing.

More FLaNK to come!

Upcoming:  GluonTS for Forecasting Time Series https://gluon-ts.mxnet.io/examples/basic_forecasting_tutorial/tutorial.html

References:

Learning Apache Flink 1.9


Everything is a stream: an important concept to remember when working with Apache Flink.
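
Even a bounded, in-memory collection goes through the same streaming API, which is a quick way to see the idea in code. A tiny sketch (the element names are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EverythingIsAStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A fixed set of elements is still processed as a DataStream
        env.fromElements("sensor-1", "sensor-2", "sensor-3")
           .map(String::toUpperCase)
           .print();

        env.execute("everything-is-a-stream");
    }
}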


Submit a Job

https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html


flink run -m yarn-cluster -d -p 2 -ynm Wiki iot-1.0.jar 

Mm FLaNK

I am working on a nice integration pattern with the Mm FLaNK stack of Apache MiNiFi, MXNet, Flink, NiFi, Kafka and Kudu.

Flink Monitoring



{"refresh-interval":10000,"timezone-offset":0,"timezone-name":"Coordinated Universal Time","flink-version":"1.9.0-csa1.0.0.0","flink-revision":"3cedceb @ 04.11.2019 @ 13:38:10 UTC"}



--cluster=true

flink run -m yarn-cluster -d -p 2 -yD log4j.configuration.file=log4j.properties -ynm IoT iot-1.0.jar  --cluster true
flink run -m yarn-cluster -d -p 2 -ynm Wiki iot-1.0.jar 

 flink run -m yarn-cluster /opt/cloudera/parcels/FLINK/lib/flink/examples/streaming/WordCount.jar --input hdfs:///tmp/README.txt --output hdfs:///tmp/ReadMe-Counts

NiFi Toolkit - CLI - For NiFi 1.10

Along with the updated Apache NiFi server, the NiFi 1.10 release also updated the command-line interface with new and improved features.   Let's check them out.

Cool Tools

S2S.sh - send data to Apache NiFi via the CLI.

Formatted as such:
[{"attributes":{"key":"value"},"data":"stuff"}]

Examples

registry import-flow-version


Get Into Interactive Mode

./cli.sh




Get Parameter Contexts (simple or json format)

 nifi list-param-contexts -u http://localhost:8080 -ot simple



Export Parameter Context

nifi export-param-context -u http://localhost:8080 -verbose --paramContextId 8067d863-016e-1000-f0f7-265210d3e7dc 




Get Services

 nifi get-services -u http://localhost:8080


NiFi Dump

../bin/nifi.sh dump filedump.txt

NiFi home: /Users/tspann/Documents/nifi-1.10.0

Bootstrap Config File: /Users/tspann/Documents/nifi-1.10.0/conf/bootstrap.conf

2019-11-18 17:08:04,921 INFO [main] org.apache.nifi.bootstrap.Command Successfully wrote thread dump to /Users/tspann/Documents/nifi-1.10.0/filedump.txt

NiFi Diagnostics

../bin/nifi.sh diagnostics diag.txt

Java home:
NiFi home: /Users/tspann/Documents/nifi-1.10.0

Bootstrap Config File: /Users/tspann/Documents/nifi-1.10.0/conf/bootstrap.conf

2019-11-18 17:11:09,844 INFO [main] org.apache.nifi.bootstrap.Command Successfully wrote diagnostics information to /Users/tspann/Documents/nifi-1.10.0/diag.txt

2019-11-18 17:11:10,041 INFO [main] org.apache.nifi.bootstrap.Command gopherProxySet = false
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command awt.toolkit = sun.lwawt.macosx.LWCToolkit
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.specification.version = 11
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command sun.cpu.isalist =
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command sun.jnu.encoding = UTF-8
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.class.path = /Users/tspann/Documents/nifi-1.10.0/./conf:/Users/tspann/Documents/nifi-1.10.0/./lib/jetty-schemas-3.1.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/slf4j-api-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/stanford-english-corenlp-2018-02-27-models.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/jcl-over-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/javax.servlet-api-3.1.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/logback-classic-1.2.3.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-properties-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-nar-utils-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-api-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-framework-api-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/jul-to-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/logback-core-1.2.3.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/log4j-over-slf4j-1.7.26.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/nifi-runtime-1.10.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/javax.annotation-api-1.3.2.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-core-2.3.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/javax.activation-api-1.2.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-impl-2.3.0.jar:/Users/tspann/Documents/nifi-1.10.0/./lib/java11/jaxb-api-2.3.0.jar
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command java.vm.vendor = Amazon.com Inc.
2019-11-18 17:11:10,042 INFO [main] org.apache.nifi.bootstrap.Command javax.security.auth.useSubjectCredsOnly = true
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.arch.data.model = 64
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.font.fontmanager = sun.font.CFontManager
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.url = https://aws.amazon.com/corretto/
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.timezone = America/New_York
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command org.apache.nifi.bootstrap.config.log.dir = /Users/tspann/Documents/nifi-1.10.0/logs
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command os.name = Mac OS X
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.version = 11
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command nifi.properties.file.path = /Users/tspann/Documents/nifi-1.10.0/./conf/nifi.properties
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.java.launcher = SUN_STANDARD
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.country = US
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.boot.library.path = /Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command app = NiFi
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.java.command = org.apache.nifi.NiFi
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command jdk.debug = release
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command org.apache.jasper.compiler.disablejsr199 = true
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command sun.cpu.endian = little
2019-11-18 17:11:10,043 INFO [main] org.apache.nifi.bootstrap.Command user.home = /Users/tspann
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command user.language = en
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.specification.vendor = Oracle Corporation
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.version.date = 2019-07-16
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.home = /Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command file.separator = /
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.vm.compressedOopsMode = Zero based
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command line.separator =

2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.specification.name = Java Platform API Specification
2019-11-18 17:11:10,044 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.vendor = Oracle Corporation
2019-11-18 17:11:10,050 INFO [main] org.apache.nifi.bootstrap.Command javax.xml.xpath.XPathFactory:http://saxon.sf.net/jaxp/xpath/om = net.sf.saxon.xpath.XPathFactoryImpl
2019-11-18 17:11:10,050 INFO [main] org.apache.nifi.bootstrap.Command java.awt.graphicsenv = sun.awt.CGraphicsEnvironment
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.awt.headless = true
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.protocol.handler.pkgs = sun.net.www.protocol
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command sun.management.compiler = HotSpot 64-Bit Tiered Compilers
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.runtime.version = 11.0.4+11-LTS
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command user.name = tspann
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.net.preferIPv4Stack = true
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command path.separator = :
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.security.egd = file:/dev/urandom
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command org.jruby.embed.localvariable.behavior = persistent
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command os.version = 10.14.6
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command java.runtime.name = OpenJDK Runtime Environment
2019-11-18 17:11:10,051 INFO [main] org.apache.nifi.bootstrap.Command file.encoding = UTF-8
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command sun.net.http.allowRestrictedHeaders = true
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jnidispatch.path = /var/folders/t5/xz5j50wx2rl8kd3021lkbn800000gn/T/jna--864347536/jna3349001211756681540.tmp
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vm.name = OpenJDK 64-Bit Server VM
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jna.platform.library.path = /usr/lib:/usr/lib
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.version = Corretto-11.0.4.11.1
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jna.loaded = true
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vendor.url.bug = https://github.com/corretto/corretto-11/issues/
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command jetty.git.hash = afcf563148970e98786327af5e07c261fda175d3
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.io.tmpdir = /var/folders/t5/xz5j50wx2rl8kd3021lkbn800000gn/T/
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.version = 11.0.4
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command user.dir = /Users/tspann/Documents/nifi-1.10.0
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command os.arch = x86_64
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command nifi.bootstrap.listen.port = 55105
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.vm.specification.name = Java Virtual Machine Specification
2019-11-18 17:11:10,052 INFO [main] org.apache.nifi.bootstrap.Command java.awt.printerjob = sun.lwawt.macosx.CPrinterJob
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command sun.os.patch.level = unknown
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command bridj.quiet = true
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.library.path = /Users/tspann/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vendor = Amazon.com Inc.
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vm.info = mixed mode
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.vm.version = 11.0.4+11-LTS
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command sun.io.unicode.encoding = UnicodeBig
2019-11-18 17:11:10,053 INFO [main] org.apache.nifi.bootstrap.Command java.class.version = 55.0


NiFi 1.10: PostSlack - Easy Image Upload


In Apache NiFi 1.10, we now have a nice PostSlack processor to send images to Slack channels.   If you have read my blog, you know I wrote a lesser processor for this:  https://www.datainmotion.dev/2019/03/posting-images-to-slack-from-apache.html.
Time to upgrade, my friend.

To use the new processor you will need a Slack account, Slack administrator permissions and a Slack application, so you can have an application token.   You can grab that token here:
https://api.slack.com/methods/files.upload/test. Go to the Tester tab, pick your app (mine is NiFi), then click Test. At the bottom will be a URL containing token=; grab that token.

Notes:

You must also log in with the Slack admin role.

Your app needs the following scopes:

files:write
files:write:user

Manage these under YOURSLACK /apps/manage, then dive into the app.

https://api.slack.com/messaging/files/setup

You can make this token a parameter so you can turn your PostSlack flow into a stateless module.



Our Example Flow:   Sending Webcam Images to Slack


First we need a source; it could be any picture source. I use my WebCam processor, which grabs images from a web camera attached to the NiFi server.

Then I add a mime.type attribute; some sources will already set this for you.   In my case I manually set the image MIME type to image/jpeg.


Posting to Slack is easy: just add the channel and that token we grabbed from the Slack admin site.


The webcam image posted to Slack.




That's it: a new processor is available for you.


In a future article I will talk about a new rules feature.







Ingest Salesforce Data into Hive Using Apache NiFi

TOOLS AND ACCOUNT USED: 

Salesforce JDBC driver: https://www.progress.com/jdbc/salesforce
Java: 1.8.0_221 
OS: macOS Mojave 10.14.5
SFDC developer account: https://developer.salesforce.com/signup - 15-day trial


Overall Flow:

Install Hive - Thrift, Metastore, and a Hive table schema that matches the SFDC Opportunity table. 
Install NiFi - add the PutHive3Streaming NAR. 
Install and configure drivers: SalesforceConnect and AvroReader.
Add processors: QueryDatabaseTable and PutHive3Streaming.


Install Salesforce JDBC and Hive drivers

  • Download the DataDirect Salesforce JDBC driver from here.
  • Download the Cloudera Hive driver from here.
  • To install a driver, execute the .jar package by running the following command in the terminal, or just double-click the jar package.


java -jar PROGRESS_DATADIRECT_JDBC_SF_ALL.jar
java -jar PROGRESS_DATADIRECT_JDBC_INSTALL.jar




Add Drivers to the Apache NiFi Classpath

Copy the drivers

From - /home/user/Progress/DataDirect/Connect_for_JDBC_51/{sforce.jar,hive.jar}
To - /path/to/nifi/lib (e.g. /usr/local/Cellar/nifi/1.9.2/libexec/lib/)

Restart Apache NiFi



Build the Apache NiFi Flow

Launch Apache NiFi: http://localhost:8080/nifi

Add and Configure the JDBC Drivers

Two controller services are needed:

DBCPConnectionPool
AvroReader

Add DBCPConnectionPool

Choose DBCPConnectionPool as your controller service.

Add the properties below:

Database Connection URL = jdbc:datadirect:sforce://login.salesforce.com;SecurityToken=****

- NOTE: see the section on the SecurityToken below

  • Database Driver Class Name: com.ddtek.jdbc.sforce.SForceDriver
  • Database User - the developer account username 
  • Password - the password for the developer account
  • Name - SalesforceConnect
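
Before wiring this into NiFi, it can help to sanity-check the driver and credentials with a tiny standalone JDBC program. A sketch, assuming the DataDirect sforce.jar is on the classpath and using placeholder credentials and token:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SalesforceJdbcCheck {
    public static void main(String[] args) throws Exception {
        // Same URL pattern as the DBCPConnectionPool controller service above
        String url = "jdbc:datadirect:sforce://login.salesforce.com;SecurityToken=YOUR_TOKEN";

        try (Connection conn = DriverManager.getConnection(url, "dev-account-user", "dev-account-password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT ID, NAME, STAGENAME, AMOUNT FROM OPPORTUNITY")) {
            while (rs.next()) {
                System.out.println(rs.getString("ID") + " | " + rs.getString("NAME")
                        + " | " + rs.getString("STAGENAME") + " | " + rs.getString("AMOUNT"));
            }
        }
    }
}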


Add an AvroReader controller: 
This controller service is used by the downstream processor (PutHive3Streaming) to read the Avro-formatted data
produced by the previous processor (QueryDatabaseTable). 
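
If you want to peek at what that Avro looks like outside of NiFi, you can download a flowfile's content from the queue and decode it with the plain Avro Java API. A small sketch; the file path is just an example:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;

public class InspectAvro {
    public static void main(String[] args) throws Exception {
        // e.g. a flowfile's content downloaded from the NiFi queue
        File avroFile = new File("/tmp/opportunity.avro");

        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
            System.out.println("Schema: " + reader.getSchema());
            while (reader.hasNext()) {
                System.out.println(reader.next());
            }
        }
    }
}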



Add and Configure Processors


QueryDatabaseTable
PutHive3Streaming


QueryDatabaseTable

To read the data from Salesforce, use a processor called QueryDatabaseTable, which supports
incremental data pull.

Choose QueryDatabaseTable and add it to the canvas as shown below.





Configure the QueryDatabaseTable processor:

Right-click on QueryDatabaseTable and choose the scheduling type, manual or cron.

Under the Properties tab, add the following properties:


Database Connection Pooling Service : SalesForceConnect

Database Type : Generic

Table Name : Opportunity 

Apply and save the processor configuration.


PutHive3Streaming 


NOTE: PutHiveStreaming was used at first, but since the downstream Hive version is 3.x, 
@Matthew Burgess recommended using the PutHive3Streaming processor instead.
Restart NiFi and PutHive3Streaming should be available for use. PutHiveStreaming kept throwing
an error that it was not able to connect to the Hive metastore URI, like the one below:




Configure the PutHive3Streaming Processor 

Drag another processor from the menu and choose PutHive3Streaming from the list.
Under the Properties tab, add the following properties:

Record Reader = AvroReader
Hive Metastore URI = thrift://localhost:9083
Hive Configuration Resources = /usr/local/Cellar/hive/3.1.2/libexec/conf/hive-site.xml
Database Name = default 
Table Name = opportunity


             Properties: 


              Relationship termination settings:



Connect the QueryDatabaseTable processor to PutHive3Streaming.

Make sure to check the success relationship, then click Add/Apply.




Opportunity table schema in Hive:


create table opportunity2
(ID string,
ISDELETED boolean,
ACCOUNTID string,
ISPRIVATE boolean,
NAME string,
DESCRIPTION string,
STAGENAME string,
AMOUNT string,
PROBABILITY string,
EXPECTEDREVENUE string,
TOTALOPPORTUNITYQUANTITY double,
CLOSEDATE string,
TYPE string,
NEXTSTEP string,
LEADSOURCE string,
ISCLOSED boolean,
ISWON boolean,
FORECASTCATEGORY string,
FORECASTCATEGORYNAME string,
CAMPAIGNID string,
HASOPPORTUNITYLINEITEM boolean,
PRICEBOOK2ID string,
OWNERID string,
CREATEDDATE string,
CREATEDBYID string,
LASTMODIFIEDDATE string,
LASTMODIFIEDBYID string,
SYSTEMMODSTAMP string,
LASTACTIVITYDATE string,
FISCALQUARTER int,
FISCALYEAR int,
FISCAL string,
LASTVIEWEDDATE string,
LASTREFERENCEDDATE string,
HASOPENACTIVITY boolean,
HASOVERDUETASK boolean,
DELIVERYINSTALLATIONSTATUS__C string,
TRACKINGNUMBER__C string,
ORDERNUMBER__C string,
CURRENTGENERATORS__C string,
MAINCOMPETITORS__C string )
STORED AS ORC TBLPROPERTIES ('transactional'='true');

NOTES: 


SFDC Security Token: 


This token is needed to connect to Salesforce via JDBC. 
Log in to the Salesforce developer environment and reset it; the token will be sent to the email address attached to the account.



Error while connecting to Salesforce without the security token.


Hive 3 ACID table issues: 


The Hive version being used was 3.1.2. By default, the table created was not transactional, which led to the error below. 


Error
2019-11-07 13:25:39,730 ERROR [Timer-Driven Process Thread-3] o.a.h.streaming.HiveStreamingConnection HiveEndPoint { metaStoreUri: thrift://localhost:9083, database: default, table: opportunity2 } must use an acid table. 
2019-11-07 13:25:39,731 ERROR [Timer-Driven Process Thread-3] o.a.n.processors.hive.PutHive3Streaming PutHive3Streaming[id=46a1d083-016e-1000-280f-5a026f1ceef7] PutHive3Streaming[id=46a1d083-016e-1000-280f-5a026f1ceef7] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException 


Note:{ metaStoreUri: thrift://localhost:9083, database: default, table: opportunity2 } must use an acid table.


Hive Command line properties set:

set hive.support.concurrency = true;
set hive.enforce.bucketing = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on = true;
set hive.compactor.worker.threads = 1;


Only after this was creating the table with TBLPROPERTIES ('transactional'='true') successful.

END PRODUCT


Apache NiFi Flow:



HIVE Result: 

Beeline >  !connect jdbc:hive2://localhost:10000;
0: jdbc:hive2://localhost:10000> select opportunity2.id, opportunity2.accountid,
opportunity2.name, opportunity2.stagename, opportunity2.amount 
from opportunity2 limit 10;
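
The same check can also be run from Java over the Hive JDBC driver. A minimal sketch, assuming HiveServer2 at localhost:10000 with no authentication (the class name and empty credentials are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveOpportunityQuery {
    public static void main(String[] args) throws Exception {
        // Same HiveServer2 endpoint used in the beeline connect above
        String url = "jdbc:hive2://localhost:10000/default";

        try (Connection conn = DriverManager.getConnection(url, "", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select id, accountid, name, stagename, amount from opportunity2 limit 10")) {
            while (rs.next()) {
                // Use positional access since Hive may prefix column labels with the table name
                System.out.println(rs.getString(1) + " | " + rs.getString(3)
                        + " | " + rs.getString(4) + " | " + rs.getString(5));
            }
        }
    }
}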
