| import cv2 |
| import numpy as np |
| import gradio as gr |
| import time |
| from collections import deque |
| import matplotlib.pyplot as plt |
| from ultralytics import YOLO |
| import os |
|
|
| |
|
|
def compare_images_optical_flow(img1, img2):
    """
    Compute dense (Farneback) optical flow between two frames and return the
    per-pixel flow magnitude.

    Args:
        img1: Earlier frame as a BGR (3-channel) uint8 image.
        img2: Later frame as a BGR (3-channel) uint8 image, same size as img1.

    Returns:
        A float array of shape (H, W) holding the Euclidean magnitude of each
        flow vector. NOTE: the values are NOT normalized here — the caller
        (compute_optical_flow) min-max normalizes them to [0, 1] itself.
    """
    # Farneback flow works on single-channel images.
    gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
    gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)

    # Standard Farneback parameter set: pyr_scale=0.5, 3 pyramid levels,
    # window 15, 3 iterations, poly_n=5, poly_sigma=1.2, no flags.
    flow = cv2.calcOpticalFlowFarneback(gray1, gray2, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # Euclidean magnitude of the (dx, dy) field; np.hypot avoids intermediate
    # squaring overflow and is equivalent to sqrt(dx**2 + dy**2).
    flow_magnitude = np.hypot(flow[..., 0], flow[..., 1])

    return flow_magnitude
|
|
# Open-vocabulary YOLO-World model: detection classes are set at runtime as
# free-text prompts rather than fixed at training time.
model = YOLO("yolov8s-world.pt")

# Default prompt set; "sky" acts as a background class so large uniform
# regions are not mistaken for flying objects.
CUSTOM_CLASSES = ["one bird", "one airplane", "one kite","a flying object","sky"]
model.set_classes(CUSTOM_CLASSES)
|
|
def detect_birds(image):
    """Run the YOLO-World detector on *image* and return the annotated frame.

    A low confidence threshold (0.1) is used so that small, distant flying
    objects are not filtered out. The result of ``.plot()`` is the input image
    with bounding boxes and labels drawn on it.
    """
    predictions = model(image, conf=0.1, verbose=False)
    return predictions[0].plot()
|
|
# Per-call runtime samples (seconds), plotted on the "Runtime Histograms" tab.
optical_flow_runtime = []
object_detection_runtime = []
change_detection_runtime = []
example_videos_folder = "./example_videos"

# Every file in the examples folder, as a relative path, offered to the UI.
EXAMPLE_VIDEOS_LIST = os.listdir(example_videos_folder)
EXAMPLE_VIDEOS_LIST = [os.path.join(example_videos_folder, v)
                       for v in EXAMPLE_VIDEOS_LIST]

# All acquired frames are resized to this common size before being shared
# between the producer (video streams) and consumers (flow / detection).
HEIGHT_STANDARD = 480
WIDTH_STANDARD = 640
# Sliding window of the two most recent frames (prev, curr).
frame_stack = deque(maxlen=2)
# Most recent (curr_frame, prev_frame, flow) triple queued for detection;
# maxlen=1 means newer candidates silently replace unprocessed older ones.
detection_stack = deque(maxlen=1)

# Uniform mid-gray placeholder shown while no real frame is available.
fall_back_frame = np.zeros((256, 256, 3), dtype=np.uint8) + 127
flow_magnitude_normalized = np.zeros((256, 256), dtype=np.uint8)
# Cross-generator coordination flags: optical flow is paused while a
# detection pass is running (see compute_optical_flow).
FLAGS = {
    "OBJECT_DETECTING": False,
}
# Video sources selected in the UI; streams always read the last entry.
CAP = []
|
|
| |
def compute_optical_flow(mean_norm = None):
    """Generator: continuously turn frame pairs into normalized flow images.

    Args:
        mean_norm: Noise-tolerance threshold in [0, 1] (Gradio slider value);
            defaults to 0.4 when None.

    Yields:
        Either a (H, W) float32 flow-magnitude image normalized to [0, 1], or
        a red-tinted 3-channel version of it when the mean flow exceeds the
        threshold (too much global motion / noise to be a useful detection).

    Side effects: pushes (curr, prev, flow) onto ``detection_stack`` for the
    detection generators, and toggles ``FLAGS["OBJECT_DETECTING"]``.
    """
    global FLAGS, flow_magnitude_normalized, frame_stack
    if mean_norm is None:
        mean_norm = .4
    else:
        mean_norm = float(mean_norm)
    FLAGS["OBJECT_DETECTING"] = False
    while True:
        # Skip while a detection pass is running, so CPU is not shared
        # between flow and the (heavier) detector.
        if (len(frame_stack) > 1) and not(FLAGS["OBJECT_DETECTING"]):
            prev_frame, curr_frame = frame_stack
            original_height, original_width = curr_frame.shape[:2]
            start_time = time.time()
            # Downscale 4x before computing flow — Farneback cost scales with
            # the pixel count; the result is upscaled back afterwards.
            prev_frame_resized, curr_frame_resized = [
                cv2.resize(
                    frame,
                    (original_width // 4, original_height // 4)
                ) for frame in [prev_frame, curr_frame]
            ]
            flow_magnitude = compare_images_optical_flow(prev_frame_resized,
                                                         curr_frame_resized)
            end_time = time.time()
            optical_flow_runtime.append(end_time - start_time)
            # Min-max normalize to [0, 1] (the helper returns raw magnitudes).
            flow_magnitude_normalized = cv2.normalize(flow_magnitude, None, 0, 1, cv2.NORM_MINMAX, cv2.CV_32F)
            flow_magnitude_normalized = cv2.resize(
                flow_magnitude_normalized,
                (original_width, original_height)
            )
            yield flow_magnitude_normalized
            # Low mean flow ⇒ motion is sparse/localized (presumably a small
            # moving object rather than camera shake — TODO confirm intent),
            # so the pair is queued for object/change detection.
            if flow_magnitude_normalized.mean() < mean_norm:
                detection_stack.append((curr_frame,prev_frame, flow_magnitude_normalized))
            else:
                # Too noisy: yield the flow tinted red (R channel only) as a
                # visual warning instead of queueing a detection.
                yield np.stack((flow_magnitude_normalized,flow_magnitude_normalized*0, flow_magnitude_normalized*0), axis=-1)
|
|
| |
def object_detection_stream(classes = ""):
    """Generator: run YOLO-World on frames queued by the optical-flow stage.

    Args:
        classes: Comma-separated class prompts from the UI textbox; falls
            back to the default prompt set when blank.

    Yields:
        The most recent annotated detection frame (or the gray fallback
        before the first detection). Yields on every loop iteration so the
        Gradio image component keeps refreshing.
    """
    global FLAGS, fall_back_frame, model
    if classes.strip() == "":
        classes = "one bird, one airplane, one kite,a flying object,sky"
    # Strip whitespace around each prompt so e.g. " one airplane" matches the
    # style of CUSTOM_CLASSES; drop empty entries from trailing commas.
    classes_list = [c.strip() for c in classes.split(",") if c.strip()]
    model.set_classes(classes_list)

    detected_frame = fall_back_frame.copy()
    while True:
        if len(detection_stack) > 0:
            # Pause the optical-flow generator while the detector runs.
            FLAGS["OBJECT_DETECTING"] = True
            curr_frame, prev_frame, flow_magnitude_normalized = detection_stack.pop()
            start_time = time.time()
            detected_frame = detect_birds(curr_frame)
            end_time = time.time()
            object_detection_runtime.append(end_time - start_time)
            FLAGS["OBJECT_DETECTING"] = False
        yield detected_frame
|
|
def change_detection_stream(useless_var=None):
    """Generator: box regions of motion using the flow map (no ML model).

    Thresholds the queued flow-magnitude image at 50% and draws a green
    bounding rectangle around every external contour found. The dummy
    parameter only exists so Gradio can wire an input component to it.

    Yields:
        The latest annotated frame (gray fallback before the first one).
    """
    annotated = fall_back_frame.copy()
    while True:
        if detection_stack:
            FLAGS["OBJECT_DETECTING"] = True
            curr_frame, prev_frame, flow_norm = detection_stack.pop()
            t0 = time.time()
            # Binarize the [0, 1] flow map at 127/255.
            _, mask = cv2.threshold((flow_norm * 255).astype(np.uint8),
                                    127, 255, 0)
            # findContours returns 2 values on OpenCV 4.x, 3 on 3.x.
            found = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            contours = found[0] if len(found) == 2 else found[1]
            annotated = curr_frame.copy()
            for contour in contours:
                x, y, w, h = cv2.boundingRect(contour)
                cv2.rectangle(annotated, (x, y), (x + w, y + h), (0, 255, 0), 2)
            change_detection_runtime.append(time.time() - t0)
            FLAGS["OBJECT_DETECTING"] = False
        yield annotated
|
|
def video_stream(frame_rate=""):
    """Generator: read the last UI-selected video and feed the frame stack.

    Args:
        frame_rate: Target frame rate as a string from the UI textbox;
            blank means 2.0 fps.

    Yields:
        Each decoded frame converted to RGB (at original resolution); a
        resized copy is pushed onto ``frame_stack`` for the flow stage.
        Loops the video forever; yields the gray fallback once if no video
        has been selected yet.
    """
    fps = 2.0 if not frame_rate.strip() else float(frame_rate)
    if not CAP:
        yield fall_back_frame
        return
    while True:
        # Re-open the most recently selected source; restarting the capture
        # each pass makes the clip loop endlessly.
        cap = cv2.VideoCapture(CAP[-1])
        ok, bgr = cap.read()
        while ok:
            rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
            frame_stack.append(
                cv2.resize(rgb, (WIDTH_STANDARD, HEIGHT_STANDARD))
            )
            yield rgb
            ok, bgr = cap.read()
            time.sleep(1 / fps)
|
|
def yield_frame(s):
    """Endlessly yield the oldest frame currently held in ``frame_stack``.

    The parameter is unused; it exists only so the function can be wired to
    a Gradio input component.
    """
    while True:
        oldest = frame_stack[0]
        yield oldest
def video_stream_HIKvision(video_address, frame_rate = ""):
    """Generator: stream frames from an RTSP camera (or file) via FFMPEG.

    Args:
        video_address: RTSP URL or file path passed to cv2.VideoCapture.
        frame_rate: Target frame rate as a string; blank means 2.0 fps.

    Yields:
        Each frame converted to RGB (a resized copy is pushed onto
        ``frame_stack``), or the gray fallback while reads fail.
    """
    if frame_rate.strip() == "":
        frame_rate = 2.0
    else:
        frame_rate = float(frame_rate)
    cap = cv2.VideoCapture(video_address, cv2.CAP_FFMPEG)
    while True:
        ret, frame = cap.read()
        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_stack.append(
                cv2.resize(
                    frame,
                    (WIDTH_STANDARD, HEIGHT_STANDARD)
                )
            )
            yield frame
            # BUGFIX: the original performed an extra cap.read() here whose
            # result was immediately overwritten at the top of the loop,
            # silently dropping every other frame.
            time.sleep(1/frame_rate)
        else:
            yield fall_back_frame
            # Back off instead of busy-spinning while the stream is down.
            time.sleep(1/frame_rate)
| |
with gr.Blocks() as demo:
    gr.Markdown("### Birds Detection Real Time (suitable for CPU Embedded Systems)")

    # --- Tab 1: uploaded video -> optical flow -> YOLO-World detection ---
    with gr.Tab("Using a custom Video"):
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    video = gr.Video(label="Video Source")
                with gr.Row():
                    examples = gr.Examples(
                        examples=EXAMPLE_VIDEOS_LIST,
                        inputs=[video],
                    )
            with gr.Column():
                webcam_img = gr.Interface(video_stream,
                                          inputs=gr.Textbox(label="Acquisition: Enter the frame rate", value=2.0),
                                          outputs="image")
        with gr.Row():
            with gr.Column():
                optical_flow_img = gr.Interface(compute_optical_flow,
                                                inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                                                outputs=gr.Image(),
                                                )
            with gr.Column():
                detection_img = gr.Interface(object_detection_stream,
                                             inputs=gr.Textbox(label="Classes: Enter the classes", value="one bird, one airplane, one kite,a flying object,sky"),
                                             outputs="image")

        # Selecting a video only records it in CAP; the streaming
        # generators always read the most recent entry.
        video.change(
            fn=lambda video: CAP.append(video),
            inputs=[video],
        )

    # --- Tab 2: uploaded video with contour-based change detection ---
    with gr.Tab("Using a custom Video (Change Detection)"):
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    video_CD = gr.Video(label="Video Source")
                with gr.Row():
                    examples_CD = gr.Examples(
                        examples=EXAMPLE_VIDEOS_LIST,
                        inputs=[video_CD],
                    )
            with gr.Column():
                webcam_img_CD = gr.Interface(video_stream,
                                             inputs=gr.Textbox(label="Acquisition: Enter the frame rate", value=2.0),
                                             outputs="image")
        with gr.Row():
            with gr.Column():
                optical_flow_img_CD = gr.Interface(compute_optical_flow,
                                                   inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                                                   outputs=gr.Image(),
                                                   )
            with gr.Column():
                detection_img_CD = gr.Interface(change_detection_stream,
                                                inputs=gr.Textbox(label="Change detection", value="DUMMY"),
                                                outputs="image")

        video_CD.change(
            fn=lambda video: CAP.append(video),
            inputs=[video_CD],
        )

    # --- Tab 3: browser webcam -> YOLO-World detection ---
    with gr.Tab("Using a Real Time Camera"):
        with gr.Row():
            with gr.Column():
                webcam_img_RT = gr.Image(label="Webcam", sources="webcam")
                # The stream callback only feeds frame_stack; output images
                # come from the generator interfaces below.
                webcam_img_RT.stream(lambda s: frame_stack.append(
                                         cv2.resize(
                                             s,
                                             (WIDTH_STANDARD, HEIGHT_STANDARD)
                                         )
                                     ),
                                     webcam_img_RT,
                                     time_limit=15, stream_every=1.0,
                                     concurrency_limit=30
                                     )
            with gr.Column():
                optical_flow_img_RT = gr.Interface(compute_optical_flow,
                                                   inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                                                   outputs="image",
                                                   )
        with gr.Row():
            with gr.Column():
                detection_img_RT = gr.Interface(object_detection_stream,
                                                inputs=gr.Textbox(label="Classes: Enter the classes",
                                                                  value="one bird, one airplane, one kite,a flying object,sky"),
                                                outputs="image")

    # --- Tab 4: browser webcam with contour-based change detection ---
    with gr.Tab("Using a Real Time Camera (Change Detection)"):
        with gr.Row():
            with gr.Column():
                webcam_img_RT_CD = gr.Image(label="Webcam", sources="webcam")
                webcam_img_RT_CD.stream(lambda s: frame_stack.append(
                                            cv2.resize(
                                                s,
                                                (WIDTH_STANDARD, HEIGHT_STANDARD)
                                            )
                                        ),
                                        webcam_img_RT_CD,
                                        time_limit=15, stream_every=1.0,
                                        concurrency_limit=30
                                        )
            with gr.Column():
                optical_flow_img_RT_CD = gr.Interface(compute_optical_flow,
                                                      inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                                                      outputs="image",
                                                      )
        with gr.Row():
            with gr.Column():
                detection_img_RT_CD = gr.Interface(change_detection_stream,
                                                   inputs=gr.Textbox(label="Changes will be detected here",
                                                                     value="DUMMY"),
                                                   outputs="image")

    # --- Tab 5: Hikvision RTSP camera -> YOLO-World detection ---
    with gr.Tab("Using a Hikvision Camera"):
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    video_address = gr.Textbox(label="Video Source Address")
                with gr.Row():
                    example_addresses = gr.Examples(
                        examples=EXAMPLE_VIDEOS_LIST+[
                            'rtsp://admin:Admin123@192.168.254.200:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.201:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.202:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.203:554/Streaming/Channels/101'
                        ],
                        inputs=[video_address],
                    )
            with gr.Column():
                webcam_img_HIKvision = gr.Interface(video_stream_HIKvision,
                                                    inputs=[video_address, gr.Textbox(label="Acquisition: Enter the frame rate", value=2.0)],
                                                    outputs="image")
        with gr.Row():
            with gr.Column():
                optical_flow_img = gr.Interface(compute_optical_flow,
                                                inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                                                outputs=gr.Image(),
                                                )
            with gr.Column():
                detection_img = gr.Interface(object_detection_stream,
                                             inputs=gr.Textbox(label="Classes: Enter the classes", value="one bird, one airplane, one kite,a flying object,sky"),
                                             outputs="image")

    # --- Tab 6: Hikvision RTSP camera with change detection ---
    with gr.Tab("Using a Hikvision Camera (Change Detection)"):
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    video_address_CD = gr.Textbox(label="Hikvision Camera Address (RTSP)")
                with gr.Row():
                    example_addresses_CD = gr.Examples(
                        examples=EXAMPLE_VIDEOS_LIST+[
                            'rtsp://admin:Admin123@192.168.254.200:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.201:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.202:554/Streaming/Channels/101',
                            'rtsp://admin:Admin123@192.168.254.203:554/Streaming/Channels/101'
                        ],
                        inputs=[video_address_CD],
                    )
            with gr.Column():
                hikvision_stream_CD = gr.Interface(
                    video_stream_HIKvision,
                    inputs=[
                        video_address_CD,
                        gr.Textbox(label="Acquisition: Enter the frame rate", value=2.0)
                    ],
                    outputs="image"
                )
        with gr.Row():
            with gr.Column():
                optical_flow_img_HIK_CD = gr.Interface(
                    compute_optical_flow,
                    inputs=gr.Slider(label="Optical Flow: Noise Tolerance", minimum=0.0, maximum=1.0, value=0.4),
                    outputs="image"
                )
            with gr.Column():
                detection_img_HIK_CD = gr.Interface(
                    change_detection_stream,
                    inputs=gr.Textbox(label="Changes will be detected here", value="DUMMY"),
                    outputs="image"
                )

    # --- Tab 7: runtime histograms for the three processing stages ---
    with gr.Tab("Runtime Histograms"):
        def plot_histogram(data, title, color):
            """Render a runtime histogram to a PNG and return it as an image array."""
            plt.figure(figsize=(9, 5))
            plt.hist(data, bins=30, color=color, alpha=0.7)
            plt.title(title)
            plt.xlabel('Runtime (seconds)')
            plt.ylabel('Frequency')
            plt.grid(True)
            plt.tight_layout()
            filename = title.replace(" ", "_").lower() + ".png"
            plt.savefig(filename)
            # BUGFIX: close the figure — without this every button click
            # leaked an open matplotlib figure (unbounded memory growth).
            plt.close()
            if os.path.exists(filename):
                img_plt = cv2.imread(filename)
                return img_plt
            else:
                # Gray placeholder if the file could not be written/read.
                return np.zeros((256, 256, 3), dtype=np.uint8) + 127

        def update_optical_flow_plot():
            return plot_histogram(np.array(optical_flow_runtime), 'Histogram of Optical Flow Runtime', 'blue')

        def update_object_detection_plot():
            return plot_histogram(object_detection_runtime, 'Histogram of Object Detection Runtime', 'green')

        def update_change_detection_plot():
            return plot_histogram(change_detection_runtime, 'Histogram of Change Detection Runtime', 'red')

        with gr.Row():
            optical_flow_image = gr.Image(update_optical_flow_plot, label="Optical Flow Runtime Histogram")
        with gr.Row():
            optical_flow_button = gr.Button("Update Optical Flow Histogram")
            optical_flow_button.click(fn=update_optical_flow_plot, outputs=optical_flow_image)
        with gr.Row():
            object_detection_image = gr.Image(update_object_detection_plot, label="Object Detection Runtime Histogram")
        with gr.Row():
            object_detection_button = gr.Button("Update Object Detection Histogram")
            object_detection_button.click(fn=update_object_detection_plot, outputs=object_detection_image)
        with gr.Row():
            change_detection_image = gr.Image(update_change_detection_plot, label="Change Detection Runtime Histogram")
        with gr.Row():
            change_detection_button = gr.Button("Update Change Detection Histogram")
            change_detection_button.click(fn=update_change_detection_plot, outputs=change_detection_image)

demo.launch(debug=True)
|
|