forked from nintha/river
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrtmp.rs
More file actions
535 lines (491 loc) · 17.8 KB
/
Copy pathrtmp.rs
File metadata and controls
535 lines (491 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
use std::fmt::{Debug, Formatter};
use amf::amf0;
use amf::amf0::Value;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use chrono::Local;
use num::FromPrimitive;
use smol::io::{AsyncReadExt, AsyncWriteExt};
use smol::net::TcpStream;
use crate::rtmp_server::eventbus_map;
use crate::util::bytes_hex_format;
use std::convert::TryFrom;
/// The one-byte C0/S0 packet that opens the RTMP handshake.
#[derive(Clone, Debug)]
pub struct Handshake0 {
    /// Version (8 bits). In C0 this is the RTMP version requested by the
    /// client; in S0 it is the version selected by the server. The version
    /// defined by the specification is 3. Values 0-2 are deprecated values
    /// used by earlier proprietary products, 4-31 are reserved for future
    /// implementations and 32-255 are not allowed (so that RTMP can be told
    /// apart from text-based protocols, which always start with a printable
    /// character). A server that does not recognize the client's requested
    /// version SHOULD respond with 3; the client MAY then degrade to
    /// version 3 or abandon the handshake.
    pub version: u8,
}

impl Handshake0 {
    /// Ready-made S0 packet announcing RTMP version 3.
    pub const S0_V3: Handshake0 = Handshake0 { version: 3 };

    /// Serializes the packet into its wire form: a single byte holding `version`.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut wire = Vec::with_capacity(1);
        wire.push(self.version);
        wire
    }
}
/// The C1/S1 handshake packet: a timestamp, a zero field and a block of random data.
#[derive(Clone, Debug)]
pub struct Handshake1 {
    /// Time (4 bytes): a timestamp which SHOULD be used as the epoch for all
    /// future chunks sent from this endpoint. It may be 0 or some arbitrary
    /// value. To synchronize multiple chunk streams, the endpoint may wish to
    /// send the current value of the other chunk stream's timestamp.
    pub time: u32,
    /// Zero (4 bytes): this field MUST be all 0s.
    pub zero: u32,
    /// Random data (1528 bytes): any arbitrary values. Since each endpoint
    /// has to distinguish between the response to the handshake it has
    /// initiated and the handshake initiated by its peer, this data SHOULD be
    /// sufficiently random. There is no need for cryptographically-secure
    /// randomness, or even dynamic values.
    pub random_data: Vec<u8>,
}

impl Handshake1 {
    /// Size in bytes of a complete packet (4 + 4 + 1528).
    pub const PACKET_LENGTH: u32 = 1536;

    /// Serializes the packet: `time` and `zero` as big-endian `u32`s followed
    /// by `random_data` verbatim. The length of `random_data` is not checked.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut wire = Vec::with_capacity(8 + self.random_data.len());
        wire.extend_from_slice(&self.time.to_be_bytes());
        wire.extend_from_slice(&self.zero.to_be_bytes());
        wire.extend_from_slice(&self.random_data);
        wire
    }
}
/// The C2/S2 handshake packet, which echoes the peer's C1/S1 packet back to it.
#[derive(Clone, Debug)]
pub struct Handshake2 {
    /// Time (4 bytes): MUST contain the timestamp sent by the peer in S1
    /// (for C2) or C1 (for S2).
    pub time: u32,
    /// Time2 (4 bytes): MUST contain the timestamp at which the previous
    /// packet (S1 or C1) sent by the peer was read.
    pub time2: u32,
    /// Random echo (1528 bytes): MUST contain the random data field sent by
    /// the peer in S1 (for C2) or C1 (for S2). Either peer can use the `time`
    /// and `time2` fields together with the current timestamp as a quick
    /// estimate of the bandwidth and/or latency of the connection, but this
    /// is unlikely to be useful.
    pub random_echo: Vec<u8>,
}

impl Handshake2 {
    /// Size in bytes of a complete packet (4 + 4 + 1528).
    pub const PACKET_LENGTH: u32 = 1536;

