网站搭建 / 计算机技术 · 2021年2月20日 0

使用go-cqhttp结合python作自己的机器人(上)

前言

想搭建个QQ机器人玩玩,这波不一定能成功,但是试试总是好的

 

 

 

 

参考文章:

https://blog.csdn.net/xvktdmjg/article/details/113484966?utm_medium=distribute.wap_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.wap_blog_relevant_pic&depth_1-utm_source=distribute.wap_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.wap_blog_relevant_pic

https://docs.go-cqhttp.org/guide/eventfilter.html#%E7%A4%BA%E4%BE%8B

 


故事背景

各种框架都只是机器人的各种实现,每个框架之间接口都不通用,编程语言也不尽相同,想要开发一个自己的机器人,换一套框架所有的代码都要推到重来,所以亟需一个规范统一各个框架,所以OneBot 诞生了,先来看看OneBot 是啥:

OneBot 标准

一个聊天机器人应用接口标准

简单

接口简单易懂,可轻松接入。

兼容性

兼容原 CQHTTP 插件,零负担迁移。

兼容原 CQHTTP 插件,这个标准和CQHTTP有啥关系?CQHTTP是酷Q的一个插件,这个插件可以提供HTTP协议的接口供第三方系统调用,这样就和具体语言无关,什么语言都可以调用机器人接口实现自己的逻辑,而且基于CQHTTP 已经实现了很多功能的机器人了,详情见上面优秀机器人框架,为了不让以前所有的优秀代码都要推倒重来,所以OneBot 干脆直接在CQHTTP 的基础之上指定标准,这就是典型的先有实现,后有标准的例子。

在这个标准的基础之上,有了各种编程语言的实现,我们称之为生态,如下

理论上,基于 OneBot 标准开发的任何 SDK、框架和机器人应用,都可以无缝地在下面的不同实现中切换。当然,在一小部分细节上各实现可能有一些不同,这也是本项目希望推动解决的问题。

项目地址平台核心作者备注richardchien/coolq-http-apiCKYUrichardchien可在 Mirai 平台使用 mirai-native 加载Mrs4s/go-cqhttpMiraiGoMrs4syyuueexxiinngg/cqhttp-miraiMiraiyyuueexxiinnggtakayama-lily/onebotOICQtakayamaYiwen-Chan/OneBot-YaYa先驱Kanri

MiraiGo就是GO语言实现的安卓QQ协议,API很原始,相当于一个类库,go-cqhttp是在MiraiGo的基础之上封装的HTTP协议和Websocket协议接口的框架,得益于GO语言先天的优势(资源占用少,运行简单,协程并发高),总之就是性能高,引用项目原话

在关闭数据库的情况下, 加载 25 个好友 128 个群运行 24 小时后内存使用为 10MB 左右. 开启数据库后内存使用将根据消息量增加 10-20MB, 如果系统内存小于 128M 建议关闭数据库使用.

本文选择go-cqhttp作为实战框架

 

来到了最激动人心的实战环节,先看一下OneBot 标准的接口列表,参考https://github.com/howmanybots/onebot/blob/master/v11/specs/api/public.md


搭建

环境配置

首先我个人是在一个公网上的CentOS7服务器上进行的配置

 

下载安装

登录CentOS服务器,然后:

mkdir ~/QQBOT && cd ~/QQBOT
# 下载
wget https://github.com/Mrs4s/go-cqhttp/releases/download/v0.9.39/go-cqhttp-v0.9.39-linux-amd64
# 改名
mv go-cqhttp-v0.9.39-linux-amd64 go-cqhttp
# 赋权
chmod +x go-cqhttp

配置文件

下面我们来写一个配置文件保存到~/QQBOT/config.json文件中,注意将一些信息写好,QQ、QQ密码等

web_ui,不知道干嘛的先关掉,HTTP端口为5700,WS端口为6700(此例用HTTP不用WS,所以先把WS的关掉),这些都是默认的不用改,还有个上报端口后面再说,此处先不做解释

建议用FTP客户端上传上去

