Prompting.py ; Miscs for Interro29

master
Sébastien Miquel 2026-05-19 11:02:49 +02:00
parent c6c1a052e1
commit c2e915226e
7 changed files with 387 additions and 335 deletions

View File

@ -278,6 +278,7 @@ def render_real_latex_text(text, width_px, bg_color=(255, 255, 255, 255), max_li
\\usepackage[T1]{{fontenc}} \\usepackage[T1]{{fontenc}}
\\usepackage{{lmodern}} % Enables arbitrary font scaling \\usepackage{{lmodern}} % Enables arbitrary font scaling
\\usepackage{{amsmath, amssymb}} \\usepackage{{amsmath, amssymb}}
\\usepackage{{mathabx}} % larger inline operators.
\\usepackage{{commands}} \\usepackage{{commands}}
%\\usepackage{{anyfontsize}} % replaced by lmodern %\\usepackage{{anyfontsize}} % replaced by lmodern
\\begin{{document}} \\begin{{document}}

View File

@ -3,6 +3,14 @@ import os
import time import time
from pathlib import Path from pathlib import Path
import argparse import argparse
import prompting
import signal
from google import genai
import base64
import shlex
import json
import threading
import concurrent.futures
if len(sys.argv) < 2: if len(sys.argv) < 2:
sys.exit("Usage: python script.py 'InterroTest/Ex 2/Group_1.jpg' OR <InputDir> OR 'file1' 'file2'") sys.exit("Usage: python script.py 'InterroTest/Ex 2/Group_1.jpg' OR <InputDir> OR 'file1' 'file2'")
@ -36,7 +44,7 @@ for path_str in args.paths:
if arg_path.is_file() and arg_path.suffix.lower() == ".jpg": if arg_path.is_file() and arg_path.suffix.lower() == ".jpg":
# Handle individual file # Handle individual file
# Note: assumes structure InterroTest/Ex 2/Group_1.jpg to get parents[1] # Note: assumes structure InterroTest/Ex 2/Group_1.jpg
label = arg_path.parent.name label = arg_path.parent.name
INPUT_DIR = arg_path.parent.parent.parent INPUT_DIR = arg_path.parent.parent.parent
COPIES_DIR = INPUT_DIR / "Copies" COPIES_DIR = INPUT_DIR / "Copies"
@ -58,124 +66,6 @@ for path_str in args.paths:
for img in sub.glob("*.jpg"): for img in sub.glob("*.jpg"):
tasks.append((str(img), label)) tasks.append((str(img), label))
my_prompt = """I'm giving you an image of several written answers to an exam.
Each answer is separated by a black horizontal line, and underneath,
to the left, is indicated the ID of the answer, from `01` to `50`.
I want you to score each answer, from 0 to 4, you may score half
points, such as 2.5. Even if a result is wrong, if the reasoning is
correct and could lead to a right answer, you should give at least
half the points.
You also need to give feedback to the student, in french :
- which part of his answer is wrong,
- why is it wrong
- possibly, what he should have done instead.
Your feedback may contain LaTeX fragments written like `$a^2 + b^2 = c^2$`.
If your score is not 4, you should always provide some feedback
explaining what's missing.
For each piece of feedback, if it is related to a specific part of the
answer that is wrong, you may provide a `box_2d`, to locate this
specific part of the answer. This `box_2d` should be in the form
[ymin, xmin, ymax, xmax] normalized to 0-1000. If you do not provide
one, set `box_2d` to `null`.
If the answer is correct, there is no need to provide feedback. You do
not have to give positive feedback, but if you do, do not provide a
`box_2d` for it.
For example, if the student says a function is continuous when it
isn't, provide the coordinates where the word «continuous» is. If a
calculation went wrong, gives the coordinates of the step where it
goes wrong, and as feedback, what went wrong.
Avoid giving feedback about confusing letters `n` with `m`, `x` with
`n` or `h` with `k`. If it looks wrong, assume you read it wrong,
unless the distinction is very important.
You should also give me a measure of confidence, from 0 to 1 that you
were able to correctly understand the answer. A score below 0.5 means
that you think it is likely that you couldn't understand an important
part.
In some case, you may find that either
- The student didn't answer the right question. Set the score to 0.
Since it could be a labeling error, indicate is by setting `error`
to \"wrong-label\".
- You can find an answer to another question of the exercice (taking
more than a couple of lines). Score the question you are supposed
to score, but set `error` to \"additional-answer\".
- The answer to the question is empty, or the student has only
rewritten the statement of the question. In this case, set `error`
to \"empty-answer\" and do not provide any kind of feedback.
If there's no error, set `error` to `\"\"`.
You will answer using json describing a list of dictionary with a key
\"id\", and a key \"result\" that contains the \"score\", the \"confidence\", a
list \"feedback\", and possibly an \"error\". Like this example :
[{ \"id\": \"01\",
\"result\": {\"score\" : 2.5,
\"confidence\" : 0.8,
\"feedback\": [{text: \"Un retour générique. Il faut apprendre le cours.\", box_2d: null},
{text: \"Non, la fonction n'est pas forcément continue\", pos: [145, 280, 340, 500]}],
\"error\": \"\"}
},
{ \"id\": \"04\",
\"result\": {\"score\" : 4.,
\"confidence\" : 0.9,
\"feedback\" : []
\"error\": \"\" }
}
]
Here is the text of the exercice (or the relevant part of the problem)
of the exam :
```
<<text>>
```
Here is a possible correct answer :
```
<<corr>>
```
<<persp>>
You are asked to score the question or exercice labeled `<<label>>`,
do not score or give feedback to any other question."""
def make_prompt(full_label):
def read_longest_prefix_file(subdir):
dir_path = INPUT_DIR / subdir
matches = [f for f in dir_path.iterdir()
if f.is_file()
and full_label.startswith(f.name)
and f.suffix not in [".pdf", ".tex"]]
if not matches:
return ""
return max(matches, key=lambda f: len(f.name)).read_text(encoding="utf-8", errors="replace")
text = read_longest_prefix_file("Text")
corr = read_longest_prefix_file("Sol")
persp = read_longest_prefix_file("Persp")
if persp != "":
persp = "\n\nHere are additional scoring instructions : \n\n```\n" + persp +"\n```\n"
return my_prompt.replace("<<text>>", text).replace("<<corr>>", corr).replace("<<persp>>", persp).replace("<<label>>", full_label)
from google import genai
from google.genai import types
import base64
import shlex
import json
import os
import threading
import concurrent.futures
NB_THREADS = 12 NB_THREADS = 12
@ -190,9 +80,6 @@ MODEL_ID_pro = "gemini-3.1-pro-preview"
MODEL_ID_flash = "gemini-3-flash-preview" MODEL_ID_flash = "gemini-3-flash-preview"
api_key = os.environ["GEMINI_API_KEY"] api_key = os.environ["GEMINI_API_KEY"]
import signal
import sys
# --- Thread-safe Logging --- # --- Thread-safe Logging ---
log_lock = threading.Lock() log_lock = threading.Lock()
thread_logs = {} thread_logs = {}
@ -227,94 +114,11 @@ def handle_interrupt(sig, frame):
flush_thread_log(tid) flush_thread_log(tid)
sys.exit(1) sys.exit(1)
signal.signal(signal.SIGINT, handle_interrupt) signal.signal(signal.SIGINT, handle_interrupt)
signal.signal(signal.SIGTERM, handle_interrupt) signal.signal(signal.SIGTERM, handle_interrupt)
# --------------------------- # ---------------------------
from pydantic import BaseModel, Field, TypeAdapter
from typing import List, Optional, Tuple
class FeedbackItem(BaseModel):
text: str = Field(description="Feedback content")
box_2d: Optional[List[int]] = Field(None, description="box coordinates or null")
class ResultData(BaseModel):
score: float = Field(description="The numeric score")
confidence: float = Field(description="Confidence level")
feedback: List[FeedbackItem] = Field(description="List of feedback items")
error: str = Field(description="Indicates if an error occurred")
class EvaluationEntry(BaseModel):
id: str = Field(description="Entry identifier")
result: ResultData = Field(description="Result details")
# These nested definitions do not work with the batch api, unroll them
UNROLLED_SCHEMA = {
"type": "ARRAY",
"items": {
"type": "OBJECT",
"properties": {
"id": {"type": "STRING", "description": "Entry identifier"},
"result": {
"type": "OBJECT",
"properties": {
"score": {"type": "NUMBER", "description": "The numeric score"},
"confidence": {"type": "NUMBER", "description": "Confidence level"},
"error": {"type": "STRING", "description": "Indicates if an error occurred"},
"feedback": {
"type": "ARRAY",
"description": "List of feedback items",
"items": {
"type": "OBJECT",
"properties": {
"text": {"type": "STRING", "description": "Feedback content"},
"box_2d": {
"type": "ARRAY",
"items": {"type": "INTEGER"},
"nullable": True,
"description": "box coordinates or null"
}
},
"required": ["text"]
}
}
},
"required": ["score", "confidence", "feedback", "error"]
}
},
"required": ["id", "result"]
}
}
# The root model for parsing is be: List[EvaluationEntry]
def generate_request(file, full_label):
"""Generates request for Gemini."""
prompt = make_prompt(full_label)
image_path = Path(file)
contents = [
types.Content(
role="user",
parts=[
types.Part.from_bytes(
data=image_path.read_bytes(),
mime_type="image/jpeg"
),
types.Part.from_text(text=prompt),
],
)
]
generate_content_config = types.GenerateContentConfig(
temperature=1.0,
top_p=0.95,
seed=0,
max_output_tokens=65535,
response_mime_type= "application/json",
response_json_schema= TypeAdapter(List[EvaluationEntry]).json_schema()
)
return (contents, generate_content_config)
client = genai.Client(api_key=api_key) client = genai.Client(api_key=api_key)
output_path = INPUT_DIR / "correction.json" output_path = INPUT_DIR / "correction.json"
progress_path = INPUT_DIR / "correction_progress.json" progress_path = INPUT_DIR / "correction_progress.json"
@ -386,76 +190,17 @@ def call_gemini_with_retries(model_id, contents, config,
tprint(f"\tGemini API failure: {e}. Maximum retries reached.") tprint(f"\tGemini API failure: {e}. Maximum retries reached.")
raise raise
import io
from pdf2image import convert_from_path
from PIL import Image
def get_single_image_bytes(pdf_path):
"""Converts a multi-page PDF into a single stitched JPEG in memory."""
imgs = convert_from_path(pdf_path, dpi=200) # Same DPI as grouping.py
if not imgs:
raise ValueError(f"No pages in {pdf_path}")
if len(imgs) == 1:
combined = imgs[0]
else:
max_width = max(img.width for img in imgs)
total_height = sum(img.height for img in imgs)
combined = Image.new('RGB', (max_width, total_height), 'white')
y_offset = 0
for img in imgs:
combined.paste(img, (0, y_offset))
y_offset += img.height
img_byte_arr = io.BytesIO()
combined.save(img_byte_arr, format='JPEG', quality=85)
return img_byte_arr.getvalue()
def correct_boxes_with_gemini(pid, label, original_feedbacks, def correct_boxes_with_gemini(pid, label, original_feedbacks,
yming, ymaxg, width_r, total_height): yming, ymaxg, width_r, total_height):
"""Requests corrected bounding boxes from Gemini Flash on the single image.""" """Requests corrected bounding boxes from Gemini Flash on the single image."""
pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf" pdf_path = COPIES_DIR / f"Copie{pid}" / f"{label}.pdf"
img_bytes = get_single_image_bytes(pdf_path)
localized_feedbacks = [f for f in original_feedbacks if f["box_2d"]]
global_feedbacks = [f for f in original_feedbacks if not f["box_2d"]]
prompt = f"""
Here is a single student's submission to a question in a written exam. The following JSON contains feedback items with bounding boxes (box_2d) that are incorrect. Each piece of feedback is supposed to be related to a piece of the answer that is wrong.
For example, if the student says a function is continuous when it
isn't, the coordinates should be where the word «continuous» is. If a
calculation went wrong, the coordinates should be where the step where
it goes wrong, and the feedback is what went wrong.
Please analyze the image and return the exact same feedback text, but with ONLY the box_2d coordinates corrected for this specific image.
Coordinates must be [ymin, xmin, ymax, xmax] scaled to 1000. If a box is invalid/not found, return null for it.
Original feedback:
{json.dumps(localized_feedbacks, indent=2)}
"""
contents = [
types.Content(
role="user",
parts=[
types.Part.from_bytes(data=img_bytes, mime_type="image/jpeg"),
types.Part.from_text(text=prompt),
],
)
]
config = types.GenerateContentConfig(
temperature=0.0, # Low temperature for accurate correction
response_mime_type="application/json",
response_json_schema=TypeAdapter(List[FeedbackItem]).json_schema()
)
contents, config = prompting.request_for_box_correction(pdf_path, original_feedbacks)
response_text = call_gemini_with_retries(MODEL_ID_flash, contents, config) response_text = call_gemini_with_retries(MODEL_ID_flash, contents, config)
corrected_feedbacks = json.loads(response_text) corrected_feedbacks = json.loads(response_text)
global_feedbacks = [f for f in original_feedbacks if not f["box_2d"]]
# Map the coordinates back from the single image to the group canvas # Map the coordinates back from the single image to the group canvas
for f in corrected_feedbacks: for f in corrected_feedbacks:
b = f.get("box_2d") b = f.get("box_2d")
@ -499,27 +244,7 @@ def handle_label_errors(pid, label, res, pdf_path):
if error_type == "wrong-label": if error_type == "wrong-label":
tprint(f"\tHandling wrong-label for {pid} {label}") tprint(f"\tHandling wrong-label for {pid} {label}")
prompt = f"""This image is a part of the answer of a student to a written exam. contents, config = prompting.request_for_wrong_label(pdf_path, label, enonce, labels_txt)
It was initially labeled '{label}' but I suspect this label is wrong. Perhaps the student himself wrote the wrong label.
You need to analyse this image, and find the label of the question it answers. Do not trust the label written by the student but instead check the content of its answer and the notation he uses to identify the correct label of the question the student answered.
Return ONLY the exact label string.
Here is the full content of the exam :
{enonce}
Here is a list of all possible labels. You need to answer with one of these :
{labels_txt}
"""
contents = [types.Content(role="user", parts=[
types.Part.from_bytes(data=get_single_image_bytes(pdf_path), mime_type="image/jpeg"),
types.Part.from_text(text=prompt) ])]
config = types.GenerateContentConfig(temperature=0.0)
new_label = call_gemini_with_retries(MODEL_ID_flash, contents, config).strip().strip('"\'') new_label = call_gemini_with_retries(MODEL_ID_flash, contents, config).strip().strip('"\'')
if new_label not in all_labels: if new_label not in all_labels:
tprint(f"\t\tCopie{pid} returned an incorrect label {new_label} from an initial wrong label {label}. Ignoring") tprint(f"\t\tCopie{pid} returned an incorrect label {new_label} from an initial wrong label {label}. Ignoring")
@ -547,34 +272,13 @@ Here is a list of all possible labels. You need to answer with one of these :
new_label, False)) new_label, False))
elif error_type == "additional-answer": elif error_type == "additional-answer":
prompt = f"""This image is a part of the answer of a student to a written exam. contents, config = prompting.request_for_additional_answer(pdf_path, label, enonce, labels_txt)
It was initially labeled '{label}' but I suspect this image also contains answers to another, or several other questions.
You need to analyse this image, and find the list of the labels of the questions it answers. Return ONLY the list of the exact label strings.
If the end of the image only contains the first line of an answer to another question, ignore it.
Here is the full content of the exam :
{enonce}
Here is a list of all possible labels. You need to answer with a list one of these :
{labels_txt}
"""
tprint(f"\tHandling additional-answer for {pid} {label}") tprint(f"\tHandling additional-answer for {pid} {label}")
contents = [types.Content(role="user", parts=[
types.Part.from_bytes(data=get_single_image_bytes(pdf_path), mime_type="image/jpeg"),
types.Part.from_text(text=prompt)
])]
config = types.GenerateContentConfig(temperature=0.0, response_mime_type="application/json")
try: try:
add_labels = json.loads(call_gemini_with_retries(MODEL_ID_flash, contents, config)) add_labels = json.loads(call_gemini_with_retries(MODEL_ID_flash, contents, config))
except Exception: except Exception:
add_labels = [] add_labels = []
tprint(f"\tHandling additional-answer for {pid} {label}")
keep_error = False keep_error = False
error = "al:" error = "al:"
for add_label in add_labels: for add_label in add_labels:
@ -644,7 +348,7 @@ def process_single_task(task_tuple, precomputed_response=None):
flash_count += 1 flash_count += 1
try: try:
contents, config = generate_request(file_path, label) contents, config = prompting.generate_request(INPUT_DIR, file_path, label)
model_to_use = MODEL_ID_flash if use_flash else MODEL_ID_pro model_to_use = MODEL_ID_flash if use_flash else MODEL_ID_pro
if precomputed_response: if precomputed_response:
@ -725,6 +429,8 @@ def process_single_task(task_tuple, precomputed_response=None):
except json.JSONDecodeError: except json.JSONDecodeError:
tprint(f"Error decoding JSON for {file_path}", file=sys.stderr) tprint(f"Error decoding JSON for {file_path}", file=sys.stderr)
with io_lock:
errors_summary.append(("Error decoding JSON response", file_path))
except Exception as e: except Exception as e:
error_msg = f"Exception processing {file_path}: {e}" error_msg = f"Exception processing {file_path}: {e}"
print(error_msg, file=sys.stderr) print(error_msg, file=sys.stderr)
@ -798,11 +504,14 @@ if __name__ == "__main__":
if args.batch or args.batch_from: if args.batch or args.batch_from:
from utils import read_all_labels
all_labels = read_all_labels(INPUT_DIR) all_labels = read_all_labels(INPUT_DIR)
batch_tasks = [] batch_tasks = []
if args.batch_from: if args.batch_from:
for label in all_labels:
if label.startswith(args.batch_from):
args.batch_from = label
print("Batching from : ", args.batch_from)
break
if args.batch_from not in all_labels: if args.batch_from not in all_labels:
sys.exit(f"Error: Label '{args.batch_from}' not found. Available labels: {all_labels}") sys.exit(f"Error: Label '{args.batch_from}' not found. Available labels: {all_labels}")
@ -852,7 +561,7 @@ if __name__ == "__main__":
"role": "user", "role": "user",
"parts": [ "parts": [
{"inlineData": {"mimeType": "image/jpeg", "data": b64_img}}, {"inlineData": {"mimeType": "image/jpeg", "data": b64_img}},
{"text": make_prompt(label)} {"text": prompting.make_prompt(INPUT_DIR,label)}
] ]
}], }],
"generation_config": { "generation_config": {
@ -860,7 +569,7 @@ if __name__ == "__main__":
"topP": 0.95, "topP": 0.95,
"maxOutputTokens": 65535, "maxOutputTokens": 65535,
"responseMimeType": "application/json", "responseMimeType": "application/json",
"responseSchema": UNROLLED_SCHEMA "responseSchema": prompting.UNROLLED_SCHEMA
} }
} }
} }

