
| from aip import AipOcr import requests, time, re, random, threading
# Pre-compiled patterns used to clean up HTML responses.
space = re.compile(r'\s+')  # one or more whitespace characters
htag = re.compile(r'<[^>]+>')  # a single markup tag such as <td> or </a>

# Your Baidu AIP credentials: APP ID, API key (AK) and secret key (SK).
APP_ID = ''
API_KEY = ''
SECRET_KEY = ''
# Default option dict forwarded to the OCR calls made through ``client``.
options = {
    "language_type": "CHN_ENG",
    "detect_direction": "true",
    "detect_language": "true",
}

# Shared OCR client built from the credentials above.
client = AipOcr(APP_ID, API_KEY, SECRET_KEY)
def BaiduOCR(image, options=options):
    """Run OCR on *image* through the shared ``client`` and return the text.

    Args:
        image: A ``str`` is passed to ``client.basicGeneralUrl`` (presumably
            an image URL); any other value is passed to
            ``client.basicGeneral``.
        options: Option dict forwarded to the OCR call. The default is the
            module-level ``options`` dict as bound when this function is
            defined.

    Returns:
        The ``words`` field of every result line joined without a separator,
        or ``''`` when the response contains an ``error_code`` (the error is
        printed) or has no ``words_result``.
    """
    if isinstance(image, str):
        results = client.basicGeneralUrl(image, options)
    else:
        results = client.basicGeneral(image, options)
    if 'error_code' in results:
        print (f'BaiduOCR error {results["error_code"]} {results["error_msg"]}')
        return ''
    # A response without 'words_result' yields '' instead of a KeyError.
    lines = results.get('words_result', [])
    return ''.join(line['words'] for line in lines if 'words' in line)


def jsonfy(s:str)->object:
    """Parse a JavaScript-style object/array literal into Python objects.

    The text is evaluated with a globals mapping whose ``__getitem__``
    returns the looked-up name itself, so bare identifiers (for example
    unquoted object keys) evaluate to their own name as a string.

    Args:
        s: Text expected to start with ``{`` or ``[``. Anything else
            (including the empty string) triggers a printed warning that the
            login has expired and the cookie must be refreshed; evaluation is
            still attempted afterwards.

    Returns:
        The evaluated object, or ``{}`` if evaluation raises.
    """
    # startswith() also copes with an empty string, where s[0] would raise
    # IndexError before the warning could be printed.
    if not s.startswith(('{', '[')):
        print(s + '\n' + 'jsonfy 登录失效,请重新获取Cookie!')
    try:
        # SECURITY(review): eval() runs on text taken from an HTTP response.
        # The name-echoing mapping keeps bare identifiers from resolving to
        # real objects, but attribute access on literals is still possible,
        # so this is not a safe parser for untrusted input.
        echo_names = type('js', (dict,), dict(__getitem__=lambda self, name: name))()
        return eval(s, echo_names)
    except Exception:
        return {}
def getScLc(d):
    """Return ``(sc, lc)`` from mapping *d*, each converted with ``int``.

    According to the marker comment embedded in the course-list payload,
    ``sc`` is the current head count and ``lc`` is the capacity limit.
    """
    return tuple(int(d[key]) for key in ('sc', 'lc'))
def getXkList(r):
    """Extract the course list from a course-query response.

    The body is expected to contain a JavaScript array of courses, followed
    by the marker ``;/*sc 当前人数, lc 人数上限*/`` and then an object keyed by
    course id whose values carry ``sc`` / ``lc`` counts.

    Args:
        r: Response-like object exposing the body as ``r.text``.

    Returns:
        A list of ``(id, no, name, (sc, lc))`` tuples, or ``None`` when the
        expected layout is not found. In that case the body is printed and
        the call sleeps for 0.5 seconds before returning.
    """
    text = r.text  # read the body once and reuse it below
    a = text.find('[')
    b = text.find(r';/*sc 当前人数, lc 人数上限*/')
    c = text.find('{', b)
    print(f'getXkList {a} {b} {c}')
    # A missing '[' must also count as a failure: with a == -1 the slice
    # text[a:b] would not be the course array at all.
    if a == -1 or b == -1 or c == -1:
        print(text)
        time.sleep(0.5)
        return None
    j = jsonfy(text[a:b])
    j2 = jsonfy(text[c:])
    return [
        (one['id'], one['no'], one['name'], getScLc(j2[str(one['id'])]))
        for one in j
    ]
