forked from rabbit2rabbit/bili2.0
-
Notifications
You must be signed in to change notification settings - Fork 0
/
web_session.py
116 lines (107 loc) · 4.7 KB
/
web_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import sys
import asyncio
import aiohttp
import printer
from exceptions import LogoutError, RspError
from json_rsp_ctrl import Ctrl, JsonRspType, DEFAULT_CTRL, TMP_DEFAULT_CTRL
# Module-wide concurrency cap shared by every WebSession request method:
# at most 3 requests may hold the semaphore at once. Each request keeps it
# for its whole retry loop, including the sleeps between attempts.
sem = asyncio.Semaphore(3)
class WebSession:
    """Retrying HTTP client built on a single shared ``aiohttp.ClientSession``.

    Every ``request_*`` method acquires the module-level ``sem`` semaphore
    and then retries without an upper bound until it receives an HTTP 200
    response with a non-empty body. Network errors are printed and retried;
    task cancellation is always propagated.
    """

    def __init__(self):
        # One session reused by all requests; ``total=4`` is the overall
        # per-request timeout handed to aiohttp.
        self.var_session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=4))

    @staticmethod
    async def __get_json_body(rsp):
        """Return the response body decoded as JSON.

        ``content_type=None`` turns off aiohttp's Content-Type check.
        """
        json_body = await rsp.json(content_type=None)
        return json_body

    @staticmethod
    async def __get_text_body(rsp):
        """Return the response body decoded as text."""
        text = await rsp.text()
        return text

    @staticmethod
    async def __get_binary_body(rsp):
        """Return the raw response body as bytes."""
        return await rsp.read()

    async def __request_body(self, method, url, read_body, kwargs):
        """Shared retry loop for ``request_binary`` and ``request_text``.

        Args:
            method: HTTP method passed to ``ClientSession.request``.
            url: Target URL.
            read_body: Coroutine function taking the response and returning
                its body.
            kwargs: Dict of extra keyword arguments for
                ``ClientSession.request`` (passed as a dict so it cannot
                collide with this helper's own parameters).

        Returns:
            The first non-empty body read from an HTTP 200 response.
        """
        async with sem:
            attempts = 0
            while True:
                attempts += 1
                if attempts >= 10:
                    # Still failing after many tries: warn on every attempt.
                    printer.warn(url)
                try:
                    async with self.var_session.request(method, url, **kwargs) as rsp:
                        if rsp.status == 200:
                            body = await read_body(rsp)
                            if body:  # empty bodies are never returned
                                return body
                        elif rsp.status == 403:
                            printer.warn(f'403频繁, {url}')
                            await asyncio.sleep(240)
                except asyncio.CancelledError:
                    # Never swallow cancellation, otherwise the task could
                    # not be stopped and would retry forever.
                    raise
                except Exception as exc:
                    # Best effort: report the failure and try again.
                    print(type(exc), exc, url)
                # Brief pause before the next attempt.
                await asyncio.sleep(0.02)

    # ``method`` mirrors the corresponding aiohttp method; only GET and POST
    # are supported for now.
    # ``is_login`` is slated for removal later; the JSON handling here should
    # cooperate with expected_code.
    async def request_json(self,
                           method,
                           url,
                           is_login=False,
                           ctrl: Ctrl = TMP_DEFAULT_CTRL,
                           **kwargs) -> dict:
        """Request ``url`` and return its JSON body once ``ctrl`` accepts it.

        Args:
            method: HTTP method passed to ``ClientSession.request``.
            url: Target URL.
            is_login: When true, a response classified as LOGOUT is returned
                to the caller instead of raising ``LogoutError``.
            ctrl: Verifier whose ``verify(json_body)`` result decides whether
                the body is returned, retried, or treated as a logout.
            **kwargs: Forwarded unchanged to ``ClientSession.request``.

        Returns:
            The decoded JSON body when ``ctrl.verify`` yields
            ``JsonRspType.OK``, or ``JsonRspType.LOGOUT`` with ``is_login``
            set.

        Raises:
            LogoutError: The body was classified as LOGOUT and ``is_login``
                is false.
            RspError: Propagated unchanged if raised while handling the
                response.
            asyncio.CancelledError: Propagated if the task is cancelled.
        """
        async with sem:
            attempts = 0
            while True:
                attempts += 1
                if attempts >= 10:
                    # Still failing after many tries: warn on every attempt.
                    printer.warn(url)
                try:
                    async with self.var_session.request(method, url, **kwargs) as rsp:
                        if rsp.status == 200:
                            json_body = await self.__get_json_body(rsp)
                            # The body is sometimes None or empty; skip it
                            # and retry. Empty results are never returned.
                            if json_body:
                                json_rsp_type = ctrl.verify(json_body)
                                if json_rsp_type == JsonRspType.OK:
                                    return json_body
                                elif json_rsp_type == JsonRspType.IGNORE:
                                    await asyncio.sleep(1.0)
                                elif json_rsp_type == JsonRspType.LOGOUT:
                                    print('api提示没有登录')
                                    print(json_body)
                                    if not is_login:
                                        raise LogoutError(msg='提示没有登陆')
                                    return json_body
                        elif rsp.status == 403:
                            printer.warn(f'403频繁, {url}')
                            await asyncio.sleep(240)
                except (RspError, LogoutError, asyncio.CancelledError):
                    # Protocol-level errors and cancellation belong to the
                    # caller; they must not be turned into another retry.
                    raise
                except Exception as exc:
                    # Best effort: report the failure and try again.
                    print(type(exc), exc, url)
                # Brief pause before the next attempt.
                await asyncio.sleep(0.02)

    async def request_binary(self,
                             method,
                             url,
                             **kwargs) -> bytes:
        """Request ``url`` and return its non-empty body as bytes.

        Retries until an HTTP 200 response with a non-empty body arrives;
        ``**kwargs`` are forwarded to ``ClientSession.request``.
        """
        return await self.__request_body(
            method, url, self.__get_binary_body, kwargs)

    async def request_text(self,
                           method,
                           url,
                           **kwargs) -> str:
        """Request ``url`` and return its non-empty body as text.

        Retries until an HTTP 200 response with a non-empty body arrives;
        ``**kwargs`` are forwarded to ``ClientSession.request``.
        """
        return await self.__request_body(
            method, url, self.__get_text_body, kwargs)