Rausda6 committed
Commit 107e7c3 · verified · 1 Parent(s): a7a2c3b

Update app.py

Files changed (1)
  1. app.py +395 -478
app.py CHANGED
@@ -9,10 +9,8 @@ from typing import Dict, Generator, List, Optional, Tuple, Union
  import cv2
  import gradio as gr
- import imageio.v2 as imageio
  import numpy as np
  from PIL import Image, ImageDraw
- from scipy import ndimage
  from skimage.restoration import richardson_lucy

  try:
@@ -33,19 +31,18 @@ class FrameMeta:
      idx: int
      path: Path
      sharpness: float
-     dm_score: float


  @dataclass
- class CropResult:
      frame_idx: int
      bbox: Tuple[int, int, int, int]
      crop: np.ndarray
      score: float
-     decoded_text: Optional[str] = None


- # --------------------- utility ---------------------

  def ensure_dir(path: Path) -> Path:
      path.mkdir(parents=True, exist_ok=True)
@@ -65,13 +62,6 @@ def resolve_video_path(video_input: Union[str, Dict, None]) -> str:
      raise ValueError("Unsupported video input format from Gradio.")


- def load_frame(path: Path) -> np.ndarray:
-     frame = cv2.imread(str(path), cv2.IMREAD_COLOR)
-     if frame is None:
-         raise RuntimeError(f"Could not read frame: {path}")
-     return frame
-
-
  def get_video_info(video_path: str) -> Dict[str, float]:
      cap = cv2.VideoCapture(video_path)
      if not cap.isOpened():
@@ -88,118 +78,95 @@ def get_video_info(video_path: str) -> Dict[str, float]:
      return info


- def gray(img_bgr: np.ndarray) -> np.ndarray:
-     return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
-
-
- def lap_var(img_bgr: np.ndarray) -> float:
-     return float(cv2.Laplacian(gray(img_bgr), cv2.CV_32F).var())
-
-
- def clahe_gray(g: np.ndarray) -> np.ndarray:
-     clahe = cv2.createCLAHE(clipLimit=2.2, tileGridSize=(8, 8))
-     return clahe.apply(g)
-
-
- def clahe_bgr(img_bgr: np.ndarray) -> np.ndarray:
      lab = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB)
      l, a, b = cv2.split(lab)
-     l = clahe_gray(l)
-     return cv2.cvtColor(cv2.merge([l, a, b]), cv2.COLOR_LAB2BGR)


- def unsharp(img_bgr: np.ndarray, sigma: float = 1.0, amount: float = 1.2) -> np.ndarray:
      blur = cv2.GaussianBlur(img_bgr, (0, 0), sigmaX=sigma, sigmaY=sigma)
      out = cv2.addWeighted(img_bgr, 1.0 + amount, blur, -amount, 0)
      return np.clip(out, 0, 255).astype(np.uint8)


- def upscale(img_bgr: np.ndarray, scale: int) -> np.ndarray:
      return cv2.resize(img_bgr, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)


- def pad_bbox(bbox: Tuple[int, int, int, int], pad: int, shape_hw: Tuple[int, int]) -> Tuple[int, int, int, int]:
-     x1, y1, x2, y2 = bbox
-     h, w = shape_hw
-     return max(0, x1 - pad), max(0, y1 - pad), min(w, x2 + pad), min(h, y2 + pad)
-
-
- def crop_img(img: np.ndarray, bbox: Tuple[int, int, int, int]) -> np.ndarray:
-     x1, y1, x2, y2 = bbox
-     return img[y1:y2, x1:x2]
-
-
- def bbox_center(bbox: Tuple[int, int, int, int]) -> Tuple[float, float]:
-     x1, y1, x2, y2 = bbox
-     return 0.5 * (x1 + x2), 0.5 * (y1 + y2)
-
-
- # --------------------- candidate search ---------------------
-
- def detect_datamatrix_candidates(frame_bgr: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
-     h, w = frame_bgr.shape[:2]
-     g = gray(frame_bgr)
-
-     # Restrict to plausible lower-central region where tool labels appear.
-     x1 = int(0.15 * w)
-     x2 = int(0.92 * w)
-     y1 = int(0.16 * h)
-     y2 = int(0.92 * h)
-     roi = g[y1:y2, x1:x2]
-
-     blackhat = cv2.morphologyEx(roi, cv2.MORPH_BLACKHAT, np.ones((11, 11), np.uint8))
-     thr = cv2.adaptiveThreshold(
-         blackhat,
-         255,
-         cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
-         cv2.THRESH_BINARY,
-         35,
-         -4,
-     )
-     mask = cv2.morphologyEx(thr, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8), iterations=2)
-     mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
-
-     contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
-     out: List[Tuple[int, int, int, int, float]] = []
-     roi_area = float(roi.shape[0] * roi.shape[1])
-     for cnt in contours:
-         x, y, bw, bh = cv2.boundingRect(cnt)
-         area = bw * bh
-         if area < 120 or area > 0.08 * roi_area:
-             continue
-         aspect = bw / max(1.0, float(bh))
-         if not (0.65 <= aspect <= 1.45):
-             continue
-
-         patch = roi[y:y + bh, x:x + bw]
-         if patch.size == 0:
-             continue
-
-         gx = cv2.Sobel(patch, cv2.CV_32F, 1, 0, ksize=3)
-         gy = cv2.Sobel(patch, cv2.CV_32F, 0, 1, ksize=3)
-         edge_balance = float(min(np.mean(np.abs(gx)), np.mean(np.abs(gy))))
-         contrast = float(patch.std())
-         sharp = float(cv2.Laplacian(patch, cv2.CV_32F).var())
-         darkness = float(255.0 - np.mean(patch))
-         score = 0.8 * contrast + 0.8 * sharp + 0.6 * edge_balance + 0.2 * darkness
-
-         pad = int(max(bw, bh) * 0.45)
-         xx1 = max(0, x1 + x - pad)
-         yy1 = max(0, y1 + y - pad)
-         xx2 = min(w, x1 + x + bw + pad)
-         yy2 = min(h, y1 + y + bh + pad)
-         out.append((xx1, yy1, xx2, yy2, score))
-
-     out.sort(key=lambda t: t[4], reverse=True)
-     return out[:12]
-
-
- def frame_dm_score(frame_bgr: np.ndarray) -> float:
-     cands = detect_datamatrix_candidates(frame_bgr)
-     return float(cands[0][4]) if cands else 0.0
-
-
- # --------------------- extraction ---------------------

  def extract_frames(video_path: str, out_dir: Path, stride: int = 1, max_frames: int = 0) -> List[FrameMeta]:
      cap = cv2.VideoCapture(video_path)
@@ -213,11 +180,10 @@ def extract_frames(video_path: str, out_dir: Path, stride: int = 1, max_frames:
          if not ok:
              break
          if idx % max(1, stride) == 0:
-             sharp = lap_var(frame)
-             dm = frame_dm_score(frame)
              frame_path = out_dir / f"frame_{idx:06d}.jpg"
              cv2.imwrite(str(frame_path), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
-             records.append(FrameMeta(idx=idx, path=frame_path, sharpness=sharp, dm_score=dm))
              saved += 1
              if max_frames and saved >= max_frames:
                  break
@@ -228,309 +194,275 @@ def extract_frames(video_path: str, out_dir: Path, stride: int = 1, max_frames:
      return records


- def choose_reference_frame(records: List[FrameMeta]) -> int:
-     # Prefer frames with both candidate code texture and high sharpness.
-     sharp = np.array([r.sharpness for r in records], np.float32)
-     dm = np.array([r.dm_score for r in records], np.float32)
-     sharp_n = (sharp - sharp.min()) / max(1e-6, float(sharp.max() - sharp.min()))
-     dm_n = (dm - dm.min()) / max(1e-6, float(dm.max() - dm.min())) if float(dm.max()) > 0 else np.zeros_like(dm)
-     combo = 0.55 * sharp_n + 0.45 * dm_n
-     return int(np.argmax(combo))
-
-
- # --------------------- tracking and local fusion ---------------------
-
- def phase_shift(ref_gray: np.ndarray, mov_gray: np.ndarray) -> Tuple[float, float]:
-     ref32 = np.float32(ref_gray)
-     mov32 = np.float32(mov_gray)
-     (shift_x, shift_y), _ = cv2.phaseCorrelate(ref32, mov32)
-     return float(shift_x), float(shift_y)
-
-
- def align_local_crop(
-     ref_frame: np.ndarray,
-     frame: np.ndarray,
-     bbox: Tuple[int, int, int, int],
-     search_margin: int = 36,
- ) -> Optional[Tuple[np.ndarray, Tuple[float, float]]]:
-     h, w = ref_frame.shape[:2]
-     x1, y1, x2, y2 = bbox
-     ref_crop = crop_img(ref_frame, bbox)
-     if ref_crop.size == 0:
-         return None
-
-     sx1, sy1, sx2, sy2 = pad_bbox(bbox, search_margin, (h, w))
-     search = crop_img(frame, (sx1, sy1, sx2, sy2))
-     if search.shape[0] < ref_crop.shape[0] or search.shape[1] < ref_crop.shape[1]:
-         return None
-
-     ref_g = clahe_gray(gray(ref_crop))
-     search_g = clahe_gray(gray(search))
-
-     # Match on a central template to reduce border influence.
-     rx1 = int(0.15 * ref_g.shape[1])
-     ry1 = int(0.15 * ref_g.shape[0])
-     rx2 = max(rx1 + 8, int(0.85 * ref_g.shape[1]))
-     ry2 = max(ry1 + 8, int(0.85 * ref_g.shape[0]))
-     templ = ref_g[ry1:ry2, rx1:rx2]
-     res = cv2.matchTemplate(search_g, templ, cv2.TM_CCOEFF_NORMED)
-     _, maxv, _, maxloc = cv2.minMaxLoc(res)
-     if maxv < 0.15:
-         return None
-
-     top_left = (maxloc[0] - rx1, maxloc[1] - ry1)
-     cx1 = sx1 + top_left[0]
-     cy1 = sy1 + top_left[1]
-     cx2 = cx1 + (x2 - x1)
-     cy2 = cy1 + (y2 - y1)
-     if cx1 < 0 or cy1 < 0 or cx2 > w or cy2 > h:
-         return None
-
-     coarse = crop_img(frame, (cx1, cy1, cx2, cy2))
-     if coarse.shape[:2] != ref_crop.shape[:2]:
-         return None
-
-     try:
-         dx, dy = phase_shift(ref_g, clahe_gray(gray(coarse)))
-     except Exception:
-         dx, dy = 0.0, 0.0
-
-     M = np.array([[1.0, 0.0, -dx], [0.0, 1.0, -dy]], dtype=np.float32)
-     aligned = cv2.warpAffine(coarse, M, (coarse.shape[1], coarse.shape[0]), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REFLECT)
-     return aligned, (top_left[0] + dx, top_left[1] + dy)
-
-
- def sharpness_map(img_bgr: np.ndarray) -> np.ndarray:
-     g = gray(img_bgr)
-     lap = cv2.Laplacian(g, cv2.CV_32F, ksize=3)
-     s = cv2.GaussianBlur(np.abs(lap), (0, 0), 1.0)
-     return s + 1e-3
-
-
- def local_sharp_fusion(aligned_crops: List[np.ndarray]) -> np.ndarray:
-     if len(aligned_crops) == 1:
-         return aligned_crops[0]
-     imgs = [x.astype(np.float32) for x in aligned_crops]
-     maps = [sharpness_map(x) for x in aligned_crops]
-     W = np.stack(maps, axis=0)
-     W /= np.sum(W, axis=0, keepdims=True)
-     I = np.stack(imgs, axis=0)
-     fused = np.sum(I * W[..., None], axis=0)
-     # blend slightly with pixelwise median for robustness.
-     med = np.median(I, axis=0)
-     out = 0.75 * fused + 0.25 * med
-     return np.clip(out, 0, 255).astype(np.uint8)
-
-
- def fuse_crop_burst(
-     records: List[FrameMeta],
-     ref_pos: int,
-     bbox: Tuple[int, int, int, int],
-     radius: int = 6,
-     max_neighbors: int = 11,
- ) -> Tuple[np.ndarray, List[int]]:
-     ref_frame = load_frame(records[ref_pos].path)
-     positions = list(range(max(0, ref_pos - radius), min(len(records), ref_pos + radius + 1)))
-     positions = sorted(positions, key=lambda p: records[p].sharpness, reverse=True)[:max_neighbors]
-     positions = sorted(positions)
      aligned: List[np.ndarray] = []
-     used: List[int] = []
-     ref_crop = crop_img(ref_frame, bbox)
-     aligned.append(ref_crop)
-     used.append(records[ref_pos].idx)
-     for p in positions:
-         if p == ref_pos:
-             continue
-         frame = load_frame(records[p].path)
-         got = align_local_crop(ref_frame, frame, bbox)
-         if got is None:
              continue
-         crop_aligned, _ = got
-         aligned.append(crop_aligned)
-         used.append(records[p].idx)
-     fused = local_sharp_fusion(aligned)
-     return fused, used


- # --------------------- deblurring ---------------------
-
- def motion_kernel(length: int, angle_deg: float) -> np.ndarray:
-     length = max(1, int(length))
-     size = max(9, length * 2 + 1)
-     kernel = np.zeros((size, size), np.float32)
-     c = size // 2
-     angle = math.radians(angle_deg)
-     dx = math.cos(angle)
-     dy = math.sin(angle)
-     for i in range(length):
-         t = i - (length - 1) / 2.0
-         x = int(round(c + t * dx))
-         y = int(round(c + t * dy))
-         if 0 <= x < size and 0 <= y < size:
-             kernel[y, x] = 1.0
-     s = float(kernel.sum())
-     if s <= 0:
-         kernel[c, c] = 1.0
-         s = 1.0
-     return kernel / s
-
-
- def wiener_deconv_gray(gray_img: np.ndarray, kernel: np.ndarray, balance: float = 0.01) -> np.ndarray:
-     gray_f = gray_img.astype(np.float32) / 255.0
-     kh, kw = kernel.shape
-     ih, iw = gray_f.shape
-     psf = np.zeros_like(gray_f, dtype=np.float32)
-     y0 = (ih - kh) // 2
-     x0 = (iw - kw) // 2
-     psf[y0:y0 + kh, x0:x0 + kw] = kernel
-     psf = np.fft.ifftshift(psf)
-     G = np.fft.fft2(gray_f)
-     H = np.fft.fft2(psf)
-     F_hat = (np.conj(H) / (np.abs(H) ** 2 + balance)) * G
-     out = np.real(np.fft.ifft2(F_hat))
-     out = np.clip(out, 0.0, 1.0)
-     return (out * 255.0).astype(np.uint8)
-
-
- def try_decode(img_bgr: np.ndarray) -> Optional[str]:
-     if zxingcpp is None:
-         return None
-     rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
-     try:
-         result = zxingcpp.read_barcode(rgb)
-         if result is not None and getattr(result, "text", None):
-             return str(result.text)
-     except Exception:
-         return None
-     return None
-
-
- def variant_score(img_bgr: np.ndarray) -> float:
-     g = gray(img_bgr)
-     gx = cv2.Sobel(g, cv2.CV_32F, 1, 0, ksize=3)
-     gy = cv2.Sobel(g, cv2.CV_32F, 0, 1, ksize=3)
-     score = float(g.std() + 0.8 * cv2.Laplacian(g, cv2.CV_32F).var())
-     score += 0.8 * float(min(np.mean(np.abs(gx)), np.mean(np.abs(gy))))
-     return score
-
-
- def restore_code_crop(crop_bgr: np.ndarray) -> Tuple[np.ndarray, Optional[str], float]:
-     variants: List[np.ndarray] = []
-     base = crop_bgr
-     variants.append(base)
-     variants.append(unsharp(clahe_bgr(base), sigma=1.0, amount=1.1))
-
-     # Denoise first, then upscale.
-     den = cv2.fastNlMeansDenoisingColored(base, None, 4, 4, 7, 21)
-     variants.append(den)
-
-     # Use the stronger path on 3x scale for tiny code cells.
-     for scale in (2, 3):
-         up = upscale(den, scale)
-         up = unsharp(clahe_bgr(up), sigma=1.0, amount=1.0)
-         variants.append(up)
-
-         g = gray(up)
-         for angle in range(0, 180, 15):
-             for length in (3, 5, 7, 9, 11, 13):
-                 k = motion_kernel(length, angle)
-                 try:
-                     rl = richardson_lucy(g.astype(np.float32) / 255.0, k, num_iter=12, clip=False)
-                     rl8 = np.clip(rl * 255.0, 0, 255).astype(np.uint8)
-                     variants.append(cv2.cvtColor(rl8, cv2.COLOR_GRAY2BGR))
-                 except Exception:
-                     pass
                  try:
-                     wd = wiener_deconv_gray(g, k, balance=0.005)
-                     variants.append(cv2.cvtColor(wd, cv2.COLOR_GRAY2BGR))
                  except Exception:
                      pass
-     # Threshold families after sharpening/deconvolution.
-     for v in variants[-10:]:
-         vg = gray(v)
-         thr1 = cv2.adaptiveThreshold(vg, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 5)
-         thr2 = cv2.threshold(vg, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
-         variants.append(cv2.cvtColor(thr1, cv2.COLOR_GRAY2BGR))
-         variants.append(cv2.cvtColor(thr2, cv2.COLOR_GRAY2BGR))
-
-     best_img = base
-     best_score = -1e18
-     best_text: Optional[str] = None
-     seen = set()
-     for v in variants:
-         key = (v.shape[0], v.shape[1], int(v.mean()), int(v.std()))
-         if key in seen:
-             continue
-         seen.add(key)
-         text = try_decode(v)
-         score = variant_score(v)
-         if text:
-             return v, text, score + 1e9
-         if score > best_score:
-             best_score = score
-             best_img = v
-     return best_img, best_text, best_score
-
-
- # --------------------- ruler and summary ---------------------
-
- def ruler_bbox(frame_bgr: np.ndarray) -> Tuple[int, int, int, int]:
-     h, w = frame_bgr.shape[:2]
-     return int(0.02 * w), int(0.08 * h), int(0.98 * w), int(0.26 * h)


- def annotate_frame(frame_bgr: np.ndarray, code_bbox: Optional[Tuple[int, int, int, int]]) -> np.ndarray:
-     out = frame_bgr.copy()
-     rx1, ry1, rx2, ry2 = ruler_bbox(out)
-     cv2.rectangle(out, (rx1, ry1), (rx2, ry2), (0, 255, 255), 2)
-     cv2.putText(out, "Ruler ROI", (rx1, max(20, ry1 - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 255), 2, cv2.LINE_AA)
-     if code_bbox is not None:
-         x1, y1, x2, y2 = code_bbox
-         cv2.rectangle(out, (x1, y1), (x2, y2), (0, 200, 0), 2)
-         cv2.putText(out, "Best code ROI", (x1, max(20, y1 - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 200, 0), 2, cv2.LINE_AA)
-     return out


- def bgr_to_pil(img: np.ndarray) -> Image.Image:
-     return Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
-
-
- def make_contact_sheet(items: List[Tuple[str, np.ndarray]], out_path: Path) -> Path:
-     tiles = []
-     for title, img in items:
          pil = bgr_to_pil(img)
          pil.thumbnail((520, 520))
-         canvas = Image.new("RGB", (pil.width, pil.height + 36), (255, 255, 255))
-         canvas.paste(pil, (0, 36))
          draw = ImageDraw.Draw(canvas)
          draw.text((8, 8), title, fill=(0, 0, 0))
-         tiles.append(canvas)

      cols = 2
-     rows = int(math.ceil(len(tiles) / cols))
-     cell_w = max(t.width for t in tiles)
-     cell_h = max(t.height for t in tiles)
      sheet = Image.new("RGB", (cols * cell_w, rows * cell_h), (245, 245, 245))
-     for i, tile in enumerate(tiles):
          x = (i % cols) * cell_w
          y = (i // cols) * cell_h
-         sheet.paste(tile, (x, y))
      sheet.save(out_path)
      return out_path


  def write_video(frames: List[np.ndarray], out_path: Path, fps: float) -> Path:
-     writer = imageio.get_writer(str(out_path), fps=max(1.0, fps), codec="libx264", quality=8)
      try:
-         for fr in frames:
-             writer.append_data(cv2.cvtColor(fr, cv2.COLOR_BGR2RGB))
      finally:
-         writer.close()
      return out_path


- # --------------------- main pipeline ---------------------

  def process_video(
      video_input: Union[str, Dict, None],
@@ -538,161 +470,146 @@ def process_video(
      stride: int,
      max_frames: int,
      burst_radius: int,
- ) -> Generator[Tuple[Optional[str], Optional[str], Optional[str], str, str], None, None]:
      logs: List[str] = []

-     def emit(msg: str) -> Tuple[Optional[str], Optional[str], Optional[str], str, str]:
          logs.append(msg)
-         return None, None, None, "", "\n".join(logs)

      try:
          video_path = resolve_video_path(video_input)
-         work = ensure_dir(Path(tempfile.mkdtemp(prefix="motion_deblur_")))
-         frames_dir = ensure_dir(work / "frames")
-         outputs_dir = ensure_dir(work / "outputs")
-
-         yield emit(f"Workspace: {work}")
          info = get_video_info(video_path)
          yield emit("Input video info: " + json.dumps(info, indent=2))

          yield emit("Starting frame extraction ...")
-         records = extract_frames(video_path, frames_dir, stride=max(1, int(stride)), max_frames=max(0, int(max_frames)))
          yield emit(f"Extracted {len(records)} frame(s).")

-         ref_pos = choose_reference_frame(records)
          ref_record = records[ref_pos]
-         ref_frame = load_frame(ref_record.path)
-         yield emit(f"Selected reference frame: {ref_record.idx} (sharpness={ref_record.sharpness:.1f}, dm_score={ref_record.dm_score:.1f})")
-
-         candidates = detect_datamatrix_candidates(ref_frame)
-         if not candidates:
-             yield emit("No strong code candidate in reference frame. Searching neighboring high-score frames ...")
-             ranked_positions = sorted(range(len(records)), key=lambda i: (records[i].dm_score, records[i].sharpness), reverse=True)[:12]
-             best_cands = []
-             for pos in ranked_positions:
-                 fr = load_frame(records[pos].path)
-                 cands = detect_datamatrix_candidates(fr)
-                 if cands:
-                     best_cands = cands
-                     ref_pos = pos
-                     ref_record = records[pos]
-                     ref_frame = fr
-                     candidates = cands
-                     yield emit(f"Switched reference frame to: {ref_record.idx}")
-                     break
          if not candidates:
-             raise RuntimeError("No plausible DataMatrix candidate was found in the video.")
-
-         best_overall: Optional[CropResult] = None
-         best_code_image: Optional[np.ndarray] = None
-
-         max_candidates = 3 if mode == "Advanced stable (recommended)" else 1
-         for ci, cand in enumerate(candidates[:max_candidates], start=1):
-             x1, y1, x2, y2, score = cand
-             bbox = (x1, y1, x2, y2)
-             yield emit(f"Processing code candidate {ci}/{min(max_candidates, len(candidates))} at bbox={bbox}")
-
-             crop_pad = max(12, int(0.35 * max(x2 - x1, y2 - y1)))
-             burst_bbox = pad_bbox(bbox, crop_pad, ref_frame.shape[:2])
-             fused_crop, used = fuse_crop_burst(records, ref_pos, burst_bbox, radius=max(1, int(burst_radius)), max_neighbors=11)
-             yield emit(f"Local burst fusion used {len(used)} frame(s): {used}")
-
-             if mode == "Fast stable":
-                 restored = unsharp(clahe_bgr(fused_crop), sigma=1.0, amount=1.0)
-                 decoded = try_decode(restored)
-                 score2 = variant_score(restored) + (1e9 if decoded else 0.0)
              else:
-                 yield emit("Running aggressive code restoration sweep ...")
-                 restored, decoded, score2 = restore_code_crop(fused_crop)
-
-             result = CropResult(frame_idx=ref_record.idx, bbox=burst_bbox, crop=restored, score=score + score2, decoded_text=decoded)
-             if best_overall is None or result.score > best_overall.score:
-                 best_overall = result
-                 best_code_image = restored
-
-             if decoded:
-                 yield emit(f"Decoded DataMatrix text: {decoded}")
-                 break
-
-         if best_overall is None or best_code_image is None:
-             raise RuntimeError("No code crop could be reconstructed.")
-
-         yield emit("Reconstructing ruler crop with local burst fusion ...")
-         rb = ruler_bbox(ref_frame)
-         ruler_crop, ruler_used = fuse_crop_burst(records, ref_pos, rb, radius=max(1, int(burst_radius)), max_neighbors=9)
-         ruler_crop = unsharp(clahe_bgr(ruler_crop), sigma=1.0, amount=1.1)
-         yield emit(f"Ruler burst fusion used {len(ruler_used)} frame(s): {ruler_used}")
-
-         annotated = annotate_frame(ref_frame, best_overall.bbox)
-         summary_path = outputs_dir / "summary.png"
-         code_path = outputs_dir / "best_code.png"
-         cv2.imwrite(str(code_path), best_code_image)
          make_contact_sheet(
              [
-                 ("Reference frame with ROIs", annotated),
-                 ("Fused ruler crop", ruler_crop),
-                 ("Best restored code crop", best_code_image),
-                 ("Reference crop before restoration", crop_img(ref_frame, best_overall.bbox)),
              ],
              summary_path,
          )
          yield emit(f"Summary image written: {summary_path}")

          yield emit("Writing enhanced review video ...")
-         preview_frames: List[np.ndarray] = []
-         for rec in records:
-             fr = load_frame(rec.path)
-             preview_frames.append(annotate_frame(unsharp(clahe_bgr(fr), sigma=1.0, amount=0.8), best_overall.bbox))
-         out_video = outputs_dir / "enhanced_review.mp4"
-         write_video(preview_frames, out_video, fps=float(info.get("fps", 15.0) or 15.0))
          yield emit(f"Enhanced review video written: {out_video}")

-         decoded_text = best_overall.decoded_text or "No decode yet. Best restored crop exported for manual review."
-         logs_text = "\n".join(logs)
-         yield str(out_video), str(summary_path), str(code_path), decoded_text, logs_text
-     except Exception as exc:
-         logs.append(f"Error: {type(exc).__name__}: {exc}")
-         raise gr.Error("\n".join(logs))


- # --------------------- UI ---------------------

- def build_demo() -> gr.Blocks:
-     with gr.Blocks(title="Motion Blur Recovery for Tool Video") as demo:
-         gr.Markdown(
-             "# Motion Blur Recovery for Tool Video\n"
-             "Stable hybrid pipeline focused on the ruler and the DataMatrix region.\n"
-             "Use **Advanced stable** for stronger local reconstruction."
-         )
-         with gr.Row():
-             with gr.Column(scale=1):
-                 video_in = gr.Video(label="Input video")
-                 mode = gr.Dropdown(
-                     choices=["Advanced stable (recommended)", "Fast stable"],
-                     value="Advanced stable (recommended)",
-                     label="Processing mode",
-                 )
-                 stride = gr.Slider(1, 4, value=1, step=1, label="Frame stride")
-                 max_frames = gr.Slider(0, 300, value=0, step=1, label="Max frames (0 = all)")
-                 burst_radius = gr.Slider(2, 10, value=6, step=1, label="Neighbor radius for local burst fusion")
-                 run_btn = gr.Button("Process video", variant="primary")
-             with gr.Column(scale=1):
-                 out_video = gr.Video(label="Enhanced review video")
-                 out_summary = gr.Image(label="Summary image", type="filepath")
-                 out_code = gr.Image(label="Best restored code crop", type="filepath")
-                 decoded = gr.Textbox(label="Decoded text / status")
-                 logs = gr.Textbox(label="Log", lines=20)
-
-         run_btn.click(
-             fn=process_video,
-             inputs=[video_in, mode, stride, max_frames, burst_radius],
-             outputs=[out_video, out_summary, out_code, decoded, logs],
-         )
-     return demo


- demo = build_demo()

  if __name__ == "__main__":
      demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
-
app.py (updated)

  import cv2
  import gradio as gr
  import numpy as np
  from PIL import Image, ImageDraw
  from skimage.restoration import richardson_lucy

  try:
 
      idx: int
      path: Path
      sharpness: float


  @dataclass
+ class DetectionResult:
      frame_idx: int
      bbox: Tuple[int, int, int, int]
      crop: np.ndarray
+     decode_text: Optional[str]
      score: float


+ # ---------- helpers ----------

  def ensure_dir(path: Path) -> Path:
      path.mkdir(parents=True, exist_ok=True)
 
      raise ValueError("Unsupported video input format from Gradio.")


  def get_video_info(video_path: str) -> Dict[str, float]:
      cap = cv2.VideoCapture(video_path)
      if not cap.isOpened():

      return info


+ def load_frame(path: Path) -> np.ndarray:
+     frame = cv2.imread(str(path), cv2.IMREAD_COLOR)
+     if frame is None:
+         raise RuntimeError(f"Could not read frame: {path}")
+     return frame


+ def laplacian_sharpness(frame_bgr: np.ndarray) -> float:
+     gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
+     return float(cv2.Laplacian(gray, cv2.CV_32F).var())


+ def clahe_l_channel(img_bgr: np.ndarray) -> np.ndarray:
      lab = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB)
      l, a, b = cv2.split(lab)
+     clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
+     l2 = clahe.apply(l)
+     return cv2.cvtColor(cv2.merge([l2, a, b]), cv2.COLOR_LAB2BGR)


+ def unsharp_mask(img_bgr: np.ndarray, sigma: float = 1.0, amount: float = 1.2) -> np.ndarray:
      blur = cv2.GaussianBlur(img_bgr, (0, 0), sigmaX=sigma, sigmaY=sigma)
      out = cv2.addWeighted(img_bgr, 1.0 + amount, blur, -amount, 0)
      return np.clip(out, 0, 255).astype(np.uint8)


+ def upscale(img_bgr: np.ndarray, scale: int = 3) -> np.ndarray:
      return cv2.resize(img_bgr, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)

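+ # Build a normalized linear motion PSF: unit impulses are stamped along a line
+ # of the given length and angle through the kernel center, then the kernel is
+ # scaled to sum to 1 (falling back to a center impulse if rounding leaves the
+ # grid empty).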
+ def motion_kernel(length: int, angle_deg: float) -> np.ndarray:
+     length = max(1, int(length))
+     size = max(9, length * 2 + 1)
+     kernel = np.zeros((size, size), np.float32)
+     c = size // 2
+     angle = math.radians(angle_deg)
+     dx = math.cos(angle)
+     dy = math.sin(angle)
+     for i in range(length):
+         t = i - (length - 1) / 2.0
+         x = int(round(c + t * dx))
+         y = int(round(c + t * dy))
+         if 0 <= x < size and 0 <= y < size:
+             kernel[y, x] = 1.0
+     s = float(kernel.sum())
+     if s <= 0:
+         kernel[c, c] = 1.0
+         s = 1.0
+     return kernel / s

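+ # Classic frequency-domain Wiener deconvolution: with G = FFT(image) and
+ # H = FFT(psf), the estimate is F_hat = conj(H) * G / (|H|^2 + balance).
+ # The balance term regularizes frequencies where |H| is near zero; larger
+ # values suppress ringing at the cost of weaker sharpening.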
+ def wiener_deconv_gray(gray: np.ndarray, kernel: np.ndarray, balance: float = 0.02) -> np.ndarray:
+     gray_f = gray.astype(np.float32) / 255.0
+     kh, kw = kernel.shape
+     ih, iw = gray_f.shape
+     psf = np.zeros_like(gray_f, dtype=np.float32)
+     y0 = (ih - kh) // 2
+     x0 = (iw - kw) // 2
+     psf[y0:y0 + kh, x0:x0 + kw] = kernel
+     psf = np.fft.ifftshift(psf)
+     G = np.fft.fft2(gray_f)
+     H = np.fft.fft2(psf)
+     F_hat = (np.conj(H) / (np.abs(H) ** 2 + balance)) * G
+     out = np.real(np.fft.ifft2(F_hat))
+     out = np.clip(out, 0.0, 1.0)
+     return (out * 255.0).astype(np.uint8)


+ def richardson_lucy_gray(gray: np.ndarray, kernel: np.ndarray, iterations: int = 15) -> np.ndarray:
+     arr = gray.astype(np.float32) / 255.0
+     out = richardson_lucy(arr, kernel, num_iter=iterations, clip=False)
+     out = np.clip(out, 0.0, 1.0)
+     return (out * 255.0).astype(np.uint8)

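+ # Decoding is delegated to zxing-cpp when available (the import guard at the
+ # top of the file leaves zxingcpp as None when the dependency is missing), and
+ # any decoder exception is treated as "no decode" so a bad variant never
+ # aborts a restoration sweep.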
+ def try_decode_datamatrix(img_bgr: np.ndarray) -> Optional[str]:
+     if zxingcpp is None:
+         return None
+     rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
+     try:
+         result = zxingcpp.read_barcode(rgb)
+         if result is not None and getattr(result, "text", None):
+             return str(result.text)
+     except Exception:
+         return None
+     return None


+ # ---------- frame extraction ----------

  def extract_frames(video_path: str, out_dir: Path, stride: int = 1, max_frames: int = 0) -> List[FrameMeta]:
      cap = cv2.VideoCapture(video_path)

          if not ok:
              break
          if idx % max(1, stride) == 0:
+             sharp = laplacian_sharpness(frame)
              frame_path = out_dir / f"frame_{idx:06d}.jpg"
              cv2.imwrite(str(frame_path), frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
+             records.append(FrameMeta(idx=idx, path=frame_path, sharpness=sharp))
              saved += 1
              if max_frames and saved >= max_frames:
                  break

      return records


+ # ---------- alignment / fusion ----------

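+ # Frames are registered with OpenCV's ECC criterion (cv2.findTransformECC,
+ # MOTION_EUCLIDEAN) on downscaled, histogram-equalized grayscale copies; the
+ # translation column of the warp is rescaled back to full resolution, and the
+ # identity warp is the fallback whenever ECC fails to converge.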
+ def estimate_affine_to_ref(ref_bgr: np.ndarray, img_bgr: np.ndarray, scale: float = 0.5) -> np.ndarray:
+     ref_gray = cv2.cvtColor(ref_bgr, cv2.COLOR_BGR2GRAY)
+     img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
+     if scale != 1.0:
+         ref_gray = cv2.resize(ref_gray, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
+         img_gray = cv2.resize(img_gray, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)
+     ref_gray = cv2.equalizeHist(ref_gray)
+     img_gray = cv2.equalizeHist(img_gray)
+     warp = np.eye(2, 3, dtype=np.float32)
+     criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 80, 1e-4)
+     try:
+         cv2.findTransformECC(ref_gray, img_gray, warp, cv2.MOTION_EUCLIDEAN, criteria, None, 1)
+         if scale != 1.0:
+             warp[:, 2] /= scale
+         return warp
+     except Exception:
+         return np.eye(2, 3, dtype=np.float32)


+ def warp_to_ref(img_bgr: np.ndarray, warp: np.ndarray, out_shape: Tuple[int, int]) -> np.ndarray:
+     h, w = out_shape
+     return cv2.warpAffine(
+         img_bgr,
+         warp,
+         (w, h),
+         flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP,
+         borderMode=cv2.BORDER_REPLICATE,
+     )


+ def choose_reference_index(records: List[FrameMeta]) -> int:
+     ranked = sorted(enumerate(records), key=lambda t: t[1].sharpness, reverse=True)
+     return ranked[0][0]

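+ # Sharpness-weighted burst fusion: the frames around the reference are warped
+ # onto it and averaged with weights proportional to their Laplacian-variance
+ # sharpness, so crisper frames dominate the blend before a final CLAHE +
+ # unsharp pass.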
+ def fuse_global_burst(records: List[FrameMeta], ref_pos: int, radius: int = 5) -> Tuple[np.ndarray, List[int]]:
+     left = max(0, ref_pos - radius)
+     right = min(len(records), ref_pos + radius + 1)
+     selected = records[left:right]
+     ref_record = records[ref_pos]
+     ref = load_frame(ref_record.path)
+     h, w = ref.shape[:2]
+
+     aligned: List[np.ndarray] = []
+     weights: List[float] = []
+     used: List[int] = []
+     for record in selected:
+         img = load_frame(record.path)
+         warp = estimate_affine_to_ref(ref, img)
+         aligned_img = warp_to_ref(img, warp, (h, w)).astype(np.float32)
+         aligned.append(aligned_img)
+         weights.append(max(1e-3, record.sharpness))
+         used.append(record.idx)
+
+     w_arr = np.array(weights, dtype=np.float32)
+     w_arr /= np.sum(w_arr)
+     fused = np.zeros_like(aligned[0], dtype=np.float32)
+     for arr, wgt in zip(aligned, w_arr):
+         fused += arr * wgt
+     fused = np.clip(fused, 0, 255).astype(np.uint8)
+     fused = unsharp_mask(clahe_l_channel(fused), sigma=1.0, amount=1.0)
+     return fused, used


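+ # Local variant of the same idea: the candidate bbox is cropped from each
+ # neighboring frame and the crops are ECC-aligned at full resolution
+ # (scale=1.0) before the weighted average; the raw reference crop is the
+ # fallback when no neighbor aligns.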
+ def fuse_local_crop(records: List[FrameMeta], ref_pos: int, bbox: Tuple[int, int, int, int], radius: int = 6) -> np.ndarray:
+     x1, y1, x2, y2 = bbox
+     ref = load_frame(records[ref_pos].path)
+     ref_crop = ref[y1:y2, x1:x2]
+     if ref_crop.size == 0:
+         return ref_crop
+     h, w = ref_crop.shape[:2]
+
+     left = max(0, ref_pos - radius)
+     right = min(len(records), ref_pos + radius + 1)
+     selected = records[left:right]

      aligned: List[np.ndarray] = []
+     weights: List[float] = []
+     for record in selected:
+         img = load_frame(record.path)
+         crop = img[y1:y2, x1:x2]
+         if crop.shape[:2] != (h, w):
              continue
+         warp = estimate_affine_to_ref(ref_crop, crop, scale=1.0)
+         aligned_crop = warp_to_ref(crop, warp, (h, w)).astype(np.float32)
+         aligned.append(aligned_crop)
+         weights.append(max(1e-3, record.sharpness))

+     if not aligned:
+         return ref_crop
+     w_arr = np.array(weights, dtype=np.float32)
+     w_arr /= np.sum(w_arr)
+     fused = np.zeros_like(aligned[0], dtype=np.float32)
+     for arr, wgt in zip(aligned, w_arr):
+         fused += arr * wgt
+     return np.clip(fused, 0, 255).astype(np.uint8)


+ # ---------- detection ----------

+ def ruler_bbox(frame_bgr: np.ndarray) -> Tuple[int, int, int, int]:
+     h, w = frame_bgr.shape[:2]
+     x1 = int(w * 0.02)
+     x2 = int(w * 0.98)
+     y1 = int(h * 0.08)
+     y2 = int(h * 0.24)
+     return x1, y1, x2, y2


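+ # Candidate search: black-hat morphology emphasizes dark code modules on the
+ # brighter label, adaptive thresholding plus close/open cleans the mask, and
+ # contours are kept only if roughly square and plausibly sized. Each is scored
+ # by patch contrast plus the weaker of the two mean Sobel gradients, since
+ # DataMatrix texture has strong edges in both directions.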
+ def detect_datamatrix_candidates(frame_bgr: np.ndarray) -> List[Tuple[int, int, int, int, float]]:
+     h, w = frame_bgr.shape[:2]
+     gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
+     x1 = int(w * 0.20)
+     x2 = int(w * 0.88)
+     y1 = int(h * 0.14)
+     y2 = int(h * 0.92)
+     roi = gray[y1:y2, x1:x2]
+
+     blackhat = cv2.morphologyEx(roi, cv2.MORPH_BLACKHAT, np.ones((9, 9), np.uint8))
+     thr = cv2.adaptiveThreshold(blackhat, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 35, -3)
+     mask = cv2.morphologyEx(thr, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8), iterations=2)
+     mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)

+     contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+     candidates: List[Tuple[int, int, int, int, float]] = []
+     for cnt in contours:
+         x, y, bw, bh = cv2.boundingRect(cnt)
+         area = bw * bh
+         if area < 160 or area > 0.10 * roi.shape[0] * roi.shape[1]:
+             continue
+         aspect = bw / max(1.0, float(bh))
+         if not (0.6 <= aspect <= 1.6):
+             continue
+         patch = gray[y1 + y:y1 + y + bh, x1 + x:x1 + x + bw]
+         if patch.size == 0:
+             continue
+         gx = cv2.Sobel(patch, cv2.CV_32F, 1, 0, ksize=3)
+         gy = cv2.Sobel(patch, cv2.CV_32F, 0, 1, ksize=3)
+         score = float(patch.std() + 0.5 * min(np.abs(gx).mean(), np.abs(gy).mean()))
+         pad = int(max(bw, bh) * 0.35)
+         xx1 = max(0, x1 + x - pad)
+         yy1 = max(0, y1 + y - pad)
+         xx2 = min(w, x1 + x + bw + pad)
+         yy2 = min(h, y1 + y + bh + pad)
+         candidates.append((xx1, yy1, xx2, yy2, score))

+     candidates.sort(key=lambda t: t[4], reverse=True)
+     return candidates[:12]


+ # ---------- restoration ----------

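+ # Restoration sweep, cheapest first: try the raw crop, then CLAHE + cubic
+ # upscales with sharpen/threshold variants, then Richardson-Lucy and Wiener
+ # deconvolution over a small grid of motion-kernel lengths, angles, and
+ # balances, returning as soon as any variant decodes.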
+ def restore_code_crop(crop_bgr: np.ndarray) -> Tuple[np.ndarray, Optional[str], List[str]]:
+     notes: List[str] = []
+     best_img = crop_bgr
+     best_text: Optional[str] = try_decode_datamatrix(crop_bgr)
+     if best_text:
+         return crop_bgr, best_text, ["Decoded directly from raw crop."]
+
+     base = clahe_l_channel(crop_bgr)
+     best_img = base
+     scales = [2, 3, 4]
+     balances = [0.01, 0.02, 0.04]
+     lengths = [3, 5, 7, 9]
+     angles = [0, 45, 90, 135]
+
+     for scale in scales:
+         up = upscale(base, scale=scale)
+         gray_up = cv2.cvtColor(up, cv2.COLOR_BGR2GRAY)
+
+         # Simple sharpen / threshold paths.
+         variants = [
+             cv2.cvtColor(gray_up, cv2.COLOR_GRAY2BGR),
+             unsharp_mask(up, sigma=0.8, amount=1.0),
+             cv2.cvtColor(cv2.adaptiveThreshold(gray_up, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 31, 5), cv2.COLOR_GRAY2BGR),
+             cv2.cvtColor(cv2.threshold(gray_up, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1], cv2.COLOR_GRAY2BGR),
+         ]
+         for variant in variants:
+             text = try_decode_datamatrix(variant)
+             if text:
+                 notes.append(f"Decoded after upscale x{scale} and simple enhancement.")
+                 return variant, text, notes
+             best_img = variant
+
+         # Deconvolution sweep.
+         for angle in angles:
+             for length in lengths:
+                 kernel = motion_kernel(length, angle)
                  try:
+                     rl = richardson_lucy_gray(gray_up, kernel, iterations=15)
+                     rl_bgr = cv2.cvtColor(rl, cv2.COLOR_GRAY2BGR)
+                     text = try_decode_datamatrix(rl_bgr)
+                     if text:
+                         notes.append(f"Decoded after Richardson-Lucy, scale={scale}, len={length}, angle={angle}.")
+                         return rl_bgr, text, notes
+                     best_img = rl_bgr
                  except Exception:
                      pass
+                 for balance in balances:
+                     try:
+                         wd = wiener_deconv_gray(gray_up, kernel, balance=balance)
+                         wd_bgr = cv2.cvtColor(wd, cv2.COLOR_GRAY2BGR)
+                         text = try_decode_datamatrix(wd_bgr)
+                         if text:
+                             notes.append(f"Decoded after Wiener, scale={scale}, len={length}, angle={angle}, balance={balance}.")
+                             return wd_bgr, text, notes
+                         best_img = wd_bgr
+                     except Exception:
+                         pass
+
+     notes.append("No decode achieved; best restored candidate saved for manual inspection.")
+     return best_img, None, notes



+ # ---------- outputs ----------

+ def bgr_to_pil(img_bgr: np.ndarray) -> Image.Image:
+     return Image.fromarray(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB))


+ def make_contact_sheet(images: List[Tuple[str, np.ndarray]], out_path: Path) -> Path:
+     pil_images = []
+     for title, img in images:
          pil = bgr_to_pil(img)
          pil.thumbnail((520, 520))
+         canvas = Image.new("RGB", (pil.width, pil.height + 34), (255, 255, 255))
+         canvas.paste(pil, (0, 34))
          draw = ImageDraw.Draw(canvas)
          draw.text((8, 8), title, fill=(0, 0, 0))
+         pil_images.append(canvas)

      cols = 2
+     rows = math.ceil(len(pil_images) / cols)
+     cell_w = max(im.width for im in pil_images)
+     cell_h = max(im.height for im in pil_images)
      sheet = Image.new("RGB", (cols * cell_w, rows * cell_h), (245, 245, 245))
+     for i, im in enumerate(pil_images):
          x = (i % cols) * cell_w
          y = (i // cols) * cell_h
+         sheet.paste(im, (x, y))
      sheet.save(out_path)
      return out_path
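+ # Output now goes through cv2.VideoWriter with the mp4v fourcc (the
+ # imageio/libx264 writer was removed in this commit), so no extra encoder
+ # dependency is needed; frames are defensively resized and cast to uint8 to
+ # match the first frame.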
  def write_video(frames: List[np.ndarray], out_path: Path, fps: float) -> Path:
+     if not frames:
+         raise ValueError("No frames provided for video writing.")
+
+     h, w = frames[0].shape[:2]
+     fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+     writer = cv2.VideoWriter(str(out_path), fourcc, float(max(1.0, fps)), (w, h))
+     if not writer.isOpened():
+         raise RuntimeError(f"Could not open video writer for {out_path}")
+
      try:
+         for frame in frames:
+             if frame.shape[:2] != (h, w):
+                 frame = cv2.resize(frame, (w, h), interpolation=cv2.INTER_CUBIC)
+             if frame.dtype != np.uint8:
+                 frame = np.clip(frame, 0, 255).astype(np.uint8)
+             writer.write(frame)
      finally:
+         writer.release()
      return out_path


+ # ---------- main pipeline ----------

468
  video_input: Union[str, Dict, None],
 
470
  stride: int,
471
  max_frames: int,
472
  burst_radius: int,
473
+ ) -> Generator[Tuple[Optional[str], Optional[str], str], None, None]:
474
  logs: List[str] = []
475
 
476
+ def emit(msg: str) -> Tuple[Optional[str], Optional[str], str]:
477
  logs.append(msg)
478
+ return None, None, "\n".join(logs)
479
 
480
  try:
481
  video_path = resolve_video_path(video_input)
482
+ yield emit(f"Workspace: creating temporary workspace ...")
483
+ work = Path(tempfile.mkdtemp(prefix="motion_deblur_"))
484
+ logs[-1] = f"Workspace: {work}"
485
+ yield None, None, "\n".join(logs)
486
 
 
487
  info = get_video_info(video_path)
488
  yield emit("Input video info: " + json.dumps(info, indent=2))
489
 
490
+ raw_dir = ensure_dir(work / "frames_raw")
491
  yield emit("Starting frame extraction ...")
492
+ records = extract_frames(video_path, raw_dir, stride=max(1, stride), max_frames=max_frames)
493
  yield emit(f"Extracted {len(records)} frame(s).")
494
 
495
+ ref_pos = choose_reference_index(records)
496
  ref_record = records[ref_pos]
497
+ yield emit(f"Selected reference frame: index {ref_record.idx}.")
498
+
499
+ if mode == "Advanced stable":
500
+ yield emit("Starting global burst fusion ...")
501
+ fused_frame, used_indices = fuse_global_burst(records, ref_pos, radius=burst_radius)
502
+ yield emit(f"Global burst fusion completed using frames: {used_indices}")
503
+ else:
504
+ fused_frame = unsharp_mask(clahe_l_channel(load_frame(ref_record.path)), sigma=1.0, amount=1.0)
505
+ yield emit("Using single-frame enhancement mode.")
506
+
507
+ # Ruler crop
508
+ rx1, ry1, rx2, ry2 = ruler_bbox(fused_frame)
509
+ ruler_crop = fused_frame[ry1:ry2, rx1:rx2]
510
+ ruler_crop = unsharp_mask(clahe_l_channel(ruler_crop), sigma=0.8, amount=1.1)
511
+ yield emit("Ruler crop reconstructed.")
512
+
513
+ # Code candidate search on fused frame.
514
+ yield emit("Searching DataMatrix candidates ...")
515
+ candidates = detect_datamatrix_candidates(fused_frame)
516
  if not candidates:
517
+ code_crop = np.zeros((160, 160, 3), dtype=np.uint8)
518
+ decode_text = None
519
+ code_bbox = None
520
+ yield emit("No plausible DataMatrix candidate found.")
521
+ else:
522
+ best_candidate = candidates[0]
523
+ code_bbox = tuple(map(int, best_candidate[:4]))
524
+ yield emit(f"Top candidate bbox: {code_bbox}")
525
+ if mode == "Advanced stable":
526
+ yield emit("Starting local crop fusion for code region ...")
527
+ local_fused = fuse_local_crop(records, ref_pos, code_bbox, radius=max(4, burst_radius + 1))
528
+ yield emit("Local crop fusion completed.")
 
 
 
 
 
 
 
 
529
  else:
530
+ x1, y1, x2, y2 = code_bbox
531
+ local_fused = fused_frame[y1:y2, x1:x2]
532
+
533
+ yield emit("Running code restoration sweep ...")
534
+ code_crop, decode_text, notes = restore_code_crop(local_fused)
535
+ for note in notes:
536
+ yield emit(note)
537
+
538
+ # Review frame with overlays.
539
+ review = fused_frame.copy()
540
+ cv2.rectangle(review, (rx1, ry1), (rx2, ry2), (0, 255, 0), 2)
541
+ if code_bbox is not None:
542
+ x1, y1, x2, y2 = code_bbox
543
+ cv2.rectangle(review, (x1, y1), (x2, y2), (0, 165, 255), 2)
544
+
545
+ summary_path = work / "summary.png"
 
 
 
 
 
 
 
 
 
546
  make_contact_sheet(
547
  [
548
+ (f"Reference / fused frame #{ref_record.idx}", review),
549
+ ("Ruler crop", ruler_crop),
550
+ ("Best DataMatrix crop", code_crop),
551
+ ("Fused frame", fused_frame),
552
  ],
553
  summary_path,
554
  )
555
  yield emit(f"Summary image written: {summary_path}")
556
 
557
  yield emit("Writing enhanced review video ...")
558
+ enhanced_frames: List[np.ndarray] = []
559
+ for record in records:
560
+ frame = load_frame(record.path)
561
+ enhanced = unsharp_mask(clahe_l_channel(frame), sigma=0.9, amount=0.9)
562
+ enhanced_frames.append(enhanced)
563
+ out_video = work / "enhanced.mp4"
564
+ write_video(enhanced_frames, out_video, fps=max(1.0, info["fps"] / max(1, stride)))
565
  yield emit(f"Enhanced review video written: {out_video}")
566
 
567
+ if decode_text:
568
+ yield str(out_video), str(summary_path), "\n".join(logs + [f"Decoded text: {decode_text}"])
569
+ else:
570
+ yield str(out_video), str(summary_path), "\n".join(logs + ["Decoded text: none"])
 
 
 
571
 
572
+ except Exception as e:
573
+ logs.append(f"Error: {type(e).__name__}: {e}")
574
+ raise gr.Error("\n".join(logs))


+ DESCRIPTION = """
+ # Motion-deblur tool for handheld machine-tool inspection videos
+
+ This version stays on a stable, self-contained path:
+ - no external repo cloning
+ - no model downloads
+ - Python + OpenCV + local multi-frame fusion
+ - simple stage logging only
+
+ Recommended mode:
+ - **Advanced stable** for the best self-contained result
+ - **Baseline** if you want the simplest fallback
+ """


+ with gr.Blocks() as demo:
+     gr.Markdown(DESCRIPTION)
+     with gr.Row():
+         with gr.Column(scale=1):
+             video = gr.Video(label="Input video")
+             mode = gr.Dropdown(["Advanced stable", "Baseline"], value="Advanced stable", label="Mode")
+             stride = gr.Slider(1, 4, value=1, step=1, label="Frame stride")
+             max_frames = gr.Slider(0, 600, value=0, step=1, label="Max frames (0 = all extracted frames)")
+             burst_radius = gr.Slider(2, 10, value=6, step=1, label="Burst radius")
+             run_btn = gr.Button("Process", variant="primary")
+         with gr.Column(scale=1):
+             out_video = gr.Video(label="Enhanced video")
+             out_image = gr.Image(label="Summary")
+             out_log = gr.Textbox(label="Logs", lines=24)
+
+     run_btn.click(
+         fn=process_video,
+         inputs=[video, mode, stride, max_frames, burst_radius],
+         outputs=[out_video, out_image, out_log],
+     )

  if __name__ == "__main__":
      demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)