    /// Serializes the packet: `time` and `time2` as big-endian `u32`s followed
    /// by `random_echo` verbatim. The length of `random_echo` is not checked.
    pub fn to_bytes(&self) -> Vec<u8> {
        let mut wire = Vec::with_capacity(8 + self.random_echo.len());
        wire.extend_from_slice(&self.time.to_be_bytes());
        wire.extend_from_slice(&self.time2.to_be_bytes());
        wire.extend_from_slice(&self.random_echo);
        wire
    }
}
/// Per-connection state: the peer socket plus the values remembered from the
/// most recently parsed chunk header.
#[derive(Debug)]
pub struct RtmpContext {
    /// Socket connected to the peer.
    pub stream: TcpStream,
    /// Millisecond timestamp taken from `Local::now()` when the context was created.
    pub ctx_begin_timestamp: i64,
    /// Timestamp carried over from the last parsed chunk header.
    pub last_timestamp: u32,
    /// Timestamp delta carried over from the last parsed chunk header.
    pub last_timestamp_delta: u32,
    /// Message length carried over from the last parsed chunk header.
    pub last_message_length: u32,
    /// Message type id carried over from the last parsed chunk header.
    pub last_message_type_id: u8,
    /// Message stream id carried over from the last parsed chunk header.
    pub last_message_stream_id: u32,
    /// Maximum payload size of one chunk; starts at 128.
    pub chunk_size: u32,
    /// Payload bytes of the current message that have not been read yet.
    pub remain_message_length: u32,
    /// Running count of bytes received from the peer.
    pub recv_bytes_num: u32,
    /// Textual peer address, or an empty string when it could not be determined.
    pub peer_addr: String,
    /// Name of the stream associated with this connection (empty until set).
    pub stream_name: String,
    /// Whether this connection publishes the stream.
    pub is_publisher: bool,
}

impl RtmpContext {
    /// Creates the context for a freshly accepted connection with all parser
    /// state zeroed and the chunk size set to 128.
    pub fn new(stream: TcpStream) -> Self {
        let peer_addr = match stream.peer_addr() {
            Ok(addr) => addr.to_string(),
            Err(_) => String::new(),
        };
        let ctx_begin_timestamp = Local::now().timestamp_millis();
        Self {
            stream,
            ctx_begin_timestamp,
            last_timestamp: 0,
            last_timestamp_delta: 0,
            last_message_length: 0,
            last_message_type_id: 0,
            last_message_stream_id: 0,
            chunk_size: 128,
            remain_message_length: 0,
            recv_bytes_num: 0,
            peer_addr,
            stream_name: String::default(),
            is_publisher: false,
        }
    }

    /// Reads exactly `bytes_num` bytes from the peer.
    ///
    /// # Errors
    /// Fails when the underlying read fails, including when the stream ends
    /// before `bytes_num` bytes have arrived.
    pub async fn read_exact_from_peer(&mut self, bytes_num: u32) -> anyhow::Result<Vec<u8>> {
        let wanted = bytes_num as usize;
        let mut buf = vec![0u8; wanted];
        AsyncReadExt::read_exact(&mut self.stream, buf.as_mut_slice()).await?;
        Ok(buf)
    }

    /// Receives data without removing it from the queue.
    ///
    /// The returned vector always has length `bytes_num`.
    ///
    /// NOTE(review): the byte count reported by `peek` is discarded, so when
    /// fewer than `bytes_num` bytes are currently queued the tail of the
    /// result stays zero-filled. Confirm callers tolerate that, or check the
    /// count here.
    pub async fn peek_exact_from_peer(&mut self, bytes_num: u32) -> anyhow::Result<Vec<u8>> {
        let wanted = bytes_num as usize;
        let mut buf = vec![0u8; wanted];
        let _peeked = self.stream.peek(buf.as_mut_slice()).await?;
        Ok(buf)
    }

