通过前面的文章我们可以了解到,当创建好Transport的时候,socket已经建立好。具备了相应的网络传输能力。我们来看一下socket接收到数据是如何处理的。

UdpSocketHandler::OnUvRecv

Socket接收数据

inline void UdpSocketHandler::OnUvRecv(
  ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
	MS_TRACE();

	// NOTE: Ignore if there is nothing to read or if it was an empty datagram.
	if (nread == 0)
		return;

	// Check flags.
	if ((flags & UV_UDP_PARTIAL) != 0u)
	{
		MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");

		return;
	}

	// Data received.
	if (nread > 0)
	{
		// Update received bytes.更新接收字节。
		this->recvBytes += nread;

		// Notify the subclass.通知子类。UdpSocket 是其子类
		UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
	}
	// Some error.
	else
	{
		MS_DEBUG_DEV("read error: %s", uv_strerror(nread));
	}
}
UserOnUdpDatagramReceived

具体由UdpSocket其子类实现,其中listener是在创建transport创建时的具体transport

	void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr)
	{
		MS_TRACE();

		if (!this->listener)
		{
			MS_ERROR("no listener set");

			return;
		}

		// Notify the reader.通知读者。
		this->listener->OnUdpSocketPacketReceived(this, data, len, addr);
	}
OnUdpSocketPacketReceived

以PlainTransport为例

    //从udpsocket获得了接收数据
	inline void PlainTransport::OnUdpSocketPacketReceived(
	  RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr)
	{
		MS_TRACE();
        //形成元组,记录IP等内容
		RTC::TransportTuple tuple(socket, remoteAddr);
        //进入到当前transport处理
		OnPacketReceived(&tuple, data, len);
	}
PlainTransport::OnPacketReceived
	inline void PlainTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
	{
		MS_TRACE();

		// Increase receive transmission.增加接收传输。
		RTC::Transport::DataReceived(len);

		// Check if it's RTCP.检查它是否是RTCP。
		if (RTC::RTCP::Packet::IsRtcp(data, len))
		{
			OnRtcpDataReceived(tuple, data, len);
		}
		// Check if it's RTP.检查它是否是RTP。
		else if (RTC::RtpPacket::IsRtp(data, len))
		{
			OnRtpDataReceived(tuple, data, len);
		}
		// Check if it's SCTP.检查它是否是SCTP。
		else if (RTC::SctpAssociation::IsSctp(data, len))
		{
			OnSctpDataReceived(tuple, data, len);
		}
		else
		{
			MS_WARN_DEV("ignoring received packet of unknown type");
		}
	}
RTP数据处理方式

首先来处理是不是加密的RTP数据;然后根据既定格式重构RTP数据为Packet;最后透传整理好的packet到上层Transport

	inline void PlainTransport::OnRtpDataReceived(
	  RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
	{
		MS_TRACE();
        
		if (HasSrtp() && !IsSrtpReady())
			return;

		// Decrypt the SRTP packet.解密SRTP报文。
		auto intLen = static_cast<int>(len);

		if (HasSrtp() && !this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &intLen))
		{
			RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));

			if (!packet)
			{
				MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");
			}
			else
			{
				MS_WARN_TAG(
				  srtp,
				  "DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",
				  packet->GetSsrc(),
				  packet->GetPayloadType(),
				  packet->GetSequenceNumber());

				delete packet;
			}

			return;
		}
        //解析socket数据,获取格式化后的RtpPacket
		RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, static_cast<size_t>(intLen));

		if (!packet)
		{
			MS_WARN_TAG(rtp, "received data is not a valid RTP packet");

			return;
		}

		// If we don't have a RTP tuple yet, check whether comedia mode is set.
		if (!this->tuple)
		{
			if (!this->comedia)
			{
				MS_DEBUG_TAG(rtp, "ignoring RTP packet while not connected");

				// Remove this SSRC.
				RecvStreamClosed(packet->GetSsrc());

				delete packet;

				return;
			}

			MS_DEBUG_TAG(rtp, "setting RTP tuple (comedia mode enabled)");

			auto wasConnected = IsConnected();

			this->tuple = new RTC::TransportTuple(tuple);

			if (!this->listenIp.announcedIp.empty())
				this->tuple->SetLocalAnnouncedIp(this->listenIp.announcedIp);

			// If not yet connected do it now.
			if (!wasConnected)
			{
				// Notify the Node PlainTransport.
				json data = json::object();

				this->tuple->FillJson(data["tuple"]);

				this->shared->channelNotifier->Emit(this->id, "tuple", data);

				RTC::Transport::Connected();
			}
		}
		// Otherwise, if RTP tuple is set, verify that it matches the origin
		// of the packet.
		else if (!this->tuple->Compare(tuple))
		{
			MS_DEBUG_TAG(rtp, "ignoring RTP packet from unknown IP:port");

			// Remove this SSRC.
			RecvStreamClosed(packet->GetSsrc());

			delete packet;

			return;
		}

		// Pass the packet to the parent transport.将数据包传递给父传输。
		RTC::Transport::ReceiveRtpPacket(packet);
	}
