06 处理粘包问题

处理粘包问题

常用的处理方式是,使用tlv协议,即消息id+消息长度+消息内容。简化的方案就是,消息长度+消息内容。

方案实现

首先要注意的是,发送的时候就要按照顺序发送,即通过队列发送,接收就能按照顺序接收,包就能按照顺序进行切包处理。

MsgNode设计

  1. 在数组中的前两个字节设置为内容的长度。
  2. 后续的字节存储消息内容
  3. 设置发送长度为2 + 消息内容length
class MsgNode
{
public:
using str_len_type = short;
MsgNode(char* msg, str_len_type max_len) : _total_len(max_len)
{
_msg = new char[_total_len + 1];
memcpy(_msg, msg, max_len);
_msg[_total_len] = '\0';
}

MsgNode(char* msg) : MsgNode(msg, strlen(msg) - 1) {}

MsgNode(str_len_type max_len) : _total_len(max_len)
{
_msg = new char[_total_len + 1];
::memset(_msg, 0, _total_len + 1);
}

~MsgNode()
{
delete[] _msg;
}

void Clear()
{
::memset(_msg, 0, _total_len);
_cur_len = 0;
}
private:
char* _msg = nullptr;
int _total_len;
int _cur_len = 0;
public:
constexpr int HEADER_LEN = sizeof(str_len_type);
};

注意点:包的长度

发包不建议发高频率的小包,心跳建议在10ms以上。发包也不建议一次性发送很大的包。

接收包HandleRead

接收包的思路是,先接收并解析头部长度,再根据头部长度,去读取剩余长度。如果继续接收消息

  • 收到的消息长度小于需要的长度,将消息存储拷贝好,挂起下一次读。
  • 收到的消息长度大于需要的长度,则将本次包的消息补全后,将剩余的数据交给下一次的头部解析的循环。
if(err)
{
// ...
return;
}

int copy_len = 0; // 已经处理的字节数
while(byte_transferred > 0)
{
if(!b_head_parse)
{
// 收到的总数据不足头部大小,直接拷贝到node中
if(byte_transferred + _recv_head_node->_cur_len < MsgNode::HEADER_LEN)
{
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, byte_transferred);
_recv_head_node->_cur_len += byte_transferred;
::memset(_data, 0, MAX_LENGTH);
// 挂起新的read事件
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholder::_1, std::placeholders::_2, shared_from_this()));
return;
}

// 收到了至少头部长度,先把头部补齐
int head_remain = MsgNode::HEADER_LEN - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);

copy_len += head_remain;
bytes_transferred -= head_remain;

// 解析头部长度
short data_len = 0;
memcpy(&data_len, _recv_head_node->_msg, MsgNode::HEADER_LEN);


// head length legal test
if(data_len > MAX_LENGTH)
{
std::cout << "invalid data length is " << data_len << '\n';
_server->ClearSession(_uuid);
return;
}

// 正式开始处理
_recv_msg_node = make_shared<MsgNode>(data_len);
// 数据不足,全部拷贝,继续监听
if(bytes_transferred < data_len)
{
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _msg + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));
b_head_parse = true;
return;
}
}

// 头部解析完成,并且本次数据至少满足data_len
memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->data[_recv_msg_node->_total_len] = '\0';

std::cout << "This Pkg data is " << _recv_msg_node->_msg << '\n';

// 本次包处理到此完成,继续处理剩下的数据
b_head_parse = false;
_recv_head_node->Clear();
}

// 刚好处理完成
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));

发送包设计

发送设计只需在发送的基础上,先发送2字节的长度。