245 lines
8.4 KiB
Python
245 lines
8.4 KiB
Python
import sys
|
|
import os
|
|
import json
|
|
import shutil
|
|
import concurrent.futures
|
|
import threading
|
|
import img2pdf
|
|
from reportlab.pdfgen import canvas
|
|
|
|
# Fix for Matplotlib in threads: Set backend to non-interactive 'Agg'
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
import annotating
|
|
from annotating import MARGIN_LEFT, ANNOT_WIDTH
|
|
|
|
# Module-wide lock meant to serialise Matplotlib/LaTeX rendering between
# worker threads.
LATEX_LOCK = threading.Lock()

# Rendering resolution constant.
DPI = 100

# Edge length of a drawn checkbox (used as the default box size).
BOX_SIZE = 30

# Edge length reserved for score boxes.
SCORE_BOX_SIZE = 30

# Selectable scores in half-point steps: 0.0, 0.5, ..., 4.0.
SCORES = [half_steps / 2 for half_steps in range(9)]
|
|
|
|
def _load_checkbox_font():
    """Return the font used for checkbox labels.

    The TrueType faces are tried in order at size 20; the first one that
    loads wins.  If none can be opened, Pillow's built-in default font is
    returned instead.
    """
    for face in ("DejaVuSans.ttf", "arial.ttf"):
        try:
            return ImageFont.truetype(face, 20)
        except IOError:
            # This face is not available here; try the next candidate.
            continue
    return ImageFont.load_default()


CHECKBOX_FONT = _load_checkbox_font()
|
|
|
|
def draw_checkbox(draw, x, y, size=BOX_SIZE, label=None, fill="white"):
    """Draw a square checkbox and, optionally, a text label to its left.

    Args:
        draw: Drawing object providing ``text`` and ``rectangle`` methods.
        x: Left edge of the box.
        y: Top edge of the box.
        size: Edge length of the box.
        label: Optional caption, rendered through ``str``.  ``None``,
            ``False`` and values whose text is empty draw no caption.
        fill: Fill colour of the box interior.

    Returns:
        The box as ``[x, y, x + size, y + size]``.
    """
    # Decide on the rendered text rather than on the raw value, so that
    # falsy-but-printable labels such as 0 or 0.0 are still drawn.
    caption = "" if label is None or label is False else str(label)
    if caption:
        # The caption is anchored BOX_SIZE + 5 to the left of the box; this
        # offset uses the module constant, not the ``size`` argument.
        draw.text((x - BOX_SIZE - 5, y + 2), caption, fill="black", font=CHECKBOX_FONT)

    box = [x, y, x + size, y + size]
    draw.rectangle(box, fill=fill, outline="black", width=2)

    # Return a fresh list so callers cannot alias the one handed to ``draw``.
    return list(box)
|
|
|
|
def safe_render_latex(*args, **kwargs):
    """Render LaTeX text via ``annotating.render_real_latex_text``.

    Every positional and keyword argument is forwarded unchanged and the
    delegate's return value is handed back as-is.  This wrapper does not
    acquire ``LATEX_LOCK``; it performs no locking of its own.
    """
    render = annotating.render_real_latex_text
    return render(*args, **kwargs)
