# Copies/plotting.py
#
# 352 lines
# 13 KiB
# Python

import sys
import json
import threading
import re
import queue
import subprocess
import tkinter as tk
from tkinter import messagebox
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont, ImageTk
# Print the keyboard/mouse cheat sheet once, when the module is loaded.
print("o to open pdf, O original pdf, e to emacs part, i to interro, click for coordinates")
# --- Configuration & Globals ---
# Width in pixels of the white strip prepare_image adds to the right of each image.
padding = 60
# Queue payload: (pil_image, json_path, metadata)
# metadata is a dict: {'copie': str, 'part': int, 'schema': dict, 'name': <"name" field of the part JSON, "" if absent>}
# The worker ends the stream by enqueuing (None, None, None).
# maxsize=5: Queue.put() blocks while five items are already waiting.
image_queue = queue.Queue(maxsize=5)
# Font used to draw box labels; fall back to Pillow's built-in default font
# when DejaVuSans.ttf cannot be opened.
try:
    font = ImageFont.truetype("DejaVuSans.ttf", size=30)
except OSError:
    font = ImageFont.load_default()
# --- Helper Functions (Shared) ---
def page_number(b, nb_pages):
    """Return the 0-based index of the page column that contains box ``b``.

    The horizontal 0-1000 range is split into ``nb_pages`` columns of
    ``1000 // nb_pages`` units each, and the box is assigned to the column
    holding its horizontal centre.

    Args:
        b: Box as ``[y_min, x_min, y_max, x_max]``; only indices 1 and 3
            are read.
        nb_pages: Number of page columns in the image.

    Returns:
        The column index, clamped to ``0 .. nb_pages - 1``.

    Raises:
        ZeroDivisionError: If ``nb_pages`` is 0 or greater than 1000 (the
            integer column width is then 0).
    """
    column_width = 1000 // nb_pages
    center_x = (b[1] + b[3]) // 2
    # Integer column widths do not always tile 0..1000 exactly (e.g. 3 x 333),
    # and a box touching the right edge can have its centre at 1000; without
    # clamping both cases would yield an index one past the last column.
    return max(0, min(center_x // column_width, nb_pages - 1))
def convert_box2d(b, pn_ori, npn, tot_ori, tot_dest):
    """Return a copy of box ``b`` with its two x coordinates remapped.

    Each x coordinate (indices 1 and 3) has the source offset
    ``(1000 // tot_ori) * (pn_ori - 1)`` subtracted, is rescaled by
    ``tot_ori // tot_dest`` (multiplication first, then floor division), and
    finally has the destination offset ``(1000 // tot_dest) * (npn - 1)``
    added. Indices 0 and 2 are returned unchanged.

    Args:
        b: Box ``[y_min, x_min, y_max, x_max]``; it is not modified.
        pn_ori: Page number of the box in the source layout.
        npn: Page number of the box in the destination layout.
        tot_ori: Number of page columns in the source layout.
        tot_dest: Number of page columns in the destination layout.

    Returns:
        A new box obtained from ``b.copy()`` with indices 1 and 3 replaced.
    """
    converted = b.copy()
    source_offset = (1000 // tot_ori) * (pn_ori - 1)
    dest_offset = (1000 // tot_dest) * (npn - 1)

    def remap(x):
        # Same arithmetic for both the left and the right edge.
        return (x - source_offset) * tot_ori // tot_dest + dest_offset

    for index in (1, 3):
        converted[index] = remap(converted[index])
    return converted
def convert_list(l, group_id, json_schema):
    """Convert the boxes of one part into the coordinates of the whole copie.

    Args:
        l: Iterable of entries (mappings with a ``"box_2d"`` key and a
            ``copy()`` method); the entries themselves are not modified.
        group_id: Part number; ``group_id - 1`` indexes
            ``json_schema["columns_per_file"]``.
        json_schema: Mapping whose ``"columns_per_file"`` value lists the
            number of page columns of each part.

    Returns:
        A new list of shallow copies of the entries where ``"box_2d"`` has
        been remapped with ``convert_box2d``, ``"part"`` is ``group_id`` and
        ``"pn"`` is the entry's page index within the part plus the number of
        columns in all preceding parts.
    """
    columns = json_schema["columns_per_file"]
    nb_pages = columns[group_id - 1]
    nb_previous_pages = sum(columns[i] for i in range(group_id - 1))
    nb_tot_pages = sum(columns)

    def convert_entry(entry):
        converted = entry.copy()
        page = page_number(entry["box_2d"], nb_pages)
        new_page = page + nb_previous_pages
        converted["box_2d"] = convert_box2d(
            converted["box_2d"], page, new_page, nb_pages, nb_tot_pages
        )
        converted["part"] = group_id
        converted["pn"] = new_page
        return converted

    return [convert_entry(entry) for entry in l]
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages, last_label_index):
    """Draw bounding boxes and labels on a right-padded copy of an image.

    The image is copied onto a white canvas that is ``padding`` pixels wider.
    Boxes are drawn in page order (column index from ``page_number``, then
    ``y_min``), each grown by 10 pixels on every side and clamped to the
    original image size. A box is drawn in red instead of black when its
    label is found in ``all_labels`` at an index lower than the previously
    seen one, or when it is the first known label (``last_label_index == -1``)
    and is not the first entry of ``all_labels``.

    Args:
        image_path: Path of the image file to open.
        bounding_boxes: List of dicts with a ``"box_2d"`` entry
            ``[y_min, x_min, y_max, x_max]`` on a 0-1000 scale and an
            optional ``"label"``. The list is not modified.
        all_labels: Sequence of labels in their expected order.
        nb_pages: Number of page columns in the image.
        last_label_index: Index in ``all_labels`` of the last known label
            seen so far, or ``-1`` when none has been seen yet.

    Returns:
        Tuple ``(annotated_image, last_label_index)`` where the index has
        been advanced to the last known label found in this image.
    """
    # The context manager guarantees the underlying file is closed, including
    # when an exception is raised while copying the pixels.
    with Image.open(image_path) as im:
        im.load()
        width, height = im.size
        new_im = Image.new(im.mode, (width + padding, height), "white")
        new_im.paste(im, (0, 0))
    draw = ImageDraw.Draw(new_im)

    # sorted() instead of list.sort(): do not reorder the caller's list.
    ordered_boxes = sorted(
        bounding_boxes,
        key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]),
    )
    for bbox in ordered_boxes:
        box = bbox["box_2d"]
        raw_y_min = int(box[0] * height / 1000)
        raw_x_min = int(box[1] * width / 1000)
        raw_y_max = int(box[2] * height / 1000)
        raw_x_max = int(box[3] * width / 1000)
        # Grow the box by a 10 px margin, staying inside the original image.
        abs_y_min = max(0, raw_y_min - 10)
        abs_x_min = max(0, raw_x_min - 10)
        abs_y_max = min(height, raw_y_max + 10)
        abs_x_max = min(width, raw_x_max + 10)

        color = "black"
        label = bbox.get("label")
        if label and label in all_labels:
            current_index = all_labels.index(label)
            out_of_order = current_index < last_label_index
            bad_first_label = last_label_index == -1 and current_index != 0
            if out_of_order or bad_first_label:
                color = "red"
            last_label_index = current_index

        draw.rectangle(((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)), outline=color, width=4)
        if label:
            # Put the label just inside the top of the box when the box is
            # far enough from the top of the image, otherwise below the box.
            if abs_y_min > 80:
                draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
            else:
                draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
    return (new_im, last_label_index)
# --- Processing Logic (Worker Thread) ---
def worker_thread(base_dir, files_to_process, all_labels):
    """Render every image with its boxes and feed the results to ``image_queue``.

    For each image the part number is taken from the last two characters of
    the file stem and the copie name from the stem minus its last three
    characters. The schema ``<base_dir>/Cutleft/<copie>_schema.json`` gives
    the number of page columns of the part. When ``<base_dir>/<stem>.json``
    exists, its ``"list"`` and ``"name"`` entries are read for visualization
    only (a malformed file is shown with no boxes so it can be fixed from the
    GUI), the image is rendered by ``prepare_image`` and
    ``(pil_image, json_path, metadata)`` is enqueued. Images without a JSON
    file are skipped. No final JSON file is written here.

    The end-of-stream marker ``(None, None, None)`` is always enqueued, even
    if an unexpected error stops the loop, so the consumer never waits
    forever.

    Args:
        base_dir: Directory (path object supporting ``/``) holding the JSON
            files and the ``Cutleft`` sub-directory.
        files_to_process: Iterable of image paths, in display order.
        all_labels: Expected label order, forwarded to ``prepare_image``.
    """
    previous_copie = None
    last_label_index = None
    try:
        for img_path in files_to_process:
            json_path = base_dir / f"{img_path.stem}.json"
            try:
                copie_part = int(img_path.stem[-2:])
            except ValueError:
                # A stray image must not kill the whole worker.
                print(f"Skipping {img_path.name}: file name does not end with a two-digit part number")
                continue
            copie = img_path.stem[:-3]
            if copie != previous_copie:
                # Label ordering is checked per copie: restart the tracking.
                last_label_index = -1
                previous_copie = copie

            json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json"
            try:
                with open(json_schema_path, 'r', encoding="utf-8") as f:
                    json_schema = json.load(f)
            except (OSError, ValueError):
                # ValueError covers json.JSONDecodeError and decoding errors.
                print("No json_schema : ", json_schema_path)
                continue
            try:
                nb_pages = json_schema["columns_per_file"][copie_part - 1]
            except (KeyError, IndexError, TypeError) as e:
                print(f"Invalid json_schema {json_schema_path} for part {copie_part}: {e!r}")
                continue

            if not json_path.exists():
                continue

            # Read strictly for visualization purposes
            bb_list = []
            json_name = ""
            try:
                with open(json_path, 'r', encoding="utf-8") as f:
                    json_result = json.load(f)
                bb_list = json_result.get("list", [])
                json_name = json_result.get("name", "")
            except Exception as e:
                print(f"Warning: {json_path.name} is malformed! Loading blank. {e}")
                # We do NOT skip; we continue so the user can fix it in the GUI

            try:
                print(f"Buffering {img_path.name}...")
                (pil_image, last_label_index) = \
                    prepare_image(str(img_path), bb_list, all_labels, nb_pages, last_label_index)
                metadata = {
                    "copie": copie,
                    "part": copie_part,
                    "schema": json_schema,
                    "name": json_name,
                }
                image_queue.put((pil_image, json_path, metadata))
            except Exception as e:
                print(f"Error processing {img_path.name}: {e}")
    finally:
        # Sentinel to indicate finished
        image_queue.put((None, None, None))
# --- GUI Logic (Main Thread) ---
class ImageViewer:
    """Tkinter viewer showing one annotated image at a time and aggregating results.

    Images arrive through the module-level ``image_queue``. Pressing Return
    re-reads the JSON file of the displayed part, converts its boxes with
    ``convert_list`` and appends them to the batch of the current copie. The
    batch is written to ``<base_dir>/<copie>.json`` when an image of a
    different copie arrives or when the end-of-stream marker is received.

    Key bindings: Return commits, ``e`` opens the part JSON, ``o`` / ``O`` /
    ``i`` open related PDF files, Escape quits, and a left click copies a
    small box around the clicked point to the clipboard.
    """

    def __init__(self, root, base_dir):
        """Build the widgets, install the bindings and start polling the queue.

        Args:
            root: The Tk root window.
            base_dir: Directory (path object supporting ``/``) in which the
                aggregated JSON files are written.
        """
        self.root = root
        self.base_dir = base_dir
        self.root.title("Bounding Box Viewer")
        self.label = tk.Label(root, text="Waiting for images...")
        self.label.pack(expand=True, fill="both")
        # Display State
        self.current_image = None
        self.current_json_path = None
        self.current_meta = None  # Stores schema/copie info
        self.is_viewing = False
        self.scale_factor = 1.0
        self.orig_size = (1, 1)
        # Data Aggregation State
        self.active_copie_name = None
        self.accumulated_results = None  # Dict with "name" and "list"
        # Bindings
        self.root.bind('<Return>', self.on_enter)
        self.root.bind('e', self.on_edit)
        self.root.bind('o', self.on_open_pdf)
        self.root.bind('i', self.on_open_interro)
        self.root.bind('O', self.on_open_ori_pdf)
        self.root.bind('<Escape>', lambda e: self.root.quit())
        self.label.bind('<Button-1>', self.on_click)
        self.poll_queue()

    @staticmethod
    def _xdg_open(target):
        """Launch ``xdg-open`` on ``target`` without waiting for it to finish."""
        subprocess.Popen(['xdg-open', str(target)])

    def poll_queue(self):
        """Fetch the next image when idle, then reschedule itself in 100 ms.

        On the end-of-stream marker the pending batch is saved and the main
        loop is asked to quit; polling is not rescheduled in that case.
        """
        if not self.is_viewing:
            try:
                pil_image, json_path, metadata = image_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                # Handle End of Stream
                if pil_image is None:
                    self.save_current_batch()  # Save any remaining data
                    print("All images processed.")
                    self.root.quit()
                    return
                # Check if we switched to a new "Copie" group
                if self.active_copie_name != metadata["copie"]:
                    self.save_current_batch()  # Write previous group to disk
                    # Start new batch
                    self.active_copie_name = metadata["copie"]
                    self.accumulated_results = {"name": metadata["name"], "list": []}
                self.display_image(pil_image, json_path, metadata)
        self.root.after(100, self.poll_queue)

    def save_current_batch(self):
        """Write the accumulated data to ``<base_dir>/<copie>.json`` and reset it.

        Nothing is written when no copie is active or nothing has been
        accumulated.
        """
        if not (self.active_copie_name and self.accumulated_results):
            return
        main_json_path = self.base_dir / f"{self.active_copie_name}.json"
        print(f"Writing aggregated result to {main_json_path}")
        with open(main_json_path, 'w', encoding="utf-8") as f:
            json.dump(self.accumulated_results, f)
        self.accumulated_results = None

    def display_image(self, pil_image, json_path, metadata):
        """Show ``pil_image``, scaled down to fit the screen height if needed.

        Args:
            pil_image: Annotated PIL image to show.
            json_path: Path of the JSON file belonging to this image.
            metadata: Dict describing the image (copie, part, schema, name).
        """
        self.orig_size = pil_image.size
        self.scale_factor = 1.0
        # Keep 100 px of vertical room on the screen.
        screen_h = self.root.winfo_screenheight() - 100
        if pil_image.height > screen_h:
            self.scale_factor = screen_h / pil_image.height
            pil_image = pil_image.resize((int(pil_image.width * self.scale_factor),
                                          int(pil_image.height * self.scale_factor)))
        # Keep a reference on self so the PhotoImage is not garbage-collected.
        self.tk_image = ImageTk.PhotoImage(pil_image)
        self.label.config(image=self.tk_image, text=f"Processing: {json_path.name}")
        self.current_json_path = json_path
        self.current_meta = metadata
        self.is_viewing = True
        self.root.lift()

    def on_enter(self, event):
        """Commit the displayed part to the batch and move on to the next image.

        The part JSON is re-read from disk so that edits made since the image
        was rendered are taken into account. On any error a dialog is shown
        and the current image stays on screen.
        """
        if not self.is_viewing:
            return
        print(f"Committing data for {self.current_json_path.name}...")
        try:
            with open(self.current_json_path, 'r', encoding="utf-8") as f:
                current_data = json.load(f)
            # Perform the conversion now, post-edit
            converted_items = convert_list(
                current_data["list"],
                self.current_meta["part"],
                self.current_meta["schema"],
            )
            # Add to accumulator
            if self.accumulated_results:
                self.accumulated_results["list"].extend(converted_items)
                # Update name just in case (though usually consistent per group)
                if "name" in current_data and current_data["name"] != "Continued":
                    self.accumulated_results["name"] = current_data["name"]
        except Exception as e:
            # Warn user and STOP (do not advance to next image)
            msg = f"Error reading {self.current_json_path.name}:\n\n{e}\n\nPlease press 'e' to fix it, then press Enter again."
            print(msg)
            messagebox.showerror("JSON Error", msg)
            return  # Abort advancement
        # Advance UI
        self.is_viewing = False
        self.label.config(image="", text="Loading next...")

    def on_open_pdf(self, event):
        """Open ``<prefix>.pdf`` next to the part JSON, prefix = stem before the first ``_``."""
        if self.is_viewing and self.current_json_path:
            pdf_name = self.current_json_path.stem.split('_')[0] + ".pdf"
            pdf_path = self.current_json_path.with_name(pdf_name)
            print(f"Opening {pdf_path}")
            self._xdg_open(pdf_path.absolute())

    def on_open_ori_pdf(self, event):
        """Open the original PDF whose path is derived from the base directory."""
        if self.is_viewing and self.current_json_path:
            # Use the instance's base_dir rather than a module-level global,
            # so the handler also works when the class is used outside the
            # script entry point.
            # NOTE(review): no separator or dot is inserted before "pdf";
            # confirm this matches the actual file naming.
            pdf_path = "/home/sebastien/Staging/Interro/" + str(self.base_dir) + "pdf"
            print(f"Opening {pdf_path}")
            self._xdg_open(pdf_path)

    def on_open_interro(self, event):
        """Open ``Copies Originales/<prefix>.pdf`` located next to the part JSON."""
        if self.is_viewing and self.current_json_path:
            new_filename = self.current_json_path.stem.split('_')[0] + ".pdf"
            pdf_path = self.current_json_path.parent / "Copies Originales" / new_filename
            print(f"Opening {pdf_path}")
            self._xdg_open(pdf_path.absolute())

    def on_edit(self, event):
        """Open the JSON file of the displayed part with the default application."""
        if self.is_viewing and self.current_json_path:
            print(f"Opening {self.current_json_path}")
            self._xdg_open(self.current_json_path.absolute())

    def on_click(self, event):
        """Copy a JSON box snippet of about 10x10 px around the click to the clipboard.

        Coordinates are converted back to the unscaled image and expressed on
        the 0-1000 scale of the image without its right padding.
        """
        if not self.is_viewing:
            return
        # NOTE(review): event coordinates are relative to the label widget;
        # this assumes the label is exactly as large as the displayed image.
        x = int(event.x / self.scale_factor)
        y = int(event.y / self.scale_factor)
        w, h = self.orig_size
        # Horizontal values are normalised by the unpadded width, so they are
        # clamped to it as well; otherwise a click in or near the padding
        # strip would produce values above 1000.
        content_w = w - padding
        box = [
            int(max(0, y - 5) / h * 1000),
            int(min(content_w, max(0, x - 5)) / content_w * 1000),
            int(min(h, y + 5) / h * 1000),
            int(min(content_w, x + 5) / content_w * 1000),
        ]
        box_str = f'{{ "box_2d": {box}, "label": "" }},'
        print(f"Copied box at ({x},{y}): {box_str}")
        self.root.clipboard_clear()
        self.root.clipboard_append(box_str)
from utils import natural_key, read_all_labels
if __name__ == "__main__":
    # Exactly one positional argument is expected: a directory to process in
    # full, or a file designating a single image / a single copie.
    if len(sys.argv) < 2:
        print("Usage: python plotting.py <directory_or_file>")
        sys.exit(1)

    input_path = Path(sys.argv[1])
    files_to_process = []

    if not input_path.is_file():
        # Directory mode: every JPEG found in <dir>/Cutleft.
        base_dir = input_path
        cutleft_dir = base_dir / "Cutleft"
        if not cutleft_dir.exists():
            print(f"Error: {cutleft_dir} does not exist.")
            sys.exit(1)
        files_to_process = sorted(cutleft_dir.glob("*.jpg"))
    else:
        # File mode: first look for Cutleft/<stem>.jpg next to the given file.
        base_dir = input_path.parent
        stem = input_path.stem
        img_path = base_dir / "Cutleft" / f"{stem}.jpg"
        if not img_path.exists() and base_dir.name == "Cutleft":
            # The file given is itself located inside Cutleft.
            base_dir = base_dir.parent
            img_path = input_path
        if img_path.exists():
            files_to_process = [img_path]
        else:
            # We're given Copie01.pdf, look for parts
            cutleft_dir = base_dir / "Cutleft"
            files_to_process = sorted(
                cutleft_dir.glob(f"{img_path.stem}_*.jpg"),
                key=natural_key,
            )

    # A missing labels file only disables the label-order check.
    try:
        all_labels = read_all_labels(base_dir)
    except FileNotFoundError:
        all_labels = []

    # Images are rendered in a background daemon thread; Tk stays in the
    # main thread.
    t = threading.Thread(
        target=worker_thread,
        args=(base_dir, files_to_process, all_labels),
        daemon=True,
    )
    t.start()

    root = tk.Tk()
    app = ImageViewer(root, base_dir)
    root.mainloop()