起因

  • 故事的起因是这样的,笨蛋楠楠不好好学习,每天到家就躺床上玩手机。需要我每天督促她。于是乎就想看一下QQ机器人相关技术,然后做个定时任务就好了,毕竟懒才是第一生产力,如果让我每天手动发的话,长此以往会累死的。

初步调研

go-cqhttp

Mrs4s commented on Oct 10, 2023
由于QQ官方针对协议库的围追堵截 持续👊🐔 , 不断更新加密方案, 我们已无力继续维护此项目.
在未来 sign-server 方案彻底被官方封死之后 go-cqhttp 将无法继续使用.
同时NTQQ的出现让我们可以使用官方 完美 实现的协议实现来继续开发Bot, 不再担心由于协议实现不完美而导致被识别.
我们建议所有QQBot项目开始做好迁移至无头NTQQ或类似基于官方客户端技术的准备以应对未来的彻底封锁,
如果你的 go-cqhttp 还能继续使用, 不建议立即迁移, 但请开始阅读相关文档并做好迁移准备

推荐项目:

如果你想在电脑/服务器上部署bot -> https://chronocat.vercel.app/blog/0050 如果你想在Android 手机/模拟器上部署bot -> https://github.com/linxinrao/Shamrock 以上项目均为调用官方协议实现

以上项目均被请喝茶了,只能说有缘再见了.

相关问题可以在这个issue下讨论

协议库的时代已经过去, 接下来是Hook官方客户端的时代了, 感谢大家三年来的支持

其实go-cqhttp项目最初只是想做一个能在路由器上跑的酷Q

——————————————————————
什么是无头NTQQ?

众所周知, QQ官方最新推出的 NTQQ 客户端使用了 electron 技术, 该技术可以非常方便的跨平台同时使用前端已有的技术栈进行客户端开发.
NTQQ 客户端项目分为前后端两个部分, 前端是使用 Web 技术开发的 UI 界面供用户交互,后端使用 nodejs addons 技术包装了一个库来处理客户端逻辑和与服务端通信 (wrapper.node).
这个库的作用和 go-cqhttp 非常相似, 所以我们完全可以将前端删除只与这个库交互, 并引出 API 来为我们的Bot服务.
从服务端视角来说我们的 Bot 和正常客户端一样, 因为都是通过 wrapper.node 与服务端通信. 并且由于是官方根据内部文档开发的模块, 我们可以说这是一个 完美 的 go-cqhttp.

优点: 无头模式下相对低的占用.
缺点: 可能会受未来QQ更新的影响.

Shamrock项目是什么原理?

Shamrock 项目使用 xposed 的 hook 技术来实现远程操作 AndroidQQ 客户端.
优点: 不容易受未来更新封堵的影响.
缺点: 需要运行一个完整 AndroidOS 环境.

如果你的服务器资源足够充足, 我个人建议观望并跟进 Shamrock 项目. xposed 是久经考验且生态完善的技术.

mirai

  • mirai 是一个在全平台下运行,提供 QQ Android 协议支持的高效率机器人库。
  • 乍一看好像还没凉,遂尝试了一番,经过磕磕绊绊之后,虽说可以收发消息,但是相当不稳定,而且还有封号的风险
  • 一看论坛,也是风中残烛了,遂放弃。

R.I.P

  • 新时代已经没有能承载他们的船了

LLOneBot

  • 由于在go-cqhttp中,作者提到了QQ最新退出的NTQQ客户端,而我很久之前就已经装过LiteLoaderQQNT这个插件了。那么肯定有基于这个新客户端开发的机器人技术吧,然后就找到了这个项目,真是相见恨晚啊,前面两个旧时代的残党我花了一个晚上捋清了来龙去脉,浪费了宝贵的8小时打游戏时间,下次我要更快更强。
  • 装好插件配置好后,我们仅仅需要修改这几个地方

收发消息

  • 这个是LLOneBot给出的收发消息的示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import uvicorn
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/")
async def root(request: Request):
data = await request.json() # 获取事件数据
print(data)
return {}

if __name__ == "__main__":
uvicorn.run(app, port=8080)
  • 运行这个 Python 代码后,会在本地 8080 端口启动一个 HTTP 服务
  • 当有事件发生时,LLOneBot 会向 http://localhost:8080/ 发送 POST JSON 请求,具体事件数据可以查看 事件
