Copies/annotating_with_checks.py

243 lines
8.4 KiB
Python

import sys
import os
import json
import shutil
import concurrent.futures
import threading
import img2pdf
from reportlab.pdfgen import canvas
# Fix for Matplotlib in threads: Set backend to non-interactive 'Agg'
import matplotlib
matplotlib.use('Agg')
from PIL import Image, ImageDraw, ImageFont
import annotating
from annotating import MARGIN_LEFT, ANNOT_WIDTH
# Lock intended to serialise Matplotlib/LaTeX rendering across worker threads
# (it only protects code that explicitly acquires it).
LATEX_LOCK = threading.Lock()

# NOTE(review): presumably the rendering resolution in dots per inch; it is not
# referenced anywhere in this part of the file -- confirm before relying on it.
DPI = 100

# Edge length, in image pixels, of the "delete" checkboxes and of the larger
# score checkboxes.
BOX_SIZE = 30
SCORE_BOX_SIZE = 40

# Selectable scores: 0.0 to 4.0 inclusive, in steps of one half.
SCORES = [half_steps / 2 for half_steps in range(9)]
def _load_checkbox_font():
    """Return the font used for checkbox labels.

    Tries the TrueType faces in order (DejaVu Sans, then Arial) at size 20 and
    falls back to PIL's built-in default font when neither can be opened.
    """
    for face in ("DejaVuSans.ttf", "arial.ttf"):
        try:
            return ImageFont.truetype(face, 20)
        except IOError:
            # Face not available on this machine: try the next candidate.
            continue
    return ImageFont.load_default()


CHECKBOX_FONT = _load_checkbox_font()
def draw_checkbox(draw, x, y, size=BOX_SIZE, label=None, fill="white"):
    """Draw a square checkbox and return its bounding box.

    Args:
        draw: drawing context exposing ``text`` and ``rectangle``
            (a PIL ``ImageDraw`` in this file).
        x, y: top-left corner of the box.
        size: edge length of the box.
        label: optional caption; when truthy it is written to the left of the
            box, ``BOX_SIZE + 5`` pixels before ``x`` (the offset does not
            depend on ``size``).
        fill: interior colour of the box.

    Returns:
        ``[x, y, x + size, y + size]``.
    """
    right = x + size
    bottom = y + size

    # The caption is written before the box outline is drawn.
    if label:
        caption_origin = (x - BOX_SIZE - 5, y + 2)
        draw.text(caption_origin, str(label), fill="black", font=CHECKBOX_FONT)

    draw.rectangle([x, y, right, bottom], fill=fill, outline="black", width=2)
    return [x, y, right, bottom]
def safe_render_latex(*args, **kwargs):
    """Forward all arguments to ``annotating.render_real_latex_text``.

    Returns whatever that function returns.

    NOTE(review): despite the name, no lock is acquired here. An earlier
    variant that called ``annotating.render_latex_text`` while holding
    ``LATEX_LOCK`` is disabled -- confirm that the renderer is safe to call
    from several worker threads at once.
    """
    rendered = annotating.render_real_latex_text(*args, **kwargs)
    return rendered
