forked from dongweiming/wechat-admin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis.py
More file actions
34 lines (29 loc) · 1.02 KB
/
redis.py
File metadata and controls
34 lines (29 loc) · 1.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from walrus import Database, Model, ListField, SetField, HashField
from config import REDIS_URL
# Shared walrus Database handle, built from the REDIS_URL setting in config.
# Models in this module bind to it through their __database__ attribute.
db = Database.from_url(REDIS_URL)
# Key name used for the listener's task id.
# NOTE(review): presumably a Redis key read/written by code outside this
# module -- confirm against callers.
LISTENER_TASK_KEY = 'listener:task_id'
class RBase(Model):
    """Base walrus model bound to the module-level ``db`` handle.

    Adds a ``to_dict`` that also covers fields missing from ``_data`` and a
    ``get`` that falls back to creating the record when the lookup fails.
    """

    __database__ = db

    def to_dict(self):
        """Return a plain ``dict`` mapping every field name to its value.

        Fields present in ``self._data`` are emitted as stored when the field
        has ``_as_json`` set, otherwise passed through ``field.db_value``.
        Fields absent from ``self._data`` are read via attribute access;
        list, set and hash fields are copied into ``list``, ``set`` and
        ``dict`` respectively, anything else is returned unchanged.
        """
        # Field type -> builtin used to copy the attribute's contents.
        container_casts = (
            (ListField, list),
            (SetField, set),
            (HashField, dict),
        )

        result = {}
        for attr, spec in self._fields.items():
            if attr not in self._data:
                # Not held in _data: pick the first matching cast (if any),
                # then read the attribute from the instance.
                cast = next(
                    (fn for kind, fn in container_casts
                     if isinstance(spec, kind)),
                    None,
                )
                current = getattr(self, attr)
                result[attr] = current if cast is None else cast(current)
                continue

            stored = self._data[attr]
            if spec._as_json:
                result[attr] = stored
            else:
                result[attr] = spec.db_value(stored)
        return result

    @classmethod
    def get(cls, id):
        """Return the record whose ``id`` equals *id*, creating it if needed.

        The parent ``get`` is queried with ``cls.id == id``; when that raises
        ``ValueError`` a new record is created with the given id instead.
        NOTE(review): the parent presumably raises ``ValueError`` when the
        query does not match exactly one record -- confirm against walrus.
        """
        try:
            record = super().get(cls.id == id)
        except ValueError:
            record = cls.create(id=id)
        return record