多线程模块threading的简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from aip import AipOcr
import requests, time, re, random, threading

# Matches any run of whitespace characters.
space = re.compile(r"\s+")
# Matches anything that looks like a markup tag: "<", one or more non-">" characters, ">".
htag = re.compile(r"<[^>]+>")

# Your APP ID, API key (AK) and secret key (SK) for the OCR client.
APP_ID = ''
API_KEY = ''
SECRET_KEY = ''

# Shared OCR client used by BaiduOCR().
client = AipOcr(APP_ID, API_KEY, SECRET_KEY)

# Default request options forwarded with every OCR call.
options = {
    "language_type": "CHN_ENG",
    "detect_direction": "true",
    "detect_language": "true",
}

def BaiduOCR(image, options=options):
    """Send an image to the OCR client and return the recognised text.

    Args:
        image: When this is a ``str`` it is handed to
            ``client.basicGeneralUrl``; any other value is handed to
            ``client.basicGeneral``.
        options: Options forwarded to the client call. Defaults to the
            module-level ``options`` dict.

    Returns:
        The ``words`` value of every entry in ``words_result`` joined with no
        separator, or ``''`` when the response contains ``error_code``
        (the error code and message are printed in that case).
    """
    recognise = client.basicGeneralUrl if isinstance(image, str) else client.basicGeneral
    results = recognise(image, options)

    if 'error_code' in results:
        print(f'BaiduOCR error {results["error_code"]} {results["error_msg"]}')
        return ''

    pieces = []
    for line in results['words_result']:
        if 'words' in line:
            pieces.append(line['words'])
    return ''.join(pieces)

class _BareNameNamespace(dict):
    """Namespace for ``eval`` in which every name evaluates to its own spelling."""

    def __getitem__(self, name):
        return name


def jsonfy(s: str) -> object:
    """Parse JSON-like text whose object keys are not quoted.

    The text is evaluated as a Python expression inside a namespace where any
    bare identifier resolves to a string of the same name, so ``{id: 1}``
    becomes ``{'id': 1}``. Bare words such as ``true`` likewise become the
    string ``'true'``.

    Args:
        s: The text to parse. It is expected to start with ``{`` or ``[``;
            otherwise (including when it is empty) a warning is printed
            before parsing is attempted anyway.

    Returns:
        The evaluated object, or ``{}`` if evaluation raises any exception.
    """
    # Guard the empty string explicitly: indexing s[0] would raise IndexError
    # before the try block gets a chance to turn the failure into {}.
    if not s or s[0] not in ('{', '['):
        print(s + '\n' + 'jsonfy 登录失效,请重新获取Cookie!')
    try:
        # SECURITY NOTE(review): eval() executes arbitrary expressions. The
        # callers in this file feed it text taken from an HTTP response, so
        # this is only acceptable while that server is trusted.
        return eval(s, _BareNameNamespace())
    except Exception:
        return {}


def getScLc(d):
    """Return the ``sc`` and ``lc`` entries of *d* as a pair of ints.

    Args:
        d: Mapping with ``'sc'`` and ``'lc'`` entries convertible by ``int``.

    Returns:
        ``(int(d['sc']), int(d['lc']))``.
    """
    sc = int(d['sc'])
    lc = int(d['lc'])
    return sc, lc

def getXkList(r):
    """Extract the course list and its seat counts from a response.

    The response text is expected to contain a ``[...]`` list, followed by
    the marker comment ``;/*sc 当前人数, lc 人数上限*/`` (sc = current head
    count, lc = capacity), followed by a ``{...}`` object keyed by course id.

    Args:
        r: Response-like object exposing the body as ``r.text``.

    Returns:
        A list of ``(id, no, name, (sc, lc))`` tuples, or ``None`` when the
        expected markers cannot be located (the body is printed and the call
        pauses for 0.5 seconds in that case).
    """
    text = r.text  # read once; reused for every search and slice below
    a = text.find('[')
    b = text.find(r';/*sc 当前人数, lc 人数上限*/')
    c = text.find('{', b)
    print(f'getXkList {a} {b} {c}')
    # str.find returns -1 when nothing matches. A missing '[' (or one that
    # only appears after the marker) would make text[a:b] a bogus slice, so
    # it is treated as a parse failure just like a missing marker or '{'.
    if a == -1 or b == -1 or c == -1 or a > b:
        print(text)
        time.sleep(0.5)
        return None
    j = jsonfy(text[a:b])
    j2 = jsonfy(text[c:])
    return [
        (one['id'], one['no'], one['name'], getScLc(j2[str(one['id'])]))
        for one in j
    ]

