nanobot/nanobot/channels/manager.py

140 lines
4.7 KiB
Python

"""Channel manager for coordinating chat channels."""
import asyncio
from typing import Any
from loguru import logger
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.base import BaseChannel
from nanobot.config.schema import Config
class ChannelManager:
    """
    Manages chat channels and coordinates message routing.

    Responsibilities:
    - Initialize enabled channels (Telegram, WhatsApp, etc.)
    - Start/stop channels
    - Route outbound messages
    """

    def __init__(self, config: Config, bus: MessageBus):
        """Store the config and bus, then build every enabled channel.

        Args:
            config: Application configuration; ``config.channels`` decides
                which channels are instantiated.
            bus: Message bus passed to each channel and read by the
                outbound dispatcher.
        """
        self.config = config
        self.bus = bus
        self.channels: dict[str, BaseChannel] = {}
        # Handle of the running dispatcher task; None while not started.
        self._dispatch_task: asyncio.Task | None = None

        self._init_channels()

    def _init_channels(self) -> None:
        """Initialize channels based on config.

        Channel modules are imported lazily so a missing optional dependency
        only disables that one channel (with a warning). Only ImportError is
        handled; any other error raised while constructing a channel
        propagates to the caller.
        """
        # Telegram channel
        if self.config.channels.telegram.enabled:
            try:
                from nanobot.channels.telegram import TelegramChannel

                self.channels["telegram"] = TelegramChannel(
                    self.config.channels.telegram,
                    self.bus,
                    groq_api_key=self.config.providers.groq.api_key,
                )
                logger.info("Telegram channel enabled")
            except ImportError as e:
                logger.warning(f"Telegram channel not available: {e}")

        # WhatsApp channel
        if self.config.channels.whatsapp.enabled:
            try:
                from nanobot.channels.whatsapp import WhatsAppChannel

                self.channels["whatsapp"] = WhatsAppChannel(
                    self.config.channels.whatsapp, self.bus
                )
                logger.info("WhatsApp channel enabled")
            except ImportError as e:
                logger.warning(f"WhatsApp channel not available: {e}")

    async def _start_channel(self, name: str, channel: BaseChannel) -> None:
        """Run one channel's ``start()`` and log a failure instead of raising.

        Logging here, rather than after ``gather`` returns, makes a failing
        channel visible immediately even while other channels keep running.
        """
        try:
            await channel.start()
        except Exception as e:
            logger.error(f"Failed to start {name} channel: {e}")

    async def start_all(self) -> None:
        """Start every enabled channel and the outbound dispatcher.

        Returns immediately (after a warning) when no channel is enabled.
        Otherwise this coroutine does not return until every channel's
        ``start()`` has finished. A channel that fails to start is logged
        and does not prevent the other channels from running; this method
        itself does not raise for channel failures.
        """
        if not self.channels:
            logger.warning("No channels enabled")
            return

        # Start outbound dispatcher
        self._dispatch_task = asyncio.create_task(self._dispatch_outbound())

        # Start all channels concurrently
        tasks = []
        for name, channel in self.channels.items():
            logger.info(f"Starting {name} channel...")
            tasks.append(asyncio.create_task(self._start_channel(name, channel)))

        # Wait for all to complete (they should run forever).
        # return_exceptions=True keeps one cancelled/failed task from
        # aborting the wait on the others.
        await asyncio.gather(*tasks, return_exceptions=True)

    async def stop_all(self) -> None:
        """Stop the dispatcher, then stop every channel.

        Errors raised by an individual channel's ``stop()`` are logged and do
        not prevent the remaining channels from being stopped.
        """
        logger.info("Stopping all channels...")

        # Stop dispatcher
        if self._dispatch_task is not None:
            self._dispatch_task.cancel()
            try:
                await self._dispatch_task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so the manager does not hold a stale handle.
            self._dispatch_task = None

        # Stop all channels
        for name, channel in self.channels.items():
            try:
                await channel.stop()
                logger.info(f"Stopped {name} channel")
            except Exception as e:
                logger.error(f"Error stopping {name}: {e}")

    async def _dispatch_outbound(self) -> None:
        """Dispatch outbound messages to the appropriate channel.

        Loops until the task is cancelled. Each message taken from the bus is
        passed to the channel registered under ``msg.channel``; send failures
        and unknown channel names are logged and the loop continues.
        """
        logger.info("Outbound dispatcher started")

        while True:
            try:
                # Wake up at least once per second even when the bus is idle.
                msg = await asyncio.wait_for(
                    self.bus.consume_outbound(),
                    timeout=1.0,
                )

                channel = self.channels.get(msg.channel)
                if channel is None:
                    logger.warning(f"Unknown channel: {msg.channel}")
                else:
                    try:
                        await channel.send(msg)
                    except Exception as e:
                        logger.error(f"Error sending to {msg.channel}: {e}")

            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                # Cancellation (see stop_all) ends the dispatcher cleanly.
                break

    def get_channel(self, name: str) -> BaseChannel | None:
        """Get a channel by name, or None if no such channel is registered."""
        return self.channels.get(name)

    def get_status(self) -> dict[str, Any]:
        """Get status of all channels.

        Returns:
            A mapping of channel name to ``{"enabled": True, "running": ...}``,
            where ``running`` is the channel's ``is_running`` value. Only
            initialized channels appear, so ``enabled`` is always True.
        """
        return {
            name: {
                "enabled": True,
                "running": channel.is_running,
            }
            for name, channel in self.channels.items()
        }

    @property
    def enabled_channels(self) -> list[str]:
        """Get list of enabled channel names."""
        return list(self.channels.keys())