Transport::ReceiveRtpPacket
    //当前调用来源于子类的OnRtpDataReceived中触发了当前接口
	void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
	{
		MS_TRACE();

		packet->logger.recvTransportId = this->id;

		// Apply the Transport RTP header extension ids so the RTP listener can use them.
		// 应用传输RTP报头扩展id,以便RTP侦听器可以使用它们。
		packet->SetMidExtensionId(this->recvRtpHeaderExtensionIds.mid);
		packet->SetRidExtensionId(this->recvRtpHeaderExtensionIds.rid);
		packet->SetRepairedRidExtensionId(this->recvRtpHeaderExtensionIds.rrid);
		packet->SetAbsSendTimeExtensionId(this->recvRtpHeaderExtensionIds.absSendTime);
		packet->SetTransportWideCc01ExtensionId(this->recvRtpHeaderExtensionIds.transportWideCc01);

		auto nowMs = DepLibUV::GetTimeMs();

		// Feed the TransportCongestionControlServer.
		if (this->tccServer)
		{
			this->tccServer->IncomingPacket(nowMs, packet);
		}

		// Get the associated Producer.
		/*根据收到的packet,查找关联的producer。*/
		RTC::Producer* producer = this->rtpListener.GetProducer(packet);

		if (!producer)
		{
			packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::PRODUCER_NOT_FOUND);

			MS_WARN_TAG(
			  rtp,
			  "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
			  packet->GetSsrc(),
			  packet->GetPayloadType());

			// Tell the child class to remove this SSRC.告诉子类删除这个SSRC。
			RecvStreamClosed(packet->GetSsrc());

			delete packet;

			return;
		}

		// MS_DEBUG_DEV(
		//   "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",
		//   packet->GetSsrc(),
		//   packet->GetPayloadType(),
		//   producer->id.c_str());

		// Pass the RTP packet to the corresponding Producer.
		/*将packet传给指定的producer,进行下一步处理。*/
		auto result = producer->ReceiveRtpPacket(packet);

		switch (result)/*根据packet包类型不同,进行不同通道的码率统计。*/
		{
			case RTC::Producer::ReceiveRtpPacketResult::MEDIA:
				this->recvRtpTransmission.Update(packet);/*媒体通道的码率统计*/ 
				break;
			case RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION:
				this->recvRtxTransmission.Update(packet); /*重传通道的码率统计*/   
				break;
			case RTC::Producer::ReceiveRtpPacketResult::DISCARDED:
				// Tell the child class to remove this SSRC.
				RecvStreamClosed(packet->GetSsrc());
				break;
			default:;
		}
        /*释放rtp包*/
		delete packet;
	}