def getData(Form=None):
    """Fetch course data, by POST when a form is given and by GET otherwise.

    Args:
        Form: Form data to POST. When falsy, a plain GET is issued instead.

    Returns:
        The ``requests`` response object. If the body contains
        ``'请不要过快点击'`` the call sleeps for a random 1-3 seconds before
        returning.
    """
    # The URLs are empty placeholders in this copy of the script and must be
    # filled in before use.
    if Form:
        r = requests.post('', data=Form, headers=headers)
    else:
        # The original call was truncated (unclosed parenthesis).
        # NOTE(review): headers=headers is assumed here to match the other
        # requests made by this module — confirm.
        r = requests.get(r'', headers=headers)
    print(f'getData status_code {r.status_code}')
    if '请不要过快点击' in r.text:
        time.sleep(random.randint(3, 9) / 3)
    return r

def getTimestamp(size=1000):
    """Return the current ``time.time()`` scaled by *size* as a rounded int.

    Args:
        size: Multiplier applied to the time in seconds; the default of 1000
            yields milliseconds.
    """
    return int(round(time.time() * size))

def getCaptcha():
    """Download the captcha image and return its OCR text without spaces.

    Returns:
        The text produced by ``BaiduOCR`` for the downloaded image bytes,
        with every ``' '`` character removed.
    """
    # The URL is an empty placeholder in this copy of the script.
    response = requests.get(f'', headers=headers)
    print(f'getCaptcha status_code {response.status_code}')
    recognised = BaiduOCR(response.content)
    print(f'getCaptcha results {recognised}')
    return recognised.replace(' ', '')

def Xk(CourseId, lock):
    """Submit one selection request for *CourseId* and return the reply text.

    The captcha fetch and the submission both happen while *lock* is held.

    Args:
        CourseId: Course id placed into the ``operator0`` form field.
        lock: Lock acquired for the duration of the request.

    Returns:
        The response body with all whitespace removed and every tag replaced
        by a single space.
    """
    print(f'Xk 怎么会死锁? {lock.locked()}')
    with lock:
        captcha = getCaptcha()
        payload = {
            "optype": "true",
            "operator0": f"{CourseId}:true:0",
            "captcha_response": captcha,
        }
        # The URL is an empty placeholder in this copy of the script.
        response = requests.post(r'', data=payload, headers=headers)
        print(f'Xk status_code {response.status_code}')
        compact = space.sub('', response.text)
        return htag.sub(' ', compact)

def Xk2S(CourseId, lock):
    """Retry ``Xk`` until the course is selected or the retry budget is spent.

    Replies containing ``'操作失败:验证码错误'`` (wrong captcha) do not consume
    the budget; any other unsuccessful reply does. The budget allows four
    such replies.

    Args:
        CourseId: Course id forwarded to ``Xk``.
        lock: Lock forwarded to ``Xk``.

    Returns:
        ``True`` as soon as a reply contains ``'选课成功'``; ``False`` once
        the budget is exhausted.

    Raises:
        AssertionError: When the reply says the course is full or that it has
            already been selected, i.e. retrying is pointless.
    """
    remaining = 3
    attempt = 1  # renamed from ``ord`` so the builtin is not shadowed
    while remaining >= 0:
        st = Xk(CourseId, lock)
        if '选课成功' in st:
            print(f'Xk2S 第{attempt}次选课成功!')
            return True

        print(f'Xk2S 第{attempt}次选课:')
        print(st)
        # Raised explicitly rather than via ``assert``: these checks steer
        # runtime control flow and must survive ``python -O``.
        if '选课失败:公选人数已满' in st:
            raise AssertionError('Xk2S 公选人数已满 无法继续')
        if '选课失败:你已经选过' in st:
            raise AssertionError('Xk2S 选课成功 无需继续')
        if '操作失败:验证码错误' not in st:
            remaining -= 1
        attempt += 1
    return False
