import sys import json import threading import queue import subprocess import tkinter as tk from pathlib import Path from PIL import Image, ImageDraw, ImageFont, ImageTk # --- Configuration & Globals --- padding = 60 # Queue payload: (pil_image, json_path, metadata) # metadata is a dict: {'copie': str, 'part': int, 'schema': dict} image_queue = queue.Queue(maxsize=5) try: font = ImageFont.truetype("DejaVuSans.ttf", size=30) except OSError: font = ImageFont.load_default() # --- Helper Functions (Shared) --- def page_number(b, nb_pages): column_width = 1000 // nb_pages center_x = (b[1] + b[3]) // 2 return center_x // column_width def convert_box2d(b, pn_ori, npn, tot_ori, tot_dest): l = b.copy() l[1] = (l[1] - (1000 // tot_ori) * (pn_ori-1)) * tot_ori // tot_dest\ + (1000 // tot_dest) * (npn - 1) l[3] = (l[3] - (1000 // tot_ori) * (pn_ori-1)) * tot_ori // tot_dest\ + (1000 // tot_dest) * (npn - 1) return l def convert_list(l, group_id, json_schema): ll = [] nb_pages = json_schema["columns_per_file"][group_id-1] nb_previous_pages = sum([json_schema["columns_per_file"][i] for i in range(group_id-1)]) nb_tot_pages = sum([e for e in json_schema["columns_per_file"]]) for e in l: ee = e.copy() pn = page_number(e["box_2d"], nb_pages) npn = pn + nb_previous_pages ee["box_2d"] = convert_box2d(ee["box_2d"], pn, npn, nb_pages, nb_tot_pages) ee["part"] = group_id ee["pn"] = npn ll.append(ee) return ll def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages): im = Image.open(image_path) im.load() width, height = im.size new_im = Image.new(im.mode, (width + padding, height), "white") new_im.paste(im, (0, 0)) draw = ImageDraw.Draw(new_im) bounding_boxes.sort(key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0])) last_label_index = -1 for bbox in bounding_boxes: raw_y_min = int(bbox["box_2d"][0] * height / 1000) raw_x_min = int(bbox["box_2d"][1] * width / 1000) raw_y_max = int(bbox["box_2d"][2] * height / 1000) raw_x_max = int(bbox["box_2d"][3] * width / 1000) abs_y_min = max(0, raw_y_min - 10) abs_x_min = max(0, raw_x_min - 10) abs_y_max = min(height, raw_y_max + 10) abs_x_max = min(width, raw_x_max + 10) color = "black" label = bbox.get("label") if label and label in all_labels: current_index = all_labels.index(label) if current_index < last_label_index: color = "red" last_label_index = current_index draw.rectangle(((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)), outline=color, width=4) if label: if abs_y_min > 80: draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font) else: draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font) return new_im # --- Processing Logic (Worker Thread) --- def worker_thread(base_dir, files_to_process, all_labels): """ Iterates through files, prepares VISUALS only, and puts metadata in queue. Does NOT write final JSON files anymore. """ for img_path in files_to_process: json_path = base_dir / f"{img_path.stem}.json" copie_part = int(img_path.stem[-2:]) copie = img_path.stem[:-3] json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json" try: with open(json_schema_path, 'r') as f: json_schema = json.load(f) except: print("No json_schema : ", json_schema_path) continue nb_pages = json_schema["columns_per_file"][copie_part-1] if json_path.exists(): try: # Read strictly for visualization purposes with open(json_path, 'r') as f: json_result = json.load(f) bb_list = json_result.get("list", []) print(f"Buffering {img_path.name}...") pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages) # Package metadata needed for final calculation later metadata = { "copie": copie, "part": copie_part, "schema": json_schema, "name": json_result.get("name", "") } image_queue.put((pil_image, json_path, metadata)) except Exception as e: print(f"Error processing {img_path.name}: {e}") # Sentinel to indicate finished image_queue.put((None, None, None)) # --- GUI Logic (Main Thread) --- class ImageViewer: def __init__(self, root, base_dir): self.root = root self.base_dir = base_dir self.root.title("Bounding Box Viewer") self.label = tk.Label(root, text="Waiting for images...") self.label.pack(expand=True, fill="both") # Display State self.current_image = None self.current_json_path = None self.current_meta = None # Stores schema/copie info self.is_viewing = False self.scale_factor = 1.0 self.orig_size = (1, 1) # Data Aggregation State self.active_copie_name = None self.accumulated_results = None # Dict with "name" and "list" # Bindings self.root.bind('', self.on_enter) self.root.bind('e', self.on_edit) self.root.bind('o', self.on_open_pdf) self.root.bind('', lambda e: self.root.quit()) self.label.bind('', self.on_click) self.poll_queue() def poll_queue(self): if not self.is_viewing: try: pil_image, json_path, metadata = image_queue.get_nowait() # Handle End of Stream if pil_image is None: self.save_current_batch() # Save any remaining data print("All images processed.") self.root.quit() return # Check if we switched to a new "Copie" group if self.active_copie_name != metadata["copie"]: self.save_current_batch() # Write previous group to disk # Start new batch self.active_copie_name = metadata["copie"] self.accumulated_results = {"name": metadata["name"], "list": []} self.display_image(pil_image, json_path, metadata) except queue.Empty: pass self.root.after(100, self.poll_queue) def save_current_batch(self): """Writes the accumulated data to the main JSON file.""" if self.active_copie_name and self.accumulated_results: main_json_path = self.base_dir / f"{self.active_copie_name}.json" print(f"Writing aggregated result to {main_json_path}") with open(main_json_path, 'w') as f: json.dump(self.accumulated_results, f) self.accumulated_results = None def display_image(self, pil_image, json_path, metadata): self.orig_size = pil_image.size self.scale_factor = 1.0 screen_h = self.root.winfo_screenheight() - 100 if pil_image.height > screen_h: self.scale_factor = screen_h / pil_image.height pil_image = pil_image.resize((int(pil_image.width * self.scale_factor), int(pil_image.height * self.scale_factor))) self.tk_image = ImageTk.PhotoImage(pil_image) self.label.config(image=self.tk_image, text=f"Processing: {json_path.name}") self.current_json_path = json_path self.current_meta = metadata self.is_viewing = True self.root.lift() def on_enter(self, event): if self.is_viewing: print(f"Committing data for {self.current_json_path.name}...") # --- CRITICAL CHANGE: Re-read JSON here to capture user edits --- try: with open(self.current_json_path, 'r') as f: current_data = json.load(f) # Perform the conversion now, post-edit converted_items = convert_list( current_data["list"], self.current_meta["part"], self.current_meta["schema"] ) # Add to accumulator if self.accumulated_results: self.accumulated_results["list"].extend(converted_items) # Update name just in case (though usually consistent per group) self.accumulated_results["name"] = current_data.get("name", self.accumulated_results["name"]) except Exception as e: print(f"Error re-reading/saving {self.current_json_path}: {e}") # Advance UI self.is_viewing = False self.label.config(image="", text="Loading next...") def on_open_pdf(self, event): if self.is_viewing and self.current_json_path: pdf_path = self.current_json_path.with_suffix(".pdf") print(f"Opening {pdf_path}") subprocess.Popen(['xdg-open', str(pdf_path)]) def on_edit(self, event): if self.is_viewing and self.current_json_path: print(f"Opening {self.current_json_path}") subprocess.Popen(['xdg-open', str(self.current_json_path)]) def on_click(self, event): if not self.is_viewing: return x = int(event.x / self.scale_factor) y = int(event.y / self.scale_factor) w, h = self.orig_size box = [ int(max(0, y - 5) / h * 1000), int(max(0, x - 5) / (w- padding) * 1000), int(min(h, y + 5) / h * 1000), int(min(w, x + 5) / (w - padding) * 1000), ] box_str = "{ \"box_2d\": " + str(box) + ", \"label\": \"\" }," print(f"Copied box at ({x},{y}): {box_str}") self.root.clipboard_clear() self.root.clipboard_append(box_str) if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python plotting.py ") sys.exit(1) input_path = Path(sys.argv[1]) files_to_process = [] if input_path.is_file(): base_dir = input_path.parent stem = input_path.stem img_path = base_dir / "Cutleft" / f"{stem}.jpg" if not img_path.exists() and input_path.parent.name == "Cutleft": base_dir = input_path.parent.parent img_path = input_path if not img_path.exists(): print(f"Error: Could not find image at {img_path}") sys.exit(1) files_to_process = [img_path] else: base_dir = input_path cutleft_dir = base_dir / "Cutleft" if not cutleft_dir.exists(): print(f"Error: {cutleft_dir} does not exist.") sys.exit(1) files_to_process = sorted(cutleft_dir.glob("*.jpg")) try: all_labels = list(filter(None, (base_dir / "labels").read_text().splitlines())) except FileNotFoundError: all_labels = [] t = threading.Thread(target=worker_thread, args=(base_dir, files_to_process, all_labels)) t.daemon = True t.start() root = tk.Tk() app = ImageViewer(root, base_dir) root.mainloop()