Compare commits

..

10 Commits

19 changed files with 22180 additions and 204 deletions

136
Script.org Normal file
View File

@ -0,0 +1,136 @@
#+title: Script
#+author: Sébastien Miquel
#+date: 14-03-2026
# Time-stamp: <22-04-26 10:54>
#+OPTIONS:
NEEDS :
- `export GEMINI_API_KEY=…`
- fichier `names` in the directory.
* Prétraitement
1. Rotate every single page 180
`./rotate_all.sh Interro14`
2. `./rename_to_copie.sh Interro14`
3. `python page_splitter.py Interro`.
Fix issues with `python page_splitter.py Interro14/Copie01.pdf`
4. `python cutleft.py Interro`
Rerun on a single file with `python cutleft.py Interro/Copie01.pdf`
5. `python enonce_info.py Interro`
* Labelisation et regroupement
Set proxy with `export HTTPS_PROXY="http://192.168.241.1:3128"`
7. `python gemini_for_labels.py Interro`, avec éventuellement `--overwrite`
8. Vérification visuelle : `python plotting.py Interro`
`python plotting.py InterroTest/Copie01.pdf`
It also generates les `Copie01.json`, à partir des `Copie01_01.json`
In case of issue, you may need to
- Reorder the pdf
- Run `python cutleft.py Interro/Copie`
- Run `python gemini_dir_batching.py Interro/Copie`
9. `python splitting_int.py Interro`
10. `python grouping.py Interro`
* Correction et annotation
Success! Batch Job Name: batches/0hc83m3anayrs5iljygg2v6ozsvxz58k6a5r
Processing batch_requests_pro.jsonl for model gemini-3.1-pro-preview...
Uploading file...
Uploaded successfully! File ID: files/oasj4aty5kco
Starting batch job...
Success! Batch Job Name: batches/8pk4m2snr17n31pun3vwzn646qvtkj8ao192
Set proxy with `export HTTPS_PROXY="http://10.0.0.1:3128"`
1. Il faut créer des persp, pour indication de comment corriger, et
relancer `enonce_info.py`
2. `python correction.py Interro --limit 240` OU
`python correction.py Interro/Ex\ 2/Group_1.jpg` OU
`python correction.py Interro --overwrite`
Will it resume ? It seems so. Best to wait a bit.
To batch it :
+ `python correction.py Interro --batch`
+ `python submit_batches.py Interro`
+ `python batch_status.py`
+ `python fetch_batched_results.py DS08VB`
+ `python correction.py DS08VB --deal-with-batched`
3. Try `python post-correction.py Interro` ; It makes a
`fixed_correction.json`, to check.
4. Facultatif : `python annotating.py Interro` dans `Anot`, pass `--overwrite`
5.
+ `python annotating_with_checks.py Interro` dans `Bnot`, pass `--overwrite`
OU
+ `python annotating_by_label.py Interro` dans `BGnot`
_Needs_ : label_groups file. (made automatically by this function)
6. `python to_tablette.py Interro`
Cela déplace les groupes dans `SyncCopies/À Annoter`.
- Les mettre dans le dossier racine de la tablette, et renommer en `aaa`.
- Vider `Syncthing/Annotées` sur la tablette et localement.
À automatiser, aussi c'est lent…
* Lecture de la correction manuelle
16. Manually : delete `~/SyncCopies/Annotées`, copy from the tablette to here.
Then `python from_tablette.py Interro`
17.
+ `python reading_annotations.py Interro`
OU
+ `python reading_grouped_annotations.py Interro`
18. `python giving_names.py InterroTest BGnot`
It will make `A Rendre` with symlink to the Concat.jpg file
either in Anot or Bnot, and score.json.
+ In case of Unknown : rename both directory and file inside.
+ Here, you can change `score.json` manually.
19.
+ `gestion_classe ne` pour créer l'interro puis
+ `gestion_classe we` (set barème here)
+ `python update_ods.py Interro`
+ `gestion_classe re`
+ `gestion_classe wsent`
+ `python add_final_score.py Interro21`
(this makes files in `Server/copies`)
20.
+ Deploy `miqmacs-copies-assets`, and
+ update the copies from `miqmacs.fr/admin`.
* Recorrection d'une seule copie
!! Attention, refaire ne marchera pas si tu fais une annotation non
groupée into refaire !!
1. Redécoupage
+ `python plotting.py InterroTest/Copie01.pdf`
+ `python splitting_int.py InterroTest/Copie20.pdf`
2. Créer `refaire.json`, avec un contenu comme
[["Copie01", []],
["Copie01", ["Ex 1 : 1)"]]]
3. Appeler `correction` avec --refaire. Il doit créer des groupes
individuels, faire des requêtes, et remplacer les corrections
précédentes (à sauver ailleurs).
Ou non, si tu veux le faire à la main.
4. ?? Si je fais refaire avant d'avoir créé les annotating with
checks, que se passe-t-il ???
5. Appeler `annotating_with_checks.py --refaire --overwrite`.
6. `python to_tablette.py --refaire Interro24`
7. `python from_tablette.py --refaire Interro24`
8. `python reading_grouped_annotations.py --refaire Interro24`
* Install
#+BEGIN_SRC bash
pip install highlight-text --break-system-packages
#+END_SRC

View File

@ -2,6 +2,7 @@ import argparse
import math
import sys
import os
import shutil
from pathlib import Path
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
@ -96,6 +97,13 @@ def process_images(base_dir):
except Exception as e:
print(f"Error processing image for '{student_name}': {e}")
for pdf_path in sorted(search_path.glob("*/*.pdf")):
student_name = pdf_path.stem # Filename without extension
save_path = OUTPUT_DIR / f"{student_name}.pdf"
shutil.copy(str(pdf_path), str(save_path))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Stamp scores on exam copies.")
parser.add_argument("dir", type=Path, help="Root directory containing 'A Rendre' folder")

View File

@ -12,7 +12,7 @@ ANNOT_WIDTH = 600
# Results is : Copie id -> label -> {pdf_path, gemini_result, coordinates}
# Coordinates are the real coordinates (hmin, hmax) of the image in the Group
# The gemini_result coordinates should be un-normalized !
def make_dictionary(root_dir):
def make_dictionary(root_dir, refaire=False, refaire_list=[]):
correction_path = os.path.join(root_dir, "correction.json")
# Load correction data
@ -81,6 +81,48 @@ def make_dictionary(root_dir):
"coordinates": coordinates
}
if refaire:
for copie_name, labels_to_redo in refaire_list:
sid = copie_name.replace("Copie", "") # Extract "01" from "Copie01"
if sid in result_data:
# Si des labels à refaire ne sont pas présent dans la correction
# On ajoute des dummies
if labels_to_redo: # Si la liste est non vide
for lbl in labels_to_redo:
pdf_path = os.path.join(root_dir,
f"Copie{sid}", f"{lbl}.pdf")
if not Path(pdf_path).exists():
print("Debug : asked to refaire", sid, lbl, "but pdf absent")
continue
result_data[sid][lbl] = {
"pdf_path": pdf_path,
"result": {
"score": 0.0,
"confidence": 1.0,
"feedback": [],
"error": "non traité"
},
"coordinates": (0,0)
}
else: # Ce student id n'a jamais été corrigé
result_data[sid] = {}
for lbl in labels_to_redo:
pdf_path = os.path.join(root_dir,
f"Copie{sid}", f"{lbl}.pdf")
if not pdf_path.exists():
print("Debug : asked to refaire", sid, lbl, "but pdf absent")
continue
result_data[sid][lbl] = {
"pdf_path": pdf_path,
"result": {
"score": 0.0,
"confidence": 1.0,
"feedback": [],
"error": "non traité"
},
"coordinates": (0,0)
}
return result_data
def make_base_image(pdf_path):

View File

