246 lines
8.3 KiB
Python
246 lines
8.3 KiB
Python
import sys
|
|
import os
|
|
import json
|
|
import shutil
|
|
import concurrent.futures
|
|
import threading
|
|
|
|
# Fix for Matplotlib in threads: Set backend to non-interactive 'Agg'
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
import annotating
|
|
from annotating import MARGIN_LEFT, ANNOT_WIDTH
|
|
|
|
# Global lock for Matplotlib/Latex rendering to prevent race conditions
|
|
LATEX_LOCK = threading.Lock()
|
|
DPI = 100
|
|
BOX_SIZE = 30
|
|
SCORE_BOX_SIZE = 30
|
|
SCORES = [x * 0.5 for x in range(9)] # 0.0 to 4.0
|
|
|
|
try:
|
|
CHECKBOX_FONT = ImageFont.truetype("DejaVuSans.ttf", 20)
|
|
except IOError:
|
|
try:
|
|
CHECKBOX_FONT = ImageFont.truetype("arial.ttf", 20)
|
|
except IOError:
|
|
CHECKBOX_FONT = ImageFont.load_default()
|
|
|
|
def draw_checkbox(draw, x, y, size=BOX_SIZE, label=None, fill="white"):
|
|
if label:
|
|
draw.text((x - BOX_SIZE-5, y + 2), str(label), fill="black", font=CHECKBOX_FONT)
|
|
draw.rectangle([x, y, x + size, y + size], fill=fill, outline="black", width=2)
|
|
|
|
return [x, y, x + size, y + size]
|
|
|
|
def safe_render_latex(text, **kwargs):
|
|
"""Thread-safe wrapper for latex rendering."""
|
|
with LATEX_LOCK:
|
|
return annotating.render_latex_text(text, **kwargs)
|
|
|
|
def render_header(label, score, feedbacks, base_width):
|
|
"""Generates the score line and global feedback elements."""
|
|
elements = []
|
|
|
|
# Score Line
|
|
score_text_img = safe_render_latex(f"{label} ; Note : {score}", width_px=base_width // 2, fontsize=18)
|
|
score_line_h = max(score_text_img.height, SCORE_BOX_SIZE + 10)
|
|
score_line_img = Image.new("RGBA", (base_width, score_line_h), (255, 255, 255, 0))
|
|
score_line_img.paste(score_text_img, (0, 0))
|
|
|
|
draw_score = ImageDraw.Draw(score_line_img)
|
|
start_x = score_text_img.width + 20
|
|
local_boxes = []
|
|
|
|
for val in SCORES:
|
|
box = draw_checkbox(draw_score, start_x, 5, SCORE_BOX_SIZE, str(val))
|
|
local_boxes.append({
|
|
"type": "score", "label": label, "value": val,
|
|
"rel_box": box, "elem_y": 0
|
|
})
|
|
start_x += SCORE_BOX_SIZE + 60
|
|
elements.append((score_line_img, local_boxes))
|
|
|
|
# Global Feedback
|
|
for i, fb in enumerate(feedbacks):
|
|
fb_img = safe_render_latex(fb['text'], width_px=base_width)
|
|
draw_fb = ImageDraw.Draw(fb_img)
|
|
bx = fb_img.width - BOX_SIZE - 5
|
|
by = 5
|
|
box = draw_checkbox(draw_fb, bx, by, BOX_SIZE)
|
|
|
|
elements.append((fb_img, [{
|
|
"type": "del_global", "label": label, "index": i,
|
|
"text_preview": fb['text'][:20], "rel_box": box, "elem_y": 0
|
|
}]))
|
|
|
|
return elements
|
|
|
|
def process_label(root_dir, student_id, label, content):
|
|
"""Processes a single label (PDF) -> Annotated Image + Checkboxes."""
|
|
copie_folder = f"Copie{student_id}"
|
|
pdf_path = os.path.join(root_dir, copie_folder, f"{label}.pdf")
|
|
|
|
if not os.path.exists(pdf_path):
|
|
return None, []
|
|
|
|
base_img, total_h, max_w = annotating.make_base_image(pdf_path)
|
|
if not base_img:
|
|
return None, []
|
|
|
|
# Extract Data
|
|
coordinates = content.get('coordinates', (0, 0))
|
|
hmin = coordinates[0]
|
|
result = content.get('result', {})
|
|
score = result.get('score', 0)
|
|
feedbacks = result.get('feedback', [])
|
|
|
|
global_fb = [f for f in feedbacks if not f.get('box_2d')]
|
|
local_fb = [f for f in feedbacks if f.get('box_2d')]
|
|
local_fb.sort(key=lambda x: x['box_2d'][0])
|
|
|
|
checkbox_map = []
|
|
|
|
# 1. Render Header
|
|
header_elements = render_header(label, score, global_fb, base_img.width)
|
|
header_height = sum(x[0].height for x in header_elements)
|
|
|
|
# 2. Assemble Base + Header
|
|
total_height = base_img.height + header_height
|
|
final_img = Image.new("RGB", (base_img.width + MARGIN_LEFT, total_height), "white")
|
|
|
|
current_y = 0
|
|
for img, boxes in header_elements:
|
|
final_img.paste(img, (0, current_y))
|
|
for b in boxes:
|
|
b['final_box'] = [b['rel_box'][0], b['rel_box'][1] + current_y,
|
|
b['rel_box'][2], b['rel_box'][3] + current_y]
|
|
checkbox_map.append(b)
|
|
current_y += img.height
|
|
|
|
image_offset_y = current_y
|
|
final_img.paste(base_img, (MARGIN_LEFT, image_offset_y))
|
|
|
|
# 3. Draw Local Annotations
|
|
draw = ImageDraw.Draw(final_img, "RGBA")
|
|
last_text_bottom = 0
|
|
|
|
for i, fb in enumerate(local_fb):
|
|
box = fb.get('box_2d')
|
|
if not box: continue
|
|
|
|
ymin, xmin, ymax, xmax = box
|
|
target_ymin = (ymin - hmin) + image_offset_y
|
|
target_ymax = (ymax - hmin) + image_offset_y
|
|
target_xmin = xmin + MARGIN_LEFT
|
|
target_xmax = xmax + MARGIN_LEFT
|
|
|
|
draw.rectangle([target_xmin, target_ymin, target_xmax, target_ymax], outline="red", width=3)
|
|
|
|
rect_cb_box = draw_checkbox(draw, target_xmax - BOX_SIZE, target_ymin, BOX_SIZE)
|
|
checkbox_map.append({
|
|
"type": "del_local_rect", "label": label, "index": i,
|
|
"text_preview": fb['text'][:20], "final_box": rect_cb_box
|
|
})
|
|
|
|
txt_img_raw = safe_render_latex(fb['text'], width_px=ANNOT_WIDTH,
|
|
bg_color=(255, 200, 200, 180), max_lines=3)
|
|
container_h = max(txt_img_raw.height, BOX_SIZE)
|
|
txt_img = Image.new("RGBA", (ANNOT_WIDTH, container_h), (255, 255, 255, 0))
|
|
txt_img.paste(txt_img_raw, (0, 0))
|
|
|
|
d_txt = ImageDraw.Draw(txt_img)
|
|
draw_checkbox(d_txt, ANNOT_WIDTH - BOX_SIZE, 0, BOX_SIZE)
|
|
|
|
center_y = (target_ymin + target_ymax) / 2
|
|
paste_y = max(center_y - (txt_img.height / 2), image_offset_y)
|
|
if paste_y < last_text_bottom:
|
|
paste_y = last_text_bottom + 5
|
|
|
|
req_h = int(paste_y + txt_img.height + 20)
|
|
if req_h > final_img.height:
|
|
new_final = Image.new("RGB", (final_img.width, req_h), "white")
|
|
new_final.paste(final_img, (0,0))
|
|
final_img = new_final
|
|
draw = ImageDraw.Draw(final_img, "RGBA")
|
|
|
|
final_img.paste(txt_img, (10, int(paste_y)), mask=txt_img)
|
|
checkbox_map.append({
|
|
"type": "del_local", "label": label, "index": i,
|
|
"text_preview": fb['text'][:20],
|
|
"final_box": [10 + ANNOT_WIDTH - BOX_SIZE, int(paste_y), 10 + ANNOT_WIDTH, int(paste_y) + BOX_SIZE]
|
|
})
|
|
last_text_bottom = paste_y + txt_img.height
|
|
|
|
return final_img, checkbox_map
|
|
|
|
import re
|
|
def natural_key(text):
|
|
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
|
|
|
|
def process_student(args):
|
|
"""Thread worker: Processes one student."""
|
|
root_dir, student_id, labels = args
|
|
print(f"Generating Checkable PDF for: {student_id}")
|
|
|
|
output_dir = os.path.join(root_dir, "Bnot", f"Copie{student_id}")
|
|
if os.path.exists(output_dir):
|
|
shutil.rmtree(output_dir)
|
|
os.makedirs(output_dir)
|
|
|
|
label_images = []
|
|
student_checkboxes = []
|
|
processed_labels_order = []
|
|
|
|
for label, content in sorted(labels.items(), key=lambda x: natural_key(x[0])):
|
|
img, boxes = process_label(root_dir, student_id, label, content)
|
|
if img:
|
|
label_images.append(img)
|
|
student_checkboxes.append(boxes)
|
|
processed_labels_order.append(label)
|
|
|
|
if not label_images:
|
|
return
|
|
|
|
max_w = max(i.width for i in label_images)
|
|
total_h = sum(i.height for i in label_images)
|
|
concat_img = Image.new("RGB", (max_w, total_h), "white")
|
|
|
|
final_json_map = []
|
|
current_y = 0
|
|
|
|
for label_name, img, boxes in zip(processed_labels_order, label_images, student_checkboxes):
|
|
concat_img.paste(img, (0, current_y))
|
|
|
|
for item in boxes:
|
|
b = item['final_box']
|
|
item['global_box'] = [b[0], b[1] + current_y, b[2], b[3] + current_y]
|
|
final_json_map.append(item)
|
|
|
|
current_y += img.height
|
|
|
|
with open(os.path.join(output_dir, "checkboxes.json"), "w") as f:
|
|
json.dump(final_json_map, f, indent=2)
|
|
|
|
concat_img.save(os.path.join(output_dir, "Reference.png"))
|
|
concat_img.save(os.path.join(output_dir, "Concat.pdf"), "PDF", resolution=DPI)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python annotating_with_checks.py <Dir>")
|
|
sys.exit(1)
|
|
|
|
root_dir = sys.argv[1]
|
|
results = annotating.make_dictionary(root_dir)
|
|
|
|
|
|
tasks = [(root_dir, sid, lbls) for sid, lbls in results.items()]
|
|
|
|
# print(tasks)
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
|
executor.map(process_student, tasks)
|