    /// Writes all of `bytes` to the peer.
    pub async fn write_to_peer(&mut self, bytes: &[u8]) -> anyhow::Result<()> {
        Ok(self.stream.write_all(bytes).await?)
    }
}
impl Drop for RtmpContext {
    /// When a publisher's context goes away, removes the event bus registered
    /// under its stream name and logs the removal. Non-publisher contexts are
    /// dropped without any side effect.
    fn drop(&mut self) {
        if !self.is_publisher {
            return;
        }
        eventbus_map().remove(&self.stream_name);
        log::warn!(
            "[{}][RtmpContext] remove eventbus, stream_name={}",
            self.peer_addr,
            self.stream_name
        );
    }
}
/// Header of an RTMP message as carried by the first (type 0) chunk.
#[derive(Debug, Clone)]
pub struct RtmpMessageHeader {
    /// Chunk stream id.
    pub csid: u8,
    /// Message timestamp.
    pub timestamp: u32,
    /// Length of the message payload in bytes.
    pub message_length: u32,
    /// Raw message type id as it appears on the wire.
    pub message_type_id: u8,
    /// Decoded form of `message_type_id`.
    pub message_type: ChunkMessageType,
    /// Message stream id:
    /// 0 => signalling,
    /// 1 => play signalling | publish signalling | audio/video data.
    pub msid: u32,
}

impl RtmpMessageHeader {
    /// Serializes the header as a type 0 chunk header: one byte holding
    /// `csid`, a 24-bit timestamp, a 24-bit message length, the message type
    /// id, the message stream id and, when the timestamp does not fit in the
    /// 24-bit field, a trailing 32-bit extended timestamp.
    ///
    /// NOTE(review): `csid` is written as-is into the first byte, so values
    /// of 64 or more would spill into the two format bits — confirm callers
    /// only use ids below 64.
    ///
    /// NOTE(review): the RTMP specification describes the message stream id
    /// as little-endian on the wire; it is written big-endian here, which
    /// mirrors how this file parses it. Confirm before changing either side.
    pub fn to_bytes(&self) -> Vec<u8> {
        const EXTENDED_MARKER: u32 = 0xFFFFFF;

        let needs_extended = self.timestamp >= EXTENDED_MARKER;
        // The 24-bit field carries the marker whenever the real value is sent
        // in the extended field instead.
        let timestamp_field = if needs_extended {
            EXTENDED_MARKER
        } else {
            self.timestamp
        };

        let mut out = Vec::with_capacity(if needs_extended { 16 } else { 12 });
        out.push(self.csid);
        out.write_u24::<BigEndian>(timestamp_field).unwrap();
        out.write_u24::<BigEndian>(self.message_length).unwrap();
        out.push(self.message_type_id);
        out.write_u32::<BigEndian>(self.msid).unwrap();
        if needs_extended {
            out.write_u32::<BigEndian>(self.timestamp).unwrap();
        }
        out
    }
}
/// A reassembled RTMP message.
///
/// `header` is decoded from the message's first chunk, `body` is the
/// concatenated payload of every chunk and `chunk_count` is the number of
/// chunks that were read to build it.
#[derive(Clone)]
pub struct RtmpMessage {
    pub header: RtmpMessageHeader,
    pub body: Vec<u8>,
    pub chunk_count: u32,
}

impl RtmpMessage {
    /// Value of a 24-bit timestamp field announcing that the real value
    /// follows in a 32-bit extended timestamp field.
    const EXTENDED_TIMESTAMP_MARKER: u32 = 0xFF_FFFF;

    /// Reads one complete message, pulling chunks from the peer until no
    /// payload bytes remain outstanding.
    ///
    /// # Errors
    /// Propagates any error from reading or decoding a chunk.
    pub async fn read_from(ctx: &mut RtmpContext) -> anyhow::Result<Self> {
        let mut message = RtmpMessage::read_chunk_from(ctx).await?;
        while ctx.remain_message_length > 0 {
            let mut next = RtmpMessage::read_chunk_from(ctx).await?;
            message.body.append(&mut next.body);
            message.chunk_count += 1;
        }
        Ok(message)
    }

    /// Reads the 4-byte extended timestamp field and accounts for its bytes.
    async fn read_extended_timestamp(ctx: &mut RtmpContext) -> anyhow::Result<u32> {
        let ext = ctx.read_exact_from_peer(4).await?;
        ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(4);
        Ok(BigEndian::read_u32(&ext))
    }

