Some files, and things.

master
Sébastien Miquel 2026-02-08 13:35:11 +01:00
parent 56b158969d
commit 8059544e26
5 changed files with 410 additions and 464 deletions

View File

@ -1,6 +1,7 @@
import sys
import os
import time
import json # Added for schema output
import tkinter as tk
from threading import Thread
from queue import Queue, Empty
@ -35,9 +36,56 @@ if not os.path.exists(OUTPUT_DIR):
# --- Processing Logic ---
def distribute_pages(total_pages, max_per_file=5):
    """
    Split ``total_pages`` into the fewest chunks of at most ``max_per_file``
    pages each, keeping the chunk sizes as even as possible.

    Any leftover pages are handed out one each to the first chunks, so chunk
    sizes never differ by more than one.
    Example: 12 pages, max 5 -> [4, 4, 4]
    Example: 11 pages, max 5 -> [4, 4, 3]

    Args:
        total_pages: Number of pages (columns) to distribute.
        max_per_file: Upper bound on the size of a single chunk (>= 1).

    Returns:
        A list of chunk sizes summing to ``total_pages``; an empty list when
        there is nothing to distribute.

    Raises:
        ValueError: if ``max_per_file`` is smaller than 1.
    """
    if total_pages <= 0:
        return []
    if max_per_file < 1:
        # A zero or negative limit used to surface as ZeroDivisionError or
        # as a silently empty result; reject it explicitly instead.
        raise ValueError(f"max_per_file must be >= 1, got {max_per_file}")

    # Ceiling division: minimum number of files needed.
    num_files = -(-total_pages // max_per_file)
    base_count, remainder = divmod(total_pages, num_files)

    # The first `remainder` files take one extra page each.
    return [base_count + 1] * remainder + [base_count] * (num_files - remainder)
def stitch_images(image_list):
    """
    Stitch a list of images side by side, separated by delimiter strips.

    The canvas is white, as tall as the tallest image and wide enough for all
    images plus one DELIMITER_WIDTH strip between each neighbouring pair.
    Every image is pasted at the top edge (y = 0).

    Args:
        image_list: Images exposing ``width`` and ``height``.

    Returns:
        The combined RGB image, or None when ``image_list`` is empty.
    """
    if not image_list:
        return None

    num_images = len(image_list)
    total_width = sum(img.width for img in image_list) + (num_images - 1) * DELIMITER_WIDTH
    max_height = max(img.height for img in image_list)

    combined = Image.new('RGB', (total_width, max_height), color=(255, 255, 255))

    # The delimiter strip is the same for every gap, so build it once instead
    # of allocating a fresh image on each iteration.
    delimiter = None
    if num_images > 1:
        delimiter = Image.new('RGB', (DELIMITER_WIDTH, max_height), color=DELIMITER_COLOR)

    x_offset = 0
    for idx, img in enumerate(image_list):
        combined.paste(img, (x_offset, 0))
        x_offset += img.width

        # No delimiter after the last image.
        if idx < num_images - 1:
            combined.paste(delimiter, (x_offset, 0))
            x_offset += DELIMITER_WIDTH

    return combined
def process_single_pdf(filename, shift_offset=0):
"""
Converts PDF to stitched JPG image (PIL object).
Converts PDF to stitched images.
Returns a tuple: (preview_image_resized, list_of_split_images, schema_dict)
"""
pdf_path = os.path.join(INPUT_DIR, filename)
try:
@ -61,35 +109,57 @@ def process_single_pdf(filename, shift_offset=0):
if not cropped_images:
return None
# Combine
num_images = len(cropped_images)
total_width = sum(img.width for img in cropped_images) + (num_images - 1) * DELIMITER_WIDTH
max_height = max(img.height for img in cropped_images)
# 1. Generate Schema / Distribution
col_distribution = distribute_pages(len(cropped_images), max_per_file=5)
combined = Image.new('RGB', (total_width, max_height), color=(255, 255, 255))
# 2. Generate Split Images (Full Resolution)
split_images = []
current_idx = 0
for count in col_distribution:
chunk = cropped_images[current_idx : current_idx + count]
stitched_chunk = stitch_images(chunk)
split_images.append(stitched_chunk)
current_idx += count
x_offset = 0
for idx, img in enumerate(cropped_images):
combined.paste(img, (x_offset, 0))
x_offset += img.width
if idx < num_images - 1:
delimiter = Image.new('RGB', (DELIMITER_WIDTH, max_height), color=DELIMITER_COLOR)
combined.paste(delimiter, (x_offset, 0))
x_offset += DELIMITER_WIDTH
# 3. Generate Preview (All stitched together, Resized)
full_stitch = stitch_images(cropped_images)
preview_resized = full_stitch.resize(OUTPUT_SIZE, Image.LANCZOS)
# Resize
resized = combined.resize(OUTPUT_SIZE, Image.LANCZOS)
return resized
schema = {
"original_filename": filename,
"total_pages": len(cropped_images),
"number_of_files": len(split_images),
"columns_per_file": col_distribution
}
return (preview_resized, split_images, schema)
except Exception as e:
print(f"Error processing {filename}: {e}")
return None
def save_image(pil_img, filename):
    """
    Write ``pil_img`` into OUTPUT_DIR as a quality-95 JPEG.

    The output name is ``filename`` with its extension replaced by ``.jpg``.
    """
    stem, _ext = os.path.splitext(filename)
    jpeg_name = stem + ".jpg"
    pil_img.save(os.path.join(OUTPUT_DIR, jpeg_name), "JPEG", quality=95)
    print(f"Saved: {jpeg_name}")
def save_results(result_tuple, filename):
    """
    Write the split images and the schema JSON into OUTPUT_DIR.

    ``result_tuple`` is a 3-tuple ``(preview, split_images, schema)``; the
    preview is ignored here. Images are saved as ``<base>_01.jpg``,
    ``<base>_02.jpg``, ... and the schema as ``<base>_schema.json``, where
    ``<base>`` is ``filename`` without its extension.
    """
    _preview, parts, layout = result_tuple
    base_name, _ext = os.path.splitext(filename)

    # One JPEG per split, numbered from 1 with two digits.
    for part_no, part_img in enumerate(parts, start=1):
        output_filename = f"{base_name}_{part_no:02d}.jpg"
        part_img.save(os.path.join(OUTPUT_DIR, output_filename), "JPEG", quality=95)
        print(f"Saved: {output_filename}")

    # Schema describing how the pages were distributed over the splits.
    json_filename = f"{base_name}_schema.json"
    with open(os.path.join(OUTPUT_DIR, json_filename), 'w') as handle:
        json.dump(layout, handle, indent=4)
    print(f"Saved schema: {json_filename}")
# --- GUI Application ---
@ -98,10 +168,10 @@ class ImageReviewer:
self.files = file_list
self.index = 0
self.current_shift = 0
self.current_pil = None
self.current_preview = None # Only stores the resized preview for GUI
self.is_processing = False
# Queue for pre-fetched images (index, image)
# Queue for pre-fetched results (index, (preview, splits, schema))
self.prefetch_queue = Queue(maxsize=1)
# Queue for manual re-processing results
self.manual_queue = Queue()
@ -142,12 +212,11 @@ class ImageReviewer:
if target < len(self.files):
if idx_to_process != target:
fname = self.files[target]
img = process_single_pdf(fname, shift_offset=0)
if img:
self.prefetch_queue.put((target, img)) # Blocks if full
result = process_single_pdf(fname, shift_offset=0)
if result:
self.prefetch_queue.put((target, result)) # Blocks if full
idx_to_process = target
# Crucial fix: Sleep briefly to release CPU
time.sleep(0.1)
def load_current_image(self, use_prefetch=False):
@ -159,19 +228,17 @@ class ImageReviewer:
filename = self.files[self.index]
self.is_processing = False
img_found = None
result_found = None
if use_prefetch and not self.prefetch_queue.empty():
q_idx, q_img = self.prefetch_queue.queue[0]
q_idx, q_result = self.prefetch_queue.queue[0]
if q_idx == self.index:
_, img_found = self.prefetch_queue.get()
_, result_found = self.prefetch_queue.get()
self.current_shift = 0
print(f"Loaded {filename} from prefetch.")
if img_found:
self.current_pil = img_found
save_image(self.current_pil, filename)
self.update_display(filename)
if result_found:
self.handle_processing_result(result_found, filename)
else:
# Not in queue (first load or queue mismatch), process manually
self.trigger_processing(filename, self.current_shift)
@ -182,8 +249,8 @@ class ImageReviewer:
self.label_info.configure(text=f"Processing {filename} (Shift {shift})... Please wait.", fg="red")
def worker():
img = process_single_pdf(filename, shift)
self.manual_queue.put(img)
res = process_single_pdf(filename, shift)
self.manual_queue.put(res)
Thread(target=worker, daemon=True).start()
self.check_manual_queue(filename)
@ -191,11 +258,9 @@ class ImageReviewer:
def check_manual_queue(self, filename):
"""Polls the manual queue for result."""
try:
img = self.manual_queue.get_nowait()
self.current_pil = img
if self.current_pil:
save_image(self.current_pil, filename)
self.update_display(filename)
result = self.manual_queue.get_nowait()
if result:
self.handle_processing_result(result, filename)
else:
print(f"Failed to process {filename}, skipping.")
self.index += 1
@ -205,13 +270,29 @@ class ImageReviewer:
# Check again in 100ms
self.root.after(100, lambda: self.check_manual_queue(filename))
def update_display(self, filename):
if self.current_pil:
tk_image = ImageTk.PhotoImage(self.current_pil)
def handle_processing_result(self, result, filename):
    """
    Consume one processing result: remember the preview, write the
    outputs to disk, then refresh the display.

    ``result`` is the 3-tuple ``(preview, splits, schema)``.
    """
    # Only the preview is kept on the instance; the splits are used solely
    # by save_results below.
    self.current_preview, _splits, layout = result

    # Outputs are written as soon as a result is available.
    save_results(result, filename)

    self.update_display(filename, layout)
def update_display(self, filename, schema=None):
if self.current_preview:
tk_image = ImageTk.PhotoImage(self.current_preview)
self.label_img.configure(image=tk_image)
self.label_img.image = tk_image
schema_info = ""
if schema:
cols = str(schema['columns_per_file'])
schema_info = f"\nFiles: {schema['number_of_files']} | Cols: {cols}"
self.label_info.configure(
text=f"[{self.index+1}/{len(self.files)}] {filename} | Shift: {self.current_shift}px\n"
text=f"[{self.index+1}/{len(self.files)}] {filename} | Shift: {self.current_shift}px"
f"{schema_info}\n"
f"Enter: Next | n: +50 | N: +100 | t: -50",
fg="black"
)

View File

@ -1,166 +0,0 @@
import sys
import os
import time
from google import genai
from google.genai import types
import base64
from pathlib import Path
if len(sys.argv) < 2:
sys.exit("Usage: python script.py <directory_path>")
INPUT_DIR = sys.argv[1]
CUTLEFT_DIR = os.path.join(INPUT_DIR, 'Cutleft')
MODEL_ID = "gemini-3-flash-preview"
api_key="REMOVED_API_KEY"
my_prompt = """I'm giving you an image of the left columns of a written exam.
Students answer several exercises, which can have several questions.
The image consists of several columns, separated by vertical black
lines. The image should be read top to bottom and then left to right,
meaning first column, then second column, etc.
In their sheet, students delimit exercises and questions using
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
to give me the bounding boxes of each delimiter.
When giving the bounding box of the first question of an exercise, the
box should be large enough to contain both the exercice label
(`Exercice i`) and the question label (`1)`) parts.
You also need to give me the student name. It should appear on the top
left of the image. Disregard any mention of `MPSI 3`, it is their
class. A list of possible student names will be given below.
You will answer with a JSON object, containing a `name` field with the
name, and a `list` field, with the list of the bounding boxes and
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
to 0-1000.
Here is an example :
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
Do not provide a box_2d for the name. Only for the labels.
You may find the same label present several times, as a student either
recall the current label on a new page, or adds content to its answer
later on. Give the position of each instance of each label.
For this exam you should look for the labels given below, separated by
newlines. A student need not have answered every question, so some may
be missing.
##labels##
Here's a list of the names of the students, pick the one that matches
the best or `\"Unknown\"` if you cannot read the name
##names##"""
from tqdm import tqdm
def process_batch(directory):
    """
    Run every ``*.jpg`` in ``directory`` through one Gemini batch job.

    Steps:
      1. Upload each image and build one request entry per image.
      2. Write the entries to ``batch_input.jsonl`` in INPUT_DIR and upload it.
      3. Submit the batch job.
      4. Poll every 10 seconds until the job state is SUCCEEDED or FAILED.
      5. Download the JSONL output and write each response to
         ``<custom_id>.json`` in ``directory`` (custom_id is the image file
         name, extension included).

    Returns None. Progress and failures are reported on stdout.
    """
    # json is not among this script's top-level imports; import it here so the
    # JSONL serialisation and result parsing below do not raise NameError.
    import json

    client = genai.Client(api_key=api_key)
    image_files = list(Path(directory).glob("*.jpg"))

    if not image_files:
        print("No .jpg files found.")
        return

    # 1. Upload images to File API (Batch requirement)
    batch_requests = []
    print(f"Uploading {len(image_files)} images to File API...")

    for img_path in tqdm(image_files, unit="img"):
        # Upload file
        file_ref = client.files.upload(path=img_path)

        # Construct Request for JSONL
        # Note: We must serialize config manually for the JSONL body
        req_body = {
            "contents": [
                {"role": "user", "parts": [
                    {"fileData": {"mimeType": file_ref.mime_type, "fileUri": file_ref.uri}},
                    {"text": my_prompt}
                ]}
            ],
            "generationConfig": {
                "temperature": 1.0,
                "topP": 0.95,
                "maxOutputTokens": 65535,
                "thinkingConfig": {"thinkingBudget": -1}
            },
            "safetySettings": [
                {"category": cat, "threshold": "BLOCK_NONE"}
                for cat in ["HARM_CATEGORY_HATE_SPEECH", "HARM_CATEGORY_DANGEROUS_CONTENT",
                            "HARM_CATEGORY_SEXUALLY_EXPLICIT", "HARM_CATEGORY_HARASSMENT"]
            ]
        }

        # Batch Request Entry; custom_id maps each result back to its image.
        batch_requests.append({
            "custom_id": img_path.name,
            "method": "POST",
            "url": f"/v1beta/models/{MODEL_ID}:generateContent",
            "body": req_body
        })

    # 2. Create and Upload Batch Source File (JSONL)
    batch_file_path = os.path.join(INPUT_DIR, "batch_input.jsonl")
    with open(batch_file_path, "w") as f:
        for req in batch_requests:
            f.write(json.dumps(req) + "\n")

    batch_input_file = client.files.upload(path=batch_file_path)

    # 3. Submit Batch Job
    print("Submitting batch job...")
    job = client.batches.create(
        model=MODEL_ID,
        src=batch_input_file.name
    )
    print(f"Batch Job ID: {job.name}")

    # 4. Poll for Completion
    # NOTE(review): only SUCCEEDED and FAILED end the loop; if the service can
    # report another terminal state this polls forever. Confirm the state
    # names against the SDK.
    pbar = tqdm(desc="Processing Batch", unit="poll")
    while True:
        job = client.batches.get(name=job.name)
        if job.state == "ACTIVE":
            pbar.set_description("Processing")
        elif job.state == "SUCCEEDED" or job.state == "FAILED":
            break
        pbar.update(1)
        time.sleep(10)  # Poll every 10 seconds
    pbar.close()

    if job.state == "FAILED":
        print(f"Batch job failed: {job.error}")
        return

    # 5. Retrieve and Save Results
    print("Downloading results...")
    # The output file is a remote URI, we download its content
    output_content = client.files.content(path=job.output_file.name)

    # Parse JSONL output and map back to files
    # Output format: {"custom_id": "...", "response": {...}}
    results_saved = 0
    for line in output_content.decode("utf-8").splitlines():
        if not line:
            continue
        result = json.loads(line)
        filename = result.get("custom_id")

        if filename:
            output_path = Path(directory) / f"{filename}.json"
            with open(output_path, "w", encoding="utf-8") as f:
                # Save the full response part
                json.dump(result.get("response", {}), f, indent=2)
            results_saved += 1

    print(f"Batch complete. Saved {results_saved} result files.")
process_batch(CUTLEFT_DIR)

119
gemini.py
View File

@ -1,119 +0,0 @@
from google import genai
from google.genai import types
import base64
from pathlib import Path
MODEL_ID = "gemini-3-flash-preview"
api_key="REMOVED_API_KEY"
my_prompt = """I'm giving you an image of the left columns of a written exam.
Students answer several exercises, which can have several questions.
The image consists of several columns, separated by vertical black
lines. The image should be read top to bottom and then left to right,
meaning first column, then second column, etc.
In their sheet, students delimit exercises and questions using
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
to give me the bounding boxes of each delimiter.
When giving the bounding box of the first question of an exercise, the
box should be large enough to contain both the exercice label
(`Exercice i`) and the question label (`1)`) parts.
You also need to give me the student name. It should appear on the top
left of the image. Disregard any mention of `MPSI 3`, it is their
class. A list of possible student names will be given below.
You will answer with a JSON object, containing a `name` field with the
name, and a `list` field, with the list of the bounding boxes and
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
to 0-1000.
Here is an example :
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
Do not provide a box_2d for the name. Only for the labels.
You may find the same label present several times, as a student either
recall the current label on a new page, or adds content to its answer
later on. Give the position of each instance of each label.
For this exam you should look for the labels given below, separated by
newlines. A student need not have answered every question, so some may
be missing.
##labels##
Here's a list of the names of the students, pick the one that matches
the best or `\"Unknown\"` if you cannot read the name
##names##"""
from pydantic import BaseModel, Field
from typing import List
class BoxItem(BaseModel):
    # One labelled bounding box; the descriptions are part of the JSON schema
    # generated from this model.
    box_2d: List[int] = Field(
        description="Bounding box coordinates (e.g., [ymin, xmin, ymax, xmax])",
    )
    label: str = Field(
        description="The label associated with the specific box",
    )
class AnnotationData(BaseModel):
    # Top-level answer object: a name plus the list of labelled boxes.
    name: str = Field(
        description="The name identifier",
    )
    # The field is called `list` because that is the key the prompt asks for
    # in the JSON answer.
    list: List[BoxItem] = Field(
        description="List of bounding box items",
    )
def generate_request(file, labels):
    """
    Build the Gemini request for one exam image.

    The user message carries the JPEG bytes read from ``file`` followed by
    the prompt text with ``labels`` appended to it.

    Returns:
        A ``(contents, generate_content_config)`` tuple.
    """
    picture = types.Part.from_bytes(
        data=Path(file).read_bytes(),
        mime_type="image/jpeg",
    )
    instructions = types.Part.from_text(text=my_prompt + labels)
    contents = [types.Content(role="user", parts=[picture, instructions])]

    # JSON output constrained to the AnnotationData schema. No thinking config
    # is set: the author was unsure it is compatible with a JSON response
    # schema.
    generate_content_config = types.GenerateContentConfig(
        temperature=1.0,
        top_p=0.95,
        seed=0,
        max_output_tokens=65535,
        response_mime_type="application/json",
        response_json_schema=AnnotationData.model_json_schema(),
    )

    return (contents, generate_content_config)
import sys
import os
import time
# Both arguments named in the usage string are required: generate_request
# takes the labels as its second positional argument.
if len(sys.argv) < 3:
    sys.exit("Usage: python script.py Staging/cutleft1000.jpg labels")

INPUT_FILE = sys.argv[1]
# NOTE(review): the second argument is assumed to be the path of a text file
# holding the newline-separated labels - confirm against how the script is run.
LABELS = Path(sys.argv[2]).read_text(encoding="utf-8")

contents, config = generate_request(INPUT_FILE, LABELS)

client = genai.Client(api_key=api_key)

# Stream the answer to stdout as it arrives.
for chunk in client.models.generate_content_stream(
    model=MODEL_ID,
    contents=contents,
    config=config,
):
    if chunk.text:
        print(chunk.text, end="", flush=True)

View File

@ -3,12 +3,15 @@ from google.genai import types
import base64
from pathlib import Path
from pydantic import BaseModel, Field
from typing import List
from typing import List, Dict
import sys
import os
import time
import json
import argparse
import re
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
MODEL_ID = "gemini-3-flash-preview"
api_key="REMOVED_API_KEY"
@ -24,6 +27,55 @@ In their sheet, students delimit exercises and questions using
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
to give me the bounding boxes of each delimiter.
When giving the bounding box of the first question of an exercise, the
box should be large enough to contain both the exercice label
(`Exercice i`) and the question label (`1)`) parts. If they are
horizontally far apart (example : if the `1)` is to the left and the
`Exercice i` is either to the right, or in the middle) then give only
the bounding box of the question label `1)` part. You should still
label it as `Exercice i : 1)` though.
You also need to give me the student name. It should appear on the top
left of the image. Disregard any mention of `MPSI 3`, it is their
class. A list of possible student names will be given below.
You will answer with a JSON object, containing a `name` field with the
name, and a `list` field, with the list of the bounding boxes and
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
to 0-1000.
Here is an example :
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
Do not provide a box_2d for the name. Only for the labels. Order the
box_2d by their position in the page, column by column : first column
(top to bottom), then second column, etc.
You may find the same label present several times, as a student either
recall the current label on a new page, or adds content to its answer
later on. Give the position of each instance of each label.
For this exam you should look for the labels given below, separated by
newlines. A student need not have answered every question, so some may
be missing.
##labels##
Here's a list of the names of the students, pick the one that matches
the best or `\"Unknown\"` if you cannot read the name
##names##"""
my_prompt2 = """I'm giving you an image of the left columns of a written exam.
Students answer several exercises, which can have several questions.
The image consists of several columns, separated by vertical black
lines. The image should be read top to bottom and then left to right,
meaning first column, then second column, etc.
In their sheet, students delimit exercises and questions using
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
to give me the bounding boxes of each delimiter.
When giving the bounding box of the first question of an exercise, the
box should be large enough to contain both the exercice label
(`Exercice i`) and the question label (`1)`) parts.
@ -46,16 +98,26 @@ You may find the same label present several times, as a student either
recall the current label on a new page, or adds content to its answer
later on. Give the position of each instance of each label.
This image is one part of a sequence (e.g., part 2 of 3) for a single
student. Here is the list of labels found in the *previous* parts of
this copy:
[
##prev_context##
]
If the first column starts with a number like =3)= or =c)=, look at
the labels in the list above. If the last relevant label was =Ex 4 :
2)=, you should label the new box =Ex 4 : 3)=.
For this exam you should look for the labels given below, separated by
newlines. A student need not have answered every question, so some may
be missing.
##labels##
Here's a list of the names of the students, pick the one that matches
the best or `\"Unknown\"` if you cannot read the name
##names##"""
Since this copy isn't the first part of a sequence, simply set the
name to `\"Continued\"`."""
class BoxItem(BaseModel):
box_2d: List[int] = Field(description="Bounding box coordinates (e.g., [ymin, xmin, ymax, xmax])")
@ -66,12 +128,21 @@ class AnnotationData(BaseModel):
list: List[BoxItem] = Field(description="List of bounding box items")
def generate_request(file, labels, names):
"""Generates request for Gemini."""
def generate_request(file, labels, names, context_labels):
"""Generates request for Gemini with context."""
image_path = Path(file)
text = my_prompt.replace("##labels##",labels).replace("##names##", names)
# Format context list as a string
context_str = ", ".join([f'"{l}"' for l in context_labels]) if context_labels else "No previous context"
if context_labels == []:
text = my_prompt.replace("##labels##", labels)\
.replace("##names##", names)
else:
text = my_prompt2.replace("##labels##", labels)\
.replace("##prev_context##", context_str)
contents = [
types.Content(
role="user",
@ -97,74 +168,115 @@ def generate_request(file, labels, names):
# Argument Parsing
parser = argparse.ArgumentParser(description="Process a directory or specific file using Gemini.")
parser.add_argument("input_path", help="The input directory or specific file (e.g., Dir/File.pdf)")
parser.add_argument("input_path", help="The input directory or specific file")
parser.add_argument("--overwrite", action="store_true", help="Regenerate output even if it exists")
args = parser.parse_args()
input_arg = Path(args.input_path)
image_files = []
# Logic to handle Directory vs File argument
# Setup Paths and Files
if input_arg.is_file():
# If argument is Dir/Copiedd.pdf
INPUT_DIR = input_arg.parent
CUTLEFT_DIR = INPUT_DIR / 'Cutleft'
# Look for matching .jpg in Cutleft (e.g., Copiedd.jpg)
# For a single file, we verify it exists but we might miss context if we don't look for siblings
# Simplification: We add just this file, context will be empty.
target_image = CUTLEFT_DIR / f"{input_arg.stem}.jpg"
if target_image.exists():
image_files = [target_image]
else:
print(f"Error: Corresponding image {target_image} not found.")
print(f"Error: {target_image} not found.")
sys.exit(1)
else:
# If argument is just Dir
INPUT_DIR = input_arg
CUTLEFT_DIR = INPUT_DIR / 'Cutleft'
image_files = sorted(list(CUTLEFT_DIR.glob("*.jpg")))
labels = (INPUT_DIR / "labels").read_text()
names = (INPUT_DIR / "names").read_text()
labels_txt = (INPUT_DIR / "labels").read_text()
names_txt = (INPUT_DIR / "names").read_text()
client = genai.Client(api_key=api_key)
# Target > 3.0s per request to stay under 20 RPM
# Group files by Copy ID (e.g. Copie01_01.jpg -> Copie01)
# regex: match everything before the last underscore if it ends in digits
file_groups = defaultdict(list)
for img in image_files:
stem = img.stem
# match CopieXX_YY -> Group CopieXX
match = re.match(r"(.+)_(\d+)$", stem)
if match:
group_key = match.group(1)
file_groups[group_key].append(img)
else:
# Fallback for files without underscore numbering
file_groups[stem].append(img)
# Sort files within each group to ensure sequential processing
for key in file_groups:
file_groups[key].sort(key=lambda x: x.name)
TARGET_INTERVAL = 3.5
from concurrent.futures import ThreadPoolExecutor
def process_copy_group(group_key, files):
"""Processes a list of files belonging to one copy sequentially to maintain context."""
def process_image(image_file):
start_time = time.time()
base_name, _ = os.path.splitext(image_file.name)
output_json = os.path.join(INPUT_DIR, f"{base_name}.json")
# Context accumulator for this specific copy
accumulated_labels = []
# Skip if already processed unless overwrite is enabled
if os.path.exists(output_json) and not args.overwrite:
print(f"Skipping {image_file.name}, output exists.")
return
for image_file in files:
start_time = time.time()
base_name = image_file.stem
output_json = INPUT_DIR / f"{base_name}.json"
print(f"Processing {image_file.name}...")
# Check existing
if output_json.exists() and not args.overwrite:
print(f"[{group_key}] Skipping {image_file.name}, output exists.")
# If skipping, we should try to load existing labels to keep context for next parts
try:
with open(output_json, 'r') as f:
data = json.load(f)
for item in data.get('list', []):
accumulated_labels.append(item['label'])
except:
pass # If read fails, next part has no context
continue
try:
# Prepare and execute request
contents, config = generate_request(image_file, labels, names)
response = client.models.generate_content(
model=MODEL_ID,
contents=contents,
config=config
)
annota = AnnotationData.model_validate_json(response.text)
# Save result
with open(output_json, "w", encoding="utf-8") as f:
json.dump(annota.model_dump(), f, indent=2)
print(f"[{group_key}] Processing {image_file.name} with {len(accumulated_labels)} ctx items...")
except Exception as e:
print(f"Error processing {image_file.name}: {e}")
try:
contents, config = generate_request(image_file, labels_txt, names_txt, accumulated_labels)
# Rate Limiting (Note: This limits per-thread, not global total)
elapsed = time.time() - start_time
time.sleep(max(0, TARGET_INTERVAL - elapsed))
response = client.models.generate_content(
model=MODEL_ID,
contents=contents,
config=config
)
# Run with 6 threads
annota = AnnotationData.model_validate_json(response.text)
# Save result
with open(output_json, "w", encoding="utf-8") as f:
json.dump(annota.model_dump(), f, indent=2)
# Update context for the next part in this group
for box in annota.list:
accumulated_labels.append(box.label)
except Exception as e:
print(f"Error processing {image_file.name}: {e}")
# Rate Limiting
elapsed = time.time() - start_time
time.sleep(max(0, TARGET_INTERVAL - elapsed))
# Run ThreadPool on GROUPS (Copies), not individual files
# Each thread handles one student's full exam copy sequentially
with ThreadPoolExecutor(max_workers=6) as executor:
executor.map(process_image, image_files)
# Convert dict items to arguments for map
# executor.map expects a function and an iterable.
# We use a lambda or separate function to unpack the tuple if needed,
# but here we'll just submit futures.
futures = [executor.submit(process_copy_group, k, v) for k, v in file_groups.items()]
# Wait for all to complete
for future in futures:
future.result()

View File

@ -6,44 +6,57 @@ import subprocess
import tkinter as tk
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont, ImageTk
from pypdf import PdfReader
# --- Configuration & Globals ---
padding = 60 # White margin to the right
padding = 60
# Queue payload: (pil_image, json_path, metadata)
# metadata is a dict: {'copie': str, 'part': int, 'schema': dict}
image_queue = queue.Queue(maxsize=5)
image_queue = queue.Queue(maxsize=5) # Buffer a few images ahead
try:
font = ImageFont.truetype("DejaVuSans.ttf", size=30)
except OSError:
font = ImageFont.load_default()
# --- Processing Logic (Worker Thread) ---
# --- Helper Functions (Shared) ---
def page_number(b, nb_pages):
    """
    Return the 0-based index of the column that contains box ``b``.

    The 0-1000 horizontal range is cut into ``nb_pages`` columns of width
    ``1000 // nb_pages``; the column is chosen from the horizontal centre of
    the box (``b[1]`` and ``b[3]`` are its left and right edges).

    The result is clamped to ``[0, nb_pages - 1]``. Because the column width
    is floored, a centre at the far right could otherwise land one past the
    last column (e.g. centre 999 with 3 pages gives 999 // 333 == 3).
    """
    column_width = 1000 // nb_pages
    center_x = (b[1] + b[3]) // 2
    return max(0, min(center_x // column_width, nb_pages - 1))
def convert_box2d(b, pn_ori, npn, tot_ori, tot_dest):
    """
    Return a copy of box ``b`` with its two horizontal coordinates (indices
    1 and 3) remapped from a ``tot_ori``-column layout to a
    ``tot_dest``-column layout; ``b`` itself is not modified.

    Each coordinate has the source column offset
    ``(1000 // tot_ori) * (pn_ori - 1)`` removed, is rescaled by
    ``tot_ori / tot_dest`` with integer division, and then has the
    destination offset ``(1000 // tot_dest) * (npn - 1)`` added.
    The vertical coordinates (indices 0 and 2) are left untouched.
    """
    remapped = b.copy()

    source_offset = (1000 // tot_ori) * (pn_ori - 1)
    target_offset = (1000 // tot_dest) * (npn - 1)

    for axis in (1, 3):
        remapped[axis] = (remapped[axis] - source_offset) * tot_ori // tot_dest + target_offset

    return remapped
def convert_list(l, group_id, json_schema):
    """
    Re-express the boxes of one part file in the coordinates of the whole
    copy.

    ``json_schema["columns_per_file"]`` gives the number of columns in each
    part; ``group_id`` is the 1-based number of the part that ``l`` belongs
    to. Each returned entry is a shallow copy of the input entry with
    ``box_2d`` converted, plus ``part`` (the group id) and ``pn`` (the
    column index offset by the columns of all earlier parts).
    """
    nb_pages = json_schema["columns_per_file"][group_id - 1]
    nb_previous_pages = sum(
        json_schema["columns_per_file"][i] for i in range(group_id - 1)
    )
    nb_tot_pages = sum(json_schema["columns_per_file"])

    converted = []
    for entry in l:
        item = entry.copy()
        local_page = page_number(entry["box_2d"], nb_pages)
        global_page = local_page + nb_previous_pages
        item.update(
            box_2d=convert_box2d(item["box_2d"], local_page, global_page, nb_pages, nb_tot_pages),
            part=group_id,
            pn=global_page,
        )
        converted.append(item)
    return converted
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
"""
Draws boxes on the image and returns the PIL Image object.
Does NOT display it.
"""
im = Image.open(image_path)
# Ensure image is loaded so we can pass it between threads safely
im.load()
width, height = im.size
# Add white padding to the right
new_im = Image.new(im.mode, (width + padding, height), "white")
new_im.paste(im, (0, 0))
draw = ImageDraw.Draw(new_im)
bounding_boxes.sort(key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]))
last_label_index = -1
for bbox in bounding_boxes:
@ -51,7 +64,6 @@ def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
raw_x_min = int(bbox["box_2d"][1] * width / 1000)
raw_y_max = int(bbox["box_2d"][2] * height / 1000)
raw_x_max = int(bbox["box_2d"][3] * width / 1000)
abs_y_min = max(0, raw_y_min - 10)
abs_x_min = max(0, raw_x_min - 10)
abs_y_max = min(height, raw_y_max + 10)
@ -59,60 +71,67 @@ def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
color = "black"
label = bbox.get("label")
if label and label in all_labels:
current_index = all_labels.index(label)
if current_index < last_label_index:
color = "red"
last_label_index = current_index
draw.rectangle(
((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)),
outline=color,
width=4,
)
draw.rectangle(((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)), outline=color, width=4)
if label:
# draw.text((abs_x_min + 8, abs_y_min + 6), label, fill=color, font=font)
if abs_y_min > 80:
draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
else:
draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
return new_im
# --- Processing Logic (Worker Thread) ---
def worker_thread(base_dir, files_to_process, all_labels):
"""
Iterates through files, processes them, and puts them in the queue.
Iterates through files, prepares VISUALS only, and puts metadata in queue.
Does NOT write final JSON files anymore.
"""
for img_path in files_to_process:
json_path = base_dir / f"{img_path.stem}.json"
pdf_path = base_dir / f"{img_path.stem}.pdf"
copie_part = int(img_path.stem[-2:])
copie = img_path.stem[:-3]
json_schema_path = base_dir / 'Cutleft' / f"{copie}_schema.json"
nb_pages = 1
if pdf_path.exists():
try:
nb_pages = len(PdfReader(pdf_path).pages)
except Exception:
pass
try:
with open(json_schema_path, 'r') as f:
json_schema = json.load(f)
except:
print("No json_schema : ", json_schema_path)
continue
nb_pages = json_schema["columns_per_file"][copie_part-1]
if json_path.exists():
try:
# Read strictly for visualization purposes
with open(json_path, 'r') as f:
json_result = json.load(f)
bb_list = json_result.get("list", [])
print(f"Processing {img_path.name}...")
print(f"Buffering {img_path.name}...")
# Draw boxes
pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages)
# Block if queue is full (waiting for user to view)
image_queue.put((pil_image, json_path))
# Package metadata needed for final calculation later
metadata = {
"copie": copie,
"part": copie_part,
"schema": json_schema,
"name": json_result.get("name", "")
}
image_queue.put((pil_image, json_path, metadata))
except Exception as e:
print(f"Error processing {img_path.name}: {e}")
# Sentinel to indicate finished
image_queue.put((None, None))
image_queue.put((None, None, None))
# --- GUI Logic (Main Thread) ---
@ -121,57 +140,66 @@ class ImageViewer:
self.root = root
self.base_dir = base_dir
self.root.title("Bounding Box Viewer")
# UI Elements
self.label = tk.Label(root, text="Waiting for images...")
self.label.pack(expand=True, fill="both")
# State
# Display State
self.current_image = None
self.current_json_path = None
self.current_meta = None # Stores schema/copie info
self.is_viewing = False
self.scale_factor = 1.0 # To track resizing
self.orig_size = (1, 1) # To track original dimensions
self.scale_factor = 1.0
self.orig_size = (1, 1)
# Input Bindings
# Data Aggregation State
self.active_copie_name = None
self.accumulated_results = None # Dict with "name" and "list"
# Bindings
self.root.bind('<Return>', self.on_enter)
self.root.bind('e', self.on_edit)
self.root.bind('o', self.on_open_pdf) # <--- 3. Add Key Binding
self.root.bind('o', self.on_open_pdf)
self.root.bind('<Escape>', lambda e: self.root.quit())
self.label.bind('<Button-1>', self.on_click) # Bind left mouse click
self.label.bind('<Button-1>', self.on_click)
# Start polling queue
self.poll_queue()
def poll_queue(self):
    """
    Fetch the next prepared image from ``image_queue`` when idle.

    While an image is being viewed nothing is fetched. Otherwise one
    ``(pil_image, json_path, metadata)`` item is taken without blocking:

    * a ``None`` image is the end-of-stream sentinel: the pending batch is
      saved and the Tk main loop is stopped (no further polling);
    * when the item belongs to a different copie than the active one, the
      previous batch is written and a fresh accumulator is started;
    * the image is then displayed.

    The method re-schedules itself every 100 ms via ``root.after``.
    """
    if not self.is_viewing:
        try:
            # Only this call can raise queue.Empty; keep the try minimal.
            pil_image, json_path, metadata = image_queue.get_nowait()
        except queue.Empty:
            pass
        else:
            # Handle End of Stream
            if pil_image is None:
                self.save_current_batch()  # Save any remaining data
                print("All images processed.")
                self.root.quit()
                return

            # Check if we switched to a new "Copie" group
            if self.active_copie_name != metadata["copie"]:
                self.save_current_batch()  # Write previous group to disk
                # Start new batch
                self.active_copie_name = metadata["copie"]
                self.accumulated_results = {"name": metadata["name"], "list": []}

            self.display_image(pil_image, json_path, metadata)

    self.root.after(100, self.poll_queue)
def save_current_batch(self):
    """
    Write the accumulated data to the main JSON file of the active copie.

    The target is ``<base_dir>/<active_copie_name>.json``. Nothing happens
    when no copie is active or nothing has been accumulated. After a
    successful write the accumulator is reset to ``None`` so the same batch
    is not written twice.
    """
    if not (self.active_copie_name and self.accumulated_results):
        return

    main_json_path = self.base_dir / f"{self.active_copie_name}.json"
    print(f"Writing aggregated result to {main_json_path}")
    with open(main_json_path, 'w') as f:
        json.dump(self.accumulated_results, f)
    self.accumulated_results = None
def display_image(self, pil_image, json_path):
def display_image(self, pil_image, json_path, metadata):
self.orig_size = pil_image.size
self.scale_factor = 1.0
# Resize if too large for screen
screen_h = self.root.winfo_screenheight() - 100
if pil_image.height > screen_h:
self.scale_factor = screen_h / pil_image.height
@ -179,17 +207,47 @@ class ImageViewer:
int(pil_image.height * self.scale_factor)))
self.tk_image = ImageTk.PhotoImage(pil_image)
self.label.config(image=self.tk_image, text="")
self.label.config(image=self.tk_image, text=f"Processing: {json_path.name}")
self.current_json_path = json_path
self.current_meta = metadata
self.is_viewing = True
self.root.lift()
def on_enter(self, event):
    """
    Commit the currently displayed part and advance to the next image.

    The JSON file is re-read at this point (not when the image was
    prepared) so that edits made by the user while viewing are captured.
    Its ``"list"`` is converted with ``convert_list`` using the part number
    and schema stored in ``current_meta`` and appended to the accumulator;
    the accumulator's name is refreshed from the file when present.

    Errors while reading or converting are reported and the viewer still
    advances, so that part contributes nothing to the aggregated result.

    Args:
        event: Tk key event (unused).
    """
    if not self.is_viewing:
        return

    print(f"Committing data for {self.current_json_path.name}...")

    try:
        with open(self.current_json_path, 'r') as f:
            current_data = json.load(f)

        # Perform the conversion now, post-edit. A missing "list" is treated
        # as empty, matching how the worker reads the same file.
        converted_items = convert_list(
            current_data.get("list", []),
            self.current_meta["part"],
            self.current_meta["schema"],
        )

        # Add to accumulator
        if self.accumulated_results is not None:
            self.accumulated_results["list"].extend(converted_items)
            # Update name just in case (though usually consistent per group)
            self.accumulated_results["name"] = current_data.get(
                "name", self.accumulated_results["name"]
            )
    except Exception as e:
        print(f"Error re-reading/saving {self.current_json_path}: {e}")

    # Advance UI
    self.is_viewing = False
    self.label.config(image="", text="Loading next...")
def on_open_pdf(self, event):
    """
    Open the PDF that sits next to the current JSON file.

    The PDF path is the current JSON path with its suffix replaced by
    ``.pdf``. It is opened with ``xdg-open`` in a separate process so the
    GUI is not blocked. Nothing is launched when no image is being viewed
    or when the PDF does not exist.

    Args:
        event: Tk key event (unused).
    """
    if not (self.is_viewing and self.current_json_path):
        return

    pdf_path = self.current_json_path.with_suffix(".pdf")
    if not pdf_path.exists():
        # The PDF is optional; do not spawn a viewer on a missing file.
        print(f"No PDF found at {pdf_path}")
        return

    print(f"Opening {pdf_path}")
    subprocess.Popen(['xdg-open', str(pdf_path)])
def on_edit(self, event):
if self.is_viewing and self.current_json_path:
print(f"Opening {self.current_json_path}")
@ -197,63 +255,45 @@ class ImageViewer:
def on_click(self, event):
    """
    Copy a small bounding-box snippet for the clicked point to the clipboard.

    The click position is mapped back to original-image pixels through
    ``scale_factor``; a box reaching 5 px on each side is clamped to the
    image and expressed as ``[y_min, x_min, y_max, x_max]`` on a 0-1000
    scale. Horizontal values are normalised by the width minus the
    module-level ``padding``.

    Args:
        event: Tk mouse event providing ``x`` and ``y``.
    """
    if self.is_viewing:
        # Map click to original image coordinates
        x = int(event.x / self.scale_factor)
        y = int(event.y / self.scale_factor)
        full_width, full_height = self.orig_size

        # 10px box (5px radius), clamped to the image bounds
        top_px = max(0, y - 5)
        left_px = max(0, x - 5)
        bottom_px = min(full_height, y + 5)
        right_px = min(full_width, x + 5)

        # Coordinate format: [y_min, x_min, y_max, x_max] (0-1000 scale)
        box = [
            int(top_px / full_height * 1000),
            int(left_px / (full_width - padding) * 1000),
            int(bottom_px / full_height * 1000),
            int(right_px / (full_width - padding) * 1000),
        ]

        box_str = "{ \"box_2d\": " + str(box) + ", \"label\": \"\" },"
        print(f"Copied box at ({x},{y}): {box_str}")

        clipboard_owner = self.root
        clipboard_owner.clipboard_clear()
        clipboard_owner.clipboard_append(box_str)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python plotting_gui.py <directory_or_file>")
print("Usage: python plotting.py <directory_or_file>")
sys.exit(1)
input_path = Path(sys.argv[1])
files_to_process = []
if input_path.is_file():
# File mode
base_dir = input_path.parent
stem = input_path.stem
# Try to locate the image in Cutleft directory
img_path = base_dir / "Cutleft" / f"{stem}.jpg"
# Fallback: Check if user provided the jpg inside Cutleft directly
if not img_path.exists() and input_path.parent.name == "Cutleft" and input_path.suffix.lower() == ".jpg":
if not img_path.exists() and input_path.parent.name == "Cutleft":
base_dir = input_path.parent.parent
img_path = input_path
if not img_path.exists():
print(f"Error: Could not find image at {img_path}")
sys.exit(1)
files_to_process = [img_path]
else:
# Directory mode
base_dir = input_path
cutleft_dir = base_dir / "Cutleft"
if not cutleft_dir.exists():
print(f"Error: {cutleft_dir} does not exist.")
sys.exit(1)
files_to_process = sorted(cutleft_dir.glob("*.jpg"))
try:
@ -261,12 +301,10 @@ if __name__ == "__main__":
except FileNotFoundError:
all_labels = []
# Start Processing Thread
t = threading.Thread(target=worker_thread, args=(base_dir, files_to_process, all_labels))
t.daemon = True # Kill thread if main app closes
t.daemon = True
t.start()
# Start GUI
root = tk.Tk()
app = ImageViewer(root, base_dir)
root.mainloop()