闲鱼对接chatgpt,千问,实现自动智能回复!
🧩 第一部分:准备工作(1/4)
✅ 1. 购买 VPS + 安装 Linux(Ubuntu 22.04 推荐)
可以使用 RackNerd、CloudCone 等廉价服务,系统选 Ubuntu(Debian 也行)。
✅ 2. 使用 MobaXterm、XShell、SSH 登录到 VPS
比如你登录成功后看到:
root@your-hostname:~#
🧩 第二部分:安装依赖环境(2/4)
✅ 3. 安装 Python 和常用工具(如未安装)
apt update && apt install -y python3 python3-pip git
✅ 4. 安装 Node.js(为了构建用到的组件)
curl -fsSL https://deb.nodesource.com/setup_18.x | bash -
apt install -y nodejs
✅ 5. 安装依赖包管理器 pipenv(可选)
推荐直接用 pip,无需 pipenv(下面的命令依赖项目里的 requirements.txt,需在第 6 步克隆项目并进入项目目录后执行,即第 7 步):
pip install -r requirements.txt
🧩 第三部分:部署项目并配置(3/4)
✅ 6. 克隆项目
git clone https://github.com/shaxiu/XianyuAutoAgent.git
cd XianyuAutoAgent
✅ 7. 安装依赖
pip install -r requirements.txt
⚠️ 出现 openai SDK 报错?见第 10 步处理!
✅ 8. 准备提示词(Prompt)
cd prompts
mv classify_prompt_example.txt classify_prompt.txt
mv default_prompt_example.txt default_prompt.txt
mv price_prompt_example.txt price_prompt.txt
mv tech_prompt_example.txt tech_prompt.txt
cd ..
有些版本中提示词文件已经改好名,直接执行 ls prompts 确认即可。
✅ 9. 配置 .env 文件
新建 .env 文件:
nano .env
粘贴以下内容并替换你自己的:
API_KEY=sk-你的Moonshot密钥
COOKIES_STR=你从浏览器F12复制的一整串cookie
MODEL_BASE_URL=https://api.moonshot.cn/v1
MODEL_NAME=moonshot-v1-8k
保存退出(Ctrl + O 回车,Ctrl + X)。
✅ 10. 替换 OpenAI SDK 版本为 Moonshot 兼容老版(关键!)
pip uninstall openai -y
pip install openai==0.28.1
✅ 11. 替换 XianyuAgent.py 内容
用我提供的 Moonshot 兼容版本覆盖(完整代码见本文「补充2」),文件中的关键改动包括:
- openai.api_key = ...
- openai.api_base = ...
- 使用 openai.ChatCompletion.create(...)
🧩 第四部分:运行与守护(4/4)
✅ 12. 测试运行
python3 main.py
看到日志中出现:
user: XXX, 发送消息: XXX
机器人回复: XXX
就说明部署成功 ✅
✅ 13. 后台守护运行(推荐)
cd ~/XianyuAutoAgent
nohup python3 main.py > out.log 2>&1 &
tail -f out.log
✅ 14. 可选:做成 systemd 开机启动
systemd 自动启动教程、多号并发部署方案、日志清理建议,后续更新。
✅ 补充部分
补充1、 main.py 代码
import asyncio
import base64
import json
import os
import time

import websockets
from dotenv import load_dotenv
from loguru import logger

