微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

分布式 Tensorflow 训练

如何解决分布式 TensorFlow 训练中遇到的问题

我是 TensorFlow 的新手,正在参照官方文档,尝试使用 ParameterServerStrategy 实现分布式 TensorFlow 训练。到目前为止,我写了下面的代码:

import multiprocessing
import os
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl
import tensorflow_hub as hub
import numpy as np
# Print the version string of the imported TensorFlow package.
print(tf.__version__)

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver.

  One in-process `tf.distribute.Server` is started for every "worker" task and
  every "ps" task, each bound to an unused localhost port picked by
  `portpicker`.

  Args:
    num_workers: Number of "worker" tasks to start.
    num_ps: Number of "ps" tasks to start. When 0, the cluster spec contains
      no "ps" job.

  Returns:
    A `SimpleClusterResolver` for the cluster, created with task type
    "worker", task id 0 and the "grpc" RPC layer.
  """
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {
      "worker": ["localhost:%s" % port for port in worker_ports],
  }
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  # Every ps server needs its own task_index, exactly like the workers above;
  # without it the servers of a multi-task "ps" job are not started at
  # distinct indices of the cluster spec.
  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_id=0, task_type="worker", rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

# Partitioner configured with one shard per parameter server.
variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

# Hand the partitioner to the strategy; it was previously constructed but
# never used, so it had no effect on how variables were created.
strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver, variable_partitioner=variable_partitioner)

# Three sample texts of increasing length: a word, a sentence and a paragraph.
word = "Elephant"
sentence = "I am a sentence for which I would like to get its embedding."
paragraph = (
    "Universal Sentence Encoder embeddings also support short paragraphs. "
    "There is no hard limit on how long the paragraph is. "
    "Roughly,the longer the more 'diluted' the embedding will be."
)
messages = [word, sentence, paragraph]
labels = ["1", "2", "3"]
reviews = [[1, 0], [0, 1, 1]]

# Load the Universal Sentence Encoder from TF Hub and embed the sample texts.
encoder = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")

X_train = encoder(messages)

# Build the Keras preprocessing models for features and labels under the
# strategy scope.
with strategy.scope():
  # Lookup layer whose vocabulary is the list of sample messages.
  feature_lookup_layer = kpl.StringLookup(vocabulary=messages)

  # Lookup layer for label strings, with no OOV indices and no mask token.
  # NOTE(review): label strings that are not in `labels` have no index here;
  # confirm that every label value fed to this layer is in the vocabulary.
  label_lookup_layer = kpl.StringLookup(
      vocabulary=labels, num_oov_indices=0, mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  # `shape` has to be a tuple, and the input feeds a StringLookup layer, so it
  # is declared as three strings per example.
  raw_label_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model(
      {"label": raw_label_input}, label_id_input)

# Two raw examples: one feature string and three label strings each.
examples = {
    "features": [word, paragraph],
    "label": [
        ["1", "0", "0"],
        ["0", "1", "1"],
    ],
}
print(examples)
def dataset_fn(_):
  """Builds the shuffled, batched and endlessly repeating training dataset.

  Args:
    _: Ignored.

  Returns:
    A `tf.data.Dataset` built from `examples`, whose elements are
    `({"features": <preprocessed features>}, <preprocessed labels>)` pairs,
    shuffled with a buffer of 200, batched by 32 and repeated.
  """

  def preprocess(example):
    # Run both Keras preprocessing stages on one raw example.
    feature_ids = feature_preprocess_stage(example["features"])
    label_ids = label_preprocess_stage(example["label"])
    return {"features": feature_ids}, label_ids

  dataset = tf.data.Dataset.from_tensor_slices(examples)
  dataset = dataset.map(preprocess)
  dataset = dataset.shuffle(200)
  dataset = dataset.batch(32)
  return dataset.repeat()


# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  # `shape` has to be a tuple: three int64 ids per example.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  # Average the embeddings over axis 1 (the three ids of one example).
  # NOTE(review): this is rank 2, as Dense requires, only when the model is
  # fed rank-2 (batch, 3) ids; a rank-1 batch of ids collapses to rank 1 here.
  # Confirm the dataset emits ids shaped like `model_input`.
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  # NOTE(review): one sigmoid unit is a binary-classification head; a
  # categorical loss presumably needs one output unit per class - confirm.
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()


@tf.function
def step_fn(iterator):
  """Runs one training step on the next batch and returns the reduced loss.

  Args:
    iterator: Iterator whose next element is a `(batch_data, labels)` pair.

  Returns:
    The losses returned by `strategy.run`, reduced with `ReduceOp.SUM`.
  """

  def replica_fn(features, targets):
    # Unreduced loss, so that averaging is done explicitly below.
    loss_fn = keras.losses.CategoricalCrossentropy(
        reduction=tf.keras.losses.Reduction.NONE)

    with tf.GradientTape() as tape:
      predictions = model(features, training=True)
      example_losses = loss_fn(targets, predictions)
      mean_loss = tf.nn.compute_average_loss(example_losses)
      grads = tape.gradient(mean_loss, model.trainable_variables)

    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Threshold predictions at 0.5 and feed the 0/1 result to the metric.
    hard_predictions = tf.cast(tf.greater(predictions, 0.5), tf.int64)
    accuracy.update_state(targets, hard_predictions)
    return mean_loss

  features, targets = next(iterator)
  per_replica_losses = strategy.run(replica_fn, args=(features, targets))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)


# Coordinator used below to create the per-worker dataset and schedule steps.
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)


@tf.function
def per_worker_dataset_fn():
  """Returns the dataset from `dataset_fn`, distributed via `strategy`."""
  distributed_dataset = strategy.distribute_datasets_from_function(dataset_fn)
  return distributed_dataset


# Hand the dataset function to the coordinator and iterate over the result.
per_worker_dataset = coordinator.create_per_worker_dataset(
    per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)

num_epoches = 2
steps_per_epoch = 1
for epoch in range(num_epoches):
  # Start every epoch with a fresh accuracy metric.
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  epoch_accuracy = accuracy.result().numpy()
  print("Finished epoch %d,accuracy is %f." % (epoch, epoch_accuracy))

在此示例中,我尝试把文档中的示例从二元分类(binary classification)改成多类别分类(categorical classification),但运行时收到以下错误:

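ValueError: Input 0 of layer dense is incompatible with the layer: expected min_ndim=2, found ndim=1. Full shape received: (None,)
(即:dense 层的输入 0 与该层不兼容:期望 min_ndim=2,实际 ndim=1;收到的完整形状为 (None,)。)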

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。