{
    // QQ号
    uin: 
    // QQ密码
    password: ""
    // 是否启用密码加密
    encrypt_password: false
    // 加密后的密码, 如未启用密码加密将为空, 请勿随意修改.
    password_encrypted: ""
    // 是否启用内置数据库
    // 启用将会增加10-20MB的内存占用和一定的磁盘空间
    // 关闭将无法使用 撤回 回复 get_msg 等上下文相关功能
    enable_db: true
    // 访问密钥, 强烈推荐在公网的服务器设置
    access_token: ""
    // 重连设置
    relogin: {
        // 是否启用自动重连
        // 如不启用掉线后将不会自动重连
        enabled: true
        // 重连延迟, 单位秒
        relogin_delay: 3
        // 最大重连次数, 0为无限制
        max_relogin_times: 0
    }
    // API限速设置
    // 该设置为全局生效
    // 原 cqhttp 虽然启用了 rate_limit 后缀, 但是基本没插件适配
    // 目前该限速设置为令牌桶算法, 请参考: 
    // https://baike.baidu.com/item/%E4%BB%A4%E7%89%8C%E6%A1%B6%E7%AE%97%E6%B3%95/6597000?fr=aladdin
    _rate_limit: {
        // 是否启用限速
        enabled: false
        // 令牌回复频率, 单位秒
        frequency: 1
        // 令牌桶大小
        bucket_size: 1
    }
    // 是否忽略无效的CQ码
    // 如果为假将原样发送
    ignore_invalid_cqcode: false
    // 是否强制分片发送消息
    // 分片发送将会带来更快的速度
    // 但是兼容性会有些问题
    force_fragmented: false
    // 心跳频率, 单位秒
    // -1 为关闭心跳
    heartbeat_interval: 0
    // HTTP设置
    http_config: {
        // 是否启用正向HTTP服务器
        enabled: true
        // 服务端监听地址
        host: 0.0.0.0
        // 服务端监听端口
        port: 5700
        // 反向HTTP超时时间, 单位秒
        // 最小值为5,小于5将会忽略本项设置
        timeout: 0
        // 反向HTTP POST地址列表
        // 格式: 
        // {
        //    地址: secret
        // }
        post_urls: {
        }
    }
    // 正向WS设置
    ws_config: {
        // 是否启用正向WS服务器
        enabled: false
        // 正向WS服务器监听地址
        host: 0.0.0.0
        // 正向WS服务器监听端口
        port: 6700
    }
    // 反向WS设置
    ws_reverse_servers: [
        // 可以添加多个反向WS推送
        {
            // 是否启用该推送
            enabled: false
            // 反向WS Universal 地址
            // 注意 设置了此项地址后下面两项将会被忽略
            // 留空请使用 ""
            reverse_url: ws://you_websocket_universal.server
            // 反向WS API 地址
            reverse_api_url: ws://you_websocket_api.server
            // 反向WS Event 地址
            reverse_event_url: ws://you_websocket_event.server
            // 重连间隔 单位毫秒
            reverse_reconnect_interval: 3000
        }
    ]
    // 上报数据类型
    // 可选: string array
    post_message_format: string
    // 是否使用服务器下发的新地址进行重连
    // 注意, 此设置可能导致在海外服务器上连接情况更差
    use_sso_address: false
    // 是否启用 DEBUG
    debug: false
    // 日志等级
    log_level: ""
    // WebUi 设置
    web_ui: {
        // 是否启用 WebUi
        enabled: false
        // 监听地址
        host: 127.0.0.1
        // 监听端口
        web_ui_port: 9999
        // 是否接收来自web的输入
        web_input: false
    }
}

上面文件内容保存到~/QQBOT/config.json文件中

启动

# 普通启动(调试用)
./go-cqhttp

# 后台启动(稳定后用)
nohup ~/QQBOT/go-cqhttp &

这里启动后可能遇到情况说是配置文件有问题,它又给生成了一个配置文件,那我们直接编辑它自己生成的那个文件,将QQ等信息填写好,关掉ws协议的东西,保存新配置文件后再启动运行即可。

滑块验证

不出意外就会开始登录验证

: 登录需要滑条验证码, 请选择解决方案:  
[2021-01-31 19:33:30] [WARNING]: 1. 自行抓包. (推荐) 
[2021-01-31 19:33:30] [WARNING]: 2. 使用Cef自动处理. 
[2021-01-31 19:33:30] [WARNING]: 3. 不提交滑块并继续.(可能会导致上网环境异常错误) 
[2021-01-31 19:33:30] [WARNING]: 详细信息请参考文档 -> https://github.com/Mrs4s/go-cqhttp/blob/master/docs/slider.md <- 
[2021-01-31 19:33:30] [WARNING]: 请输入(1 - 3):  

