diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 23d1415..9017b40 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -166,6 +166,10 @@ class FeishuChannel(BaseChannel): re.MULTILINE, ) + _HEADING_RE = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE) + + _CODE_BLOCK_RE = re.compile(r"(```[\s\S]*?```)", re.MULTILINE) + @staticmethod def _parse_md_table(table_text: str) -> dict | None: """Parse a markdown table into a Feishu table element.""" @@ -185,17 +189,52 @@ class FeishuChannel(BaseChannel): } def _build_card_elements(self, content: str) -> list[dict]: - """Split content into markdown + table elements for Feishu card.""" + """Split content into div/markdown + table elements for Feishu card.""" elements, last_end = [], 0 for m in self._TABLE_RE.finditer(content): - before = content[last_end:m.start()].strip() - if before: - elements.append({"tag": "markdown", "content": before}) + before = content[last_end:m.start()] + if before.strip(): + elements.extend(self._split_headings(before)) elements.append(self._parse_md_table(m.group(1)) or {"tag": "markdown", "content": m.group(1)}) last_end = m.end() - remaining = content[last_end:].strip() + remaining = content[last_end:] + if remaining.strip(): + elements.extend(self._split_headings(remaining)) + return elements or [{"tag": "markdown", "content": content}] + + def _split_headings(self, content: str) -> list[dict]: + """Split content by headings, converting headings to div elements.""" + protected = content + code_blocks = [] + for m in self._CODE_BLOCK_RE.finditer(content): + code_blocks.append(m.group(1)) + protected = protected.replace(m.group(1), f"\x00CODE{len(code_blocks)-1}\x00", 1) + + elements = [] + last_end = 0 + for m in self._HEADING_RE.finditer(protected): + before = protected[last_end:m.start()].strip() + if before: + elements.append({"tag": "markdown", "content": before}) + level = len(m.group(1)) + text = m.group(2).strip() + elements.append({ + "tag": "div", + "text": { + "tag": "lark_md", + "content": f"**{text}**", + }, + }) + last_end = m.end() + remaining = protected[last_end:].strip() if remaining: elements.append({"tag": "markdown", "content": remaining}) + + for i, cb in enumerate(code_blocks): + for el in elements: + if el.get("tag") == "markdown": + el["content"] = el["content"].replace(f"\x00CODE{i}\x00", cb) + return elements or [{"tag": "markdown", "content": content}] async def send(self, msg: OutboundMessage) -> None: