Initial support for automatic fixing of additional and wrong label

master
Sébastien Miquel 2026-03-15 14:01:06 +01:00
parent 5f1613b8c1
commit f43c296e8a
11 changed files with 292 additions and 61 deletions

View File

@ -395,7 +395,7 @@ def compose_label_image(base_img, label, result, hmin,
width = base_img.width // 2 - 150 width = base_img.width // 2 - 150
img_score = render_score_text(label, score, error, width, img_score = render_score_text(label, score, error, width,
fontsize=18, with_error=with_error, fontsize=18, with_error=with_error,
id=with_error) id=with_id)
header_elements.append({"type": "score", "img": img_score, "data": result}) header_elements.append({"type": "score", "img": img_score, "data": result})
# Global Feedbacks # Global Feedbacks
@ -486,9 +486,7 @@ def compose_label_image(base_img, label, result, hmin,
return final_img, header_height return final_img, header_height
def natural_key(text): from utils import natural_key
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
import concurrent.futures import concurrent.futures
def process_student(student_id, labels_data, root_dir, all_labels, overwrite): def process_student(student_id, labels_data, root_dir, all_labels, overwrite):

View File

@ -96,9 +96,7 @@ class CheckboxRenderer:
"final_box": box, "text_preview": meta["data"]["text"][:20] "final_box": box, "text_preview": meta["data"]["text"][:20]
}) })
import re from utils import natural_key
def natural_key(text):
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
def process_student(args): def process_student(args):
"""Thread worker: Processes one student.""" """Thread worker: Processes one student."""

View File