from context_manager import ChatContextManager
from utils.xianyu_utils import (
    decrypt,
    generate_device_id,
    generate_mid,
    generate_uuid,
    trans_cookies,
)
from XianyuAgent import XianyuReplyBot
from XianyuApis import XianyuApis
class XianyuLive:
    """Long-running WebSocket client that feeds Xianyu chat messages to the reply bot.

    The instance registers on the Goofish/DingTalk WebSocket endpoint, keeps the
    connection alive with heartbeats, decrypts incoming sync packages and, for
    buyer chat messages, asks the module-level ``bot`` for a reply and sends it
    back on the same conversation.
    """

    # Header fields copied from an incoming frame into its ACK when present.
    _ACK_PASSTHROUGH_HEADERS = ("app-key", "ua", "dt")

    # Order-status reminders mapped to the log line emitted for each of them.
    _ORDER_REMINDER_LOGS = {
        '等待买家付款': '等待买家 {} 付款',
        '交易关闭': '卖家 {} 交易关闭',
        '等待卖家发货': '交易成功 {} 等待卖家发货',
    }

    def __init__(self, cookies_str):
        """Build the API session and connection state from a raw cookie string.

        Args:
            cookies_str: The cookie header value copied from a logged-in browser.
                It must contain the ``unb`` cookie, which is used as the
                account id.
        """
        self.xianyu = XianyuApis()
        self.base_url = 'wss://wss-goofish.dingtalk.com/'
        self.cookies_str = cookies_str
        self.cookies = trans_cookies(cookies_str)
        self.xianyu.session.cookies.update(self.cookies)
        self.myid = self.cookies['unb']
        self.device_id = generate_device_id(self.myid)
        self.context_manager = ChatContextManager()

        # Heartbeat configuration and bookkeeping.
        self.heartbeat_interval = 15  # seconds between heartbeats
        self.heartbeat_timeout = 5  # extra seconds allowed for a response
        self.last_heartbeat_time = 0
        self.last_heartbeat_response = 0
        self.heartbeat_task = None
        self.ws = None

    @staticmethod
    def _build_ack(headers):
        """Return the ACK frame (as a dict) answering a frame with ``headers``."""
        ack_headers = {
            "mid": headers["mid"] if "mid" in headers else generate_mid(),
            "sid": headers["sid"] if "sid" in headers else '',
        }
        for key in XianyuLive._ACK_PASSTHROUGH_HEADERS:
            if key in headers:
                ack_headers[key] = headers[key]
        return {"code": 200, "headers": ack_headers}

    async def send_msg(self, ws, cid, toid, text):
        """Send a plain-text chat message.

        Args:
            ws: Open WebSocket connection to send on.
            cid: Conversation id without the ``@goofish`` suffix.
            toid: Receiver user id without the ``@goofish`` suffix.
            text: Message text.
        """
        payload = {
            "contentType": 1,
            "text": {
                "text": text
            }
        }
        # The text payload travels as base64-encoded JSON inside a custom content block.
        text_base64 = str(base64.b64encode(json.dumps(payload).encode('utf-8')), 'utf-8')
        msg = {
            "lwp": "/r/MessageSend/sendByReceiverScope",
            "headers": {
                "mid": generate_mid()
            },
            "body": [
                {
                    "uuid": generate_uuid(),
                    "cid": f"{cid}@goofish",
                    "conversationType": 1,
                    "content": {
                        "contentType": 101,
                        "custom": {
                            "type": 1,
                            "data": text_base64
                        }
                    },
                    "redPointPolicy": 0,
                    "extension": {
                        "extJson": "{}"
                    },
                    "ctx": {
                        "appVersion": "1.0",
                        "platform": "web"
                    },
                    "mtags": {},
                    "msgReadStatusSetting": 1
                },
                {
                    "actualReceivers": [
                        f"{toid}@goofish",
                        f"{self.myid}@goofish"
                    ]
                }
            ]
        }
        await ws.send(json.dumps(msg))

    async def init(self, ws):
        """Register this connection: send ``/reg`` with a fresh token, then ack the sync state."""
        token = self.xianyu.get_token(self.device_id)['data']['accessToken']
        msg = {
            "lwp": "/reg",
            "headers": {
                "cache-header": "app-key token ua wv",
                "app-key": "444e9908a51d1cb236a27862abc769c9",
                "token": token,
                "ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36 DingTalk(2.1.5) OS(Windows/10) Browser(Chrome/133.0.0.0) DingWeb/2.1.5 IMPaaS DingWeb/2.1.5",
                "dt": "j",
                "wv": "im:3,au:3,sy:6",
                "sync": "0,0;0;0;",
                "did": self.device_id,
                "mid": generate_mid()
            }
        }
        await ws.send(json.dumps(msg))
        # Give the server a moment to finish the registration before syncing.
        await asyncio.sleep(1)
        msg = {"lwp": "/r/SyncStatus/ackDiff", "headers": {"mid": "5701741704675979 0"}, "body": [
            {"pipeline": "sync", "tooLong2Tag": "PNM,1", "channel": "sync", "topic": "sync", "highPts": 0,
             "pts": int(time.time() * 1000) * 1000, "seq": 0, "timestamp": int(time.time() * 1000)}]}
        await ws.send(json.dumps(msg))
        logger.info('连接注册完成')

    def is_chat_message(self, message):
        """Return True when ``message`` has the shape of a user chat message."""
        try:
            return (
                isinstance(message, dict)
                and "1" in message
                and isinstance(message["1"], dict)
                and "10" in message["1"]
                and isinstance(message["1"]["10"], dict)
                and "reminderContent" in message["1"]["10"]
            )
        except Exception:
            return False

    def is_sync_package(self, message_data):
        """Return True when ``message_data`` is a sync push package with at least one entry."""
        try:
            return (
                isinstance(message_data, dict)
                and "body" in message_data
                and "syncPushPackage" in message_data["body"]
                and "data" in message_data["body"]["syncPushPackage"]
                and len(message_data["body"]["syncPushPackage"]["data"]) > 0
            )
        except Exception:
            return False

    def is_typing_status(self, message):
        """Return True when ``message`` has the shape of a "user is typing" notification."""
        try:
            return (
                isinstance(message, dict)
                and "1" in message
                and isinstance(message["1"], list)
                and len(message["1"]) > 0
                and isinstance(message["1"][0], dict)
                and "1" in message["1"][0]
                and isinstance(message["1"][0]["1"], str)
                and "@goofish" in message["1"][0]["1"]
            )
        except Exception:
            return False

    async def handle_message(self, message_data, websocket):
        """Process one decoded WebSocket frame.

        Acks the frame, ignores everything that is not an encrypted sync
        package, logs order-status reminders, and answers buyer chat messages
        through the module-level ``bot``. All errors are logged, never raised.

        Args:
            message_data: The frame already parsed from JSON.
            websocket: Open connection used for the ACK and the reply.
        """
        try:
            # Best-effort ACK; a frame without headers simply is not acked here.
            # NOTE(review): main() also acks frames that carry a mid, so such
            # frames are acked twice — confirm whether the server needs both.
            try:
                await websocket.send(json.dumps(self._build_ack(message_data["headers"])))
            except Exception as e:
                logger.debug(f"ACK not sent: {e}")

            if not self.is_sync_package(message_data):
                return

            sync_data = message_data["body"]["syncPushPackage"]["data"][0]
            if "data" not in sync_data:
                logger.debug("同步包中无data字段")
                return

            raw = sync_data["data"]
            try:
                try:
                    # Payloads that are plain base64 JSON need no decryption and are ignored.
                    json.loads(base64.b64decode(raw).decode("utf-8"))
                    return
                except (ValueError, TypeError):
                    # Always decrypt the untouched payload, never a half-decoded copy.
                    message = json.loads(decrypt(raw))
            except Exception as e:
                logger.error(f"消息解密失败: {e}")
                return

            # Order-status reminders: only logged. Post-payment logic is left to the user.
            try:
                log_template = self._ORDER_REMINDER_LOGS.get(message['3']['redReminder'])
                if log_template is not None:
                    user_id = message['1'].split('@')[0]
                    user_url = f'https://www.goofish.com/personal?userId={user_id}'
                    logger.info(log_template.format(user_url))
                    return
            except (KeyError, TypeError, AttributeError, IndexError):
                # Not an order message; fall through to chat handling.
                pass

            if self.is_typing_status(message):
                logger.debug("用户正在输入")
                return
            elif not self.is_chat_message(message):
                logger.debug("其他非聊天消息")
                logger.debug(f"原始消息: {message}")
                return

            chat = message["1"]["10"]
            create_time = int(message["1"]["5"])
            send_user_name = chat["reminderTitle"]
            send_user_id = chat["senderUserId"]
            send_message = chat["reminderContent"]

            # Drop messages older than five minutes (create_time is in milliseconds).
            if (time.time() * 1000 - create_time) > 300000:
                logger.debug("过期消息丢弃")
                return

            if send_user_id == self.myid:
                logger.debug("过滤自身消息")
                return

            url_info = chat["reminderUrl"]
            item_id = url_info.split("itemId=")[1].split("&")[0] if "itemId=" in url_info else None
            if not item_id:
                logger.warning("无法获取商品ID")
                return

            # Item info comes from the local store first, then from the API (and is cached).
            item_info = self.context_manager.get_item_info(item_id)
            if not item_info:
                logger.info(f"从API获取商品信息: {item_id}")
                api_result = self.xianyu.get_item_info(item_id)
                if 'data' in api_result and 'itemDO' in api_result['data']:
                    item_info = api_result['data']['itemDO']
                    self.context_manager.save_item_info(item_id, item_info)
                else:
                    logger.warning(f"获取商品信息失败: {api_result}")
                    return
            else:
                logger.info(f"从数据库获取商品信息: {item_id}")

            item_description = f"{item_info['desc']};当前商品售卖价格为:{str(item_info['soldPrice'])}"

            logger.info(f"user: {send_user_name}, 发送消息: {send_message}")
            self.context_manager.add_message(send_user_id, item_id, "user", send_message)
            context = self.context_manager.get_context(send_user_id, item_id)

            # generate_reply performs blocking HTTP calls; run it in a worker
            # thread so the heartbeat task keeps running on the event loop.
            loop = asyncio.get_running_loop()
            bot_reply = await loop.run_in_executor(
                None,
                lambda: bot.generate_reply(
                    send_message,
                    item_description,
                    context=context
                )
            )

            # Price intents count as one more bargaining round.
            if bot.last_intent == "price":
                self.context_manager.increment_bargain_count(send_user_id, item_id)
                bargain_count = self.context_manager.get_bargain_count(send_user_id, item_id)
                logger.info(f"用户 {send_user_name} 对商品 {item_id} 的议价次数: {bargain_count}")

            self.context_manager.add_message(send_user_id, item_id, "assistant", bot_reply)

            logger.info(f"机器人回复: {bot_reply}")
            cid = message["1"]["2"].split('@')[0]
            await self.send_msg(websocket, cid, send_user_id, bot_reply)

        except Exception as e:
            logger.error(f"处理消息时发生错误: {str(e)}")
            logger.debug(f"原始消息: {message_data}")

    async def send_heartbeat(self, ws):
        """Send one heartbeat frame and return its mid; re-raises send failures."""
        try:
            heartbeat_mid = generate_mid()
            heartbeat_msg = {
                "lwp": "/!",
                "headers": {
                    "mid": heartbeat_mid
                }
            }
            await ws.send(json.dumps(heartbeat_msg))
            self.last_heartbeat_time = time.time()
            logger.debug("心跳包已发送")
            return heartbeat_mid
        except Exception as e:
            logger.error(f"发送心跳包失败: {e}")
            raise

    async def heartbeat_loop(self, ws):
        """Send heartbeats every ``heartbeat_interval`` seconds until a timeout or an error.

        The loop ends when no response has been recorded for
        ``heartbeat_interval + heartbeat_timeout`` seconds.
        NOTE(review): ending the loop does not close ``ws``; the reader in
        main() keeps waiting on the socket — confirm this is intended.
        """
        while True:
            try:
                current_time = time.time()

                if current_time - self.last_heartbeat_time >= self.heartbeat_interval:
                    await self.send_heartbeat(ws)

                if (current_time - self.last_heartbeat_response) > (self.heartbeat_interval + self.heartbeat_timeout):
                    logger.warning("心跳响应超时,可能连接已断开")
                    break

                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"心跳循环出错: {e}")
                break

    async def handle_heartbeat_response(self, message_data):
        """Record a heartbeat response.

        Any frame with ``code == 200`` and a ``mid`` header is treated as one.

        Returns:
            True when the frame was consumed as a heartbeat response, else False.
        """
        try:
            if (
                isinstance(message_data, dict)
                and "headers" in message_data
                and "mid" in message_data["headers"]
                and "code" in message_data
                and message_data["code"] == 200
            ):
                self.last_heartbeat_response = time.time()
                logger.debug("收到心跳响应")
                return True
        except Exception as e:
            logger.error(f"处理心跳响应出错: {e}")
        return False

    async def _stop_heartbeat(self):
        """Cancel the heartbeat task of the connection that just ended, if any."""
        task, self.heartbeat_task = self.heartbeat_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def main(self):
        """Connect, register, and process frames forever, reconnecting after 5 seconds."""
        headers = {
            "Cookie": self.cookies_str,
            "Host": "wss-goofish.dingtalk.com",
            "Connection": "Upgrade",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
            "Origin": "https://www.goofish.com",
            "Accept-Encoding": "gzip, deflate, br, zstd",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        while True:
            try:
                async with websockets.connect(self.base_url, extra_headers=headers) as websocket:
                    self.ws = websocket
                    await self.init(websocket)

                    # Start the heartbeat clock from the moment registration finished.
                    self.last_heartbeat_time = time.time()
                    self.last_heartbeat_response = time.time()
                    self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket))

                    async for message in websocket:
                        try:
                            message_data = json.loads(message)

                            if await self.handle_heartbeat_response(message_data):
                                continue

                            # Generic ACK for every frame that carries a mid.
                            if "headers" in message_data and "mid" in message_data["headers"]:
                                await websocket.send(json.dumps(self._build_ack(message_data["headers"])))

                            await self.handle_message(message_data, websocket)

                        except json.JSONDecodeError:
                            logger.error("消息解析失败")
                        except Exception as e:
                            logger.error(f"处理消息时发生错误: {str(e)}")
                            logger.debug(f"原始消息: {message}")

            except websockets.exceptions.ConnectionClosed:
                logger.warning("WebSocket连接已关闭")
            except Exception as e:
                logger.error(f"连接发生错误: {e}")
            finally:
                # Also runs when the receive loop ends on a normal close, which
                # raises nothing and would otherwise leak the heartbeat task.
                await self._stop_heartbeat()

            await asyncio.sleep(5)  # wait before reconnecting