#PHPM110047.01

from headers import headers

def getCourse(Form):
    """Query the course list with *Form* and return the matching entry.

    The query is repeated, pausing a random 2.5-8 seconds between tries,
    until ``getXkList`` returns a non-empty list.

    Args:
        Form: Form dict sent via ``getData``; its ``'lessonNo'`` entry selects
            the course.

    Returns:
        The first ``(id, no, name, (sc, lc))`` tuple whose ``no`` is contained
        in ``Form['lessonNo']``.

    Raises:
        AssertionError: When no listed course matches.
    """
    Courses = getXkList(getData(Form))
    while not Courses:
        print(f'getCourse 想必是要死循环了 {Courses}')
        time.sleep(random.randint(5, 16) / 2)
        Courses = getXkList(getData(Form))

    print(Courses)
    for Course in Courses:
        # NOTE(review): this is a substring test, whereas isSuccessful()
        # compares with ``==`` — confirm the difference is intended.
        if Course[1] in Form['lessonNo']:
            print(Course)
            return Course
    # Raised explicitly rather than via ``assert False``: under ``python -O``
    # the assert would vanish and the function would silently return None.
    raise AssertionError('getCourse 出现未知错误,请重试!')

def isSuccessful(Form):
    """Check, after a 5 second pause, whether the course now shows up.

    The list is fetched with ``getData()`` called without a form.

    Args:
        Form: Form dict whose ``'lessonNo'`` entry identifies the course.

    Returns:
        ``True`` if an entry's course number equals ``Form['lessonNo']``,
        otherwise ``False`` (also when the list could not be parsed).
    """
    time.sleep(5)
    # getXkList returns None when the response cannot be parsed; treat that
    # as "not found" instead of failing with TypeError while iterating.
    Courses = getXkList(getData()) or []
    print(Courses)
    for Course in Courses:
        if Course[1] == Form['lessonNo']:
            print(Course)
            return True
    return False

#Form['lessonNo'] = 'PEDU110091.19'

def start(lessonNo, lock, event):
    """Worker loop: keep trying to select *lessonNo* until it succeeds.

    Args:
        lessonNo: Course number to select.
        lock: Lock forwarded to ``Xk2S``.
        event: Event that this worker sets as a heartbeat after each pass of
            the loop and every 10 seconds while waiting out a connection
            error.
    """
    Form = {"lessonNo": lessonNo, "courseCode": "", "courseName": ""}
    # Random 0-5 second start-up delay.
    time.sleep(random.randint(0, 50) / 10)
    print(f'start {lessonNo} {id(lock)} {id(event)}')

    while True:
        try:
            Course = getCourse(Form)
            current, limit = Course[3]
            if current >= limit:
                print(f'主循环 人数已达上限 继续等待 {time.ctime(time.time())}')
            elif Xk2S(Course[0], lock) and isSuccessful(Form):
                print(f'主循环 选课成功 {time.ctime(time.time())}')
                break
            else:
                print(f'主循环 选课失败 继续尝试 {time.ctime(time.time())}')
            # NOTE(review): the source listing lost its indentation; the
            # heartbeat and pause are taken to run after every unsuccessful
            # pass — confirm against the original script.
            event.set()
            time.sleep(random.randint(20, 50) / 5)
        except requests.ConnectionError:
            print('网络超时, 延时等待1分钟')
            # Six 10-second waits, signalling the heartbeat before each one.
            for _ in range(6):
                event.set()
                time.sleep(10)

# Keep a handle on the real print before the name is rebound below.
printf = print


def wrapprint(*args, **kw):
    """Print like ``print`` but prefixed with the current thread's name.

    The prefix ``"<thread name> :"`` is written first with no trailing
    newline, then *args* and *kw* are passed on unchanged.
    """
    thread_name = threading.current_thread().name
    printf(thread_name, ":", end='')
    printf(*args, **kw)


# From here on, every module-level ``print`` call goes through wrapprint.
print = wrapprint





1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import threading, time, ctypes, inspect