|
|
|
|
class CheckboxRenderer:
    """Drawing hook that adds checkboxes to one label image and records them.

    An instance is bound to a single label.  Its ``callback`` is handed to
    ``annotating.compose_label_image``; each checkbox it draws is appended
    to ``self.checkboxes`` as a dict describing the box.
    """

    def __init__(self, label_name):
        # Label identifier copied into every recorded checkbox entry.
        self.label = label_name
        # One dict per drawn checkbox, in drawing order.
        self.checkboxes = []

    def callback(self, kind, draw, pos, meta):
        """Draw and record the checkboxes belonging to one rendered element.

        Called by compose_label_image during rendering.

        Args:
            kind: Element kind.  ``"header_item"``, ``"local_rect"`` and
                ``"local_text"`` are handled; anything else is ignored.
            draw: Drawing object passed through to ``draw_checkbox``.
            pos: Position dict.  Header items read ``w`` and ``y``, local
                rectangles read ``box``, local texts read ``x``, ``y``
                and ``w``.
            meta: Element metadata.  Header items are dispatched on
                ``meta["type"]`` (``"score"`` or ``"global_fb"``); delete
                boxes read ``meta["index"]`` and ``meta["data"]["text"]``.
        """
        if kind == "header_item":
            item_type = meta.get("type")
            if item_type == "score":
                self._draw_score_row(draw, pos)
            elif item_type == "global_fb":
                self._draw_global_delete(draw, pos, meta)
            return

        if kind == "local_rect":
            self._draw_rect_delete(draw, pos, meta)
            return

        if kind == "local_text":
            self._draw_text_delete(draw, pos, meta)

    def _draw_score_row(self, draw, pos):
        """Draw one labelled box per entry of SCORES, left to right."""
        left = pos['w'] + 20
        for score in SCORES:
            rect = draw_checkbox(draw, left, pos['y'] + 25, BOX_SIZE, str(score))
            entry = {
                "type": "score",
                "label": self.label,
                "value": score,
                "rel_box": rect,
            }
            self.checkboxes.append(entry)
            left += BOX_SIZE + 60

    def _draw_global_delete(self, draw, pos, meta):
        """Draw the delete box of a global feedback item."""
        left = pos['w'] - BOX_SIZE - 5
        top = pos['y'] + 5
        rect = draw_checkbox(draw, left, top, BOX_SIZE)
        self._record_delete("del_global", "rel_box", rect, meta)

    def _draw_rect_delete(self, draw, pos, meta):
        """Draw the delete box in the top-right corner of a local rectangle."""
        bounds = pos['box']  # [xmin, ymin, xmax, ymax]
        rect = draw_checkbox(draw, bounds[2] - BOX_SIZE, bounds[1], BOX_SIZE)
        self._record_delete("del_local_rect", "final_box", rect, meta)

    def _draw_text_delete(self, draw, pos, meta):
        """Draw the delete box at the right edge of a local feedback text."""
        left = pos['x'] + pos['w'] - BOX_SIZE
        top = pos['y']
        rect = draw_checkbox(draw, left, top, BOX_SIZE)
        self._record_delete("del_local", "final_box", rect, meta)

    def _record_delete(self, entry_type, box_key, rect, meta):
        """Append a delete-checkbox entry; key order matches the JSON output."""
        entry = {
            "type": entry_type,
            "label": self.label,
            "index": meta["index"],
            box_key: rect,
            "text_preview": meta["data"]["text"][:20],
        }
        self.checkboxes.append(entry)
|
|
|
|
import re
|
|
def natural_key(text):
    """Return a sort key that orders embedded numbers numerically.

    ``text`` is converted with ``str`` and split into alternating text and
    digit runs.  Text pieces are lower-cased and digit runs become ``int``,
    so that e.g. ``"Q2"`` sorts before ``"Q10"``.

    Args:
        text: Any value; its ``str`` form is used.

    Returns:
        A list alternating ``str`` (even positions) and ``int`` (odd
        positions), suitable as a ``key=`` for ``sorted``.
    """
    # With a single capturing group, re.split puts the separators (the digit
    # runs) at the odd indices.  Deciding by position instead of
    # str.isdigit() avoids int() failing on characters such as "²", which
    # isdigit() accepts but r"\d" never splits out.
    pieces = re.split(r'(\d+)', str(text))
    return [
        int(piece) if position % 2 else piece.lower()
        for position, piece in enumerate(pieces)
    ]
