写在前面
本人在在网上冲浪时,看见了一个非常有意思的项目,这里是原项目地址。这个项目主要对基于 UIAutomation 的开源 Python 微信自动化库进行了封装,实现很多日常的微信操作的自动化,如自动发送消息、自动添加好友、自动回复、自动获取聊天记录、图片、文件等功能
接入大模型自动回复信息
这是接入大模型后的效果,感觉还不错不过好像做不到联系上下文,这只是一个简单的demo。
在本人的大学时期就想过这件事情,不过当时大部分都是调用微信的接口去实现,最终结局就是微信封号。
我原本是想使用的openai的API,原作者大大也贴心的给了一个接入openai的demo但是调用需要验证手机号,于是便是用了阿里的通义千问,这里是官方的文档
一个简单demo
import time
import threading
import logging
from wxauto import WeChat
import dashscope
from http import HTTPStatus
dashscope.api_key = "xxxxxxxxxxxxxx"
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def sample_sync_call(prompt_text):
try:
resp = dashscope.Generation.call(
model='qwen-turbo', # 在这里指定模型名称
prompt=prompt_text
)
if resp.status_code == HTTPStatus.OK:
logging.info(f"API调用成功:{resp.usage}") # 使用token数
return resp.output
else:
logging.error(f"API调用失败:错误码 {resp.code},错误信息 {resp.message}")
return "对不起,我无法处理你的请求。"
except Exception as e:
logging.error(f"API调用过程中发生错误:{e}")
return "对不起,我无法处理你的请求。"
def extract_text_from_dict(data_dict, key='text'):
"""
从字典中提取指定键的内容,如果没有指定键,则返回默认值
"""
return data_dict.get(key, '对不起,我无法处理你的请求。')
# 获取微信窗口对象
wx = WeChat()
# 监听列表和回复列表
listen_list = [
'陈彦鹏',
'佘瑞婕',
'王武斌',
'吴耀华'
]
def add_listeners():
"""添加监听聊天对象"""
for who in listen_list:
try:
wx.AddListenChat(who=who, savepic=False)
logging.info(f"添加监听聊天对象成功:{who}")
except Exception as e:
logging.error(f"添加监听聊天对象失败:{who},错误:{e}")
def handle_messages():
"""处理消息并回复"""
wait = 1 # 设置1秒查看一次是否有新消息
while True:
try:
msgs = wx.GetListenMessage()
for chat in msgs:
who = chat.who # 获取聊天窗口名(人或群名)
one_msgs = msgs.get(chat) # 获取消息内容
for msg in one_msgs:
msgtype = msg.type # 获取消息类型
content = msg.content # 获取消息内容,字符串类型的消息内容
logging.info(f'【{who}】:{content}')
if msgtype == 'friend':
response = sample_sync_call(content)
reply = extract_text_from_dict(response)
logging.info(f'回复【{who}】:{reply}')
chat.SendMsg(reply) # 回复内容
time.sleep(wait)
except Exception as e:
logging.error(f"处理消息失败:{e}")
if __name__ == "__main__":
add_listeners()
# 创建一个线程来处理消息
message_thread = threading.Thread(target=handle_messages)
message_thread.daemon = True # 设置为守护线程,程序退出时该线程自动退出
message_thread.start()
# 主线程保持运行
while True:
time.sleep(10)
引入 wxauto 库:
from wxauto import WeChat
初始化微信对象:
wx = WeChat()
原作者这里提供了更详细的文档。