def getData(Form = None):
    """Fetch course data from the server and return the raw response.

    Args:
        Form: Optional form payload. When truthy it is sent with a POST;
            otherwise a plain GET is issued.

    Returns:
        The ``requests`` response object. If the body contains the
        "请不要过快点击" (clicking too fast) notice, the call sleeps for a
        random 1-3 seconds before returning.
    """
    # NOTE(review): both URLs are blank in this copy of the file.
    if Form:
        r = requests.post('', data = Form, headers=headers)
    else:
        # NOTE(review): the original GET call was truncated after the URL;
        # ``headers=headers`` is restored by analogy with the other request
        # calls in this file -- confirm against the real source.
        r = requests.get(r'', headers=headers)
    print(f'getData status_code {r.status_code}')
    if '请不要过快点击' in r.text:
        time.sleep(random.randint(3,9)/3)
    return r
def getTimestamp(size = 1000):
    """Return the current Unix time multiplied by *size*, rounded to an int.

    With the default ``size`` of 1000 the result is in milliseconds.
    """
    return int(round(time.time() * size))
def getCaptcha():
    """Download the captcha image, OCR it, and return the text without spaces.

    The HTTP status code and the raw OCR result are printed along the way.
    """
    # NOTE(review): the captcha URL is blank in this copy of the file.
    response = requests.get(f'', headers=headers)
    print(f'getCaptcha status_code {response.status_code}')
    recognised = BaiduOCR(response.content)
    print(f'getCaptcha results {recognised}')
    return recognised.replace(' ','')
def Xk(CourseId, lock):
    """Submit one course-selection request for *CourseId* while holding *lock*.

    Before acquiring the lock, its current ``locked()`` state is printed for
    debugging. The captcha fetch and the POST both happen inside the lock.

    Returns:
        The response body with all whitespace removed and every markup tag
        replaced by a single space.
    """
    print(f'Xk 怎么会死锁? {lock.locked()}')
    with lock:
        payload = {
            "optype": "true",
            "operator0": f"{CourseId}:true:0",
            "captcha_response": getCaptcha()
        }
        # NOTE(review): the submission URL is blank in this copy of the file.
        response = requests.post(r'', data = payload, headers=headers)
        print(f'Xk status_code {response.status_code}')
        compact = space.sub('', response.text)
        return htag.sub(' ', compact)
def Xk2S(CourseId, lock):
    """Retry ``Xk`` until the course is selected or the retry budget runs out.

    A reply containing "操作失败:验证码错误" (captcha error) is retried without
    using up the budget; any other failure consumes one of four tries.

    Args:
        CourseId: Course id forwarded to ``Xk``.
        lock: Lock forwarded to ``Xk``.

    Returns:
        ``True`` as soon as a reply contains "选课成功" (selection succeeded),
        ``False`` once the budget is exhausted.

    Raises:
        AssertionError: If the reply says the course is full
            ("选课失败:公选人数已满") or was already selected
            ("选课失败:你已经选过"); retrying is pointless in both cases.
    """
    remaining = 3
    attempt = 1
    while remaining >= 0:
        st = Xk(CourseId, lock)
        if '选课成功' in st:
            print(f'Xk2S 第{attempt}次选课成功!')
            return True
        print(f'Xk2S 第{attempt}次选课:')
        print(st)
        # Raised explicitly instead of via ``assert`` so the checks are not
        # stripped when Python runs with -O; the exception type is unchanged.
        if '选课失败:公选人数已满' in st:
            raise AssertionError('Xk2S 公选人数已满 无法继续')
        if '选课失败:你已经选过' in st:
            raise AssertionError('Xk2S 选课成功 无需继续')
        if '操作失败:验证码错误' not in st:
            remaining -= 1
        attempt += 1
    return False
