1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
| from aip import AipOcr import requests, time, re, random, threading
space = re.compile(r'\s+') htag = re.compile(r'<[^>]+>')
""" 你的 APPID AK SK """ APP_ID = '' API_KEY = '' SECRET_KEY = ''
client = AipOcr(APP_ID, API_KEY, SECRET_KEY) options = {} options["language_type"] = "CHN_ENG" options["detect_direction"] = "true" options["detect_language"] = "true"
def BaiduOCR(image, options=options): if isinstance(image, str): results = client.basicGeneralUrl(image, options) else: results = client.basicGeneral(image, options) if 'error_code' in results: print (f'BaiduOCR error {results["error_code"]} {results["error_msg"]}') return '' return ''.join(line['words'] for line in results['words_result'] if 'words' in line) def jsonfy(s:str)->object: if s[0] not in ('{','['): print(s + '\n' + 'jsonfy 登录失效,请重新获取Cookie!') try: obj = eval(s, type('js', (dict,), dict(__getitem__=lambda s, n: n))()) return obj except Exception: return {}
def getScLc(d): return (int(d['sc']), int(d['lc']))
def getXkList(r): a = r.text.find('[') b = r.text.find(r';/*sc 当前人数, lc 人数上限*/') c = r.text.find('{', b) print(f'getXkList {a} {b} {c}') if b==-1 or c==-1: print(r.text); time.sleep(0.5); return j = jsonfy(r.text[a:b]) j2 = jsonfy(r.text[c:]) return [(one['id'], one['no'], one['name'], getScLc(j2[str(one['id'])])) for one in j]
def getData(Form = None): if Form: r = requests.post('', data = Form, headers=headers) else: r = requests.get(r'' print(f'getData status_code {r.status_code}') if '请不要过快点击' in r.text: time.sleep(random.randint(3,9)/3) return r
def getTimestamp(size = 1000): t = time.time() return int(round(t * size))
def getCaptcha(): r = requests.get(f'', headers=headers) print(f'getCaptcha status_code {r.status_code}') img = r.content text = BaiduOCR(img) print(f'getCaptcha results {text}') return text.replace(' ','')
def Xk(CourseId, lock): print(f'Xk 怎么会死锁? {lock.locked()}') with lock: data = { "optype": "true", "operator0": f"{CourseId}:true:0", "captcha_response": getCaptcha() } r = requests.post(r'', data = data, headers=headers) print(f'Xk status_code {r.status_code}') return htag.sub(' ', space.sub('', r.text))
def Xk2S(CourseId, lock): i = 3 ord = 1 while i >= 0: st = Xk(CourseId, lock) if '选课成功' in st: print(f'Xk2S 第{ord}次选课成功!') return True else: print(f'Xk2S 第{ord}次选课:') print(st) assert '选课失败:公选人数已满' not in st, 'Xk2S 公选人数已满 无法继续' assert '选课失败:你已经选过' not in st, 'Xk2S 选课成功 无需继续' if '操作失败:验证码错误' not in st: i -= 1 ord += 1 else: return False
from headers import headers
def getCourse(Form): Courses = getXkList(getData(Form)) while not Courses: print(f'getCourse 想必是要死循环了 {Courses}') time.sleep(random.randint(5,16)/2) Courses = getXkList(getData(Form)) print (Courses) for Course in Courses: if Course[1] in Form['lessonNo']: print (Course) return Course assert False, 'getCourse 出现未知错误,请重试!'
def isSuccessful(Form): time.sleep(5) Courses = getXkList(getData()) print (Courses) for Course in Courses: if Course[1] == Form['lessonNo']: print (Course) return True return False
def start(lessonNo, lock, event): Form = { "lessonNo": lessonNo, "courseCode": "", "courseName": "" } time.sleep(random.randint(0,50)/10) print(f'start {lessonNo} {id(lock)} {id(event)}') while True: try : Course = getCourse(Form) if Course[3][0] < Course[3][1]: if Xk2S(Course[0], lock) and isSuccessful(Form): print(f'主循环 选课成功 {time.ctime(time.time())}') break else: print(f'主循环 选课失败 继续尝试 {time.ctime(time.time())}') else: print(f'主循环 人数已达上限 继续等待 {time.ctime(time.time())}') event.set() time.sleep(random.randint(20,50)/5) except requests.ConnectionError: print('网络超时, 延时等待1分钟') for i in range(6): event.set() time.sleep(10)
printf = print def wrapprint(*args, **kw): printf(threading.current_thread().name, ":",end='') printf(*args, **kw) print = wrapprint
|