1
2
3
4
5
6
7
8
9
10
11
import requests

requests.post('http://localhost:3000/send_private_msg', json={
'user_id': 123456,
'message': [{
'type': 'text',
'data': {
'text': 'Hello, World!'
}
}]
})
  • 其中 send_private_msg 是 OneBot V11 的 发送私聊消息 API,具体 API 可以查看 API 文档
  • user_id 是 QQ 号,message 是消息内容
  • 这里以文本消息格式为例,type 表示消息类型,type: text 表示文本消息,data 是消息内容,text 表示文本内容
  • 更多的消息内容的格式可以查看 消息类型
  • 有了最最基础的收发消息的能力了,剩下的就好说了,接收楠楠的消息,判断其意图,如果是已经下班到家,那么向延迟队列中插入一条提醒学习的任务,2小时后触发即可。对于意图判断,可以接入大模型来实现,同时大模型也会给出更温馨的回答(岂可修,那楠楠到底是会更喜欢AI一点还是更喜欢我一点)。

接入大模型

  • 既然已经可以收发消息了,那么剩下的就是接入大模型API,温馨提醒一下楠楠该学习啦。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import uvicorn
import logging
import requests
import json

import yaml
from fastapi import FastAPI, Request
from openai import OpenAI

from models import Msg

from logging_config import setup_logging

setup_logging()

logger = logging.getLogger(__name__)


def load_config():
with open(r"config.yaml", "r", encoding="utf-8") as file:
return yaml.safe_load(file)


def init_llm(api_key):
client = OpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
return client


app = FastAPI()

config = load_config()
llm = init_llm(api_key=config['openai']['api_key'])


@app.post("/")
async def root(request: Request):
data = await request.json()
try:
msg_obj = Msg.parse_obj(data)
qq = msg_obj.user_id
logger.info(f"QQ:{qq},Nickname -- {msg_obj.sender.nickname}{msg_obj.raw_message}")
if qq == config['girl_friend']['qq']:
logger.info("Received a message from girl friend")
answer = llm_answer(msg_obj.raw_message, llm, config['girl_friend']['system_prompt'])
send_message(qq, answer)

except Exception:
logger.error(f"data parse error:{data}")


def llm_answer(question, client, prompt):
completion = client.chat.completions.create(
model="qwen-max",
messages=[
{
'role': 'system',
'content': prompt
},
{'role': 'user', 'content': '楠楠对你说:' + question}
]
)
content = completion.choices[0].message.content
logging.info(f'LLM Response:{content}')
return content


def send_message(qq, message):
url = config['send_msg_url']
data = {
"user_id": qq,
"message": [
{
"type": "text",
"data": {
"text": message
}
}
]
}
response = requests.post(url, data=json.dumps(data))
logger.info(f"send message to {qq} result:{response.text}")


if __name__ == "__main__":
uvicorn.run(app, port=8080)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from typing import List, Dict

from pydantic import BaseModel


class Sender(BaseModel):
user_id: int
nickname: str
card: str


class MessageContent(BaseModel):
type: str
data: Dict[str, str]


class Msg(BaseModel):
self_id: int
user_id: int
time: int
message_id: int
real_id: int
message_seq: int
message_type: str
sender: Sender
raw_message: str
font: int
sub_type: str
message: List[MessageContent]
message_format: str
post_type: str

1
2
3
4
5
6
7
8
9
10
11
12
13
import logging

def setup_logging():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[
logging.FileHandler("app.log", encoding="utf-8"),
logging.StreamHandler(),
],
)

1
2
3
4
5
6
7
8
send_msg_url: http://localhost:3000/send_private_msg        # 发送消息的接口
openai:
api_key: YOUR_API_KEY # 我这里接入的是阿里系的通义千问,提供对应的API_KEY即可

girl_friend:
name: 楠楠 # 设置称呼,可以让AI以这个称呼她
qq: XXX # 设置QQ号
system_prompt: YOUR_SYSTEM_PROMPT # 为智能体初始化的prompt

初步效果

延迟队列与 Function Call 实现定时提醒

  • 目标:期望的效果是,对它说一句话,例如“半小时后提醒我去洗澡”,希望能理解语义,并且在对应的时间后向目标发送消息。
  • 那么在xx时间后向特定目标发送这个消息,可以用延迟队列来实现,等下我手写一个延迟队列出来就好了。
  • 难点主要是在于,如何让大模型理解我的语意,来调用添加延迟队列任务的方法,这个可以用function call来实现。

