| import numpy as np |
| from utils.motion_utils import ( |
| calc_avg_speed, |
| calc_motion_intensity, |
| calc_sudden_movements, |
| ) |
| from utils.interaction_utils import ( |
| get_box_center, |
| euclidean_distance, |
| relative_distance, |
| relative_keypoints, |
| ) |
|
|
|
|
| class InteractionAnalyzer: |
| """ |
| Analyze human motion and interactions between people based on poses and bounding boxes. |
| """ |
|
|
| def __init__(self): |
| |
| pass |
|
|
| def calculate_motion_features( |
| self, |
| prev_poses: list[list[list[float]]], |
| current_poses: list[list[list[float]]], |
| ) -> dict: |
| """ |
| Calculate motion features between consecutive frames. |
| |
| Args: |
| prev_poses: List of keypoints for all people in previous frame |
| current_poses: List of keypoints for all people in current frame |
| |
| Returns: |
| dict: { |
| "average_speed": float, |
| "motion_intensity": float, |
| "sudden_movements": int |
| } |
| """ |
| return { |
| "average_speed": calc_avg_speed(prev_poses, current_poses), |
| "motion_intensity": calc_motion_intensity(prev_poses, current_poses), |
| "sudden_movements": calc_sudden_movements(prev_poses, current_poses), |
| } |
|
|
| def calculate_interactions( |
| self, |
| person_boxes: list[list[float]], |
| current_poses: list[list[list[float]]], |
| tracked_persons: dict, |
| ) -> list[dict]: |
| """ |
| Calculate interactions between people based on bounding boxes and keypoints. |
| |
| Args: |
| person_boxes: List of bounding boxes [[x1,y1,x2,y2], ...] for each person |
| current_poses: List of keypoints for each person |
| tracked_persons: Dict mapping person_id -> last tracked box |
| |
| Returns: |
| List of dictionaries describing interactions between people |
| """ |
| interactions = [] |
|
|
| if len(person_boxes) < 2: |
| return interactions |
|
|
| for i in range(len(person_boxes)): |
| for j in range(i + 1, len(person_boxes)): |
| try: |
| |
| if i >= len(current_poses) or j >= len(current_poses): |
| continue |
|
|
| box1, box2 = person_boxes[i], person_boxes[j] |
| pose1, pose2 = current_poses[i], current_poses[j] |
|
|
| |
| id1, id2 = None, None |
| for pid, tracked_box in tracked_persons.items(): |
| if np.array_equal(box1, tracked_box): |
| id1 = pid |
| if np.array_equal(box2, tracked_box): |
| id2 = pid |
|
|
| if id1 is None or id2 is None: |
| continue |
|
|
| |
| interaction = { |
| "person1_idx": i, |
| "person2_idx": j, |
| "person1_id": id1, |
| "person2_id": id2, |
| "box1": box1, |
| "box2": box2, |
| "center1": get_box_center(box1), |
| "center2": get_box_center(box2), |
| "distance": euclidean_distance( |
| get_box_center(box1), get_box_center(box2) |
| ), |
| "relative_distance": relative_distance(box1, box2), |
| "keypoints": { |
| "person1": pose1, |
| "person2": pose2, |
| "relative": relative_keypoints(pose1, pose2), |
| }, |
| } |
| interactions.append(interaction) |
|
|
| except Exception as e: |
| print(f"Skipping interaction {i}-{j}: {e}") |
| continue |
|
|
| return interactions |
|
|