为什么最少的Tensorflow V2代码的计算时间会增加GPU的数量而不是并行运行?

我试图通过以下代码(数据输入和模型为虚拟)在多个GPU上使用Tensorflow V2的镜像分发策略。

如您所见,每次迭代都会打印训练步骤的计算时间。奇怪的是,随着GPU数量的增加,计算量也随之增加。

$ CUDA_VISIBLE_DEVICES=0 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 0.473 sec/step | loss: 46485.430
Ep 01/100 | step 03 | 0.482 sec/step | loss: 9216.726

$ CUDA_VISIBLE_DEVICES=0,1 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.141 sec/step | loss: 22627.699
Ep 01/100 | step 03 | 1.091 sec/step | loss: 11679.490

$ CUDA_VISIBLE_DEVICES=0,1,2 python train_v2_multi_example.py
...
Ep 01/100 | step 02 | 1.408 sec/step | loss: 32166.996
Ep 01/100 | step 03 | 1.380 sec/step | loss: 14036.578
  • 在所有情况下,我检查了“每个副本”的输入和输出是否相同。
  • 我在这里重复相同的伪数据,所以我不认为这是与数据管道相关的问题。

为什么我在这里没有利用并行性的优势?有什么建议吗?

代码如下:

from __future__ import absolute_import,division,print_function,unicode_literals
import os,time,sys,numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import (Conv2D,Conv3D,Dense)


@tf.function
def loss_fn(y_pred,y_true):
    return tf.reduce_mean(tf.math.square(y_pred - y_true))

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        inputs,labels = inputs

        # tf.print("in",tf.shape(inputs),"out",tf.shape(labels),output_stream=sys.stdout)
        with tf.GradientTape() as tape:
            out = model(inputs)
            loss_value = loss_fn(out,labels)
        grads = tape.gradient(loss_value,model.trainable_weights)
        optimizer.apply_gradients(zip(grads,model.trainable_weights))
        return loss_value

    per_example_losses = strategy.experimental_run_v2(step_fn,args=(dist_inputs,))
    mean_loss = strategy.reduce(tf.distribute.ReduceOp.MEAN,per_example_losses,axis=None)
    return mean_loss


if __name__ == "__main__":

    BATCH_SIZE_PER_SYNC = 4
    logdir = os.path.join('logs/test')
    strategy = tf.distribute.MirroredStrategy()
    num_gpus = strategy.num_replicas_in_sync
    global_batch_size = BATCH_SIZE_PER_SYNC * num_gpus
    print('num GPUs: {},global batch size: {}'.format(num_gpus,global_batch_size))

    # fake data ------------------------------------------------------
    fakea = np.random.rand(global_batch_size,10,200,128).astype(np.float32)
    targets = np.random.rand(global_batch_size,14)

    # tf.Dataset ------------------------------------------------------
    def gen():
        while True:
            yield (fakea,targets)

    dataset = tf.data.Dataset.from_generator(gen,(tf.float32,tf.float32),(tf.TensorShape(fakea.shape),tf.TensorShape(targets.shape)))

    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    # Model ------------------------------------------------------
    training = True
    with strategy.scope():
        # Model
        va = keras.Input(shape=(10,128),dtype=tf.float32,name='va')
        x = Conv3D(64,kernel_size=3,strides=1,padding='same')(va)
        x = Conv3D(64,padding='same')(x)
        x = Conv3D(64,padding='same')(x)
        x = tf.reduce_max(x,axis=1,name='maxpool')  # [ΣK,128]
        b = Conv2D(14,padding='same')(x)
        model = keras.Model(inputs=va,outputs=b,name='net')
        optimizer = keras.optimizers.RMSprop()
    model.summary()

    # TRAIN ---------------------------------------------------------
    writer = tf.summary.create_file_writer(logdir)

    num_steps = 100
    num_epoches = 100
    global_step = 0

    with strategy.scope():
        iterator = iter(dist_dataset)
        with writer.as_default():
            for epoch in range(num_epoches):
                for step in range(num_steps):

                    if global_step == 0 or 5 < global_step < 8:
                        tf.summary.trace_on(graph=True,profiler=True)

                    start = time.time()
                    loss_value = train_step(next(iterator))
                    duration = time.time() - start

                    prefix = 'Ep {:02d}/{:02d} | step {:02d} '.format(epoch + 1,num_epoches,step)
                    suffix = '| {:.3f} sec/step | loss: {:.3f} '.format(duration,float(loss_value))
                    print(prefix + suffix)

                    tf.summary.scalar("loss",loss_value,step=global_step)

                    if global_step == 0 or 5 < global_step < 8:
                        tf.summary.trace_export(name="model_trace",step=global_step,profiler_outdir=logdir)
                    writer.flush()
                    global_step += 1
wuhengqu520 回答:为什么最少的Tensorflow V2代码的计算时间会增加GPU的数量而不是并行运行?

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3159959.html

大家都在问