In [4]:
__file__ = "XUEQIU"
In [5]:
# -*- coding: utf-8 -*-
from util import setApp, request_mapping, get_mapping, put_mapping, delete_mapping, post_mapping
import logging
from flask import Flask, render_template, request
__author__ = 'LDS'
logging.basicConfig(level=logging.INFO,
format='%(asctime)s [%(levelname)s] [%(module)s-%(threadName)s] %(lineno)d - %(message)s',
datefmt="%Y-%m-%d %H-%M-%S",
handlers=[logging.FileHandler(
'%s.log' % __file__.split('.')[0], mode="a", encoding="utf-8")])
log = logging.getLogger(__name__)
app = Flask(__name__)
View = render_template
# 转换为 Java 中SpringMVC 的方法名
exec(setApp(app))
@request_mapping("/", methods=['GET', 'POST'])
def home():
return View('home.html')
@get_mapping('/signin')
def signin_from():
return View('form.html')
@post_mapping('/signin')
def signin():
username = request.form['username']
password = request.form['password']
if username == 'admin' and password == 'password':
return View('signin-ok.html', username=username)
return View('form.html', message='Bad username or password', username=username)
if __name__ == '__main__':
log.info("服务已开启...")
app.run(debug=True)
* Serving Flask app 'XUEQIU' (lazy loading) * Environment: production WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. * Debug mode: on
2022-10-10 09:20:16 [INFO] [_internal-MainThread] 225 - * Restarting with stat
An exception has occurred, use %tb to see the full traceback. SystemExit: 1
d:\Users\Administrator\anaconda3\envs\python3.6\lib\site-packages\IPython\core\interactiveshell.py:3327: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D. warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
In [6]:
UsageError: Line magic function `%t` not found.
In [1]:
import requests
trigger = '416693'
headers = {
'Host': 'ucp.emnj',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
'Accept-Encoding': 'gzip, deflate',
'X-Token': '8b9d9a13-7ff8-45e7-a502-24ced6ff8e92',
'Connection': 'keep-alive',
'Referer': 'http://ucp.emnj/',
'Cookie': 'sid=8b9d9a13-7ff8-45e7-a502-24ced6ff8e92; token=8b9d9a13-7ff8-45e7-a502-24ced6ff8e92',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
}
fail_items = []
for i in range(1, 16):
fail_items.extend(
requests.get(f' http://ucp.emnj/task-center-mgr/api/spider/task/list?trigger={trigger}&page={i}&pageSize=10', headers = headers)\
.json()['data']
)
import json
fail_items = list(filter(lambda it: it['state'] == 'F', fail_items))
run_parms = list(map(lambda it: json.loads(it['runParam']), fail_items))
print(len(run_parms))
json.dump(run_parms, open(r'C:\Users\Administrator\Desktop\ctrip_fail_runparams.json', 'w', encoding='utf-8'),ensure_ascii=False)
85
In [4]:
In [ ]:
import random
with open(r"D:\active_user2_20220817.txt", "r", encoding='utf-8') as fr:
with open(r'd:/uids.txt', 'a', encoding= 'utf-8') as fw:
fw.write('[')
for _ in range(1000_0000):
lines = [fr.readline() for _ in range(10)]
print(lines)
uid = random.choice(lines).split('|')[1].strip()
fw.write(f'"{uid}",')
fw.write('""]')
In [ ]:
# @author: AIslandX
# @date: 2022-01-01
import hashlib
import json
import logging
import random
import time
import requests
from fake_useragent import UserAgent
# 参考文章:
# - 机场列表 - 维基百科
# https://zh.wikipedia.org/wiki/%E4%B8%AD%E5%8D%8E%E4%BA%BA%E6%B0%91%E5%85%B1%E5%92%8C%E5%9B%BD%E6%9C%BA%E5%9C%BA%E5%88%97%E8%A1%A8
# - 携程国际机票sign破解 https://blog.csdn.net/weixin_38927522/article/details/108214323
# - 至于前端反反爬虫,看完这篇你就可以毕业了 https://zhuanlan.zhihu.com/p/250176143
ua = UserAgent()
def get_cookie_bfa():
random_str = "abcdefghijklmnopqrstuvwxyz1234567890"
random_id = ""
for _ in range(6):
random_id += random.choice(random_str)
t = str(int(round(time.time() * 1000)))
bfa_list = ["1", t, random_id, "1", t, t, "1", "1"]
bfa = "_bfa={}".format(".".join(bfa_list))
# e.g. _bfa=1.1639722810158.u3jal2.1.1639722810158.1639722810158.1.1
return bfa
# 获取调用携程 API 查询航班接口 Header 中所需的参数 sign
def get_sign(transaction_id, departure_city_code, arrival_city_code, departure_date):
sign_value = transaction_id + departure_city_code + arrival_city_code + departure_date
_sign = hashlib.md5()
_sign.update(sign_value.encode('utf-8'))
return _sign.hexdigest()
# 获取 transactionID 及航线数据
def get_transaction_id(departure_city_code, arrival_city_code, departure_date, cabin):
flight_list_url = "https://flights.ctrip.com/international/search/api/flightlist" \
"/oneway-{}-{}?_=1&depdate={}&cabin={}&containstax=1" \
.format(departure_city_code, arrival_city_code, departure_date, cabin)
flight_list_req = requests.get(url=flight_list_url)
if flight_list_req.status_code != 200:
logging.error("get transaction id failed, status code {}".format(flight_list_req.status_code))
return "", None
try:
flight_list_data = flight_list_req.json()["data"]
transaction_id = flight_list_data["transactionID"]
except Exception as e:
logging.error("get transaction id failed, {}".format(e))
return "", None
return transaction_id, flight_list_data
# 获取航线具体信息与航班数据
def get_flight_info(departure_city_code, arrival_city_code, departure_date, cabin):
# 获取 transactionID 及航线数据
transaction_id, flight_list_data = get_transaction_id(departure_city_code, arrival_city_code, departure_date, cabin)
print(transaction_id, flight_list_data)
if transaction_id == "" or flight_list_data is None:
return False, None
# 获取调用携程 API 查询航班接口 Header 中所需的参数 sign
sign = get_sign(transaction_id, departure_city_code, arrival_city_code, departure_date)
# cookie 中的 bfa
bfa = get_cookie_bfa()
# 构造请求,查询数据
search_url = "https://flights.ctrip.com/international/search/api/search/batchSearch"
search_headers = {
"transactionid": transaction_id,
"sign": sign,
"scope": flight_list_data["scope"],
"origin": "https://flights.ctrip.com",
"referer": "https://flights.ctrip.com/online/list/oneway-{}-{}"
"?_=1&depdate={}&cabin={}&containstax=1".format(departure_city_code, arrival_city_code,
departure_date, cabin),
"content-type": "application/json;charset=UTF-8",
"user-agent": ua.chrome,
"x-forwarded-for": "196.32.65.5",
"X-Forwarded-For": "196.32.65.5",
"WL-Proxy-Client-IP": "196.32.65.5",
"Proxy-Client-IP": "196.32.65.5",
"cookie": bfa,
}
r = requests.post(url=search_url, headers=search_headers, data=json.dumps(flight_list_data))
if r.status_code != 200:
logging.error("get flight info failed, status code {}".format(r.status_code))
return False, None
try:
result_json = r.json()
if result_json["data"]["context"]["flag"] != 0:
logging.error("get flight info failed, {}".format(result_json))
return False, None
except Exception as e:
logging.error("get flight info failed, {}".format(e))
return False, None
if "flightItineraryList" not in result_json["data"]:
result_data = []
else:
result_data = result_json["data"]["flightItineraryList"]
return True, result_data
for _ in range(30):
# 日志通用配置
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
# 离开城市代码
departureCityCode = "TNA"
# 到达城市代码
arrivalCityCode = "CGQ"
# 离开时间
departureDate = time.strftime('%Y-%m-%d')
# 飞机舱位 Y - 经济舱
# 参考:https://baike.baidu.com/item/%E9%A3%9E%E6%9C%BA%E8%88%B1%E4%BD%8D/4764328
cabin = "Y"
# departureCityCode, arrivalCityCode, departureDate = "GOQ", "CGQ", "2022-01-29"
ok, example_result = get_flight_info(departureCityCode, arrivalCityCode, departureDate, cabin)
if ok:
print(json.dumps(example_result, ensure_ascii=False))
print("success", end="\r")
else:
print("获取失败")
break
1 文字定位¶
1.1 大图片¶
In [17]:
import cv2
import matplotlib.pyplot as plt
import util
s = cv2.imread(r's')
plt.imshow(cv2.cvtColor(s, cv2.COLOR_BGR2RGB))
plt.show()
b = cv2.imread(r'b')
plt.imshow(cv2.cvtColor(b, cv2.COLOR_BGR2RGB))
plt.show()
In [18]:
util.imshow(b)
In [6]:
import numpy as np
b_gray = cv2.cvtColor(b, cv2.COLOR_RGB2GRAY)
b_binary = np.where(b_gray < 20, 255, 0).astype(np.uint8)
# b_binary = cv2.adaptiveThreshold(
# b_binary,
# 255,
# cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
# cv2.THRESH_BINARY,
# 3,
# 2,
# )
plt.imshow(b_binary)
util.imwrite(b_binary)
In [7]:
import util
k = [[0,1,0],
[1,1,1],
[0,1,0]]
img = util.erode(b_binary, k)
plt.imshow(util.dilate(img, k))
plt.show()
In [8]:
# 弄脏
import util
k = [[1,1,1],[1,1,1],[1,1,1]]
b_erode = util.erode(b_binary, k)
b_dilate = util.dilate(b_erode, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
plt.imshow(b_dilate)
plt.show()
In [9]:
rects = util.filter_list(lambda rt: util.calc_h(rt) > 20 and util.calc_w(rt) > 20, util.getMaxRects(b_dilate))
_ = util.drawRects(b, rects=rects, line_color=[0,255,0], show_result_img=True)
1.2 小图片¶
In [10]:
import importlib
importlib.reload(util)
Out[10]:
<module 'util' from 'c:\\Users\\Administrator\\Desktop\\lb-gdp-lds-learning-record\\lds\\python\\util.py'>
In [11]:
import numpy as np
import util
s_gary = cv2.cvtColor(s, cv2.COLOR_RGB2GRAY)
s_binary = np.where(s_gary < 20, 255, 0)
s_dilate = util.dilate(s_binary, util.CV2_K_CROSS((3,3)))
plt.imshow(s_dilate)
util.imwrite(s_dilate)
# edge_output = cv2.Canny(grayImg, 230, 255)
# #提取上一步中处理好的图像边缘,50和150分别代表低阈值和高阈值,高阈值用来将物体与背景区分开来,低的用于平滑连接高阈值产生的片段,使图像成一个整体
# plt.imshow(edge_output)#输出灰度图像
# plt.show()
2.1 计算点击的位置(相对于验证码图片中心的位置)¶
In [12]:
plt.imshow(s_binary)
Out[12]:
<matplotlib.image.AxesImage at 0x200f3b0b248>
In [21]:
s_chars_imgs = []
n = 0
for i in util.getSubImg(s, util.getMaxRects(s_dilate)):
s_chars_imgs.append(i)
n += 1
cv2.imwrite(f'{n}.png', i)
plt.imshow(i)
plt.show()
util.imshow(i)
In [22]:
b_chars_imgs = []
for i in util.getSubImg(b, util.getMaxRects(b_dilate)):
b_chars_imgs.append(i)
n += 1
cv2.imwrite(f'{n}.png', i)
plt.imshow(i)
plt.show()
In [24]:
len(s_chars_imgs), len(b_chars_imgs)
Out[24]:
(3, 7)
In [29]:
result
Out[29]:
[('钟', 0.99425983)]
In [2]:
import paddleocr.paddleocr as ppocr
from paddleocr import PaddleOCR
# args = ppocr.parse_args()
image_path = '1.png'
engine = PaddleOCR(use_angle_cls=True)
for i in range(1, 11):
image_path = f'{i}.png'
result = engine.ocr(image_path,
det=False,
rec=True,
cls=True)
if result is not None:
for line in result:
char, prob = line
print(char, prob)
Namespace(benchmark=False, cls_batch_num=6, cls_image_shape='3, 48, 192', cls_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\cls\\ch_ppocr_mobile_v2.0_cls_infer', cls_thresh=0.9, cpu_threads=10, crop_res_save_dir='./output', det=True, det_algorithm='DB', det_db_box_thresh=0.6, det_db_score_mode='fast', det_db_thresh=0.3, det_db_unclip_ratio=1.5, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_east_score_thresh=0.8, det_limit_side_len=960, det_limit_type='max', det_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\det\\ch\\ch_PP-OCRv2_det_infer', det_pse_box_thresh=0.85, det_pse_box_type='box', det_pse_min_area=16, det_pse_scale=1, det_pse_thresh=0, det_sast_nms_thresh=0.2, det_sast_polygon=False, det_sast_score_thresh=0.5, draw_img_save_dir='./inference_results', drop_score=0.5, e2e_algorithm='PGNet', e2e_char_dict_path='./ppocr/utils/ic15_dict.txt', e2e_limit_side_len=768, e2e_limit_type='max', e2e_model_dir=None, e2e_pgnet_mode='fast', e2e_pgnet_score_thresh=0.5, e2e_pgnet_valid_set='totaltext', enable_mkldnn=False, gpu_mem=500, help='==SUPPRESS==', image_dir=None, ir_optim=True, label_list=['0', '180'], label_map_path='./vqa/labels/labels_ser.txt', lang='ch', layout_path_model='lp://PubLayNet/ppyolov2_r50vd_dcn_365e_publaynet/config', max_batch_size=10, max_seq_length=512, max_text_length=25, min_subgraph_size=15, mode='structure', model_name_or_path=None, ocr_version='PP-OCRv2', output='./output', precision='fp32', process_id=0, rec=True, rec_algorithm='CRNN', rec_batch_num=6, rec_char_dict_path='d:\\Users\\Administrator\\anaconda3\\envs\\paddle\\lib\\site-packages\\paddleocr\\ppocr\\utils\\ppocr_keys_v1.txt', rec_image_shape='3, 32, 320', rec_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\rec\\ch\\ch_PP-OCRv2_rec_infer', save_crop_res=False, save_log_path='./log_output/', show_log=True, structure_version='STRUCTURE', table_char_dict_path=None, table_char_type='en', table_max_len=488, table_model_dir=None, total_process_num=1, type='ocr', use_angle_cls=True, use_dilation=False, use_gpu=False, use_mp=False, use_onnx=False, use_pdserving=False, use_space_char=True, use_tensorrt=False, vis_font_path='./doc/fonts/simfang.ttf', warmup=False) 大 0.8738931 本 0.99865985 钟 0.99425983 数 0.9624093 细 0.994358 本 0.9114106 钟 0.9931177 松 0.9952101 大 0.9434993 1 0.11578398
3.1 确定点击顺序¶
In [15]:
scs = s_chars_imgs
bcs = b_chars_imgs
In [16]:
for ci in scs:
util.imshow(ci)
In [37]:
for ci in bcs:
util.imshow(ci)
In [38]:
bcs_, scs_ = [], []
b_mask = util.getSubImg(b_binary, util.simpleLocate(b_dilate))
s_mask = util.getSubImg(s_binary, util.simpleLocate(s_dilate))
for i in range(len(b_mask)):
bcs_.append(np.bitwise_and(np.bitwise_not(bcs[i]), np.expand_dims(b_mask[i], 2)))
for i in range(len(s_mask)):
scs_.append(np.bitwise_and(np.bitwise_not(scs[i]), np.expand_dims(s_mask[i], 2)))
In [39]:
def getDistance(img1, img2):
"""_summary_
Args:
img1 (_type_): _description_
img2 (_type_): _description_
"""
# 初始化SIFT描述符
sift = cv2.xfeatures2d.SIFT_create()
kp1, des1 = sift.detectAndCompute(img1, None)
kp2, des2 = sift.detectAndCompute(img2, None)
# 默认参数初始化BF匹配器
bf = cv2.BFMatcher()
matches = bf.knnMatch(des1, des2, k=2)
# 从k个匹配结果种筛选出好的匹配结果
dis = 0
good = []
for m, n in matches:
dis += m.distance
if m.distance < 0.45*n.distance:
good.append(m)
# print(dis, good)
if len(good) > 0:
dis = 0
return dis, good
for c in scs_:
dies = np.array(list(map(lambda c2:getDistance(c.astype(np.uint8), c2.astype(np.uint8))[0], bcs_[:-1])))
ind = np.argmin(dies)
print(dies, ind)
util.imshow([c, bcs_[ind]], title=f"{ind}")
[415.57067871 396.49591064 467.38955688 431.74066162 437.40942383 282.36679077] 5
[ 943.55612183 1089.0607605 992.17965698 930.19680786 1091.43273926 1040.69631958] 3
[125.49900055 259.79608154 300.69918823 388.82385254 400.69064331 323.56915283] 0
In [141]:
for i in scs:
util.imwrite(scs[1])
In [123]:
util.imshow(bcs_)
In [2]:
!pip list | findstr padd
paddleocr 2.4 paddlepaddle 2.2.2
1. 下载数据¶
In [1]:
import pandas as pd
def excel2list(excel_file: str, sheet: int = 0, engine="openpyxl", rows: tuple = None) -> list:
wb = pd.ExcelFile(excel_file, engine=engine)
df = wb.parse(sheet_name=sheet)
print(df.iloc[3])
li = []
for index, series in df.iterrows():
# if index == 0:
# print(series._index)
ar = series.array
if not rows:
li.append(ar.to_numpy().tolist())
else:
li.append([ar[i] for i in rows])
return li
items = excel2list(r"XQ1.xlsx")
import pandas as np
np.DataFrame(items)
D:\Users\Administrator\anaconda3\envs\python3.6\lib\importlib\_bootstrap.py:219: RuntimeWarning: numpy.ufunc size changed, may indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject return f(*args, **kwds)
Unnamed: 0 3 stock_code SZ000651 author 格力电器(SZ000651) author_id -1 datetime 1637388604000 is_column 0 doc_title [招商证券:买入]拟控股盾安环境 增强产业链实力 新能源布局更进一步 retweet_count 5 reply_count 9 like_count 46 fans -1 doc 事件描述:格力电器11 月16 日晚公告,公司拟受让盾安精工所持盾安环境2.70 亿股股份,... crawl_time 1644914302970 doc_url https://xueqiu.com/S/SZ000651/203674545 user_url NaN Name: 3, dtype: object
Out[1]:
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | SZ000651 | huanji | 7424865298 | 1641095114000 | 0 | NaN | 0 | 55 | 10 | 19 | <a href="http://xueqiu.com/S/SZ000651" target=... | 1644914302970 | https://xueqiu.com/7424865298/207728340 | https://xueqiu.com/u/7424865298 |
1 | 1 | SZ000651 | 佛系小资 | 1566609429 | 1640952292000 | 0 | NaN | 1 | 8 | 23 | 21517 | <h4>品质护航,驾“浴”清凉,格力推出新一代顶置式驻车空调</h4><p><a href=... | 1644914302970 | https://xueqiu.com/1566609429/207634993 | https://xueqiu.com/u/1566609429 |
2 | 2 | SZ000651 | 泉州李国彬 | 8995599040 | 1639200847000 | 0 | 第一次出现“格力光储空系统技术推广活动”报道 | 2 | 9 | 49 | 7036 | <p>12月9日-10日,</p><p>2021格力中央空调</p><p>全国巡回<stro... | 1644914302970 | https://xueqiu.com/8995599040/205664621 | https://xueqiu.com/u/8995599040 |
3 | 3 | SZ000651 | 格力电器(SZ000651) | -1 | 1637388604000 | 0 | [招商证券:买入]拟控股盾安环境 增强产业链实力 新能源布局更进一步 | 5 | 9 | 46 | -1 | 事件描述:格力电器11 月16 日晚公告,公司拟受让盾安精工所持盾安环境2.70 亿股股份,... | 1644914302970 | https://xueqiu.com/S/SZ000651/203674545 | NaN |
4 | 4 | SZ000651 | GM笨小孩 | 9770976443 | 1637160804000 | 0 | NaN | 3 | 42 | 39 | 2046 | 这个妹子是谁呢?年轻时候的董明珠。为啥咱觉得挺漂亮呢?胜过那些网红脸,动过刀子的脸吧?十倍还... | 1644914302970 | https://xueqiu.com/9770976443/203416713 | https://xueqiu.com/u/9770976443 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
26741 | 26741 | SZ301168 | 梧桐树新股 | 6160714490 | 1639314657000 | 0 | 全面注册制&下周次新股出现重磅级信号! | 1 | 3 | 7 | 1911 | <p>周末最大新闻是:12月8日-10日,中央经J工作会议透露2022年资本市场改革风向,首... | 1644914306981 | https://xueqiu.com/6160714490/205719765 | https://xueqiu.com/u/6160714490 |
26742 | 26742 | SZ301168 | 无风说次新股 | 1071411538 | 1638173436000 | 1 | 通灵股份301168上市估值分析和申购建议 | 1 | 0 | 6 | 9521 | <p><b>重要警告:本号本人不荐股,文章内容属于个人操作心得的分享,仅供参考和交流学习,所... | 1644914306981 | https://xueqiu.com/1071411538/204457768 | https://xueqiu.com/u/1071411538 |
26743 | 26743 | SZ301168 | 无风说次新股 | 1071411538 | 1639128223000 | 1 | 12月10日次新股复盘,新股通灵股份偷袭临停成功! | 0 | 2 | 6 | 9521 | <p><b>重要警告:本号本人不荐股,文章内容属于个人操作心得的分享,仅供参考和交流学习,文... | 1644914306981 | https://xueqiu.com/1071411538/205615804 | https://xueqiu.com/u/1071411538 |
26744 | 26744 | SZ301168 | 唯红茶 | 1920422334 | 1639552100000 | 0 | NaN | 0 | 4 | 5 | 6024 | 12月15日收盘总结:<br/>1、集合竞价:核了春兰股份,垃圾。竞价嘉和美康,血套,垃圾。... | 1644914306981 | https://xueqiu.com/1920422334/206033355 | https://xueqiu.com/u/1920422334 |
26745 | 26745 | SZ301168 | 刘轶南_教师_珠海 | 8850764119 | 1638153156000 | 0 | NaN | 0 | 0 | 5 | 15663 | <p>泽宇智能(301179),江苏泽宇智能电力股份有限公司,电力系统集成配套建设,智能电网... | 1644914306981 | https://xueqiu.com/8850764119/204411506 | https://xueqiu.com/u/8850764119 |
26746 rows × 15 columns
In [1]:
import requests
import openpyxl
import util
util.debug = False
items = util.excel2list(r"C:\Users\Administrator\Desktop\雪球内容爬虫需求2022.1.10.xlsx")
import time
url_tem = 'http://xueqiu.com/query/v1/symbol/excellent/status.json?count=10&symbol=%s&hl=0&source=all&sort=1&page=%s&q=&type=11'
headers = {
"Host": "xueqiu.com",
"User-Agent": "Xueqiu Android 13.9",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"Accept-Encoding": "gzip, deflate, br",
"Cookie": "xq_a_token=512da9d222c381fa39dc775676c85ba2aa1ae80b;",
}
def breakLoop(resp, ind):
"""判断是否跳出循环
Args:
resp ([响应数据]): [description]
ind ([int]): [已采集数量]
Returns:
[type]: [description]
"""
r = json.loads(resp)
# return ind >= r["maxPage"] or r["list"][0]["created_at"] < 1609430400000
return ind >= r["maxPage"]
# 存储下载失败的股票
erros = []
2022-02-15 15-35-28 [INFO] [util-MainThread] 89 - Index(['统计时间', '股吧内码', '基金代码', '实时排名', '股票中文名'], dtype='object')
In [3]:
import random
import json
import time
import os
for it in items:
try:
ind = 1
while True:
u = url_tem % (it[2], ind)
resp = requests.get(url=u, headers=headers).text
time.sleep(random.randint(15, 25) / 100)
d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
r = json.loads(resp)
if r["maxPage"] == 0:
u = url_tem % (it[2], 0)
resp = requests.get(url=u, headers=headers).text
# print("only one data: ", resp[:200])
if not os.path.exists(d):
os.makedirs(d)
with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
break
print(it[2], ind, end="\r")
if not os.path.exists(d):
os.makedirs(d)
with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
if breakLoop(resp, ind):
break
ind += 1
except Exception as e:
time.sleep(1.25)
print(f"{it[2]}_{it[-1]} ERRO", e, file=open("erro.log", "a", encoding="utf-8"))
erros.append(it)
SZ002709 73
后400条¶
In [3]:
import shutil
import os
for it in items[100:]:
d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
if os.path.exists(d):
print("remove", d)
shutil.rmtree(d)
remove xq/SH600444_国机通用 remove xq/SZ300624_万兴科技 remove xq/SZ300398_飞凯材料 remove xq/SZ000002_万科A remove xq/SH603466_风语筑 remove xq/SZ000858_五粮液 remove xq/SH603368_柳药股份 remove xq/SH600976_健民集团 remove xq/SH600276_恒瑞医药 remove xq/SH600277_亿利洁能 remove xq/SZ002265_西仪股份 remove xq/SZ002385_大北农 remove xq/SH600188_兖矿能源 remove xq/SZ002240_盛新锂能 remove xq/SZ002694_顾地科技 remove xq/SZ002746_仙坛股份 remove xq/SZ300603_立昂技术 remove xq/SZ001296_长江材料 remove xq/SZ300199_翰宇药业 remove xq/SH600010_包钢股份 remove xq/SH601318_中国平安
In [5]:
import random
import json
import time
import os
for it in items[100:]:
try:
ind = 1
while True:
u = url_tem % (it[2], ind)
resp = requests.get(url=u, headers=headers).text
time.sleep(random.randint(15, 25) / 100)
d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
r = json.loads(resp)
if r["maxPage"] == 0:
u = url_tem % (it[2], 0)
resp = requests.get(url=u, headers=headers).text
print("only one data: ", resp[:200])
if not os.path.exists(d):
os.makedirs(d)
with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
break
print(it[2], ind, end="\r")
if not os.path.exists(d):
os.makedirs(d)
with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
if breakLoop(resp, ind):
break
ind += 1
except Exception as e:
time.sleep(1.25)
print(f"{it[2]}_{it[-1]} ERRO", e, file=open("erro.log", "a", encoding="utf-8"))
erros.append(it)
only one data: {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ300325","query_id":1493053356506312704,"recommend_cards":[]} only one data: {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ002347","query_id":1493054767872827392,"recommend_cards":[]} only one data: {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ300878","query_id":1493055390986948610,"recommend_cards":[]} SZ002074 19
2. 处理一些出错的条目¶
In [3]:
import os
import json
erros.extend(
[["", '', "SH600518","*ST康美"],
["", '', "SZ000980","*ST众泰"]
]
)
for it in erros:
print(it)
try:
ind = 1
while True:
if os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json"):
ind += 1
continue
u = url_tem % (it[2], ind)
resp = requests.get(url=u, headers=headers).text
r = json.loads(resp)
if r["maxPage"] == 0:
u = url_tem % (it[2], 0)
resp = requests.get(url=u, headers=headers).text
print(resp[:200])
if not os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}"):
os.makedirs(f"xq/{it[2]}_{it[-1].replace('*', '-')}")
with open(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
break
time.sleep(0.25)
print(it[2], ind, end="\r")
if not os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}"):
os.makedirs(f"xq/{it[2]}_{it[-1].replace('*', '-')}")
with open(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json", "w", encoding="utf-8") as fw:
fw.write(resp)
if breakLoop(resp, ind):
break
ind += 1
except Exception as e:
print(f"{it[2]}_{it[-1].replace('*', '-')} ERRO", e, )
['', '', 'SH600518', '*ST康美'] ['', '', 'SZ000980', '*ST众泰'] ['', '', 'SH600518', '*ST康美'] {"about":"SH600518","count":181,"key":"SH600518","list":[{"blocked":false,"blocking":false,"canEdit":true,"commentId":0,"controversial":false,"created_at":1637307251000,"description":"以前觉着当独立董事很容易,每年拿 ['', '', 'SZ000980', '*ST众泰'] {"about":"SZ000980","count":34,"key":"SZ000980","list":[{"blocked":false,"blocking":false,"canEdit":true,"card":{"data":"{\"items\":[{\"id\":61916,\"tag\":\"#雪球星计划#\",\"content\":\"\",\"pic\":null,\"b
3. 整理数据¶
In [ ]:
import math
import threading
import pandas as pd
import os
from collections import namedtuple as ntuple
import time
import json
import logging
lock = threading.Lock()
logging.basicConfig(level=logging.INFO,
format='%(asctime)s [%(levelname)s] [%(module)s-%(threadName)s] %(lineno)d - %(message)s',
datefmt="%Y-%m-%d %H-%M-%S")
logger = logging.getLogger()
# 看重了它的提示功能
e = ntuple("DTO", ['stock_code', 'author', 'author_id', 'datetime', 'is_column', 'doc_title',
'retweet_count', 'reply_count', 'like_count', 'fans', 'doc', 'crawl_time', 'doc_url', 'user_url'])
class Entity(dict):
'''实体定义'''
def __init__(self, *args):
it = iter(*(args))
for i in e._fields:
self[i] = str(next(it))
# excel = "XueQiu.xlsx"
# writer = pd.ExcelWriter(excel)
emptys = []
def get_entitys(cmt_file: str) -> list:
logger.info("%s接收到:%s%s", '#'*20, cmt_file, '#'*20)
info = json.load(open(cmt_file, 'r', encoding="utf-8"))
stock = info['key']
cmts = info['list']
if len(cmts) < 1:
logger.error("【%s】没东西啊", cmt_file)
emptys.append(cmt_file)
res = []
for cmt in cmts:
res.append(
Entity(e(
stock,
cmt["user"]["screen_name"],
cmt["user_id"],
cmt["created_at"],
1 if cmt["mark"] == 5 else 0,
cmt["title"],
cmt["retweet_count"],
cmt["reply_count"],
cmt["like_count"],
cmt["user"]["followers_count"] if cmt["user_id"] != -1 else -1,
cmt["text"],
int(time.time() * 1000),
'https://xueqiu.com' + cmt["target"],
('https://xueqiu.com/u/' + str(cmt["user_id"]) if cmt["user_id"] != -1 else "")
))
)
return res
def write_to_excel(cmt_file: str, writer: pd.ExcelWriter=None):
global start_row, result, size
try:
res = get_entitys(cmt_file)
result.extend(res)
if len(res) > size:
df1 = pd.DataFrame(result)
excel = f"XQ{start_row}.xlsx"
writer = pd.ExcelWriter(excel, mode="w", engine='xlsxwriter')
df1.to_excel(writer, 'dataset', startcol=0,
startrow=0, encoding="utf-8")
writer.save()
result = []
print("write to", excel)
start_row += 1
except Exception as e:
logger.error("deal[%s] Failed, %s", cmt_file, e)
size = 25000
root = r'xq'
dirs = os.listdir(root)
logger.info("要开始了")
start_row = 0
tasks = []
result = []
s = time.time()
for d in dirs:
for f in os.listdir(os.path.join(root,d)):
cmt_file = f'xq/{d}/{f}'
write_to_excel(cmt_file)
logger.info("处理耗时: %s[s]", time.time() - s)
# import util
# util.dump(emptys, "ry.json")
# s = time.time()
# for i in range(math.ceil(len(result) / size)):
# df1 = pd.DataFrame(result[i * size: min((i + 1) * size, len(result))])
# excel = f"XQ{i}.xlsx"
# writer = pd.ExcelWriter(excel, mode="w", engine='xlsxwriter')
# df1.to_excel(writer, 'dataset', startcol=0,
# startrow=0, encoding="utf-8")
# writer.save()
# logger.info("写入耗时: %s[s]", time.time() - s)
In [ ]:
print("SUCCESS")
In [ ]:
In [ ]:
In [1]:
import json
xq = json.load(open(r"C:\Users\Administrator\Desktop\xq.json", "r", encoding="utf-8"))
In [10]:
n = 0
for it in xq["list"]:
if it["mark"] == 5:
n += 1
print(f"专栏{n}:", it["description"][:300])
print()
专栏1: 21年度老柏的证券投资回撤幅度创个人投资史新高,负复利的威力对长期投资复利伤害较大,从记录投资以来年化复利降为4%,可见一般。 对于这一短期结果,从资金代入感的角度,难言满意。然而,从持有股份数量变动的角度,我是满意的,这并不是聊以自慰。 我提倡长期持续净买入,把收集优质股权作为... 专栏2: A股经过了2019年(收益68%)和2020年(收益60%)的吃大肉行情,我年初预测2021是投资小年。可是当真的走过了2021年,还是要感叹太不容易了。2021年是有人喝酒吃肉,有人吃糠咽菜的一年。如果是大盘单边下跌,大家也没什么意见。问题是有人赚的盆满钵满,有人亏的丁零当啷。所以2021年是股市投资见... 专栏3: <a href="https://www.ximalaya.com/shangye/18599130/487919373" title="https://www.ximalaya.com/shangye/18599130/487919373" target="_blank">本文语音版</a> 早晨起来,天气格外的好,打开窗户,放一首淡淡的音乐,认真的和2021年做个告别。 去年元旦的情景还历历在目,转眼2021年就要过去了。 时间可真快,父母又老了一岁,自己也成长了一年。长大后唯一的愿望就是多赚点钱,多带他们... 专栏4: 最近网络上流传一段价值投资大v唐朝关于三傻的随想,不经意已经看到了3,4次别人的分享,近期有朋友私信我分享了这段随想。 (注,三傻指的是股市中近几年估值低但不涨的一些板块,可以理解为银行地产保险,狭义上可以理解为平安,万科,格力) 原文:(唐门小卒为唐朝的某粉丝) 唐门小卒: 持仓多... 专栏5: 昨天,中国平安公布了2021年全年的保费收入,评论区一篇沸腾。 主流观点是:总保费下降4.6%,新业务下降4.8%,代理人相比2020年底下降了30%。在代理人大幅下降的情况下,总保费和新业务价值只有小幅下滑,且2021年12月份单月保费实现了1.6%的正增长,中国平安的春天终于来了。 中国平安的拐点,真... 专栏6: A股散户数最多的10只股票,看看有你的股票吗? 第一名:京东方 A 156.95万 1 第二名:中国平安 130.36万 2 第三名:三一重工 115.03万 3 第四名:中国电信 109.63万 4 第五名:三峡能源 104.30万 5 第六名:兰州银行 98.59万 6 第七名:包钢股份 94.78万 7 第八名:格力电器 88.80万 8 第九名:TC...
In [15]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from PIL import Image
from six import BytesIO
import time
from selenium.webdriver import ActionChains
In [16]:
driver = webdriver.Chrome("d:/chromedriver.exe")
In [42]:
driver.get('http://172.31.227.161:9527/#/login?redirect=%2Fcrawler-configuration%2Fjob-config%2Fpage%2Findex')
In [46]:
captcha = driver.find_element_by_css_selector('div[class="em_widget em_show"')
In [40]:
verify_bt = driver.find_element_by_css_selector(".em_init")
In [26]:
from selenium.webdriver.common import touch_actions, action_chains
from selenium.webdriver.remote.command import Command
touch = touch_actions.TouchActions(driver=driver)
In [50]:
ActionChains(driver=driver).move_to_element(captcha) \
.move_by_offset(0, -40) \
.click() \
.perform() \
In [41]:
driver.execute(Command.CLICK_ELEMENT, {'id': verify_bt.id,
'button': 0,
'xoffset': 50,
'yoffset': 50})
Out[41]:
{'value': None}
In [2]:
driver = webdriver.Chrome()
driver.get('https://m.ctrip.com/html5/flight/swift/domestic/SHA/CAN/2022-02-18')
In [5]:
from bs4 import BeautifulSoup as bs
source = driver.page_source
In [6]:
soup = bs(source)
In [12]:
div = soup.find('div', {'class': 'cpt-choose-box cpt-choose-box-pop'})
In [16]:
b = div.find('img', {'class': "cpt-big-img"})
s = div.find('img', {'class': "cpt-small-img"})
In [ ]:
import cv2
In [33]:
import base64
from PIL import Image
with open("t", 'wb') as fr:
fr.write(base64.b64decode(b.attrs['src'][22:]))
Image.open('t')
Out[33]:
In [ ]:
def get_url(url,user,password):
browser = webdriver.Chrome()
browser.get(url)
browser.maximize_window()
wait = WebDriverWait(browser,10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'geetest_radar_btn')))
user_input = browser.find_element_by_id('username')
pwd_input = browser.find_element_by_id('password')
btn = browser.find_element_by_css_selector('.geetest_radar_btn')
user_input.send_keys(user)
pwd_input.send_keys(password)
btn.click()
time.sleep(0.5)
return browser
In [ ]:
def get_position(img_label):
location = img_label.location
size = img_label.size
top, bottom, left, right = location['y'], location['y'] + size['height'], location['x'], location['x'] + size[
'width']
return (left, top, right, bottom)
In [ ]:
def get_screenshot(browser):
screenshot = browser.get_screenshot_as_png()
f = BytesIO()
f.write(screenshot)
return Image.open(f)
In [ ]:
def get_position_scale(browser,screen_shot):
height = browser.execute_script('return document.documentElement.clientHeight')
width = browser.execute_script('return document.documentElement.clientWidth')
x_scale = screen_shot.size[0] / (width+10)
y_scale = screen_shot.size[1] / (height)
return (x_scale,y_scale)
In [ ]:
def get_slideimg_screenshot(screenshot,position,scale):
x_scale,y_scale = scale
position = [position[0] * x_scale, position[1] * y_scale, position[2] * x_scale, position[3] * y_scale]
return screenshot.crop(position)
In [ ]:
def compare_pixel(img1,img2,x,y):
pixel1 = img1.load()[x,y]
pixel2 = img2.load()[x,y]
threshold = 50
if abs(pixel1[0]-pixel2[0])<=threshold:
if abs(pixel1[1]-pixel2[1])<=threshold:
if abs(pixel1[2]-pixel2[2])<=threshold:
return True
return False
def compare(full_img,slice_img):
left = 0
for i in range(full_img.size[0]):
for j in range(full_img.size[1]):
if not compare_pixel(full_img,slice_img,i,j):
return i
return left
In [6]:
distance = 100
# 移动轨迹
track = []
# 当前位移
current = 0
# 减速阈值
mid = distance * 4 / 5
# 计算间隔
t = 0.2
# 初速度
v = 0
while current < distance:
if current < mid:
# 加速度为正 2
a = 4
else:
# 加速度为负 3
a = -3
# 初速度 v0
v0 = v
# 当前速度 v = v0 + at
v = v0 + a * t
# 移动距离 x = v0t + 1/2 * a * t^2
move = v0 * t + 1 / 2 * a * t * t
# 当前位移
current += move
# 加入轨迹
track.append(round(current))
In [8]:
track
Out[8]:
[0, 0, 1, 1, 2, 3, 4, 5, 6, 8, 10, 12, 14, 16, 18, 20, 23, 26, 29, 32, 35, 39, 42, 46, 50, 54, 58, 63, 67, 72, 77, 82, 87, 92, 97, 101]
In [1]:
def get_track(distance):
"""
根据偏移量获取移动轨迹
:param distance: 偏移量
:return: 移动轨迹
"""
# 移动轨迹
track = []
# 当前位移
current = 0
# 减速阈值
mid = distance * 4 / 5
# 计算间隔
t = 0.2
# 初速度
v = 0
while current < distance:
if current < mid:
# 加速度为正 2
a = 4
else:
# 加速度为负 3
a = -3
# 初速度 v0
v0 = v
# 当前速度 v = v0 + at
v = v0 + a * t
# 移动距离 x = v0t + 1/2 * a * t^2
move = v0 * t + 1 / 2 * a * t * t
# 当前位移
current += move
# 加入轨迹
# track.append(round(move))
track.append(round(current))
return track
In [ ]:
def move_to_gap(browser,slider, tracks):
"""
拖动滑块到缺口处
:param slider: 滑块
:param tracks: 轨迹
:return:
"""
ActionChains(browser).click_and_hold(slider).perform()
for x in tracks:
ActionChains(browser).move_by_offset(xoffset=x, yoffset=0).perform()
time.sleep(0.5)
ActionChains(browser).release().perform()
In [ ]:
if __name__ == '__main__':
browser = get_url('https://account.zbj.com/login','11111111111','********') #此函数的定义在第3点
time.sleep(1)
slice_img_label = browser.find_element_by_css_selector('div.geetest_slicebg') #找到滑动图片标签
browser.execute_script("document.getElementsByClassName('geetest_canvas_slice')[0].style['display'] = 'none'") #将小块隐藏
full_img_label = browser.find_element_by_css_selector('canvas.geetest_canvas_fullbg') #原始图片的标签
position = get_position(slice_img_label) #获取滑动验证图片的位置,此函数的定义在第4点
screenshot = get_screenshot(browser) # 截取整个浏览器图片,此函数的定义在第5点
position_scale = get_position_scale(browser,screenshot) #获取截取图片宽高和浏览器宽高的比例,此函数的定义在第6点
slice_img = get_slideimg_screenshot(screenshot,position,position_scale) #截取有缺口的滑动验证图片,此函数的定义在第7点
browser.execute_script("document.getElementsByClassName('geetest_canvas_fullbg')[0].style['display'] = 'block'") #在浏览器中显示原图
screenshot = get_screenshot(browser) #获取整个浏览器图片
full_img = get_slideimg_screenshot(screenshot,position,position_scale) # 截取滑动验证原图
browser.execute_script("document.getElementsByClassName('geetest_canvas_slice')[0].style['display'] = 'block'") #将小块重新显示
left = compare(full_img,slice_img) #将原图与有缺口图片进行比对,获得缺口的最左端的位置,此函数定义在第8点
left = left / position_scale[0] #将该位置还原为浏览器中的位置
slide_btn = browser.find_element_by_css_selector('.geetest_slider_button') #获取滑动按钮
track = get_track(left) #获取滑动的轨迹,此函数定义在第9点
move_to_gap(browser,slide_btn,track) #进行滑动,此函数定义在第10点
success = browser.find_element_by_css_selector('.geetest_success_radar_tip') #获取显示结果的标签
time.sleep(2)
if success.text == "验证成功":
login_btn = browser.find_element_by_css_selector('button.j-login-btn') #如果验证成功,则点击登录按钮
login_btn.click()
else:
print(success.text)
print('失败')
上下文管理工具¶
In [1]:
import contextlib
import logging as log
from urllib.request import urlopen
log.basicConfig(level=log.DEBUG,
datefmt="%Y-%m-%d %H:%M:%S",
format="[%(asctime)s] [%(levelname)s] %(lineno)d: %(message)s"
)
log.info("##########################################")
with contextlib.closing(urlopen('https://www.python.org')) as page:
ind = 0
for line in page:
if ind > 3:
break
log.info(line)
ind += 1
log.info("##########################################")
class Test(object):
def __init__(self, name, *var, **kv):
self.name = name
@contextlib.contextmanager
def create_test(*var, **kv):
test = Test(var, kv)
log.info("进入 Test[%s]..." % test.name)
try:
yield test
except Exception as s:
log.error(s)
...
log.info("退出 Test[%s]..." % test.name)
with create_test("李元芳") as test:
log.info("test Test", 0/0)
log.info("##########################################")
class TestAutoClose(object):
def __init__(self, name, *var, **kv):
self.name = name
def __enter__(self):
log.info("进入 TestAutoClose[%s]..." % self.name)
def __exit__(self, exc_type, exc_value, traceback):
if traceback:
for i in dir(traceback):
log.warning(i, eval('traceback.' + i))
log.info("退出 TestAutoClose[%s]..." % self.name)
with TestAutoClose("狄仁杰") as test:
log.info("test TestAutoClose")
[2022-01-13 18:39:48] [INFO] 10: ########################################## [2022-01-13 18:39:48] [INFO] 16: b'<!doctype html>\n' [2022-01-13 18:39:48] [INFO] 16: b'<!--[if lt IE 7]> <html class="no-js ie6 lt-ie7 lt-ie8 lt-ie9"> <![endif]-->\n' [2022-01-13 18:39:48] [INFO] 16: b'<!--[if IE 7]> <html class="no-js ie7 lt-ie8 lt-ie9"> <![endif]-->\n' [2022-01-13 18:39:48] [INFO] 16: b'<!--[if IE 8]> <html class="no-js ie8 lt-ie9"> <![endif]-->\n' [2022-01-13 18:39:48] [INFO] 19: ########################################## [2022-01-13 18:39:48] [INFO] 27: 进入 Test[李元芳]... [2022-01-13 18:39:48] [ERROR] 31: division by zero [2022-01-13 18:39:48] [INFO] 33: 退出 Test[李元芳]... [2022-01-13 18:39:48] [INFO] 38: ########################################## [2022-01-13 18:39:48] [INFO] 44: 进入 TestAutoClose[狄仁杰]... [2022-01-13 18:39:48] [INFO] 53: test TestAutoClose [2022-01-13 18:39:48] [INFO] 50: 退出 TestAutoClose[狄仁杰]...
操作 sqlite 数据库¶
In [1]:
import sqlite3
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
cursor.execute("""
create table user(
id varchar(20) primary key,
name varchar(20)
)
""")
# 插入数据
cursor.execute("""
insert into user (id, name)
values ('1', '狄仁杰'), ('2', '李元芳'), ('3', '曾泰'), ('4', '张环')
""")
conn.commit()
# 查询数据
cursor.execute("""
select * from user;
""")
print(cursor.fetchall())
cursor.close()
conn.close()
[('1', '狄仁杰'), ('2', '李元芳'), ('3', '曾泰'), ('4', '张环')]
In [2]:
import struct
In [11]:
img = r"D:\DongDongsFiles\image\2022-01\1e8ad5e4-8d95-461c-a15e-4d77da9ad13f.jpg"
with open(img, "rb") as fr:
res = struct.unpack(">" + "c" * 8, fr.read(8))
In [12]:
print(res)
(b'\xff', b'\xd8', b'\xff', b'\xe0', b'\x00', b'\x10', b'J', b'F')
In [15]:
for c in res:
print(c.decode("unicode-escape"))
ÿ Ø ÿ à J F