如何在GPU集群(multi-gpu)上训练CNN?

我正在使用以下代码在具有UCF101数据集的单个GPU上训练cnn,但是由于数据集的大小,它需要花费很长时间来训练。

def _get_data_label_from_info(train_info_tensor,name,mode):
    """ Wrapper for `tf.py_func`,get video clip and label from info list."""
    clip_holder,label_holder = tf.py_func(
        process_video,[train_info_tensor,mode],[tf.float32,tf.int64])
    return clip_holder,label_holder


def process_video(data_info,mode,is_training=True):
    """ Get video clip and label from data info list."""
    data = action_Dataset(name,[data_info])
    if is_training:
        clip_seq,label_seq = data.next_batch(1,_CLIP_SIZE)
    else:
        clip_seq,label_seq = data.next_batch(
            1,_EACH_VIDEO_TEST_SIZE+1,shuffle=False,data_augment=False)
    clip_seq = 2*(clip_seq/255) - 1
    clip_seq = np.array(clip_seq,dtype='float32')
    return clip_seq,label_seq


def main(dataset='ucf101',mode='rgb',split=1):
    assert mode in ['rgb','flow'],'Only RGB data and flow data is supported'
    log_dir = os.path.join(_LOG_ROOT,'finetune-%s-%s-%d' %
                           (dataset,split))
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    logging.basicConfig(level=logging.INFO,filename=os.path.join(log_dir,'log.txt'),filemode='w',format='%(message)s')

    ##  Data Preload  ###
    train_info,test_info = split_data(
        os.path.join('./data',dataset,mode+'.txt'),os.path.join('./data','testlist%02d' % split+'.txt'))
