Divers, en lien avec DMI04 (gemini_for_enonce.py, smaller images, link to enonce.pdf)

master
Sébastien Miquel 2026-05-09 16:24:53 +02:00
parent 8d9165d0ac
commit 0836d5809d
9 changed files with 202 additions and 59 deletions

View File

@ -1,7 +1,7 @@
#+title: Script #+title: Script
#+author: Sébastien Miquel #+author: Sébastien Miquel
#+date: 14-03-2026 #+date: 14-03-2026
# Time-stamp: <08-05-26 22:18> # Time-stamp: <08-05-26 22:52>
#+OPTIONS: #+OPTIONS:
* Quézaco * Quézaco
@ -157,6 +157,7 @@ Set proxy with ~export HTTPS_PROXY="http://10.0.0.1:3128"~
2. =python correction.py Interro --limit 240= OU 2. =python correction.py Interro --limit 240= OU
=python correction.py Interro/Ex\ 2/Group_1.jpg= OU =python correction.py Interro/Ex\ 2/Group_1.jpg= OU
=python correction.py Interro --overwrite= =python correction.py Interro --overwrite=
=python correction.py Interro --pro-by-label= (needs `labels_for_pro`)
Fais les requêtes de correction à Gemini. Fais les requêtes de correction à Gemini.

View File

