
Bayesian NN with TensorFlow Probability: predictions not working

I am trying to implement a Bayesian neural network for genomic prediction. My X is a matrix that has been scaled and normalized, so the values lie between 0 and 1. y is a vector of values, likewise normalized to lie between 0 and 1.

The network appears to be learning, as shown here:

[image: training loss curves]

However, when I try to make predictions, they look strange and seemingly random. While the true values of y are spread between 0 and 1, the predicted values fall only between roughly 0.4 and 0.6, and my R2 is negative. The MSE is around 0.02, which looks decent, but that is probably just because the prediction range is so narrow.

[image: predicted vs. true values]
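To illustrate for myself why the MSE can look fine while the R2 goes negative, here is a toy check (hypothetical numbers, not from my pipeline): uncorrelated predictions clustered in a narrow band near the target mean can never beat the constant-mean predictor, so R2 <= 0 even though the MSE looks small on a [0, 1] scale.

import numpy as np
from sklearn.metrics import r2_score

rng = np.random.default_rng(0)
y_true = rng.uniform(0.0, 1.0, 1000)                     # targets spread over [0, 1]
y_hat = np.clip(0.5 + 0.1 * rng.standard_normal(1000),   # predictions scattered in a
                0.0, 1.0)                                # narrow band around 0.5
print(np.mean((y_true - y_hat) ** 2))                    # MSE ~ 0.09, looks harmless
print(r2_score(y_true, y_hat))                           # R2 ~ -0.1, worse than the mean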

I cannot quite figure out what is going wrong here. Any suggestions are appreciated :)

I also tried predicting the training data. That does not work either; the R2 is negative there as well.

X has shape (5000, 500) and y has shape (5000,).

Increasing the number of hidden layers (up to 3) and units (up to 128) does not change anything.

# Import necessary packages:

import sys

from os.path import join
import warnings
warnings.filterwarnings('ignore')
from IPython import display

import tensorflow as tf
from tensorflow import keras
import tensorflow_probability as tfp
import kerastuner as kt

from keras import backend as K
from keras import activations,initializers
from keras.layers import Layer

import tensorflow_docs as tfdocs
import tensorflow_docs.modeling
import tensorflow_docs.plots

import numpy as np
import numpy.ma as ma

import pandas as pd
import seaborn as sns
import matplotlib.pylab as plt
import time
import tempfile
import math
import statsmodels.api as sm

from sklearn.model_selection import train_test_split,KFold
from sklearn.metrics import r2_score,mean_absolute_error
from sklearn.linear_model import LinearRegression,BayesianRidge
from sklearn.utils import shuffle
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler,scale

from pandas_plink import read_plink
from pandas_plink import read_plink1_bin
from pandas_plink import get_data_folder

tfd = tfp.distributions

# Set random seed and start timer

np.random.seed(12345)
start = time.time()

### functions

def get_optimizer():
    return tf.keras.optimizers.SGD()

def get_callbacks():
    return [
        #tfdocs.modeling.EpochDots(),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss',min_delta=1,patience=500,restore_best_weights=True),
    ]

def normalize_data(df):
    return (df - df.min())/(df.max() - df.min())
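# NB: G and traits below are plain NumPy arrays, so .min()/.max() here are
# global scalars and the whole array is rescaled by one min/max; with a pandas
# DataFrame the same code would normalize each column separately.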

def compile_model(model,optimizer=None):
    if optimizer is None:
        optimizer = get_optimizer()
        
    model.compile(optimizer=optimizer,loss=keras.losses.MeanSquaredError())
    
    return model

def MSE(test,pred):
    sqr_err = np.subtract(test,pred)**2
    return sqr_err.mean()

# ## load & preprocess data

# ### load genotype data

G = np.genfromtxt("genotype.txt")
G[np.isnan(G)] = 0.

G = normalize_data(G)
print(G.mean())
print(G.var())

# ### load phenotype data

traits = np.genfromtxt("phenotype.txt")

traits = normalize_data(traits)
print(traits.mean())
print(traits.var())

# ### split training and validation set

train_X,test_X,train_y,test_y = train_test_split(G,traits,test_size = 0.2,random_state = 42)

X = np.concatenate((train_X,test_X),axis=0)
y = np.concatenate((train_y,test_y),axis=0)
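# (note) the hold-out split above is effectively undone here; it only shuffles
# the data, since the k-fold loop below re-splits X and y itself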

# ### parameter definition

N = G.shape[0]
p = G.shape[1]

NUM_FOLDS = 5
kfold = KFold(n_splits=NUM_FOLDS,shuffle=True)
INPUT_SHAPE = X.shape[1]
OUTPUT_SHAPE = y.shape[0]
BATCH_SIZE = 32
STEPS_PER_EPOCH = math.ceil((X.shape[0]*(1-1/NUM_FOLDS)*0.8)/BATCH_SIZE)
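# STEPS_PER_EPOCH = training samples per fold (4/5 of the data, minus the 20%
# validation_split taken inside model.fit) divided by the batch size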
MAX_EPOCHS = 5000

df = pd.DataFrame(columns = ['method','MSE','R2'])
histories = {}

# Specify the surrogate posterior over `keras.layers.Dense` `kernel` and `bias`.
def posterior_mean_field(kernel_size,bias_size=0,dtype=None):
    n = kernel_size + bias_size
    c = np.log(np.expm1(1.))
    return tf.keras.Sequential([
        tfp.layers.VariableLayer(2 * n,dtype=dtype),
        tfp.layers.DistributionLambda(lambda t: tfd.Independent(
            tfd.Normal(loc=t[...,:n],scale=1e-5 + tf.nn.softplus(c + t[...,n:])),
            reinterpreted_batch_ndims=1)),
    ])
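# Quick sanity check (my own addition, purely illustrative): the surrogate
# posterior maps a dummy input to one joint distribution over all n weights.
_post = posterior_mean_field(kernel_size=4,bias_size=1)
_dist = _post(tf.zeros([1]))     # VariableLayer ignores its input
print(_dist.sample().shape)      # -> (5,): one sample of kernel + bias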

# Specify the prior over `keras.layers.Dense` `kernel` and `bias`.
def prior_trainable(kernel_size,bias_size=0,dtype=None):
    n = kernel_size + bias_size
    return tf.keras.Sequential([
        tfp.layers.VariableLayer(n,dtype=dtype),
        tfp.layers.DistributionLambda(lambda t: tfd.Independent(
            tfd.Normal(loc=t,scale=1),
            reinterpreted_batch_ndims=1)),
    ])
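# Note: because the prior's loc is itself a trainable VariableLayer, the prior
# is learned jointly with the posterior (an empirical-Bayes-style setup)
# rather than being a fixed N(0, 1).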

def neg_log_likelihood(y_true,y_pred,sigma=1.0):
    dist = tfd.Normal(loc=y_pred,scale=sigma)
    return K.sum(-dist.log_prob(y_true))

#neg_log_likelihood = lambda y,p_y: -p_y.log_prob(y)
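# Note on the two loss variants: neg_log_likelihood above wraps the network's
# raw output tensor in a fixed-scale Normal, while the commented-out lambda
# expects the model itself to output a tfd.Distribution (as the final
# DistributionLambda layer below does) and scores y with its log_prob directly.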

kl_loss_weight = 1.0 / STEPS_PER_EPOCH
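# Weighting the KL term by 1/STEPS_PER_EPOCH spreads the full KL divergence of
# each DenseVariational layer across the minibatches of one epoch, so every
# gradient step sees (batch NLL) + (its share of the KL).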


fold_no = 1
for train,test in kfold.split(X,y):
    
    model = tf.keras.Sequential([
        keras.layers.InputLayer(input_shape=(INPUT_SHAPE,)),
        tfp.layers.DenseVariational(units=32,
                                    make_posterior_fn=posterior_mean_field,
                                    make_prior_fn=prior_trainable,
                                    kl_weight=kl_loss_weight,
                                    activation='sigmoid'),
        # output layer: one variational unit whose output is interpreted as
        # the loc of a fixed-scale Normal
        tfp.layers.DenseVariational(units=1,
                                    make_posterior_fn=posterior_mean_field,
                                    make_prior_fn=prior_trainable,
                                    kl_weight=kl_loss_weight),
        tfp.layers.DistributionLambda(lambda t: tfd.Normal(loc=t,scale=1)),
    ])
    
    model.compile(loss=neg_log_likelihood,
                  optimizer=tf.keras.optimizers.Adam(lr=0.0001),
                  metrics=['mse'])
    
    history = model.fit(X[train],y[train],
                        validation_split=0.2,
                        steps_per_epoch=STEPS_PER_EPOCH,
                        epochs=MAX_EPOCHS,
                        callbacks=get_callbacks(),
                        verbose=0)
    
    histories['BNN2_'+str(fold_no)] = history
    
    y_pred_list = []
    for i in range(500):
        y_pred = model.predict(X[test])
        y_pred_list.append(y_pred)
    
    y_preds = np.concatenate(y_pred_list,axis=1)
    y_mean = np.mean(y_preds,axis=1)
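    # (added for inspection) the spread across the 500 stochastic forward
    # passes is the model's predictive uncertainty for each sample
    y_std = np.std(y_preds,axis=1)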
    m_err = MSE(y[test],y_mean)
    r2_acc = r2_score(y[test],y_mean)
    df = df.append({'MSE':m_err,'R2':r2_acc,'method':'BNN2'},ignore_index=True)
   
    
    fold_no = fold_no + 1
    
df.to_csv("results.csv")
