Copies/plotting.py

311 lines
11 KiB
Python

import sys
import json
import threading
import queue
import subprocess
import tkinter as tk
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont, ImageTk
# --- Configuration & Globals ---
padding = 60
# Queue payload: (pil_image, json_path, metadata)
# metadata is a dict: {'copie': str, 'part': int, 'schema': dict}
image_queue = queue.Queue(maxsize=5)
try:
font = ImageFont.truetype("DejaVuSans.ttf", size=30)
except OSError:
font = ImageFont.load_default()
# --- Helper Functions (Shared) ---
def page_number(b, nb_pages):
column_width = 1000 // nb_pages
center_x = (b[1] + b[3]) // 2
return center_x // column_width
def convert_box2d(b, pn_ori, npn, tot_ori, tot_dest):
l = b.copy()
l[1] = (l[1] - (1000 // tot_ori) * (pn_ori-1)) * tot_ori // tot_dest\
+ (1000 // tot_dest) * (npn - 1)
l[3] = (l[3] - (1000 // tot_ori) * (pn_ori-1)) * tot_ori // tot_dest\
+ (1000 // tot_dest) * (npn - 1)
return l
def convert_list(l, group_id, json_schema):
ll = []
nb_pages = json_schema["columns_per_file"][group_id-1]
nb_previous_pages = sum([json_schema["columns_per_file"][i] for i in range(group_id-1)])
nb_tot_pages = sum([e for e in json_schema["columns_per_file"]])
for e in l:
ee = e.copy()
pn = page_number(e["box_2d"], nb_pages)
npn = pn + nb_previous_pages
ee["box_2d"] = convert_box2d(ee["box_2d"], pn, npn, nb_pages, nb_tot_pages)
ee["part"] = group_id
ee["pn"] = npn
ll.append(ee)
return ll
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
im = Image.open(image_path)
im.load()
width, height = im.size
new_im = Image.new(im.mode, (width + padding, height), "white")
new_im.paste(im, (0, 0))
draw = ImageDraw.Draw(new_im)
bounding_boxes.sort(key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]))
last_label_index = -1
for bbox in bounding_boxes:
raw_y_min = int(bbox["box_2d"][0] * height / 1000)
raw_x_min = int(bbox["box_2d"][1] * width / 1000)
raw_y_max = int(bbox["box_2d"][2] * height / 1000)
raw_x_max = int(bbox["box_2d"][3] * width / 1000)
abs_y_min = max(0, raw_y_min - 10)
abs_x_min = max(0, raw_x_min - 10)
abs_y_max = min(height, raw_y_max + 10)
abs_x_max = min(width, raw_x_max + 10)
color = "black"
label = bbox.get("label")
if label and label in all_labels:
current_index = all_labels.index(label)
if current_index < last_label_index:
color = "red"
last_label_index = current_index
draw.rectangle(((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)), outline=color, width=4)
if label:
if abs_y_min > 80:
draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
else:
draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
return new_im
# --- Processing Logic (Worker Thread) ---
def worker_thread(base_dir, files_to_process, all_labels):
"""
Iterates through files, prepares VISUALS only, and puts metadata in queue.
Does NOT write final JSON files anymore.
"""
for img_path in files_to_process:
json_path = base_dir / f"{img_path.stem}.json"
copie_part = int(img_path.stem[-2:])
copie = img_path.stem[:-3]
json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json"
try:
with open(json_schema_path, 'r') as f:
json_schema = json.load(f)
except:
print("No json_schema : ", json_schema_path)
continue
nb_pages = json_schema["columns_per_file"][copie_part-1]
if json_path.exists():
try:
# Read strictly for visualization purposes
with open(json_path, 'r') as f:
json_result = json.load(f)
bb_list = json_result.get("list", [])
print(f"Buffering {img_path.name}...")
pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages)
# Package metadata needed for final calculation later
metadata = {
"copie": copie,
"part": copie_part,
"schema": json_schema,
"name": json_result.get("name", "")
}
image_queue.put((pil_image, json_path, metadata))
except Exception as e:
print(f"Error processing {img_path.name}: {e}")
# Sentinel to indicate finished
image_queue.put((None, None, None))
# --- GUI Logic (Main Thread) ---
class ImageViewer:
def __init__(self, root, base_dir):
self.root = root
self.base_dir = base_dir
self.root.title("Bounding Box Viewer")
self.label = tk.Label(root, text="Waiting for images...")
self.label.pack(expand=True, fill="both")
# Display State
self.current_image = None
self.current_json_path = None
self.current_meta = None # Stores schema/copie info
self.is_viewing = False
self.scale_factor = 1.0
self.orig_size = (1, 1)
# Data Aggregation State
self.active_copie_name = None
self.accumulated_results = None # Dict with "name" and "list"
# Bindings
self.root.bind('<Return>', self.on_enter)
self.root.bind('e', self.on_edit)
self.root.bind('o', self.on_open_pdf)
self.root.bind('<Escape>', lambda e: self.root.quit())
self.label.bind('<Button-1>', self.on_click)
self.poll_queue()
def poll_queue(self):
if not self.is_viewing:
try:
pil_image, json_path, metadata = image_queue.get_nowait()
# Handle End of Stream
if pil_image is None:
self.save_current_batch() # Save any remaining data
print("All images processed.")
self.root.quit()
return
# Check if we switched to a new "Copie" group
if self.active_copie_name != metadata["copie"]:
self.save_current_batch() # Write previous group to disk
# Start new batch
self.active_copie_name = metadata["copie"]
self.accumulated_results = {"name": metadata["name"], "list": []}
self.display_image(pil_image, json_path, metadata)
except queue.Empty:
pass
self.root.after(100, self.poll_queue)
def save_current_batch(self):
"""Writes the accumulated data to the main JSON file."""
if self.active_copie_name and self.accumulated_results:
main_json_path = self.base_dir / f"{self.active_copie_name}.json"
print(f"Writing aggregated result to {main_json_path}")
with open(main_json_path, 'w') as f:
json.dump(self.accumulated_results, f)
self.accumulated_results = None
def display_image(self, pil_image, json_path, metadata):
self.orig_size = pil_image.size
self.scale_factor = 1.0
screen_h = self.root.winfo_screenheight() - 100
if pil_image.height > screen_h:
self.scale_factor = screen_h / pil_image.height
pil_image = pil_image.resize((int(pil_image.width * self.scale_factor),
int(pil_image.height * self.scale_factor)))
self.tk_image = ImageTk.PhotoImage(pil_image)
self.label.config(image=self.tk_image, text=f"Processing: {json_path.name}")
self.current_json_path = json_path
self.current_meta = metadata
self.is_viewing = True
self.root.lift()
def on_enter(self, event):
if self.is_viewing:
print(f"Committing data for {self.current_json_path.name}...")
# --- CRITICAL CHANGE: Re-read JSON here to capture user edits ---
try:
with open(self.current_json_path, 'r') as f:
current_data = json.load(f)
# Perform the conversion now, post-edit
converted_items = convert_list(
current_data["list"],
self.current_meta["part"],
self.current_meta["schema"]
)
# Add to accumulator
if self.accumulated_results:
self.accumulated_results["list"].extend(converted_items)
# Update name just in case (though usually consistent per group)
self.accumulated_results["name"] = current_data.get("name", self.accumulated_results["name"])
except Exception as e:
print(f"Error re-reading/saving {self.current_json_path}: {e}")
# Advance UI
self.is_viewing = False
self.label.config(image="", text="Loading next...")
def on_open_pdf(self, event):
if self.is_viewing and self.current_json_path:
pdf_path = self.current_json_path.with_suffix(".pdf")
print(f"Opening {pdf_path}")
subprocess.Popen(['xdg-open', str(pdf_path)])
def on_edit(self, event):
if self.is_viewing and self.current_json_path:
print(f"Opening {self.current_json_path}")
subprocess.Popen(['xdg-open', str(self.current_json_path)])
def on_click(self, event):
if not self.is_viewing: return
x = int(event.x / self.scale_factor)
y = int(event.y / self.scale_factor)
w, h = self.orig_size
box = [
int(max(0, y - 5) / h * 1000),
int(max(0, x - 5) / (w- padding) * 1000),
int(min(h, y + 5) / h * 1000),
int(min(w, x + 5) / (w - padding) * 1000),
]
box_str = "{ \"box_2d\": " + str(box) + ", \"label\": \"\" },"
print(f"Copied box at ({x},{y}): {box_str}")
self.root.clipboard_clear()
self.root.clipboard_append(box_str)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python plotting.py <directory_or_file>")
sys.exit(1)
input_path = Path(sys.argv[1])
files_to_process = []
if input_path.is_file():
base_dir = input_path.parent
stem = input_path.stem
img_path = base_dir / "Cutleft" / f"{stem}.jpg"
if not img_path.exists() and input_path.parent.name == "Cutleft":
base_dir = input_path.parent.parent
img_path = input_path
if not img_path.exists():
print(f"Error: Could not find image at {img_path}")
sys.exit(1)
files_to_process = [img_path]
else:
base_dir = input_path
cutleft_dir = base_dir / "Cutleft"
if not cutleft_dir.exists():
print(f"Error: {cutleft_dir} does not exist.")
sys.exit(1)
files_to_process = sorted(cutleft_dir.glob("*.jpg"))
try:
all_labels = list(filter(None, (base_dir / "labels").read_text().splitlines()))
except FileNotFoundError:
all_labels = []
t = threading.Thread(target=worker_thread, args=(base_dir, files_to_process, all_labels))
t.daemon = True
t.start()
root = tk.Tk()
app = ImageViewer(root, base_dir)
root.mainloop()