@ -152,7 +152,7 @@ import os
import threading import threading
import concurrent.futures import concurrent.futures
NB_THREADS = 8 NB_THREADS = 12
# PROXY_URL = "http://192.168.241.1:3128" # PROXY_URL = "http://192.168.241.1:3128"
PROXY_URL = None PROXY_URL = None
@ -265,11 +265,218 @@ def call_gemini_with_retries(model_id, contents, config):
print(f"\tGemini API failure: {e}. Maximum retries reached.") print(f"\tGemini API failure: {e}. Maximum retries reached.")
raise raise
import io
from pdf2image import convert_from_path
from PIL import Image
def get_single_image_bytes(pdf_path):
"""Converts a multi-page PDF into a single stitched JPEG in memory."""
imgs = convert_from_path(pdf_path, dpi=200) # Same DPI as grouping.py
if not imgs:
raise ValueError(f"No pages in {pdf_path}")
if len(imgs) == 1:
combined = imgs[0]
else:
max_width = max(img.width for img in imgs)
total_height = sum(img.height for img in imgs)
combined = Image.new('RGB', (max_width, total_height), 'white')
y_offset = 0
for img in imgs:
combined.paste(img, (0, y_offset))
y_offset += img.height
img_byte_arr = io.BytesIO()
combined.save(img_byte_arr, format='JPEG', quality=85)
return img_byte_arr.getvalue()
def correct_boxes_with_gemini(pid, label, original_feedbacks,
root_dir, yming, ymaxg, width_r, total_height):
"""Requests corrected bounding boxes from Gemini Flash on the single image."""
pdf_path = Path(root_dir) / f"Copie{pid}" / f"{label}.pdf"
img_bytes = get_single_image_bytes(pdf_path)
localized_feedbacks = [f for f in original_feedbacks if f["box_2d"]]
global_feedbacks = [f for f in original_feedbacks if not f["box_2d"]]
prompt = f"""
Here is a single student's submission to a question in a written exam. The following JSON contains feedback items with bounding boxes (box_2d) that are incorrect. Each piece of feedback is supposed to be related to a piece of the answer that is wrong.
For example, if the student says a function is continuous when it
isn't, the coordinates should be where the word «continuous» is. If a
calculation went wrong, the coordinates should be where the step where
it goes wrong, and the feedback is what went wrong.
Please analyze the image and return the exact same feedback text, but with ONLY the box_2d coordinates corrected for this specific image.
Coordinates must be [ymin, xmin, ymax, xmax] scaled to 1000. If a box is invalid/not found, return null for it.
Original feedback:
{json.dumps(localized_feedbacks, indent=2)}
"""
contents = [
types.Content(
role="user",
parts=[
types.Part.from_bytes(data=img_bytes, mime_type="image/jpeg"),
types.Part.from_text(text=prompt),
],
)
]
config = types.GenerateContentConfig(
temperature=0.0, # Low temperature for accurate correction
response_mime_type="application/json",
response_json_schema=TypeAdapter(List[FeedbackItem]).json_schema()
)
response_text = call_gemini_with_retries(MODEL_ID_flash, contents, config)
corrected_feedbacks = json.loads(response_text)
# Map the coordinates back from the single image to the group canvas
for f in corrected_feedbacks:
b = f.get("box_2d")
if b:
ymin_s, xmin_s, ymax_s, xmax_s = b
# Y mapping: Add the group Y-offset (yming), then normalize to total_height
single_h = ymaxg - yming
new_ymin = int((yming + (ymin_s * single_h / 1000.0)) * 1000.0 / total_height)
new_ymax = int((yming + (ymax_s * single_h / 1000.0)) * 1000.0 / total_height)
# X mapping: Multiply by the width ratio of this sub-image vs the group image
new_xmin = int(xmin_s * width_r)
new_xmax = int(xmax_s * width_r)
f["box_2d"] = [new_ymin, new_xmin, new_ymax, new_xmax]
return global_feedbacks + corrected_feedbacks
import shutil
import grouping
def get_next_group_idx(root_dir, label):
"""Finds the next available Group index for a given label."""
target_folder = Path(root_dir) / label
target_folder.mkdir(exist_ok=True)
existing = list(target_folder.glob("Group_*.jpg"))
if not existing: return 0
return max([int(f.stem.split("_")[1]) for f in existing])
from utils import read_all_labels, enonce_total
def handle_label_errors(pid, label, res, pdf_path):
"""Handles Gemini labeling errors, moves/copies files, and returns new tasks."""
new_tasks = []
error_type = res.get("error")
all_labels = read_all_labels(INPUT_DIR)
labels_txt = (Path(INPUT_DIR) / "labels").read_text()
enonce = enonce_total(INPUT_DIR)
if error_type == "wrong-label":
print(f"\tHandling wrong-label for {pid} {label}")
prompt = f"""This image is a part of the answer of a student to a written exam.
It was initially labeled '{label}' but I suspect this label is wrong. Perhaps the student himself wrote the wrong label.
You need to analyse this image, and find the label of the question it answers. Do not trust the label written by the student but instead check the content of its answer and the notation he uses to identify the correct label of the question the student answered.
Return ONLY the exact label string.
Here is the full content of the exam :
{enonce}
Here is a list of all possible lables. You need to answer with one of these :
{labels_txt}
"""
contents = [types.Content(role="user", parts=[
types.Part.from_bytes(data=get_single_image_bytes(pdf_path), mime_type="image/jpeg"),
types.Part.from_text(text=prompt) ])]
config = types.GenerateContentConfig(temperature=0.0)
new_label = call_gemini_with_retries(MODEL_ID_flash, contents, config).strip().strip('"\'')
new_pdf_path = Path(INPUT_DIR) / f"Copie{pid}" / f"{new_label}.pdf"
if new_pdf_path.exists():
print(f"\t\tCopie{pid} tried to move wrong {label} to {new_label}, but it already exists.")
res["error"] = f"wrong-label:{new_label}?"
else:
print(f"\t\tCopie{pid} : moving wrong {label} to {new_label}.")
shutil.move(str(pdf_path), str(new_pdf_path))
idx = get_next_group_idx(INPUT_DIR, new_label)
height = grouping.get_pdf_height(str(new_pdf_path))
grouping.create_jpg(new_label, idx, [(pid, str(new_pdf_path), height)], INPUT_DIR)
print(f"\t\tMaking {new_label} group {idx+1}")
new_tasks.append((str(Path(INPUT_DIR) / new_label / f"Group_{idx+1}.jpg"),
new_label, False))
elif error_type == "additional-answer":
prompt = f"""This image is a part of the answer of a student to a written exam.
It was initially labeled '{label}' but I suspect this image also contains answers to another, or several other questions.
You need to analyse this image, and find the list of the labels of the questions it answers. Return ONLY the list of the exact label strings.
If the end of the image only contains the first line of an answer to another question, ignore it.
Here is the full content of the exam :
{enonce}
Here is a list of all possible labels. You need to answer with a list one of these :
{labels_txt}
"""
print(f"\tHandling additional-answer for {pid} {label}")
contents = [types.Content(role="user", parts=[
types.Part.from_bytes(data=get_single_image_bytes(pdf_path), mime_type="image/jpeg"),
types.Part.from_text(text=prompt)
])]
config = types.GenerateContentConfig(temperature=0.0, response_mime_type="application/json")
try:
add_labels = json.loads(call_gemini_with_retries(MODEL_ID_flash, contents, config))
except Exception:
add_labels = []
print(f"\tHandling additional-answer for {pid} {label}")
some_present = False
for add_label in add_labels:
if add_label == label:
continue
new_pdf_path = Path(INPUT_DIR) / f"Copie{pid}" / f"{add_label}.pdf"
if not new_pdf_path.exists():
shutil.copy(str(pdf_path), str(new_pdf_path))
print(f"\t\tCopying Copie{pid} : {label} -> {add_label}")
idx = get_next_group_idx(INPUT_DIR, add_label)
print(f"\t\tMaking {add_label} group {idx+1}")
height = grouping.get_pdf_height(str(new_pdf_path))
grouping.create_jpg(add_label, idx, [(pid, str(new_pdf_path), height)], INPUT_DIR)
new_tasks.append((str(Path(INPUT_DIR) / add_label / f"Group_{idx+1}.jpg"),
add_label, False))
else:
some_present = True
print(f"\t\tAlready present (not copied) Copie{pid} : {label} -> {add_label}")
if not some_present:
res["error"] = ""
return new_tasks
def process_single_task(task_tuple): def process_single_task(task_tuple):
global pro_count, flash_count global pro_count, flash_count
file_path, label = task_tuple file_path = task_tuple[0]
label = task_tuple[1]
can_spawn_tasks = task_tuple[2] if len(task_tuple) > 2 else True
group_name = os.path.splitext(file_path)[0] group_name = os.path.splitext(file_path)[0]
json_path = group_name + '.json' json_path = group_name + '.json'
new_tasks = []
with open(json_path, 'r') as f: with open(json_path, 'r') as f:
group_data = json.load(f) group_data = json.load(f)
@ -302,8 +509,15 @@ def process_single_task(task_tuple):
for p in json_data: for p in json_data:
pid = p["id"] pid = p["id"]
res = p["result"] res = p["result"]
yming, ymaxg, width_r = d_data[pid]
pdf_path = Path(INPUT_DIR) / f"Copie{pid}" / f"{label}.pdf"
if res["error"] != "": if res["error"] != "":
print("\tError :", res["error"], "for Copie", pid, label, group_name) print("\tError :", res["error"], "for Copie", pid, group_name)
if can_spawn_tasks and res.get("error") in ["wrong-label", "additional-answer"]:
new_tasks.extend(handle_label_errors(pid, label, res, pdf_path))
needs_correction = [] needs_correction = []
for (i,f) in enumerate(res["feedback"]): for (i,f) in enumerate(res["feedback"]):
b = f["box_2d"] b = f["box_2d"]
@ -323,7 +537,7 @@ def process_single_task(task_tuple):
needs_correction.append(i) needs_correction.append(i)
break break
# yming, ymaxg, width_r = d_data[pid] #
# if ymin < yming-50 or ymax > ymaxg+50: # if ymin < yming-50 or ymax > ymaxg+50:
# print("Error : Gemini answered box2d too low/up", pid, label, group_name) # print("Error : Gemini answered box2d too low/up", pid, label, group_name)
# if ymax < yming or ymin > ymaxg: # if ymax < yming or ymin > ymaxg:
@ -343,7 +557,7 @@ def process_single_task(task_tuple):
# f["box_2d"][3] = int(width_r * 1000) # f["box_2d"][3] = int(width_r * 1000)
if needs_correction: if needs_correction:
print(f"\tBox anomalies detected for Copie {pid} {group_name}. Requesting isolated correction from Gemini Flash...") print(f"\tBox anomalies detected for Copie {pid} {group_name}. \n\tRequesting isolated correction from Gemini Flash...")
try: try:
res["feedback"] = correct_boxes_with_gemini( res["feedback"] = correct_boxes_with_gemini(
pid, label, res["feedback"], INPUT_DIR, pid, label, res["feedback"], INPUT_DIR,
@ -371,11 +585,23 @@ def process_single_task(task_tuple):
print(error_msg, file=sys.stderr) print(error_msg, file=sys.stderr)
with io_lock: with io_lock:
errors_summary.append((error_msg, file_path)) errors_summary.append((error_msg, file_path))
return new_tasks
print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...") print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...")
with concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS) as executor:
executor.map(process_single_task, tasks_to_process) futures = {executor.submit(process_single_task, task): task for task in tasks_to_process}
# Process tasks as they complete, allowing dynamic task addition
for future in concurrent.futures.as_completed(futures):
try:
new_generated_tasks = future.result()
if new_generated_tasks:
for new_task in new_generated_tasks:
futures[executor.submit(process_single_task, new_task)] = new_task
except Exception as e:
print(f"Exception during task execution: {e}", file=sys.stderr)
end_time = time.time() end_time = time.time()
print("Time elapsed : ", end_time - start_time) print("Time elapsed : ", end_time - start_time)
@ -384,5 +610,5 @@ if errors_summary:
print("\n--- Summary of Exceptions ---", file=sys.stderr) print("\n--- Summary of Exceptions ---", file=sys.stderr)
for (err, file) in errors_summary: for (err, file) in errors_summary:
print(err, file=sys.stderr) print(err, file=sys.stderr)
escaped_path = shlex.quote(str(file_path)) escaped_path = shlex.quote(str(file))
print(f"Run : python correction.py {escaped_path}") print(f"Run : python correction.py {escaped_path}")