|
|
|
|
def process_student(args):
    """Thread worker: build the checkable PDF and JSON sidecars for one student.

    Each label of the student is rendered to an image with checkboxes, the
    images are stacked vertically into a single picture, and the result is
    written to ``<root_dir>/Bnot/Copie<student_id>/`` as:

    * ``bnote.json``      - overall size plus the vertical span of each label,
    * ``checkboxes.json`` - every checkbox with its box in the stacked image,
    * ``Reference.jpg``   - the stacked image,
    * ``Concat.pdf``      - a one-page PDF containing that image.

    Args:
        args: Tuple ``(root_dir, student_id, labels, overwrite)``.  ``labels``
            maps a label name to a dict providing ``pdf_path``, ``result``
            and ``coordinates``.  When ``overwrite`` is false an existing
            output directory causes the student to be skipped; when true it
            is deleted first.

    Returns:
        None.
    """
    root_dir, student_id, labels, overwrite = args

    output_dir = os.path.join(root_dir, "Bnot", f"Copie{student_id}")

    if os.path.exists(output_dir):
        if not overwrite:
            print(f"Skipping {student_id}: Output already exists.")
            return
        shutil.rmtree(output_dir)

    print(f"Generating Checkable PDF for: {student_id}")

    label_images = []
    all_checkboxes = []
    bnote_entries = []  # For bnote.json

    sorted_labels = sorted(labels.items(), key=lambda kv: natural_key(kv[0]))

    for label, content in sorted_labels:
        label_pdf = content['pdf_path']
        if not os.path.exists(label_pdf):
            continue

        base_img, _, _ = annotating.make_base_image(label_pdf)

        # The renderer's callback draws the checkboxes and remembers them.
        cb_renderer = CheckboxRenderer(label)

        # Render using the shared engine.
        final_img, header_h = annotating.compose_label_image(
            base_img, label, content['result'], content['coordinates'][0],
            render_fn=safe_render_latex,
            draw_callback=cb_renderer.callback,
        )
        if final_img is None:
            continue

        label_images.append(final_img)
        all_checkboxes.append(cb_renderer.checkboxes)
        # hmin/hmax are filled in during concatenation below.
        bnote_entries.append({
            "id": student_id,
            "label": label,
            "header_height": header_h,
        })

    if not label_images:
        return

    # Create the directory only once there is something to write: an empty
    # directory would make later runs without --overwrite skip this student.
    os.makedirs(output_dir)

    # Stack all label images vertically on a white canvas.
    max_w = max(img.width for img in label_images)
    total_h = sum(img.height for img in label_images)
    concat_img = Image.new("RGB", (max_w, total_h), "white")

    final_json_map = []
    current_y = 0

    for img, boxes, entry in zip(label_images, all_checkboxes, bnote_entries):
        concat_img.paste(img, (0, current_y))

        entry["hmin"] = current_y
        entry["hmax"] = current_y + img.height

        # Boxes are stored as 'final_box' (local) or 'rel_box' (header); both
        # are relative to the label image, so shifting by current_y places
        # them in the stacked image.
        for item in boxes:
            b = item.get('final_box') or item.get('rel_box')
            item['global_box'] = [b[0], b[1] + current_y, b[2], b[3] + current_y]
            final_json_map.append(item)

        current_y += img.height

    bnote_data = {
        "width": max_w,
        "height": total_h,
        "images": bnote_entries,
    }
    with open(os.path.join(output_dir, "bnote.json"), "w", encoding="utf-8") as f:
        json.dump(bnote_data, f, indent=2)

    with open(os.path.join(output_dir, "checkboxes.json"), "w", encoding="utf-8") as f:
        json.dump(final_json_map, f, indent=2)

    temp_img_path = os.path.join(output_dir, "Reference.jpg")  # Can't use png here
    concat_img.save(temp_img_path, quality=90)

    # One PDF page sized exactly like the stacked image.
    concat_pdf_path = os.path.join(output_dir, "Concat.pdf")
    w, h = concat_img.size
    c = canvas.Canvas(concat_pdf_path, pagesize=(w, h))
    c.drawImage(temp_img_path, 0, 0, width=w, height=h)
    c.save()
|
|
|
|
import argparse # Added
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point:
    #   <script> INPUT_PATH [--overwrite]
    # INPUT_PATH is either a directory to scan, or a single file whose name
    # contains "Copie<digits>", in which case only that student is processed.
    import traceback

    parser = argparse.ArgumentParser(description="Generate annotated PDFs.")
    parser.add_argument("input_path", help="Directory or specific file path")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files")

    args = parser.parse_args()

    input_path = args.input_path
    overwrite = args.overwrite
    target_id = None

    # Detect if input is a specific file
    if os.path.isfile(input_path):
        root_dir = os.path.dirname(input_path) or "."
        # Extract ID from filename (e.g., Copie40.pdf -> 40)
        match = re.search(r'Copie(\d+)', os.path.basename(input_path))
        if not match:
            print("Error: Could not extract student ID from filename.")
            sys.exit(1)
        target_id = match.group(1)
    else:
        root_dir = input_path

    students = annotating.make_dictionary(root_dir)

    # Filter results if a specific target ID was requested
    if target_id is not None:
        if target_id in students:
            students = {target_id: students[target_id]}
        else:
            print(f"Student ID {target_id} not found in directory scan.")
            students = {}

    # Sort on the student id only, so the label dicts are never compared.
    tasks = [
        (root_dir, sid, lbls, overwrite)
        for sid, lbls in sorted(students.items(), key=lambda kv: kv[0])
    ]

    failed_ids = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        submitted = [(task[1], executor.submit(process_student, task)) for task in tasks]

        # Collect every future on its own: one failing student must neither
        # hide the errors of the others nor go unnamed.
        for sid, future in submitted:
            try:
                future.result()
            except Exception:
                print(f"Error while processing student {sid}:", file=sys.stderr)
                traceback.print_exc()
                failed_ids.append(sid)

    if failed_ids:
        # Signal the failure to the calling shell after all work is done.
        sys.exit(1)
|