forked from Jrohy/multi-v2ray
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
179 lines (149 loc) · 4.47 KB
/
utils.py
File metadata and controls
179 lines (149 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import urllib.request
import os
import re
from enum import Enum, unique
from OpenSSL import crypto
@unique
class Color(Enum):
"""
终端显示颜色 枚举类
"""
# 显示格式: \033[显示方式;前景色;背景色m
# 只写一个字段表示前景色,背景色默认
RED = '\033[31m' # 红色
GREEN = '\033[32m' # 绿色
YELLOW = '\033[33m' # 黄色
BLUE = '\033[34m' # 蓝色
FUCHSIA = '\033[35m' # 紫红色
CYAN = '\033[36m' # 青蓝色
WHITE = '\033[37m' # 白色
#: no color
RESET = '\033[0m' # 终端默认颜色
@unique
class StreamType(Enum):
TCP = 'tcp'
TCP_HOST = 'tcp_host'
SOCKS = 'socks'
SS = 'ss'
MTPROTO = 'mtproto'
H2 = 'h2'
WS = 'ws'
KCP = 'kcp'
KCP_UTP = 'utp'
KCP_SRTP = 'srtp'
KCP_DTLS = 'dtls'
KCP_WECHAT = 'wechat'
KCP_WG = 'wireguard'
def stream_list():
return [
StreamType.KCP_WG,
StreamType.KCP_DTLS,
StreamType.KCP_WECHAT,
StreamType.KCP_UTP,
StreamType.KCP_SRTP,
StreamType.MTPROTO,
StreamType.SOCKS,
StreamType.SS
]
def ss_method():
return ("aes-256-cfb", "aes-128-cfb", "chacha20",
"chacha20-ietf", "aes-256-gcm", "aes-128-gcm", "chacha20-poly1305")
def color_str(color: Color, str: str) -> str:
"""
返回有色字符串
"""
return '{}{}{}'.format(
color.value,
str,
Color.RESET.value
)
def is_number(s):
"""
判断是否为数字的函数
"""
try:
float(s)
return True
except ValueError:
pass
try:
import unicodedata
unicodedata.numeric(s)
return True
except (TypeError, ValueError):
pass
return False
def get_ip():
"""
获取本地ip
"""
my_ip = urllib.request.urlopen('http://api.ipify.org').read()
return bytes.decode(my_ip)
def port_is_use(port):
"""
判断端口是否占用
"""
cmd = "lsof -i:" + str(port)
result = os.popen(cmd).readlines()
return result != []
def is_email(email):
"""
判断是否是邮箱格式
"""
str=r'^[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}@[a-zA-Z0-9_-]+(\.[a-zA-Z0-9_-]+){0,4}$'
return re.match(str, email)
def get_domain_by_crt_file(crt_path):
"""
通过证书文件获取域名, 证书文件有误或不存在则返回空
"""
try:
cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(crt_path).read())
except:
return
return cert.get_subject().CN
def bytes_2_human_readable(number_of_bytes, precision=1):
"""
流量bytes转换为kb, mb, gb等单位
"""
if number_of_bytes < 0:
raise ValueError("!!! number_of_bytes can't be smaller than 0 !!!")
step_to_greater_unit = 1024.
number_of_bytes = float(number_of_bytes)
unit = 'bytes'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'KB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'MB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'GB'
if (number_of_bytes / step_to_greater_unit) >= 1:
number_of_bytes /= step_to_greater_unit
unit = 'TB'
number_of_bytes = round(number_of_bytes, precision)
return str(number_of_bytes) + ' ' + unit
def gen_cert(domain):
service_name = ["v2ray", "nginx", "httpd", "apache2"]
start_cmd = "service {} start >/dev/null 2>&1"
stop_cmd = "service {} stop >/dev/null 2>&1"
if not os.path.exists("/root/.acme.sh/acme.sh"):
os.system("curl https://get.acme.sh | sh")
get_ssl_cmd = "bash /root/.acme.sh/acme.sh --issue -d " + domain + " --standalone --keylength ec-256"
for name in service_name:
os.system(stop_cmd.format(name))
os.system(get_ssl_cmd)
for name in service_name:
os.system(start_cmd.format(name))
def open_port():
from loader import Loader
group_list = Loader().profile.group_list
port_set = set([group.port for group in group_list])
for port in port_set:
cmd1="iptables -I INPUT -m state --state NEW -m tcp -p tcp --dport " + str(port) +" -j ACCEPT"
cmd2="iptables -I INPUT -m state --state NEW -m udp -p udp --dport " + str(port) +" -j ACCEPT"
os.system(cmd1)
os.system(cmd2)