feat(*): add data injection

This commit is contained in:
h
2026-02-04 17:26:41 +01:00
parent 504282cb0f
commit 5980d98019
12 changed files with 648 additions and 3 deletions

View File

@@ -1,6 +1,6 @@
from aiogram import Router from aiogram import Router
from . import apikey, chat, initialize, message, proxy, rag, start from . import apikey, chat, initialize, inject, message, proxy, rag, start
router = Router() router = Router()
@@ -10,6 +10,7 @@ router.include_routers(
apikey.router, apikey.router,
chat.router, chat.router,
rag.router, rag.router,
inject.router,
proxy.router, proxy.router,
message.router, message.router,
) )

View File

@@ -18,6 +18,7 @@ async def startup(bot: Bot) -> None:
types.BotCommand(command="/presets", description="Show prompt presets"), types.BotCommand(command="/presets", description="Show prompt presets"),
types.BotCommand(command="/preset", description="Apply a preset"), types.BotCommand(command="/preset", description="Apply a preset"),
types.BotCommand(command="/proxy", description="Proxy chat to another bot"), types.BotCommand(command="/proxy", description="Proxy chat to another bot"),
types.BotCommand(command="/inject", description="Inject knowledge base"),
] ]
) )
logger.info(f"[green]Started as[/] @{(await bot.me()).username}") logger.info(f"[green]Started as[/] @{(await bot.me()).username}")

View File

@@ -0,0 +1,10 @@
from aiogram import Router
from .collection import router as collection_router
from .handler import router as command_router
# Aggregate router for the inject feature; this is the only name exported
# from the package (see __all__).
router = Router()
# Handlers are attached in this order: the /inject command router first,
# then the router that receives content while collection mode is active.
router.include_routers(command_router, collection_router)
__all__ = ["router"]

View File

@@ -0,0 +1,97 @@
import io
from aiogram import Bot, F, Router, types
from aiogram.filters import Filter
from convex import ConvexInt64
from utils import env
from utils.convex import ConvexClient
# Router for the handlers that accept content while inject collection mode
# is active for the sender.
router = Router()
# Module-level Convex client, constructed from env.convex_url.
convex = ConvexClient(env.convex_url)
class InInjectCollectionMode(Filter):
    """Filter that matches only while the sender is in inject collection mode.

    The sender is looked up in Convex by Telegram id. When the user record
    has a truthy ``injectCollectionMode`` field, the filter returns a dict
    with ``inject_user`` (the user record) and ``inject_collection_mode``
    (that field's value); in every other case it returns ``False``.
    """

    async def __call__(self, message: types.Message) -> bool | dict:
        sender = message.from_user
        if not sender:
            return False

        telegram_id = ConvexInt64(sender.id)
        user = await convex.query(
            "users:getByTelegramId", {"telegramId": telegram_id}
        )
        mode = user.get("injectCollectionMode") if user else None
        if not mode:
            return False

        return {"inject_user": user, "inject_collection_mode": mode}


# Shared filter instance used by the handlers in this module.
in_collection_mode = InInjectCollectionMode()
@router.message(in_collection_mode, F.text & ~F.text.startswith("/"))
async def on_text_in_collection_mode(
    message: types.Message, inject_user: dict, inject_collection_mode: dict
) -> None:
    """Save a plain-text message as the content of the pending inject database.

    Runs only for non-command text sent while the sender is in inject
    collection mode. After the content is stored, collection mode is ended
    and the user is told how to connect the database.

    Args:
        message: Incoming Telegram message carrying the text.
        inject_user: Convex user record supplied by the collection-mode filter.
        inject_collection_mode: The user's ``injectCollectionMode`` value;
            its ``injectDatabaseId`` names the database awaiting content.
    """
    import html  # function-scope import: the module header is left untouched

    if not message.text:
        return

    db_id = inject_collection_mode["injectDatabaseId"]
    user_id = inject_user["_id"]
    db = await convex.query("inject:getDatabaseById", {"injectDatabaseId": db_id})

    if not db:
        # The database was deleted while we were waiting for content.
        # Writing to it would fail and leave the user stuck in collection
        # mode, so end the mode and explain what happened instead.
        await convex.mutation("users:stopInjectCollectionMode", {"userId": user_id})
        await message.answer(
            "The database that was waiting for content no longer exists.\n\n"
            "Create a new one with: <code>/inject create &lt;name&gt;</code>",
            parse_mode="HTML",
        )
        return

    await convex.mutation(
        "inject:setContent", {"injectDatabaseId": db_id, "content": message.text}
    )
    await convex.mutation("users:stopInjectCollectionMode", {"userId": user_id})

    # The name is user-chosen; escape it so characters such as "<" or "&"
    # cannot break Telegram's HTML parsing of the reply.
    db_name = html.escape(db["name"], quote=False)
    await message.answer(
        f"✓ Text saved to '{db_name}'.\n\n"
        f"Connect it with: <code>/inject connect {db_name}</code>",
        parse_mode="HTML",
    )
