"""Read grouped annotations and compile per-student images, PDFs and scores."""

import argparse
import collections
import concurrent.futures
import json
import os
import sys
import threading
import traceback
from pathlib import Path

from PIL import Image, ImageDraw

import annotating
from reading_annotations import detect_checks_and_notes, has_significant_notes
from utils import natural_key, read_all_labels

# Scores strictly below this value always get the Text/Sol pages attached in
# Concat_F.pdf, even when every feedback has been deleted.
_PERFECT_SCORE = 4.0


def get_extra_pdfs_as_images(root_dir, label, annotating_module):
    """Fetch the Text and Sol PDFs of a label and convert them to images.

    Args:
        root_dir: Directory containing the ``Text`` and ``Sol`` sub-folders.
        label: Label name; the files looked up are ``<folder>/<label>.pdf``.
        annotating_module: Object exposing ``make_base_image(pdf_path)``
            whose first returned element is the rendered image.

    Returns:
        A list with the Text image followed by the Sol image; a missing file
        or a falsy rendered image is simply skipped.
    """
    extra_images = []
    for folder in ("Text", "Sol"):
        pdf_path = os.path.join(root_dir, folder, f"{label}.pdf")
        if os.path.exists(pdf_path):
            img, _, _ = annotating_module.make_base_image(pdf_path)
            if img:
                extra_images.append(img)
    return extra_images


def _render_page(images, content_width, left_margin=0, tb_margin=0):
    """Stack images vertically on a white RGB canvas with optional margins."""
    content_height = sum(img.height for img in images)
    page = Image.new(
        "RGB",
        (content_width + left_margin, content_height + 2 * tb_margin),
        "white",
    )
    y = tb_margin
    for img in images:
        page.paste(img, (left_margin, y))
        y += img.height
    return page


def save_paginated_pdf(image_groups, output_path):
    """Concatenate groups of images vertically into a multi-page PDF.

    A group is never split across pages: a new page is started whenever
    adding the next group would exceed the page content height. A group that
    is taller than a page on its own still gets a (taller) page to itself.
    The first image of each group is outlined in black and the second one in
    blue.

    Args:
        image_groups: Iterable of lists of PIL images. Empty groups are
            ignored.
        output_path: Destination path of the PDF.

    Nothing is written when there is no image at all.
    """
    widths = [img.width for group in image_groups for img in group]
    if not widths:
        # Covers both "no groups" and "only empty groups" (max() would raise).
        return

    max_w = max(widths)
    # 1.414 ~ sqrt(2); presumably the ISO paper aspect ratio plus 25% slack.
    max_page_h = int(max_w * 1.414 * 1.25)

    # Sizes given in centimetres, converted to pixels at 100 DPI.
    border_px = int((0.2 / 2.54) * 100)
    left_margin = int((0.3 / 2.54) * 100)
    tb_margin = int((0.2 / 2.54) * 100)

    # Height left for images once top/bottom margins are reserved.
    max_content_h = max_page_h - (2 * tb_margin)

    pages = []
    current_page_imgs = []
    current_h = 0

    for group in image_groups:
        if not group:
            continue

        # NOTE(review): borders are chosen by position in the group, so when
        # a group has fewer leading "extra" images than expected the border
        # lands on a different image - confirm this is intended.
        processed_group = []
        for i, img in enumerate(group):
            if i in (0, 1):
                img = img.copy()  # never draw on the caller's image
                draw = ImageDraw.Draw(img)
                color = "black" if i == 0 else "blue"
                draw.rectangle(
                    [0, 0, img.width - 1, img.height - 1],
                    outline=color,
                    width=border_px,
                )
            processed_group.append(img)

        group_h = sum(img.height for img in processed_group)

        if current_page_imgs and (current_h + group_h > max_content_h):
            # Flush the current page and start a new one with this group.
            pages.append(
                _render_page(current_page_imgs, max_w, left_margin, tb_margin)
            )
            current_page_imgs = processed_group
            current_h = group_h
        else:
            current_page_imgs.extend(processed_group)
            current_h += group_h

    if current_page_imgs:
        pages.append(
            _render_page(current_page_imgs, max_w, left_margin, tb_margin)
        )

    if pages:
        pages[0].save(
            output_path,
            "PDF",
            resolution=100.0,
            save_all=True,
            append_images=pages[1:],
        )


