Connecting Xianyu to ChatGPT or Qwen for Automatic AI Replies

JIN
2025-05-22


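🧩 Part 1: Preparation (1/4)

✅ 1. Buy a VPS and install Linux (Ubuntu 22.04 recommended)

A low-cost provider such as RackNerd or CloudCone is enough. Choose Ubuntu as the operating system (Debian also works).

✅ 2. Log in to the VPS over SSH with MobaXterm, XShell, or any SSH client

After a successful login you should see a prompt like this:

root@your-hostname:~#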

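🧩 Part 2: Install the dependencies (2/4)

✅ 3. Install Python and common tools (if not already installed)

apt update && apt install -y python3 python3-pip git

✅ 4. Install Node.js (needed to build some components)

curl -fsSL https://deb.nodesource.com/setup_18.x | bash -
apt install -y nodejs

✅ 5. Install the pipenv package manager (optional)

Plain pip is recommended and pipenv is not required. Once the project has been cloned (step 7), the dependencies are installed from inside the project directory with:

pip install -r requirements.txt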

🧩 Part 3: Deploy and configure the project (3/4)

✅ 6. Clone the project

git clone https://github.com/shaxiu/XianyuAutoAgent.git
cd XianyuAutoAgent

✅ 7. Install the dependencies

pip install -r requirements.txt

⚠️ Getting an openai SDK error? See step 10 for the fix.


✅ 8. Prepare the prompts

cd prompts
mv classify_prompt_example.txt classify_prompt.txt
mv default_prompt_example.txt default_prompt.txt
mv price_prompt_example.txt price_prompt.txt
mv tech_prompt_example.txt tech_prompt.txt
cd ..

In some versions the files are already renamed. Run ls prompts to check before renaming.
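
The renaming step can also be scripted so that it is safe to run more than once. The following is a minimal sketch, not part of the project: the helper name `prepare_prompts` is made up for illustration, and it copies the example files instead of moving them so the originals stay available.

```python
from pathlib import Path
import shutil

# The four prompt names used in step 8.
PROMPT_NAMES = ["classify", "default", "price", "tech"]


def prepare_prompts(prompt_dir="prompts"):
    """Copy each <name>_prompt_example.txt to <name>_prompt.txt.

    Hypothetical helper: targets that already exist are left untouched.
    Returns the file names that were created.
    """
    created = []
    base = Path(prompt_dir)
    for name in PROMPT_NAMES:
        example = base / f"{name}_prompt_example.txt"
        target = base / f"{name}_prompt.txt"
        if target.exists():
            continue  # already renamed in this version of the repo
        if example.exists():
            shutil.copyfile(example, target)
            created.append(target.name)
    return created


if __name__ == "__main__":
    print(prepare_prompts())
```

Run it from the project root. A second run creates nothing because the targets already exist.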


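✅ 9. Configure the .env file

Create a new .env file:

nano .env

Paste the following and replace the values with your own:

API_KEY=sk-your-Moonshot-key
COOKIES_STR=the full cookie string copied from the browser DevTools (F12)
MODEL_BASE_URL=https://api.moonshot.cn/v1
MODEL_NAME=moonshot-v1-8k

Save and exit (Ctrl + O, Enter, then Ctrl + X).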
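
A missing or empty value in .env tends to surface later as a confusing runtime error, so it can help to check the configuration up front. The sketch below is an illustration only: `missing_env_keys` and `cookie_has_unb` are hypothetical helpers, not project functions. The `unb` check is based on main.py reading `self.cookies['unb']` as the account ID.

```python
import os

# The four variables set in step 9.
REQUIRED_KEYS = ("API_KEY", "COOKIES_STR", "MODEL_BASE_URL", "MODEL_NAME")


def missing_env_keys(env=None):
    """Return the required keys that are absent or blank (hypothetical helper)."""
    env = os.environ if env is None else env
    return [key for key in REQUIRED_KEYS if not str(env.get(key, "")).strip()]


def cookie_has_unb(cookies_str):
    """Return True if the cookie string contains a cookie named 'unb'."""
    names = [part.split("=", 1)[0].strip() for part in cookies_str.split(";")]
    return "unb" in names


if __name__ == "__main__":
    missing = missing_env_keys()
    if missing:
        print("Missing settings:", ", ".join(missing))
    elif not cookie_has_unb(os.environ["COOKIES_STR"]):
        print("COOKIES_STR has no 'unb' cookie; copy the cookie string again")
    else:
        print("Configuration looks complete")
```

Called without arguments, `missing_env_keys` reads the process environment, so run it after `load_dotenv()` or after exporting the variables in the shell.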


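✅ 10. Downgrade the OpenAI SDK to the older 0.28.1 release used by the Moonshot-compatible code (critical!)

pip uninstall openai -y
pip install openai==0.28.1

✅ 11. Replace the contents of XianyuAgent.py

Overwrite the file with the Moonshot-compatible version I provide (see my article to get it). That version uses:

  • openai.api_key = ...
  • openai.api_base = ...
  • openai.ChatCompletion.create(...)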
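
To show how those three pieces fit together, here is a minimal sketch of a single chat request in the legacy openai 0.28.x style. It is not the author's XianyuAgent.py: `chat_once` is a hypothetical helper, and the module is passed in as a parameter (an assumption made for this example) so the function can be exercised without network access.

```python
import os


def chat_once(openai_module, system_prompt, user_msg):
    """Send one system + user exchange and return the reply text.

    Hypothetical helper. `openai_module` is expected to be the `openai`
    package at version 0.28.x, which exposes module-level `api_key`,
    `api_base`, and `ChatCompletion.create`.
    """
    openai_module.api_key = os.getenv("API_KEY")
    openai_module.api_base = os.getenv("MODEL_BASE_URL")
    response = openai_module.ChatCompletion.create(
        model=os.getenv("MODEL_NAME", "moonshot-v1-8k"),
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_msg},
        ],
    )
    # The 0.28.x response object supports dict-style access.
    return response["choices"][0]["message"]["content"]
```

With the SDK installed, usage would be `import openai` followed by `chat_once(openai, "You are a helpful seller.", "Is this still available?")`. The `ChatCompletion` interface was removed in openai 1.0, which is why step 10 pins 0.28.1.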

🧩 Part 4: Run and keep it running (4/4)

✅ 12. Test run

python3 main.py

If the log shows lines like:

用户: XXX 发送消息: XXX
机器人回复: XXX

the deployment succeeded ✅


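✅ 13. Run in the background (recommended)

cd ~/XianyuAutoAgent
nohup python3 main.py > out.log 2>&1 &
tail -f out.log

✅ 14. Optional: start on boot with systemd


A systemd auto-start tutorial, a multi-account concurrent deployment scheme, and log cleanup suggestions will follow in a later update.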

✅ Supplements

Supplement 1: main.py code

import base64
import json
import asyncio
import time
import os
import websockets
from loguru import logger
from dotenv import load_dotenv
from XianyuApis import XianyuApis
from utils.xianyu_utils import generate_mid, generate_uuid, trans_cookies, generate_device_id, decrypt
from XianyuAgent import XianyuReplyBot
from context_manager import ChatContextManager


class XianyuLive:
    def __init__(self, cookies_str):
        self.xianyu = XianyuApis()
        self.base_url = 'wss://wss-goofish.dingtalk.com/'
        self.cookies_str = cookies_str
        self.cookies = trans_cookies(cookies_str)
        self.xianyu.session.cookies.update(self.cookies)  # update the session cookies directly
        self.myid = self.cookies['unb']
        self.device_id = generate_device_id(self.myid)
        self.context_manager = ChatContextManager()

        # Heartbeat settings
        self.heartbeat_interval = 15  # seconds between heartbeats
        self.heartbeat_timeout = 5    # seconds allowed for a heartbeat response
        self.last_heartbeat_time = 0
        self.last_heartbeat_response = 0
        self.heartbeat_task = None
        self.ws = None

    async def send_msg(self, ws, cid, toid, text):
        text = {
            "contentType": 1,
            "text": {
                "text": text
            }
        }
        text_base64 = str(base64.b64encode(json.dumps(text).encode('utf-8')), 'utf-8')
        msg = {
            "lwp": "/r/MessageSend/sendByReceiverScope",
            "headers": {
                "mid": generate_mid()
            },
            "body": [
                {
                    "uuid": generate_uuid(),
                    "cid": f"{cid}@goofish",
                    "conversationType": 1,
                    "content": {
                        "contentType": 101,
                        "custom": {
                            "type": 1,
                            "data": text_base64
                        }
                    },
                    "redPointPolicy": 0,
                    "extension": {
                        "extJson": "{}"
                    },
                    "ctx": {
                        "appVersion": "1.0",
                        "platform": "web"
                    },
                    "mtags": {},
                    "msgReadStatusSetting": 1
                },
                {
                    "actualReceivers": [
                        f"{toid}@goofish",
                        f"{self.myid}@goofish"
                    ]
                }
            ]
        }
        await ws.send(json.dumps(msg))

    async def init(self, ws):
        token = self.xianyu.get_token(self.device_id)['data']['accessToken']
        msg = {
            "lwp": "/reg",
            "headers": {
                "cache-header": "app-key token ua wv",
                "app-key": "444e9908a51d1cb236a27862abc769c9",
                "token": token,
                "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5",
                "dt": "j",
                "wv": "im:3,au:3,sy:6",
                "sync": "0,0;0;0;",
                "did": self.device_id,
                "mid": generate_mid()
            }
        }
        await ws.send(json.dumps(msg))
        # Wait briefly so the registration can complete
        await asyncio.sleep(1)
        msg = {"lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": "5701741704675979 0"}, "body": [
            {"pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0,
             "pts": int(time.time() * 1000) * 1000, "seq": 0, "timestamp": int(time.time() * 1000)}]}
        await ws.send(json.dumps(msg))
        logger.info('连接注册完成')

    def is_chat_message(self, message):
        """Return True if the message is a user chat message."""
        try:
            return (
                isinstance(message, dict)
                and "1" in message
                and isinstance(message["1"], dict)  # must be a dict
                and "10" in message["1"]
                and isinstance(message["1"]["10"], dict)  # must be a dict
                and "reminderContent" in message["1"]["10"]
            )
        except Exception:
            return False

    def is_sync_package(self, message_data):
        """Return True if the message is a sync package."""
        try:
            return (
                isinstance(message_data, dict)
                and "body" in message_data
                and "syncPushPackage" in message_data["body"]
                and "data" in message_data["body"]["syncPushPackage"]
                and len(message_data["body"]["syncPushPackage"]["data"]) > 0
            )
        except Exception:
            return False

    def is_typing_status(self, message):
        """Return True if the message is a "user is typing" status."""
        try:
            return (
                isinstance(message, dict)
                and "1" in message
                and isinstance(message["1"], list)
                and len(message["1"]) > 0
                and isinstance(message["1"][0], dict)
                and "1" in message["1"][0]
                and isinstance(message["1"][0]["1"], str)
                and "@goofish" in message["1"][0]["1"]
            )
        except Exception:
            return False

    async def handle_message(self, message_data, websocket):
        """Handle every type of incoming message."""
        try:

            try:
                message = message_data
                ack = {
                    "code": 200,
                    "headers": {
                        "mid": message["headers"]["mid"] if "mid" in message["headers"] else generate_mid(),
                        "sid": message["headers"]["sid"] if "sid" in message["headers"] else '',
                    }
                }
                if 'app-key' in message["headers"]:
                    ack["headers"]["app-key"] = message["headers"]["app-key"]
                if 'ua' in message["headers"]:
                    ack["headers"]["ua"] = message["headers"]["ua"]
                if 'dt' in message["headers"]:
                    ack["headers"]["dt"] = message["headers"]["dt"]
                await websocket.send(json.dumps(ack))
            except Exception as e:
                pass

            # Ignore anything that is not a sync package
            if not self.is_sync_package(message_data):
                return

            # Take the first entry of the sync package
            sync_data = message_data["body"]["syncPushPackage"]["data"][0]

            # Check that the required field is present
            if "data" not in sync_data:
                logger.debug("同步包中无data字段")
                return

            # Decrypt the data
            try:
                data = sync_data["data"]
                try:
                    # Payloads that decode as plain base64 JSON need no decryption and are skipped
                    data = base64.b64decode(data).decode("utf-8")
                    data = json.loads(data)
                    return
                except Exception as e:
                    # Otherwise treat the payload as encrypted
                    decrypted_data = decrypt(data)
                    message = json.loads(decrypted_data)
            except Exception as e:
                logger.error(f"消息解密失败: {e}")
                return

            try:
                # Order messages: add your own post-payment logic here
                if message['3']['redReminder'] == '等待买家付款':
                    user_id = message['1'].split('@')[0]
                    user_url = f'https://www.goofish.com/personal?userId={user_id}'
                    logger.info(f'等待买家 {user_url} 付款')
                    return
                elif message['3']['redReminder'] == '交易关闭':
                    user_id = message['1'].split('@')[0]
                    user_url = f'https://www.goofish.com/personal?userId={user_id}'
                    logger.info(f'卖家 {user_url} 交易关闭')
                    return
                elif message['3']['redReminder'] == '等待卖家发货':
                    user_id = message['1'].split('@')[0]
                    user_url = f'https://www.goofish.com/personal?userId={user_id}'
                    logger.info(f'交易成功 {user_url} 等待卖家发货')
                    return

            except Exception:
                pass

            # Classify the message
            if self.is_typing_status(message):
                logger.debug("用户正在输入")
                return
            elif not self.is_chat_message(message):
                logger.debug("其他非聊天消息")
                logger.debug(f"原始消息: {message}")
                return

            # Handle the chat message
            create_time = int(message["1"]["5"])
            send_user_name = message["1"]["10"]["reminderTitle"]
            send_user_id = message["1"]["10"]["senderUserId"]
            send_message = message["1"]["10"]["reminderContent"]

            # Freshness check: drop messages older than 5 minutes
            if (time.time() * 1000 - create_time) > 300000:
                logger.debug("过期消息丢弃")
                return

            if send_user_id == self.myid:
                logger.debug("过滤自身消息")
                return

            url_info = message["1"]["10"]["reminderUrl"]
            item_id = url_info.split("itemId=")[1].split("&")[0] if "itemId=" in url_info else None

            if not item_id:
                logger.warning("无法获取商品ID")
                return

            # Look up the item in the database; on a miss, fetch it from the API and save it
            item_info = self.context_manager.get_item_info(item_id)
            if not item_info:
                logger.info(f"从API获取商品信息: {item_id}")
                api_result = self.xianyu.get_item_info(item_id)
                if 'data' in api_result and 'itemDO' in api_result['data']:
                    item_info = api_result['data']['itemDO']
                    # Save the item info to the database
                    self.context_manager.save_item_info(item_id, item_info)
                else:
                    logger.warning(f"获取商品信息失败: {api_result}")
                    return
            else:
                logger.info(f"从数据库获取商品信息: {item_id}")

            item_description = f"{item_info['desc']};当前商品售卖价格为:{str(item_info['soldPrice'])}"
            logger.info(f"user: {send_user_name}, 发送消息: {send_message}")

            # Add the user message to the conversation context
            self.context_manager.add_message(send_user_id, item_id, "user", send_message)

            # Fetch the full conversation context
            context = self.context_manager.get_context(send_user_id, item_id)

            # Generate the reply (bot is the module-level instance created under __main__)
            bot_reply = bot.generate_reply(
                send_message,
                item_description,
                context=context
            )

            # If the intent was price-related, increase the bargaining count
            if bot.last_intent == "price":
                self.context_manager.increment_bargain_count(send_user_id, item_id)
                bargain_count = self.context_manager.get_bargain_count(send_user_id, item_id)
                logger.info(f"用户 {send_user_name} 对商品 {item_id} 的议价次数: {bargain_count}")

            # Add the bot reply to the conversation context
            self.context_manager.add_message(send_user_id, item_id, "assistant", bot_reply)

            logger.info(f"机器人回复: {bot_reply}")
            cid = message["1"]["2"].split('@')[0]
            await self.send_msg(websocket, cid, send_user_id, bot_reply)

        except Exception as e:
            logger.error(f"处理消息时发生错误: {str(e)}")
            logger.debug(f"原始消息: {message_data}")

    async def send_heartbeat(self, ws):
        """Send a heartbeat packet and return its mid."""
        try:
            heartbeat_mid = generate_mid()
            heartbeat_msg = {
                "lwp": "/!",
                "headers": {
                    "mid": heartbeat_mid
                }
            }
            await ws.send(json.dumps(heartbeat_msg))
            self.last_heartbeat_time = time.time()
            logger.debug("心跳包已发送")
            return heartbeat_mid
        except Exception as e:
            logger.error(f"发送心跳包失败: {e}")
            raise

    async def heartbeat_loop(self, ws):
        """Heartbeat maintenance loop."""
        while True:
            try:
                current_time = time.time()

                # Send a heartbeat when the interval has elapsed
                if current_time - self.last_heartbeat_time >= self.heartbeat_interval:
                    await self.send_heartbeat(ws)

                # If the last response is too old, assume the connection is gone
                if (current_time - self.last_heartbeat_response) > (self.heartbeat_interval + self.heartbeat_timeout):
                    logger.warning("心跳响应超时,可能连接已断开")
                    break

                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"心跳循环出错: {e}")
                break

    async def handle_heartbeat_response(self, message_data):
        """Record a heartbeat response; return True if the message was one."""
        try:
            if (
                isinstance(message_data, dict)
                and "headers" in message_data
                and "mid" in message_data["headers"]
                and "code" in message_data
                and message_data["code"] == 200
            ):
                self.last_heartbeat_response = time.time()
                logger.debug("收到心跳响应")
                return True
        except Exception as e:
            logger.error(f"处理心跳响应出错: {e}")
        return False

    async def main(self):
        while True:
            try:
                headers = {
                    "Cookie": self.cookies_str,
                    "Host": "wss-goofish.dingtalk.com",
                    "Connection": "Upgrade",
                    "Pragma": "no-cache",
                    "Cache-Control": "no-cache",
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
                    "Origin": "https://www.goofish.com",
                    "Accept-Encoding": "gzip, deflate, br, zstd",
                    "Accept-Language": "zh-CN,zh;q=0.9",
                }

                async with websockets.connect(self.base_url, extra_headers=headers) as websocket:
                    self.ws = websocket
                    await self.init(websocket)

                    # Initialize the heartbeat timestamps
                    self.last_heartbeat_time = time.time()
                    self.last_heartbeat_response = time.time()

                    # Start the heartbeat task
                    self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket))

                    async for message in websocket:
                        try:
                            message_data = json.loads(message)

                            # Handle heartbeat responses
                            if await self.handle_heartbeat_response(message_data):
                                continue

                            # Send a generic ACK
                            if "headers" in message_data and "mid" in message_data["headers"]:
                                ack = {
                                    "code": 200,
                                    "headers": {
                                        "mid": message_data["headers"]["mid"],
                                        "sid": message_data["headers"].get("sid", "")
                                    }
                                }
                                # Copy any other header fields that are present
                                for key in ["app-key", "ua", "dt"]:
                                    if key in message_data["headers"]:
                                        ack["headers"][key] = message_data["headers"][key]
                                await websocket.send(json.dumps(ack))

                            # Handle all other messages
                            await self.handle_message(message_data, websocket)

                        except json.JSONDecodeError:
                            logger.error("消息解析失败")
                        except Exception as e:
                            logger.error(f"处理消息时发生错误: {str(e)}")
                            logger.debug(f"原始消息: {message}")

            except websockets.exceptions.ConnectionClosed:
                logger.warning("WebSocket连接已关闭")
                if self.heartbeat_task:
                    self.heartbeat_task.cancel()
                    try:
                        await self.heartbeat_task
                    except asyncio.CancelledError:
                        pass
                await asyncio.sleep(5)  # wait 5 seconds before reconnecting

            except Exception as e:
                logger.error(f"连接发生错误: {e}")
                if self.heartbeat_task:
                    self.heartbeat_task.cancel()
                    try:
                        await self.heartbeat_task
                    except asyncio.CancelledError:
                        pass
                await asyncio.sleep(5)  # wait 5 seconds before reconnecting


if __name__ == '__main__':
    # Load environment variables (cookie)
    load_dotenv()
    cookies_str = os.getenv("COOKIES_STR")
    bot = XianyuReplyBot()
    xianyuLive = XianyuLive(cookies_str)
    # Long-running process
    asyncio.run(xianyuLive.main())
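
The `send_msg` method in main.py wraps the reply text in a small JSON object and base64-encodes it before placing it in the `custom.data` field. The sketch below isolates that encoding step so it can be tried without a WebSocket connection. `encode_text_payload` mirrors the logic in `send_msg`, while `decode_text_payload` is a hypothetical inverse added only for illustration.

```python
import base64
import json


def encode_text_payload(text):
    """Build the base64 string that send_msg puts in content.custom.data."""
    payload = {"contentType": 1, "text": {"text": text}}
    return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")


def decode_text_payload(data):
    """Hypothetical inverse of encode_text_payload, useful when debugging."""
    payload = json.loads(base64.b64decode(data).decode("utf-8"))
    return payload["text"]["text"]


if __name__ == "__main__":
    encoded = encode_text_payload("你好,还在的")
    print(encoded)
    print(decode_text_payload(encoded))
```

Because `json.dumps` escapes non-ASCII characters by default, the encoded payload is plain ASCII even for Chinese replies, and decoding restores the original text.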

Supplement 2: XianyuAgent.py code

import re
from typing import List, Dict
import os
import openai
from loguru import logger


class XianyuReplyBot:
    def __init__(self):
        # Configure the OpenAI API (for openai 0.28.1)
        openai.api_key = os.getenv("API_KEY")
        openai.api_base = os.getenv("MODEL_BASE_URL")
        self._init_system_prompts()
        self._init_agents()
        self.router = IntentRouter(self.agents['classify'])
        self.last_intent = None  # most recently detected intent

    def _init_agents(self):
        """Create one agent per domain."""
        self.agents = {
            'classify': ClassifyAgent(self.classify_prompt, self._safe_filter),
            'price': PriceAgent(self.price_prompt, self._safe_filter),
            'tech': TechAgent(self.tech_prompt, self._safe_filter),
            'default': DefaultAgent(self.default_prompt, self._safe_filter),
        }

    def _init_system_prompts(self):
        """Load each agent's prompt directly from the prompts directory."""
        prompt_dir = "prompts"

        try:
            with open(os.path.join(prompt_dir, "classify_prompt.txt"), "r", encoding="utf-8") as f:
                self.classify_prompt = f.read()
                logger.debug(f"已加载分类提示词,长度: {len(self.classify_prompt)} 字符")

            with open(os.path.join(prompt_dir, "price_prompt.txt"), "r", encoding="utf-8") as f:
                self.price_prompt = f.read()
                logger.debug(f"已加载价格提示词,长度: {len(self.price_prompt)} 字符")

            with open(os.path.join(prompt_dir, "tech_prompt.txt"), "r", encoding="utf-8") as f:
                self.tech_prompt = f.read()
                logger.debug(f"已加载技术提示词,长度: {len(self.tech_prompt)} 字符")

            with open(os.path.join(prompt_dir, "default_prompt.txt"), "r", encoding="utf-8") as f:
                self.default_prompt = f.read()
                logger.debug(f"已加载默认提示词,长度: {len(self.default_prompt)} 字符")

            logger.info("成功加载所有提示词")
        except Exception as e:
            logger.error(f"加载提示词时出错: {e}")
            raise

    def _safe_filter(self, text: str) -> str:
        # Replace any reply that mentions off-platform contact or payment channels
        blocked_phrases = ["微信", "QQ", "支付宝", "银行卡", "线下"]
        return "[安全提醒]请通过平台沟通" if any(p in text for p in blocked_phrases) else text

    def format_history(self, context: List[Dict]) -> str:
        # Keep only user and assistant turns, one "role: content" line each
        user_assistant_msgs = [msg for msg in context if msg['role'] in ['user', 'assistant']]
        return "\n".join([f"{msg['role']}: {msg['content']}" for msg in user_assistant_msgs])

def generate_reply(self, user_msg: str, item_desc: str, context: List[Dict]) -> str:
    formatted_context = self.format_history(context)

    detected_intent = self.router.detect(user_msg, item_desc, formatted_context)

    internal_intents = {'classify'}

    if detected_intent in self.agents and detected_intent not in internal_intents:
        agent = self.agents[detected_intent]
        logger.info(f'意图识别完成: {detected_intent}')
        self.last_intent = detected_intent
    else:
        agent = self.agents['default']
        logger.info(f'意图识别完成: default')
        self.last_intent = 'default'

    bargain_count = self._extract_bargain_count(context)
    logger.info(f'议价次数: {bargain_count}')

    return agent.generate(
        user_msg=user_msg,
        item_desc=item_desc,
        context=formatted_context,
        bargain_count=bargain_count
    )

def _extract_bargain_count(self, context: List[Dict]) -> int:
    for msg in context:
        if msg['role'] == 'system' and '议价次数' in msg['content']:
            try:
                match = re.search(r'议价次数[::]\s*(\d+)', msg['content'])
                if match:
                    return int(match.group(1))
            except Exception:
                pass
    return 0

def reload_prompts(self):
    logger.info("正在重新加载提示词...")
    self._init_system_prompts()
    self._init_agents()
    logger.info("提示词重新加载完成")


class IntentRouter:
    def __init__(self, classify_agent):
        self.rules = {
            'tech': {
                'keywords': ['参数', '规格', '型号', '连接', '对比'],
                'patterns': [r'和.+比']
            },
            'price': {
                'keywords': ['便宜', '价', '砍价', '少点'],
                'patterns': [r'\d+元', r'能少\d+']
            }
        }
        self.classify_agent = classify_agent

    def detect(self, user_msg: str, item_desc, context) -> str:
        # Strip punctuation and whitespace before matching
        text_clean = re.sub(r'[^\w\u4e00-\u9fa5]', '', user_msg)

        # 1. Tech keywords and patterns take priority
        if any(kw in text_clean for kw in self.rules['tech']['keywords']):
            return 'tech'

        for pattern in self.rules['tech']['patterns']:
            if re.search(pattern, text_clean):
                return 'tech'

        # 2. Price keywords and patterns
        for intent in ['price']:
            if any(kw in text_clean for kw in self.rules[intent]['keywords']):
                return intent
            for pattern in self.rules[intent]['patterns']:
                if re.search(pattern, text_clean):
                    return intent

        # 3. No rule matched: let the LLM classifier decide
        return self.classify_agent.generate(
            user_msg=user_msg,
            item_desc=item_desc,
            context=context
        )


class BaseAgent:
    def __init__(self, system_prompt, safety_filter):
        self.system_prompt = system_prompt
        self.safety_filter = safety_filter

    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        messages = self._build_messages(user_msg, item_desc, context)
        response = self._call_llm(messages)
        return self.safety_filter(response)

    def _build_messages(self, user_msg: str, item_desc: str, context: str) -> List[Dict]:
        # Item description, chat history and the agent prompt all go into the system message
        return [
            {"role": "system", "content": f"【商品信息】{item_desc}\n【你与客户对话历史】{context}\n{self.system_prompt}"},
            {"role": "user", "content": user_msg}
        ]

    def _call_llm(self, messages: List[Dict], temperature: float = 0.4) -> str:
        # openai.ChatCompletion is the pre-1.0 SDK interface (openai 0.28.1)
        response = openai.ChatCompletion.create(
            model=os.getenv("MODEL_NAME", "qwen-max"),
            messages=messages,
            temperature=temperature,
            max_tokens=500,
            top_p=0.8
        )
        return response['choices'][0]['message']['content']


class PriceAgent(BaseAgent):
    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        dynamic_temp = self._calc_temperature(bargain_count)
        messages = self._build_messages(user_msg, item_desc, context)
        # Tell the model which bargaining round this is
        messages[0]['content'] += f"\n▲当前议价轮次:{bargain_count}"

        response = openai.ChatCompletion.create(
            model=os.getenv("MODEL_NAME", "qwen-max"),
            messages=messages,
            temperature=dynamic_temp,
            max_tokens=500,
            top_p=0.8
        )
        return self.safety_filter(response['choices'][0]['message']['content'])

    def _calc_temperature(self, bargain_count: int) -> float:
        # Starts at 0.3, rises by 0.15 per bargaining round, capped at 0.9
        return min(0.3 + bargain_count * 0.15, 0.9)


class TechAgent(BaseAgent):
    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        messages = self._build_messages(user_msg, item_desc, context)

        response = openai.ChatCompletion.create(
            model=os.getenv("MODEL_NAME", "qwen-max"),
            messages=messages,
            temperature=0.4,
            max_tokens=500,
            top_p=0.8
        )
        return self.safety_filter(response['choices'][0]['message']['content'])


class ClassifyAgent(BaseAgent):
    def generate(self, **args) -> str:
        return super().generate(**args)


class DefaultAgent(BaseAgent):
    def _call_llm(self, messages: List[Dict], *args) -> str:
        # Same call as BaseAgent._call_llm, with a higher fixed temperature
        response = openai.ChatCompletion.create(
            model=os.getenv("MODEL_NAME", "qwen-max"),
            messages=messages,
            temperature=0.7,
            max_tokens=500,
            top_p=0.8
        )
        return response['choices'][0]['message']['content']
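
To see how the rules in `IntentRouter.detect` and the formula in `PriceAgent._calc_temperature` behave without calling any model, the sketch below copies the rule table and the temperature formula from the listing into standalone functions. `route_by_rules` and `calc_temperature` are hypothetical names used only for illustration, and a return value of `None` stands for the case where the real router would hand the message to the LLM classifier.

```python
import re
from typing import Optional

# Rule table copied from IntentRouter.__init__
RULES = {
    'tech': {
        'keywords': ['参数', '规格', '型号', '连接', '对比'],
        'patterns': [r'和.+比'],
    },
    'price': {
        'keywords': ['便宜', '价', '砍价', '少点'],
        'patterns': [r'\d+元', r'能少\d+'],
    },
}


def route_by_rules(user_msg: str) -> Optional[str]:
    """Return 'tech' or 'price' if a rule matches, else None."""
    # Same cleanup as detect(): drop punctuation and whitespace.
    text_clean = re.sub(r'[^\w\u4e00-\u9fa5]', '', user_msg)
    # Tech is checked before price, keywords before patterns.
    for intent in ('tech', 'price'):
        rule = RULES[intent]
        if any(kw in text_clean for kw in rule['keywords']):
            return intent
        if any(re.search(p, text_clean) for p in rule['patterns']):
            return intent
    return None


def calc_temperature(bargain_count: int) -> float:
    """Start at 0.3, add 0.15 per bargaining round, cap at 0.9."""
    return min(0.3 + bargain_count * 0.15, 0.9)


if __name__ == "__main__":
    for msg in ["这个型号是什么?", "能便宜点吗", "和另一款比价格怎么样", "你好,在吗"]:
        print(msg, "->", route_by_rules(msg))
    for rounds in range(6):
        print(rounds, calc_temperature(rounds))
```

Because the tech rules are checked first, a message that also mentions price (the third sample) is still routed to the tech agent. The temperature climbs by 0.15 per bargaining round and stays at 0.9 from the fourth round onward.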