# Copies/reading_grouped_annotations.py
#
# 334 lines
# 12 KiB
# Python

import sys
import os
import json
import collections
import concurrent.futures
from pathlib import Path
from PIL import Image
import threading
import annotating
from utils import natural_key
from reading_annotations import detect_checks_and_notes, has_significant_notes
# Score at which a label counts as "perfect"; perfect labels that carry no
# feedback are left out of Concat_F.jpg.
_PERFECT_SCORE = 4.0


def _apply_label_actions(label, result, acts, logs):
    """Apply the actions of one label to *result* in place.

    Appends one log line per applied action to *logs* and returns True when
    at least one action modified the result (the label is then "dirty").
    """
    feedbacks = result.get('feedback', [])
    # Actions address feedbacks by position: global ones (no box) in list
    # order, local ones ordered by the first coordinate of their box.
    global_fb = [f for f in feedbacks if not f.get('box_2d')]
    local_fb = sorted(
        (f for f in feedbacks if f.get('box_2d')),
        key=lambda x: x['box_2d'][0],
    )
    dirty = False
    for act in acts:
        kind = act['type']
        if kind == 'score':
            result['score'] = act['value']
            dirty = True
            logs.append(f" > Updated score for {label} to {act['value']}")
        elif kind == 'clear_all':
            for fb in feedbacks:
                fb["to_delete"] = True
                if fb.get("box_2d"):
                    fb["norectangle"] = True
            dirty = True
            logs.append(f" > Cleared all feedbacks in {label}")
        elif kind == 'del_global':
            # The lower bound keeps a negative index from wrapping around
            # and flagging a feedback at the end of the list.
            if 0 <= act['index'] < len(global_fb):
                global_fb[act['index']]["to_delete"] = True
                dirty = True
                logs.append(f" > Deleted global feedback in {label}")
        elif kind in ('del_local', 'del_local_rect'):
            if 0 <= act['index'] < len(local_fb):
                target = local_fb[act['index']]
                if kind == 'del_local':
                    target["to_delete"] = True
                    logs.append(f" > Deleted local feedback in {label}")
                else:
                    target["norectangle"] = True
                    logs.append(f" > Deleted rect in {label}")
                dirty = True
    return dirty


def _overlay_notes(final_img, note_info, new_header_h):
    """Paste the manual-note crop of one label onto *final_img*.

    The part of the crop above ``old_header_h`` is pasted at the top of the
    image; the rest is pasted starting at *new_header_h*. Returns True when
    the crop held significant notes (and was therefore pasted).
    """
    sub_note = note_info['img']
    old_header_h = int(note_info['old_header_h'])
    if not has_significant_notes(sub_note):
        return False
    w, h = sub_note.size
    # 1. Header ink stays at the top of the regenerated image.
    if old_header_h > 0:
        header_crop = sub_note.crop((0, 0, w, min(h, old_header_h)))
        final_img.paste(header_crop, (0, 0), mask=header_crop)
    # 2. Ink below the old header is shifted to the new header height.
    if h > old_header_h:
        body_crop = sub_note.crop((0, old_header_h, w, h))
        final_img.paste(body_crop, (0, new_header_h), mask=body_crop)
    return True


def _is_perfect_without_comment(score_text, result):
    """Return True when the score equals the perfect score and no feedback exists.

    A score that cannot be parsed as a number is treated as not perfect
    instead of raising.
    """
    try:
        score = float(score_text)
    except ValueError:
        return False
    return score == _PERFECT_SCORE and not result.get('feedback', [])


def _save_vertical_concat(images, path):
    """Stack *images* top to bottom on a white RGB canvas and save it to *path*."""
    max_w = max(i.width for i in images)
    total_h = sum(i.height for i in images)
    full_img = Image.new("RGB", (max_w, total_h), "white")
    y = 0
    for img in images:
        full_img.paste(img, (0, y))
        y += img.height
    full_img.save(path)