View File

@ -183,9 +183,12 @@ for path_str in args.input_paths:
# 1. Determine which files to process # 1. Determine which files to process
if input_arg.is_file(): if input_arg.is_file():
INPUT_DIR = input_arg.parent.parent
target_files = [input_arg] target_files = [input_arg]
elif input_arg.is_dir(): elif input_arg.is_dir():
target_files = list(input_arg.glob("Copie*.pdf")) INPUT_DIR = input_arg
COPIES_DIR = INPUT_DIR / "Copies"
target_files = list(COPIES_DIR.glob("Copie*.pdf"))
if not target_files: if not target_files:
print(f"Warning: No Copie*.pdf files found in {input_arg}") print(f"Warning: No Copie*.pdf files found in {input_arg}")
else: else:
@ -194,7 +197,7 @@ for path_str in args.input_paths:
# 2. Run the logic for all collected files # 2. Run the logic for all collected files
for target_file in target_files: for target_file in target_files:
INPUT_DIR = target_file.parent # INPUT_DIR = target_file.parent
CUTLEFT_DIR = INPUT_DIR / 'Cutleft' CUTLEFT_DIR = INPUT_DIR / 'Cutleft'
# Matches stem_01.jpg, stem_02.jpg, etc. # Matches stem_01.jpg, stem_02.jpg, etc.
@ -306,7 +309,7 @@ def process_copy_group(group_key, files):
accumulated_labels.append(box.label) accumulated_labels.append(box.label)
break # exit retry loop break # exit retry loop
except Exception as e: except Exception as e:
print(f"Error processing {image_file.name}: {e}") print(f"Error processing {image_file.name}: {e}\n\tIt will be retried.")
# Rate Limiting # Rate Limiting
elapsed = time.time() - start_time elapsed = time.time() - start_time

