Streams Messaging Manager (SMM) REST API

Calling the SMM REST API from NiFi






--- Example curl calls from the article demo


Aggregated Topic Metrics
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/aggregated/topics/syndicate-speed-event-avro?duration=LAST_ONE_HOUR&state=all" -H "accept: application/json"
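The same request can also be issued from NiFi (for example via an InvokeHTTP processor) or from a short script. Here is a minimal Python sketch that builds the same URL as the curl call above; the host and topic are just the demo values from this article, and actually fetching the URL is left for a live cluster:

```python
from urllib.parse import urlencode, urlunsplit

# Demo host from this article -- substitute your own SMM REST endpoint.
SMM_HOST = "magellan-5.field.hortonworks.com:8585"

def topic_metrics_url(topic: str, duration: str = "LAST_ONE_HOUR", state: str = "all") -> str:
    """Build the aggregated topic-metrics URL used by the curl call above."""
    path = f"/api/v1/admin/metrics/aggregated/topics/{topic}"
    query = urlencode({"duration": duration, "state": state})
    return urlunsplit(("http", SMM_HOST, path, query, ""))

# Fetch with an "accept: application/json" header on a live cluster.
print(topic_metrics_url("syndicate-speed-event-avro"))
```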


Broker Metrics
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/brokers/1003?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


Brokers by ID
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers?brokerIds=1004%2C1002%2C1003" -H "accept: application/json"


Kafka Cluster Details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/cluster" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/brokers/1002" -H "accept: application/json"


Consumer Groups and Clients
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups?state=all" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groups/console-consumer-64800" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/groupNames" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/consumers/clients/console-consumer-64800" -H "accept: application/json"


Consumer Group Metrics
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/consumers/group/kafka-streams-analytics-geo-event?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"




Producer Metrics
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers?state=all&duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/producers/minifi-eu-i1?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


Details on one topic
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics/syndicate-speed-event-avro" -H "accept: application/json"


List of All Topics with details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/topics" -H "accept: application/json"


List of All Brokers with details


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers" -H "accept: application/json"


Broker Details
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/configs/brokers/1002" -H "accept: application/json"

Search
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/brokers?brokerIds=1001%2C%201002%2C%201003%2C%201004%2C%201005" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topics?topicNames=supply-chain%2C%20gateway-east-raw-sensors" -H "accept: application/json"


Partitions
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/topicPartitions?partitions=P0%2CP1%2CP2%2CP3" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/search/configs/topics?topicNames=gateway-europe-raw-sensors%2C%20syndicate-speed-event-json" -H "accept: application/json"


List Serdes / Types


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/serdes" -H "accept: application/json"


Topic Offsets
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-speed-event-json/offsets" -H "accept: application/json"


List of Topic Names
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics" -H "accept: application/json"


Topic Details and Partitions
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json" -H "accept: application/json"

curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/topics/syndicate-geo-event-json/partitions" -H "accept: application/json"


Topic Metrics
curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json?duration=LAST_ONE_HOUR&from=-1&to=-1" -H "accept: application/json"


curl -X GET "http://magellan-5.field.hortonworks.com:8585/api/v1/admin/metrics/topics/syndicate-geo-event-json/0?duration=LAST_THIRTY_DAYS" -H "accept: application/json"
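All of the per-topic calls above follow the same URL pattern, so they script easily. A short Python sketch (the host and topic names are the demo values from this article; nothing is assumed about the response body) that generates the metrics URL for each topic in a list:

```python
from urllib.parse import quote, urlencode

# Demo endpoint from this article -- substitute your own SMM host.
SMM_BASE = "http://magellan-5.field.hortonworks.com:8585/api/v1/admin"

def metric_urls(topics, duration="LAST_ONE_HOUR"):
    """Yield the per-topic metrics URL for each topic, matching the curl calls above."""
    for name in topics:
        query = urlencode({"duration": duration, "from": -1, "to": -1})
        yield f"{SMM_BASE}/metrics/topics/{quote(name)}?{query}"

# On a live cluster: first GET {SMM_BASE}/topics for the topic list,
# then fetch each URL with an "accept: application/json" header.
for url in metric_urls(["syndicate-geo-event-json", "smartPlug"]):
    print(url)
```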

Energy Monitoring via Apache NiFi MiNiFi 0.6.0 C++ Agent with Cloudera Edge Manager




With the advent of version 0.6.0 of the C++ Agent, we can have fast native Python processors that can easily be added to your palette in Cloudera Edge Manager.


















Source:

https://github.com/tspannhw/minificpp-python-EnergyMonitoring


Publishing and Consuming JMS Messages from Tibco Enterprise Message Service (JMS) with Apache NiFi



TIBCO Enterprise Message Service
I tested this against the most recent release of TIBCO Enterprise Message Service and its JMS driver, available via trial download, and followed the very easy install directions. I downloaded it to a CentOS 7 server and expanded the download to TIB_ems-dev_8.4.0_linux_x86_64.
Then I made the installer executable and ran TIBCOUniversalInstaller-lnx-x86-64.bin --console.
I used all the defaults (I picked both server and client) and then started the freshly installed server.
Running TIBCO EMS on CentOS 7
  1. cd /opt/tibco/ems/8.4/bin/
  2. ./tibemsd64 -config ~/TIBCO_HOME/tibco/cfgmgmt/ems/data/tibemsd.conf
Example JMS Queue Settings
  1. URL: tcp://servername:7222
  2. Class: com.tibco.tibjms.TibjmsQueueConnectionFactory
  3. Directory: /opt/tibco/ems/8.4/lib/
I believe it just uses these files from that directory:
  • tibjms.jar
  • jms-2.0.jar
Once I have my server and port shown, it's easy to add those settings to Apache NiFi.
The settings I need to publish messages are shown below.
After you enter your username and queue, you need to create (or use) a controller service.
Then we use our settings for our server; mine are the defaults. Make sure you enter the lib directory containing your JARs, that the directory is on the Apache NiFi server, and that the Apache NiFi user has permission to read the files.
You can also use this same controller to Consume JMS messages from TIBCO EMS.
These are example metadata attributes that Apache NiFi provides to you on message receipt.
Example Run Log of my TIBCO EMS v8.4.0 Server running on Linux.
Example Flow
Example Data
{
  "top1pct" : "43.1",
  "top5" : "n09428293 seashore, coast, seacoast, sea-coast",
  "top4" : "n04371774 swing",
  "top3" : "n02894605 breakwater, groin, groyne, mole, bulwark, seawall, jetty",
  "top2" : "n03933933 pier",
  "top1" : "n03216828 dock, dockage, docking facility",
  "top2pct" : "34.3",
  "imagefilename" : "/opt/demo/images/201817121004997.jpg",
  "top3pct" : "3.8",
  "uuid" : "mxnet_uuid_img_20180413140808",
  "top4pct" : "2.7",
  "top5pct" : "2.4",
  "runtime" : "1.0"
}
This is example JSON data; any text payload would work.
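Because the payload is plain JSON, downstream consumers can parse it directly. A small Python example pulling the top classification and its confidence out of the sample record above:

```python
import json

# Abbreviated sample record from above (only the fields we use here).
sample = '{"top1pct": "43.1", "top1": "n03216828 dock, dockage, docking facility", "runtime": "1.0"}'

record = json.loads(sample)
synset_id, label = record["top1"].split(" ", 1)   # split off the ImageNet synset id
confidence = float(record["top1pct"])             # percentages arrive as strings
print(f"{label} ({confidence}%)")
```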

IoT Edge Use Cases with Apache Kafka and Apache NiFi - MiNiFi


MiNiFi Java Agent 0.5
Copy over necessary NARs from Apache NiFi 1.7 lib:
  • nifi-ssl-context-service-nar-1.7.0.nar
  • nifi-standard-services-api-nar-1.7.0.nar
  • nifi-kafka-1-0-nar-1.7.0.nar
This will support PublishKafka_1_0 and ConsumeKafka_1_0.
Then create a consume and/or publish flow. You can combine the two based on your needs. In my simple example I consume the Kafka messages in MiNiFi and write them to a file. I also write the metadata to a JSON file.
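The write-to-file half of that flow is easy to mimic outside MiNiFi. Here is a sketch (the function name and layout are mine, not MiNiFi's) that writes a message body to one file and its Kafka metadata to a sibling .attr.json file, mirroring the monitor/ output shown at the end of this article:

```python
import json
from pathlib import Path

def write_record(body: bytes, metadata: dict, outdir: str = "monitor") -> Path:
    """Write the Kafka message body plus a sibling <name>.attr.json metadata file."""
    out = Path(outdir)
    out.mkdir(parents=True, exist_ok=True)
    name = metadata.get("filename", "message")
    (out / name).write_bytes(body)                                 # raw message body
    (out / f"{name}.attr.json").write_text(json.dumps(metadata))   # Kafka metadata
    return out / name

path = write_record(
    b'{"power": 1.277361}',
    {"filename": "1448678223641638", "kafka.topic": "smartPlug", "kafka.partition": "0"},
)
```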
Consume Kafka
Publish Electric Monitoring Data To Kafka
Let's monitor the messages going through our topic, smartPlug.
Publish Messages to Kafka
Consume Any Messages From the smartPlug topic
Logs
Provenance Event file containing 377 records. In the past 5 minutes, 1512 events have been written to the Provenance Repository, totaling 839.32 KB

2018-11-26 19:42:32,473 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting PutFile[id=25a86505-031a-37d9-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.a.n.c.s.StandardProcessScheduler Starting UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000]
2018-11-26 19:42:32,474 INFO [main] o.apache.nifi.controller.FlowController Started 0 Remote Group Ports transmitting
2018-11-26 19:42:32,478 INFO [main] org.apache.nifi.minifi.MiNiFiServer Flow loaded successfully.
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled ConsumeKafka_1_0[id=8556f1ce-a915-3fda-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.BootstrapListener Successfully initiated communication with Bootstrap
2018-11-26 19:42:32,479 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled AttributesToJSON[id=0628b4e5-10d0-3b09-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,479 INFO [main] org.apache.nifi.minifi.MiNiFi Controller initialization took 2787584123 nanoseconds.
2018-11-26 19:42:32,480 INFO [Monitor Processor Lifecycle Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PutFile[id=25a86505-031a-37d9-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,481 INFO [Monitor Processor Lifecycle Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled UpdateAttribute[id=9220d40d-ee1d-3f61-0000-000000000000] to run with 1 threads
2018-11-26 19:42:32,585 INFO [Timer-Driven Process Thread-2] o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [princeton1.field.hortonworks.com:6667]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = minificonsumer1
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 10000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka version : 1.0.0
2018-11-26 19:42:32,727 INFO [Timer-Driven Process Thread-2] o.a.kafka.common.utils.AppInfoParser Kafka commitId : aaa7af6d4a11b29d
2018-11-26 19:42:33,088 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Discovered coordinator princeton1.field.hortonworks.com:6667 (id: 2147482646 rack: null)
2018-11-26 19:42:33,090 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Revoking previously assigned partitions []
2018-11-26 19:42:33,091 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] (Re-)joining group
2018-11-26 19:42:36,391 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.AbstractCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Successfully joined group with generation 3
2018-11-26 19:42:36,394 INFO [Timer-Driven Process Thread-2] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-1, groupId=minificonsumer1] Setting newly assigned partitions [smartPlug-0]
2018-11-26 19:44:32,325 INFO [pool-34-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 0 milliseconds
2018-11-26 19:44:40,700 INFO [Provenance Maintenance Thread-1] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 1437
2018-11-26 19:44:40,765 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for provenance_repository/index-1543271506000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2018-11-26 19:44:40,767 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (28 records) into single Provenance Log File provenance_repository/1409.prov in 62 milliseconds
2018-11-26 19:44:40,768 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 151 records. In the past 5 minutes, 28 events have been written to the Provenance Repository, totaling 15.43 KB
JSON Kafka Message and JSON Kafka Metadata Stored As Files
monitor/1448678223641638.attr.json
{"path":"./","filename":"1448678223641638","kafka.partition":"0","kafka.offset":"5543","kafka.topic":"smartPlug","kafka.key":"cb90ad21-b311-494c-96cc-06dd2e8747df","uuid":"041459fc-c63e-4056-ab50-1c375cd7d49f"}
monitor/1448678223641638
{"day30": 0.431, "day31": 1.15, "sw_ver": "1.2.5 Build 171206 Rel.085954", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "00000000000000000000000000000000", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim", "icon_hash": "", "relay_state": 1, "on_time": 886569, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -75, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month10": 1.581, "month11": 30.888, "current": 0.067041, "voltage": 122.151701, "power": 1.277361, "total": 24.289, "time": "11/26/2018 21:54:22", "ledon": true, "systemtime": "11/26/2018 21:54:22"}
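One sanity check worth doing on that payload: the plug reports current, voltage, and real power as separate fields, and V x I (apparent power) comes out much larger than the reported real power here, which simply reflects a low power factor on a small load. Parsing the fields above in Python:

```python
import json

# Subset of the smart plug record shown above.
plug = json.loads('{"current": 0.067041, "voltage": 122.151701, "power": 1.277361, "total": 24.289}')

apparent = plug["current"] * plug["voltage"]   # apparent power, volt-amperes
power_factor = plug["power"] / apparent        # real power / apparent power
print(f"apparent={apparent:.2f} VA, real={plug['power']:.2f} W, PF={power_factor:.2f}")
```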