第3章:并行化 Parallelization
并行化模式概述
在前几章中,我们探讨了用于顺序工作流的提示链(Prompt Chaining)以及用于动态决策和路径转换的路由(Routing)。虽然这些模式非常重要,但许多复杂的智能体任务涉及多个子任务,这些子任务可以同时执行,而不是一个接一个地完成。这就是并行化模式发挥关键作用的地方。
并行化是指同时执行多个组件,例如 LLM 调用、工具使用,甚至是完整的子智能体(见图 1)。与等待一个步骤完成后再开始下一个步骤不同,并行执行允许独立任务同时运行,从而显著减少可以拆分为独立部分的任务的总体执行时间。
考虑一个设计用于研究某个主题并总结其发现的智能体。顺序方法可能是:
- 搜索来源 A。
- 总结来源 A。
- 搜索来源 B。
- 总结来源 B。
- 将来源 A 和 B 的总结综合为最终答案。
而并行方法可以是:
- 同时搜索来源 A 和 来源 B。
- 两个搜索完成后,同时总结来源 A 和 来源 B。
- 将来源 A 和 B 的总结综合为最终答案(此步骤通常是顺序的,需要等待并行步骤完成)。
核心思想是识别工作流中不依赖其他部分输出的部分,并并行执行它们。这种方法在处理具有延迟的外部服务(如 API 或数据库)时特别有效,因为可以同时发出多个请求。
实现并行化通常需要支持异步执行或多线程/多进程的框架。现代智能体框架设计时已考虑到异步操作,使您能够轻松定义可以并行运行的步骤。