class CheckboxRenderer:
    """Draw hook that adds checkboxes to one label image and records them.

    An instance's ``callback`` is handed to ``compose_label_image`` as its
    ``draw_callback``. Every checkbox drawn is appended to ``self.checkboxes``
    as a dict holding its type, the label name and its box in label-image
    coordinates (under ``"rel_box"`` for header items, ``"final_box"`` for
    local items).
    """

    def __init__(self, label_name):
        self.label = label_name
        # One dict per checkbox drawn so far, in drawing order.
        self.checkboxes = []

    def callback(self, kind, draw, pos, meta):
        """Draw the checkbox(es) belonging to one rendered element.

        Called by compose_label_image during rendering.

        Args:
            kind: ``"header_item"``, ``"local_rect"`` or ``"local_text"``;
                any other value is ignored.
            draw: drawing context passed on to ``draw_checkbox``.
            pos: geometry of the element: ``{x, y, w, h}`` or ``{box}``.
            meta: element details: ``{data, index, ...}``; header items also
                carry a ``"type"`` of ``"score"`` or ``"global_fb"``.
        """
        if kind == "header_item":
            header_type = meta.get("type")
            if header_type == "score":
                self._draw_score_row(draw, pos)
            elif header_type == "global_fb":
                # Delete box for a global feedback line, near the right edge.
                self._draw_delete_box(
                    draw, pos['w'] - BOX_SIZE - 5, pos['y'] + 5,
                    "del_global", "rel_box", meta,
                )
            return

        if kind == "local_rect":
            # Delete box tucked into the top-right corner of the rectangle.
            rect = pos['box']  # [xmin, ymin, xmax, ymax]
            self._draw_delete_box(
                draw, rect[2] - BOX_SIZE, rect[1],
                "del_local_rect", "final_box", meta,
            )
        elif kind == "local_text":
            # Delete box for the whole local feedback, flush with the right
            # edge of its text area.
            self._draw_delete_box(
                draw, pos['x'] + pos['w'] - BOX_SIZE, pos['y'],
                "del_local", "final_box", meta,
            )

    def _draw_score_row(self, draw, pos):
        """Draw one captioned box per entry of ``SCORES`` in a horizontal row."""
        left = pos['w'] + 20
        top = pos['y'] + 25
        for val in SCORES:
            drawn = draw_checkbox(draw, left, top, SCORE_BOX_SIZE, str(val))
            self.checkboxes.append({
                "type": "score",
                "label": self.label,
                "value": val,
                "rel_box": drawn,  # shifted to global Y by the caller later
            })
            left += SCORE_BOX_SIZE + 60

    def _draw_delete_box(self, draw, x, y, entry_type, box_key, meta):
        """Draw an uncaptioned delete box at (x, y) and record it."""
        drawn = draw_checkbox(draw, x, y, BOX_SIZE)
        self.checkboxes.append({
            "type": entry_type,
            "label": self.label,
            "index": meta["index"],
            box_key: drawn,
            "text_preview": meta["data"]["text"][:20],
        })
