fix: improve long-running stability for various channels
This commit is contained in:
parent
dd63337a83
commit
b429bf9381
@ -137,8 +137,15 @@ class DingTalkChannel(BaseChannel):
|
|||||||
|
|
||||||
logger.info("DingTalk bot started with Stream Mode")
|
logger.info("DingTalk bot started with Stream Mode")
|
||||||
|
|
||||||
# client.start() is an async infinite loop handling the websocket connection
|
# Reconnect loop: restart stream if SDK exits or crashes
|
||||||
await self._client.start()
|
while self._running:
|
||||||
|
try:
|
||||||
|
await self._client.start()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"DingTalk stream error: {e}")
|
||||||
|
if self._running:
|
||||||
|
logger.info("Reconnecting DingTalk stream in 5 seconds...")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Failed to start DingTalk channel: {e}")
|
logger.exception(f"Failed to start DingTalk channel: {e}")
|
||||||
|
|||||||
@ -98,12 +98,15 @@ class FeishuChannel(BaseChannel):
|
|||||||
log_level=lark.LogLevel.INFO
|
log_level=lark.LogLevel.INFO
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start WebSocket client in a separate thread
|
# Start WebSocket client in a separate thread with reconnect loop
|
||||||
def run_ws():
|
def run_ws():
|
||||||
try:
|
while self._running:
|
||||||
self._ws_client.start()
|
try:
|
||||||
except Exception as e:
|
self._ws_client.start()
|
||||||
logger.error(f"Feishu WebSocket error: {e}")
|
except Exception as e:
|
||||||
|
logger.warning(f"Feishu WebSocket error: {e}")
|
||||||
|
if self._running:
|
||||||
|
import time; time.sleep(5)
|
||||||
|
|
||||||
self._ws_thread = threading.Thread(target=run_ws, daemon=True)
|
self._ws_thread = threading.Thread(target=run_ws, daemon=True)
|
||||||
self._ws_thread.start()
|
self._ws_thread.start()
|
||||||
|
|||||||
@ -75,12 +75,15 @@ class QQChannel(BaseChannel):
|
|||||||
logger.info("QQ bot started (C2C private message)")
|
logger.info("QQ bot started (C2C private message)")
|
||||||
|
|
||||||
async def _run_bot(self) -> None:
|
async def _run_bot(self) -> None:
|
||||||
"""Run the bot connection."""
|
"""Run the bot connection with auto-reconnect."""
|
||||||
try:
|
while self._running:
|
||||||
await self._client.start(appid=self.config.app_id, secret=self.config.secret)
|
try:
|
||||||
except Exception as e:
|
await self._client.start(appid=self.config.app_id, secret=self.config.secret)
|
||||||
logger.error(f"QQ auth failed, check AppID/Secret at q.qq.com: {e}")
|
except Exception as e:
|
||||||
self._running = False
|
logger.warning(f"QQ bot error: {e}")
|
||||||
|
if self._running:
|
||||||
|
logger.info("Reconnecting QQ bot in 5 seconds...")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop the QQ bot."""
|
"""Stop the QQ bot."""
|
||||||
|
|||||||
@ -9,6 +9,7 @@ from typing import TYPE_CHECKING
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
from telegram import BotCommand, Update
|
from telegram import BotCommand, Update
|
||||||
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
|
||||||
|
from telegram.request import HTTPXRequest
|
||||||
|
|
||||||
from nanobot.bus.events import OutboundMessage
|
from nanobot.bus.events import OutboundMessage
|
||||||
from nanobot.bus.queue import MessageBus
|
from nanobot.bus.queue import MessageBus
|
||||||
@ -121,11 +122,13 @@ class TelegramChannel(BaseChannel):
|
|||||||
|
|
||||||
self._running = True
|
self._running = True
|
||||||
|
|
||||||
# Build the application
|
# Build the application with larger connection pool to avoid pool-timeout on long runs
|
||||||
builder = Application.builder().token(self.config.token)
|
req = HTTPXRequest(connection_pool_size=16, pool_timeout=5.0, connect_timeout=30.0, read_timeout=30.0)
|
||||||
|
builder = Application.builder().token(self.config.token).request(req).get_updates_request(req)
|
||||||
if self.config.proxy:
|
if self.config.proxy:
|
||||||
builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
|
builder = builder.proxy(self.config.proxy).get_updates_proxy(self.config.proxy)
|
||||||
self._app = builder.build()
|
self._app = builder.build()
|
||||||
|
self._app.add_error_handler(self._on_error)
|
||||||
|
|
||||||
# Add command handlers
|
# Add command handlers
|
||||||
self._app.add_handler(CommandHandler("start", self._on_start))
|
self._app.add_handler(CommandHandler("start", self._on_start))
|
||||||
@ -386,6 +389,10 @@ class TelegramChannel(BaseChannel):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
|
logger.debug(f"Typing indicator stopped for {chat_id}: {e}")
|
||||||
|
|
||||||
|
async def _on_error(self, update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||||
|
"""Log polling / handler errors instead of silently swallowing them."""
|
||||||
|
logger.error(f"Telegram error: {context.error}")
|
||||||
|
|
||||||
def _get_extension(self, media_type: str, mime_type: str | None) -> str:
|
def _get_extension(self, media_type: str, mime_type: str | None) -> str:
|
||||||
"""Get file extension based on media type."""
|
"""Get file extension based on media type."""
|
||||||
if mime_type:
|
if mime_type:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user