Annotating by label
parent
5efae09664
commit
7a43be3ac1
|
|
@ -0,0 +1,170 @@
|
|||
import sys
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
from PIL import Image, ImageDraw
|
||||
from reportlab.pdfgen import canvas
|
||||
|
||||
import annotating
|
||||
import annotating_with_checks
|
||||
|
||||
# Maximum stacked image height per group before a new batch is started:
# roughly 10 A4 pages at 100 DPI (about 1169 px per page).
|
||||
MAX_HEIGHT_PX = 11690
|
||||
|
||||
def render_item(item):
    """Render the annotated image for a single (student, label) entry.

    Args:
        item: A ``(student_id, label, content)`` tuple. ``content`` is read
            for its ``'pdf_path'``, ``'result'`` and ``'coordinates'`` keys;
            only the first ``'coordinates'`` entry is used.

    Returns:
        A ``(student_id, label, image, header_height, checkboxes)`` tuple, or
        ``None`` when the source PDF does not exist or the composition step
        produced no image.
    """
    student_id, label, content = item

    source_pdf = content['pdf_path']
    if not os.path.exists(source_pdf):
        # Nothing to render for this entry; the caller filters out None.
        return None

    # make_base_image returns three values; only the image is needed here.
    page_img, _, _ = annotating.make_base_image(source_pdf)

    # The renderer's callback is handed to the composer; afterwards its
    # `checkboxes` attribute is returned alongside the image.
    checkbox_renderer = annotating_with_checks.CheckboxRenderer(label)

    composed_img, header_height = annotating.compose_label_image(
        page_img,
        label,
        content['result'],
        content['coordinates'][0],
        render_fn=annotating_with_checks.safe_render_latex,
        draw_callback=checkbox_renderer.callback,
    )
    if composed_img is None:
        return None

    return (
        student_id,
        label,
        composed_img,
        header_height,
        checkbox_renderer.checkboxes,
    )
|
||||
|
||||
def save_batch(batch, prefix, group_id, root_dir, overwrite):
    """Stack a batch of rendered images and write the group's output files.

    Output goes to ``<root_dir>/BGnot/<prefix> G<group_id>/`` and consists of:

    * ``Reference.jpg`` - all images pasted top to bottom, left-aligned.
    * ``Concat.pdf`` - a single-page PDF showing that image.
    * ``bnote.json`` - overall size plus the vertical extent of each image.
    * ``checkboxes.json`` - checkbox records with a ``global_box`` expressed
      in the stacked image's coordinates and the owning ``student_id``.

    Args:
        batch: Sequence of ``(student_id, label, image, header_height,
            checkboxes)`` tuples, as returned by ``render_item``. Each
            checkbox record must provide a ``'final_box'`` or ``'rel_box'``
            entry; the records are updated in place.
        prefix: Name prefix of the output directory.
        group_id: Group number appended to the directory name.
        root_dir: Directory under which ``BGnot`` lives.
        overwrite: When true, an existing output directory is deleted and
            regenerated; otherwise an existing directory is left untouched.

    Returns:
        None.
    """
    output_dir = os.path.join(root_dir, "BGnot", f"{prefix} G{group_id}")

    # Guard before touching the filesystem: an empty batch used to wipe an
    # existing output directory and then fail on max() of an empty sequence.
    if not batch:
        print(f"Skipping {output_dir}: Empty batch.")
        return

    if os.path.exists(output_dir):
        if not overwrite:
            print(f"Skipping {output_dir}: Output already exists.")
            return
        shutil.rmtree(output_dir)

    print(f"Generating Group PDF: {prefix} G{group_id} ({len(batch)} elements)")
    os.makedirs(output_dir)

    # The canvas is as wide as the widest image and as tall as all of them.
    max_w = max(item[2].width for item in batch)
    total_h = sum(item[2].height for item in batch)
    concat_img = Image.new("RGB", (max_w, total_h), "white")
    draw = ImageDraw.Draw(concat_img)

    final_json_map = []
    bnote_entries = []
    current_y = 0
    last_sid = None

    for sid, label, img, header_h, boxes in batch:
        concat_img.paste(img, (0, current_y))

        # Mark the start of each new student with a thin purple band, drawn
        # after the paste so it stays visible on top of the image.
        if sid != last_sid:
            draw.rectangle([0, current_y, max_w, current_y + 4], fill="purple")
            last_sid = sid

        bnote_entries.append({
            "id": sid,
            "label": label,
            "header_height": header_h,
            "hmin": current_y,
            "hmax": current_y + img.height,
        })

        for item in boxes:
            # Prefer the final box; fall back to the relative one.
            b = item.get('final_box') or item.get('rel_box')
            # Shift the vertical coordinates (indices 1 and 3) by this
            # image's offset in the stacked picture.
            item['global_box'] = [b[0], b[1] + current_y, b[2], b[3] + current_y]
            item['student_id'] = sid  # Required to map checkbox to the correct student
            final_json_map.append(item)

        current_y += img.height

    with open(os.path.join(output_dir, "bnote.json"), "w", encoding="utf-8") as f:
        json.dump({"width": max_w, "height": total_h, "images": bnote_entries}, f, indent=2)

    with open(os.path.join(output_dir, "checkboxes.json"), "w", encoding="utf-8") as f:
        json.dump(final_json_map, f, indent=2)

    # The JPEG is both a kept output and the image source for the PDF below.
    temp_img_path = os.path.join(output_dir, "Reference.jpg")
    concat_img.save(temp_img_path, quality=90)

    # Single PDF page whose size matches the image's pixel dimensions.
    pdf_path = os.path.join(output_dir, "Concat.pdf")
    w, h = concat_img.size
    c = canvas.Canvas(pdf_path, pagesize=(w, h))
    c.drawImage(temp_img_path, 0, 0, width=w, height=h)
    c.save()
