| import tensorflow as tf |
| from tensorflow.keras import layers, models |
| import numpy as np |
|
|
| |
| class ConvGRU2DLayer(layers.Layer): |
| def __init__(self, filters, kernel_size, return_sequences=True, **kwargs): |
| super().__init__(**kwargs) |
| self.filters = filters |
| self.kernel_size = kernel_size |
| self.return_sequences = return_sequences |
|
|
| def build(self, input_shape): |
| self.input_projection = layers.Conv2D(self.filters, (1, 1), padding="same") |
| self.conv_z = layers.Conv2D(self.filters, self.kernel_size, padding="same", activation="sigmoid") |
| self.conv_r = layers.Conv2D(self.filters, self.kernel_size, padding="same", activation="sigmoid") |
| self.conv_h = layers.Conv2D(self.filters, self.kernel_size, padding="same", activation="tanh") |
| super().build(input_shape) |
|
|
| def call(self, inputs): |
| batch_size, time_steps, height, width, channels = tf.unstack(tf.shape(inputs)) |
| time_steps=inputs.shape[1] |
| h_t = tf.zeros((batch_size, height, width, self.filters)) |
| outputs = [] |
|
|
| for t in range(time_steps): |
| x_t = inputs[:, t, :, :, :] |
| x_projected = self.input_projection(x_t) |
| z = self.conv_z(x_projected)+self.conv_z(h_t) |
| r = self.conv_r(x_projected)+self.conv_z(h_t) |
| h_tilde = self.conv_h(r * h_t) |
| h_t = (1 - z) * h_t + z * h_tilde |
|
|
| if self.return_sequences: |
| outputs.append(h_t) |
|
|
| if self.return_sequences: |
| outputs = tf.stack(outputs, axis=1) |
| else: |
| outputs = h_t |
|
|
| return outputs |
|
|
| |
| def build_convgru_model(input_shape=(8, 95, 95, 2)): |
| input_tensor = layers.Input(shape=input_shape) |
| x = ConvGRU2DLayer(filters=32, kernel_size=(3, 3), return_sequences=True)(input_tensor) |
| x = layers.Conv3D(filters=32, kernel_size=(3, 3, 3), padding='same', activation='relu')(x) |
| x = layers.MaxPooling3D(pool_size=(2, 2, 2), strides=(4, 3, 3), padding='same')(x) |
| x = ConvGRU2DLayer(filters=64, kernel_size=(3, 3), return_sequences=True)(x) |
| x = layers.Conv3D(filters=64, kernel_size=(3, 3, 3), padding='same', activation='relu')(x) |
| x = layers.MaxPooling3D(pool_size=(2, 2, 2), strides=(4, 3, 3), padding='same')(x) |
| x = ConvGRU2DLayer(filters=128, kernel_size=(3, 3), return_sequences=True)(x) |
| x = layers.Conv3D(filters=128, kernel_size=(3, 3, 3), padding='same', activation='relu')(x) |
| x = layers.MaxPooling3D(pool_size=(2, 2, 2), strides=(2, 2, 2), padding='same')(x) |
| x = layers.Flatten()(x) |
| model = models.Model(inputs=input_tensor, outputs=x) |
| return model |
|
|
| def radial_structure_subnet(input_shape): |
| """ |
| Creates the subnet for extracting TC radial structure features using a five-branch CNN design with 2D convolutions. |
| |
| Parameters: |
| - input_shape: tuple, shape of the input data (e.g., (95, 95, 3)) |
| |
| Returns: |
| - model: tf.keras.Model, the radial structure subnet model |
| """ |
| |
| input_tensor = layers.Input(shape=input_shape) |
|
|
| |
| |
| |
| |
| nw_quadrant = input_tensor[:, :input_shape[0]//2, :input_shape[1]//2, :] |
| ne_quadrant = input_tensor[:, :input_shape[0]//2, input_shape[1]//2:, :] |
| sw_quadrant = input_tensor[:, input_shape[0]//2:, :input_shape[1]//2, :] |
| se_quadrant = input_tensor[:, input_shape[0]//2:, input_shape[1]//2:, :] |
|
|
|
|
| target_height = max(input_shape[0]//2, input_shape[0] - input_shape[0]//2) |
| target_width = max(input_shape[1]//2, input_shape[1] - input_shape[1]//2) |
| |
| |
| nw_quadrant = layers.ZeroPadding2D(padding=((0, target_height - nw_quadrant.shape[1]), |
| (0, target_width - nw_quadrant.shape[2])))(nw_quadrant) |
| ne_quadrant = layers.ZeroPadding2D(padding=((0, target_height - ne_quadrant.shape[1]), |
| (0, target_width - ne_quadrant.shape[2])))(ne_quadrant) |
| sw_quadrant = layers.ZeroPadding2D(padding=((0, target_height - sw_quadrant.shape[1]), |
| (0, target_width - sw_quadrant.shape[2])))(sw_quadrant) |
| se_quadrant = layers.ZeroPadding2D(padding=((0, target_height - se_quadrant.shape[1]), |
| (0, target_width - se_quadrant.shape[2])))(se_quadrant) |
|
|
| print(nw_quadrant.shape) |
| print(ne_quadrant.shape) |
| print(sw_quadrant.shape) |
| print(se_quadrant.shape) |
| |
| main_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(input_tensor) |
| y=layers.MaxPool2D()(main_branch) |
|
|
| y = layers.ZeroPadding2D(padding=((0, target_height - y.shape[1]), |
| (0, target_width - y.shape[2])))(y) |
| |
| nw_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(nw_quadrant) |
| ne_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(ne_quadrant) |
| sw_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(sw_quadrant) |
| se_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(se_quadrant) |
| |
| |
| |
| |
| |
| |
| |
| |
| fusion = layers.concatenate([y, nw_branch, ne_branch, sw_branch, se_branch], axis=-1) |
| |
| |
| x = layers.Conv2D(filters=16, kernel_size=(3, 3), padding='same', activation='relu')(fusion) |
| x=layers.MaxPool2D(pool_size=(2, 2))(x) |
| |
| nw_branch = layers.Conv2D(filters=16, kernel_size=(3, 3), padding='same', activation='relu')(nw_branch) |
| |
| ne_branch = layers.Conv2D(filters=16, kernel_size=(3, 3), padding='same', activation='relu')(ne_branch) |
| sw_branch = layers.Conv2D(filters=16, kernel_size=(3, 3), padding='same', activation='relu')(sw_branch) |
| se_branch = layers.Conv2D(filters=16, kernel_size=(3, 3), padding='same', activation='relu')(se_branch) |
| nw_branch = layers.MaxPool2D(pool_size=(2, 2))(nw_branch) |
| ne_branch = layers.MaxPool2D(pool_size=(2, 2))(ne_branch) |
| sw_branch = layers.MaxPool2D(pool_size=(2, 2))(sw_branch) |
| se_branch = layers.MaxPool2D(pool_size=(2, 2))(se_branch) |
|
|
| fusion = layers.concatenate([x, nw_branch, ne_branch, sw_branch, se_branch], axis=-1) |
| x = layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(fusion) |
| x=layers.MaxPool2D(pool_size=(2, 2))(x) |
| |
| nw_branch = layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(nw_branch) |
| |
| ne_branch = layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(ne_branch) |
| sw_branch = layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(sw_branch) |
| se_branch = layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(se_branch) |
| nw_branch = layers.MaxPool2D(pool_size=(2, 2))(nw_branch) |
| ne_branch = layers.MaxPool2D(pool_size=(2, 2))(ne_branch) |
| sw_branch = layers.MaxPool2D(pool_size=(2, 2))(sw_branch) |
| se_branch = layers.MaxPool2D(pool_size=(2, 2))(se_branch) |
|
|
| fusion = layers.concatenate([x, nw_branch, ne_branch, sw_branch, se_branch], axis=-1) |
| x = layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu')(fusion) |
| x=layers.Conv2D(filters=32, kernel_size=(3, 3), activation=None)(x) |
| |
| x=layers.Flatten()(x) |
| model = models.Model(inputs=input_tensor, outputs=x) |
| return model |
|
|
| |
| |
|
|
| |
| |
|
|
| |
| |
|
|
| def build_cnn_model(input_shape=(8, 8, 1)): |
| |
| input_tensor = layers.Input(shape=input_shape) |
| |
| |
| x = layers.Conv2D(64, (3, 3), padding='same')(input_tensor) |
| x = layers.BatchNormalization()(x) |
| x = layers.ReLU()(x) |
| |
| |
| x = layers.Flatten()(x) |
| |
| |
| model = models.Model(inputs=input_tensor, outputs=x) |
| |
| return model |
|
|
| from tensorflow.keras import layers, models, Input |
|
|
| def build_combined_model(): |
| |
| input_shape_3d = (8, 95, 95, 2) |
| input_shape_radial = (95, 95, 8) |
| input_shape_cnn = (8, 8, 1) |
| |
| input_shape_latitude = (8,) |
| input_shape_longitude = (8,) |
| input_shape_other = (9,) |
|
|
| |
| model_3d = build_convgru_model(input_shape=input_shape_3d) |
| model_radial = radial_structure_subnet(input_shape=input_shape_radial) |
| model_cnn = build_cnn_model(input_shape=input_shape_cnn) |
|
|
| |
| input_latitude = Input(shape=input_shape_latitude ,name="latitude_input") |
| input_longitude = Input(shape=input_shape_longitude, name="longitude_input") |
| input_other = Input(shape=input_shape_other, name="other_input") |
|
|
| |
| flat_latitude = layers.Dense(32,activation='relu')(input_latitude) |
| flat_longitude = layers.Dense(32,activation='relu')(input_longitude) |
| flat_other = layers.Dense(64,activation='relu')(input_other) |
|
|
| |
| combined = layers.concatenate([ |
| model_3d.output, |
| model_radial.output, |
| model_cnn.output, |
| flat_latitude, |
| flat_longitude, |
| flat_other |
| ]) |
|
|
| |
| x = layers.Dense(128, activation='relu')(combined) |
| x = layers.Dense(1, activation=None)(x) |
|
|
| |
| final_model = models.Model( |
| inputs=[model_3d.input, model_radial.input, model_cnn.input, |
| input_latitude, input_longitude, input_other ], |
| outputs=x |
| ) |
|
|
| return final_model |
|
|
| import h5py |
| with h5py.File(r"convgru-model.h5", 'r') as f: |
| print(f.attrs.get('keras_version')) |
| print(f.attrs.get('backend')) |
| print("Model layers:", list(f['model_weights'].keys())) |
|
|
| model = build_combined_model() |
| model.load_weights(r"convgru-model.h5") |
|
|
|
|
| def predict(reduced_images_test,hov_m_test,test_vmax_3d,lat_test,lon_test,int_diff_test): |
| y=model.predict([reduced_images_test,hov_m_test,test_vmax_3d,lat_test,lon_test,int_diff_test ]) |
| return y |