204 lines
7.2 KiB
Python
204 lines
7.2 KiB
Python
import argparse
import os
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List, Optional

from google import genai
from google.genai import types
from pydantic import BaseModel, Field
|
|
|
|
# Bug: the output is limited to 8k tokens…
# Alternative model, kept for reference:
# MODEL_ID = "gemini-3-flash-preview"
MODEL_ID = "gemini-3.1-flash-lite"

# Read once at import time; None when the variable is not set.
api_key = os.getenv("GEMINI_API_KEY")
|
|
|
|
# One extracted question paired with its solution.
# Documented with comments on purpose: pydantic copies a model docstring into
# the generated JSON schema, which would change the schema sent to the model.
class QuestionItem(BaseModel):
    # Identifier of the question as it should appear in the labels file.
    label: str = Field(
        description="The unique label of the question (e.g., '1.a', 'Exercice 1')",
    )
    # Question source text, without the label.
    question_content: str = Field(
        description=(
            "The source text of the question, strictly extracted from the "
            "enonce file, EXCLUDING the label itself."
        ),
    )
    # Solution source text for the same label.
    solution_content: str = Field(
        description=(
            "The source text of the solution, strictly extracted from the "
            "correction file."
        ),
    )
|
|
|
|
# Top-level structured-output payload: the list of extracted question items.
# (Comment rather than docstring so the generated JSON schema is unchanged.)
class ExamExtraction(BaseModel):
    questions: List[QuestionItem]
|
|
|
|
# Instruction text sent as the first part of the request; the PDF and the two
# source files are attached after it.
PROMPT = """I am providing:
1. A PDF of an exam (`enonce.pdf`)
2. The source code of the exam questions (`enonce` file)
3. The source code of the exam solutions (`correction` file)

Your task:
1. Identify all distinct question labels using the PDF document.
These labels should be unique : use `Ex 1 : 1)a)` or `I)1)b)`.
2. For each label, extract its exact corresponding question text
from the `enonce` source file. Do not include the label itself
in this extracted text (nor LaTeX like `item` nor org-mode list
labelling like `2.`).
3. For each label, extract its exact corresponding solution textual
content from the `correction` source file. Return the result as
a JSON list in the exact reading order of the document.
"""
|
|
|
|
def find_file(
    folder: Path,
    base_name: str,
    extensions: Iterable[str] = (".org", ".tex"),
) -> Optional[Path]:
    """Return the first existing ``<base_name><ext>`` file inside *folder*.

    Args:
        folder: Directory to look in.
        base_name: File name without its extension (e.g. ``"enonce"``).
        extensions: Candidate extensions, tried in order; the first one that
            names a regular file wins. Defaults to org-mode, then LaTeX.

    Returns:
        The path of the first match, or ``None`` when no candidate is a
        regular file (a directory with a matching name does not count).
    """
    for ext in extensions:
        candidate = folder / f"{base_name}{ext}"
        if candidate.is_file():
            return candidate
    return None
|
|
|
|
def _load_or_request_extraction(
    folder: Path, pdf_path: Path, enonce_path: Path, correction_path: Path
) -> "ExamExtraction":
    """Return the validated extraction, from the cache file or from Gemini.

    A fresh response is written to ``gemini_response.json`` only after it has
    validated, so a truncated or malformed reply can never become the cache
    that every later run loads.
    """
    cache_file = folder / "gemini_response.json"
    if cache_file.is_file():
        print("Loading cached response from gemini_response.json...")
        return ExamExtraction.model_validate_json(
            cache_file.read_text(encoding="utf-8")
        )

    # The inputs are only needed when a request is actually sent.
    print("Reading files...")
    pdf_bytes = pdf_path.read_bytes()
    enonce_text = enonce_path.read_text(encoding="utf-8")
    correction_text = correction_path.read_text(encoding="utf-8")

    client = genai.Client(api_key=api_key)
    contents = [
        types.Content(
            role="user",
            parts=[
                types.Part.from_text(text=PROMPT),
                types.Part.from_bytes(data=pdf_bytes, mime_type="application/pdf"),
                types.Part.from_text(
                    text=f"--- ENONCE SOURCE ({enonce_path.name}) ---\n{enonce_text}"
                ),
                types.Part.from_text(
                    text=f"--- CORRECTION SOURCE ({correction_path.name}) ---\n{correction_text}"
                ),
            ],
        )
    ]
    config = types.GenerateContentConfig(
        temperature=0.1,
        response_mime_type="application/json",
        response_json_schema=ExamExtraction.model_json_schema(),
    )

    print("Sending request to Gemini...")
    response = client.models.generate_content(
        model=MODEL_ID,
        contents=contents,
        config=config,
    )
    response_text = response.text
    if not response_text:
        # `text` can be empty/None (e.g. no usable candidate); nothing to cache.
        print("Error: Gemini returned an empty response; nothing was cached.")
        sys.exit(1)

    try:
        extracted = ExamExtraction.model_validate_json(response_text)
    except ValueError:
        # Keep the raw output for inspection, but never as the cache file.
        rejected_file = folder / "gemini_response.invalid.json"
        rejected_file.write_text(response_text, encoding="utf-8")
        print(
            "Error: the response failed validation; "
            f"raw output kept in {rejected_file.name}."
        )
        raise

    print("Saving response to cache...")
    cache_file.write_text(response_text, encoding="utf-8")
    return extracted


def _initial_label(position: int, raw_label: str) -> str:
    """Return a non-empty, single-line label for the labels file.

    The labels file is matched back to the questions by line position, so
    each question must occupy exactly one non-blank line.
    """
    return " ".join(raw_label.split()) or f"question {position}"