这里我选择的是第一个方法,自行抓包,它会给你发一个url,你只需要在Chrome或火狐等浏览器打开这个URL,然后去把滑块划过去,然后抓包,找到Ticket即可。

手机验证

再然后就是手机验证,这个我相信应该难不住各位的。

 

控制台输出如下即登录成功:

[2021-02-13 10:45:58] [INFO]: 登录成功 欢迎使用: 祝东风 
[2021-02-13 10:45:59] [INFO]: 开始加载好友列表... 
[2021-02-13 10:46:00] [INFO]: 共加载 3 个好友. 
[2021-02-13 10:46:00] [INFO]: 开始加载群列表... 
[2021-02-13 10:46:00] [INFO]: 共加载 0 个群. 
[2021-02-13 10:46:00] [INFO]: 信息数据库初始化完成. 
[2021-02-13 10:46:00] [INFO]: 正在加载事件过滤器. 
[2021-02-13 10:46:00] [WARNING]: 事件过滤器启动失败: open filter.json: no such file or directory 
[2021-02-13 10:46:00] [INFO]: 正在加载silk编码器... 
[2021-02-13 10:46:00] [INFO]: 资源初始化完成, 开始处理信息. 
[2021-02-13 10:46:00] [INFO]: アトリは、高性能ですから! 
[2021-02-13 10:46:00] [INFO]: 正在检查更新. 
[2021-02-13 10:46:00] [INFO]: CQ HTTP 服务器已启动: 0.0.0.0:5700 
[2021-02-13 10:46:01] [INFO]: 当前有更新的 go-cqhttp 可供更新, 请前往 https://github.com/Mrs4s/go-cqhttp/releases 下载. 
[2021-02-13 10:46:01] [INFO]: 当前版本: v0.9.39 最新版本: v0.9.40 
[2021-02-13 10:46:01] [INFO]: 收到服务器地址更新通知, 根据配置文件已忽略. 

检查监听

下面我们来检查一下监听是否成功

telnet 你的服务器IP 5700

如果失败,应该是防火墙的问题,打开防火墙即可。

或者在外部,比如自己的电脑上用工具去扫这个公网服务器的主机的某端口,我是用netcat去扫了一些。

当然,如果你不计划外部主机访问端口那就不必对外开放5700。


消息与事件

API调用

API参考https://github.com/howmanybots/onebot/blob/master/v11/specs/api/public.md

注意:外部调用时一定是运行着./go-cqhttp。

发送消息

打开浏览器,去浏览http://服务器IP:5700/send_private_msg?user_id=你的某个好友的QQ号&message=要发送的信息

不出意外,会返回一串json

{"data":{"message_id":-1333288940},"retcode":0,"status":"ok"}

与此同时,你的那个好友应该也收到了对应的消息,恭喜!搭建成功!

获取所有好友信息

http://服务器IP:5700/get_friend_list

其他的接口请参考上面的API文档,大多数读数据接口(get打头)的速度很快,一些写数据接口(set打头)会失败,比如点赞、退出群啥的,其他感兴趣的可自行测试

 

事件上报

什么是事件?比如一个好友给你发消息,这就是一个消息事件,上面实现了给别人发消息,那么别人给自己发消息自己怎么知道?答案就是事件上报,机器人会将收到的消息异步发送到指定的端口,由业务逻辑系统处理

比如好友给我发一个:听《一千年以后》,机器人将消息事件上报给指定端口,该端口在监听的系统去网易云搜索《一千年以后》的资源,然后调用上面的发消息API发送给好友,这样才能形成一个业务闭环。

一个事件必须的字段比如事件发生时间、事件类型、事件内容等等,可以参考OneBot 标准:https://github.com/howmanybots/onebot/blob/master/v11/specs/event/README.md

有哪些事件呢?下面列举了所有的事件类型

  • 消息事件
  • 通知事件
  • 请求事件
  • 元事件

来看一下我们最关心的消息事件

  • 私聊消息
  • 群消息

和通知事件

  • 群文件上传
  • 群管理员变动
  • 群成员减少
  • 群成员增加
  • 群禁言
  • 好友添加
  • 群消息撤回
  • 好友消息撤回
  • 群内戳一戳
  • 群红包运气王
  • 群成员荣誉变更

