259 lines
9.2 KiB
Python
259 lines
9.2 KiB
Python
import sys
|
|
import os
|
|
import json
|
|
import collections
|
|
import concurrent.futures
|
|
from pathlib import Path
|
|
from PIL import Image
|
|
|
|
import annotating
|
|
|
|
from utils import natural_key
|
|
from reading_annotations import detect_checks_and_notes, has_significant_notes
|
|
|
|
# Score at which a label that also has no feedback is left out of Concat_F.jpg.
_PERFECT_SCORE = 4.0


def _apply_label_actions(label, result, acts, logs):
    """Apply one label's actions to ``result`` in place.

    Appends one message to ``logs`` per action that took effect and returns
    True when at least one action did.
    """
    feedbacks = result.get('feedback') or []

    # 'index' in an action addresses these two views, not the raw list:
    # feedbacks without a box, and boxed feedbacks ordered by box_2d[0].
    global_fb = [fb for fb in feedbacks if not fb.get('box_2d')]
    local_fb = sorted(
        (fb for fb in feedbacks if fb.get('box_2d')),
        key=lambda fb: fb['box_2d'][0],
    )

    changed = False
    for act in acts:
        kind = act['type']

        if kind == 'score':
            result['score'] = act['value']
            changed = True
            logs.append(f" > Updated score for {label} to {act['value']}")

        elif kind == 'clear_all':
            for fb in feedbacks:
                fb["to_delete"] = True
                if fb.get("box_2d"):
                    fb["norectangle"] = True
            changed = True
            logs.append(f" > Cleared all feedbacks in {label}")

        elif kind == 'del_global':
            idx = act['index']
            # Reject negative indices: they would wrap around and flag the
            # wrong feedback instead of being ignored as out of range.
            if 0 <= idx < len(global_fb):
                global_fb[idx]["to_delete"] = True
                changed = True
                logs.append(f" > Deleted global feedback in {label}")

        elif kind in ('del_local', 'del_local_rect'):
            idx = act['index']
            if 0 <= idx < len(local_fb):
                target = local_fb[idx]
                if kind == 'del_local':
                    target["to_delete"] = True
                    logs.append(f" > Deleted local feedback in {label}")
                else:
                    target["norectangle"] = True
                    logs.append(f" > Deleted rect in {label}")
                changed = True

    return changed


def _overlay_label_notes(final_img, note_info, new_header_h):
    """Paste a label's manual-note crop onto ``final_img``; return True if pasted.

    The crop is split at ``old_header_h``: the upper part is pasted at the top
    of the image, the remainder at ``new_header_h``. Each piece is used as its
    own paste mask.
    """
    sub_note = note_info['img']
    old_header_h = int(note_info['old_header_h'])

    if not has_significant_notes(sub_note):
        return False

    w, h = sub_note.size

    # 1. Header ink stays at the top of the regenerated image.
    if old_header_h > 0:
        header_crop = sub_note.crop((0, 0, w, min(h, old_header_h)))
        final_img.paste(header_crop, (0, 0), mask=header_crop)

    # 2. Ink below the old header moves to where the new header ends.
    if h > old_header_h:
        body_crop = sub_note.crop((0, old_header_h, w, h))
        final_img.paste(body_crop, (0, new_header_h), mask=body_crop)

    return True


def _is_perfect_without_comments(score_text, result):
    """Return True when ``score_text`` parses to the perfect score and there is no feedback."""
    try:
        score = float(score_text)
    except ValueError:
        # A missing or non-numeric score cannot be "perfect"; keep the label
        # in the filtered output rather than aborting the whole student.
        return False
    return score == _PERFECT_SCORE and not result.get('feedback')


def _save_vertical_stack(images, save_path):
    """Stack ``images`` top to bottom on a white RGB canvas and save it to ``save_path``."""
    canvas = Image.new(
        "RGB",
        (max(img.width for img in images), sum(img.height for img in images)),
        "white",
    )
    y = 0
    for img in images:
        canvas.paste(img, (0, y))
        y += img.height
    canvas.save(save_path)


