From f16c0273bdcdeb5689ba5ad4be44baf9c2744afe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Miquel?= Date: Sun, 29 Mar 2026 21:52:51 +0200 Subject: [PATCH] Refaire support --- annotating_with_checks.py | 28 ++++++++++++- correction.py | 63 +++++++++++++++++++++++++++++ reading_grouped_annotations.py | 75 +++++++++++++++++++++++++++++++--- 3 files changed, 158 insertions(+), 8 deletions(-) diff --git a/annotating_with_checks.py b/annotating_with_checks.py index 08e8000..0fbeb3c 100644 --- a/annotating_with_checks.py +++ b/annotating_with_checks.py @@ -208,6 +208,8 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Generate annotated PDFs.") parser.add_argument("input_path", help="Directory or specific file path") parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files") + parser.add_argument("--refaire", action="store_true", help="Process only copies/labels defined in refaire.json") + args = parser.parse_args() @@ -230,8 +232,30 @@ if __name__ == "__main__": results = annotating.make_dictionary(root_dir) - # Filter results if a specific target ID was requested - if target_id: + # --- Filter results: refaire.json takes priority over a single target ID --- + if args.refaire: + refaire_path = os.path.join(root_dir, "refaire.json") + if os.path.exists(refaire_path): + with open(refaire_path, "r", encoding="utf-8") as f: + refaire_list = json.load(f) + + filtered_results = {} + for copie_name, labels_to_redo in refaire_list: + sid = copie_name.replace("Copie", "") # Extract "01" from "Copie01" + if sid in results: + if not labels_to_redo: + # Empty list: keep all labels for this Copie + filtered_results[sid] = results[sid] + else: + # Keep only the requested labels + filtered_results[sid] = { + lbl: data for lbl, data in results[sid].items() + if lbl in labels_to_redo + } + results = filtered_results + else: + print(f"Warning: --refaire flag used, but {refaire_path} not found.") + elif target_id: if target_id in results: 
results = {target_id: results[target_id]} else: diff --git a/correction.py b/correction.py index ec56a1c..9cddb14 100644 --- a/correction.py +++ b/correction.py @@ -16,6 +16,8 @@ parser = argparse.ArgumentParser() parser.add_argument("--overwrite", action="store_true", help="Force redo requests even if output exists") parser.add_argument("--limit", type=int, help="limit calls to gemini rpo integer") +parser.add_argument("--refaire", action="store_true", + help="Redo specific copies/labels defined in refaire.json") args, _ = parser.parse_known_args() @@ -675,6 +677,67 @@ def process_single_task(task_tuple): flush_thread_log() if __name__ == "__main__": + if args.refaire: + refaire_path = Path(INPUT_DIR) / "refaire.json" + overwritten_path = Path(INPUT_DIR) / "overwritten_correction.json" + + if refaire_path.exists(): + with open(refaire_path, "r", encoding="utf-8") as f: + refaire_list = json.load(f) + + overwritten_data = [] + if overwritten_path.exists(): + with open(overwritten_path, "r", encoding="utf-8") as f: + overwritten_data = json.load(f) + + dirty_results = False + + for copie_name, labels in refaire_list: + pid = copie_name.replace("Copie", "") + copie_dir = Path(INPUT_DIR) / copie_name + + # If list is empty, redo all labels available for this Copie + if not labels: + labels = [p.stem for p in copie_dir.glob("*.pdf")] + + for label in labels: + # 1. Extract and backup old corrections + if label in results: + for batch in results[label]: + to_remove = None + for item in batch: + if item.get("id") == pid: + to_remove = item + break + if to_remove: + batch.remove(to_remove) + overwritten_data.append({ + "pid": pid, + "label": label, + "data": to_remove, + "timestamp": time.time() + }) + dirty_results = True + # Clean up empty batches + results[label] = [b for b in results[label] if b] + + # 2. 
Make new group and add to tasks + pdf_path = copie_dir / f"{label}.pdf" + if pdf_path.exists(): + idx = get_next_group_idx(INPUT_DIR, label) + height = grouping.get_pdf_height(str(pdf_path)) + grouping.create_jpg(label, idx, [(pid, str(pdf_path), height)], INPUT_DIR) + new_group_path = str(Path(INPUT_DIR) / label / f"Group_{idx+1}.jpg") + tasks_to_process.append((new_group_path, label)) + + if dirty_results: + with open(output_path, "w", encoding="utf-8") as f: + json.dump(results, f, indent=2) + with open(overwritten_path, "w", encoding="utf-8") as f: + json.dump(overwritten_data, f, indent=2) + else: + print(f"Warning: --refaire flag used, but {refaire_path} not found.", file=sys.stderr) + print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...") with concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS) as executor: diff --git a/reading_grouped_annotations.py b/reading_grouped_annotations.py index f782636..09eea71 100644 --- a/reading_grouped_annotations.py +++ b/reading_grouped_annotations.py @@ -57,7 +57,7 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la fb["norectangle"] = True dirty_labels.add(label) logs.append(f" > Cleared all feedbacks in {label}") - + elif act['type'] == 'del_global': if act['index'] < len(global_fb): global_fb[act['index']]["to_delete"] = True @@ -171,13 +171,14 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la return "\n".join(logs) - from utils import read_all_labels +import argparse if __name__ == "__main__": - if len(sys.argv) < 2: - print("Usage: python reading_grouped_annotations.py ") - sys.exit(1) + parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs") + parser.add_argument("input_path", help="Directory path") + parser.add_argument("--with-refaire", action="store_true", help="Merge refaire annotations from Bnot") + args = parser.parse_args() - root_dir = sys.argv[1] + root_dir = args.input_path bgnot_dir = 
os.path.join(root_dir, "BGnot") @@ -197,6 +198,18 @@ if __name__ == "__main__": actions_by_student = collections.defaultdict(list) notes_by_student = collections.defaultdict(dict) + # --- 0. Read refaire.json if requested --- + refaire_dict = {} + if args.with_refaire: + refaire_path = os.path.join(root_dir, "refaire.json") + if os.path.exists(refaire_path): + with open(refaire_path, "r", encoding="utf-8") as f: + refaire_list = json.load(f) + for c_name, labels in refaire_list: + sid = c_name.replace("Copie", "") + refaire_dict[sid] = labels + else: + print(f"Warning: --with-refaire flag used, but {refaire_path} not found.") # --- 1. Scan BGnot grouped directories and extract all checks & notes --- for entry in os.listdir(bgnot_dir): gdir = os.path.join(bgnot_dir, entry) @@ -236,6 +249,56 @@ if __name__ == "__main__": 'old_header_h': img_info.get("header_height", 0) } + # --- 1.5. Override BGnot actions with Bnot actions for Refaire targets --- + if args.with_refaire and refaire_dict: + bnot_dir = os.path.join(root_dir, "Bnot") + for sid, r_labels in refaire_dict.items(): + s_bnot_dir = os.path.join(bnot_dir, f"Copie{sid}") + if not os.path.exists(s_bnot_dir): + continue + + # If empty list, it targets all labels for this Copie + if not r_labels: + r_labels = list(original_data.get(sid, {}).keys()) + + # Clear out existing BGnot actions & notes for the refaire labels + actions_by_student[sid] = [ + a for a in actions_by_student[sid] if a.get('label') not in r_labels + ] + for lbl in r_labels: + notes_by_student[sid].pop(lbl, None) + + print(f"\nScanning refaire annotations in Bnot for Copie{sid}") + b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir) + + b_bnote_path = os.path.join(s_bnot_dir, "bnote.json") + b_bnote_data = {} # reset per copy so a missing bnote.json never reuses stale data + if os.path.exists(b_bnote_path): + with open(b_bnote_path, "r", encoding="utf-8") as f: + b_bnote_data = json.load(f) + + # Inject actions + for act in b_actions: + # Bnot checkboxes don't natively have student_id, so we inject it + act["student_id"] = sid + 
actions_by_student[sid].append(act) + + # Inject notes + if b_notes_img is not None: + for img_info in b_bnote_data.get("images", []): + lbl = img_info.get("label") + hmin = img_info.get("hmin", 0) + hmax = img_info.get("hmax", 0) + + if hmax > hmin: + crop = b_notes_img.crop((0, hmin, b_notes_img.width, hmax)) + if has_significant_notes(crop): + notes_by_student[sid][lbl] = { + 'img': crop, + 'old_header_h': img_info.get("header_height", 0) + } + + def process_student(sid): if sid not in original_data: return ""