Python tensorflow 模块,variable_scope() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tensorflow.variable_scope()。
def block35(net, scale=1.0, activation_fn=tf.nn.relu, scope=None, reuse=None):
    """Builds the 35x35 resnet block.

    Three convolution branches are concatenated along axis 3, projected back
    to the channel count of `net` by a 1x1 convolution without normalizer or
    activation, scaled by `scale` and added to the input.

    Args:
        net: input tensor; its axis-3 size sets the width of the projection.
        scale: multiplier applied to the residual branch before the addition.
        activation_fn: callable applied to the sum; skipped when falsy.
        scope: optional variable scope name (default name is 'Block35').
        reuse: forwarded to `tf.variable_scope`.

    Returns:
        The tensor after the residual addition and the optional activation.
    """
    with tf.variable_scope(scope, 'Block35', [net], reuse=reuse):
        # Kernel sizes below follow the scope names (1x1 / 3x3).
        with tf.variable_scope('Branch_0'):
            tower_conv = slim.conv2d(net, 32, 1, scope='Conv2d_1x1')
        with tf.variable_scope('Branch_1'):
            # NOTE(review): 32 filters assumed from the reference
            # Inception-ResNet-v2 block35 -- confirm against upstream.
            tower_conv1_0 = slim.conv2d(net, 32, 1, scope='Conv2d_0a_1x1')
            tower_conv1_1 = slim.conv2d(tower_conv1_0, 32, 3,
                                        scope='Conv2d_0b_3x3')
        with tf.variable_scope('Branch_2'):
            # NOTE(review): first layer width (32) assumed as above -- confirm.
            tower_conv2_0 = slim.conv2d(net, 32, 1, scope='Conv2d_0a_1x1')
            tower_conv2_1 = slim.conv2d(tower_conv2_0, 48, 3,
                                        scope='Conv2d_0b_3x3')
            tower_conv2_2 = slim.conv2d(tower_conv2_1, 64, 3,
                                        scope='Conv2d_0c_3x3')
        mixed = tf.concat(axis=3,
                          values=[tower_conv, tower_conv1_1, tower_conv2_2])
        # Linear 1x1 projection so the residual matches the input depth.
        up = slim.conv2d(mixed, net.get_shape()[3], 1, normalizer_fn=None,
                         activation_fn=None, scope='Conv2d_1x1')
        net += scale * up
        if activation_fn:
            net = activation_fn(net)
    return net
# NOTE(review): this snippet is truncated and cannot be parsed as written.
# The signature lacks `scale`, `activation_fn` and `scope` although the body
# uses them, and the `with tf.variable_scope(...)` and `tf.concat(...)`
# statements are cut off mid-call. Restore it from the upstream source
# before use.
def block17(net, reuse=None):
"""Builds the 17x17 resnet block."""
with tf.variable_scope(scope, 'Block17', 192, 128, 160, [1, 7],
scope='Conv2d_0b_1x7')
tower_conv1_2 = slim.conv2d(tower_conv1_1, [7, 1],
scope='Conv2d_0c_7x1')
mixed = tf.concat(axis=3, tower_conv1_2])
up = slim.conv2d(mixed, scope='Conv2d_1x1')
net += scale * up
if activation_fn:
net = activation_fn(net)
return net
# NOTE(review): this snippet is truncated and cannot be parsed as written.
# The signature lacks `scale`, `activation_fn` and `scope` although the body
# uses them, several calls are cut off mid-argument-list, and `up` is read
# without ever being assigned. Restore it from the upstream source before use.
def block8(net, reuse=None):
"""Builds the 8x8 resnet block."""
with tf.variable_scope(scope, 'Block8', 224, 3],
scope='Conv2d_0b_1x3')
tower_conv1_2 = slim.conv2d(tower_conv1_1, 256, [3,
scope='Conv2d_0c_3x1')
mixed = tf.concat(axis=3, scope='Conv2d_1x1')
net += scale * up
if activation_fn:
net = activation_fn(net)
return net
def _load_data_graph(self):
    """
    Build the data side of the graph under the reused "train_test" scope.

    Creates one int32 placeholder per time step for the forward encoder
    input, the backward encoder input and the labels, a list of all-ones
    float32 weights shaped like the labels, and the decoder input (a zero
    "GO" tensor followed by every label but the last).
    :return: None
    """
    with tf.variable_scope("train_test", reuse=True):
        # Encoder inputs: original order and reversed order.
        forward_inputs = []
        for t in range(self.seq_length):
            forward_inputs.append(
                tf.placeholder(tf.int32, shape=(None,), name="input%i" % t))
        self.enc_inp_fwd = forward_inputs

        backward_inputs = []
        for t in range(self.seq_length):
            backward_inputs.append(tf.placeholder(tf.int32, name="input%i" % t))
        self.enc_inp_bwd = backward_inputs

        # Targets.
        targets = []
        for t in range(self.seq_length):
            targets.append(tf.placeholder(tf.int32, name="labels%i" % t))
        self.labels = targets

        # One weight tensor per target, all ones.
        self.weights = [tf.ones_like(target, dtype=tf.float32)
                        for target in self.labels]

        # Decoder input is the label sequence shifted right by a "GO" step.
        go_step = tf.zeros_like(self.labels[0], dtype=np.int32, name="GO")
        self.dec_inp = [go_step] + self.labels[:-1]
# NOTE(review): the two `_load_data_graph` definitions below are truncated
# copies. Their docstrings run straight into the middle of a statement, so
# the first docstring is never closed, and the placeholder / decoder-input
# expressions are incomplete. Neither can be parsed as written; restore them
# from the upstream source before use.
def _load_data_graph(self):
"""
Loads the data graph consisting of the encoder and decoder input placeholders, reuse=True):
self.enc_inp = [tf.placeholder(tf.int32,
name="input%i" % t)
for t in range(self.seq_length)]
# desired output
self.labels = [tf.placeholder(tf.int32,
name="labels%i" % t)
for t in range(self.seq_length)]
# weight of the hidden layer
self.weights = [tf.ones_like(labels_t, name="GO")]
+ self.labels[:-1])
def _load_data_graph(self):
"""
Loads the data graph consisting of the encoder and decoder input placeholders, name="GO")] + self.labels[:-1])
def _load_optimizer(self):
    """
    Build the loss and the momentum training op for both directions.

    For each of the "forward" and "backward" variable scopes this sets
    ``self.loss_<dir>``, ``self.optimizer_<dir>`` and ``self.train_op_<dir>``.
    :return: None
    """
    with tf.variable_scope("forward"):
        # loss function
        self.loss_fwd = tf.nn.seq2seq.sequence_loss(
            self.dec_outputs_fwd, self.labels, self.weights, self.vocab_size)
        # optimizer
        self.optimizer_fwd = tf.train.MomentumOptimizer(self.learning_rate,
                                                        self.momentum)
        self.train_op_fwd = self.optimizer_fwd.minimize(self.loss_fwd)

    with tf.variable_scope("backward"):
        # The loss needs targets and weights just like the forward call.
        # NOTE(review): assumes the backward decoder is scored against the
        # same self.labels / self.weights as the forward one -- confirm.
        self.loss_bwd = tf.nn.seq2seq.sequence_loss(
            self.dec_outputs_bwd, self.labels, self.weights, self.vocab_size)
        # optimizer
        self.optimizer_bwd = tf.train.MomentumOptimizer(self.learning_rate,
                                                        self.momentum)
        self.train_op_bwd = self.optimizer_bwd.minimize(self.loss_bwd)