if __name__ == '__main__':
    # Load COOKIES_STR and the model settings from the .env file.
    load_dotenv()
    cookies_str = os.getenv("COOKIES_STR")
    if not cookies_str:
        # Fail early with a clear message instead of an obscure cookie-parsing error.
        raise SystemExit("COOKIES_STR is not set; add it to .env before starting")
    # `bot` must stay a module-level name: XianyuLive.handle_message reads it as a global.
    bot = XianyuReplyBot()
    xianyuLive = XianyuLive(cookies_str)
    # Resident process: XianyuLive.main() loops forever and reconnects on failure.
    asyncio.run(xianyuLive.main())
补充2、 XianyuAgent.py 代码
import re
from typing import List, Dict
import os
import openai
from loguru import logger


class XianyuReplyBot:
    """Route a buyer message to an intent-specific agent and return its reply.

    Configured for the openai 0.28.x module-level API: the key and base URL
    are read from the ``API_KEY`` and ``MODEL_BASE_URL`` environment variables.
    """

    # (attribute name, file name, label used in the log line) per prompt, in load order.
    _PROMPT_FILES = (
        ("classify_prompt", "classify_prompt.txt", "分类"),
        ("price_prompt", "price_prompt.txt", "价格"),
        ("tech_prompt", "tech_prompt.txt", "技术"),
        ("default_prompt", "default_prompt.txt", "默认"),
    )

    def __init__(self):
        openai.api_key = os.getenv("API_KEY")
        # Only override the SDK's default endpoint when one is configured;
        # assigning None would break request URL construction.
        base_url = os.getenv("MODEL_BASE_URL")
        if base_url:
            openai.api_base = base_url

        self._init_system_prompts()
        self._init_agents()
        self.router = IntentRouter(self.agents['classify'])
        self.last_intent = None  # intent chosen by the most recent generate_reply call

    def _init_agents(self):
        """Create one agent per intent from the currently loaded prompts."""
        self.agents = {
            'classify': ClassifyAgent(self.classify_prompt, self._safe_filter),
            'price': PriceAgent(self.price_prompt, self._safe_filter),
            'tech': TechAgent(self.tech_prompt, self._safe_filter),
            'default': DefaultAgent(self.default_prompt, self._safe_filter),
        }

    def _init_system_prompts(self):
        """Load every agent prompt from the ``prompts`` directory.

        Raises:
            Exception: Whatever reading a prompt file raised, after logging it.
        """
        prompt_dir = "prompts"
        try:
            for attr, filename, label in self._PROMPT_FILES:
                with open(os.path.join(prompt_dir, filename), "r", encoding="utf-8") as f:
                    text = f.read()
                setattr(self, attr, text)
                logger.debug(f"已加载{label}提示词,长度: {len(text)} 字符")
            logger.info("成功加载所有提示词")
        except Exception as e:
            logger.error(f"加载提示词时出错: {e}")
            raise

    def _safe_filter(self, text: str) -> str:
        """Replace a reply that mentions an off-platform channel with a safety notice."""
        blocked_phrases = ["微信", "QQ", "支付宝", "银行卡", "线下"]
        return "[安全提醒]请通过平台沟通" if any(p in text for p in blocked_phrases) else text

    def format_history(self, context: List[Dict]) -> str:
        """Render the user/assistant turns of ``context`` as ``role: content`` lines."""
        return "\n".join(
            f"{msg['role']}: {msg['content']}"
            for msg in context
            if msg['role'] in ('user', 'assistant')
        )

    def generate_reply(self, user_msg: str, item_desc: str, context: List[Dict]) -> str:
        """Detect the intent of ``user_msg`` and return the matching agent's reply.

        Args:
            user_msg: Latest buyer message.
            item_desc: Item description text given to the agents.
            context: Conversation records with ``role`` and ``content`` keys.

        Returns:
            The reply text. ``self.last_intent`` is updated as a side effect.
        """
        formatted_context = self.format_history(context)
        detected_intent = self.router.detect(user_msg, item_desc, formatted_context)

        # 'classify' is the router's own agent and must never answer a buyer.
        internal_intents = {'classify'}
        if detected_intent in self.agents and detected_intent not in internal_intents:
            agent = self.agents[detected_intent]
            logger.info(f'意图识别完成: {detected_intent}')
            self.last_intent = detected_intent
        else:
            agent = self.agents['default']
            logger.info('意图识别完成: default')
            self.last_intent = 'default'

        bargain_count = self._extract_bargain_count(context)
        logger.info(f'议价次数: {bargain_count}')

        return agent.generate(
            user_msg=user_msg,
            item_desc=item_desc,
            context=formatted_context,
            bargain_count=bargain_count
        )

    def _extract_bargain_count(self, context: List[Dict]) -> int:
        """Return the bargain count from the first system record that states one, else 0."""
        for msg in context:
            if msg['role'] == 'system' and '议价次数' in msg['content']:
                # Accept both the half-width and the full-width colon.
                match = re.search(r'议价次数[::]\s*(\d+)', msg['content'])
                if match:
                    return int(match.group(1))
        return 0

    def reload_prompts(self):
        """Re-read the prompt files and rebuild the agents from them."""
        logger.info("正在重新加载提示词...")
        self._init_system_prompts()
        self._init_agents()
        logger.info("提示词重新加载完成")
