Compare commits

..

3 Commits

10 changed files with 177 additions and 83 deletions

View File

@ -1,7 +1,7 @@
#+title: Script #+title: Script
#+author: Sébastien Miquel #+author: Sébastien Miquel
#+date: 14-03-2026 #+date: 14-03-2026
# Time-stamp: <17-05-26 10:51> # Time-stamp: <06-06-26 10:10>
#+OPTIONS: #+OPTIONS:
* Méta * Méta
@ -235,7 +235,7 @@ sous le nom =Concat_annotated.pdf=.
OU OU
2. =python reading_grouped_annotations.py Interro= 2. =python reading_grouped_annotations.py Interro=
Idem, mais pour =BGnot=. Idem, mais pour =BGnot=.
3. =python giving_names.py Interro BGnot= 3. =python giving_names.py Interro BGnot=
@ -245,7 +245,10 @@ OU
Si un nom est =Unknown= : renommer à la main le dossier et le fichier dedans. Si un nom est =Unknown= : renommer à la main le dossier et le fichier dedans.
On peut faire des changements manuels aux =score.json= ici. 4. On peut faire des changements manuels aux =score.json= ici, puis
- =python reading_annotations.py --update-score Interro=
- =python reading_grouped_annotations.py --update-score Interro=
pour mettre à jour les scores dans les images.
4. (gestion perso) 4. (gestion perso)
+ =gestion_classe ne= pour créer l'interro puis + =gestion_classe ne= pour créer l'interro puis
+ =gestion_classe we= (set barème here) + =gestion_classe we= (set barème here)

View File

