"""Interactive viewer for reviewing bounding boxes drawn on image parts.

For every ``<base_dir>/Cutleft/<copie>_<NN>.jpg`` image, the matching
``<base_dir>/<copie>_<NN>.json`` file (a dict with a ``"list"`` of items
carrying a ``"box_2d"`` and optionally a ``"label"``) is rendered on top of
the image.  A worker thread prepares the annotated images; the Tk main thread
shows them one at a time.  When the user validates a part, its JSON is read
again (so external edits are picked up), its boxes are converted to the
coordinate system of the whole group and accumulated; the result is written
to ``<base_dir>/<copie>.json`` when the group changes or the stream ends.
"""

import json
import queue
import re
import subprocess
import sys
import threading
import tkinter as tk
from pathlib import Path

from PIL import Image, ImageDraw, ImageFont, ImageTk

print("o to open pdf, O original pdf, e to emacs part, click for coordinates")

# --- Configuration & Globals ---
# Width (pixels) of the white strip appended to the right of every image.
padding = 60

# Queue payload: (pil_image, json_path, metadata)
# metadata is a dict: {'copie': str, 'part': int, 'schema': dict, 'name': str}
# The worker ends the stream with the sentinel (None, None, None).
image_queue = queue.Queue(maxsize=5)

try:
    font = ImageFont.truetype("DejaVuSans.ttf", size=30)
except OSError:
    # Font file not available: fall back to Pillow's built-in font.
    font = ImageFont.load_default()


# --- Helper Functions (Shared) ---
def page_number(b, nb_pages):
    """Return the 0-based column index that contains the centre of box *b*.

    *b* is ``[y_min, x_min, y_max, x_max]`` on a 0..1000 scale and the image
    is split into *nb_pages* columns of equal width.
    """
    column_width = 1000 // nb_pages
    center_x = (b[1] + b[3]) // 2
    return center_x // column_width


def convert_box2d(b, pn_ori, npn, tot_ori, tot_dest):
    """Return a copy of *b* with its x coordinates mapped to another layout.

    The x values (indices 1 and 3) are moved from a layout made of *tot_ori*
    columns to one made of *tot_dest* columns; y values are left untouched
    and *b* itself is not modified.

    NOTE(review): both page indices enter the formula as ``index - 1``.
    ``convert_list`` passes 0-based indices for both, in which case the two
    extra column widths compensate each other (up to integer rounding).
    """
    converted = b.copy()
    src_offset = (1000 // tot_ori) * (pn_ori - 1)
    dst_offset = (1000 // tot_dest) * (npn - 1)
    for idx in (1, 3):
        converted[idx] = (converted[idx] - src_offset) * tot_ori // tot_dest + dst_offset
    return converted


def convert_list(l, group_id, json_schema):
    """Convert the items of one part to the coordinates of the whole group.

    Args:
        l: items (dicts with a ``"box_2d"`` entry) belonging to one part.
        group_id: 1-based index of the part in ``columns_per_file``.
        json_schema: dict whose ``"columns_per_file"`` list gives the number
            of columns of every part.

    Returns:
        A new list of shallow copies of the items, each with a converted
        ``"box_2d"`` plus ``"part"`` (the group id) and ``"pn"`` (the column
        index counted over all parts).
    """
    columns = json_schema["columns_per_file"]
    nb_pages = columns[group_id - 1]
    nb_previous_pages = sum(columns[i] for i in range(group_id - 1))
    nb_tot_pages = sum(columns)

    converted_items = []
    for item in l:
        new_item = item.copy()
        pn = page_number(item["box_2d"], nb_pages)
        npn = pn + nb_previous_pages
        new_item["box_2d"] = convert_box2d(new_item["box_2d"], pn, npn, nb_pages, nb_tot_pages)
        new_item["part"] = group_id
        new_item["pn"] = npn
        converted_items.append(new_item)
    return converted_items


def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
    """Return a padded copy of the image with every bounding box drawn on it.

    Boxes are visited column by column, top to bottom.  A labelled box is
    drawn in red when its label comes earlier in *all_labels* than the
    previously seen known label (i.e. the labels are out of order); every
    other box is drawn in black.

    Args:
        image_path: path of the image to annotate.
        bounding_boxes: items with a ``"box_2d"`` (0..1000 scale,
            ``[y_min, x_min, y_max, x_max]``) and optionally a ``"label"``.
            The list is not modified.
        all_labels: ordered list of the known labels.
        nb_pages: number of columns in the image.
    """
    # The context manager releases the underlying file once pixels are copied.
    with Image.open(image_path) as im:
        im.load()
        width, height = im.size
        new_im = Image.new(im.mode, (width + padding, height), "white")
        new_im.paste(im, (0, 0))
    draw = ImageDraw.Draw(new_im)

    # sorted() instead of list.sort(): do not reorder the caller's list.
    ordered_boxes = sorted(
        bounding_boxes,
        key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]),
    )

    last_label_index = -1
    for bbox in ordered_boxes:
        y_min, x_min, y_max, x_max = bbox["box_2d"][:4]
        raw_y_min = int(y_min * height / 1000)
        raw_x_min = int(x_min * width / 1000)
        raw_y_max = int(y_max * height / 1000)
        raw_x_max = int(x_max * width / 1000)

        # Grow the rectangle by 10 px on every side, clipped to the image.
        abs_y_min = max(0, raw_y_min - 10)
        abs_x_min = max(0, raw_x_min - 10)
        abs_y_max = min(height, raw_y_max + 10)
        abs_x_max = min(width, raw_x_max + 10)

        color = "black"
        label = bbox.get("label")
        if label and label in all_labels:
            current_index = all_labels.index(label)
            if current_index < last_label_index:
                color = "red"
            last_label_index = current_index

        draw.rectangle(((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)), outline=color, width=4)
        if label:
            # Put the text above the box unless it is too close to the top.
            if abs_y_min > 80:
                draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
            else:
                draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
    return new_im


