Source code for seisnn.model.attention

from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Activation, add, BatchNormalization, \
    Bidirectional, concatenate, Conv1D, Dense, Dropout, Input, Layer, \
    LayerNormalization, LSTM, MaxPooling1D, UpSampling1D
import tensorflow as tf
import numpy as np


def transformer(img_rows=None, img_cols=None, color_type=3, num_class=3):
    """
    EQTransformer model.

    :param img_rows: Height of the data.
    :param img_cols: Width of the data.
    :param color_type: Channel number of the data.
    :param num_class: Output class number.
    :return: EQTransformer model.
    """
    inputs = Input(shape=(img_rows, img_cols, color_type))
    new_dim = tf.squeeze(inputs, axis=1, name=None)
    ff_dim = 64
    num_head = 1

    # Encoder: down-sampling convolution stack.
    conv1 = Conv1D(8, 11, activation='relu', padding='same')(new_dim)
    pool1 = MaxPooling1D(2)(conv1)
    conv2 = Conv1D(16, 9, activation='relu', padding='same')(pool1)
    pool2 = MaxPooling1D(2)(conv2)
    conv3 = Conv1D(16, 7, activation='relu', padding='same')(pool2)
    pool3 = MaxPooling1D(2)(conv3)
    conv4 = Conv1D(32, 5, activation='relu', padding='same')(pool3)
    pool4 = MaxPooling1D(2)(conv4)
    conv5 = Conv1D(64, 5, activation='relu', padding='same')(pool4)
    pool5 = MaxPooling1D(2)(conv5)
    conv6 = Conv1D(64, 3, activation='relu', padding='same')(pool5)
    pool6 = MaxPooling1D(2)(conv6)

    # Residual CNN stack followed by recurrent layers.
    resCNN = ResNet_build(64, 3)
    res1 = resCNN(pool6)

    bilstm1 = Bidirectional(LSTM(64, return_sequences=True))(res1)
    bilstm1 = Dropout(0.1)(bilstm1)
    bilstm1 = Conv1D(64, 1, activation='relu', padding='same')(bilstm1)
    bilstm1 = LayerNormalization(epsilon=1e-6)(bilstm1)
    bilstm1 = Activation('relu')(bilstm1)

    bilstm2 = Bidirectional(LSTM(64, return_sequences=True))(bilstm1)
    bilstm2 = Dropout(0.1)(bilstm2)
    bilstm2 = Conv1D(64, 1, activation='relu', padding='same')(bilstm2)
    bilstm2 = LayerNormalization(epsilon=1e-6)(bilstm2)
    bilstm2 = Activation('relu')(bilstm2)

    lstm1 = LSTM(64, return_sequences=True)(bilstm2)
    transformer_block1 = TransformerBlockE(64, num_head, ff_dim)
    transE = transformer_block1(lstm1)
    transformer_block2 = TransformerBlockE(64, num_head, ff_dim)
    transE = transformer_block2(transE)

    # Detection decoder branch.
    up1 = UpSampling1D(size=2)(transE)
    conv7 = Conv1D(96, 3, activation='relu', padding='same')(up1)
    up2 = UpSampling1D(size=2)(conv7)
    conv8 = Conv1D(96, 5, activation='relu', padding='same')(up2)
    up3 = UpSampling1D(size=2)(conv8)
    conv9 = Conv1D(32, 5, activation='relu', padding='same')(up3)
    up4 = UpSampling1D(size=2)(conv9)
    conv10 = Conv1D(16, 7, activation='relu', padding='same')(up4)
    up5 = UpSampling1D(size=2)(conv10)
    conv11 = Conv1D(16, 9, activation='relu', padding='same')(up5)
    up6 = UpSampling1D(size=2)(conv11)
    conv12 = Conv1D(8, 11, activation='relu', padding='same')(up6)
    conv13 = Conv1D(1, 1, activation='sigmoid', padding='same')(conv12)

    # P-phase decoder branch.
    lstm2 = LSTM(64, return_sequences=True)(transE)
    transformer_block3 = TransformerBlockE(64, num_head, ff_dim)
    transE_P = transformer_block3(lstm2)
    up1_P = UpSampling1D(size=2)(transE_P)
    conv7_P = Conv1D(96, 3, activation='relu', padding='same')(up1_P)
    up2_P = UpSampling1D(size=2)(conv7_P)
    conv8_P = Conv1D(96, 5, activation='relu', padding='same')(up2_P)
    up3_P = UpSampling1D(size=2)(conv8_P)
    conv9_P = Conv1D(32, 5, activation='relu', padding='same')(up3_P)
    up4_P = UpSampling1D(size=2)(conv9_P)
    conv10_P = Conv1D(16, 7, activation='relu', padding='same')(up4_P)
    up5_P = UpSampling1D(size=2)(conv10_P)
    conv11_P = Conv1D(16, 9, activation='relu', padding='same')(up5_P)
    up6_P = UpSampling1D(size=2)(conv11_P)
    conv12_P = Conv1D(8, 11, activation='relu', padding='same')(up6_P)
    conv13_P = Conv1D(1, 1, activation='sigmoid', padding='same')(conv12_P)

    # S-phase decoder branch.
    lstm3 = LSTM(64, return_sequences=True)(transE)
    transformer_block4 = TransformerBlockE(64, num_head, ff_dim)
    transE_S = transformer_block4(lstm3)
    up1_S = UpSampling1D(size=2)(transE_S)
    conv7_S = Conv1D(96, 3, activation='relu', padding='same')(up1_S)
    up2_S = UpSampling1D(size=2)(conv7_S)
    conv8_S = Conv1D(96, 5, activation='relu', padding='same')(up2_S)
    up3_S = UpSampling1D(size=2)(conv8_S)
    conv9_S = Conv1D(32, 5, activation='relu', padding='same')(up3_S)
    up4_S = UpSampling1D(size=2)(conv9_S)
    conv10_S = Conv1D(16, 7, activation='relu', padding='same')(up4_S)
    up5_S = UpSampling1D(size=2)(conv10_S)
    conv11_S = Conv1D(16, 9, activation='relu', padding='same')(up5_S)
    up6_S = UpSampling1D(size=2)(conv11_S)
    conv12_S = Conv1D(8, 11, activation='relu', padding='same')(up6_S)
    conv13_S = Conv1D(1, 1, activation='sigmoid', padding='same')(conv12_S)

    # Stack the P, S and detection traces along the channel axis.
    output = concatenate([conv13_P, conv13_S, conv13], axis=2)
    output = output[:, tf.newaxis, :, :]

    model = Model(inputs=inputs, outputs=output)
    return model
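
# A minimal usage sketch, not part of the library: the shapes and compile
# settings below are assumptions for illustration. img_rows must be 1 because
# the graph squeezes axis 1, and img_cols should be divisible by 64 so the six
# pooling / upsampling stages restore the original length (e.g. 3008 = 64 * 47).
def _example_build_transformer():
    model = transformer(img_rows=1, img_cols=3008, color_type=3, num_class=3)
    # Placeholder optimizer and loss, chosen only for the sketch.
    model.compile(optimizer='adam', loss='binary_crossentropy')
    # Output shape: (batch, 1, 3008, 3) -> P, S and detection traces.
    model.summary()
    return model
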
class ResBlock(Layer):
    """
    Residual CNN block.
    """
    def __init__(self, filter_nums, strides=1, residual_path=False, **kwargs):
        super(ResBlock, self).__init__()
        self.filter_nums = filter_nums
        self.strides = strides
        self.residual_path = residual_path

        self.bn_1 = BatchNormalization()
        self.act_relu1 = Activation('relu')
        self.drop_1 = Dropout(0.1)
        self.conv_1 = Conv1D(filter_nums, 3, strides=strides, padding='same')

        self.bn_2 = BatchNormalization()
        self.act_relu2 = Activation('relu')
        self.drop_2 = Dropout(0.1)
        self.conv_2 = Conv1D(filter_nums, 3, strides=1, padding='same')

        # Down-sample the identity path with a 1x1 convolution when strided,
        # otherwise pass the input through unchanged.
        if strides != 1:
            self.block = Sequential()
            self.block.add(Conv1D(filter_nums, 1, strides=strides))
        else:
            self.block = lambda x: x
    def call(self, inputs, training=None):
        x = self.bn_1(inputs, training=training)
        x = self.act_relu1(x)
        x = self.drop_1(x)
        x = self.conv_1(x)
        x = self.bn_2(x, training=training)
        x = self.act_relu2(x)
        x = self.drop_2(x)
        x = self.conv_2(x)

        identity = self.block(inputs)
        outputs = add([x, identity])
        outputs = tf.nn.relu(outputs)
        return outputs
    def get_config(self):
        """
        Save model config.

        :return: config
        """
        config = super().get_config().copy()
        config.update({
            'filter_nums': self.filter_nums,
            'strides': self.strides,
            'residual_path': self.residual_path,
        })
        return config
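
# A brief sketch (assumed shapes, not library code) showing the block's layout:
# a strided block halves the sequence length, and the 1x1 convolution on the
# identity path keeps the two branches addable.
def _example_res_block():
    x = tf.random.normal([2, 128, 32])           # (batch, steps, channels), made up
    block = ResBlock(filter_nums=64, strides=2)  # identity path gets a 1x1 conv
    y = block(x, training=False)
    return y.shape                               # TensorShape([2, 64, 64])
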
def ResNet_build(filter_nums, block_nums, strides=1):
    """
    Build a stack of residual blocks.

    :param filter_nums: Number of convolution channels.
    :param block_nums: Number of residual blocks.
    :param strides: Stride of the first block's convolution layer.
    :return: model
    """
    build_model = Sequential()
    build_model.add(ResBlock(filter_nums, strides))
    for _ in range(1, block_nums):
        build_model.add(ResBlock(filter_nums, strides=1))
    return build_model
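
# Hedged sketch with an illustrative input shape: with the default stride of 1
# the stack preserves the sequence length and keeps the channel count at
# filter_nums, matching how transformer() applies ResNet_build(64, 3) to pool6.
def _example_resnet_stack():
    stack = ResNet_build(filter_nums=64, block_nums=3)
    x = tf.random.normal([2, 47, 64])    # assumed shape for illustration
    return stack(x, training=False)      # shape preserved: (2, 47, 64)
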
class MultiHeadSelfAttention(Layer):
    """
    Multi-head scaled dot-product self-attention layer.
    """
    def __init__(self, embed_dim, num_heads=4, mask=False, **kwargs):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible "
                f"by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = Dense(embed_dim)
        self.key_dense = Dense(embed_dim)
        self.value_dense = Dense(embed_dim)
        self.combine_heads = Dense(embed_dim)
        self.mask = mask
    def attention(self, query, key, value):
        # Scaled dot-product attention.
        score = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        if self.mask:
            # Causal mask: -inf above the diagonal blocks attention to future
            # steps. self.seq_dim is set in call(), so building the mask with
            # numpy relies on it being a concrete (eager) value.
            infmatrix = np.ones([self.seq_dim, self.seq_dim]) * -np.inf
            infmatrix = np.triu(infmatrix, 1)
            infmatrix = tf.constant(infmatrix, tf.float32)
            scaled_score = scaled_score + infmatrix
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])
    def call(self, inputs):
        # inputs.shape = (batch_size, seq_len, embedding_dim)
        batch_size = tf.shape(inputs)[0]
        self.seq_dim = tf.shape(inputs)[1]
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)  # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(
            query, batch_size)  # (batch_size, num_heads, seq_len, projection_dim)
        key = self.separate_heads(
            key, batch_size)  # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(
            value, batch_size)  # (batch_size, num_heads, seq_len, projection_dim)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(
            attention, perm=[0, 2, 1, 3])  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim))  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(
            concat_attention)  # (batch_size, seq_len, embed_dim)
        return output
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'mask': self.mask,
        })
        return config
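
# A minimal sketch with assumed shapes of the self-attention layer on its own:
# embed_dim must be divisible by num_heads, and the output keeps the
# (batch, seq_len, embed_dim) shape of the input.
def _example_self_attention():
    layer = MultiHeadSelfAttention(embed_dim=64, num_heads=4)
    x = tf.random.normal([2, 47, 64])    # (batch, seq_len, embed_dim), made up
    return layer(x)                      # (2, 47, 64)
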
class TransformerBlockE(Layer):
    """
    Transformer encoder block: self-attention followed by a feed-forward
    network, each with a residual connection and layer normalization.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlockE, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.att1 = MultiHeadSelfAttention(embed_dim, num_heads, mask=False)
        self.ffn1 = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
    def call(self, inputs, **kwargs):
        att1_output = self.att1(inputs)
        att1_output = self.dropout1(att1_output, training=True)
        out1 = self.layernorm1(inputs + att1_output)
        ffn_output = self.ffn1(out1)
        ffn_output = self.dropout2(ffn_output, training=True)
        return self.layernorm2(out1 + ffn_output)
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
        })
        return config
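
# Hedged usage sketch with assumed shapes: the encoder block is residual, so
# its output shape equals its input shape; 64 / 1 / 64 mirrors the values used
# in transformer() above.
def _example_encoder_block():
    block = TransformerBlockE(embed_dim=64, num_heads=1, ff_dim=64)
    x = tf.random.normal([2, 47, 64])    # illustrative shape only
    return block(x)                      # (2, 47, 64)
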
class TransformerBlockD(Layer):
    """
    Transformer decoder-style block: masked self-attention over the decoder
    input, self-attention over the sum of the encoder output and that result,
    then a feed-forward network, each with a residual connection and layer
    normalization.
    """
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlockD, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.att1 = MultiHeadSelfAttention(embed_dim, num_heads, mask=True)
        self.att2 = MultiHeadSelfAttention(embed_dim, num_heads, mask=False)
        self.ffn1 = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)
    def call(self, inputs, **kwargs):
        att1_output = self.att1(inputs[1])
        att1_output = self.dropout1(att1_output, training=True)
        out1 = self.layernorm1(inputs[1] + att1_output)
        att2_output = self.att2(inputs[0] + out1)
        att2_output = self.dropout2(att2_output, training=True)
        out2 = self.layernorm2(att2_output + out1)
        ffn_output = self.ffn1(out2)
        ffn_output = self.dropout3(ffn_output, training=True)
        out3 = self.layernorm3(out2 + ffn_output)
        return out3
    def get_config(self):
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
        })
        return config
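
# Hedged sketch for the decoder-style block, which is not wired into
# transformer() above. All shapes are assumptions: it expects a pair of
# tensors, inputs[0] from the encoder and inputs[1] as the causally masked
# decoder sequence. The numpy-based mask requires eager execution.
def _example_decoder_block():
    block = TransformerBlockD(embed_dim=64, num_heads=1, ff_dim=64)
    encoder_out = tf.random.normal([2, 47, 64])
    decoder_in = tf.random.normal([2, 47, 64])
    return block([encoder_out, decoder_in])    # (2, 47, 64)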