手写一个延迟队列

  1. 延迟队列的概念

    • 延迟队列是一种特殊的队列,它将任务按指定的延迟时间存储,任务在指定时间到达后才会被执行。例如,你可以设置一个任务,让它在一分钟后执行,而在这段时间内,它会处于等待状态。延迟队列常用于需要定时执行的任务,比如定时提醒、定时清理等。
  2. 底层数据结构

    • 在实现延迟队列时,关键是如何高效地管理和调度任务。可以使用多种数据结构,常见的有:
      • 栈:不适合延迟队列,栈是后进先出,难以实现按时间顺序执行。
      • 队列:队列适合按顺序执行任务,但如果任务的执行时间不同,队列无法高效地处理优先级。
      • 堆:最合适的选择。堆,特别是最小堆,可以保证延迟时间最短的任务排在队列的最前面。
  3. 为什么选择堆?

    • 堆是一个完全二叉树,具有以下优点:
      1. 最小堆:每个父节点的值都不大于其子节点的值。最小堆根节点存储的是当前队列中延迟时间最短的任务。
      2. 操作效率:插入和删除操作时间复杂度为O(log n),因此堆非常适合用来高效地管理延迟队列。
  4. 如何实现?

    • 每个任务都会被分配一个延迟时间(即任务的执行时间)。
    • 使用最小堆来存储这些任务,任务按延迟时间从小到大排列。
    • 任务的执行顺序是从堆顶开始的,即最短延迟时间的任务最先执行。
    • 示例:假设有三个任务,它们的延迟时间分别是 10 分钟、1 分钟和5 分钟。最小堆会保证 1 分钟的任务在最前面,接着是 5 分钟的任务,最后是 10 分钟的任务。
  5. 工作原理

    • 任务加入队列:
      • 每次添加任务时,我们将任务与其延迟时间一同插入堆中。堆会自动将任务按延迟时间排序,使得最短延迟的任务始终位于堆顶。
    • 任务执行:
      • 使用一个循环持续监听堆顶任务的延迟时间。
      • 每次循环检查堆顶任务的延迟时间是否到期(即当前时间是否超过任务的执行时间)。
      • 如果任务未到期,程序就会休眠一段时间,再次检查。如果任务已经到期,则从堆中取出任务并执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class DelayQueue:

def __init__(self):
self.queue = []
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)

def add_delay_task(self, delay_msg, delay_time):
execute_time = time.time() + delay_time
with self.condition:
heapq.heappush(self.queue, (execute_time, delay_msg))
self.condition.notify()
logger.info(f'添加延迟队列任务: {delay_msg}, 将于 {delay_time} 秒后执行.')

def get_task(self):
with self.condition:
while True:
if self.queue:
execute_time, task = self.queue[0]
current_time = time.time()

if current_time >= execute_time:
heapq.heappop(self.queue)
return task
else:
self.condition.wait(timeout=execute_time - current_time)
else:
self.condition.wait()

def start_worker(self):
def worker():
while True:
try:
task = self.get_task()
logger.info(
f'Executing task: {task} at {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')
answer = llm_answer_delay_msg(task, [], llm, config['girl_friend']['system_prompt'])
send_message(config['girl_friend']['qq'], answer, config['send_msg_url'])
except Exception as e:
logger.error(f"Worker encountered an error: {e}", exc_info=True)

threading.Thread(target=worker, daemon=True, name="DelayQueueWorker").start()