def apply_actions_and_regenerate_grouped(root_dir, data, student_id, actions, label_notes, all_labels):
    """Apply review actions for one student and rebuild that student's images.

    Steps:
      1. Apply ``actions`` to ``data[student_id]`` in place (score updates,
         ``to_delete`` / ``norectangle`` flags on feedback entries).
      2. For every label of the student that has a ``{label}.pdf`` under
         ``<root_dir>/Copie{student_id}``, recompose the label image, overlay
         the label's manual-note crop if any, and save ``{label}.jpg`` when
         the label was modified or carries notes.
      3. Write ``score.json`` and, when there is at least one image,
         ``Concat.jpg`` (all labels) and ``Concat_F.jpg`` (labels that are not
         "perfect score with no feedback") into
         ``<root_dir>/BGnot/Copie{student_id}``.

    Args:
        root_dir: Project directory containing ``Copie{id}`` and ``BGnot``.
        data: Mapping ``student_id -> {label: content}`` where ``content``
            has a ``'result'`` dict and a ``'coordinates'`` sequence.
        student_id: Key of the student in ``data``; an unknown id is treated
            as a student with no labels.
        actions: Iterable of dicts with ``'label'`` and ``'type'`` plus
            ``'value'`` (type ``score``) or ``'index'`` (``del_*`` types).
            Actions for labels the student does not have are ignored, as are
            out-of-range indices.
        label_notes: Mapping ``label -> {'img': image, 'old_header_h': n}``.
        all_labels: Labels used to pre-seed ``score.json`` with ``""``.

    Returns:
        The accumulated log messages joined with newlines.
    """
    logs = [f"\nProcessing compilation for: Copie{student_id}"]
    output_dir = os.path.join(root_dir, "BGnot", f"Copie{student_id}")
    os.makedirs(output_dir, exist_ok=True)

    score_path = os.path.join(output_dir, "score.json")
    labels_data = data.get(student_id, {})

    # --- 1. Apply Actions to Data (Update scores / Flags for deletion) ---
    actions_by_label = collections.defaultdict(list)
    for act in actions:
        actions_by_label[act['label']].append(act)

    dirty_labels = set()
    for label, acts in actions_by_label.items():
        if label not in labels_data:
            continue
        if _apply_label_actions(label, labels_data[label]['result'], acts, logs):
            dirty_labels.add(label)

    # --- 2. Process Images (Regenerate & Concatenate) ---
    concat_list = []
    concat_list_F = []
    d_notes = dict.fromkeys(all_labels, "")

    # Iterate over all labels naturally to assemble a complete student profile
    for label, content in sorted(labels_data.items(), key=lambda kv: natural_key(kv[0])):
        result = content['result']
        # The score is recorded even when the label has no PDF to render.
        d_notes[label] = str(result.get('score', 0))

        pdf_path = os.path.join(root_dir, f"Copie{student_id}", f"{label}.pdf")
        if not os.path.exists(pdf_path):
            continue

        (base_img, _, _) = annotating.make_base_image(pdf_path)

        # Compose uses the result object we modified in step 1
        final_img, new_header_h = annotating.compose_label_image(
            base_img, label, result, content['coordinates'][0],
            with_error=False
        )
        if final_img is None:
            continue

        # Overlay manual notes specific to this label
        has_notes = False
        if label in label_notes:
            has_notes = _overlay_label_notes(final_img, label_notes[label], new_header_h)

        # Save individual file if Modified (Dirty logic or visual notes)
        if label in dirty_labels or has_notes:
            final_img.save(os.path.join(output_dir, f"{label}.jpg"))
            logs.append(f" Saved dirty image: {label}.jpg")

        concat_list.append(final_img)
        if not _is_perfect_without_comments(d_notes[label], result):
            concat_list_F.append(final_img)

    # --- 3. Save Final Outputs ---
    with open(score_path, "w", encoding="utf-8") as f:
        json.dump(d_notes, f, indent=4)
    logs.append(f" Saved {score_path}")

    if concat_list:
        _save_vertical_stack(concat_list, os.path.join(output_dir, "Concat.jpg"))
        logs.append(" Saved regenerated Concat.jpg")

    if concat_list_F:
        _save_vertical_stack(concat_list_F, os.path.join(output_dir, "Concat_F.jpg"))
        logs.append(" Saved regenerated Concat_F.jpg")

    return "\n".join(logs)