def gen_sent_on_topic(idxvocab, vocabxid, start_symbol, end_symbol, cf):
    """Write generated sentences for every topic to ``args.gen_sent_on_topic``.

    For each topic the top words are written first, then one sentence
    generated with temperature 0 (labelled "greedy; argmax" in the output) and
    ``gen_num`` sentences for each temperature in ``gen_temps``.

    Uses names defined outside this function: ``args``, ``tm``, ``sess``,
    ``topn``, ``initializer``, ``LM``, ``gen_temps`` and ``gen_num``.

    Args:
        idxvocab: sequence mapping a word id to its word.
        vocabxid: mapping from a word to its id.
        start_symbol: word whose id starts every generated sentence.
        end_symbol: word whose id is passed as the end marker.
        cf: config object; ``topic_number`` and ``lm_sent_len`` are read here.
    """
    # `with` guarantees the file is flushed and closed even if generation fails.
    with codecs.open(args.gen_sent_on_topic, "w", "utf-8") as output:
        topics, _ = tm.get_topics(sess, topn=topn)
        with tf.variable_scope("model", reuse=True, initializer=initializer):
            mgen = LM(is_training=False, vocab_size=len(idxvocab), batch_size=1,
                      num_steps=1, config=cf, reuse_conv_variables=True)

        start_id = vocabxid[start_symbol]
        end_id = vocabxid[end_symbol]
        max_len = cf.lm_sent_len + 10

        for t in range(cf.topic_number):
            output.write("\n" + "="*100 + "\n")
            output.write("Topic " + str(t) + ":\n")
            output.write(" ".join([idxvocab[item] for item in topics[t]]) + "\n\n")

            output.write("\nSentence generation (greedy; argmax):" + "\n")
            s = mgen.generate_on_topic(sess, t, start_id, 0, max_len, end_id)
            output.write("[0] " + " ".join([idxvocab[item] for item in s]) + "\n")

            for temp in gen_temps:
                output.write("\nSentence generation (random; temperature = " + str(temp) + "):\n")
                for i in range(gen_num):
                    # Same argument layout as the greedy call; only the
                    # temperature slot differs.
                    s = mgen.generate_on_topic(sess, t, start_id, temp,
                                               max_len, end_id)
                    output.write("[" + str(i) + "] " + " ".join([idxvocab[item] for item in s]) + "\n")
def __call__(self, left_state, right_state, extra_input=None):
    """Combine two child states into a parent state.

    Args:
        left_state: ``(c, h)`` pair of the left child.
        right_state: ``(c, h)`` pair of the right child.
        extra_input: optional tensor concatenated in front of both ``h``
            tensors along axis 1.

    Returns:
        ``(hnew, LSTMStateTuple(c=cnew, h=hnew))``.
    """
    with tf.variable_scope('TreeLSTM'):
        c1, h1 = left_state
        c2, h2 = right_state

        if extra_input is not None:
            input_concat = tf.concat((extra_input, h1, h2), axis=1)
        else:
            # Without extra input the gates still depend on both children.
            input_concat = tf.concat((h1, h2), axis=1)

        # One dense layer yields all five gate pre-activations at once.
        concat = tf.layers.dense(input_concat, 5 * self._num_cells)
        i, f1, f2, o, g = tf.split(concat, 5, axis=1)
        i = tf.sigmoid(i)
        f1 = tf.sigmoid(f1)
        f2 = tf.sigmoid(f2)
        o = tf.sigmoid(o)
        g = tf.tanh(g)

        # Each child cell state has its own forget gate.
        cnew = f1 * c1 + f2 * c2 + i * g
        hnew = o * cnew
        newstate = LSTMStateTuple(c=cnew, h=hnew)
        return hnew, newstate
def bag_of_tokens(config, labels, label_lengths):
    """Sum the output embeddings of the token ids in `labels` along axis 1.

    The embedding table is the reused variable ``embed/output_embedding`` when
    ``config.train_output_embeddings`` is truthy, otherwise a constant built
    from ``config.output_embedding_matrix``. `label_lengths` is accepted but
    not used: no length mask is applied before the sum.
    """
    if not config.train_output_embeddings:
        embedding_table = tf.constant(config.output_embedding_matrix)
    else:
        with tf.variable_scope('embed', reuse=True):
            embedding_table = tf.get_variable('output_embedding')

    label_ids = tf.constant(np.array(labels))
    gathered = tf.gather(embedding_table, label_ids)
    print('embedded_output before', gathered)
    return tf.reduce_sum(gathered, axis=1)
