Without further ado, let's get to the good stuff. The wrapper layer can call WebRTC's DeliverPacket method directly, passing in an RTP packet; from there, the received RTP packets get depacketized, decoded, and rendered. Strictly speaking, callers never need to know how this interface method is implemented, but today we will walk through its source. The implementation looks like this:
//webrtc/call/call.cc
PacketReceiver::DeliveryStatus Call::DeliverPacket(
MediaType media_type,
const uint8_t* packet,
size_t length,
const PacketTime& packet_time) {
// TODO(solenberg): Tests call this function on a network thread, libjingle
// calls on the worker thread. We should move towards always using a network
// thread. Then this check can be enabled.
// RTC_DCHECK(!configuration_thread_checker_.CalledOnValidThread());
if (RtpHeaderParser::IsRtcp(packet, length))
return DeliverRtcp(media_type, packet, length);
return DeliverRtp(media_type, packet, length, packet_time);
}
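Before reading further, here is a minimal sketch of what that wrapper-layer call might look like. OnUdpPacketReceived is a hypothetical helper invented for illustration; only Call::Receiver(), DeliverPacket, MediaType, and PacketTime are actual WebRTC APIs, and the include path may differ between WebRTC revisions:

#include <cstddef>
#include <cstdint>
#include "webrtc/call/call.h"

// Hypothetical glue code: hand every received UDP payload to webrtc::Call.
// With MediaType::ANY, Call demultiplexes audio/video by SSRC internally.
void OnUdpPacketReceived(webrtc::Call* call, const uint8_t* data, size_t length) {
  webrtc::PacketTime packet_time;  // timestamp stays -1, so Call stamps the arrival time itself
  webrtc::PacketReceiver::DeliveryStatus status =
      call->Receiver()->DeliverPacket(webrtc::MediaType::ANY, data, length,
                                      packet_time);
  if (status == webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC) {
    // No receive stream has been configured for this SSRC yet.
  }
}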
The H.264 data is carried in RTP packets, so we don't need to care about DeliverRtcp here. The sketch below shows how the RTCP/RTP split is typically made; after that, we go straight to DeliverRtp.
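This is a simplified sketch in the spirit of RFC 5761, assuming only the reserved RTCP packet-type range; the real RtpHeaderParser::IsRtcp performs additional validation:

#include <cstddef>
#include <cstdint>

// Sketch: RTCP packet types (SR, RR, SDES, BYE, APP, feedback, ...) put the
// values 192..223 in the second header byte, whereas an RTP packet keeps its
// 7-bit payload type plus a marker bit there.
bool LooksLikeRtcp(const uint8_t* packet, size_t length) {
  if (length < 2)
    return false;
  const uint8_t second_byte = packet[1];
  return second_byte >= 192 && second_byte <= 223;
}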
//webrtc/call/call.cc
PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
const uint8_t* packet,
size_t length,
const PacketTime& packet_time) {
TRACE_EVENT0("webrtc", "Call::DeliverRtp");
// Minimum RTP header size.
if (length < 12)
return DELIVERY_PACKET_ERROR;
uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]);
uint8_t payload_type = packet[1] & 0x7f;
ReadLockScoped read_lock(*receive_crit_);
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
auto it = audio_receive_ssrcs_.find(ssrc);
if (it != audio_receive_ssrcs_.end()) {
received_bytes_per_second_counter_.Add(static_cast<int>(length));
received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
auto status = it->second->DeliverRtp(packet, length, packet_time)
? DELIVERY_OK
: DELIVERY_PACKET_ERROR;
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
auto it = video_receive_ssrcs_.find(ssrc);
if (it != video_receive_ssrcs_.end()) {
received_bytes_per_second_counter_.Add(static_cast<int>(length));
received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
// TODO(brandtr): Notify the BWE of received media packets here.
auto status = DELIVERY_OK;
const std::vector<VideoReceiveStream::Decoder>& decoders = it->second->config().decoders;
if (decoders.size() > 0) {
int decoder_pt = decoders[0].payload_type;
if (payload_type == decoder_pt) {
status = it->second->DeliverRtp(packet, length, packet_time) ? DELIVERY_OK : DELIVERY_PACKET_ERROR;
} else {
LOG(LS_WARNING) << "Receive FlexFec packet ,decoder_pt=" << decoder_pt
<< ", receive packet pt=" << payload_type;
}
}
// Deliver media packets to FlexFEC subsystem. RTP header extensions need
// not be parsed, as FlexFEC is oblivious to the semantic meaning of the
// packet contents beyond the 12 byte RTP base header. The BWE is fed
// information about these media packets from the regular media pipeline.
rtc::Optional<RtpPacketReceived> parsed_packet =
ParseRtpPacket(packet, length, packet_time);
if (parsed_packet) {
auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
for (auto it = it_bounds.first; it != it_bounds.second; ++it)
it->second->AddAndProcessReceivedPacket(*parsed_packet);
}
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
}
}
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
if (it != flexfec_receive_ssrcs_protection_.end()) {
rtc::Optional<RtpPacketReceived> parsed_packet =
ParseRtpPacket(packet, length, packet_time);
if (parsed_packet) {
NotifyBweOfReceivedPacket(*parsed_packet);
auto status = it->second->AddAndProcessReceivedPacket(*parsed_packet)
? DELIVERY_OK
: DELIVERY_PACKET_ERROR;
if (status == DELIVERY_OK)
event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
return status;
}
}
}
return DELIVERY_UNKNOWN_SSRC;
}
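As an aside, the ssrc and payload_type reads near the top of DeliverRtp come straight out of the 12-byte fixed RTP header defined by RFC 3550. A self-contained sketch of that parse, for reference only:

#include <cstddef>
#include <cstdint>

struct RtpFixedHeader {
  uint8_t payload_type;      // low 7 bits of byte 1 (marker bit stripped)
  uint16_t sequence_number;  // bytes 2-3, big-endian
  uint32_t timestamp;        // bytes 4-7, big-endian
  uint32_t ssrc;             // bytes 8-11, big-endian
};

bool ParseRtpFixedHeader(const uint8_t* p, size_t len, RtpFixedHeader* h) {
  if (len < 12 || (p[0] >> 6) != 2)  // need the full base header and version 2
    return false;
  h->payload_type = p[1] & 0x7f;
  h->sequence_number = static_cast<uint16_t>((p[2] << 8) | p[3]);
  h->timestamp = (uint32_t{p[4]} << 24) | (uint32_t{p[5]} << 16) |
                 (uint32_t{p[6]} << 8) | uint32_t{p[7]};
  h->ssrc = (uint32_t{p[8]} << 24) | (uint32_t{p[9]} << 16) |
            (uint32_t{p[10]} << 8) | uint32_t{p[11]};
  return true;
}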
A careful reader will notice that the method checks if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) twice. Why? This is where WebRTC's FEC mechanism comes into play. We won't dig into FEC itself here; a follow-up article will cover it in detail. What matters for now is the first of the two checks: look at the ParseRtpPacket and AddAndProcessReceivedPacket methods, whose implementations follow.
ParseRtpPacket:
//webrtc/call/call.cc
rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
const uint8_t* packet,
size_t length,
const PacketTime& packet_time) {
RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length))
return rtc::Optional<RtpPacketReceived>();
auto it = received_rtp_header_extensions_.find(parsed_packet.Ssrc());
if (it != received_rtp_header_extensions_.end())
parsed_packet.IdentifyExtensions(it->second);
int64_t arrival_time_ms;
if (packet_time.timestamp != -1) {
arrival_time_ms = (packet_time.timestamp + 500) / 1000;
} else {
arrival_time_ms = clock_->TimeInMilliseconds();
}
parsed_packet.set_arrival_time_ms(arrival_time_ms);
return rtc::Optional<RtpPacketReceived>(std::move(parsed_packet));
}
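One small detail worth calling out: the (packet_time.timestamp + 500) / 1000 expression is just a round-to-nearest conversion from microseconds to milliseconds. Extracted as a stand-alone helper:

#include <cstdint>

int64_t UsToMsRounded(int64_t us) {
  return (us + 500) / 1000;  // round to the nearest millisecond
}
// UsToMsRounded(1499) == 1, UsToMsRounded(1500) == 2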
AddAndProcessReceivedPacket:
//webrtc/call/flexfec_receive_stream_impl.cc
bool FlexfecReceiveStreamImpl::AddAndProcessReceivedPacket(
const RtpPacketReceived& packet) {
{
rtc::CritScope cs(&crit_);
if (!started_)
return false;
}
if (!receiver_)
return false;
if (!receiver_->AddAndProcessReceivedPacket(packet))
return false;
// Do not report media packets in the RTCP RRs generated by |rtp_rtcp_|.
if (packet.Ssrc() == config_.remote_ssrc) {
RTPHeader header;
packet.GetHeader(&header);
// FlexFEC packets are never retransmitted.
const bool kNotRetransmitted = false;
rtp_receive_statistics_->IncomingPacket(header, packet.size(),
kNotRetransmitted);
}
return true;
}
Let's follow AddAndProcessReceivedPacket first. Note that the IncomingPacket call is also important: it records the packet into the receive statistics, and we'll come back to it later.
//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::AddAndProcessReceivedPacket(
const RtpPacketReceived& packet) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
if (!AddReceivedPacket(std::move(packet))) {
return false;
}
return ProcessReceivedPackets();
}
Next, look at the AddReceivedPacket method:
//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
// RTP packets with a full base header (12 bytes), but without payload,
// could conceivably be useful in the decoding. Therefore we check
// with a non-strict inequality here.
RTC_DCHECK_GE(packet.size(), kRtpHeaderSize);
// Demultiplex based on SSRC, and insert into erasure code decoder.
std::unique_ptr<ReceivedPacket> received_packet(new ReceivedPacket());
received_packet->seq_num = packet.SequenceNumber();
received_packet->ssrc = packet.Ssrc();
int packet_pt = packet.PayloadType();
if (packet_pt == payload_type_) {
// This is a FlexFEC packet.
if (packet.payload_size() < kMinFlexfecHeaderSize) {
LOG(LS_WARNING) << "Truncated FlexFEC packet, discarding.";
return false;
}
received_packet->is_fec = true;
++packet_counter_.num_fec_packets;
// Insert packet payload into erasure code.
// TODO(brandtr): Remove this memcpy when the FEC packet classes
// are using COW buffers internally.
received_packet->pkt = rtc::scoped_refptr<Packet>(new Packet());
auto payload = packet.payload();
memcpy(received_packet->pkt->data, payload.data(), payload.size());
received_packet->pkt->length = payload.size();
} else {
// This is a media packet, or a FlexFEC packet belonging to some
// other FlexFEC stream.
if (received_packet->ssrc != protected_media_ssrc_) {
return false;
}
received_packet->is_fec = false;
// Insert entire packet into erasure code.
// TODO(brandtr): Remove this memcpy too.
received_packet->pkt = rtc::scoped_refptr<Packet>(new Packet());
memcpy(received_packet->pkt->data, packet.data(), packet.size());
received_packet->pkt->length = packet.size();
}
received_packets_.push_back(std::move(received_packet));
++packet_counter_.num_packets;
return true;
}
FEC packets and media packets take separate paths here, but both end up pushed into received_packets_; from there, ProcessReceivedPackets() drives the rest of the flow.
//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::ProcessReceivedPackets() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
// Decode.
if (!received_packets_.empty()) {
if (erasure_code_->DecodeFec(&received_packets_, &recovered_packets_) !=
0) {
return false;
}
}
// Return recovered packets through callback.
for (const auto& recovered_packet : recovered_packets_) {
if (recovered_packet->returned) {
continue;
}
++packet_counter_.num_recovered_packets;
if (!recovered_packet_receiver_->OnRecoveredPacket(
recovered_packet->pkt->data, recovered_packet->pkt->length)) {
return false;
}
recovered_packet->returned = true;
// Periodically log the incoming packets.
int64_t now_ms = clock_->TimeInMilliseconds();
if (now_ms - last_recovered_packet_ms_ > kPacketLogIntervalMs) {
uint32_t media_ssrc =
ForwardErrorCorrection::ParseSsrc(recovered_packet->pkt->data);
LOG(LS_VERBOSE) << "Recovered media packet with SSRC: " << media_ssrc
<< " from FlexFEC stream with SSRC: " << ssrc_ << ".";
last_recovered_packet_ms_ = now_ms;
}
}
return true;
}
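Before reading DecodeFec, it helps to keep in mind what FEC recovery boils down to. For a single lost packet, recovery is an XOR: each FEC packet carries the XOR of the packets it protects, so XOR-ing it with the surviving packets reproduces the missing one. A toy sketch of the idea (real FlexFEC additionally handles packet masks, RTP headers, and length recovery):

#include <cstddef>
#include <cstdint>

// Toy XOR recovery: rebuild one missing packet from the FEC payload and the
// protected packets that did arrive. All buffers are assumed `len` bytes.
void XorRecover(const uint8_t* fec_payload,
                const uint8_t* const* surviving_packets,
                size_t num_surviving,
                size_t len,
                uint8_t* recovered) {
  for (size_t i = 0; i < len; ++i) {
    uint8_t b = fec_payload[i];
    for (size_t j = 0; j < num_surviving; ++j)
      b ^= surviving_packets[j][i];
    recovered[i] = b;  // XOR of FEC and survivors yields the lost packet
  }
}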
The main step here is the call to DecodeFec:
//webrtc/modules/rtp_rtcp/source/forward_error_correction.cc
int ForwardErrorCorrection::DecodeFec(
ReceivedPacketList* received_packets,
RecoveredPacketList* recovered_packets) {
// TODO(marpan/ajm): can we check for multiple ULP headers, and return an
// error?
const size_t max_media_packets = fec_header_reader_->MaxMediaPackets();
if (recovered_packets->size() == max_media_packets) {
const unsigned int seq_num_diff =
abs(static_cast<int>(received_packets->front()->seq_num) -
static_cast<int>(recovered_packets->back()->seq_num));
if (seq_num_diff > max_media_packets) {
// A big gap in sequence numbers. The old recovered packets
// are now useless, so it's safe to do a reset.
ResetState(recovered_packets);
}
}
InsertPackets(received_packets, recovered_packets);
AttemptRecovery(recovered_packets);
return 0;
}
The key method here is InsertPackets:
//webrtc/modules/rtp_rtcp/source/forward_error_correction.cc
void ForwardErrorCorrection::InsertPackets(
ReceivedPacketList* received_packets,
RecoveredPacketList* recovered_packets) {
while (!received_packets->empty()) {
ReceivedPacket* received_packet = received_packets->front().get();
// Check for discarding oldest FEC packet, to avoid wrong FEC decoding from
// sequence number wrap-around. Detection of old FEC packet is based on
// sequence number difference of received packet and oldest packet in FEC
// packet list.
// TODO(marpan/holmer): We should be able to improve detection/discarding of
// old FEC packets based on timestamp information or better sequence number
// thresholding (e.g., to distinguish between wrap-around and reordering).
if (!received_fec_packets_.empty()) {
uint16_t seq_num_diff =
abs(static_cast<int>(received_packet->seq_num) -
static_cast<int>(received_fec_packets_.front()->seq_num));
if (seq_num_diff > 0x3fff) {
received_fec_packets_.pop_front();
}
}
if (received_packet->is_fec) {
InsertFecPacket(*recovered_packets, received_packet);
} else {
InsertMediaPacket(recovered_packets, received_packet);
}
// Delete the received packet "wrapper".
received_packets->pop_front();
}
RTC_DCHECK(received_packets->empty());
DiscardOldRecoveredPackets(recovered_packets);
}
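The 0x3fff comparison above is a heuristic guard against sequence-number wrap-around, which the TODO admits is crude. A common wrap-aware alternative (a sketch of the direction the TODO hints at, not what this code does) is to let 16-bit modular arithmetic do the work:

#include <cstdint>

// Signed distance between two 16-bit RTP sequence numbers; wrap-around falls
// out of the modular arithmetic, e.g. SeqNumDelta(2, 65534) == 4.
int16_t SeqNumDelta(uint16_t newer, uint16_t older) {
  return static_cast<int16_t>(newer - older);
}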
So far, all of these FEC-related methods have just been preprocessing the data: in effect, prepping the ingredients so the decode thread can consume them easily. So when does the decode thread actually start? That's what we'll look at next.
//webrtc/video/video_receive_stream.cc
VideoReceiveStream::VideoReceiveStream(
int num_cpu_cores,
CongestionController* congestion_controller,
PacketRouter* packet_router,
VideoReceiveStream::Config config,
webrtc::VoiceEngine* voice_engine,
ProcessThread* process_thread,
CallStats* call_stats,
VieRemb* remb)
: transport_adapter_(config.rtcp_send_transport),
config_(std::move(config)),
num_cpu_cores_(num_cpu_cores),
process_thread_(process_thread),
clock_(Clock::GetRealTimeClock()),
decode_thread_(DecodeThreadFunction, this, "DecodingThread"),
congestion_controller_(congestion_controller),
call_stats_(call_stats),
timing_(new VCMTiming(clock_)),
video_receiver_(clock_, nullptr, this, timing_.get(), this, this),
stats_proxy_(&config_, clock_),
rtp_stream_receiver_(
&video_receiver_,
congestion_controller_->GetRemoteBitrateEstimator(
UseSendSideBwe(config_)),
&transport_adapter_,
call_stats_->rtcp_rtt_stats(),
congestion_controller_->pacer(),
packet_router,
remb,
&config_,
&stats_proxy_,
process_thread_,
congestion_controller_->GetRetransmissionRateLimiter(),
this, // NackSender
this, // KeyFrameRequestSender
this, // OnCompleteFrameCallback
timing_.get()),
rtp_stream_sync_(&video_receiver_, &rtp_stream_receiver_),
jitter_buffer_experiment_(
field_trial::FindFullName("WebRTC-NewVideoJitterBuffer") ==
"Enabled") {
LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();
RTC_DCHECK(process_thread_);
RTC_DCHECK(congestion_controller_);
RTC_DCHECK(call_stats_);
RTC_DCHECK(!config_.decoders.empty());
std::set<int> decoder_payload_types;
for (const Decoder& decoder : config_.decoders) {
RTC_CHECK(decoder.decoder);
RTC_CHECK(decoder_payload_types.find(decoder.payload_type) ==
decoder_payload_types.end())
<< "Duplicate payload type (" << decoder.payload_type
<< ") for different decoders.";
decoder_payload_types.insert(decoder.payload_type);
}
video_receiver_.SetRenderDelay(config.render_delay_ms);
if (jitter_buffer_experiment_) {
jitter_estimator_.reset(new VCMJitterEstimator(clock_));
frame_buffer_.reset(new video_coding::FrameBuffer(
clock_, jitter_estimator_.get(), timing_.get()));
}
process_thread_->RegisterModule(&video_receiver_);
process_thread_->RegisterModule(&rtp_stream_sync_);
}
When a VideoReceiveStream is constructed, its initializer list creates the decode_thread_ decoding thread; decoding begins once this thread is started.
//webrtc/video/video_receive_stream.cc
bool VideoReceiveStream::DecodeThreadFunction(void* ptr) {
static_cast<VideoReceiveStream*>(ptr)->Decode();
return true;
}
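Because DecodeThreadFunction returns true, the underlying rtc::PlatformThread keeps invoking it in a loop until the thread is stopped. The start/stop pattern looks roughly like this (a sketch based on the rtc::PlatformThread API of this WebRTC vintage, not a verbatim copy of video_receive_stream.cc):

// Sketch: starting the thread kicks off the decode loop; stopping joins it.
void VideoReceiveStream::Start() {
  // ... frame buffer / decoder setup elided ...
  decode_thread_.Start();  // begins invoking DecodeThreadFunction repeatedly
  decode_thread_.SetPriority(rtc::kHighestPriority);
}

void VideoReceiveStream::Stop() {
  decode_thread_.Stop();  // blocks until the loop exits
  // ... teardown elided ...
}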
The decode method the thread invokes on each pass is as follows:
//webrtc/video/video_receive_stream.cc
void VideoReceiveStream::Decode() {
static const int kMaxDecodeWaitTimeMs = 50;
if (jitter_buffer_experiment_) {
static const int kMaxWaitForFrameMs = 3000;
std::unique_ptr<video_coding::FrameObject> frame;
video_coding::FrameBuffer::ReturnReason res =
frame_buffer_->NextFrame(kMaxWaitForFrameMs, &frame);
if (res == video_coding::FrameBuffer::ReturnReason::kStopped)
return;
if (frame) {
if (video_receiver_.Decode(frame.get()) == VCM_OK)
rtp_stream_receiver_.FrameDecoded(frame->picture_id);
} else {
LOG(LS_WARNING) << "No decodable frame in " << kMaxWaitForFrameMs
<< " ms, requesting keyframe.";
RequestKeyFrame();
}
} else {
video_receiver_.Decode(kMaxDecodeWaitTimeMs);
}
}
The method to focus on here is Decode:
//webrtc/modules/video_coding/video_receiver.cc
int32_t VideoReceiver::Decode(uint16_t maxWaitTimeMs) {
bool prefer_late_decoding = false;
{
rtc::CritScope cs(&receive_crit_);
prefer_late_decoding = _codecDataBase.PrefersLateDecoding();
}
VCMEncodedFrame* frame =
_receiver.FrameForDecoding(maxWaitTimeMs, prefer_late_decoding);  // Key point 1: where the frame comes from
if (!frame)
return VCM_FRAME_NOT_READY;
{
rtc::CritScope cs(&process_crit_);
if (drop_frames_until_keyframe_) {
// Still getting delta frames, schedule another keyframe request as if
// decode failed.
if (frame->FrameType() != kVideoFrameKey) {
_scheduleKeyRequest = true;
_receiver.ReleaseFrame(frame);
return VCM_FRAME_NOT_READY;
}
drop_frames_until_keyframe_ = false;
}
}
if (pre_decode_image_callback_) {
EncodedImage encoded_image(frame->EncodedImage());
int qp = -1;
if (qp_parser_.GetQp(*frame, &qp)) {
encoded_image.qp_ = qp;
}
pre_decode_image_callback_->OnEncodedImage(encoded_image,
frame->CodecSpecific(), nullptr);
}
rtc::CritScope cs(&receive_crit_);
// If this frame was too late, we should adjust the delay accordingly
_timing->UpdateCurrentDelay(frame->RenderTimeMs(),
clock_->TimeInMilliseconds());
if (first_frame_received_()) {
LOG(LS_INFO) << "Received first "
<< (frame->Complete() ? "complete" : "incomplete")
<< " decodable video frame";
}
const int32_t ret = Decode(*frame);  // Key point 2: the actual decode
_receiver.ReleaseFrame(frame);
return ret;
}
Note that what we should really be asking here is where the frame that Decode consumes comes from (key point 1 above). Look at the FrameForDecoding method:
//webrtc/modules/video_coding/receiver.cc
VCMEncodedFrame* VCMReceiver::FrameForDecoding(uint16_t max_wait_time_ms,
bool prefer_late_decoding) {
const int64_t start_time_ms = clock_->TimeInMilliseconds();
uint32_t frame_timestamp = 0;
int min_playout_delay_ms = -1;
int max_playout_delay_ms = -1;
int64_t render_time_ms = 0;
// Exhaust wait time to get a complete frame for decoding.
VCMEncodedFrame* found_frame =
jitter_buffer_.NextCompleteFrame(max_wait_time_ms);
if (found_frame) {
frame_timestamp = found_frame->TimeStamp();
min_playout_delay_ms = found_frame->EncodedImage().playout_delay_.min_ms;
max_playout_delay_ms = found_frame->EncodedImage().playout_delay_.max_ms;
} else {
if (!jitter_buffer_.NextMaybeIncompleteTimestamp(&frame_timestamp))
return nullptr;
}
if (min_playout_delay_ms >= 0)
timing_->set_min_playout_delay(min_playout_delay_ms);
if (max_playout_delay_ms >= 0)
timing_->set_max_playout_delay(max_playout_delay_ms);
// We have a frame - Set timing and render timestamp.
timing_->SetJitterDelay(jitter_buffer_.EstimatedJitterMs());
const int64_t now_ms = clock_->TimeInMilliseconds();
timing_->UpdateCurrentDelay(frame_timestamp);
render_time_ms = timing_->RenderTimeMs(frame_timestamp, now_ms);
// Check render timing.
bool timing_error = false;
// Assume that render timing errors are due to changes in the video stream.
if (render_time_ms < 0) {
timing_error = true;
} else if (std::abs(render_time_ms - now_ms) > max_video_delay_ms_) {
int frame_delay = static_cast<int>(std::abs(render_time_ms - now_ms));
LOG(LS_WARNING) << "A frame about to be decoded is out of the configured "
<< "delay bounds (" << frame_delay << " > "
<< max_video_delay_ms_
<< "). Resetting the video jitter buffer.";
timing_error = true;
} else if (static_cast<int>(timing_->TargetVideoDelay()) >
max_video_delay_ms_) {
LOG(LS_WARNING) << "The video target delay has grown larger than "
<< max_video_delay_ms_ << " ms. Resetting jitter buffer.";
timing_error = true;
}
if (timing_error) {
// Timing error => reset timing and flush the jitter buffer.
jitter_buffer_.Flush();
timing_->Reset();
return NULL;
}
if (prefer_late_decoding) {
// Decode frame as close as possible to the render timestamp.
const int32_t available_wait_time =
max_wait_time_ms -
static_cast<int32_t>(clock_->TimeInMilliseconds() - start_time_ms);
uint16_t new_max_wait_time =
static_cast<uint16_t>(VCM_MAX(available_wait_time, 0));
uint32_t wait_time_ms =
timing_->MaxWaitingTime(render_time_ms, clock_->TimeInMilliseconds());
if (new_max_wait_time < wait_time_ms) {
// We're not allowed to wait until the frame is supposed to be rendered,
// waiting as long as we're allowed to avoid busy looping, and then return
// NULL. Next call to this function might return the frame.
render_wait_event_->Wait(new_max_wait_time);
return NULL;
}
// Wait until it's time to render.
render_wait_event_->Wait(wait_time_ms);
}
// Extract the frame from the jitter buffer and set the render time.
VCMEncodedFrame* frame = jitter_buffer_.ExtractAndSetDecode(frame_timestamp);
if (frame == NULL) {
return NULL;
}
frame->SetRenderTime(render_time_ms);
TRACE_EVENT_ASYNC_STEP1