Initial commit
commit
6a0c1a3958
|
|
@ -0,0 +1,450 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import glob
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
|
||||||
|
# Results is : Copie id -> label -> {pdf_path, gemini_result, coordinates}
|
||||||
|
# Coordinates are the real coordinates (hmin, hmax) of the image in the Group
|
||||||
|
# The gemini_result coordinates should be un-normalized !
|
||||||
|
def make_dictionary(root_dir):
|
||||||
|
correction_path = os.path.join(root_dir, "correction.json")
|
||||||
|
|
||||||
|
# Load correction data
|
||||||
|
try:
|
||||||
|
with open(correction_path, 'r', encoding='utf-8') as f:
|
||||||
|
corrections = json.load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
print(f"Error: {correction_path} not found.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Dictionary: keys are IDs
|
||||||
|
result_data = {}
|
||||||
|
|
||||||
|
# Iterate through labels and items in correction.json
|
||||||
|
for label, items in corrections.items():
|
||||||
|
items = sum(items, []) # Flatten
|
||||||
|
for item in items:
|
||||||
|
# print(item)
|
||||||
|
student_id = item['id']
|
||||||
|
result_obj = item['result']
|
||||||
|
|
||||||
|
# Find coordinates
|
||||||
|
coordinates = None
|
||||||
|
height,width= None, None
|
||||||
|
label_dir = os.path.join(root_dir, label)
|
||||||
|
|
||||||
|
# Search all json files in Dir/label
|
||||||
|
json_files = glob.glob(os.path.join(label_dir, "*.json"))
|
||||||
|
for jf in json_files:
|
||||||
|
try:
|
||||||
|
with open(jf, 'r', encoding='utf-8') as f:
|
||||||
|
coord_list = json.load(f)
|
||||||
|
# Format: [["id", x, y], ...]
|
||||||
|
for entry in coord_list:
|
||||||
|
if entry[0] == student_id:
|
||||||
|
coordinates = (entry[1], entry[2])
|
||||||
|
img_path = os.path.splitext(jf)[0] + ".jpg"
|
||||||
|
with Image.open(img_path) as img:
|
||||||
|
width, height = img.size
|
||||||
|
break
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
if coordinates:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Construct PDF path: Dir/Copie{id}/{label}.pdf
|
||||||
|
pdf_path = os.path.join(root_dir, f"Copie{student_id}", f"{label}.pdf")
|
||||||
|
|
||||||
|
# Initialize dictionary structure for this ID if missing
|
||||||
|
if student_id not in result_data:
|
||||||
|
result_data[student_id] = {}
|
||||||
|
|
||||||
|
fb = result_obj.get("feedback", [])
|
||||||
|
for i in range(len(fb)):
|
||||||
|
el = fb[i]
|
||||||
|
if "box_2d" in el and el["box_2d"]:
|
||||||
|
el["box_2d"][0] = (el["box_2d"][0] * height)//1000
|
||||||
|
el["box_2d"][2] = (el["box_2d"][2] * height)//1000
|
||||||
|
el["box_2d"][1] = (el["box_2d"][1] * width)//1000
|
||||||
|
el["box_2d"][3] = (el["box_2d"][3] * width)//1000
|
||||||
|
|
||||||
|
# Populate the object
|
||||||
|
result_data[student_id][label] = {
|
||||||
|
"pdf_path": pdf_path,
|
||||||
|
"result": result_obj,
|
||||||
|
"coordinates": coordinates
|
||||||
|
}
|
||||||
|
|
||||||
|
return result_data
|
||||||
|
# output the resulting dictionary
|
||||||
|
# print(json.dumps(result_data, indent=2, ensure_ascii=False))
|
||||||
|
|
||||||
|
import io
|
||||||
|
import shutil
|
||||||
|
from pdf2image import convert_from_path
|
||||||
|
from PIL import Image, ImageDraw, ImageFont
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
# plt.rcParams.update({ "text.usetex": True,
|
||||||
|
# "text.latex.preamble": r"\usepackage{bbold}"})
|
||||||
|
|
||||||
|
import re
|
||||||
|
import textwrap
|
||||||
|
|
||||||
|
def normalize_mathtext(text):
|
||||||
|
"""
|
||||||
|
Replaces LaTeX shortcuts not supported by Matplotlib's mathtext parser.
|
||||||
|
e.g. \\le -> \\leq, \\ge -> \\geq
|
||||||
|
Using lookahead (?![a-zA-Z]) prevents replacing \\left with \\leqft.
|
||||||
|
"""
|
||||||
|
text = re.sub(r'\\le(?![a-zA-Z])', r'\\leq', text)
|
||||||
|
text = re.sub(r'\\ge(?![a-zA-Z])', r'\\geq', text)
|
||||||
|
text = re.sub(r'\\implies', r'\\Rightarrow', text)
|
||||||
|
# Sometimes, Gemini escapes too much ? Not sure
|
||||||
|
text = text.replace("\\\\", "\\")
|
||||||
|
text = text.replace("\\llbracket", "[\\![")
|
||||||
|
text = text.replace("\\rrbracket", "]\\!]")
|
||||||
|
# Sometimes, Gemini doesn't escape enough. In the json, you should have \\f
|
||||||
|
text = text.replace('\f', r'\f')
|
||||||
|
text = re.sub('\u0010', "", text)
|
||||||
|
return text
|
||||||
|
|
||||||
|
import re
|
||||||
|
def wrap_latex_text(text, width_chars):
|
||||||
|
"""
|
||||||
|
Wraps text but keeps LaTeX math blocks ($...$) intact.
|
||||||
|
"""
|
||||||
|
# 1. Split text into chunks of: text, math, text, math...
|
||||||
|
# The regex looks for $...$ (non-greedy).
|
||||||
|
parts = re.split(r'(\$[^\$]+\$)', text)
|
||||||
|
|
||||||
|
# 2. Tokenize: Break plain text by spaces, keep math blocks whole.
|
||||||
|
tokens = []
|
||||||
|
for part in parts:
|
||||||
|
if part.startswith('$') and part.endswith('$'):
|
||||||
|
tokens.append(part) # Keep math block distinct
|
||||||
|
else:
|
||||||
|
tokens.extend(part.split()) # Split normal text by whitespace
|
||||||
|
|
||||||
|
# 3. Reconstruct lines using textwrap logic
|
||||||
|
lines = []
|
||||||
|
current_line = []
|
||||||
|
current_length = 0
|
||||||
|
|
||||||
|
for token in tokens:
|
||||||
|
# +1 for the space we will add
|
||||||
|
token_len = len(token)
|
||||||
|
|
||||||
|
if current_length + token_len + 1 > width_chars:
|
||||||
|
lines.append(" ".join(current_line))
|
||||||
|
current_line = [token]
|
||||||
|
current_length = token_len
|
||||||
|
else:
|
||||||
|
current_line.append(token)
|
||||||
|
current_length += token_len + 1
|
||||||
|
|
||||||
|
if current_line:
|
||||||
|
lines.append(" ".join(current_line))
|
||||||
|
|
||||||
|
res = "\n".join(lines)
|
||||||
|
return res
|
||||||
|
|
||||||
|
def render_latex_text(text, width_px, bg_color=(255, 255, 255, 255), max_lines=None,
|
||||||
|
fontsize=14):
|
||||||
|
# 1. Fix unsupported symbols
|
||||||
|
text = normalize_mathtext(text)
|
||||||
|
|
||||||
|
dpi = 100
|
||||||
|
fig_width = width_px / dpi
|
||||||
|
|
||||||
|
# Estimate characters per line based on width and font size (heuristic)
|
||||||
|
# FontSize 12 approx 0.5 inches wide for ~15 chars usually,
|
||||||
|
# but let's approximate: Width (inches) * ~10 chars/inch for size 12
|
||||||
|
chars_per_line = int(fig_width * 10)
|
||||||
|
|
||||||
|
# Pre-wrap the text respecting LaTeX boundaries
|
||||||
|
wrapped_text = wrap_latex_text(text, chars_per_line)
|
||||||
|
|
||||||
|
# Dynamic height based on actual number of lines
|
||||||
|
num_lines = wrapped_text.count('\n') + 1
|
||||||
|
if max_lines and num_lines > max_lines:
|
||||||
|
# logic to truncate if strictly necessary, or just expand
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 0.3 inches per line buffer
|
||||||
|
fig_height = num_lines * 0.3 + 0.2
|
||||||
|
|
||||||
|
fig = plt.figure(figsize=(fig_width, fig_height), dpi=dpi)
|
||||||
|
|
||||||
|
# print(wrapped_text)
|
||||||
|
# print("\n\n")
|
||||||
|
# NOTE: wrap=False because we did it ourselves
|
||||||
|
plt.text(0.01, 0.95, wrapped_text, fontsize=fontsize,
|
||||||
|
verticalalignment='top', horizontalalignment='left',
|
||||||
|
wrap=False)
|
||||||
|
|
||||||
|
plt.axis('off')
|
||||||
|
|
||||||
|
buf = io.BytesIO()
|
||||||
|
plt.savefig(buf, format='png', bbox_inches='tight', pad_inches=0.1, transparent=True)
|
||||||
|
plt.close(fig)
|
||||||
|
buf.seek(0)
|
||||||
|
|
||||||
|
img = Image.open(buf).convert("RGBA")
|
||||||
|
|
||||||
|
# Create background
|
||||||
|
final_img = Image.new("RGBA", img.size, bg_color)
|
||||||
|
final_img.alpha_composite(img)
|
||||||
|
return final_img
|
||||||
|
|
||||||
|
|
||||||
|
def process_correction(root_dir, data, all_labels):
|
||||||
|
margin_left = 200
|
||||||
|
|
||||||
|
for student_id, labels in data.items():
|
||||||
|
# Prepare output directory: Dir/Anot_CopieID
|
||||||
|
output_dir = os.path.join(root_dir, f"Anot_Copie{student_id}")
|
||||||
|
|
||||||
|
# Check if already processed (Concat.jpg exists)
|
||||||
|
concat_path = os.path.join(output_dir, "Concat.jpg")
|
||||||
|
if os.path.exists(concat_path):
|
||||||
|
print(f"Skipping Copie {student_id} (Concat.jpg exists)")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Processing :", student_id)
|
||||||
|
|
||||||
|
# Clean folder if re-processing
|
||||||
|
if os.path.exists(output_dir):
|
||||||
|
shutil.rmtree(output_dir)
|
||||||
|
os.makedirs(output_dir)
|
||||||
|
|
||||||
|
d_notes = dict.fromkeys(all_labels,"")
|
||||||
|
|
||||||
|
for label, content in labels.items():
|
||||||
|
# 1. Find PDF path
|
||||||
|
copie_folder = f"Copie{student_id}"
|
||||||
|
pdf_rel_path = os.path.join(copie_folder, f"{label}.pdf")
|
||||||
|
pdf_full_path = os.path.join(root_dir, pdf_rel_path)
|
||||||
|
|
||||||
|
if not os.path.exists(pdf_full_path):
|
||||||
|
print(f"File not found: {pdf_full_path}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 2. Convert PDF to Image
|
||||||
|
try:
|
||||||
|
pages = convert_from_path(pdf_full_path)
|
||||||
|
|
||||||
|
# Calculate total dimensions
|
||||||
|
total_h = sum(page.height for page in pages)
|
||||||
|
max_w = max(page.width for page in pages)
|
||||||
|
|
||||||
|
# Create concatenated base image
|
||||||
|
base_img = Image.new("RGBA", (max_w, total_h), "white")
|
||||||
|
|
||||||
|
current_y = 0
|
||||||
|
for page in pages:
|
||||||
|
base_img.paste(page.convert("RGBA"), (0, current_y))
|
||||||
|
current_y += page.height
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error converting {pdf_full_path}: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
coordinates = content.get('coordinates', (0, 0)) # (hmin, hmax)
|
||||||
|
hmin = coordinates[0]
|
||||||
|
result = content.get('result', {})
|
||||||
|
score = result.get('score', 0)
|
||||||
|
error = result.get('error', "")
|
||||||
|
feedbacks = result.get('feedback', [])
|
||||||
|
|
||||||
|
# Organize feedbacks
|
||||||
|
global_fb = [f for f in feedbacks if not f.get('box_2d')]
|
||||||
|
local_fb = [f for f in feedbacks if f.get('box_2d')]
|
||||||
|
# Sort local feedback by Y position
|
||||||
|
local_fb.sort(key=lambda x: x['box_2d'][0])
|
||||||
|
|
||||||
|
# --- PREPARE HEADERS ---
|
||||||
|
header_elements = []
|
||||||
|
score_text = f"{label} ; Note : {score}"
|
||||||
|
d_notes[label] = str(score)
|
||||||
|
if error and error != "null":
|
||||||
|
score_text += f" | Error: {error}"
|
||||||
|
|
||||||
|
# Render Row 1
|
||||||
|
row1_img = render_latex_text(score_text, base_img.width,fontsize=18)
|
||||||
|
header_elements.append(row1_img)
|
||||||
|
|
||||||
|
# --- OTHER HEADERS
|
||||||
|
# Render Global Feedbacks (Rows 2+)
|
||||||
|
for fb in global_fb:
|
||||||
|
fb_img = render_latex_text(fb['text'], base_img.width)
|
||||||
|
header_elements.append(fb_img)
|
||||||
|
|
||||||
|
# Calculate total new height
|
||||||
|
header_height = sum(img.height for img in header_elements)
|
||||||
|
total_height = base_img.height + header_height
|
||||||
|
|
||||||
|
# Create Canvas
|
||||||
|
final_img = Image.new("RGB", (base_img.width + margin_left, total_height), "white")
|
||||||
|
|
||||||
|
# Paste Headers
|
||||||
|
current_y = 0
|
||||||
|
for elem in header_elements:
|
||||||
|
final_img.paste(elem, (0, current_y))
|
||||||
|
current_y += elem.height
|
||||||
|
|
||||||
|
# Paste Original Image
|
||||||
|
# Note: current_y is now the offset for the actual image content
|
||||||
|
image_offset_y = current_y
|
||||||
|
final_img.paste(base_img, (margin_left, image_offset_y))
|
||||||
|
|
||||||
|
# --- DRAW LOCAL ANNOTATIONS ---
|
||||||
|
draw = ImageDraw.Draw(final_img, "RGBA")
|
||||||
|
|
||||||
|
last_text_bottom = 0
|
||||||
|
|
||||||
|
for fb in local_fb:
|
||||||
|
# raw_pos = fb.get('pos')
|
||||||
|
box = fb.get('box_2d')
|
||||||
|
if not box or len(box) < 4:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ymin, xmin, ymax, xmax = box[0], box[1], box[2], box[3]
|
||||||
|
|
||||||
|
target_ymin = (ymin - hmin) + image_offset_y
|
||||||
|
target_ymax = (ymax - hmin) + image_offset_y
|
||||||
|
target_xmin = xmin + margin_left
|
||||||
|
target_xmax = xmax + margin_left
|
||||||
|
|
||||||
|
# Draw Rectangle
|
||||||
|
draw.rectangle([target_xmin, target_ymin, target_xmax, target_ymax], outline="red", width=3)
|
||||||
|
|
||||||
|
# Render Text with transparent red background
|
||||||
|
# (255, 0, 0, 50) is transparent red
|
||||||
|
txt_img = render_latex_text(
|
||||||
|
fb['text'],
|
||||||
|
width_px=500,
|
||||||
|
bg_color=(255, 200, 200, 180), # Light Red semi-transparent
|
||||||
|
max_lines=3
|
||||||
|
)
|
||||||
|
|
||||||
|
# Calculate placement
|
||||||
|
txt_h = txt_img.height
|
||||||
|
center_y = (target_ymin + target_ymax) / 2
|
||||||
|
paste_y = center_y - (txt_h / 2)
|
||||||
|
|
||||||
|
paste_y = max(paste_y, image_offset_y)
|
||||||
|
|
||||||
|
# Prevent overlap with previous text
|
||||||
|
if paste_y < last_text_bottom:
|
||||||
|
paste_y = last_text_bottom + 5 # Move down + padding
|
||||||
|
|
||||||
|
# Check for overflow and resize if necessary
|
||||||
|
required_height = int(paste_y + txt_h + 20) # +20 for bottom padding
|
||||||
|
if required_height > final_img.height:
|
||||||
|
# Create a new taller image
|
||||||
|
new_final = Image.new("RGB", (final_img.width, required_height), "white")
|
||||||
|
# Paste the current image content onto the new one
|
||||||
|
new_final.paste(final_img, (0, 0))
|
||||||
|
final_img = new_final
|
||||||
|
# Re-initialize the draw object for the new image so subsequent rectangles are drawn correctly
|
||||||
|
draw = ImageDraw.Draw(final_img, "RGBA")
|
||||||
|
|
||||||
|
|
||||||
|
# Paste in the left margin
|
||||||
|
final_img.paste(txt_img, (10, int(paste_y)), mask=txt_img)
|
||||||
|
last_text_bottom = paste_y + txt_h
|
||||||
|
|
||||||
|
# 7. Save Image
|
||||||
|
save_path = os.path.join(output_dir, f"{label}.jpg")
|
||||||
|
final_img.save(save_path)
|
||||||
|
|
||||||
|
json_path = os.path.join(output_dir, "score.json")
|
||||||
|
with open(json_path, "w") as f:
|
||||||
|
json.dump(d_notes, f, indent=4)
|
||||||
|
concat_display_image(output_dir)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
def concat_display_image(subdir):
|
||||||
|
subdir = Path(subdir)
|
||||||
|
# Find valid images, excluding previous concatenations
|
||||||
|
images = sorted([
|
||||||
|
f for f in subdir.glob("*.jpg")
|
||||||
|
if f.name != "Concat.jpg"
|
||||||
|
])
|
||||||
|
|
||||||
|
if not images:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Load images
|
||||||
|
opened_imgs = [Image.open(img) for img in images]
|
||||||
|
|
||||||
|
# Calculate dimensions (max width, sum of heights)
|
||||||
|
max_w = max(i.width for i in opened_imgs)
|
||||||
|
total_h = sum(i.height for i in opened_imgs)
|
||||||
|
|
||||||
|
# Create canvas and paste vertically
|
||||||
|
canvas = Image.new('RGB', (max_w, total_h))
|
||||||
|
current_y = 0
|
||||||
|
for img in opened_imgs:
|
||||||
|
canvas.paste(img, (0, current_y))
|
||||||
|
current_y += img.height
|
||||||
|
|
||||||
|
# Save
|
||||||
|
save_path = subdir / "Concat.jpg"
|
||||||
|
canvas.save(save_path)
|
||||||
|
print(f"Saved: {save_path}")
|
||||||
|
# subprocess.call(('xdg-open', save_path))
|
||||||
|
|
||||||
|
def concat_anot_images(directory):
|
||||||
|
root = Path(directory)
|
||||||
|
|
||||||
|
for subdir in root.iterdir():
|
||||||
|
if subdir.is_dir() and subdir.name.startswith("Anot"):
|
||||||
|
# Find valid images, excluding previous concatenations
|
||||||
|
images = sorted([
|
||||||
|
f for f in subdir.glob("*.jpg")
|
||||||
|
if f.name != "Concat.jpg"
|
||||||
|
])
|
||||||
|
|
||||||
|
if not images:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Load images
|
||||||
|
opened_imgs = [Image.open(img) for img in images]
|
||||||
|
|
||||||
|
# Calculate dimensions (max width, sum of heights)
|
||||||
|
max_w = max(i.width for i in opened_imgs)
|
||||||
|
total_h = sum(i.height for i in opened_imgs)
|
||||||
|
|
||||||
|
# Create canvas and paste vertically
|
||||||
|
canvas = Image.new('RGB', (max_w, total_h))
|
||||||
|
current_y = 0
|
||||||
|
for img in opened_imgs:
|
||||||
|
canvas.paste(img, (0, current_y))
|
||||||
|
current_y += img.height
|
||||||
|
|
||||||
|
# Save
|
||||||
|
save_path = subdir / "Concat.jpg"
|
||||||
|
canvas.save(save_path)
|
||||||
|
print(f"Saved: {save_path}")
|
||||||
|
subprocess.call(('xdg-open', save_path))
|
||||||
|
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python script.py <Dir>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
root_dir = sys.argv[1]
|
||||||
|
labels = list(filter(None, (Path(root_dir) / "labels").read_text().splitlines()))
|
||||||
|
results = make_dictionary(root_dir)
|
||||||
|
# Results is : Copie id -> label -> {pdf_path, gemini_result, coordinates}
|
||||||
|
# Coordinates are the real coordinates (hmin, hmax) of the image in the Group
|
||||||
|
# print(results,"\n\n\n")
|
||||||
|
process_correction(root_dir, results, labels)
|
||||||
|
# concat_anot_images(root_dir)
|
||||||
|
|
@ -0,0 +1,291 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
sys.exit("Usage: python script.py InterroTest/Ex 2/Group_1.jpg OR <InputDir>")
|
||||||
|
|
||||||
|
arg_path = Path(sys.argv[1])
|
||||||
|
tasks = [] # List of tuples: (filepath_str, label_str)
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
# Parse Arguments
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument("--overwrite", action="store_true", help="Force redo requests even if output exists")
|
||||||
|
# parse_known_args is used to avoid conflicts if run inside an environment passing other flags
|
||||||
|
args, _ = parser.parse_known_args()
|
||||||
|
|
||||||
|
|
||||||
|
if arg_path.suffix == ".jpg":
|
||||||
|
# Preserve original behaviour
|
||||||
|
INPUT_DIR = str(arg_path.parents[1])
|
||||||
|
FULL_LABEL = arg_path.parent.name
|
||||||
|
tasks.append((str(arg_path), FULL_LABEL))
|
||||||
|
results[FULL_LABEL] = []
|
||||||
|
else:
|
||||||
|
# Directory behaviour
|
||||||
|
INPUT_DIR = str(arg_path)
|
||||||
|
if not arg_path.exists():
|
||||||
|
sys.exit(f"Directory {INPUT_DIR} not found.")
|
||||||
|
|
||||||
|
for sub in arg_path.iterdir():
|
||||||
|
if sub.is_dir() and sub.name.startswith("Ex"):
|
||||||
|
label = sub.name
|
||||||
|
results[label] = []
|
||||||
|
for img in sub.glob("*.jpg"):
|
||||||
|
tasks.append((str(img), label))
|
||||||
|
|
||||||
|
my_prompt = """I'm giving you an image of several written answers to an exam.
|
||||||
|
|
||||||
|
Each answer is separated by a black horizontal line, and underneath,
|
||||||
|
to the left, is indicated the ID of the answer, from `01` to `50`.
|
||||||
|
|
||||||
|
I want you to score each answer, from 0 to 4, you may score half
|
||||||
|
points, such as 2.5. Even if a result is wrong, if the reasoning is
|
||||||
|
correct and could lead to a right answer, you should give at least
|
||||||
|
half the points.
|
||||||
|
|
||||||
|
You also need to give feedback to the student, in french :
|
||||||
|
- which part of his answer is wrong,
|
||||||
|
- why is it wrong
|
||||||
|
- possibly, what he should have done instead.
|
||||||
|
Your feedback may contain LaTeX fragments written like `$a^2 + b^2 = c^2$`.
|
||||||
|
|
||||||
|
If your score is note 4, you should always provide some feedback
|
||||||
|
explaining what's missing.
|
||||||
|
|
||||||
|
For each piece of feedback, if it is related to a specific part of the
|
||||||
|
answer that is wrong, you may provide a `box_2d`, to locate this
|
||||||
|
specific part of the answer. This `box_2d` should be in the form
|
||||||
|
[ymin, xmin, ymax, xmax] normalized to 0-1000. If you do not provide
|
||||||
|
one, set `box_2d` to `null`.
|
||||||
|
|
||||||
|
If the answer is correct, there is no need to provide feedback.
|
||||||
|
|
||||||
|
For example, if the student says a function is continuous when it
|
||||||
|
isn't, provide the coordinates where the word «continuous» is. If a
|
||||||
|
calculation went wrong, gives the coordinates of the step where it
|
||||||
|
goes wrong, and as feedback, what went wrong.
|
||||||
|
|
||||||
|
You should also give me a measure of confidence, from 0 to 1 that you
|
||||||
|
were able to correctly understand the answer. A score below 0.5 means
|
||||||
|
that you think it is likely that you couldn't understand an important
|
||||||
|
part.
|
||||||
|
|
||||||
|
In some case, you may find that either
|
||||||
|
- The student didn't answer the right question. Set the score to 0.
|
||||||
|
Since it could be a labeling error, indicate is by setting `error`
|
||||||
|
to \"wrong-label\".
|
||||||
|
- You can find an answer to another question of the exercice (taking
|
||||||
|
more than a couple of lines). Score the question you are supposed
|
||||||
|
to score, but set `error` to \"additional-answer\".
|
||||||
|
If there's no error, set `error` to `\"\"`.
|
||||||
|
|
||||||
|
You will answer using json describing a list of dictionary with a key
|
||||||
|
\"id\", and a key \"result\" that contains the \"score\", the \"confidence\", a
|
||||||
|
list \"feedback\", and possibly an \"error\". Like this example :
|
||||||
|
|
||||||
|
[{ \"id\": \"01\",
|
||||||
|
\"result\": {\"score\" : 2.5,
|
||||||
|
\"confidence\" : 0.8,
|
||||||
|
\"feedback\": [{text: \"Un retour générique. Il faut apprendre le cours.\", box_2d: null},
|
||||||
|
{text: \"Non, la fonction n'est pas forcément continue\", pos: [145, 280, 340, 500]}],
|
||||||
|
\"error\": \"\"}
|
||||||
|
},
|
||||||
|
{ \"id\": \"04\",
|
||||||
|
\"result\": {\"score\" : 4.,
|
||||||
|
\"confidence\" : 0.9,
|
||||||
|
\"feedback\" : []
|
||||||
|
\"error\": \"\" }
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
Here is the text of the exercice of the exam :
|
||||||
|
|
||||||
|
```
|
||||||
|
<<text>>
|
||||||
|
```
|
||||||
|
|
||||||
|
Here is a possible correct answer :
|
||||||
|
|
||||||
|
```
|
||||||
|
<<corr>>
|
||||||
|
```
|
||||||
|
|
||||||
|
Here is some additional scoring instructions :
|
||||||
|
|
||||||
|
```
|
||||||
|
<<persp>>
|
||||||
|
```
|
||||||
|
|
||||||
|
You are asked to score the question or exercice labeled `<<label>>`,
|
||||||
|
do not score or give feedback to any other question."""
|
||||||
|
|
||||||
|
def make_prompt(full_label):
|
||||||
|
l = full_label.split(" ")
|
||||||
|
ex_label = l[0] + " " + l[1]
|
||||||
|
text = (Path(INPUT_DIR) / "Text" / ex_label).read_text()
|
||||||
|
corr = (Path(INPUT_DIR) / "Sol" / ex_label).read_text()
|
||||||
|
persp = (Path(INPUT_DIR) / "Persp" / ex_label).read_text()
|
||||||
|
if persp == "":
|
||||||
|
perps = "There is no additional scoring instructions."
|
||||||
|
return my_prompt.replace("<<text>>", text).replace("<<corr>>", corr).replace("<<persp>>", persp).replace("<<label>>", full_label)
|
||||||
|
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
|
# PROXY_URL = "http://192.168.241.1:3128"
|
||||||
|
PROXY_URL = None
|
||||||
|
|
||||||
|
if PROXY_URL:
|
||||||
|
os.environ["http_proxy"] = PROXY_URL
|
||||||
|
os.environ["https_proxy"] = PROXY_URL
|
||||||
|
|
||||||
|
|
||||||
|
MODEL_ID = "gemini-3-pro-preview"
|
||||||
|
api_key="REMOVED_API_KEY"
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, TypeAdapter
|
||||||
|
from typing import List, Optional, Tuple
|
||||||
|
|
||||||
|
class FeedbackItem(BaseModel):
|
||||||
|
text: str = Field(description="Feedback content")
|
||||||
|
box_2d: Optional[List[int]] = Field(None, description="box coordinates or null")
|
||||||
|
|
||||||
|
class ResultData(BaseModel):
|
||||||
|
score: float = Field(description="The numeric score")
|
||||||
|
confidence: float = Field(description="Confidence level")
|
||||||
|
feedback: List[FeedbackItem] = Field(description="List of feedback items")
|
||||||
|
error: str = Field(description="Indicates if an error occurred")
|
||||||
|
|
||||||
|
class EvaluationEntry(BaseModel):
|
||||||
|
id: str = Field(description="Entry identifier")
|
||||||
|
result: ResultData = Field(description="Result details")
|
||||||
|
|
||||||
|
# The root model for parsing is be: List[EvaluationEntry]
|
||||||
|
|
||||||
|
def generate_request(file, full_label):
|
||||||
|
"""Generates request for Gemini."""
|
||||||
|
prompt = make_prompt(full_label)
|
||||||
|
image_path = Path(file)
|
||||||
|
|
||||||
|
contents = [
|
||||||
|
types.Content(
|
||||||
|
role="user",
|
||||||
|
parts=[
|
||||||
|
types.Part.from_bytes(
|
||||||
|
data=image_path.read_bytes(),
|
||||||
|
mime_type="image/jpeg"
|
||||||
|
),
|
||||||
|
types.Part.from_text(text=prompt),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
generate_content_config = types.GenerateContentConfig(
|
||||||
|
temperature=1.0,
|
||||||
|
top_p=0.95,
|
||||||
|
seed=0,
|
||||||
|
max_output_tokens=65535,
|
||||||
|
response_mime_type= "application/json",
|
||||||
|
response_json_schema= TypeAdapter(List[EvaluationEntry]).json_schema()
|
||||||
|
# Thinking config is not compatible with response_json ? Unsure.
|
||||||
|
# thinking_config=types.ThinkingConfig(
|
||||||
|
# thinking_budget=-1,
|
||||||
|
# ),
|
||||||
|
# thinking_config=types.ThinkingConfig(
|
||||||
|
# include_thoughts=True,
|
||||||
|
# thinking_budget=1024, # Optimized for Gemini 3 capabilities
|
||||||
|
# ),
|
||||||
|
)
|
||||||
|
return (contents, generate_content_config)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
client = genai.Client(api_key=api_key)
|
||||||
|
output_path = Path(INPUT_DIR) / "correction.json"
|
||||||
|
progress_path = Path(INPUT_DIR) / "correction_progress.json"
|
||||||
|
start_time = time.time()
|
||||||
|
overwrite = args.overwrite
|
||||||
|
completed_tasks = []
|
||||||
|
|
||||||
|
# --- Lock for thread-safe file writing ---
|
||||||
|
io_lock = threading.Lock()
|
||||||
|
|
||||||
|
if overwrite:
|
||||||
|
if output_path.exists():
|
||||||
|
output_path.unlink()
|
||||||
|
if progress_path.exists():
|
||||||
|
progress_path.unlink()
|
||||||
|
else:
|
||||||
|
if progress_path.exists():
|
||||||
|
with open(progress_path, "r", encoding="utf-8") as f:
|
||||||
|
completed_tasks = json.load(f)
|
||||||
|
# Reload existing results to avoid overwriting them with partial data
|
||||||
|
if output_path.exists():
|
||||||
|
with open(output_path, "r", encoding="utf-8") as f:
|
||||||
|
results = json.load(f)
|
||||||
|
|
||||||
|
# Create a set for O(1) lookup. Normalize paths to strings.
|
||||||
|
completed_set = set((str(f), l) for f, l in completed_tasks)
|
||||||
|
|
||||||
|
# Filter tasks first to avoid overhead in threads
|
||||||
|
tasks_to_process = [t for t in tasks if (str(t[0]), t[1]) not in completed_set]
|
||||||
|
|
||||||
|
def process_single_task(task_tuple):
|
||||||
|
file_path, label = task_tuple
|
||||||
|
|
||||||
|
try:
|
||||||
|
contents, config = generate_request(file_path, label)
|
||||||
|
print(f"Asking Gemini: {label} {file_path}")
|
||||||
|
|
||||||
|
full_response_text = ""
|
||||||
|
# Assuming client is thread-safe (usually is).
|
||||||
|
# If not, create a new client instance inside this function.
|
||||||
|
for chunk in client.models.generate_content_stream(
|
||||||
|
model=MODEL_ID,
|
||||||
|
contents=contents,
|
||||||
|
config=config,
|
||||||
|
):
|
||||||
|
if chunk.text:
|
||||||
|
full_response_text += chunk.text
|
||||||
|
|
||||||
|
# Parse JSON
|
||||||
|
json_data = json.loads(full_response_text)
|
||||||
|
print(f"Gemini answered correctly for {file_path}")
|
||||||
|
|
||||||
|
# --- CRITICAL: Use Lock for writing shared data ---
|
||||||
|
with io_lock:
|
||||||
|
if label not in results:
|
||||||
|
results[label] = [] # Ensure key exists if not using defaultdict
|
||||||
|
results[label].append(json_data)
|
||||||
|
|
||||||
|
# Save Results
|
||||||
|
with open(output_path, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(results, f, indent=2)
|
||||||
|
|
||||||
|
# Save Progress (Optional, based on your logic)
|
||||||
|
# completed_tasks.append((str(file_path), label))
|
||||||
|
# with open(progress_path, "w", encoding="utf-8") as f:
|
||||||
|
# json.dump(completed_tasks, f)
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"Error decoding JSON for {file_path}", file=sys.stderr)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Exception processing {file_path}: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
print(f"Starting processing on {len(tasks_to_process)} tasks with 6 threads...")
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
|
||||||
|
executor.map(process_single_task, tasks_to_process)
|
||||||
|
|
||||||
|
end_time = time.time()
|
||||||
|
print("Time elapsed : ", end_time - start_time,"\n\n\n\n\n")
|
||||||
|
|
@ -0,0 +1,239 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import tkinter as tk
|
||||||
|
from threading import Thread
|
||||||
|
from queue import Queue, Empty
|
||||||
|
from pdf2image import convert_from_path
|
||||||
|
from PIL import Image, ImageTk
|
||||||
|
|
||||||
|
# --- Configuration ---
|
||||||
|
DELIMITER_WIDTH = 5
|
||||||
|
DELIMITER_COLOR = (0, 0, 0)
|
||||||
|
OUTPUT_SIZE = (1000, 1000)
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
sys.exit("Usage: python script.py <directory_path_or_file_path>")
|
||||||
|
|
||||||
|
path_arg = sys.argv[1]
|
||||||
|
files = []
|
||||||
|
INPUT_DIR = ""
|
||||||
|
|
||||||
|
if os.path.isfile(path_arg) and path_arg.lower().endswith('.pdf'):
|
||||||
|
INPUT_DIR = os.path.dirname(path_arg)
|
||||||
|
files = [os.path.basename(path_arg)]
|
||||||
|
elif os.path.isdir(path_arg):
|
||||||
|
INPUT_DIR = path_arg
|
||||||
|
files = sorted([f for f in os.listdir(INPUT_DIR) if f.lower().endswith('.pdf')])
|
||||||
|
else:
|
||||||
|
sys.exit("Error: Input must be a directory or a PDF file.")
|
||||||
|
|
||||||
|
OUTPUT_DIR = os.path.join(INPUT_DIR, 'Cutleft')
|
||||||
|
|
||||||
|
if not os.path.exists(OUTPUT_DIR):
|
||||||
|
os.makedirs(OUTPUT_DIR)
|
||||||
|
|
||||||
|
# --- Processing Logic ---
|
||||||
|
|
||||||
|
def process_single_pdf(filename, shift_offset=0):
|
||||||
|
"""
|
||||||
|
Converts PDF to stitched JPG image (PIL object).
|
||||||
|
"""
|
||||||
|
pdf_path = os.path.join(INPUT_DIR, filename)
|
||||||
|
try:
|
||||||
|
pages = convert_from_path(pdf_path)
|
||||||
|
cropped_images = []
|
||||||
|
|
||||||
|
for img in pages:
|
||||||
|
width, height = img.size
|
||||||
|
left = 100 + shift_offset
|
||||||
|
right = (width // 3) + 100 + shift_offset
|
||||||
|
|
||||||
|
# Ensure crop box is valid
|
||||||
|
left = max(0, left)
|
||||||
|
right = min(width, right)
|
||||||
|
|
||||||
|
if right > left:
|
||||||
|
crop_box = (left, 0, right, height)
|
||||||
|
cropped = img.crop(crop_box)
|
||||||
|
cropped_images.append(cropped)
|
||||||
|
|
||||||
|
if not cropped_images:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Combine
|
||||||
|
num_images = len(cropped_images)
|
||||||
|
total_width = sum(img.width for img in cropped_images) + (num_images - 1) * DELIMITER_WIDTH
|
||||||
|
max_height = max(img.height for img in cropped_images)
|
||||||
|
|
||||||
|
combined = Image.new('RGB', (total_width, max_height), color=(255, 255, 255))
|
||||||
|
|
||||||
|
x_offset = 0
|
||||||
|
for idx, img in enumerate(cropped_images):
|
||||||
|
combined.paste(img, (x_offset, 0))
|
||||||
|
x_offset += img.width
|
||||||
|
if idx < num_images - 1:
|
||||||
|
delimiter = Image.new('RGB', (DELIMITER_WIDTH, max_height), color=DELIMITER_COLOR)
|
||||||
|
combined.paste(delimiter, (x_offset, 0))
|
||||||
|
x_offset += DELIMITER_WIDTH
|
||||||
|
|
||||||
|
# Resize
|
||||||
|
resized = combined.resize(OUTPUT_SIZE, Image.LANCZOS)
|
||||||
|
return resized
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing {filename}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def save_image(pil_img, filename):
|
||||||
|
output_filename = os.path.splitext(filename)[0] + ".jpg"
|
||||||
|
output_path = os.path.join(OUTPUT_DIR, output_filename)
|
||||||
|
pil_img.save(output_path, "JPEG", quality=95)
|
||||||
|
print(f"Saved: {output_filename}")
|
||||||
|
|
||||||
|
# --- GUI Application ---
|
||||||
|
|
||||||
|
class ImageReviewer:
|
||||||
|
def __init__(self, file_list):
|
||||||
|
self.files = file_list
|
||||||
|
self.index = 0
|
||||||
|
self.current_shift = 0
|
||||||
|
self.current_pil = None
|
||||||
|
self.is_processing = False
|
||||||
|
|
||||||
|
# Queue for pre-fetched images (index, image)
|
||||||
|
self.prefetch_queue = Queue(maxsize=1)
|
||||||
|
# Queue for manual re-processing results
|
||||||
|
self.manual_queue = Queue()
|
||||||
|
|
||||||
|
# Setup GUI
|
||||||
|
self.root = tk.Tk()
|
||||||
|
self.root.title("PDF Cropper")
|
||||||
|
self.root.geometry("+100+100")
|
||||||
|
|
||||||
|
self.label_img = tk.Label(self.root)
|
||||||
|
self.label_img.pack()
|
||||||
|
|
||||||
|
self.label_info = tk.Label(self.root, text="", font=("Arial", 12, "bold"))
|
||||||
|
self.label_info.pack(pady=5)
|
||||||
|
|
||||||
|
# Bindings
|
||||||
|
self.root.bind('<Return>', self.on_next)
|
||||||
|
self.root.bind('n', lambda e: self.on_shift(50))
|
||||||
|
self.root.bind('N', lambda e: self.on_shift(100))
|
||||||
|
self.root.bind('t', lambda e: self.on_shift(-50))
|
||||||
|
|
||||||
|
# Start background pre-fetcher
|
||||||
|
self.bg_thread = Thread(target=self.prefetch_worker, daemon=True)
|
||||||
|
self.bg_thread.start()
|
||||||
|
|
||||||
|
# Load first image
|
||||||
|
self.load_current_image()
|
||||||
|
|
||||||
|
self.root.lift()
|
||||||
|
self.root.focus_force()
|
||||||
|
self.root.mainloop()
|
||||||
|
|
||||||
|
def prefetch_worker(self):
|
||||||
|
"""Background thread to process the NEXT image constantly."""
|
||||||
|
idx_to_process = 0
|
||||||
|
while True:
|
||||||
|
target = self.index + 1
|
||||||
|
if target < len(self.files):
|
||||||
|
if idx_to_process != target:
|
||||||
|
fname = self.files[target]
|
||||||
|
img = process_single_pdf(fname, shift_offset=0)
|
||||||
|
if img:
|
||||||
|
self.prefetch_queue.put((target, img)) # Blocks if full
|
||||||
|
idx_to_process = target
|
||||||
|
|
||||||
|
# Crucial fix: Sleep briefly to release CPU
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def load_current_image(self, use_prefetch=False):
|
||||||
|
if self.index >= len(self.files):
|
||||||
|
print("All files processed.")
|
||||||
|
self.root.destroy()
|
||||||
|
return
|
||||||
|
|
||||||
|
filename = self.files[self.index]
|
||||||
|
self.is_processing = False
|
||||||
|
|
||||||
|
img_found = None
|
||||||
|
|
||||||
|
if use_prefetch and not self.prefetch_queue.empty():
|
||||||
|
q_idx, q_img = self.prefetch_queue.queue[0]
|
||||||
|
if q_idx == self.index:
|
||||||
|
_, img_found = self.prefetch_queue.get()
|
||||||
|
self.current_shift = 0
|
||||||
|
print(f"Loaded {filename} from prefetch.")
|
||||||
|
|
||||||
|
if img_found:
|
||||||
|
self.current_pil = img_found
|
||||||
|
save_image(self.current_pil, filename)
|
||||||
|
self.update_display(filename)
|
||||||
|
else:
|
||||||
|
# Not in queue (first load or queue mismatch), process manually
|
||||||
|
self.trigger_processing(filename, self.current_shift)
|
||||||
|
|
||||||
|
def trigger_processing(self, filename, shift):
|
||||||
|
"""Starts a thread to process image so GUI doesn't freeze."""
|
||||||
|
self.is_processing = True
|
||||||
|
self.label_info.configure(text=f"Processing {filename} (Shift {shift})... Please wait.", fg="red")
|
||||||
|
|
||||||
|
def worker():
|
||||||
|
img = process_single_pdf(filename, shift)
|
||||||
|
self.manual_queue.put(img)
|
||||||
|
|
||||||
|
Thread(target=worker, daemon=True).start()
|
||||||
|
self.check_manual_queue(filename)
|
||||||
|
|
||||||
|
def check_manual_queue(self, filename):
|
||||||
|
"""Polls the manual queue for result."""
|
||||||
|
try:
|
||||||
|
img = self.manual_queue.get_nowait()
|
||||||
|
self.current_pil = img
|
||||||
|
if self.current_pil:
|
||||||
|
save_image(self.current_pil, filename)
|
||||||
|
self.update_display(filename)
|
||||||
|
else:
|
||||||
|
print(f"Failed to process {filename}, skipping.")
|
||||||
|
self.index += 1
|
||||||
|
self.load_current_image(use_prefetch=True)
|
||||||
|
self.is_processing = False
|
||||||
|
except Empty:
|
||||||
|
# Check again in 100ms
|
||||||
|
self.root.after(100, lambda: self.check_manual_queue(filename))
|
||||||
|
|
||||||
|
def update_display(self, filename):
|
||||||
|
if self.current_pil:
|
||||||
|
tk_image = ImageTk.PhotoImage(self.current_pil)
|
||||||
|
self.label_img.configure(image=tk_image)
|
||||||
|
self.label_img.image = tk_image
|
||||||
|
self.label_info.configure(
|
||||||
|
text=f"[{self.index+1}/{len(self.files)}] {filename} | Shift: {self.current_shift}px\n"
|
||||||
|
f"Enter: Next | n: +50 | N: +100 | t: -50",
|
||||||
|
fg="black"
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_shift(self, amount):
|
||||||
|
if self.is_processing:
|
||||||
|
return # Ignore keys while processing
|
||||||
|
self.current_shift += amount
|
||||||
|
print(f"Applying shift: {self.current_shift}")
|
||||||
|
self.trigger_processing(self.files[self.index], self.current_shift)
|
||||||
|
|
||||||
|
def on_next(self, event):
|
||||||
|
if self.is_processing:
|
||||||
|
return
|
||||||
|
self.index += 1
|
||||||
|
self.current_shift = 0
|
||||||
|
self.load_current_image(use_prefetch=True)
|
||||||
|
|
||||||
|
# --- Entry Point ---
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if not files:
|
||||||
|
print("No PDF files found.")
|
||||||
|
else:
|
||||||
|
app = ImageReviewer(files)
|
||||||
|
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import glob
|
||||||
|
import json
|
||||||
|
import urllib.request
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
def replace_dots(text):
|
||||||
|
# (?m) enables multiline mode so ^ matches start of each line
|
||||||
|
return re.sub(r"(?m)^(\s*.)\.", r"\1)", text)
|
||||||
|
|
||||||
|
|
||||||
|
def format_indices(indices):
|
||||||
|
"""Converts [2, 1] to '2)a)' based on requirements."""
|
||||||
|
if not indices:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
# First level: numeric (1 -> 1))
|
||||||
|
res = f"{indices[0]})"
|
||||||
|
|
||||||
|
# Second level: alpha (1 -> a))
|
||||||
|
if len(indices) > 1:
|
||||||
|
res += f"{chr(96 + indices[1])})"
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
|
def process_directory(directory):
|
||||||
|
# Find the first .tex file in the directory
|
||||||
|
tex_files = glob.glob(os.path.join(directory, "*.tex"))
|
||||||
|
if not tex_files:
|
||||||
|
print(f"No .tex file found in {directory}")
|
||||||
|
return
|
||||||
|
|
||||||
|
tex_file = tex_files[0]
|
||||||
|
|
||||||
|
# Prepare output directories
|
||||||
|
paths = {
|
||||||
|
'Text': os.path.join(directory, "Text"),
|
||||||
|
'Sol': os.path.join(directory, "Sol"),
|
||||||
|
'Persp': os.path.join(directory, "Persp")
|
||||||
|
}
|
||||||
|
for p in paths.values():
|
||||||
|
os.makedirs(p, exist_ok=True)
|
||||||
|
|
||||||
|
labels_file = os.path.join(directory, "labels")
|
||||||
|
current_ex_num = 1
|
||||||
|
|
||||||
|
|
||||||
|
with open(tex_file, 'r', encoding='utf-8') as f_in, \
|
||||||
|
open(labels_file, 'w', encoding='utf-8') as f_labels:
|
||||||
|
for line in f_in:
|
||||||
|
if line.startswith("%%SHEETINFO :"):
|
||||||
|
try:
|
||||||
|
json_str = line.split(":", 1)[1].strip()
|
||||||
|
data = json.loads(json_str)
|
||||||
|
|
||||||
|
# 2. Handle Labels
|
||||||
|
indexes = data.get('indexes', [])
|
||||||
|
if not indexes:
|
||||||
|
f_labels.write(f"Ex {current_ex_num}\n")
|
||||||
|
else:
|
||||||
|
for item in indexes:
|
||||||
|
suffix = format_indices(item['indices'])
|
||||||
|
if suffix != "":
|
||||||
|
f_labels.write(f"Ex {current_ex_num} : {suffix}\n")
|
||||||
|
else:
|
||||||
|
f_labels.write(f"Ex {current_ex_num}\n")
|
||||||
|
|
||||||
|
# Construct 'ids' parameter
|
||||||
|
ex_id = str(data['id'])
|
||||||
|
selection = data.get('select')
|
||||||
|
|
||||||
|
if selection is not None:
|
||||||
|
# Format: "ID.sel1,sel2"
|
||||||
|
sel_s = [i+1 for i in selection]
|
||||||
|
ids = f"{ex_id}.{','.join(map(str, sel_s))}"
|
||||||
|
else:
|
||||||
|
ids = ex_id
|
||||||
|
|
||||||
|
# Construct URL
|
||||||
|
url = f"http://localhost:8080/exercices/emacs/{ids}?pretty=true&all=true&persp=true"
|
||||||
|
|
||||||
|
# Perform GET request
|
||||||
|
with urllib.request.urlopen(url) as response:
|
||||||
|
content = response.read().decode('utf-8')
|
||||||
|
|
||||||
|
# 4. Split and Save content
|
||||||
|
parts = content.split('###')
|
||||||
|
|
||||||
|
# Ensure we have at least 3 parts, pad if necessary to avoid crashes
|
||||||
|
while len(parts) < 3:
|
||||||
|
parts.append("")
|
||||||
|
|
||||||
|
base_filename = f"Ex {current_ex_num}"
|
||||||
|
|
||||||
|
with open(os.path.join(paths['Text'], base_filename), 'w', encoding='utf-8') as f:
|
||||||
|
f.write(replace_dots(parts[0].strip("\n")))
|
||||||
|
|
||||||
|
with open(os.path.join(paths['Sol'], base_filename), 'w', encoding='utf-8') as f:
|
||||||
|
f.write(replace_dots(parts[1].strip("\n")))
|
||||||
|
|
||||||
|
with open(os.path.join(paths['Persp'], base_filename), 'w', encoding='utf-8') as f:
|
||||||
|
f.write(replace_dots(parts[2].strip("\n")))
|
||||||
|
|
||||||
|
current_ex_num += 1
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"Error decoding JSON in line: {line.strip()}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing {ids}: {e}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python script.py <Dir>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
process_directory(sys.argv[1])
|
||||||
|
|
@ -0,0 +1,166 @@
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
import base64
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
sys.exit("Usage: python script.py <directory_path>")
|
||||||
|
|
||||||
|
INPUT_DIR = sys.argv[1]
|
||||||
|
CUTLEFT_DIR = os.path.join(INPUT_DIR, 'Cutleft')
|
||||||
|
|
||||||
|
|
||||||
|
MODEL_ID = "gemini-3-flash-preview"
|
||||||
|
api_key="REMOVED_API_KEY"
|
||||||
|
|
||||||
|
my_prompt = """I'm giving you an image of the left columns of a written exam.
|
||||||
|
Students answer several exercises, which can have several questions.
|
||||||
|
|
||||||
|
The image consists of several columns, separated by vertical black
|
||||||
|
lines. The image should be read top to bottom and then left to right,
|
||||||
|
meaning first column, then second column, etc.
|
||||||
|
|
||||||
|
In their sheet, students delimit exercises and questions using
|
||||||
|
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
|
||||||
|
to give me the bounding boxes of each delimiter.
|
||||||
|
|
||||||
|
When giving the bounding box of the first question of an exercise, the
|
||||||
|
box should be large enough to contain both the exercice label
|
||||||
|
(`Exercice i`) and the question label (`1)`) parts.
|
||||||
|
|
||||||
|
You also need to give me the student name. It should appear on the top
|
||||||
|
left of the image. Disregard any mention of `MPSI 3`, it is their
|
||||||
|
class. A list of possible student names will be given below.
|
||||||
|
|
||||||
|
You will answer with a JSON object, containing a `name` field with the
|
||||||
|
name, and a `list` field, with the list of the bounding boxes and
|
||||||
|
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
|
||||||
|
to 0-1000.
|
||||||
|
|
||||||
|
Here is an example :
|
||||||
|
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
|
||||||
|
|
||||||
|
Do not provide a box_2d for the name. Only for the labels.
|
||||||
|
|
||||||
|
You may find the same label present several times, as a student either
|
||||||
|
recall the current label on a new page, or adds content to its answer
|
||||||
|
later on. Give the position of each instance of each label.
|
||||||
|
|
||||||
|
For this exam you should look for the labels given below, separated by
|
||||||
|
newlines. A student need not have answered every question, so some may
|
||||||
|
be missing.
|
||||||
|
|
||||||
|
##labels##
|
||||||
|
|
||||||
|
Here's a list of the names of the students, pick the one that matches
|
||||||
|
the best or `\"Unknown\"` if you cannot read the name
|
||||||
|
|
||||||
|
##names##"""
|
||||||
|
from tqdm import tqdm
|
||||||
|
|
||||||
|
def process_batch(directory):
|
||||||
|
client = genai.Client(api_key=api_key)
|
||||||
|
image_files = list(Path(directory).glob("*.jpg"))
|
||||||
|
|
||||||
|
if not image_files:
|
||||||
|
print("No .jpg files found.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 1. Upload images to File API (Batch requirement)
|
||||||
|
batch_requests = []
|
||||||
|
print(f"Uploading {len(image_files)} images to File API...")
|
||||||
|
|
||||||
|
for img_path in tqdm(image_files, unit="img"):
|
||||||
|
# Upload file
|
||||||
|
file_ref = client.files.upload(path=img_path)
|
||||||
|
|
||||||
|
# Construct Request for JSONL
|
||||||
|
# Note: We must serialize config manually for the JSONL body
|
||||||
|
req_body = {
|
||||||
|
"contents": [
|
||||||
|
{"role": "user", "parts": [
|
||||||
|
{"fileData": {"mimeType": file_ref.mime_type, "fileUri": file_ref.uri}},
|
||||||
|
{"text": my_prompt}
|
||||||
|
]}
|
||||||
|
],
|
||||||
|
"generationConfig": {
|
||||||
|
"temperature": 1.0,
|
||||||
|
"topP": 0.95,
|
||||||
|
"maxOutputTokens": 65535,
|
||||||
|
"thinkingConfig": {"thinkingBudget": -1}
|
||||||
|
},
|
||||||
|
"safetySettings": [
|
||||||
|
{"category": cat, "threshold": "BLOCK_NONE"}
|
||||||
|
for cat in ["HARM_CATEGORY_HATE_SPEECH", "HARM_CATEGORY_DANGEROUS_CONTENT",
|
||||||
|
"HARM_CATEGORY_SEXUALLY_EXPLICIT", "HARM_CATEGORY_HARASSMENT"]
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Batch Request Entry
|
||||||
|
batch_requests.append({
|
||||||
|
"custom_id": img_path.name,
|
||||||
|
"method": "POST",
|
||||||
|
"url": f"/v1beta/models/{MODEL_ID}:generateContent",
|
||||||
|
"body": req_body
|
||||||
|
})
|
||||||
|
|
||||||
|
# 2. Create and Upload Batch Source File (JSONL)
|
||||||
|
batch_file_path = os.path.join(INPUT_DIR, "batch_input.jsonl")
|
||||||
|
with open(batch_file_path, "w") as f:
|
||||||
|
for req in batch_requests:
|
||||||
|
f.write(json.dumps(req) + "\n")
|
||||||
|
|
||||||
|
batch_input_file = client.files.upload(path=batch_file_path)
|
||||||
|
|
||||||
|
# 3. Submit Batch Job
|
||||||
|
print("Submitting batch job...")
|
||||||
|
job = client.batches.create(
|
||||||
|
model=MODEL_ID,
|
||||||
|
src=batch_input_file.name
|
||||||
|
)
|
||||||
|
print(f"Batch Job ID: {job.name}")
|
||||||
|
|
||||||
|
# 4. Poll for Completion
|
||||||
|
pbar = tqdm(desc="Processing Batch", unit="poll")
|
||||||
|
while True:
|
||||||
|
job = client.batches.get(name=job.name)
|
||||||
|
if job.state == "ACTIVE":
|
||||||
|
pbar.set_description("Processing")
|
||||||
|
elif job.state == "SUCCEEDED" or job.state == "FAILED":
|
||||||
|
break
|
||||||
|
|
||||||
|
pbar.update(1)
|
||||||
|
time.sleep(10) # Poll every 10 seconds
|
||||||
|
|
||||||
|
pbar.close()
|
||||||
|
|
||||||
|
if job.state == "FAILED":
|
||||||
|
print(f"Batch job failed: {job.error}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 5. Retrieve and Save Results
|
||||||
|
print("Downloading results...")
|
||||||
|
# The output file is a remote URI, we download its content
|
||||||
|
output_content = client.files.content(path=job.output_file.name)
|
||||||
|
|
||||||
|
# Parse JSONL output and map back to files
|
||||||
|
# Output format: {"custom_id": "...", "response": {...}}
|
||||||
|
results_saved = 0
|
||||||
|
for line in output_content.decode("utf-8").splitlines():
|
||||||
|
if not line: continue
|
||||||
|
result = json.loads(line)
|
||||||
|
|
||||||
|
filename = result.get("custom_id")
|
||||||
|
if filename:
|
||||||
|
output_path = Path(directory) / f"{filename}.json"
|
||||||
|
with open(output_path, "w", encoding="utf-8") as f:
|
||||||
|
# Save the full response part
|
||||||
|
json.dump(result.get("response", {}), f, indent=2)
|
||||||
|
results_saved += 1
|
||||||
|
|
||||||
|
print(f"Batch complete. Saved {results_saved} result files.")
|
||||||
|
|
||||||
|
process_batch(CUTLEFT_DIR)
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
import base64
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
MODEL_ID = "gemini-3-flash-preview"
|
||||||
|
api_key="REMOVED_API_KEY"
|
||||||
|
|
||||||
|
my_prompt = """I'm giving you an image of the left columns of a written exam.
|
||||||
|
Students answer several exercises, which can have several questions.
|
||||||
|
|
||||||
|
The image consists of several columns, separated by vertical black
|
||||||
|
lines. The image should be read top to bottom and then left to right,
|
||||||
|
meaning first column, then second column, etc.
|
||||||
|
|
||||||
|
In their sheet, students delimit exercises and questions using
|
||||||
|
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
|
||||||
|
to give me the bounding boxes of each delimiter.
|
||||||
|
|
||||||
|
When giving the bounding box of the first question of an exercise, the
|
||||||
|
box should be large enough to contain both the exercice label
|
||||||
|
(`Exercice i`) and the question label (`1)`) parts.
|
||||||
|
|
||||||
|
You also need to give me the student name. It should appear on the top
|
||||||
|
left of the image. Disregard any mention of `MPSI 3`, it is their
|
||||||
|
class. A list of possible student names will be given below.
|
||||||
|
|
||||||
|
You will answer with a JSON object, containing a `name` field with the
|
||||||
|
name, and a `list` field, with the list of the bounding boxes and
|
||||||
|
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
|
||||||
|
to 0-1000.
|
||||||
|
|
||||||
|
Here is an example :
|
||||||
|
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
|
||||||
|
|
||||||
|
Do not provide a box_2d for the name. Only for the labels.
|
||||||
|
|
||||||
|
You may find the same label present several times, as a student either
|
||||||
|
recall the current label on a new page, or adds content to its answer
|
||||||
|
later on. Give the position of each instance of each label.
|
||||||
|
|
||||||
|
For this exam you should look for the labels given below, separated by
|
||||||
|
newlines. A student need not have answered every question, so some may
|
||||||
|
be missing.
|
||||||
|
|
||||||
|
##labels##
|
||||||
|
|
||||||
|
Here's a list of the names of the students, pick the one that matches
|
||||||
|
the best or `\"Unknown\"` if you cannot read the name
|
||||||
|
|
||||||
|
##names##"""
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
class BoxItem(BaseModel):
|
||||||
|
box_2d: List[int] = Field(description="Bounding box coordinates (e.g., [ymin, xmin, ymax, xmax])")
|
||||||
|
label: str = Field(description="The label associated with the specific box")
|
||||||
|
|
||||||
|
class AnnotationData(BaseModel):
|
||||||
|
name: str = Field(description="The name identifier")
|
||||||
|
list: List[BoxItem] = Field(description="List of bounding box items")
|
||||||
|
|
||||||
|
|
||||||
|
def generate_request(file, labels):
|
||||||
|
"""Generates request for Gemini."""
|
||||||
|
|
||||||
|
image_path = Path(file)
|
||||||
|
|
||||||
|
contents = [
|
||||||
|
types.Content(
|
||||||
|
role="user",
|
||||||
|
parts=[
|
||||||
|
types.Part.from_bytes(
|
||||||
|
data=image_path.read_bytes(),
|
||||||
|
mime_type="image/jpeg"
|
||||||
|
),
|
||||||
|
types.Part.from_text(text=my_prompt + labels),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
generate_content_config = types.GenerateContentConfig(
|
||||||
|
temperature=1.0,
|
||||||
|
top_p=0.95,
|
||||||
|
seed=0,
|
||||||
|
max_output_tokens=65535,
|
||||||
|
response_mime_type= "application/json",
|
||||||
|
response_json_schema= AnnotationData.model_json_schema(),
|
||||||
|
# Thinking config is not compatible with response_json ? Unsure.
|
||||||
|
# thinking_config=types.ThinkingConfig(
|
||||||
|
# thinking_budget=-1,
|
||||||
|
# ),
|
||||||
|
# thinking_config=types.ThinkingConfig(
|
||||||
|
# include_thoughts=True,
|
||||||
|
# thinking_budget=1024, # Optimized for Gemini 3 capabilities
|
||||||
|
# ),
|
||||||
|
)
|
||||||
|
return (contents, generate_content_config)
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
sys.exit("Usage: python script.py Staging/cutleft1000.jpg labels")
|
||||||
|
|
||||||
|
INPUT_FILE = sys.argv[1]
|
||||||
|
contents, config = generate_request(INPUT_FILE)
|
||||||
|
|
||||||
|
client = genai.Client(api_key=api_key)
|
||||||
|
|
||||||
|
for chunk in client.models.generate_content_stream(
|
||||||
|
model=MODEL_ID,
|
||||||
|
contents=contents,
|
||||||
|
config=config,
|
||||||
|
):
|
||||||
|
if chunk.text:
|
||||||
|
print(chunk.text, end="", flush=True)
|
||||||
|
|
@ -0,0 +1,170 @@
|
||||||
|
from google import genai
|
||||||
|
from google.genai import types
|
||||||
|
import base64
|
||||||
|
from pathlib import Path
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
from typing import List
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
MODEL_ID = "gemini-3-flash-preview"
|
||||||
|
api_key="REMOVED_API_KEY"
|
||||||
|
|
||||||
|
my_prompt = """I'm giving you an image of the left columns of a written exam.
|
||||||
|
Students answer several exercises, which can have several questions.
|
||||||
|
|
||||||
|
The image consists of several columns, separated by vertical black
|
||||||
|
lines. The image should be read top to bottom and then left to right,
|
||||||
|
meaning first column, then second column, etc.
|
||||||
|
|
||||||
|
In their sheet, students delimit exercises and questions using
|
||||||
|
delimiters such as `Ex 1`, or `Exercice 1`, and `1)` or `a)`. You need
|
||||||
|
to give me the bounding boxes of each delimiter.
|
||||||
|
|
||||||
|
When giving the bounding box of the first question of an exercise, the
|
||||||
|
box should be large enough to contain both the exercice label
|
||||||
|
(`Exercice i`) and the question label (`1)`) parts.
|
||||||
|
|
||||||
|
You also need to give me the student name. It should appear on the top
|
||||||
|
left of the image. Disregard any mention of `MPSI 3`, it is their
|
||||||
|
class. A list of possible student names will be given below.
|
||||||
|
|
||||||
|
You will answer with a JSON object, containing a `name` field with the
|
||||||
|
name, and a `list` field, with the list of the bounding boxes and
|
||||||
|
their labels. The box_2d should be [ymin, xmin, ymax, xmax] normalized
|
||||||
|
to 0-1000.
|
||||||
|
|
||||||
|
Here is an example :
|
||||||
|
{\"name\" : \"John Doe\", \"list\" : [{\"box_2d\": (10, 20, 30, 40), \"label\" : \"Ex 1 : 1)\"}]}
|
||||||
|
|
||||||
|
Do not provide a box_2d for the name. Only for the labels.
|
||||||
|
|
||||||
|
You may find the same label present several times, as a student either
|
||||||
|
recall the current label on a new page, or adds content to its answer
|
||||||
|
later on. Give the position of each instance of each label.
|
||||||
|
|
||||||
|
For this exam you should look for the labels given below, separated by
|
||||||
|
newlines. A student need not have answered every question, so some may
|
||||||
|
be missing.
|
||||||
|
|
||||||
|
##labels##
|
||||||
|
|
||||||
|
Here's a list of the names of the students, pick the one that matches
|
||||||
|
the best or `\"Unknown\"` if you cannot read the name
|
||||||
|
|
||||||
|
##names##"""
|
||||||
|
|
||||||
|
class BoxItem(BaseModel):
|
||||||
|
box_2d: List[int] = Field(description="Bounding box coordinates (e.g., [ymin, xmin, ymax, xmax])")
|
||||||
|
label: str = Field(description="The label associated with the specific box")
|
||||||
|
|
||||||
|
class AnnotationData(BaseModel):
|
||||||
|
name: str = Field(description="The name identifier")
|
||||||
|
list: List[BoxItem] = Field(description="List of bounding box items")
|
||||||
|
|
||||||
|
|
||||||
|
def generate_request(file, labels, names):
|
||||||
|
"""Generates request for Gemini."""
|
||||||
|
|
||||||
|
image_path = Path(file)
|
||||||
|
|
||||||
|
text = my_prompt.replace("##labels##",labels).replace("##names##", names)
|
||||||
|
contents = [
|
||||||
|
types.Content(
|
||||||
|
role="user",
|
||||||
|
parts=[
|
||||||
|
types.Part.from_bytes(
|
||||||
|
data=image_path.read_bytes(),
|
||||||
|
mime_type="image/jpeg"
|
||||||
|
),
|
||||||
|
types.Part.from_text(text=text),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
generate_content_config = types.GenerateContentConfig(
|
||||||
|
temperature=1.0,
|
||||||
|
top_p=0.95,
|
||||||
|
seed=0,
|
||||||
|
max_output_tokens=65535,
|
||||||
|
response_mime_type= "application/json",
|
||||||
|
response_json_schema= AnnotationData.model_json_schema(),
|
||||||
|
)
|
||||||
|
return (contents, generate_content_config)
|
||||||
|
|
||||||
|
# Argument Parsing
|
||||||
|
parser = argparse.ArgumentParser(description="Process a directory or specific file using Gemini.")
|
||||||
|
parser.add_argument("input_path", help="The input directory or specific file (e.g., Dir/File.pdf)")
|
||||||
|
parser.add_argument("--overwrite", action="store_true", help="Regenerate output even if it exists")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
input_arg = Path(args.input_path)
|
||||||
|
image_files = []
|
||||||
|
|
||||||
|
# Logic to handle Directory vs File argument
|
||||||
|
if input_arg.is_file():
|
||||||
|
# If argument is Dir/Copiedd.pdf
|
||||||
|
INPUT_DIR = input_arg.parent
|
||||||
|
CUTLEFT_DIR = INPUT_DIR / 'Cutleft'
|
||||||
|
|
||||||
|
# Look for matching .jpg in Cutleft (e.g., Copiedd.jpg)
|
||||||
|
target_image = CUTLEFT_DIR / f"{input_arg.stem}.jpg"
|
||||||
|
|
||||||
|
if target_image.exists():
|
||||||
|
image_files = [target_image]
|
||||||
|
else:
|
||||||
|
print(f"Error: Corresponding image {target_image} not found.")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
# If argument is just Dir
|
||||||
|
INPUT_DIR = input_arg
|
||||||
|
CUTLEFT_DIR = INPUT_DIR / 'Cutleft'
|
||||||
|
image_files = sorted(list(CUTLEFT_DIR.glob("*.jpg")))
|
||||||
|
|
||||||
|
labels = (INPUT_DIR / "labels").read_text()
|
||||||
|
names = (INPUT_DIR / "names").read_text()
|
||||||
|
client = genai.Client(api_key=api_key)
|
||||||
|
|
||||||
|
# Target > 3.0s per request to stay under 20 RPM
|
||||||
|
TARGET_INTERVAL = 3.5
|
||||||
|
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
|
def process_image(image_file):
|
||||||
|
start_time = time.time()
|
||||||
|
base_name, _ = os.path.splitext(image_file.name)
|
||||||
|
output_json = os.path.join(INPUT_DIR, f"{base_name}.json")
|
||||||
|
|
||||||
|
# Skip if already processed unless overwrite is enabled
|
||||||
|
if os.path.exists(output_json) and not args.overwrite:
|
||||||
|
print(f"Skipping {image_file.name}, output exists.")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Processing {image_file.name}...")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Prepare and execute request
|
||||||
|
contents, config = generate_request(image_file, labels, names)
|
||||||
|
response = client.models.generate_content(
|
||||||
|
model=MODEL_ID,
|
||||||
|
contents=contents,
|
||||||
|
config=config
|
||||||
|
)
|
||||||
|
annota = AnnotationData.model_validate_json(response.text)
|
||||||
|
# Save result
|
||||||
|
with open(output_json, "w", encoding="utf-8") as f:
|
||||||
|
json.dump(annota.model_dump(), f, indent=2)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing {image_file.name}: {e}")
|
||||||
|
|
||||||
|
# Rate Limiting (Note: This limits per-thread, not global total)
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
time.sleep(max(0, TARGET_INTERVAL - elapsed))
|
||||||
|
|
||||||
|
# Run with 6 threads
|
||||||
|
with ThreadPoolExecutor(max_workers=6) as executor:
|
||||||
|
executor.map(process_image, image_files)
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import re
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python rename_copies.py <directory_path>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
work_dir = sys.argv[1]
|
||||||
|
target_subdir = os.path.join(work_dir, "Copies annotées")
|
||||||
|
|
||||||
|
# Create destination folder if it doesn't exist
|
||||||
|
os.makedirs(target_subdir, exist_ok=True)
|
||||||
|
|
||||||
|
# Regex to match "CopieXX.json" and capture XX
|
||||||
|
pattern = re.compile(r"^Copie(\d+)\.json$")
|
||||||
|
|
||||||
|
for filename in os.listdir(work_dir):
|
||||||
|
match = pattern.match(filename)
|
||||||
|
if match:
|
||||||
|
copie_id = match.group(1)
|
||||||
|
json_path = os.path.join(work_dir, filename)
|
||||||
|
source_folder = os.path.join(work_dir, f"Anot_Copie{copie_id}")
|
||||||
|
|
||||||
|
# Check if corresponding folder exists
|
||||||
|
if os.path.isdir(source_folder):
|
||||||
|
try:
|
||||||
|
with open(json_path, 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
name = data.get("name", "Unknown").strip()
|
||||||
|
|
||||||
|
# Sanitize filename (remove characters invalid in paths)
|
||||||
|
safe_name = re.sub(r'[<>:"/\\|?*]', '', name)
|
||||||
|
|
||||||
|
new_folder_name = f"{safe_name} ({copie_id})"
|
||||||
|
dest_path = os.path.join(target_subdir, new_folder_name)
|
||||||
|
|
||||||
|
print(f"Moving '{source_folder}' -> '{dest_path}'")
|
||||||
|
shutil.move(source_folder, dest_path)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing {filename}: {e}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,248 @@
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
import shutil
|
||||||
|
from collections import defaultdict
|
||||||
|
from PIL import Image, ImageDraw, ImageFont
|
||||||
|
from pdf2image import convert_from_path, pdfinfo_from_path
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
DPI = 200 # Good balance for readability and size
|
||||||
|
A4_HEIGHT_INCHES = 11.69
|
||||||
|
FULL_PAGE_PX = int(A4_HEIGHT_INCHES * DPI)
|
||||||
|
MAX_GROUP_HEIGHT = 2.5 * FULL_PAGE_PX
|
||||||
|
MAX_GROUP_COUNT = 15
|
||||||
|
SEPARATOR_HEIGHT = 20
|
||||||
|
LABEL_HEIGHT = 50
|
||||||
|
MAX_FILE_SIZE_BYTES = 2.5 * 1024 * 1024 # 2MB
|
||||||
|
|
||||||
|
|
||||||
|
# def get_pdf_height(path):
|
||||||
|
# """Returns height in pixels at defined DPI without rendering."""
|
||||||
|
# try:
|
||||||
|
# info = pdfinfo_from_path(path)
|
||||||
|
# # info["Page size"] is usually "width height pts"
|
||||||
|
# # 1 pt = 1/72 inch
|
||||||
|
# # We assume single page PDFs as per prompt implication, or take the first page
|
||||||
|
# pts_height = float(info['Page size'].split(' ')[2]) if 'Page size' in info else 0
|
||||||
|
# return int((pts_height / 72.0) * DPI)
|
||||||
|
# except Exception as e:
|
||||||
|
# print(f"Error reading {path}: {e}")
|
||||||
|
# return 0
|
||||||
|
|
||||||
|
def get_pdf_height(path):
|
||||||
|
"""Returns total height of all pages in pixels at defined DPI."""
|
||||||
|
try:
|
||||||
|
info = pdfinfo_from_path(path)
|
||||||
|
# Get page count (default to 1)
|
||||||
|
num_pages = int(info["Pages"]) if "Pages" in info else 1
|
||||||
|
|
||||||
|
# 1 pt = 1/72 inch
|
||||||
|
pts_height = float(info['Page size'].split(' ')[2]) if 'Page size' in info else 0
|
||||||
|
|
||||||
|
# Height of one page in pixels
|
||||||
|
single_page_px = int((pts_height / 72.0) * DPI)
|
||||||
|
|
||||||
|
# Return total height
|
||||||
|
return single_page_px * num_pages
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error reading {path}: {e}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def collect_files(root_dir):
|
||||||
|
"""
|
||||||
|
Scans Dir/Copiedd/identifier.pdf
|
||||||
|
Returns dict: {identifier: [(dd, path, height), ...]}
|
||||||
|
"""
|
||||||
|
data = defaultdict(list)
|
||||||
|
|
||||||
|
# Regex to match 'Copie' followed by 2 digits
|
||||||
|
folder_pattern = re.compile(r'Copie(\d{2})')
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(root_dir):
|
||||||
|
folder_name = os.path.basename(root)
|
||||||
|
match = folder_pattern.match(folder_name)
|
||||||
|
|
||||||
|
if match:
|
||||||
|
dd = match.group(1)
|
||||||
|
for file in files:
|
||||||
|
if file.lower().endswith('.pdf'):
|
||||||
|
identifier = os.path.splitext(file)[0]
|
||||||
|
full_path = os.path.join(root, file)
|
||||||
|
|
||||||
|
# Calculate height (c)
|
||||||
|
height = get_pdf_height(full_path)
|
||||||
|
|
||||||
|
# Store triple (a, b, c)
|
||||||
|
data[identifier].append((dd, full_path, height))
|
||||||
|
return data
|
||||||
|
|
||||||
|
def group_files(file_list):
|
||||||
|
"""Groups files based on constraints."""
|
||||||
|
sorted_files = sorted(file_list, key=lambda x: x[0])
|
||||||
|
|
||||||
|
groups = []
|
||||||
|
current_group = []
|
||||||
|
current_height = 0
|
||||||
|
|
||||||
|
for item in sorted_files:
|
||||||
|
dd, path, height = item
|
||||||
|
|
||||||
|
# Calculate added height (image + separator + approx text space)
|
||||||
|
# We add separator height only if it's not the first image
|
||||||
|
added_overhead = SEPARATOR_HEIGHT + 30 if current_group else 0
|
||||||
|
|
||||||
|
# Check conditions
|
||||||
|
if (len(current_group) >= MAX_GROUP_COUNT or
|
||||||
|
(current_height + height + added_overhead) > MAX_GROUP_HEIGHT):
|
||||||
|
|
||||||
|
# Push current group and start new
|
||||||
|
if current_group:
|
||||||
|
groups.append(current_group)
|
||||||
|
current_group = []
|
||||||
|
current_height = 0
|
||||||
|
added_overhead = 0 # Reset for first file of new group
|
||||||
|
|
||||||
|
current_group.append(item)
|
||||||
|
current_height += height + added_overhead
|
||||||
|
|
||||||
|
if current_group:
|
||||||
|
groups.append(current_group)
|
||||||
|
|
||||||
|
return groups
|
||||||
|
|
||||||
|
def stitch_pdf_pages(images_list):
|
||||||
|
"""Vertically concatenates a list of PIL images with no separator."""
|
||||||
|
if not images_list:
|
||||||
|
return None
|
||||||
|
if len(images_list) == 1:
|
||||||
|
return images_list[0]
|
||||||
|
|
||||||
|
max_width = max(img.width for img in images_list)
|
||||||
|
total_height = sum(img.height for img in images_list)
|
||||||
|
|
||||||
|
combined = Image.new('RGB', (max_width, total_height), 'white')
|
||||||
|
|
||||||
|
y_offset = 0
|
||||||
|
for img in images_list:
|
||||||
|
combined.paste(img, (0, y_offset))
|
||||||
|
y_offset += img.height
|
||||||
|
|
||||||
|
return combined
|
||||||
|
|
||||||
|
def create_jpg(identifier, group_index, group, root_dir):
|
||||||
|
images = []
|
||||||
|
metadata = [] # To store (id, h_min, h_max)
|
||||||
|
|
||||||
|
# Render PDFs to images
|
||||||
|
for dd, path, _ in group:
|
||||||
|
try:
|
||||||
|
# Convert pdf to image
|
||||||
|
imgs = convert_from_path(path, dpi=DPI)
|
||||||
|
# if imgs:
|
||||||
|
# images.append((dd, imgs[0])) # Assume 1 page per pdf !! ??
|
||||||
|
if imgs:
|
||||||
|
# Concatenate multi-page PDFs into one single image object
|
||||||
|
combined_img = stitch_pdf_pages(imgs)
|
||||||
|
if combined_img:
|
||||||
|
images.append((dd, combined_img))
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to convert {path}: {e}")
|
||||||
|
|
||||||
|
if not images:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Calculate total canvas size
|
||||||
|
total_width = max(img.width for _, img in images)
|
||||||
|
total_height = sum(img.height for _, img in images) + ((len(images) - 1) * SEPARATOR_HEIGHT)
|
||||||
|
|
||||||
|
# Add space for text (approx 40px per label)
|
||||||
|
total_height += len(images) * LABEL_HEIGHT
|
||||||
|
|
||||||
|
canvas = Image.new('RGB', (total_width, total_height), 'white')
|
||||||
|
draw = ImageDraw.Draw(canvas)
|
||||||
|
|
||||||
|
# Try loading a font, fallback to default
|
||||||
|
try:
|
||||||
|
font = ImageFont.truetype("DejaVuSans.ttf", 40)
|
||||||
|
except IOError:
|
||||||
|
print("font not found")
|
||||||
|
font = ImageFont.load_default()
|
||||||
|
|
||||||
|
y_offset = 0
|
||||||
|
|
||||||
|
for i, (dd, img) in enumerate(images):
|
||||||
|
# Draw separator if not first image
|
||||||
|
if i > 0:
|
||||||
|
draw.rectangle([0, y_offset, total_width, y_offset + SEPARATOR_HEIGHT], fill='black')
|
||||||
|
y_offset += SEPARATOR_HEIGHT
|
||||||
|
|
||||||
|
# Draw Text (dd)
|
||||||
|
text = f"ID: {dd}"
|
||||||
|
draw.text((10, y_offset + 5), text, fill='black', font=font)
|
||||||
|
y_offset += LABEL_HEIGHT # Space for text
|
||||||
|
|
||||||
|
# Record Image Coordinates
|
||||||
|
h_min = y_offset
|
||||||
|
h_max = y_offset + img.height
|
||||||
|
metadata.append((dd, h_min, h_max))
|
||||||
|
|
||||||
|
# Draw Image
|
||||||
|
x_pos = 0
|
||||||
|
canvas.paste(img, (x_pos, y_offset))
|
||||||
|
y_offset += img.height
|
||||||
|
|
||||||
|
target_folder = os.path.join(root_dir, identifier)
|
||||||
|
os.makedirs(target_folder, exist_ok=True)
|
||||||
|
|
||||||
|
# Save JSON metadata
|
||||||
|
json_filename = f"Group_{group_index+1}.json"
|
||||||
|
json_path = os.path.join(target_folder, json_filename)
|
||||||
|
with open(json_path, 'w') as f:
|
||||||
|
json.dump(metadata, f)
|
||||||
|
|
||||||
|
# Save with size constraints
|
||||||
|
output_filename = f"Group_{group_index+1}.jpg"
|
||||||
|
output_path = os.path.join(target_folder, output_filename)
|
||||||
|
|
||||||
|
quality = 90
|
||||||
|
while quality > 10:
|
||||||
|
canvas.save(output_path, "JPEG", quality=quality, optimize=True)
|
||||||
|
if os.path.getsize(output_path) <= MAX_FILE_SIZE_BYTES:
|
||||||
|
if quality < 90:
|
||||||
|
print("quality : ", quality)
|
||||||
|
break
|
||||||
|
quality -= 5
|
||||||
|
|
||||||
|
print(f"Saved {output_path} ({os.path.getsize(output_path)/1024/1024:.2f} MB)")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python app.py <Path_to_Dir>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
root_dir = sys.argv[1]
|
||||||
|
|
||||||
|
print("Scanning files...")
|
||||||
|
data = collect_files(root_dir)
|
||||||
|
|
||||||
|
print(f"Found {len(data)} identifiers. Processing...")
|
||||||
|
|
||||||
|
for identifier, files_info in data.items():
|
||||||
|
# Clear output directory if it exists
|
||||||
|
target_folder = os.path.join(root_dir, identifier)
|
||||||
|
if os.path.exists(target_folder):
|
||||||
|
shutil.rmtree(target_folder)
|
||||||
|
os.makedirs(target_folder, exist_ok=True)
|
||||||
|
|
||||||
|
# files_info is list of (dd, path, height)
|
||||||
|
file_groups = group_files(files_info)
|
||||||
|
|
||||||
|
for idx, group in enumerate(file_groups):
|
||||||
|
create_jpg(identifier, idx, group, root_dir)
|
||||||
|
|
||||||
|
print("Done.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
@ -0,0 +1,377 @@
|
||||||
|
import fitz # PyMuPDF
|
||||||
|
import tkinter as tk
|
||||||
|
from tkinter import messagebox
|
||||||
|
from PIL import Image, ImageTk, ImageDraw
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import glob
|
||||||
|
import shutil
|
||||||
|
from pypdf import PdfReader, PdfWriter
|
||||||
|
|
||||||
|
# --- Constants ---
|
||||||
|
# Conversion factor: 1 cm to points (1 inch = 2.54 cm, 72 points = 1 inch)
|
||||||
|
CM_TO_POINTS = (1 / 2.54) * 72
|
||||||
|
|
||||||
|
def list_pdf_files(directory):
|
||||||
|
return list(reversed(sorted(glob.glob(os.path.join(directory, "*.pdf")))))
|
||||||
|
|
||||||
|
class PDFPreviewer:
|
||||||
|
|
||||||
|
def setup_next_file(self):
|
||||||
|
self.num += 1
|
||||||
|
if len(self.inputs) == 0:
|
||||||
|
return False
|
||||||
|
self.pdf_path = self.inputs.pop()
|
||||||
|
self.file_rotation = 0
|
||||||
|
self.base_name = os.path.splitext(os.path.basename(self.pdf_path))[0]
|
||||||
|
self.split_dir = f"{self.base_name}_split"
|
||||||
|
self.reorder_dir = f"{self.base_name}_reorder"
|
||||||
|
if self.output_dir is None:
|
||||||
|
self.final_file = f"{self.base_name}_final"
|
||||||
|
else:
|
||||||
|
self.final_file = f"{self.output_dir}/Copie{self.num:02}.pdf"
|
||||||
|
self.current_page_index = 0
|
||||||
|
self.page_settings = []
|
||||||
|
self.processing = False # Flag to prevent multiple finish calls
|
||||||
|
try:
|
||||||
|
self.doc = fitz.open(self.pdf_path)
|
||||||
|
except Exception as e:
|
||||||
|
messagebox.showerror("Error", f"Failed to open PDF file: {e}")
|
||||||
|
self.master.destroy()
|
||||||
|
return
|
||||||
|
self.master.title(f"PDF Splitter - {os.path.basename(self.pdf_path)}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __init__(self, master, path):
|
||||||
|
"""
|
||||||
|
Initializes the application.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
master (tk.Tk): The root Tkinter window.
|
||||||
|
pdf_path (str): The path to the input PDF file.
|
||||||
|
"""
|
||||||
|
if not os.path.exists(path):
|
||||||
|
messagebox.showerror("Error", f"File not found: {path}")
|
||||||
|
master.destroy()
|
||||||
|
return
|
||||||
|
|
||||||
|
if os.path.isdir(path):
|
||||||
|
self.inputs = list_pdf_files(path)
|
||||||
|
self.output_dir = f"{path}_out"
|
||||||
|
else:
|
||||||
|
self.inputs = [path]
|
||||||
|
self.output_dir = None
|
||||||
|
|
||||||
|
self.master = master
|
||||||
|
self.num = 0
|
||||||
|
self.global_rotation = 0 # Rotation appliquée à tous les fichiers
|
||||||
|
self.setup_next_file()
|
||||||
|
|
||||||
|
self._resize_job = None # For debouncing resize events
|
||||||
|
|
||||||
|
self._initialize_current_page_settings()
|
||||||
|
|
||||||
|
# --- UI Setup ---
|
||||||
|
# Set a reasonable initial size for the window
|
||||||
|
self.master.geometry("800x1000")
|
||||||
|
|
||||||
|
instructions = (
|
||||||
|
"← / → : Move line 1cm left/right\n"
|
||||||
|
"'c': Rotate page 180°, 'C' : rotate all pages, ',' : rotate all files\n"
|
||||||
|
"t s r n: keep left, next page, keep none, keep right\n"
|
||||||
|
"z: send this page to the end\n"
|
||||||
|
)
|
||||||
|
self.info_label = tk.Label(master, text=instructions, justify=tk.LEFT)
|
||||||
|
self.info_label.pack(pady=5, side=tk.TOP)
|
||||||
|
|
||||||
|
self.page_label = tk.Label(master, text="", font=("Helvetica", 12))
|
||||||
|
self.page_label.pack(pady=5, side=tk.TOP)
|
||||||
|
|
||||||
|
# Canvas for PDF page preview
|
||||||
|
self.canvas = tk.Canvas(master, bg="gray")
|
||||||
|
self.canvas.pack(fill="both", expand=True)
|
||||||
|
|
||||||
|
# --- Bindings ---
|
||||||
|
self.master.bind("<Left>", self.move_line_left)
|
||||||
|
self.master.bind("<Right>", self.move_line_right)
|
||||||
|
self.master.bind("<Return>", self.confirm_and_next_page)
|
||||||
|
self.master.bind("c", self.rotate_page)
|
||||||
|
self.master.bind("C", self.rotate_all_pages)
|
||||||
|
self.master.bind(",", self.rotate_all_files)
|
||||||
|
self.master.bind("t", self.keep_left)
|
||||||
|
self.master.bind("n", self.keep_right)
|
||||||
|
self.master.bind("s", self.confirm_and_next_page)
|
||||||
|
self.master.bind("r", self.discard_page)
|
||||||
|
self.master.bind("z", self.send_page_end)
|
||||||
|
|
||||||
|
# Bind the resize event on the canvas
|
||||||
|
self.canvas.bind("<Configure>", self.on_resize)
|
||||||
|
|
||||||
|
self.current_zoom = 1.0
|
||||||
|
|
||||||
|
def on_resize(self, event):
|
||||||
|
"""
|
||||||
|
Handles window resize events by reloading the page.
|
||||||
|
Uses a "debounce" mechanism to avoid excessive redrawing.
|
||||||
|
"""
|
||||||
|
if self._resize_job:
|
||||||
|
self.master.after_cancel(self._resize_job)
|
||||||
|
self._resize_job = self.master.after(250, self.load_page) # Redraw after 250ms of no resizing
|
||||||
|
|
||||||
|
def _initialize_current_page_settings(self):
|
||||||
|
"""Initializes or resets the settings for the current page."""
|
||||||
|
if self.current_page_index < len(self.doc):
|
||||||
|
page = self.doc.load_page(self.current_page_index)
|
||||||
|
self.current_line_x = page.rect.width / 2
|
||||||
|
self.current_rotation = 0
|
||||||
|
|
||||||
|
def load_page(self):
|
||||||
|
"""Loads and displays the current page on the canvas, scaled to fit."""
|
||||||
|
if self.current_page_index >= len(self.doc):
|
||||||
|
if not self.processing:
|
||||||
|
self.processing = True
|
||||||
|
self.finish_and_process()
|
||||||
|
return
|
||||||
|
|
||||||
|
page = self.doc.load_page(self.current_page_index)
|
||||||
|
self.page_label.config(text=f"Page {self.current_page_index + 1} of {len(self.doc)}")
|
||||||
|
|
||||||
|
# --- Calculate Scaling ---
|
||||||
|
canvas_width = self.canvas.winfo_width()
|
||||||
|
canvas_height = self.canvas.winfo_height()
|
||||||
|
|
||||||
|
# Don't try to render if the canvas has no size yet.
|
||||||
|
if canvas_width <= 1 or canvas_height <= 1:
|
||||||
|
return
|
||||||
|
|
||||||
|
page_rect = page.rect
|
||||||
|
zoom_x = canvas_width / page_rect.width
|
||||||
|
zoom_y = canvas_height / page_rect.height
|
||||||
|
# Use 98% of the smallest zoom factor to leave a small margin
|
||||||
|
self.current_zoom = min(zoom_x, zoom_y) * 0.98
|
||||||
|
|
||||||
|
# --- Render Page ---
|
||||||
|
mat = fitz.Matrix(self.current_zoom, self.current_zoom)
|
||||||
|
pix = page.get_pixmap(matrix=mat, alpha=False)
|
||||||
|
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
|
||||||
|
|
||||||
|
|
||||||
|
# Apply rotation if needed *after* drawing the line
|
||||||
|
if (self.current_rotation + self.file_rotation + self.global_rotation) % 360 != 0:
|
||||||
|
img = img.rotate(180, expand=True)
|
||||||
|
|
||||||
|
# --- Draw Line and Rotate ---
|
||||||
|
draw = ImageDraw.Draw(img)
|
||||||
|
# The line position is scaled by the same zoom factor
|
||||||
|
line_x_scaled = self.current_line_x * self.current_zoom
|
||||||
|
draw.line([(line_x_scaled, 0), (line_x_scaled, pix.height)], fill="red", width=3)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Display on Canvas ---
|
||||||
|
self.photo_img = ImageTk.PhotoImage(img)
|
||||||
|
self.canvas.delete("all")
|
||||||
|
# Center the image on the canvas
|
||||||
|
self.canvas.create_image(canvas_width / 2, canvas_height / 2, anchor="center",
|
||||||
|
image=self.photo_img)
|
||||||
|
|
||||||
|
def move_line_left(self, event=None):
|
||||||
|
"""Moves the split line to the left."""
|
||||||
|
self.current_line_x = max(0, self.current_line_x - CM_TO_POINTS / 2)
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def move_line_right(self, event=None):
|
||||||
|
"""Moves the split line to the right."""
|
||||||
|
page = self.doc.load_page(self.current_page_index)
|
||||||
|
self.current_line_x = min(page.rect.width, self.current_line_x + CM_TO_POINTS / 2)
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def rotate_page(self, event=None):
|
||||||
|
"""Toggles the page rotation between 0 and 180 degrees."""
|
||||||
|
self.current_rotation = 180 if self.current_rotation == 0 else 0
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def rotate_all_pages(self, event=None):
|
||||||
|
"""Toggles the page rotation between 0 and 180 degrees."""
|
||||||
|
self.file_rotation = 180 if self.file_rotation == 0 else 0
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def rotate_all_files(self, event=None):
|
||||||
|
"""Toggles the page rotation between 0 and 180 degrees."""
|
||||||
|
self.global_rotation = 180 if self.global_rotation == 0 else 0
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def keep_left(self, event=None):
|
||||||
|
self.confirm_and_next_page(keep="left")
|
||||||
|
def keep_right(self, event=None):
|
||||||
|
self.confirm_and_next_page(keep="right")
|
||||||
|
def discard_page(self, event=None):
|
||||||
|
self.confirm_and_next_page(keep="none")
|
||||||
|
def send_page_end(self, event=None):
|
||||||
|
# Do nothing if we are already at or past the last page
|
||||||
|
if self.current_page_index >= len(self.doc) - 1:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Move the current page to the end of the document
|
||||||
|
# -1 as the destination puts it after the last page
|
||||||
|
self.doc.move_page(self.current_page_index, -1)
|
||||||
|
|
||||||
|
# Initialize settings for the page that shifted into the current slot
|
||||||
|
self._initialize_current_page_settings()
|
||||||
|
|
||||||
|
# Reload the canvas to show the new page
|
||||||
|
self.load_page()
|
||||||
|
|
||||||
|
def confirm_and_next_page(self, event=None, keep="both"):
|
||||||
|
"""Saves the settings for the current page and moves to the next."""
|
||||||
|
self.page_settings.append({
|
||||||
|
"line_x": self.current_line_x,
|
||||||
|
"rotation": self.current_rotation,
|
||||||
|
"keep": keep
|
||||||
|
})
|
||||||
|
|
||||||
|
self.current_page_index += 1
|
||||||
|
|
||||||
|
if self.current_page_index < len(self.doc):
|
||||||
|
self._initialize_current_page_settings()
|
||||||
|
self.load_page()
|
||||||
|
else:
|
||||||
|
self.finish_and_process()
|
||||||
|
if self.setup_next_file():
|
||||||
|
self._initialize_current_page_settings()
|
||||||
|
self.load_page()
|
||||||
|
else:
|
||||||
|
self.master.destroy()
|
||||||
|
|
||||||
|
def finish_and_process(self):
|
||||||
|
"""Starts the PDF splitting process."""
|
||||||
|
self.split_pdf()
|
||||||
|
self.reorder_pdfs()
|
||||||
|
self.concate_files()
|
||||||
|
self.remove_dirs()
|
||||||
|
|
||||||
|
def split_filename_left(self, i):
|
||||||
|
return os.path.join(self.split_dir, f"{self.base_name}_{i+1}l.pdf")
|
||||||
|
def split_filename_right(self, i):
|
||||||
|
return os.path.join(self.split_dir, f"{self.base_name}_{i+1}r.pdf")
|
||||||
|
def reorder_filename(self, i):
|
||||||
|
return os.path.join(self.reorder_dir, f"{self.base_name}_{i+1}.pdf")
|
||||||
|
|
||||||
|
def clean_up_dir(self, dir, make=True):
|
||||||
|
if make:
|
||||||
|
os.makedirs(dir, exist_ok=True)
|
||||||
|
pdf_files = glob.glob(os.path.join(dir, "*.pdf"))
|
||||||
|
for pdf in pdf_files:
|
||||||
|
try:
|
||||||
|
os.remove(pdf)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error deleting {pdf}: {e}")
|
||||||
|
|
||||||
|
def remove_dirs(self):
|
||||||
|
shutil.rmtree(self.split_dir)
|
||||||
|
shutil.rmtree(self.reorder_dir)
|
||||||
|
|
||||||
|
def split_pdf(self):
|
||||||
|
"""Splits each page of the PDF according to the saved settings."""
|
||||||
|
print("Starting PDF processing...")
|
||||||
|
self.clean_up_dir(self.split_dir)
|
||||||
|
|
||||||
|
for i, settings in enumerate(self.page_settings):
|
||||||
|
page = self.doc.load_page(i)
|
||||||
|
line_x = settings['line_x']
|
||||||
|
rotation_settings = settings['rotation']
|
||||||
|
keep = settings['keep']
|
||||||
|
rotation = (page.rotation + rotation_settings +
|
||||||
|
self.file_rotation + self.global_rotation) % 360
|
||||||
|
|
||||||
|
# --- Create Left Part ---
|
||||||
|
if rotation == 0:
|
||||||
|
rect_left = fitz.Rect(0, 0, line_x, page.rect.height)
|
||||||
|
else:
|
||||||
|
rect_left = fitz.Rect(page.rect.width-line_x, 0, page.rect.width, page.rect.height)
|
||||||
|
doc_left = fitz.open()
|
||||||
|
page_left = doc_left.new_page(width=rect_left.width, height=rect_left.height)
|
||||||
|
page_left.show_pdf_page(page_left.rect, self.doc, i, clip=rect_left)
|
||||||
|
page_left.set_rotation(rotation)
|
||||||
|
|
||||||
|
if keep == "both" or keep == "left":
|
||||||
|
output_path_left = self.split_filename_left(i)
|
||||||
|
doc_left.save(output_path_left)
|
||||||
|
doc_left.close()
|
||||||
|
|
||||||
|
# --- Create Right Part ---
|
||||||
|
if rotation == 0:
|
||||||
|
rect_right = fitz.Rect(line_x, 0, page.rect.width, page.rect.height)
|
||||||
|
else:
|
||||||
|
rect_right = fitz.Rect(0, 0, page.rect.width-line_x, page.rect.height)
|
||||||
|
doc_right = fitz.open()
|
||||||
|
page_right = doc_right.new_page(width=rect_right.width, height=rect_right.height)
|
||||||
|
page_right.show_pdf_page(page_right.rect, self.doc, i, clip=rect_right)
|
||||||
|
page_right.set_rotation(rotation)
|
||||||
|
|
||||||
|
if keep == "both" or keep == "right":
|
||||||
|
output_path_right = self.split_filename_right(i)
|
||||||
|
doc_right.save(output_path_right)
|
||||||
|
doc_right.close()
|
||||||
|
|
||||||
|
self.doc.close()
|
||||||
|
print(f"\nProcessing complete. Files are in '{self.split_dir}' directory.")
|
||||||
|
|
||||||
|
def reorder_pdfs(self):
|
||||||
|
"""Reordonne les pages, si ce sont des copies doubles."""
|
||||||
|
self.clean_up_dir(self.reorder_dir)
|
||||||
|
ps = self.page_settings
|
||||||
|
ri = 0
|
||||||
|
i = 0
|
||||||
|
while i < len(ps):
|
||||||
|
# Si c'est une copie double
|
||||||
|
if (ps[i]['keep'] == "both" or ps[i]['keep'] == "right") \
|
||||||
|
and i < len(ps)-1 and (ps[i+1]['keep'] != "right"):
|
||||||
|
shutil.copy2(self.split_filename_right(i), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
if ps[i+1]['keep'] != "none":
|
||||||
|
shutil.copy2(self.split_filename_left(i+1), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
if ps[i+1]['keep'] != "left":
|
||||||
|
shutil.copy2(self.split_filename_right(i+1), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
if ps[i]['keep'] == "both":
|
||||||
|
shutil.copy2(self.split_filename_left(i), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
i += 2
|
||||||
|
else:
|
||||||
|
psk = ps[i]['keep']
|
||||||
|
if psk == "left" or psk == "both":
|
||||||
|
shutil.copy2(self.split_filename_left(i), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
if psk == "right" or psk == "both":
|
||||||
|
shutil.copy2(self.split_filename_right(i), self.reorder_filename(ri))
|
||||||
|
ri += 1
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
def concate_files(self):
|
||||||
|
writer = PdfWriter()
|
||||||
|
|
||||||
|
pdf_files = sorted(glob.glob(os.path.join(self.reorder_dir, "*.pdf")))
|
||||||
|
|
||||||
|
for pdf in pdf_files:
|
||||||
|
reader = PdfReader(pdf)
|
||||||
|
for page in reader.pages:
|
||||||
|
writer.add_page(page)
|
||||||
|
|
||||||
|
if self.output_dir != None:
|
||||||
|
os.makedirs(os.path.dirname(self.final_file), exist_ok=True)
|
||||||
|
with open(self.final_file, "wb") as f:
|
||||||
|
writer.write(f)
|
||||||
|
print(f"Created merged PDF: {self.final_file}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) != 2:
|
||||||
|
print("Usage: python script_name.py <path_to_pdf_file>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
pdf_file_path = sys.argv[1]
|
||||||
|
|
||||||
|
root = tk.Tk()
|
||||||
|
app = PDFPreviewer(root, pdf_file_path)
|
||||||
|
root.mainloop()
|
||||||
|
|
@ -0,0 +1,272 @@
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import threading
|
||||||
|
import queue
|
||||||
|
import subprocess
|
||||||
|
import tkinter as tk
|
||||||
|
from pathlib import Path
|
||||||
|
from PIL import Image, ImageDraw, ImageFont, ImageTk
|
||||||
|
from pypdf import PdfReader
|
||||||
|
|
||||||
|
# --- Configuration & Globals ---
|
||||||
|
padding = 60 # White margin to the right
|
||||||
|
|
||||||
|
|
||||||
|
image_queue = queue.Queue(maxsize=5) # Buffer a few images ahead
|
||||||
|
try:
|
||||||
|
font = ImageFont.truetype("DejaVuSans.ttf", size=30)
|
||||||
|
except OSError:
|
||||||
|
font = ImageFont.load_default()
|
||||||
|
|
||||||
|
# --- Processing Logic (Worker Thread) ---
|
||||||
|
|
||||||
|
def page_number(b, nb_pages):
|
||||||
|
column_width = 1000 // nb_pages
|
||||||
|
center_x = (b[1] + b[3]) // 2
|
||||||
|
return center_x // column_width
|
||||||
|
|
||||||
|
def prepare_image(image_path: str, bounding_boxes, all_labels, nb_pages):
|
||||||
|
"""
|
||||||
|
Draws boxes on the image and returns the PIL Image object.
|
||||||
|
Does NOT display it.
|
||||||
|
"""
|
||||||
|
im = Image.open(image_path)
|
||||||
|
# Ensure image is loaded so we can pass it between threads safely
|
||||||
|
im.load()
|
||||||
|
|
||||||
|
width, height = im.size
|
||||||
|
|
||||||
|
# Add white padding to the right
|
||||||
|
new_im = Image.new(im.mode, (width + padding, height), "white")
|
||||||
|
new_im.paste(im, (0, 0))
|
||||||
|
|
||||||
|
draw = ImageDraw.Draw(new_im)
|
||||||
|
|
||||||
|
bounding_boxes.sort(key=lambda b: (page_number(b["box_2d"], nb_pages), b["box_2d"][0]))
|
||||||
|
|
||||||
|
last_label_index = -1
|
||||||
|
|
||||||
|
for bbox in bounding_boxes:
|
||||||
|
raw_y_min = int(bbox["box_2d"][0] * height / 1000)
|
||||||
|
raw_x_min = int(bbox["box_2d"][1] * width / 1000)
|
||||||
|
raw_y_max = int(bbox["box_2d"][2] * height / 1000)
|
||||||
|
raw_x_max = int(bbox["box_2d"][3] * width / 1000)
|
||||||
|
|
||||||
|
abs_y_min = max(0, raw_y_min - 10)
|
||||||
|
abs_x_min = max(0, raw_x_min - 10)
|
||||||
|
abs_y_max = min(height, raw_y_max + 10)
|
||||||
|
abs_x_max = min(width, raw_x_max + 10)
|
||||||
|
|
||||||
|
color = "black"
|
||||||
|
label = bbox.get("label")
|
||||||
|
|
||||||
|
if label and label in all_labels:
|
||||||
|
current_index = all_labels.index(label)
|
||||||
|
if current_index < last_label_index:
|
||||||
|
color = "red"
|
||||||
|
last_label_index = current_index
|
||||||
|
|
||||||
|
draw.rectangle(
|
||||||
|
((abs_x_min, abs_y_min), (abs_x_max, abs_y_max)),
|
||||||
|
outline=color,
|
||||||
|
width=4,
|
||||||
|
)
|
||||||
|
if label:
|
||||||
|
# draw.text((abs_x_min + 8, abs_y_min + 6), label, fill=color, font=font)
|
||||||
|
if abs_y_min > 80:
|
||||||
|
draw.text((abs_x_min + 8, abs_y_min - 30), label, fill=color, font=font)
|
||||||
|
else:
|
||||||
|
draw.text((abs_x_min + 8, abs_y_max + 6), label, fill=color, font=font)
|
||||||
|
|
||||||
|
return new_im
|
||||||
|
|
||||||
|
def worker_thread(base_dir, files_to_process, all_labels):
|
||||||
|
"""
|
||||||
|
Iterates through files, processes them, and puts them in the queue.
|
||||||
|
"""
|
||||||
|
for img_path in files_to_process:
|
||||||
|
json_path = base_dir / f"{img_path.stem}.json"
|
||||||
|
pdf_path = base_dir / f"{img_path.stem}.pdf"
|
||||||
|
|
||||||
|
nb_pages = 1
|
||||||
|
if pdf_path.exists():
|
||||||
|
try:
|
||||||
|
nb_pages = len(PdfReader(pdf_path).pages)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if json_path.exists():
|
||||||
|
try:
|
||||||
|
with open(json_path, 'r') as f:
|
||||||
|
json_result = json.load(f)
|
||||||
|
|
||||||
|
bb_list = json_result.get("list", [])
|
||||||
|
print(f"Processing {img_path.name}...")
|
||||||
|
|
||||||
|
# Draw boxes
|
||||||
|
pil_image = prepare_image(str(img_path), bb_list, all_labels, nb_pages)
|
||||||
|
|
||||||
|
# Block if queue is full (waiting for user to view)
|
||||||
|
image_queue.put((pil_image, json_path))
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing {img_path.name}: {e}")
|
||||||
|
|
||||||
|
# Sentinel to indicate finished
|
||||||
|
image_queue.put((None, None))
|
||||||
|
|
||||||
|
# --- GUI Logic (Main Thread) ---
|
||||||
|
|
||||||
|
class ImageViewer:
|
||||||
|
def __init__(self, root, base_dir):
|
||||||
|
self.root = root
|
||||||
|
self.base_dir = base_dir
|
||||||
|
self.root.title("Bounding Box Viewer")
|
||||||
|
|
||||||
|
# UI Elements
|
||||||
|
self.label = tk.Label(root, text="Waiting for images...")
|
||||||
|
self.label.pack(expand=True, fill="both")
|
||||||
|
|
||||||
|
# State
|
||||||
|
self.current_image = None
|
||||||
|
self.current_json_path = None
|
||||||
|
self.is_viewing = False
|
||||||
|
self.scale_factor = 1.0 # To track resizing
|
||||||
|
self.orig_size = (1, 1) # To track original dimensions
|
||||||
|
|
||||||
|
# Input Bindings
|
||||||
|
self.root.bind('<Return>', self.on_enter)
|
||||||
|
self.root.bind('e', self.on_edit)
|
||||||
|
self.root.bind('o', self.on_open_pdf) # <--- 3. Add Key Binding
|
||||||
|
self.root.bind('<Escape>', lambda e: self.root.quit())
|
||||||
|
self.label.bind('<Button-1>', self.on_click) # Bind left mouse click
|
||||||
|
|
||||||
|
# Start polling queue
|
||||||
|
self.poll_queue()
|
||||||
|
|
||||||
|
def poll_queue(self):
|
||||||
|
if not self.is_viewing:
|
||||||
|
try:
|
||||||
|
pil_image, json_path = image_queue.get_nowait()
|
||||||
|
|
||||||
|
if pil_image is None:
|
||||||
|
print("All images processed.")
|
||||||
|
self.root.quit() # Stop the program
|
||||||
|
return
|
||||||
|
|
||||||
|
self.display_image(pil_image, json_path)
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
self.root.after(100, self.poll_queue)
|
||||||
|
|
||||||
|
def on_open_pdf(self, event):
|
||||||
|
if self.is_viewing and self.current_json_path:
|
||||||
|
# Replace .json extension with .pdf
|
||||||
|
pdf_path = self.current_json_path.with_suffix(".pdf")
|
||||||
|
|
||||||
|
print(f"Opening {pdf_path}")
|
||||||
|
# Use subprocess to run xdg-open without blocking
|
||||||
|
subprocess.Popen(['xdg-open', str(pdf_path)])
|
||||||
|
|
||||||
|
def display_image(self, pil_image, json_path):
|
||||||
|
self.orig_size = pil_image.size
|
||||||
|
self.scale_factor = 1.0
|
||||||
|
|
||||||
|
# Resize if too large for screen
|
||||||
|
screen_h = self.root.winfo_screenheight() - 100
|
||||||
|
if pil_image.height > screen_h:
|
||||||
|
self.scale_factor = screen_h / pil_image.height
|
||||||
|
pil_image = pil_image.resize((int(pil_image.width * self.scale_factor),
|
||||||
|
int(pil_image.height * self.scale_factor)))
|
||||||
|
|
||||||
|
self.tk_image = ImageTk.PhotoImage(pil_image)
|
||||||
|
self.label.config(image=self.tk_image, text="")
|
||||||
|
self.current_json_path = json_path
|
||||||
|
self.is_viewing = True
|
||||||
|
self.root.lift()
|
||||||
|
|
||||||
|
def on_enter(self, event):
|
||||||
|
if self.is_viewing:
|
||||||
|
print("Next...")
|
||||||
|
self.is_viewing = False
|
||||||
|
self.label.config(image="", text="Loading next...")
|
||||||
|
|
||||||
|
def on_edit(self, event):
|
||||||
|
if self.is_viewing and self.current_json_path:
|
||||||
|
print(f"Opening {self.current_json_path}")
|
||||||
|
subprocess.Popen(['xdg-open', str(self.current_json_path)])
|
||||||
|
|
||||||
|
def on_click(self, event):
|
||||||
|
if not self.is_viewing: return
|
||||||
|
|
||||||
|
# Map click to original image coordinates
|
||||||
|
x = int(event.x / self.scale_factor)
|
||||||
|
y = int(event.y / self.scale_factor)
|
||||||
|
w, h = self.orig_size
|
||||||
|
|
||||||
|
# Create 10px box (5px radius)
|
||||||
|
# Coordinate format: [y_min, x_min, y_max, x_max] (0-1000 scale)
|
||||||
|
box = [
|
||||||
|
int(max(0, y - 5) / h * 1000),
|
||||||
|
int(max(0, x - 5) / (w- padding) * 1000),
|
||||||
|
int(min(h, y + 5) / h * 1000),
|
||||||
|
int(min(w, x + 5) / (w - padding) * 1000),
|
||||||
|
]
|
||||||
|
|
||||||
|
box_str = "{ \"box_2d\": " + str(box) + ", \"label\": \"\" },"
|
||||||
|
print(f"Copied box at ({x},{y}): {box_str}")
|
||||||
|
|
||||||
|
self.root.clipboard_clear()
|
||||||
|
self.root.clipboard_append(box_str)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python plotting_gui.py <directory_or_file>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
input_path = Path(sys.argv[1])
|
||||||
|
files_to_process = []
|
||||||
|
|
||||||
|
if input_path.is_file():
|
||||||
|
# File mode
|
||||||
|
base_dir = input_path.parent
|
||||||
|
stem = input_path.stem
|
||||||
|
|
||||||
|
# Try to locate the image in Cutleft directory
|
||||||
|
img_path = base_dir / "Cutleft" / f"{stem}.jpg"
|
||||||
|
|
||||||
|
# Fallback: Check if user provided the jpg inside Cutleft directly
|
||||||
|
if not img_path.exists() and input_path.parent.name == "Cutleft" and input_path.suffix.lower() == ".jpg":
|
||||||
|
base_dir = input_path.parent.parent
|
||||||
|
img_path = input_path
|
||||||
|
|
||||||
|
if not img_path.exists():
|
||||||
|
print(f"Error: Could not find image at {img_path}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
files_to_process = [img_path]
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Directory mode
|
||||||
|
base_dir = input_path
|
||||||
|
cutleft_dir = base_dir / "Cutleft"
|
||||||
|
|
||||||
|
if not cutleft_dir.exists():
|
||||||
|
print(f"Error: {cutleft_dir} does not exist.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
files_to_process = sorted(cutleft_dir.glob("*.jpg"))
|
||||||
|
|
||||||
|
try:
|
||||||
|
all_labels = list(filter(None, (base_dir / "labels").read_text().splitlines()))
|
||||||
|
except FileNotFoundError:
|
||||||
|
all_labels = []
|
||||||
|
|
||||||
|
# Start Processing Thread
|
||||||
|
t = threading.Thread(target=worker_thread, args=(base_dir, files_to_process, all_labels))
|
||||||
|
t.daemon = True # Kill thread if main app closes
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# Start GUI
|
||||||
|
root = tk.Tk()
|
||||||
|
app = ImageViewer(root, base_dir)
|
||||||
|
root.mainloop()
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# Ensure a directory is provided
|
||||||
|
if [ ! -d "$1" ]; then
|
||||||
|
echo "Usage: $0 <directory_path>"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Go to the directory
|
||||||
|
cd "$1" || exit
|
||||||
|
|
||||||
|
count=1
|
||||||
|
for file in *.pdf; do
|
||||||
|
# Handle case where no pdfs exist
|
||||||
|
[ -e "$file" ] || continue
|
||||||
|
|
||||||
|
# Rename with 0-padding (e.g., Copie01.pdf)
|
||||||
|
mv -- "$file" "$(printf "Copie%02d.pdf" "$count")"
|
||||||
|
((count++))
|
||||||
|
done
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# Check if an argument is provided
|
||||||
|
if [ -z "$1" ]; then
|
||||||
|
echo "Usage: $0 <directory>"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Try to change into the directory, exit on failure
|
||||||
|
cd "$1" || { echo "Error: Cannot access directory '$1'"; exit 1; }
|
||||||
|
|
||||||
|
# Enable nullglob: if no pdfs exist, the loop won't run once with "*.pdf"
|
||||||
|
shopt -s nullglob
|
||||||
|
|
||||||
|
for file in *.pdf; do
|
||||||
|
# Rotate to a temporary file
|
||||||
|
if qpdf --rotate=+180 "$file" "temp_rotated.pdf"; then
|
||||||
|
mv "temp_rotated.pdf" "$file"
|
||||||
|
echo "Rotated: $file"
|
||||||
|
else
|
||||||
|
echo "Error processing: $file"
|
||||||
|
# Clean up temp file if pdftk failed but created garbage
|
||||||
|
[ -f "temp_rotated.pdf" ] && rm "temp_rotated.pdf"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
@ -0,0 +1,202 @@
|
||||||
|
import fitz # PyMuPDF
|
||||||
|
from pypdf import PdfWriter
|
||||||
|
from pypdf import PdfReader
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
from pathlib import Path
|
||||||
|
from collections import defaultdict # Added for grouping
|
||||||
|
|
||||||
|
# input_pdf = "Une Interro/Split.pdf"
|
||||||
|
|
||||||
|
def decode_json(pdf_file):
|
||||||
|
file_path = Path(pdf_file)
|
||||||
|
# Load JSON content from File.json
|
||||||
|
with open(file_path.with_suffix(".json"), "r") as f:
|
||||||
|
json_result = json.load(f)
|
||||||
|
|
||||||
|
# Get number of pages from File.pdf
|
||||||
|
nb_pages = len(PdfReader(file_path).pages)
|
||||||
|
|
||||||
|
bb_list = json_result["list"]
|
||||||
|
name = json_result["name"]
|
||||||
|
column_width = 1000 // nb_pages
|
||||||
|
|
||||||
|
def page_number(b):
|
||||||
|
return ((b[1] + b[3]) // 2) // column_width
|
||||||
|
|
||||||
|
result = [] # contient la page, et, en millième, au dessus du label
|
||||||
|
# (marge en plus), et au dessus du label (marge en moins)
|
||||||
|
for d in bb_list:
|
||||||
|
(b, label) = d["box_2d"], d["label"]
|
||||||
|
# print(b)
|
||||||
|
pn = page_number(b)
|
||||||
|
# 38 carreaux dans une page
|
||||||
|
carreau = 1000 // 38
|
||||||
|
# result.append((label, pn, b[2] - 3 * carreau, b[2] + int(carreau)))
|
||||||
|
result.append((label, pn, b[0] - int(carreau), b[2]-int(carreau)))
|
||||||
|
result.sort(key=lambda x: (x[1], x[2]))
|
||||||
|
return (name, result)
|
||||||
|
|
||||||
|
|
||||||
|
def split_an_interro(base_dir,input_pdf, coords_list):
|
||||||
|
doc = fitz.open(input_pdf)
|
||||||
|
|
||||||
|
output_dir = base_dir / input_pdf.stem
|
||||||
|
generated_files = set()
|
||||||
|
|
||||||
|
# Dictionary to collect parts for each label
|
||||||
|
parts_by_label = defaultdict(list)
|
||||||
|
|
||||||
|
# Filter coords_list to remove consecutive duplicate labels.
|
||||||
|
# If a label appears at the end of a page and again at the start of the next,
|
||||||
|
# we want to treat it as one continuous block, not two separate cuts.
|
||||||
|
filtered_coords = []
|
||||||
|
if coords_list:
|
||||||
|
filtered_coords.append(coords_list[0])
|
||||||
|
for item in coords_list[1:]:
|
||||||
|
# item[0] is the label/title
|
||||||
|
if item[0] != filtered_coords[-1][0]:
|
||||||
|
filtered_coords.append(item)
|
||||||
|
coords_list = filtered_coords
|
||||||
|
|
||||||
|
def scale_coord(y, page):
|
||||||
|
"""Scale y from 0–1000 range to PDF points."""
|
||||||
|
page_height = page.rect.height
|
||||||
|
return (y / 1000) * page_height
|
||||||
|
|
||||||
|
def save_cropped_page(doc, page_num, y0, y1, out_path):
|
||||||
|
"""
|
||||||
|
Saves a cropped portion of a page as a new PDF,
|
||||||
|
correctly handling the original page's rotation.
|
||||||
|
"""
|
||||||
|
# print(f"Saving cropped_page with : {y0} and {y1}")
|
||||||
|
# Get the source page object
|
||||||
|
page = doc[page_num]
|
||||||
|
# print("Debug : ", page_num, y0, y1, output_dir)
|
||||||
|
# 1. Define the crop rectangle in the VISUAL (rotated) coordinate system.
|
||||||
|
# The page.rect gives unrotated dimensions, so we apply the transformation
|
||||||
|
# matrix to get the visual dimensions.
|
||||||
|
rotated_rect = page.rect * page.transformation_matrix
|
||||||
|
visual_crop_rect = fitz.Rect(rotated_rect.x0, y0, rotated_rect.x1, y1)
|
||||||
|
|
||||||
|
# 2. Transform this visual crop rectangle back into the UNROTATED system.
|
||||||
|
# The 'clip' argument for show_pdf_page requires unrotated coordinates.
|
||||||
|
# The derotation_matrix does this conversion for us.
|
||||||
|
unrotated_clip_rect = visual_crop_rect * page.derotation_matrix
|
||||||
|
|
||||||
|
# Create a new temporary document for the output
|
||||||
|
temp_doc = fitz.open()
|
||||||
|
|
||||||
|
# Create a new page with the dimensions of our visual crop
|
||||||
|
temp_page = temp_doc.new_page(
|
||||||
|
width=visual_crop_rect.width,
|
||||||
|
height=visual_crop_rect.height
|
||||||
|
)
|
||||||
|
|
||||||
|
# Display the cropped and de-rotated content on the new page
|
||||||
|
temp_page.show_pdf_page(
|
||||||
|
temp_page.rect, # Where to place the content on the new page (the whole page)
|
||||||
|
doc, # Source document
|
||||||
|
page_num,
|
||||||
|
rotate=-page.rotation, # Cancel the original page's rotation
|
||||||
|
clip=unrotated_clip_rect # The area to take from the source page
|
||||||
|
)
|
||||||
|
|
||||||
|
# Save the new one-page PDF and close the document
|
||||||
|
temp_doc.save(out_path)
|
||||||
|
temp_doc.close()
|
||||||
|
|
||||||
|
|
||||||
|
for idx, (title, page_nb, ymin, _) in enumerate(coords_list):
|
||||||
|
temp_parts = []
|
||||||
|
y_start = scale_coord(ymin, doc[page_nb])
|
||||||
|
|
||||||
|
if idx + 1 < len(coords_list):
|
||||||
|
_, next_page_nb, _, next_ymax = coords_list[idx + 1]
|
||||||
|
if next_page_nb == page_nb:
|
||||||
|
# Same page
|
||||||
|
y_end = scale_coord(next_ymax, doc[page_nb])
|
||||||
|
temp_path = f"_part_{idx}_0.pdf"
|
||||||
|
save_cropped_page(doc, page_nb, y_start, y_end, temp_path)
|
||||||
|
temp_parts.append(temp_path)
|
||||||
|
else:
|
||||||
|
# Current page part
|
||||||
|
temp_path1 = f"_part_{idx}_0.pdf"
|
||||||
|
save_cropped_page(doc, page_nb, y_start, doc[page_nb].rect.height, temp_path1)
|
||||||
|
temp_parts.append(temp_path1)
|
||||||
|
|
||||||
|
# Next page part
|
||||||
|
y_end_next = scale_coord(next_ymax, doc[next_page_nb])
|
||||||
|
temp_path2 = f"_part_{idx}_1.pdf"
|
||||||
|
if y_end_next >= 10:
|
||||||
|
save_cropped_page(doc, next_page_nb, 0, y_end_next, temp_path2)
|
||||||
|
temp_parts.append(temp_path2)
|
||||||
|
else:
|
||||||
|
# Last segment to end of page
|
||||||
|
temp_path = f"_part_{idx}_0.pdf"
|
||||||
|
save_cropped_page(doc, page_nb, y_start, doc[page_nb].rect.height, temp_path)
|
||||||
|
temp_parts.append(temp_path)
|
||||||
|
|
||||||
|
# Collect parts for this label instead of writing immediately
|
||||||
|
parts_by_label[title].extend(temp_parts)
|
||||||
|
|
||||||
|
output_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Process aggregated parts by label
|
||||||
|
for title, parts in parts_by_label.items():
|
||||||
|
merger = PdfWriter()
|
||||||
|
for part in parts:
|
||||||
|
merger.append(part)
|
||||||
|
|
||||||
|
filename = f"{title}.pdf"
|
||||||
|
merger.write(output_dir / filename)
|
||||||
|
merger.close()
|
||||||
|
generated_files.add(filename)
|
||||||
|
|
||||||
|
# Cleanup temporary files for this label
|
||||||
|
for part in parts:
|
||||||
|
if os.path.exists(part):
|
||||||
|
os.remove(part)
|
||||||
|
|
||||||
|
doc.close()
|
||||||
|
# --- Cleanup Logic ---
|
||||||
|
# Move files not generated in this run to 'Missing' folder
|
||||||
|
if output_dir.exists():
|
||||||
|
missing_dir = output_dir / "Missing"
|
||||||
|
for item in output_dir.iterdir():
|
||||||
|
if item.is_file() and item.name not in generated_files:
|
||||||
|
print(f"ALERT: File '{item.name}' in '{input_pdf.stem}' was not generated. Moving to {missing_dir}")
|
||||||
|
missing_dir.mkdir(exist_ok=True)
|
||||||
|
item.rename(missing_dir / item.name)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) < 2:
|
||||||
|
print("Usage: python scrit.py <directory or pdf_file>")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
input_arg = Path(sys.argv[1])
|
||||||
|
|
||||||
|
if input_arg.is_file():
|
||||||
|
# If a single file is provided, process only that file.
|
||||||
|
# base_dir is assumed to be the directory containing the file.
|
||||||
|
base_dir = input_arg.parent
|
||||||
|
pdf_files = [input_arg]
|
||||||
|
elif input_arg.is_dir():
|
||||||
|
# If a directory is provided, process all PDFs inside.
|
||||||
|
base_dir = input_arg
|
||||||
|
pdf_files = sorted(base_dir.glob("*.pdf"))
|
||||||
|
else:
|
||||||
|
print(f"Error: {input_arg} is not a valid file or directory.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
for pdf_path in pdf_files:
|
||||||
|
json_path = pdf_path.with_suffix(".json")
|
||||||
|
if json_path.exists():
|
||||||
|
(name, coords) = decode_json(pdf_path)
|
||||||
|
print("Decoded name : ", name)
|
||||||
|
split_an_interro(base_dir, pdf_path, coords)
|
||||||
|
else:
|
||||||
|
print(f"Warning: No JSON found for {pdf_path.name} at {json_path}")
|
||||||
Loading…
Reference in New Issue