Skip to content

Commit 6960d59

Browse files
committed
feat: 实现缓存sps/pps帧信息
1 parent 094cfa8 commit 6960d59

2 files changed

Lines changed: 53 additions & 65 deletions

File tree

src/main.rs

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use protocol::rtmp::*;
1313
use crate::util::{bytes_hex_format, print_hex, spawn_and_log_error, gen_random_bytes};
1414
use crate::eventbus::EventBus;
1515
use once_cell::sync::OnceCell;
16+
use dashmap::DashMap;
1617

1718
mod util;
1819
mod protocol;
@@ -28,6 +29,11 @@ fn rtmp_msg_eventbus() -> &'static EventBus<RtmpMessage> {
2829
INSTANCE.get_or_init(|| EventBus::with_label("RTMP_MSG"))
2930
}
3031

32+
fn sps_message_map() -> &'static DashMap<String, RtmpMessage> {
33+
static INSTANCE: OnceCell<DashMap<String, RtmpMessage>> = OnceCell::new();
34+
INSTANCE.get_or_init(|| DashMap::new())
35+
}
36+
3137
/// TCP 连接处理
3238
async fn accept_loop(addr: &str) -> anyhow::Result<()> {
3339
let listener = TcpListener::bind(addr.clone()).await?;
@@ -65,6 +71,7 @@ async fn connection_loop(stream: TcpStream) -> anyhow::Result<()> {
6571
let buffer_length = BigEndian::read_u32(&bytes[6..10]);
6672
log::info!("[peer={}] C->S, [{}] set buffer length={}, streamId={}", ctx.peer_addr, message.message_type_desc(), buffer_length, stream_id);
6773
response_play(&mut ctx, stream_id).await?;
74+
// TODO 使用动态的值
6875
send_meta_data_for_play(&mut ctx, &RtmpMetaData {
6976
width: 1280.0,
7077
height: 720.0,
@@ -75,24 +82,20 @@ async fn connection_loop(stream: TcpStream) -> anyhow::Result<()> {
7582
frame_rate: 30.0,
7683
duration: 30.0,
7784
}).await?;
78-
// TODO send video stream to peer
79-
fn is_key_frame(msg: &RtmpMessage) -> bool {
80-
if let ChunkMessageType::VideoMessage = msg.header.message_type {
81-
msg.body[0] >> 4 == 1
82-
} else {
83-
false
85+
86+
ctx.ctx_begin_timestamp = Local::now().timestamp_millis();
87+
88+
// 发送sps/pps帧
89+
if let Some(msg) = sps_message_map().get(&ctx.stream_name) {
90+
let chunks = msg.split_chunks_bytes(ctx.chunk_size);
91+
for chunk in chunks {
92+
ctx.write_to_peer(&chunk).await?;
8493
}
8594
}
8695

87-
let mut wait_key_frame = true;
8896
let receiver = rtmp_msg_eventbus().register_receiver();
8997
while let Ok(mut msg) = receiver.recv().await {
90-
if wait_key_frame && !is_key_frame(&msg) {
91-
continue;
92-
}
93-
wait_key_frame = false;
94-
95-
msg.header.msid = 1;
98+
// log::info!("[peer={}] frame type = {:#04X}, acv_packet_type={:#04X}", &ctx.peer_addr, msg.body[0], msg.body[1]);
9699
msg.header.timestamp = (Local::now().timestamp_millis() - ctx.ctx_begin_timestamp) as u32;
97100
let chunks = msg.split_chunks_bytes(ctx.chunk_size);
98101
for chunk in chunks {
@@ -123,8 +126,14 @@ async fn connection_loop(stream: TcpStream) -> anyhow::Result<()> {
123126
response_create_stream(&mut ctx, &values[1]).await?;
124127
}
125128
"publish" => {
129+
ctx.stream_name = values[3].try_as_str().unwrap_or_default().to_string();
130+
log::info!("[peer={}] stream_name={}", ctx.peer_addr, ctx.stream_name);
126131
response_publish(&mut ctx).await?;
127132
}
133+
"play" => {
134+
ctx.stream_name = values[3].try_as_str().unwrap_or_default().to_string();
135+
log::info!("[peer={}] stream_name={}", ctx.peer_addr, ctx.stream_name);
136+
}
128137
_ => ()
129138
}
130139
}
@@ -145,8 +154,13 @@ async fn connection_loop(stream: TcpStream) -> anyhow::Result<()> {
145154

146155
ChunkMessageType::VideoMessage => {
147156
log::debug!("[peer={}] C->S, [{}] header={:?}", ctx.peer_addr, message.message_type_desc(), message.header);
148-
rtmp_msg_eventbus().publish(message.clone()).await;
149-
handle_video_data(&message.body, &ctx);
157+
if message.body[0] == 0x17 && message.body[1] == 0x00 {
158+
sps_message_map().insert(ctx.stream_name.clone(), message.clone());
159+
log::info!("[peer={}] C->S, cache sps/pps message, stream_name={}", ctx.peer_addr, ctx.stream_name);
160+
} else {
161+
rtmp_msg_eventbus().publish(message.clone()).await;
162+
}
163+
// handle_video_data(&message.body, &ctx);
150164
}
151165
ChunkMessageType::AudioMessage => {
152166
// sender.send(message).await?;
@@ -156,25 +170,6 @@ async fn connection_loop(stream: TcpStream) -> anyhow::Result<()> {
156170
}
157171
}
158172
}
159-
160-
// log::info!("C->S, others:");
161-
// let mut i = 0;
162-
// let mut arr: [char; 8] = ['.'; 8];
163-
// loop {
164-
// let byte = stream.read_one_return().await?;
165-
// print!("{:#04X}", byte);
166-
// if byte.is_ascii_graphic() {
167-
// arr[i % 8] = byte as char;
168-
// } else {
169-
// arr[i % 8] = '.';
170-
// }
171-
// print!(" ");
172-
// std::io::stdout().flush()?;
173-
// i += 1;
174-
// if i % 8 == 0 {
175-
// println!(" {}", arr.iter().collect::<String>());
176-
// }
177-
// }
178173
}
179174

180175
/// 处理RTMP握手流程
@@ -356,7 +351,7 @@ async fn response_publish(ctx: &mut RtmpContext) -> anyhow::Result<()> {
356351

357352
async fn response_play(ctx: &mut RtmpContext, stream_id: u32) -> anyhow::Result<()> {
358353
{
359-
let mut rs: Vec<u8> = vec![
354+
let rs: Vec<u8> = vec![
360355
0x02, 0x00, 0x00, 0x00,
361356
0x00, 0x00, 0x02, 0x04,
362357
0x00, 0x00, 0x00, 0x01,
@@ -503,6 +498,6 @@ async fn send_meta_data_for_play(ctx: &mut RtmpContext, meta_data: &RtmpMetaData
503498

504499
fn main() -> anyhow::Result<()> {
505500
util::init_logger();
506-
let server = "127.0.0.1:1935";
501+
let server = "127.0.0.1:11935";
507502
smol::block_on(accept_loop(server))
508503
}

src/protocol/rtmp.rs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub struct RtmpContext {
108108
pub recv_bytes_num: u32,
109109
pub arc_receiver: Arc<Receiver<Vec<u8>>>,
110110
pub peer_addr: String,
111+
pub stream_name: String,
111112
}
112113

113114
impl RtmpContext {
@@ -158,6 +159,7 @@ impl RtmpContext {
158159
recv_bytes_num: 0,
159160
arc_receiver: receiver,
160161
peer_addr,
162+
stream_name: Default::default(),
161163
}
162164
}
163165

@@ -190,11 +192,20 @@ pub struct RtmpMessageHeader {
190192

191193
impl RtmpMessageHeader {
192194
pub fn to_bytes(&self) -> Vec<u8> {
195+
let enable_extend_timestamp_field = self.timestamp >= 0xFFFFFF;
196+
193197
let mut rs = vec![self.csid];
194-
rs.write_u24::<BigEndian>(self.timestamp).unwrap();
198+
if enable_extend_timestamp_field {
199+
rs.write_u24::<BigEndian>(0xFFFFFF).unwrap();
200+
} else {
201+
rs.write_u24::<BigEndian>(self.timestamp).unwrap();
202+
}
195203
rs.write_u24::<BigEndian>(self.message_length).unwrap();
196204
rs.write_u8(self.message_type_id).unwrap();
197205
rs.write_u32::<BigEndian>(self.msid).unwrap();
206+
if enable_extend_timestamp_field {
207+
rs.write_u32::<BigEndian>(self.timestamp).unwrap();
208+
}
198209
rs
199210
}
200211
}
@@ -231,11 +242,17 @@ impl RtmpMessage {
231242
// 时间差值置零
232243
ctx.last_timestamp_delta = 0;
233244
ctx.last_timestamp = BigEndian::read_u24(&h[0..3]);
245+
234246
ctx.last_message_length = BigEndian::read_u24(&h[3..6]);
235247
ctx.remain_message_length = 0;
236248
ctx.last_message_type_id = h[6];
237249
ctx.last_message_stream_id = BigEndian::read_u32(&h[7..11]);
238250
ctx.recv_bytes_num += 12;
251+
if ctx.last_timestamp >= 0xFFFFFF {
252+
let extend = ctx.read_exact_from_peer(4).await?;
253+
ctx.last_timestamp = BigEndian::read_u32(&extend[0..4]);
254+
ctx.recv_bytes_num += 4;
255+
}
239256

240257
(ctx.last_timestamp,
241258
ctx.last_message_length,
@@ -510,6 +527,7 @@ pub fn read_all_amf_value(bytes: &[u8]) -> Option<Vec<Value>> {
510527
/// 0
511528
/// See ISO 14496-12, 8.15.3 for an explanation of composition
512529
/// times. The offset in an FLV file is always in milliseconds.
530+
#[allow(unused)]
513531
pub fn handle_video_data(bytes: &[u8], ctx: &RtmpContext) {
514532
let frame_type = bytes[0];
515533
let mut read_index = 1;
@@ -584,38 +602,13 @@ pub fn handle_video_data(bytes: &[u8], ctx: &RtmpContext) {
584602
} else {
585603
unreachable!("unknown acv packet type")
586604
};
587-
}
588605

589-
fn handle_nalu(nalu_bytes: Vec<u8>) {
590-
smol::block_on(nalu_eventbus().publish(nalu_bytes));
606+
fn handle_nalu(nalu_bytes: Vec<u8>) {
607+
smol::block_on(nalu_eventbus().publish(nalu_bytes));
608+
}
591609
}
592610

593-
fn nalu_type_desc(nalu_type: &u8) -> String {
594-
let priority: String = match (nalu_type & 0x60) >> 5 {
595-
0 => "DISPOSABLE".into(),
596-
1 => "LOW".into(),
597-
2 => "HIGH".into(),
598-
3 => "HIGHEST".into(),
599-
_ => "UNKNOWN".into(),
600-
};
601611

602-
let t: String = match nalu_type & 0x1F {
603-
1 => "SLICE".into(),
604-
2 => "DPA".into(),
605-
3 => "DPB".into(),
606-
4 => "DPC".into(),
607-
5 => "IDR".into(),
608-
6 => "SEI".into(),
609-
7 => "SPS".into(),
610-
8 => "PPS".into(),
611-
9 => "AUD".into(),
612-
10 => "EOSEQ".into(),
613-
11 => "EOSTREAM".into(),
614-
12 => "FILL".into(),
615-
_ => "UNKNOWN".into(),
616-
};
617612

618-
format!("{}::{}", priority, t)
619-
}
620613

621614

0 commit comments

Comments
 (0)