---
Real-Time Slack Bots Powered By Generative AI and Data Flows
Using WatsonX.AI LLM foundation models with Cloudera DataFlow and Apache NiFi to send, receive and process Slack messages.
Real-time Integration of WatsonX.AI Foundation Models with NiFi
Hi, I am Timothy Spann, Principal Developer Advocate for Streaming at Cloudera.
In this article I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real-time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models.
I'll show you how easy it is to build a real-time data pipeline feeding questions from your Slack-like and mobile applications directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage and governance with Cloudera DataFlow. As part of decision making, we can choose different WatsonX.AI models on the fly based on the type of prompt. For example, I can pick different models depending on whether we want to continue a sentence or answer a question. For question answering, Google FLAN T5 XXL works well. If I want to continue sentences, I would use one of the IBM Granite models.
You will notice how amazingly fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send the results on their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms and downstream consumers. We will also output our answers to the original requester, which could be someone in a Slack channel or someone in an application. All of this happens in real time, with no code, and with full governance, lineage, data management and security at any scale and on any platform.
The power of IBM and Cloudera together in private, public and hybrid cloud environments for real-time data and AI is just getting started. Try it today.
Step by Step Real-Time Flow
First, in Slack I type a question:
"Q: What is a good way to integrate Generative AI and Apache NiFi?"
NiFi Flow Top
Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere that is publicly accessible.
https://api.slack.com/apps/<YOURAPP>/event-subscriptions
Slack API
Once enabled, your server will start receiving JSON events for each Slack post. These are easy to receive and parse in NiFi. Cloudera DataFlow makes it easy to receive secure HTTPS REST calls in the public cloud hosted edition, even in Designer mode.
NiFi Top Flow 2
In the first part of the flow we receive the REST JSON POST, which is as follows.
Slackbot 1.0 (+https://api.slack.com/robots)
application/json
POST
HTTP/1.1
{
"token" : "qHvJe59yetAp1bao6wmQzH0C",
"team_id" : "T1SD6MZMF",
"context_team_id" : "T1SD6MZMF",
"context_enterprise_id" : null,
"api_app_id" : "A04U64MN9HS",
"event" : {
"type" : "message",
"subtype" : "bot_message",
"text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z\n\n Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi?\n\nResponse: )\n\nYou can use Apache NiFi to integrate Generative AI models in several ways:\n\n1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.\n2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.\n3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.\n4. Real-time Inference: You can use NiFi's StreamingJobs\n\nToken: 200\nReq Duration: 8153\nHTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb\nIBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. 
URL: <https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx>\nIBM Msg ID: disclaimer_warning\nModel ID: meta-llama/llama-2-70b-chat\nStop Reason: max_tokens\nToken Count: 38\nTX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nUUID: da0806cb-6133-4bf4-808e-1fbf419c09e3\nCorr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nGlobal TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b\nService Time: 478\nRequest ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c\nFile Name: 1a3c4386-86d2-4969-805b-37649c16addb\nRequest Duration: 8153\nRequest URL: <https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29>\ncf-ray: 82689bfd28e48ce2-EWR\n\n=====",
"ts" : "1700063009.661159",
"bot_id" : "B062HDEJCBH",
"blocks" : [ {
"type" : "rich_text",
"block_id" : "sAFS",
"elements" : [ {
"type" : "rich_text_section",
"elements" : [ {
"type" : "text",
"text" : "==== NiFi to IBM "
}, {
"type" : "link",
"url" : "http://WatsonX.AI",
"text" : "WatsonX.AI"
}, {
"type" : "text",
"text" : " LLM Answers\n\nOn Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z\n\n Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi?\n\nResponse: )\n\nYou can use Apache NiFi to integrate Generative AI models in several ways:\n\n1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.\n2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.\n3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.\n4. Real-time Inference: You can use NiFi's StreamingJobs\n\nToken: 200\nReq Duration: 8153\nHTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb\nIBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: "
}, {
"type" : "link",
"url" : "https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx"
}, {
"type" : "text",
"text" : "\nIBM Msg ID: disclaimer_warning\nModel ID: meta-llama/llama-2-70b-chat\nStop Reason: max_tokens\nToken Count: 38\nTX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nUUID: da0806cb-6133-4bf4-808e-1fbf419c09e3\nCorr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756\nGlobal TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b\nService Time: 478\nRequest ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c\nFile Name: 1a3c4386-86d2-4969-805b-37649c16addb\nRequest Duration: 8153\nRequest URL: "
}, {
"type" : "link",
"url" : "https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29"
}, {
"type" : "text",
"text" : "\ncf-ray: 82689bfd28e48ce2-EWR\n\n====="
} ]
} ]
} ],
"channel" : "C05QAAVEC0H",
"event_ts" : "1700063009.661159",
"channel_type" : "channel"
},
"type" : "event_callback",
"event_id" : "Ev065VTPRMV0",
"event_time" : 1700063009,
"authorizations" : [ {
"enterprise_id" : null,
"team_id" : "T1SD6MZMF",
"user_id" : "ULMRENSE4",
"is_bot" : false,
"is_enterprise_install" : false
} ],
"is_ext_shared_channel" : false,
"event_context" : "4-eyJldCI6Im1lc3NhZ2UiLCJ0aWQiOiJUMVNENk1aTUYiLCJhaWQiOiJBMDRVNjRNTjlIUyIsImNpZCI6IkMwNVFBQVZFQzBIIn0"
}
This is a very rich, detailed JSON file that we could immediately push raw to an Apache Iceberg Open Cloud Lakehouse, a Kafka topic or an object store as a JSON document (an enhancement option). I am just going to parse what I need.
EvaluateJSONPath
We parse out the channel ID and the plain text of the post. I only want messages from general ("C1SD6N197"). Then I copy the text to an inputs field, as is required for Hugging Face.
We check our input: if it's about stocks or weather (more to come), we avoid calling the LLM.
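For readers who want to see this parsing step outside NiFi, here is a minimal Python sketch. It assumes the Slack event JSON shown above. The function name `extract_inputs` and the constant `ALLOWED_CHANNEL` are hypothetical, and the JSONPath expressions in the comments are a guess at what the EvaluateJsonPath step might use, not the exact flow configuration.

```python
import json

# Channel ID for general, taken from the text above.
ALLOWED_CHANNEL = "C1SD6N197"


def extract_inputs(raw_body, allowed_channel=ALLOWED_CHANNEL):
    """Return {"channel", "inputs"} for a Slack event, or None to drop it."""
    payload = json.loads(raw_body)
    event = payload.get("event", {})
    channel = event.get("channel")  # roughly $.event.channel
    text = event.get("text")        # roughly $.event.text
    if channel != allowed_channel or not text:
        return None
    # Downstream steps read the message from a field named "inputs".
    return {"channel": channel, "inputs": text}


# Hand-built sample event with only the fields this sketch reads.
sample = json.dumps({
    "type": "event_callback",
    "event": {
        "type": "message",
        "text": "Q: What is a good way to integrate Generative AI and Apache NiFi?",
        "channel": "C1SD6N197",
    },
})
print(extract_inputs(sample))
```

Events from any other channel, or events without text, come back as `None` and can simply be dropped.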
SELECT * FROM FLOWFILE
WHERE upper(inputs) LIKE '%WEATHER%'
AND NOT upper(inputs) LIKE '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE upper(inputs) LIKE '%STOCK%'
AND NOT upper(inputs) LIKE '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE (upper(inputs) LIKE 'QUESTION:%'
OR upper(inputs) LIKE 'Q:%')
AND NOT upper(inputs) LIKE '%WEATHER%'
AND NOT upper(inputs) LIKE '%STOCK%'
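The three queries above amount to a small routing table. As a rough sketch only, the same decisions could be written in Python like this. The function name `route` and the route labels are made up for illustration. The "LLM SKIPPED" check presumably keeps the bot from reacting to its own replies, since those start with "LLM Skipped."

```python
def route(inputs):
    """Mirror the three QueryRecord filters: weather, stocks, or LLM."""
    text = inputs.upper()
    skipped = "LLM SKIPPED" in text
    routes = []
    if "WEATHER" in text and not skipped:
        routes.append("weather")
    if "STOCK" in text and not skipped:
        routes.append("stocks")
    is_question = text.startswith("QUESTION:") or text.startswith("Q:")
    if is_question and "WEATHER" not in text and "STOCK" not in text:
        routes.append("llm")
    return routes


for message in [
    "Q: What is a good way to integrate Generative AI and Apache NiFi?",
    "What is the weather in Boston?",
    "LLM Skipped. Stock Value for IBM",
]:
    print(message, "->", route(message))
```

Like the queries, the function can return more than one route for the same message, for example when a post mentions both weather and stocks.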
For stocks processing:
To parse out which stock we need, I am using my Open NLP processor.
So you will need to download the processor and the entity extraction models.
GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor (github.com)
Open NLP Example Apache NiFi Processor (community.cloudera.com)
Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts company names to stock symbols. Free accounts only get a few calls a day, so if the call fails we bypass this step and just try to use whatever you passed in.
https://www.alphavantage.co/query?function=SYMBOL_SEARCH&keywords=${nlp_org_1:trim()}&apikey=GetYourselfAKey&datatype=csv
Using RouteOnContent we filter out any error messages.
Then we use a QueryRecord processor to convert from CSV to JSON and filter.
SELECT name as companyName, symbol FROM FLOWFILE
ORDER BY matchScore DESC
LIMIT 1
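To make the query concrete, here is a small Python sketch of the same "best match wins" selection over a CSV response. The sample rows are made up and show only the three columns the query uses. The function name `best_match` is hypothetical.

```python
import csv
import io


def best_match(csv_text):
    """Return the row with the highest matchScore (ORDER BY ... DESC LIMIT 1)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return None
    top = max(rows, key=lambda row: float(row["matchScore"]))
    # Trim the symbol, as the UpdateAttribute step does later.
    return {"companyName": top["name"], "symbol": top["symbol"].strip()}


# Made-up sample rows, not real API output.
sample_csv = """symbol,name,matchScore
IBM,International Business Machines Corp,0.8000
IBMX,Hypothetical Other Listing,0.4000
"""
print(best_match(sample_csv))
```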
We do a SplitRecord to ensure we have only one record. We then run EvaluateJsonPath to get our fields as attributes.
In an UpdateAttribute we trim the symbol just in case.
${stockSymbol:trim()}
We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data.
https://api.twelvedata.com/time_series?apikey=GetAnAPIKey&interval=1min&symbol=${stockSymbol}&format=JSON
We then get a lot of stock data back.
{
"meta" : {
"symbol" : "IBM",
"interval" : "1min",
"currency" : "USD",
"exchange_timezone" : "America/New_York",
"exchange" : "NYSE",
"mic_code" : "XNYS",
"type" : "Common Stock"
},
"values" : [ {
"datetime" : "2023-11-15 10:37:00",
"open" : "152.07001",
"high" : "152.08000",
"low" : "151.99500",
"close" : "152.00999",
"volume" : "8525"
}, {
"datetime" : "2023-11-15 10:36:00",
"open" : "152.08501",
"high" : "152.12250",
"low" : "152.08000",
"close" : "152.08501",
"volume" : "15204"
} ...
We then run EvaluateJSONPath to grab the exchange information.
We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack.
SELECT * FROM FLOWFILE
ORDER BY "datetime" DESC
LIMIT 1
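As a sketch of what this "latest record only" step does, the Python below picks the newest bar from a trimmed copy of the response above. The function name `latest_bar` is hypothetical. The output keys reuse the attribute names from the Slack message template that follows.

```python
import json

# Trimmed copy of the response above, with the two bars in reverse order
# to show that the selection does not depend on their position.
response = json.loads("""
{
  "meta": {"symbol": "IBM", "exchange": "NYSE"},
  "values": [
    {"datetime": "2023-11-15 10:36:00", "close": "152.08501"},
    {"datetime": "2023-11-15 10:37:00", "close": "152.00999"}
  ]
}
""")


def latest_bar(payload):
    """Pick the newest entry in "values" (ORDER BY datetime DESC LIMIT 1)."""
    # "YYYY-MM-DD HH:MM:SS" strings sort chronologically as plain text.
    newest = max(payload["values"], key=lambda bar: bar["datetime"])
    return {
        "exchange": payload["meta"]["exchange"],
        "stockdateTime": newest["datetime"],
        "closeStockValue": newest["close"],
    }


print(latest_bar(response))
```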
We run an EvaluateJsonPath to get the most valuable fields to display.
We then run a PutSlack with our message.
LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}. stock date ${stockdateTime}. stock exchange ${exchange}
We also have a separate flow that splits off from the company name step.
In the first step we call Yahoo Finance to get RSS headlines for that stock.
https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US
We use QueryRecord to convert RSS/XML Records to JSON.
We then run a SplitJSON to break out the news items.
We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message.
We then run UpdateRecord to finalize our JSON.
We then send this message to Slack.
LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date}
${title} : ${description}.
${guid} article date ${pubdate}
For those who selected weather, we follow a similar route to stocks (we should add caching with Redis @ Aiven). We use my OpenNLP processor to extract the locations you might want weather for.
The next step is taking the output of the processor and building a value to send to our geocoder.
weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")}
If we couldn't find a valid location, I am going to say "New York City". We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that, or perhaps use OpenSearch or a vectorized datastore.
I pass that location to Open Meteo via InvokeHTTP to find its coordinates.
https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json
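The fallback and URL building can be sketched in a few lines of Python. This is an illustration under assumptions, not the flow itself: the function name `build_geocode_url` is hypothetical, and it also treats a blank string as missing, which goes slightly beyond the notNull() check above.

```python
from urllib.parse import quote_plus

GEOCODE_URL = "https://geocoding-api.open-meteo.com/v1/search"
DEFAULT_LOCATION = "New York City"


def build_geocode_url(nlp_location):
    """Fall back to the default city, then build the Open Meteo search URL."""
    location = (nlp_location or "").strip() or DEFAULT_LOCATION
    # quote_plus encodes spaces as "+", which is safe in a query string.
    return (f"{GEOCODE_URL}?name={quote_plus(location)}"
            "&count=1&language=en&format=json")


print(build_geocode_url(None))
print(build_geocode_url(" Boston "))
```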
We then parse the values we need from the results.
{
"results" : [ {
"id" : 5128581,
"name" : "New York",
"latitude" : 40.71427,
"longitude" : -74.00597,
"elevation" : 10.0,
"feature_code" : "PPL",
"country_code" : "US",
"admin1_id" : 5128638,
"timezone" : "America/New_York",
"population" : 8175133,
"postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ],
"country_id" : 6252001,
"country" : "United States",
"admin1" : "New York"
} ],
"generationtime_ms" : 0.92196465
}
We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP.
https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}
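Here is a minimal Python sketch of that hand-off from the geocoding result to the weather.gov points URL. The function name `points_url` is hypothetical, and the sample is a trimmed copy of the geocoding response above.

```python
def points_url(geocode_response):
    """Build the api.weather.gov points URL from the first geocoding result."""
    results = geocode_response.get("results") or []
    if not results:
        return None  # nothing to look up, so skip the weather call
    first = results[0]
    return ("https://api.weather.gov/points/"
            f"{first['latitude']},{first['longitude']}")


sample = {"results": [
    {"name": "New York", "latitude": 40.71427, "longitude": -74.00597},
]}
print(points_url(sample))
```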
The results from api.weather.gov are GeoJSON.
{
"@context": [
"https://geojson.org/geojson-ld/geojson-context.jsonld",
{
"@version": "1.1",
"wx": "https://api.weather.gov/ontology#",
"s": "https://schema.org/",
"geo": "http://www.opengis.net/ont/geosparql#",
"unit": "http://codes.wmo.int/common/unit/",
"@vocab": "https://api.weather.gov/ontology#",
"geometry": {
"@id": "s:GeoCoordinates",
"@type": "geo:wktLiteral"
},
"city": "s:addressLocality",
"state": "s:addressRegion",
"distance": {
"@id": "s:Distance",
"@type": "s:QuantitativeValue"
},
"bearing": {
"@type": "s:QuantitativeValue"
},
"value": {
"@id": "s:value"
},
"unitCode": {
"@id": "s:unitCode",
"@type": "@id"
},
"forecastOffice": {
"@type": "@id"
},
"forecastGridData": {
"@type": "@id"
},
"publicZone": {
"@type": "@id"
},
"county": {
"@type": "@id"
}
}
],
"id": "https://api.weather.gov/points/40.7143,-74.006",
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [
-74.006,
40.714300000000001
]
},
"properties": {
"@id": "https://api.weather.gov/points/40.7143,-74.006",
"@type": "wx:Point",
"cwa": "OKX",
"forecastOffice": "https://api.weather.gov/offices/OKX",
"gridId": "OKX",
"gridX": 33,
"gridY": 35,
"forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast",
"forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly",
"forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35",
"observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations",
"relativeLocation": {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [
-74.0279259,
40.745251000000003
]
},
"properties": {
"city": "Hoboken",
"state": "NJ",
"distance": {
"unitCode": "wmoUnit:m",
"value": 3906.1522008034999
},
"bearing": {
"unitCode": "wmoUnit:degree_(angle)",
"value": 151
}
}
},
"forecastZone": "https://api.weather.gov/zones/forecast/NYZ072",
"county": "https://api.weather.gov/zones/county/NYC061",
"fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212",
"timeZone": "America/New_York",
"radarStation": "KDIX"
}
}
We use EvaluateJSONPath to grab a forecast URL.
Then we call that forecast URL via InvokeHTTP.
That produces a larger JSON output that we will parse for the results we want to return to Slack.
{
"@context": [
"https://geojson.org/geojson-ld/geojson-context.jsonld",
{
"@version": "1.1",
"wx": "https://api.weather.gov/ontology#",
"geo": "http://www.opengis.net/ont/geosparql#",
"unit": "http://codes.wmo.int/common/unit/",
"@vocab": "https://api.weather.gov/ontology#"
}
],
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[
[
-74.025095199999996,
40.727052399999998
],
[
-74.0295579,
40.705361699999997
],
[
-74.000948300000005,
40.701977499999998
],
[
-73.996479800000003,
40.723667899999995
],
[
-74.025095199999996,
40.727052399999998
]
]
]
},
"properties": {
"updated": "2023-11-15T14:34:46+00:00",
"units": "us",
"forecastGenerator": "BaselineForecastGenerator",
"generatedAt": "2023-11-15T15:11:39+00:00",
"updateTime": "2023-11-15T14:34:46+00:00",
"validTimes": "2023-11-15T08:00:00+00:00/P7DT17H",
"elevation": {
"unitCode": "wmoUnit:m",
"value": 2.1335999999999999
},
"periods": [
{
"number": 1,
"name": "Today",
"startTime": "2023-11-15T10:00:00-05:00",
"endTime": "2023-11-15T18:00:00-05:00",
"isDaytime": true,
"temperature": 51,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 2.2222222222222223
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 68
},
"windSpeed": "1 to 7 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/day/bkn?size=medium",
"shortForecast": "Partly Sunny",
"detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph."
},
{
"number": 2,
"name": "Tonight",
"startTime": "2023-11-15T18:00:00-05:00",
"endTime": "2023-11-16T06:00:00-05:00",
"isDaytime": false,
"temperature": 44,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 3.8888888888888888
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 82
},
"windSpeed": "8 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
"shortForecast": "Partly Cloudy",
"detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph."
},
{
"number": 3,
"name": "Thursday",
"startTime": "2023-11-16T06:00:00-05:00",
"endTime": "2023-11-16T18:00:00-05:00",
"isDaytime": true,
"temperature": 60,
"temperatureUnit": "F",
"temperatureTrend": "falling",
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 5.5555555555555554
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 82
},
"windSpeed": "6 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/day/few?size=medium",
"shortForecast": "Sunny",
"detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph."
},
{
"number": 4,
"name": "Thursday Night",
"startTime": "2023-11-16T18:00:00-05:00",
"endTime": "2023-11-17T06:00:00-05:00",
"isDaytime": false,
"temperature": 47,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 6.1111111111111107
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 80
},
"windSpeed": "3 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph."
},
{
"number": 5,
"name": "Friday",
"startTime": "2023-11-17T06:00:00-05:00",
"endTime": "2023-11-17T18:00:00-05:00",
"isDaytime": true,
"temperature": 63,
"temperatureUnit": "F",
"temperatureTrend": "falling",
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 20
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 12.222222222222221
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 86
},
"windSpeed": "2 to 10 mph",
"windDirection": "S",
"icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium",
"shortForecast": "Partly Sunny then Slight Chance Light Rain",
"detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%."
},
{
"number": 6,
"name": "Friday Night",
"startTime": "2023-11-17T18:00:00-05:00",
"endTime": "2023-11-18T06:00:00-05:00",
"isDaytime": false,
"temperature": 51,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 12.777777777777779
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 100
},
"windSpeed": "6 to 10 mph",
"windDirection": "SW",
"icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium",
"shortForecast": "Light Rain Likely",
"detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible."
},
{
"number": 7,
"name": "Saturday",
"startTime": "2023-11-18T06:00:00-05:00",
"endTime": "2023-11-18T18:00:00-05:00",
"isDaytime": true,
"temperature": 55,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 11.111111111111111
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 100
},
"windSpeed": "8 to 18 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium",
"shortForecast": "Light Rain Likely",
"detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%."
},
{
"number": 8,
"name": "Saturday Night",
"startTime": "2023-11-18T18:00:00-05:00",
"endTime": "2023-11-19T06:00:00-05:00",
"isDaytime": false,
"temperature": 40,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 1.1111111111111112
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 65
},
"windSpeed": "12 to 17 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 40."
},
{
"number": 9,
"name": "Sunday",
"startTime": "2023-11-19T06:00:00-05:00",
"endTime": "2023-11-19T18:00:00-05:00",
"isDaytime": true,
"temperature": 50,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -0.55555555555555558
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 65
},
"windSpeed": "10 to 14 mph",
"windDirection": "W",
"icon": "https://api.weather.gov/icons/land/day/few?size=medium",
"shortForecast": "Sunny",
"detailedForecast": "Sunny, with a high near 50."
},
{
"number": 10,
"name": "Sunday Night",
"startTime": "2023-11-19T18:00:00-05:00",
"endTime": "2023-11-20T06:00:00-05:00",
"isDaytime": false,
"temperature": 38,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -0.55555555555555558
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 67
},
"windSpeed": "13 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/night/few?size=medium",
"shortForecast": "Mostly Clear",
"detailedForecast": "Mostly clear, with a low around 38."
},
{
"number": 11,
"name": "Monday",
"startTime": "2023-11-20T06:00:00-05:00",
"endTime": "2023-11-20T18:00:00-05:00",
"isDaytime": true,
"temperature": 46,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -1.6666666666666667
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"windSpeed": "13 mph",
"windDirection": "NW",
"icon": "https://api.weather.gov/icons/land/day/sct?size=medium",
"shortForecast": "Mostly Sunny",
"detailedForecast": "Mostly sunny, with a high near 46."
},
{
"number": 12,
"name": "Monday Night",
"startTime": "2023-11-20T18:00:00-05:00",
"endTime": "2023-11-21T06:00:00-05:00",
"isDaytime": false,
"temperature": 38,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": null
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": -1.1111111111111112
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 70
},
"windSpeed": "10 mph",
"windDirection": "N",
"icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
"shortForecast": "Partly Cloudy",
"detailedForecast": "Partly cloudy, with a low around 38."
},
{
"number": 13,
"name": "Tuesday",
"startTime": "2023-11-21T06:00:00-05:00",
"endTime": "2023-11-21T18:00:00-05:00",
"isDaytime": true,
"temperature": 49,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 30
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 2.7777777777777777
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 73
},
"windSpeed": "9 to 13 mph",
"windDirection": "E",
"icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium",
"shortForecast": "Partly Sunny then Chance Light Rain",
"detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%."
},
{
"number": 14,
"name": "Tuesday Night",
"startTime": "2023-11-21T18:00:00-05:00",
"endTime": "2023-11-22T06:00:00-05:00",
"isDaytime": false,
"temperature": 46,
"temperatureUnit": "F",
"temperatureTrend": null,
"probabilityOfPrecipitation": {
"unitCode": "wmoUnit:percent",
"value": 50
},
"dewpoint": {
"unitCode": "wmoUnit:degC",
"value": 7.7777777777777777
},
"relativeHumidity": {
"unitCode": "wmoUnit:percent",
"value": 86
},
"windSpeed": "13 to 18 mph",
"windDirection": "S",
"icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium",
"shortForecast": "Chance Light Rain",
"detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%."
}
]
}
}
We parse the data with EvaluateJSONPath to get primary fields for weather.
We then format those fields to PutSlack.
LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}
Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}
There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}
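To show how the forecast JSON maps onto this message, here is a hedged Python sketch that fills the same template from the first forecast period. The function name `first_period_fields` is hypothetical, the date value is illustrative, and turning a null trend into an empty string is an assumption about how the flow behaves.

```python
from string import Template

SLACK_TEMPLATE = Template(
    "LLM Skipped. Read forecast on ${date} for ${weatherlocation} "
    "@ ${latitude},${longitude}\n"
    "Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} "
    "- ${temperaturetrend}\n"
    "There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}"
)


def first_period_fields(forecast):
    """Flatten the first forecast period into lowercase attribute names."""
    period = forecast["properties"]["periods"][0]
    keys = ["icon", "temperature", "temperatureUnit", "temperatureTrend",
            "windDirection", "windSpeed", "detailedForecast"]
    # A null value becomes an empty string here (an assumption).
    return {key.lower(): "" if period[key] is None else period[key]
            for key in keys}


# Trimmed copy of the first period from the forecast above.
forecast = {"properties": {"periods": [{
    "icon": "https://api.weather.gov/icons/land/day/bkn?size=medium",
    "temperature": 51, "temperatureUnit": "F", "temperatureTrend": None,
    "windDirection": "SW", "windSpeed": "1 to 7 mph",
    "detailedForecast": "Partly sunny, with a high near 51. "
                        "Southwest wind 1 to 7 mph.",
}]}}

message = SLACK_TEMPLATE.substitute(
    first_period_fields(forecast),
    date="Wed, 15 Nov 2023", weatherlocation="New York City",
    latitude=40.71427, longitude=-74.00597,
    forecasturl="https://api.weather.gov/gridpoints/OKX/33,35/forecast",
)
print(message)
```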
Slack Output
---
If we do have an LLM question, let's make sure it's just one record.
We use a few different models that are available at IBM WatsonX.AI on IBM Cloud and can be quickly accessed by our REST prompts.
I tested and built the prompts initially at IBM's Prompt Lab and then copied the initial curl statement from there.
IBM watsonx.ai
https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-
ibm/mpt-7b-instruct2
meta-llama/llama-2-70b-chat
ibm/granite-13b-chat-v1
We have to send our unique secure key to IBM and they will give us a token to use in our next call.
We parse out the question and then send it to WatsonX.AI via the REST API.
We build a prompt to send to IBM as follows.
{
"model_id": "meta-llama/llama-2-70b-chat",
"input": "${inputs:urlEncode()}",
"parameters": {
"decoding_method": "greedy",
"max_new_tokens": 200,
"min_new_tokens": 50,
"stop_sequences": [],
"repetition_penalty": 1
},
"project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068"
}
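For anyone who wants to try the two calls outside NiFi, here is a minimal Python sketch that only builds the HTTP requests. Some of it is assumption: the IAM token URL and form fields are the standard IBM Cloud API key exchange, which this article does not show, and the function names are hypothetical. The generation URL and the parameters come from this article. Instead of URL-encoding the prompt, the sketch lets `json.dumps` escape it.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the standard IBM Cloud IAM token endpoint.
IAM_URL = "https://iam.cloud.ibm.com/identity/token"
# Request URL as reported in the debugging output in this article.
GENERATION_URL = ("https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text"
                  "?version=2023-05-29")


def token_request(api_key):
    """Build the IAM call that trades an API key for a bearer token."""
    form = urllib.parse.urlencode({
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": api_key,
    }).encode("utf-8")
    return urllib.request.Request(
        IAM_URL, data=form, method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded",
                 "Accept": "application/json"})


def generation_request(token, project_id, prompt,
                       model_id="meta-llama/llama-2-70b-chat"):
    """Build the text generation call with the parameters shown above."""
    body = {
        "model_id": model_id,
        "input": prompt,  # json.dumps escapes the prompt text for us
        "parameters": {
            "decoding_method": "greedy",
            "max_new_tokens": 200,
            "min_new_tokens": 50,
            "stop_sequences": [],
            "repetition_penalty": 1,
        },
        "project_id": project_id,
    }
    return urllib.request.Request(
        GENERATION_URL, data=json.dumps(body).encode("utf-8"), method="POST",
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json",
                 "Accept": "application/json"})
```

Passing either request to `urllib.request.urlopen` would send it. The IAM response carries the token in its `access_token` field, which then goes into `generation_request`.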
We parse the generated text which is our Generative AI results plus some helpful metadata on timings.
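A short Python sketch of that parsing step follows. The response shape is an assumption based on the fields that appear in the debugging output and the Flink table in this article (a `results` array with `generated_text`, token counts and `stop_reason`), so check it against a real response before relying on it. The function name `parse_generation` and the sample values are illustrative.

```python
def parse_generation(response):
    """Pull the answer and a few counters out of a generation response."""
    result = response["results"][0]
    return {
        "model_id": response.get("model_id"),
        "created_at": response.get("created_at"),
        "generated_text": result.get("generated_text", "").strip(),
        "generated_token": result.get("generated_token_count"),
        "tokencount": result.get("input_token_count"),
        "stop_reason": result.get("stop_reason"),
    }


# Assumed response shape, filled with values from the debugging output.
sample = {
    "model_id": "meta-llama/llama-2-70b-chat",
    "created_at": "2023-11-15T15:43:29.248Z",
    "results": [{
        "generated_text": "\n\nYou can use Apache NiFi to integrate "
                          "Generative AI models in several ways:",
        "generated_token_count": 200,
        "input_token_count": 38,
        "stop_reason": "max_tokens",
    }],
}
print(parse_generation(sample))
```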
The result posted to Slack is:
"You can use Apache NiFi to integrate Generative AI models in several ways:
1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
4. Real-time Inference: You can use NiFi's StreamingJobs"
After the Slack bot posted the results, it posted metrics and debugging information to the chat channel.
All of the metadata is posted to another Slack channel for administrator monitoring.
==== NiFi to IBM WatsonX.AI LLM Answers
On Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z
Prompt: Q: What is a good way to integrate Generative AI and Apache NiFi?
Response: )
You can use Apache NiFi to integrate Generative AI models in several ways:
1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
4. Real-time Inference: You can use NiFi's StreamingJobs
Token: 200
Req Duration: 8153
HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb
IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx
IBM Msg ID: disclaimer_warning
Model ID: meta-llama/llama-2-70b-chat
Stop Reason: max_tokens
Token Count: 38
TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3
Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b
Service Time: 478
Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c
File Name: 1a3c4386-86d2-4969-805b-37649c16addb
Request Duration: 8153
Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29
cf-ray: 82689bfd28e48ce2-EWR
=====
Source
GitHub - tspannhw/FLaNK-watsonx.ai: FLaNK Stack with watsonx.ai for google/flan-ul2, google/flan-t5-xxl, Granite and other foundation models (github.com)
Make your own Slack Bot
Slack Output
Kafka Distribute
Apache Flink SQL Table Creation DDL
CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (
`date` VARCHAR(2147483647),
`x_global_transaction_id` VARCHAR(2147483647),
`x_request_id` VARCHAR(2147483647),
`cf_ray` VARCHAR(2147483647),
`inputs` VARCHAR(2147483647),
`created_at` VARCHAR(2147483647),
`stop_reason` VARCHAR(2147483647),
`x_correlation_id` VARCHAR(2147483647),
`x_proxy_upstream_service_time` VARCHAR(2147483647),
`message_id` VARCHAR(2147483647),
`model_id` VARCHAR(2147483647),
`invokehttp_request_duration` VARCHAR(2147483647),
`message` VARCHAR(2147483647),
`uuid` VARCHAR(2147483647),
`generated_text` VARCHAR(2147483647),
`transaction_id` VARCHAR(2147483647),
`tokencount` VARCHAR(2147483647),
`generated_token` VARCHAR(2147483647),
`ts` VARCHAR(2147483647),
`advisoryId` VARCHAR(2147483647),
`eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'watsonxaillmanswers',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'watsonxaillmconsumer'
);
CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (
`date` VARCHAR(2147483647),
`x_global_transaction_id` VARCHAR(2147483647),
`x_request_id` VARCHAR(2147483647),
`cf_ray` VARCHAR(2147483647),
`inputs` VARCHAR(2147483647),
`created_at` VARCHAR(2147483647),
`stop_reason` VARCHAR(2147483647),
`x_correlation_id` VARCHAR(2147483647),
`x_proxy_upstream_service_time` VARCHAR(2147483647),
`message_id` VARCHAR(2147483647),
`model_id` VARCHAR(2147483647),
`invokehttp_request_duration` VARCHAR(2147483647),
`message` VARCHAR(2147483647),
`uuid` VARCHAR(2147483647),
`generated_text` VARCHAR(2147483647),
`transaction_id` VARCHAR(2147483647),
`tokencount` VARCHAR(2147483647),
`generated_token` VARCHAR(2147483647),
`ts` VARCHAR(2147483647),
`eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'watsonxaillm',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'allwatsonx1'
);
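Every column in the tables above is declared as VARCHAR, so a downstream consumer has to cast the numeric fields itself. Here is a minimal Python sketch of that step. It is an illustration, not code from the flow: the `parse_result` helper and the sample record are hypothetical, the values are copied from the sample answer earlier in this article, and the mapping of those values to column names is an assumption.

```python
import json

# Columns from the DDL above that appear to hold numbers serialized as strings.
# This list is an assumption; the DDL declares every column as VARCHAR.
NUMERIC_FIELDS = (
    "invokehttp_request_duration",
    "x_proxy_upstream_service_time",
    "generated_token",
    "tokencount",
)


def parse_result(raw: str) -> dict:
    """Decode one JSON record and cast the numeric string fields to int."""
    record = json.loads(raw)
    for field in NUMERIC_FIELDS:
        value = record.get(field)
        if isinstance(value, str):
            try:
                record[field] = int(value)
            except ValueError:
                # Leave non-numeric values untouched rather than failing.
                pass
    return record


# Hypothetical record shaped like the table; values taken from the sample answer.
sample = json.dumps({
    "model_id": "meta-llama/llama-2-70b-chat",
    "stop_reason": "max_tokens",
    "invokehttp_request_duration": "8153",
    "x_proxy_upstream_service_time": "478",
    "generated_token": "200",
    "tokencount": "38",
})

result = parse_result(sample)
print(result["model_id"], result["invokehttp_request_duration"])
```

Keeping the Kafka table all-VARCHAR makes ingestion forgiving, and casting at read time keeps a malformed value from dropping the whole record.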
Example Prompt
{"inputs":"Please answer to the following question. What is the capital of the United States?"}
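The prompt above is a single JSON field, so it has to be wrapped in a generation request before it reaches WatsonX.AI. The sketch below shows one way that wrapping could look in Python. It is not the flow's actual transformation: the `build_request` helper, the body field names (`model_id`, `input`, `parameters`, `project_id`) and the `YOUR_PROJECT_ID` placeholder are assumptions for illustration and should be checked against the IBM API documentation. The model ID comes from the sample answer earlier in this article, and the 200-token limit is inferred from its token count and `max_tokens` stop reason.

```python
import json


def build_request(prompt_json: str, model_id: str, project_id: str,
                  max_new_tokens: int = 200) -> str:
    """Wrap an {"inputs": ...} prompt in a generation request body (assumed shape)."""
    prompt = json.loads(prompt_json)["inputs"]
    body = {
        "model_id": model_id,
        "input": prompt,
        "parameters": {"max_new_tokens": max_new_tokens},
        "project_id": project_id,  # placeholder, supply your own
    }
    return json.dumps(body)


example_prompt = (
    '{"inputs":"Please answer to the following question. '
    'What is the capital of the United States?"}'
)
request_body = build_request(
    example_prompt,
    model_id="meta-llama/llama-2-70b-chat",
    project_id="YOUR_PROJECT_ID",
)
print(request_body)
```

Passing the model ID as a parameter matches the idea of choosing a different model per prompt type.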
IBM DB2 SQL
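-- Add the summary column to an existing table (the CREATE TABLE definition below already includes it)
alter table "DB2INST1"."TRAVELADVISORY"
add column "summary" VARCHAR(2048);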
-- DB2INST1.TRAVELADVISORY definition
CREATE TABLE "DB2INST1"."TRAVELADVISORY" (
"TITLE" VARCHAR(250 OCTETS) ,
"PUBDATE" VARCHAR(250 OCTETS) ,
"LINK" VARCHAR(250 OCTETS) ,
"GUID" VARCHAR(250 OCTETS) ,
"ADVISORYID" VARCHAR(250 OCTETS) ,
"DOMAIN" VARCHAR(250 OCTETS) ,
"CATEGORY" VARCHAR(4096 OCTETS) ,
"DESCRIPTION" VARCHAR(4096 OCTETS) ,
"UUID" VARCHAR(250 OCTETS) NOT NULL ,
"TS" BIGINT NOT NULL ,
"summary" VARCHAR(2048 OCTETS) )
IN "IBMDB2SAMPLEREL"
ORGANIZE BY ROW;
ALTER TABLE "DB2INST1"."TRAVELADVISORY"
ADD PRIMARY KEY
("UUID")
ENFORCED;
GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";
GRANT CONTROL ON INDEX "SYSIBM "."SQL230620142604860" TO USER "DB2INST1";
SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE FROM DB2INST1.TRAVELADVISORY t
WHERE "summary" IS NOT NULL
ORDER BY ts DESC;
For an example output email, see:
https://github.com/tspannhw/FLaNK-watsonx.ai/blob/main/example.email.txt
References
The Latest in Real-Tim(e) Analytics: Generative AI, LLM and Beyond
Thursday October 26th, we had a meetup with two Tim's speaking on real-time analytics plus LLM and more. It was a great… (medium.com)
Streaming LLM with Apache NiFi (HuggingFace)
See my talk on August 23, 2023 at NYC AI Dev Day. (medium.com)
IBM Documentation
IBM Documentation. (www.ibm.com)
Supported foundation models available with watsonx.ai
A collection of open source and IBM foundation models are deployed in IBM watsonx.ai. (www.ibm.com)
Tips for writing foundation model prompts: prompt engineering
Part art, part science, prompt engineering is the process of crafting prompt text to best effect for a given model and… (www.ibm.com)
Sample foundation model prompts for common tasks
Try these samples to learn how different prompts can guide foundation models to do common tasks. (www.ibm.com)
IBM Technology Chooses Cloudera as its Preferred Partner for Addressing Real Time Data Movement…
Organizations increasingly rely on streaming data sources not only to bring data into the enterprise but also to… (blog.cloudera.com)
Travel Advisories
https://travel.state.gov/content/travel/en/traveladvisories/traveladvisories.html/
https://travel.state.gov/_res/rss/TAsTWs.xml
https://medium.com/@tspann/building-a-travel-advisory-app-with-apache-nifi-in-k8-969b44c84958
https://github.com/tspannhw/FLaNK-TravelAdvisory
Video
https://www.youtube.com/watch?v=RPz7Xm4fLF4&t=6s
Source Code
https://github.com/tspannhw/FLaNK-watsonx.ai
Models
https://www.ibm.com/docs/en/watsonx-as-a-service?topic=models-
https://medium.com/cloudera-inc/ingesting-events-into-dockerized-ibm-db2-jdbc-with-apache-nifi-f0ca452d1351
https://www.youtube.com/watch?v=-r8zf_nfxCw
https://medium.com/cloudera-inc/no-code-sentiment-analysis-with-hugging-face-and-apache-nifi-for-article-summaries-cf06d1df1283
https://medium.com/cloudera-inc/cdc-not-cat-data-capture-e43713879c03
https://medium.com/cloudera-inc/building-a-real-time-data-pipeline-a-comprehensive-tutorial-on-minifi-nifi-kafka-and-flink-ee03ee6722cb
Other Related Articles
Evolve NYC 2023 Wrap-Up
Evolve NYC 2023 was an intense event with a ton of speakers, sessions, topics and cool people. (medium.com)
No Code Sentiment Analysis with Hugging Face and Apache NiFi for Article Summaries
Apache NiFi, Hugging Face, NY Times RSS, Sentiment Analysis, Deep Learning, REST API, Classification, Distilbert… (medium.com)