#        os.path.join('/data1/yunfeng/i3d_test/data',#        os.path.join('/data1/yunfeng/i3d_test/data','testlist%02d' % split+'.txt'))
    train_data = action_Dataset(dataset,train_info)
    test_data = action_Dataset(dataset,test_info)

    num_train_sample = len(train_info)
    # Every element in train_info is shown as below:
    # ['v_ApplyEyeMakeup_g08_c01',# '/data4/zhouhao/dataset/ucf101/jpegs_256/v_ApplyEyeMakeup_g08_c01',# '121','0']
    train_info_tensor = tf.constant(train_info)
    test_info_tensor = tf.constant(test_info)

    # Dataset building
    # Phase 1 Trainning
    # one element in this dataset is (train_info list)
    train_info_dataset = tf.data.Dataset.from_tensor_slices(
        (train_info_tensor))
    # one element in this dataset is (single image_postprocess,single label)
    # one element in this dataset is (batch image_postprocess,batch label)
    train_info_dataset = train_info_dataset.shuffle(
        buffer_size=num_train_sample)
    train_dataset = train_info_dataset.map(lambda x: _get_data_label_from_info(
        x,mode),num_parallel_calls=_NUM_PARALLEL_CALLS)
    train_dataset = train_dataset.repeat().batch(_BATCH_SIZE)
    train_dataset = train_dataset.prefetch(buffer_size=_PREFETCH_BUFFER_SIZE)

    # Phase 2 Testing
    # one element in this dataset is (train_info list)
    test_info_dataset = tf.data.Dataset.from_tensor_slices(
        (test_info_tensor))
    # one element in this dataset is (single image_postprocess,single label)
    test_dataset = test_info_dataset.map(lambda x: _get_data_label_from_info(
        x,num_parallel_calls=_NUM_PARALLEL_CALLS)
    # one element in this dataset is (batch image_postprocess,batch label)
    test_dataset = test_dataset.batch(1).repeat()
    test_dataset = test_dataset.prefetch(buffer_size=_PREFETCH_BUFFER_SIZE)

    # iterator = dataset.make_one_shot_iterator()
    # clip_holder,label_holder = iterator.get_next()
    iterator = tf.data.Iterator.from_structure(
        train_dataset.output_types,train_dataset.output_shapes)
    train_init_op = iterator.make_initializer(train_dataset)
    test_init_op = iterator.make_initializer(test_dataset)

    clip_holder,label_holder = iterator.get_next()
    clip_holder = tf.squeeze(clip_holder,[1])
    label_holder = tf.squeeze(label_holder,[1])
    clip_holder.set_shape(
        [None,None,_FRAME_SIZE,_CHANNEL[mode]])
    dropout_holder = tf.placeholder(tf.float32)
    is_train_holder = tf.placeholder(tf.bool)

    # inference module
    # Inference Module
    with tf.variable_scope(_SCOPE[train_data.mode]):
        # insert i3d model
        model = i3d.InceptionI3d(
            400,spatial_squeeze=True,final_endpoint='Logits')
        # the line below outputs the final results with logits
        # __call__ uses _template,and _template uses _build when defined
        logits,_ = model(clip_holder,is_training=is_train_holder,dropout_keep_prob=dropout_holder)
        logits_dropout = tf.nn.dropout(logits,dropout_holder)
        # To change 400 classes to the ucf101 or hdmb classes
        fc_out = tf.layers.dense(
            logits_dropout,_CLASS_NUM[dataset],use_bias=True)
        # compute the top-k results for the whole batch size
        is_in_top_1_op = tf.nn.in_top_k(fc_out,label_holder,1)

    # Loss calculation,including L2-norm
    variable_map = {}
    train_var = []
    for variable in tf.global_variables():
        tmp = variable.name.split('/')
        if tmp[0] == _SCOPE[train_data.mode] and 'dense' not in tmp[1]:
            variable_map[variable.name.replace(':0','')] = variable
        if tmp[-1] == 'w:0' or tmp[-1] == 'kernel:0':
            weight_l2 = tf.nn.l2_loss(variable)
            tf.add_to_collection('weight_l2',weight_l2)
    loss_weight = tf.add_n(tf.get_collection('weight_l2'),'loss_weight')
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=label_holder,logits=fc_out))
    total_loss = loss + _WEIGHT_OF_LOSS_WEIGHT * loss_weight
    tf.summary.scalar('loss',loss)
    tf.summary.scalar('loss_weight',loss_weight)
    tf.summary.scalar('total_loss',total_loss)

    # Import Pre-trainned model
    saver = tf.train.Saver(var_list=variable_map,reshape=True)
    saver2 = tf.train.Saver(max_to_keep=_SAVER_MAX_TO_KEEP)
    # Specific Hyperparams
    # steps for training: the number of steps on batch per epoch
    per_epoch_step = int(np.ceil(train_data.size/_BATCH_SIZE))
    # global step constant
    global_step = _GLOBAL_EPOCH * per_epoch_step
    # global step counting
    global_index = tf.Variable(0,trainable=False)

    # Set learning rate schedule by hand,also you can use an auto way
    boundaries = [10000,20000,30000,40000,50000]
    values = [_LEARNING_RATE,0.0008,0.0005,0.0003,0.0001,5e-5]
    learning_rate = tf.train.piecewise_constant(
        global_index,boundaries,values)
    tf.summary.scalar('learning_rate',learning_rate)

    # Optimizer set-up
    # FOR BATCH norm,we then use this updata_ops
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        optimizer = tf.train.MomentumOptimizer(learning_rate,_MOMENTUM).minimize(total_loss,global_step=global_index)
    sess = tf.Session()
    merged_summary = tf.summary.merge_all()
    train_writer = tf.summary.FileWriter(log_dir,sess.graph)
    sess.run(tf.global_variables_initializer())
    sess.run(train_init_op)
    saver.restore(sess,_CHECKPOINT_PATHS[train_data.mode+'_imagenet'])

    print('----Here we start!----')
    print('Output wirtes to ' + log_dir)
    # logging.info('----Here we start!----')
    step = 0
    # for one epoch
    true_count = 0
    # for 20 batches
    tmp_count = 0
    accuracy_tmp = 0
    epoch_completed = 0
    while step <= global_step:
        step += 1
        start_time = time.time()
        _,loss_now,loss_plus,is_in_top_1,summary = sess.run(
            [optimizer,total_loss,loss_weight,is_in_top_1_op,merged_summary],feed_dict={dropout_holder: _DROPOUT,is_train_holder: True})
        duration = time.time() - start_time
        tmp = np.sum(is_in_top_1)
        true_count += tmp
        tmp_count += tmp
        train_writer.add_summary(summary,step)
        # responsible for printing relevant results
        if step % _OUTPUT_STEP == 0:
            accuracy = tmp_count / (_OUTPUT_STEP * _BATCH_SIZE)
            print('step: %-4d,loss: %-.4f,accuracy: %.3f (%.2f sec/batch)' %
                  (step,accuracy,float(duration)))
            logging.info('step: % -4d,loss: % -.4f,\
                             accuracy: % .3f ( % .2f sec/batch)' %
                         (step,float(duration)))
            tmp_count = 0
        if step % per_epoch_step == 0:
            epoch_completed += 1
            accuracy = true_count / (per_epoch_step * _BATCH_SIZE)
            print('Epoch%d,train accuracy: %.3f' %
                  (epoch_completed,accuracy))
            logging.info('Epoch%d,train accuracy: %.3f' %
                         (train_data.epoch_completed,accuracy))
            true_count = 0
            if step % per_epoch_step == 0 and accuracy > _RUN_TEST_THRESH:
                sess.run(test_init_op)
                true_count = 0
                # start test process
                print(test_data.size)
                for i in range(test_data.size):
                    # print(i,true_count)
                    is_in_top_1 = sess.run(is_in_top_1_op,feed_dict={dropout_holder: 1,is_train_holder: False})
                    true_count += np.sum(is_in_top_1)
                accuracy = true_count / test_data.size
                true_count = 0
                # to ensure every test procedure has the same test size
                test_data.index_in_epoch = 0
                print('Epoch%d,test accuracy: %.3f' %
                      (epoch_completed,accuracy))
                logging.info('Epoch%d,test accuracy: %.3f' %
                             (train_data.epoch_completed,accuracy))
                # saving the best params in test set
                if accuracy > _SAVE_MODEL_THRESH:
                    if accuracy > accuracy_tmp:
                        accuracy_tmp = accuracy
                        saver2.save(sess,os.path.join(log_dir,test_data.name+'_'+train_data.mode +
                                                       '_{:.3f}_model'.format(accuracy)),step)
                sess.run(train_init_op)
    train_writer.close()
    sess.close()