Producer::ReceiveRtpPacket
    /*接收到transport传入的packet,对packet进行指定的处理。*/
	Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
	{
		MS_TRACE();
 
		packet->logger.producerId = this->id;

		// Reset current packet.
		/*重置当前正在处理的packet*/
		this->currentRtpPacket = nullptr;

		// Count number of RTP streams.统计当前接收流的数目
		auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();
        /*通过packet,获取对应的接收流。*/
		auto* rtpStream = GetRtpStream(packet);

		if (!rtpStream)/*没有查找到对应的rtp接收流*/
		{
			MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());

			packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);

			return ReceiveRtpPacketResult::DISCARDED;/*将packet丢弃*/
		}

		// Pre-process the packet.
		/*对packet进行预处理:如果是视频,则添加头部扩展id。*/
		PreProcessRtpPacket(packet);

		ReceiveRtpPacketResult result;
		bool isRtx{ false };/*packet是否是rtx流中的packet*/

		// Media packet.
		/*是主流中的rtp包*/
		if (packet->GetSsrc() == rtpStream->GetSsrc())
		{   
			/*设置返回结果,表示是媒体流,视频流或音频流。*/
			result = ReceiveRtpPacketResult::MEDIA;

			// Process the packet.
			/*rtp接收流处理接收到的packet*/
			if (!rtpStream->ReceivePacket(packet))
			{
				// May have to announce a new RTP stream to the listener.
				/*如果添加了新的rtp接收流,则通知其订阅者。*/
				if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
					NotifyNewRtpStream(rtpStream); /*最终通知到的是与producer相关的consumer*/

				packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_DISCARDED);

				return result;
			}
		}
		// RTX packet.
		/*重传流中的rtp包*/
		else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
		{
			result = ReceiveRtpPacketResult::RETRANSMISSION;
			isRtx  = true;

			// Process the packet.
			/*rtp接收流处理重传流中的packet*/
			if (!rtpStream->ReceiveRtxPacket(packet))
			{
				packet->logger.Dropped(RtcLogger::RtpPacket::DropReason::RECV_RTP_STREAM_NOT_FOUND);

				return result;
			}
		}
		// Should not happen.
		else
		{
			MS_ABORT("found stream does not match received packet");
		}
        /*判断packet是否是关键帧中的包*/
		if (packet->IsKeyFrame())
		{
			MS_DEBUG_TAG(
			  rtp,
			  "key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
			  packet->GetSsrc(),
			  packet->GetSequenceNumber());

			// Tell the keyFrameRequestManager.
			if (this->keyFrameRequestManager)
				this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc()); /*更新关键帧*/
		}

		// May have to announce a new RTP stream to the listener.
		if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
		{
			// Request a key frame for this stream since we may have lost the first packets
			// (do not do it if this is a key frame).
			if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
				this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());

			// Update current packet.
			this->currentRtpPacket = packet;

			NotifyNewRtpStream(rtpStream);

			// Reset current packet.
			this->currentRtpPacket = nullptr;
		}

		// If paused stop here.
		if (this->paused)
			return result;

		// May emit 'trace' event.
		EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);

		// Mangle the packet before providing the listener with it.
		/*在将packet发布至其订阅者之前,对其进行倾轧。
		主要进行payload type,ssrc,header extension的处理*/
		if (!MangleRtpPacket(packet, rtpStream))
			return ReceiveRtpPacketResult::DISCARDED;

		// Post-process the packet.
		/*最后再对packet进行一次处理*/
		PostProcessRtpPacket(packet);
        /*将处理后的packet,发送到其订阅者transport中。*/
		this->listener->OnProducerRtpPacketReceived(this, packet);

		return result;
	}
向上传递到Transport层
	inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
	{
		MS_TRACE();
        //listener是上层的Router
		this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
	}
