Copies/annotating_by_label.py

191 lines
6.2 KiB
Python

import sys
import os
import json
import shutil
import argparse
import concurrent.futures
from PIL import Image, ImageDraw
from reportlab.pdfgen import canvas
import annotating
import annotating_with_checks
# Height limit, in pixels, used by main() when splitting rendered items into
# concatenated group images.
# Reference value: roughly 10 A4 pages at 100 DPI.
# MAX_HEIGHT_PX = 11690
MAX_HEIGHT_PX = 17000 # main() also tries a limit 10% larger (1.1 * MAX_HEIGHT_PX) and keeps it if that yields fewer groups.
# MAX_HEIGHT_PX = 16000
def render_item(item):
    """Render one ``(student_id, label, content)`` entry as an annotated image.

    Args:
        item: A 3-tuple ``(student_id, label, content)``. ``content`` is a
            mapping read for the keys ``'pdf_path'``, ``'result'`` and
            ``'coordinates'`` (only the first element of ``'coordinates'``
            is used).

    Returns:
        ``None`` if the PDF at ``content['pdf_path']`` does not exist, or if
        ``annotating.compose_label_image`` returns no image. Otherwise the
        tuple ``(student_id, label, image, header_height, checkboxes)``,
        where ``checkboxes`` is the ``checkboxes`` attribute of the
        ``CheckboxRenderer`` created for this label.
    """
    student_id, label, content = item

    source_pdf = content['pdf_path']
    if not os.path.exists(source_pdf):
        return None

    # make_base_image returns three values; only the first one is needed here.
    page_image, _second, _third = annotating.make_base_image(source_pdf)

    # The renderer's callback is handed to compose_label_image as
    # draw_callback; its ``checkboxes`` attribute is returned to the caller.
    checkbox_renderer = annotating_with_checks.CheckboxRenderer(label)
    composed = annotating.compose_label_image(
        page_image,
        label,
        content['result'],
        content['coordinates'][0],
        render_fn=annotating_with_checks.safe_render_latex,
        draw_callback=checkbox_renderer.callback,
    )
    annotated_image, header_height = composed

    if annotated_image is None:
        return None
    return (
        student_id,
        label,
        annotated_image,
        header_height,
        checkbox_renderer.checkboxes,
    )
def save_batch(batch, prefix, group_id, root_dir, overwrite):
    """Write one batch of rendered items as a stacked image, a PDF and JSON maps.

    The output directory is ``<root_dir>/BGnot/<prefix> G<group_id>``. Four
    files are written into it:

    * ``bnote.json`` - overall width/height plus, per image, its student id,
      label, header height and vertical extent (``hmin``/``hmax``).
    * ``checkboxes.json`` - every checkbox entry of the batch, extended with
      ``global_box`` (its box shifted by the image's vertical offset) and
      ``student_id``.
    * ``Reference.jpg`` - all images stacked vertically on a white background.
    * ``Concat.pdf`` - a single page, sized to the image's pixel dimensions,
      containing ``Reference.jpg``.

    Args:
        batch: Sequence of ``(student_id, label, image, header_height,
            checkboxes)`` tuples. The checkbox dicts are modified in place.
        prefix: Name prefix of the group directory.
        group_id: Number appended to the directory name after ``G``.
        root_dir: Directory under which ``BGnot`` lives.
        overwrite: When the output directory already exists, delete and
            regenerate it if true; otherwise print a message and return.

    Returns:
        None.
    """
    output_dir = os.path.join(root_dir, "BGnot", f"{prefix} G{group_id}")

    already_there = os.path.exists(output_dir)
    if already_there and not overwrite:
        print(f"Skipping {output_dir}: Output already exists.")
        return
    if already_there:
        shutil.rmtree(output_dir)

    print(f"Generating Group PDF: {prefix} G{group_id} ({len(batch)} elements)")
    os.makedirs(output_dir)

    # The sheet is as wide as the widest image and as tall as all of them stacked.
    sheet_width = max(entry[2].width for entry in batch)
    sheet_height = sum(entry[2].height for entry in batch)
    sheet = Image.new("RGB", (sheet_width, sheet_height), "white")
    pen = ImageDraw.Draw(sheet)

    checkbox_records = []
    image_records = []
    y_offset = 0
    previous_student = None

    for student_id, label, tile, header_h, boxes in batch:
        sheet.paste(tile, (0, y_offset))

        # A purple bar is drawn over the top edge of the first image of each
        # new student.
        if student_id != previous_student:
            pen.rectangle(
                [0, y_offset, sheet_width, y_offset + 4], fill="purple"
            )
            previous_student = student_id

        image_records.append({
            "id": student_id,
            "label": label,
            "header_height": header_h,
            "hmin": y_offset,
            "hmax": y_offset + tile.height
        })

        for box_info in boxes:
            # Prefer 'final_box'; fall back to 'rel_box' when it is missing/empty.
            coords = box_info.get('final_box') or box_info.get('rel_box')
            box_info['global_box'] = [
                coords[0],
                coords[1] + y_offset,
                coords[2],
                coords[3] + y_offset,
            ]
            # Required to map the checkbox to the correct student.
            box_info['student_id'] = student_id
            checkbox_records.append(box_info)

        y_offset += tile.height

    layout = {"width": sheet_width, "height": sheet_height, "images": image_records}
    with open(os.path.join(output_dir, "bnote.json"), "w") as layout_file:
        json.dump(layout, layout_file, indent=2)

    with open(os.path.join(output_dir, "checkboxes.json"), "w") as boxes_file:
        json.dump(checkbox_records, boxes_file, indent=2)

    reference_path = os.path.join(output_dir, "Reference.jpg")
    sheet.save(reference_path, quality=90)

    # One PDF page whose size equals the image's pixel dimensions.
    page_w, page_h = sheet.size
    pdf_canvas = canvas.Canvas(
        os.path.join(output_dir, "Concat.pdf"), pagesize=(page_w, page_h)
    )
    pdf_canvas.drawImage(reference_path, 0, 0, width=page_w, height=page_h)
    pdf_canvas.save()