|
||||
|
||||
def _collect_items(results, labels):
    """Return ``(student_id, label, content)`` for each listed label a student has."""
    return [
        (sid, lbl, student_labels[lbl])
        for sid, student_labels in results.items()
        for lbl in labels
        if lbl in student_labels
    ]


def _split_into_batches(rendered, max_height):
    """Split rendered items into batches, cutting only at a change of student.

    A new batch starts when adding the next image would exceed ``max_height``
    and that image belongs to a different student than the previous one, so a
    batch may be taller than ``max_height`` when one student's images alone
    exceed it.
    """
    batches = []
    current_batch = []
    current_h = 0
    last_sid = None

    for entry in rendered:
        sid = entry[0]
        img_h = entry[2].height

        if current_batch and current_h + img_h > max_height and sid != last_sid:
            batches.append(current_batch)
            current_batch = []
            current_h = 0

        current_batch.append(entry)
        current_h += img_h
        last_sid = sid

    if current_batch:
        batches.append(current_batch)
    return batches


def main():
    """Command-line entry point: build grouped, annotated PDFs per label group.

    Reads ``<input_path>/label_groups``, where every non-empty line is a
    comma-separated list of labels forming one group. For each group, every
    matching (student, label) entry is rendered, the images are sorted by
    student id then label, split into height-limited batches, and each batch
    is written by ``save_batch`` into ``<input_path>/BGnot``.

    The output directory prefix is the labels' common prefix, or ``"Group"``
    when they share none. Exits with status 1 if the labels file is missing.
    """
    parser = argparse.ArgumentParser(description="Generate annotated PDFs grouped by labels.")
    parser.add_argument("input_path", help="Directory containing Bnot structure")
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files")
    args = parser.parse_args()

    root_dir = args.input_path
    results = annotating.make_dictionary(root_dir)
    labels_file = os.path.join(root_dir, "label_groups")

    if not os.path.exists(labels_file):
        print(f"Error: Labels file '{labels_file}' not found.")
        sys.exit(1)

    with open(labels_file, "r") as f:
        lines = [line.strip() for line in f if line.strip()]

    bgnot_dir = os.path.join(root_dir, "BGnot")
    os.makedirs(bgnot_dir, exist_ok=True)

    # Next free group number per prefix. Two label groups can end up with the
    # same prefix (e.g. both fall back to "Group"); continuing the numbering
    # keeps the later group from being skipped or from deleting the earlier
    # group's output.
    next_group_id = {}

    for line in lines:
        labels = [part.strip() for part in line.split(',') if part.strip()]
        if not labels:
            continue

        prefix = os.path.commonprefix(labels).strip() or "Group"

        items_to_render = _collect_items(results, labels)

        # Sort structurally: by student id and label
        items_to_render.sort(key=lambda x: (
            annotating_with_checks.natural_key(x[0]),
            annotating_with_checks.natural_key(x[1]),
        ))

        # Render with a small thread pool; map() preserves the sorted order.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            rendered = list(executor.map(render_item, items_to_render))

        # render_item returns None for entries it could not render.
        rendered = [r for r in rendered if r is not None]
        if not rendered:
            continue

        batches = _split_into_batches(rendered, MAX_HEIGHT_PX)

        first_id = next_group_id.get(prefix, 1)
        for group_id, batch in enumerate(batches, first_id):
            save_batch(batch, prefix, group_id, root_dir, args.overwrite)
        next_group_id[prefix] = first_id + len(batches)
|
||||
|
||||
# Run the command-line interface only when executed as a script, so that
# importing this module has no side effects.
if __name__ == "__main__":
    main()
|
||||
Loading…
Reference in New Issue