我试图通过以下代码(输入数据和模型均为虚构的示例)在多个GPU上使用TensorFlow 2的镜像分布式策略(tf.distribute.MirroredStrategy)。
如您所见,每次迭代都会打印该训练步骤的耗时。奇怪的是,随着GPU数量的增加,每一步的计算时间不但没有减少,反而随之增加。
$ CUDA_VISIBLE_DEVICES=0 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 0.473 sec/step | loss: 46485.430
Ep 01/100 | step 03 | 0.482 sec/step | loss: 9216.726
$ CUDA_VISIBLE_DEVICES=0,1 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.141 sec/step | loss: 22627.699
Ep 01/100 | step 03 | 1.091 sec/step | loss: 11679.490
$ CUDA_VISIBLE_DEVICES=0,1,2 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.408 sec/step | loss: 32166.996
Ep 01/100 | step 03 | 1.380 sec/step | loss: 14036.578
- 在所有情况下,我都确认过“每个副本”的输入和输出是相同的。
- 我在这里重复相同的伪数据,所以我不认为这是与数据管道相关的问题。
为什么我在这里没有利用并行性的优势?有什么建议吗?
代码如下:
from __future__ import absolute_import,division,print_function,unicode_literals
import os,time,sys,numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import (Conv2D,Conv3D,Dense)
@tf.function
def loss_fn(y_pred,y_true):
    """Return the mean squared error between predictions and targets.

    The squared difference is averaged over every element of the tensor,
    so the result is a single scalar.

    Args:
        y_pred: Tensor of model outputs.
        y_true: Tensor of target values; must be broadcast-compatible
            with ``y_pred``.

    Returns:
        Scalar tensor holding ``mean((y_pred - y_true) ** 2)``.
    """
    residual = y_pred - y_true
    squared_error = tf.math.square(residual)
    return tf.reduce_mean(squared_error)
@tf.function
def train_step(dist_inputs):
    """Run one synchronous training step on every replica.

    Relies on the module-level ``strategy``, ``model`` and ``optimizer``
    created by the script entry point.

    Args:
        dist_inputs: One element of the distributed dataset, i.e. a
            per-replica ``(inputs, labels)`` pair.

    Returns:
        Scalar tensor: the mean over replicas of each replica's
        (unscaled) loss for this step.
    """
    def step_fn(inputs):
        # Forward/backward pass executed once per replica.
        features, labels = inputs
        # tf.print("in",tf.shape(features),"out",tf.shape(labels),output_stream=sys.stdout)
        with tf.GradientTape() as tape:
            out = model(features)
            loss_value = loss_fn(out, labels)
            # Per TensorFlow's distributed custom-training guide, gradients
            # are summed (not averaged) across replicas when they are
            # applied. Differentiating the plain per-replica mean would
            # therefore make the update num_replicas times too large, so
            # the loss that is differentiated is scaled down accordingly.
            scaled_loss = loss_value / strategy.num_replicas_in_sync
        grads = tape.gradient(scaled_loss, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        # Report the unscaled loss so logged values stay comparable
        # between runs with different GPU counts.
        return loss_value

    per_replica_losses = strategy.experimental_run_v2(step_fn, args=(dist_inputs,))
    mean_loss = strategy.reduce(
        tf.distribute.ReduceOp.MEAN, per_replica_losses, axis=None)
    return mean_loss
if __name__ == "__main__":
    # Everything below stays at module level (rather than inside a main()
    # function) because train_step() reads `strategy`, `model` and
    # `optimizer` as module globals.
    BATCH_SIZE_PER_SYNC = 4
    logdir = os.path.join('logs', 'test')
    strategy = tf.distribute.MirroredStrategy()
    num_gpus = strategy.num_replicas_in_sync
    global_batch_size = BATCH_SIZE_PER_SYNC * num_gpus
    print('num GPUs: {},global batch size: {}'.format(num_gpus,global_batch_size))

    # fake data ------------------------------------------------------
    # Conv3D needs a 5-D input (batch, depth, height, width, channels), and
    # the final padding='same' Conv2D emits (batch, height, width, 14), so
    # inputs and targets must have these ranks for the loss to be computable.
    # NOTE(review): the 200x200 spatial size is reconstructed from the
    # partially visible shapes -- confirm against the intended experiment.
    # At these sizes one 4-sample float32 input batch is roughly 0.8 GB.
    DEPTH, HEIGHT, WIDTH, CHANNELS = 10, 200, 200, 128
    NUM_OUTPUTS = 14
    fakea = np.random.rand(
        global_batch_size, DEPTH, HEIGHT, WIDTH, CHANNELS).astype(np.float32)
    # Cast to float32 to match the dtype declared for the dataset below.
    targets = np.random.rand(
        global_batch_size, HEIGHT, WIDTH, NUM_OUTPUTS).astype(np.float32)

    # tf.Dataset ------------------------------------------------------
    def gen():
        """Yield the same pre-built global batch forever."""
        while True:
            yield (fakea, targets)

    # Each dataset element is already one full *global* batch; the strategy
    # splits it across the replicas.
    dataset = tf.data.Dataset.from_generator(
        gen,
        (tf.float32, tf.float32),
        (tf.TensorShape(fakea.shape), tf.TensorShape(targets.shape)))
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    # Model ------------------------------------------------------
    with strategy.scope():
        # The input shape is derived from the data so the two cannot drift
        # apart.
        va = keras.Input(shape=fakea.shape[1:], dtype=tf.float32, name='va')
        # kernel_size is a required argument of the Keras conv layers; every
        # layer uses the same 3-wide kernel as the first one.
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(va)
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(x)
        x = Conv3D(64, kernel_size=3, strides=1, padding='same')(x)
        # Max over the depth axis: 5-D -> 4-D, ready for Conv2D.
        x = tf.reduce_max(x, axis=1, name='maxpool')
        b = Conv2D(NUM_OUTPUTS, kernel_size=3, padding='same')(x)
        model = keras.Model(inputs=va, outputs=b, name='net')
        optimizer = keras.optimizers.RMSprop()
    model.summary()

    # TRAIN ---------------------------------------------------------
    writer = tf.summary.create_file_writer(logdir)
    num_steps = 100
    num_epoches = 100
    global_step = 0
    with strategy.scope():
        iterator = iter(dist_dataset)
    with writer.as_default():
        for epoch in range(num_epoches):
            for step in range(num_steps):
                # Steps 0, 6 and 7 are profiled; their timings include the
                # profiler overhead (and step 0 also the graph tracing).
                traced = global_step == 0 or 5 < global_step < 8
                if traced:
                    tf.summary.trace_on(graph=True, profiler=True)
                # perf_counter() is monotonic and meant for interval timing.
                start = time.perf_counter()
                loss_value = train_step(next(iterator))
                # Pull the scalar back to the host *inside* the timed region
                # so the measurement covers the finished step, not just the
                # time needed to dispatch it.
                loss_float = float(loss_value)
                duration = time.perf_counter() - start
                prefix = 'Ep {:02d}/{:02d} | step {:02d} '.format(epoch + 1,num_epoches,step)
                suffix = '| {:.3f} sec/step | loss: {:.3f} '.format(duration,loss_float)
                print(prefix + suffix)
                tf.summary.scalar("loss", loss_value, step=global_step)
                if traced:
                    tf.summary.trace_export(
                        name="model_trace", step=global_step,
                        profiler_outdir=logdir)
                writer.flush()
                global_step += 1