nlp

RLHF Cookbook

RLHF详解

Posted by Kylin on March 30, 2024

[TOC]

为什么用RL?

使用强化学习(而非监督学习)的方式更新语言模型,最大的优势是在于能够使得「模型更加自由的探索更新方向,从而突破监督学习的性能天花板」。

截屏2024-03-31 08.13.11

RLHF

整体流程

640

这里的reward我们使用判别模型输出,以降低人工成本

1) 生成采样(Rollout)

640-1

# prompt池
prompts = [
    '刚收到货,感觉',
    '这部电影很',
    '说实话,真的很',
    '这次购物总的来说体验很'
]
...

for _ in range(config['batch_size']):
        random_prompt = random.choice(prompts)                                  # 随机选择一个prompt
        tokens = gpt2_tokenizer.encode(random_prompt)
        batch['tokens'].append(tokens)
        batch['query'].append(random_prompt)
query_tensors = [torch.tensor(t).long().to(device) for t in batch["tokens"]]
...

for i in range(config['batch_size']):
    gen_len = config['gen_len']
    response = gpt2_model.generate(query_tensors[i].unsqueeze(dim=0),           # 利用当前选择的prompt生成句子
                                   max_new_tokens=gen_len, **gen_kwargs)
    response_tensors.append(response.squeeze()[-gen_len:])

为了保证生成句子的多样性,我们设定了一个 prompt 池,从中随机选择一个进行生成,获得多个模型的生成结果。

2)Reward 评估(Evaluation)

640-2

利用roberta进行情感分类打分

# 情绪识别模型初始化
senti_tokenizer = AutoTokenizer.from_pretrained('uer/roberta-base-finetuned-jd-binary-chinese')
senti_model = AutoModelForSequenceClassification.from_pretrained('uer/roberta-base-finetuned-jd-binary-chinese')
sentiment_pipe = pipeline('sentiment-analysis', model=senti_model, tokenizer=senti_tokenizer, device=pipe_device)
...

# 将 prompt 和生成的 response 做拼接
texts = [q + r for q,r in zip(batch['query'], batch['response'])]           
pipe_outputs = sentiment_pipe(texts)           # 计算正向/负向情感得分

3)模型迭代(Optimization)

640-3

ppo_trainer.step(query_tensors, response_tensors, rewards)      # PPO Update

PPO具体算法参考1

具体来说,PPO在更新时一共会计算 2 个 loss:pg_loss、value_loss:

loss_p, loss_v, train_stats  = self.loss(logprobs, values, rewards, query, response, model_input)
loss = loss_p + loss_v
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
...
pg_loss

pg_loss 是 PPO 中 actor 的 loss 函数,其通过 discount reward 和 importance ratio 来计算当前 step 的 reward 应该是多少: \(l o s s_{p g}=\frac{P_{\pi_{\text {new }(t \text { token })}}}{P_{\pi_{\text {old }}(t o k e n)}}\left(r+\gamma V_{\text {next }}-V_{\text {current }}\right)\) 其中,importance ratio 是指产生同样的 token,在 active actor model 和 reference actor model 下的概率比值,这也是 PPO 模型中的 Importance Sampling 系数。

for t in reversed(range(gen_len)):
    nextvalues = values[:, t + 1] if t < gen_len - 1 else 0.0
    delta = rewards[:, t] + self.ppo_params['gamma'] * nextvalues - values[:, t]          # 优势函数:r + Vnext - V
    lastgaelam = delta + self.ppo_params['gamma'] * self.ppo_params['lam'] * lastgaelam   # GAE, 用于平衡 bias 和 variance
    advantages_reversed.append(lastgaelam)
    advantages = torch.stack(advantages_reversed[::-1]).transpose(0, 1)

logits, _, vpred = self.model(model_input)                                  # 跑一遍模型,得到句子中每个token被选择的概率
logprob = logprobs_from_logits(logits[:,:-1,:], model_input[:, 1:])         # 将概率取log对数
ratio = torch.exp(logprob - old_logprobs)                                   # log相减,等同于概率相除
pg_losses = -advantages * ratio
value_loss

value_loss 是 PPO 中 critic 的 loss 函数,其目的在于评判每一个 token 被生成后的 value 是多少。

这是因为在 PPO 中需要有一个 critic 网络,为了实现这个效果,我们需要对 GPT 模型进行改造。

我们在 GPT 中加入一个 Value Head,用于将 hidden_size 向量映射到一个 1 维的 value 向量:

class GPT2HeadWithValueModel(GPT2PreTrainedModel):
    """The GPT2HeadWithValueModel class implements a GPT2 language model with a secondary, scalar head."""
    def __init__(self, config):
        super().__init__(config)
        config.num_labels = 1
        self.transformer = GPT2Model(config)
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.v_head = ValueHead(config)                                       # 添加 Value Head
        self.init_weights()
    ...

