| import streamlit as st |
| import numpy as np |
| from PIL import Image |
| import cv2 |
| from scipy.ndimage import gaussian_filter |
|
|
| |
|
|
def find_tc_center(ir_image, smoothing_sigma=3):
    """Return the location of the minimum of the Gaussian-smoothed image.

    Args:
        ir_image: Array passed to ``gaussian_filter``; callers in this file
            pass a single 2-D frame.
        smoothing_sigma: ``sigma`` forwarded to ``gaussian_filter``.

    Returns:
        The index of the smoothed minimum with the axis order reversed,
        i.e. ``(column, row)`` for a 2-D image.
    """
    blurred = gaussian_filter(ir_image, sigma=smoothing_sigma)
    # argmin works on the flattened array; map it back to an N-d index.
    flat_position = blurred.argmin()
    row_major_index = np.unravel_index(flat_position, blurred.shape)
    return tuple(reversed(row_major_index))
|
|
def extract_local_region(ir_image, center, region_size=95):
    """Crop a ``region_size`` x ``region_size`` window centred on ``center``.

    The window covers ``region_size`` pixels per axis starting at
    ``center - region_size // 2``. Parts of the window that fall outside the
    image are left as NaN, and the pixels that do exist are written at their
    true offset inside the output, so ``center`` always lands at index
    ``region_size // 2`` of the result.

    Args:
        ir_image: 2-D array (``ir_image.shape`` is unpacked as ``(h, w)``).
        center: ``(x, y)`` pair, i.e. (column, row), as returned by
            ``find_tc_center``.
        region_size: Side length of the square output window.

    Returns:
        Float array of shape ``(region_size, region_size)`` padded with NaN
        where the window extends beyond the image.
    """
    h, w = ir_image.shape
    half_size = region_size // 2

    # Window origin in image coordinates; may be negative near the border.
    x_start = center[0] - half_size
    y_start = center[1] - half_size

    # Clip the window to the image. Using start + region_size (rather than
    # center + half_size) keeps the full width for odd region sizes.
    x_min = max(x_start, 0)
    x_max = min(x_start + region_size, w)
    y_min = max(y_start, 0)
    y_max = min(y_start + region_size, h)

    region = np.full((region_size, region_size), np.nan)
    if x_max > x_min and y_max > y_min:
        # Paste at the offset the clipped data has inside the window so the
        # crop stays centred even when it was cut at the top/left edge.
        region[
            y_min - y_start:y_max - y_start,
            x_min - x_start:x_max - x_start,
        ] = ir_image[y_min:y_max, x_min:x_max]
    return region
|
|
def generate_hovmoller(X_data):
    """Build centre-cropped frame stacks for every image sequence.

    For each sequence in ``X_data`` every time step is cropped to a 95x95
    window around the point returned by ``find_tc_center`` (via
    ``extract_local_region``) and the crops are stacked along time.

    Args:
        X_data: Iterable of sequences; each sequence is indexed as
            ``sequence[t]`` and that frame is handed to the 2-D helpers
            (assumes shape (time, height, width) - TODO confirm).

    Returns:
        ``np.array`` of the per-sequence ``(time, 95, 95)`` stacks.
    """
    patch = 95
    stacks = []
    for sequence in X_data:
        n_frames = sequence.shape[0]
        cropped = np.zeros((n_frames, patch, patch))
        for t, frame in enumerate(sequence):
            centre_xy = find_tc_center(frame)
            cropped[t] = extract_local_region(frame, centre_xy, patch)
        stacks.append(cropped)
    return np.array(stacks)
|
|
def reshape_vmax(vmax_values, chunk_size=8):
    """Group Vmax values into rows of ``chunk_size``.

    Trailing values that do not fill a complete row are dropped.

    Args:
        vmax_values: Array supporting ``len``, slicing and ``reshape``.
        chunk_size: Number of values per output row.

    Returns:
        Array reshaped to ``(-1, chunk_size)``.
    """
    _, leftover = divmod(len(vmax_values), chunk_size)
    usable = vmax_values[:len(vmax_values) - leftover]
    return usable.reshape(-1, chunk_size)