@ -12,8 +12,7 @@ import annotating_with_checks
from utils import natural_key
# Roughly 10 A4 pages at 100 DPI
MAX_HEIGHT_PX = 20000 # Can be increased by 10%.
MAX_HEIGHT_PX = 25000 # Can be increased by 10%.
def render_item(item):
student_id, label, content = item
@ -134,15 +133,32 @@ def main():
shutil.rmtree(bgnot_dir)
os.makedirs(bgnot_dir, exist_ok=True)
used_prefixes = set()
previous_prefix = None
for line in lines:
labels = [l.strip() for l in line.split(',') if l.strip()]
safe_labels = [l.replace(":", "").strip() for l in line.split(',') if l.strip()]
if not labels:
continue
prefix = os.path.commonprefix(safe_labels).strip()
if not prefix:
prefix = "Group"
base_prefix = os.path.commonprefix(safe_labels).strip()
if not base_prefix:
base_prefix = "Group"
unique_prefix = base_prefix
if unique_prefix[-1] == "i":
unique_prefix = unique_prefix[:-1]
counter = 2
while unique_prefix in used_prefixes:
unique_prefix = f"{base_prefix}-{counter}"
counter += 1
if counter == 2 and previous_prefix and previous_prefix in unique_prefix:
unique_prefix = f"{previous_prefix}-{counter}"
elif counter == 2:
previous_prefx = unique_prefix
used_prefixes.add(unique_prefix)
items_to_render = []
for sid, lbls in results.items():
@ -201,7 +217,7 @@ def main():
batches = batches2
for i, batch in enumerate(batches, 1):
save_batch(batch, prefix, i, root_dir, args.overwrite)
save_batch(batch, unique_prefix, i, root_dir, args.overwrite)
if __name__ == "__main__":
main()

View File

@ -108,9 +108,9 @@ from utils import natural_key
def process_student(args):
"""Thread worker: Processes one student."""
root_dir, student_id, labels, overwrite = args
root_dir, student_id, labels, overwrite, sub_folder = args
output_dir = os.path.join(root_dir, "Bnot", f"Copie{student_id}")
output_dir = os.path.join(root_dir, sub_folder, f"Copie{student_id}")
if os.path.exists(output_dir):
if not overwrite:
@ -230,14 +230,16 @@ if __name__ == "__main__":
else:
root_dir = input_path
results = annotating.make_dictionary(root_dir)
if not args.refaire:
results = annotating.make_dictionary(root_dir)
# --- ADD THE REFAIRE BLOCK HERE ---
if args.refaire:
refaire_path = os.path.join(root_dir, "refaire.json")
if os.path.exists(refaire_path):
with open(refaire_path, "r", encoding="utf-8") as f:
refaire_list = json.load(f)
results = annotating.make_dictionary(root_dir,
refaire=True,refaire_list=refaire_list)
filtered_results = {}
for copie_name, labels_to_redo in refaire_list:
@ -262,7 +264,10 @@ if __name__ == "__main__":
print(f"Student ID {target_id} not found in directory scan.")
results = {}
tasks = sorted([(root_dir, sid, lbls, overwrite) for sid, lbls in results.items()])
sub_folder = "BRnot" if args.refaire else "Bnot"
tasks = sorted([(root_dir, sid, lbls, overwrite, sub_folder)
for sid, lbls in results.items()])
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(process_student, tasks)

88
batch_status.py Normal file
View File

@ -0,0 +1,88 @@
import os
import sys
import argparse
from google import genai
if "GEMINI_API_KEY" not in os.environ:
sys.exit("Error: GEMINI_API_KEY environment variable not set.")
client = genai.Client()
def list_jobs():
    """Print a short report for every batch job returned by the API.

    For each job the name, optional display name and state are shown; failed
    jobs also show their error and succeeded jobs their output file name when
    one is set. Any exception raised while listing terminates the program
    with an error message.
    """
    print("Fetching recent batch jobs...\n")
    separator = "-" * 60

    try:
        seen = 0
        for seen, job in enumerate(client.batches.list(), start=1):
            # The state may be an enum-like object (with .name) or a plain value.
            state = getattr(job.state, 'name', job.state)

            print(separator)
            print(f"Job Name: {job.name}")

            display_name = getattr(job, 'display_name', None)
            if display_name:
                print(f"Display Name: {display_name}")

            print(f"State: {state}")

            if state == 'JOB_STATE_FAILED' and hasattr(job, 'error'):
                print(f"Error: {job.error}")
            elif state == 'JOB_STATE_SUCCEEDED':
                dest = getattr(job, 'dest', None)
                file_name = getattr(dest, 'file_name', None) if dest else None
                if file_name:
                    print(f"Output File: {file_name}")

        if seen:
            print(separator)
            print("\nTo download a completed job, run:")
            print("python batch_status.py --download batches/<YOUR_BATCH_ID>")
        else:
            print("No batch jobs found.")

    except Exception as e:
        sys.exit(f"An error occurred while listing jobs: {e}")
def download_job(job_name):
    """Download the output file of a succeeded batch job.

    The job is looked up by ``job_name`` (e.g. ``batches/123456``). If it has
    not succeeded, its state (and error, for a failed job) is printed and
    nothing is downloaded. Otherwise the output file is written to
    ``results_<job name with '/' replaced by '_'>.jsonl`` in the current
    directory. Any exception terminates the program with an error message.
    """
    print(f"Checking status for {job_name}...\n")
    try:
        job = client.batches.get(name=job_name)
        # The state may be an enum-like object (with .name) or a plain value.
        state = job.state.name if hasattr(job.state, 'name') else job.state

        print(f"State: {state}")

        if state != 'JOB_STATE_SUCCEEDED':
            print("Job is not ready yet or has failed.")
            if state == 'JOB_STATE_FAILED' and hasattr(job, 'error'):
                print(f"Error: {job.error}")
            return

        if hasattr(job, 'dest') and job.dest and hasattr(job.dest, 'file_name') and job.dest.file_name:
            result_file_name = job.dest.file_name
            print(f"Downloading results from {result_file_name}...")

            file_content_bytes = client.files.download(file=result_file_name)

            output_path = f"results_{job_name.replace('/', '_')}.jsonl"
            with open(output_path, "wb") as f:
                f.write(file_content_bytes)

            print(f"Success! Saved to {output_path}")
            # --deal-with-batched is a boolean flag: the correction script
            # reads <dir>/batched_correction_result.jsonl, it takes no path.
            print("To use it, save it as <dir>/batched_correction_result.jsonl and run: "
                  "python correction.py <dir> --deal-with-batched")
        else:
            print("Job succeeded but no output file was found.")

    except Exception as e:
        sys.exit(f"An error occurred while fetching the job: {e}")
if __name__ == "__main__":
    # Without --download the script lists jobs; with it, it fetches one job.
    cli = argparse.ArgumentParser(description="Manage Gemini Batch Jobs")
    cli.add_argument(
        "--download",
        type=str,
        metavar="JOB_NAME",
        help="Download the results for a specific batch job (e.g. batches/123456)",
    )
    requested_job = cli.parse_args().download

    if not requested_job:
        list_jobs()
    else:
        download_job(requested_job)

View File

