forked from mindsdb/mindsdb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp_test_helpers.py
More file actions
130 lines (103 loc) · 4.3 KB
/
http_test_helpers.py
File metadata and controls
130 lines (103 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import requests
import time
from mindsdb.api.executor.data_types.response_type import RESPONSE_TYPE
from tests.integration_tests.flows.conftest import HTTP_API_ROOT
class HTTPHelperMixin:
_sql_via_http_context = {}
@staticmethod
def api_request(method, url, payload=None, headers=None):
method = method.lower()
fnc = getattr(requests, method)
url = f'{HTTP_API_ROOT}/{url.lstrip("/")}'
response = fnc(url, json=payload, headers=headers)
return response
def sql_via_http(self, request: str, expected_resp_type: str = None, context: dict = None,
headers: dict = None, company_id: int = None) -> dict:
if context is None:
context = self._sql_via_http_context
if headers is None:
headers = {}
if company_id is not None:
headers['company-id'] = str(company_id)
payload = {
'query': request,
'context': context
}
response = self.api_request('post', '/sql/query', payload, headers)
assert response.status_code == 200, f"sql/query is not accessible - {response.text}"
response = response.json()
if expected_resp_type is not None:
assert response.get('type') == expected_resp_type, response
else:
assert response.get('type') in [RESPONSE_TYPE.OK, RESPONSE_TYPE.TABLE, RESPONSE_TYPE.ERROR], response
assert isinstance(response.get('context'), dict)
if response['type'] == 'table':
assert isinstance(response.get('data'), list)
assert isinstance(response.get('column_names'), list)
elif response['type'] == 'error':
assert isinstance(response.get('error_code'), int)
assert isinstance(response.get('error_message'), str)
self._sql_via_http_context = response['context']
return response
def await_model(self, model_name: str, project_name: str = 'mindsdb',
version_number: int = 1, timeout: int = 60):
start = time.time()
status = None
while (time.time() - start) < timeout:
response = self.sql_via_http(
f"""
SELECT status
FROM {project_name}.models
WHERE name='{model_name}' and version = {version_number}
""", RESPONSE_TYPE.TABLE
)
status = response['data'][0][0]
if status in ['complete', 'error']:
break
time.sleep(1)
return status
def await_model_by_query(self, query, timeout=60):
start = time.time()
status = None
while (time.time() - start) < timeout:
resp = self.sql_via_http(query, RESPONSE_TYPE.TABLE)
status_index = [x.lower() for x in resp['column_names']].index('status')
status = resp['data'][0][status_index]
if status in ['complete', 'error']:
break
time.sleep(1)
return status
def get_predictors_list(company_id=None):
headers = {}
if company_id is not None:
headers['company-id'] = f'{company_id}'
res = requests.get(f'{HTTP_API_ROOT}/predictors/', headers=headers)
assert res.status_code == 200
return res.json()
def get_predictors_names_list(company_id=None):
predictors = get_predictors_list(company_id=company_id)
return [x['name'] for x in predictors]
def check_predictor_exists(name):
assert name in get_predictors_names_list()
def check_predictor_not_exists(name):
assert name not in get_predictors_names_list()
def get_predictor_data(name):
predictors = get_predictors_list()
for p in predictors:
if p['name'] == name:
return p
return None
def wait_predictor_learn(predictor_name):
start_time = time.time()
learn_done = False
while learn_done is False and (time.time() - start_time) < 180:
learn_done = get_predictor_data(predictor_name)['status'] == 'complete'
time.sleep(1)
assert learn_done
def get_integrations_names(company_id=None):
headers = {}
if company_id is not None:
headers['company-id'] = f'{company_id}'
res = requests.get(f'{HTTP_API_ROOT}/config/integrations', headers=headers)
assert res.status_code == 200
return res.json()['integrations']