    /// Reads a single chunk (header plus at most `ctx.chunk_size` payload
    /// bytes) and updates the header state remembered in `ctx`.
    ///
    /// Fixes relative to the previous implementation:
    /// * a type 1 header now records its timestamp delta, so a following
    ///   type 3 chunk that starts a new message reuses the right delta;
    /// * a type 3 chunk only advances the timestamp when it starts a new
    ///   message, not when it continues the current one;
    /// * the basic-header byte of a type 3 chunk is counted as received;
    /// * type 1 and type 2 headers consume the extended timestamp field when
    ///   their 24-bit delta is the marker value;
    /// * timestamp and byte-counter arithmetic wraps instead of overflowing
    ///   on peer-controlled values.
    ///
    /// NOTE(review): only the one-byte basic header is decoded (chunk stream
    /// ids 0 and 1 select longer forms that are not handled), extended
    /// timestamps are not consumed on type 3 chunks, and the remembered
    /// header state is shared by all chunk streams of the connection.
    ///
    /// # Errors
    /// Fails when reading from the peer fails or when the message type id is
    /// not a known [`ChunkMessageType`].
    async fn read_chunk_from(ctx: &mut RtmpContext) -> anyhow::Result<Self> {
        let basic_header = ctx.read_exact_from_peer(1).await?[0];
        let fmt = basic_header >> 6;
        let csid = basic_header & 0x3F;

        match fmt {
            0 => {
                let h = ctx.read_exact_from_peer(11).await?;
                // An absolute timestamp resets the remembered delta.
                ctx.last_timestamp_delta = 0;
                ctx.last_timestamp = BigEndian::read_u24(&h[0..3]);
                ctx.last_message_length = BigEndian::read_u24(&h[3..6]);
                ctx.remain_message_length = 0;
                ctx.last_message_type_id = h[6];
                ctx.last_message_stream_id = BigEndian::read_u32(&h[7..11]);
                ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(12);
                if ctx.last_timestamp >= Self::EXTENDED_TIMESTAMP_MARKER {
                    ctx.last_timestamp = Self::read_extended_timestamp(ctx).await?;
                }
            }
            1 => {
                let h = ctx.read_exact_from_peer(7).await?;
                let mut timestamp_delta = BigEndian::read_u24(&h[0..3]);
                ctx.last_message_length = BigEndian::read_u24(&h[3..6]);
                ctx.remain_message_length = 0;
                ctx.last_message_type_id = h[6];
                ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(8);
                if timestamp_delta >= Self::EXTENDED_TIMESTAMP_MARKER {
                    timestamp_delta = Self::read_extended_timestamp(ctx).await?;
                }
                ctx.last_timestamp_delta = timestamp_delta;
                ctx.last_timestamp = ctx.last_timestamp.wrapping_add(timestamp_delta);
            }
            2 => {
                let h = ctx.read_exact_from_peer(3).await?;
                let mut timestamp_delta = BigEndian::read_u24(&h[0..3]);
                ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(4);
                if timestamp_delta >= Self::EXTENDED_TIMESTAMP_MARKER {
                    timestamp_delta = Self::read_extended_timestamp(ctx).await?;
                }
                ctx.last_timestamp_delta = timestamp_delta;
                ctx.last_timestamp = ctx.last_timestamp.wrapping_add(timestamp_delta);
            }
            3 => {
                // Only the basic header byte was read for this chunk.
                ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(1);
                // A continuation chunk belongs to the message already in
                // progress and must not move the timestamp again.
                if ctx.remain_message_length == 0 {
                    ctx.last_timestamp = ctx.last_timestamp.wrapping_add(ctx.last_timestamp_delta);
                }
            }
            _ => unreachable!("fmt is a two-bit value"),
        }

        let timestamp = ctx.last_timestamp;
        let message_length = ctx.last_message_length;
        let message_type_id = ctx.last_message_type_id;
        let message_stream_id = ctx.last_message_stream_id;

        // Payload length of the current chunk: whatever is still outstanding
        // for this message, capped at the chunk size.
        let pending = if ctx.remain_message_length > 0 {
            ctx.remain_message_length
        } else {
            message_length
        };
        let read_num = pending.min(ctx.chunk_size);
        ctx.remain_message_length = pending - read_num;

        let message_data = ctx.read_exact_from_peer(read_num).await?;
        ctx.recv_bytes_num = ctx.recv_bytes_num.wrapping_add(read_num);

        let message_type = FromPrimitive::from_u8(message_type_id)
            .ok_or_else(|| anyhow::anyhow!("invalid message type: {}", message_type_id))?;

        Ok(RtmpMessage {
            header: RtmpMessageHeader {
                csid,
                msid: message_stream_id,
                message_length,
                timestamp,
                message_type_id,
                message_type,
            },
            body: message_data,
            chunk_count: 1,
        })
    }

