414 lines
15 KiB
Python
414 lines
15 KiB
Python
import sys
|
|
import os
|
|
import json
|
|
import collections
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
from PIL import Image, ImageDraw
|
|
import threading
|
|
|
|
import annotating
|
|
|
|
from utils import natural_key
|
|
from reading_annotations import detect_checks_and_notes, has_significant_notes
|
|
|
|
def get_extra_pdfs_as_images(root_dir, label, annotating_module):
    """Render the companion PDFs of ``label`` found in the "Text" and "Sol" folders.

    For each of ``<root_dir>/Text/<label>.pdf`` and ``<root_dir>/Sol/<label>.pdf``
    (checked in that order), an existing file is converted with
    ``annotating_module.make_base_image``. Missing files and falsy conversion
    results are skipped.

    Returns:
        The list of rendered images, "Text" first, then "Sol".
    """
    rendered = []
    for subfolder in ("Text", "Sol"):
        candidate = os.path.join(root_dir, subfolder, f"{label}.pdf")
        if not os.path.exists(candidate):
            continue
        # make_base_image returns a 3-tuple; only its first item is needed here.
        image, _second, _third = annotating_module.make_base_image(candidate)
        if image:
            rendered.append(image)
    return rendered
|
|
|
|
def _stack_images_vertically(images, width, height):
    """Paste ``images`` top to bottom onto a new white RGB canvas of the given size."""
    canvas = Image.new("RGB", (width, height), "white")
    offset = 0
    for img in images:
        canvas.paste(img, (0, offset))
        offset += img.height
    return canvas


def save_paginated_pdf(image_groups, output_path):
    """Write groups of images to a multi-page PDF, never splitting a group.

    Each group is stacked vertically. Groups are packed onto a page until
    adding the next one would exceed the maximum page height; that group then
    starts a new page. A group taller than the limit still gets a page of its
    own (pages are as tall as their content).

    Within every group the first image receives a black inner border and the
    second a blue one. The border is drawn on a copy, so the caller's images
    are left untouched.

    Args:
        image_groups: Iterable of lists of PIL images. Empty groups are ignored.
        output_path: Destination path of the PDF.

    Returns:
        None. Nothing is written when there is no image at all.
    """
    # Drop empty groups up front: the original crashed in max() on input like [[]].
    groups = [group for group in (image_groups or []) if group]
    if not groups:
        return

    max_w = max(img.width for group in groups for img in group)
    # Page height cap derived from the widest image
    # (1.414 is presumably the A-series sqrt(2) ratio, enlarged by 30% -- TODO confirm).
    max_page_h = int(max_w * 1.414 * 1.3)

    # 0.2 cm in pixels at 100 DPI (0.2 / 2.54 inches * 100).
    border_px = int((0.2 / 2.54) * 100)
    # NOTE(review): the border colour depends only on the position in the
    # group (index 0 -> black, index 1 -> blue), not on where the image came from.
    border_colors = {0: "black", 1: "blue"}

    pages = []
    current_page_imgs = []
    current_h = 0

    for group in groups:
        processed_group = []
        for position, img in enumerate(group):
            color = border_colors.get(position)
            if color is not None:
                img = img.copy()  # never draw on the caller's image object
                ImageDraw.Draw(img).rectangle(
                    [0, 0, img.width - 1, img.height - 1],
                    outline=color,
                    width=border_px,
                )
            processed_group.append(img)

        group_h = sum(img.height for img in processed_group)

        if current_page_imgs and current_h + group_h > max_page_h:
            # The group does not fit: flush the current page and start a new one.
            pages.append(_stack_images_vertically(current_page_imgs, max_w, current_h))
            current_page_imgs = processed_group
            current_h = group_h
        else:
            current_page_imgs.extend(processed_group)
            current_h += group_h

    # Every kept group is non-empty, so a last page is always pending here.
    pages.append(_stack_images_vertically(current_page_imgs, max_w, current_h))

    pages[0].save(output_path, "PDF", resolution=100.0, save_all=True, append_images=pages[1:])