def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, label_notes, all_labels):
    """
    Apply correction actions to one student's data and regenerate the images.

    Steps:
      1. Apply *actions* (score updates, feedback / rectangle deletions) to
         ``data[student_id]`` in place.
      2. For every label of the student that has a PDF under
         ``root_dir/Copie{student_id}``, recompose the label image, paste the
         label-specific note crop from *label_notes*, and save the image as
         ``{label}.jpg`` when the label was modified or carries notes.
      3. Write ``score.json`` (one entry per label of *all_labels*, plus the
         student's own labels), ``Concat.jpg`` (all regenerated images) and
         ``Concat_F.jpg`` (all images except perfect-score labels without
         feedback) into ``root_dir/BGnot/Copie{student_id}``.

    Args:
        root_dir: Root directory holding the ``Copie*`` and ``BGnot`` folders.
        data: Mapping student id -> label -> dict with at least the keys
            ``'result'`` and ``'coordinates'``.
        student_id: Key of the student in *data*.
        actions: Iterable of action dicts with the keys ``'label'`` and
            ``'type'``, plus ``'value'`` or ``'index'`` depending on the type.
        label_notes: Mapping label -> ``{'img': ..., 'old_header_h': ...}``.
        all_labels: Labels used to pre-fill ``score.json`` with empty scores.

    Returns:
        The accumulated log messages joined with newlines.
    """
    logs = [f"\nProcessing compilation for: Copie{student_id}"]
    output_dir = os.path.join(root_dir, "BGnot", f"Copie{student_id}")
    os.makedirs(output_dir, exist_ok=True)
    score_path = os.path.join(output_dir, "score.json")
    labels_data = data.get(student_id, {})

    # --- 1. Apply Actions to Data (Update scores / Flags for deletion) ---
    actions_by_label = collections.defaultdict(list)
    for a in actions:
        actions_by_label[a['label']].append(a)

    dirty_labels = set()
    for label, acts in actions_by_label.items():
        if label not in labels_data:
            continue
        if _apply_label_actions(label, labels_data[label]['result'], acts, logs):
            dirty_labels.add(label)

    # --- 2. Process Images (Regenerate & Concatenate) ---
    concat_list = []
    concat_list_F = []
    d_notes = dict.fromkeys(all_labels, "")

    # Iterate over all labels naturally to assemble a complete student profile
    sorted_labels = sorted(labels_data.items(), key=lambda x: natural_key(x[0]))
    for label, content in sorted_labels:
        result = content['result']
        # The score is recorded even when the label has no PDF to render.
        d_notes[label] = str(result.get('score', 0))

        pdf_path = os.path.join(root_dir, f"Copie{student_id}", f"{label}.pdf")
        if not os.path.exists(pdf_path):
            continue
        (base_img, _, _) = annotating.make_base_image(pdf_path)

        # Compose uses the result object we modified in step 1
        final_img, new_header_h = annotating.compose_label_image(
            base_img, label, result, content['coordinates'][0],
            with_error=False
        )
        if final_img is None:
            continue

        # Overlay manual notes specific to this label
        has_notes = False
        if label in label_notes:
            has_notes = _overlay_notes(final_img, label_notes[label], new_header_h)

        # Save individual file if Modified (Dirty logic or visual notes)
        if (label in dirty_labels) or has_notes:
            save_path = os.path.join(output_dir, f"{label}.jpg")
            final_img.save(save_path)
            logs.append(f" Saved dirty image: {label}.jpg")

        concat_list.append(final_img)
        if not _is_perfect_without_comment(d_notes[label], result):
            concat_list_F.append(final_img)

    # --- 3. Save Final Outputs ---
    with open(score_path, "w", encoding="utf-8") as f:
        json.dump(d_notes, f, indent=4)
    logs.append(f" Saved {score_path}")

    if concat_list:
        _save_vertical_concat(concat_list, os.path.join(output_dir, "Concat.jpg"))
        logs.append(f" Saved regenerated Concat.jpg")
    if concat_list_F:
        _save_vertical_concat(concat_list_F, os.path.join(output_dir, "Concat_F.jpg"))
        logs.append(f" Saved regenerated Concat_F.jpg")
    return "\n".join(logs)
