Contents

Preface

Main Text

Preface

WebRTC is Google's near-perfect solution for real-time audio and video communication: powerful, easy to use, and, crucially, open source, which makes it convenient to customize for private deployments. This article analyzes how H264-encoded video packets are received and decoded.

Main Text

Without further ado, let's get into it.

The integration layer can simply call WebRTC's DeliverPacket method with an RTP packet; from there, the received RTP packets are depacketized, decoded, and rendered. Callers normally don't need to know how this interface is implemented, but today let's look at its source code:

//webrtc/call/call.cc
PacketReceiver::DeliveryStatus Call::DeliverPacket(
    MediaType media_type,
    const uint8_t* packet,
    size_t length,
    const PacketTime& packet_time) {
  // TODO(solenberg): Tests call this function on a network thread, libjingle
  // calls on the worker thread. We should move towards always using a network
  // thread. Then this check can be enabled.
  // RTC_DCHECK(!configuration_thread_checker_.CalledOnValidThread());
  if (RtpHeaderParser::IsRtcp(packet, length))
    return DeliverRtcp(media_type, packet, length);

  return DeliverRtp(media_type, packet, length, packet_time);
}
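For context, here is a hypothetical sketch (names like OnUdpPacket are ours, not WebRTC's) of how an integration layer might push received datagrams into this entry point. Since DeliverPacket checks for RTCP itself, RTP and RTCP can share one code path:

#include "webrtc/call/call.h"

// Hypothetical socket callback: hand every received datagram to Call.
// Call::DeliverPacket distinguishes RTCP from RTP internally.
void OnUdpPacket(webrtc::Call* call, const uint8_t* data, size_t size) {
  webrtc::PacketReceiver::DeliveryStatus status =
      call->Receiver()->DeliverPacket(webrtc::MediaType::ANY, data, size,
                                      webrtc::PacketTime());
  if (status == webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC) {
    // No receive stream is configured for this SSRC; the packet was dropped.
  }
}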

The H264 data is carried in RTP packets, so we don't need to care about DeliverRtcp here; let's go straight to DeliverRtp.

//webrtc/call/call.cc
PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
                                                const uint8_t* packet,
                                                size_t length,
                                                const PacketTime& packet_time) {
  TRACE_EVENT0("webrtc", "Call::DeliverRtp");
  // Minimum RTP header size.
  if (length < 12)
    return DELIVERY_PACKET_ERROR;

  uint32_t ssrc = ByteReader<uint32_t>::ReadBigEndian(&packet[8]);
  uint8_t payload_type = packet[1] & 0x7f;
  ReadLockScoped read_lock(*receive_crit_);
  if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
    auto it = audio_receive_ssrcs_.find(ssrc);
    if (it != audio_receive_ssrcs_.end()) {
      received_bytes_per_second_counter_.Add(static_cast<int>(length));
      received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
      auto status = it->second->DeliverRtp(packet, length, packet_time)
                        ? DELIVERY_OK
                        : DELIVERY_PACKET_ERROR;
      if (status == DELIVERY_OK)
        event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
      return status;
    }
  }
  if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
    auto it = video_receive_ssrcs_.find(ssrc);
    if (it != video_receive_ssrcs_.end()) {
      received_bytes_per_second_counter_.Add(static_cast<int>(length));
      received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
      // TODO(brandtr): Notify the BWE of received media packets here.
      auto status = DELIVERY_OK;
      const std::vector<VideoReceiveStream::Decoder>& decoders =
          it->second->config().decoders;
      if (decoders.size() > 0) {
        int decoder_pt = decoders[0].payload_type;
        if (payload_type == decoder_pt) {
          status = it->second->DeliverRtp(packet, length, packet_time)
                       ? DELIVERY_OK
                       : DELIVERY_PACKET_ERROR;
        } else {
          LOG(LS_WARNING) << "Received FlexFEC packet, decoder_pt="
                          << decoder_pt
                          << ", received packet pt=" << payload_type;
        }
      }
      // Deliver media packets to FlexFEC subsystem. RTP header extensions need
      // not be parsed, as FlexFEC is oblivious to the semantic meaning of the
      // packet contents beyond the 12 byte RTP base header. The BWE is fed
      // information about these media packets from the regular media pipeline.
      rtc::Optional<RtpPacketReceived> parsed_packet =
          ParseRtpPacket(packet, length, packet_time);
      if (parsed_packet) {
        auto it_bounds = flexfec_receive_ssrcs_media_.equal_range(ssrc);
        for (auto it = it_bounds.first; it != it_bounds.second; ++it)
          it->second->AddAndProcessReceivedPacket(*parsed_packet);
      }
      if (status == DELIVERY_OK)
        event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
      return status;
    }
  }
  if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
    auto it = flexfec_receive_ssrcs_protection_.find(ssrc);
    if (it != flexfec_receive_ssrcs_protection_.end()) {
      rtc::Optional<RtpPacketReceived> parsed_packet =
          ParseRtpPacket(packet, length, packet_time);
      if (parsed_packet) {
        NotifyBweOfReceivedPacket(*parsed_packet);
        auto status = it->second->AddAndProcessReceivedPacket(*parsed_packet)
                          ? DELIVERY_OK
                          : DELIVERY_PACKET_ERROR;
        if (status == DELIVERY_OK)
          event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length);
        return status;
      }
    }
  }
  return DELIVERY_UNKNOWN_SSRC;
}
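Note the two lines near the top that read fields straight out of the raw buffer: the SSRC lives in bytes 8-11 of the 12-byte RTP fixed header, and the payload type is the low 7 bits of byte 1 (the top bit of that byte is the marker flag). Here is a minimal standalone sketch of the same parsing, using plain shifts instead of WebRTC's ByteReader:

#include <cstddef>
#include <cstdint>

// Standalone illustration (not WebRTC code) of the header fields DeliverRtp
// reads. RTP fixed header: V/P/X/CC in byte 0, M + payload type in byte 1,
// sequence number in bytes 2-3, timestamp in bytes 4-7, SSRC in bytes 8-11;
// all multi-byte fields are big-endian.
bool ParseRtpFixedHeader(const uint8_t* packet, size_t length,
                         uint8_t* payload_type, uint32_t* ssrc) {
  if (length < 12)  // Same minimum-size check as DeliverRtp.
    return false;
  *payload_type = packet[1] & 0x7f;  // Low 7 bits; bit 7 is the marker bit.
  *ssrc = (static_cast<uint32_t>(packet[8]) << 24) |
          (static_cast<uint32_t>(packet[9]) << 16) |
          (static_cast<uint32_t>(packet[10]) << 8) |
          static_cast<uint32_t>(packet[11]);
  return true;
}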

If you read carefully, you will notice that if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) is checked twice. Why? This is part of WebRTC's FEC mechanism, which a follow-up article will cover in detail. For now we only care about the first check; let's look at ParseRtpPacket and AddAndProcessReceivedPacket, implemented as follows.

ParseRtpPacket:

//webrtc/call/call.cc
rtc::Optional<RtpPacketReceived> Call::ParseRtpPacket(
    const uint8_t* packet,
    size_t length,
    const PacketTime& packet_time) {
  RtpPacketReceived parsed_packet;
  if (!parsed_packet.Parse(packet, length))
    return rtc::Optional<RtpPacketReceived>();

  auto it = received_rtp_header_extensions_.find(parsed_packet.Ssrc());
  if (it != received_rtp_header_extensions_.end())
    parsed_packet.IdentifyExtensions(it->second);

  int64_t arrival_time_ms;
  if (packet_time.timestamp != -1) {
    arrival_time_ms = (packet_time.timestamp + 500) / 1000;
  } else {
    arrival_time_ms = clock_->TimeInMilliseconds();
  }
  parsed_packet.set_arrival_time_ms(arrival_time_ms);

  return rtc::Optional<RtpPacketReceived>(std::move(parsed_packet));
}
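One small detail worth spelling out: packet_time.timestamp is the socket receive time in microseconds, and adding 500 before the integer division by 1000 rounds to the nearest millisecond instead of truncating. For example, a timestamp of 1234567 us gives (1234567 + 500) / 1000 = 1235 ms, whereas plain truncation would give 1234 ms. When no receive timestamp accompanied the packet (timestamp == -1), the current wall-clock time is used instead.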

AddAndProcessReceivedPacket: 

//webrtc/call/flexfec_receive_stream_impl.cc
bool FlexfecReceiveStreamImpl::AddAndProcessReceivedPacket(
    const RtpPacketReceived& packet) {
  {
    rtc::CritScope cs(&crit_);
    if (!started_)
      return false;
  }

  if (!receiver_)
    return false;

  if (!receiver_->AddAndProcessReceivedPacket(packet))
    return false;

  // Do not report media packets in the RTCP RRs generated by |rtp_rtcp_|.
  if (packet.Ssrc() == config_.remote_ssrc) {
    RTPHeader header;
    packet.GetHeader(&header);
    // FlexFEC packets are never retransmitted.
    const bool kNotRetransmitted = false;
    rtp_receive_statistics_->IncomingPacket(header, packet.size(),
                                            kNotRetransmitted);
  }

  return true;
}

Let's look at AddAndProcessReceivedPacket first. Note that IncomingPacket also matters: it records the incoming packet's information, and we will come back to it later.

//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::AddAndProcessReceivedPacket(
    const RtpPacketReceived& packet) {
  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
  if (!AddReceivedPacket(std::move(packet))) {
    return false;
  }
  return ProcessReceivedPackets();
}

Next, look at AddReceivedPacket:

//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) {
  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);

  // RTP packets with a full base header (12 bytes), but without payload,
  // could conceivably be useful in the decoding. Therefore we check
  // with a non-strict inequality here.
  RTC_DCHECK_GE(packet.size(), kRtpHeaderSize);

  // Demultiplex based on SSRC, and insert into erasure code decoder.
  std::unique_ptr<ReceivedPacket> received_packet(new ReceivedPacket());
  received_packet->seq_num = packet.SequenceNumber();
  received_packet->ssrc = packet.Ssrc();
  int packet_pt = packet.PayloadType();
  if (packet_pt == payload_type_) {
    // This is a FlexFEC packet.
    if (packet.payload_size() < kMinFlexfecHeaderSize) {
      LOG(LS_WARNING) << "Truncated FlexFEC packet, discarding.";
      return false;
    }
    received_packet->is_fec = true;
    ++packet_counter_.num_fec_packets;

    // Insert packet payload into erasure code.
    // TODO(brandtr): Remove this memcpy when the FEC packet classes
    // are using COW buffers internally.
    received_packet->pkt = rtc::scoped_refptr<Packet>(new Packet());
    auto payload = packet.payload();
    memcpy(received_packet->pkt->data, payload.data(), payload.size());
    received_packet->pkt->length = payload.size();
  } else {
    // This is a media packet, or a FlexFEC packet belonging to some
    // other FlexFEC stream.
    if (received_packet->ssrc != protected_media_ssrc_) {
      return false;
    }
    received_packet->is_fec = false;

    // Insert entire packet into erasure code.
    // TODO(brandtr): Remove this memcpy too.
    received_packet->pkt = rtc::scoped_refptr<Packet>(new Packet());
    memcpy(received_packet->pkt->data, packet.data(), packet.size());
    received_packet->pkt->length = packet.size();
  }

  received_packets_.push_back(std::move(received_packet));
  ++packet_counter_.num_packets;

  return true;
}

Here, FEC packets and media packets are handled separately, but both end up pushed into received_packets_. Once there, ProcessReceivedPackets() takes over the rest of the flow.

//webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc
bool FlexfecReceiver::ProcessReceivedPackets() {
  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
  // Decode.
  if (!received_packets_.empty()) {
    if (erasure_code_->DecodeFec(&received_packets_, &recovered_packets_) !=
        0) {
      return false;
    }
  }
  // Return recovered packets through callback.
  for (const auto& recovered_packet : recovered_packets_) {
    if (recovered_packet->returned) {
      continue;
    }
    ++packet_counter_.num_recovered_packets;
    if (!recovered_packet_receiver_->OnRecoveredPacket(
            recovered_packet->pkt->data, recovered_packet->pkt->length)) {
      return false;
    }
    recovered_packet->returned = true;
    // Periodically log the incoming packets.
    int64_t now_ms = clock_->TimeInMilliseconds();
    if (now_ms - last_recovered_packet_ms_ > kPacketLogIntervalMs) {
      uint32_t media_ssrc =
          ForwardErrorCorrection::ParseSsrc(recovered_packet->pkt->data);
      LOG(LS_VERBOSE) << "Recovered media packet with SSRC: " << media_ssrc
                      << " from FlexFEC stream with SSRC: " << ssrc_ << ".";
      last_recovered_packet_ms_ = now_ms;
    }
  }
  return true;
}
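The recovered packets leave the FEC module through recovered_packet_receiver_. Judging purely from the call site above, the callback interface looks roughly like the sketch below (inferred from the usage, not copied from the header); the receiver gets the raw recovered RTP packet back and reinjects it into the normal media receive path:

#include <cstddef>
#include <cstdint>

// Sketch inferred from the call above: a consumer of recovered RTP packets.
// In this revision of WebRTC, Call appears to sit behind this interface, so
// recovered packets re-enter the same pipeline as packets that arrived
// directly from the network.
class RecoveredPacketReceiver {
 public:
  virtual bool OnRecoveredPacket(const uint8_t* packet, size_t length) = 0;

 protected:
  virtual ~RecoveredPacketReceiver() = default;
};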

The main step in this flow is the call to DecodeFec:

//webrtc/modules/rtp_rtcp/source/forward_error_correction.cc
int ForwardErrorCorrection::DecodeFec(
    ReceivedPacketList* received_packets,
    RecoveredPacketList* recovered_packets) {
  // TODO(marpan/ajm): can we check for multiple ULP headers, and return an
  // error?
  const size_t max_media_packets = fec_header_reader_->MaxMediaPackets();
  if (recovered_packets->size() == max_media_packets) {
    const unsigned int seq_num_diff =
        abs(static_cast<int>(received_packets->front()->seq_num) -
            static_cast<int>(recovered_packets->back()->seq_num));
    if (seq_num_diff > max_media_packets) {
      // A big gap in sequence numbers. The old recovered packets
      // are now useless, so it's safe to do a reset.
      ResetState(recovered_packets);
    }
  }
  InsertPackets(received_packets, recovered_packets);
  AttemptRecovery(recovered_packets);
  return 0;
}

The method to focus on is InsertPackets:

//webrtc/modules/rtp_rtcp/source/forward_error_correction.cc
void ForwardErrorCorrection::InsertPackets(
    ReceivedPacketList* received_packets,
    RecoveredPacketList* recovered_packets) {
  while (!received_packets->empty()) {
    ReceivedPacket* received_packet = received_packets->front().get();

    // Check for discarding oldest FEC packet, to avoid wrong FEC decoding from
    // sequence number wrap-around. Detection of old FEC packet is based on
    // sequence number difference of received packet and oldest packet in FEC
    // packet list.
    // TODO(marpan/holmer): We should be able to improve detection/discarding of
    // old FEC packets based on timestamp information or better sequence number
    // thresholding (e.g., to distinguish between wrap-around and reordering).
    if (!received_fec_packets_.empty()) {
      uint16_t seq_num_diff =
          abs(static_cast<int>(received_packet->seq_num) -
              static_cast<int>(received_fec_packets_.front()->seq_num));
      if (seq_num_diff > 0x3fff) {
        received_fec_packets_.pop_front();
      }
    }

    if (received_packet->is_fec) {
      InsertFecPacket(*recovered_packets, received_packet);
    } else {
      InsertMediaPacket(recovered_packets, received_packet);
    }
    // Delete the received packet "wrapper".
    received_packets->pop_front();
  }
  RTC_DCHECK(received_packets->empty());
  DiscardOldRecoveredPackets(recovered_packets);
}
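Both this method and DecodeFec above compare sequence numbers by widening to int and taking an absolute difference. Here is a standalone sketch of that test, with a worked example of what happens at the 16-bit wrap:

#include <cstdint>
#include <cstdlib>

// Illustration (not WebRTC code) of the distance test in InsertPackets().
// RTP sequence numbers are 16 bits and wrap at 65536. The code keeps an FEC
// packet only while it is within 0x3fff (a quarter of the sequence space) of
// the newest received packet; a larger gap means the stored packet is stale,
// or the stream has wrapped, and combining across that gap could corrupt the
// FEC decode.
bool TooFarApart(uint16_t received_seq, uint16_t oldest_fec_seq) {
  int diff = std::abs(static_cast<int>(received_seq) -
                      static_cast<int>(oldest_fec_seq));
  return diff > 0x3fff;  // 0x3fff == 16383
}

// Worked example: the oldest stored FEC packet has seq 65530 and a new packet
// arrives with seq 3 after the wrap. |3 - 65530| = 65527 > 0x3fff, so the old
// FEC packet is popped; as the TODO in the source notes, this heuristic
// cannot tell a genuine wrap-around apart from heavy reordering.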

Up to this point, the FEC-related methods have all been preprocessing data: preparing the ingredients so the decoding thread can consume them easily. So when does the decoding thread start? That's what we'll look at next.

//webrtc/video/video_receive_stream.cc
VideoReceiveStream::VideoReceiveStream(
    int num_cpu_cores,
    CongestionController* congestion_controller,
    PacketRouter* packet_router,
    VideoReceiveStream::Config config,
    webrtc::VoiceEngine* voice_engine,
    ProcessThread* process_thread,
    CallStats* call_stats,
    VieRemb* remb)
    : transport_adapter_(config.rtcp_send_transport),
      config_(std::move(config)),
      num_cpu_cores_(num_cpu_cores),
      process_thread_(process_thread),
      clock_(Clock::GetRealTimeClock()),
      decode_thread_(DecodeThreadFunction, this, "DecodingThread"),
      congestion_controller_(congestion_controller),
      call_stats_(call_stats),
      timing_(new VCMTiming(clock_)),
      video_receiver_(clock_, nullptr, this, timing_.get(), this, this),
      stats_proxy_(&config_, clock_),
      rtp_stream_receiver_(
          &video_receiver_,
          congestion_controller_->GetRemoteBitrateEstimator(
              UseSendSideBwe(config_)),
          &transport_adapter_,
          call_stats_->rtcp_rtt_stats(),
          congestion_controller_->pacer(),
          packet_router,
          remb,
          &config_,
          &stats_proxy_,
          process_thread_,
          congestion_controller_->GetRetransmissionRateLimiter(),
          this,  // NackSender
          this,  // KeyFrameRequestSender
          this,  // OnCompleteFrameCallback
          timing_.get()),
      rtp_stream_sync_(&video_receiver_, &rtp_stream_receiver_),
      jitter_buffer_experiment_(
          field_trial::FindFullName("WebRTC-NewVideoJitterBuffer") ==
          "Enabled") {
  LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();

  RTC_DCHECK(process_thread_);
  RTC_DCHECK(congestion_controller_);
  RTC_DCHECK(call_stats_);

  RTC_DCHECK(!config_.decoders.empty());
  std::set<int> decoder_payload_types;
  for (const Decoder& decoder : config_.decoders) {
    RTC_CHECK(decoder.decoder);
    RTC_CHECK(decoder_payload_types.find(decoder.payload_type) ==
              decoder_payload_types.end())
        << "Duplicate payload type (" << decoder.payload_type
        << ") for different decoders.";
    decoder_payload_types.insert(decoder.payload_type);
  }

  video_receiver_.SetRenderDelay(config.render_delay_ms);

  if (jitter_buffer_experiment_) {
    jitter_estimator_.reset(new VCMJitterEstimator(clock_));
    frame_buffer_.reset(new video_coding::FrameBuffer(
        clock_, jitter_estimator_.get(), timing_.get()));
  }

  process_thread_->RegisterModule(&video_receiver_);
  process_thread_->RegisterModule(&rtp_stream_sync_);
}

When a VideoReceiveStream is constructed, it declares a decoding thread, decode_thread_; decoding begins once that thread is started, which happens when VideoReceiveStream::Start() is called.

//webrtc/video/video_receive_stream.cc
bool VideoReceiveStream::DecodeThreadFunction(void* ptr) {
  static_cast<VideoReceiveStream*>(ptr)->Decode();
  return true;
}
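Notice that DecodeThreadFunction always returns true. rtc::PlatformThread keeps re-invoking the function it was given for as long as it returns true, so this turns Decode() into a loop that runs until the thread is stopped. A minimal sketch of that contract (hypothetical code, not the real rtc::PlatformThread):

#include <atomic>
#include <thread>

// Hypothetical stand-in for rtc::PlatformThread, showing only the contract
// the decode thread relies on: call the run-function repeatedly while it
// returns true and the thread has not been stopped.
class LoopThread {
 public:
  using RunFunction = bool (*)(void*);
  LoopThread(RunFunction fn, void* obj) : fn_(fn), obj_(obj) {}
  void Start() {
    running_ = true;
    thread_ = std::thread([this] {
      while (running_ && fn_(obj_)) {
        // DecodeThreadFunction returns true, so we loop back into Decode().
      }
    });
  }
  void Stop() {
    running_ = false;
    if (thread_.joinable())
      thread_.join();
  }

 private:
  RunFunction fn_;
  void* obj_;
  std::atomic<bool> running_{false};
  std::thread thread_;
};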

The decode method the thread runs is as follows:

//webrtc/video/video_receive_stream.cc
void VideoReceiveStream::Decode() {
  static const int kMaxDecodeWaitTimeMs = 50;
  if (jitter_buffer_experiment_) {
    static const int kMaxWaitForFrameMs = 3000;
    std::unique_ptr<video_coding::FrameObject> frame;
    video_coding::FrameBuffer::ReturnReason res =
        frame_buffer_->NextFrame(kMaxWaitForFrameMs, &frame);

    if (res == video_coding::FrameBuffer::ReturnReason::kStopped)
      return;

    if (frame) {
      if (video_receiver_.Decode(frame.get()) == VCM_OK)
        rtp_stream_receiver_.FrameDecoded(frame->picture_id);
    } else {
      LOG(LS_WARNING) << "No decodable frame in " << kMaxWaitForFrameMs
                      << " ms, requesting keyframe.";
      RequestKeyFrame();
    }
  } else {
    video_receiver_.Decode(kMaxDecodeWaitTimeMs);
  }
}

There are two paths here: when the WebRTC-NewVideoJitterBuffer field trial is enabled, frames come from the new video_coding::FrameBuffer; otherwise video_receiver_.Decode() is called, which pulls from the classic jitter buffer. Let's follow the latter:

//webrtc/modules/video_coding/video_receiver.cc
int32_t VideoReceiver::Decode(uint16_t maxWaitTimeMs) {
  bool prefer_late_decoding = false;
  {
    rtc::CritScope cs(&receive_crit_);
    prefer_late_decoding = _codecDataBase.PrefersLateDecoding();
  }

  VCMEncodedFrame* frame =
      _receiver.FrameForDecoding(maxWaitTimeMs, prefer_late_decoding);  // Key point 1

  if (!frame)
    return VCM_FRAME_NOT_READY;

  {
    rtc::CritScope cs(&process_crit_);
    if (drop_frames_until_keyframe_) {
      // Still getting delta frames, schedule another keyframe request as if
      // decode failed.
      if (frame->FrameType() != kVideoFrameKey) {
        _scheduleKeyRequest = true;
        _receiver.ReleaseFrame(frame);
        return VCM_FRAME_NOT_READY;
      }
      drop_frames_until_keyframe_ = false;
    }
  }

  if (pre_decode_image_callback_) {
    EncodedImage encoded_image(frame->EncodedImage());
    int qp = -1;
    if (qp_parser_.GetQp(*frame, &qp)) {
      encoded_image.qp_ = qp;
    }
    pre_decode_image_callback_->OnEncodedImage(encoded_image,
                                               frame->CodecSpecific(), nullptr);
  }

  rtc::CritScope cs(&receive_crit_);
  // If this frame was too late, we should adjust the delay accordingly
  _timing->UpdateCurrentDelay(frame->RenderTimeMs(),
                              clock_->TimeInMilliseconds());

  if (first_frame_received_()) {
    LOG(LS_INFO) << "Received first "
                 << (frame->Complete() ? "complete" : "incomplete")
                 << " decodable video frame";
  }

  const int32_t ret = Decode(*frame);  // Key point 2
  _receiver.ReleaseFrame(frame);
  return ret;
}

Note that what we should really care about here is where the frame that Decode() processes comes from; see FrameForDecoding:

//webrtc/modules/video_coding/receiver.cc
VCMEncodedFrame* VCMReceiver::FrameForDecoding(uint16_t max_wait_time_ms,
                                               bool prefer_late_decoding) {
  const int64_t start_time_ms = clock_->TimeInMilliseconds();
  uint32_t frame_timestamp = 0;
  int min_playout_delay_ms = -1;
  int max_playout_delay_ms = -1;
  int64_t render_time_ms = 0;
  // Exhaust wait time to get a complete frame for decoding.
  VCMEncodedFrame* found_frame =
      jitter_buffer_.NextCompleteFrame(max_wait_time_ms);

  if (found_frame) {
    frame_timestamp = found_frame->TimeStamp();
    min_playout_delay_ms = found_frame->EncodedImage().playout_delay_.min_ms;
    max_playout_delay_ms = found_frame->EncodedImage().playout_delay_.max_ms;
  } else {
    if (!jitter_buffer_.NextMaybeIncompleteTimestamp(&frame_timestamp))
      return nullptr;
  }

  if (min_playout_delay_ms >= 0)
    timing_->set_min_playout_delay(min_playout_delay_ms);

  if (max_playout_delay_ms >= 0)
    timing_->set_max_playout_delay(max_playout_delay_ms);

  // We have a frame - Set timing and render timestamp.
  timing_->SetJitterDelay(jitter_buffer_.EstimatedJitterMs());
  const int64_t now_ms = clock_->TimeInMilliseconds();
  timing_->UpdateCurrentDelay(frame_timestamp);
  render_time_ms = timing_->RenderTimeMs(frame_timestamp, now_ms);
  // Check render timing.
  bool timing_error = false;
  // Assume that render timing errors are due to changes in the video stream.
  if (render_time_ms < 0) {
    timing_error = true;
  } else if (std::abs(render_time_ms - now_ms) > max_video_delay_ms_) {
    int frame_delay = static_cast<int>(std::abs(render_time_ms - now_ms));
    LOG(LS_WARNING) << "A frame about to be decoded is out of the configured "
                    << "delay bounds (" << frame_delay << " > "
                    << max_video_delay_ms_
                    << "). Resetting the video jitter buffer.";
    timing_error = true;
  } else if (static_cast<int>(timing_->TargetVideoDelay()) >
             max_video_delay_ms_) {
    LOG(LS_WARNING) << "The video target delay has grown larger than "
                    << max_video_delay_ms_ << " ms. Resetting jitter buffer.";
    timing_error = true;
  }

  if (timing_error) {
    // Timing error => reset timing and flush the jitter buffer.
    jitter_buffer_.Flush();
    timing_->Reset();
    return NULL;
  }

  if (prefer_late_decoding) {
    // Decode frame as close as possible to the render timestamp.
    const int32_t available_wait_time =
        max_wait_time_ms -
        static_cast<int32_t>(clock_->TimeInMilliseconds() - start_time_ms);
    uint16_t new_max_wait_time =
        static_cast<uint16_t>(VCM_MAX(available_wait_time, 0));
    uint32_t wait_time_ms =
        timing_->MaxWaitingTime(render_time_ms, clock_->TimeInMilliseconds());
    if (new_max_wait_time < wait_time_ms) {
      // We're not allowed to wait until the frame is supposed to be rendered,
      // waiting as long as we're allowed to avoid busy looping, and then return
      // NULL. Next call to this function might return the frame.
      render_wait_event_->Wait(new_max_wait_time);
      return NULL;
    }
    // Wait until it's time to render.
    render_wait_event_->Wait(wait_time_ms);
  }

  // Extract the frame from the jitter buffer and set the render time.
  VCMEncodedFrame* frame = jitter_buffer_.ExtractAndSetDecode(frame_timestamp);
  if (frame == NULL) {
    return NULL;
  }
  frame->SetRenderTime(render_time_ms);
  TRACE_EVENT_ASYNC_STEP1