def _open_for_editing(labels_file: Path) -> None:
    """Let the user edit *labels_file*; on any failure keep the file as is."""
    print("Opening labels file for editing...")
    editor = os.environ.get("EDITOR")
    try:
        if editor:
            subprocess.run(shlex.split(editor) + [str(labels_file)])
            return

        # Fallbacks if $EDITOR is not set
        if sys.platform.startswith("linux"):
            subprocess.Popen(["xdg-open", str(labels_file)])
        elif sys.platform == "darwin":
            subprocess.Popen(["open", str(labels_file)])
        else:
            os.startfile(str(labels_file))

        # xdg-open/open usually do not block, so we wait for user confirmation
        input("Press ENTER here once you have saved and closed the labels file...")
    except Exception as exc:
        # Deliberately best-effort: editing is optional, but say what failed.
        print("Error running editor, using labels as given.")
        print(f"  Reason: {exc!r}")


def _map_labels(edited_lines: List[str], questions: list) -> List[tuple]:
    """Pair each edited label with its source question, by position.

    A line starting with ``+`` is a new label with no source content. Any
    other line consumes the next extracted question, or ``None`` once the
    questions run out. A ``+`` line with nothing after it is skipped, because
    an empty label would otherwise name the output directory itself.
    """
    remaining = iter(questions)
    mapping = []
    for line in edited_lines:
        if line.startswith("+"):
            label = line[1:].lstrip()
            if not label:
                print("Warning: ignoring a '+' line with no label.")
                continue
            mapping.append((label, None))
        else:
            mapping.append((line, next(remaining, None)))
    return mapping


def process_exam(folder_path: str) -> None:
    """Extract questions and solutions of one exam folder into per-label files.

    Steps: locate ``enonce.pdf`` plus the ``enonce`` / ``correction`` sources,
    obtain the extraction (cached in ``gemini_response.json`` or requested
    from Gemini), write the labels to ``labels`` for the user to edit, then
    write one file per label under ``Text/`` and ``Sol/``.

    Args:
        folder_path: Directory containing the exam files.

    Exits with status 1 when a required input file is missing or the model
    returns an empty response. A response that fails validation raises the
    validation error.
    """
    folder = Path(folder_path)

    # 1. Resolve files
    pdf_path = folder / "enonce.pdf"
    enonce_path = find_file(folder, "enonce")
    correction_path = find_file(folder, "correction")

    missing = []
    if not pdf_path.is_file():
        missing.append("enonce.pdf")
    if not enonce_path:
        missing.append("enonce.org or enonce.tex")
    if not correction_path:
        missing.append("correction.org or correction.tex")

    if missing:
        print(f"Error: Missing files in {folder}: {', '.join(missing)}")
        sys.exit(1)

    extracted_data = _load_or_request_extraction(
        folder, pdf_path, enonce_path, correction_path
    )
    questions = extracted_data.questions

    # 2. Setup output directories
    text_dir = folder / "Text"
    sol_dir = folder / "Sol"
    text_dir.mkdir(exist_ok=True)
    sol_dir.mkdir(exist_ok=True)

    labels_file = folder / "labels"

    # Step 1: Write initial labels, exactly one line per extracted question
    print("Writing initial labels file...")
    with open(labels_file, "w", encoding="utf-8") as flabels:
        flabels.writelines(
            f"{_initial_label(position, q.label)}\n"
            for position, q in enumerate(questions, start=1)
        )

    # Step 2: Open labels file for user editing
    _open_for_editing(labels_file)

    # Step 3 & 4: Read the edited file back and create a mapping
    with open(labels_file, "r", encoding="utf-8") as flabels:
        edited_lines = [line.strip() for line in flabels if line.strip()]

    mapping = _map_labels(edited_lines, questions)

    positional = sum(1 for line in edited_lines if not line.startswith("+"))
    if positional != len(questions):
        # Content follows line position, so a count mismatch shifts or drops it.
        print(
            f"Warning: {positional} existing label(s) for {len(questions)} "
            "extracted question(s); content is matched by position."
        )

    # Rewrite the labels file cleanly (removing '+' prefixes)
    with open(labels_file, "w", encoding="utf-8") as flabels:
        flabels.writelines(f"{label}\n" for label, _ in mapping)

    # Step 5: Write the final question and solution files
    print("Writing question and solution files...")
    for new_label, q_item in mapping:
        safe_label = new_label.replace("/", "_")

        if q_item is not None:
            # Turns a literal backslash-n into a real newline.
            # NOTE(review): this also rewrites LaTeX commands that start with
            # "\n" (e.g. \nu, \neq, \nabla) — confirm this is still wanted.
            q_content = q_item.question_content.replace("\\n", "\n")
            s_content = q_item.solution_content.replace("\\n", "\n")
        else:
            q_content = ""
            s_content = ""

        # Write Text/label
        (text_dir / safe_label).write_text(
            f"{new_label}\n{q_content}", encoding="utf-8"
        )
        # Write Sol/label
        (sol_dir / safe_label).write_text(
            f"{new_label}\n{s_content}", encoding="utf-8"
        )

    print(f"Success! Processed {len(mapping)} labels.")
|
|
|
|
if __name__ == "__main__":
    # Parse the command line first so that `--help` and usage errors are
    # reported even when the API key is not configured.
    parser = argparse.ArgumentParser(
        description="Extract exam and solution code via Gemini."
    )
    parser.add_argument("folder", help="Directory containing the exam files")
    args = parser.parse_args()

    if not api_key:
        print("Error: GEMINI_API_KEY environment variable is not set.")
        sys.exit(1)

    process_exam(args.folder)
|