@ -34,6 +34,9 @@ def make_dictionary(root_dir, refaire=False, refaire_list=[]):
student_id = item['id'] student_id = item['id']
result_obj = item['result'] result_obj = item['result']
if result_obj.get("suffix") == "_old":
continue
# Find coordinates # Find coordinates
coordinates = None coordinates = None
height,width= None, None height,width= None, None
@ -58,9 +61,11 @@ def make_dictionary(root_dir, refaire=False, refaire_list=[]):
if coordinates: if coordinates:
break break
# Construct PDF path: Dir/Copie{id}/{label}.pdf suffix = result_obj.get("suffix", "")
pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf" if suffix == "_new":
pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}_new.pdf"
else:
pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf"
# Initialize dictionary structure for this ID if missing # Initialize dictionary structure for this ID if missing
if student_id not in result_data: if student_id not in result_data:
result_data[student_id] = {} result_data[student_id] = {}
@ -91,13 +96,16 @@ def make_dictionary(root_dir, refaire=False, refaire_list=[]):
for lbl in labels_to_redo: for lbl in labels_to_redo:
pdf_path = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}.pdf" pdf_path = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}.pdf"
if not Path(pdf_path).exists(): if not Path(pdf_path).exists():
print("Debug : asked to refaire", sid, lbl, "but pdf absent") pdf_path_new = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}_new.pdf"
continue if pdf_path_new.exists():
pdf_path = pdf_path_new
else:
print("Debug : asked to refaire", sid, lbl, "but pdf absent")
continue
result_data[sid][lbl] = { result_data[sid][lbl] = {
"pdf_path": pdf_path, "pdf_path": pdf_path,
"result": { "result": {
"score": 0.0, "score": 0.0,
"confidence": 1.0,
"feedback": [], "feedback": [],
"error": "non traité" "error": "non traité"
}, },
@ -108,13 +116,16 @@ def make_dictionary(root_dir, refaire=False, refaire_list=[]):
for lbl in labels_to_redo: for lbl in labels_to_redo:
pdf_path = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}.pdf" pdf_path = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}.pdf"
if not pdf_path.exists(): if not pdf_path.exists():
print("Debug : asked to refaire", sid, lbl, "but pdf absent") pdf_path_new = Path(root_dir) / "Copies" / f"Copie{sid}" / f"{lbl}_new.pdf"
continue if pdf_path_new.exists():
pdf_path = pdf_path_new
else:
print("Debug : asked to refaire", sid, lbl, "but pdf absent")
continue
result_data[sid][lbl] = { result_data[sid][lbl] = {
"pdf_path": pdf_path, "pdf_path": pdf_path,
"result": { "result": {
"score": 0.0, "score": 0.0,
"confidence": 1.0,
"feedback": [], "feedback": [],
"error": "non traité" "error": "non traité"
}, },
@ -572,9 +583,9 @@ def process_student(student_id, labels_data, root_dir, all_labels, overwrite):
for label, content in sorted_labels: for label, content in sorted_labels:
# 1. Find PDF path # 1. Find PDF path
copie_folder = f"Copie{student_id}" copie_folder = f"Copie{student_id}"
pdf_full_path = Path(root_dir) / "Copies" / copie_folder / f"{label}.pdf" pdf_full_path = content.get('pdf_path')
if not os.path.exists(pdf_full_path): if not pdf_full_path or not os.path.exists(pdf_full_path):
print(f"File not found: {pdf_full_path}") print(f"File not found: {pdf_full_path}")
continue continue
@ -616,18 +627,6 @@ def process_student(student_id, labels_data, root_dir, all_labels, overwrite):
def process_correction(root_dir, data, all_labels, overwrite=False): def process_correction(root_dir, data, all_labels, overwrite=False):
# with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# # Create a list of futures
# futures = []
# for student_id, labels in sorted(data.items()):
# futures.append(
# executor.submit(process_student, student_id, labels, root_dir, all_labels, overwrite)
# )
# # Wait for all threads to complete
# concurrent.futures.wait(futures)
# Ne pas thread cette application # Ne pas thread cette application
# 1. Il faut protéger les appels à matplotlib # 1. Il faut protéger les appels à matplotlib
# 2. tu vas perdre les erreurs # 2. tu vas perdre les erreurs

View File

@ -175,6 +175,17 @@ def call_gemini_with_retries(model_id, contents, config,
except Exception as e: except Exception as e:
error_msg = str(e).lower() error_msg = str(e).lower()
is_quota_error = "429" in error_msg or "quota" in error_msg or "exhausted" in error_msg is_quota_error = "429" in error_msg or "quota" in error_msg or "exhausted" in error_msg
is_minute_limit = "minute" in error_msg or "rpm" in error_msg or "tpm" in error_msg
if is_minute_limit:
import re
# Extract wait time if present, else use default delay
retry_match = re.search(r"retry in ([\d.]+)s", error_msg)
wait_time = float(retry_match.group(1)) + 1.0 if retry_match else delays[attempt]
tprint(f"\tGemini Pro minute limit hit. Waiting {wait_time:.1f}s...")
time.sleep(wait_time)
continue # Retry same model
# Immediately fallback to Flash without waiting if it's a Pro quota error # Immediately fallback to Flash without waiting if it's a Pro quota error
if is_quota_error and model_id == MODEL_ID_pro and fallback_model_id: if is_quota_error and model_id == MODEL_ID_pro and fallback_model_id:
@ -190,10 +201,10 @@ def call_gemini_with_retries(model_id, contents, config,
tprint(f"\tGemini API failure: {e}. Maximum retries reached.") tprint(f"\tGemini API failure: {e}. Maximum retries reached.")
raise raise
def correct_boxes_with_gemini(pid, label, original_feedbacks, def correct_boxes_with_gemini(pid, label, pdf_path, original_feedbacks,
yming, ymaxg, width_r, total_height): yming, ymaxg, width_r, total_height):
"""Requests corrected bounding boxes from Gemini Flash on the single image.""" """Requests corrected bounding boxes from Gemini Flash on the single image."""
pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf" # pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf"
contents, config = prompting.request_for_box_correction(pdf_path, original_feedbacks) contents, config = prompting.request_for_box_correction(pdf_path, original_feedbacks)
response_text = call_gemini_with_retries(MODEL_ID_flash, contents, config) response_text = call_gemini_with_retries(MODEL_ID_flash, contents, config)
@ -253,20 +264,26 @@ def handle_label_errors(pid, label, res, pdf_path):
if new_label == label: if new_label == label:
res["error"] = "" res["error"] = ""
return [] return []
new_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{new_label}.pdf"
if new_pdf_path.exists(): base_new_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{new_label}.pdf"
new_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{new_label}_new.pdf"
if base_new_pdf_path.exists() or new_pdf_path.exists():
tprint(f"\t\tCopie{pid} tried to move wrong {label} to {new_label}, but it already exists.") tprint(f"\t\tCopie{pid} tried to move wrong {label} to {new_label}, but it already exists.")
res["error"] = f"wrg-lbl:{new_label}?exists" res["error"] = f"wrg-lbl:{new_label}?exists"
else: else:
res["error"] = f"wrg-lbl-moved-to:{new_label}" res["error"] = f"wrg-lbl-moved-to:{new_label}"
tprint(f"\t\tCopie{pid} : moving wrong {label} to {new_label}.") tprint(f"\t\tCopie{pid} : moving wrong {label} to {new_label}.")
shutil.move(str(pdf_path), str(new_pdf_path))
# Since we moved the file, this Copie/label should not be taken # Copie vers _new, puis renommage de l'original vers _old
# into account in the future, I think shutil.copy(str(pdf_path), str(new_pdf_path))
old_pdf_path = pdf_path.with_name(f"{label}_old.pdf")
if pdf_path != old_pdf_path:
shutil.move(str(pdf_path), str(old_pdf_path))
idx = get_next_group_idx(new_label) idx = get_next_group_idx(new_label)
height = grouping.get_pdf_height(str(new_pdf_path)) height = grouping.get_pdf_height(str(new_pdf_path))
grouping.create_jpg(new_label, idx, [(pid, str(new_pdf_path), height)], grouping.create_jpg(new_label, idx, [(pid, str(new_pdf_path), height)], GROUPS_DIR)
GROUPS_DIR)
tprint(f"\t\tMaking {new_label} group {idx+1}") tprint(f"\t\tMaking {new_label} group {idx+1}")
new_tasks.append((str(GROUPS_DIR / new_label / f"Group_{idx+1}.jpg"), new_tasks.append((str(GROUPS_DIR / new_label / f"Group_{idx+1}.jpg"),
new_label, False)) new_label, False))
@ -289,14 +306,17 @@ def handle_label_errors(pid, label, res, pdf_path):
error += f"{add_label}??" error += f"{add_label}??"
keep_error = True keep_error = True
continue continue
new_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{add_label}.pdf"
if not new_pdf_path.exists(): base_add_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{add_label}.pdf"
shutil.copy(str(pdf_path), str(new_pdf_path)) add_pdf_path = COPIES_DIR / f"Copie{pid}" / f"{add_label}_new.pdf"
if not base_add_pdf_path.exists() and not add_pdf_path.exists():
shutil.copy(str(pdf_path), str(add_pdf_path))
tprint(f"\t\tCopying Copie{pid} : {label} -> {add_label}") tprint(f"\t\tCopying Copie{pid} : {label} -> {add_label}")
idx = get_next_group_idx(add_label) idx = get_next_group_idx(add_label)
tprint(f"\t\tMaking {add_label} group {idx+1}") tprint(f"\t\tMaking {add_label} group {idx+1}")
height = grouping.get_pdf_height(str(new_pdf_path)) height = grouping.get_pdf_height(str(add_pdf_path))
grouping.create_jpg(add_label, idx, [(pid, str(new_pdf_path), height)], GROUPS_DIR) grouping.create_jpg(add_label, idx, [(pid, str(add_pdf_path), height)], GROUPS_DIR)
new_tasks.append((str(GROUPS_DIR / add_label / f"Group_{idx+1}.jpg"), new_tasks.append((str(GROUPS_DIR / add_label / f"Group_{idx+1}.jpg"),
add_label, False)) add_label, False))
error += f"(->){add_label}" error += f"(->){add_label}"
@ -305,7 +325,6 @@ def handle_label_errors(pid, label, res, pdf_path):
keep_error = True keep_error = True
error += f"(xx){add_label}" error += f"(xx){add_label}"
tprint(f"\t\tAlready present (not copied) Copie{pid} : {label} -> {add_label}") tprint(f"\t\tAlready present (not copied) Copie{pid} : {label} -> {add_label}")
if not keep_error: if not keep_error:
res["error"] = "" res["error"] = ""
else: else:
@ -367,6 +386,26 @@ def process_single_task(task_tuple, precomputed_response=None):
yming, ymaxg, width_r = d_data[pid] yming, ymaxg, width_r = d_data[pid]
pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf" pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf"
current_suffix = ""
# Détection du vrai fichier s'il a un suffixe
if not pdf_path.exists():
if pdf_path.with_name(f"{label}_new.pdf").exists():
pdf_path = pdf_path.with_name(f"{label}_new.pdf")
current_suffix = "_new"
# Quand est-ce que ce chemin est utilisé ? Jamais ?
elif pdf_path.with_name(f"{label}_old.pdf").exists():
pdf_path = pdf_path.with_name(f"{label}_old.pdf")
current_suffix = "_old"
# 1. Gestion de empty-answer
if res.get("error") == "empty-answer":
old_path = pdf_path.with_name(f"{label}_old.pdf")
if pdf_path.exists() and pdf_path != old_path:
shutil.move(str(pdf_path), str(old_path))
pdf_path = old_path
current_suffix = "_old"
if (not can_spawn_tasks) and res["error"] == "additional-answer": if (not can_spawn_tasks) and res["error"] == "additional-answer":
tprint("\tSwallowing an additional-answer from a subsequent task.") tprint("\tSwallowing an additional-answer from a subsequent task.")
res["error"]= "" res["error"]= ""
@ -375,6 +414,13 @@ def process_single_task(task_tuple, precomputed_response=None):
if can_spawn_tasks and res.get("error") in ["wrong-label", "additional-answer"]: if can_spawn_tasks and res.get("error") in ["wrong-label", "additional-answer"]:
new_tasks.extend(handle_label_errors(pid, label, res, pdf_path)) new_tasks.extend(handle_label_errors(pid, label, res, pdf_path))
# Si "wrong-label" a déplacé le fichier courant vers _old
if res.get("error", "").startswith("wrg-lbl-moved-to:"):
current_suffix = "_old"
# 5. Enregistrer l'information dans correction.json
if current_suffix:
res["suffix"] = current_suffix
needs_correction = [] needs_correction = []
for (i,f) in enumerate(res["feedback"]): for (i,f) in enumerate(res["feedback"]):
@ -403,8 +449,9 @@ def process_single_task(task_tuple, precomputed_response=None):
if needs_correction: if needs_correction:
tprint(f"\tBox anomalies detected for Copie {pid} {group_name}. \n\tRequesting isolated correction from Gemini Flash...") tprint(f"\tBox anomalies detected for Copie {pid} {group_name}. \n\tRequesting isolated correction from Gemini Flash...")
try: try:
# Pensez à passer pdf_path à la fonction modifiée !
res["feedback"] = correct_boxes_with_gemini( res["feedback"] = correct_boxes_with_gemini(
pid, label, res["feedback"], pid, label, pdf_path, res["feedback"],
yming, ymaxg, width_r, total_height) yming, ymaxg, width_r, total_height)
except Exception as e: except Exception as e:
tprint(f"\tCorrection failed for Copie {pid}, {group_name} : {e}\n\tRemoving the boxes") tprint(f"\tCorrection failed for Copie {pid}, {group_name} : {e}\n\tRemoving the boxes")
@ -487,6 +534,12 @@ if __name__ == "__main__":
# 2. Make new group and add to tasks # 2. Make new group and add to tasks
pdf_path = copie_dir / f"{label}.pdf" pdf_path = copie_dir / f"{label}.pdf"
if not pdf_path.exists():
if (copie_dir / f"{label}_new.pdf").exists():
pdf_path = copie_dir / f"{label}_new.pdf"
# elif (copie_dir / f"{label}_old.pdf").exists():
# pdf_path = copie_dir / f"{label}_old.pdf"
if pdf_path.exists(): if pdf_path.exists():
idx = get_next_group_idx(label) idx = get_next_group_idx(label)
height = grouping.get_pdf_height(str(pdf_path)) height = grouping.get_pdf_height(str(pdf_path))
@ -510,7 +563,7 @@ if __name__ == "__main__":
for label in all_labels: for label in all_labels:
if label.startswith(args.batch_from): if label.startswith(args.batch_from):
args.batch_from = label args.batch_from = label
print("Batching from : ", args.batch_from) input(f"About to batch from: {args.batch_from}. Press Enter to confirm...")
break break
if args.batch_from not in all_labels: if args.batch_from not in all_labels:
sys.exit(f"Error: Label '{args.batch_from}' not found. Available labels: {all_labels}") sys.exit(f"Error: Label '{args.batch_from}' not found. Available labels: {all_labels}")

View File

@ -16,6 +16,7 @@ def compile_to_pdf(text, output_pdf_path): # 21 cm + 3.8 (dimension de la marge
\\usepackage{{lmodern}} \\usepackage{{lmodern}}
\\usepackage{{amsmath, amssymb}} \\usepackage{{amsmath, amssymb}}
\\usepackage{{commands}} \\usepackage{{commands}}
\\usepackage{{minted}}
\\usepackage{{graphicx}} \\usepackage{{graphicx}}
\\usepackage{{enumitem}} \\usepackage{{enumitem}}
\\begin{{document}} \\begin{{document}}
@ -45,6 +46,13 @@ def compile_to_pdf(text, output_pdf_path): # 21 cm + 3.8 (dimension de la marge
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=False check=False
) )
if "minted" in text:
subprocess.run(
['pdflatex', '-interaction=nonstopmode', tex_filename],
cwd=temp_dir,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False)
generated_pdf = os.path.join(temp_dir, pdf_filename) generated_pdf = os.path.join(temp_dir, pdf_filename)
if os.path.exists(generated_pdf): if os.path.exists(generated_pdf):

View File

@ -155,6 +155,8 @@ def worker_thread(base_dir, files_to_process, all_labels):
class ImageViewer: class ImageViewer:
def __init__(self, root, base_dir): def __init__(self, root, base_dir):
self.root = root self.root = root
self.root.resizable(False, False) # If you resize, coordinates will be wrong
self.base_dir = base_dir self.base_dir = base_dir
self.root.title("Bounding Box Viewer") self.root.title("Bounding Box Viewer")
self.label = tk.Label(root, text="Waiting for images...") self.label = tk.Label(root, text="Waiting for images...")

View File

@ -131,7 +131,13 @@ def clean_obj(obj):
return [clean_obj(x) for x in obj] return [clean_obj(x) for x in obj]
elif isinstance(obj, dict): elif isinstance(obj, dict):
return {k: clean_obj(v) for k, v in obj.items()} r = {}
for k, v in obj.items():
if k != "suffix":
r[k] = clean_obj(v)
else:
r[k] = v
return r
else: else:
return obj return obj

View File

@ -39,11 +39,6 @@ Avoid giving feedback about confusing letters `n` with `m`, `x` with
`n` or `h` with `k`. If it looks wrong, assume you read it wrong, `n` or `h` with `k`. If it looks wrong, assume you read it wrong,
unless the distinction is very important. unless the distinction is very important.
You should also give me a measure of confidence, from 0 to 1 that you
were able to correctly understand the answer. A score below 0.5 means
that you think it is likely that you couldn't understand an important
part.
In some case, you may find that either In some case, you may find that either
- The student didn't answer the right question. Set the score to 0. - The student didn't answer the right question. Set the score to 0.
Since it could be a labeling error, indicate is by setting `error` Since it could be a labeling error, indicate is by setting `error`
@ -57,19 +52,17 @@ In some case, you may find that either
If there's no error, set `error` to `\"\"`. If there's no error, set `error` to `\"\"`.
You will answer using json describing a list of dictionary with a key You will answer using json describing a list of dictionary with a key
\"id\", and a key \"result\" that contains the \"score\", the \"confidence\", a \"id\", and a key \"result\" that contains the \"score\", a list
list \"feedback\", and possibly an \"error\". Like this example : \"feedback\", and possibly an \"error\". Like this example :
[{ \"id\": \"01\", [{ \"id\": \"01\",
\"result\": {\"score\" : 2.5, \"result\": {\"score\" : 2.5,
\"confidence\" : 0.8,
\"feedback\": [{text: \"Un retour générique. Il faut apprendre le cours.\", box_2d: null}, \"feedback\": [{text: \"Un retour générique. Il faut apprendre le cours.\", box_2d: null},
{text: \"Non, la fonction n'est pas forcément continue\", pos: [145, 280, 340, 500]}], {text: \"Non, la fonction n'est pas forcément continue\", pos: [145, 280, 340, 500]}],
\"error\": \"\"} \"error\": \"\"}
}, },
{ \"id\": \"04\", { \"id\": \"04\",
\"result\": {\"score\" : 4., \"result\": {\"score\" : 4.,
\"confidence\" : 0.9,
\"feedback\" : [] \"feedback\" : []
\"error\": \"\" } \"error\": \"\" }
} }
@ -121,7 +114,6 @@ class FeedbackItem(BaseModel):
class ResultData(BaseModel): class ResultData(BaseModel):
score: float = Field(description="The numeric score") score: float = Field(description="The numeric score")
confidence: float = Field(description="Confidence level")
feedback: List[FeedbackItem] = Field(description="List of feedback items") feedback: List[FeedbackItem] = Field(description="List of feedback items")
error: str = Field(description="Indicates if an error occurred") error: str = Field(description="Indicates if an error occurred")
@ -140,7 +132,6 @@ UNROLLED_SCHEMA = {
"type": "OBJECT", "type": "OBJECT",
"properties": { "properties": {
"score": {"type": "NUMBER", "description": "The numeric score"}, "score": {"type": "NUMBER", "description": "The numeric score"},
"confidence": {"type": "NUMBER", "description": "Confidence level"},
"error": {"type": "STRING", "description": "Indicates if an error occurred"}, "error": {"type": "STRING", "description": "Indicates if an error occurred"},
"feedback": { "feedback": {
"type": "ARRAY", "type": "ARRAY",
@ -160,7 +151,7 @@ UNROLLED_SCHEMA = {
} }
} }
}, },
"required": ["score", "confidence", "feedback", "error"] "required": ["score", "feedback", "error"]
} }
}, },
"required": ["id", "result"] "required": ["id", "result"]