View File

@ -2,6 +2,8 @@ import os
import sys import sys
import json import json
import re import re
from pathlib import Path
from collections import defaultdict from collections import defaultdict
def main(): def main():
@ -10,6 +12,7 @@ def main():
sys.exit(1) sys.exit(1)
work_dir = os.path.abspath(sys.argv[1]) work_dir = os.path.abspath(sys.argv[1])
copies_dir = Path(work_dir) / "Copies"
bnot_dir = sys.argv[2] bnot_dir = sys.argv[2]
target_subdir = os.path.join(work_dir, "A Rendre") target_subdir = os.path.join(work_dir, "A Rendre")
os.makedirs(target_subdir, exist_ok=True) os.makedirs(target_subdir, exist_ok=True)
@ -31,11 +34,11 @@ def main():
copies_map = defaultdict(list) copies_map = defaultdict(list)
assigned_names = set() # To track which names were successfully linked assigned_names = set() # To track which names were successfully linked
for filename in os.listdir(work_dir): for filename in os.listdir(copies_dir):
match = pattern.match(filename) match = pattern.match(filename)
if match: if match:
copie_id = match.group(1) copie_id = match.group(1)
json_path = os.path.join(work_dir, filename) json_path = os.path.join(copies_dir, filename)
try: try:
with open(json_path, 'r', encoding='utf-8') as f: with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f) data = json.load(f)