def _split_batches(rendered, max_height):
    """Split rendered items into consecutive batches of bounded total height.

    A new batch is started only when adding the next image would exceed
    ``max_height`` AND that image belongs to a different student than the
    previous one, so one student's images are never split across batches
    (a batch may therefore exceed ``max_height``).

    Args:
        rendered: Sequence of tuples whose element 0 is the student id and
            whose element 2 is an image exposing ``.height``.
        max_height: Height limit, in pixels, for one batch.

    Returns:
        A list of non-empty batches (lists of the input tuples), in order.
    """
    batches = []
    current_batch = []
    current_h = 0
    last_sid = None
    for r in rendered:
        sid = r[0]
        img_h = r[2].height
        if current_batch and current_h + img_h > max_height and sid != last_sid:
            batches.append(current_batch)
            current_batch = []
            current_h = 0
        current_batch.append(r)
        current_h += img_h
        last_sid = sid
    if current_batch:
        batches.append(current_batch)
    return batches


def main():
    """Command-line entry point: generate grouped, annotated PDFs per label group.

    Reads ``<input_path>/label_groups``, where every non-empty line is a
    comma-separated list of labels (colons are removed). For each line, the
    matching entries of ``annotating.make_dictionary(input_path)`` are
    rendered with ``render_item``, sorted by student id and label, split into
    height-bounded batches and written with ``save_batch`` under
    ``<input_path>/BGnot``.

    Exits with status 1 if the ``label_groups`` file does not exist.
    """
    parser = argparse.ArgumentParser(description="Generate annotated PDFs grouped by labels.")
    parser.add_argument("input_path", help="Directory containing Bnot structure")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files")
    args = parser.parse_args()
    root_dir = args.input_path

    # Check for the labels file before doing any other work so that a missing
    # file fails fast.
    labels_file = os.path.join(root_dir, "label_groups")
    if not os.path.exists(labels_file):
        print(f"Error: Labels file '{labels_file}' not found.")
        sys.exit(1)

    results = annotating.make_dictionary(root_dir)

    with open(labels_file, "r") as f:
        lines = [line.strip() for line in f if line.strip()]

    bgnot_dir = os.path.join(root_dir, "BGnot")
    os.makedirs(bgnot_dir, exist_ok=True)

    for line in lines:
        cleaned = (part.replace(":", "").strip() for part in line.split(','))
        # Drop entries that are empty once colons are removed; an empty label
        # would otherwise force the common prefix to "".
        labels = [label for label in cleaned if label]
        if not labels:
            continue

        prefix = os.path.commonprefix(labels).strip()
        if not prefix:
            prefix = "Group"

        items_to_render = [
            (sid, lbl, lbls[lbl])
            for sid, lbls in results.items()
            for lbl in labels
            if lbl in lbls
        ]

        # Sort structurally: by student id and label
        items_to_render.sort(key=lambda x: (
            annotating_with_checks.natural_key(x[0]),
            annotating_with_checks.natural_key(x[1])
        ))

        # Render on a small thread pool; executor.map returns results in
        # input order, so the sort above is preserved.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            rendered = list(executor.map(render_item, items_to_render))
        rendered = [r for r in rendered if r is not None]
        if not rendered:
            continue

        # Split once with the nominal limit and once with a 10% larger limit;
        # use the larger limit only when it reduces the number of groups.
        batches = _split_batches(rendered, MAX_HEIGHT_PX)
        relaxed_batches = _split_batches(rendered, 1.1 * MAX_HEIGHT_PX)
        if len(relaxed_batches) < len(batches):
            batches = relaxed_batches

        for i, batch in enumerate(batches, 1):
            save_batch(batch, prefix, i, root_dir, args.overwrite)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()