class IntentRouter:
    """Pick an intent by keyword and regex rules, falling back to the classify agent."""

    def __init__(self, classify_agent):
        """Store the rule table and the agent used when no rule matches.

        Args:
            classify_agent: Object whose ``generate(user_msg=, item_desc=,
                context=)`` returns an intent name.
        """
        self.rules = {
            'tech': {
                'keywords': ['参数', '规格', '型号', '连接', '对比'],
                'patterns': [r'和.+比']
            },
            'price': {
                'keywords': ['便宜', '价', '砍价', '少点'],
                'patterns': [r'\d+元', r'能少\d+']
            }
        }
        self.classify_agent = classify_agent

    def detect(self, user_msg: str, item_desc, context) -> str:
        """Return the intent for ``user_msg``.

        Rules are checked on the message with punctuation and whitespace
        removed, ``tech`` before ``price``, keywords before patterns. When
        nothing matches, the classify agent's answer is returned.
        """
        text_clean = re.sub(r'[^\w\u4e00-\u9fa5]', '', user_msg)

        for intent in ('tech', 'price'):
            rule = self.rules[intent]
            if any(kw in text_clean for kw in rule['keywords']):
                return intent
            if any(re.search(pattern, text_clean) for pattern in rule['patterns']):
                return intent

        return self.classify_agent.generate(
            user_msg=user_msg,
            item_desc=item_desc,
            context=context
        )
