feat(*): make message processing sequential

This commit is contained in:
h
2026-05-04 22:30:40 +02:00
parent d8a5fde5f1
commit cfa592e85b
9 changed files with 413 additions and 197 deletions
+186 -151
View File
@@ -28,6 +28,7 @@ from bot.modules.ai import (
get_follow_ups,
stream_response,
)
from bot.user_lock import get_user_lock
from utils import env
from utils.convex import ConvexClient
@@ -256,15 +257,34 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
convex_chat_id: str,
images_base64: list[str] | None = None,
images_media_types: list[str] | None = None,
pending_generation_id: str | None = None,
) -> None:
user = await convex.query("users:getById", {"userId": convex_user_id})
if not user or not user.get("geminiApiKey"):
return
tg_chat_id = user["telegramChatId"].value if user.get("telegramChatId") else None
is_summarize = text == "/summarize"
if not is_summarize:
user_message_args: dict = {
"chatId": convex_chat_id,
"role": "user",
"content": text,
"source": "web",
}
if images_base64 and images_media_types:
user_message_args["imagesBase64"] = images_base64
user_message_args["imagesMediaTypes"] = images_media_types
await convex.mutation("messages:createFromBackend", user_message_args)
if pending_generation_id:
await convex.mutation(
"pendingGenerations:remove", {"id": pending_generation_id}
)
tg_chat_id = user["telegramChatId"].value if user.get("telegramChatId") else None
if tg_chat_id and not is_summarize:
if images_base64 and images_media_types:
if len(images_base64) == 1:
@@ -387,7 +407,7 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
await state.flush()
full_history = [*history, {"role": "assistant", "content": final_answer}]
follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite")
follow_up_model = user.get("followUpModel", "gemini-3.1-flash-lite-preview")
follow_up_prompt = user.get("followUpPrompt")
follow_up_agent = create_follow_up_agent(
api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt
@@ -485,174 +505,189 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915
model_name = user.get("model", "gemini-3-pro-preview")
convex_user_id = user["_id"]
proxy_config = get_proxy_config(chat_id)
proxy_state: ProxyStreamingState | None = None
async with get_user_lock(convex_user_id):
proxy_config = get_proxy_config(chat_id)
proxy_state: ProxyStreamingState | None = None
if proxy_config and not skip_proxy_user_message:
with contextlib.suppress(Exception):
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, f"👤 {text}"
if proxy_config and not skip_proxy_user_message:
with contextlib.suppress(Exception):
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, f"👤 {text}"
)
await increment_proxy_count(chat_id)
proxy_config = get_proxy_config(chat_id)
rag_connections = await convex.query(
"ragConnections:getActiveForUser", {"userId": convex_user_id}
)
rag_db_names: list[str] = []
if rag_connections:
for conn in rag_connections:
db = await convex.query(
"rag:getDatabaseById", {"ragDatabaseId": conn["ragDatabaseId"]}
)
if db:
rag_db_names.append(db["name"])
inject_connections = await convex.query(
"injectConnections:getActiveForUser", {"userId": convex_user_id}
)
inject_content = ""
if inject_connections:
for conn in inject_connections:
db = await convex.query(
"inject:getDatabaseById",
{"injectDatabaseId": conn["injectDatabaseId"]},
)
if db and db.get("content"):
inject_content += db["content"] + "\n\n"
inject_content = inject_content.strip()
if not skip_user_message:
await convex.mutation(
"messages:create",
{
"chatId": active_chat_id,
"role": "user",
"content": text,
"source": "telegram",
},
)
await increment_proxy_count(chat_id)
proxy_config = get_proxy_config(chat_id)
rag_connections = await convex.query(
"ragConnections:getActiveForUser", {"userId": convex_user_id}
)
rag_db_names: list[str] = []
if rag_connections:
for conn in rag_connections:
db = await convex.query(
"rag:getDatabaseById", {"ragDatabaseId": conn["ragDatabaseId"]}
)
if db:
rag_db_names.append(db["name"])
inject_connections = await convex.query(
"injectConnections:getActiveForUser", {"userId": convex_user_id}
)
inject_content = ""
if inject_connections:
for conn in inject_connections:
db = await convex.query(
"inject:getDatabaseById", {"injectDatabaseId": conn["injectDatabaseId"]}
)
if db and db.get("content"):
inject_content += db["content"] + "\n\n"
inject_content = inject_content.strip()
if not skip_user_message:
await convex.mutation(
assistant_message_id = await convex.mutation(
"messages:create",
{
"chatId": active_chat_id,
"role": "user",
"content": text,
"role": "assistant",
"content": "",
"source": "telegram",
"isStreaming": True,
},
)
assistant_message_id = await convex.mutation(
"messages:create",
{
"chatId": active_chat_id,
"role": "assistant",
"content": "",
"source": "telegram",
"isStreaming": True,
},
)
history = await convex.query(
"messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50}
)
system_prompt = user.get("systemPrompt")
if system_prompt and inject_content:
system_prompt = system_prompt.replace("{theory_database}", inject_content)
text_agent = create_text_agent(
api_key=api_key,
model_name=model_name,
system_prompt=system_prompt,
rag_db_names=rag_db_names or None,
)
agent_deps = (
AgentDeps(user_id=convex_user_id, api_key=api_key, rag_db_names=rag_db_names)
if rag_db_names
else None
)
processing_msg = await bot.send_message(chat_id, "...")
state = StreamingState(bot, chat_id, processing_msg)
if proxy_config:
proxy_processing_msg = await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, "..."
)
proxy_state = ProxyStreamingState(
proxy_config.proxy_bot, proxy_config.target_chat_id, proxy_processing_msg
history = await convex.query(
"messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50}
)
try:
await state.start_typing()
system_prompt = user.get("systemPrompt")
if system_prompt and inject_content:
system_prompt = system_prompt.replace("{theory_database}", inject_content)
text_agent = create_text_agent(
api_key=api_key,
model_name=model_name,
system_prompt=system_prompt,
rag_db_names=rag_db_names or None,
)
async def on_chunk(content: str) -> None:
await state.update_message(content)
if proxy_state:
await proxy_state.update_message(content)
await convex.mutation(
"messages:update",
{"messageId": assistant_message_id, "content": content},
agent_deps = (
AgentDeps(
user_id=convex_user_id, api_key=api_key, rag_db_names=rag_db_names
)
if rag_db_names
else None
)
processing_msg = await bot.send_message(chat_id, "...")
state = StreamingState(bot, chat_id, processing_msg)
if proxy_config:
proxy_processing_msg = await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, "..."
)
proxy_state = ProxyStreamingState(
proxy_config.proxy_bot,
proxy_config.target_chat_id,
proxy_processing_msg,
)
chat_images = await fetch_chat_images(active_chat_id)
try:
await state.start_typing()
final_answer = await stream_response(
text_agent,
text,
history[:-2],
on_chunk,
images=chat_images,
deps=agent_deps,
)
await state.flush()
if proxy_state:
await proxy_state.flush()
full_history = [*history[:-1], {"role": "assistant", "content": final_answer}]
follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite")
follow_up_prompt = user.get("followUpPrompt")
follow_up_agent = create_follow_up_agent(
api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt
)
follow_ups = await get_follow_ups(follow_up_agent, full_history, chat_images)
await state.stop_typing()
await convex.mutation(
"messages:update",
{
"messageId": assistant_message_id,
"content": final_answer,
"followUpOptions": follow_ups,
"isStreaming": False,
},
)
with contextlib.suppress(Exception):
await processing_msg.delete()
keyboard = make_follow_up_keyboard(follow_ups)
await send_long_message(bot, chat_id, final_answer, keyboard)
if proxy_state and proxy_config:
with contextlib.suppress(Exception):
await proxy_state.message.delete()
parts = split_message(final_answer)
for part in parts:
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, part
async def on_chunk(content: str) -> None:
await state.update_message(content)
if proxy_state:
await proxy_state.update_message(content)
await convex.mutation(
"messages:update",
{"messageId": assistant_message_id, "content": content},
)
await increment_proxy_count(chat_id)
except Exception as e: # noqa: BLE001
await state.stop_typing()
error_msg = f"Error: {e}"
await convex.mutation(
"messages:update",
{
"messageId": assistant_message_id,
"content": error_msg,
"isStreaming": False,
},
)
with contextlib.suppress(Exception):
await processing_msg.edit_text(html.quote(error_msg[:TELEGRAM_MAX_LENGTH]))
if proxy_state:
chat_images = await fetch_chat_images(active_chat_id)
final_answer = await stream_response(
text_agent,
text,
history[:-2],
on_chunk,
images=chat_images,
deps=agent_deps,
)
await state.flush()
if proxy_state:
await proxy_state.flush()
full_history = [
*history[:-1],
{"role": "assistant", "content": final_answer},
]
follow_up_model = user.get("followUpModel", "gemini-3.1-flash-lite-preview")
follow_up_prompt = user.get("followUpPrompt")
follow_up_agent = create_follow_up_agent(
api_key=api_key,
model_name=follow_up_model,
system_prompt=follow_up_prompt,
)
follow_ups = await get_follow_ups(
follow_up_agent, full_history, chat_images
)
await state.stop_typing()
await convex.mutation(
"messages:update",
{
"messageId": assistant_message_id,
"content": final_answer,
"followUpOptions": follow_ups,
"isStreaming": False,
},
)
with contextlib.suppress(Exception):
await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH])
await processing_msg.delete()
keyboard = make_follow_up_keyboard(follow_ups)
await send_long_message(bot, chat_id, final_answer, keyboard)
if proxy_state and proxy_config:
with contextlib.suppress(Exception):
await proxy_state.message.delete()
parts = split_message(final_answer)
for part in parts:
await proxy_config.proxy_bot.send_message(
proxy_config.target_chat_id, part
)
await increment_proxy_count(chat_id)
except Exception as e: # noqa: BLE001
await state.stop_typing()
error_msg = f"Error: {e}"
await convex.mutation(
"messages:update",
{
"messageId": assistant_message_id,
"content": error_msg,
"isStreaming": False,
},
)
with contextlib.suppress(Exception):
await processing_msg.edit_text(
html.quote(error_msg[:TELEGRAM_MAX_LENGTH])
)
if proxy_state:
with contextlib.suppress(Exception):
await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH])
async def send_to_telegram(user_id: int, text: str, bot: Bot) -> None:
+6 -3
View File
@@ -63,7 +63,7 @@ def create_text_agent(
if rag_db_names:
full_prompt = f"{base_prompt}{RAG_SYSTEM_ADDITION} {LATEX_INSTRUCTION}"
agent: Agent[None, str] = Agent(
agent: Agent[AgentDeps, str] = Agent(
model, instructions=full_prompt, deps_type=AgentDeps
)
@@ -101,13 +101,16 @@ def create_text_agent(
def create_follow_up_agent(
api_key: str,
model_name: str = "gemini-2.5-flash-lite",
model_name: str = "gemini-3.1-flash-lite-preview",
system_prompt: str | None = None,
) -> Agent[None, FollowUpOptions]:
provider = GoogleProvider(api_key=api_key)
model = GoogleModel(model_name, provider=provider)
prompt = system_prompt or DEFAULT_FOLLOW_UP
return Agent(model, output_type=FollowUpOptions, instructions=prompt)
agent: Agent[None, FollowUpOptions] = Agent( # ty: ignore[invalid-assignment]
model, output_type=FollowUpOptions, instructions=prompt
)
return agent
def build_message_history(history: list[dict[str, str]]) -> list[ModelMessage]:
+11 -8
View File
@@ -5,6 +5,7 @@ import time
from aiogram import Bot
from bot.handlers.message.handler import process_message_from_web
from bot.user_lock import get_user_lock
from utils import env
from utils.collaborative import (
CollaborativeClient,
@@ -53,14 +54,16 @@ async def start_sync_listener(bot: Bot) -> None:
async def handle_pending_generation(bot: Bot, item: dict, item_id: str) -> None:
try:
await process_message_from_web(
convex_user_id=item["userId"],
text=item["userMessage"],
bot=bot,
convex_chat_id=item["chatId"],
images_base64=item.get("imagesBase64"),
images_media_types=item.get("imagesMediaTypes"),
)
async with get_user_lock(item["userId"]):
await process_message_from_web(
convex_user_id=item["userId"],
text=item["userMessage"],
bot=bot,
convex_chat_id=item["chatId"],
images_base64=item.get("imagesBase64"),
images_media_types=item.get("imagesMediaTypes"),
pending_generation_id=item_id,
)
except Exception as e: # noqa: BLE001
logger.error(f"Error processing {item_id}: {e}")
finally:
+11
View File
@@ -0,0 +1,11 @@
import asyncio
_locks: dict[str, asyncio.Lock] = {}
def get_user_lock(convex_user_id: str) -> asyncio.Lock:
lock = _locks.get(convex_user_id)
if lock is None:
lock = asyncio.Lock()
_locks[convex_user_id] = lock
return lock
@@ -0,0 +1,48 @@
<script lang="ts">
import { processContent } from '$lib/utils/markdown';
interface Props {
content: string;
images?: string[];
mediaTypes?: string[];
}
let { content, images = [], mediaTypes = [] }: Props = $props();
</script>
<div
class="prose-mini w-full rounded-lg bg-blue-600 px-2.5 py-1.5 text-[11px] leading-relaxed text-white opacity-60"
>
{#if images.length > 0}
<div class="mb-1 flex flex-wrap gap-1">
{#each images as img, i (i)}
<img
src={`data:${mediaTypes[i] ?? 'image/jpeg'};base64,${img}`}
alt=""
class="max-h-16 rounded"
/>
{/each}
</div>
{/if}
{#if content}
<!-- eslint-disable-next-line svelte/no-at-html-tags -->
<div>{@html processContent(content)}</div>
{/if}
<div class="mt-1 flex items-center gap-1 text-[8px] text-blue-100/80">
<svg
xmlns="http://www.w3.org/2000/svg"
class="h-2.5 w-2.5 animate-spin"
fill="none"
viewBox="0 0 24 24"
>
<circle cx="12" cy="12" r="10" stroke="currentColor" stroke-width="3" opacity="0.3" />
<path
d="M12 2a10 10 0 0 1 10 10"
stroke="currentColor"
stroke-width="3"
stroke-linecap="round"
/>
</svg>
<span>в очереди</span>
</div>
</div>
+66 -32
View File
@@ -52,8 +52,48 @@ export const create = mutation({
followUpOptions: v.optional(v.array(v.string())),
isStreaming: v.optional(v.boolean())
},
returns: v.id('messages'),
returns: v.union(v.id('messages'), v.null()),
handler: async (ctx, args) => {
const drafts: Array<{ base64: string; mediaType: string; id: Id<'photoDrafts'> }> = [];
if (args.photoDraftIds && args.photoDraftIds.length > 0) {
for (const draftId of args.photoDraftIds) {
const draft = await ctx.db.get(draftId);
if (draft) {
drafts.push({ base64: draft.base64, mediaType: draft.mediaType, id: draft._id });
}
}
}
if (args.source === 'web' && args.role === 'user') {
const chat = await ctx.db.get(args.chatId);
if (chat) {
const pendingGenId = await ctx.db.insert('pendingGenerations', {
userId: chat.userId,
chatId: args.chatId,
userMessage: args.content,
imagesBase64: drafts.length > 0 ? drafts.map((d) => d.base64) : args.imagesBase64,
imagesMediaTypes:
drafts.length > 0 ? drafts.map((d) => d.mediaType) : args.imagesMediaTypes,
createdAt: Date.now()
});
for (let i = 0; i < drafts.length; i++) {
await ctx.db.insert('pendingGenerationImages', {
pendingGenerationId: pendingGenId,
base64: drafts[i].base64,
mediaType: drafts[i].mediaType,
order: i
});
}
}
for (const draft of drafts) {
await ctx.db.delete(draft.id);
}
return null;
}
const messageId = await ctx.db.insert('messages', {
chatId: args.chatId,
role: args.role,
@@ -68,16 +108,6 @@ export const create = mutation({
isStreaming: args.isStreaming
});
const drafts: Array<{ base64: string; mediaType: string; id: Id<'photoDrafts'> }> = [];
if (args.photoDraftIds && args.photoDraftIds.length > 0) {
for (const draftId of args.photoDraftIds) {
const draft = await ctx.db.get(draftId);
if (draft) {
drafts.push({ base64: draft.base64, mediaType: draft.mediaType, id: draft._id });
}
}
}
for (let i = 0; i < drafts.length; i++) {
await ctx.db.insert('messageImages', {
messageId,
@@ -87,27 +117,6 @@ export const create = mutation({
});
}
if (args.source === 'web' && args.role === 'user') {
const chat = await ctx.db.get(args.chatId);
if (chat) {
const pendingGenId = await ctx.db.insert('pendingGenerations', {
userId: chat.userId,
chatId: args.chatId,
userMessage: args.content,
createdAt: Date.now()
});
for (let i = 0; i < drafts.length; i++) {
await ctx.db.insert('pendingGenerationImages', {
pendingGenerationId: pendingGenId,
base64: drafts[i].base64,
mediaType: drafts[i].mediaType,
order: i
});
}
}
}
for (const draft of drafts) {
await ctx.db.delete(draft.id);
}
@@ -116,6 +125,31 @@ export const create = mutation({
}
});
export const createFromBackend = mutation({
args: {
chatId: v.id('chats'),
role: v.union(v.literal('user'), v.literal('assistant')),
content: v.string(),
source: v.union(v.literal('telegram'), v.literal('web')),
imagesBase64: v.optional(v.array(v.string())),
imagesMediaTypes: v.optional(v.array(v.string())),
isStreaming: v.optional(v.boolean())
},
returns: v.id('messages'),
handler: async (ctx, args) => {
return await ctx.db.insert('messages', {
chatId: args.chatId,
role: args.role,
content: args.content,
source: args.source,
imagesBase64: args.imagesBase64,
imagesMediaTypes: args.imagesMediaTypes,
createdAt: Date.now(),
isStreaming: args.isStreaming
});
}
});
export const update = mutation({
args: {
messageId: v.id('messages'),
@@ -40,6 +40,50 @@ export const list = query({
}
});
export const listByChat = query({
args: { chatId: v.id('chats') },
returns: v.array(
v.object({
_id: v.id('pendingGenerations'),
_creationTime: v.number(),
userMessage: v.string(),
imagesBase64: v.optional(v.array(v.string())),
imagesMediaTypes: v.optional(v.array(v.string())),
createdAt: v.number()
})
),
handler: async (ctx, args) => {
const pending = await ctx.db
.query('pendingGenerations')
.withIndex('by_chat_id', (q) => q.eq('chatId', args.chatId))
.order('asc')
.collect();
const result = [];
for (const p of pending) {
const images = await ctx.db
.query('pendingGenerationImages')
.withIndex('by_pending_generation_id', (q) => q.eq('pendingGenerationId', p._id))
.collect();
const sortedImages = images.sort((a, b) => a.order - b.order);
result.push({
_id: p._id,
_creationTime: p._creationTime,
userMessage: p.userMessage,
imagesBase64:
sortedImages.length > 0 ? sortedImages.map((img) => img.base64) : p.imagesBase64,
imagesMediaTypes:
sortedImages.length > 0 ? sortedImages.map((img) => img.mediaType) : p.imagesMediaTypes,
createdAt: p.createdAt
});
}
return result;
}
});
export const create = mutation({
args: {
userId: v.id('users'),
@@ -61,6 +105,10 @@ export const remove = mutation({
args: { id: v.id('pendingGenerations') },
returns: v.null(),
handler: async (ctx, args) => {
const existing = await ctx.db.get(args.id);
if (!existing) {
return null;
}
const images = await ctx.db
.query('pendingGenerationImages')
.withIndex('by_pending_generation_id', (q) => q.eq('pendingGenerationId', args.id))
+1 -1
View File
@@ -65,7 +65,7 @@ export default defineSchema({
imagesBase64: v.optional(v.array(v.string())),
imagesMediaTypes: v.optional(v.array(v.string())),
createdAt: v.number()
}),
}).index('by_chat_id', ['chatId']),
pendingGenerationImages: defineTable({
pendingGenerationId: v.id('pendingGenerations'),
+36 -2
View File
@@ -12,6 +12,7 @@
import { api } from '$lib/convex/_generated/api';
import type { Id } from '$lib/convex/_generated/dataModel';
import ChatMessage from '$lib/components/ChatMessage.svelte';
import PendingMessageBubble from '$lib/components/PendingMessageBubble.svelte';
import ChatInput from '$lib/components/ChatInput.svelte';
import FollowUpButtons from '$lib/components/FollowUpButtons.svelte';
import StealthOverlay from '$lib/components/StealthOverlay.svelte';
@@ -117,7 +118,16 @@
: null;
const messagesQuery = $derived(usePolling ? messagesQueryPoll! : messagesQueryWs!);
const pendingQueryWs = usePolling
? null
: useQuery(api.pendingGenerations.listByChat, () => (chatId ? { chatId } : 'skip'));
const pendingQueryPoll = usePolling
? usePollingQuery(api.pendingGenerations.listByChat, () => (chatId ? { chatId } : 'skip'))
: null;
const pendingQuery = $derived(usePolling ? pendingQueryPoll! : pendingQueryWs!);
let messages = $derived(messagesQuery.data ?? []);
let pendingMessages = $derived(pendingQuery.data ?? []);
let lastMessage = $derived(messages[messages.length - 1]);
let followUpOptions = $derived(
lastMessage?.role === 'assistant' && lastMessage.followUpOptions
@@ -263,12 +273,19 @@
let prevMessageCount = 0;
let prevLastMessageId: string | undefined;
let prevPendingCount = 0;
$effect(() => {
const count = messages.length;
const lastId = lastMessage?._id;
if (count > prevMessageCount || (lastId && lastId !== prevLastMessageId)) {
const pendingCount = pendingMessages.length;
if (
count > prevMessageCount ||
pendingCount > prevPendingCount ||
(lastId && lastId !== prevLastMessageId)
) {
prevMessageCount = count;
prevPendingCount = pendingCount;
prevLastMessageId = lastId;
window.scrollTo(0, document.body.scrollHeight);
}
@@ -577,7 +594,7 @@
{:else}
<div class="space-y-1">
{#each messages as message, i (message._id)}
{#if i === messages.length - 1}
{#if i === messages.length - 1 && pendingMessages.length === 0}
<div bind:this={lastMessageElement}>
<ChatMessage
role={message.role}
@@ -593,6 +610,23 @@
/>
{/if}
{/each}
{#each pendingMessages as p, i (p._id)}
{#if i === pendingMessages.length - 1}
<div bind:this={lastMessageElement}>
<PendingMessageBubble
content={p.userMessage}
images={p.imagesBase64 ?? []}
mediaTypes={p.imagesMediaTypes ?? []}
/>
</div>
{:else}
<PendingMessageBubble
content={p.userMessage}
images={p.imagesBase64 ?? []}
mediaTypes={p.imagesMediaTypes ?? []}
/>
{/if}
{/each}
</div>
{#if followUpOptions.length > 0}