SRT协议控制包处理源码解读
SRT协议控制包处理源码解读
本⽂为个⼈在看协议栈时候的理解,不⼀定对,还望发现错误能够及时指正
ACK包
1. 如果ACK包长度为SEND_LITE_ACK(4)
该包在srt中被称为SEND LITE ACK,仅包含ACK的seq
取出当前收到ACK seq(ack),同已发送的ACK seq(m_iSndLastAck)⽐较,使⽤seqcmp函数⽐较,seqcmp⽐较⽅式如下(abs(seq1 - seq2) < m_iSeqNoTH) ? (seq1 - seq2) : (seq2 - seq1) 其中m_iSeqNoTH = 0x3FFFFFFF为最⼤seq,超过最⼤seq从1开始
若⽐较返回值值⼤于等于0,则作出以下修改
1. 滑动窗⼝⼤⼩(m_iFlowWindowSize)
重设为m_iFlowWindowSize - CSeqNo::seqoff(m_iSndLastAck, ack)
其中seqoff为common.h中定义的获取调整值的公⽤函数
2. 上⼀个发送的ACK seq(m_iSndLastAck)调整,供下⼀次使⽤,m_iSndLastAck = ack
3. 上⼀次回复ACK时间(m_ullLastRspAckTime_tk)调整为m_ullLastRspAckTime_tk = currtime_tk 4. 重传包数量(m_iReXmitCount)重置,由于ACK seq已置为收到的ACK seq,所以⽆需重传,m_iReXmitCount = 1
处理结束
2. 普通ACK包
蜂衣
普通ACK包处理过程
1. 判断是否需要回复ACKACK包,两个判断条件其中之⼀为真则进⾏如下处理 1. 回复ACK的ACKACK包回复时间是否⼤于最⼩时间间隔(COMM_SYN_INTERVAL_US = 10000)(推测应该是为了防⽌每
次都发送ACKACK,同ACK⼀段时间再回复的原理⼀致),⼤于该间隔则重发ACKACK
2. 判断ACK是否同上次回复的ACKACK相同,如果相同,说明ACKACK对⽅没有收到,重发
3. 重发做两件事情,第⼀是重置ACKACK seq(m_iSndLastAck2)为当前回复的ACK seq,第⼆则是记录该ACKACK包回复
时间(m_ullSndLastAck2Time)为当前时间,⽅便下次判断
2. 验证ACK包seq的有效性,⽆效包则进⾏如下处理
1. 设置m_bBroken为真,连接会断开
2. m_iBrokenCounter置0(作⽤暂时未知)
3. 取出ACK seq,同上⼀次的ACK seq(m_iSndLastAck)⽐较,依旧是使⽤seqcmp函数⽐较
1. 滑动窗⼝m_iFlowWindowSize设置为ackdata[ACKD_BUFFERLEFT]的值,推测是及时调整滑动窗⼝⼤⼩来达到限速的⽬的
2. m_iSndLastAck调整为当前ACK seq的值
3. 上⼀次回复ACK时间(m_ullLastRspAckTime_tk)调整为m_ullLastRspAckTime_tk = currtime_tk
4. 重传包数量(m_iReXmitCount)重置,由于ACK seq已置为收到的ACK seq,所以⽆需重传,m_iReXmitCount = 1
4. 判断ACK是否重复,重复则进⾏如下操作
1. 直接丢弃当前包,不处理
5. 处理⾄此,说明该ACK包前的序列为完整,则设置m_iSndLastFullAck = ack
6. 判断上⼀个ACK seq和当前ACK seq有⽆偏移,偏移⼤于0则做如下处理
1. 移除当前的ACK seq之前的数据,并且调整上⼀个ACK seq
2. 对srt信息做记录,总发送时间等
3. 将当前ACK seq的包从重发列表移除
7. 告知所有等待的epoll更新其状态,开始写
8. 将当前socket插⼊等待队列(如果socket没有在队列的话)
9. 对控制包长度进⾏判断
1. 如果包长不是int32_t长度的整数倍,截取到整数倍为⽌
2. 如果⼩于最⼩ACK包长度,报错,丢弃
10. 获取ACK包中的rtt,进⾏调整RTT⽅差和RTT
1. m_iRTTVar = (m_iRTTVar * (4 - 1) + abs(rtt - m_iRTT)) / 4
2. m_iRTT = (m_iRTT * (4 - 1) + rtt) / 4
11. 判断包长度是否⼤于最⼩ACK包长度,如果这意味着ACKD_RCVSPEED和ACKD_BANDWIDTH字段是可⽤的
1. m_iBandwidth = (m_iBandwidth * (8 - 1) + bandwidth) / 8
2. m_iDeliveryRate = (m_iDeliveryRate * (8 - 1) + pktps) / 8
3. m_iByteDeliveryRate = (m_iByteDeliveryRate * (8 - 1) + bytesps) / 8
12. 更新srt的总的状态信息
ACKACK包
回复ACK的包
1. 调整RTT⽅差
m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2
2. 调整RTT
m_iRTT = (m_iRTT * 7 + rtt) >> 3
3. ⽐较ACKACK和上⼀个收到的ACKACK,如果较新,则更新
LOSSREPORT包
只有该包会导致重传,⽤于报告重传的包
1. 判断losslist的长度,losslist可能存在多个list,list可能是单个seq,也可能为⼀个range
2. 循环处理单个seq或者⼀个seq range,判别依据为第⼀位是否设置,设置则为seq range
1. seq range
获取seq low和seq high,检查和上⼀个发送的ACK seq之间的差距,将差的部分添加⾄m_pSndLosslist,记录重发包数量
2. 单个seq
判断是否⼤于上⼀个ACK,⼤于才重发,插⼊m_pSndLosslist,记录重发包数量
3. 更新srt的重发包数量
4. 通知m_pSndUList⽴即更新,发送这些丢失的包
5. 更新srt状态中NAK包的数量
CGWARNING包
延时增加的警告包,srt⾃⾝并不会发送该类型的包,可能由⽤户控制,预测ACKACK有⼀个地⽅可能会⽤到这种类型的包,但是被注释掉了
1. 调整包间时间间隔m_ullInterval_tk = (uint64_t)ceil(m_ullInterval_tk * 1.125)
2. 更新上⼀次减缓发送的seq序号m_iLastDecSeq = m_iSndCurrSeqNo
KEEPALIVE包
不做任何处理,仅是⼀个提⽰包,1.3.4版本此包内部⽆逻辑
HANDSHAKE
握⼿的包有好⼏种,分别为INDUCTION、CONCLUSION、WAVEAHAND和AGREEMENT,其中INDUCTION、WAVEAHAND两种类型的包为第⼀部分握⼿发送的包,CONCLUSION为第⼆部分握⼿时候发送的包,AGREEMENT为额外的握⼿包
// Note: the client-server connection uses:
// --> INDUCTION (empty)
// <-- INDUCTION (cookie)
接菜// --> CONCLUSION (cookie)
// <-- CONCLUSION (ok)
// The rendezvous HSv4 (legacy):
// --> WAVEAHAND (effective only if peer is also connecting)
// <-- CONCLUSION (empty) (consider yourself connected upon reception)
// --> AGREEMENT (sent as a response for conclusion, requires no response)
// The rendezvous HSv5 (using SRT extensions):
// --> WAVEAHAND (with cookie)
// --- (selecting INITIATOR/RESPONDER by cookie contest - comparing one another's cookie)
// <-- CONCLUSION (without extensions, if RESPONDER, with extensions, if INITIATOR)
// --> CONCLUSION (with response extensions, if RESPONDER)
// <-- AGREEMENT (sent exclusively by INITIATOR upon reception of CONCLUSIOn with response extensions)
1. 判断包为第⼀部分的包(INDUCTION、WAVEAHAND)或者rendezvous模式下的⾮AGREEMENT包,如果均不是,则说明包⾮
HANDSHAKE,忽略
2. 初始化ISN、MSS、socket等,回复包类型设置为URQ_INDUCTION或URQ_AGREEMENT(依据连接⽅式决定)
3. 判断版本为UDT4还是UDT5,将version更新到信息中
如果版本为UDT5,除了设置版本之外,还将做额外的⼯作,判断解析内容,如果内容解析出现问题,则认为是⾮法的握⼿
包,version置0,设置回复类型为SRT_REJ_ROGUE
4. 初始化extension参数,表明回复中有额外内容,如果是reject的包则该值为false
5. 组织response,设置回复包为handshake包,申请空间,填充信息,PeerID,时间戳等,并插⼊队列等待发送,更新上⼀个包发送
时间为当前时间,解析完成
蓄电池模拟器
SHUTDOWN
收到SHUTDOWN包,连接将会断开
1. 设置shutdown、closing、broken、brokenCounter等参数,为关闭做准备
2. 给发送者和接受者发送信号,如果他们处于等待数据的状态,其实就是个上锁做⼀些⼯作再释放的过程
3. 更新事件,让事件队列清空
4. 通知事件线程
DROPREQ
丢弃包消息,收到该包会将丢包重传等从丢包列表去除,不再重发
1. 收到该消息,上锁,删除msg
2. 清理loss list等,重置lossttl等
PEERERROR
⽤来告知peer端出现错误
1. 更新PeerHealth为false
EXT
该包作为保留包和⽤户定义包
1. 判断包内容是否能够解析,能够解析且只有包的extended type为SRT_CMD_HSREQ、SRT_CMD_HSRSP才认为包有效并进⾏操作源码
源码位于core.cpp
void CUDT::processCtrl(CPacket& ctrlpkt)
{
// Just heard from the peer, reset the expiration count.
m_iEXPCount =1;
uint64_t currtime_tk;
CTimer::rdtsc(currtime_tk);
m_ullLastRspTime_tk = currtime_tk;
bool using_rexmit_flag = m_bPeerRexmitFlag;
HLOGC(mglog.Debug, log <<CONID()<<"incoming UMSG:"<< Type()<<" ("
<<Type(), ExtendedType())<<") socket=%"<< ctrlpkt.m_iID);
Type())
{
case UMSG_ACK://010 - Acknowledgement
{
int32_t ack;
int32_t* ackdata =(int32_t*)ctrlpkt.m_pcData;
/
/ process a lite ACK
Length()==(size_t)SEND_LITE_ACK)
{
ack =*ackdata;
if(CSeqNo::seqcmp(ack, m_iSndLastAck)>=0)
{
m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack);
无心磨床调机方法HLOGC(mglog.Debug, log <<CONID()<<"ACK covers: "<< m_iSndLastDataAck <<" - "<< ack <<" [ACK="<< m_iSndLastAck <<"] (FLW: "<< m _iFlowWindowSize <<") [LITE]");
m_iSndLastAck = ack;
m_ullLastRspAckTime_tk = currtime_tk;
m_iReXmitCount =1;// Reset re-transmit count since last ACK
}
break;
}
// read ACK seq. no.
ack = AckSeqNo();
// send ACK acknowledgement
// number of ACK2 can be much less than number of ACK
uint64_t now = CTimer::getTime();
if((now - m_ullSndLastAck2Time >(uint64_t)COMM_SYN_INTERVAL_US)||(ack == m_iSndLastAck2))
全自动面膜灌装机{
sendCtrl(UMSG_ACKACK,&ack);
m_iSndLastAck2 = ack;
m_ullSndLastAck2Time = now;
}
// Got data ACK
ack = ackdata[ACKD_RCVLASTACK];
经络油// New code, with TLPKTDROP
// protect packet retransmission
CGuard::enterCS(m_AckLock);
// check the validation of the ack