from utils import natural_key
def process_student(args):
    """Thread worker: build the checkable outputs for one student.

    Args:
        args: a ``(root_dir, student_id, labels, overwrite)`` tuple.
            ``labels`` maps a label name to a dict providing ``'pdf_path'``,
            ``'result'`` and ``'coordinates'``. When ``overwrite`` is false an
            existing output directory is left untouched and the student is
            skipped; when true it is deleted and rebuilt.

    Side effects:
        Creates ``<root_dir>/Bnot/Copie<student_id>/``. If at least one label
        could be rendered, the directory receives:

        * ``bnote.json``      -- size of the stacked image and the vertical
                                 extent (``hmin``/``hmax``) of each label;
        * ``checkboxes.json`` -- every checkbox, with ``global_box`` in
                                 stacked-image coordinates;
        * ``Reference.jpg``   -- the stacked image;
        * ``Concat.pdf``      -- a single page showing that image.

    Returns:
        None.
    """
    root_dir, student_id, labels, overwrite = args
    output_dir = os.path.join(root_dir, "Bnot", f"Copie{student_id}")

    if os.path.exists(output_dir):
        if not overwrite:
            print(f"Skipping {student_id}: Output already exists.")
            return
        shutil.rmtree(output_dir)

    print(f"Generating Checkable PDF for: {student_id}")
    os.makedirs(output_dir)

    label_images = []
    all_checkboxes = []  # one list of checkbox dicts per rendered label
    bnote_entries = []   # one entry per rendered label, for bnote.json

    # Render every label in natural order ("Q2" before "Q10").
    for label, content in sorted(labels.items(), key=lambda kv: natural_key(kv[0])):
        label_pdf = content['pdf_path']
        if not os.path.exists(label_pdf):
            continue

        base_img, _, _ = annotating.make_base_image(label_pdf)

        # The renderer is the hook through which the shared engine lets us
        # add checkboxes while it composes the image.
        cb_renderer = CheckboxRenderer(label)
        final_img, header_h = annotating.compose_label_image(
            base_img, label, content['result'], content['coordinates'][0],
            draw_callback=cb_renderer.callback,
        )
        # Identity test: "== None" would go through the image type's __eq__.
        if final_img is None:
            continue

        label_images.append(final_img)
        all_checkboxes.append(cb_renderer.checkboxes)
        bnote_entries.append({
            "id": student_id,
            "label": label,
            "header_height": header_h,
            # hmin/hmax are filled in during concatenation below.
        })

    if not label_images:
        return

    # Stack the label images vertically on a white canvas.
    max_w = max(img.width for img in label_images)
    total_h = sum(img.height for img in label_images)
    concat_img = Image.new("RGB", (max_w, total_h), "white")

    final_json_map = []
    current_y = 0
    for entry, img, boxes in zip(bnote_entries, label_images, all_checkboxes):
        concat_img.paste(img, (0, current_y))
        entry["hmin"] = current_y
        entry["hmax"] = current_y + img.height

        # Checkbox boxes ('final_box' for local items, 'rel_box' for header
        # items) are relative to their label image; shift them by the
        # image's vertical offset in the stacked picture.
        for item in boxes:
            b = item.get('final_box') or item.get('rel_box')
            item['global_box'] = [b[0], b[1] + current_y, b[2], b[3] + current_y]
            final_json_map.append(item)

        current_y += img.height

    bnote_data = {
        "width": max_w,
        "height": total_h,
        "images": bnote_entries,
    }
    with open(os.path.join(output_dir, "bnote.json"), "w") as f:
        json.dump(bnote_data, f, indent=2)
    with open(os.path.join(output_dir, "checkboxes.json"), "w") as f:
        json.dump(final_json_map, f, indent=2)

    # Saved as JPEG on purpose (original author's note: PNG can't be used here).
    reference_path = os.path.join(output_dir, "Reference.jpg")
    concat_img.save(reference_path, quality=90)

    # One PDF page with the same dimensions as the stacked image.
    concat_pdf_path = os.path.join(output_dir, "Concat.pdf")
    w, h = concat_img.size
    c = canvas.Canvas(concat_pdf_path, pagesize=(w, h))
    c.drawImage(reference_path, 0, 0, width=w, height=h)
    c.save()
import argparse
import re
import traceback


def main(argv=None):
    """Command-line entry point: generate checkable PDFs.

    The positional argument is either a directory (every student found by
    ``annotating.make_dictionary`` is processed) or a single file whose name
    contains ``Copie<digits>`` (only that student is processed, and the
    file's directory is scanned).

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: with status 1 when a file is given but no student ID can
            be extracted from its name (and on argparse usage errors).
    """
    parser = argparse.ArgumentParser(description="Generate annotated PDFs.")
    parser.add_argument("input_path", help="Directory or specific file path")
    parser.add_argument("--overwrite", action="store_true",
                        help="Overwrite existing output files")
    args = parser.parse_args(argv)

    input_path = args.input_path
    overwrite = args.overwrite
    target_id = None

    if os.path.isfile(input_path):
        # A specific file was given: scan its directory, keep one student.
        root_dir = os.path.dirname(input_path) or "."
        # Extract the ID from the file name (e.g. Copie40.pdf -> "40").
        match = re.search(r'Copie(\d+)', os.path.basename(input_path))
        if match is None:
            print("Error: Could not extract student ID from filename.")
            sys.exit(1)
        target_id = match.group(1)
    else:
        root_dir = input_path

    results = annotating.make_dictionary(root_dir)

    if target_id is not None:
        if target_id in results:
            results = {target_id: results[target_id]}
        else:
            print(f"Student ID {target_id} not found in directory scan.")
            results = {}

    # Order by student ID only, so the per-student dicts are never compared.
    tasks = [(root_dir, sid, results[sid], overwrite) for sid in sorted(results)]

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        pending = {executor.submit(process_student, task): task[1] for task in tasks}
        # Collect every future individually: iterating executor.map() stops
        # at the first exception and hides failures of later students.
        for future in concurrent.futures.as_completed(pending):
            try:
                future.result()
            except Exception:
                print(f"Error while processing student {pending[future]}:")
                traceback.print_exc()


if __name__ == "__main__":
    main()