def create_3d_vmax(vmax_2d_array):
    """Turn each row of Vmax values into a diagonal matrix with a channel axis.

    Row ``i`` of the input becomes a square matrix whose main diagonal holds
    that row's values and whose other entries are zero. The matrix size
    follows the row width instead of being fixed at 8, so the function stays
    consistent with ``reshape_vmax`` when a different ``chunk_size`` is used
    (with a fixed 8x8 target, ``np.fill_diagonal`` would silently repeat the
    values of a shorter row).

    Args:
        vmax_2d_array: 2-D array-like of shape ``(rows, width)``.

    Returns:
        Float array of shape ``(rows, width, width, 1)``.
    """
    vmax = np.asarray(vmax_2d_array)
    n_rows, width = vmax.shape

    diagonals = np.zeros((n_rows, width, width))
    diag_idx = np.arange(width)
    # Write every row onto its own main diagonal in one vectorised step.
    diagonals[:, diag_idx, diag_idx] = vmax

    # Trailing singleton axis: the result is used as a single-channel input.
    return diagonals.reshape(n_rows, width, width, 1)
|
|
def process_lat_values(data):
    """Arrange latitude values into rows of 8.

    Values beyond the last complete group of 8 are discarded before the
    remaining ones are converted to an array.

    Args:
        data: Sequence supporting ``len`` and slicing.

    Returns:
        Array reshaped to ``(-1, 8)``.
    """
    usable = len(data) - len(data) % 8
    kept = np.array(data[:usable])
    return kept.reshape(-1, 8)
|
|
def process_lon_values(data):
    """Arrange longitude values into rows of 8.

    The input is converted to an array first; values beyond the last
    complete group of 8 are then discarded.

    Args:
        data: Array-like of longitude values.

    Returns:
        Array reshaped to ``(-1, 8)``.
    """
    lons = np.array(data)
    full_groups = len(lons) // 8
    return lons[:full_groups * 8].reshape(-1, 8)
|
|
| import numpy as np |
|
|
def calculate_intensity_difference(vmax_2d_array):
    """Append |first - last| to every row of the Vmax 2D array.

    Args:
        vmax_2d_array: Iterable of 1-D rows.

    Returns:
        Array whose rows are the input rows, each extended by one element
        holding the absolute difference between its first and last value.
    """
    extended_rows = [
        np.append(row, abs(row[0] - row[-1]))
        for row in vmax_2d_array
    ]
    return np.array(extended_rows)
|
|
| import numpy as np |
|
|
| |
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
    """Group images into batches of ``batch_size``.

    Images beyond the last complete batch are dropped.

    Args:
        images: Array whose first axis indexes images.
        batch_size: Number of images per batch.
        img_size: Per-image shape used in the reshape.

    Returns:
        Array reshaped to ``(-1, batch_size, *img_size)``.
    """
    full_batches = images.shape[0] // batch_size
    kept = images[:full_batches * batch_size]
    target_shape = (-1, batch_size) + tuple(img_size)
    return kept.reshape(target_shape)