View File

@ -175,8 +175,7 @@ args = parser.parse_args()
# input_arg = Path(args.input_path) # input_arg = Path(args.input_path)
image_files = [] image_files = []
def natural_key(text): from utils import natural_key
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
for path_str in args.input_paths: for path_str in args.input_paths:
input_arg = Path(path_str) input_arg = Path(path_str)

View File

@ -210,8 +210,7 @@ def create_jpg(identifier, group_index, group, root_dir):
print(f"Saved {output_path} with {len(group)} ({os.path.getsize(output_path)/1024/1024:.2f} MB)") print(f"Saved {output_path} with {len(group)} ({os.path.getsize(output_path)/1024/1024:.2f} MB)")
def natural_key(text): from utils import natural_key
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
def process_identifier(identifier, files_info, root_dir): def process_identifier(identifier, files_info, root_dir):

View File

@ -283,9 +283,7 @@ class ImageViewer:
self.root.clipboard_clear() self.root.clipboard_clear()
self.root.clipboard_append(box_str) self.root.clipboard_append(box_str)
def natural_key(text): from utils import natural_key, read_all_labels
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: if len(sys.argv) < 2:
@ -319,9 +317,7 @@ if __name__ == "__main__":
files_to_process = sorted(cutleft_dir.glob("*.jpg")) files_to_process = sorted(cutleft_dir.glob("*.jpg"))
try: try:
all_labels = sorted(list(filter(None, all_labels = read_all_labels(base_dir)
(base_dir / "labels").read_text().splitlines())),
key = natural_key)
except FileNotFoundError: except FileNotFoundError:
all_labels = [] all_labels = []