def _apply_actions(labels_data, actions, logs):
    """Apply score/deletion actions to ``labels_data`` in place.

    Returns the set of labels that were modified; messages are appended to
    ``logs``.
    """
    actions_by_label = collections.defaultdict(list)
    for act in actions:
        actions_by_label[act['label']].append(act)

    dirty_labels = set()
    for label, acts in actions_by_label.items():
        if label not in labels_data:
            continue
        result = labels_data[label]['result']
        feedbacks = result.get('feedback', [])

        # Actions address feedbacks by index: global ones (no box) in their
        # stored order, local ones ordered by the first box coordinate.
        global_fb = [f for f in feedbacks if not f.get('box_2d')]
        local_fb = sorted(
            (f for f in feedbacks if f.get('box_2d')),
            key=lambda f: f['box_2d'][0],
        )

        for act in acts:
            kind = act['type']
            if kind == 'score':
                result['score'] = act['value']
                dirty_labels.add(label)
                logs.append(f" > Updated score for {label} to {act['value']}")
            elif kind == 'clear_all':
                for fb in feedbacks:
                    fb["to_delete"] = True
                    if fb.get("box_2d"):
                        fb["norectangle"] = True
                dirty_labels.add(label)
                logs.append(f" > Cleared all feedbacks in {label}")
            elif kind == 'del_global':
                if 0 <= act['index'] < len(global_fb):
                    global_fb[act['index']]["to_delete"] = True
                    dirty_labels.add(label)
                    logs.append(f" > Deleted global feedback in {label}")
            elif kind in ('del_local', 'del_local_rect'):
                if 0 <= act['index'] < len(local_fb):
                    target = local_fb[act['index']]
                    if kind == 'del_local':
                        target["to_delete"] = True
                        logs.append(f" > Deleted local feedback in {label}")
                    else:
                        target["norectangle"] = True
                        logs.append(f" > Deleted rect in {label}")
                    dirty_labels.add(label)
    return dirty_labels


def _override_scores_from_file(score_path, labels_data, dirty_labels, logs):
    """Replace in-memory scores by those of an existing score.json when they differ."""
    try:
        with open(score_path, "r", encoding="utf-8") as f:
            existing_scores = json.load(f)
        for label, existing_score in existing_scores.items():
            if label not in labels_data:
                continue
            result = labels_data[label]['result']
            # Compare as strings because score.json stores stringified scores.
            if str(result.get('score', 0)) != str(existing_score):
                result['score'] = existing_score
                dirty_labels.add(label)
                logs.append(
                    f" > Overrode score for {label} to {existing_score} "
                    "from existing score.json"
                )
    except (json.JSONDecodeError, UnicodeDecodeError):
        logs.append(f" > Warning: Could not read existing {score_path}")


def _overlay_notes(final_img, note_info, new_header_h):
    """Paste the manual-note crop of a label onto ``final_img`` in place.

    The part of the crop above ``old_header_h`` is pasted at the top of the
    image; the rest is pasted starting at ``new_header_h`` so that it follows
    the regenerated header. Returns True when the crop held significant notes.
    """
    sub_note = note_info['img']
    old_header_h = int(note_info['old_header_h'])
    if not has_significant_notes(sub_note):
        return False

    w, h = sub_note.size
    if old_header_h > 0:
        header_crop = sub_note.crop((0, 0, w, min(h, old_header_h)))
        # The crop is its own mask, so only its non-transparent pixels land.
        final_img.paste(header_crop, (0, 0), mask=header_crop)
    if h > old_header_h:
        body_crop = sub_note.crop((0, old_header_h, w, h))
        final_img.paste(body_crop, (0, new_header_h), mask=body_crop)
    return True


def _is_perfect_without_comments(result):
    """Tell whether a label reached the perfect score with every feedback deleted."""
    try:
        score = float(str(result.get('score', 0)))
    except ValueError:
        # A non-numeric score cannot be assumed perfect.
        return False
    if score < _PERFECT_SCORE:
        return False
    return all(fb.get("to_delete") for fb in result.get('feedback', []))


