TensorFlow2.x-distribute

Tensorflow distribute

Tensorflow2.0 单机多卡设置。

Tensorflow2.0 多GPU设置。

在Tensorflow2.0 中,有一个非常一个非常方便的函数 tf.distribute.MirroredStrategy()只需在代码中进行修改即可。

我们可以简单理解 单机多卡,就是将数据根据batch进行分发到每一张GPU上,然后单独计算loss,最后对loss进行汇总,然后进行反向传播,对每张GPU上的模型参数镜像 进行更新。

MirroredStrategy 的步骤如下:

  • 训练开始前,该策略在所有 N 个计算设备上均各复制一份完整的模型;

  • 每次训练传入一个批次的数据时,将数据分成 N 份,分别传入 N 个计算设备(即数据并行);

  • N 个计算设备使用本地变量(镜像变量)分别计算自己所获得的部分数据的梯度;

  • 使用分布式计算的 All-reduce 操作,在计算设备间高效交换梯度数据并进行求和,使得最终每个设备都有了所有设备的梯度之和;

  • 使用梯度求和的结果更新本地变量(镜像变量);

  • 当所有设备均更新本地变量后,进行下一轮训练(即该并行策略是同步的)。

其中,最主要的 就是下面3个函数。

1
2
3
4
5
6
7
8
# 创建strategy
strategy = tf.distribute.MirroredStrategy()

# 将train_step 分发到各个GPU, 并得到各GPU返回的句柄
loss = strategy.experimental_run_v2(train_step, args=(None,))

# 根据返回的句柄,将各GPU计算的loss进行汇总
strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)

因此,在进行代码设计时,需要对每张GPU的loss计算进行,重新设计。

数据处理

数据在传入tf.data.Dataset,通过 strategy.experimental_distribute_dataset() 函数对数据进行处理,会自动根据GPU的数量进行平均分配。

loss 处理

在原有train_step 上 进行GPU distribute 封装。

通过strategy.experimental_run_v2 对train_step 在各个GPU上产生相应镜像, 并得到各个GPU计算出来的loss。

然后通过调用strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) 来对各个GPU产生的loss 进行汇总sum。所以,我们在处理loss时 需要考虑总的GLOBAL_BATCH_SIZE。

1
2
3
4
5
@tf.function
def destribute_train_loss(dataset_input):
inp, targ, enc_hidden = dataset_input
per_replica_losses = strategy.experimental_run_v2(train_step, args=(inp, targ, enc_hidden))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

参考代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import tensorflow as tf
from modeling import Decoder, Encoder
from data_process import load_dataset, max_length, preprocess_sentence_en, preprocess_sentence_zh
from sklearn.model_selection import train_test_split
import os
import time
import datetime
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from tqdm import tqdm



strategy = tf.distribute.MirroredStrategy()
print(strategy.num_replicas_in_sync) #这里可以得到 GPU的数量

with strategy.scope():
input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer = load_dataset('cmn.txt')
# 得到input,targe中的最大长度
max_length_targ, max_length_inp = max_length(target_tensor), max_length(input_tensor)

input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor,
target_tensor,
test_size=0.2)
# 设置相关参数
EPOCHS = 100
BUFFER_SIZE = len(input_tensor_train) # 设置buffer大小
BATCH_SIZE = 64 # 设置batch——size 每张GPU读入的Batch—size
GLOBAL_BATCH_SZIE = BATCH_SIZE*strategy.num_replicas_in_sync #全局batch-size, 这在后面计算每个GLOBAL_BATCH loss 时要用到
steps_per_epoch = len(input_tensor_train) // BATCH_SIZE # 得到训练集中每一个epoch中 batch的个数
embedding_dim = 256 # 设置embedding的输出维度
units = 1024 # 设置GRU 的输出维度,也就是GRU内部中 9W 的维度

vocab_inp_size = len(inp_lang_tokenizer.word_index) + 1
vocab_tar_size = len(targ_lang_tokenizer.word_index) + 1
# 得到 train数据集
dataset_train = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset_train = dataset_train.batch(GLOBAL_BATCH_SZIE, drop_remainder=True) #按照GLOBAL_BATCH_SZIE 对数据进行划分 因为后面会根据GPU数量对数据进行平均分发到各个GPU
dataset_train = strategy.experimental_distribute_dataset(dataset_train)


# 定义encoder 和 decoder
encoder = Encoder(vocab_szie=vocab_inp_size, embedding_dim=embedding_dim, enc_units=units, batch_sz=BATCH_SIZE)
decoder = Decoder(vocab_size=vocab_tar_size, embedding_dim=embedding_dim, dec_units=units, batch_sz=BATCH_SIZE)

# 定义loss 和 optimizer
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
#Reduction 设置为None

# train_loss = tf.keras.metrics.Mean(name='train_loss')



# 计算 去除mask后的loss 的平均值
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss_ = loss_object(real, pred)

mask = tf.cast(mask, dtype=tf.float32)

loss_ *= mask
return tf.reduce_mean(loss_)
# return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)


# 定义checkpoint, checkpoint只保存模型参数
checkpoint_dir = './train_checkpoint'
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt')
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
encoder=encoder,
decoder=decoder)


# 定义train_step

def train_step(inp, targ, enc_hidden):
loss = 0
with tf.GradientTape() as tape:
enc_output, enc_hidden = encoder((inp, enc_hidden))
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang_tokenizer.word_index['<start>']] * BATCH_SIZE, 1)

for t in range(1, targ.shape[1]):
predictions, dec_hidden, _ = decoder((dec_input, dec_hidden, enc_output))
loss += loss_function(targ[:, t], predictions)
# Teacher forcing
dec_input = tf.expand_dims(targ[:, t], 1)

batch_loss = (loss / int(targ.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables

gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))

return batch_loss



@tf.function
def destribute_train_loss(dataset_input):
inp, targ, enc_hidden = dataset_input
per_replica_losses = strategy.experimental_run_v2(train_step, args=(inp, targ, enc_hidden))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
# strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) 将每个GPU的输出进行汇总sum。


# 开始训练

for epoch in range(EPOCHS):
start = time.time()
enc_hidden = encoder.initialize_hidden_state()
train_loss = 0
gbatch = 0
for (inp, tar) in tqdm(dataset_train):

train_loss += destribute_train_loss((inp, tar, enc_hidden))
gbatch+=1

if gbatch % 50 == 0:
template = "Epoch {} Batch {} loss {:.4f} "
tf.print(template.format(epoch + 1, gbatch, train_loss/(2.0*gbatch)))


tf.print("Epoch {} loss {:4f} ".format(epoch + 1, train_loss/(2.0*gbatch)))
tf.print("Time take for 1 epoch {} sec\n".format(time.time() - start))


参考:

https://www.tensorflow.org/tutorials/distribute/custom_training

https://www.tensorflow.org/api_docs/python/tf/distribute/Strategy?version=stable

https://tf.wiki/zh/appendix/distributed.html