Skip to main content

Exploring Apache NiFi 1.10: Stateless Engine and Parameters

Exploring Apache NiFi 1.10:   Stateless Engine and Parameters

Apache NiFi is now available in 1.10!


https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993

You can now use JDK 8 or JDK 11!   I am running in JDK 11, seems a bit faster.

A huge feature is the addition of Parameters!   And you can use these to pass parameters to Apache NiFi Stateless!

A few lesser Processors have been moved from the main download, see here for migration hints:
https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance

Release Notes:   https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0

Example Source Code:   https://github.com/tspannhw/stateless-examples

More New Features:

  • ParquetReader/Writer (See:  https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html)
  • Prometheus Reporting Task.   Expect more Prometheus stuff coming.
  • Experimental Encrypted content repository.   People asked me for this one before.
  • Parameters!!    Time to replace Variables/Variable Registry.   Parameters are better in every way.
  • Toolkit module to generate and build Swagger API library for NiFi
  • PostSlack processor
  • PublishKafka Partition Support
  • GeoEnrichIPRecord Processor
  • Remote Input Port in a Process Group
  • Command Line Diagnostics
  • RocksDB FlowFile Repository
  • PutBigQueryStreaming Processor
  • nifi.analytics.predict.enabled - Turn on Back Pressure Prediction
  • More Lookup Services for ETL/ELT:   DatabaseRecordLookupService
  • KuduLookupService
  • HBase_2_ListLookupService
Stateless

First we will run in the command line straight from the NiFi Registry.    This is easiest.   Then we will run from YARN!   Yes you can now run your Apache NiFi flows on your giant Cloudera CDH/HDP/CDP YARN clusters!   Let's make use of your hundreds of Hadoop nodes.



Stateless Examples


Let's Build A Stateless Flow

The first thing to keep in mind, is we will want anything that might change to be a parameter that we can pass with our JSON file.    It's very easy to set parameters even for drop downs!   You even get prompted to pick a parameter from a selection list.   Before parameters are available you will need to add them to a parameter list and assign that parameter context to your Process Group.


A Parameter in a Processor Configuration is shown as #{broker}

Parameter Context Connected to a Process Group, Controller Service, ...


Apply those parameters



Param(eter) is now an option for properties


Pop-up Hint for Using Parameters


Edit a Parameter in a Parameter Context



We can configure parameters in Controller Services as well.




So easy to choose an existing one.

Use them for anything that can change or is a something you don't want to hardcode.




Apache Kafka Consumer to Sink

This is a simple two step Apache NiFi flow the reads from Kafka and sends to a sink, for example a File.



Let's make sure we use that Parameter Context 




To Build Your JSON Configuration File you will need the bucket ID and flow ID from your Apache NiFi Registry.   You will also need the URL for that registry.   You can browse that registry at a URL similiar to http://tspann-mbp15-hw14277:18080.





My Command Line Runner


/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Continuous --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json


RunFromRegistry [Once|Continuous] --file <File Name>

This is the basic use case of running from the command-line using a file.   The flow must exist in the reference Apache NiFi Registry.

JSON Configuration File (kafkaconsumer.json)

{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker" : "4.317.852.100:9092",
    "topic" : "iot",
    "group_id" : "nifi-stateless-kafka-consumer",
    "DestinationDirectory" : "/tmp/nifistateless/output2/",
    "output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
  }
}

Example Run


12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-5 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-2 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-3 at offset 19 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-0 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-1 at offset 20 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Built incremental fetch (sessionId=1943199939, epoch=5) for node 8. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 10 partition(s)
12:25:38.729 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(iot-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) to broker ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)
12:25:38.737 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles


Example Output


cat output/247361879273711.statelessFlowFile
{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}



We can also run Once in this example to send one Kafka message.

Generator to Apache Kafka Producer


My Command Line Runner

/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Once --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafka.json


JSON Configuration File (kafka.json)

{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
  "flowVersion": "1",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker" : "3.218.152.236:9092"
  }
}

Example Output


12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 8. Fetching API versions.
12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 8.
12:32:37.732 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 8: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 10 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 7 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0)
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.records-per-batch
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.bytes
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.compression-rate
12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-retries
12:32:37.740 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-errors
12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []
12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []
Flow Succeeded


Other Runtime Options:


RunYARNServiceFromRegistry        <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>


RunOpenwhiskActionServer          <Port>

References:

© 2019 Timothy Spann

Popular posts from this blog

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / Hive

Migrating Apache Flume Flows to Apache NiFi: Kafka Source to HDFS / Kudu / File / HiveArticle 7 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_9.html Article 6 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_35.html
Article 5 - 
Article 4 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_8.html Article 3 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html Article 2 - https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache.html Article 1https://www.datainmotion.dev/2019/08/migrating-apache-flume-flows-to-apache.html Source Code:  https://github.com/tspannhw/flume-to-nifi
This is one possible simple, fast replacement for "Flafka".



Consume / Publish Kafka And Store to Files, HDFS, Hive 3.1, Kudu

Consume Kafka Flow 

 Merge Records And Store As AVRO or ORC
Consume Kafka, Update Records via Machine Learning Models In CDSW And Store to Kudu

Sour…

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice

Ingesting Drone Data From DJII Ryze Tello Drones Part 1 - Setup and Practice In Part 1, we will setup our drone, our communication environment, capture the data and do initial analysis. We will eventually grab live video stream for object detection, real-time flight control and real-time data ingest of photos, videos and sensor readings. We will have Apache NiFi react to live situations facing the drone and have it issue flight commands via UDP. In this initial section, we will control the drone with Python which can be triggered by NiFi. Apache NiFi will ingest log data that is stored as CSV files on a NiFi node connected to the drone's WiFi. This will eventually move to a dedicated embedded device running MiniFi. This is a small personal drone with less than 13 minutes of flight time per battery. This is not a commercial drone, but gives you an idea of the what you can do with drones. Drone Live Communications for Sensor Readings and Drone Control You must connect to the drone…