kevinnest LV
发表于 2025-4-9 18:04:28
嘿,各位!你们是不是也在捣鼓AI智能体呢?是不是跟我一样,每次想给智能体加点新技能,就得头疼地啃那些又臭又长的文档,然后对着代码改来改去,简直生无可恋?真是太痛苦了!不过,我发现了个超棒的东西——MCP服务器,简直就像游戏的“作弊码”一样!你只需要简单地把它们接入你的智能体,你的智能体立马就学会新本事了,再也不用费劲巴拉地搞代码了,简直不要太爽!这次和大家分享五款“神器”,赶紧一起来看看吧!
1. Firecrawl MCP 服务器: 网页数据抓取
有没有那么一瞬间,你特别希望你的AI智能体能够自己上网“冲浪”,帮你抓取各种有用的信息? Firecrawl MCP 就是为此而生的! 它是由 Firecrawl 公司开发的,而且在 GitHub 上也能找到它哦。 把它和像 Claude 或者 Cursor 这样的工具连接起来,你的智能体就能像个专业的网络高手一样,轻松抓取网页内容,爬遍整个网站,或者深入挖掘各种你想要的信息。
它能干啥?
- 轻轻松松抓取网页上的文字、图片各种内容。
- 像模像样地在各个网站之间“溜达”。
- 帮你把乱七八糟的数据整理干净,变成像 JSON 这样方便使用的格式。
我为什么喜欢它?
比如说,你想让你的智能体做点市场调研啥的。有了 Firecrawl,你就不用费劲自己写代码去抓网页了,直接用它就行,省时省力!给你看个例子,你就明白了:- // 完整的网页抓取功能示例,使用了假设的 firecrawl-mcp 库
- const firecrawl = require('firecrawl-mcp'); // 假设的 MCP 客户端库
- // 默认配置
- const DEFAULT_CONFIG = {
- TIMEOUT: 30000, // 默认超时时间:30秒
- MAX_RETRIES: 2, // 最大重试次数
- RETRY_DELAY: 1000, // 重试间隔:1秒 (毫秒)
- USER_AGENT: 'WebScraper/1.0' // 默认用户代理
- };
- /**
- * 抓取网站数据并返回结构化数据
- * @param {string} url - 要抓取的网址
- * @param {Object} [options] - 配置选项
- * @returns {Promise<Object>} 抓取到的数据
- */
- async function scrapeWebsite(url, options = {}) {
- // 合并默认配置和用户自定义配置
- const config = {
- timeout: options.timeout || DEFAULT_CONFIG.TIMEOUT,
- maxRetries: options.maxRetries || DEFAULT_CONFIG.MAX_RETRIES,
- retryDelay: options.retryDelay || DEFAULT_CONFIG.RETRY_DELAY,
- headers: {
- &#39;User-Agent&#39;: options.userAgent || DEFAULT_CONFIG.USER_AGENT,
- ...options.headers
- },
- format: options.format || &#39;json&#39;
- };
- // 输入验证
- if (!url || typeof url !== &#39;string&#39; || !isValidUrl(url)) {
- throw new Error(&#39;提供的URL无效&#39;);
- }
- let attempts = 0;
- let lastError;
- // 重试逻辑
- while (attempts <= config.maxRetries) {
- try {
- // 配置抓取选项
- const scrapeOptions = {
- url: url,
- format: config.format,
- timeout: config.timeout,
- headers: config.headers
- };
- // 执行抓取
- const result = await firecrawl.scrape(scrapeOptions);
- // 验证响应
- if (!result || !result.data) {
- throw new Error(&#39;抓取操作未返回任何数据&#39;);
- }
- console.log(`第 ${attempts + 1} 次尝试,成功抓取 ${url}`);
- return processScrapedData(result.data);
- } catch (error) {
- attempts++;
- lastError = error;
- // 处理特定错误情况
- if (error.code === &#39;ECONNREFUSED&#39; || error.code === &#39;ETIMEDOUT&#39;) {
- console.warn(`第 ${attempts} 次尝试,网络错误: ${error.message}`);
- } else if (error.status === 429) {
- console.warn(&#39;请求频率超限,正在重试...&#39;);
- }
- // 如果不是最后一次尝试,则等待后重试
- if (attempts <= config.maxRetries) {
- await new Promise(resolve => setTimeout(resolve, config.retryDelay));
- console.log(`正在重试 (第 ${attempts}/${config.maxRetries} 次)...`);
- continue;
- }
- }
- }
- throw new Error(`经过 ${config.maxRetries + 1} 次尝试后,抓取 ${url} 失败: ${lastError.message}`);
- }
- /**
- * 验证URL格式是否有效
- * @param {string} url - 要验证的URL
- * @returns {boolean}
- */
- function isValidUrl(url) {
- try {
- new URL(url);
- return true;
- } catch {
- return false;
- }
- }
- /**
- * 处理和标准化抓取到的数据
- * @param {Object} data - 原始抓取数据
- * @returns {Object} 处理后的数据
- */
- function processScrapedData(data) {
- // 可以根据需要添加数据标准化或转换步骤
- return {
- ...data,
- timestamp: new Date().toISOString(),
- sourceUrl: data.url || &#39;unknown&#39;
- };
- }
- /**
- * 主执行函数,包含使用示例
- */
- async function main() {
- const targetUrl = &#39;https://example.com&#39;;
- // 自定义配置
- const scrapeOptions = {
- timeout: 60000, // 超时时间:60秒
- maxRetries: 3, // 最大重试次数:3次
- retryDelay: 2000, // 重试间隔:2秒
- userAgent: &#39;CustomScraper/1.0&#39;,
- headers: {
- &#39;Accept&#39;: &#39;application/json&#39;
- }
- };
- try {
- console.log(`开始抓取 ${targetUrl}`);
- const scrapedData = await scrapeWebsite(targetUrl, scrapeOptions);
- // 打印抓取结果
- console.log(&#39;抓取到的内容:&#39;, JSON.stringify(scrapedData, null, 2));
- // 示例:访问特定数据
- if (scrapedData.title) {
- console.log(&#39;页面标题:&#39;, scrapedData.title);
- }
- } catch (error) {
- console.error(&#39;抓取失败:&#39;, {
- message: error.message,
- stack: error.stack,
- url: targetUrl
- });
- process.exit(1); // 以错误代码退出
- }
- }
- // 如果直接运行此脚本
- if (require.main === module) {
- main().then(() => {
- console.log(&#39;网页抓取成功完成&#39;);
- process.exit(0);
- });
- }
- // 导出模块,方便其他地方使用
- module.exports = {
- scrapeWebsite,
- isValidUrl,
- DEFAULT_CONFIG
- };
复制代码 怎么样,你的智能体是不是瞬间变得超能了? 简直不要太轻松!
2. Browserbase MCP 服务器: 浏览器,随叫随到!
想象一下:你的AI智能体,它竟然能自己打开浏览器,像模像样地点击各种链接,甚至还能给你截个网页的屏幕快照! 没错,Browserbase MCP 就能帮你实现这个神奇的功能!有了它,就好像给你的智能体配了个超贴心的“网上冲浪”小助手一样。
它能做些啥?
- 一键启动一个真正的浏览器。
- 想去哪个网站,随便你定。
- 网页截图。
它牛在哪儿?
比如说,你想监控某个网站的内容有没有更新。 有了 Browserbase MCP,你的智能体就能自己“溜”进浏览器,到处看看,然后把网页截图给你发回来,是不是很方便?- // 完整的基于浏览器的页面捕获功能示例,使用了假设的 browserbase-mcp 库
- const browserbase = require(&#39;browserbase-mcp&#39;); // 假设的 MCP 客户端库
- // 默认配置
- const DEFAULT_CONFIG = {
- TIMEOUT: 30000, // 默认超时时间:30秒
- MAX_RETRIES: 2, // 最大重试次数
- RETRY_DELAY: 1000, // 重试间隔:1秒 (毫秒)
- SCREENSHOT_FORMAT: &#39;png&#39;, // 默认截图格式
- VIEWPORT: { // 默认视窗大小
- width: 1280,
- height: 720
- }
- };
- /**
- * 捕获网页并返回截图信息
- * @param {string} url - 要捕获的网址
- * @param {Object} [options] - 配置选项
- * @returns {Promise<Object>} 截图详细信息
- */
- async function capturePage(url, options = {}) {
- // 合并默认配置和用户自定义配置
- const config = {
- timeout: options.timeout || DEFAULT_CONFIG.TIMEOUT,
- maxRetries: options.maxRetries || DEFAULT_CONFIG.MAX_RETRIES,
- retryDelay: options.retryDelay || DEFAULT_CONFIG.RETRY_DELAY,
- format: options.format || DEFAULT_CONFIG.SCREENSHOT_FORMAT,
- viewport: options.viewport || DEFAULT_CONFIG.VIEWPORT,
- headless: options.headless !== false // 默认为无头模式
- };
- // 输入验证
- if (!url || typeof url !== &#39;string&#39; || !isValidUrl(url)) {
- throw new Error(&#39;提供的URL无效&#39;);
- }
- let session;
- let attempts = 0;
- let lastError;
- // 重试逻辑
- while (attempts <= config.maxRetries) {
- try {
- // 创建浏览器会话
- session = await browserbase.createSession({
- timeout: config.timeout,
- headless: config.headless,
- viewport: config.viewport
- });
- if (!session) {
- throw new Error(&#39;创建浏览器会话失败&#39;);
- }
- console.log(`第 ${attempts + 1} 次尝试,正在访问 ${url}`);
- await browserbase.navigate(session, url, { timeout: config.timeout });
- // 等待页面加载完成 (可选,根据需要调整)
- await new Promise(resolve => setTimeout(resolve, 1000));
- // 截取屏幕截图
- const screenshot = await browserbase.takeScreenshot(session, {
- format: config.format,
- fullPage: options.fullPage || false
- });
- if (!screenshot || !screenshot.filePath) {
- throw new Error(&#39;截图失败&#39;);
- }
- const result = processScreenshotData(screenshot);
- console.log(&#39;截图已保存至:&#39;, result.filePath);
- return result;
- } catch (error) {
- attempts++;
- lastError = error;
- // 处理特定错误情况
- if (error.code === &#39;TIMEOUT&#39; || error.code === &#39;NETWORK_ERROR&#39;) {
- console.warn(`第 ${attempts} 次尝试,浏览器错误: ${error.message}`);
- } else if (error.message.includes(&#39;session&#39;)) {
- console.warn(&#39;会话创建失败&#39;);
- }
- // 清理会话(如果已创建)
- if (session) {
- await cleanupSession(session).catch(cleanupError => {
- console.warn(&#39;会话清理失败:&#39;, cleanupError.message);
- });
- }
- // 如果不是最后一次尝试,则等待后重试
- if (attempts <= config.maxRetries) {
- await new Promise(resolve => setTimeout(resolve, config.retryDelay));
- console.log(`正在重试 (第 ${attempts}/${config.maxRetries} 次)...`);
- continue;
- }
- }
- }
- throw new Error(`经过 ${config.maxRetries + 1} 次尝试后,捕获 ${url} 失败: ${lastError.message}`);
- }
- /**
- * 清理浏览器会话
- * @param {Object} session - 要清理的浏览器会话
- */
- async function cleanupSession(session) {
- if (session) {
- await browserbase.closeSession(session);
- }
- }
- /**
- * 验证URL格式是否有效
- * @param {string} url - 要验证的URL
- * @returns {boolean}
- */
- function isValidUrl(url) {
- try {
- new URL(url);
- return true;
- } catch {
- return false;
- }
- }
- /**
- * 处理和标准化截图数据
- * @param {Object} screenshot - 原始截图数据
- * @returns {Object} 处理后的截图数据
- */
- function processScreenshotData(screenshot) {
- return {
- filePath: screenshot.filePath,
- format: screenshot.format,
- timestamp: new Date().toISOString(),
- size: screenshot.size || null,
- url: screenshot.url || &#39;unknown&#39;
- };
- }
- /**
- * 主执行函数,包含使用示例
- */
- async function main() {
- const targetUrl = &#39;https://example.com&#39;;
- // 自定义配置
- const captureOptions = {
- timeout: 60000, // 超时时间:60秒
- maxRetries: 3, // 最大重试次数:3次
- retryDelay: 2000, // 重试间隔:2秒
- format: &#39;jpeg&#39;, // 自定义格式:jpeg
- viewport: { // 自定义视窗大小
- width: 1920,
- height: 1080
- },
- fullPage: true, // 捕获完整页面
- headless: true // 运行在无头模式
- };
- try {
- console.log(`开始捕获 ${targetUrl}`);
- const screenshotData = await capturePage(targetUrl, captureOptions);
- // 打印截图详细信息
- console.log(&#39;截图详细信息:&#39;, JSON.stringify(screenshotData, null, 2));
- // 示例:访问特定数据
- console.log(&#39;文件位置:&#39;, screenshotData.filePath);
- console.log(&#39;捕获时间:&#39;, screenshotData.timestamp);
- } catch (error) {
- console.error(&#39;页面捕获失败:&#39;, {
- message: error.message,
- stack: error.stack,
- url: targetUrl
- });
- process.exit(1); // 以错误代码退出
- }
- }
- // 如果直接运行此脚本
- if (require.main === module) {
- main().then(() => {
- console.log(&#39;页面捕获成功完成&#39;);
- process.exit(0);
- });
- }
- // 导出模块,方便其他地方使用
- module.exports = {
- capturePage,
- isValidUrl,
- DEFAULT_CONFIG
- };
复制代码 使用代码方法:- // 作为脚本运行
- node capture.js
- // 作为模块导入
- const { capturePage } = require(&#39;./capture&#39;);
- await capturePage(&#39;https://example.com&#39;);
复制代码 温馨提示:
- 请根据你实际使用的库调整API调用。
- 根据你的具体需求修改 processScreenshotData 函数。
你的智能体现在基本上可以像人一样浏览网页了,是不是很神奇?
3. Opik MCP 服务器: 给你的AI智能体装上“监控”
有没有好奇过,你的AI智能体平时都在干些啥? Opik MCP,就像一个“追踪器”一样。 它可以密切关注你的AI智能体的一举一动,然后把“一手情报”都告诉你。
它有啥本事?
- 帮你创建项目,方便管理。
- 追踪你的智能体的每一个操作步骤。
- 给你展示各种统计数据,让你对智能体的表现一目了然。
它为啥实用?
如果你的智能体突然“抽风”,表现得不太正常,Opik 就能帮你找出问题所在。 用起来大概是这个样子:- #!/usr/bin/env python3
- &#34;&#34;&#34;
- 完整的追踪功能示例,使用了假设的 Opik MCP 客户端库
- &#34;&#34;&#34;
- import os
- import time
- import logging
- from typing import Optional, Dict, Any
- from opik_mcp import OpikClient # 假设的 MCP 客户端库
- from datetime import datetime
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format=&#39;%(asctime)s - %(levelname)s - %(message)s&#39;
- )
- logger = logging.getLogger(__name__)
- # 默认配置常量
- class Config:
- DEFAULT_API_KEY = os.getenv(&#39;OPIK_API_KEY&#39;, &#39;your-api-key&#39;)
- DEFAULT_TIMEOUT = 30 # 秒
- RETRY_ATTEMPTS = 2
- RETRY_DELAY = 1 # 秒
- class OpikTracer:
- &#34;&#34;&#34;Opik MCP 操作的封装类&#34;&#34;&#34;
- def __init__(self, api_key: str = Config.DEFAULT_API_KEY, timeout: int = Config.DEFAULT_TIMEOUT):
- &#34;&#34;&#34;使用配置初始化 Opik 客户端&#34;&#34;&#34;
- if not api_key or api_key == &#39;your-api-key&#39;:
- raise ValueError(&#34;必须提供有效的 API 密钥&#34;)
- self.client = OpikClient(api_key=api_key, timeout=timeout)
- self.projects = {}
- self.timeout = timeout
- def create_project(self, project_name: str) -> Any:
- &#34;&#34;&#34;创建项目或获取现有项目,带重试逻辑&#34;&#34;&#34;
- if project_name in self.projects:
- return self.projects[project_name]
- attempts = 0
- while attempts <= Config.RETRY_ATTEMPTS:
- try:
- project = self.client.create_project(project_name)
- if not project:
- raise ValueError(f&#34;创建项目失败: {project_name}&#34;)
- self.projects[project_name] = project
- logger.info(f&#34;已创建项目: {project_name}&#34;)
- return project
- except Exception as e:
- attempts += 1
- if attempts > Config.RETRY_ATTEMPTS:
- logger.error(f&#34;经过 {Config.RETRY_ATTEMPTS + 1} 次尝试后,创建项目失败: {str(e)}&#34;)
- raise
- logger.warning(f&#34;第 {attempts} 次项目创建尝试失败: {str(e)}&#34;)
- time.sleep(Config.RETRY_DELAY)
- def trace_action(self, project_name: str, trace_name: str, metadata: Optional[Dict] = None) -> None:
- &#34;&#34;&#34;追踪一个动作,包含错误处理和元数据&#34;&#34;&#34;
- project = self.create_project(project_name)
- try:
- trace = self.client.start_trace(project, trace_name)
- if not trace:
- raise ValueError(f&#34;启动追踪失败: {trace_name}&#34;)
- start_time = time.time()
- trace.log(f&#34;开始 {trace_name},时间: {datetime.now().isoformat()}&#34;)
- if metadata:
- trace.log(f&#34;元数据: {metadata}&#34;)
- yield trace # 允许上下文管理器使用
- duration = time.time() - start_time
- trace.log(f&#34;耗时 {duration:.2f} 秒&#34;)
- trace.end()
- logger.info(f&#34;追踪 {trace_name} 成功完成&#34;)
- except Exception as e:
- logger.error(f&#34;追踪 {trace_name} 失败: {str(e)}&#34;)
- if &#39;trace&#39; in locals():
- trace.log(f&#34;错误: {str(e)}&#34;)
- trace.end(status=&#34;failed&#34;)
- raise
- def get_stats(self, project_name: str) -> Dict:
- &#34;&#34;&#34;获取项目统计数据,带验证&#34;&#34;&#34;
- project = self.create_project(project_name)
- attempts = 0;
- while attempts <= Config.RETRY_ATTEMPTS:
- try:
- stats = self.client.get_trace_stats(project)
- if not stats:
- raise ValueError(&#34;未返回任何统计数据&#34;)
- return self._process_stats(stats)
- except Exception as e:
- attempts += 1
- if attempts > Config.RETRY_ATTEMPTS:
- logger.error(f&#34;经过 {Config.RETRY_ATTEMPTS + 1} 次尝试后,获取统计数据失败: {str(e)}&#34;)
- raise
- logger.warning(f&#34;第 {attempts} 次统计数据获取尝试失败: {str(e)}&#34;)
- time.sleep(Config.RETRY_DELAY)
- def _process_stats(self, stats: Dict) -> Dict:
- &#34;&#34;&#34;处理和标准化统计数据&#34;&#34;&#34;
- return {
- &#34;project&#34;: stats.get(&#34;project_name&#34;, &#34;unknown&#34;),
- &#34;total_traces&#34;: stats.get(&#34;total_traces&#34;, 0),
- &#34;avg_duration&#34;: stats.get(&#34;avg_duration&#34;, 0.0),
- &#34;success_rate&#34;: stats.get(&#34;success_rate&#34;, 0.0),
- &#34;timestamp&#34;: datetime.now().isoformat()
- }
- def main():
- &#34;&#34;&#34;主执行函数,包含使用示例&#34;&#34;&#34;
- try:
- # 初始化追踪器
- tracer = OpikTracer(
- api_key=Config.DEFAULT_API_KEY,
- timeout=60 # 自定义超时时间
- )
- # 项目设置
- project_name = &#34;MyAIProject&#34;
- # 使用上下文管理器的追踪示例
- with tracer.trace_action(project_name, &#34;DecisionStep&#34;, {&#34;input_size&#34;: 1024}) as trace:
- trace.log(&#34;正在处理输入...&#34;)
- time.sleep(1) # 模拟工作
- trace.log(&#34;分析完成&#34;)
- # 获取并显示统计数据
- stats = tracer.get_stats(project_name)
- logger.info(f&#34;性能统计数据: {stats}&#34;)
- except Exception as e:
- logger.error(f&#34;执行失败: {str(e)}&#34;)
- exit(1)
- if __name__ == &#34;__main__&#34;:
- main()
复制代码 使用代码方法:- # 作为脚本运行
- python trace.py
- # 作为模块导入
- from trace import OpikTracer
- tracer = OpikTracer(api_key=&#34;your-actual-key&#34;)
- with tracer.trace_action(&#34;MyProject&#34;, &#34;TestStep&#34;) as trace:
- trace.log(&#34;正在工作...&#34;)
- stats = tracer.get_stats(&#34;MyProject&#34;)
复制代码 请注意:
- 把代码里的 &#39;opik_mcp&#39; 替换成你实际用的库的名字。
- 记得设置 OPIK_API_KEY 环境变量,或者直接把 API 密钥传进去。
- _process_stats 函数要根据实际的统计数据格式来调整。
现在,你的智能体就像有了“眼睛”,你随时都能掌握它的动态了,是不是超实用?
4. Brave MCP 服务器: 搜索,就是要更聪明!
搜索功能对AI智能体来说,那是相当重要的! Brave MCP 服务器就能让你的智能体接入 Brave 搜索引擎的 API。 无论是想在网上搜信息,还是想找本地文件,它都能帮你搞定。
它能做啥?
- 帮你“冲浪”搜索全网信息。
- 还能在你自己的电脑里“翻箱倒柜”找东西。
我为啥觉得它赞?
是不是有时候你的智能体需要快速查点资料? 用 Brave MCP 服务器就能轻松实现搜索功能,代码大概是这样写的:- // 完整的网页搜索功能示例,使用了假设的 brave-mcp 库
- const brave = require(&#39;brave-mcp&#39;); // 假设的 MCP 客户端库
- // 默认配置
- const DEFAULT_CONFIG = {
- TIMEOUT: 10000, // 默认超时时间:10秒
- MAX_RETRIES: 3, // 最大重试次数
- RETRY_DELAY: 500, // 重试间隔:0.5秒 (毫秒)
- MAX_RESULTS: 5, // 默认最大结果数
- LANGUAGE: &#39;en&#39; // 默认语言:英语
- };
- /**
- * 使用 Brave MCP 执行网页搜索
- * @param {string} query - 搜索关键词
- * @param {Object} [options] - 配置选项
- * @returns {Promise<Object[]>} 搜索结果数组
- */
- async function searchWeb(query, options = {}) {
- // 合并默认配置和用户自定义配置
- const config = {
- timeout: options.timeout || DEFAULT_CONFIG.TIMEOUT,
- maxRetries: options.maxRetries || DEFAULT_CONFIG.MAX_RETRIES,
- retryDelay: options.retryDelay || DEFAULT_CONFIG.RETRY_DELAY,
- maxResults: options.maxResults || DEFAULT_CONFIG.MAX_RESULTS,
- language: options.language || DEFAULT_CONFIG.LANGUAGE,
- safeSearch: options.safeSearch || &#39;moderate&#39; // 默认安全搜索级别:适中
- };
- // 输入验证
- if (!query || typeof query !== &#39;string&#39; || query.trim().length === 0) {
- throw new Error(&#39;提供的搜索关键词无效或为空&#39;);
- }
- let attempts = 0;
- let lastError;
- // 重试逻辑
- while (attempts <= config.maxRetries) {
- try {
- // 配置搜索参数
- const searchParams = {
- query: query.trim(),
- maxResults: config.maxResults,
- timeout: config.timeout,
- language: config.language,
- safeSearch: config.safeSearch
- };
- console.log(`正在搜索关键词 &#34;${query}&#34; (第 ${attempts + 1} 次尝试)`);
- const results = await brave.webSearch(searchParams);
- // 验证响应
- if (!results || !Array.isArray(results)) {
- throw new Error(&#39;搜索结果格式无效&#39;);
- }
- const processedResults = processSearchResults(results);
- console.log(`找到 ${processedResults.length} 条结果`);
- return processedResults;
- } catch (error) {
- attempts++;
- lastError = error;
- // 处理特定错误情况
- if (error.code === &#39;ETIMEDOUT&#39; || error.code === &#39;ENOTFOUND&#39;) {
- console.warn(`第 ${attempts} 次尝试,网络错误: ${error.message}`);
- } else if (error.status === 429) {
- console.warn(&#39;请求频率超限,正在重试...&#39;);
- }
- // 如果不是最后一次尝试,则等待后重试
- if (attempts <= config.maxRetries) {
- await new Promise(resolve => setTimeout(resolve, config.retryDelay));
- console.log(`正在重试 (第 ${attempts}/${config.maxRetries} 次)...`);
- continue;
- }
- }
- }
- throw new Error(`经过 ${config.maxRetries + 1} 次尝试后,搜索关键词 &#34;${query}&#34; 失败: ${lastError.message}`);
- }
- /**
- * 处理和标准化搜索结果
- * @param {Object[]} results - 原始搜索结果数组
- * @returns {Object[]} 处理后的结果数组
- */
- function processSearchResults(results) {
- return results.map((result, index) => ({
- id: result.id || index,
- title: result.title || &#39;无标题&#39;,
- url: result.url || &#39;无URL&#39;,
- snippet: result.snippet || result.description || &#39;无描述&#39;,
- timestamp: new Date().toISOString(),
- source: result.source || &#39;未知来源&#39;
- }));
- }
- /**
- * 主执行函数,包含使用示例
- */
- async function main() {
- const searchQuery = &#39;最新AI趋势&#39;;
- // 自定义配置
- const searchOptions = {
- timeout: 15000, // 超时时间:15秒
- maxRetries: 4, // 最大重试次数:4次
- retryDelay: 1000, // 重试间隔:1秒
- maxResults: 10, // 获取10条结果
- language: &#39;en&#39;, // 结果语言:英语
- safeSearch: &#39;strict&#39; // 安全搜索级别:严格
- };
- try {
- console.log(`开始搜索关键词 &#34;${searchQuery}&#34;`);
- const results = await searchWeb(searchQuery, searchOptions);
- // 打印搜索结果 (JSON格式)
- console.log(&#39;搜索结果:&#39;, JSON.stringify(results, null, 2));
- // 示例:访问特定数据并打印
- results.forEach((result, index) => {
- console.log(`结果 ${index + 1}:`);
- console.log(`- 标题: ${result.title}`);
- console.log(`- URL: ${result.url}`);
- console.log(`- 摘要: ${result.snippet}`);
- console.log(&#39;---&#39;);
- });
- } catch (error) {
- console.error(&#39;搜索失败:&#39;, {
- message: error.message,
- stack: error.stack,
- query: searchQuery
- });
- process.exit(1); // 以错误代码退出
- }
- }
- // 如果直接运行此脚本
- if (require.main === module) {
- main().then(() => {
- console.log(&#39;搜索成功完成&#39;);
- process.exit(0);
- });
- }
- // 导出模块,方便其他地方使用
- module.exports = {
- searchWeb,
- DEFAULT_CONFIG,
- processSearchResults
- };
复制代码 使用代码方法:- // 作为脚本运行
- node search.js
- // 作为模块导入
- const { searchWeb } = require(&#39;./search&#39;);
- await searchWeb(&#39;最新AI趋势&#39;);
复制代码 温馨提示: 记得根据你实际用的 Brave Search API 或者类似的搜索服务来调整代码里的 API 调用。 processSearchResults 函数也要根据你的具体需求和实际返回的数据格式来修改。 可能你还需要先安装一下这个包: npm install brave-mcp。
有了它,你的智能体就好像有了自己的“谷歌”,是不是超值?
5. Sequential Thinking MCP 服务器: 步步为营,让思考更深入
有时候,你的智能体不能光靠“猜”,得学会像人一样思考问题。 Sequential Thinking MCP 服务器就能帮它把复杂的问题拆成一个个小步骤,然后一步一步地想明白。
它是干啥的?
- 把复杂的大问题拆分成很多小步骤。
- 让智能体在解决问题的过程中,一步一步地深入思考。
它厉害在哪儿?
就拿解数学题来说吧,你的智能体可以用 Sequential Thinking MCP 服务器,像下面这样一步一步地把题解出来:- #!/usr/bin/env python3
- &#34;&#34;&#34;
- 完整的顺序思考功能示例,使用了假设的 sequential_thinking_mcp 库
- &#34;&#34;&#34;
- import logging
- from typing import List, Optional, Union
- from sequential_thinking_mcp import Thinker # 假设的 MCP 客户端库
- from datetime import datetime
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format=&#39;%(asctime)s - %(levelname)s - %(message)s&#39;
- )
- logger = logging.getLogger(__name__)
- # 默认配置常量
- class Config:
- MAX_STEPS = 20 # 最大步骤数,防止无限循环
- TIMEOUT = 30 # 每次操作的超时时间,单位:秒
- DEFAULT_FORMAT = &#39;text&#39; # 默认输出格式:文本
- class SequentialSolver:
- &#34;&#34;&#34;顺序思考操作的封装类&#34;&#34;&#34;
- def __init__(self, timeout: int = Config.TIMEOUT, max_steps: int = Config.MAX_STEPS):
- &#34;&#34;&#34;使用配置初始化 Thinker&#34;&#34;&#34;
- try:
- self.thinker = Thinker(timeout=timeout)
- self.max_steps = max_steps
- self.timeout = timeout
- except Exception as e:
- logger.error(f&#34;初始化 Thinker 失败: {str(e)}&#34;)
- raise
- def solve(self, problem: str, steps: bool = True, output_format: str = Config.DEFAULT_FORMAT) -> Union[List[str], str]:
- &#34;&#34;&#34;
- 使用顺序思考步骤解决问题
- Args:
- problem: 要解决的问题
- steps: 是否返回步骤式的解答
- output_format: 输出格式 (&#39;text&#39;, &#39;json&#39; 等)
- Returns:
- 根据 steps 参数返回步骤列表或最终答案
- &#34;&#34;&#34;
- // 输入验证
- if not problem or not isinstance(problem, str):
- raise ValueError(&#34;必须提供有效的问题字符串&#34;)
- try:
- logger.info(f&#34;开始解决问题: {problem}&#34;)
- solution = self.thinker.solve(
- problem=problem,
- steps=steps,
- max_steps=self.max_steps
- )
- if not solution:
- raise ValueError(&#34;未返回任何解答&#34;)
- if steps:
- processed_steps = self._process_steps(solution, output_format)
- logger.info(f&#34;在 {len(processed_steps)} 步内找到解答&#34;)
- return processed_steps
- else:
- result = self._process_result(solution, output_format)
- logger.info(f&#34;找到解答: {result}&#34;)
- return result
- except Exception as e:
- logger.error(f&#34;解决问题 &#39;{problem}&#39; 失败: {str(e)}&#34;)
- raise
- def _process_steps(self, steps: List, format: str) -> List[str]:
- &#34;&#34;&#34;处理和格式化解答步骤&#34;&#34;&#34;
- if not isinstance(steps, (list, tuple)):
- raise ValueError(&#34;步骤必须是列表或元组类型&#34;)
- processed = []
- for i, step in enumerate(steps[:self.max_steps], 1):
- if format == &#39;json&#39;:
- processed.append({
- &#34;step&#34;: i,
- &#34;description&#34;: str(step),
- &#34;timestamp&#34;: datetime.now().isoformat()
- })
- else:
- processed.append(f&#34;步骤 {i}: {str(step)}&#34;)
- return processed
- def _process_result(self, result: Any, format: str) -> str:
- &#34;&#34;&#34;处理和格式化最终结果&#34;&#34;&#34;
- if format == &#39;json&#39;:
- return {
- &#34;result&#34;: str(result),
- &#34;timestamp&#34;: datetime.now().isoformat()
- }
- return str(result)
- def validate_solution(self, problem: str, solution: Union[List[str], str]) -> bool:
- &#34;&#34;&#34;验证解答 (基础实现)&#34;&#34;&#34;
- try:
- if isinstance(solution, list):
- final_step = solution[-1].lower()
- // 对代数问题进行基本检查
- if &#39;=&#39; in problem and &#39;x =&#39; in final_step:
- return True
- return bool(solution)
- except Exception as e:
- logger.warning(f&#34;解答验证失败: {str(e)}&#34;)
- return False
- def main():
- &#34;&#34;&#34;主执行函数,包含使用示例&#34;&#34;&#34;
- try:
- # 初始化解题器
- solver = SequentialSolver(
- timeout=60, # 自定义超时时间
- max_steps=10 # 自定义最大步骤数
- )
- # 示例问题
- problem = &#34;解方程 2x + 3 = 7&#34;
- # 步骤式解题
- logger.info(f&#34;正在解题: {problem}&#34;)
- steps = solver.solve(problem, steps=True, output_format=&#39;text&#39;)
- # 打印解题步骤
- for step in steps:
- print(step)
- # 验证解答
- if solver.validate_solution(problem, steps):
- logger.info(&#34;解答似乎有效&#34;)
- else:
- logger.warning(&#34;解答验证失败&#34;)
- # 非步骤式解题示例
- result = solver.solve(problem, steps=False)
- print(f&#34;\n最终答案: {result}&#34;)
- except Exception as e:
- logger.error(f&#34;执行失败: {str(e)}&#34;)
- exit(1)
- if __name__ == &#34;__main__&#34;:
- main()
复制代码 使用代码方法:- # 作为脚本运行
- python solver.py
- # 作为模块导入
- from solver import SequentialSolver
- solver = SequentialSolver()
- steps = solver.solve(&#34;Solve 2x + 3 = 7&#34;, steps=True)
- for step in steps:
- print(step)
复制代码 温馨提示:记得根据你实际用的库来调整代码里的 API 调用。 _process_steps 和 _process_result 这两个函数也要根据你的实际输出需求来改。 如果你想让智能体更智能地验证答案,可以改进 validate_solution 函数。 还有,max_steps 和 timeout 这两个参数,也要根据问题的复杂程度来调整。
有了这个 Sequential Thinking MCP 服务器,你的智能体就好像有了一个“笔记本”,可以用来一步一步地思考问题了,是不是超级给力?
这些 MCP 服务器,简直就是给你的AI智能体开“外挂”! 再也不用为各种功能苦哈哈地写代码了,直接“插”上这些服务器,你的AI智能体就能瞬间拥有网页抓取、浏览器操作、行为追踪、智能搜索、深入思考等等各种超能力,简直是如有神助!
<hr/>我会定期更新干货和学习笔记。喜欢的话,记得点个关注 ,不错过后续精彩内容!
|
|