class ValueHead(nn.Module):
    """The ValueHead class implements a head for GPT2 that returns a scalar for each output token."""

    def __init__(self, config):
        super().__init__()
        self.summary = nn.Linear(config.hidden_size, 1)                        # (hidden_size -> 1)
    ...

value_loss 就应该等于 Value Head 产生的预测值 v_pred 和真实值 r + v_next 之间的差值: \(\text { loss }_{\text {value }}=\left\|V_{\text {pred }}-\left(r+V_{\text {next }}\right)\right\|\)

returns = advantages + values                      # r + v_next - v + v => r + v_next
logits, _, vpred = self.model(model_input)         # 跑一遍语言模型,得到每个 token 的 v_pred
vf_losses1 = (vpred - returns) ** 2                # MSE

Reward Model

ChatGPT 中,奖励模型是通过人工标注的「排序序列」来进行训练的:

640-4

把任务转为一个排序问题,以规避主观绝对分数标准不确定的问题:

假定现在有一个排好的序列:A > B > C >D。

我们需要训练一个打分模型,模型给四句话打出来的分要满足 r(A) > r(B) > r(C) > r(D)。

那么,我们可以使用下面这个损失函数: \(\operatorname{loss}(\theta)=-\frac{1}{\left(\begin{array}{c} K \\ 2 \end{array}\right)} E_{\left(x, y_w, y_l\right) \sim D}\left[\log \left(\sigma\left(r_\theta\left(x, y_w\right)-r_\theta\left(x, y_l\right)\right)\right)\right]\) 用上述例子(A > B > C > D)来讲,loss 应该等于:

loss = r(A) - r(B) + r(A) - r(C) + r(A) - r(D) + r(B) - r(C) + ... + r(C) - r(D)
loss = -loss

为了更好的归一化差值,我们对每两项差值都过一个 sigmoid 函数将值拉到 0 ~ 1 之间。可以看到,loss 的值等于排序列表中所有「排在前面项的reward」减去「排在后面项的reward」的和。

而我们希望模型能够「最大化」这个「好句子得分」和「坏句子得分」差值,而梯度下降是做的「最小化」操作。因此,我们需要对 loss 取负数,就能实现「最大化差值」的效果了。

class RewardModel(nn.Module):
    def __init__(self, encoder):
        """
        init func.

        Args:
            encoder (transformers.AutoModel): backbone, 默认使用 ernie 3.0
        """
        super().__init__()
        self.encoder = encoder
        self.reward_layer = nn.Linear(768, 1)            # reward layer 用于映射到 1 维 reward

    def forward(
        self,
        input_ids: torch.tensor,
        token_type_ids: torch.tensor,
        attention_mask=None,
        pos_ids=None,
    ) -> torch.tensor:
        """
        forward 函数,返回每句话的得分值。

        Args:
            input_ids (torch.tensor): (batch, seq_len)
            token_type_ids (torch.tensor): (batch, seq_len)
            attention_mask (torch.tensor): (batch, seq_len)
            pos_ids (torch.tensor): (batch, seq_len)

        Returns:
            reward: (batch, 1)
        """
        pooler_output = self.encoder(
            input_ids=input_ids,
            token_type_ids=token_type_ids,
            position_ids=pos_ids,
            attention_mask=attention_mask,
        )["pooler_output"]                              # (batch, hidden_size)
        reward = self.reward_layer(pooler_output)       # (batch, 1)
        return reward

计算 rank_loss 函数如下,因为样本里的句子已经默认按从高到低得分排好,因此我们只需要遍历的求前后项的得分差值加起来即可:

def compute_rank_list_loss(rank_rewards_list: List[List[torch.tensor]], device='cpu') -> torch.Tensor:
    """
    通过给定的有序(从高到低)的ranklist的reward列表,计算rank loss。
    所有排序高的句子的得分减去排序低的句子的得分差的总和,并取负。
Args:
    rank_rewards_list (torch.tensor): 有序(从高到低)排序句子的reward列表,e.g. -> 
                                    [
                                        [torch.tensor([0.3588]), torch.tensor([0.2481]), ...],
                                        [torch.tensor([0.5343]), torch.tensor([0.2442]), ...],
                                        ...
                                    ]
    device (str): 使用设备

Returns:
    loss (torch.tensor): tensor([0.4891], grad_fn=<DivBackward0>)
"""
if type(rank_rewards_list) != list:
    raise TypeError(f'@param rank_rewards expected "list", received {type(rank_rewards)}.')

loss, add_count = torch.tensor([0]).to(device), 0
for rank_rewards in rank_rewards_list:
    for i in range(len(rank_rewards)-1):                                   # 遍历所有前项-后项的得分差
        for j in range(i+1, len(rank_rewards)):
            diff = F.sigmoid(rank_rewards[i] - rank_rewards[j])            # sigmoid到0~1之间
            loss = loss + diff
            add_count += 1
loss = loss / add_count
return -loss  

Further Strategy

Reference

  1. 强化学习之PPO算法. https://zhuanlan.zhihu.com/p/468828804