|
|
|
|
def _apply_actions_to_labels(labels_data, actions, logs):
    """Apply reviewer actions to ``labels_data`` in place and return the set of modified labels."""
    actions_by_label = collections.defaultdict(list)
    for action in actions:
        actions_by_label[action['label']].append(action)

    dirty_labels = set()
    for label, label_actions in actions_by_label.items():
        if label not in labels_data:
            continue

        result = labels_data[label]['result']
        feedbacks = result.get('feedback', [])

        # The 'index' of a delete action addresses one of these two views,
        # both built once before any action of the label is applied:
        # feedbacks without a box (stored order) and boxed feedbacks sorted
        # by the first component of their box_2d.
        global_fb = [fb for fb in feedbacks if not fb.get('box_2d')]
        local_fb = sorted(
            (fb for fb in feedbacks if fb.get('box_2d')),
            key=lambda fb: fb['box_2d'][0],
        )

        for act in label_actions:
            kind = act['type']

            if kind == 'score':
                result['score'] = act['value']
                dirty_labels.add(label)
                logs.append(f" > Updated score for {label} to {act['value']}")

            elif kind == 'clear_all':
                for fb in feedbacks:
                    fb["to_delete"] = True
                    if fb.get("box_2d"):
                        fb["norectangle"] = True
                dirty_labels.add(label)
                logs.append(f" > Cleared all feedbacks in {label}")

            elif kind == 'del_global':
                # Out-of-range indexes are ignored silently.
                if act['index'] < len(global_fb):
                    global_fb[act['index']]["to_delete"] = True
                    dirty_labels.add(label)
                    logs.append(f" > Deleted global feedback in {label}")

            elif kind in ('del_local', 'del_local_rect'):
                if act['index'] < len(local_fb):
                    target = local_fb[act['index']]
                    if kind == 'del_local':
                        target["to_delete"] = True
                        logs.append(f" > Deleted local feedback in {label}")
                    else:
                        target["norectangle"] = True
                        logs.append(f" > Deleted rect in {label}")
                    dirty_labels.add(label)

    return dirty_labels


def _overlay_manual_notes(final_img, note_info, new_header_h):
    """Paste the note crop of one label onto ``final_img``; return True when the crop holds significant notes."""
    sub_note = note_info['img']
    old_header_h = int(note_info['old_header_h'])

    if not has_significant_notes(sub_note):
        return False

    w, h = sub_note.size

    # The crop is used as its own paste mask
    # (assumes the crop carries transparency -- TODO confirm against the producer).
    if old_header_h > 0:
        # Ink written over the old header stays anchored at the top.
        header_crop = sub_note.crop((0, 0, w, min(h, old_header_h)))
        final_img.paste(header_crop, (0, 0), mask=header_crop)

    if h > old_header_h:
        # Ink below the old header is shifted to start at the regenerated header height.
        body_crop = sub_note.crop((0, old_header_h, w, h))
        final_img.paste(body_crop, (0, new_header_h), mask=body_crop)

    return True


def _is_perfect_without_feedback(result, score_text):
    """Return True when the score equals 4.0 and the result carries no feedback entry at all."""
    return float(score_text) == 4.0 and len(result.get('feedback', [])) == 0


def _concat_vertically(images):
    """Stack ``images`` top to bottom on a white RGB canvas as wide as the widest image."""
    canvas = Image.new(
        "RGB",
        (max(img.width for img in images), sum(img.height for img in images)),
        "white",
    )
    offset = 0
    for img in images:
        canvas.paste(img, (0, offset))
        offset += img.height
    return canvas