群成员增加可以干什么?自己细想吧,这里不过多透露了

配置业务系统

此处简单起见,用nc(netcat)监听一个端口接收来自机器人的事件上报,在5701端口起一个监听,如果你是公网服务器,记得防火墙处理好。

[root@linux-centos-7 ~]# nc -lvvp 5701
Listening on any address 5701

CentOS7安装netcat,非常简单:

1、下载安装

wget https://sourceforge.NET/projects/netcat/files/netcat/0.7.1/netcat-0.7.1.tar.gz/download
tar -zxvf netcat-0.7.1.tar.gz -C /usr/local
cd /usr/local
mv netcat-0.7.1 netcat
cd /usr/local/netcat
./configure
make && make install

2、配置

vim /etc/profile

添加以下内容:

# set  netcat path
export NETCAT_HOME=/usr/local/netcat
export PATH=$PATH:$NETCAT_HOME/bin

保存,退出,并使配置生效:

source /etc/profile

3、测试

nc -help成功

 

修改上报配置

// HTTP设置
http_config: {
    // 是否启用正向HTTP服务器
    enabled: true
    // 服务端监听地址
    host: 0.0.0.0
    // 服务端监听端口
    port: 5700
    // 反向HTTP超时时间, 单位秒
    // 最小值为5,小于5将会忽略本项设置
    timeout: 0
    // 反向HTTP POST地址列表
    // 格式: 
    // {
    //    地址: secret
    // }
    post_urls: 
    {
        //我的局域网地址10.0.0.3
        "10.0.0.3:5701":secret
    }
}

重新启动机器人,会发现nc监听的5701端口有如下输出,说明上报配置生效

 [root@VM-0-3-centos ~]# nc -lvvp 5701
Listening on any address 5701
Connection from 127.0.0.1:34412
POST / HTTP/1.1
Host: 127.0.0.1:5701
User-Agent: CQHttp/4.15.0
Content-Length: 385
Content-Type: application/json
X-Self-Id: 马赛克
X-Signature: sha1=d306216b332c953f706fb534252091a22c128a90
Accept-Encoding: gzip

{"interval":5000,"meta_event_type":"heartbeat","post_type":"meta_event","self_id":马赛克,
"status":{"app_enabled":true,"app_good":true,"app_initialized":true,"good":true,"online":true,
"plugins_good":null,"stat":{"packet_received":15,"packet_sent":10,"packet_lost":0,
"message_received":0,"message_sent":0,"disconnect_times":0,"lost_times":0,"last_message_time":0}},
"time":1613726465}Total received bytes: 612
Total sent bytes: 0

OK,可以看到,我们的QQ的上报消息就会全部发送到该端口了。

 


程序开发

上面只是介绍了基本的规则,要实现QQ机器人的闭环系统,还需要我们用程序去操作。

为了方便起见,下面,我们就用python代码来一步步实现一个真正的QQ机器人。

 

注意:项目所创建的每一个python文件,请在第一行声明编码:

# coding=utf-8

基本准备

我这里选用了我的老朋友——python,一个是因为方便,另一个是因为我参考的文章就是用的python,该文章很好地启发了我基本的思路。

我用vscode写python,python的版本是3.9了。

废话不多说,我们开始吧。

基本框架

启动文件与目录结构

首先我们大致的目录结构是

  • handle目录(__init__.py、main_handle.py、msg_handle.py、message_handle.py、notice_handle.py、request_handle.py)
  • socket_operate目录(__init__.py、client.py、server.py)
  • main.py

目录结构可能并不规范,毕竟我没咋写过python,大神们见笑啦

大致说一下就是,main文件是我们的启动文件,handle目录下是我们对传来信息的解析与处理,socket_operate目录下则是我们操作socket来接收数据或者发送数据、发送请求等。

#main.py
import  socket_operate.server
import  handle.main_handle

def main():
    while 1:
        #使用try、except语句保证程序不会因部分错误退出。
        all_message = socket_operate.server.rev_msg()
        try:
            handle.main_handle.main_handle(all_message)
        except:
            continue
    
if (__name__ == "__main__"):
    main()  

上面就是主函数