    /// Returns a human-readable name for the message type id, or
    /// `"UnknownMessage"` for ids that are not listed.
    pub fn message_type_desc(&self) -> String {
        let desc = match self.header.message_type_id {
            1 => "ProtocolControlMessages::SetChunkSize",
            2 => "ProtocolControlMessages::AbortMessage",
            3 => "ProtocolControlMessages::Acknowledgement",
            4 => "ProtocolControlMessages::UserControlMessage",
            5 => "ProtocolControlMessages::WindowAcknowledgementSize",
            6 => "ProtocolControlMessages::SetPeerBandwidth",
            8 => "CommandMessages::AudioMessage",
            9 => "CommandMessages::VideoMessage",
            15 => "CommandMessages::AMF3DataMessage",
            16 => "CommandMessages::AMF3SharedObjectMessage",
            17 => "CommandMessages::AMF3CommandMessage",
            18 => "CommandMessages::AMF0DataMessage",
            19 => "CommandMessages::AMF0SharedObjectMessage",
            20 => "CommandMessages::AMF0CommandMessage",
            22 => "CommandMessages::AggregateMessage",
            _ => "UnknownMessage",
        };
        String::from(desc)
    }

    /// Decodes the body as a sequence of AMF0 values.
    ///
    /// Returns `None` when the message type is not one of the AMF0 types
    /// (18, 19, 20) or when the body cannot be decoded.
    pub fn try_read_body_to_amf0(&self) -> Option<Vec<Value>> {
        match self.header.message_type_id {
            18 | 19 | 20 => read_all_amf_value(&self.body),
            _ => None,
        }
    }

