Skip to content

实战演示:口述需求,AI 一步生成完整脚本

本章提供一个完整的真实演示案例,展示从"说一句中文需求"到"拿到一份能直接跑的脚本"的完整过程,每一步都有详细截图说明。


演示场景

业务背景:您有 20 个亚马逊卖家账号,每天需要检查每个账号下"等待发货"的订单数量,汇总成 Excel 报告发给团队。

以前的做法:手动逐个登录每个账号,记录订单数量,填入 Excel,大约需要 2 小时。

用 AI + 候鸟:描述需求,AI 生成脚本,脚本自动完成,全程 15 分钟(包括配置时间)。


第一步:准备工作(5 分钟)

NOTE

假设您已经按前面章节完成了以下配置:

  • ✅ Cursor 或 Antigravity 已安装
  • ✅ 候鸟 Rules 知识包已加载到 Cursor
  • ✅ 候鸟 ApiServer 已启动(http://127.0.0.1:8186
  • ✅ 您的亚马逊账号已在候鸟环境中登录过(Cookie 已保存)

准备 sessions.txt 文件

在您的项目文件夹中,创建 sessions.txt,填入所有要操作的环境 ID:

a1b2c3d4e5f6789012345678901234ab
b2c3d4e5f67890123456789012345bcd
c3d4e5f678901234567890123456cde
... (每行一个 Session_ID,共 20 行)

第二步:在 Cursor 中输入需求(30 秒)

Ctrl + L 打开 Cursor Chat,输入以下内容:

我有一个候鸟浏览器的 sessions.txt 文件,每行一个 Session_ID,共 20 个账号。
这些账号都已经在候鸟环境中登录了亚马逊。

请帮我生成一个 Python 脚本,功能如下:
1. 读取 sessions.txt 中的所有 Session_ID
2. 并发开启所有环境(最多同时 10 个)
3. 每个环境用 Playwright 接管后:
   a. 等待页面加载(环境打开后通常停在上次关闭的页面)
   b. 访问 https://www.amazon.com/gp/orders-v2/list 
      (亚马逊卖家订单页面)
   c. 等待 .a-row.shipment 元素出现(订单列表加载完成的标志)
   d. 统计页面上 .shipment-status-buyerPending 元素的数量
      (这是"等待发货"的订单数量)
   e. 把 Session_ID 前 8 位 + 等待发货订单数写入结果
4. 所有操作完成后关闭所有环境
5. 将结果保存到 amazon_orders_[今天日期].xlsx,
   列:环境ID(前8位)、等待发货订单数、检查时间、状态

有异常时截图保存到 errors/ 文件夹,并在结果表中记录"检查失败"。

第三步:AI 生成代码(约 30 秒)

Cursor 的 AI 接收到需求后,会直接生成完整代码。完整的生成结果类似:

python
"""
候鸟 + 亚马逊订单检查脚本
自动检查所有环境的等待发货订单数量
"""
import threading
import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import openpyxl
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout

# ==============================
# 配置(按需修改)
# ==============================
APISERVER   = "http://127.0.0.1:8186"
SESSIONS_FILE = "sessions.txt"
MAX_WORKERS  = 10   # 最大并发数
ERRORS_DIR   = Path("errors")
ERRORS_DIR.mkdir(exist_ok=True)

results_lock = threading.Lock()
results = []

# ==============================
# ApiServer 封装
# ==============================
def start_env(session_id: str) -> dict | None:
    try:
        resp = requests.post(
            f"{APISERVER}/api/v1/browser/start",
            json={"Session_ID": [session_id], "isHeadless": False},
            timeout=30
        )
        data = resp.json()
        if data["code"] == 0:
            return data["data"]["listid"][0]
    except Exception as e:
        print(f"[{session_id[:8]}] ApiServer 错误: {e}")
    return None

def stop_env(session_id: str):
    try:
        requests.post(
            f"{APISERVER}/api/v1/browser/stop",
            json={"Session_ID": [session_id]},
            timeout=10
        )
    except Exception:
        pass

# ==============================
# 单账号任务
# ==============================
def check_orders(session_id: str):
    now = datetime.datetime.now().strftime("%H:%M:%S")
    env_short = session_id[:8]
    
    env_data = start_env(session_id)
    if not env_data:
        with results_lock:
            results.append({
                "env_id": env_short,
                "pending_orders": "N/A",
                "check_time": now,
                "status": "开启环境失败"
            })
        return
    
    port = env_data.get("browser_CDP_Port")
    ws   = f"ws://127.0.0.1:{port}/json/version"
    
    pw = sync_playwright().start()
    try:
        browser = pw.chromium.connect_over_cdp(ws)
        context = browser.contexts[0]
        page    = context.pages[0] if context.pages else context.new_page()
        
        # 访问亚马逊订单页
        page.goto(
            "https://www.amazon.com/gp/orders-v2/list",
            wait_until="domcontentloaded",
            timeout=20000
        )
        
        # 等待订单列表加载
        page.wait_for_selector(".a-row.shipment", timeout=15000)
        
        # 统计等待发货订单数量
        pending = page.locator(".shipment-status-buyerPending").count()
        print(f"[{env_short}] ✅ 等待发货订单: {pending}")
        
        with results_lock:
            results.append({
                "env_id": env_short,
                "pending_orders": pending,
                "check_time": now,
                "status": "成功"
            })
        
        browser.close()
        
    except PWTimeout:
        page.screenshot(path=str(ERRORS_DIR / f"{env_short}_timeout.png"))
        with results_lock:
            results.append({
                "env_id": env_short,
                "pending_orders": "N/A",
                "check_time": now,
                "status": "页面超时"
            })
    except Exception as e:
        try:
            page.screenshot(path=str(ERRORS_DIR / f"{env_short}_error.png"))
        except Exception:
            pass
        with results_lock:
            results.append({
                "env_id": env_short,
                "pending_orders": "N/A",
                "check_time": now,
                "status": f"异常: {str(e)[:50]}"
            })
    finally:
        try:
            pw.stop()
        except Exception:
            pass
        stop_env(session_id)

# ==============================
# 主程序
# ==============================
def main():
    # 读取 Session_ID 列表
    session_ids = [
        line.strip()
        for line in Path(SESSIONS_FILE).read_text().splitlines()
        if line.strip()
    ]
    print(f"📋 共 {len(session_ids)} 个账号,最大并发 {MAX_WORKERS}")
    
    # 并发执行
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(check_orders, sid): sid for sid in session_ids}
        for future in as_completed(futures):
            sid = futures[future]
            try:
                future.result()
            except Exception as e:
                print(f"[{sid[:8]}] 未捕获异常: {e}")
    
    # 保存 Excel 报告
    today = datetime.date.today().strftime("%Y%m%d")
    filename = f"amazon_orders_{today}.xlsx"
    
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "订单检查结果"
    ws.append(["环境ID(前8位)", "等待发货订单数", "检查时间", "状态"])
    for r in results:
        ws.append([r["env_id"], r["pending_orders"], r["check_time"], r["status"]])
    
    wb.save(filename)
    
    # 汇总报告
    success = sum(1 for r in results if r["status"] == "成功")
    print(f"\n{'='*40}")
    print(f"✅ 成功: {success} / {len(results)}")
    print(f"📄 报告已保存到: {filename}")
    print(f"❌ 失败截图在: errors/ 目录")