def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions,
                                         label_notes, all_labels,
                                         update_score=False):
    """Apply actions, regenerate label images and write the student outputs.

    Modifies ``data`` in place according to ``actions``, pastes the
    label-specific note crops, regenerates every label image, saves the
    modified ones, and writes ``score.json``, ``Concat.jpg`` and
    ``Concat_F.pdf`` into ``<root_dir>/BGnot/Copie<student_id>``.

    Args:
        root_dir: Root directory of the exam.
        data: Mapping ``student_id -> {label: content}`` where ``content``
            is read through the keys ``result``, ``pdf_path`` and
            ``coordinates``.
        student_id: Key of the student in ``data``.
        actions: Iterable of action dicts with at least ``label`` and
            ``type`` (``score``, ``clear_all``, ``del_global``,
            ``del_local`` or ``del_local_rect``).
        label_notes: Mapping ``label -> {'img': ..., 'old_header_h': ...}``.
        all_labels: Labels that must appear in ``score.json``; those without
            data get an empty string.
        update_score: When True, scores found in an existing ``score.json``
            override the in-memory ones.

    Returns:
        The accumulated log messages joined by newlines.
    """
    logs = [f"\nProcessing compilation for: Copie{student_id}"]
    output_dir = os.path.join(root_dir, "BGnot", f"Copie{student_id}")
    os.makedirs(output_dir, exist_ok=True)
    score_path = os.path.join(output_dir, "score.json")
    labels_data = data.get(student_id, {})

    # --- 1. Apply actions to data (update scores / flag deletions) ---
    dirty_labels = _apply_actions(labels_data, actions, logs)

    # --- 1.5 Override with existing score.json if requested ---
    if update_score and os.path.exists(score_path):
        _override_scores_from_file(score_path, labels_data, dirty_labels, logs)

    # --- 2. Process images (regenerate & concatenate) ---
    concat_list = []
    concat_list_F = []
    d_notes = dict.fromkeys(all_labels, "")

    # Labels are visited in natural order to assemble a complete profile.
    sorted_labels = sorted(labels_data.items(), key=lambda x: natural_key(x[0]))
    for label, content in sorted_labels:
        result = content['result']
        d_notes[label] = str(result.get('score', 0))

        pdf_path = content.get('pdf_path')
        # A missing key would make os.path.exists(None) raise TypeError.
        if not pdf_path or not os.path.exists(pdf_path):
            continue

        base_img, _, _ = annotating.make_base_image(pdf_path)
        # Composition uses the result object modified in step 1.
        final_img, new_header_h = annotating.compose_label_image(
            base_img, label, result, content['coordinates'][0],
            with_error=False,
        )
        if final_img is None:
            continue

        # Overlay the manual notes specific to this label.
        has_notes = False
        if label in label_notes:
            has_notes = _overlay_notes(
                final_img, label_notes[label], new_header_h
            )

        # Only modified images (data change or visual notes) are saved alone.
        if (label in dirty_labels) or has_notes:
            final_img.save(os.path.join(output_dir, f"{label}.jpg"))
            logs.append(f" Saved dirty image: {label}.jpg")

        concat_list.append(final_img)

        # Concat_F only keeps labels that still carry information for the
        # student: imperfect score, remaining feedback, or manual notes.
        if has_notes or not _is_perfect_without_comments(result):
            extras = get_extra_pdfs_as_images(root_dir, label, annotating)
            extras.append(final_img)
            concat_list_F.append(extras)

    # --- 3. Save final outputs ---
    with open(score_path, "w", encoding="utf-8") as f:
        json.dump(d_notes, f, indent=4)
    logs.append(f" Saved {score_path}")

    if concat_list:
        max_w = max(img.width for img in concat_list)
        full_img = _render_page(concat_list, max_w)
        full_img.save(os.path.join(output_dir, "Concat.jpg"))
        logs.append(" Saved regenerated Concat.jpg")

    if concat_list_F:
        pdf_out_path = os.path.join(output_dir, "Concat_F.pdf")
        save_paginated_pdf(concat_list_F, pdf_out_path)
        logs.append(" Saved regenerated Concat_F.pdf")

    return "\n".join(logs)


def _iter_note_crops(notes_img, bnote_data):
    """Yield ``(img_info, note_entry)`` for every bnote image with significant notes."""
    for img_info in bnote_data.get("images", []):
        hmin, hmax = img_info.get("hmin", 0), img_info.get("hmax", 0)
        if hmax > hmin:
            crop = notes_img.crop((0, hmin, notes_img.width, hmax))
            if has_significant_notes(crop):
                yield img_info, {
                    'img': crop,
                    'old_header_h': img_info.get("header_height", 0),
                }