|
|
|
|
|
|
from utils import read_all_labels
|
|
|
|
if __name__ == "__main__":
    # Entry point: collect check actions and manual-note crops from every
    # grouped folder in <Dir>/BGnot, then regenerate each student's outputs.
    if len(sys.argv) < 2:
        print("Usage: python reading_grouped_annotations.py <Dir>")
        sys.exit(1)

    root_dir = sys.argv[1]
    bgnot_dir = os.path.join(root_dir, "BGnot")

    if not os.path.exists(bgnot_dir):
        print(f"Directory {bgnot_dir} does not exist. Run annotating_by_label.py first.")
        sys.exit(1)

    # A missing labels file is tolerated: score.json is then seeded only
    # from the labels each student actually has.
    try:
        all_labels = read_all_labels(Path(root_dir))
    except FileNotFoundError:
        all_labels = []

    # Load original data
    original_data = annotating.make_dictionary(root_dir)

    actions_by_student = collections.defaultdict(list)
    notes_by_student = collections.defaultdict(dict)

    # --- 1. Scan BGnot grouped directories and extract all checks & notes ---
    # Sorted so that, if two folders carry a note for the same student and
    # label, the one that wins does not depend on filesystem listing order.
    for entry in sorted(os.listdir(bgnot_dir)):
        gdir = os.path.join(bgnot_dir, entry)

        if not os.path.isdir(gdir) or entry.startswith("Copie"):
            continue  # Ignore files and already compiled student folders

        print(f"\nScanning grouped annotations in {entry}")
        actions, notes_img = detect_checks_and_notes(gdir)

        bnote_path = os.path.join(gdir, "bnote.json")
        if not os.path.exists(bnote_path) or notes_img is None:
            continue

        with open(bnote_path, "r", encoding="utf-8") as f:
            bnote_data = json.load(f)

        # Route actions to specific students
        for act in actions:
            raw_sid = act.get("student_id")
            # Test the raw value: str(None) is the truthy string "None", so
            # checking after conversion would never skip anything.
            if raw_sid is None or raw_sid == "":
                continue
            actions_by_student[str(raw_sid)].append(act)

        # Route manual note crops to specific students and labels
        for img_info in bnote_data.get("images", []):
            raw_sid = img_info.get("id")
            lbl = img_info.get("label")
            if raw_sid is None or lbl is None:
                continue  # Cannot be attributed to a student/label

            hmin = img_info.get("hmin", 0)
            hmax = img_info.get("hmax", 0)
            if hmax <= hmin:
                continue

            crop = notes_img.crop((0, hmin, notes_img.width, hmax))
            # Store it if there are pen marks on it
            if has_significant_notes(crop):
                notes_by_student[str(raw_sid)][lbl] = {
                    'img': crop,
                    'old_header_h': img_info.get("header_height", 0)
                }

    def process_student(sid):
        """Regenerate one student's outputs; return its log text ("" if unknown)."""
        if sid not in original_data:
            return ""
        # .get() instead of indexing: indexing a defaultdict inserts missing
        # keys, and this runs in several worker threads at once.
        return apply_actions_and_regenerate_grouped(
            root_dir,
            original_data,
            sid,
            actions_by_student.get(sid, []),
            notes_by_student.get(sid, {}),
            all_labels
        )

    # --- 2. Process each student concurrently using 4 threads ---
    sids = sorted(original_data.keys(), key=natural_key)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(process_student, sid): sid for sid in sids}
        # Logs are printed as students finish, so their order may differ
        # from the submission order.
        for future in concurrent.futures.as_completed(futures):
            output = future.result()
            if output:
                print(output)
|