这里需要注意,如上图所示,每个目录中需要你手动创建一个__init__.py,什么内容也不用写,这个文件可以标识这个目录为“模块”

#__init__.py
#空文件,只是为了handle目录成为一个模块,python的奇妙规则~

监听

主函数中我们可以看到我们利用socket_operate的server的rev_msg方法来监听了数据

#serverpy
#在5701端口的角度上,我们是接收消息的服务端

import socket
import json

##定义socket类型,网络通信,TCP
ListenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#套接字绑定的IP与端口
ListenSocket.bind(('10.0.0.3', 5701))
#开始TCP监听
ListenSocket.listen(100)  

#定义一个http响应头
HttpResponseHeader = '''HTTP/1.1 200 OK
Content-Type: text/html
'''
 
#转json定位有效信息
def json_to_info(msg):
    for i in range(len(msg)):
        if msg[i]=="{" and msg[-1]=="}":
            #解码 JSON 数据。返回 Python 字段的数据类型
            return json.loads(msg[i:])
    return None
 
#需要循环执行,返回值为json格式
def rev_msg():
    #接受TCP连接,并返回新的套接字与IP地址
    conn, addr = ListenSocket.accept()
    #接收数据解码(接收到的是string形式的json)
    data = conn.recv(1024).decode(encoding='utf-8')

    #json格式转dict格式
    rev_dict=json_to_info(data)

    conn.sendall((HttpResponseHeader).encode(encoding='utf-8'))
    conn.close()
    return rev_dict

监听数据后,我们调用了handle的mainhandle来进行总处理。

监听地址一定要与go-cq中上报地址一致,最好是局域网地址!!!

监听地址一定要与go-cq中上报地址一致,最好是局域网地址!!!

监听地址一定要与go-cq中上报地址一致,最好是局域网地址!!!

处理

首先,我们单独出来一个文件来处理对数据的解析,即将字典数据中的key对应的value拿出来

#msg_handle.py
#该文件主要是对主文件中得到的字典中得到关键数据的方法

#获取上报类型:message、notice、request
def get_post_type(msg):
    return msg['post_type']

#获取信息类型 群聊/私聊 group/private
def get_message_type(msg):
    return msg['message_type']

#获取群号/私聊qq号
def get_number(msg):
    if get_message_type(msg) == 'group':
        return msg['group_id']
    elif get_message_type(msg) == 'private':
        return msg['user_id']
    else:
        print('出错啦!找不到群号/QQ号')
        exit()

# 获取信息发送者的QQ号
def get_user_id(msg):
    return msg['user_id']

#获取发送的信息
def get_raw_message(msg):
    return msg['raw_message']

这里我们需要注意了解一些,通过官网可知,go-cqhttp将事件分成了三种类型:消息事件、通知事件、请求事件。

故我们在main_handle中进行事件的分发(路由思想)

#main_handle.py
#该文件主要是对主文件中得到的接收消息进行处理的方法

import handle.msg_handle
    
import handle.message_handle
import handle.notice_handle
import handle.request_handle


#事件类型出现异常
def default():
    print("事件类型异常")
    return 0

#将不同的事件分发给不同的情况
def main_handle(msg):
    post_type = handle.msg_handle.get_post_type(msg)         # 获取上报类型
    if post_type=='message':  #消息事件
        handle.message_handle.message_handle(msg)
    elif post_type=='notice':   #通知事件
        handle.notice_handle.notice_handle(msg)
    elif post_type=='request':   #请求事件
        handle.request_handle.request_handle(msg)
    else:
        default()
    return 0

这下你知道了吧,那三个文件——message_handle.py、notice_handle.py、request_handle.py,其实就是进行不同的分发处理而已。

#notice_handle.py
#对于 通知事件 的处理

def notice_handle(msg):
    print("处理通知事件")
    return 
#request_handle.py
#对于 请求事件 的处理
def request_handle(msg):
    print("处理请求事件")
    return 0
#message_handle.py
#对于 消息事件 的处理
import socket_operate.client

def message_handle(msg):
    print("处理消息事件")
    return 0

 

发送消息

最后,我们在client.py中定义发送的方法,我们将发送数据封装到一个字典中作为参数传下去来发送

#client.py
#在5700端口的角度上,我们是发送消息的客户端
import socket