|
|
| import numpy as np |
|
|
def process_cc_mask(cc_data):
    """Group CC mask images into sequences of 8 with a trailing channel axis.

    Images beyond the last complete group of 8 are dropped.

    Args:
        cc_data: Array whose first axis indexes 95x95 mask images.

    Returns:
        Array reshaped to ``(-1, 8, 95, 95, 1)``.
    """
    group = 8
    usable = (cc_data.shape[0] // group) * group
    return cc_data[:usable].reshape(-1, group, 95, 95, 1)
def extract_convective_cores(ir_data):
    """Extract Convective Cores (CCs) from IR imagery.

    An interior pixel is flagged as a CC when all of the following hold:

    * its value is not ``>= 253``;
    * none of its 8 neighbours is strictly smaller (local minimum, ties
      allowed);
    * ``(up + down - 2*bt) / 3.1 + (left + right - 2*bt) / 8.0`` exceeds
      ``(4 / 5.8) * exp(0.0826 * (bt - 217))``.

    Border pixels are never flagged because they lack a full neighbourhood.

    Args:
        ir_data: IR image of shape ``(height, width)`` or
            ``(height, width, channels)``. With a channel axis every channel
            is evaluated independently as its own 2-D image.

    Returns:
        ``float32`` mask with the same shape as ``ir_data`` (1 for CC,
        0 otherwise).

    Raises:
        ValueError: If ``ir_data`` is neither 2-D nor 3-D.
    """
    bt = np.asarray(ir_data)
    if bt.ndim not in (2, 3):
        raise ValueError(
            "ir_data must have shape (height, width) or "
            f"(height, width, channels), got {bt.shape}"
        )

    cc_mask = np.zeros_like(bt, dtype=np.float32)
    height, width = bt.shape[:2]
    if height < 3 or width < 3:
        # No pixel has a complete 3x3 neighbourhood.
        return cc_mask

    center = bt[1:-1, 1:-1]

    # Written as "not >=" / "not <" so NaN pixels are treated exactly like a
    # per-pixel loop with `continue` on the positive comparison would.
    candidate = ~(center >= 253)
    for di in (-1, 0, 1):
        for dj in (-1, 0, 1):
            if di == 0 and dj == 0:
                continue
            neighbor = bt[1 + di:height - 1 + di, 1 + dj:width - 1 + dj]
            candidate &= ~(neighbor < center)

    # Second differences along the row axis and the column axis.
    vertical = (bt[:-2, 1:-1] + bt[2:, 1:-1] - 2 * center) / 3.1
    horizontal = (bt[1:-1, :-2] + bt[1:-1, 2:] - 2 * center) / 8.0
    lhs = vertical + horizontal
    rhs = (4 / 5.8) * np.exp(0.0826 * (center - 217))

    interior = cc_mask[1:-1, 1:-1]  # view: writes land in cc_mask
    interior[candidate & (lhs > rhs)] = 1
    return cc_mask
|
|
def compute_convective_core_masks(ir_data):
    """Run ``extract_convective_cores`` on every item of ``ir_data``.

    Args:
        ir_data: Iterable of IR images; each item is passed unchanged to
            ``extract_convective_cores``.

    Returns:
        ``np.array`` stacking the per-item masks in input order.
    """
    masks = [np.array(extract_convective_cores(frame)) for frame in ir_data]
    return np.array(masks)
|
|
|
|
| |
# --- Page configuration and image upload widgets ---------------------------
st.set_page_config(page_title="TCIR Daily Input", layout="wide")
st.title("Tropical Cyclone U-Net Wind Speed (Intensity) Predictor")

# Both uploaders accept several files at once; exactly 8 of each are needed.
ir_images = st.file_uploader(
    "Upload 8 IR images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)
pmw_images = st.file_uploader(
    "Upload 8 PMW images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)

if len(ir_images) == 8 and len(pmw_images) == 8:
    st.success("Uploaded 8 IR and 8 PMW images successfully.")
else:
    st.warning("Please upload exactly 8 IR and 8 PMW images.")

st.header("Input Latitude, Longitude, Vmax")
# Defaults used until a valid CSV has been loaded further down the script.
lat_values = []
lon_values = []
vmax_values = []
|
|
| import pandas as pd |
|
|
| import numpy as np |
|
|
| |
# --- CSV upload: latitude / longitude / Vmax series -------------------------
csv_file = st.file_uploader("Upload CSV file", type=["csv"])

if csv_file is None:
    st.warning("Please upload a CSV file.")
else:
    try:
        df = pd.read_csv(csv_file)
        required_columns = {'Latitude', 'Longitude', 'Vmax'}

        if required_columns <= set(df.columns):
            # Copy each required column into its own NumPy array.
            lat_values = np.array(df['Latitude'].values)
            lon_values = np.array(df['Longitude'].values)
            vmax_values = np.array(df['Vmax'].values)

            st.success("CSV file loaded and processed successfully!")
            st.write(df.head())
        else:
            st.error("CSV file must contain 'Latitude', 'Longitude', and 'Vmax' columns.")
    except Exception as e:
        # UI boundary: report any read/parse failure instead of crashing.
        st.error(f"Error reading CSV: {e}")
# --- Model selection and prediction -----------------------------------------
import importlib

st.header("Select Prediction Model")

# Display name -> (module, function) of the predictor. The modules are only
# imported after the user submits, so unused models are never loaded.
# Insertion order is the order shown in the select box.
MODEL_PREDICTORS = {
    "ConvGRU": ("gru_model", "predict"),
    "ConvLSTM": ("convlstm", "predict_lstm"),
    "Traj-GRU": ("trjgru", "predict_trajgru"),
    "3DCNN": ("cnn3d", "predict_3dcnn"),
    "spatiotemporalLSTM": ("spaio_temp", "predict_stlstm"),
    "Unet_LSTM": ("unetlstm", "predict_unetlstm"),
}

model_choice = st.selectbox(
    "Choose a model for prediction",
    tuple(MODEL_PREDICTORS),
    index=0,
)

if st.button("Submit for Processing"):
    if len(ir_images) != 8 or len(pmw_images) != 8:
        st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")
    elif min(len(lat_values), len(lon_values), len(vmax_values)) < 8:
        # Without at least one full group of 8 values every reshape below
        # yields an empty array and the model would be called with no data.
        st.error(
            "Upload a CSV with at least 8 rows of 'Latitude', 'Longitude', "
            "and 'Vmax' values before submitting."
        )
    else:
        module_name, function_name = MODEL_PREDICTORS[model_choice]
        model_predict_fn = getattr(
            importlib.import_module(module_name), function_name
        )

        # Scalar series -> model inputs.
        # NOTE(review): a CSV with 16+ rows produces more than one group of
        # 8 here while the images form exactly one sequence - confirm the
        # predictors handle that, or trim the CSV to 8 rows.
        train_vmax_2d = reshape_vmax(np.array(vmax_values))
        train_vmax_3d = create_3d_vmax(train_vmax_2d)
        lat_processed = process_lat_values(lat_values)
        lon_processed = process_lon_values(lon_values)
        v_max_diff = calculate_intensity_difference(train_vmax_2d)

        # IR: grayscale 0..255 is mapped linearly onto 190..310 and resized.
        ir_arrays = []
        for ir_file in ir_images:
            gray = np.array(Image.open(ir_file).convert("L")).astype(np.float32)
            bt_arr = (gray / 255.0) * (310 - 190) + 190
            ir_arrays.append(
                cv2.resize(bt_arr, (95, 95), interpolation=cv2.INTER_CUBIC)
            )

        # PMW: grayscale scaled to 0..1 and resized.
        pmw_arrays = []
        for pmw_file in pmw_images:
            scaled = np.array(Image.open(pmw_file).convert("L")).astype(np.float32) / 255.0
            pmw_arrays.append(
                cv2.resize(scaled, (95, 95), interpolation=cv2.INTER_CUBIC)
            )

        ir_seq = process_images(np.array(ir_arrays))    # (1, 8, 95, 95, 1)
        pmw_seq = process_images(np.array(pmw_arrays))  # (1, 8, 95, 95, 1)

        X_train_new = ir_seq.reshape((1, 8, 95, 95))
        ir_frames = ir_seq.reshape(8, 95, 95, 1)

        # The CC extractor works on one (height, width, channel) image at a
        # time, so it must be fed the 8 frames individually. Passing the
        # (1, 8, 95, 95) tensor made it treat the time axis as image height
        # and the last image axis as channels, which gives either an
        # all-zero mask or a ValueError on the first array comparison.
        cc_mask = compute_convective_core_masks(ir_frames)
        cc_mask[np.isnan(cc_mask)] = 0
        cc_mask = cc_mask.reshape(1, 8, 95, 95, 1)

        hov_m_train = generate_hovmoller(X_train_new)
        hov_m_train[np.isnan(hov_m_train)] = 0
        hov_m_train = hov_m_train.transpose(0, 2, 3, 1)

        i_images = cc_mask + ir_seq

        if model_choice == "Unet_LSTM":
            import tensorflow as tf

            def tf_gradient_magnitude(images):
                """Sobel gradient magnitude via ``tf.nn.conv2d``.

                A trailing channel axis is appended to ``images`` before the
                convolution, so the first axis of ``images`` is used as the
                batch axis.
                """
                sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
                sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
                sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
                sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])

                images = tf.convert_to_tensor(images, dtype=tf.float32)
                images = tf.expand_dims(images, -1)

                gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
                gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
                # Small epsilon keeps the square root away from zero.
                grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)

                return tf.squeeze(grad_mag, -1).numpy()

            def GM_maps_prep(ir):
                """Apply ``tf_gradient_magnitude`` to every item of ``ir``."""
                return np.array([tf_gradient_magnitude(item) for item in ir])

            # NOTE(review): each item handed to tf_gradient_magnitude here
            # is (95, 95, 1); after expand_dims conv2d sees a batch of 95
            # images of size 95x1, not one 95x95 image. Left unchanged
            # because the trained model may rely on it - confirm against
            # the training pipeline before switching to (8, 95, 95) input.
            GM_maps = GM_maps_prep(ir_frames)
            print(GM_maps.shape)
            GM_maps = GM_maps.reshape(1, 8, 95, 95, 1)
            i_images = i_images + GM_maps

        reduced_images = np.concatenate([i_images, pmw_seq], axis=-1)
        reduced_images[np.isnan(reduced_images)] = 0
        if model_choice == "Unet_LSTM":
            print(reduced_images.shape)

        y = model_predict_fn(
            reduced_images,
            hov_m_train,
            train_vmax_3d,
            lat_processed,
            lon_processed,
            v_max_diff,
        )
        st.write("Predicted Vmax:", y)
|
|