View File

@ -13,7 +13,7 @@ print("o to open pdf, O original pdf, e to emacs part, p to go back, i to interr
# --- Configuration & Globals --- # --- Configuration & Globals ---
padding = 60 padding = 60
valid_labels_set = None
# Queue payload: (pil_image, json_path, metadata) # Queue payload: (pil_image, json_path, metadata)
# metadata is a dict: {'copie': str, 'part': int, 'schema': dict} # metadata is a dict: {'copie': str, 'part': int, 'schema': dict}
@ -277,6 +277,17 @@ class ImageViewer:
self.current_meta["schema"] self.current_meta["schema"]
) )
labels = [v["label"] for v in current_data["list"]]
labels = [label for label in labels if label != "_"]
labels = [label[1:] for label in labels if label[0] == "|"]
labels = [label[:-1] for label in labels if label[-1] == "|"]
false_labels = [label for label in labels if label not in valid_labels_set]
if false_labels:
msg = f"Wrong label in {self.current_json_path.name}: {false_labels}\n\n\tPlease press 'e' to fix it, then press Enter again."
print(msg)
messagebox.showerror("Label Error", msg)
return
num_added = len(converted_items) num_added = len(converted_items)
# Add to accumulator # Add to accumulator
@ -362,6 +373,7 @@ if __name__ == "__main__":
input_path = Path(sys.argv[1]) input_path = Path(sys.argv[1])
files_to_process = [] files_to_process = []
if input_path.is_file(): if input_path.is_file():
# Correctly identify base_dir if we are in 'Copies' or 'Cutleft' # Correctly identify base_dir if we are in 'Copies' or 'Cutleft'
if input_path.parent.name in ["Copies", "Cutleft"]: if input_path.parent.name in ["Copies", "Cutleft"]:
@ -386,6 +398,10 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
files_to_process = sorted(cutleft_dir.glob("*.jpg")) files_to_process = sorted(cutleft_dir.glob("*.jpg"))
labels_txt = (base_dir / "labels").read_text()
valid_labels_set = set(line.strip() for line in labels_txt.splitlines() if line.strip())
try: try:
all_labels = read_all_labels(base_dir) all_labels = read_all_labels(base_dir)
except FileNotFoundError: except FileNotFoundError:

315
prompting.py Normal file
View File

@ -0,0 +1,315 @@
import io
import re
from pathlib import Path
# Template of the main scoring prompt sent along with the image of grouped
# answers. The placeholders `<<text>>`, `<<corr>>`, `<<persp>>` and
# `<<label>>` are substituted by `make_prompt`.
# The JSON example embedded in the prompt is kept valid JSON and uses the same
# `box_2d` key as the instructions above it, so that the example does not
# contradict the requested output format.
main_prompt = """I'm giving you an image of several written answers to an exam.
Each answer is separated by a black horizontal line, and underneath,
to the left, is indicated the ID of the answer, from `01` to `50`.
I want you to score each answer, from 0 to 4, you may score half
points, such as 2.5. Even if a result is wrong, if the reasoning is
correct and could lead to a right answer, you should give at least
half the points.
You also need to give feedback to the student, in french :
- which part of his answer is wrong,
- why is it wrong
- possibly, what he should have done instead.
Your feedback may contain LaTeX fragments written like `$a^2 + b^2 = c^2$`.
If your score is not 4, you should always provide some feedback
explaining what's missing.
For each piece of feedback, if it is related to a specific part of the
answer that is wrong, you may provide a `box_2d`, to locate this
specific part of the answer. This `box_2d` should be in the form
[ymin, xmin, ymax, xmax] normalized to 0-1000. If you do not provide
one, set `box_2d` to `null`.
If the answer is correct, there is no need to provide feedback. You do
not have to give positive feedback, but if you do, do not provide a
`box_2d` for it.
For example, if the student says a function is continuous when it
isn't, provide the coordinates where the word «continuous» is. If a
calculation went wrong, gives the coordinates of the step where it
goes wrong, and as feedback, what went wrong.
Avoid giving feedback about confusing letters `n` with `m`, `x` with
`n` or `h` with `k`. If it looks wrong, assume you read it wrong,
unless the distinction is very important.
You should also give me a measure of confidence, from 0 to 1 that you
were able to correctly understand the answer. A score below 0.5 means
that you think it is likely that you couldn't understand an important
part.
In some case, you may find that either
- The student didn't answer the right question. Set the score to 0.
Since it could be a labeling error, indicate is by setting `error`
to \"wrong-label\".
- You can find an answer to another question of the exercice (taking
more than a couple of lines). Score the question you are supposed
to score, but set `error` to \"additional-answer\".
- The answer to the question is empty, or the student has only
rewritten the statement of the question. In this case, set `error`
to \"empty-answer\" and do not provide any kind of feedback.
If there's no error, set `error` to `\"\"`.
You will answer using json describing a list of dictionary with a key
\"id\", and a key \"result\" that contains the \"score\", the \"confidence\", a
list \"feedback\", and possibly an \"error\". Like this example :
[{ \"id\": \"01\",
\"result\": {\"score\" : 2.5,
\"confidence\" : 0.8,
\"feedback\": [{\"text\": \"Un retour générique. Il faut apprendre le cours.\", \"box_2d\": null},
{\"text\": \"Non, la fonction n'est pas forcément continue\", \"box_2d\": [145, 280, 340, 500]}],
\"error\": \"\"}
},
{ \"id\": \"04\",
\"result\": {\"score\" : 4.0,
\"confidence\" : 0.9,
\"feedback\" : [],
\"error\": \"\" }
}
]
Here is the text of the exercice (or the relevant part of the problem)
of the exam :
```
<<text>>
```
Here is a possible correct answer :
```
<<corr>>
```
<<persp>>
You are asked to score the question or exercice labeled `<<label>>`,
do not score or give feedback to any other question."""
def make_prompt(input_dir, full_label, template=None):
    """Build the scoring prompt for the question labeled ``full_label``.

    For each of the ``Text``, ``Sol`` and ``Persp`` sub-directories of
    ``input_dir``, the file whose name is the longest prefix of
    ``full_label`` (``.pdf`` and ``.tex`` files excluded) is read and
    injected into the template in place of ``<<text>>``, ``<<corr>>`` and
    ``<<persp>>``; ``<<label>>`` is replaced by ``full_label`` itself.

    Args:
        input_dir: Root directory of the exam, as a ``Path`` or a ``str``.
        full_label: Label of the question or exercise to score.
        template: Prompt template containing the placeholders. Defaults to
            the module-level ``main_prompt``.

    Returns:
        The prompt with every placeholder substituted.
    """
    base_dir = Path(input_dir)

    def read_longest_prefix_file(subdir):
        """Return the content of the best matching file of ``subdir``, or ""."""
        dir_path = base_dir / subdir
        # A missing directory is handled like a directory without any
        # matching file instead of raising FileNotFoundError.
        if not dir_path.is_dir():
            return ""
        matches = [
            f for f in dir_path.iterdir()
            if f.is_file()
            and full_label.startswith(f.name)
            and f.suffix not in (".pdf", ".tex")
        ]
        if not matches:
            return ""
        best = max(matches, key=lambda f: len(f.name))
        return best.read_text(encoding="utf-8", errors="replace")

    text = read_longest_prefix_file("Text")
    corr = read_longest_prefix_file("Sol")
    persp = read_longest_prefix_file("Persp")
    if persp != "":
        persp = "\n\nHere are additional scoring instructions : \n\n```\n" + persp + "\n```\n"

    values = {"text": text, "corr": corr, "persp": persp, "label": full_label}
    if template is None:
        template = main_prompt
    # Substitute in a single pass: with chained str.replace calls, a
    # placeholder occurring inside the exam text or the solution would itself
    # be substituted by a later replacement.
    return re.sub(
        r"<<(text|corr|persp|label)>>",
        lambda match: values[match.group(1)],
        template,
    )