@ -18,6 +18,10 @@ parser.add_argument("--overwrite", action="store_true",
parser.add_argument("--limit", type=int, help="limit calls to gemini rpo integer")
parser.add_argument("--refaire", action="store_true",
help="Redo specific copies/labels defined in refaire.json")
parser.add_argument("--batch", action="store_true",
help="Generate a JSONL file of requests to send to the Gemini Batch API")
parser.add_argument("--deal-with-batched", action="store_true",
help="Process a JSONL file containing completed batch results")
args, _ = parser.parse_known_args()
@ -113,7 +117,8 @@ list \"feedback\", and possibly an \"error\". Like this example :
}
]
Here is the text of the exercice of the exam :
Here is the text of the exercice (or the relevant part of the problem)
of the exam :
```
<<text>>
@ -123,25 +128,31 @@ Here is a possible correct answer :
```
<<corr>>
```
Here is some additional scoring instructions :
```
<<persp>>
```
You are asked to score the question or exercice labeled `<<label>>`,
do not score or give feedback to any other question."""
def make_prompt(full_label):
l = full_label.split(" ")
ex_label = l[0] + " " + l[1]
text = (Path(INPUT_DIR) / "Text" / ex_label).read_text()
corr = (Path(INPUT_DIR) / "Sol" / ex_label).read_text()
persp = (Path(INPUT_DIR) / "Persp" / ex_label).read_text()
if persp == "":
perps = "There is no additional scoring instructions."
# l = full_label.split(" ")
# ex_label = l[0] + " " + l[1]
# text = (Path(INPUT_DIR) / "Text" / ex_label).read_text()
# corr = (Path(INPUT_DIR) / "Sol" / ex_label).read_text()
# persp = (Path(INPUT_DIR) / "Persp" / ex_label).read_text()
def read_longest_prefix_file(subdir):
dir_path = Path(INPUT_DIR) / subdir
matches = [f for f in dir_path.iterdir() if f.is_file() and full_label.startswith(f.name)]
if not matches:
return ""
return max(matches, key=lambda f: len(f.name)).read_text()
text = read_longest_prefix_file("Text")
corr = read_longest_prefix_file("Sol")
persp = read_longest_prefix_file("Persp")
if persp != "":
persp = "\n\nHere are additional scoring instructions : \n\n```\n" + persp +"\n```\n"
return my_prompt.replace("<<text>>", text).replace("<<corr>>", corr).replace("<<persp>>", persp).replace("<<label>>", full_label)
from google import genai
@ -225,6 +236,44 @@ class EvaluationEntry(BaseModel):
id: str = Field(description="Entry identifier")
result: ResultData = Field(description="Result details")
# Hand-written response schema used as "responseSchema" in the --batch request
# payload. The nested model definitions do not work with the batch API, so the
# structure is unrolled here into a plain dict: an array of {"id", "result"}
# entries, where "result" carries score, confidence, error and a feedback list.
# NOTE(review): presumably this must stay in sync with the EvaluationEntry /
# ResultData models — confirm when either side changes.
UNROLLED_SCHEMA = {
    "type": "ARRAY",
    "items": {
        "type": "OBJECT",
        "properties": {
            "id": {"type": "STRING", "description": "Entry identifier"},
            "result": {
                "type": "OBJECT",
                "properties": {
                    "score": {"type": "NUMBER", "description": "The numeric score"},
                    "confidence": {"type": "NUMBER", "description": "Confidence level"},
                    "error": {"type": "STRING", "description": "Indicates if an error occurred"},
                    "feedback": {
                        "type": "ARRAY",
                        "description": "List of feedback items",
                        "items": {
                            "type": "OBJECT",
                            "properties": {
                                "text": {"type": "STRING", "description": "Feedback content"},
                                # The box is optional: only "text" is required below.
                                "box_2d": {
                                    "type": "ARRAY",
                                    "items": {"type": "INTEGER"},
                                    "nullable": True,
                                    "description": "box coordinates or null"
                                }
                            },
                            "required": ["text"]
                        }
                    }
                },
                "required": ["score", "confidence", "feedback", "error"]
            }
        },
        "required": ["id", "result"]
    }
}
# The root model for parsing is: List[EvaluationEntry]
def generate_request(file, full_label):
"""Generates request for Gemini."""
@ -548,7 +597,7 @@ Here is a list of all possible labels. You need to answer with a list one of the
return new_tasks
def process_single_task(task_tuple):
def process_single_task(task_tuple, precomputed_response=None):
try:
global pro_count, flash_count, pro_quota_exhausted
file_path = task_tuple[0]
@ -567,25 +616,32 @@ def process_single_task(task_tuple):
total_height = group_data[-1][2]
use_flash = n >= 4 or total_height <= 500
if not use_flash:
with pro_lock:
if pro_quota_exhausted:
use_flash = True
elif limit is None or pro_count < limit:
pro_count += 1
else:
use_flash = True
# Only apply limits and counts if we are making a live call
if precomputed_response is None:
if not use_flash:
with pro_lock:
if pro_quota_exhausted:
use_flash = True
elif limit is None or pro_count < limit:
pro_count += 1
else:
use_flash = True
if use_flash:
with pro_lock:
flash_count += 1
if use_flash:
with pro_lock:
flash_count += 1
try:
contents, config = generate_request(file_path, label)
model_to_use = MODEL_ID_flash if use_flash else MODEL_ID_pro
tprint(f"Asking Gemini {'Flash' if use_flash else 'Pro '}: {label} {group_name}")
full_response_text = call_gemini_with_retries(model_to_use, contents, config)
if precomputed_response:
tprint(f"Using batched response for: {label} {group_name}")
full_response_text = precomputed_response
else:
tprint(f"Asking Gemini {'Flash' if use_flash else 'Pro '}: {label} {group_name}")
full_response_text = call_gemini_with_retries(model_to_use, contents, config)
json_data = json.loads(full_response_text)
# Ensure consistency of answer placements
@ -623,25 +679,6 @@ def process_single_task(task_tuple):
needs_correction.append(i)
break
#
# if ymin < yming-50 or ymax > ymaxg+50:
# print("Error : Gemini answered box2d too low/up", pid, label, group_name)
# if ymax < yming or ymin > ymaxg:
# print("Removing the box.")
# f["box_2d"] = None
# continue
# nymin = max(ymin, yming) * 1000 // total_height
# nymax = min(ymax, ymaxg) * 1000 // total_height
# f["box_2d"] = [nymin, xmin, nymax, xmax]
# if f["box_2d"] and xmax / 1000 > width_r:
# print("Error : Gemini answered box2d too right", pid, label, group_name)
# if xmin / 1000 > width_r:
# print("Removing the box.")
# f["box_2d"] = None
# continue
# f["box_2d"][3] = int(width_r * 1000)
if needs_correction:
tprint(f"\tBox anomalies detected for Copie {pid} {group_name}. \n\tRequesting isolated correction from Gemini Flash...")
try:
@ -742,10 +779,94 @@ if __name__ == "__main__":
else:
print(f"Warning: --refaire flag used, but {refaire_path} not found.", file=sys.stderr)
print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...")
if args.batch:
batch_flash_file = Path(INPUT_DIR) / "batch_requests_flash.jsonl"
batch_pro_file = Path(INPUT_DIR) / "batch_requests_pro.jsonl"
count_flash = 0
count_pro = 0
with open(batch_flash_file, "w", encoding="utf-8") as f_flash, \
open(batch_pro_file, "w", encoding="utf-8") as f_pro:
for task in tasks_to_process:
file_path, label = task[0], task[1]
group_name = os.path.splitext(file_path)[0]
json_path = group_name + '.json'
with open(json_path, 'r') as jf:
group_data = json.load(jf)
use_flash = len(group_data) >= 4 or group_data[-1][2] <= 500
image_data = Path(file_path).read_bytes()
b64_img = base64.b64encode(image_data).decode("utf-8")
# Format payload matching Gemini Batch API file requirements
req = {
"key": file_path, # The ID returned in the output file
"request": {
"contents": [{
"role": "user",
"parts": [
{"inlineData": {"mimeType": "image/jpeg", "data": b64_img}},
{"text": make_prompt(label)}
]
}],
"generation_config": {
"temperature": 1.0,
"topP": 0.95,
"maxOutputTokens": 65535,
"responseMimeType": "application/json",
"responseSchema": UNROLLED_SCHEMA
# TypeAdapter(List[EvaluationEntry]).json_schema()
}
}
}
if use_flash:
f_flash.write(json.dumps(req) + "\n")
count_flash += 1
else:
f_pro.write(json.dumps(req) + "\n")
count_pro += 1
print(f"Batch generation complete.")
print(f" - {count_flash} requests saved to {batch_flash_file} (for {MODEL_ID_flash})")
print(f" - {count_pro} requests saved to {batch_pro_file} (for {MODEL_ID_pro})")
print("Upload these files via the File API and create two separate batch jobs.")
sys.exit(0)
batched_responses = {}
if args.deal_with_batched:
batch_results_path = Path(INPUT_DIR) / "batched_correction_result.jsonl"
if batch_results_path.exists():
print(f"Loading batch results from {batch_results_path}...")
with open(batch_results_path, "r", encoding="utf-8") as f:
for line in f:
if not line.strip(): continue
data = json.loads(line)
task_id = data.get("key") # Corresponds to the key sent in the request
if "response" in data:
try:
# Extract the JSON response text per standard Batch API schema
resp_text = data["response"]["candidates"][0]["content"]["parts"][0]["text"]
batched_responses[task_id] = resp_text
except (KeyError, IndexError) as e:
print(f"Warning: Could not parse response for {task_id}: {e}", file=sys.stderr)
elif "error" in data:
print(f"Batch API Error for {task_id}: {data['error']}", file=sys.stderr)
else:
print(f"Warning: Batch results file {batch_results_path} not found.", file=sys.stderr)
print(f"Starting processing on {len(tasks_to_process)} tasks with {NB_THREADS} threads...")
with concurrent.futures.ThreadPoolExecutor(max_workers=NB_THREADS) as executor:
futures = {executor.submit(process_single_task, task): task for task in tasks_to_process}
futures = {}
for task in tasks_to_process:
file_path = task[0]
precomp = batched_responses.get(file_path)
futures[executor.submit(process_single_task, task, precomp)] = task
# Process tasks as they complete, allowing dynamic task addition
for future in concurrent.futures.as_completed(futures):
@ -753,11 +874,11 @@ if __name__ == "__main__":
new_generated_tasks = future.result()
if new_generated_tasks:
for new_task in new_generated_tasks:
# New tasks from wrong-label/additional-answer will fallback to live API
futures[executor.submit(process_single_task, new_task)] = new_task
except Exception as e:
print(f"Exception during task execution: {e}", file=sys.stderr)
end_time = time.time()
print("Time elapsed : ", end_time - start_time)
print("Requests to pro / flash : ", pro_count, flash_count)

View File

@ -3,27 +3,163 @@ import os
import glob
import json
import urllib.request
import re
import subprocess
import tempfile
import shutil
def compile_to_pdf(text, output_pdf_path):
    """Wrap ``text`` in a standalone LaTeX template and compile it to a PDF.

    The document is 24.8 cm wide: 21 cm plus 3.8 cm for the left margin.
    Compilation happens in a temporary directory with ``pdflatex``; on success
    the resulting PDF is moved to ``output_pdf_path``. Failures never raise:
    they are reported on stdout, including the case where pdflatex runs but
    produces no PDF (its own output is discarded, so this would otherwise be
    silent).

    NOTE(review): the template loads the ``commands`` package, so pdflatex
    must be able to find ``commands.sty`` — confirm how it is made available.
    """
    latex_template = f"""\\documentclass[varwidth=24.8cm,margin=0.4cm]{{standalone}}
\\usepackage[utf8]{{inputenc}}
\\usepackage[T1]{{fontenc}}
\\usepackage{{lmodern}}
\\usepackage{{amsmath, amssymb}}
\\usepackage{{commands}}
\\usepackage{{graphicx}}
\\usepackage{{enumitem}}
\\begin{{document}}
\\begin{{minipage}}{{24.8cm}}
{text}
\\end{{minipage}}
\\end{{document}}
"""

    with tempfile.TemporaryDirectory() as temp_dir:
        tex_filename = 'text.tex'
        pdf_filename = 'text.pdf'
        tex_path = os.path.join(temp_dir, tex_filename)

        with open(tex_path, 'w', encoding='utf-8') as f:
            f.write(latex_template)

        try:
            # check=False: a non-zero exit status is detected below through
            # the absence of the PDF rather than through an exception.
            subprocess.run(
                ['pdflatex', '-interaction=nonstopmode', tex_filename],
                cwd=temp_dir,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False
            )

            generated_pdf = os.path.join(temp_dir, pdf_filename)
            if os.path.exists(generated_pdf):
                shutil.move(generated_pdf, output_pdf_path)
            else:
                print(f"Compilation error for {output_pdf_path}: pdflatex produced no PDF")
        except (OSError, subprocess.SubprocessError) as e:
            # Typically pdflatex is not installed, or the move failed.
            print(f"Compilation error for {output_pdf_path}: {e}")
def fetch_and_save_sub_text(ex_id, indices, label, text_path):
    """Download the statement of one sub-question and store it under ``text_path``.

    The text is requested from the local exercise server, normalised with
    ``replace_dots``, written to ``{label}.tex`` and compiled to
    ``{label}.pdf``. Any failure is reported on stdout and swallowed.
    """
    joined_indices = ",".join(str(i) for i in indices)
    url = f"http://localhost:8080/exercices/exo_q_text/{ex_id}/{joined_indices}"

    try:
        with urllib.request.urlopen(url) as response:
            raw = response.read().decode('utf-8')

        body = replace_dots(raw.strip("\n"))

        tex_target = os.path.join(text_path, f"{label}.tex")
        with open(tex_target, 'w', encoding='utf-8') as f:
            f.write(body)

        # Compile PDF
        compile_to_pdf(body, os.path.join(text_path, f"{label}.pdf"))
    except Exception as e:
        print(f"Error fetching sub-text from {url}: {e}")
def fetch_and_save_sub_sol(ex_id, indices, label, sol_path):
    """Fetch the solution of one sub-question and save it under ``sol_path``.

    The solution is requested from the local exercise server
    (``exo_q_sol`` endpoint), normalised with ``replace_dots``, written to
    ``{label}.tex`` and compiled to ``{label}.pdf``. Any failure is reported
    on stdout and swallowed, so one failing sub-question does not stop the
    others.
    """
    qinds = ",".join(map(str, indices))
    url = f"http://localhost:8080/exercices/exo_q_sol/{ex_id}/{qinds}"
    try:
        with urllib.request.urlopen(url) as response:
            content = response.read().decode('utf-8')
            content = replace_dots(content.strip("\n"))
            with open(os.path.join(sol_path, f"{label}.tex"), 'w', encoding='utf-8') as f:
                f.write(content)

            # Compile PDF
            pdf_file = os.path.join(sol_path, f"{label}.pdf")
            compile_to_pdf(content, pdf_file)
    except Exception as e:
        # Say "sub-sol" so the log distinguishes this from the text fetcher.
        print(f"Error fetching sub-sol from {url}: {e}")
ROMANS_CAP = ["", "I", "II", "III", "IV", "V", "VI", "VII", "VIII", "IX", "X"]
ROMANS_LOW = ["", "i", "ii", "iii", "iv", "v", "vi", "vii", "viii", "ix", "x"]
def replace_dots(text):
    """Turn a leading ``x.`` item label into ``x)`` on every line of ``text``.

    Only a dot that directly follows the first non-blank character of a line
    is rewritten (e.g. ``1. foo`` becomes ``1) foo``).
    """
    # (?m) makes ^ match at the start of each line, not just of the string.
    leading_label = re.compile(r"(?m)^(\s*.)\.")
    return leading_label.sub(r"\1)", text)
def replace_problem_labels(text):
"""Replaces labels according to spaces depth when problem=True."""
def repl(m):
spaces = m.group(1)
label = m.group(2)
n = len(spaces)
try:
if n == 1 and label.isdigit(): # 1 space: 1) -> I)
return f"{spaces}{ROMANS_CAP[int(label)]})"
elif n == 4 and label.isalpha(): # 4 spaces: a) -> 1)
return f"{spaces}{ord(label.lower()) - 96})"
elif n == 7 and label.isdigit(): # 7 spaces: 1) -> a)
return f"{spaces}{chr(96 + int(label))})"
elif n == 10 and label.isdigit(): # 10 spaces: 1) -> i)
return f"{spaces}{ROMANS_LOW[int(label)]})"
except (IndexError, ValueError):
pass
return m.group(0)
def format_indices(indices):
"""Converts [2, 1] to '2)a)' based on requirements."""
if not indices:
return ""
# Matches start of line, spaces, alphanumeric label, and closing parenthesis
return re.sub(r"(?m)^([ \t]+)([a-zA-Z0-9]+)\)", repl, text)
# First level: numeric (1 -> 1))
res = f"{indices[0]})"
def format_indices(indices, problem=False):
    """Render a list of nesting indices as a label suffix such as ``2)a)``.

    Outside a problem the levels are number, lowercase letter, lowercase
    roman numeral. In a problem the levels are uppercase roman numeral,
    number, lowercase letter, lowercase roman numeral. Levels beyond those
    are ignored; an empty list gives an empty string.
    """
    if not indices:
        return ""

    def as_letter(k):
        return chr(96 + k)

    def as_low_roman(k):
        return ROMANS_LOW[k]

    def as_cap_roman(k):
        return ROMANS_CAP[k]

    if problem:
        renderers = (as_cap_roman, str, as_letter, as_low_roman)
    else:
        renderers = (str, as_letter, as_low_roman)

    # zip stops at the shorter sequence, so extra levels are dropped.
    return "".join(f"{render(k)})" for render, k in zip(renderers, indices))
# Second level: alpha (1 -> a))
if len(indices) > 1:
res += f"{chr(96 + indices[1])})"
return res
def save_split_content(text, path, base_fname, problem):
    """Write ``text`` to ``path/base_fname`` plus one file per labelled chunk.

    Labels are lines starting with blanks followed by ``<alnum>)``. Chunks are
    cut at labels indented by 4 characters when ``problem`` is true, by 1
    character otherwise, and each chunk runs up to the next such label (or
    the end of the text). Chunk files are named ``"{base_fname} : {label}"``;
    in problem mode the label is prefixed by the closest preceding
    1-character-indented label, when there is one.
    """
    def dump(fname, payload):
        with open(os.path.join(path, fname), 'w', encoding='utf-8') as out:
            out.write(payload)

    # Always save the main aggregated file
    dump(base_fname, text)

    markers = list(re.finditer(r"(?m)^([ \t]+)([a-zA-Z0-9]+)\)", text))
    depth = 4 if problem else 1
    cut_points = [m for m in markers if len(m.group(1)) == depth]
    section_heads = [m for m in markers if len(m.group(1)) == 1] if problem else []

    # Each chunk ends where the next one starts; the last one ends with the text.
    stops = [m.start() for m in cut_points[1:]] + [len(text)]

    for marker, stop in zip(cut_points, stops):
        label = marker.group(2) + ")"

        if problem:
            # Most recent 1-space label located before this 4-space label.
            enclosing = [s for s in section_heads if s.start() < marker.start()]
            if enclosing:
                label = f"{enclosing[-1].group(2)}){label}"

        dump(f"{base_fname} : {label}", text[marker.start():stop].strip("\n"))
def process_directory(directory):
@ -31,11 +167,8 @@ def process_directory(directory):
tex_files = glob.glob(os.path.join(directory, "*.tex"))
if not tex_files:
print(f"No .tex file found in {directory}. Looking in /Staging/Interro/")
if directory[-1] == "/":
int_name = directory[:-1]
else:
int_name = directory
tex_path = os.path.join("~/Prépa/Staging/Interro/", int_name, ".tex")
int_name = directory[:-1] if directory.endswith("/") else directory
tex_path = os.path.join(os.path.expanduser("~"), "Prépa/Staging/Interro/", int_name, ".tex")
if os.path.exists(tex_path):
tex_file = tex_path
else:
@ -56,69 +189,98 @@ def process_directory(directory):
labels_file = os.path.join(directory, "labels")
current_ex_num = 1
# Read entirely to allow chunking
with open(tex_file, 'r', encoding='utf-8') as f_in:
content = f_in.read()
with open(tex_file, 'r', encoding='utf-8') as f_in, \
open(labels_file, 'w', encoding='utf-8') as f_labels:
for line in f_in:
if line.startswith("%%SHEETINFO :"):
try:
json_str = line.split(":", 1)[1].strip()
data = json.loads(json_str)
# Split by the specific SHEETINFO tag
blocks = content.split("%%SHEETINFO :")
# 2. Handle Labels
indexes = data.get('indexes', [])
if not indexes:
f_labels.write(f"Ex {current_ex_num}\n")
else:
for item in indexes:
suffix = format_indices(item['indices'])
if suffix != "":
f_labels.write(f"Ex {current_ex_num} : {suffix}\n")
else:
f_labels.write(f"Ex {current_ex_num}\n")
with open(labels_file, 'w', encoding='utf-8') as f_labels:
# Skip blocks[0] (content before first SHEETINFO)
for block in blocks[1:]:
parts_line = block.split("\n", 1)
json_str = parts_line[0].strip()
block_content = parts_line[1] if len(parts_line) > 1 else ""
# Construct 'ids' parameter
ex_id = str(data['id'])
selection = data.get('select')
# Check if text until next SHEETINFO block contains \Roman
problem = r"\Roman" in block_content
if selection is not None:
# Format: "ID.sel1,sel2"
sel_s = [i+1 for i in selection]
ids = f"{ex_id}.{','.join(map(str, sel_s))}"
else:
ids = ex_id
if not json_str: continue
# Construct URL
url = f"http://localhost:8080/exercices/emacs/{ids}?pretty=true&all=true&persp=true"
try:
data = json.loads(json_str)
# Construct 'ids' parameter
ex_id = str(data['id'])
selection = data.get('select')
# Perform GET request
with urllib.request.urlopen(url) as response:
content = response.read().decode('utf-8')
if selection is not None:
sel_s = [i+1 for i in selection]
ids = f"{ex_id}.{','.join(map(str, sel_s))}"
else:
ids = ex_id
# 4. Split and Save content
parts = content.split('###')
# Ensure we have at least 3 parts, pad if necessary to avoid crashes
while len(parts) < 3:
parts.append("")
# 2. Handle Labels
indexes = data.get('indexes', [])
if not indexes:
label = f"Ex {current_ex_num}"
f_labels.write(f"{label}\n")
fetch_and_save_sub_text(ids, [], label, paths['Text'])
fetch_and_save_sub_sol(ids, [], label, paths['Sol'])
else:
for item in indexes:
suffix = format_indices(item['indices'], problem)
label = f"Ex {current_ex_num}" + (f" : {suffix}" if suffix else "")
f_labels.write(f"{label}\n")
fetch_and_save_sub_text(ids, item['indices'], label, paths['Text'])
fetch_and_save_sub_sol(ids, item['indices'], label, paths['Sol'])
base_filename = f"Ex {current_ex_num}"
# Construct URL (append pb=true if \Roman matched)
url = f"http://localhost:8080/exercices/emacs/{ids}?pretty=true&all=true&persp=true"
# if problem:
# url += "&pb=true"
# Perform GET request
with urllib.request.urlopen(url) as response:
res_content = response.read().decode('utf-8')
# 4. Split and Save content
parts = res_content.split('###')
# Ensure we have at least 3 parts
while len(parts) < 3:
parts.append("")
t_text = replace_dots(parts[0].strip("\n"))
s_text = replace_dots(parts[1].strip("\n"))
p_text = replace_dots(parts[2].strip("\n"))
# Apply hierarchy depth replace if problem context
if problem:
t_text = replace_problem_labels(t_text)
s_text = replace_problem_labels(s_text)
p_text = replace_problem_labels(p_text)
base_filename = f"Ex {current_ex_num}"
if problem:
save_split_content(t_text, paths['Text'], base_filename, False)
else:
with open(os.path.join(paths['Text'], base_filename), 'w', encoding='utf-8') as f:
f.write(replace_dots(parts[0].strip("\n")))
f.write(t_text)
with open(os.path.join(paths['Sol'], base_filename), 'w', encoding='utf-8') as f:
f.write(replace_dots(parts[1].strip("\n")))
with open(os.path.join(paths['Persp'], base_filename), 'w', encoding='utf-8') as f:
f.write(replace_dots(parts[2].strip("\n")))
save_split_content(s_text, paths['Sol'], base_filename, problem)
save_split_content(p_text, paths['Persp'], base_filename, problem)
current_ex_num += 1
current_ex_num += 1
except json.JSONDecodeError:
print(f"Error decoding JSON in line: {line.strip()}")
except Exception as e:
print(f"Error processing {ids}: {e}")
except json.JSONDecodeError:
print(f"Error decoding JSON in block: {json_str}")
except Exception as e:
print(f"Error processing block {ex_id if 'ex_id' in locals() else 'unknown'}: {e}")
if __name__ == "__main__":
if len(sys.argv) < 2:

63
fetch_batched_results.py Normal file
View File

@ -0,0 +1,63 @@
import os
import sys
import argparse
from pathlib import Path
from google import genai
def main():
    """Download and concatenate the results of the batch jobs of one directory.

    Command line: ``root_dir`` is the directory the batches were created
    from.  Jobs are selected by display name, every selected job must be in
    state ``JOB_STATE_SUCCEEDED``, and their output files are written, one
    after the other, to ``<root_dir>/batched_correction_result.jsonl``.

    Exits with an error message when the API key is missing, when no job
    matches, or when a matching job has not succeeded.
    """
    parser = argparse.ArgumentParser(description="Download and combine completed batch jobs for a directory.")
    parser.add_argument("root_dir", type=str, help="Directory containing the original batches")
    args = parser.parse_args()

    target_dir = Path(args.root_dir)
    dir_name = target_dir.name
    output_path = target_dir / "batched_correction_result.jsonl"

    if "GEMINI_API_KEY" not in os.environ:
        sys.exit("Error: GEMINI_API_KEY environment variable not set.")

    client = genai.Client()

    print(f"Fetching jobs matching '{dir_name}'...")

    # 1. Find jobs associated with this directory.
    # submit_batches.py names its jobs "flash-correction-<dir>" and
    # "pro-correction-<dir>".  Matching on that suffix (rather than on a bare
    # substring) keeps "Interro" from also selecting the jobs of "Interro14".
    name_suffix = f"-correction-{dir_name}"
    matching_jobs = [
        job for job in client.batches.list()
        if (getattr(job, 'display_name', None) or "").endswith(name_suffix)
    ]

    if not matching_jobs:
        sys.exit(f"No batch jobs found with a display name ending in '{name_suffix}'.")

    # 2. Check that all matching jobs are complete
    for job in matching_jobs:
        # The state may be an enum (with .name) or already a plain string.
        state = job.state.name if hasattr(job.state, 'name') else job.state
        print(f"Found Job: {job.display_name} | State: {state}")
        if state != 'JOB_STATE_SUCCEEDED':
            sys.exit(f"Error: Job '{job.display_name}' has not succeeded yet. Try again later.")

    # 3. Download and concatenate
    print("\nAll jobs succeeded. Downloading results...")
    chunks = []
    for job in matching_jobs:
        dest = getattr(job, 'dest', None)
        file_name = getattr(dest, 'file_name', None) if dest else None
        if not file_name:
            print(f"Warning: Job {job.display_name} succeeded but has no output file.")
            continue

        print(f"Downloading output for {job.display_name}...")
        file_content_bytes = client.files.download(file=file_name)
        chunks.append(file_content_bytes)
        # Ensure proper line separation between files in JSONL
        if file_content_bytes and not file_content_bytes.endswith(b'\n'):
            chunks.append(b'\n')

    # 4. Save to destination (joined once instead of growing a bytes object)
    with open(output_path, "wb") as f:
        f.write(b"".join(chunks))

    print(f"\nSuccess! All results concatenated and saved to:\n{output_path}")
if __name__ == "__main__":
main()

View File

@ -2,8 +2,12 @@ import sys
import shutil
from pathlib import Path
def sync_annotated(dir_arg):
bgnot_dir = Path(dir_arg) / "BGnot"
def sync_annotated(dir_arg, refaire):
if not refaire:
bgnot_dir = Path(dir_arg) / "BGnot"
else:
bgnot_dir = Path(dir_arg) / "BRnot"
annotated_dir = Path.home() / "SyncCopies" / "Annotées"
if not annotated_dir.is_dir():
@ -22,9 +26,16 @@ def sync_annotated(dir_arg):
print("copying ", pdf_file, " to ", dest_file)
shutil.copy2(pdf_file, dest_file)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python script.py <dir>")
sys.exit(1)
import argparse
sync_annotated(sys.argv[1])
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Move to tablette folder.")
parser.add_argument("dir", help="The directory to process")
parser.add_argument("--refaire", action="store_true", help="Process only copies/labels defined in refaire.json")
args = parser.parse_args()
root_dir = args.dir
sync_annotated(root_dir, args.refaire)

View File

@ -211,7 +211,7 @@ for path_str in args.input_paths:
labels_txt = (INPUT_DIR / "labels").read_text()
valid_labels_set = set(line.strip() for line in labels_txt.splitlines() if line.strip())
names_path = (INPUT_DIR / "names")
if !os.path.exists(names_path):
if not os.path.exists(names_path):
names_path = Path("names")
names_txt = names_path.read_text()

View File

@ -73,13 +73,13 @@ def main():
dest_path = os.path.join(target_subdir, dest_folder_name)
os.makedirs(dest_path, exist_ok=True)
links = [("Concat.jpg", f"{safe_name}.jpg"), ("score.json", "score.json")]
links = [("Concat.jpg", f"{safe_name}.jpg"),("Concat_F.pdf", f"{safe_name}.pdf"), ("score.json", "score.json")]
for src_name, dst_name in links:
src_file = os.path.join(source_folder, src_name)
dst_link = os.path.join(dest_path, dst_name)
try:
if os.path.lexists(dst_link): os.remove(dst_link)
os.symlink(src_file, dst_link)
if os.path.exists(src_file): os.symlink(src_file, dst_link)
except Exception as e:
print(f"Error linking {src_name} for {dest_folder_name}: {e}")

21078
liste_francais.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -9,7 +9,7 @@ from tkinter import messagebox
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont, ImageTk
print("o to open pdf, O original pdf, e to emacs part, click for coordinates")
print("o to open pdf, O original pdf, e to emacs part, i to interro, click for coordinates")
# --- Configuration & Globals ---
padding = 60
@ -54,7 +54,7 @@ def convert_list(l, group_id, json_schema):
ll.append(ee)
return ll
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages, last_label_index):
im = Image.open(image_path)
im.load()
width, height = im.size
@ -62,7 +62,6 @@ def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
new_im.paste(im, (0, 0))
draw = ImageDraw.Draw(new_im)
bounding_boxes.sort(key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]))
last_label_index = -1
for bbox in bounding_boxes:
raw_y_min = int(bbox["box_2d"][0] * height / 1000)
@ -78,7 +77,7 @@ def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
label = bbox.get("label")
if label and label in all_labels:
current_index = all_labels.index(label)
if current_index < last_label_index:
if current_index < last_label_index or (last_label_index == -1 and current_index != 0):
color = "red"
last_label_index = current_index
@ -88,7 +87,7 @@ def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
else:
draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
return new_im
return (new_im, last_label_index)
# --- Processing Logic (Worker Thread) ---
@ -97,10 +96,15 @@ def worker_thread(base_dir, files_to_process, all_labels):
Iterates through files, prepares VISUALS only, and puts metadata in queue.
Does NOT write final JSON files anymore.
"""
previous_copie = None
last_label_index = None
for img_path in files_to_process:
json_path = base_dir / f"{img_path.stem}.json"
copie_part = int(img_path.stem[-2:])
copie = img_path.stem[:-3]
if copie != previous_copie:
last_label_index = -1
previous_copie = copie
json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json"
try:
@ -127,7 +131,8 @@ def worker_thread(base_dir, files_to_process, all_labels):
try:
print(f"Buffering {img_path.name}...")
pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages)
(pil_image, last_label_index) = \
prepare_image(str(img_path), bb_list, all_labels, nb_pages, last_label_index)
metadata = {
"copie": copie,
@ -169,6 +174,7 @@ class ImageViewer:
self.root.bind('<Return>', self.on_enter)
self.root.bind('e', self.on_edit)
self.root.bind('o', self.on_open_pdf)
self.root.bind('i', self.on_open_interro)
self.root.bind('O', self.on_open_ori_pdf)
self.root.bind('<Escape>', lambda e: self.root.quit())
self.label.bind('<Button-1>', self.on_click)
@ -265,6 +271,12 @@ class ImageViewer:
subprocess.Popen(['xdg-open', str(pdf_path.absolute())])
def on_open_ori_pdf(self, event):
# Key handler: builds a PDF path and hands it to `xdg-open`.
# Does nothing unless a file is being viewed and a current JSON path is set.
if self.is_viewing and self.current_json_path:
# NOTE(review): the prefix is a hard-coded absolute path, and `base_dir` is
# neither a parameter nor a `self` attribute here - presumably a module-level
# global; confirm it is defined when this handler runs.
# NOTE(review): "pdf" is appended without a dot, so a base_dir of "X" yields
# ".../Interro/Xpdf"; confirm whether ".pdf" was intended.
pdf_path = "/home/sebastien/Staging/Interro/" + str(base_dir) + "pdf"
print(f"Opening {pdf_path}")
# Launched without waiting for the viewer to exit.
subprocess.Popen(['xdg-open', pdf_path])
def on_open_interro(self, event):
if self.is_viewing and self.current_json_path:
new_filename = self.current_json_path.stem.split('_')[0] + ".pdf"
pdf_path = self.current_json_path.parent / "Copies Originales" / new_filename

View File

@ -20,42 +20,66 @@ import ftfy
import re
import urllib.request
# url = "https://raw.githubusercontent.com/hbenbel/French-Dictionary/master/dictionary/dictionary.txt"
# french_words = urllib.request.urlopen(url).read().decode('utf-8').splitlines()
with open('liste_francais.txt', 'r') as f:
french_words = f.read().splitlines()
# 2. Pre-compute an O(1) lookup dictionary
# We simulate the corruption by replacing accents with null bytes (\x00)
# lookup_map = {}
# for word in french_words:
# # Replace all French accents with \x00 to create the "broken" key
# broken_key = re.sub(r'[éèêëàâäîïôöùûüçœÉÈÊËÀÂÄÎÏÔÖÙÛÜÇŒ]', '\x00', word)
# if '\x00' in broken_key:
# lookup_map[broken_key] = word # e.g., "\x00cole" -> "école"
lookup_map = {}
for word in french_words:
# Replace all French accents with \x00 to create the "broken" key
broken_key = re.sub(r'[éèêëàâäîïôöùûüçœÉÈÊËÀÂÄÎÏÔÖÙÛÜÇŒ]', '\x00', word)
if '\x00' in broken_key:
lookup_map[broken_key] = word # e.g., "\x00cole" -> "école"
# 3. Fast replace function
def fast_fix(text):
# Find words containing regular letters and null bytes
# def replacer(match):
# broken_word = match.group(0)
# # Return the fixed word from our map, or leave it if not found
# # (Handles case-insensitivity by falling back to lowercase map)
# return lookup_map.get(broken_word.lower(), broken_word)
def replacer(match):
broken_word = match.group(0)
# Return the fixed word from our map, or leave it if not found
# (Handles case-insensitivity by falling back to lowercase map)
fixed = lookup_map.get(broken_word.lower())
# if not fixed:
# print(f"No match found for: {repr(broken_word)}")
return fixed or broken_word
# return re.sub(r'[a-zA-Z\x00]+', replacer, text)
return text
return re.sub(r'[a-zA-Z\x00]+', replacer, text)
# return text
INPUT_FILE = Path(INPUT_DIR) / "correction.json"
OUTPUT_FILE = Path(INPUT_DIR) / "correction.json"
def fix_hex_corruption_safe(text):
    """Restore characters that were written as a NUL followed by two hex digits.

    Only sequences whose first hex digit is ``e``/``E``/``f``/``F`` are
    rewritten (code points 0xE0-0xFF); any other NUL is left in place.
    """
    def _decode(match):
        # The captured pair is the hexadecimal code point of the character.
        code_point = int(match.group(1), 16)
        return chr(code_point)

    pattern = re.compile(r'\x00([eEfF][0-9a-fA-F])')
    return pattern.sub(_decode, text)
def some_other_replacements(s):
    """Re-escape two LaTeX commands whose leading backslash-n became a newline.

    A newline followed by ``eq`` becomes the literal text ``\\neq`` and a
    newline followed by ``ot`` becomes ``\\not``, applied in that order.
    """
    # Left side: a real newline character followed by the rest of the
    # command name.  Right side: backslash + command name.
    fixes = (
        ("\neq", "\\neq"),
        ("\not", "\\not"),
    )
    for broken, repaired in fixes:
        s = s.replace(broken, repaired)
    return s
def clean_string(s: str) -> str:
# fix encoding issues
s = ftfy.fix_text(s)
# s = ftfy.fix_text(s)
# print(s)
s = fix_hex_corruption_safe(s)
s = s.replace('\x19', '\x00')
s = s.replace('\x18', '\x00')
s = fast_fix(s)
s = s.replace('\x00', '')
return s
s = s.replace('\x00\x00', '\x00')
s = re.sub(r' \x00{1,2} ', ' à ', s)
if '\x00' in s:
s = fast_fix(s)
s = s.replace('\x00', '')
return some_other_replacements(s)
def clean_obj(obj):

View File

@ -4,13 +4,87 @@ import json
import collections
import concurrent.futures
from pathlib import Path
from PIL import Image
from PIL import Image, ImageDraw
import threading
import annotating
from utils import natural_key
from reading_annotations import detect_checks_and_notes, has_significant_notes
def get_extra_pdfs_as_images(root_dir, label, annotating_module):
    """Return the rendered ``Text`` and ``Sol`` PDFs of *label*, in that order.

    For each of ``<root_dir>/Text/<label>.pdf`` and ``<root_dir>/Sol/<label>.pdf``
    that exists, ``annotating_module.make_base_image`` is called and the first
    element of its 3-tuple result is kept when it is truthy.
    """
    rendered = []
    candidate_paths = (
        os.path.join(root_dir, subdir, f"{label}.pdf") for subdir in ("Text", "Sol")
    )
    for candidate in candidate_paths:
        if not os.path.exists(candidate):
            continue
        page_img, _second, _third = annotating_module.make_base_image(candidate)
        if page_img:
            rendered.append(page_img)
    return rendered
def _stack_images_on_page(images, width, height):
    """Paste *images* top to bottom, left-aligned, on a new white RGB page."""
    page = Image.new("RGB", (width, height), "white")
    y = 0
    for img in images:
        page.paste(img, (0, y))
        y += img.height
    return page


def save_paginated_pdf(image_groups, output_path):
    """Stack groups of images vertically and save them as a multi-page PDF.

    Args:
        image_groups: list of lists of PIL images.  A group is never split
            across pages.  Empty groups are ignored.
        output_path: destination of the PDF file.

    The first image of each group gets a black inner border and the second a
    blue one; further images are left untouched.  The originals are not
    modified (bordered images are copies).  A new page is started whenever
    adding the next group would exceed ``max_w * 1.414 * 1.3`` pixels of
    height; a single group taller than that still gets a page of its own.

    Nothing is written when there is no image at all.
    """
    # Drop empty groups up front so max() below never sees an empty sequence
    # (the input [[]] used to raise ValueError).
    groups = [group for group in image_groups if group] if image_groups else []
    if not groups:
        return

    max_w = max(img.width for group in groups for img in group)
    max_page_h = int(max_w * 1.414 * 1.3)

    # Calculate 0.2 cm in pixels at 100 DPI (0.2 / 2.54 inches * 100)
    border_px = int((0.2 / 2.54) * 100)
    border_colors = ("black", "blue")  # by position inside the group

    pages = []
    current_page_imgs = []
    current_h = 0

    for group in groups:
        processed_group = []
        for i, img in enumerate(group):
            if i < len(border_colors):
                img = img.copy()  # Do not modify the original image object in memory
                # Draw the border inside the image edges
                ImageDraw.Draw(img).rectangle(
                    [0, 0, img.width - 1, img.height - 1],
                    outline=border_colors[i],
                    width=border_px,
                )
            processed_group.append(img)

        group_h = sum(img.height for img in processed_group)

        if current_page_imgs and (current_h + group_h > max_page_h):
            # The group does not fit: flush the current page, start a new one.
            pages.append(_stack_images_on_page(current_page_imgs, max_w, current_h))
            current_page_imgs = processed_group
            current_h = group_h
        else:
            current_page_imgs.extend(processed_group)
            current_h += group_h

    if current_page_imgs:
        pages.append(_stack_images_on_page(current_page_imgs, max_w, current_h))

    if pages:
        pages[0].save(output_path, "PDF", resolution=100.0, save_all=True, append_images=pages[1:])
def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, label_notes, all_labels):
"""
Modifies data based on actions, pastes label-specific note crops,
@ -135,8 +209,11 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la
else:
if len(result.get('feedback', [])) != 0:
perfect_no_comment = False
if not perfect_no_comment:
concat_list_F.append(final_img)
extras = get_extra_pdfs_as_images(root_dir, label, annotating)
extras.append(final_img)
concat_list_F.append(extras)
# --- 3. Save Final Outputs ---
with open(score_path, "w") as f:
@ -157,17 +234,21 @@ def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, la
logs.append(f" Saved regenerated Concat.jpg")
if concat_list_F:
max_w = max(i.width for i in concat_list_F)
total_h = sum(i.height for i in concat_list_F)
full_img = Image.new("RGB", (max_w, total_h), "white")
pdf_out_path = os.path.join(output_dir, "Concat_F.pdf")
save_paginated_pdf(concat_list_F, pdf_out_path)
logs.append(f" Saved regenerated Concat_F.pdf")
y = 0
for img in concat_list_F:
full_img.paste(img, (0, y))
y += img.height
# max_w = max(i.width for i in concat_list_F)
# total_h = sum(i.height for i in concat_list_F)
# full_img = Image.new("RGB", (max_w, total_h), "white")
full_img.save(os.path.join(output_dir, "Concat_F.jpg"))
logs.append(f" Saved regenerated Concat_F.jpg")
# y = 0
# for img in concat_list_F:
# full_img.paste(img, (0, y))
# y += img.height
# full_img.save(os.path.join(output_dir, "Concat_F.jpg"))
# logs.append(f" Saved regenerated Concat_F.jpg")
return "\n".join(logs)
@ -179,7 +260,7 @@ import argparse
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs")
parser.add_argument("input_path", help="Directory path")
parser.add_argument("--with-refaire", action="store_true", help="Merge refaire annotations from Bnot")
parser.add_argument("--refaire", action="store_true", help="Merge refaire annotations from Bnot")
args = parser.parse_args()
root_dir = sys.argv[1]
@ -194,26 +275,52 @@ if __name__ == "__main__":
except FileNotFoundError:
all_labels = []
# Map of student id -> labels to redo, filled from refaire.json when --refaire is given.
refaire_dict = {}
# Always bound, so the later `args.refaire and refaire_list` test cannot raise
# NameError when --refaire is given but refaire.json does not exist.
refaire_list = []
if args.refaire:
    refaire_path = os.path.join(root_dir, "refaire.json")
    if os.path.exists(refaire_path):
        with open(refaire_path, "r", encoding="utf-8") as f:
            refaire_list = json.load(f)
        # Each entry is a (copy name, labels) pair; the student id is the
        # copy name with "Copie" removed.
        for c_name, labels in refaire_list:
            sid = c_name.replace("Copie", "")
            refaire_dict[sid] = labels
    else:
        print(f"Warning: --refaire flag used, but {refaire_path} not found.")
# Load original data
original_data = annotating.make_dictionary(root_dir)
if args.refaire and refaire_list:
original_data = annotating.make_dictionary(root_dir,
refaire=True,
refaire_list=refaire_list)
else:
original_data = annotating.make_dictionary(root_dir)
lock = threading.Lock()
actions_by_student = collections.defaultdict(list)
notes_by_student = collections.defaultdict(dict)
def process_bgnot_entry(entry):
def process_bgnot_entry(entry, only_ids=None):
gdir = os.path.join(bgnot_dir, entry)
if not os.path.isdir(gdir) or entry.startswith("Copie"):
return
bnote_path = os.path.join(gdir, "bnote.json")
with open(bnote_path, "r") as f:
bnote_data = json.load(f)
if only_ids:
id_found = False
for d in bnote_data["images"]:
if d["id"] in only_ids:
id_found = True
if not id_found:
return
actions, notes_img = detect_checks_and_notes(gdir)
bnote_path = os.path.join(gdir, "bnote.json")
if not os.path.exists(bnote_path) or notes_img is None:
return
with open(bnote_path, "r") as f:
bnote_data = json.load(f)
with lock:
for act in actions:
@ -230,13 +337,16 @@ if __name__ == "__main__":
def process_refaire_entry(sid, r_labels):
s_bnot_dir = os.path.join(root_dir, "Bnot", f"Copie{sid}")
s_bnot_dir = os.path.join(root_dir, "BRnot", f"Copie{sid}")
if not os.path.exists(s_bnot_dir): return
if not r_labels: r_labels = list(original_data.get(sid, {}).keys())
if not r_labels:
r_labels = list(original_data.get(sid, {}).keys())
with lock:
actions_by_student[sid] = [a for a in actions_by_student[sid] if a.get('label') not in r_labels]
for lbl in r_labels: notes_by_student[sid].pop(lbl, None)
actions_by_student[sid] = [a for a in actions_by_student[sid]
if a.get('label') not in r_labels]
for lbl in r_labels:
notes_by_student[sid].pop(lbl, None)
b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir)
b_bnote_path = os.path.join(s_bnot_dir, "bnote.json")
@ -259,24 +369,20 @@ if __name__ == "__main__":
# --- 0. Read refaire.json if requested ---
refaire_dict = {}
if args.with_refaire:
refaire_path = os.path.join(root_dir, "refaire.json")
if os.path.exists(refaire_path):
with open(refaire_path, "r", encoding="utf-8") as f:
refaire_list = json.load(f)
for c_name, labels in refaire_list:
sid = c_name.replace("Copie", "")
refaire_dict[sid] = labels
else:
print(f"Warning: --with-refaire flag used, but {refaire_path} not found.")
# Part 1 : lecture des bgnot
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
executor.map(process_bgnot_entry, os.listdir(bgnot_dir))
if refaire_dict:
only_ids = [ids for ids in refaire_dict]
else:
only_ids = None
# Part 1.5: Refaire
if args.with_refaire and refaire_dict:
# Lecture des bgnot
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
executor.map(lambda x: process_bgnot_entry(x, only_ids=only_ids),
os.listdir(bgnot_dir))
# Refaire
if args.refaire and refaire_dict:
for sid, labels in refaire_dict.items():
process_refaire_entry(sid, labels)
@ -296,7 +402,11 @@ if __name__ == "__main__":
# --- 2. Process each student concurrently using 4 threads ---
sids = sorted(original_data.keys(), key=natural_key)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(process_student, sid): sid for sid in sids}
if refaire_dict:
futures = {executor.submit(process_student, sid): sid for sid in refaire_dict}
else:
futures = {executor.submit(process_student, sid): sid for sid in sids}
for future in concurrent.futures.as_completed(futures):
output = future.result()
if output:

View File

@ -150,7 +150,7 @@ def split_an_interro(base_dir, input_pdf, coords_list):
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python scrit.py <directory or pdf_file>")
print("Usage: python script.py <directory or pdf_file>")
sys.exit(1)
input_arg = Path(sys.argv[1])
@ -167,6 +167,7 @@ if __name__ == "__main__":
for pdf_path in pdf_files:
json_path = pdf_path.with_suffix(".json")
# print("Debug :", json_path)
if json_path.exists():
(name, coords) = decode_json(pdf_path)
print("Decoded name : ", name)

79
submit_batches.py Normal file
View File

@ -0,0 +1,79 @@
import os
import sys
import argparse
from pathlib import Path
from google import genai
from google.genai import types
def main():
    """Upload the batch request files of a directory and start one job per file.

    Command line: ``root_dir`` is the directory holding
    ``batch_requests_flash.jsonl`` and ``batch_requests_pro.jsonl``.  Each
    file that exists and is non-empty is uploaded through the File API and
    submitted as a batch job for its model; the job is given the display name
    ``<flash|pro>-correction-<directory name>``.
    """
    cli = argparse.ArgumentParser(description="Upload JSONL files and create Gemini Batch jobs.")
    cli.add_argument("root_dir", type=str, help="Root directory containing the batch JSONL files")
    root_dir = Path(cli.parse_args().root_dir)

    if "GEMINI_API_KEY" not in os.environ:
        sys.exit("Error: GEMINI_API_KEY environment variable not set.")

    client = genai.Client()

    # (request file, model, job display name) for every batch to create
    plan = (
        (root_dir / "batch_requests_flash.jsonl", "gemini-3-flash-preview", f"flash-correction-{root_dir.name}"),
        (root_dir / "batch_requests_pro.jsonl", "gemini-3.1-pro-preview", f"pro-correction-{root_dir.name}"),
    )

    for file_path, model_id, display_name in plan:
        # Nothing to submit when the request file is missing ...
        if not file_path.exists():
            print(f"Skipping {model_id}: {file_path.name} does not exist.")
            continue
        # ... or empty (e.g., if all tasks went to Flash, Pro might be empty)
        if file_path.stat().st_size == 0:
            print(f"Skipping {model_id}: {file_path.name} is empty.")
            continue

        print(f"Processing {file_path.name} for model {model_id}...")

        # Step 1: push the request file to the File API
        print(" Uploading file...")
        upload_config = types.UploadFileConfig(
            display_name=f"{display_name}-input",
            mime_type='jsonl'
        )
        uploaded_file = client.files.upload(file=str(file_path), config=upload_config)
        print(f" Uploaded successfully! File ID: {uploaded_file.name}")

        # Step 2: start the batch job on the uploaded file
        print(" Starting batch job...")
        batch_job = client.batches.create(
            model=model_id,
            src=uploaded_file.name,
            config={'display_name': display_name},
        )
        print(f" Success! Batch Job Name: {batch_job.name}\n")

    for closing_line in (
        "-" * 50,
        "All batch jobs have been initiated.",
        "Save the Batch Job Names above. You can monitor them with:",
        " client.batches.get(name='YOUR_BATCH_JOB_NAME')",
    ):
        print(closing_line)
if __name__ == "__main__":
main()

View File

@ -44,9 +44,29 @@ def process_directory(dir_arg):
# Create the link; os.link makes a hard link, not a symbolic one (the absolute source path is used to avoid problems)
os.link(concat_file.absolute(), symlink_path)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python script.py <dir>")
sys.exit(1)
import argparse
process_directory(sys.argv[1])
# Script entry point.  Without --refaire the directory is handed to
# process_directory(); with --refaire, every sub-directory of <dir>/BRnot that
# contains a Concat.pdf is linked into ~/SyncCopies/À Annoter/<dir>/.
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Move to tablette folder.")
parser.add_argument("dir", help="The directory to process")
parser.add_argument("--refaire", action="store_true", help="Process only copies/labels defined in refaire.json")
args = parser.parse_args()
root_dir = args.dir
if args.refaire:
base_dir = Path(root_dir)
brnot_dir = base_dir / "BRnot"
# NOTE(review): root_dir is joined exactly as typed on the command line; with
# an absolute path, pathlib discards the "SyncCopies/À Annoter" prefix -
# confirm whether base_dir.name was intended.
sync_dir = Path.home() / "SyncCopies" / "À Annoter" / root_dir
sync_dir.mkdir(parents=True, exist_ok=True)
for f in brnot_dir.iterdir():
concat_file = f / "Concat.pdf"
if f.is_dir() and concat_file.is_file():
# The link is named after the sub-directory: <sync_dir>/<sub-directory name>.pdf
symlink_path = sync_dir / f"{f.name}.pdf"
# Remove a previous link first; os.link fails if the destination exists.
if symlink_path.exists():
symlink_path.unlink()
# Despite the variable name, os.link creates a hard link.
os.link(concat_file.absolute(), symlink_path)
sys.exit(0)
else:
process_directory(root_dir)