class BaseAgent:
    """Base chat agent: build the prompt, call the model, filter the answer."""

    def __init__(self, system_prompt, safety_filter):
        """Store the agent's system prompt and the callable applied to every reply."""
        self.system_prompt = system_prompt
        self.safety_filter = safety_filter

    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        """Return the filtered model reply to ``user_msg``.

        ``bargain_count`` is accepted so all agents share one call signature;
        this base implementation does not use it.
        """
        messages = self._build_messages(user_msg, item_desc, context)
        response = self._call_llm(messages)
        return self.safety_filter(response)

    def _build_messages(self, user_msg: str, item_desc: str, context: str) -> List[Dict]:
        """Build the two-message chat payload: system (item, history, prompt) and user."""
        return [
            {"role": "system", "content": f"【商品信息】{item_desc}\n【你与客户对话历史】{context}\n{self.system_prompt}"},
            {"role": "user", "content": user_msg}
        ]

    def _call_llm(self, messages: List[Dict], temperature: float = 0.4) -> str:
        """Call the chat-completion endpoint and return the first choice's text.

        The model name comes from the ``MODEL_NAME`` environment variable and
        defaults to ``qwen-max``.
        """
        response = openai.ChatCompletion.create(
            model=os.getenv("MODEL_NAME", "qwen-max"),
            messages=messages,
            temperature=temperature,
            max_tokens=500,
            top_p=0.8
        )
        return response['choices'][0]['message']['content']