from pydantic import BaseModel, Field, TypeAdapter
from typing import List, Optional, Tuple
# One piece of feedback on an answer, optionally located by a bounding box.
# NOTE: documented with `#` comments on purpose; a class docstring would be
# added as a `description` to the JSON schema generated from this model.
class FeedbackItem(BaseModel):
    # Feedback text shown to the student.
    text: str = Field(description="Feedback content")
    # Bounding box of the part of the answer the feedback refers to, or None.
    box_2d: Optional[List[int]] = Field(default=None, description="box coordinates or null")
# Scoring result of a single answer.
# NOTE: documented with `#` comments on purpose; a class docstring would be
# added as a `description` to the JSON schema generated from this model.
class ResultData(BaseModel):
    # Score given to the answer.
    score: float = Field(description="The numeric score")
    # Confidence of the model in its reading of the answer.
    confidence: float = Field(description="Confidence level")
    # Feedback items attached to the answer.
    feedback: List[FeedbackItem] = Field(description="List of feedback items")
    # Error marker reported by the model.
    error: str = Field(description="Indicates if an error occurred")
# One entry of the model response: the ID of an answer and its result.
# NOTE: documented with `#` comments on purpose; a class docstring would be
# added as a `description` to the JSON schema generated from this model.
class EvaluationEntry(BaseModel):
    # Identifier of the scored answer.
    id: str = Field(description="Entry identifier")
    # Score, confidence, feedback and error marker for that answer.
    result: ResultData = Field(description="Result details")
# These nested definitions do not work with the batch api, unroll them
# Schema of the optional bounding box attached to a feedback item.
_BOX_2D_SCHEMA = {
    "type": "ARRAY",
    "items": {"type": "INTEGER"},
    "nullable": True,
    "description": "box coordinates or null",
}

# Schema of one feedback item: a mandatory text and an optional box.
_FEEDBACK_ITEM_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "text": {"type": "STRING", "description": "Feedback content"},
        "box_2d": _BOX_2D_SCHEMA,
    },
    "required": ["text"],
}

# Schema of the result attached to one answer.
_RESULT_SCHEMA = {
    "type": "OBJECT",
    "properties": {
        "score": {"type": "NUMBER", "description": "The numeric score"},
        "confidence": {"type": "NUMBER", "description": "Confidence level"},
        "error": {"type": "STRING", "description": "Indicates if an error occurred"},
        "feedback": {
            "type": "ARRAY",
            "description": "List of feedback items",
            "items": _FEEDBACK_ITEM_SCHEMA,
        },
    },
    "required": ["score", "confidence", "feedback", "error"],
}

