写在前面

本人在在网上冲浪时,看见了一个非常有意思的项目,这里是原项目地址。这个项目主要对基于 UIAutomation 的开源 Python 微信自动化库进行了封装,实现很多日常的微信操作的自动化,如自动发送消息、自动添加好友、自动回复、自动获取聊天记录、图片、文件等功能

接入大模型自动回复信息

这是接入大模型后的效果,感觉还不错不过好像做不到联系上下文,这只是一个简单的demo。

在本人的大学时期就想过这件事情,不过当时大部分都是调用微信的接口去实现,最终结局就是微信封号。

我原本是想使用的openai的API,原作者大大也贴心的给了一个接入openai的demo但是调用需要验证手机号,于是便是用了阿里的通义千问,这里是官方的文档

一个简单demo

import time
import threading
import logging
from wxauto import WeChat
import dashscope
from http import HTTPStatus

dashscope.api_key = "xxxxxxxxxxxxxx"

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def sample_sync_call(prompt_text):
    try:
        resp = dashscope.Generation.call(
            model='qwen-turbo',  # 在这里指定模型名称
            prompt=prompt_text
        )
        if resp.status_code == HTTPStatus.OK:
            logging.info(f"API调用成功:{resp.usage}")  # 使用token数
            return resp.output
        else:
            logging.error(f"API调用失败:错误码 {resp.code},错误信息 {resp.message}")
            return "对不起,我无法处理你的请求。"
    except Exception as e:
        logging.error(f"API调用过程中发生错误:{e}")
        return "对不起,我无法处理你的请求。"

def extract_text_from_dict(data_dict, key='text'):
    """
    从字典中提取指定键的内容,如果没有指定键,则返回默认值
    """
    return data_dict.get(key, '对不起,我无法处理你的请求。')


# 获取微信窗口对象
wx = WeChat()

# 监听列表和回复列表
listen_list = [
    '陈彦鹏',
    '佘瑞婕',
    '王武斌',
    '吴耀华'
]

def add_listeners():
    """添加监听聊天对象"""
    for who in listen_list:
        try:
            wx.AddListenChat(who=who, savepic=False)
            logging.info(f"添加监听聊天对象成功:{who}")
        except Exception as e:
            logging.error(f"添加监听聊天对象失败:{who},错误:{e}")

def handle_messages():
    """处理消息并回复"""
    wait = 1  # 设置1秒查看一次是否有新消息
    while True:
        try:
            msgs = wx.GetListenMessage()
            for chat in msgs:
                who = chat.who              # 获取聊天窗口名(人或群名)
                one_msgs = msgs.get(chat)   # 获取消息内容
                for msg in one_msgs:
                    msgtype = msg.type       # 获取消息类型
                    content = msg.content    # 获取消息内容,字符串类型的消息内容
                    logging.info(f'【{who}】:{content}')
                    if msgtype == 'friend':
                        response = sample_sync_call(content)
                        reply = extract_text_from_dict(response)
                        logging.info(f'回复【{who}】:{reply}')
                        chat.SendMsg(reply)  # 回复内容
            time.sleep(wait)
        except Exception as e:
            logging.error(f"处理消息失败:{e}")

if __name__ == "__main__":
    add_listeners()

    # 创建一个线程来处理消息
    message_thread = threading.Thread(target=handle_messages)
    message_thread.daemon = True  # 设置为守护线程,程序退出时该线程自动退出
    message_thread.start()

    # 主线程保持运行
    while True:
        time.sleep(10)

引入 wxauto 库:

from wxauto import WeChat

初始化微信对象:

wx = WeChat()

原作者这里提供了更详细的文档。

一个懒人