from utils import read_all_labels
import argparse
if __name__ == "__main__":
    # Script entry point: collect the actions and note crops found in the
    # BGnot group directories (optionally overridden by per-student "refaire"
    # annotations under BRnot), then regenerate every student's images.
    parser = argparse.ArgumentParser(description="Read grouped annotations and compile PDFs")
    parser.add_argument("input_path", help="Directory path")
    parser.add_argument("--refaire", action="store_true", help="Merge refaire annotations from Bnot")
    args = parser.parse_args()

    # Use the parsed positional: sys.argv[1] is "--refaire" when the flag is
    # given before the directory.
    root_dir = args.input_path
    bgnot_dir = os.path.join(root_dir, "BGnot")
    if not os.path.exists(bgnot_dir):
        print(f"Directory {bgnot_dir} does not exist. Run annotating_by_label.py first.")
        sys.exit(1)

    try:
        all_labels = read_all_labels(Path(root_dir))
    except FileNotFoundError:
        all_labels = []

    # --- 0. Read refaire.json if requested ---
    # refaire_list stays bound (and empty) when the file is missing, so the
    # checks below cannot hit an undefined name.
    refaire_list = []
    refaire_dict = {}
    if args.refaire:
        refaire_path = os.path.join(root_dir, "refaire.json")
        if os.path.exists(refaire_path):
            with open(refaire_path, "r", encoding="utf-8") as f:
                refaire_list = json.load(f)
            for c_name, labels in refaire_list:
                sid = c_name.replace("Copie", "")
                refaire_dict[sid] = labels
        else:
            print(f"Warning: --refaire flag used, but {refaire_path} not found.")

    # Load original data
    if args.refaire and refaire_list:
        original_data = annotating.make_dictionary(root_dir,
                                                   refaire=True,
                                                   refaire_list=refaire_list)
    else:
        original_data = annotating.make_dictionary(root_dir)

    # Guards actions_by_student / notes_by_student, which worker threads fill.
    lock = threading.Lock()
    actions_by_student = collections.defaultdict(list)
    notes_by_student = collections.defaultdict(dict)

    def store_note_crops(notes_img, images, forced_sid=None):
        """Crop each image's [hmin, hmax) band out of *notes_img* and keep the
        crops that hold significant notes. The caller must hold ``lock``.

        The student id comes from each entry's "id" unless *forced_sid* is set.
        """
        for img_info in images:
            sid = forced_sid if forced_sid is not None else str(img_info.get("id"))
            lbl = img_info.get("label")
            hmin, hmax = img_info.get("hmin", 0), img_info.get("hmax", 0)
            if hmax > hmin:
                crop = notes_img.crop((0, hmin, notes_img.width, hmax))
                if has_significant_notes(crop):
                    notes_by_student[sid][lbl] = {
                        'img': crop,
                        'old_header_h': img_info.get("header_height", 0),
                    }

    def process_bgnot_entry(entry, only_ids=None):
        """Read one group directory of BGnot and record its actions and notes.

        Entries that are not directories, that start with "Copie" (per-student
        output folders), or that have no bnote.json are skipped. When
        *only_ids* is given, groups containing none of those ids are skipped.
        """
        gdir = os.path.join(bgnot_dir, entry)
        if not os.path.isdir(gdir) or entry.startswith("Copie"):
            return
        bnote_path = os.path.join(gdir, "bnote.json")
        # Check before opening: a missing file is an expected "skip" case.
        if not os.path.exists(bnote_path):
            return
        with open(bnote_path, "r", encoding="utf-8") as f:
            bnote_data = json.load(f)
        images = bnote_data.get("images", [])
        # Ids are normalised to str because only_ids holds string ids.
        if only_ids and not any(str(d.get("id")) in only_ids for d in images):
            return
        actions, notes_img = detect_checks_and_notes(gdir)
        if notes_img is None:
            return
        with lock:
            for act in actions:
                raw_sid = act.get("student_id")
                # str(None) is the truthy string "None", so test the raw value.
                if raw_sid is None or raw_sid == "":
                    continue
                actions_by_student[str(raw_sid)].append(act)
            store_note_crops(notes_img, images)

    def process_refaire_entry(sid, r_labels):
        """Replace one student's actions/notes for the redone labels with the
        ones found in BRnot/Copie{sid}. An empty *r_labels* means all labels
        of the student.
        """
        s_bnot_dir = os.path.join(root_dir, "BRnot", f"Copie{sid}")
        if not os.path.exists(s_bnot_dir):
            return
        if not r_labels:
            r_labels = list(original_data.get(sid, {}).keys())
        # Drop whatever the group pass collected for the redone labels.
        with lock:
            actions_by_student[sid] = [a for a in actions_by_student[sid]
                                       if a.get('label') not in r_labels]
            for lbl in r_labels:
                notes_by_student[sid].pop(lbl, None)
        b_actions, b_notes_img = detect_checks_and_notes(s_bnot_dir)
        b_bnote_path = os.path.join(s_bnot_dir, "bnote.json")
        if os.path.exists(b_bnote_path):
            with open(b_bnote_path, "r", encoding="utf-8") as f:
                b_bnote_data = json.load(f)
            with lock:
                for act in b_actions:
                    act["student_id"] = sid
                    actions_by_student[sid].append(act)
                if b_notes_img is not None:
                    store_note_crops(b_notes_img, b_bnote_data.get("images", []),
                                     forced_sid=sid)

    def process_student(sid):
        """Regenerate one student's outputs; return its log text ("" if unknown)."""
        if sid not in original_data:
            return ""
        return apply_actions_and_regenerate_grouped(
            root_dir,
            original_data,
            sid,
            actions_by_student[sid],
            notes_by_student[sid],
            all_labels
        )

    # In refaire mode only the groups containing a redone student are read.
    only_ids = set(refaire_dict) if refaire_dict else None

    # --- 1. Read the BGnot group directories concurrently ---
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        bgnot_futures = {
            executor.submit(process_bgnot_entry, entry, only_ids): entry
            for entry in os.listdir(bgnot_dir)
        }
        # Retrieve every result so that a failing entry is reported instead
        # of being dropped silently (as happens with an unconsumed map()).
        for future in concurrent.futures.as_completed(bgnot_futures):
            try:
                future.result()
            except Exception as exc:
                print(f"Warning: could not read BGnot entry {bgnot_futures[future]}: {exc}")

    # Refaire: per-student annotations override the grouped ones.
    if args.refaire and refaire_dict:
        for sid, labels in refaire_dict.items():
            process_refaire_entry(sid, labels)

    # --- 2. Process each student concurrently using 4 threads ---
    if refaire_dict:
        targets = list(refaire_dict)
    else:
        targets = sorted(original_data.keys(), key=natural_key)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_student, sid): sid for sid in targets}
        for future in concurrent.futures.as_completed(futures):
            output = future.result()
            if output:
                print(output)