View File

@ -161,7 +161,8 @@ def has_significant_notes(note_img, threshold=20):
# print(f"Debug : visible pixels is {visible_pixels}") # print(f"Debug : visible pixels is {visible_pixels}")
return visible_pixels > threshold return visible_pixels > threshold
def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_layer, all_labels): def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_layer,
all_labels, update_score=False):
""" """
Modifies data based on actions, reads bnote.json, cuts notes, Modifies data based on actions, reads bnote.json, cuts notes,
regenerates all label images for consistency, saves dirty ones, regenerates all label images for consistency, saves dirty ones,
@ -230,6 +231,23 @@ def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_laye
print(f" > Deleted rect in {label}") print(f" > Deleted rect in {label}")
dirty_labels.add(label) dirty_labels.add(label)
# --- 1.5 Override with existing score.json if requested ---
if update_score and os.path.exists(score_path):
try:
with open(score_path, "r") as f:
existing_scores = json.load(f)
for label, existing_score in existing_scores.items():
if label in labels_data:
current_score = str(labels_data[label]['result'].get('score', 0))
# If manually modified, override the result and mark dirty
if current_score != str(existing_score):
labels_data[label]['result']['score'] = existing_score
dirty_labels.add(label)
print(f" > Overrode score for {label} to {existing_score} from existing score.json")
except json.JSONDecodeError:
print(f" > Warning: Could not read existing {score_path}")
# --- 2. Process Images (Cut notes, Regenerate, Concatenate) --- # --- 2. Process Images (Cut notes, Regenerate, Concatenate) ---
concat_list = [] concat_list = []
concat_list_F = [] concat_list_F = []
@ -255,7 +273,8 @@ def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_laye
# B. Regenerate Label Image # B. Regenerate Label Image
# We always regenerate to ensure Concat.jpg is consistent with any modifications # We always regenerate to ensure Concat.jpg is consistent with any modifications
pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf" # pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf"
pdf_path = content.get('pdf_path') # Contient le suffixe _new si nécessaire
if not os.path.exists(pdf_path): continue if not os.path.exists(pdf_path): continue
(base_img, _, _) = annotating.make_base_image(pdf_path) (base_img, _, _) = annotating.make_base_image(pdf_path)
@ -331,11 +350,13 @@ def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_laye
from utils import read_all_labels from utils import read_all_labels
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: import argparse
print("Usage: python reading_annotations.py <Dir>") parser = argparse.ArgumentParser(description="Read annotations and compile PDFs")
sys.exit(1) parser.add_argument("input_path", help="Directory path")
parser.add_argument("--update-score", action="store_true", help="Override scores with values from existing score.json")
args = parser.parse_args()
root_dir = sys.argv[1] root_dir = args.input_path
try: try:
all_labels = read_all_labels(Path(root_dir)) all_labels = read_all_labels(Path(root_dir))
@ -351,7 +372,9 @@ if __name__ == "__main__":
if os.path.exists(bnot_dir): if os.path.exists(bnot_dir):
print(f"Processing annotations for: {student_id}") print(f"Processing annotations for: {student_id}")
actions, notes = detect_checks_and_notes(bnot_dir) actions, notes = detect_checks_and_notes(bnot_dir)
if actions or notes: if actions or notes or args.update_score:
apply_actions_and_regenerate(root_dir, original_data, student_id, actions, notes, all_labels) apply_actions_and_regenerate(root_dir, original_data, student_id,
actions, notes, all_labels,
update_score=args.update_score)
else: else:
print(" No changes detected or missing files.") print(" No changes detected or missing files.")