图 1. 使用子智能体进行并行化的示例
诸如 LangChain、LangGraph 和 Google ADK 等框架提供了并行执行的机制。在 LangChain 表达语言(LCEL)中,可以通过使用 |(表示顺序)等运算符组合可运行对象,以及通过结构化链或图形以并行执行分支来实现并行执行。LangGraph 通过其图结构允许您定义多个节点,这些节点可以从单一状态转换中执行,从而有效地实现工作流中的并行分支。Google ADK 提供了强大的原生机制来促进和管理智能体的并行执行,大大提高了复杂多智能体系统的效率和可扩展性。ADK 框架的这一内在能力使开发人员能够设计和实现解决方案,使多个智能体可以并行运行,而不是顺序执行。
并行化模式对于提高智能体系统的效率和响应能力至关重要,特别是在处理涉及多个独立查找、计算或与外部服务交互的任务时。这是优化复杂智能体工作流性能的重要技术。
实际应用与用例
并行化是一种强大的模式,可用于优化智能体在各种应用中的性能:
1. 信息收集与研究:
同时从多个来源收集信息是一个经典的用例。
- 用例: 一个研究公司的智能体。
- 并行任务: 同时搜索新闻文章、提取股票数据、检查社交媒体提及以及查询公司数据库。
- 优势: 比顺序查找更快地获得全面的视图。
2. 数据处理与分析:
同时应用不同的分析技术或处理不同的数据片段。
- 用例: 分析客户反馈的智能智能体。
- 并行任务: 对一批反馈条目同时进行情感分析、关键词提取、反馈分类以及紧急问题识别。
- 优势: 快速提供多维度分析。
3. 多API或工具交互:
调用多个独立的API或工具以收集不同类型的信息或执行不同操作。
- 用例: 旅行规划智能智能体。
- 并行任务: 同时查询航班价格、搜索酒店可用性、查找当地活动信息以及寻找餐厅推荐。
- 优势: 更快速地提供完整的旅行计划。
4. 多组件内容生成:
并行生成复杂内容的不同部分。
- 用例: 创建营销邮件的智能智能体。
- 并行任务: 同时生成邮件主题行、撰写邮件正文、寻找相关图片以及创建行动号召按钮文本。
- 优势: 更高效地组装最终邮件。
5. 验证与校验:
同时执行多个独立的检查或验证任务。
- 用例: 验证用户输入的智能智能体。
- 并行任务: 同时检查邮件格式、验证电话号码、将地址与数据库比对以及检查是否包含不当语言。
- 优势: 更快地反馈输入的有效性。
6. 多模态处理:
同时处理同一输入的不同模态(文本、图像、音频)。
- 用例: 分析带有文本和图像的社交媒体帖子的智能智能体。
- 并行任务: 同时分析文本的情感和关键词,以及分析图像中的物体和场景描述。
- 优势: 更快速地整合来自不同模态的洞察。
7. A/B测试或多选项生成:
并行生成多个响应或输出变体以选择最佳选项。
- 用例: 生成不同创意文本选项的智能智能体。
- 并行任务: 使用稍有不同的提示或模型同时生成三种不同的文章标题。
- 优势: 快速比较并选择最佳选项。
并行化是智能智能体设计中的一种基本优化技术,通过对独立任务的并发执行,开发者可以构建性能更高、响应更快的应用程序。
LangChain 实践代码示例
在 LangChain 框架中,可以通过 LangChain 表达式语言(LCEL)实现并行执行。主要方法是将多个可运行组件结构化为字典或列表构造。当此集合作为输入传递给链中的后续组件时,LCEL运行时会并发执行其中包含的可运行任务。
在 LangGraph 的上下文中,这一原理应用于图的拓扑结构。通过设计图形,使得多个节点在没有直接顺序依赖的情况下,可以从一个公共节点启动,从而定义并行工作流。这些并行路径独立执行,随后其结果可在图中的后续汇聚点进行整合。
以下实现展示了一个使用 LangChain 框架构建的并行处理工作流。该工作流旨在响应单个用户查询时并发执行两个独立操作。这些并行进程被实例化为独立的链或函数,其各自的输出随后被汇总为统一的结果。
实现此功能的前提条件包括安装必要的 Python 包,例如 langchain、langchain-community 以及类似 langchain-openai 的模型提供库。此外,还需在本地环境中配置所选语言模型的有效 API 密钥以进行身份验证。
import os
import asyncio
from typing import Optional
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough
## --- 配置 ---
## 确保设置了您的 API 密钥环境变量(例如,OPENAI_API_KEY)
try:
llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
except Exception as e:
print(f"初始化语言模型时出错: {e}")
llm = None
## --- 定义独立链 ---
## 以下三个链表示可以并行执行的不同任务。
summarize_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "请简明扼要地总结以下主题:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
questions_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "请针对以下主题生成三个有趣的问题:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
terms_chain: Runnable = (
ChatPromptTemplate.from_messages([
("system", "请从以下主题中识别 5 到 10 个关键术语,用逗号分隔:"),
("user", "{topic}")
])
| llm
| StrOutputParser()
)
## --- 构建并行 + 综合链 ---
## 1. 定义并行运行的任务块。这些任务的结果以及原始主题将被传递到下一步。
map_chain = RunnableParallel(
{
"summary": summarize_chain,
"questions": questions_chain,
"key_terms": terms_chain,
"topic": RunnablePassthrough(), # 直接传递原始主题
}
)
## 2. 定义最终的综合提示,用于结合并行结果。
synthesis_prompt = ChatPromptTemplate.from_messages([
("system", """基于以下信息:
总结: {summary}
相关问题: {questions}
关键术语: {key_terms}
综合生成一个全面的答案。"""),
("user", "原始主题: {topic}")
])
## 3. 构建完整链,将并行结果直接传递到综合提示中,随后通过 LLM 和输出解析器处理。
full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser()
## --- 运行链 ---
async def run_parallel_example(topic: str) -> None:
"""
异步调用并行处理链以处理特定主题,并打印综合结果。
参数:
topic: LangChain 链处理的输入主题。
"""
if not llm:
print("LLM 未初始化,无法运行示例。")
return
print(f"\n--- 针对主题 '{topic}' 运行并行 LangChain 示例 ---")
try:
# `ainvoke` 的输入是单个 'topic' 字符串,
# 然后传递给 `map_chain` 中的每个 runnable。
response = await full_parallel_chain.ainvoke(topic)
print("\n--- 最终响应 ---")
print(response)
except Exception as e:
print(f"\n链执行过程中发生错误: {e}")
if __name__ == "__main__":
test_topic = "太空探索的历史"
# 在 Python 3.7+ 中,asyncio.run 是运行异步函数的标准方式。
asyncio.run(run_parallel_example(test_topic))
代码首先从 langchain_openai 和 langchain_core 导入必要的模块,包括语言模型、提示、输出解析以及可运行结构的组件。代码尝试初始化一个 ChatOpenAI 实例,具体使用 "gpt-4o-mini" 模型,并指定温度以控制创造性。在语言模型初始化过程中,使用了 try-except 块以增强鲁棒性。然后定义了三个独立的 LangChain "链",每个链都旨在对输入主题执行一个特定任务。第一个链用于简洁地总结主题,使用系统消息和包含主题占位符的用户消息。第二个链被配置为生成与主题相关的三个有趣问题。第三个链用于从输入主题中识别 5 到 10 个关键术语,并要求以逗号分隔的形式返回。这些独立的链每个都包含一个专门针对其任务的 ChatPromptTemplate,随后是初始化的语言模型和一个 StrOutputParser,用于将输出格式化为字符串。
接着构建了一个 RunnableParallel 块,将这三个链捆绑在一起,使它们能够同时执行。这个并行可运行结构还包括一个 RunnablePassthrough,以确保原始输入主题可用于后续步骤。为最终综合步骤定义了一个单独的 ChatPromptTemplate,将总结、问题、关键术语以及原始主题作为输入,以生成一个全面的答案。完整的端到端处理链名为 full_parallel_chain,通过将 map_chain(并行块)顺序连接到综合提示,然后是语言模型和输出解析器来创建。提供了一个异步函数 run_parallel_example,以演示如何调用这个 full_parallel_chain。该函数将主题作为输入,并使用 invoke 来运行异步链。最后,标准的 Python if __name__ == "__main__": 块展示了如何使用 asyncio.run 来管理异步执行,并以 "The history of space exploration" 为示例主题执行 run_parallel_example。
本质上,这段代码设置了一个工作流,其中针对给定主题的多个 LLM 调用(用于总结、问题和术语)同时发生,然后通过最终的 LLM 调用将结果组合起来。这展示了使用 LangChain 在智能体工作流中实现并行化的核心思想。
实操代码示例(Google ADK)
现在,我们将注意力转向一个具体的示例,说明如何在 Google ADK 框架中应用这些概念。我们将研究如何应用 ADK 的原语,例如 ParallelAgent 和 SequentialAgent,来构建一个利用并发执行以提高效率的智能体流。
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent
from google.adk.tools import google_search
GEMINI_MODEL="gemini-2.0-flash"
## --- 1. 定义研究子智能体(并行运行) ---
## 研究员 1:可再生能源
researcher_agent_1 = LlmAgent(
name="RenewableEnergyResearcher",
model=GEMINI_MODEL,
instruction="""你是一名专注于能源领域的AI研究助理。研究“可再生能源来源”的最新进展。使用提供的 Google 搜索工具。简洁地总结你的关键发现(1-2句话)。仅输出总结。""",
description="研究可再生能源来源。",
tools=[google_search],
# 将结果存储在状态中以供合并智能体使用
output_key="renewable_energy_result"
)
## Researcher 2: 电动汽车
researcher_agent_2 = LlmAgent(
name="EVResearcher",
model=GEMINI_MODEL,
instruction="""你是一名专注于交通领域的AI研究助手。研究“电动汽车技术”的最新发展。使用提供的Google搜索工具。简明总结你的关键发现(1-2句话)。仅输出总结内容。""",
description="研究电动汽车技术。",
tools=[google_search],
# 将结果存储在状态中供合并智能体使用
output_key="ev_technology_result"
)
## Researcher 3: 碳捕获
researcher_agent_3 = LlmAgent(
name="CarbonCaptureResearcher",
model=GEMINI_MODEL,
instruction="""你是一名专注于气候解决方案的AI研究助手。研究“碳捕获方法”的现状。使用提供的Google搜索工具。简明总结你的关键发现(1-2句话)。仅输出总结内容。""",
description="研究碳捕获方法。",
tools=[google_search],
# 将结果存储在状态中供合并智能体使用
output_key="carbon_capture_result"
)
## --- 2. 创建并行智能体(同时运行研究员智能体) ---
## 此智能体协调研究员的并行执行。
## 一旦所有研究员完成并将结果存储在状态中,它将结束。
parallel_research_agent = ParallelAgent(
name="ParallelWebResearchAgent",
sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3],
description="并行运行多个研究智能体以收集信息。"
)
## --- 3. 定义合并智能体(在并行智能体之后运行) ---
## 此智能体从并行智能体在会话状态中存储的结果中提取信息,
## 并将其综合为一个结构化的报告,附上来源。
merger_agent = LlmAgent(
name="SynthesisAgent",
model=GEMINI_MODEL, # 或者在需要更强大的综合能力时使用更强大的模型
instruction="""你是一名负责将研究发现整合为结构化报告的AI助手。你的主要任务是综合以下研究总结,并清晰地归因于其来源领域。使用每个主题的标题来组织你的回应。确保报告连贯,并平滑地整合关键点。 **关键点:你的全部回应必须*仅*基于以下“输入总结”中提供的信息。请勿添加任何外部知识、事实或未在这些具体总结中出现的细节。** **输入总结:** * **可再生能源:** {renewable_energy_result} * **电动汽车:** {ev_technology_result} * **碳捕获:** {carbon_capture_result} **输出格式:** ## 最近可持续技术进展总结 ### 可再生能源发现(基于RenewableEnergyResearcher的发现) [仅基于上述可再生能源输入总结进行综合和阐述。] ### 电动汽车发现(基于EVResearcher的发现) [仅基于上述电动汽车输入总结进行综合和阐述。] ### 碳捕获发现(基于CarbonCaptureResearcher的发现) [仅基于上述碳捕获输入总结进行综合和阐述。] ### 总体结论 [提供一个简短的(1-2句话)总结声明,仅连接上述发现。] 仅输出遵循此格式的结构化报告。不要包含此结构之外的介绍或结论性短语,并严格遵守仅使用提供的输入总结内容。""",
description="将并行智能体的研究发现整合为结构化的、具有引用的报告,严格基于提供的输入。",
# 合并不需要工具
# 此处无需output_key,因为其直接响应是序列的最终输出
)
## --- 4. 创建 SequentialAgent(协调整体流程) ---
## 这是将要运行的主智能体。它首先执行 ParallelAgent 来填充状态,然后执行 MergerAgent 以生成最终输出。
sequential_pipeline_agent = SequentialAgent(
name="ResearchAndSynthesisPipeline",
# 先并行研究,然后合并
sub_agents=[parallel_research_agent, merger_agent],
description="协调并行研究并综合结果。"
)
root_agent = sequential_pipeline_agent
此代码定义了一个用于研究和综合可持续技术进展信息的多智能体系统。它设置了三个 LlmAgent 实例作为专门的研究员。ResearcherAgent_1 专注于可再生能源,ResearcherAgent_2 研究电动车技术,ResearcherAgent_3 调查碳捕获方法。每个研究员智能体都配置为使用 GEMINI_MODEL 和 google_search 工具,并被指示简洁地总结其发现(1-2 句),并使用 output_key 将这些总结存储在会话状态中。
接着,创建了一个名为 ParallelWebResearchAgent 的 ParallelAgent,用于同时运行这三个研究员智能体。这使得研究可以并行进行,从而节省时间。一旦所有子智能体(研究员)完成并填充了状态,ParallelAgent 的执行即告完成。
然后定义了一个 MergerAgent(也是一个 LlmAgent),用于综合研究结果。该智能体将并行研究员存储在会话状态中的总结作为输入。其指令强调输出必须严格基于提供的输入总结,禁止添加外部知识。MergerAgent 被设计为将综合的发现结构化为一个报告,其中包括每个主题的标题以及简短的总体结论。
最后,创建了一个名为 ResearchAndSynthesisPipeline 的 SequentialAgent 来协调整个工作流程。作为主要控制器,这个主智能体首先执行 ParallelAgent 以进行研究。一旦 ParallelAgent 完成,SequentialAgent 就会执行 MergerAgent 来综合收集到的信息。sequential_pipeline_agent 被设置为 root_agent,代表运行这个多智能体系统的入口点。整个流程旨在高效地从多个来源并行收集信息,然后将其整合为一个结构化的报告。
概览
定义(What)
许多智能体工作流涉及多个子任务,这些任务必须完成才能实现最终目标。纯粹的顺序执行方式,即每个任务都等待前一个任务完成后再开始,通常效率低下且耗时较长。当任务依赖外部 I/O 操作(例如调用不同的 API 或查询多个数据库)时,这种延迟会成为一个显著的瓶颈。如果没有并发执行机制,总处理时间将是所有单个任务持续时间的总和,从而阻碍系统的整体性能和响应速度。
设计意图(Why)
并行化模式通过允许独立任务的同时执行提供了一个标准化的解决方案。它通过识别工作流中的组件(如工具使用或 LLM 调用),这些组件不依赖彼此的即时输出。像 LangChain 和 Google ADK 这样的智能体框架提供了内置构造来定义和管理这些并发操作。例如,主进程可以调用多个并行运行的子任务,并在所有任务完成后继续执行下一步。通过同时运行这些独立任务而不是一个接一个地运行,这种模式显著减少了总执行时间。
使用原则(Rule of Thumb)
当工作流包含多个可以同时运行的独立操作时,例如从多个 API 获取数据、处理不同的数据块或生成多段内容以供后续合成时,可以使用此模式。
图解 (Visual Summary)

