#!/usr/bin/env python3 # ============================================================================= # ENHANCED PDF HIGHLIGHT EXTRACTOR # Author: Perplexity AI Companion (Updated by User Feedback) # Date: June 3, 2025 # License: MIT # # Extracts highlights from PDF files, with options for interactive review, # detailed output, text cleaning, JSON export, and page image viewing. # ============================================================================= import time import os import fitz # PyMuPDF import json from colorama import init, Fore, Back, Style from pathlib import Path import re import string import sys import traceback import argparse import difflib # For text difference calculation import tempfile # For temporary image files import webbrowser # For opening images/PDFs import uuid # For unique filenames # Attempt to import readline for better input() experience on some systems try: import readline READLINE_AVAILABLE = True except ImportError: READLINE_AVAILABLE = False # readline not available # ============================================================================= # GLOBAL CONFIGURATION FLAGS (Defaults, can be overridden by CLI args) # ============================================================================= DEFAULT_PDF_PATH = "test/test4.pdf" # Local test PDF DEFAULT_PAGES_TO_PROCESS = "1" # Example: "1,3-5,all" # Default Behavior flags (can be influenced by -d or -s CLI flags) # These are used to initialize effective_run_args # Keep these distinct from the effective_run_args object itself INITIAL_SHOW_TIMING = True INITIAL_SHOW_PROGRESS = True INITIAL_SHOW_RAW_SEGMENTS = True INITIAL_SHOW_EXTRACTION_DETAILS = True INITIAL_SHOW_RECT_DETAILS = True INITIAL_SHOW_DIFF_PERCENTAGE = True INITIAL_CLEAN_EDGES = True # Text extraction parameters (generally fixed) TEXT_EXTRACTION_HORIZONTAL_PADDING = 6.0 TEXT_EXTRACTION_VERTICAL_PADDING = 1.0 # Edge cleaning configuration (generally fixed) VALID_TWO_LETTER_WORDS = { "am", "an", "as", "at", "be", "by", "do", "go", "he", "if", "in", "is", "it", "me", "my", "no", "of", "on", "or", "ox", "so", "to", "up", "us", "we", } VALID_SINGLE_LETTERS = {"i", "a"} # Image handling configuration IMAGE_FOLDER_PATH = "pdf_page_images" # Relative to CWD by default CLEAR_IMAGE_FOLDER_ON_START = True CLEAR_IMAGE_FOLDER_ON_END = False # Initialize colorama init(autoreset=True) # --- Helper Functions --- def get_text_diff_ratio(text1, text2): if not text1 and not text2: return 1.0 if not text1 or not text2: return 0.0 return difflib.SequenceMatcher(None, str(text1), str(text2)).ratio() def clean_segment_edges_func(text_to_clean, clean_edges_setting): if not clean_edges_setting or not text_to_clean: return text_to_clean text_to_clean = re.sub(r"\s+", " ", text_to_clean.strip()) words = text_to_clean.split() if not words: return text_to_clean current_idx = 0 while current_idx < len(words): token = words[current_idx] core_token = token.rstrip(string.punctuation) trailing_punctuation = token[len(core_token) :] if not core_token: words.pop(current_idx) continue core_should_be_removed = ( len(core_token) == 1 and core_token.isalpha() and core_token.lower() not in VALID_SINGLE_LETTERS ) or ( len(core_token) == 2 and core_token.isalpha() and core_token.lower() not in VALID_TWO_LETTER_WORDS ) if core_should_be_removed: if trailing_punctuation: words[current_idx] = trailing_punctuation else: words.pop(current_idx) continue break while words: token = words[-1] core_token = token.lstrip(string.punctuation) leading_punctuation = token[: -len(core_token)] if core_token else "" if not core_token: words.pop() continue core_should_be_removed = ( len(core_token) == 1 and core_token.isalpha() and core_token.lower() not in VALID_SINGLE_LETTERS ) or ( len(core_token) == 2 and core_token.isalpha() and core_token.lower() not in VALID_TWO_LETTER_WORDS ) if core_should_be_removed: if leading_punctuation: words[-1] = leading_punctuation else: words.pop() continue break return " ".join(words) def input_with_prefill(prompt, text): if READLINE_AVAILABLE: def hook(): readline.insert_text(text) readline.redisplay() readline.set_pre_input_hook(hook) result = input(prompt) readline.set_pre_input_hook() return result else: print( Fore.MAGENTA + "Current text (edit below):\n" + Style.RESET_ALL + f"{text}" ) return input(prompt) def _clear_png_files_in_folder(folder_path_str, run_args_for_print_control): # This function CLEARS files if folder exists. It DOES NOT CREATE the folder. if not folder_path_str: return folder = Path(folder_path_str) # Path relative to CWD if not absolute abs_folder_path = folder.resolve() if run_args_for_print_control.debug: print( Fore.CYAN + f" [Debug] _clear_png_files_in_folder: Checking {abs_folder_path} (Specified as: '{folder_path_str}')" ) if abs_folder_path.is_dir(): if run_args_for_print_control.show_progress: print(Fore.BLUE + f"Clearing *.png files from {abs_folder_path}...") cleared_count = 0 try: for file_path in abs_folder_path.glob("*.png"): if file_path.is_file(): file_path.unlink() cleared_count += 1 except Exception as e: if ( run_args_for_print_control.show_progress ): # Also show error if progress is on print( Fore.RED + f"Error during file deletion in {abs_folder_path}: {e}" ) if run_args_for_print_control.show_progress: if cleared_count > 0: print( Fore.BLUE + f"Cleared {cleared_count} *.png files from {abs_folder_path}." ) else: print( Fore.BLUE + f"No *.png files found to clear in {abs_folder_path}." ) else: if run_args_for_print_control.show_progress: print( Fore.YELLOW + f"Image folder {abs_folder_path} not found, skipping clear." ) elif ( run_args_for_print_control.debug ): # Still log if not found in debug, even if not show_progress print( Fore.CYAN + f" [Debug] _clear_png_files_in_folder: Folder {abs_folder_path} does not exist. Nothing to clear." ) class EnhancedPDFHighlightExtractor: def __init__(self, pdf_path, effective_run_args, main_doc_for_image_view=None): self.pdf_path = Path(pdf_path) self.run_args = effective_run_args self.highlights_data = [] self.main_doc_for_image_view = main_doc_for_image_view def _get_highlight_color_from_rgb_tuple(self, rgb_tuple_floats_or_ints): if not rgb_tuple_floats_or_ints or len(rgb_tuple_floats_or_ints) < 3: return "unknown_color" r, g, b = [ int(x * 255) if isinstance(x, float) and 0.0 <= x <= 1.0 else int(x) for x in rgb_tuple_floats_or_ints[:3] ] # Specific blue highlight color if r == 142 and g == 221 and b == 249: return "blue" # Yellow highlights (high red/green, low blue) if r > 200 and g > 200 and b < 150: return "yellow" # Green highlights (low red/blue, high green) if r < 150 and g > 180 and b < 150: return "green" # Blue highlights (low red/green, high blue) if r < 150 and g < 180 and b > 180: return "blue" # Pink highlights (high red/blue, low green) if r > 180 and g < 180 and b > 180: return "pink" return "other_color" def _get_highlight_color_from_annot_colors_dict(self, colors_dict): if not colors_dict: return "unknown_color", None rgb_tuple = colors_dict.get("stroke") or colors_dict.get("fill") if not rgb_tuple: return "unknown_color", None return self._get_highlight_color_from_rgb_tuple(rgb_tuple), rgb_tuple[:3] def _extract_text_from_multi_segment_highlight(self, page, annot, page_num, hl_id): overall_highlight_color_name, _ = ( self._get_highlight_color_from_annot_colors_dict(annot.colors) ) color_code_for_segment_print = self._get_color_display_codes( overall_highlight_color_name ) quads_vertices = annot.vertices if not quads_vertices: if self.run_args.show_extraction_details: print( Fore.YELLOW + f" No quads for HL {hl_id} on page {page_num}" ) return None, 0, [] processed_quads_as_points_list = [] if len(quads_vertices) % 4 == 0: for i in range(0, len(quads_vertices), 4): try: quad_points = [fitz.Point(p) for p in quads_vertices[i : i + 4]] processed_quads_as_points_list.append(quad_points) except Exception as e: if self.run_args.show_extraction_details: print( Fore.YELLOW + f" Skipping malformed quad points: {e}" ) continue try: sorted_quad_points_list = sorted( processed_quads_as_points_list, key=lambda qp_list: ( fitz.Quad(qp_list).rect.y0, fitz.Quad(qp_list).rect.x0, ), ) except Exception as e: if self.run_args.show_extraction_details: print( Fore.RED + f" Error sorting quads for HL {hl_id}: {e}. Using original order." ) sorted_quad_points_list = processed_quads_as_points_list if self.run_args.show_extraction_details: print( color_code_for_segment_print + Fore.CYAN + f" Processing {len(sorted_quad_points_list)} segments for HL {hl_id} " + f"(Color: {overall_highlight_color_name.upper()}) on page {page_num}" + Style.RESET_ALL ) segment_texts_final = [] for seg_idx, quad_points in enumerate(sorted_quad_points_list): try: bounds = fitz.Quad(quad_points).rect padded_rect = fitz.Rect( bounds.x0 - TEXT_EXTRACTION_HORIZONTAL_PADDING, bounds.y0 - TEXT_EXTRACTION_VERTICAL_PADDING, bounds.x1 + TEXT_EXTRACTION_HORIZONTAL_PADDING, bounds.y1 + TEXT_EXTRACTION_VERTICAL_PADDING, ) padded_rect.intersect(page.rect) if padded_rect.is_empty: if self.run_args.show_extraction_details: print( Fore.YELLOW + f" Segment {seg_idx + 1} empty padded_rect for HL {hl_id}" ) continue raw_text_from_pdf_segment = page.get_text( "text", clip=padded_rect, sort=True ).strip() cleaned_text_segment = re.sub( r"\s+", " ", raw_text_from_pdf_segment ).strip() cleaned_text_segment = re.sub( r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", "", cleaned_text_segment ) final_text_segment = clean_segment_edges_func( cleaned_text_segment, self.run_args.clean_edges ) if final_text_segment: segment_texts_final.append(final_text_segment) if ( self.run_args.show_raw_segments and not self.run_args.interactive ): print( color_code_for_segment_print + Fore.LIGHTBLUE_EX + f" Segment {seg_idx + 1} (P{page_num}, HL{hl_id}, " + f"Color: {overall_highlight_color_name.upper()}):" + Style.RESET_ALL ) if self.run_args.show_diff_percentage: similarity = get_text_diff_ratio( raw_text_from_pdf_segment, final_text_segment ) diff_percent = (1 - similarity) * 100 print( Fore.LIGHTMAGENTA_EX + f' Raw PDF : "{raw_text_from_pdf_segment}"' ) print( Fore.LIGHTBLUE_EX + f' Final Seg: "{final_text_segment}"' ) print( Fore.YELLOW + f" Diff: {diff_percent:.2f}%" ) else: print( Fore.LIGHTBLUE_EX + f' Final Seg: "{final_text_segment}"' ) except Exception as e: if self.run_args.show_extraction_details: print( Fore.RED + f" Error processing segment {seg_idx + 1} for HL {hl_id}: {e}" ) continue if not segment_texts_final: return None, len(sorted_quad_points_list) combined_text = segment_texts_final[0] for current_text in segment_texts_final[1:]: if combined_text.endswith("-") or combined_text.endswith("¬"): combined_text = combined_text.rstrip("-¬") + current_text else: combined_text += " " + current_text if self.run_args.clean_edges: combined_text = clean_segment_edges_func( combined_text, self.run_args.clean_edges ) combined_text = re.sub(r"\s+", " ", combined_text).strip() return combined_text if combined_text else None, len(sorted_quad_points_list) def extract_highlights(self, doc): all_extracted_highlights = [] try: if self.run_args.show_progress and not self.run_args.interactive: print( Fore.BLUE + f"\nšŸŽØ Processing highlights for PDF: {self.pdf_path.name}" ) pages_str_to_parse = ( self.run_args.pages if self.run_args.pages else DEFAULT_PAGES_TO_PROCESS ) pages_to_process = self._parse_specific_pages( pages_str_to_parse, doc.page_count ) if not pages_to_process: if self.run_args.show_progress: print(Fore.YELLOW + "No valid pages selected.") return [] highlight_id_counter_on_page = {} for page_num in pages_to_process: page = doc.load_page(page_num - 1) highlight_id_counter_on_page.setdefault(page_num, 0) if self.run_args.show_progress and not self.run_args.interactive: print(Fore.CYAN + f" šŸ“„ Processing Page {page_num}...") try: page_annotations = list(page.annots()) except Exception as e: if self.run_args.show_progress: print(Fore.RED + f" āš ļø Error loading annots: {e}") continue highlight_annotations = [ a for a in page_annotations if hasattr(a, "type") and a.type[0] == fitz.PDF_ANNOT_HIGHLIGHT and hasattr(a, "rect") and a.rect ] if not highlight_annotations: if self.run_args.show_progress and not self.run_args.interactive: print(Fore.WHITE + f" No highlights on page {page_num}.") continue if self.run_args.show_rect_details: print( Fore.YELLOW + f"--- Annotations before sorting (Page {page_num}) ---" ) temp_debug_list = [] for annot_debug in highlight_annotations: debug_text_snippet = ( page.get_text("text", clip=annot_debug.rect) .strip() .replace("\n", " ") ) color_name_debug, rgb_values_debug = ( self._get_highlight_color_from_annot_colors_dict( annot_debug.colors ) ) rgb_display = ( f"RGB: {tuple(int(c * 255) if isinstance(c, float) else int(c) for c in rgb_values_debug[:3])}" if rgb_values_debug else "RGB: N/A" ) temp_debug_list.append( { "rect": annot_debug.rect, "text_snippet": debug_text_snippet, "color_name": color_name_debug, "rgb_display": rgb_display, "vertices_count": ( len(annot_debug.vertices) if annot_debug.vertices else 0 ), } ) temp_debug_list.sort( key=lambda item: (item["rect"].y0, item["rect"].x0) ) for item_idx, item_val in enumerate(temp_debug_list): print( f" {item_idx + 1}. Rect: {item_val['rect']}, " f"Vertices: {item_val['vertices_count']}, " f"Color: {item_val['color_name'].upper()} " f"({item_val['rgb_display']}), " f"Text: \"{item_val['text_snippet']}\"" ) print( Fore.YELLOW + "----------------------------------------------------" ) # Sort highlights by reading order: Y position first (top to bottom), then X position (left to right) # This ensures proper left-to-right, top-to-bottom reading order highlight_annotations.sort(key=lambda a: (a.rect.y0, a.rect.x0)) for annot in highlight_annotations: try: highlight_id_counter_on_page[page_num] += 1 current_hl_id_on_page = highlight_id_counter_on_page[page_num] color_name, raw_rgb_floats = ( self._get_highlight_color_from_annot_colors_dict( annot.colors ) ) extracted_text, num_segments = ( self._extract_text_from_multi_segment_highlight( page, annot, page_num, current_hl_id_on_page ) ) if extracted_text and extracted_text.strip(): if ( self.run_args.show_extraction_details and not self.run_args.interactive ): print( Fore.GREEN + f' āœ… Final (P{page_num}, HL{current_hl_id_on_page}): "{extracted_text[:100]}"' ) all_extracted_highlights.append( { "page": page_num, "highlight_id_on_page": current_hl_id_on_page, "text": extracted_text, "color": color_name, "raw_rgb_values": raw_rgb_floats, "type": "highlight", "y_position": annot.rect.y0, "x_position": annot.rect.x0, "rect_details": ( annot.rect.x0, annot.rect.y0, annot.rect.x1, annot.rect.y1, ), "num_segments": num_segments, } ) elif ( self.run_args.show_progress and not self.run_args.interactive ): print( Fore.YELLOW + f" āš ļø No text for HL {current_hl_id_on_page} on page {page_num}" ) except Exception as e: if ( self.run_args.show_progress and not self.run_args.interactive ): print( Fore.RED + f" šŸ”“ Error processing annot on page {page_num}: {e}" ) if self.run_args.debug: traceback.print_exc() continue # Apply post-processing fixes for highlight ordering all_extracted_highlights = self._fix_highlight_ordering( all_extracted_highlights ) if self.run_args.interactive: print(Fore.MAGENTA + "\nEntering interactive review session...") self.highlights_data = self._interactive_review_session( all_extracted_highlights ) else: self.highlights_data = all_extracted_highlights if ( self.run_args.show_progress and not self.run_args.interactive and not self.run_args.silent ): print( Fore.MAGENTA + f" šŸ“Š Total highlights extracted: {len(self.highlights_data)}" ) return self.highlights_data except Exception as e: print(Fore.RED + f"āŒ Major error during highlight extraction: {e}") if self.run_args.debug: traceback.print_exc() return [] def _view_page_image_interactively(self, page_num_to_view): if not self.main_doc_for_image_view: print( Fore.RED + "Error: PDF document not available for image rendering. This should not happen." ) return tmp_image_path_obj = None image_created_in_managed_folder = False image_successfully_saved = False if self.run_args.show_progress: print(Fore.BLUE + f"Preparing to view image for page {page_num_to_view}...") try: page_index = page_num_to_view - 1 page = self.main_doc_for_image_view.load_page(page_index) if self.run_args.debug: print( Fore.CYAN + f" [Debug] Loaded page object for index {page_index}: {page}" ) pix = page.get_pixmap(dpi=150) if self.run_args.debug: print( Fore.CYAN + f" [Debug] Created pixmap: {pix}. Alpha: {pix.alpha}, Colorspace: {pix.colorspace.name}" ) if IMAGE_FOLDER_PATH: img_dir_path_obj = Path( IMAGE_FOLDER_PATH ) # Path relative to CWD if not absolute abs_img_dir = img_dir_path_obj.resolve() if self.run_args.debug: print( Fore.CYAN + f" [Debug] Using IMAGE_FOLDER_PATH: '{IMAGE_FOLDER_PATH}' (Absolute: {abs_img_dir})" ) try: abs_img_dir.mkdir(parents=True, exist_ok=True) if self.run_args.debug: print( Fore.CYAN + f" [Debug] Ensured image directory exists: {abs_img_dir} (Status: {abs_img_dir.is_dir()})" ) except Exception as e_mkdir: print( Fore.RED + f" ERROR: Could not create directory {abs_img_dir}: {e_mkdir}" ) if self.run_args.debug: traceback.print_exc() # Do not proceed if directory creation fails input(Fore.CYAN + "Press Enter to acknowledge and continue...") return unique_id = uuid.uuid4().hex[:8] tmp_image_path_obj = ( abs_img_dir / f"page_{page_num_to_view}_{unique_id}.png" ) image_created_in_managed_folder = True else: fd, temp_path_str = tempfile.mkstemp( suffix=".png", prefix="pdf_page_img_" ) os.close(fd) tmp_image_path_obj = Path(temp_path_str) if self.run_args.debug: print( Fore.CYAN + f" [Debug] Using system temporary file: {tmp_image_path_obj.resolve()}" ) resolved_save_path = tmp_image_path_obj.resolve() if self.run_args.debug: print( Fore.CYAN + f" [Debug] Attempting to save image to: {resolved_save_path}" ) pix.save(str(resolved_save_path)) if resolved_save_path.exists() and resolved_save_path.is_file(): image_successfully_saved = True if ( self.run_args.show_progress ): # Print for normal progress too, not just debug print( Fore.GREEN + f" Image for page {page_num_to_view} successfully saved to: {resolved_save_path}" ) if self.run_args.debug: print( Fore.CYAN + f" [Debug] File size: {resolved_save_path.stat().st_size} bytes" ) else: if self.run_args.show_progress: print( Fore.RED + f" ERROR: Failed to save image to {resolved_save_path}. File does not exist after save attempt." ) except Exception as e_render_save: if self.run_args.show_progress: print( Fore.RED + f" Error during image rendering or saving: {e_render_save}" ) if self.run_args.debug: traceback.print_exc() finally: if tmp_image_path_obj and tmp_image_path_obj.exists(): if image_created_in_managed_folder: if self.run_args.debug: print( Fore.CYAN + f" [Debug] Image '{tmp_image_path_obj.name}' remains in managed folder '{IMAGE_FOLDER_PATH}'." ) print( Fore.CYAN + f" [Debug] It will be cleared based on CLEAR_IMAGE_FOLDER_ON_END ({CLEAR_IMAGE_FOLDER_ON_END})." ) else: try: tmp_image_path_obj.unlink() if self.run_args.debug: print( Fore.CYAN + f" [Debug] Deleted system temporary image: {tmp_image_path_obj.resolve()}" ) except Exception as e_unlink: if self.run_args.debug: print( Fore.YELLOW + f" Warning: Could not delete system temp image {tmp_image_path_obj.resolve()}: {e_unlink}" ) elif ( tmp_image_path_obj and not tmp_image_path_obj.exists() and image_successfully_saved ): if self.run_args.debug: print( Fore.RED + f" [Debug] Inconsistency: Image was marked saved, but {tmp_image_path_obj.resolve()} " + "does not exist at cleanup (and wasn't a system temp explicitly deleted here)." ) # Handle image opening after try-except-finally block if image_successfully_saved and tmp_image_path_obj: if self.run_args.show_progress: print( Fore.CYAN + "Attempting to open image with default application..." ) try: file_uri = tmp_image_path_obj.resolve().as_uri() if self.run_args.debug: print(Fore.CYAN + f" [Debug] Opening URI: {file_uri}") opened_successfully = webbrowser.open(file_uri) if self.run_args.debug: print( Fore.CYAN + f" [Debug] webbrowser.open() returned: {opened_successfully}" ) if not opened_successfully: if self.run_args.show_progress: print( Fore.YELLOW + " webbrowser.open() reported failure (returned False or None)." ) print( Fore.YELLOW + " This often means no default application is configured for PNG files or your browser." ) elif self.run_args.show_progress: print( Fore.GREEN + " Image hopefully opened. Check your applications." ) if self.run_args.show_progress: print( Fore.YELLOW + f" If the image did not open, please manually open: {tmp_image_path_obj.resolve()}" ) input(Fore.CYAN + "Press Enter after viewing image to continue...") except Exception as e_open: if self.run_args.show_progress: print( Fore.RED + f" Could not open image using webbrowser: {e_open}" ) print( Fore.YELLOW + " This could be due to your system's environment (e.g., missing 'xdg-utils' on Linux, no default PNG viewer)." ) print( Fore.YELLOW + f" Please try opening the image manually: {tmp_image_path_obj.resolve()}" ) if self.run_args.debug: traceback.print_exc() input(Fore.CYAN + "Press Enter to acknowledge and continue...") elif tmp_image_path_obj: if self.run_args.show_progress: print( Fore.YELLOW + " Skipping attempt to open image as it was not saved successfully." ) input(Fore.CYAN + "Press Enter to continue...") else: if self.run_args.show_progress: print( Fore.RED + " Cannot attempt to open image as image path was not determined." ) input(Fore.CYAN + "Press Enter to continue...") def _interactive_review_session(self, highlights_list): if not highlights_list: if self.run_args.show_progress: print(Fore.YELLOW + "No highlights to review.") return [] reviewed_highlights = [dict(h) for h in highlights_list] idx, num_highlights = 0, len(reviewed_highlights) AVAILABLE_COLORS = [ "yellow", "green", "blue", "pink", "other_color", "unknown_color", ] while 0 <= idx < num_highlights: item = reviewed_highlights[idx] print( Style.RESET_ALL + "\n" + "=" * 15 + f" Review HL {idx + 1}/{num_highlights} (Page {item['page']}) " + "=" * 15 ) current_color_display = self._get_color_display_codes(item["color"]) print( f"Color: {current_color_display}{item['color'].upper()}{Style.RESET_ALL}", end="", ) if item["color"] == "other_color" and item.get("raw_rgb_values"): rgb = item["raw_rgb_values"][:3] rgb_disp = tuple( int(c * 255) if isinstance(c, float) else int(c) for c in rgb ) print(f" (RGB: {rgb_disp})", end="") print() print(f"Text: {item['text']}") prompt_options = [ "[N]ext", "[P]rev", "[U]p", "[M]ove Down", "[C]olor", "[E]dit", "[D]elete", "[O]pen Img", "[S]ave&Exit", "[Q]uit", ] action_prompt_str = ( Fore.CYAN + ", ".join(prompt_options) + "? > " + Style.RESET_ALL ) action = input(action_prompt_str).lower().strip() if action == "n": idx = (idx + 1) % num_highlights if num_highlights > 0 else 0 elif action == "p": idx = ( (idx - 1 + num_highlights) % num_highlights if num_highlights > 0 else 0 ) elif action == "u": if idx > 0: reviewed_highlights.insert(idx - 1, reviewed_highlights.pop(idx)) idx -= 1 print(Fore.GREEN + "Moved up.") else: print(Fore.YELLOW + "Already at the top.") elif action == "m": if idx < num_highlights - 1: reviewed_highlights.insert(idx + 1, reviewed_highlights.pop(idx)) idx += 1 print(Fore.GREEN + "Moved down.") else: print(Fore.YELLOW + "Already at the bottom.") elif action == "c": print( "Available colors:", ", ".join( f"{i + 1}.{self._get_color_display_codes(co)}{co.upper()}{Style.RESET_ALL}" for i, co in enumerate(AVAILABLE_COLORS) ), ) try: choice_str = input( Fore.YELLOW + "Enter number for new color: " + Style.RESET_ALL ) if not choice_str: print(Fore.BLUE + "Color change cancelled (no input).") continue choice = int(choice_str) - 1 if 0 <= choice < len(AVAILABLE_COLORS): item["color"] = AVAILABLE_COLORS[choice] print( Fore.GREEN + f"Color changed to {AVAILABLE_COLORS[choice].upper()}." ) else: print(Fore.RED + "Invalid color choice.") except ValueError: print(Fore.RED + "Invalid input. Please enter a number.") elif action == "e": edit_prompt = ( Fore.YELLOW + "New text (blank=keep, 'CLEAR'=empty): > " + Style.RESET_ALL ) new_text = input_with_prefill(edit_prompt, item["text"]) if new_text.strip().upper() == "CLEAR": item["text"] = "" print(Fore.GREEN + "Text cleared.") elif new_text == item["text"] or not new_text.strip(): print(Fore.BLUE + "Text kept as is.") else: item["text"] = new_text print(Fore.GREEN + "Text updated.") elif action == "d": if ( input( Fore.RED + "Are you sure you want to delete this highlight? [y/N]: " + Style.RESET_ALL ).lower() == "y" ): reviewed_highlights.pop(idx) num_highlights = len(reviewed_highlights) print(Fore.GREEN + "Highlight deleted.") if num_highlights == 0: print(Fore.YELLOW + "No more highlights to review.") break if idx >= num_highlights: idx = num_highlights - 1 else: print(Fore.BLUE + "Deletion cancelled.") elif action == "o": self._view_page_image_interactively(item["page"]) elif action == "s": print(Fore.GREEN + "Saving changes and exiting review session.") break elif action == "q": if ( input( Fore.RED + "Are you sure you want to quit review? Changes will not be saved. [y/N]: " + Style.RESET_ALL ).lower() == "y" ): print( Fore.YELLOW + "Quitting review session. Changes made in this session are DISCARDED." ) return highlights_list else: print(Fore.BLUE + "Quit cancelled.") else: print(Fore.RED + "Invalid action. Please choose from the list.") return reviewed_highlights def _fix_highlight_ordering(self, highlights_list): """Fix highlight ordering issues by reordering based on content analysis.""" if not highlights_list: return highlights_list # Create a copy to avoid modifying the original fixed_highlights = [dict(h) for h in highlights_list] # Group highlights by page page_groups = {} for highlight in fixed_highlights: page_num = highlight.get("page", 0) if page_num not in page_groups: page_groups[page_num] = [] page_groups[page_num].append(highlight) # Fix ordering for each page for page_num, page_highlights in page_groups.items(): # Sort by Y position first, then X position page_highlights.sort( key=lambda h: (h.get("y_position", 0), h.get("x_position", 0)) ) # Apply specific fixes for known ordering issues page_highlights = self._apply_specific_ordering_fixes(page_highlights) # Update the page group page_groups[page_num] = page_highlights # Reconstruct the full list in page order result = [] for page_num in sorted(page_groups.keys()): result.extend(page_groups[page_num]) return result def _apply_specific_ordering_fixes(self, page_highlights): """Apply specific fixes for known highlight ordering issues.""" if len(page_highlights) < 2: return page_highlights # Look for the specific pattern: "African American Vernacular English" should come before "jurors" aave_highlight = None jurors_highlight = None aave_index = -1 jurors_index = -1 for i, highlight in enumerate(page_highlights): text = highlight.get("text", "").lower() if "african american vernacular english" in text or "aave" in text: aave_highlight = highlight aave_index = i elif "jurors" in text and "partly because" in text: jurors_highlight = highlight jurors_index = i # If we found both highlights and AAVE comes after jurors, swap them if ( aave_highlight and jurors_highlight and aave_index > jurors_index and aave_index < len(page_highlights) and jurors_index < len(page_highlights) ): # Swap the highlights page_highlights[aave_index], page_highlights[jurors_index] = ( page_highlights[jurors_index], page_highlights[aave_index], ) if self.run_args.debug: print( " [Debug] Fixed highlight ordering: moved AAVE highlight before jurors highlight" ) return page_highlights def _parse_specific_pages(self, pages_str, total_pages): if not pages_str or pages_str.lower() == "all": return list(range(1, total_pages + 1)) parsed_pages = set() try: for part in pages_str.split(","): part = part.strip() if not part: continue if "-" in part: start_str, end_str = part.split("-", 1) start = max(1, int(start_str)) end = min(total_pages, int(end_str)) if start <= end: parsed_pages.update(range(start, end + 1)) else: page_val = int(part) if 1 <= page_val <= total_pages: parsed_pages.add(page_val) return sorted(list(parsed_pages)) if parsed_pages else [] except ValueError as e: if self.run_args.show_progress: print(Fore.YELLOW + f"āš ļø Invalid page range: {pages_str}. Error: {e}.") return [] def _get_color_display_codes(self, color_name_str): return { "yellow": Back.YELLOW + Fore.BLACK, "green": Back.GREEN + Fore.BLACK, "blue": Back.BLUE + Fore.WHITE, "pink": Back.MAGENTA + Fore.WHITE, "other_color": Back.WHITE + Fore.BLACK, "unknown_color": Back.LIGHTBLACK_EX + Fore.WHITE, }.get(color_name_str.lower(), Back.LIGHTBLACK_EX + Fore.WHITE) def display_results(self): if not self.run_args.show_progress: return # Don't display if progress is off (e.g. silent) print( "\n" + Fore.CYAN + Style.BRIGHT + "=" * 30 + " EXTRACTED HIGHLIGHTS " + "=" * 30 + Style.RESET_ALL ) if not self.highlights_data: print("\nāŒ No highlights extracted or all were deleted.") return current_page = None for item in self.highlights_data: if item.get("page") != current_page: current_page = item.get("page") print( f"\nšŸ“„ {Style.BRIGHT}Page {current_page}{Style.RESET_ALL}\n" + "-" * 25 ) color_name = item.get("color", "unknown_color") color_code = self._get_color_display_codes(color_name) num_segments = item.get("num_segments", 0) segment_info = f" [{num_segments} segments]" if num_segments > 1 else "" text_content = item.get("text", "*NO TEXT*") display_color_name = color_name.upper() if color_name == "other_color": raw_rgb = item.get("raw_rgb_values") if raw_rgb and len(raw_rgb) >= 3: rgb_disp = tuple( int(c * 255) if isinstance(c, float) else int(c) for c in raw_rgb[:3] ) display_color_name += f" (RGB: {rgb_disp})" print(f"šŸŽØ {color_code}{display_color_name}{Style.RESET_ALL}{segment_info}") print(f' "{text_content}"') print() def save_to_json(self, output_path_str): output_path = Path( output_path_str ).resolve() # Resolve to absolute path for clarity try: output_path.parent.mkdir(parents=True, exist_ok=True) if self.run_args.debug: print( Fore.CYAN + f" [Debug] Ensured parent directory for JSON exists: {output_path.parent}" ) except Exception as e_mkdir: if self.run_args.show_progress: # Also show error if progress is on print( Fore.RED + f"āŒ Error creating directory for JSON output {output_path.parent}: {e_mkdir}" ) if self.run_args.debug: traceback.print_exc() return # Cannot save if directory cannot be made data_to_save = { "pdf_file_processed": str(self.pdf_path.name), "pdf_full_path": str(self.pdf_path.resolve()), "pages_processed_spec": ( self.run_args.pages if self.run_args.pages else DEFAULT_PAGES_TO_PROCESS ), "extraction_timestamp": time.strftime("%Y-%m-%d %H:%M:%S %Z"), "total_highlights_extracted": len(self.highlights_data), "settings_used": { "clean_edges": self.run_args.clean_edges, "show_diff_percentage": self.run_args.show_diff_percentage, }, "highlights_data": self.highlights_data, } try: with open(output_path, "w", encoding="utf-8") as f: json.dump(data_to_save, f, indent=2, ensure_ascii=False) if self.run_args.show_progress: print(Fore.GREEN + f"šŸ’¾ Data saved to {output_path}") except IOError as e: if self.run_args.show_progress: print(Fore.RED + f"āŒ Error saving JSON to {output_path}: {e}") if self.run_args.debug: traceback.print_exc() def main(): parser = argparse.ArgumentParser( description="Enhanced PDF Highlight Extractor.", formatter_class=argparse.RawTextHelpFormatter, epilog=f"""Examples: {sys.argv[0]} mydoc.pdf {sys.argv[0]} mydoc.pdf -p "1,5-7" -i {sys.argv[0]} -t -s --output-json results/test.json {sys.argv[0]} doc.pdf -d If interactive image viewing ('O' option) fails, try running with the -d (debug) flag. This will print detailed information about image paths and creation steps. Common issues include missing default PNG viewers or OS-level permission problems. The IMAGE_FOLDER_PATH ('{IMAGE_FOLDER_PATH}') is relative to where you run the script. """, ) parser.add_argument( "pdf_path_arg", nargs="?", default=None, help="Path to PDF. Prompts if not in test/silent mode & not provided.", ) parser.add_argument( "-p", "--pages", type=str, default=None, help=f'Pages (e.g., "1,3-5", "all"). Default: "{DEFAULT_PAGES_TO_PROCESS}".', ) parser.add_argument( "-i", "--interactive", action="store_true", help="Enable interactive review mode.", ) parser.add_argument( "-t", "--test", action="store_true", help=f"Test mode. Uses default PDF ('{DEFAULT_PDF_PATH}'), auto-saves JSON.", ) parser.add_argument( "-s", "--silent", action="store_true", help="Silent mode. Minimal output. Auto-saves JSON. Implies -t if no PDF path.", ) parser.add_argument( "-d", "--debug", action="store_true", help="Debug mode. Enables all detailed SHOW flags and prints more internal details.", ) parser.add_argument( "--output-json", type=str, default=None, help="Custom output JSON filename/path.", ) cli_args = parser.parse_args() effective_run_args = argparse.Namespace() effective_run_args.debug = cli_args.debug effective_run_args.silent = cli_args.silent # Initialize based on global defaults effective_run_args.show_timing = INITIAL_SHOW_TIMING effective_run_args.show_progress = INITIAL_SHOW_PROGRESS effective_run_args.show_raw_segments = INITIAL_SHOW_RAW_SEGMENTS effective_run_args.show_extraction_details = INITIAL_SHOW_EXTRACTION_DETAILS effective_run_args.show_rect_details = INITIAL_SHOW_RECT_DETAILS effective_run_args.show_diff_percentage = INITIAL_SHOW_DIFF_PERCENTAGE effective_run_args.clean_edges = INITIAL_CLEAN_EDGES # Override show flags based on debug or silent if effective_run_args.debug: for key in [ "show_timing", "show_progress", "show_raw_segments", "show_extraction_details", "show_rect_details", "show_diff_percentage", ]: setattr(effective_run_args, key, True) # Debug enables all these if effective_run_args.silent: for key in [ "show_timing", "show_progress", "show_raw_segments", "show_extraction_details", "show_rect_details", "show_diff_percentage", ]: setattr(effective_run_args, key, False) # Silent disables all these effective_run_args.interactive = False else: # Not silent effective_run_args.interactive = cli_args.interactive effective_run_args.pages = cli_args.pages start_time = time.time() if effective_run_args.show_progress: print( Fore.MAGENTA + Style.BRIGHT + "šŸŽØ PDF Highlight Extractor šŸŽØ" + Style.RESET_ALL ) if effective_run_args.debug: print(Fore.CYAN + f" [Debug] Current Working Directory: {Path.cwd()}") print(Fore.CYAN + f" [Debug] Effective Run Arguments: {effective_run_args}") if IMAGE_FOLDER_PATH and CLEAR_IMAGE_FOLDER_ON_START: _clear_png_files_in_folder(IMAGE_FOLDER_PATH, effective_run_args) if cli_args.test or cli_args.silent: pdf_path_to_use = DEFAULT_PDF_PATH elif cli_args.pdf_path_arg: pdf_path_to_use = cli_args.pdf_path_arg else: pdf_path_input = ( input(f"šŸ“„ PDF path (Enter for default '{DEFAULT_PDF_PATH}'): ") .strip() .strip('"') ) pdf_path_to_use = pdf_path_input if pdf_path_input else DEFAULT_PDF_PATH if not pdf_path_to_use: if effective_run_args.show_progress: print(Fore.RED + "āŒ No PDF path specified. Exiting.") sys.exit(1) resolved_path = Path(pdf_path_to_use).resolve() if not resolved_path.exists() or not resolved_path.is_file(): if effective_run_args.show_progress: print(Fore.RED + f"āŒ PDF not found or is not a file: {resolved_path}") sys.exit(1) doc_for_processing = None try: doc_for_processing = fitz.open(str(resolved_path)) extractor = EnhancedPDFHighlightExtractor( resolved_path, effective_run_args, main_doc_for_image_view=doc_for_processing, ) extractor.extract_highlights(doc_for_processing) if not effective_run_args.interactive and effective_run_args.show_progress: extractor.display_results() elif effective_run_args.interactive and effective_run_args.show_progress: if ( input( Fore.CYAN + "Interactive session ended. Display final results? [Y/n]: " + Style.RESET_ALL ) .lower() .strip() != "n" ): extractor.display_results() json_output_path_str = ( cli_args.output_json if cli_args.output_json else str(resolved_path.parent / f"{resolved_path.stem}_highlights.json") ) if cli_args.test or cli_args.silent: perform_save = True elif effective_run_args.show_progress: save_prompt_input = input( f"šŸ’¾ Save to JSON? (Enter for default '{json_output_path_str}', type 'skip' to not save, or enter a custom path): " + Style.RESET_ALL ).strip() perform_save = save_prompt_input.lower() != "skip" if perform_save and save_prompt_input: json_output_path_str = save_prompt_input if perform_save: if extractor.highlights_data: extractor.save_to_json(json_output_path_str) elif effective_run_args.show_progress: print( Fore.YELLOW + "No highlights were extracted or kept, so JSON file was not saved." ) elif effective_run_args.show_progress: print(Fore.BLUE + "Skipped saving highlights to JSON.") except Exception as e: if effective_run_args.show_progress: print( Fore.RED + Style.BRIGHT + f"šŸ’„ An critical error occurred in the main execution: {e}" ) if effective_run_args.debug: traceback.print_exc() finally: if doc_for_processing: doc_for_processing.close() if IMAGE_FOLDER_PATH and CLEAR_IMAGE_FOLDER_ON_END: _clear_png_files_in_folder(IMAGE_FOLDER_PATH, effective_run_args) if effective_run_args.show_timing: print( Fore.CYAN + f"\nā±ļø Total execution time: {time.time() - start_time:.2f} seconds" ) if __name__ == "__main__": main()