IoT Edge Use Cases with Apache Kafka and Apache NiFi - MiniFi

MiniFi Java Agent 0.5
Copy the necessary NARs from the Apache NiFi 1.7 lib directory into the MiniFi lib directory:
  • nifi-ssl-context-service-nar-1.7.0.nar
  • nifi-standard-services-api-nar-1.7.0.nar
  • nifi-kafka-1-0-nar-1.7.0.nar
These NARs add support for the PublishKafka_1_0 and ConsumeKafka_1_0 processors.
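The copy step above can be scripted. The following is a minimal Python sketch, not part of the original setup: the function name `copy_nars` and the directory arguments are assumptions for illustration, and only the three NAR file names come from the list above.

```python
import shutil
from pathlib import Path

# The three NAR files named in the list above (NiFi 1.7.0).
REQUIRED_NARS = [
    "nifi-ssl-context-service-nar-1.7.0.nar",
    "nifi-standard-services-api-nar-1.7.0.nar",
    "nifi-kafka-1-0-nar-1.7.0.nar",
]


def copy_nars(nifi_lib, minifi_lib, nars=REQUIRED_NARS):
    """Copy each NAR from nifi_lib into minifi_lib.

    Returns the list of NAR names that were not found in nifi_lib.
    Both directory paths are supplied by the caller (hypothetical locations).
    """
    src_dir, dst_dir = Path(nifi_lib), Path(minifi_lib)
    missing = []
    for name in nars:
        src = src_dir / name
        if src.is_file():
            # copy2 keeps file timestamps along with the contents
            shutil.copy2(src, dst_dir / name)
        else:
            missing.append(name)
    return missing
```

Call it with your own install locations, for example `copy_nars("/path/to/nifi/lib", "/path/to/minifi/lib")`, and check that the returned list is empty before restarting the agent.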
Then create a flow that consumes from Kafka, publishes to Kafka, or both, depending on your needs. In my simple example, MiniFi consumes the Kafka messages and writes each one to a file. It also writes the message metadata to a JSON file.
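To make the output of that flow concrete, here is a small Python sketch of what ends up on disk for each message. It only imitates the result of the flow and is not how MiniFi implements it; the function name `store_message` is hypothetical, and the `.attr.json` suffix and attribute names follow the example files shown later in this article.

```python
import json
from pathlib import Path


def store_message(out_dir, filename, payload, attributes):
    """Write one message as two files.

    <out_dir>/<filename>            holds the raw message body (bytes)
    <out_dir>/<filename>.attr.json  holds the metadata as a JSON object
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    body_path = out / filename
    attr_path = out / (filename + ".attr.json")
    body_path.write_bytes(payload)
    attr_path.write_text(json.dumps(attributes), encoding="utf-8")
    return body_path, attr_path
```

A call such as `store_message("monitor", "1448678223641638", body, {"kafka.topic": "smartPlug", "kafka.partition": "0"})` would produce a pair of files with the same layout as the flow's output.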
Consume Kafka
Publish Electric Monitoring Data To Kafka
Let's monitor the messages going through our topic, smartPlug.
Publish Messages to Kafka
Consume Any Messages From the smartPlug topic
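Outside of MiniFi, the same topic can be watched from a short script. This is a sketch that assumes the third-party kafka-python client is installed; the broker address, group id, and the function names `describe` and `monitor` are placeholders chosen for illustration and are not taken from the flow itself.

```python
def describe(record):
    """Format one consumed record as a single line of text."""
    key = record.key.decode("utf-8") if record.key else None
    value = record.value.decode("utf-8", errors="replace")
    return f"{record.topic}-{record.partition}@{record.offset} key={key} value={value}"


def monitor(topic="smartPlug", servers="localhost:9092", group_id="monitor-example"):
    """Print every new message on the topic until the process is interrupted.

    servers and group_id are placeholder values; replace them with your own.
    """
    # Imported here so describe() can be used without the client installed.
    from kafka import KafkaConsumer  # third-party: kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        group_id=group_id,
        auto_offset_reset="latest",   # start at the newest messages
        enable_auto_commit=False,     # do not commit offsets for this watcher
    )
    for record in consumer:
        print(describe(record))
```

Calling `monitor(servers="your-broker:port")` blocks and prints one line per message, which is enough to confirm that data is moving through the topic.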
Logs
Provenance Event file containing 377 records. In the past 5 minutes, 1512 events have been written to the Provenance Repository, totaling 839.32 KB

2018-11-26 19:42:32,473 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting PutFile[id=25a86505-031a-37d9-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.apache.nifi.controller.FlowController Started 0 Remote Group Ports transmitting
2018-11-26 19:42:32,478 INFO [main] org.apache.nifi.minifi.MiNiFiServer Flow loaded successfully.
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ConsumeKafka_1_0[id=8556f1ce-a915-3fda-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.BootstrapListener Successfully initiated communication with Bootstrap
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled AttributesToJSON[id=0628b4e5-10d0-3b09-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.minifi.MiNiFi Controller initialization took 2787584123 nanoseconds.
2018-11-26 19:42:32,480 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=25a86505-031a-37d9-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,481 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,585 INFO [Timer-Driven Process Thread-2] o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [princeton1.field.hortonworks.com:6667]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = minificonsumer1
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 10000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka version : 1.0.0
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka commitId : aaa7af6d4a11b29d
2018-11-26 19:42:33,088 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Discovered coordinator princeton1.field.hortonworks.com:6667 (id: 2147482646 rack: null)
2018-11-26 19:42:33,090 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Revoking previously assigned partitions []
2018-11-26 19:42:33,091 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] (Re-)joining group
2018-11-26 19:42:36,391 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Successfully joined group with generation 3
2018-11-26 19:42:36,394 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Setting newly assigned partitions [smartPlug-0]
2018-11-26 19:44:32,325 INFO [pool-34-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 0 milliseconds
2018-11-26 19:44:40,700 INFO [Provenance Maintenance Thread-1] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 1437
2018-11-26 19:44:40,765 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for provenance_repository/index-1543271506000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2018-11-26 19:44:40,767 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (28 records) into single Provenance Log File provenance_repository/1409.prov in 62 milliseconds
2018-11-26 19:44:40,768 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 151 records. In the past 5 minutes, 28 events have been written to the Provenance Repository, totaling 15.43 KB
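When checking a log like this for scheduling or Kafka group events, it can help to split each entry into fields. The sketch below is a plain regular-expression parser written for this article; the field layout (timestamp, level, thread in brackets, logger, message) is inferred from the entries above and is an assumption, not an official format description. The names `LOG_LINE`, `LogEntry`, and `parse_log_line` are hypothetical.

```python
import re
from typing import NamedTuple

# Layout inferred from the log above:
# <date> <time,millis> <LEVEL> [<thread>] <logger> <message>
LOG_LINE = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<logger>\S+) "
    r"(?P<message>.*)$"
)


class LogEntry(NamedTuple):
    timestamp: str
    level: str
    thread: str
    logger: str
    message: str


def parse_log_line(line):
    """Return a LogEntry for one log line, or None if the line does not match."""
    match = LOG_LINE.match(line.strip())
    return LogEntry(**match.groupdict()) if match else None
```

Continuation lines, such as the individual `ConsumerConfig` values, do not start with a timestamp, so `parse_log_line` returns `None` for them and they can be attached to the preceding entry if needed.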
JSON Kafka Message and JSON Kafka Metadata Stored As Files
monitor/1448678223641638.attr.json
{"path":"./","filename":"1448678223641638","kafka.partition":"0","kafka.offset":"5543","kafka.topic":"smartPlug","kafka.key":"cb90ad21-b311-494c-96cc-06dd2e8747df","uuid":"041459fc-c63e-4056-ab50-1c375cd7d49f"}
monitor/1448678223641638
{"day30": 0.431, "day31": 1.15, "sw_ver": "1.2.5 Build 171206 Rel.085954", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "00000000000000000000000000000000", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim", "icon_hash": "", "relay_state": 1, "on_time": 886569, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -75, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month10": 1.581, "month11": 30.888, "current": 0.067041, "voltage": 122.151701, "power": 1.277361, "total": 24.289, "time": "11/26/2018 21:54:22", "ledon": true, "systemtime": "11/26/2018 21:54:22"}
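Once the files are on disk, each reading can be matched back to its Kafka metadata by file name. The following Python sketch assumes the naming shown above (a message file plus a sibling `<name>.attr.json`) and that both files hold a single JSON object; the function name `load_records` is hypothetical.

```python
import json
from pathlib import Path

ATTR_SUFFIX = ".attr.json"


def load_records(directory):
    """Pair every <name>.attr.json file with its <name> message file.

    Returns a list of dicts with two keys: "attributes" (the Kafka metadata)
    and "reading" (the smart plug JSON message). Metadata files without a
    matching message file are skipped.
    """
    records = []
    for attr_path in sorted(Path(directory).glob("*" + ATTR_SUFFIX)):
        # Strip the suffix to get the name of the message file
        body_path = attr_path.with_name(attr_path.name[: -len(ATTR_SUFFIX)])
        if not body_path.is_file():
            continue
        attributes = json.loads(attr_path.read_text(encoding="utf-8"))
        reading = json.loads(body_path.read_text(encoding="utf-8"))
        records.append({"attributes": attributes, "reading": reading})
    return records
```

For example, `for r in load_records("monitor"): print(r["attributes"]["kafka.offset"], r["reading"]["power"])` lists the power reading stored for each Kafka offset.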