if __name__ == "__main__":
    main()

第四步:安装依赖并运行(2 分钟)

AI 生成代码后,通常还会告诉您需要安装的依赖:

bash
pip install playwright requests openpyxl

然后直接运行:

bash
python check_amazon_orders.py

预期输出

📋 共 20 个账号,最大并发 10
[a1b2c3d4] ✅ 等待发货订单: 3
[b2c3d4e5] ✅ 等待发货订单: 0
[c3d4e5f6] ✅ 等待发货订单: 7
... 
========================================
✅ 成功: 18 / 20
📄 报告已保存到: amazon_orders_20260226.xlsx
❌ 失败截图在: errors/ 目录

常见问题与 AI 修正技巧

"脚本跑起来了但找不到订单元素"

选中报错相关代码,按 Ctrl + K 输入:

亚马逊页面的 selector 变了,实际的订单行 selector 是 .order-info,
等待发货的 selector 是 .order-status-pending,请更新代码

"想加一个功能:把订单明细也采集下来"

在 Chat 里继续对话:

在上面代码的基础上,在采集订单数量之后,
继续采集每个等待发货订单的订单号(.a-fixed-right-grid-col .a-link-normal)
和预计发货日期(.expected-ship-date),一并写入 Excel

"并发太快导致崩溃,需要限速"

在 ThreadPoolExecutor 的基础上,每启动一个新任务前先等待 3 秒,
避免同时开启太多环境导致内存不足

总结:AI 写脚本的正确姿势

[ 准备阶段 ]
1. 候鸟 Rules 知识包已注入 → AI 懂候鸟所有接口
2. 确认 ApiServer 已启动

[ 需求描述 ]
越具体越好:
  ✅ "访问 https://xxx,等待 #main-content 元素出现,点击 .order-btn,等待成功弹窗"
  ❌ "帮我操作订单"

[ 第一次生成 ]
通常 80% 可以直接用。如有小问题,再通过对话修改。

[ 迭代优化 ]
"在这个基础上加 xx 功能" → AI 追加修改,无需重写

IMPORTANT

重要提醒:AI 生成的代码可能偶尔有小错误,在正式大规模运行前,先用1-2个测试账号跑一遍验证。验证通过再扩大到全量账号。


TIP

🎉 恭喜!您现在已经掌握了用 AI 模型为候鸟浏览器生成完整自动化脚本的全套方法。

遇到问题?回到对应章节查阅: