专栏名称: 奇舞精选
《奇舞精选》是由奇舞团维护的前端技术公众号。除周五外,每天向大家推荐一篇前端相关技术文章,每周五向大家推送汇总周刊内容。
目录
相关文章推荐
国家外汇管理局  ·  国家外汇管理局公布2025年1月末外汇储备规模数据 ·  19 小时前  
国家外汇管理局  ·  习近平在哈尔滨第九届亚洲冬季运动会开幕式欢迎 ... ·  19 小时前  
NoxInfluencer  ·  万象新启 再踏征程! ·  2 天前  
金色旋风  ·  最后3天,10年最低,买贵倒赔1000!福利 ... ·  2 天前  
金色旋风  ·  最后3天,10年最低,买贵倒赔1000!福利 ... ·  2 天前  
51好读  ›  专栏  ›  奇舞精选

事件驱动的 AI:使用 Kafka 和 Flink 构建研究助手

奇舞精选  · 公众号  ·  · 2025-01-18 10:00

正文

本文译者为 360 奇舞团前端开发工程师
原文标题:Event-Driven AI: Building a Research Assistant with Kafka and Flink
原文作者:Sean Falconer
原文地址:https://seanfalconer.medium.com/event-driven-ai-building-a-research-assistant-with-kafka-and-flink-e95db47eb3f3

自主式 AI 的兴起,让人们开始关注能够自主执行任务,提供建议并融合人工智能与传统计算来执行复杂工作流程的智能体的强烈关注。但是现实是产品导向的,创建这种类型的智能体,其困难程度面临超出 AI 本身能力的挑战。

如果没有精心设计的架构,那么随着系统的不断演变,组件之间的依赖关系可能会造成瓶颈,譬如可扩展性的限制,维护工作日渐困难等等。解决这些问题的关键在于解耦工作流程,使智能体与基础设施、以及其他组件的交互灵活起来,而不是生硬的依赖关系。

这种灵活、可扩展的集成需要一种共享的数据交换“语言”——即一种由事件流驱动的具有鲁棒性的事件驱动架构。通过围绕事件组织应用程序,智能体能够在一个响应迅速且相互解耦的系统中运行,在该系统里每个部分都能独立履行自身职责完成工作。并且,团队可以自由选择技术,单独管理扩展需求,并持续保持组件之间的清晰边界,从而实现真正的敏捷。

为了验证上述内容提到的规范,我开发了 PodPrep AI,一个能够为我在 Software Engineering Daily 和 Software Huddle 的播客采访中提供帮助的 AI 驱动型研究助手。而在本文后续篇幅,我将深入探讨 PodPrep AI 的设计和架构,展示 EDA 和实时数据流如何驱动有效的自主式系统。

注意:如果你想直接查看代码,可以跳转到我的 Github 仓库(https://github.com/thefalc/podcast-research-agent/)。

为什么选择事件驱动架构用于 AI?

在现实世界的 AI 应用,紧密耦合的单体设计较难发挥作用。当概念验证或演示为了简洁而使用单体系统的话,在切换到生产环境尤其是分布式环境中就会陷入困境,难以发挥作用。紧密耦合的系统会带来瓶颈,难以扩展,进而拖累后续的迭代——这些都是随着 AI 解决方案增长而应当避免的挑战。

所以此时,不妨考虑一个典型的 AI 智能体。

该 AI 智能体可能需要从多个来源提取数据,处理提示工程和检索增强生成 (RAG) 工作流,并直接与各种工具交互以执行确定性的工作流程。这种情况下,背后的编排是相当复杂的,而且依赖于多个系统。如果智能体需要跟其他智能体通信,复杂性只会日趋复杂。如果没有灵活的架构,这些依赖关系会让后续扩展和修改极其困难。

在生产环境中,不同的团队通常负责堆栈的不同部分:例如 MLOps 和数据工程管理负责管理 RAG 管道,数据科学负责选择模型,应用开发者负责构建交互和服务。紧密耦合的设置迫使这些团队之间形成依赖关系,从而减缓交付速度并使扩展变得困难。理想情况下,应用层不需要理解 AI 的内部结构;它们应该仅在需要时消费结果。

此外,AI 应用不能孤立运行。为了实现真正的价值,AI 洞察需要在客户数据平台(CDPs)、客户关系管理(CRM)、分析等之间无缝流动。当客户互动时应当实时触发更新,数据直接传入到其他工具进行操作和分析。如果没有一致的方法,跨平台集成的洞察会难以管理、而且无法扩展。

EDA 驱动的 AI 通过为数据创建一个“中央神经系统”来解决这些挑战。通过 EDA,应用程序能够广播事件,而不是依赖于链式命令。这样一来解耦了组件,数据能够异步流动在被需要的地方,从而让每个团队能够独立工作。EDA 能够促进无缝的数据集成、可扩展的增长和弹性——然后成为现代 AI 驱动系统的强大基础。

设计一个可扩展的 AI 驱动型研究智能体

在过去两年中,我在 Software Engineering Daily、Software Huddle 和 Partially Redacted 上主持了数百个播客。

为了筹备每一期播客,我会开展深入的调研,用于撰写一份播客概要。这份概要涵盖我的想法、嘉宾和主题背景信息。在撰写过程,我通常会了解嘉宾以及他们所在的公司,收听他们可能上过的其他播客节目,阅读他们撰写的博客文章,以及深入研究我们即将讨论的主要话题。

我会尝试把此次内容与我主持过的其他播客,或是类似主题相关的经历联系起来。整个过程挺耗时费力。通常大型播客团队的节目主持人,其工作都会有专门的研究人员和助理支持,但我只能亲力亲为。

为了解决这个问题,我想构建一个可以为我完成这项工作的智能体。从高层次来看,智能体的工作流程大致如下图所示。

我提供诸如嘉宾姓名、所在公司、聚焦主题等基础材料,还有一些参考网址,比如博客文章和现有的播客,接着经过一些人工智能的神奇操作,我的调研就完成了。

这个简单的想法促使我创建了 PodPrep AI,我的 AI 驱动研究智能体,只需花费我一些代币。

接下来的文章,从用户界面开始讨论 PodPrep AI 的设计。

构建智能体用户界面

我将该智能体的界面设计为一个 Web 应用程序,这样我就可以轻松地为调研过程输入源材料。这些源材料包括嘉宾的姓名、他们的公司、采访主题、任何额外的背景信息,以及相关博客、网站和以往播客采访的链接。

我本来可以给智能体更少的指示,作为其工作流程的一部分,让它自行寻找源材料,但在 1.0 版本中,我还是决定提供源 URL。

这个 Web 应用程序是一个标准的三层应用,使用 Next.js 和 MongoDB 作为应用数据库。它对 AI 一无所知。它只是允许用户输入新的研究包,这些包会一直呆在处理状态,直到智能体完成工作流并在应用数据库中填充研究简报。

一旦 AI 魔法完成,我就可以访问该条目的简报文件,如下所示。

创建智能体工作流程

在 1.0 版本中,我希望能够执行三个主要步骤来构建研究简报:

对于任何网站 URL、博客文章或播客,检索文本或摘要,将文本分块为合理大小,生成文本嵌入,并持久化生成的向量。

然后在研究源 URL 提取的所有文本中,提取最有趣的问题,并存储。

最后根据嵌入信息、之前提出的最佳问题以及作为输入包一部分的任何其他信息,生成一份播客研究概要,将最相关的背景信息整合在一起。

下图显示了从 Web 应用程序到智能体工作流程的架构。

上述操作 #1 得到了“处理 URL 和创建嵌入智能体 HTTP 接口”支持。

操作 #2 使用 Flink 和 Confluent Cloud 中内置的 AI 模型支持来执行。

最后,操作 #3 由“生成研究简报智能体”执行,这也是一个 HTTP 接口,在完成前面两个操作后被调用一次。

在接下来的部分中,我将详细讨论这些操作。

处理 URL 和创建嵌入智能体

该智能体负责从研究源 URL 和向量嵌入管道提取文本。下面显示了处理研究材料时后台发生的高层次流程。

一旦用户创建了一个研究包并将其保存到 MongoDB 中,一个 MongoDB 源连接器就会向名为 research-requests 的 Kafka 主题发送消息。这就是启动智能体工作流程的起点。

发送到 HTTP 端点的每个 POST 请求都包含来自研究请求的网址以及 MongoDB 研究包集合中的主键。

该智能体会遍历每个网址,如果所访问的网址不是苹果播客,那么它会尝试获取完整的页面 HTML。由于我不知道页面的结构,我无法依赖 HTML 解析库来找到相关文本。相反,我会使用以下提示,将页面文本发送给 temperature:0 设置的 gpt-4o-minimodel,从而获取我所需的内容。

这是一个网页的内容:
${text}

Instructions:
- If there is a blog post within this content, extract and return the main text of the blog post.
- If there is no blog post, summarize the most important information on the page.

接下来,对于播客,我需要花费更多工夫。

反向工程 Apple 播客 URL

为了从播客节目中提取数据,我们首先需要使用 Whisper 模型将音频转换为文本。但在此之前,我们必须找到每个播客节目的实际 MP3 文件,将其下载,并切割成 25MB 或更小的片段(这是 Whisper 所能处理的最大文件)。

挑战在于 Apple 不提供其播客集的直接 MP3 链接。然而,MP3 文件可以在播客的原始 RSS 源中找到,我们可以在 Apple 播客 ID 找到该源。

例如,在下面的 URL 中,/id 后面的数字部分是播客的唯一 Apple ID:

https://podcasts.apple.com/us/podcast/deep-dive-into-inference-optimization-for-llms-with/id1699385780?i=1000675820505

使用 Apple 的 API,我们可以查找播客 ID 并检索包含 RSS 源 URL 的 JSON 响应内容:

https://itunes.apple.com/lookup?id=1699385780&entity=podcast

一旦我们拥有了 RSS 订阅源的 XML 文件,我们就会在其中搜索特定的剧集。由于我们仅从苹果公司获得了剧集的 URL(而不是实际的标题),所以我们会使用 URL 中的标题片段来在订阅源中定位该剧集并检索其 MP3 网址。

async function getMp3DownloadUrl(url{
  let podcastId = extractPodcastId(url);
  let titleToMatch = extractAndFormatTitle(url);

  if (podcastId) {
    let feedLookupUrl = `https://itunes.apple.com/lookup?id=${podcastId}&entity=podcast`;

    const itunesResponse = await axios.get(feedLookupUrl);
    const itunesData = itunesResponse.data;

    // Check if results were returned
    if (itunesData.resultCount === 0 || !itunesData.results[0].feedUrl) {
      console.error("No feed URL found for this podcast ID.");
      return;
    }

    // Extract the feed URL
    const feedUrl = itunesData.results[0].feedUrl;

    // Fetch the document from the feed URL
    const feedResponse = await axios.get(feedUrl);
    const rssContent = feedResponse.data;

    // Parse the RSS feed XML
    const rssData = await parseStringPromise(rssContent);
    const episodes = rssData.rss.channel[0].item; // Access all items (episodes) in the feed

    // Find the matching episode by title, have to transform title to match the URL-based title
    const matchingEpisode = episodes.find(episode => {
        return getSlug(episode.title[0]).includes(titleToMatch);
      }
    );

    if (!matchingEpisode) {
      console.log(`No episode found with title containing "${titleToMatch}"`);
      return false;
    }

    // Extract the MP3 URL from the enclosure tag
    return matchingEpisode.enclosure[0].$.url;
  }
  
  return false;
}

现在,由于有了来自博客文章、网站和 MP3 的文本,该智能体使用 LangChain 的递归字符文本拆分器将文本拆分成若干片段,并根据这些片段生成嵌入。这些片段会被发布到 text-embedding topic,并存储到 MongoDB 中。

注意:我选择将 MongoDB 既用作我的应用程序数据库,也用作向量数据库。然而,由于我所采用的探索性数据分析(EDA)方法,这两者可以轻松地作为独立系统,并且只需将来自文本嵌入主题的接收连接器进行切换即可。

除了创建和发布嵌入信息外,该智能体还将来自源的文本发布到一个名为 full-text-from-sources 的主题。向该主题发布信息会启动行动 #2。

使用 Flink 和 OpenAI 提取问题

Apache Flink 是一个开源的流处理框架,专为实时处理大量数据而构建,非常适用于高吞吐量、低延迟的应用程序。通过将 Flink 与 Confluent 相结合,我们可以将像 OpenAI 的 GPT 这样的大型语言模型直接引入流工作流。这种集成可实现实时的检索增强生成(RAG)工作流,确保问题提取过程使用最新的可用数据。

在流中包含原始源文本还使我们能够在之后引入使用相同数据的新工作流,从而增强研究概要的生成过程,或将其发送至诸如数据仓库之类的下游服务。这种灵活的设置允许我们随着时间的推移添加额外的人工智能和非人工智能特性,而无需对核心管道进行全面检修。

在 PodPrep AI 中,我使用 Flink 从源 URL 提取文本中的问题。

将 Flink 配置为调用大型语言模型(LLM)涉及通过 Confluent 的命令行界面(CLI)来配置一个连接。以下是一条用于建立 OpenAI 连接的示例命令,不过还有多个可用选项。

confluent flink connection create openai-connection \
--cloud aws \
--region us-east-1 \
--type openai \
--endpoint https://api.openai.com/v1/chat/completions \
--api-key 

一旦连接建立,我可以在 Cloud Console 或 Flink SQL shell 中创建模型。然后对于问题提取,我相应地设置模型。

-- Creates model for pulling questions from research source material
CREATE MODEL `question_generation`
INPUT (text STRING)
OUTPUT (response STRING)
WITH (
  'openai.connection'='openai-connection',
  'provider'='openai',
  'task'='text_generation',
  'openai.model_version' = 'gpt-3.5-turbo',
  'openai.system_prompt' = 'Extract the most interesting questions asked from the text. Paraphrase the questions and seperate each one by a blank line. Do not number the questions.'
);

在模型准备好后,我使用 Flink 内置的 ml_predict 函数从源材料生成问题,将输出写入名为 mined-questions 的流,该流与 MongoDB 同步以供后续使用。

-- Generates questions based on text pulled from research source material
INSERT INTO `mined-questions`
SELECT 
    `key`
    `bundleId`
    `url`
    q.response AS questions 
FROM 
    `full-text-from-sources`,
    LATERAL TABLE (
        ml_predict('question_generation'content)
    ) AS q;

Flink 还可帮助跟踪所有研究材料何时处理完毕,进而触发研究简报生成。这是通过以下方式实现的:当 mined-questions 中的 URL 与 full-text sources 流中的 URL 匹配后,写入 completed-requests 流来完成的。

-- Writes the bundleId to the complete topic once all questions have been created
INSERT INTO `completed-requests`
SELECT '' AS id, pmq.bundleId
FROM (
    SELECT bundleId, COUNT(urlAS url_count_mined
    FROM `mined-questions`
    GROUP BY bundleId
AS pmq
JOIN (
    SELECT bundleId, COUNT(urlAS url_count_full
    FROM `full-text-from-sources`
    GROUP BY bundleId
AS pft
ON pmq.bundleId = pft.bundleId
WHERE pmq.url_count_mined = pft.url_count_full;
As messages are written to completed-requests, the unique ID for the research bundle is sent to the Generate Research Brief Agent.

当消息写入 completed-requests 时,研究包的唯一 ID 会发送到生成研究简报智能体。

生成研究简报智能体

该智能体获取所有可用的最相关研究材料,并使用 LLM 创建研究简报。下面显示了创建研究简报时发生的高层次事件流程。







请到「今天看啥」查看全文


推荐文章
国家外汇管理局  ·  国家外汇管理局公布2025年1月末外汇储备规模数据
19 小时前
NoxInfluencer  ·  万象新启 再踏征程!
2 天前