图 2:并行化设计模式
关键要点
以下是关键要点:
- 并行化是一种用于同时执行独立任务以提高效率的模式。
- 当任务涉及等待外部资源(如 API 调用)时,它特别有用。
- 采用并发或并行架构会引入显著的复杂性和成本,影响设计、调试和系统日志记录等关键开发阶段。
- LangChain 和 Google ADK 等框架提供了内置支持,用于定义和管理并行执行。
- 在 LangChain 表达语言 (LCEL) 中,
RunnableParallel是一个关键构造,用于并行运行多个可运行对象。 - Google ADK 可以通过基于 LLM 的委托实现并行执行,其中协调器智能体的 LLM 识别独立子任务,并触发专门子智能体的并行处理。
- 并行化有助于减少整体延迟,使智能体系统在处理复杂任务时更具响应性。
结论
并行化模式是一种通过同时执行独立子任务来优化计算工作流的方法。这种方法能够减少整体延迟,尤其是在涉及多个模型推理或调用外部服务的复杂操作中。
框架提供了实现此模式的不同机制。在 LangChain 中,可以使用诸如 RunnableParallel 之类的构造显式定义并同时执行多个处理链。而在 Google Agent Developer Kit (ADK) 等框架中,可以通过多智能体委托实现并行化,其中主协调器模型将不同的子任务分配给能够并行操作的专门智能体。
通过将并行处理与顺序(链式)和条件(路由)控制流相结合,可以构建复杂、高性能的计算系统,能够高效地管理多样化和复杂的任务。
参考文献
以下是有关并行化模式及相关概念的进一步阅读资源:
- LangChain 表达语言 (LCEL) 文档(并行化):https://python.langchain.com/docs/concepts/lcel/
- Google Agent Developer Kit (ADK) 文档(多智能体系统):https://google.github.io/adk-docs/agents/multi-agents/
- Python asyncio 文档:https://docs.python.org/3/library/asyncio.html