def _async_raise(tid, exctype):
    """Request that *exctype* be raised asynchronously in thread *tid*.

    Args:
        tid: Identifier of the target thread (e.g. ``Thread.ident``).
        exctype: Exception class to raise; when an instance is given, its
            class is used instead.

    Returns:
        A status string: ``"success!"`` when exactly one thread state was
        modified, ``"ValueError: invalid thread id"`` when none was, and
        ``"SystemError: PyThreadState_SetAsyncExc failed"`` otherwise (after
        the request has been reverted). Nothing is raised for these cases.
    """
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    thread_id = ctypes.c_long(tid)
    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

    modified = set_async_exc(thread_id, ctypes.py_object(exctype))
    if modified == 1:
        return "success!"
    if modified == 0:
        return "ValueError: invalid thread id"
    # More than one thread state was affected: call again with no exception
    # to undo the effect.
    set_async_exc(thread_id, None)
    return "SystemError: PyThreadState_SetAsyncExc failed"
def startThread(tasks, timeout=35):
    """Run *tasks* in daemon threads and supervise them until all have ended.

    Every task gets its own ``threading.Event`` used as a heartbeat and is
    invoked as ``target(*args, lock, event)``. The worker is expected to call
    ``event.set()`` at least once per *timeout* seconds. A live thread whose
    event is still clear at the end of a round is asked to exit (via
    ``_async_raise`` with ``SystemExit``) and a replacement thread is started
    for the same task.

    Args:
        tasks: Iterable of ``(target, args)`` pairs, ``args`` being a tuple.
        timeout: Length of one supervision round in seconds.

    Returns:
        None, once no worker thread is alive any more.
    """
    lock = threading.Lock()
    tasks = [(target, args, threading.Event()) for target, args in tasks]
    # Each entry is a mutable [thread, event] pair so the thread can be
    # swapped for a replacement in place.
    threads = [
        [threading.Thread(target=target, args=args + (lock, event), daemon=True), event]
        for target, args, event in tasks
    ]
    for t, _ in threads:
        t.start()
    print('startThread 守护线程开始执行!')

    counter = 1
    while any(t.is_alive() for t, _ in threads):
        print(f'startThread 守护线程第{counter}轮 开始 {time.ctime(time.time())}')
        time.sleep(timeout)
        for t, event in threads:
            if not event.is_set() and t.is_alive():
                break
            time.sleep(0.1)
        else:
            print(f'startThread 守护线程第{counter}轮 正常 {time.ctime(time.time())}')
            counter += 1
            # Reset every heartbeat so the next round requires a fresh
            # event.set(); without this an event set once would stay set
            # forever and a later stall could never be detected.
            for _, event in threads:
                event.clear()
            continue

        print(f'startThread 守护线程第{counter}轮 发生异常,准备重启... {time.ctime(time.time())}')
        # Replacement threads get a fresh lock. Threads that are kept running
        # still hold a reference to the previous one.
        lock = threading.Lock()
        restart = []
        for i, (t, event) in enumerate(threads):
            if not t.is_alive():
                print(f'startThread 线程自然退出,忽略 {t.name}')
                continue
            if event.is_set():
                # Healthy this round: re-arm its heartbeat and keep it.
                event.clear()
                continue
            print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
            target, args, event = tasks[i]
            t = threads[i][0] = threading.Thread(
                target=target, args=args + (lock, event), daemon=True
            )
            restart.append(t)
        for t in restart:
            t.start()
            print(f'startThread 启动替代线程 {t.name}')

    print('startThread 守护线程准备退出中...')
    for t, _ in threads:
        if t.is_alive():
            print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
    print('startThread 守护线程已退出!')

from FDUxk1 import start

# Course numbers to register for; commented-out entries are disabled.
cl = [
    # 'PEDU110091.19',  # Table tennis
    'PHPM110047.01',  # Hospital operations management
    # 'TOUR110001.01',  # Tourism and economic management
    'PTSS110016.01',  # Contemporary world economy and politics
    'PHIL119056.01',  # Artificial intelligence from a philosophical perspective
]


多线程模块threading的简单使用
https://b.limour.top/154.html
Author: Limour
Posted on: June 19, 2020
Licensed under