forked from Guovin/iptv-api
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspeed.py
More file actions
405 lines (372 loc) · 14 KB
/
speed.py
File metadata and controls
405 lines (372 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import asyncio
import http.cookies
import json
import re
import subprocess
from time import time
from urllib.parse import quote, urljoin
import m3u8
from aiohttp import ClientSession, TCPConnector
from multidict import CIMultiDictProxy
import utils.constants as constants
from utils.config import config
from utils.tools import get_resolution_value
from utils.types import TestResult, ChannelTestResult, TestResultCacheData
http.cookies._is_legal_key = lambda _: True
cache: TestResultCacheData = {}
speed_test_timeout = config.speed_test_timeout
speed_test_filter_host = config.speed_test_filter_host
open_filter_resolution = config.open_filter_resolution
min_resolution_value = config.min_resolution_value
max_resolution_value = config.max_resolution_value
open_supply = config.open_supply
open_filter_speed = config.open_filter_speed
min_speed_value = config.min_speed
m3u8_headers = ['application/x-mpegurl', 'application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl']
default_ipv6_delay = 0.1
default_ipv6_resolution = "1920x1080"
default_ipv6_result = {
'speed': float("inf"),
'delay': default_ipv6_delay,
'resolution': default_ipv6_resolution
}
async def get_speed_with_download(url: str, headers: dict = None, session: ClientSession = None,
timeout: int = speed_test_timeout) -> dict[
str, float | None]:
"""
Get the speed of the url with a total timeout
"""
start_time = time()
delay = -1
total_size = 0
if session is None:
session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
created_session = True
else:
created_session = False
try:
async with session.get(url, headers=headers, timeout=timeout) as response:
if response.status != 200:
raise Exception("Invalid response")
delay = int(round((time() - start_time) * 1000))
async for chunk in response.content.iter_any():
if chunk:
total_size += len(chunk)
except:
pass
finally:
total_time = time() - start_time
if created_session:
await session.close()
return {
'speed': total_size / total_time / 1024 / 1024,
'delay': delay,
'size': total_size,
'time': total_time,
}
async def get_headers(url: str, headers: dict = None, session: ClientSession = None, timeout: int = 5) -> \
CIMultiDictProxy[str] | dict[
any, any]:
"""
Get the headers of the url
"""
if session is None:
session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
created_session = True
else:
created_session = False
res_headers = {}
try:
async with session.head(url, headers=headers, timeout=timeout) as response:
res_headers = response.headers
except:
pass
finally:
if created_session:
await session.close()
return res_headers
async def get_url_content(url: str, headers: dict = None, session: ClientSession = None,
timeout: int = speed_test_timeout) -> str:
"""
Get the content of the url
"""
if session is None:
session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
created_session = True
else:
created_session = False
content = ""
try:
async with session.get(url, headers=headers, timeout=timeout) as response:
if response.status == 200:
content = await response.text()
else:
raise Exception("Invalid response")
except:
pass
finally:
if created_session:
await session.close()
return content
def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool:
"""
Check if the m3u8 url is valid
"""
content_type = headers.get('Content-Type', '').lower()
if not content_type:
return False
return any(item in content_type for item in m3u8_headers)
async def get_result(url: str, headers: dict = None, resolution: str = None,
filter_resolution: bool = config.open_filter_resolution,
timeout: int = speed_test_timeout) -> dict[str, float | None]:
"""
Get the test result of the url
"""
info = {'speed': 0, 'delay': -1, 'resolution': resolution}
location = None
try:
url = quote(url, safe=':/?$&=@[]%').partition('$')[0]
async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session:
res_headers = await get_headers(url, headers, session)
location = res_headers.get('Location')
if location:
info.update(await get_result(location, headers, resolution, filter_resolution, timeout))
else:
url_content = await get_url_content(url, headers, session, timeout)
if url_content:
m3u8_obj = m3u8.loads(url_content)
playlists = m3u8_obj.playlists
segments = m3u8_obj.segments
if playlists:
best_playlist = max(m3u8_obj.playlists, key=lambda p: p.stream_info.bandwidth)
playlist_url = urljoin(url, best_playlist.uri)
playlist_content = await get_url_content(playlist_url, headers, session, timeout)
if playlist_content:
media_playlist = m3u8.loads(playlist_content)
segment_urls = [urljoin(playlist_url, segment.uri) for segment in media_playlist.segments]
else:
segment_urls = [urljoin(url, segment.uri) for segment in segments]
if not segment_urls:
raise Exception("Segment urls not found")
else:
res_info = await get_speed_with_download(url, headers, session, timeout)
info.update({'speed': res_info['speed'], 'delay': res_info['delay']})
raise Exception("No url content, use download with timeout to test")
start_time = time()
tasks = [get_speed_with_download(ts_url, headers, session, timeout) for ts_url in segment_urls[:5]]
results = await asyncio.gather(*tasks, return_exceptions=True)
total_size = sum(result['size'] for result in results if isinstance(result, dict))
total_time = sum(result['time'] for result in results if isinstance(result, dict))
info['speed'] = total_size / total_time / 1024 / 1024 if total_time > 0 else 0
info['delay'] = int(round((time() - start_time) * 1000))
except:
pass
finally:
if not resolution and filter_resolution and not location and info['delay'] != -1:
info['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
return info
async def get_delay_requests(url, timeout=speed_test_timeout, proxy=None):
"""
Get the delay of the url by requests
"""
async with ClientSession(
connector=TCPConnector(ssl=False), trust_env=True
) as session:
start = time()
end = None
try:
async with session.get(url, timeout=timeout, proxy=proxy) as response:
if response.status == 404:
return -1
content = await response.read()
if content:
end = time()
else:
return -1
except Exception as e:
return -1
return int(round((end - start) * 1000)) if end else -1
def check_ffmpeg_installed_status():
"""
Check ffmpeg is installed
"""
status = False
try:
result = subprocess.run(
["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
status = result.returncode == 0
except FileNotFoundError:
status = False
except Exception as e:
print(e)
finally:
return status
async def ffmpeg_url(url, timeout=speed_test_timeout):
"""
Get url info by ffmpeg
"""
args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"]
proc = None
res = None
try:
proc = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2)
if out:
res = out.decode("utf-8")
if err:
res = err.decode("utf-8")
return None
except asyncio.TimeoutError:
if proc:
proc.kill()
return None
except Exception:
if proc:
proc.kill()
return None
finally:
if proc:
await proc.wait()
return res
async def get_resolution_ffprobe(url: str, headers: dict = None, timeout: int = speed_test_timeout) -> str | None:
"""
Get the resolution of the url by ffprobe
"""
resolution = None
proc = None
try:
probe_args = [
'ffprobe',
'-v', 'error',
'-headers', ''.join(f'{k}: {v}\r\n' for k, v in headers.items()) if headers else '',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height',
"-of", 'json',
url
]
proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
out, _ = await asyncio.wait_for(proc.communicate(), timeout)
video_stream = json.loads(out.decode('utf-8'))["streams"][0]
resolution = f"{video_stream['width']}x{video_stream['height']}"
except:
if proc:
proc.kill()
finally:
if proc:
await proc.wait()
return resolution
def get_video_info(video_info):
"""
Get the video info
"""
frame_size = -1
resolution = None
if video_info is not None:
info_data = video_info.replace(" ", "")
matches = re.findall(r"frame=(\d+)", info_data)
if matches:
frame_size = int(matches[-1])
match = re.search(r"(\d{3,4}x\d{3,4})", video_info)
if match:
resolution = match.group(0)
return frame_size, resolution
async def check_stream_delay(url_info):
"""
Check the stream delay
"""
try:
url = url_info["url"]
video_info = await ffmpeg_url(url)
if video_info is None:
return -1
frame, resolution = get_video_info(video_info)
if frame is None or frame == -1:
return -1
url_info["resolution"] = resolution
return url_info, frame
except Exception as e:
print(e)
return -1
def get_avg_result(result) -> TestResult:
return {
'speed': sum(item['speed'] or 0 for item in result) / len(result),
'delay': max(
int(sum(item['delay'] or -1 for item in result) / len(result)), -1),
'resolution': max((item['resolution'] for item in result), key=get_resolution_value)
}
def get_speed_result(key: str) -> TestResult:
"""
Get the speed result of the url
"""
if key in cache:
return get_avg_result(cache[key])
else:
return {'speed': 0, 'delay': -1, 'resolution': 0}
async def get_speed(data, headers=None, ipv6_proxy=None, filter_resolution=open_filter_resolution,
timeout=speed_test_timeout, callback=None) -> TestResult:
"""
Get the speed (response time and resolution) of the url
"""
url = data['url']
resolution = data['resolution']
result: TestResult = {'speed': 0, 'delay': -1, 'resolution': resolution}
try:
cache_key = data['host'] if speed_test_filter_host else url
if cache_key and cache_key in cache:
result = get_avg_result(cache[cache_key])
else:
if data['ipv_type'] == "ipv6" and ipv6_proxy:
result.update(default_ipv6_result)
elif constants.rt_url_pattern.match(url) is not None:
start_time = time()
if not result['resolution'] and filter_resolution:
result['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
result['delay'] = int(round((time() - start_time) * 1000))
if result['resolution'] is not None:
result['speed'] = float("inf")
else:
result.update(await get_result(url, headers, resolution, filter_resolution, timeout))
if cache_key:
cache.setdefault(cache_key, []).append(result)
finally:
if callback:
callback()
return result
def get_sort_result(
results,
supply=open_supply,
filter_speed=open_filter_speed,
min_speed=min_speed_value,
filter_resolution=open_filter_resolution,
min_resolution=min_resolution_value,
max_resolution=max_resolution_value,
ipv6_support=True
) -> list[ChannelTestResult]:
"""
get the sort result
"""
total_result = []
for result in results:
if not ipv6_support and result["ipv_type"] == "ipv6":
result.update(default_ipv6_result)
result_speed, result_delay, resolution = (
result.get("speed") or 0,
result.get("delay"),
result.get("resolution")
)
if result_delay == -1:
continue
if not supply:
if filter_speed and result_speed < min_speed:
continue
if filter_resolution and resolution:
resolution_value = get_resolution_value(resolution)
if resolution_value < min_resolution or resolution_value > max_resolution:
continue
total_result.append(result)
total_result.sort(key=lambda item: item.get("speed") or 0, reverse=True)
return total_result