@router.message(in_collection_mode, F.document)
async def on_document_in_collection_mode(
    message: types.Message, bot: Bot, inject_user: dict, inject_collection_mode: dict
) -> None:
    """Save an uploaded document as the content of the pending inject database.

    The file is downloaded into memory and decoded as UTF-8. On success the
    content is stored, collection mode is ended and the user is told how to
    connect the database. If the file cannot be downloaded or decoded,
    collection mode stays active so the user can send something else.

    Args:
        message: Incoming Telegram message carrying the document.
        bot: Bot instance used to fetch and download the file.
        inject_user: Convex user record supplied by the collection-mode filter.
        inject_collection_mode: The user's ``injectCollectionMode`` value;
            its ``injectDatabaseId`` names the database awaiting content.
    """
    import html  # function-scope import: the module header is left untouched

    if not message.document:
        return

    doc = message.document
    db_id = inject_collection_mode["injectDatabaseId"]
    user_id = inject_user["_id"]
    db = await convex.query("inject:getDatabaseById", {"injectDatabaseId": db_id})

    if not db:
        # The database was deleted while we were waiting for content.
        # Writing to it would fail and leave the user stuck in collection
        # mode, so end the mode and explain what happened instead.
        await convex.mutation("users:stopInjectCollectionMode", {"userId": user_id})
        await message.answer(
            "The database that was waiting for content no longer exists.\n\n"
            "Create a new one with: <code>/inject create &lt;name&gt;</code>",
            parse_mode="HTML",
        )
        return

    file = await bot.get_file(doc.file_id)
    if not file.file_path:
        await message.answer("Failed to download file.")
        return

    buffer = io.BytesIO()
    await bot.download_file(file.file_path, buffer)
    try:
        text = buffer.getvalue().decode("utf-8")
    except UnicodeDecodeError:
        # Binary or differently-encoded upload: report it instead of letting
        # the handler crash without any feedback to the user.
        await message.answer(
            "Could not read the file: it is not valid UTF-8 text.\n"
            "Send a UTF-8 text file or a text message instead."
        )
        return

    await convex.mutation(
        "inject:setContent", {"injectDatabaseId": db_id, "content": text}
    )
    await convex.mutation("users:stopInjectCollectionMode", {"userId": user_id})

    # Both names are user-controlled; escape them so they cannot break
    # Telegram's HTML parsing of the reply.
    db_name = html.escape(db["name"], quote=False)
    file_name = html.escape(doc.file_name or "file", quote=False)
    await message.answer(
        f"'{file_name}' saved to '{db_name}'.\n\n"
        f"Connect it with: <code>/inject connect {db_name}</code>",
        parse_mode="HTML",
    )

View File