Function Call

  • Function Call 是一种机制,通过大语言模型来决定是否触发某些预定义的功能。与传统的 API 调用不同,Function Call 允许模型根据上下文和用户输入动态判断是否需要执行某些操作。
  • 在本次应用中,Function Call 主要用于根据用户的自然语言输入,决定是否需要将任务添加到延迟队列中,并指定延迟的时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function_definitions = [
{
"name": "add_delay_task",
"description": "将任务添加到延迟队列。",
"parameters": {
"type": "object",
"properties": {
"delay_msg": {
"type": "string",
"description": "延迟任务的消息内容"
},
"delay_time": {
"type": "integer",
"description": "延迟时间,单位是秒"
}
},
"required": ["delay_msg", "delay_time"]
}
}
]
  • 在这个示例中,我们定义了一个名为 add_delay_task 的函数,表示将任务添加到延迟队列。该函数有两个参数:
    • delay_msg:任务的消息内容,类型为字符串。
    • delay_time:延迟的时间,单位为秒,类型为整数。
  1. 调用大语言模型进行消息处理

    • 我们将利用大语言模型来处理用户的输入消息,并判断是否需要调用 add_delay_task 函数。模型根据自然语言的上下文来决定是否调用延迟任务功能。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    response = LLM_CLIENT.chat.completions.create(
    model="qwen-plus",
    messages=[
    {
    "role": "system",
    "content": "你是一位智能助手,帮助解析自然语言任务并添加到延迟队列,仅仅当明确出现时间并且需要判断是否有需要提醒的意愿时,才需要调用。例如:‘十分钟后提醒我去洗澡’。"
    },
    {
    "role": "user",
    "content": f"女朋友说:{msg}"
    }
    ],
    functions=function_definitions,
    function_call="auto" # 让模型决定是否调用功能
    )
    • 其中function_call="auto" 让模型自动判断是否需要调用功能,而不需要我们手动指定。
  2. 处理模型响应结果

    • 当模型处理完用户消息后,它会返回一个 response 对象,其中包含了模型的回复以及是否需要调用某个函数。如果模型决定调用 add_delay_task 函数,它会在 response 中返回 function_call,包含了要调用的函数名和相应的参数。
    1
    2
    3
    if response.choices[0].message.function_call:
    func_name = response.choices[0].message.function_call.name
    arguments = json.loads(response.choices[0].message.function_call.arguments)
  3. 解析并调用函数

    1
    2
    if func_name == "add_delay_task":
    delay_queue.add_delay_task(**arguments)
  4. 整体示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    @router.post("/")
    async def root(request: Request):
    data = await request.json()
    try:
    msg_obj = Msg.parse_obj(data)
    qq = msg_obj.user_id
    logger.info(f"QQ:{qq},Nickname -- {msg_obj.sender.nickname}{msg_obj.raw_message}")
    if qq not in message_queue_cache:
    message_queue_cache[qq] = deque(maxlen=MAX_QUEUE_SIZE)
    if qq == config['girl_friend']['qq']:
    if qq not in message_queue_cache:
    message_queue_cache[qq] = deque(maxlen=MAX_QUEUE_SIZE)
    msg = msg_obj.raw_message
    message_queue_cache[qq].append({"type": "user", "message": msg})

    # 调用大模型解析消息
    response = LLM_CLIENT.chat.completions.create(
    model="qwen-plus",
    messages=[
    {
    "role": "system",
    "content": "你是一位智能助手,帮助解析自然语言任务并添加到延迟队列,仅仅当明确出现时间并且需要判断是否有需要提醒的意愿时,才需要调用。例如:‘十分钟后提醒我去洗澡’。"
    },
    {
    "role": "user",
    "content": f"女朋友说:{msg}"
    }
    ],
    functions=function_definitions,
    function_call="auto" # 让模型决定是否调用功能
    )

    # 如果模型调用了 Function
    if response.choices[0].message.function_call:
    func_name = response.choices[0].message.function_call.name
    arguments = json.loads(response.choices[0].message.function_call.arguments)

    if func_name == "add_delay_task":
    # 执行了 add_delay_task 方法
    delay_queue.add_delay_task(**arguments)
    # 生成一个立即回复
    answer = llm_received_delay_msg(msg, list(message_queue_cache[qq]), llm,
    config['girl_friend']['system_prompt'],
    config['girl_friend']['name'])
    message_queue_cache[qq].append({"type": "llm", "message": answer})
    send_message(qq, answer, config['send_msg_url'])
    return
    # 没有调用Function,生成普通回复
    answer = llm_answer(msg, list(message_queue_cache[qq]), llm,
    config['girl_friend']['system_prompt'],
    config['girl_friend']['name'])
    message_queue_cache[qq].append({"type": "llm", "message": answer})
    send_message(qq, answer, config['send_msg_url'])
    logger.info(f"Queue for {qq}: cache message:{list(message_queue_cache[qq])}")
    except Exception as e:
    logger.error(f"data parse error:{data}, error: {e}")
    logger.error(f"Stack trace: {traceback.format_exc()}")

结语

  • 既然都看到这里了,不妨给这个项目点个Star吧,多谢!