# --- Processing Logic (Worker Thread) ---
def _build_queue_item(base_dir, img_path, all_labels):
    """Return the queue payload for one image part, or None to skip it."""
    json_path = base_dir / f"{img_path.stem}.json"
    copie_part = int(img_path.stem[-2:])
    copie = img_path.stem[:-3]
    json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json"
    try:
        with open(json_schema_path, 'r') as f:
            json_schema = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file, or invalid JSON (JSONDecodeError is a ValueError).
        print("No json_schema : ", json_schema_path)
        return None
    nb_pages = json_schema["columns_per_file"][copie_part - 1]

    if not json_path.exists():
        return None

    # Read strictly for visualization purposes
    with open(json_path, 'r') as f:
        json_result = json.load(f)
    bb_list = json_result.get("list", [])
    print(f"Buffering {img_path.name}...")
    pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages)

    # Package metadata needed for final calculation later
    metadata = {
        "copie": copie,
        "part": copie_part,
        "schema": json_schema,
        "name": json_result.get("name", ""),
    }
    return pil_image, json_path, metadata


def worker_thread(base_dir, files_to_process, all_labels):
    """Prepare the annotated image of every file and push it on the queue.

    Only the visuals are prepared here; no final JSON file is written.
    A file whose schema or JSON is missing is skipped, and an error on one
    file is reported without stopping the others.  The end-of-stream
    sentinel ``(None, None, None)`` is always queued, even after an
    unexpected error, so the GUI never waits forever.
    """
    try:
        for img_path in files_to_process:
            try:
                item = _build_queue_item(base_dir, img_path, all_labels)
            except Exception as e:
                print(f"Error processing {img_path.name}: {e}")
                continue
            if item is not None:
                image_queue.put(item)
    finally:
        # Sentinel to indicate finished
        image_queue.put((None, None, None))