@@ -0,0 +1,172 @@
from aiogram import Router, types
from aiogram.filters import Command
from convex import ConvexInt64
from utils import env
from utils.convex import ConvexClient
# Router for the /inject command handler defined in this module.
router = Router()
# Module-level Convex client, constructed from env.convex_url.
convex = ConvexClient(env.convex_url)
@router.message(Command("inject"))
async def on_inject(message: types.Message) -> None:
    """Entry point for ``/inject``: parse the sub-command and dispatch it.

    ``list`` takes no further argument. ``create``, ``connect``,
    ``disconnect`` and ``clear`` require a database name as the second
    word. Anything else, or a missing name, shows the usage text. Users
    without a Convex record are pointed to ``/apikey``.
    """
    sender = message.from_user
    if not sender or not message.text:
        return

    # Drop the leading "/inject" token; the rest are whitespace-separated args.
    tokens = message.text.split()
    args = tokens[1:]
    if not args:
        await show_usage(message)
        return

    user = await convex.query(
        "users:getByTelegramId", {"telegramId": ConvexInt64(sender.id)}
    )
    if not user:
        await message.answer("Use /apikey first to set your Gemini API key.")
        return

    user_id = user["_id"]
    command = args[0]

    if command == "list":
        await list_databases(message, user_id)
        return

    # Sub-commands that operate on a named database.
    named_actions = {
        "create": create_database,
        "connect": connect_database,
        "disconnect": disconnect_database,
        "clear": clear_database,
    }
    action = named_actions.get(command)
    if len(args) < 2 or action is None:  # noqa: PLR2004
        await show_usage(message)
        return

    await action(message, user_id, args[1])
async def show_usage(message: types.Message) -> None:
    """Reply with the HTML-formatted list of ``/inject`` sub-commands."""
    usage_lines = (
        "<b>Inject Commands:</b>",
        "",
        "<code>/inject list</code> - List inject databases",
        "",
        "<code>/inject create &lt;name&gt;</code> - Create and upload one file",
        "<code>/inject connect &lt;name&gt;</code> - Connect to all chats",
        "<code>/inject disconnect &lt;name&gt;</code> - Disconnect",
        "<code>/inject clear &lt;name&gt;</code> - Delete database",
    )
    usage_text = "\n".join(usage_lines)
    await message.answer(usage_text, parse_mode="HTML")
async def list_databases(message: types.Message, user_id: str) -> None:
    """Reply with the user's inject databases and their status.

    Each database is listed by name, followed by ``(connected)`` when a
    connection to it exists and ``(empty)`` when it has no content. When
    the user has no databases, a hint on how to create one is sent instead.

    Args:
        message: Message to reply to.
        user_id: Convex id of the user whose databases are listed.
    """
    import html  # function-scope import: the module header is left untouched

    databases = await convex.query("inject:listDatabases", {"userId": user_id})
    if not databases:
        await message.answer(
            "No inject databases found.\n\n"
            "Create one with: <code>/inject create mydb</code>",
            parse_mode="HTML",
        )
        return

    # Connections are only needed to annotate a non-empty list, so they are
    # fetched after the early return above.
    connections = await convex.query(
        "injectConnections:getActiveForUser", {"userId": user_id}
    )
    connected_db_ids = {conn["injectDatabaseId"] for conn in connections}

    lines = ["<b>Your inject databases:</b>\n"]
    for db in databases:
        status = ""
        if db["_id"] in connected_db_ids:
            status += " (connected)"
        if not db.get("content"):
            status += " (empty)"
        # Names are user-chosen; escape them so they cannot break Telegram's
        # HTML parsing of the reply.
        lines.append(f"{html.escape(db['name'], quote=False)}{status}")
    await message.answer("\n".join(lines), parse_mode="HTML")
async def create_database(message: types.Message, user_id: str, db_name: str) -> None:
    """Create an inject database and start waiting for its content.

    If the user is already in inject collection mode, nothing is created
    and they are asked to finish the pending upload. Otherwise the database
    is created through ``inject:createDatabase`` and collection mode is
    started for the returned id.

    Args:
        message: Message to reply to.
        user_id: Convex id of the owning user.
        db_name: Name given by the user on the command line.
    """
    import html  # function-scope import: the module header is left untouched

    collection_mode = await convex.query(
        "users:getInjectCollectionMode", {"userId": user_id}
    )
    if collection_mode:
        await message.answer(
            "Already waiting for a file. Send a file or text to complete."
        )
        return

    db_id = await convex.mutation(
        "inject:createDatabase", {"userId": user_id, "name": db_name}
    )
    await convex.mutation(
        "users:startInjectCollectionMode",
        {"userId": user_id, "injectDatabaseId": db_id},
    )

    # The name is user input; escape it so characters such as "<" or "&"
    # cannot break Telegram's HTML parsing of the reply.
    safe_name = html.escape(db_name, quote=False)
    await message.answer(
        f"<b>Waiting for content for '{safe_name}'</b>\n\n"
        "Send a file (json, txt, csv, etc.) or a text message.\n"
        "It will be saved automatically.",
        parse_mode="HTML",
    )