def triplet_loss(anchor, positive, negative, alpha):
    """Calculate the triplet loss according to the FaceNet paper

    Args:
      anchor: the embeddings for the anchor images.
      positive: the embeddings for the positive images.
      negative: the embeddings for the negative images.
      alpha: value added to the distance difference before clamping at zero.

    Returns:
      the triplet loss according to the FaceNet paper as a float tensor.
    """
    with tf.variable_scope('triplet_loss'):
        # Squared distance anchor <-> positive, summed over axis 1.
        to_positive = tf.subtract(anchor, positive)
        pos_dist = tf.reduce_sum(tf.square(to_positive), 1)

        # Squared distance anchor <-> negative, summed over axis 1.
        to_negative = tf.subtract(anchor, negative)
        neg_dist = tf.reduce_sum(tf.square(to_negative), 1)

        margin_violation = tf.add(tf.subtract(pos_dist, neg_dist), alpha)
        hinge = tf.maximum(margin_violation, 0.0)
        return tf.reduce_mean(hinge, 0)
def build_encoder(self):
    """Inference Network. q(h|X)

    Two ReLU layers feed two linear heads, ``self.mu`` and
    ``self.log_sigma_sq``. ``self.h`` is then computed as
    ``mu + sigma * eps`` with ``eps`` drawn by ``tf.random_normal`` and
    ``sigma = sqrt(exp(log_sigma_sq))``. Histogram summaries are emitted for
    ``mu`` and ``sigma``.
    """
    with tf.variable_scope("encoder"):
        linear = tf.nn.rnn_cell.linear

        l1 = tf.nn.relu(linear(tf.expand_dims(self.x, 0), self.embed_dim,
                               bias=True, scope="l1"))
        # NOTE(review): width of l2 assumed equal to l1 (embed_dim) -- confirm.
        l2 = tf.nn.relu(linear(l1, self.embed_dim, bias=True, scope="l2"))

        # Both heads are h_dim wide so they combine with eps of shape (1, h_dim).
        self.mu = linear(l2, self.h_dim, bias=True, scope="mu")
        self.log_sigma_sq = linear(l2, self.h_dim, bias=True,
                                   scope="log_sigma_sq")

        eps = tf.random_normal((1, self.h_dim), dtype=tf.float32)
        sigma = tf.sqrt(tf.exp(self.log_sigma_sq))

        _ = tf.histogram_summary("mu", self.mu)
        _ = tf.histogram_summary("sigma", sigma)

        self.h = self.mu + sigma * eps