# --- GUI Logic (Main Thread) ---
class ImageViewer:
    """Tk window that shows queued images one by one and aggregates results.

    Keys: Return validates the current part, ``e`` opens its JSON, ``o`` /
    ``O`` open the related PDF files, Escape quits.  A left click copies a
    small box around the clicked point to the clipboard.
    """

    def __init__(self, root, base_dir):
        self.root = root
        self.base_dir = base_dir
        self.root.title("Bounding Box Viewer")

        self.label = tk.Label(root, text="Waiting for images...")
        self.label.pack(expand=True, fill="both")

        # Display State
        self.current_image = None
        self.current_json_path = None
        self.current_meta = None  # Stores schema/copie info
        self.is_viewing = False
        self.scale_factor = 1.0
        self.orig_size = (1, 1)

        # Data Aggregation State
        self.active_copie_name = None
        self.accumulated_results = None  # Dict with "name" and "list"

        # Bindings.  The event names must be written in angle brackets: an
        # empty sequence is rejected by Tk.
        self.root.bind('<Return>', self.on_enter)
        self.root.bind('e', self.on_edit)
        self.root.bind('o', self.on_open_pdf)
        self.root.bind('O', self.on_open_ori_pdf)
        self.root.bind('<Escape>', lambda e: self.root.quit())
        self.label.bind('<Button-1>', self.on_click)

        self.poll_queue()

    def poll_queue(self):
        """Show the next queued image when idle; reschedule itself every 100 ms."""
        if not self.is_viewing:
            try:
                pil_image, json_path, metadata = image_queue.get_nowait()

                # Handle End of Stream
                if pil_image is None:
                    self.save_current_batch()  # Save any remaining data
                    print("All images processed.")
                    self.root.quit()
                    return

                # Check if we switched to a new "Copie" group
                if self.active_copie_name != metadata["copie"]:
                    self.save_current_batch()  # Write previous group to disk

                    # Start new batch
                    self.active_copie_name = metadata["copie"]
                    self.accumulated_results = {"name": metadata["name"], "list": []}

                self.display_image(pil_image, json_path, metadata)
            except queue.Empty:
                pass
        self.root.after(100, self.poll_queue)

    def save_current_batch(self):
        """Writes the accumulated data to the main JSON file."""
        if self.active_copie_name and self.accumulated_results:
            main_json_path = self.base_dir / f"{self.active_copie_name}.json"
            print(f"Writing aggregated result to {main_json_path}")
            with open(main_json_path, 'w') as f:
                json.dump(self.accumulated_results, f)
            self.accumulated_results = None

    def display_image(self, pil_image, json_path, metadata):
        """Display *pil_image*, shrunk to fit the screen height if needed."""
        self.orig_size = pil_image.size
        self.scale_factor = 1.0
        screen_h = self.root.winfo_screenheight() - 100
        if pil_image.height > screen_h:
            self.scale_factor = screen_h / pil_image.height
            pil_image = pil_image.resize(
                (int(pil_image.width * self.scale_factor),
                 int(pil_image.height * self.scale_factor))
            )

        # Keep a reference on self so the PhotoImage outlives this call.
        self.tk_image = ImageTk.PhotoImage(pil_image)
        self.label.config(image=self.tk_image, text=f"Processing: {json_path.name}")
        self.current_json_path = json_path
        self.current_meta = metadata
        self.is_viewing = True
        self.root.lift()

    def on_enter(self, event):
        """Commit the current part to the accumulator and move on."""
        if not self.is_viewing:
            return
        print(f"Committing data for {self.current_json_path.name}...")

        # Re-read the JSON here to capture edits made while it was displayed.
        try:
            with open(self.current_json_path, 'r') as f:
                current_data = json.load(f)

            # Perform the conversion now, post-edit
            converted_items = convert_list(
                current_data["list"],
                self.current_meta["part"],
                self.current_meta["schema"]
            )

            # Add to accumulator
            if self.accumulated_results:
                self.accumulated_results["list"].extend(converted_items)
                # Update name just in case (though usually consistent per group)
                if "name" in current_data and current_data["name"] != "Continued":
                    self.accumulated_results["name"] = current_data["name"]
        except Exception as e:
            print(f"Error re-reading/saving {self.current_json_path}: {e}")

        # Advance UI (even after an error, so the session is not blocked)
        self.is_viewing = False
        self.label.config(image="", text="Loading next...")

    def on_open_pdf(self, event):
        """Open ``<prefix>.pdf`` located next to the current JSON file."""
        if self.is_viewing and self.current_json_path:
            pdf_name = self.current_json_path.stem.split('_')[0] + ".pdf"
            pdf_path = self.current_json_path.with_name(pdf_name)
            print(f"Opening {pdf_path}")
            subprocess.Popen(['xdg-open', str(pdf_path.absolute())])

    def on_open_ori_pdf(self, event):
        """Open ``<prefix>.pdf`` from the "Copies Originales" sub-directory."""
        if self.is_viewing and self.current_json_path:
            new_filename = self.current_json_path.stem.split('_')[0] + ".pdf"
            pdf_path = self.current_json_path.parent / "Copies Originales" / new_filename
            print(f"Opening {pdf_path}")
            subprocess.Popen(['xdg-open', str(pdf_path.absolute())])

    def on_edit(self, event):
        """Open the JSON file of the current part with the default application."""
        if self.is_viewing and self.current_json_path:
            print(f"Opening {self.current_json_path}")
            subprocess.Popen(['xdg-open', str(self.current_json_path.absolute())])

    def on_click(self, event):
        """Copy a 10x10 px box around the click to the clipboard as JSON text.

        NOTE(review): event coordinates are relative to the label widget; if
        the label is larger than the image, the image may not start at
        (0, 0) — confirm.
        """
        if not self.is_viewing:
            return

        x = int(event.x / self.scale_factor)
        y = int(event.y / self.scale_factor)
        w, h = self.orig_size
        # Boxes are normalised against the image without its right padding,
        # so x is clamped to that width to stay inside the 0..1000 range.
        content_w = w - padding

        box = [
            int(max(0, y - 5) / h * 1000),
            int(min(content_w, max(0, x - 5)) / content_w * 1000),
            int(min(h, y + 5) / h * 1000),
            int(min(content_w, x + 5) / content_w * 1000),
        ]

        box_str = "{ \"box_2d\": " + str(box) + ", \"label\": \"\" },"
        print(f"Copied box at ({x},{y}): {box_str}")
        self.root.clipboard_clear()
        self.root.clipboard_append(box_str)