向上传递到Router层
	inline void Router::OnTransportProducerRtpPacketReceived(
	  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
	{
		MS_TRACE();

		packet->logger.routerId = this->id;
        //通过生产者,所以出订阅者列表
		auto& consumers = this->mapProducerConsumers.at(producer);
        //如果存在对应的订阅者
		if (!consumers.empty())
		{
			// Cloned ref-counted packet that RtpStreamSend will store for as long as
			// needed avoiding multiple allocations unless absolutely necessary.
			// Clone only happens if needed.
			std::shared_ptr<RTC::RtpPacket> sharedPacket;

			for (auto* consumer : consumers)
			{
				// Update MID RTP extension value.
				const auto& mid = consumer->GetRtpParameters().mid;

				if (!mid.empty())
					packet->UpdateMid(mid);
                //发送RTP数据
				consumer->SendRtpPacket(packet, sharedPacket);
			}
		}

		auto it = this->mapProducerRtpObservers.find(producer);

		if (it != this->mapProducerRtpObservers.end())
		{
			auto& rtpObservers = it->second;

			for (auto* rtpObserver : rtpObservers)
			{
				rtpObserver->ReceiveRtpPacket(producer, packet);
			}
		}
	}
具体transport通道转发数据
void PlainTransport::SendRtpPacket(
	  RTC::Consumer* /*consumer*/, RTC::RtpPacket* packet, RTC::Transport::onSendCallback* cb)
	{
		MS_TRACE();

		if (!IsConnected())
		{
			if (cb)
			{
				(*cb)(false);
				delete cb;
			}

			return;
		}

		const uint8_t* data = packet->GetData();
		auto intLen         = static_cast<int>(packet->GetSize());

		if (HasSrtp() && !this->srtpSendSession->EncryptRtp(&data, &intLen))
		{
			if (cb)
			{
				(*cb)(false);
				delete cb;
			}

			return;
		}

		auto len = static_cast<size_t>(intLen);
        //使用元组获发送RTP数据
		this->tuple->Send(data, len, cb);

		// Increase send transmission.增加发送传输的数据大小。
		RTC::Transport::DataSent(len);
	}
		void Send(const uint8_t* data, size_t len, RTC::TransportTuple::onSendCallback* cb = nullptr)
		{
			if (this->protocol == Protocol::UDP)
				this->udpSocket->Send(data, len, this->udpRemoteAddr, cb);
			else
				this->tcpConnection->Send(data, len, cb);
		}

底层实际发送

void UdpSocketHandler::Send(
  const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
{
	MS_TRACE();

	if (this->closed)
	{
		if (cb)
		{
			(*cb)(false);
			delete cb;
		}

		return;
	}

	if (len == 0)
	{
		if (cb)
		{
			(*cb)(false);
			delete cb;
		}

		return;
	}

	// First try uv_udp_try_send(). In case it can not directly send the datagram
	// then build a uv_req_t and use uv_udp_send().

	uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
	const int sent  = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);

	// Entire datagram was sent. Done.
	if (sent == static_cast<int>(len))
	{
		// Update sent bytes.
		this->sentBytes += sent;

		if (cb)
		{
			(*cb)(true);
			delete cb;
		}

		return;
	}
	else if (sent >= 0)
	{
		MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);

		// Update sent bytes.
		this->sentBytes += sent;

		if (cb)
		{
			(*cb)(false);
			delete cb;
		}

		return;
	}
	// Any error but legit EAGAIN. Use uv_udp_send().
	else if (sent != UV_EAGAIN)
	{
		MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
	}

	auto* sendData = new UvSendData(len);

	sendData->req.data = static_cast<void*>(sendData);
	std::memcpy(sendData->store, data, len);
	sendData->cb = cb;

	buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);

	int err = uv_udp_send(
	  &sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));

	if (err != 0)
	{
		// NOTE: uv_udp_send() returns error if a wrong INET family is given
		// (IPv6 destination on a IPv4 binded socket), so be ready.
		MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));

		if (cb)
			(*cb)(false);

		// Delete the UvSendData struct (it will delete the store and cb too).
		delete sendData;
	}
	else
	{
		// Update sent bytes.
		this->sentBytes += len;
	}
}
10-09 15:30