# NOTE(review): this snippet is truncated and cannot be parsed as written:
# the `tf.expand_dims(...)` and `tf.random_normal(...)` calls have unbalanced
# parentheses, and the `linear(...)` calls carry no output size. The "mu"
# histogram is also fed `self.sigma`, which looks like a lost pair of
# summaries -- confirm against the upstream source before use.
def build_encoder(self):
"""Inference Network. q(h|X)"""
with tf.variable_scope("encoder"):
self.l1_lin = linear(tf.expand_dims(self.x, scope="l1")
self.l1 = tf.nn.relu(self.l1_lin)
self.l2_lin = linear(self.l1, scope="l2")
self.l2 = tf.nn.relu(self.l2_lin)
self.mu = linear(self.l2, scope="mu")
self.log_sigma_sq = linear(self.l2, scope="log_sigma_sq")
self.eps = tf.random_normal((1, dtype=tf.float32)
self.sigma = tf.sqrt(tf.exp(self.log_sigma_sq))
self.h = tf.add(self.mu, tf.mul(self.sigma, self.eps))
_ = tf.histogram_summary("mu", self.sigma)
_ = tf.histogram_summary("h", self.h)
_ = tf.histogram_summary("mu + sigma", self.mu + self.sigma)
def _pooling_layer(
        self, layer_name, inputs, size, stride, padding='SAME'):
    """Pooling layer operation constructor.

    Applies max pooling and appends ``(layer_name, activation_size)`` to
    ``self.activation_counter``, where the size is the product of every
    output dimension except the first.

    Args:
      layer_name: layer name; also used as the variable scope.
      inputs: input tensor
      size: kernel size, used for both pooled dimensions.
      stride: stride, used for both pooled dimensions.
      padding: 'SAME' or 'VALID'. See tensorflow doc for detailed description.
    Returns:
      A pooling layer operation.
    """
    with tf.variable_scope(layer_name):
        # NOTE(review): assumes a 4-D input whose two middle axes are the
        # pooled ones (window 1 on the outer axes) -- confirm.
        out = tf.nn.max_pool(inputs,
                             ksize=[1, size, size, 1],
                             strides=[1, stride, stride, 1],
                             padding=padding)
        activation_size = np.prod(out.get_shape().as_list()[1:])
        self.activation_counter.append((layer_name, activation_size))
        return out
def bBox_transform_inv(bBox):
    """Convert a box given as [xmin, ymin, xmax, ymax] to [cx, cy, w, h].

    Width and height are computed as ``max - min + 1.0`` and the centre as
    ``min + 0.5 * extent``. Works for numpy array or list of tensors.
    """
    with tf.variable_scope('bBox_transform_inv'):
        left, top, right, bottom = bBox

        box_w = right - left + 1.0
        box_h = bottom - top + 1.0
        center_x = left + 0.5*box_w
        center_y = top + 0.5*box_h

        return [center_x, center_y, box_w, box_h]
def sub_lstm(self, model_input, num_frames, lstm_size, number_of_layers, sub_scope=""):
    """Run a stacked LSTM over `model_input` and return its final cell states.

    Args:
        model_input: input tensor handed to ``tf.nn.dynamic_rnn``.
        num_frames: per-example sequence lengths.
        lstm_size: number of units in each ``BasicLSTMCell``.
        number_of_layers: number of stacked cells.
        sub_scope: prefix of the variable scope (``sub_scope + "-RNN"``).

    Returns:
        The ``c`` tensors of every layer's final state, concatenated on axis 1.
    """
    stacked_lstm = tf.contrib.rnn.MultiRNNCell(
        [
            tf.contrib.rnn.BasicLSTMCell(
                lstm_size, forget_bias=1.0, state_is_tuple=True)
            for _ in range(number_of_layers)
        ],
        state_is_tuple=True)

    with tf.variable_scope(sub_scope+"-RNN"):
        _, state = tf.nn.dynamic_rnn(stacked_lstm, model_input,
                                     sequence_length=num_frames,
                                     swap_memory=FLAGS.rnn_swap_memory,
                                     dtype=tf.float32)
        # A real list (not a lazy `map` object) so tf.concat accepts it on
        # Python 3 as well.
        final_state = tf.concat([layer_state.c for layer_state in state],
                                axis=1)
    return final_state
def prenet(inputs, num_units=None, dropout_rate=0, is_training=True, scope="prenet", reuse=None):
    '''Prenet for Encoder and Decoder.

    Two dense layers, each followed by dropout.

    Args:
      inputs: A 3D tensor of shape [N,T,hp.embed_size].
      num_units: A list of two integers, the widths of the two dense layers.
        Defaults to the last input dimension for both.
      dropout_rate: Dropout rate applied after each dense layer.
      is_training: A boolean. Dropout is only active when true.
      scope: Optional scope for `variable_scope`.
      reuse: Boolean, whether to reuse the weights of a previous layer
        by the same name.

    Returns:
      A tensor whose last dimension is num_units[1].
    '''
    if num_units is None:
        num_units = [inputs.get_shape()[-1], inputs.get_shape()[-1]]

    with tf.variable_scope(scope, reuse=reuse):
        outputs = tf.layers.dense(inputs, units=num_units[0], activation=tf.nn.relu, name="dense1")
        outputs = tf.layers.dropout(outputs, rate=dropout_rate, training=is_training, name="dropout1")
        # NOTE(review): dense2 has no activation while dense1 uses ReLU;
        # left as found -- confirm whether that is intended.
        outputs = tf.layers.dense(outputs, units=num_units[1], name="dense2")
        # Pass rate/training explicitly: the tf.layers.dropout defaults
        # (training=False) would make this layer a no-op.
        outputs = tf.layers.dropout(outputs, rate=dropout_rate, training=is_training, name="dropout2")
    return outputs  # last dimension: num_units[1]
def highwaynet(inputs, scope="highwaynet", reuse=None, num_units=None):
    '''Highway networks, see https://arxiv.org/abs/1505.00387

    Computes ``H * T + inputs * (1 - T)`` where ``H`` is a dense layer and
    ``T`` a dense layer with sigmoid activation.

    Args:
      inputs: Input tensor; its last dimension must equal `num_units` for the
        final sum to be well-formed.
      scope: Optional scope for `variable_scope`.
      reuse: Boolean, whether to reuse the weights of a previous layer
        by the same name.
      num_units: An int or `None`. Specifies the number of units in the
        highway layer or uses the input size if `None`. Placed last so
        existing positional calls keep their meaning.

    Returns:
      A tensor with the same shape as `inputs`.
    '''
    if num_units is None:
        num_units = inputs.get_shape()[-1]

    with tf.variable_scope(scope, reuse=reuse):
        # NOTE(review): H carries no activation here -- confirm against upstream.
        H = tf.layers.dense(inputs, units=num_units, name="H")
        # The gate needs the same width as H so the product is element-wise.
        T = tf.layers.dense(inputs, units=num_units, activation=tf.nn.sigmoid, name="T")
        C = 1. - T
        outputs = H * T + inputs * C
    return outputs
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the SkipLSTMCell

    Each component is created by ``rnn_ops.create_initial_state`` inside its
    own variable scope.
    :param batch_size: number of samples per batch
    :return: SkipLSTMStateTuple
    """
    with tf.variable_scope('initial_c'):
        initial_c = rnn_ops.create_initial_state(batch_size, self._num_units)
    with tf.variable_scope('initial_h'):
        initial_h = rnn_ops.create_initial_state(batch_size, self._num_units)
    # NOTE(review): the two probability states are created with state size 1
    # (one value per sample); this assumes `create_initial_state` takes the
    # state size as its second positional argument, as the calls above do --
    # confirm against rnn_ops.
    with tf.variable_scope('initial_update_prob'):
        initial_update_prob = rnn_ops.create_initial_state(
            batch_size, 1, trainable=False,
            initializer=tf.ones_initializer())
    with tf.variable_scope('initial_cum_update_prob'):
        initial_cum_update_prob = rnn_ops.create_initial_state(
            batch_size, 1,
            initializer=tf.zeros_initializer())
    return SkipLSTMStateTuple(initial_c, initial_h, initial_update_prob, initial_cum_update_prob)
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the MultiSkipGRUCell

    Every layer but the last contributes a plain ``initial_h`` tensor; the
    last layer contributes a SkipGRUStateTuple.
    :param batch_size: number of samples per batch
    :return: list of tensors and SkipGRUStateTuple
    """
    initial_states = []
    for idx in range(self._num_layers - 1):
        with tf.variable_scope('layer_%d' % (idx + 1)):
            with tf.variable_scope('initial_h'):
                initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[idx])
            initial_states.append(initial_h)

    with tf.variable_scope('layer_%d' % self._num_layers):
        with tf.variable_scope('initial_h'):
            initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[-1])
        # NOTE(review): state size 1 assumed for both probability states, and
        # `create_initial_state` is assumed to take the state size as its
        # second positional argument -- confirm against rnn_ops.
        with tf.variable_scope('initial_update_prob'):
            initial_update_prob = rnn_ops.create_initial_state(
                batch_size, 1,
                initializer=tf.ones_initializer())
        with tf.variable_scope('initial_cum_update_prob'):
            initial_cum_update_prob = rnn_ops.create_initial_state(
                batch_size, 1,
                initializer=tf.zeros_initializer())
        # Include the update probability, which was computed but left out.
        # NOTE(review): field order (h, update_prob, cum_update_prob) assumed
        # to mirror SkipLSTMStateTuple -- confirm.
        initial_states.append(SkipGRUStateTuple(
            initial_h, initial_update_prob, initial_cum_update_prob))
    return initial_states
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the BasicLSTMCell

    A ``[1, num_units]`` variable named 'initial_state' is created (on the
    CPU device) in each of the 'initial_c' and 'initial_h' scopes and tiled
    ``batch_size`` times along the first axis.
    :param batch_size: number of samples per batch
    :return: LSTMStateTuple
    """
    def _tiled_state_variable(n_rows, width, trainable=True,
                              initializer=tf.random_normal_initializer()):
        # A single learned row, repeated once per sample.
        with tf.device('/cpu:0'):
            row = tf.get_variable('initial_state', shape=[1, width],
                                  dtype=tf.float32, trainable=trainable,
                                  initializer=initializer)
            multiples = tf.stack([n_rows, 1])
            return tf.tile(row, multiples)

    with tf.variable_scope('initial_c'):
        c_state = _tiled_state_variable(batch_size, self._num_units)
    with tf.variable_scope('initial_h'):
        h_state = _tiled_state_variable(batch_size, self._num_units)
    return tf.contrib.rnn.LSTMStateTuple(c_state, h_state)
def __call__(self, inputs, state, scope=None):
    """Gated recurrent unit (GRU) with num_units cells.

    Args:
        inputs: input tensor for this step.
        state: previous hidden state.
        scope: optional variable scope name; defaults to the class name.

    Returns:
        ``(new_h, new_h)``: the new hidden state as both output and state.
    """
    with tf.variable_scope(scope or type(self).__name__):
        with tf.variable_scope("gates"):  # Reset gate and update gate.
            # We start with bias of 1.0 to not reset and not update.
            concat = rnn_ops.linear([inputs, state], 2 * self._num_units, True, bias_start=1.0)
            r, u = tf.split(value=concat, num_or_size_splits=2, axis=1)

            if self._layer_norm:
                r = rnn_ops.layer_norm(r, name="r")
                u = rnn_ops.layer_norm(u, name="u")

            # Apply non-linearity after layer normalization
            r = tf.sigmoid(r)
            u = tf.sigmoid(u)
        with tf.variable_scope("candidate"):
            c = self._activation(rnn_ops.linear([inputs, r * state], self._num_units, True))
        new_h = u * state + (1 - u) * c
    return new_h, new_h
def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", collections=None,
           dtype=tf.float32):
    """2-D convolution with uniformly initialised weights and a zero bias.

    Args:
        x: input tensor; the size of axis 3 is read as the input channel count.
        num_filters: number of output feature maps.
        name: variable scope holding the "W" and "b" variables.
        filter_size: (height, width) of the kernel.
        stride: (vertical, horizontal) stride.
        pad: padding mode handed to ``tf.nn.conv2d``.
        collections: forwarded to ``tf.get_variable`` for both variables.
        dtype: dtype of the weight variable. Placed last so existing
            positional calls keep their meaning.

    Returns:
        ``tf.nn.conv2d(x, W, strides, pad) + b``.
    """
    with tf.variable_scope(name):
        stride_shape = [1, stride[0], stride[1], 1]
        filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters]

        # there are "num input feature maps * filter height * filter width"
        # inputs to each hidden unit
        fan_in = np.prod(filter_shape[:3])
        # each unit in the lower layer receives a gradient from:
        # "num output feature maps * filter height * filter width" /
        # pooling size
        fan_out = np.prod(filter_shape[:2]) * num_filters
        # initialize weights with random weights
        w_bound = np.sqrt(6. / (fan_in + fan_out))

        w = tf.get_variable("W", filter_shape, dtype, tf.random_uniform_initializer(-w_bound, w_bound),
                            collections=collections)
        # NOTE(review): bias shape [1, 1, 1, num_filters] assumed; it
        # broadcasts over the last axis of the convolution output -- confirm.
        b = tf.get_variable("b", [1, 1, 1, num_filters], initializer=tf.constant_initializer(0.0),
                            collections=collections)
        return tf.nn.conv2d(x, w, stride_shape, pad) + b
def __call__(self, *args):
    """Call ``self._call(*args)`` with result caching and variable reuse.

    Results are cached per argument tuple. The first uncached call runs inside
    a fresh ``self.name`` variable scope and records the scope name; later
    uncached calls re-enter it with ``reuse=True`` and assert that the
    resolved scope name has not changed.
    """
    results = self.cache
    if args in results:
        print("(%s) retrieving value from cache"%self.name)
        return results[args]

    reuse_variables = not self.first_time
    with tf.variable_scope(self.name, reuse=reuse_variables):
        current_scope = tf.get_variable_scope().name
        if not self.first_time:
            assert self.scope == current_scope, "Tried calling function with a different scope"
            print("(%s) running function on new inputs"%self.name)
        else:
            self.scope = current_scope
            print("(%s) running function for the first time"%self.name)
        self.first_time = False
        result = self._call(*args)

    results[args] = result
    return result
def forward(self, x):
    """Run the recurrent pooling over every time step of `x`.

    When ``self.conv_size <= 2`` the input is transposed to time-major order
    and ``self.kernel.forward`` is applied step by step; otherwise
    ``self.kernel.conv`` is applied once and its three outputs are walked
    along their first axis. Each step is handed to ``self._step``.

    Args:
        x: input tensor; the comments below describe it as
            batch_size x sentence_length x word_length.

    Returns:
        ``self.h`` after the last step.
    """
    def length(mx):
        return int(mx.get_shape()[0])

    with tf.variable_scope("QRNN/Forward"):
        if self.c is None:
            # init context cell
            self.c = tf.zeros([length(x), self.kernel.size], dtype=tf.float32)

        # NOTE(review): `_step` is assumed to take (f, z, o), matching the
        # three values produced per step -- confirm.
        if self.conv_size <= 2:
            # x is batch_size x sentence_length x word_length
            # -> Now, transpose it to sentence_length x batch_size x word_length
            _x = tf.transpose(x, [1, 0, 2])

            for i in range(length(_x)):
                t = _x[i]  # t is batch_size x word_length matrix
                f, z, o = self.kernel.forward(t)
                self._step(f, z, o)
        else:
            c_f, c_z, c_o = self.kernel.conv(x)
            for i in range(length(c_f)):
                f, z, o = c_f[i], c_z[i], c_o[i]
                self._step(f, z, o)

    return self.h
def _test_with_residuals(self, inputs, **kwargs):
    """Runs the cell in a session

    Builds a two-layer ``ExtendedMultiRNNCell`` of ``GRUCell(2)`` with
    residual connections, applies it once to `inputs` with a random initial
    state, and evaluates the result.

    Args:
        inputs: value converted to a tensor and fed to the cell.
        **kwargs: extra keyword arguments for ``ExtendedMultiRNNCell``.

    Returns:
        The evaluated result of the cell call.
    """
    inputs = tf.convert_to_tensor(inputs)
    # One (1, 2) state per layer.
    state = (tf.constant(np.random.randn(1, 2)),
             tf.constant(np.random.randn(1, 2)))

    with tf.variable_scope("root", initializer=tf.constant_initializer(0.5)):
        test_cell = rnn_cell.ExtendedMultiRNNCell(
            [tf.contrib.rnn.GRUCell(2) for _ in range(2)],
            residual_connections=True,
            **kwargs)
        res_test = test_cell(inputs, state, scope="test")

    with self.test_session() as sess:
        sess.run([tf.global_variables_initializer()])
        return sess.run(res_test)
def _build_graph(self):
    """Build the training, validation and test models on shared variables.

    All three models are created in the "Model" variable scope. The training
    model creates the variables; the validation and test models reuse them.
    Scalar summaries are added for the training loss, the learning rate and
    the validation loss.
    """
    config = self.config
    config.fast_test = False
    eval_config = Config(clone=config)
    eval_config.batch_size = 1
    initializer = self.model_initializer

    with tf.name_scope("Train"):
        with tf.variable_scope("Model", reuse=False, initializer=initializer):
            self.train_model = self.Model(config=config, loss_fct=self.loss_fct)
            tf.summary.scalar("Training Loss", self.train_model.cost)
            tf.summary.scalar("Learning Rate", self.train_model.lr)

    # name_scope does not prefix variable names, so the scopes below resolve
    # to the same "Model" variables and must opt in to reuse.
    with tf.name_scope("Valid"):
        with tf.variable_scope("Model", reuse=True, initializer=initializer):
            self.validation_model = self.Model(config=config, is_training=False, loss_fct="softmax")
            tf.summary.scalar("Validation Loss", self.validation_model.cost)

    with tf.name_scope("Test"):
        with tf.variable_scope("Model", reuse=True, initializer=initializer):
            self.test_model = self.Model(config=eval_config, is_training=False)
def _add_cross_entropy(labels, logits, pref):
"""Compute average cross entropy and add to loss collection.
Args:
labels: Single dimension labels from distorted_inputs() or inputs().
logits: Output map from inference().
pref: Either 'c' or 's',for contours or segments,respectively.
"""
with tf.variable_scope('{}_cross_entropy'.format(pref)) as scope:
class_prop = C_CLASS_PROP if pref == 'c' else S_CLASS_PROP
# NOTE(review): the weight expression below looks truncated. As written,
# `tf.equal` compares the labels with a dtype object, `tf.cast` receives no
# dtype, and the second `tf.scalar_mul` is given a dtype instead of a tensor.
# Presumably each term was `tf.cast(tf.equal(labels, <class id>), tf.float32)`;
# confirm the class ids against the upstream source.
weight_per_label = tf.scalar_mul(class_prop, tf.cast(tf.equal(labels,
tf.float32)) + \
tf.scalar_mul(1.0 - class_prop,
tf.float32))
cross_entropy = tf.losses.sparse_softmax_cross_entropy(
labels=tf.squeeze(labels, squeeze_dims=[3]), logits=logits)
cross_entropy_weighted = tf.multiply(weight_per_label, cross_entropy)
# The mean op is named after the variable scope.
cross_entropy_mean = tf.reduce_mean(cross_entropy_weighted, name=scope.name)
tf.add_to_collection('losses', cross_entropy_mean)
def max_pool2d(inputs,
               kernel_size,
               scope,
               stride=(2, 2),
               padding='VALID'):
    """ 2D max pooling.

    Args:
      inputs: 4-D tensor BxHxWxC
      kernel_size: a list of 2 ints
      scope: variable scope name; the pooling op is named after it
      stride: a list of 2 ints
      padding: padding mode handed to tf.nn.max_pool

    Returns:
      Variable tensor
    """
    with tf.variable_scope(scope) as sc:
        kernel_h, kernel_w = kernel_size
        stride_h, stride_w = stride
        # Window and stride are 1 on the batch and channel axes.
        outputs = tf.nn.max_pool(inputs,
                                 ksize=[1, kernel_h, kernel_w, 1],
                                 strides=[1, stride_h, stride_w, 1],
                                 padding=padding,
                                 name=sc.name)
        return outputs
def avg_pool2d(inputs,
               kernel_size,
               scope,
               stride=(2, 2),
               padding='VALID'):
    """ 2D avg pooling.

    The signature mirrors max_pool2d; the body needs `kernel_size`, `scope`
    and `stride`, all of which are documented or used below.

    Args:
      inputs: 4-D tensor BxHxWxC
      kernel_size: a list of 2 ints
      scope: variable scope name; the pooling op is named after it
      stride: a list of 2 ints
      padding: padding mode handed to tf.nn.avg_pool

    Returns:
      Variable tensor
    """
    with tf.variable_scope(scope) as sc:
        kernel_h, kernel_w = kernel_size
        stride_h, stride_w = stride
        # Window and stride are 1 on the batch and channel axes.
        outputs = tf.nn.avg_pool(inputs,
                                 ksize=[1, kernel_h, kernel_w, 1],
                                 strides=[1, stride_h, stride_w, 1],
                                 padding=padding,
                                 name=sc.name)
        return outputs
def max_pool3d(inputs,
               kernel_size,
               scope,
               stride=(2, 2, 2),
               padding='VALID'):
    """ 3D max pooling.

    Args:
      inputs: 5-D tensor BxDxHxWxC
      kernel_size: a list of 3 ints
      scope: variable scope name; the pooling op is named after it
      stride: a list of 3 ints
      padding: padding mode handed to tf.nn.max_pool3d

    Returns:
      Variable tensor
    """
    with tf.variable_scope(scope) as sc:
        kernel_d, kernel_h, kernel_w = kernel_size
        stride_d, stride_h, stride_w = stride
        # Window and stride are 1 on the batch and channel axes.
        outputs = tf.nn.max_pool3d(inputs,
                                   ksize=[1, kernel_d, kernel_h, kernel_w, 1],
                                   strides=[1, stride_d, stride_h, stride_w, 1],
                                   padding=padding,
                                   name=sc.name)
        return outputs
def avg_pool3d(inputs,
               kernel_size,
               scope,
               stride=(2, 2, 2),
               padding='VALID'):
    """ 3D avg pooling.

    The body needs `kernel_size`, `scope` and `stride`, so they are part of
    the signature.

    Args:
      inputs: 5-D tensor BxDxHxWxC
      kernel_size: a list of 3 ints
      scope: variable scope name; the pooling op is named after it
      stride: a list of 3 ints
      padding: padding mode handed to tf.nn.avg_pool3d

    Returns:
      Variable tensor
    """
    with tf.variable_scope(scope) as sc:
        kernel_d, kernel_h, kernel_w = kernel_size
        stride_d, stride_h, stride_w = stride
        # Window and stride are 1 on the batch and channel axes.
        outputs = tf.nn.avg_pool3d(inputs,
                                   ksize=[1, kernel_d, kernel_h, kernel_w, 1],
                                   strides=[1, stride_d, stride_h, stride_w, 1],
                                   padding=padding,
                                   name=sc.name)
        return outputs
def dropout(inputs,
            is_training,
            scope,
            keep_prob=0.5,
            noise_shape=None):
    """ Dropout layer.

    Selects, via tf.cond on `is_training`, between tf.nn.dropout applied to
    the inputs and the inputs unchanged.

    Args:
      inputs: tensor
      is_training: boolean tf.Variable
      scope: string
      keep_prob: float in [0,1]
      noise_shape: list of ints

    Returns:
      tensor variable
    """
    def _with_dropout():
        return tf.nn.dropout(inputs, keep_prob, noise_shape)

    def _passthrough():
        return inputs

    with tf.variable_scope(scope):
        return tf.cond(is_training, _with_dropout, _passthrough)
# Builds the discriminator under `scope`: three convolution stages (the last
# two batch-normalised, each followed by LeakyReLU), a flatten to 4*4*256
# values and a dense layer with one unit. Returns (sigmoid(logits), logits).
# NOTE(review): the `tf.layers.conv2d` calls carry no `filters` or
# `kernel_size`, and the second and third carry no `strides`. The shape
# comments suggest 64 / 128 / 256 filters with the spatial size halving each
# time; the kernel size cannot be determined from here -- confirm against the
# upstream source.
def __discriminator(self, x, scope, reuse):
with tf.variable_scope(scope, reuse=reuse):
x1 = tf.layers.conv2d(x, strides=2, padding='same')
x1 = LeakyReLU(x1, self.alpha)
# 16x16x64
x2 = tf.layers.conv2d(x1, padding='same')
x2 = tf.layers.batch_normalization(x2, training=self.training)
x2 = LeakyReLU(x2, self.alpha)
# 8x8x128
x3 = tf.layers.conv2d(x2, padding='same')
x3 = tf.layers.batch_normalization(x3, training=self.training)
x3 = LeakyReLU(x3, self.alpha)
# 4x4x256
# Flatten it
flat = tf.reshape(x3, (-1, 4*4*256))
logits = tf.layers.dense(flat, 1)
out = tf.sigmoid(logits)
return out, logits
#---------------------------------------------------------------------------
def build_discriminator(self, image_size):
    """Build the discriminator for real inputs and for generator output.

    Creates the ``inputs_real`` placeholder, maps it with ``x / 128 - 1``,
    and applies the discriminator twice in the 'discriminator' scope: once to
    the processed real inputs (creating the variables) and once to
    ``self.gen_out`` (reusing them).

    Args:
        image_size: iterable of dimensions appended after the batch axis of
            the placeholder shape.
    """
    self.inputs_real = tf.placeholder(tf.float32, [None, *image_size],
                                      name='inputs_real')

    #-----------------------------------------------------------------------
    # Process input so that it matches what the generator produces
    #-----------------------------------------------------------------------
    with tf.variable_scope('process_real'):
        processed = self.inputs_real/128-1

    #-----------------------------------------------------------------------
    # Real discriminator
    #-----------------------------------------------------------------------
    ret = self.__discriminator(processed, 'discriminator', False)
    self.dsc_real_out = ret[0]
    self.dsc_real_logits = ret[1]

    #-----------------------------------------------------------------------
    # Fake discriminator: same scope with reuse=True so both share weights
    #-----------------------------------------------------------------------
    ret = self.__discriminator(self.gen_out, 'discriminator', True)
    self.dsc_fake_out = ret[0]
    self.dsc_fake_logits = ret[1]
#---------------------------------------------------------------------------
def get_optimizer(self, learning_rate = 0.001, grad_clip = 5):
    """Build the loss and the training op.

    The loss is the mean squared error between ``self.target`` and
    ``self.output``. Gradients with respect to all trainable variables are
    clipped by global norm `grad_clip` and applied with Adam.

    Returns:
        ``(training_op, loss)``.
    """
    #-----------------------------------------------------------------------
    # Loss
    #-----------------------------------------------------------------------
    with tf.variable_scope('loss'):
        mse = tf.losses.mean_squared_error(self.target, self.output)

    #-----------------------------------------------------------------------
    # Clipped-gradient Adam step
    #-----------------------------------------------------------------------
    with tf.variable_scope('optimizer'):
        variables = tf.trainable_variables()
        raw_gradients = tf.gradients(mse, variables)
        clipped_gradients, _ = tf.clip_by_global_norm(raw_gradients, grad_clip)
        adam = tf.train.AdamOptimizer(learning_rate)
        apply_step = adam.apply_gradients(zip(clipped_gradients, variables))

    return apply_step, mse
def __init__(self, rnd_vec_dim, hidden_units, output_dim, alpha):
    """Build the generator graph.

    Args:
        rnd_vec_dim: width of the ``inputs_rnd`` placeholder.
        hidden_units: number of units of the hidden dense layer.
        output_dim: number of units of the output dense layer.
        alpha: second argument handed to ``LeakyReLU``.
    """
    #-----------------------------------------------------------------------
    # Inputs
    #-----------------------------------------------------------------------
    self.inputs_rnd = tf.placeholder(tf.float32, (None, rnd_vec_dim),
                                     name='inputs_rnd')

    #-----------------------------------------------------------------------
    # The generator
    #-----------------------------------------------------------------------
    self.alpha = alpha
    with tf.variable_scope('generator'):
        # `units` is required by tf.layers.dense; the two size parameters of
        # the constructor are the layer widths.
        h1 = tf.layers.dense(self.inputs_rnd, hidden_units, activation=None)
        h1 = LeakyReLU(h1, self.alpha)

        self.gen_logits = tf.layers.dense(h1, output_dim, activation=None)
        self.gen_out = tf.tanh(self.gen_logits)
#---------------------------------------------------------------------------
def _load_optimizer(self):
    """
    Load the SGD optimizer

    For each of the "forward" and "backward" variable scopes this sets
    ``self.loss_<dir>``, ``self.optimizer_<dir>`` (plain gradient descent)
    and ``self.train_op_<dir>``.
    :return: None
    """
    with tf.variable_scope("forward"):
        # loss function
        self.loss_fwd = tf.nn.seq2seq.sequence_loss(
            self.dec_outputs_fwd, self.labels, self.weights, self.vocab_size)
        # optimizer
        self.optimizer_fwd = tf.train.GradientDescentOptimizer(self.learning_rate)
        self.train_op_fwd = self.optimizer_fwd.minimize(self.loss_fwd)

    with tf.variable_scope("backward"):
        # The loss needs targets and weights just like the forward call.
        # NOTE(review): assumes the backward decoder is scored against the
        # same self.labels / self.weights as the forward one -- confirm.
        self.loss_bwd = tf.nn.seq2seq.sequence_loss(
            self.dec_outputs_bwd, self.labels, self.weights, self.vocab_size)
        # optimizer
        self.optimizer_bwd = tf.train.GradientDescentOptimizer(self.learning_rate)
        self.train_op_bwd = self.optimizer_bwd.minimize(self.loss_bwd)
def _load_model(self):
    """
    Creates the encoder decoder model

    Builds the stacked cell, then two decoders in the "train_test" scope: a
    training decoder, and a test decoder that reuses the same variables with
    ``feed_previous=True``. ``self.attention`` selects between the plain and
    the attention embedding seq2seq function.
    :return: None
    """
    # Initial memory value for recurrence.
    self.prev_mem = tf.zeros((self.train_batch_size, self.memory_dim))

    # choose RNN/GRU/LSTM cell
    with tf.variable_scope("train_test", reuse=True):
        cell = self.get_cell()
        # Stacks layers of RNN's to form a stacked decoder
        self.cell = tf.nn.rnn_cell.MultiRNNCell([cell] * self.num_layers)

    # embedding model
    if not self.attention:
        seq2seq_fn = tf.nn.seq2seq.embedding_rnn_seq2seq
    else:
        seq2seq_fn = tf.nn.seq2seq.embedding_attention_seq2seq

    # NOTE(review): the test and attention calls are assumed to take the same
    # arguments as the plain training call -- confirm against upstream.
    with tf.variable_scope("train_test"):
        self.dec_outputs, self.dec_memory = seq2seq_fn(
            self.enc_inp, self.dec_inp, self.cell,
            self.vocab_size, self.vocab_size, self.seq_length)
    with tf.variable_scope("train_test", reuse=True):
        # Same weights; the decoder feeds back its own previous output.
        self.dec_outputs_tst, _ = seq2seq_fn(
            self.enc_inp, self.dec_inp, self.cell,
            self.vocab_size, self.vocab_size, self.seq_length,
            feed_previous=True)
# NOTE(review): this is a truncated copy of `_load_model` and cannot be parsed
# as written: the `tf.zeros(...)` call runs into the tail of a `with`
# statement, and the seq2seq call is reduced to its last keyword argument.
# `Feed_prevIoUs` is presumably a mangled `feed_previous` -- confirm against
# the upstream source before use.
def _load_model(self):
"""
Creates the encoder decoder model
:return: None
"""
# Initial memory value for recurrence.
self.prev_mem = tf.zeros((self.train_batch_size, reuse=True):
self.cell = self.get_cell()
# embedding model
if not self.attention:
with tf.variable_scope("train_test"):
self.dec_outputs, Feed_prevIoUs=True)
def setupOutput(self):
    """Set ``self.output`` to ``max_pool_2x2(self.input)`` inside the layer's scope."""
    with tf.variable_scope(self.name):
        pooled = max_pool_2x2(self.input)
        self.output = pooled