#通过发送http请求让傀儡QQ发送消息
#在参数resp_dict中,我们要指定好类型、回复信息、账号(群号)
def send_msg(resp_dict):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ip='10.0.0.3'
    client.connect((ip,5700))

    msg_type=resp_dict['msg_type']  #回复类型(群聊/私聊)
    number=resp_dict['number']    #回复账号(群号/好友号)
    msg=resp_dict['msg']      #要回复的消息

    #将字符中的特殊字符进行url编码
    msg=msg.replace(" ", "%20")
    msg=msg.replace("\n","%0a")  

    if msg_type == 'group':
        payload = "GET /send_group_msg?group_id=" + str(number) + "&message=" + msg + " HTTP/1.1\r\nHost:"+ip+":5700\r\nConnection: close\r\n\r\n"
    elif msg_type == 'private':
        payload = "GET /send_private_msg?user_id=" + str(number) + "&message=" + msg + " HTTP/1.1\r\nHost:"+ip+":5700\r\nConnection: close\r\n\r\n"
    print("发送"+payload)
    client.send(payload.encode("utf-8"))
    client.close()

    return 0

一定要对特殊字符进行url转码,否则发送失败!!!

中途测试

下面我们来做个小测试,修改message_handle.py

#message_handle.py
#对于 消息事件 的处理
import socket_operate.client

def message_handle(msg):
    msg_dict={
        "msg_type":"group",
        "number":"123123123",
        "msg":"有趣!"
    }
    socket_operate.client.send_msg(msg_dict)
    return 0

首先,我们启动一个服务端(充当go-cqhttp监听5700的角色),我这里还是使用了netcat:

nc -lvvp 5700

然后,我们找一个可以发送POST请求的方法,我这里用的Chrome浏览器的POSTMan,蛮好用的。

下面我们启动我们的python项目:

py   main.py

然后,使用我们的POSTman,向http://127.0.0.1:5701发送一个POST请求,发送的是application/json类型的json格式数据,如:

{
	"username":"luoluo",
	"password":"123123123123123",
	"post_type":"message",
	"test":{
		"one":"111",
		"two":"222"
	}
}

上面这一片json,唯一有用的就是post_type

然后我们的message_handle.py的message_handle就应该被触发了,加上我们故意设计的字典,最后在cmd窗口可以看到nc显示出:

C:\Users\lenovo>nc -lvvp 5700
listening on [any] 5700 ...
connect to [127.0.0.1] from www.xmind.com [127.0.0.1] 51182
GET /send_group_msg?group_id=123123123&message=有趣! HTTP/1.1
Host: 127.0.0.1:5700
Connection: close

 sent 0, rcvd 110

消息事件

消息事件处理

我们先来处理最常用的最简单的消息事件

由于我们优秀的路由分发,我们直接在message_handle中进行编程即可。

首先将message_handle再分发一下消息——私聊/群发:

#对于 消息事件 的处理
from socket_operate.client  import send_msg
from handle.msg_handle import get_message_type

#私聊消息处理
def private_msg_handle(msg):
    return 



#群聊消息处理
def group_msg_handle(msg):
    return 



def message_handle(msg):
    if get_message_type(msg)=='private':
        private_msg_handle(msg)
    elif get_message_type(msg)=='group':
        group_msg_handle(msg)
    return 0

再细分一下:

#对于 消息事件 的处理
from socket_operate.client  import send_msg
from handle.msg_handle import get_message_type,get_number

#---------------------------------------------------------------
#私聊信息的普通检测
def private_msg_general_detection(message):
    reply=''
    return reply

#私聊信息的数据库检测(通过数据库来找对应的回复)
def private_msg_db_detection(message):
    reply=''
    return reply

#私聊信息的错误处理
def private_msg_error(message):
    reply=''
    return reply

#私聊消息处理
def private_msg_handle(message):
    msg_dict={ 
        "msg_type":"private", 
        "number":get_number(message), 
        "msg":"我听不懂" 
    }
    general_detect=private_msg_general_detection(message)
    db_detect=private_msg_db_detection(message)
    if general_detect!='':
        msg_dict["msg"]=general_detect
    elif db_detect!='':
        msg_dict["msg"]=db_detect
    else:
        msg_dict["msg"]=private_msg_error(message)
    send_msg(msg_dict)  
    return 