async def connect_database(message: types.Message, user_id: str, db_name: str) -> None:
    """Connect one of the user's inject databases to all of their chats.

    Args:
        message: Message to reply to.
        user_id: Convex id of the owning user.
        db_name: Name of the database to connect; if no database with this
            name exists, the user is told how to create it.
    """
    import html  # function-scope import: the module header is left untouched

    db = await convex.query("inject:getDatabase", {"userId": user_id, "name": db_name})
    if not db:
        # This reply is sent as HTML, so the user-supplied name is escaped
        # to keep characters such as "<" or "&" from breaking the parse.
        safe_name = html.escape(db_name, quote=False)
        await message.answer(
            f"Database '{safe_name}' not found.\n"
            f"Create it with: <code>/inject create {safe_name}</code>",
            parse_mode="HTML",
        )
        return

    await convex.mutation(
        "injectConnections:connect",
        {"userId": user_id, "injectDatabaseId": db["_id"], "isGlobal": True},
    )
    await message.answer(f"'{db_name}' connected to all your chats.")
async def disconnect_database(
    message: types.Message, user_id: str, db_name: str
) -> None:
    """Remove the connection to one of the user's inject databases.

    Replies that the database was not found when no database with this
    name exists. Otherwise the reply depends on the truthiness of the value
    returned by ``injectConnections:disconnect``.
    """
    lookup = {"userId": user_id, "name": db_name}
    db = await convex.query("inject:getDatabase", lookup)
    if not db:
        await message.answer(f"Database '{db_name}' not found.")
        return

    was_connected = await convex.mutation(
        "injectConnections:disconnect",
        {"userId": user_id, "injectDatabaseId": db["_id"]},
    )
    reply = (
        f"'{db_name}' disconnected."
        if was_connected
        else f"'{db_name}' was not connected."
    )
    await message.answer(reply)
async def clear_database(message: types.Message, user_id: str, db_name: str) -> None:
    """Delete one of the user's inject databases.

    If the user is currently in inject collection mode for this database,
    the mode is ended first; otherwise it would keep pointing at a deleted
    database and the next message from the user could not be saved.

    Args:
        message: Message to reply to.
        user_id: Convex id of the owning user.
        db_name: Name of the database to delete.
    """
    db = await convex.query("inject:getDatabase", {"userId": user_id, "name": db_name})
    if not db:
        await message.answer(f"Database '{db_name}' not found.")
        return

    collection_mode = await convex.query(
        "users:getInjectCollectionMode", {"userId": user_id}
    )
    if collection_mode and collection_mode["injectDatabaseId"] == db["_id"]:
        # End the mode before deleting: if the delete then fails, the user
        # merely has to restart the upload, whereas the reverse order could
        # leave them stuck waiting on a database that is gone.
        await convex.mutation("users:stopInjectCollectionMode", {"userId": user_id})

    await convex.mutation("inject:deleteDatabase", {"injectDatabaseId": db["_id"]})
    await message.answer(f"'{db_name}' deleted.")

View File