    /// Splits the message into wire-ready chunks of at most `chunk_size`
    /// payload bytes. The first chunk is prefixed with the full type 0
    /// header, every following chunk with a one-byte type 3 header.
    ///
    /// An empty body yields a single chunk containing only the header. A
    /// `chunk_size` of 0 is treated as 1 (the previous implementation never
    /// terminated for that input). The payload is now sliced once instead of
    /// being re-copied for every chunk.
    pub fn split_chunks_bytes(&self, chunk_size: u32) -> Vec<Vec<u8>> {
        let chunk_size = (chunk_size as usize).max(1);
        let type0_header = self.header.to_bytes();
        let type3_header = 0xC0 | self.header.csid;
        let mut payloads = self.body.chunks(chunk_size);

        let first_payload = payloads.next().unwrap_or(&[]);
        let mut first = Vec::with_capacity(type0_header.len() + first_payload.len());
        first.extend_from_slice(&type0_header);
        first.extend_from_slice(first_payload);

        let mut chunks = Vec::with_capacity(self.body.len() / chunk_size + 1);
        chunks.push(first);
        chunks.extend(payloads.map(|payload| {
            let mut chunk = Vec::with_capacity(payload.len() + 1);
            chunk.push(type3_header);
            chunk.extend_from_slice(payload);
            chunk
        }));
        chunks
    }
}
impl Debug for RtmpMessage {
    /// Multi-line dump: the header, the readable message type name, the chunk
    /// count and the body rendered by `bytes_hex_format`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let type_desc = self.message_type_desc();
        let body_dump = bytes_hex_format(&self.body);
        f.write_fmt(format_args!(
            "ChunkMessage {{\nheader: {:?}\nmessage type: {}\nchunk count={}\nbody:\n{}}}",
            self.header, type_desc, self.chunk_count, body_dump
        ))
    }
}
/// Message types recognized by this module. Each discriminant is the value
/// of the message type id byte, which is how raw ids are converted through
/// `FromPrimitive`.
#[derive(Debug, PartialEq, FromPrimitive, Clone, Copy)]
pub enum ChunkMessageType {
    // Protocol control messages.
    SetChunkSize = 1,
    AbortMessage = 2,
    Acknowledgement = 3,
    UserControlMessage = 4,
    WindowAcknowledgementSize = 5,
    SetPeerBandwidth = 6,
    // Media messages.
    AudioMessage = 8,
    VideoMessage = 9,
    // AMF3-encoded messages.
    AMF3DataMessage = 15,
    AMF3SharedObjectMessage = 16,
    AMF3CommandMessage = 17,
    // AMF0-encoded messages.
    AMF0DataMessage = 18,
    AMF0SharedObjectMessage = 19,
    AMF0CommandMessage = 20,
    // Aggregate message.
    AggregateMessage = 22,
}
#[derive(Default, Clone)]
pub struct RtmpMetaData {
pub width: f64,
pub height: f64,
pub video_codec_id: String,
pub video_data_rate: f64,
pub audio_codec_id: String,
pub audio_data_rate: f64,
pub frame_rate: f64,
pub duration: f64,
pub begin_time: i64,
}
impl TryFrom<&amf::amf0::Value> for RtmpMetaData {
type Error = anyhow::Error;
fn try_from(value: &amf::amf0::Value) -> Result<Self, Self::Error> {
let mut meta_data = RtmpMetaData::default();
if let Value::EcmaArray { entries } = value {
for item in entries {
match item.key.as_ref() {
"duration" => {
meta_data.duration = item.value.try_as_f64().unwrap_or_default();
}
"width" => {
meta_data.width = item.value.try_as_f64().unwrap_or_default();
}
"height" => {
meta_data.height = item.value.try_as_f64().unwrap_or_default();
}
"videocodecid" => {
meta_data.video_codec_id =
item.value.try_as_str().unwrap_or_default().to_owned();
}
"videodatarate" => {
meta_data.video_data_rate =
item.value.try_as_f64().unwrap_or_default();
}
"framerate" => {
meta_data.frame_rate =
item.value.try_as_f64().unwrap_or_default();
}
"audiocodecid" => {
meta_data.audio_codec_id =
item.value.try_as_str().unwrap_or_default().to_owned();
}
"audiodatarate" => {
meta_data.audio_data_rate =
item.value.try_as_f64().unwrap_or_default();
}
_ => {}
}
}
meta_data.begin_time = Local::now().timestamp_millis();
Ok(meta_data)
} else {
Err(anyhow::anyhow!("value is not Value::EcmaArray"))?
}
}
}
/// 计算一个AMF值的字节长度
pub fn calc_amf_byte_len(v: &amf0::Value) -> usize {
match v {
Value::Number(_) => 9,
Value::Boolean(_) => 2,
Value::String(s) => (s.len() + 3),
Value::Object { entries, .. } => {
// marker and tail
let mut len = 4;
for en in entries {
len += en.key.len() + 2;
len += calc_amf_byte_len(&en.value);
}
len
}
Value::Null => 1,
Value::Undefined => 1,
Value::EcmaArray { entries } => {
// marker and tail
let mut len = 8;
for en in entries {
len += en.key.len() + 2;
len += calc_amf_byte_len(&en.value);
}
len
}
Value::Array { entries: _ } => unimplemented!(),
Value::Date { unix_time: _ } => unimplemented!(),
Value::XmlDocument(_) => unimplemented!(),
Value::AvmPlus(_) => unimplemented!(),
}
}
/// Decodes every AMF0 value contained in `bytes`.
///
/// Returns `None` when any value fails to decode, which includes an empty
/// input. Otherwise returns the values in the order they appear.
///
/// The decoder reads from one slice that advances as bytes are consumed, so
/// the position of the next value is exactly where the previous one ended.
/// The previous implementation recomputed each value's length from the
/// decoded result, which mis-positioned the next read for encodings whose
/// size cannot be recovered that way and could panic on value types without
/// a length rule.
pub fn read_all_amf_value(bytes: &[u8]) -> Option<Vec<Value>> {
    let mut remaining = bytes;
    let mut values = Vec::new();
    loop {
        // `&mut &[u8]` implements `Read` and shrinks `remaining` by the
        // number of bytes the decoder consumed.
        let value = amf::amf0::Value::read_from(&mut remaining).ok()?;
        values.push(value);
        if remaining.is_empty() {
            return Some(values);
        }
    }
}