def main():
    """Command-line entry point: read annotations and regenerate all outputs."""
    parser = argparse.ArgumentParser(
        description="Read grouped annotations and compile PDFs")
    parser.add_argument("input_path", help="Directory path")
    parser.add_argument("--refaire", action="store_true",
                        help="Merge refaire annotations from Bnot")
    parser.add_argument("--update-score", action="store_true",
                        help="Override scores with values from existing score.json")
    args = parser.parse_args()

    # Use the parsed positional: sys.argv[1] is a flag when options come first.
    root_dir = args.input_path
    bgnot_dir = os.path.join(root_dir, "BGnot")

    if not os.path.exists(bgnot_dir):
        print(f"Directory {bgnot_dir} does not exist. Run annotating_by_label.py first.")
        sys.exit(1)

    try:
        all_labels = read_all_labels(Path(root_dir))
    except FileNotFoundError:
        all_labels = []

    # --- 0. Read refaire.json if requested ---
    refaire_list = []  # stays empty when the flag is off or the file is missing
    refaire_dict = {}
    if args.refaire:
        refaire_path = os.path.join(root_dir, "refaire.json")
        if os.path.exists(refaire_path):
            with open(refaire_path, "r", encoding="utf-8") as f:
                refaire_list = json.load(f)
            for c_name, labels in refaire_list:
                sid = c_name.replace("Copie", "")
                refaire_dict[sid] = labels
        else:
            print(f"Warning: --refaire flag used, but {refaire_path} not found.")

    # Load original data
    if args.refaire and refaire_list:
        original_data = annotating.make_dictionary(
            root_dir, refaire=True, refaire_list=refaire_list)
    else:
        original_data = annotating.make_dictionary(root_dir)

    lock = threading.Lock()
    actions_by_student = collections.defaultdict(list)
    notes_by_student = collections.defaultdict(dict)

    def process_bgnot_entry(entry, only_ids=None):
        """Collect actions and note crops of one BGnot group directory."""
        gdir = os.path.join(bgnot_dir, entry)
        # Copie* folders hold per-student outputs, not annotated groups.
        if not os.path.isdir(gdir) or entry.startswith("Copie"):
            return
        bnote_path = os.path.join(gdir, "bnote.json")
        if not os.path.exists(bnote_path):
            return
        with open(bnote_path, "r", encoding="utf-8") as f:
            bnote_data = json.load(f)

        if only_ids:
            # Student ids are compared as strings, as everywhere else here.
            images = bnote_data.get("images", [])
            if not any(str(d.get("id")) in only_ids for d in images):
                return

        actions, notes_img = detect_checks_and_notes(gdir)
        if notes_img is None:
            return

        with lock:
            for act in actions:
                raw_sid = act.get("student_id")
                # str(None) is truthy, so test the raw value instead.
                if raw_sid is not None and str(raw_sid):
                    actions_by_student[str(raw_sid)].append(act)
            for img_info, note in _iter_note_crops(notes_img, bnote_data):
                sid = str(img_info.get("id"))
                notes_by_student[sid][img_info.get("label")] = note

    def process_refaire_entry(sid, r_labels):
        """Replace the group annotations of a student by those found in BRnot."""
        s_bnot_dir = os.path.join(root_dir, "BRnot", f"Copie{sid}")
        if not os.path.exists(s_bnot_dir):
            return
        if not r_labels:
            # No explicit label list: every label of the student is redone.
            r_labels = list(original_data.get(sid, {}).keys())

        with lock:
            actions_by_student[sid] = [
                a for a in actions_by_student[sid]
                if a.get('label') not in r_labels
            ]
            for lbl in r_labels:
                notes_by_student[sid].pop(lbl, None)

        b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir)
        b_bnote_path = os.path.join(s_bnot_dir, "bnote.json")
        if not os.path.exists(b_bnote_path):
            return
        with open(b_bnote_path, "r", encoding="utf-8") as f:
            b_bnote_data = json.load(f)

        with lock:
            for act in b_actions:
                act["student_id"] = sid
                actions_by_student[sid].append(act)
            if b_notes_img is not None:
                for img_info, note in _iter_note_crops(b_notes_img, b_bnote_data):
                    notes_by_student[sid][img_info.get("label")] = note

    only_ids = set(refaire_dict) or None

    # Read the BGnot group directories. Failures stay non-fatal (best
    # effort), but they are reported instead of being silently dropped.
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        entry_futures = {
            executor.submit(process_bgnot_entry, entry, only_ids): entry
            for entry in os.listdir(bgnot_dir)
        }
        for future in concurrent.futures.as_completed(entry_futures):
            exc = future.exception()
            if exc is not None:
                print(
                    f"Warning: could not process {entry_futures[future]}: {exc!r}",
                    file=sys.stderr,
                )

    # Refaire
    if args.refaire and refaire_dict:
        for sid, labels in refaire_dict.items():
            process_refaire_entry(sid, labels)

    def process_student(sid):
        """Regenerate the outputs of one student; returns its log text."""
        if sid not in original_data:
            return ""
        # .get() avoids inserting keys into the shared dicts from workers.
        return apply_actions_and_regenerate_grouped(
            root_dir, original_data, sid,
            actions_by_student.get(sid, []),
            notes_by_student.get(sid, {}),
            all_labels,
            update_score=args.update_score,
        )

    # --- 2. Process each student concurrently using 4 threads ---
    targets = list(refaire_dict) if refaire_dict else sorted(
        original_data.keys(), key=natural_key)
    failed = False
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_student, sid): sid for sid in targets}
        for future in concurrent.futures.as_completed(futures):
            try:
                output = future.result()
            except Exception as exc:  # top-level boundary: report, keep going
                failed = True
                print(f"Error while processing Copie{futures[future]}:",
                      file=sys.stderr)
                traceback.print_exception(type(exc), exc, exc.__traceback__)
                continue
            if output:
                print(output)

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()