View File

@ -20,39 +20,39 @@ import ftfy
import re import re
import urllib.request import urllib.request
url = "https://raw.githubusercontent.com/hbenbel/French-Dictionary/master/dictionary/dictionary.txt" # url = "https://raw.githubusercontent.com/hbenbel/French-Dictionary/master/dictionary/dictionary.txt"
french_words = urllib.request.urlopen(url).read().decode('utf-8').splitlines() # french_words = urllib.request.urlopen(url).read().decode('utf-8').splitlines()
# 2. Pre-compute an O(1) lookup dictionary # 2. Pre-compute an O(1) lookup dictionary
# We simulate the corruption by replacing accents with null bytes (\x00) # We simulate the corruption by replacing accents with null bytes (\x00)
lookup_map = {} # lookup_map = {}
for word in french_words: # for word in french_words:
# Replace all French accents with \x00 to create the "broken" key # # Replace all French accents with \x00 to create the "broken" key
broken_key = re.sub(r'[éèêëàâäîïôöùûüçœÉÈÊËÀÂÄÎÏÔÖÙÛÜÇŒ]', '\x00', word) # broken_key = re.sub(r'[éèêëàâäîïôöùûüçœÉÈÊËÀÂÄÎÏÔÖÙÛÜÇŒ]', '\x00', word)
if '\x00' in broken_key: # if '\x00' in broken_key:
lookup_map[broken_key] = word # e.g., "\x00cole" -> "école" # lookup_map[broken_key] = word # e.g., "\x00cole" -> "école"
# 3. Fast replace function # 3. Fast replace function
def fast_fix(text): def fast_fix(text):
# Find words containing regular letters and null bytes # Find words containing regular letters and null bytes
def replacer(match): # def replacer(match):
broken_word = match.group(0) # broken_word = match.group(0)
# Return the fixed word from our map, or leave it if not found # # Return the fixed word from our map, or leave it if not found
# (Handles case-insensitivity by falling back to lowercase map) # # (Handles case-insensitivity by falling back to lowercase map)
return lookup_map.get(broken_word.lower(), broken_word) # return lookup_map.get(broken_word.lower(), broken_word)
return re.sub(r'[a-zA-Z\x00]+', replacer, text)
# return re.sub(r'[a-zA-Z\x00]+', replacer, text)
return text
INPUT_FILE = Path(INPUT_DIR) / "correction.json" INPUT_FILE = Path(INPUT_DIR) / "correction.json"
OUTPUT_FILE = Path(INPUT_DIR) / "fixed_correction.json" OUTPUT_FILE = Path(INPUT_DIR) / "correction.json"
def clean_string(s: str) -> str: def clean_string(s: str) -> str:
# fix encoding issues # fix encoding issues
s = ftfy.fix_text(s) s = ftfy.fix_text(s)
s = re.sub(r'\x19', r'\x00', s) s = s.replace('\x19', '\x00')
s = re.sub(r'\x18', r'\x00', s) s = s.replace('\x18', '\x00')
s = fast_fix(s) s = fast_fix(s)
s = s.replace('\x00', '') s = s.replace('\x00', '')
return s return s

