From cfa592e85bf2509e3aa80e1c52769c1bb2937f2d Mon Sep 17 00:00:00 2001 From: h Date: Mon, 4 May 2026 22:30:40 +0200 Subject: [PATCH] feat(*): make message processing sequential --- backend/src/bot/handlers/message/handler.py | 337 ++++++++++-------- backend/src/bot/modules/ai/agent.py | 9 +- backend/src/bot/sync.py | 19 +- backend/src/bot/user_lock.py | 11 + .../components/PendingMessageBubble.svelte | 48 +++ frontend/src/lib/convex/messages.ts | 98 +++-- frontend/src/lib/convex/pendingGenerations.ts | 48 +++ frontend/src/lib/convex/schema.ts | 2 +- frontend/src/routes/[mnemonic]/+page.svelte | 38 +- 9 files changed, 413 insertions(+), 197 deletions(-) create mode 100644 backend/src/bot/user_lock.py create mode 100644 frontend/src/lib/components/PendingMessageBubble.svelte diff --git a/backend/src/bot/handlers/message/handler.py b/backend/src/bot/handlers/message/handler.py index dc77c47..6d5c784 100644 --- a/backend/src/bot/handlers/message/handler.py +++ b/backend/src/bot/handlers/message/handler.py @@ -28,6 +28,7 @@ from bot.modules.ai import ( get_follow_ups, stream_response, ) +from bot.user_lock import get_user_lock from utils import env from utils.convex import ConvexClient @@ -256,15 +257,34 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 convex_chat_id: str, images_base64: list[str] | None = None, images_media_types: list[str] | None = None, + pending_generation_id: str | None = None, ) -> None: user = await convex.query("users:getById", {"userId": convex_user_id}) if not user or not user.get("geminiApiKey"): return - tg_chat_id = user["telegramChatId"].value if user.get("telegramChatId") else None is_summarize = text == "/summarize" + if not is_summarize: + user_message_args: dict = { + "chatId": convex_chat_id, + "role": "user", + "content": text, + "source": "web", + } + if images_base64 and images_media_types: + user_message_args["imagesBase64"] = images_base64 + user_message_args["imagesMediaTypes"] = images_media_types + await convex.mutation("messages:createFromBackend", user_message_args) + + if pending_generation_id: + await convex.mutation( + "pendingGenerations:remove", {"id": pending_generation_id} + ) + + tg_chat_id = user["telegramChatId"].value if user.get("telegramChatId") else None + if tg_chat_id and not is_summarize: if images_base64 and images_media_types: if len(images_base64) == 1: @@ -387,7 +407,7 @@ async def process_message_from_web( # noqa: C901, PLR0912, PLR0913, PLR0915 await state.flush() full_history = [*history, {"role": "assistant", "content": final_answer}] - follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite") + follow_up_model = user.get("followUpModel", "gemini-3.1-flash-lite-preview") follow_up_prompt = user.get("followUpPrompt") follow_up_agent = create_follow_up_agent( api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt @@ -485,174 +505,189 @@ async def process_message( # noqa: C901, PLR0912, PLR0913, PLR0915 model_name = user.get("model", "gemini-3-pro-preview") convex_user_id = user["_id"] - proxy_config = get_proxy_config(chat_id) - proxy_state: ProxyStreamingState | None = None + async with get_user_lock(convex_user_id): + proxy_config = get_proxy_config(chat_id) + proxy_state: ProxyStreamingState | None = None - if proxy_config and not skip_proxy_user_message: - with contextlib.suppress(Exception): - await proxy_config.proxy_bot.send_message( - proxy_config.target_chat_id, f"👤 {text}" + if proxy_config and not skip_proxy_user_message: + with contextlib.suppress(Exception): + await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, f"👤 {text}" + ) + await increment_proxy_count(chat_id) + proxy_config = get_proxy_config(chat_id) + + rag_connections = await convex.query( + "ragConnections:getActiveForUser", {"userId": convex_user_id} + ) + rag_db_names: list[str] = [] + if rag_connections: + for conn in rag_connections: + db = await convex.query( + "rag:getDatabaseById", {"ragDatabaseId": conn["ragDatabaseId"]} + ) + if db: + rag_db_names.append(db["name"]) + + inject_connections = await convex.query( + "injectConnections:getActiveForUser", {"userId": convex_user_id} + ) + inject_content = "" + if inject_connections: + for conn in inject_connections: + db = await convex.query( + "inject:getDatabaseById", + {"injectDatabaseId": conn["injectDatabaseId"]}, + ) + if db and db.get("content"): + inject_content += db["content"] + "\n\n" + inject_content = inject_content.strip() + + if not skip_user_message: + await convex.mutation( + "messages:create", + { + "chatId": active_chat_id, + "role": "user", + "content": text, + "source": "telegram", + }, ) - await increment_proxy_count(chat_id) - proxy_config = get_proxy_config(chat_id) - rag_connections = await convex.query( - "ragConnections:getActiveForUser", {"userId": convex_user_id} - ) - rag_db_names: list[str] = [] - if rag_connections: - for conn in rag_connections: - db = await convex.query( - "rag:getDatabaseById", {"ragDatabaseId": conn["ragDatabaseId"]} - ) - if db: - rag_db_names.append(db["name"]) - - inject_connections = await convex.query( - "injectConnections:getActiveForUser", {"userId": convex_user_id} - ) - inject_content = "" - if inject_connections: - for conn in inject_connections: - db = await convex.query( - "inject:getDatabaseById", {"injectDatabaseId": conn["injectDatabaseId"]} - ) - if db and db.get("content"): - inject_content += db["content"] + "\n\n" - inject_content = inject_content.strip() - - if not skip_user_message: - await convex.mutation( + assistant_message_id = await convex.mutation( "messages:create", { "chatId": active_chat_id, - "role": "user", - "content": text, + "role": "assistant", + "content": "", "source": "telegram", + "isStreaming": True, }, ) - assistant_message_id = await convex.mutation( - "messages:create", - { - "chatId": active_chat_id, - "role": "assistant", - "content": "", - "source": "telegram", - "isStreaming": True, - }, - ) - - history = await convex.query( - "messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50} - ) - - system_prompt = user.get("systemPrompt") - if system_prompt and inject_content: - system_prompt = system_prompt.replace("{theory_database}", inject_content) - text_agent = create_text_agent( - api_key=api_key, - model_name=model_name, - system_prompt=system_prompt, - rag_db_names=rag_db_names or None, - ) - - agent_deps = ( - AgentDeps(user_id=convex_user_id, api_key=api_key, rag_db_names=rag_db_names) - if rag_db_names - else None - ) - - processing_msg = await bot.send_message(chat_id, "...") - state = StreamingState(bot, chat_id, processing_msg) - - if proxy_config: - proxy_processing_msg = await proxy_config.proxy_bot.send_message( - proxy_config.target_chat_id, "..." - ) - proxy_state = ProxyStreamingState( - proxy_config.proxy_bot, proxy_config.target_chat_id, proxy_processing_msg + history = await convex.query( + "messages:getHistoryForAI", {"chatId": active_chat_id, "limit": 50} ) - try: - await state.start_typing() + system_prompt = user.get("systemPrompt") + if system_prompt and inject_content: + system_prompt = system_prompt.replace("{theory_database}", inject_content) + text_agent = create_text_agent( + api_key=api_key, + model_name=model_name, + system_prompt=system_prompt, + rag_db_names=rag_db_names or None, + ) - async def on_chunk(content: str) -> None: - await state.update_message(content) - if proxy_state: - await proxy_state.update_message(content) - await convex.mutation( - "messages:update", - {"messageId": assistant_message_id, "content": content}, + agent_deps = ( + AgentDeps( + user_id=convex_user_id, api_key=api_key, rag_db_names=rag_db_names + ) + if rag_db_names + else None + ) + + processing_msg = await bot.send_message(chat_id, "...") + state = StreamingState(bot, chat_id, processing_msg) + + if proxy_config: + proxy_processing_msg = await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, "..." + ) + proxy_state = ProxyStreamingState( + proxy_config.proxy_bot, + proxy_config.target_chat_id, + proxy_processing_msg, ) - chat_images = await fetch_chat_images(active_chat_id) + try: + await state.start_typing() - final_answer = await stream_response( - text_agent, - text, - history[:-2], - on_chunk, - images=chat_images, - deps=agent_deps, - ) - - await state.flush() - if proxy_state: - await proxy_state.flush() - - full_history = [*history[:-1], {"role": "assistant", "content": final_answer}] - follow_up_model = user.get("followUpModel", "gemini-2.5-flash-lite") - follow_up_prompt = user.get("followUpPrompt") - follow_up_agent = create_follow_up_agent( - api_key=api_key, model_name=follow_up_model, system_prompt=follow_up_prompt - ) - follow_ups = await get_follow_ups(follow_up_agent, full_history, chat_images) - - await state.stop_typing() - - await convex.mutation( - "messages:update", - { - "messageId": assistant_message_id, - "content": final_answer, - "followUpOptions": follow_ups, - "isStreaming": False, - }, - ) - - with contextlib.suppress(Exception): - await processing_msg.delete() - - keyboard = make_follow_up_keyboard(follow_ups) - await send_long_message(bot, chat_id, final_answer, keyboard) - - if proxy_state and proxy_config: - with contextlib.suppress(Exception): - await proxy_state.message.delete() - parts = split_message(final_answer) - for part in parts: - await proxy_config.proxy_bot.send_message( - proxy_config.target_chat_id, part + async def on_chunk(content: str) -> None: + await state.update_message(content) + if proxy_state: + await proxy_state.update_message(content) + await convex.mutation( + "messages:update", + {"messageId": assistant_message_id, "content": content}, ) - await increment_proxy_count(chat_id) - except Exception as e: # noqa: BLE001 - await state.stop_typing() - error_msg = f"Error: {e}" - await convex.mutation( - "messages:update", - { - "messageId": assistant_message_id, - "content": error_msg, - "isStreaming": False, - }, - ) - with contextlib.suppress(Exception): - await processing_msg.edit_text(html.quote(error_msg[:TELEGRAM_MAX_LENGTH])) - if proxy_state: + chat_images = await fetch_chat_images(active_chat_id) + + final_answer = await stream_response( + text_agent, + text, + history[:-2], + on_chunk, + images=chat_images, + deps=agent_deps, + ) + + await state.flush() + if proxy_state: + await proxy_state.flush() + + full_history = [ + *history[:-1], + {"role": "assistant", "content": final_answer}, + ] + follow_up_model = user.get("followUpModel", "gemini-3.1-flash-lite-preview") + follow_up_prompt = user.get("followUpPrompt") + follow_up_agent = create_follow_up_agent( + api_key=api_key, + model_name=follow_up_model, + system_prompt=follow_up_prompt, + ) + follow_ups = await get_follow_ups( + follow_up_agent, full_history, chat_images + ) + + await state.stop_typing() + + await convex.mutation( + "messages:update", + { + "messageId": assistant_message_id, + "content": final_answer, + "followUpOptions": follow_ups, + "isStreaming": False, + }, + ) + with contextlib.suppress(Exception): - await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH]) + await processing_msg.delete() + + keyboard = make_follow_up_keyboard(follow_ups) + await send_long_message(bot, chat_id, final_answer, keyboard) + + if proxy_state and proxy_config: + with contextlib.suppress(Exception): + await proxy_state.message.delete() + parts = split_message(final_answer) + for part in parts: + await proxy_config.proxy_bot.send_message( + proxy_config.target_chat_id, part + ) + await increment_proxy_count(chat_id) + + except Exception as e: # noqa: BLE001 + await state.stop_typing() + error_msg = f"Error: {e}" + await convex.mutation( + "messages:update", + { + "messageId": assistant_message_id, + "content": error_msg, + "isStreaming": False, + }, + ) + with contextlib.suppress(Exception): + await processing_msg.edit_text( + html.quote(error_msg[:TELEGRAM_MAX_LENGTH]) + ) + if proxy_state: + with contextlib.suppress(Exception): + await proxy_state.message.edit_text(error_msg[:TELEGRAM_MAX_LENGTH]) async def send_to_telegram(user_id: int, text: str, bot: Bot) -> None: diff --git a/backend/src/bot/modules/ai/agent.py b/backend/src/bot/modules/ai/agent.py index ea39216..6b79137 100644 --- a/backend/src/bot/modules/ai/agent.py +++ b/backend/src/bot/modules/ai/agent.py @@ -63,7 +63,7 @@ def create_text_agent( if rag_db_names: full_prompt = f"{base_prompt}{RAG_SYSTEM_ADDITION} {LATEX_INSTRUCTION}" - agent: Agent[None, str] = Agent( + agent: Agent[AgentDeps, str] = Agent( model, instructions=full_prompt, deps_type=AgentDeps ) @@ -101,13 +101,16 @@ def create_text_agent( def create_follow_up_agent( api_key: str, - model_name: str = "gemini-2.5-flash-lite", + model_name: str = "gemini-3.1-flash-lite-preview", system_prompt: str | None = None, ) -> Agent[None, FollowUpOptions]: provider = GoogleProvider(api_key=api_key) model = GoogleModel(model_name, provider=provider) prompt = system_prompt or DEFAULT_FOLLOW_UP - return Agent(model, output_type=FollowUpOptions, instructions=prompt) + agent: Agent[None, FollowUpOptions] = Agent( # ty: ignore[invalid-assignment] + model, output_type=FollowUpOptions, instructions=prompt + ) + return agent def build_message_history(history: list[dict[str, str]]) -> list[ModelMessage]: diff --git a/backend/src/bot/sync.py b/backend/src/bot/sync.py index f47860b..1467462 100644 --- a/backend/src/bot/sync.py +++ b/backend/src/bot/sync.py @@ -5,6 +5,7 @@ import time from aiogram import Bot from bot.handlers.message.handler import process_message_from_web +from bot.user_lock import get_user_lock from utils import env from utils.collaborative import ( CollaborativeClient, @@ -53,14 +54,16 @@ async def start_sync_listener(bot: Bot) -> None: async def handle_pending_generation(bot: Bot, item: dict, item_id: str) -> None: try: - await process_message_from_web( - convex_user_id=item["userId"], - text=item["userMessage"], - bot=bot, - convex_chat_id=item["chatId"], - images_base64=item.get("imagesBase64"), - images_media_types=item.get("imagesMediaTypes"), - ) + async with get_user_lock(item["userId"]): + await process_message_from_web( + convex_user_id=item["userId"], + text=item["userMessage"], + bot=bot, + convex_chat_id=item["chatId"], + images_base64=item.get("imagesBase64"), + images_media_types=item.get("imagesMediaTypes"), + pending_generation_id=item_id, + ) except Exception as e: # noqa: BLE001 logger.error(f"Error processing {item_id}: {e}") finally: diff --git a/backend/src/bot/user_lock.py b/backend/src/bot/user_lock.py new file mode 100644 index 0000000..3141917 --- /dev/null +++ b/backend/src/bot/user_lock.py @@ -0,0 +1,11 @@ +import asyncio + +_locks: dict[str, asyncio.Lock] = {} + + +def get_user_lock(convex_user_id: str) -> asyncio.Lock: + lock = _locks.get(convex_user_id) + if lock is None: + lock = asyncio.Lock() + _locks[convex_user_id] = lock + return lock diff --git a/frontend/src/lib/components/PendingMessageBubble.svelte b/frontend/src/lib/components/PendingMessageBubble.svelte new file mode 100644 index 0000000..a71bcba --- /dev/null +++ b/frontend/src/lib/components/PendingMessageBubble.svelte @@ -0,0 +1,48 @@ + + +
+ {#if images.length > 0} +
+ {#each images as img, i (i)} + + {/each} +
+ {/if} + {#if content} + +
{@html processContent(content)}
+ {/if} +
+ + + + + в очереди +
+
diff --git a/frontend/src/lib/convex/messages.ts b/frontend/src/lib/convex/messages.ts index 3f96069..c6c8181 100644 --- a/frontend/src/lib/convex/messages.ts +++ b/frontend/src/lib/convex/messages.ts @@ -52,8 +52,48 @@ export const create = mutation({ followUpOptions: v.optional(v.array(v.string())), isStreaming: v.optional(v.boolean()) }, - returns: v.id('messages'), + returns: v.union(v.id('messages'), v.null()), handler: async (ctx, args) => { + const drafts: Array<{ base64: string; mediaType: string; id: Id<'photoDrafts'> }> = []; + if (args.photoDraftIds && args.photoDraftIds.length > 0) { + for (const draftId of args.photoDraftIds) { + const draft = await ctx.db.get(draftId); + if (draft) { + drafts.push({ base64: draft.base64, mediaType: draft.mediaType, id: draft._id }); + } + } + } + + if (args.source === 'web' && args.role === 'user') { + const chat = await ctx.db.get(args.chatId); + if (chat) { + const pendingGenId = await ctx.db.insert('pendingGenerations', { + userId: chat.userId, + chatId: args.chatId, + userMessage: args.content, + imagesBase64: drafts.length > 0 ? drafts.map((d) => d.base64) : args.imagesBase64, + imagesMediaTypes: + drafts.length > 0 ? drafts.map((d) => d.mediaType) : args.imagesMediaTypes, + createdAt: Date.now() + }); + + for (let i = 0; i < drafts.length; i++) { + await ctx.db.insert('pendingGenerationImages', { + pendingGenerationId: pendingGenId, + base64: drafts[i].base64, + mediaType: drafts[i].mediaType, + order: i + }); + } + } + + for (const draft of drafts) { + await ctx.db.delete(draft.id); + } + + return null; + } + const messageId = await ctx.db.insert('messages', { chatId: args.chatId, role: args.role, @@ -68,16 +108,6 @@ export const create = mutation({ isStreaming: args.isStreaming }); - const drafts: Array<{ base64: string; mediaType: string; id: Id<'photoDrafts'> }> = []; - if (args.photoDraftIds && args.photoDraftIds.length > 0) { - for (const draftId of args.photoDraftIds) { - const draft = await ctx.db.get(draftId); - if (draft) { - drafts.push({ base64: draft.base64, mediaType: draft.mediaType, id: draft._id }); - } - } - } - for (let i = 0; i < drafts.length; i++) { await ctx.db.insert('messageImages', { messageId, @@ -87,27 +117,6 @@ export const create = mutation({ }); } - if (args.source === 'web' && args.role === 'user') { - const chat = await ctx.db.get(args.chatId); - if (chat) { - const pendingGenId = await ctx.db.insert('pendingGenerations', { - userId: chat.userId, - chatId: args.chatId, - userMessage: args.content, - createdAt: Date.now() - }); - - for (let i = 0; i < drafts.length; i++) { - await ctx.db.insert('pendingGenerationImages', { - pendingGenerationId: pendingGenId, - base64: drafts[i].base64, - mediaType: drafts[i].mediaType, - order: i - }); - } - } - } - for (const draft of drafts) { await ctx.db.delete(draft.id); } @@ -116,6 +125,31 @@ export const create = mutation({ } }); +export const createFromBackend = mutation({ + args: { + chatId: v.id('chats'), + role: v.union(v.literal('user'), v.literal('assistant')), + content: v.string(), + source: v.union(v.literal('telegram'), v.literal('web')), + imagesBase64: v.optional(v.array(v.string())), + imagesMediaTypes: v.optional(v.array(v.string())), + isStreaming: v.optional(v.boolean()) + }, + returns: v.id('messages'), + handler: async (ctx, args) => { + return await ctx.db.insert('messages', { + chatId: args.chatId, + role: args.role, + content: args.content, + source: args.source, + imagesBase64: args.imagesBase64, + imagesMediaTypes: args.imagesMediaTypes, + createdAt: Date.now(), + isStreaming: args.isStreaming + }); + } +}); + export const update = mutation({ args: { messageId: v.id('messages'), diff --git a/frontend/src/lib/convex/pendingGenerations.ts b/frontend/src/lib/convex/pendingGenerations.ts index db4c8e5..3f8e5ca 100644 --- a/frontend/src/lib/convex/pendingGenerations.ts +++ b/frontend/src/lib/convex/pendingGenerations.ts @@ -40,6 +40,50 @@ export const list = query({ } }); +export const listByChat = query({ + args: { chatId: v.id('chats') }, + returns: v.array( + v.object({ + _id: v.id('pendingGenerations'), + _creationTime: v.number(), + userMessage: v.string(), + imagesBase64: v.optional(v.array(v.string())), + imagesMediaTypes: v.optional(v.array(v.string())), + createdAt: v.number() + }) + ), + handler: async (ctx, args) => { + const pending = await ctx.db + .query('pendingGenerations') + .withIndex('by_chat_id', (q) => q.eq('chatId', args.chatId)) + .order('asc') + .collect(); + + const result = []; + for (const p of pending) { + const images = await ctx.db + .query('pendingGenerationImages') + .withIndex('by_pending_generation_id', (q) => q.eq('pendingGenerationId', p._id)) + .collect(); + + const sortedImages = images.sort((a, b) => a.order - b.order); + + result.push({ + _id: p._id, + _creationTime: p._creationTime, + userMessage: p.userMessage, + imagesBase64: + sortedImages.length > 0 ? sortedImages.map((img) => img.base64) : p.imagesBase64, + imagesMediaTypes: + sortedImages.length > 0 ? sortedImages.map((img) => img.mediaType) : p.imagesMediaTypes, + createdAt: p.createdAt + }); + } + + return result; + } +}); + export const create = mutation({ args: { userId: v.id('users'), @@ -61,6 +105,10 @@ export const remove = mutation({ args: { id: v.id('pendingGenerations') }, returns: v.null(), handler: async (ctx, args) => { + const existing = await ctx.db.get(args.id); + if (!existing) { + return null; + } const images = await ctx.db .query('pendingGenerationImages') .withIndex('by_pending_generation_id', (q) => q.eq('pendingGenerationId', args.id)) diff --git a/frontend/src/lib/convex/schema.ts b/frontend/src/lib/convex/schema.ts index 8bed854..ef901ca 100644 --- a/frontend/src/lib/convex/schema.ts +++ b/frontend/src/lib/convex/schema.ts @@ -65,7 +65,7 @@ export default defineSchema({ imagesBase64: v.optional(v.array(v.string())), imagesMediaTypes: v.optional(v.array(v.string())), createdAt: v.number() - }), + }).index('by_chat_id', ['chatId']), pendingGenerationImages: defineTable({ pendingGenerationId: v.id('pendingGenerations'), diff --git a/frontend/src/routes/[mnemonic]/+page.svelte b/frontend/src/routes/[mnemonic]/+page.svelte index 6c53a1f..5c1bd56 100644 --- a/frontend/src/routes/[mnemonic]/+page.svelte +++ b/frontend/src/routes/[mnemonic]/+page.svelte @@ -12,6 +12,7 @@ import { api } from '$lib/convex/_generated/api'; import type { Id } from '$lib/convex/_generated/dataModel'; import ChatMessage from '$lib/components/ChatMessage.svelte'; + import PendingMessageBubble from '$lib/components/PendingMessageBubble.svelte'; import ChatInput from '$lib/components/ChatInput.svelte'; import FollowUpButtons from '$lib/components/FollowUpButtons.svelte'; import StealthOverlay from '$lib/components/StealthOverlay.svelte'; @@ -117,7 +118,16 @@ : null; const messagesQuery = $derived(usePolling ? messagesQueryPoll! : messagesQueryWs!); + const pendingQueryWs = usePolling + ? null + : useQuery(api.pendingGenerations.listByChat, () => (chatId ? { chatId } : 'skip')); + const pendingQueryPoll = usePolling + ? usePollingQuery(api.pendingGenerations.listByChat, () => (chatId ? { chatId } : 'skip')) + : null; + const pendingQuery = $derived(usePolling ? pendingQueryPoll! : pendingQueryWs!); + let messages = $derived(messagesQuery.data ?? []); + let pendingMessages = $derived(pendingQuery.data ?? []); let lastMessage = $derived(messages[messages.length - 1]); let followUpOptions = $derived( lastMessage?.role === 'assistant' && lastMessage.followUpOptions @@ -263,12 +273,19 @@ let prevMessageCount = 0; let prevLastMessageId: string | undefined; + let prevPendingCount = 0; $effect(() => { const count = messages.length; const lastId = lastMessage?._id; - if (count > prevMessageCount || (lastId && lastId !== prevLastMessageId)) { + const pendingCount = pendingMessages.length; + if ( + count > prevMessageCount || + pendingCount > prevPendingCount || + (lastId && lastId !== prevLastMessageId) + ) { prevMessageCount = count; + prevPendingCount = pendingCount; prevLastMessageId = lastId; window.scrollTo(0, document.body.scrollHeight); } @@ -577,7 +594,7 @@ {:else}
{#each messages as message, i (message._id)} - {#if i === messages.length - 1} + {#if i === messages.length - 1 && pendingMessages.length === 0}
{/if} {/each} + {#each pendingMessages as p, i (p._id)} + {#if i === pendingMessages.length - 1} +
+ +
+ {:else} + + {/if} + {/each}
{#if followUpOptions.length > 0}