11use std:: fmt:: { Debug , Formatter } ;
2+ use std:: sync:: { Arc , Weak } ;
3+ use std:: time:: Duration ;
24
35use amf:: amf0;
46use amf:: amf0:: Value ;
57use byteorder:: { BigEndian , ByteOrder , WriteBytesExt } ;
68use chrono:: Local ;
79use num:: FromPrimitive ;
10+ use smol:: channel:: Receiver ;
811use smol:: io:: { AsyncReadExt , AsyncWriteExt } ;
912use smol:: net:: TcpStream ;
13+ use smol_timeout:: TimeoutExt ;
1014
15+ use crate :: { nalu_eventbus, rtmp_msg_eventbus} ;
16+ use crate :: protocol:: flv:: FlvTag ;
1117use crate :: util:: { bytes_hex_format, spawn_and_log_error} ;
12- use smol:: channel:: { Receiver } ;
13- use crate :: nalu_eventbus;
14- use std:: sync:: { Arc , Weak } ;
15- use smol_timeout:: TimeoutExt ;
16- use std:: time:: Duration ;
18+ use crate :: protocol:: flv;
1719
1820#[ derive( Clone , Debug ) ]
1921pub struct Handshake0 {
@@ -106,16 +108,22 @@ pub struct RtmpContext {
106108 pub chunk_size : u32 ,
107109 pub remain_message_length : u32 ,
108110 pub recv_bytes_num : u32 ,
109- pub arc_receiver : Arc < Receiver < Vec < u8 > > > ,
111+ pub nalu_rx : Arc < Receiver < Vec < u8 > > > ,
110112 pub peer_addr : String ,
111113 pub stream_name : String ,
114+ pub flv_rx : Arc < Receiver < RtmpMessage > >
112115}
113116
114117impl RtmpContext {
115118 pub fn new ( stream : TcpStream ) -> Self {
116- let receiver = nalu_eventbus ( ) . register_receiver ( ) ;
117- let receiver = Arc :: new ( receiver) ;
118- let nalu_rx_weak = Arc :: downgrade ( & receiver) ;
119+ let nalu_rx = nalu_eventbus ( ) . register_receiver ( ) ;
120+ let nalu_rx = Arc :: new ( nalu_rx) ;
121+ let nalu_rx_weak = Arc :: downgrade ( & nalu_rx) ;
122+
123+ let flv_rx = rtmp_msg_eventbus ( ) . register_receiver ( ) ;
124+ let flv_rx = Arc :: new ( flv_rx) ;
125+ let flv_rx_weak = Arc :: downgrade ( & flv_rx) ;
126+
119127 let peer_addr = stream. peer_addr ( ) . map ( |a| a. to_string ( ) ) . unwrap_or_default ( ) ;
120128
121129 async fn handle_nalu_rx ( nalu_rx : Weak < Receiver < Vec < u8 > > > , peer_addr : String ) -> anyhow:: Result < ( ) > {
@@ -144,7 +152,39 @@ impl RtmpContext {
144152 Ok ( ( ) )
145153 }
146154
155+ async fn handle_flv_rx ( flv_rx : Weak < Receiver < RtmpMessage > > , peer_addr : String ) -> anyhow:: Result < ( ) > {
156+ let tmp_dir = "tmp" ;
157+ if smol:: fs:: read_dir ( tmp_dir) . await . is_err ( ) {
158+ smol:: fs:: create_dir_all ( tmp_dir) . await ?;
159+ }
160+
161+ let mut file = smol:: fs:: OpenOptions :: new ( )
162+ . create ( true )
163+ . write ( true )
164+ . truncate ( true )
165+ . open ( "tmp/output.flv" )
166+ . await ?;
167+
168+ // write header
169+ file. write_all ( & flv:: FLV_HEADER_WITH_TAG0 ) . await ?;
170+
171+ while let Some ( rx) = flv_rx. upgrade ( ) {
172+ if let Some ( rs) = rx. recv ( ) . timeout ( Duration :: from_secs ( 1 ) ) . await {
173+ if let Ok ( msg) = rs {
174+ let flv_tag = FlvTag :: from_rtmp_message ( msg) ?;
175+ file. write_all ( flv_tag. as_ref ( ) ) . await ?;
176+ file. write_all ( & ( flv_tag. as_ref ( ) . len ( ) as u32 ) . to_be_bytes ( ) ) . await ?;
177+ } else {
178+ break ;
179+ }
180+ }
181+ }
182+ log:: warn!( "[peer={}][handle_flv_rx] closed" , peer_addr) ;
183+ Ok ( ( ) )
184+ }
185+
147186 spawn_and_log_error ( handle_nalu_rx ( nalu_rx_weak, peer_addr. clone ( ) ) ) ;
187+ spawn_and_log_error ( handle_flv_rx ( flv_rx_weak, peer_addr. clone ( ) ) ) ;
148188
149189 RtmpContext {
150190 stream,
@@ -157,7 +197,8 @@ impl RtmpContext {
157197 chunk_size : 128 ,
158198 remain_message_length : 0 ,
159199 recv_bytes_num : 0 ,
160- arc_receiver : receiver,
200+ nalu_rx,
201+ flv_rx,
161202 peer_addr,
162203 stream_name : Default :: default ( ) ,
163204 }
@@ -178,7 +219,6 @@ impl RtmpContext {
178219#[ derive( Debug , Clone ) ]
179220pub struct RtmpMessageHeader {
180221 /// chunk stream id
181- /// 2 (low level), 3 (high level), 4 (control stream), 5 (video) and 6 (audio).
182222 pub csid : u8 ,
183223 pub timestamp : u32 ,
184224 pub message_length : u32 ,
0 commit comments