def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, label_notes, all_labels):
    """Apply review actions for one student and rebuild that student's outputs.

    Steps:
      1. Apply ``actions`` to ``data[student_id]`` in place (score updates,
         feedback / rectangle deletion flags).
      2. For every label of the student (natural order) whose
         ``<root_dir>/Copie<id>/<label>.pdf`` exists, regenerate the label
         image, overlay the manual notes from ``label_notes`` and save
         ``<label>.jpg`` when the label was modified or carries notes.
      3. Write ``score.json``, ``Concat.jpg`` (all regenerated labels) and
         ``Concat_F.pdf`` (labels that are not "4.0 with no feedback",
         each preceded by its Text/Sol pages) into
         ``<root_dir>/BGnot/Copie<id>``.

    Args:
        root_dir: Root directory of the grading session.
        data: Mapping student id -> label -> content dict with at least the
            'result' and 'coordinates' keys read below.
        student_id: Id of the student to process (key of ``data``).
        actions: Iterable of action dicts ('label', 'type', plus 'value' or 'index').
        label_notes: Mapping label -> {'img': note crop, 'old_header_h': height}.
        all_labels: Labels that must appear in score.json (empty string when
            the student has no entry for them).

    Returns:
        The accumulated log messages joined with newlines.
    """
    logs = [f"\nProcessing compilation for: Copie{student_id}"]
    output_dir = os.path.join(root_dir, "BGnot", f"Copie{student_id}")
    os.makedirs(output_dir, exist_ok=True)

    score_path = os.path.join(output_dir, "score.json")
    labels_data = data.get(student_id, {})

    # --- 1. Apply actions to data (update scores / flags for deletion) ---
    dirty_labels = _apply_actions_to_labels(labels_data, actions, logs)

    # --- 2. Process images (regenerate & collect for concatenation) ---
    concat_list = []
    concat_list_F = []
    d_notes = dict.fromkeys(all_labels, "")

    # Walk every label in natural order to assemble a complete student profile.
    for label, content in sorted(labels_data.items(), key=lambda item: natural_key(item[0])):
        result = content['result']
        # The score is recorded even when the label has no PDF to render.
        d_notes[label] = str(result.get('score', 0))

        pdf_path = os.path.join(root_dir, f"Copie{student_id}", f"{label}.pdf")
        if not os.path.exists(pdf_path):
            continue

        base_img, _, _ = annotating.make_base_image(pdf_path)

        # Compose from the result object that step 1 may have modified.
        final_img, new_header_h = annotating.compose_label_image(
            base_img, label, result, content['coordinates'][0],
            with_error=False,
        )
        if final_img is None:
            continue

        # Overlay manual notes specific to this label.
        has_notes = False
        if label in label_notes:
            has_notes = _overlay_manual_notes(final_img, label_notes[label], new_header_h)

        # Save the individual file when modified (dirty data or visible notes).
        if label in dirty_labels or has_notes:
            final_img.save(os.path.join(output_dir, f"{label}.jpg"))
            logs.append(f" Saved dirty image: {label}.jpg")

        concat_list.append(final_img)

        if not _is_perfect_without_feedback(result, d_notes[label]):
            group = get_extra_pdfs_as_images(root_dir, label, annotating)
            group.append(final_img)
            concat_list_F.append(group)

    # --- 3. Save final outputs ---
    with open(score_path, "w", encoding="utf-8") as f:
        json.dump(d_notes, f, indent=4)
    logs.append(f" Saved {score_path}")

    if concat_list:
        _concat_vertically(concat_list).save(os.path.join(output_dir, "Concat.jpg"))
        logs.append(" Saved regenerated Concat.jpg")

    if concat_list_F:
        save_paginated_pdf(concat_list_F, os.path.join(output_dir, "Concat_F.pdf"))
        logs.append(" Saved regenerated Concat_F.pdf")

    return "\n".join(logs)