View File

@ -91,7 +91,8 @@ def save_paginated_pdf(image_groups, output_path):
pages[0].save(output_path, "PDF", resolution=100.0, save_all=True, append_images=pages[1:]) pages[0].save(output_path, "PDF", resolution=100.0, save_all=True, append_images=pages[1:])
def apply_actions_and_regenerate_grouped(root_dir, data, student_id, def apply_actions_and_regenerate_grouped(root_dir, data, student_id,
actions, label_notes, all_labels): actions, label_notes, all_labels,
update_score=False):
""" """
Modifies data based on actions, pastes label-specific note crops, Modifies data based on actions, pastes label-specific note crops,
regenerates label images for consistency, saves dirty ones, regenerates label images for consistency, saves dirty ones,
@ -155,6 +156,23 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id,
logs.append(f" > Deleted rect in {label}") logs.append(f" > Deleted rect in {label}")
dirty_labels.add(label) dirty_labels.add(label)
# --- 1.5 Override with existing score.json if requested ---
if update_score and os.path.exists(score_path):
try:
with open(score_path, "r") as f:
existing_scores = json.load(f)
for label, existing_score in existing_scores.items():
if label in labels_data:
current_score = str(labels_data[label]['result'].get('score', 0))
# If manually modified, override the result and mark dirty
if current_score != str(existing_score):
labels_data[label]['result']['score'] = existing_score
dirty_labels.add(label)
logs.append(f" > Overrode score for {label} to {existing_score} from existing score.json")
except json.JSONDecodeError:
logs.append(f" > Warning: Could not read existing {score_path}")
# --- 2. Process Images (Regenerate & Concatenate) --- # --- 2. Process Images (Regenerate & Concatenate) ---
concat_list = [] concat_list = []
concat_list_F = [] concat_list_F = []
@ -167,7 +185,8 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id,
result = content['result'] result = content['result']
d_notes[label] = str(result.get('score', 0)) d_notes[label] = str(result.get('score', 0))
pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf" # pdf_path = Path(root_dir) / "Copies" / f"Copie{student_id}" / f"{label}.pdf"
pdf_path = content.get('pdf_path')
if not os.path.exists(pdf_path): continue if not os.path.exists(pdf_path): continue
(base_img, _, _) = annotating.make_base_image(pdf_path) (base_img, _, _) = annotating.make_base_image(pdf_path)
@ -246,18 +265,6 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id,
save_paginated_pdf(concat_list_F, pdf_out_path) save_paginated_pdf(concat_list_F, pdf_out_path)
logs.append(f" Saved regenerated Concat_F.pdf") logs.append(f" Saved regenerated Concat_F.pdf")
# max_w = max(i.width for i in concat_list_F)
# total_h = sum(i.height for i in concat_list_F)
# full_img = Image.new("RGB", (max_w, total_h), "white")
# y = 0
# for img in concat_list_F:
# full_img.paste(img, (0, y))
# y += img.height
# full_img.save(os.path.join(output_dir, "Concat_F.jpg"))
# logs.append(f" Saved regenerated Concat_F.jpg")
return "\n".join(logs) return "\n".join(logs)
from utils import read_all_labels from utils import read_all_labels
@ -269,6 +276,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs") parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs")
parser.add_argument("input_path", help="Directory path") parser.add_argument("input_path", help="Directory path")
parser.add_argument("--refaire", action="store_true", help="Merge refaire annotations from Bnot") parser.add_argument("--refaire", action="store_true", help="Merge refaire annotations from Bnot")
parser.add_argument("--update-score", action="store_true", help="Override scores with values from existing score.json")
args = parser.parse_args() args = parser.parse_args()
root_dir = sys.argv[1] root_dir = sys.argv[1]
@ -407,7 +415,8 @@ if __name__ == "__main__":
sid, sid,
actions_by_student[sid], actions_by_student[sid],
notes_by_student[sid], notes_by_student[sid],
all_labels all_labels,
update_score=args.update_score
) )
# --- 2. Process each student concurrently using 4 threads --- # --- 2. Process each student concurrently using 4 threads ---

View File

@ -112,8 +112,8 @@ def split_an_interro(base_dir, input_pdf, coords_list):
if is_stop: if is_stop:
end_page = n_pn end_page = n_pn
# end_y_target_raw = n_y_start # end_y_target_raw = n_y_start
# On avait retiré un carreau précédemment, on le rajoute… # On avait retiré un carreau précédemment inutilement, on le rajoute, plus un demi carreau
end_y_target_raw = min(n_y_start + int(1.25 * carreau), 1000) end_y_target_raw = min(n_y_start + int(1.5 * carreau), 1000)
break break
# RULES 3 & 4: Calculate horizontal boundaries (0.0 to 1.0 fraction of local page width) # RULES 3 & 4: Calculate horizontal boundaries (0.0 to 1.0 fraction of local page width)