forked from nintha/river
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathws_h264.rs
More file actions
162 lines (138 loc) · 5.17 KB
/
Copy pathws_h264.rs
File metadata and controls
162 lines (138 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
use async_tungstenite::tungstenite::handshake::server::{ErrorResponse, Request, Response};
use async_tungstenite::tungstenite::Message;
use crossbeam_utils::atomic::AtomicCell;
use futures::sink::SinkExt;
use futures::StreamExt;
use smol::net::{SocketAddr, TcpListener, TcpStream};
use crate::protocol::h264::Nalu;
use crate::rtmp_server::{eventbus_map, video_header_map, audio_header_map};
use crate::protocol::rtmp::{ChunkMessageType, RtmpMessage};
use smol::channel::Receiver;
use smol::stream::{Stream};
use smol::stream;
use crate::protocol::aac::{AAC, ADTS};
#[allow(unused)]
pub async fn run_server(addr: String) -> anyhow::Result<()> {
// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
let listener = try_socket.expect("Failed to bind");
log::info!("Websocket Listening on: {}", addr);
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
smol::spawn(handle_connection(stream, addr)).detach();
}
Ok(())
}
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) -> anyhow::Result<()> {
log::info!("Incoming TCP connection from: {}", addr);
let uri = AtomicCell::default();
let callback = |req: &Request, res: Response| -> Result<Response, ErrorResponse>{
uri.store(req.uri().clone());
Ok(res)
};
let ws_stream = async_tungstenite::accept_hdr_async(raw_stream, callback).await?;
let (mut outgoing, _incoming) = ws_stream.split();
let uri = uri.take();
let stream_name = uri.path().strip_prefix("/websocket/")
.ok_or(anyhow::anyhow!("invalid uri path"))?;
log::info!("WebSocket connection established: {}, stream_name={}", addr, stream_name);
// send video header
if let Some(header) = video_header_map().get(stream_name) {
for mix in Mix::from_rtmp_message(&header, &stream_name) {
outgoing.send(Message::binary(mix.to_bytes())).await?;
}
}
if let Some(el) = eventbus_map().get(stream_name) {
let rx = el.register_receiver();
std::mem::drop(el);
let rx = rtmp_rx_into_mix_rx(rx, stream_name.to_string());
futures::pin_mut!(rx);
while let Some(mix) = StreamExt::next(&mut rx).await {
outgoing.send(Message::binary(mix.to_bytes())).await?;
}
}
log::info!("WebSocket disconnected: {}, stream_name={}", addr, stream_name);
Ok(())
}
// 把RMTP流转换城MIX流,并保证首帧为关键帧
fn rtmp_rx_into_mix_rx(rx: Receiver<RtmpMessage>, stream_name: String) -> impl Stream<Item=Mix> {
stream::unfold((rx, false, stream_name), |(rx, first_key_frame, stream_name)| async move {
while let Ok(msg) = rx.recv().await {
let mixes = Mix::from_rtmp_message(&msg, &stream_name);
if mixes.is_empty() {
continue;
}
if first_key_frame {
return Some((stream::iter(mixes), (rx, first_key_frame, stream_name)));
}
let mut mixes = mixes.into_iter().skip_while(|mix| !mix.is_key_frame()).collect::<Vec<Mix>>();
// 消息堆积,丢弃视频非关键帧
if rx.len() > 30 {
mixes.retain(|x| x.is_audio() || x.is_key_frame());
}
if mixes.is_empty() {
continue;
}
return Some((stream::iter(mixes), (rx, true, stream_name)));
}
None
}).flatten()
}
/// 媒体混合数据
enum Mix {
Video(Nalu),
Audio(ADTS),
}
impl Mix {
const VIDEO_FLAG: u8 = 0x00;
const AUDIO_FLAG: u8 = 0x01;
pub fn from_rtmp_message(msg: &RtmpMessage, stream_name: &str) -> Vec<Self> {
match msg.header.message_type {
ChunkMessageType::VideoMessage => {
Nalu::from_rtmp_message(&msg).into_iter().map(Mix::Video).collect()
}
ChunkMessageType::AudioMessage => {
if let Some(header) = audio_header_map().get(stream_name) {
AAC::from_rtmp_message(&msg, header.value())
.into_iter()
.map(|x| x.to_adts())
.flatten()
.map(Mix::Audio)
.collect()
} else {
vec![]
}
}
_ => vec![]
}
}
#[allow(unused)]
pub fn is_video(&self) -> bool {
matches!(self, Mix::Video(_))
}
#[allow(unused)]
pub fn is_audio(&self) -> bool {
matches!(self, Mix::Audio(_))
}
pub fn is_key_frame(&self) -> bool {
if let Mix::Video(nalu) = self {
nalu.is_key_frame
} else {
false
}
}
fn to_bytes(&self) -> Vec<u8> {
match self {
Mix::Video(nalu) => {
let mut bytes = vec![Mix::VIDEO_FLAG];
bytes.extend_from_slice(nalu.as_ref());
bytes
}
Mix::Audio(aac) => {
let mut bytes = vec![Mix::AUDIO_FLAG];
bytes.extend_from_slice(&aac.to_bytes());
bytes
}
}
}
}