class PriceAgent(BaseAgent):
    """Bargaining agent: tells the model the current round and raises temperature with it."""

    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        """Return the filtered reply for a price negotiation message.

        Args:
            user_msg: Latest buyer message.
            item_desc: Item description text.
            context: Formatted conversation history.
            bargain_count: Number of bargaining rounds so far.
        """
        dynamic_temp = self._calc_temperature(bargain_count)
        messages = self._build_messages(user_msg, item_desc, context)
        messages[0]['content'] += f"\n▲当前议价轮次:{bargain_count}"

        # Reuse the shared completion call; only the temperature differs from the base agent.
        response = self._call_llm(messages, temperature=dynamic_temp)
        return self.safety_filter(response)

    def _calc_temperature(self, bargain_count: int) -> float:
        """Return 0.3 plus 0.15 per bargaining round, capped at 0.9."""
        return min(0.3 + bargain_count * 0.15, 0.9)
class TechAgent(BaseAgent):
    """Agent for technical questions; always samples at temperature 0.4."""

    def generate(self, user_msg: str, item_desc: str, context: str, bargain_count: int = 0) -> str:
        """Return the filtered reply to a technical question; ``bargain_count`` is unused."""
        messages = self._build_messages(user_msg, item_desc, context)
        # Reuse the shared completion call instead of repeating its parameters here.
        response = self._call_llm(messages, temperature=0.4)
        return self.safety_filter(response)
class ClassifyAgent(BaseAgent):
    """Intent classifier; its reply is used by the router as an intent name."""

    def generate(self, **args) -> str:
        """Forward the keyword arguments unchanged to ``BaseAgent.generate``."""
        return super().generate(**args)


class DefaultAgent(BaseAgent):
    """Fallback agent for general chat; samples at temperature 0.7."""

    def _call_llm(self, messages: List[Dict], *args, **kwargs) -> str:
        """Call the model at temperature 0.7, ignoring any temperature the caller passes."""
        return super()._call_llm(messages, temperature=0.7)
评论区