def setupOutput(self):
    """Set ``self.output`` to ``max_pool`` of ``self.input`` with ``self.shape``, inside the layer's scope."""
    with tf.variable_scope(self.name):
        pooled = max_pool(self.input, shape=self.shape)
        self.output = pooled
def setupOutput(self):
    """Resize ``self.input`` to ``self.outputShape`` inside the layer's scope.

    Tries the call form that takes the size as one argument plus ``method``.
    If that raises, falls back to the form that takes height and width
    separately plus ``align_corners``.
    """
    with tf.variable_scope(self.name):
        try:
            self.output = tf.image.resize_images(self.input, self.outputShape, method=self.method)
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed by the fallback.
            self.output = tf.image.resize_images(self.input, self.outputShape[0], self.outputShape[1], align_corners=self.alignCorners)
def initialize(self):
    """Create the ``self.keepProb`` placeholder inside the layer's scope."""
    with tf.variable_scope(self.name):
        # Fed at run time with the dropout keep probability.
        keep_probability = tf.placeholder('float')
        self.keepProb = keep_probability
def setupOutput(self):
    """Set ``self.output`` to dropout of ``self.input`` using ``self.keepProb``.

    The output's ``get_shape`` attribute is replaced by the input's.
    """
    with tf.variable_scope(self.name):
        dropped = tf.nn.dropout(self.input, self.keepProb)
        # DEBUG: remove this whenever TensorFlow fixes this bug
        dropped.get_shape = self.input.get_shape
        self.output = dropped
#*** Main Part ***
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。