View File

@ -142,10 +142,7 @@ def detect_checks_and_notes(output_dir):
from PIL import ImageDraw from PIL import ImageDraw
import re from utils import natural_key
def natural_key(text):
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
from annotating import MARGIN_LEFT, ANNOT_WIDTH from annotating import MARGIN_LEFT, ANNOT_WIDTH
def has_significant_notes(note_img, threshold=20): def has_significant_notes(note_img, threshold=20):
@ -324,7 +321,7 @@ def apply_actions_and_regenerate(root_dir, data, student_id, actions, notes_laye
print(f" Saved regenerated Concat_F.jpg") print(f" Saved regenerated Concat_F.jpg")
from pathlib import Path from pathlib import Path
from utils import read_all_labelse
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("Usage: python reading_annotations.py <Dir>") print("Usage: python reading_annotations.py <Dir>")
@ -333,10 +330,7 @@ if __name__ == "__main__":
root_dir = sys.argv[1] root_dir = sys.argv[1]
try: try:
all_labels = sorted(list(filter(None, all_labels = read_all_labels(Path(root_dir))
(Path(root_dir) / "labels")
.read_text().splitlines())),
key = natural_key)
except FileNotFoundError: except FileNotFoundError:
all_labels = [] all_labels = []

View File

@ -163,6 +163,8 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la
return "\n".join(logs) return "\n".join(logs)
from utils import read_all_labels
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("Usage: python reading_grouped_annotations.py <Dir>") print("Usage: python reading_grouped_annotations.py <Dir>")
@ -176,10 +178,7 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
try: try:
all_labels = sorted(list(filter(None, all_labels = read_all_labels(Path(root_dir))
(Path(root_dir) / "labels")
.read_text().splitlines())),
key=natural_key)
except FileNotFoundError: except FileNotFoundError:
all_labels = [] all_labels = []

View File

@ -5,8 +5,7 @@ import ezodf
import re import re
from pathlib import Path from pathlib import Path
def natural_key(text): from utils import natural_key, read_all_labels
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
# Configuration # Configuration
ODS_PATH = "/home/sebastien/Rust/gestion_classe/Staging/current_eval.ods" ODS_PATH = "/home/sebastien/Rust/gestion_classe/Staging/current_eval.ods"
@ -19,10 +18,8 @@ def main():
else: else:
work_dir = os.path.abspath(sys.argv[1]) work_dir = os.path.abspath(sys.argv[1])
all_labels = sorted(list(filter(None, all_labels = read_all_labels(Path(work_dir))
(Path(work_dir) / "labels")
.read_text().splitlines())),
key = natural_key)
a_rendre_path = os.path.join(work_dir, TARGET_DIR_NAME) a_rendre_path = os.path.join(work_dir, TARGET_DIR_NAME)
if not os.path.isdir(a_rendre_path): if not os.path.isdir(a_rendre_path):

25
utils.py Normal file
View File

@ -0,0 +1,25 @@
import re
from pathlib import Path
def natural_key(text):
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
def read_all_labels(base_dir):
return sorted(list(filter(None,
(Path(base_dir) / "labels").read_text().splitlines())),
key = natural_key)
def enonce_total(base_dir):
text_dir = Path(base_dir) / 'Text'
if not text_dir.is_dir():
return ""
files = [f for f in text_dir.iterdir() if f.is_file()]
files.sort(key=lambda f: natural_key(f.name))
output = []
for filepath in files:
content = filepath.read_text(encoding='utf-8')
output.append(f"{filepath.name}\n{content}\n\n\n")
return "".join(output)