|
|
|
|
from utils import read_all_labels
|
|
import argparse
|
|
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point: read grouped annotations and rebuild per-student outputs.

    Reads every group directory of ``<input_path>/BGnot`` (directories whose
    name starts with "Copie" are skipped), gathers the detected actions and
    note crops per student, optionally merges the "refaire" annotations found
    in ``<input_path>/BRnot/Copie<id>``, then regenerates each student's
    outputs with ``apply_actions_and_regenerate_grouped``.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Exits with status 1 when the BGnot directory does not exist.
    """
    parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs")
    parser.add_argument("input_path", help="Directory path")
    parser.add_argument("--refaire", action="store_true", help="Merge refaire annotations from Bnot")
    args = parser.parse_args(argv)

    # Use the parsed positional argument: sys.argv[1] is "--refaire" whenever
    # the flag is written before the directory.
    root_dir = args.input_path
    bgnot_dir = os.path.join(root_dir, "BGnot")

    if not os.path.exists(bgnot_dir):
        print(f"Directory {bgnot_dir} does not exist. Run annotating_by_label.py first.")
        sys.exit(1)

    try:
        all_labels = read_all_labels(Path(root_dir))
    except FileNotFoundError:
        all_labels = []

    # --- 0. Read refaire.json if requested ---
    # Both containers stay empty when the flag is absent or the file is
    # missing, so the checks below never hit an unbound name.
    refaire_list = []
    refaire_dict = {}
    if args.refaire:
        refaire_path = os.path.join(root_dir, "refaire.json")
        if os.path.exists(refaire_path):
            with open(refaire_path, "r", encoding="utf-8") as f:
                refaire_list = json.load(f)
            for c_name, labels in refaire_list:
                refaire_dict[c_name.replace("Copie", "")] = labels
        else:
            print(f"Warning: --refaire flag used, but {refaire_path} not found.")

    # Load original data.
    if args.refaire and refaire_list:
        original_data = annotating.make_dictionary(
            root_dir, refaire=True, refaire_list=refaire_list
        )
    else:
        original_data = annotating.make_dictionary(root_dir)

    lock = threading.Lock()
    actions_by_student = collections.defaultdict(list)
    notes_by_student = collections.defaultdict(dict)

    def significant_note_crops(notes_img, bnote_data):
        """Yield ``(image_info, crop)`` for each bnote image entry whose strip holds significant notes."""
        for img_info in bnote_data.get("images", []):
            hmin, hmax = img_info.get("hmin", 0), img_info.get("hmax", 0)
            if hmax <= hmin:
                continue
            crop = notes_img.crop((0, hmin, notes_img.width, hmax))
            if has_significant_notes(crop):
                yield img_info, crop

    def process_bgnot_entry(entry, only_ids=None):
        """Collect actions and note crops from one BGnot group directory."""
        gdir = os.path.join(bgnot_dir, entry)
        if not os.path.isdir(gdir) or entry.startswith("Copie"):
            return

        # Check for the file before opening it; a group without bnote.json is skipped.
        bnote_path = os.path.join(gdir, "bnote.json")
        if not os.path.exists(bnote_path):
            return
        with open(bnote_path, "r", encoding="utf-8") as f:
            bnote_data = json.load(f)

        # When restricted to some students, skip groups that contain none of
        # them. Ids are compared as strings, like everywhere else in this script.
        if only_ids and not any(
            str(info.get("id")) in only_ids for info in bnote_data.get("images", [])
        ):
            return

        actions, notes_img = detect_checks_and_notes(gdir)
        if notes_img is None:
            return

        # Do the image work outside the lock; only the shared dicts need it.
        note_entries = [
            (
                str(info.get("id")),
                info.get("label"),
                {'img': crop, 'old_header_h': info.get("header_height", 0)},
            )
            for info, crop in significant_note_crops(notes_img, bnote_data)
        ]

        with lock:
            for act in actions:
                raw_sid = act.get("student_id")
                # str(None) is the truthy "None": test the raw value instead.
                if raw_sid is None or raw_sid == "":
                    continue
                actions_by_student[str(raw_sid)].append(act)
            for sid, lbl, note in note_entries:
                notes_by_student[sid][lbl] = note

    def process_refaire_entry(sid, r_labels):
        """Replace a student's data for the redone labels with what BRnot/Copie<sid> contains."""
        s_bnot_dir = os.path.join(root_dir, "BRnot", f"Copie{sid}")
        if not os.path.exists(s_bnot_dir):
            return
        if not r_labels:
            # An empty label list means "every label of this student".
            r_labels = list(original_data.get(sid, {}).keys())

        # Discard what the BGnot pass collected for the redone labels.
        with lock:
            actions_by_student[sid] = [
                a for a in actions_by_student[sid] if a.get('label') not in r_labels
            ]
            for lbl in r_labels:
                notes_by_student[sid].pop(lbl, None)

        b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir)

        # Default to an empty description so a missing bnote.json cannot leave
        # the name unbound when a notes image exists.
        b_bnote_data = {}
        b_bnote_path = os.path.join(s_bnot_dir, "bnote.json")
        if os.path.exists(b_bnote_path):
            with open(b_bnote_path, "r", encoding="utf-8") as f:
                b_bnote_data = json.load(f)

        note_entries = []
        if b_notes_img is not None:
            note_entries = [
                (info.get("label"), {'img': crop, 'old_header_h': info.get("header_height", 0)})
                for info, crop in significant_note_crops(b_notes_img, b_bnote_data)
            ]

        with lock:
            for act in b_actions:
                act["student_id"] = sid
                actions_by_student[sid].append(act)
            for lbl, note in note_entries:
                notes_by_student[sid][lbl] = note

    # In refaire mode only the groups containing a listed student are read.
    only_ids = set(refaire_dict) if refaire_dict else None

    # --- 1. Read the BGnot group directories concurrently ---
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        pending = {
            executor.submit(process_bgnot_entry, entry, only_ids): entry
            for entry in os.listdir(bgnot_dir)
        }
        for future in concurrent.futures.as_completed(pending):
            # Report worker failures instead of dropping them silently.
            error = future.exception()
            if error is not None:
                print(f"Warning: could not process {pending[future]}: {error!r}")

    # Refaire annotations override what was read from BGnot.
    if args.refaire and refaire_dict:
        for sid, labels in refaire_dict.items():
            process_refaire_entry(sid, labels)

    def process_student(sid):
        """Regenerate the outputs of one student; return its log text ("" for unknown ids)."""
        if sid not in original_data:
            return ""
        return apply_actions_and_regenerate_grouped(
            root_dir,
            original_data,
            sid,
            actions_by_student[sid],
            notes_by_student[sid],
            all_labels,
        )

    # --- 2. Process each student concurrently using 4 threads ---
    if refaire_dict:
        target_sids = list(refaire_dict)
    else:
        target_sids = sorted(original_data.keys(), key=natural_key)

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_student, sid): sid for sid in target_sids}
        for future in concurrent.futures.as_completed(futures):
            output = future.result()
            if output:
                print(output)


if __name__ == "__main__":
    main()
|