| import streamlit as st
|
| import numpy as np
|
| from PIL import Image
|
| import cv2
|
| from scipy.ndimage import gaussian_filter
|
|
|
|
|
|
|
| def find_tc_center(ir_image, smoothing_sigma=3):
|
| smoothed_image = gaussian_filter(ir_image, sigma=smoothing_sigma)
|
| min_coords = np.unravel_index(np.argmin(smoothed_image), smoothed_image.shape)
|
| return min_coords[::-1]
|
|
|
| def extract_local_region(ir_image, center, region_size=95):
|
| h, w = ir_image.shape
|
| half_size = region_size // 2
|
| x_min = max(center[0] - half_size, 0)
|
| x_max = min(center[0] + half_size, w)
|
| y_min = max(center[1] - half_size, 0)
|
| y_max = min(center[1] + half_size, h)
|
| region = np.full((region_size, region_size), np.nan)
|
| extracted = ir_image[y_min:y_max, x_min:x_max]
|
| region[:extracted.shape[0], :extracted.shape[1]] = extracted
|
| return region
|
|
|
| def generate_hovmoller(X_data):
|
| hovmoller_list = []
|
| for ir_images in X_data:
|
| time_steps = ir_images.shape[0]
|
| hovmoller_data = np.zeros((time_steps, 95, 95))
|
| for t in range(time_steps):
|
| tc_center = find_tc_center(ir_images[t])
|
| hovmoller_data[t] = extract_local_region(ir_images[t], tc_center, 95)
|
| hovmoller_list.append(hovmoller_data)
|
| return np.array(hovmoller_list)
|
|
|
| def reshape_vmax(vmax_values, chunk_size=8):
|
| trimmed_size = (len(vmax_values) // chunk_size) * chunk_size
|
| vmax_values_trimmed = vmax_values[:trimmed_size]
|
| return vmax_values_trimmed.reshape(-1, chunk_size)
|
| def create_3d_vmax(vmax_2d_array):
|
|
|
| vmax_3d_array = np.zeros((vmax_2d_array.shape[0], 8, 8))
|
|
|
|
|
| for i in range(vmax_2d_array.shape[0]):
|
| np.fill_diagonal(vmax_3d_array[i], vmax_2d_array[i])
|
|
|
|
|
| vmax_3d_array = vmax_3d_array.reshape(-1, 8, 8, 1)
|
|
|
|
|
| return vmax_3d_array
|
|
|
| def process_lat_values(data):
|
| lat_values = data
|
|
|
|
|
| trimmed_size = (len(lat_values) // 8) * 8
|
| lat_values_trimmed = lat_values[:trimmed_size]
|
| lat_values_trimmed=np.array(lat_values_trimmed)
|
|
|
| lat_2d_array = lat_values_trimmed.reshape(-1, 8)
|
|
|
| return lat_2d_array
|
|
|
| def process_lon_values(data):
|
| lon_values =data
|
| lon_values = np.array(lon_values)
|
|
|
| trimmed_size = (len(lon_values) // 8) * 8
|
| lon_values_trimmed = lon_values[:trimmed_size]
|
|
|
|
|
| lon_2d_array = lon_values_trimmed.reshape(-1, 8)
|
|
|
| return lon_2d_array
|
|
|
| import numpy as np
|
|
|
| def calculate_intensity_difference(vmax_2d_array):
|
| """Calculates intensity difference for each row in Vmax 2D array."""
|
| int_diff = []
|
|
|
| for i in vmax_2d_array:
|
| k = abs(i[0] - i[-1])
|
| i = np.append(i, k)
|
| int_diff.append(i)
|
|
|
| return np.array(int_diff)
|
|
|
| import numpy as np
|
|
|
|
|
| def process_images(images, batch_size=8, img_size=(95, 95, 1)):
|
| num_images = images.shape[0]
|
|
|
|
|
| trimmed_size = (num_images // batch_size) * batch_size
|
| images_trimmed = images[:trimmed_size]
|
|
|
|
|
| images_reshaped = images_trimmed.reshape(-1, batch_size, *img_size)
|
|
|
| return images_reshaped
|
|
|
| import numpy as np
|
|
|
| def process_cc_mask(cc_data):
|
| """Processes CC mask images by trimming and reshaping into (x, 8, 95, 95, 1)."""
|
| num_images = cc_data.shape[0]
|
| batch_size = 8
|
| trimmed_size = (num_images // batch_size) * batch_size
|
|
|
| images_trimmed = cc_data[:trimmed_size]
|
| cc_images = images_trimmed.reshape(-1, batch_size, 95, 95, 1)
|
|
|
| return cc_images
|
| def extract_convective_cores(ir_data):
|
| """
|
| Extract Convective Cores (CCs) from IR imagery based on the criteria in the paper.
|
| Args:
|
| ir_data: IR imagery of shape (height, width).
|
| Returns:
|
| cc_mask: Binary mask of CCs (1 for CC, 0 otherwise) of shape (height, width).
|
| """
|
| height, width,c = ir_data.shape
|
| cc_mask = np.zeros_like(ir_data, dtype=np.float32)
|
|
|
|
|
| neighbors = [(-1, -1), (-1, 0), (-1, 1),
|
| (0, -1), (0,0) , (0, 1),
|
| (1, -1), (1, 0), (1, 1)]
|
|
|
| for i in range(1, height - 1):
|
| for j in range(1, width - 1):
|
| bt_ij = ir_data[i, j]
|
|
|
|
|
| if (bt_ij >= 253).any():
|
| continue
|
|
|
|
|
| is_local_min = True
|
| for di, dj in neighbors:
|
| if ir_data[i + di, j + dj] < bt_ij:
|
| is_local_min = False
|
| break
|
| if not is_local_min:
|
| continue
|
|
|
|
|
| numerator1 = (ir_data[i - 1, j] + ir_data[i + 1, j] - 2 * bt_ij) / 3.1
|
| numerator2 = (ir_data[i, j - 1] + ir_data[i, j + 1] - 2 * bt_ij) / 8.0
|
| lhs = numerator1 + numerator2
|
| rhs = (4 / 5.8) * np.exp(0.0826 * (bt_ij - 217))
|
|
|
| if lhs > rhs:
|
| cc_mask[i, j] = 1
|
|
|
| return cc_mask
|
|
|
| def compute_convective_core_masks(ir_data):
|
| """Extracts convective core masks for each IR image."""
|
| cc_mask = []
|
|
|
| for i in ir_data:
|
| c = extract_convective_cores(i)
|
| c = np.array(c)
|
| cc_mask.append(c)
|
|
|
| return np.array(cc_mask)
|
|
|
|
|
|
|
| st.set_page_config(page_title="TCIR Daily Input", layout="wide")
|
|
|
| st.title("Tropical Cyclone Input Uploader (8 sets/day)")
|
|
|
| ir_images = st.file_uploader("Upload 8 IR images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)
|
| pmw_images = st.file_uploader("Upload 8 PMW images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)
|
|
|
| if len(ir_images) != 8 or len(pmw_images) != 8:
|
| st.warning("Please upload exactly 8 IR and 8 PMW images.")
|
| else:
|
| st.success("Uploaded 8 IR and 8 PMW images successfully.")
|
|
|
| st.header("Input Latitude, Longitude, Vmax")
|
| lat_values, lon_values, vmax_values = [], [], []
|
|
|
| col1, col2, col3 = st.columns(3)
|
| with col1:
|
| for i in range(8):
|
| lat_values.append(st.number_input(f"Latitude {i+1}", key=f"lat{i}"))
|
| with col2:
|
| for i in range(8):
|
| lon_values.append(st.number_input(f"Longitude {i+1}", key=f"lon{i}"))
|
| with col3:
|
| for i in range(8):
|
| vmax_values.append(st.number_input(f"Vmax {i+1}", key=f"vmax{i}"))
|
| st.header("Select Prediction Model")
|
| model_choice = st.selectbox(
|
| "Choose a model for prediction",
|
| ("ConvGRU", "ConvLSTM", "Traj-GRU","3DCNN","spatiotemporalLSTM","Unet_LSTM"),
|
| index=0
|
| )
|
|
|
| if st.button("Submit for Processing"):
|
|
|
| if len(ir_images) == 8 and len(pmw_images) == 8:
|
|
|
| if model_choice == "Unet_LSTM":
|
| from unetlstm import predict_unetlstm
|
| model_predict_fn = predict_unetlstm
|
| elif model_choice == "ConvGRU":
|
| from gru_model import predict
|
| model_predict_fn = predict
|
| elif model_choice == "ConvLSTM":
|
| from convlstm import predict_lstm
|
| model_predict_fn = predict_lstm
|
| elif model_choice == "3DCNN":
|
| from cnn3d import predict_3dcnn
|
| model_predict_fn = predict_3dcnn
|
| elif model_choice == "Traj-GRU":
|
| from trjgru import predict_trajgru
|
| model_predict_fn = predict_trajgru
|
| elif model_choice == "spatiotemporalLSTM":
|
| from spaio_temp import predict_stlstm
|
| model_predict_fn = predict_stlstm
|
|
|
|
|
|
|
|
|
|
|
|
|
| ir_arrays = []
|
| pmw_arrays = []
|
| train_vmax_2d = reshape_vmax(np.array(vmax_values))
|
|
|
| train_vmax_3d= create_3d_vmax(train_vmax_2d)
|
|
|
| lat_processed = process_lat_values(lat_values)
|
| lon_processed = process_lon_values(lon_values)
|
|
|
|
|
| v_max_diff = calculate_intensity_difference(train_vmax_2d)
|
|
|
| for ir in ir_images:
|
| img = Image.open(ir).convert("L")
|
| arr = np.array(img).astype(np.float32)
|
| bt_arr = (arr / 255.0) * (310 - 190) + 190
|
| resized = cv2.resize(bt_arr, (95, 95), interpolation=cv2.INTER_CUBIC)
|
| ir_arrays.append(resized)
|
|
|
| for pmw in pmw_images:
|
| img = Image.open(pmw).convert("L")
|
| arr = np.array(img).astype(np.float32) / 255.0
|
| resized = cv2.resize(arr, (95, 95), interpolation=cv2.INTER_CUBIC)
|
| pmw_arrays.append(resized)
|
| ir=np.array(ir_arrays)
|
| pmw=np.array(pmw_arrays)
|
|
|
| ir_seq = process_images(ir)
|
| pmw_seq = process_images(pmw)
|
|
|
|
|
|
|
|
|
|
|
| X_train_new = ir_seq.reshape((1, 8, 95, 95))
|
|
|
|
|
| cc_mask= compute_convective_core_masks(X_train_new)
|
|
|
| hov_m_train = generate_hovmoller(X_train_new)
|
|
|
|
|
| hov_m_train[np.isnan(hov_m_train)] = 0
|
| hov_m_train = hov_m_train.transpose(0, 2, 3, 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| cc_mask[np.isnan(cc_mask)] = 0
|
| cc_mask=cc_mask.reshape(1, 8, 95, 95, 1)
|
| i_images=cc_mask+ir_seq
|
| reduced_images = np.concatenate([i_images,pmw_seq ], axis=-1)
|
| reduced_images[np.isnan(reduced_images)] = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| if model_choice == "Unet_LSTM":
|
| import tensorflow as tf
|
|
|
| def tf_gradient_magnitude(images):
|
|
|
| sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
|
| sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
|
| sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
|
| sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])
|
|
|
| images = tf.convert_to_tensor(images, dtype=tf.float32)
|
| images = tf.expand_dims(images, -1)
|
|
|
| gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
|
| gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
|
| grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)
|
|
|
| return tf.squeeze(grad_mag, -1).numpy()
|
| def GM_maps_prep(ir):
|
| GM_maps=[]
|
| for i in ir:
|
| GM_map = tf_gradient_magnitude(i)
|
| GM_maps.append(GM_map)
|
| GM_maps=np.array(GM_maps)
|
| return GM_maps
|
| ir_seq=ir_seq.reshape(8, 95, 95, 1)
|
| GM_maps = GM_maps_prep(ir_seq)
|
| print(GM_maps.shape)
|
| GM_maps=GM_maps.reshape(1, 8, 95, 95, 1)
|
| i_images=cc_mask+ir_seq+GM_maps
|
| reduced_images = np.concatenate([i_images,pmw_seq ], axis=-1)
|
| reduced_images[np.isnan(reduced_images)] = 0
|
| print(reduced_images.shape)
|
| y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
|
| else:
|
| y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
|
| st.write("Predicted Vmax:", y)
|
| else:
|
| st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")
|
|
|