Refaire support

master
Sébastien Miquel 2026-03-29 21:52:51 +02:00
parent 8a1eea6b3b
commit f16c0273bd
3 changed files with 156 additions and 7 deletions

View File

@ -208,6 +208,8 @@ if __name__ == "__main__":
# Command-line interface: one positional path plus two boolean switches.
parser = argparse.ArgumentParser(description="Generate annotated PDFs.")
parser.add_argument("input_path", help="Directory or specific file path")
parser.add_argument("--overwrite", action="store_true", help="Overwrite existing output files")
# --refaire restricts the run to the copies/labels listed in refaire.json.
parser.add_argument("--refaire", action="store_true", help="Process only copies/labels defined in refaire.json")
args = parser.parse_args()
@ -230,8 +232,30 @@ if __name__ == "__main__":
results = annotating.make_dictionary(root_dir)
# Filter results if a specific target ID was requested
if target_id:
# --refaire: narrow `results` down to the copies/labels listed in refaire.json.
# The file is read as a list of (copie_name, labels) pairs; an empty label list
# means "keep every label of that copy".
if args.refaire:
    refaire_path = os.path.join(root_dir, "refaire.json")
    if not os.path.exists(refaire_path):
        print(f"Warning: --refaire flag used, but {refaire_path} not found.")
    else:
        with open(refaire_path, "r", encoding="utf-8") as f:
            refaire_list = json.load(f)

        selected = {}
        for copie_name, labels_to_redo in refaire_list:
            # "Copie01" -> "01", the key used by `results`
            sid = copie_name.replace("Copie", "")
            if sid not in results:
                continue
            per_label = results[sid]
            if labels_to_redo:
                # Non-empty list: keep only the requested labels
                per_label = {
                    name: payload
                    for name, payload in per_label.items()
                    if name in labels_to_redo
                }
            selected[sid] = per_label

        results = selected
elif target_id:
if target_id in results:
results = {target_id: results[target_id]}
else:

View File

@ -16,6 +16,8 @@ parser = argparse.ArgumentParser()
# --overwrite: boolean switch (see help text).
parser.add_argument("--overwrite", action="store_true",
help="Force redo requests even if output exists")
# --limit: optional integer.
# NOTE(review): the help text below looks garbled ("rpo") — confirm the intended wording.
parser.add_argument("--limit", type=int, help="limit calls to gemini rpo integer")
# --refaire: boolean switch enabling the refaire.json-driven redo pass.
parser.add_argument("--refaire", action="store_true",
help="Redo specific copies/labels defined in refaire.json")
# parse_known_args returns (namespace, leftover arguments); the leftovers are discarded here.
args, _ = parser.parse_known_args()
@ -675,6 +677,67 @@ def process_single_task(task_tuple):
flush_thread_log()
if __name__ == "__main__":
# --refaire mode: for each (copie_name, labels) entry read from refaire.json,
# remove that copy's stored correction from `results` (backing it up in
# overwritten_correction.json) and queue a new single-copy group image for processing.
if args.refaire:
refaire_path = Path(INPUT_DIR) / "refaire.json"
overwritten_path = Path(INPUT_DIR) / "overwritten_correction.json"
if refaire_path.exists():
with open(refaire_path, "r", encoding="utf-8") as f:
refaire_list = json.load(f)
# Start from the backups already on disk, if any; new backups are appended to them.
overwritten_data = []
if overwritten_path.exists():
with open(overwritten_path, "r", encoding="utf-8") as f:
overwritten_data = json.load(f)
# Becomes True as soon as an entry is removed from `results`.
dirty_results = False
for copie_name, labels in refaire_list:
# The id is the copy name with "Copie" removed (e.g. "Copie01" -> "01").
pid = copie_name.replace("Copie", "")
copie_dir = Path(INPUT_DIR) / copie_name
# If list is empty, redo all labels available for this Copie
# (one label per *.pdf file found in the copy's directory)
if not labels:
labels = [p.stem for p in copie_dir.glob("*.pdf")]
for label in labels:
# 1. Extract and backup old corrections
# `results[label]` is iterated as a list of batches, each a list of dict items carrying an "id".
if label in results:
for batch in results[label]:
# Find the item first and remove it after the inner loop, so the batch
# is not mutated while it is being iterated. At most one item per batch is removed.
to_remove = None
for item in batch:
if item.get("id") == pid:
to_remove = item
break
if to_remove:
batch.remove(to_remove)
overwritten_data.append({
"pid": pid,
"label": label,
"data": to_remove,
"timestamp": time.time()
})
dirty_results = True
# Clean up empty batches
results[label] = [b for b in results[label] if b]
# 2. Make new group and add to tasks
pdf_path = copie_dir / f"{label}.pdf"
if pdf_path.exists():
idx = get_next_group_idx(INPUT_DIR, label)
height = grouping.get_pdf_height(str(pdf_path))
grouping.create_jpg(label, idx, [(pid, str(pdf_path), height)], INPUT_DIR)
# NOTE(review): create_jpg receives idx but the path below uses idx+1 —
# presumably create_jpg names its output 1-based; confirm against grouping.create_jpg.
new_group_path = str(Path(INPUT_DIR) / label / f"Group_{idx+1}.jpg")
tasks_to_process.append((new_group_path, label))
# Persist the pruned results and the accumulated backups.
if dirty_results:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(results, f, indent=2)
with open(overwritten_path, "w", encoding="utf-8") as f:
json.dump(overwritten_data, f, indent=2)
else:
print(f"Warning: --refaire flag used, but {refaire_path} not found.", file=sys.stderr)
print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...")
with concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS) as executor:

View File

@ -171,13 +171,14 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la
return "\n".join(logs)
from utils import read_all_labels
import argparse
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python reading_grouped_annotations.py <Dir>")
sys.exit(1)
# Command-line interface: a positional directory plus the optional --with-refaire switch.
# argparse prints a usage message and exits by itself when the positional is missing.
parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs")
parser.add_argument("input_path", help="Directory path")
parser.add_argument("--with-refaire", action="store_true", help="Merge refaire annotations from Bnot")
args = parser.parse_args()
# Take the directory from the parsed arguments rather than sys.argv[1]: when the
# flag comes first (`--with-refaire <Dir>`), sys.argv[1] is the flag, not the path.
root_dir = args.input_path
bgnot_dir = os.path.join(root_dir, "BGnot")
@ -197,6 +198,18 @@ if __name__ == "__main__":
actions_by_student = collections.defaultdict(list)
notes_by_student = collections.defaultdict(dict)
# --- 0. Read refaire.json if requested ---
# refaire_dict maps a student id (the copy name with "Copie" removed) to the
# list of labels to redo; it stays empty when the flag is off or the file is missing.
refaire_dict = {}
if args.with_refaire:
    refaire_path = os.path.join(root_dir, "refaire.json")
    if not os.path.exists(refaire_path):
        print(f"Warning: --with-refaire flag used, but {refaire_path} not found.")
    else:
        with open(refaire_path, "r", encoding="utf-8") as f:
            refaire_list = json.load(f)
        # Later entries for the same copy replace earlier ones.
        refaire_dict.update(
            (copy_name.replace("Copie", ""), wanted)
            for copy_name, wanted in refaire_list
        )
# --- 1. Scan BGnot grouped directories and extract all checks & notes ---
for entry in os.listdir(bgnot_dir):
gdir = os.path.join(bgnot_dir, entry)
@ -236,6 +249,55 @@ if __name__ == "__main__":
'old_header_h': img_info.get("header_height", 0)
}
# --- 1.5. Override BGnot actions with Bnot actions for Refaire targets ---
# For every student listed in refaire_dict that has a Bnot/Copie<sid> directory,
# drop the actions/notes collected so far for the refaire labels and replace
# them with what is detected in that student's Bnot directory.
if args.with_refaire and refaire_dict:
    bnot_dir = os.path.join(root_dir, "Bnot")
    for sid, r_labels in refaire_dict.items():
        s_bnot_dir = os.path.join(bnot_dir, f"Copie{sid}")
        if not os.path.exists(s_bnot_dir):
            continue

        # If empty list, it targets all labels for this Copie
        if not r_labels:
            r_labels = list(original_data.get(sid, {}).keys())

        # Clear out existing BGnot actions & notes for the refaire labels
        actions_by_student[sid] = [
            a for a in actions_by_student[sid] if a.get('label') not in r_labels
        ]
        for lbl in r_labels:
            notes_by_student[sid].pop(lbl, None)

        print(f"\nScanning refaire annotations in Bnot for Copie{sid}")
        b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir)

        # Reset for every student: without this, a missing bnote.json raised a
        # NameError on the first student, or silently reused the previous
        # student's metadata on later ones.
        b_bnote_data = {}
        b_bnote_path = os.path.join(s_bnot_dir, "bnote.json")
        if os.path.exists(b_bnote_path):
            with open(b_bnote_path, "r", encoding="utf-8") as f:
                b_bnote_data = json.load(f)

        # Inject actions
        # NOTE(review): every action detected in Bnot is injected, not only those
        # belonging to r_labels — confirm Bnot holds only the redone labels.
        for act in b_actions:
            # Bnot checkboxes don't natively have student_id, so we inject it
            act["student_id"] = sid
            actions_by_student[sid].append(act)

        # Inject notes
        if b_notes_img is not None:
            for img_info in b_bnote_data.get("images", []):
                lbl = img_info.get("label")
                hmin = img_info.get("hmin", 0)
                hmax = img_info.get("hmax", 0)
                # Skip entries with an empty or inverted vertical range.
                if hmax > hmin:
                    crop = b_notes_img.crop((0, hmin, b_notes_img.width, hmax))
                    if has_significant_notes(crop):
                        notes_by_student[sid][lbl] = {
                            'img': crop,
                            'old_header_h': img_info.get("header_height", 0)
                        }
def process_student(sid):
if sid not in original_data:
return ""