def natural_key(text):
    """Sort key ordering embedded numbers numerically, ignoring letter case."""
    return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]


def _collect_files(input_path):
    """Return ``(base_dir, files_to_process)`` for a file or directory argument."""
    if input_path.is_file():
        base_dir = input_path.parent
        img_path = base_dir / "Cutleft" / f"{input_path.stem}.jpg"
        if not img_path.exists() and input_path.parent.name == "Cutleft":
            # The argument is itself a file inside the Cutleft directory.
            base_dir = input_path.parent.parent
            img_path = input_path
        if img_path.exists():
            return base_dir, [img_path]
        # We're given Copie01.pdf, look for parts
        cutleft_dir = base_dir / "Cutleft"
        return base_dir, sorted(cutleft_dir.glob(f"{img_path.stem}_*.jpg"), key=natural_key)

    base_dir = input_path
    cutleft_dir = base_dir / "Cutleft"
    if not cutleft_dir.exists():
        print(f"Error: {cutleft_dir} does not exist.")
        sys.exit(1)
    # Same natural ordering as the single-file case.
    return base_dir, sorted(cutleft_dir.glob("*.jpg"), key=natural_key)


def main():
    """Parse the command line, start the worker thread and run the GUI."""
    if len(sys.argv) < 2:
        print("Usage: python plotting.py <file-or-directory>")
        sys.exit(1)

    base_dir, files_to_process = _collect_files(Path(sys.argv[1]))

    try:
        label_lines = (base_dir / "labels").read_text().splitlines()
        all_labels = sorted((line for line in label_lines if line), key=natural_key)
    except FileNotFoundError:
        all_labels = []

    worker = threading.Thread(
        target=worker_thread,
        args=(base_dir, files_to_process, all_labels),
        daemon=True,
    )
    worker.start()

    root = tk.Tk()
    app = ImageViewer(root, base_dir)  # keep a reference for the mainloop's lifetime
    root.mainloop()


if __name__ == "__main__":
    main()