现在,我想在具有10个节点(每个节点都带有i7 7700 CPU,GTX1060 6GB GPU,and 16GB of RAM)的GPU群集上训练cnn,如何最好地调整代码以最大化资源并减少训练时间?

Python 3.7.1,Tensorflow 1.14

完整的存储库位于https://github.com/USTC-Video-Understanding/I3D_Finetune

m13432900836 回答:如何在GPU集群(multi-gpu)上训练CNN?

有多种方法可以在多个设备上训练单个模型。

  1. 模型并行性:在设备上划分模型的位置。
  2. 数据并行性:在每个设备上复制模型,并在数据的子集上训练每个副本。

在这种情况下,由于同一台计算机上有多个GPU,因此可以使用 Tensorflow的分布式策略

下面是说明分布式训练的伪代码。

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0","/gpu:1"]) #list all the devices you want to use. 
with distribution.scope():
  mirrored_model = tf.keras.Sequential([...])
  mirrored_model.compile([...])
batch_size = 100 # must be divisible by the number of replicas
history = mirrored_model.fit(X_train,y_train,epochs=10) 

以上过程将在所有可用的GPU设备上复制所有变量和操作。并且fit()方法将自动将每个训练批次分配到所有副本中,非常重要的是,批次大小可被列出的GPU数量除以predict()方法也是如此。 调用save()方法后,该模型将另存为常规模型,并且默认情况下会在单个设备上加载。
如果希望模型在所有可用设备上加载,则可以在如下所示的分发范围上下文中使用它。

with distribution.scope():
   mirrored_model = tf.keras.models.load_model("my_mnist_model.h5")
本文链接:https://www.f2er.com/1459298.html

大家都在问