@ -396,6 +396,8 @@ def render_score_text(label, score, error, width_px, fontsize=30,
return img return img
A4_WIDTH_200DPI = 1654
TARGET_MIN_WIDTH = int(A4_WIDTH_200DPI * 0.9) # 1406 pixels
def compose_label_image(base_img, label, result, hmin, def compose_label_image(base_img, label, result, hmin,
render_fn=render_real_latex_text, render_fn=render_real_latex_text,
draw_callback=None, draw_callback=None,
@ -415,6 +417,17 @@ def compose_label_image(base_img, label, result, hmin,
draw_callback: Optional function(type, draw_obj, position_dict, data_dict) draw_callback: Optional function(type, draw_obj, position_dict, data_dict)
called when elements are placed. Used for checkboxes. called when elements are placed. Used for checkboxes.
""" """
left_pad = 0
if base_img.width < TARGET_MIN_WIDTH:
total_missing = TARGET_MIN_WIDTH - base_img.width
left_pad = min(total_missing, MARGIN_LEFT)
right_pad = total_missing - left_pad
new_base = Image.new("RGB", (TARGET_MIN_WIDTH, base_img.height), "white")
new_base.paste(base_img, (left_pad, 0))
base_img = new_base
score = result.get('score', 0) score = result.get('score', 0)
error = result.get('error', "") error = result.get('error', "")
feedbacks = result.get('feedback', []) feedbacks = result.get('feedback', [])
@ -485,8 +498,8 @@ def compose_label_image(base_img, label, result, hmin,
target_ymin = (ymin - hmin) + image_offset_y target_ymin = (ymin - hmin) + image_offset_y
target_ymax = (ymax - hmin) + image_offset_y target_ymax = (ymax - hmin) + image_offset_y
target_xmin = xmin + MARGIN_LEFT target_xmin = xmin + MARGIN_LEFT + left_pad
target_xmax = xmax + MARGIN_LEFT target_xmax = xmax + MARGIN_LEFT + left_pad
# Draw Rectangle (if not suppressed) # Draw Rectangle (if not suppressed)
if "norectangle" not in fb: if "norectangle" not in fb:

View File

@ -582,7 +582,7 @@ Here is a list of all possible labels. You need to answer with a list one of the
height = grouping.get_pdf_height(str(new_pdf_path)) height = grouping.get_pdf_height(str(new_pdf_path))
grouping.create_jpg(add_label, idx, [(pid, str(new_pdf_path), height)], INPUT_DIR) grouping.create_jpg(add_label, idx, [(pid, str(new_pdf_path), height)], INPUT_DIR)
new_tasks.append((str(Path(INPUT_DIR) / add_label / f"Group_{idx+1}.jpg"), new_tasks.append((str(Path(INPUT_DIR) / add_label / f"Group_{idx+1}.jpg"),
add_label, False, f"{label}(->)")) add_label, False))
error += f"(->){add_label}" error += f"(->){add_label}"
keep_error = True keep_error = True
else: else:
@ -603,7 +603,6 @@ def process_single_task(task_tuple, precomputed_response=None):
file_path = task_tuple[0] file_path = task_tuple[0]
label = task_tuple[1] label = task_tuple[1]
can_spawn_tasks = task_tuple[2] if len(task_tuple) > 2 else True can_spawn_tasks = task_tuple[2] if len(task_tuple) > 2 else True
injected_error = task_tuple[3] if len(task_tuple) > 3 else ""
group_name = os.path.splitext(file_path)[0] group_name = os.path.splitext(file_path)[0]
json_path = group_name + '.json' json_path = group_name + '.json'
@ -649,15 +648,6 @@ def process_single_task(task_tuple, precomputed_response=None):
for p in json_data: for p in json_data:
pid = p["id"] pid = p["id"]
res = p["result"] res = p["result"]
# Inject additional error if present
if injected_error:
if res["error"]:
res["error"] = f"{injected_error} {res['error']}"
else:
res["error"] = injected_error
yming, ymaxg, width_r = d_data[pid] yming, ymaxg, width_r = d_data[pid]
pdf_path = Path(INPUT_DIR) / f"Copie{pid}" / f"{label}.pdf" pdf_path = Path(INPUT_DIR) / f"Copie{pid}" / f"{label}.pdf"
@ -720,8 +710,6 @@ def process_single_task(task_tuple, precomputed_response=None):
tprint(f"Error decoding JSON for {file_path}", file=sys.stderr) tprint(f"Error decoding JSON for {file_path}", file=sys.stderr)
except Exception as e: except Exception as e:
error_msg = f"Exception processing {file_path}: {e}" error_msg = f"Exception processing {file_path}: {e}"
import traceback
traceback.print_exc() # <--- Add this line to see the real crash
print(error_msg, file=sys.stderr) print(error_msg, file=sys.stderr)
with io_lock: with io_lock:
errors_summary.append((error_msg, file_path)) errors_summary.append((error_msg, file_path))

View File

@ -26,7 +26,8 @@ if os.path.isfile(path_arg) and path_arg.lower().endswith('.pdf'):
files = [os.path.basename(path_arg)] files = [os.path.basename(path_arg)]
elif os.path.isdir(path_arg): elif os.path.isdir(path_arg):
INPUT_DIR = path_arg INPUT_DIR = path_arg
files = sorted([f for f in os.listdir(INPUT_DIR) if f.lower().endswith('.pdf')]) files = sorted([f for f in os.listdir(INPUT_DIR) if f.lower().endswith('.pdf') and
"nonc" not in f.lower()])
else: else:
sys.exit("Error: Input must be a directory or a PDF file.") sys.exit("Error: Input must be a directory or a PDF file.")
@ -83,12 +84,20 @@ def stitch_images(image_list):
return combined return combined
import threading
pdf_cache_lock = threading.Lock()
@lru_cache(maxsize=3) @lru_cache(maxsize=3)
def get_pdf_pages(filename): def _get_pdf_pages_cached(filename):
"""Caches the heavy PDF rendering step for the current and next files."""
pdf_path = os.path.join(INPUT_DIR, filename) pdf_path = os.path.join(INPUT_DIR, filename)
return convert_from_path(pdf_path) return convert_from_path(pdf_path)
def get_pdf_pages(filename):
"""Thread-safe wrapper for the cached PDF conversion."""
with pdf_cache_lock:
return _get_pdf_pages_cached(filename)
def process_single_pdf(filename, shift_offset=0, max_per_file=5): def process_single_pdf(filename, shift_offset=0, max_per_file=5):
""" """
Converts PDF to stitched images. Converts PDF to stitched images.
@ -137,7 +146,8 @@ def process_single_pdf(filename, shift_offset=0, max_per_file=5):
# 3. Generate Preview (All stitched together, Resized) # 3. Generate Preview (All stitched together, Resized)
full_stitch = stitch_images(cropped_images) full_stitch = stitch_images(cropped_images)
preview_resized = full_stitch.resize(OUTPUT_SIZE, Image.LANCZOS) # preview_resized = full_stitch.resize(OUTPUT_SIZE, Image.LANCZOS)
preview_resized = full_stitch.resize(OUTPUT_SIZE, Image.BILINEAR)
schema = { schema = {
"original_filename": filename, "original_filename": filename,
@ -200,8 +210,6 @@ class ImageReviewer:
self.current_preview = None # Only stores the resized preview for GUI self.current_preview = None # Only stores the resized preview for GUI
self.is_processing = False self.is_processing = False
# Queue for pre-fetched results (index, (preview, splits, schema))
self.prefetch_queue = Queue(maxsize=1)
# Queue for manual re-processing results # Queue for manual re-processing results
self.manual_queue = Queue() self.manual_queue = Queue()
@ -244,19 +252,15 @@ class ImageReviewer:
self.trigger_processing(self.files[self.index], self.current_shift) self.trigger_processing(self.files[self.index], self.current_shift)
def prefetch_worker(self): def prefetch_worker(self):
"""Background thread to process the NEXT image constantly.""" """Background thread to load the NEXT file's PDF pages into RAM."""
idx_to_process = 0 idx_to_process = -1
while True: while True:
target = self.index + 1 target = self.index + 1
if target < len(self.files): if target < len(self.files) and target != idx_to_process:
if idx_to_process != target: fname = self.files[target]
fname = self.files[target] get_pdf_pages(fname) # Just calling it warms the lru_cache
result = process_single_pdf(fname, shift_offset=0) idx_to_process = target
if result: time.sleep(0.05)
self.prefetch_queue.put((target, result)) # Blocks if full
idx_to_process = target
time.sleep(0.1)
def load_current_image(self, use_prefetch=False): def load_current_image(self, use_prefetch=False):
if self.index >= len(self.files): if self.index >= len(self.files):
@ -266,21 +270,10 @@ class ImageReviewer:
filename = self.files[self.index] filename = self.files[self.index]
self.is_processing = False self.is_processing = False
self.current_shift = 0
result_found = None # Always trigger processing. If prefetched, get_pdf_pages returns instantly.
self.trigger_processing(filename, self.current_shift)
if use_prefetch and not self.prefetch_queue.empty():
q_idx, q_result = self.prefetch_queue.queue[0]
if q_idx == self.index:
_, result_found = self.prefetch_queue.get()
self.current_shift = 0
print(f"Loaded {filename} from prefetch.")
if result_found:
self.handle_processing_result(result_found, filename)
else:
# Not in queue (first load or queue mismatch), process manually
self.trigger_processing(filename, self.current_shift)
def trigger_processing(self, filename, shift): def trigger_processing(self, filename, shift):
"""Starts a thread to process image so GUI doesn't freeze.""" """Starts a thread to process image so GUI doesn't freeze."""

136
gemini_for_enonce.py Normal file
View File

@ -0,0 +1,136 @@
import os
import sys
import argparse
from pathlib import Path
from pydantic import BaseModel, Field
from typing import List
from google import genai
from google.genai import types
MODEL_ID = "gemini-3-flash-preview"
api_key = os.environ.get("GEMINI_API_KEY")
class QuestionItem(BaseModel):
label: str = Field(description="The unique label of the question (e.g., '1.a', 'Exercice 1')")
question_content: str = Field(description="The source text of the question, strictly extracted from the enonce file, EXCLUDING the label itself.")
solution_content: str = Field(description="The source text of the solution, strictly extracted from the correction file.")
class ExamExtraction(BaseModel):
questions: List[QuestionItem]
PROMPT = """I am providing:
1. A PDF of an exam (`enonce.pdf`)
2. The source code of the exam questions (`enonce` file)
3. The source code of the exam solutions (`correction` file)
Your task:
1. Identify all distinct question labels using the PDF document.
These labels should be unique : use `Ex 1 : 1)a)` or `I)1)b)`.
2. For each label, extract its exact corresponding question text
from the `enonce` source file. Do not include the label itself
in this extracted text (nor LaTeX like `item` nor org-mode list
labelling like `2.`).
3. For each label, extract its exact corresponding solution textual
content from the `correction` source file. Return the result as
a JSON list in the exact reading order of the document.
"""
def find_file(folder: Path, base_name: str) -> Path:
for ext in [".org", ".tex"]:
path = folder / f"{base_name}{ext}"
if path.is_file():
return path
return None
def process_exam(folder_path: str):
folder = Path(folder_path)
# 1. Resolve files
pdf_path = folder / "enonce.pdf"
enonce_path = find_file(folder, "enonce")
correction_path = find_file(folder, "correction")
missing = []
if not pdf_path.is_file(): missing.append("enonce.pdf")
if not enonce_path: missing.append("enonce.org or enonce.tex")
if not correction_path: missing.append("correction.org or correction.tex")
if missing:
print(f"Error: Missing files in {folder}: {', '.join(missing)}")
sys.exit(1)
print("Reading files...")
pdf_bytes = pdf_path.read_bytes()
enonce_text = enonce_path.read_text(encoding="utf-8")
correction_text = correction_path.read_text(encoding="utf-8")
client = genai.Client(api_key=api_key)
contents = [
types.Content(
role="user",
parts=[
types.Part.from_text(text=PROMPT),
types.Part.from_bytes(data=pdf_bytes, mime_type="application/pdf"),
types.Part.from_text(text=f"--- ENONCE SOURCE ({enonce_path.name}) ---\n{enonce_text}"),
types.Part.from_text(text=f"--- CORRECTION SOURCE ({correction_path.name}) ---\n{correction_text}"),
],
)
]
config = types.GenerateContentConfig(
temperature=0.1,
response_mime_type="application/json",
response_json_schema=ExamExtraction.model_json_schema(),
)
print("Sending request to Gemini...")
response = client.models.generate_content(
model=MODEL_ID,
contents=contents,
config=config
)
extracted_data = ExamExtraction.model_validate_json(response.text)
# 2. Setup output directories
text_dir = folder / "Text"
sol_dir = folder / "Sol"
text_dir.mkdir(exist_ok=True)
sol_dir.mkdir(exist_ok=True)
labels_file = folder / "labels"
print("Writing files...")
with open(labels_file, "w", encoding="utf-8") as flabels:
for q in extracted_data.questions:
# Sanitize label for filesystem (prevent directory traversal if label contains '/')
safe_label = q.label.replace("/", "_")
flabels.write(f"{safe_label}\n")
# Fix double-escaped newlines
q_content = q.question_content.replace("\\n", "\n")
s_content = q.solution_content.replace("\\n", "\n")
# Write Text/label
with open(text_dir / safe_label, "w", encoding="utf-8") as f:
f.write(f"{q.label}\n{q.question_content}")
# Write Sol/label
with open(sol_dir / safe_label, "w", encoding="utf-8") as f:
f.write(f"{q.label}\n{q.solution_content}")
print(f"Success! Processed {len(extracted_data.questions)} questions.")
if __name__ == "__main__":
if not api_key:
print("Error: GEMINI_API_KEY environment variable is not set.")
sys.exit(1)
parser = argparse.ArgumentParser(description="Extract exam and solution code via Gemini.")
parser.add_argument("folder", help="Directory containing the exam files")
args = parser.parse_args()
process_exam(args.folder)

View File

@ -157,7 +157,7 @@ def generate_request(file, labels, names, context_labels):
] ]
generate_content_config = types.GenerateContentConfig( generate_content_config = types.GenerateContentConfig(
temperature=1., temperature=1.0,
top_p=0.95, top_p=0.95,
seed=0, seed=0,
max_output_tokens=65535, max_output_tokens=65535,
@ -314,7 +314,7 @@ def process_copy_group(group_key, files):
# Run ThreadPool on GROUPS (Copies), not individual files # Run ThreadPool on GROUPS (Copies), not individual files
# Each thread handles one student's full exam copy sequentially # Each thread handles one student's full exam copy sequentially
with ThreadPoolExecutor(max_workers=16) as executor: with ThreadPoolExecutor(max_workers=12) as executor:
# Convert dict items to arguments for map # Convert dict items to arguments for map
# executor.map expects a function and an iterable. # executor.map expects a function and an iterable.
# We use a lambda or separate function to unpack the tuple if needed, # We use a lambda or separate function to unpack the tuple if needed,

View File

@ -7,6 +7,7 @@ import os
import re import re
import glob import glob
import shutil import shutil
import subprocess
from pypdf import PdfReader, PdfWriter from pypdf import PdfReader, PdfWriter
# --- Constants --- # --- Constants ---
@ -94,7 +95,7 @@ class PDFPreviewer:
"← / → : Move line 1cm left/right\n" "← / → : Move line 1cm left/right\n"
"'c': Rotate page 180°, 'C' : rotate all pages, ',' : rotate all files\n" "'c': Rotate page 180°, 'C' : rotate all pages, ',' : rotate all files\n"
"t s r n m: keep left, next page, keep none, keep right, keep as is\n" "t s r n m: keep left, next page, keep none, keep right, keep as is\n"
"z: send this page to the end, 'R':restart file, 'P':back to previous file\n" "z: send this page to the end, 'A':pdf arranger 'R':restart file, 'P':back to previous file\n"
) )
self.info_label = tk.Label(master, text=instructions, justify=tk.LEFT) self.info_label = tk.Label(master, text=instructions, justify=tk.LEFT)
self.info_label.pack(pady=5, side=tk.TOP) self.info_label.pack(pady=5, side=tk.TOP)
@ -123,6 +124,7 @@ class PDFPreviewer:
self.master.bind("r", self.discard_page) self.master.bind("r", self.discard_page)
self.master.bind("z", self.send_page_end) self.master.bind("z", self.send_page_end)
self.master.bind("R", self.restart_current_file) self.master.bind("R", self.restart_current_file)
self.master.bind("A", self.start_arranger)
self.master.bind("P", self.go_to_previous_file) self.master.bind("P", self.go_to_previous_file)
@ -131,6 +133,9 @@ class PDFPreviewer:
self.current_zoom = 1.0 self.current_zoom = 1.0
def start_arranger(self):
subprocess.Popen(["pdf-arranger", self.pdf_path])
def on_resize(self, event): def on_resize(self, event):
""" """
Handles window resize events by reloading the page. Handles window resize events by reloading the page.

View File

@ -309,7 +309,18 @@ class ImageViewer:
def on_open_interro(self, event): def on_open_interro(self, event):
if self.is_viewing and self.current_json_path: if self.is_viewing and self.current_json_path:
pdf_path = "/home/sebastien/Prépa/Staging/Interro/" + str(base_dir) + ".pdf" # Check local directory first
local_accent = self.base_dir / "énoncé.pdf"
local_plain = self.base_dir / "enonce.pdf"
if local_accent.exists():
pdf_path = str(local_accent)
elif local_plain.exists():
pdf_path = str(local_plain)
else:
# Fallback to the Interro staging directory
pdf_path = f"/home/sebastien/Prépa/Staging/Interro/{self.base_dir.name}.pdf"
print(f"Opening {pdf_path}") print(f"Opening {pdf_path}")
subprocess.Popen(['xdg-open', pdf_path]) subprocess.Popen(['xdg-open', pdf_path])

View File

@ -5,20 +5,16 @@ def natural_key(text):
return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))] return [int(c) if c.isdigit() else c.lower() for c in re.split(r'(\d+)', str(text))]
def read_all_labels(base_dir): def read_all_labels(base_dir):
# return sorted(list(filter(None, return sorted(list(filter(None,
# (Path(base_dir) / "labels").read_text().splitlines())), (Path(base_dir) / "labels").read_text().splitlines())),
# key = natural_key) key = natural_key)
return list(filter(None, (Path(base_dir) / "labels").read_text().splitlines()))
def enonce_total(base_dir): def enonce_total(base_dir):
text_dir = Path(base_dir) / 'Text' text_dir = Path(base_dir) / 'Text'
if not text_dir.is_dir(): if not text_dir.is_dir():
return "" return ""
# Exclude .tex and .pdf files files = [f for f in text_dir.iterdir() if f.is_file()]
files = [f for f in text_dir.iterdir()
if f.is_file() and f.suffix.lower() not in ('.tex', '.pdf')]
files.sort(key=lambda f: natural_key(f.name)) files.sort(key=lambda f: natural_key(f.name))
output = [] output = []