Kafka 事件流式处理 AI 和自动化(kafka处理流式数据)
探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。
Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。
使用消息
生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。
ChatGPT: “design a Python Event Streaming Kafka Consumer interface”
注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。
SQLAlchemy模型
使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:
ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot
API Logic Server 生成的类用于保存我们的值。Temperature
class Temperature(SAFRSBase, Base):br
__tablename__ = 'Temperature'br
_s_collection_name = 'Temperature' # type: ignorebr
__bind_key__ = 'None'br
br
Id = Column(Integer, primary_key=True)br
DeviceId = Column(Integer, nullable=False)br
TempReading = Column(Integer, nullable=False)br
CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
KafkaMessageSent = Column(Booelan, default=text("False"))
变化
因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity
蟒
entity = models.Temperature()br
util.row_to_entity(message_data, entity) br
session.add(entity)
当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event
声明性逻辑:生成消息
使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132TDD 行为测试
使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。KafkaMessageSentFeatureScenarioBehave
功能定义
Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
Given A Kafka Message Normal (Temperature)br
When Transactions normal temperature is submittedbr
Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
Given A Kafka Message Abnormal (Temperature)br
When Transactions abnormal temperature is submittedbr
Then Check KafkaMessageSent Flag is True
TDD Python 类
from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
entity = model.Temperature()br
entity.TempReading = tempbr
entity.DeviceId = 'local_behave_test'br
session.add(entity) br
return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
context.temp = 76br
assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
assert context.response_text == False
总结
使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。
from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
br
def produce_message(br
row: models.KafkaMessage, br
old_row: models.KafkaMessage, br
logic_row: LogicRow):br
br
if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
produce(topic="TempRangeAlert", br
key=row.Id,br
value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
row.KafkaMessageSent = Truebr
br
if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
produce(topic="TempRangeAlert", br
key=row.Id,br
value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
row.KafkaMessageSent = Truebr
br
Rules.constraint(models.Temperature, br
as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
error_message= "Temperature {row.TempReading} is out of range"br
Rules.commit_event(models.Temperature, calling=produce_message)
仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP
原文标题:Kafka Event Streaming AI and Automation
原文链接:https://dzone.com/articles/event-streaming-ai-amp-automation
作者:Tyler Band
编译:LCR