from headers import headers
def getCourse(Form):
    """Query the server until a course list arrives and return the wanted one.

    Args:
        Form: Query payload; ``Form['lessonNo']`` identifies the course.

    Returns:
        The first ``(id, no, name, (sc, lc))`` tuple from ``getXkList`` whose
        ``no`` is contained in ``Form['lessonNo']``.

    Raises:
        AssertionError: If a non-empty list arrives but holds no such course.
    """
    Courses = getXkList(getData(Form))
    while not Courses:
        # Empty or failed result: wait a random 2.5-8 seconds and ask again.
        print(f'getCourse 想必是要死循环了 {Courses}')
        time.sleep(random.randint(5,16)/2)
        Courses = getXkList(getData(Form))
    print (Courses)
    for Course in Courses:
        if Course[1] in Form['lessonNo']:
            print (Course)
            return Course
    # Raised explicitly: ``assert False`` is stripped under -O, which would
    # let the function fall through and return None.
    raise AssertionError('getCourse 出现未知错误,请重试!')
def isSuccessful(Form):
    """Check, after a 5-second pause, whether the course is now listed.

    Args:
        Form: Query payload; ``Form['lessonNo']`` is compared for equality
            with each listed course's ``no``.

    Returns:
        ``True`` if a course with that exact number is in the list returned
        by ``getXkList(getData())``, otherwise ``False``. This includes the
        case where the list could not be parsed.
    """
    time.sleep(5)
    Courses = getXkList(getData())
    print (Courses)
    # getXkList returns None when the response cannot be parsed; treat that
    # as "not confirmed" instead of raising TypeError while iterating None.
    for Course in Courses or ():
        if Course[1] == Form['lessonNo']:
            print (Course)
            return True
    return False
def start(lessonNo, lock, event):
    """Worker loop: keep trying to select course *lessonNo* until it succeeds.

    Args:
        lessonNo: Course number placed in the query form.
        lock: Lock forwarded to ``Xk2S``.
        event: Object whose ``set()`` is called after every pass and every
            10 seconds during the network back-off -- presumably a liveness
            signal for a supervising thread; confirm against the caller.

    The loop exits only when ``Xk2S`` and ``isSuccessful`` both report
    success. ``requests.ConnectionError`` is handled by waiting one minute;
    any other exception propagates to the caller.
    """
    Form = {
        "lessonNo": lessonNo,
        "courseCode": "",
        "courseName": ""
    }
    # Random 0-5 second start delay.
    time.sleep(random.randint(0,50)/10)
    print(f'start {lessonNo} {id(lock)} {id(event)}')
    while True:
        try:
            Course = getCourse(Form)
            has_room = Course[3][0] < Course[3][1]
            if not has_room:
                print(f'主循环 人数已达上限 继续等待 {time.ctime(time.time())}')
            elif Xk2S(Course[0], lock) and isSuccessful(Form):
                print(f'主循环 选课成功 {time.ctime(time.time())}')
                break
            else:
                print(f'主循环 选课失败 继续尝试 {time.ctime(time.time())}')
            event.set()
            # Random 4-10 second pause between passes.
            time.sleep(random.randint(20,50)/5)
        except requests.ConnectionError:
            print('网络超时, 延时等待1分钟')
            # Six 10-second naps, calling event.set() before each one.
            for _ in range(6):
                event.set()
                time.sleep(10)
# Keep the current ``print`` reachable under another name before shadowing it.
printf = print


def wrapprint(*args, **kw):
    """Print like ``printf`` but prefixed with the current thread's name.

    The prefix ``"<thread name> :"`` is written first with no line ending,
    then *args* and *kw* are passed to ``printf`` unchanged.
    """
    thread_name = threading.current_thread().name
    printf(thread_name, ":", end='')
    printf(*args, **kw)


# From here on, module-level ``print`` is the thread-tagging wrapper.
print = wrapprint
PYTHON
|