@@ -7,7 +7,10 @@ use smol::net::{SocketAddr, TcpListener, TcpStream};
77
88use crate :: protocol:: h264:: Nalu ;
99use crate :: rtmp_server:: { eventbus_map, video_header_map} ;
10- use crate :: protocol:: rtmp:: ChunkMessageType ;
10+ use crate :: protocol:: rtmp:: { ChunkMessageType , RtmpMessage } ;
11+ use smol:: channel:: Receiver ;
12+ use smol:: stream:: { Stream } ;
13+ use smol:: stream;
1114
1215#[ allow( unused) ]
1316pub ( crate ) async fn run_server ( addr : & str ) -> anyhow:: Result < ( ) > {
@@ -53,26 +56,56 @@ async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) -> anyhow::R
5356 let rx = el. register_receiver ( ) ;
5457 std:: mem:: drop ( el) ;
5558
59+ let rx = rtmp_rx_into_nalu_rx ( rx) ;
60+ futures:: pin_mut!( rx) ;
61+ while let Some ( nalu) = StreamExt :: next ( & mut rx) . await {
62+ outgoing. send ( Message :: binary ( nalu. as_ref ( ) ) ) . await ?;
63+ }
64+
5665 // 第一个关键帧是否出现
57- let mut first_key_frame = false ;
66+ // let mut first_key_frame = false;
67+ // while let Ok(msg) = rx.recv().await {
68+ // if msg.header.message_type != ChunkMessageType::VideoMessage {
69+ // continue;
70+ // }
71+ //
72+ // for nalu in Nalu::from_rtmp_message(&msg) {
73+ // if !first_key_frame {
74+ // if nalu.is_key_frame {
75+ // first_key_frame = true;
76+ // } else {
77+ // continue;
78+ // }
79+ // }
80+ // outgoing.send(Message::binary(nalu.as_ref())).await?;
81+ // }
82+ // }
83+ }
84+ log:: info!( "WebSocket disconnected: {}, stream_name={}" , addr, stream_name) ;
85+ Ok ( ( ) )
86+ }
87+
88+ // 把RMTP流转换城NALU流,并保证首帧为关键帧
89+ fn rtmp_rx_into_nalu_rx ( rx : Receiver < RtmpMessage > ) -> impl Stream < Item =Nalu > {
90+ stream:: unfold ( ( rx, false ) , |( rx, first_key_frame) | async move {
5891 while let Ok ( msg) = rx. recv ( ) . await {
5992 if msg. header . message_type != ChunkMessageType :: VideoMessage {
6093 continue ;
6194 }
6295
63- for nalu in Nalu :: from_rtmp_message ( & msg) {
64- if !first_key_frame {
65- if nalu. is_key_frame {
66- first_key_frame = true ;
67- } else {
68- continue ;
69- }
70- }
71- outgoing. send ( Message :: binary ( nalu. as_ref ( ) ) ) . await ?;
96+ let nalus = Nalu :: from_rtmp_message ( & msg) ;
97+ if first_key_frame {
98+ return Some ( ( stream:: iter ( nalus) , ( rx, first_key_frame) ) ) ;
7299 }
100+
101+ let nalus = nalus. into_iter ( ) . skip_while ( |nalu| !nalu. is_key_frame ) . collect :: < Vec < Nalu > > ( ) ;
102+
103+ if nalus. is_empty ( ) {
104+ continue ;
105+ }
106+ return Some ( ( stream:: iter ( nalus) , ( rx, true ) ) ) ;
73107 }
74- }
75- log:: info!( "WebSocket disconnected: {}, stream_name={}" , addr, stream_name) ;
76- Ok ( ( ) )
108+ None
109+ } ) . flatten ( )
77110}
78111
0 commit comments