#---------------------------------------------------------------
#群聊信息的普通检测
def group_msg_general_detection(message):
    reply=''
    return reply

#私聊信息的数据库检测(通过数据库来找对应的回复)
def group_msg_db_detection(message):
    reply=''
    return reply

#群聊信息的错误处理
def group_msg_error(message):
    reply=''
    return reply

#群聊消息处理
def group_msg_handle(message):
    msg_dict={ 
        "msg_type":"group", 
        "number":get_number(message), 
        "msg":"我听不懂" 
    }
    #检测
    general_detect=group_msg_general_detection(message)
    db_detect=group_msg_db_detection(message)

    if general_detect!='':
        msg_dict["msg"]=general_detect
    elif db_detect!='':
        msg_dict["msg"]=db_detect
    else:
        msg_dict["msg"]=group_msg_error(message)
    send_msg(msg_dict)  
    return  

#---------------------------------------------------------------

def message_handle(message):
    if get_message_type(message)=='private':
        private_msg_handle(message)
    elif get_message_type(message)=='group':
        group_msg_handle(message)
    return 0

OK,这样我们就实现了消息的特殊处理

数据库与学习系统

下面,我们将目光转一下,先放下这个message_handle,我们去新建一个目录——special_function,这个目录中,我们希望可以建立起来QQ机器人的学习系统,即通过服务器本地的数据库来自动回复,同时,我们也可以在QQ上通过特殊的消息教给QQ机器人怎么回复。

磨刀不误砍柴工,我们再新建一个目录mysql,这个模块来写访问数据库的操作。

新建的两个目录先新建__init__.py,声明为模块。

数据库这边我用的pymysql库,以前写过对应的文章,有兴趣可以自行查阅,这里直接贴代码了

# mysql/operate.py
import pymysql

host='127.0.0.1'
port=3306
user='root'
password='964939451'
database='qqrobot'


#增加信息:往数据库中增加quest和reply
def addInfo(quest,reply):
    #连接数据库
    db=pymysql.connect(host=host,port=port,user=user,password=password,database=database)
    #创建一个游标
    cursor=db.cursor()

    #sql语句
    sql="insert into words(id,quest,reply) VALUES(NULL,%s,%s)"
    db.begin()       #开启事务
    try:
        #执行sql语句
        cursor.execute(sql,[quest,reply])
        #提交事务
        db.commit()
    except pymysql.Error as e:
        print(e)
        db.rollback()   #报错,则回滚
        return False
    db.close()
    return True

#查找信息,通过quest,查找reply
def findInfo(quest):
    #连接数据库
    db=pymysql.connect(host=host,port=port,user=user,password=password,database=database)
    #创建一个游标
    cursor=db.cursor()

    #sql语句
    sql="select * from words WHERE quest=%s"
    db.begin() #开启事务
    #接收查找结果
    res=None
    try:
        #执行sql语句 (第二个参数即把占位符解释)
        cursor.execute(sql,quest)
        res=cursor.fetchall()
        db.commit() #提交事务
    except pymysql.Error as e:
        print(e)
        db.rollback() #报错,则回滚
    db.close()
    #将查询结果(元组)返回
    return res

这个文件中定义的两个方法就可以让我们来访问数据库了,下面我们先在本机来建立数据库:

CREATE DATABASE qqrobot;
USE qqrobot;
CREATE TABLE words(
	id INT PRIMARY KEY AUTO_INCREMENT,
	quest VARCHAR(30),
	reply VARCHAR(30)
);

下面我们来做special_function目录。

新建一个study.py文件,我们来实现学习和查找学习成果。

from mysql.operate import findInfo,addInfo
import random

#学习消息
def study_info(quest,reply):
    quest=quest.encode('utf-8')
    reply=reply.encode('utf-8')
    if len(quest)<90 and len(reply)<90:
        if addInfo(quest,reply):
            return True
    else:
        return False

#返回消息
def get_reply(quest):
    res=None
    #长度小于90才有必要查找
    if len(quest)<90:
        res=findInfo(quest)
    else:
        print("超过90的字符串不会存储在数据库中的")        
    #没有找到结果
    if not res:
        return ''
    #有结果(随机返回结果)
    index=random.randint(0,len(res)-1)
    return res[index][2]

回到我们的message_handle.py,新内容;