# Fully inlined response schema: a list of {id, result} entries.
UNROLLED_SCHEMA = {
    "type": "ARRAY",
    "items": {
        "type": "OBJECT",
        "properties": {
            "id": {"type": "STRING", "description": "Entry identifier"},
            "result": _RESULT_SCHEMA,
        },
        "required": ["id", "result"],
    },
}
from google.genai import types
# The root model for parsing is List[EvaluationEntry]
def generate_request(input_dir, file, full_label):
    """Build the main scoring request for Gemini.

    Args:
        input_dir: Root directory of the exam, forwarded to ``make_prompt``.
        file: Path of the JPEG image to send.
        full_label: Label of the question to score.

    Returns:
        A ``(contents, config)`` tuple: a single user message made of the
        image followed by the prompt, and the generation config requesting a
        JSON response following the ``List[EvaluationEntry]`` schema.
    """
    prompt_text = make_prompt(input_dir, full_label)
    image_part = types.Part.from_bytes(
        data=Path(file).read_bytes(),
        mime_type="image/jpeg",
    )
    text_part = types.Part.from_text(text=prompt_text)
    user_message = types.Content(role="user", parts=[image_part, text_part])

    response_schema = TypeAdapter(List[EvaluationEntry]).json_schema()
    request_config = types.GenerateContentConfig(
        temperature=1.0,
        top_p=0.95,
        seed=0,
        max_output_tokens=65535,
        response_mime_type="application/json",
        response_json_schema=response_schema,
    )
    return ([user_message], request_config)
from pdf2image import convert_from_path
from PIL import Image
import json
def get_single_image_bytes(pdf_path):
    """Render a PDF as one JPEG, stacking its pages vertically.

    Args:
        pdf_path: Path of the PDF to convert.

    Returns:
        The JPEG-encoded bytes of the resulting image.

    Raises:
        ValueError: If the conversion yields no page.
    """
    pages = convert_from_path(pdf_path, dpi=200)  # Same DPI as grouping.py
    if not pages:
        raise ValueError(f"No pages in {pdf_path}")

    if len(pages) > 1:
        # Stack every page top to bottom on a white canvas as wide as the
        # widest page.
        canvas_size = (
            max(page.width for page in pages),
            sum(page.height for page in pages),
        )
        stitched = Image.new('RGB', canvas_size, 'white')
        top = 0
        for page in pages:
            stitched.paste(page, (0, top))
            top += page.height
    else:
        stitched = pages[0]

    buffer = io.BytesIO()
    stitched.save(buffer, format='JPEG', quality=85)
    return buffer.getvalue()
def request_for_box_correction(pdf_path, original_feedbacks):
    """Build the request asking Gemini to correct feedback bounding boxes.

    Only the feedback items that carry a bounding box are included in the
    prompt; items without one are left out.

    Args:
        pdf_path: Path of the PDF of the single answer; it is rendered as one
            JPEG image.
        original_feedbacks: List of feedback dicts with a ``text`` and an
            optional ``box_2d``.

    Returns:
        A ``(contents, config)`` tuple: a single user message made of the
        image followed by the prompt, and the generation config requesting a
        JSON response following the ``List[FeedbackItem]`` schema.
    """
    img_bytes = get_single_image_bytes(pdf_path)
    # `box_2d` is optional in the response schemas, so the key may be absent:
    # use .get() instead of indexing to avoid a KeyError on such items.
    localized_feedbacks = [f for f in original_feedbacks if f.get("box_2d")]
    prompt = f"""
Here is a single student's submission to a question in a written exam. The following JSON contains feedback items with bounding boxes (box_2d) that are incorrect. Each piece of feedback is supposed to be related to a piece of the answer that is wrong.
For example, if the student says a function is continuous when it
isn't, the coordinates should be where the word «continuous» is. If a
calculation went wrong, the coordinates should be where the step where
it goes wrong, and the feedback is what went wrong.
Please analyze the image and return the same feedback json content, but with ONLY the box_2d coordinates corrected for this specific image.
Coordinates must be [ymin, xmin, ymax, xmax] scaled to 1000. If a box is invalid/not found, return null for it.
Original feedback:
{json.dumps(localized_feedbacks, indent=2)}
"""
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_bytes(data=img_bytes, mime_type="image/jpeg"),
                types.Part.from_text(text=prompt),
            ],
        )
    ]
    config = types.GenerateContentConfig(
        temperature=1.0,
        response_mime_type="application/json",
        response_json_schema=TypeAdapter(List[FeedbackItem]).json_schema(),
    )
    return contents, config