@@ -265,6 +265,19 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
if db: if db:
rag_db_names.append(db["name"]) rag_db_names.append(db["name"])
inject_connections = await convex.query(
"injectConnections:getActiveForUser", {"userId": convex_user_id}
)
inject_content = ""
if inject_connections:
for conn in inject_connections:
db = await convex.query(
"inject:getDatabaseById", {"injectDatabaseId": conn["injectDatabaseId"]}
)
if db and db.get("content"):
inject_content += db["content"] + "\n\n"
inject_content = inject_content.strip()
assistant_message_id = await convex.mutation( assistant_message_id = await convex.mutation(
"messages:create", "messages:create",
{ {
@@ -281,6 +294,8 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915
) )
system_prompt = SUMMARIZE_PROMPT if is_summarize else user.get("systemPrompt") system_prompt = SUMMARIZE_PROMPT if is_summarize else user.get("systemPrompt")
if system_prompt and inject_content:
system_prompt = system_prompt.replace("{theory_database}", inject_content)
text_agent = create_text_agent( text_agent = create_text_agent(
api_key=api_key, api_key=api_key,
model_name=model_name, model_name=model_name,
@@ -441,6 +456,19 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915
if db: if db:
rag_db_names.append(db["name"]) rag_db_names.append(db["name"])
inject_connections = await convex.query(
"injectConnections:getActiveForUser", {"userId": convex_user_id}
)
inject_content = ""
if inject_connections:
for conn in inject_connections:
db = await convex.query(
"inject:getDatabaseById", {"injectDatabaseId": conn["injectDatabaseId"]}
)
if db and db.get("content"):
inject_content += db["content"] + "\n\n"
inject_content = inject_content.strip()
if not skip_user_message: if not skip_user_message:
await convex.mutation( await convex.mutation(
"messages:create", "messages:create",
@@ -467,10 +495,13 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915
"messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50} "messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50}
) )
system_prompt = user.get("systemPrompt")
if system_prompt and inject_content:
system_prompt = system_prompt.replace("{theory_database}", inject_content)
text_agent = create_text_agent( text_agent = create_text_agent(
api_key=api_key, api_key=api_key,
model_name=model_name, model_name=model_name,
system_prompt=user.get("systemPrompt"), system_prompt=system_prompt,
rag_db_names=rag_db_names if rag_db_names else None, rag_db_names=rag_db_names if rag_db_names else None,
) )

View File

@@ -22,6 +22,48 @@ for example Group A: 1, Group A: 2a, Group B: 2b, etc.
Or, Theory: 1, Theory: 2a, Practice: 1, etc. Or, Theory: 1, Theory: 2a, Practice: 1, etc.
Only output identifiers that exist in the image.""" Only output identifiers that exist in the image."""
# System prompt registered as the "proofs" preset in PRESETS.
# It contains the literal placeholder {theory_database}; this is not an
# f-string, so the placeholder stays in the text as written here.
# NOTE(review): the message-processing code shown in this commit fills the
# placeholder with str.replace only when inject content is non-empty, so the
# raw placeholder presumably reaches the model otherwise; confirm.
PROOFS_SYSTEM = """
You are an Examination Engine designed for Apple Watch output.
CONTEXT: You have a loaded JSON database of theoretical knowledge below.
<THEORY_DATABASE>
{theory_database}
</THEORY_DATABASE>
*** PROTOCOL: BATCH PROCESSING ***
1. IMAGE INPUT (Primary Mode):
- **DETECT ALL** tasks/questions visible in the image.
- **SOLVE ALL** of them sequentially in a single response.
- **ORDER:** Follow the numbering on the exam sheet (Ex 1, Ex 2, ...).
- **SEPARATOR:** Use "---" between tasks.
2. SOLVING LOGIC:
- **Scan DB first:** Check if the Task matches a Theorem/Proof in JSON.
- IF MATCH: Output `statement` AND `proof` VERBATIM from JSON
(as requested in task)
- IF PARTIAL MATCH (e.g., specific function):
Use JSON method but plug in the numbers.
- **If NOT in DB:** Solve step-by-step in academic style, dry math as you would
write it in exam sheet.
- **Style:** Dry, formal, "notebook" style. No conversational filler.
3. APPLE WATCH FORMATTING (Strict):
- **Line Width:** MAX 25-30 chars. Force line breaks (`\\`) often.
- **Math:** Standard LaTeX blocks `$$...$$` or inline `$..$`.
- **Structure:**
**Ex. X ([Topic])**
[Solution/Proof]
---
**Ex. Y ([Topic])**
[Solution/Proof]
4. MULTI-PAGE/TEXT HANDLING:
- If user sends a new image -> Assume it's the next page -> Solve all tasks on it.
- If user types text (e.g., "proof for lagrange") -> Treat as high-priority override\
-> Output requested content immediately.
- Ignore typos in text input (fuzzy match).
"""
RAGTHEORY_SYSTEM = """You help answer theoretical exam questions. RAGTHEORY_SYSTEM = """You help answer theoretical exam questions.
@@ -74,4 +116,5 @@ Max 2-3 sentences. This is for Apple Watch display."""
PRESETS: dict[str, tuple[str, str]] = { PRESETS: dict[str, tuple[str, str]] = {
"exam": (EXAM_SYSTEM, EXAM_FOLLOW_UP), "exam": (EXAM_SYSTEM, EXAM_FOLLOW_UP),
"ragtheory": (RAGTHEORY_SYSTEM, EXAM_FOLLOW_UP), "ragtheory": (RAGTHEORY_SYSTEM, EXAM_FOLLOW_UP),
"proofs": (PROOFS_SYSTEM, EXAM_FOLLOW_UP),
} }

View File

@@ -11,6 +11,8 @@
import type * as chats from "../chats.js"; import type * as chats from "../chats.js";
import type * as devicePairings from "../devicePairings.js"; import type * as devicePairings from "../devicePairings.js";
import type * as http from "../http.js"; import type * as http from "../http.js";
import type * as inject from "../inject.js";
import type * as injectConnections from "../injectConnections.js";
import type * as messages from "../messages.js"; import type * as messages from "../messages.js";
import type * as pairingRequests from "../pairingRequests.js"; import type * as pairingRequests from "../pairingRequests.js";
import type * as pendingGenerations from "../pendingGenerations.js"; import type * as pendingGenerations from "../pendingGenerations.js";
@@ -30,6 +32,8 @@ declare const fullApi: ApiFromModules<{
chats: typeof chats; chats: typeof chats;
devicePairings: typeof devicePairings; devicePairings: typeof devicePairings;
http: typeof http; http: typeof http;
inject: typeof inject;
injectConnections: typeof injectConnections;
messages: typeof messages; messages: typeof messages;
pairingRequests: typeof pairingRequests; pairingRequests: typeof pairingRequests;
pendingGenerations: typeof pendingGenerations; pendingGenerations: typeof pendingGenerations;

View File

@@ -0,0 +1,109 @@
import { v } from 'convex/values';
import { mutation, query } from './_generated/server';
/**
 * Create an inject database for a user, or reuse the one already carrying this name.
 *
 * Looks up `injectDatabases` by (userId, name). When a row exists its id is
 * returned and nothing is written; otherwise a new row without content is
 * inserted and its id is returned.
 */
export const createDatabase = mutation({
  args: { userId: v.id('users'), name: v.string() },
  returns: v.id('injectDatabases'),
  handler: async (ctx, { userId, name }) => {
    const match = await ctx.db
      .query('injectDatabases')
      .withIndex('by_user_id_and_name', (q) => q.eq('userId', userId).eq('name', name))
      .unique();

    if (match !== null) {
      return match._id;
    }

    const newId = await ctx.db.insert('injectDatabases', {
      userId,
      name,
      createdAt: Date.now()
    });
    return newId;
  }
});
/**
 * Validator for a complete `injectDatabases` document.
 * Shared by the three read queries that follow so the shape is declared once
 * instead of being repeated in each `returns` clause.
 */
const injectDatabaseDoc = v.object({
  _id: v.id('injectDatabases'),
  _creationTime: v.number(),
  userId: v.id('users'),
  name: v.string(),
  content: v.optional(v.string()),
  createdAt: v.number()
});

/** Fetch a user's inject database by name; resolves to `null` when none matches. */
export const getDatabase = query({
  args: { userId: v.id('users'), name: v.string() },
  returns: v.union(injectDatabaseDoc, v.null()),
  handler: async (ctx, args) => {
    return await ctx.db
      .query('injectDatabases')
      .withIndex('by_user_id_and_name', (q) => q.eq('userId', args.userId).eq('name', args.name))
      .unique();
  }
});

/** Fetch an inject database by document id; resolves to `null` when it does not exist. */
export const getDatabaseById = query({
  args: { injectDatabaseId: v.id('injectDatabases') },
  returns: v.union(injectDatabaseDoc, v.null()),
  handler: async (ctx, args) => {
    return await ctx.db.get(args.injectDatabaseId);
  }
});

/** List every inject database owned by the given user. */
export const listDatabases = query({
  args: { userId: v.id('users') },
  returns: v.array(injectDatabaseDoc),
  handler: async (ctx, args) => {
    return await ctx.db
      .query('injectDatabases')
      .withIndex('by_user_id', (q) => q.eq('userId', args.userId))
      .collect();
  }
});
/**
 * Replace the `content` field of an inject database.
 * Other fields of the document are left as they are (the write is a patch).
 */
export const setContent = mutation({
  args: { injectDatabaseId: v.id('injectDatabases'), content: v.string() },
  returns: v.null(),
  handler: async (ctx, { injectDatabaseId, content }) => {
    const changes = { content };
    await ctx.db.patch(injectDatabaseId, changes);
    return null;
  }
});
/**
 * Delete an inject database together with every connection that references it.
 * Connections are removed first, one after another, then the database row itself.
 */
export const deleteDatabase = mutation({
  args: { injectDatabaseId: v.id('injectDatabases') },
  returns: v.null(),
  handler: async (ctx, { injectDatabaseId }) => {
    const linked = await ctx.db
      .query('injectConnections')
      .withIndex('by_inject_database_id', (q) => q.eq('injectDatabaseId', injectDatabaseId))
      .collect();

    for (const { _id: connectionId } of linked) {
      await ctx.db.delete(connectionId);
    }

    await ctx.db.delete(injectDatabaseId);
    return null;
  }
});

View File

@@ -0,0 +1,102 @@
import { v } from 'convex/values';
import { mutation, query } from './_generated/server';
/**
 * Connect a user to an inject database.
 *
 * If a connection for this (userId, injectDatabaseId) pair already exists,
 * its id is returned unchanged. Otherwise a new connection is inserted;
 * `isGlobal` falls back to `true` when the caller omits it.
 */
export const connect = mutation({
  args: {
    userId: v.id('users'),
    injectDatabaseId: v.id('injectDatabases'),
    isGlobal: v.optional(v.boolean())
  },
  returns: v.id('injectConnections'),
  handler: async (ctx, { userId, injectDatabaseId, isGlobal }) => {
    const current = await ctx.db
      .query('injectConnections')
      .withIndex('by_user_id_and_inject_database_id', (q) =>
        q.eq('userId', userId).eq('injectDatabaseId', injectDatabaseId)
      )
      .unique();

    if (current !== null) {
      return current._id;
    }

    const connectionId = await ctx.db.insert('injectConnections', {
      userId,
      injectDatabaseId,
      isGlobal: isGlobal ?? true,
      createdAt: Date.now()
    });
    return connectionId;
  }
});
/**
 * Remove the connection between a user and an inject database.
 * Resolves to `true` when a connection was found and deleted, `false` when none existed.
 */
export const disconnect = mutation({
  args: {
    userId: v.id('users'),
    injectDatabaseId: v.id('injectDatabases')
  },
  returns: v.boolean(),
  handler: async (ctx, { userId, injectDatabaseId }) => {
    const link = await ctx.db
      .query('injectConnections')
      .withIndex('by_user_id_and_inject_database_id', (q) =>
        q.eq('userId', userId).eq('injectDatabaseId', injectDatabaseId)
      )
      .unique();

    const found = link !== null;
    if (found) {
      await ctx.db.delete(link._id);
    }
    return found;
  }
});
/**
 * Validator for a complete `injectConnections` document.
 * Shared by the two list queries that follow so the shape is declared once
 * instead of being repeated in each `returns` clause.
 */
const injectConnectionDoc = v.object({
  _id: v.id('injectConnections'),
  _creationTime: v.number(),
  userId: v.id('users'),
  injectDatabaseId: v.id('injectDatabases'),
  isGlobal: v.boolean(),
  createdAt: v.number()
});

/**
 * List the inject connections of a user.
 * Every row found through the `by_user_id` index is returned; no further filter is applied.
 */
export const getActiveForUser = query({
  args: { userId: v.id('users') },
  returns: v.array(injectConnectionDoc),
  handler: async (ctx, args) => {
    return await ctx.db
      .query('injectConnections')
      .withIndex('by_user_id', (q) => q.eq('userId', args.userId))
      .collect();
  }
});

/** List every connection that references the given inject database. */
export const getByInjectDatabaseId = query({
  args: { injectDatabaseId: v.id('injectDatabases') },
  returns: v.array(injectConnectionDoc),
  handler: async (ctx, args) => {
    return await ctx.db
      .query('injectConnections')
      .withIndex('by_inject_database_id', (q) => q.eq('injectDatabaseId', args.injectDatabaseId))
      .collect();
  }
});
/** Delete a single inject connection by its document id. */
export const deleteConnection = mutation({
  args: { connectionId: v.id('injectConnections') },
  returns: v.null(),
  handler: async (ctx, { connectionId }) => {
    await ctx.db.delete(connectionId);
    return null;
  }
});

View File

@@ -16,6 +16,12 @@ export default defineSchema({
ragDatabaseId: v.id('ragDatabases'), ragDatabaseId: v.id('ragDatabases'),
activeSince: v.number() activeSince: v.number()
}) })
),
injectCollectionMode: v.optional(
v.object({
injectDatabaseId: v.id('injectDatabases'),
activeSince: v.number()
})
) )
}).index('by_telegram_id', ['telegramId']), }).index('by_telegram_id', ['telegramId']),
@@ -122,5 +128,24 @@ export default defineSchema({
}) })
.index('by_user_id', ['userId']) .index('by_user_id', ['userId'])
.index('by_user_id_and_rag_database_id', ['userId', 'ragDatabaseId']) .index('by_user_id_and_rag_database_id', ['userId', 'ragDatabaseId'])
.index('by_rag_database_id', ['ragDatabaseId']) .index('by_rag_database_id', ['ragDatabaseId']),
injectDatabases: defineTable({
userId: v.id('users'),
name: v.string(),
content: v.optional(v.string()),
createdAt: v.number()
})
.index('by_user_id', ['userId'])
.index('by_user_id_and_name', ['userId', 'name']),
injectConnections: defineTable({
userId: v.id('users'),
injectDatabaseId: v.id('injectDatabases'),
isGlobal: v.boolean(),
createdAt: v.number()
})
.index('by_user_id', ['userId'])
.index('by_user_id_and_inject_database_id', ['userId', 'injectDatabaseId'])
.index('by_inject_database_id', ['injectDatabaseId'])
}); });

View File

@@ -22,6 +22,12 @@ export const getById = query({
ragDatabaseId: v.id('ragDatabases'), ragDatabaseId: v.id('ragDatabases'),
activeSince: v.number() activeSince: v.number()
}) })
),
injectCollectionMode: v.optional(
v.object({
injectDatabaseId: v.id('injectDatabases'),
activeSince: v.number()
})
) )
}), }),
v.null() v.null()
@@ -50,6 +56,12 @@ export const getByTelegramId = query({
ragDatabaseId: v.id('ragDatabases'), ragDatabaseId: v.id('ragDatabases'),
activeSince: v.number() activeSince: v.number()
}) })
),
injectCollectionMode: v.optional(
v.object({
injectDatabaseId: v.id('injectDatabases'),
activeSince: v.number()
})
) )
}), }),
v.null() v.null()
@@ -177,3 +189,41 @@ export const getRagCollectionMode = query({
return user?.ragCollectionMode ?? null; return user?.ragCollectionMode ?? null;
} }
}); });
/**
 * Put a user into inject collection mode for the given database.
 * Writes `injectCollectionMode` on the user with the database id and the current time.
 */
export const startInjectCollectionMode = mutation({
  args: { userId: v.id('users'), injectDatabaseId: v.id('injectDatabases') },
  returns: v.null(),
  handler: async (ctx, { userId, injectDatabaseId }) => {
    const injectCollectionMode = {
      injectDatabaseId,
      activeSince: Date.now()
    };
    await ctx.db.patch(userId, { injectCollectionMode });
    return null;
  }
});
/**
 * Take a user out of inject collection mode.
 * Patches the user with `injectCollectionMode: undefined`.
 */
export const stopInjectCollectionMode = mutation({
  args: { userId: v.id('users') },
  returns: v.null(),
  handler: async (ctx, { userId }) => {
    const cleared = { injectCollectionMode: undefined };
    await ctx.db.patch(userId, cleared);
    return null;
  }
});
/**
 * Read a user's inject collection mode.
 * Resolves to `null` when the user does not exist or has no `injectCollectionMode` set.
 */
export const getInjectCollectionMode = query({
  args: { userId: v.id('users') },
  returns: v.union(
    v.object({
      injectDatabaseId: v.id('injectDatabases'),
      activeSince: v.number()
    }),
    v.null()
  ),
  handler: async (ctx, { userId }) => {
    const user = await ctx.db.get(userId);
    if (user === null) {
      return null;
    }
    const mode = user.injectCollectionMode;
    return mode ?? null;
  }
});