#对于 消息事件 的处理
from socket_operate.client  import send_msg
from handle.msg_handle import get_message_type,get_number,get_raw_message,get_user_id
from special_function.study import get_reply,random,study_info

from special_function.logging_tool import logging_put

#---------------------------------------------------------------
#私聊信息的普通检测
def private_msg_general_detection(message):
    reply=''
    #得到特殊指令
    command=get_raw_message(message).split(' ')[0]
    if command=="#学习" and len(get_raw_message(message).split(' '))==3:
        new_quest=get_raw_message(message).split(' ')[1]
        new_reply=get_raw_message(message).split(' ')[2]
        if study_info(new_quest,new_reply):
            logging_put("用户"+str(get_user_id(message))+"教我新内容:"+new_quest+"--"+new_reply)
            reply='学习成功,收获新知识'
        else:
            reply='学习失败'
    return reply

#私聊信息的数据库检测(通过数据库来找对应的回复)
def private_msg_db_detection(message):
    reply=''
    reply=get_reply(get_raw_message(message))
    return reply

#私聊信息的错误处理
def private_msg_error():
    rand = random.randint(1,4)
    msg="超出我的知识上限"
    if rand == 1:
        msg = "消息太复杂"
    elif rand == 2:
        msg = "江湖险恶,我不会答"
    elif rand == 3:
        msg = "啊?"
    elif rand == 4:
        msg = "听不懂~"
    return msg

#私聊消息处理
def private_msg_handle(message):
    msg_dict={ 
        "msg_type":"private", 
        "number":get_number(message), 
        "msg":"我听不懂" 
    }
    #检测
    general_detect=private_msg_general_detection(message)
    db_detect=private_msg_db_detection(message)

    if general_detect!='':
        msg_dict["msg"]=general_detect
    elif db_detect!='':
        msg_dict["msg"]=db_detect
    else:
        logging_put("私聊中用户请求没有对应的信息:"+get_user_id(message)+"-"+get_raw_message(message))
        msg_dict["msg"]=private_msg_error()
    print(msg_dict)
    send_msg(msg_dict)  
    return 

#---------------------------------------------------------------
#群聊信息的普通检测
def group_msg_general_detection(message):
    reply=''
    return reply

#群聊信息的数据库检测(通过数据库来找对应的回复)
def group_msg_db_detection(message):
    reply=''
    reply=get_reply(get_raw_message(message))
    return reply

#群聊信息的错误处理
def group_msg_error():
    rand = random.randint(1,5)
    msg="超出我的知识上限"
    if rand == 1:
        msg = "消息太复杂"
    elif rand == 5:
        msg = "群里有人知道怎么回答ta吗?"
    elif rand == 2:
        msg = "江湖险恶,我不会答"
    elif rand == 3:
        msg = "啊?"
    elif rand == 4:
        msg = "听不懂~"
    return msg

#群聊消息处理
def group_msg_handle(message):
    msg_dict={ 
        "msg_type":"group", 
        "number":get_number(message), 
        "msg":"我听不懂" 
    }
    #检测
    general_detect=group_msg_general_detection(message)
    db_detect=group_msg_db_detection(message)

    if general_detect!='':
        msg_dict["msg"]=general_detect
    elif db_detect!='':
        msg_dict["msg"]=db_detect
    else:
        logging_put("群聊中用户请求没有对应的信息:"+str(get_user_id(message))+"-"+get_raw_message(message))
        msg_dict["msg"]=group_msg_error()
    send_msg(msg_dict)  
    return  

#---------------------------------------------------------------

def message_handle(message):
    if get_message_type(message)=='private':
        private_msg_handle(message)
    elif get_message_type(message)=='group':
        group_msg_handle(message)
    return 0

这样就可以实现QQ机器人的自我学习和自动回复。

上面用到的logging,是一个日志库,很简单

#logging_tool.py
import logging


def logging_put(info):
    logging.basicConfig(
        filename='robot.log',
        level=logging.INFO,
        format='%(asctime)s:%(levelname)s:%(message)s'
    )
    logging.info(info)

 

 


篇幅问题,我们这一篇文章的内容就到此为止了,目前为止都是非常成功的,希望你也坚持下来了,加油

 

 

 

 

 

 

 

商业转载 请联系作者获得授权,非商业转载 请标明出处,谢谢

是的,我就是计算机界的枭雄!