def request_for_wrong_label(pdf_path, label, enonce, labels_txt):
    """Build the request asking a model which question an answer belongs to.

    Used when the current label of an answer image is suspected to be
    wrong. The prompt carries the exam statement and the allowed labels,
    and asks for exactly one label string back.

    Args:
        pdf_path: Path of the PDF holding the student's answer.
        label: Label currently attached to the answer.
        enonce: Full text of the exam, inserted verbatim in the prompt.
        labels_txt: Text listing every possible label.

    Returns:
        A ``(contents, config)`` pair: the user message (image + prompt)
        and a generation config with temperature 1.0.
    """
    prompt = f"""This image is a part of the answer of a student to a written exam.
It was initially labeled '{label}' but I suspect this label is wrong. Perhaps the student himself wrote the wrong label.
You need to analyse this image, and find the label of the question it answers. Do not trust the label written by the student but instead check the content of its answer and the notation he uses to identify the correct label of the question the student answered.
Return ONLY the exact label string.
Here is the full content of the exam :
{enonce}
Here is a list of all possible labels. You need to answer with one of these :
{labels_txt}
"""

    # The image goes first, followed by the instructions.
    image_part = types.Part.from_bytes(
        data=get_single_image_bytes(pdf_path),
        mime_type="image/jpeg",
    )
    text_part = types.Part.from_text(text=prompt)
    user_message = types.Content(role="user", parts=[image_part, text_part])

    config = types.GenerateContentConfig(temperature=1.0)
    return [user_message], config
def request_for_additional_answer(pdf_path, label, enonce, labels_txt):
    """Build the request asking a model for every question an image answers.

    Used when an answer image is suspected to cover more questions than
    its current label says. The prompt carries the exam statement and the
    allowed labels, and asks for the list of matching labels.

    Args:
        pdf_path: Path of the PDF holding the student's answer.
        label: Label currently attached to the answer.
        enonce: Full text of the exam, inserted verbatim in the prompt.
        labels_txt: Text listing every possible label.

    Returns:
        A ``(contents, config)`` pair: the user message (image + prompt)
        and a generation config with temperature 1.0 and a JSON response
        MIME type.
    """
    prompt = f"""This image is a part of the answer of a student to a written exam.
It was initially labeled '{label}' but I suspect this image also contains answers to another, or several other questions.
You need to analyse this image, and find the list of the labels of the questions it answers. Return ONLY the list of the exact label strings.
If the end of the image only contains the first line of an answer to another question, ignore it.
Here is the full content of the exam :
{enonce}
Here is a list of all possible labels. You need to answer with a list one of these :
{labels_txt}
"""

    # The image goes first, followed by the instructions.
    image_part = types.Part.from_bytes(
        data=get_single_image_bytes(pdf_path),
        mime_type="image/jpeg",
    )
    text_part = types.Part.from_text(text=prompt)
    user_message = types.Content(role="user", parts=[image_part, text_part])

    # JSON output is requested, but no schema is attached to it.
    config = types.GenerateContentConfig(
        temperature=1.0,
        response_mime_type="application/json",
    )
    return [user_message], config

View File

@ -24,15 +24,20 @@ def get_extra_pdfs_as_images(root_dir, label, annotating_module):
return extra_images return extra_images
def save_paginated_pdf(image_groups, output_path): def save_paginated_pdf(image_groups, output_path):
"""Concatenates groups of images vertically, adding specific inner borders.""" """Concatenates groups of images vertically, adding inner borders and margins."""
if not image_groups: if not image_groups:
return return
max_w = max(img.width for group in image_groups for img in group) max_w = max(img.width for group in image_groups for img in group)
max_page_h = int(max_w * 1.414 * 1.3) max_page_h = int(max_w * 1.414 * 1.25)
# Calculate 0.2 cm in pixels at 100 DPI (0.2 / 2.54 inches * 100) # Calculate sizes in pixels at 100 DPI
border_px = int((0.2 / 2.54) * 100) border_px = int((0.2 / 2.54) * 100)
left_margin = int((0.3 / 2.54) * 100)
tb_margin = int((0.2 / 2.54) * 100)
# Available height for images once top/bottom margins are added
max_content_h = max_page_h - (2 * tb_margin)
pages = [] pages = []
current_page_imgs = [] current_page_imgs = []
@ -46,11 +51,10 @@ def save_paginated_pdf(image_groups, output_path):
processed_group = [] processed_group = []
for i, img in enumerate(group): for i, img in enumerate(group):
if i in (0, 1): if i in (0, 1):
img = img.copy() # Do not modify the original image object in memory img = img.copy()
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
color = "black" if i == 0 else "blue" color = "black" if i == 0 else "blue"
# Draw the border inside the image edges
draw.rectangle( draw.rectangle(
[0, 0, img.width - 1, img.height - 1], [0, 0, img.width - 1, img.height - 1],
outline=color, outline=color,
@ -60,11 +64,12 @@ def save_paginated_pdf(image_groups, output_path):
group_h = sum(img.height for img in processed_group) group_h = sum(img.height for img in processed_group)
if current_page_imgs and (current_h + group_h > max_page_h): if current_page_imgs and (current_h + group_h > max_content_h):
page = Image.new("RGB", (max_w, current_h), "white") # Create page with margins included in dimensions
y = 0 page = Image.new("RGB", (max_w + left_margin, current_h + 2 * tb_margin), "white")
y = tb_margin
for c_img in current_page_imgs: for c_img in current_page_imgs:
page.paste(c_img, (0, y)) page.paste(c_img, (left_margin, y))
y += c_img.height y += c_img.height
pages.append(page) pages.append(page)
@ -75,10 +80,10 @@ def save_paginated_pdf(image_groups, output_path):
current_h += group_h current_h += group_h
if current_page_imgs: if current_page_imgs:
page = Image.new("RGB", (max_w, current_h), "white") page = Image.new("RGB", (max_w + left_margin, current_h + 2 * tb_margin), "white")
y = 0 y = tb_margin
for c_img in current_page_imgs: for c_img in current_page_imgs:
page.paste(c_img, (0, y)) page.paste(c_img, (left_margin, y))
y += c_img.height y += c_img.height
pages.append(page) pages.append(page)