In [4]:
__file__ = "XUEQIU"
In [5]:
# -*- coding: utf-8 -*-
from util import setApp, request_mapping, get_mapping, put_mapping, delete_mapping, post_mapping
import logging
from flask import Flask, render_template, request
__author__ = 'LDS'


logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] [%(module)s-%(threadName)s]  %(lineno)d - %(message)s',
                    datefmt="%Y-%m-%d %H-%M-%S",
                    handlers=[logging.FileHandler(
                        '%s.log' % __file__.split('.')[0], mode="a", encoding="utf-8")])

log = logging.getLogger(__name__)
app = Flask(__name__)

View = render_template


# 转换为 Java 中SpringMVC 的方法名
exec(setApp(app))


@request_mapping("/", methods=['GET', 'POST'])
def home():
    return View('home.html')


@get_mapping('/signin')
def signin_from():
    return View('form.html')


@post_mapping('/signin')
def signin():
    username = request.form['username']
    password = request.form['password']
    if username == 'admin' and password == 'password':
        return View('signin-ok.html', username=username)
    return View('form.html', message='Bad username or password', username=username)


if __name__ == '__main__':
    log.info("服务已开启...")
    app.run(debug=True)
 * Serving Flask app 'XUEQIU' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: on
2022-10-10 09:20:16 [INFO] [_internal-MainThread]  225 -  * Restarting with stat
An exception has occurred, use %tb to see the full traceback.

SystemExit: 1
d:\Users\Administrator\anaconda3\envs\python3.6\lib\site-packages\IPython\core\interactiveshell.py:3327: UserWarning: To exit: use 'exit', 'quit', or Ctrl-D.
  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
In [6]:
 
UsageError: Line magic function `%t` not found.
In [1]:
import requests

trigger = '416693'
headers = {
    'Host': 'ucp.emnj',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:103.0) Gecko/20100101 Firefox/103.0',
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Accept-Encoding': 'gzip, deflate',
    'X-Token': '8b9d9a13-7ff8-45e7-a502-24ced6ff8e92',
    'Connection': 'keep-alive',
    'Referer': 'http://ucp.emnj/',
    'Cookie': 'sid=8b9d9a13-7ff8-45e7-a502-24ced6ff8e92; token=8b9d9a13-7ff8-45e7-a502-24ced6ff8e92',
    'Pragma': 'no-cache',
    'Cache-Control': 'no-cache',
}

fail_items = []
for i in range(1, 16):
    fail_items.extend(
        requests.get(f'	http://ucp.emnj/task-center-mgr/api/spider/task/list?trigger={trigger}&page={i}&pageSize=10', headers = headers)\
            .json()['data']
    )
import json
fail_items = list(filter(lambda it: it['state'] == 'F', fail_items))
run_parms = list(map(lambda it: json.loads(it['runParam']), fail_items))
print(len(run_parms))
json.dump(run_parms, open(r'C:\Users\Administrator\Desktop\ctrip_fail_runparams.json', 'w', encoding='utf-8'),ensure_ascii=False)
85
In [4]:
 
In [ ]:
import random
with open(r"D:\active_user2_20220817.txt", "r", encoding='utf-8') as fr:
    with open(r'd:/uids.txt', 'a', encoding= 'utf-8') as fw:
        fw.write('[')
        for _ in range(1000_0000):
            lines = [fr.readline() for _ in range(10)]
            print(lines)
            uid = random.choice(lines).split('|')[1].strip()
            fw.write(f'"{uid}",')
        fw.write('""]')    
In [ ]:
# @author: AIslandX
# @date: 2022-01-01

import hashlib
import json
import logging
import random
import time

import requests
from fake_useragent import UserAgent

# 参考文章:
#   - 机场列表 - 维基百科
#     https://zh.wikipedia.org/wiki/%E4%B8%AD%E5%8D%8E%E4%BA%BA%E6%B0%91%E5%85%B1%E5%92%8C%E5%9B%BD%E6%9C%BA%E5%9C%BA%E5%88%97%E8%A1%A8
#   - 携程国际机票sign破解 https://blog.csdn.net/weixin_38927522/article/details/108214323
#   - 至于前端反反爬虫,看完这篇你就可以毕业了 https://zhuanlan.zhihu.com/p/250176143


ua = UserAgent()


def get_cookie_bfa():
    random_str = "abcdefghijklmnopqrstuvwxyz1234567890"
    random_id = ""
    for _ in range(6):
        random_id += random.choice(random_str)
    t = str(int(round(time.time() * 1000)))

    bfa_list = ["1", t, random_id, "1", t, t, "1", "1"]
    bfa = "_bfa={}".format(".".join(bfa_list))
    # e.g. _bfa=1.1639722810158.u3jal2.1.1639722810158.1639722810158.1.1
    return bfa


# 获取调用携程 API 查询航班接口 Header 中所需的参数 sign
def get_sign(transaction_id, departure_city_code, arrival_city_code, departure_date):
    sign_value = transaction_id + departure_city_code + arrival_city_code + departure_date
    _sign = hashlib.md5()
    _sign.update(sign_value.encode('utf-8'))
    return _sign.hexdigest()


# 获取 transactionID 及航线数据
def get_transaction_id(departure_city_code, arrival_city_code, departure_date, cabin):
    flight_list_url = "https://flights.ctrip.com/international/search/api/flightlist" \
                      "/oneway-{}-{}?_=1&depdate={}&cabin={}&containstax=1" \
        .format(departure_city_code, arrival_city_code, departure_date, cabin)
    flight_list_req = requests.get(url=flight_list_url)
    if flight_list_req.status_code != 200:
        logging.error("get transaction id failed, status code {}".format(flight_list_req.status_code))
        return "", None

    try:
        flight_list_data = flight_list_req.json()["data"]
        transaction_id = flight_list_data["transactionID"]
    except Exception as e:
        logging.error("get transaction id failed, {}".format(e))
        return "", None

    return transaction_id, flight_list_data


# 获取航线具体信息与航班数据
def get_flight_info(departure_city_code, arrival_city_code, departure_date, cabin):
    # 获取 transactionID 及航线数据
    transaction_id, flight_list_data = get_transaction_id(departure_city_code, arrival_city_code, departure_date, cabin)
    print(transaction_id, flight_list_data)
    if transaction_id == "" or flight_list_data is None:
        return False, None

    # 获取调用携程 API 查询航班接口 Header 中所需的参数 sign
    sign = get_sign(transaction_id, departure_city_code, arrival_city_code, departure_date)

    # cookie 中的 bfa
    bfa = get_cookie_bfa()

    # 构造请求,查询数据
    search_url = "https://flights.ctrip.com/international/search/api/search/batchSearch"
    search_headers = {
        "transactionid": transaction_id,
        "sign": sign,
        "scope": flight_list_data["scope"],
        "origin": "https://flights.ctrip.com",
        "referer": "https://flights.ctrip.com/online/list/oneway-{}-{}"
                   "?_=1&depdate={}&cabin={}&containstax=1".format(departure_city_code, arrival_city_code,
                                                                   departure_date, cabin),
        "content-type": "application/json;charset=UTF-8",
        "user-agent": ua.chrome,
        "x-forwarded-for": "196.32.65.5",
        "X-Forwarded-For": "196.32.65.5",
        "WL-Proxy-Client-IP": "196.32.65.5",
        "Proxy-Client-IP": "196.32.65.5",
        "cookie": bfa,
    }
    r = requests.post(url=search_url, headers=search_headers, data=json.dumps(flight_list_data))

    if r.status_code != 200:
        logging.error("get flight info failed, status code {}".format(r.status_code))
        return False, None

    try:
        result_json = r.json()
        if result_json["data"]["context"]["flag"] != 0:
            logging.error("get flight info failed, {}".format(result_json))
            return False, None
    except Exception as e:
        logging.error("get flight info failed, {}".format(e))
        return False, None

    if "flightItineraryList" not in result_json["data"]:
        result_data = []
    else:
        result_data = result_json["data"]["flightItineraryList"]
    return True, result_data

for _ in range(30):
    # 日志通用配置
    LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

    # 离开城市代码
    departureCityCode = "TNA"
    # 到达城市代码
    arrivalCityCode = "CGQ"
    # 离开时间
    departureDate = time.strftime('%Y-%m-%d')
    # 飞机舱位 Y - 经济舱
    # 参考:https://baike.baidu.com/item/%E9%A3%9E%E6%9C%BA%E8%88%B1%E4%BD%8D/4764328
    cabin = "Y"

    # departureCityCode, arrivalCityCode, departureDate = "GOQ", "CGQ", "2022-01-29"

    ok, example_result = get_flight_info(departureCityCode, arrivalCityCode, departureDate, cabin)
    if ok:
        print(json.dumps(example_result, ensure_ascii=False))
        print("success", end="\r")
    else:
        print("获取失败")
        break

1 文字定位¶

1.1 大图片¶

In [17]:
import cv2
import matplotlib.pyplot as plt
import util


s = cv2.imread(r's')
plt.imshow(cv2.cvtColor(s, cv2.COLOR_BGR2RGB))
plt.show()

b = cv2.imread(r'b')
plt.imshow(cv2.cvtColor(b, cv2.COLOR_BGR2RGB))
plt.show()
No description has been provided for this image
No description has been provided for this image
In [18]:
util.imshow(b)
No description has been provided for this image
In [6]:
import numpy as np

b_gray = cv2.cvtColor(b, cv2.COLOR_RGB2GRAY) 
b_binary = np.where(b_gray < 20, 255, 0).astype(np.uint8)
# b_binary = cv2.adaptiveThreshold(
#             b_binary,
#             255,
#             cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
#             cv2.THRESH_BINARY,
#             3,
#             2,
#         )
plt.imshow(b_binary)
util.imwrite(b_binary)
No description has been provided for this image
In [7]:
import util
k = [[0,1,0],
     [1,1,1],
     [0,1,0]]
img = util.erode(b_binary, k)
plt.imshow(util.dilate(img, k))
plt.show()
No description has been provided for this image
In [8]:
# 弄脏
import util
k = [[1,1,1],[1,1,1],[1,1,1]]
b_erode = util.erode(b_binary, k)
b_dilate = util.dilate(b_erode, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)
b_dilate = util.dilate(b_dilate, k)

plt.imshow(b_dilate)
plt.show()
No description has been provided for this image
In [9]:
rects = util.filter_list(lambda rt: util.calc_h(rt) > 20 and util.calc_w(rt) > 20, util.getMaxRects(b_dilate))
_ = util.drawRects(b, rects=rects, line_color=[0,255,0], show_result_img=True)
No description has been provided for this image

1.2 小图片¶

In [10]:
import importlib

importlib.reload(util)
Out[10]:
<module 'util' from 'c:\\Users\\Administrator\\Desktop\\lb-gdp-lds-learning-record\\lds\\python\\util.py'>
In [11]:
import numpy as np

import util

s_gary = cv2.cvtColor(s, cv2.COLOR_RGB2GRAY) 
s_binary = np.where(s_gary < 20, 255, 0)
s_dilate = util.dilate(s_binary, util.CV2_K_CROSS((3,3)))
plt.imshow(s_dilate)
util.imwrite(s_dilate)

# edge_output = cv2.Canny(grayImg, 230, 255)
# #提取上一步中处理好的图像边缘,50和150分别代表低阈值和高阈值,高阈值用来将物体与背景区分开来,低的用于平滑连接高阈值产生的片段,使图像成一个整体
# plt.imshow(edge_output)#输出灰度图像
# plt.show()
No description has been provided for this image

2.1 计算点击的位置(相对于验证码图片中心的位置)¶

In [12]:
plt.imshow(s_binary)
Out[12]:
<matplotlib.image.AxesImage at 0x200f3b0b248>
No description has been provided for this image
In [21]:
s_chars_imgs = []
n = 0
for i in util.getSubImg(s, util.getMaxRects(s_dilate)):
    s_chars_imgs.append(i)
    n += 1
    cv2.imwrite(f'{n}.png', i)
    plt.imshow(i)
    plt.show()
util.imshow(i)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [22]:
b_chars_imgs = []
for i in util.getSubImg(b, util.getMaxRects(b_dilate)):
    b_chars_imgs.append(i)
    n += 1
    cv2.imwrite(f'{n}.png', i)
    plt.imshow(i)
    plt.show()
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [24]:
len(s_chars_imgs), len(b_chars_imgs)
Out[24]:
(3, 7)
In [29]:
result
Out[29]:
[('钟', 0.99425983)]
In [2]:
import paddleocr.paddleocr as ppocr
from paddleocr import PaddleOCR

# args = ppocr.parse_args()
image_path = '1.png'

engine = PaddleOCR(use_angle_cls=True)

for i in range(1, 11):
    image_path = f'{i}.png'
    result = engine.ocr(image_path,
            det=False,
            rec=True,
            cls=True)

    if result is not None:
        for line in result:
            char, prob = line
            print(char, prob)
Namespace(benchmark=False, cls_batch_num=6, cls_image_shape='3, 48, 192', cls_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\cls\\ch_ppocr_mobile_v2.0_cls_infer', cls_thresh=0.9, cpu_threads=10, crop_res_save_dir='./output', det=True, det_algorithm='DB', det_db_box_thresh=0.6, det_db_score_mode='fast', det_db_thresh=0.3, det_db_unclip_ratio=1.5, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, det_east_score_thresh=0.8, det_limit_side_len=960, det_limit_type='max', det_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\det\\ch\\ch_PP-OCRv2_det_infer', det_pse_box_thresh=0.85, det_pse_box_type='box', det_pse_min_area=16, det_pse_scale=1, det_pse_thresh=0, det_sast_nms_thresh=0.2, det_sast_polygon=False, det_sast_score_thresh=0.5, draw_img_save_dir='./inference_results', drop_score=0.5, e2e_algorithm='PGNet', e2e_char_dict_path='./ppocr/utils/ic15_dict.txt', e2e_limit_side_len=768, e2e_limit_type='max', e2e_model_dir=None, e2e_pgnet_mode='fast', e2e_pgnet_score_thresh=0.5, e2e_pgnet_valid_set='totaltext', enable_mkldnn=False, gpu_mem=500, help='==SUPPRESS==', image_dir=None, ir_optim=True, label_list=['0', '180'], label_map_path='./vqa/labels/labels_ser.txt', lang='ch', layout_path_model='lp://PubLayNet/ppyolov2_r50vd_dcn_365e_publaynet/config', max_batch_size=10, max_seq_length=512, max_text_length=25, min_subgraph_size=15, mode='structure', model_name_or_path=None, ocr_version='PP-OCRv2', output='./output', precision='fp32', process_id=0, rec=True, rec_algorithm='CRNN', rec_batch_num=6, rec_char_dict_path='d:\\Users\\Administrator\\anaconda3\\envs\\paddle\\lib\\site-packages\\paddleocr\\ppocr\\utils\\ppocr_keys_v1.txt', rec_image_shape='3, 32, 320', rec_model_dir='C:\\Users\\Administrator/.paddleocr/2.4\\ocr\\rec\\ch\\ch_PP-OCRv2_rec_infer', save_crop_res=False, save_log_path='./log_output/', show_log=True, structure_version='STRUCTURE', table_char_dict_path=None, table_char_type='en', table_max_len=488, table_model_dir=None, total_process_num=1, type='ocr', use_angle_cls=True, use_dilation=False, use_gpu=False, use_mp=False, use_onnx=False, use_pdserving=False, use_space_char=True, use_tensorrt=False, vis_font_path='./doc/fonts/simfang.ttf', warmup=False)
大 0.8738931
本 0.99865985
钟 0.99425983
数 0.9624093
细 0.994358
本 0.9114106
钟 0.9931177
松 0.9952101
大 0.9434993
1 0.11578398

3.1 确定点击顺序¶

In [15]:
scs = s_chars_imgs
bcs = b_chars_imgs
In [16]:
for ci in scs:
    util.imshow(ci)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [37]:
for ci in bcs:
    util.imshow(ci)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [38]:
bcs_, scs_ = [], []
b_mask = util.getSubImg(b_binary, util.simpleLocate(b_dilate))
s_mask = util.getSubImg(s_binary, util.simpleLocate(s_dilate))
for i in range(len(b_mask)):
    bcs_.append(np.bitwise_and(np.bitwise_not(bcs[i]), np.expand_dims(b_mask[i], 2)))
for i in range(len(s_mask)):
    scs_.append(np.bitwise_and(np.bitwise_not(scs[i]), np.expand_dims(s_mask[i], 2)))
In [39]:
def getDistance(img1, img2):
    """_summary_

    Args:
        img1 (_type_): _description_
        img2 (_type_): _description_
    """
    # 初始化SIFT描述符
    sift = cv2.xfeatures2d.SIFT_create()
    kp1, des1 = sift.detectAndCompute(img1, None)
    kp2, des2 = sift.detectAndCompute(img2, None)

    # 默认参数初始化BF匹配器
    bf = cv2.BFMatcher()
    matches = bf.knnMatch(des1, des2, k=2)

    # 从k个匹配结果种筛选出好的匹配结果
    dis = 0
    good = []
    for m, n in matches:
        dis += m.distance
        if m.distance < 0.45*n.distance:
            good.append(m)
    # print(dis, good)
    if len(good) > 0:
        dis = 0
    return dis, good


for c in scs_:
    dies = np.array(list(map(lambda c2:getDistance(c.astype(np.uint8), c2.astype(np.uint8))[0], bcs_[:-1])))
    ind = np.argmin(dies)
    print(dies, ind)
    util.imshow([c, bcs_[ind]], title=f"{ind}")
[415.57067871 396.49591064 467.38955688 431.74066162 437.40942383
 282.36679077] 5
No description has been provided for this image
No description has been provided for this image
[ 943.55612183 1089.0607605   992.17965698  930.19680786 1091.43273926
 1040.69631958] 3
No description has been provided for this image
No description has been provided for this image
[125.49900055 259.79608154 300.69918823 388.82385254 400.69064331
 323.56915283] 0
No description has been provided for this image
No description has been provided for this image
In [141]:
for i in scs:
    util.imwrite(scs[1])
In [123]:
util.imshow(bcs_)
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
No description has been provided for this image
In [2]:
!pip list | findstr padd
paddleocr                     2.4
paddlepaddle                  2.2.2

1. 下载数据¶

In [1]:
import pandas as pd

def excel2list(excel_file: str, sheet: int = 0, engine="openpyxl", rows: tuple = None) -> list:
    wb = pd.ExcelFile(excel_file, engine=engine)
    df = wb.parse(sheet_name=sheet)
    print(df.iloc[3])
    li = []
    for index, series in df.iterrows():
        # if index == 0:
        #     print(series._index)
        ar = series.array
        if not rows:
            li.append(ar.to_numpy().tolist())
        else:
            li.append([ar[i] for i in rows])
    return li
items = excel2list(r"XQ1.xlsx")
import pandas as np

np.DataFrame(items)
D:\Users\Administrator\anaconda3\envs\python3.6\lib\importlib\_bootstrap.py:219: RuntimeWarning: numpy.ufunc size changed, may indicate binary incompatibility. Expected 192 from C header, got 216 from PyObject
  return f(*args, **kwds)
Unnamed: 0                                                       3
stock_code                                                SZ000651
author                                              格力电器(SZ000651)
author_id                                                       -1
datetime                                             1637388604000
is_column                                                        0
doc_title                       [招商证券:买入]拟控股盾安环境 增强产业链实力 新能源布局更进一步
retweet_count                                                    5
reply_count                                                      9
like_count                                                      46
fans                                                            -1
doc              事件描述:格力电器11 月16 日晚公告,公司拟受让盾安精工所持盾安环境2.70 亿股股份,...
crawl_time                                           1644914302970
doc_url                    https://xueqiu.com/S/SZ000651/203674545
user_url                                                       NaN
Name: 3, dtype: object
Out[1]:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
0 0 SZ000651 huanji 7424865298 1641095114000 0 NaN 0 55 10 19 <a href="http://xueqiu.com/S/SZ000651" target=... 1644914302970 https://xueqiu.com/7424865298/207728340 https://xueqiu.com/u/7424865298
1 1 SZ000651 佛系小资 1566609429 1640952292000 0 NaN 1 8 23 21517 <h4>品质护航,驾“浴”清凉,格力推出新一代顶置式驻车空调</h4><p><a href=... 1644914302970 https://xueqiu.com/1566609429/207634993 https://xueqiu.com/u/1566609429
2 2 SZ000651 泉州李国彬 8995599040 1639200847000 0 第一次出现“格力光储空系统技术推广活动”报道 2 9 49 7036 <p>12月9日-10日,</p><p>2021格力中央空调</p><p>全国巡回<stro... 1644914302970 https://xueqiu.com/8995599040/205664621 https://xueqiu.com/u/8995599040
3 3 SZ000651 格力电器(SZ000651) -1 1637388604000 0 [招商证券:买入]拟控股盾安环境 增强产业链实力 新能源布局更进一步 5 9 46 -1 事件描述:格力电器11 月16 日晚公告,公司拟受让盾安精工所持盾安环境2.70 亿股股份,... 1644914302970 https://xueqiu.com/S/SZ000651/203674545 NaN
4 4 SZ000651 GM笨小孩 9770976443 1637160804000 0 NaN 3 42 39 2046 这个妹子是谁呢?年轻时候的董明珠。为啥咱觉得挺漂亮呢?胜过那些网红脸,动过刀子的脸吧?十倍还... 1644914302970 https://xueqiu.com/9770976443/203416713 https://xueqiu.com/u/9770976443
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
26741 26741 SZ301168 梧桐树新股 6160714490 1639314657000 0 全面注册制&下周次新股出现重磅级信号! 1 3 7 1911 <p>周末最大新闻是:12月8日-10日,中央经J工作会议透露2022年资本市场改革风向,首... 1644914306981 https://xueqiu.com/6160714490/205719765 https://xueqiu.com/u/6160714490
26742 26742 SZ301168 无风说次新股 1071411538 1638173436000 1 通灵股份301168上市估值分析和申购建议 1 0 6 9521 <p><b>重要警告:本号本人不荐股,文章内容属于个人操作心得的分享,仅供参考和交流学习,所... 1644914306981 https://xueqiu.com/1071411538/204457768 https://xueqiu.com/u/1071411538
26743 26743 SZ301168 无风说次新股 1071411538 1639128223000 1 12月10日次新股复盘,新股通灵股份偷袭临停成功! 0 2 6 9521 <p><b>重要警告:本号本人不荐股,文章内容属于个人操作心得的分享,仅供参考和交流学习,文... 1644914306981 https://xueqiu.com/1071411538/205615804 https://xueqiu.com/u/1071411538
26744 26744 SZ301168 唯红茶 1920422334 1639552100000 0 NaN 0 4 5 6024 12月15日收盘总结:<br/>1、集合竞价:核了春兰股份,垃圾。竞价嘉和美康,血套,垃圾。... 1644914306981 https://xueqiu.com/1920422334/206033355 https://xueqiu.com/u/1920422334
26745 26745 SZ301168 刘轶南_教师_珠海 8850764119 1638153156000 0 NaN 0 0 5 15663 <p>泽宇智能(301179),江苏泽宇智能电力股份有限公司,电力系统集成配套建设,智能电网... 1644914306981 https://xueqiu.com/8850764119/204411506 https://xueqiu.com/u/8850764119

26746 rows × 15 columns

In [1]:
import requests
import openpyxl
import util
util.debug = False
items = util.excel2list(r"C:\Users\Administrator\Desktop\雪球内容爬虫需求2022.1.10.xlsx")
import time
url_tem = 'http://xueqiu.com/query/v1/symbol/excellent/status.json?count=10&symbol=%s&hl=0&source=all&sort=1&page=%s&q=&type=11'
headers = {
    "Host": "xueqiu.com",
    "User-Agent": "Xueqiu Android 13.9",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
    "Accept-Encoding": "gzip, deflate, br",
    "Cookie": "xq_a_token=512da9d222c381fa39dc775676c85ba2aa1ae80b;",
}

def breakLoop(resp, ind):
    """判断是否跳出循环

    Args:
        resp ([响应数据]): [description]
        ind ([int]): [已采集数量]

    Returns:
        [type]: [description]
    """
    
    r = json.loads(resp)
    # return ind >= r["maxPage"] or r["list"][0]["created_at"] < 1609430400000 
    return ind >= r["maxPage"]

# 存储下载失败的股票
erros = []
2022-02-15 15-35-28 [INFO] [util-MainThread]  89 - Index(['统计时间', '股吧内码', '基金代码', '实时排名', '股票中文名'], dtype='object')
In [3]:
import random
import json
import time
import os

for it in items:
    try:
        ind = 1
        while True:
            
            u = url_tem % (it[2], ind)
            resp = requests.get(url=u, headers=headers).text  
            time.sleep(random.randint(15, 25) / 100) 
            d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
            
            r = json.loads(resp)
            if r["maxPage"] == 0:
                u = url_tem % (it[2], 0)
                resp = requests.get(url=u, headers=headers).text 
                # print("only one data: ", resp[:200])
                if not os.path.exists(d):
                    os.makedirs(d)
                with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
                    fw.write(resp)
                break
                
            print(it[2], ind, end="\r")
            
            if not os.path.exists(d):
                os.makedirs(d)

              
            with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
                fw.write(resp)
            
            if breakLoop(resp, ind):
                break
            ind += 1
    except Exception as e:
        time.sleep(1.25)
        print(f"{it[2]}_{it[-1]} ERRO", e, file=open("erro.log", "a", encoding="utf-8"))
        erros.append(it)
SZ002709 73

后400条¶

In [3]:
import shutil 
import os

for it in items[100:]:
    d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
    if os.path.exists(d):
        print("remove", d)
        shutil.rmtree(d)
remove xq/SH600444_国机通用
remove xq/SZ300624_万兴科技
remove xq/SZ300398_飞凯材料
remove xq/SZ000002_万科A
remove xq/SH603466_风语筑
remove xq/SZ000858_五粮液
remove xq/SH603368_柳药股份
remove xq/SH600976_健民集团
remove xq/SH600276_恒瑞医药
remove xq/SH600277_亿利洁能
remove xq/SZ002265_西仪股份
remove xq/SZ002385_大北农
remove xq/SH600188_兖矿能源
remove xq/SZ002240_盛新锂能
remove xq/SZ002694_顾地科技
remove xq/SZ002746_仙坛股份
remove xq/SZ300603_立昂技术
remove xq/SZ001296_长江材料
remove xq/SZ300199_翰宇药业
remove xq/SH600010_包钢股份
remove xq/SH601318_中国平安
In [5]:
import random
import json
import time
import os

for it in items[100:]:
    try:
        ind = 1
        while True:
            
            u = url_tem % (it[2], ind)
            resp = requests.get(url=u, headers=headers).text  
            time.sleep(random.randint(15, 25) / 100) 
            d = f"xq/{it[2]}_{it[-1].replace('*', '-')}"
            
            r = json.loads(resp)
            if r["maxPage"] == 0:
                u = url_tem % (it[2], 0)
                resp = requests.get(url=u, headers=headers).text 
                print("only one data: ", resp[:200])
                if not os.path.exists(d):
                    os.makedirs(d)
                with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
                    fw.write(resp)
                break
                
            print(it[2], ind, end="\r")
            
            
            
            if not os.path.exists(d):
                os.makedirs(d)

              
            with open(f"{d}/{ind}.json", "w", encoding="utf-8") as fw:
                fw.write(resp)
            
            if breakLoop(resp, ind):
                break
            ind += 1
    except Exception as e:
        time.sleep(1.25)
        print(f"{it[2]}_{it[-1]} ERRO", e, file=open("erro.log", "a", encoding="utf-8"))
        erros.append(it)
only one data:  {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ300325","query_id":1493053356506312704,"recommend_cards":[]}
only one data:  {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ002347","query_id":1493054767872827392,"recommend_cards":[]}
only one data:  {"about":"","count":0,"key":"","list":[],"maxPage":0,"page":1,"q":"SZ300878","query_id":1493055390986948610,"recommend_cards":[]}
SZ002074 19

2. 处理一些出错的条目¶

In [3]:
import os
import json

erros.extend(
    [["", '', "SH600518","*ST康美"],
     ["", '', "SZ000980","*ST众泰"]
    ]
)

for it in erros:
    print(it)
    try:
        ind = 1
        while True:
            if os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json"):
                ind += 1
                continue
            
            u = url_tem % (it[2], ind)
            resp = requests.get(url=u, headers=headers).text 
            r = json.loads(resp)
            if r["maxPage"] == 0:
                u = url_tem % (it[2], 0)
                resp = requests.get(url=u, headers=headers).text 
                print(resp[:200])
                if not os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}"):
                    os.makedirs(f"xq/{it[2]}_{it[-1].replace('*', '-')}")
                with open(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json", "w", encoding="utf-8") as fw:
                    fw.write(resp)
                break
                
            time.sleep(0.25)     
            print(it[2], ind, end="\r")
            
            
            if not os.path.exists(f"xq/{it[2]}_{it[-1].replace('*', '-')}"):
                os.makedirs(f"xq/{it[2]}_{it[-1].replace('*', '-')}")
            
            
            with open(f"xq/{it[2]}_{it[-1].replace('*', '-')}/{ind}.json", "w", encoding="utf-8") as fw:
                fw.write(resp)
            
            if breakLoop(resp, ind):
                break
            ind += 1
    except Exception as e:
        print(f"{it[2]}_{it[-1].replace('*', '-')} ERRO", e, )
['', '', 'SH600518', '*ST康美']
['', '', 'SZ000980', '*ST众泰']
['', '', 'SH600518', '*ST康美']
{"about":"SH600518","count":181,"key":"SH600518","list":[{"blocked":false,"blocking":false,"canEdit":true,"commentId":0,"controversial":false,"created_at":1637307251000,"description":"以前觉着当独立董事很容易,每年拿
['', '', 'SZ000980', '*ST众泰']
{"about":"SZ000980","count":34,"key":"SZ000980","list":[{"blocked":false,"blocking":false,"canEdit":true,"card":{"data":"{\"items\":[{\"id\":61916,\"tag\":\"#雪球星计划#\",\"content\":\"\",\"pic\":null,\"b

3. 整理数据¶

In [ ]:
import math
import threading

import pandas as pd
import os
from collections import namedtuple as ntuple
import time
import json
import logging

lock = threading.Lock()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s [%(levelname)s] [%(module)s-%(threadName)s]  %(lineno)d - %(message)s',
                    datefmt="%Y-%m-%d %H-%M-%S")

logger = logging.getLogger()

# 看重了它的提示功能
e = ntuple("DTO", ['stock_code', 'author', 'author_id', 'datetime', 'is_column', 'doc_title',
                   'retweet_count', 'reply_count', 'like_count', 'fans', 'doc', 'crawl_time', 'doc_url', 'user_url'])


class Entity(dict):
    '''实体定义'''

    def __init__(self, *args):
        it = iter(*(args))
        for i in e._fields:
            self[i] = str(next(it))


# excel = "XueQiu.xlsx"
# writer = pd.ExcelWriter(excel)

emptys = []

def get_entitys(cmt_file: str) -> list:
    logger.info("%s接收到:%s%s", '#'*20, cmt_file, '#'*20)
    info = json.load(open(cmt_file, 'r', encoding="utf-8"))
    stock = info['key']
    cmts = info['list']
    if len(cmts) < 1:
        logger.error("【%s】没东西啊", cmt_file)
        emptys.append(cmt_file)
    res = []
    for cmt in cmts:
        res.append(
            Entity(e(
                stock,
                cmt["user"]["screen_name"],
                cmt["user_id"],
                cmt["created_at"],
                1 if cmt["mark"] == 5 else 0,
                cmt["title"],
                cmt["retweet_count"],
                cmt["reply_count"],
                cmt["like_count"],
                cmt["user"]["followers_count"] if cmt["user_id"] != -1 else -1,
                cmt["text"],
                int(time.time() * 1000),
                'https://xueqiu.com' + cmt["target"],
                ('https://xueqiu.com/u/' + str(cmt["user_id"]) if cmt["user_id"] != -1 else "")
            ))
        )
    
    return res
 
def write_to_excel(cmt_file: str, writer: pd.ExcelWriter=None):
    global start_row, result, size
    try:
        res = get_entitys(cmt_file)
        result.extend(res)
        
        if len(res) > size:
            df1 = pd.DataFrame(result)
            excel = f"XQ{start_row}.xlsx"
            writer = pd.ExcelWriter(excel, mode="w", engine='xlsxwriter')
            df1.to_excel(writer, 'dataset', startcol=0,
                        startrow=0, encoding="utf-8")
            writer.save()
            result = []
            print("write to", excel)

        start_row += 1
    except Exception as e:
        logger.error("deal[%s] Failed, %s", cmt_file, e)
    
size = 25000
root = r'xq'
dirs = os.listdir(root)

logger.info("要开始了")

start_row = 0
tasks = []
result = []

s = time.time()
for d in dirs:
    for f in os.listdir(os.path.join(root,d)):
        cmt_file = f'xq/{d}/{f}'
        write_to_excel(cmt_file)

logger.info("处理耗时: %s[s]", time.time() - s)

# import util
# util.dump(emptys, "ry.json")


# s = time.time()

# for i in range(math.ceil(len(result) / size)):
#     df1 = pd.DataFrame(result[i * size: min((i + 1) * size, len(result))])
#     excel = f"XQ{i}.xlsx"
#     writer = pd.ExcelWriter(excel, mode="w", engine='xlsxwriter')
#     df1.to_excel(writer, 'dataset', startcol=0,
#                 startrow=0, encoding="utf-8")
#     writer.save()
# logger.info("写入耗时: %s[s]", time.time() - s)
In [ ]:
print("SUCCESS")
In [ ]:
 
In [ ]:
 
In [1]:
import json

xq = json.load(open(r"C:\Users\Administrator\Desktop\xq.json", "r", encoding="utf-8"))
In [10]:
n = 0
for it in xq["list"]:
   if it["mark"] == 5:
       n += 1
       print(f"专栏{n}:", it["description"][:300])
       print()
专栏1: 21年度老柏的证券投资回撤幅度创个人投资史新高,负复利的威力对长期投资复利伤害较大,从记录投资以来年化复利降为4%,可见一般。 对于这一短期结果,从资金代入感的角度,难言满意。然而,从持有股份数量变动的角度,我是满意的,这并不是聊以自慰。 我提倡长期持续净买入,把收集优质股权作为...

专栏2: A股经过了2019年(收益68%)和2020年(收益60%)的吃大肉行情,我年初预测2021是投资小年。可是当真的走过了2021年,还是要感叹太不容易了。2021年是有人喝酒吃肉,有人吃糠咽菜的一年。如果是大盘单边下跌,大家也没什么意见。问题是有人赚的盆满钵满,有人亏的丁零当啷。所以2021年是股市投资见...

专栏3: <a href="https://www.ximalaya.com/shangye/18599130/487919373" title="https://www.ximalaya.com/shangye/18599130/487919373" target="_blank">本文语音版</a> 早晨起来,天气格外的好,打开窗户,放一首淡淡的音乐,认真的和2021年做个告别。 去年元旦的情景还历历在目,转眼2021年就要过去了。 时间可真快,父母又老了一岁,自己也成长了一年。长大后唯一的愿望就是多赚点钱,多带他们...

专栏4: 最近网络上流传一段价值投资大v唐朝关于三傻的随想,不经意已经看到了3,4次别人的分享,近期有朋友私信我分享了这段随想。 (注,三傻指的是股市中近几年估值低但不涨的一些板块,可以理解为银行地产保险,狭义上可以理解为平安,万科,格力) 原文:(唐门小卒为唐朝的某粉丝) 唐门小卒: 持仓多...

专栏5: 昨天,中国平安公布了2021年全年的保费收入,评论区一篇沸腾。 主流观点是:总保费下降4.6%,新业务下降4.8%,代理人相比2020年底下降了30%。在代理人大幅下降的情况下,总保费和新业务价值只有小幅下滑,且2021年12月份单月保费实现了1.6%的正增长,中国平安的春天终于来了。 中国平安的拐点,真...

专栏6: A股散户数最多的10只股票,看看有你的股票吗? 第一名:京东方 A 156.95万 1 第二名:中国平安 130.36万 2 第三名:三一重工 115.03万 3 第四名:中国电信 109.63万 4 第五名:三峡能源 104.30万 5 第六名:兰州银行 98.59万 6 第七名:包钢股份 94.78万 7 第八名:格力电器 88.80万 8 第九名:TC...

滑动验证¶

In [15]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from PIL import Image
from six import BytesIO
import time
from selenium.webdriver import ActionChains
In [16]:
driver = webdriver.Chrome("d:/chromedriver.exe")
In [42]:
driver.get('http://172.31.227.161:9527/#/login?redirect=%2Fcrawler-configuration%2Fjob-config%2Fpage%2Findex')
In [46]:
captcha = driver.find_element_by_css_selector('div[class="em_widget em_show"')
In [40]:
verify_bt = driver.find_element_by_css_selector(".em_init")
In [26]:
from selenium.webdriver.common import touch_actions, action_chains
from selenium.webdriver.remote.command import Command

touch = touch_actions.TouchActions(driver=driver)
In [50]:
ActionChains(driver=driver).move_to_element(captcha) \
    .move_by_offset(0, -40) \
    .click() \
    .perform() \
In [41]:
driver.execute(Command.CLICK_ELEMENT, {'id': verify_bt.id,
                                       'button': 0,
                                       'xoffset': 50,
                                       'yoffset': 50})
Out[41]:
{'value': None}
In [2]:
driver = webdriver.Chrome()
driver.get('https://m.ctrip.com/html5/flight/swift/domestic/SHA/CAN/2022-02-18')
In [5]:
from bs4 import BeautifulSoup as bs
source = driver.page_source
In [6]:
soup = bs(source)
In [12]:
div = soup.find('div', {'class': 'cpt-choose-box cpt-choose-box-pop'})
In [16]:
b = div.find('img', {'class': "cpt-big-img"})
s = div.find('img', {'class': "cpt-small-img"})
In [ ]:
import cv2
In [33]:
import base64
from PIL import Image

with open("t", 'wb') as fr:
    fr.write(base64.b64decode(b.attrs['src'][22:]))
    
Image.open('t')
Out[33]:
No description has been provided for this image
In [ ]:
def get_url(url,user,password):
    browser = webdriver.Chrome()
    browser.get(url)
    browser.maximize_window()
    wait = WebDriverWait(browser,10)
    wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'geetest_radar_btn')))
    user_input = browser.find_element_by_id('username')
    pwd_input = browser.find_element_by_id('password')
    btn = browser.find_element_by_css_selector('.geetest_radar_btn')
    user_input.send_keys(user)
    pwd_input.send_keys(password)
    btn.click()
    time.sleep(0.5)
    return browser
In [ ]:
def get_position(img_label):
    location = img_label.location
    size = img_label.size
    top, bottom, left, right = location['y'], location['y'] + size['height'], location['x'], location['x'] + size[
        'width']
    return (left, top, right, bottom)
In [ ]:
def get_screenshot(browser):
    screenshot = browser.get_screenshot_as_png()
    f = BytesIO()
    f.write(screenshot)
    return Image.open(f)
In [ ]:
def get_position_scale(browser,screen_shot):
    height = browser.execute_script('return document.documentElement.clientHeight')
    width = browser.execute_script('return document.documentElement.clientWidth')
    x_scale = screen_shot.size[0] / (width+10)
    y_scale = screen_shot.size[1] / (height)
    return (x_scale,y_scale)
In [ ]:
def get_slideimg_screenshot(screenshot,position,scale):
    x_scale,y_scale = scale
    position = [position[0] * x_scale, position[1] * y_scale, position[2] * x_scale, position[3] * y_scale]
    return screenshot.crop(position)
In [ ]:
def compare_pixel(img1,img2,x,y):
    pixel1 = img1.load()[x,y]
    pixel2 = img2.load()[x,y]
    threshold = 50
    if abs(pixel1[0]-pixel2[0])<=threshold:
        if abs(pixel1[1]-pixel2[1])<=threshold:
            if abs(pixel1[2]-pixel2[2])<=threshold:
                return True
    return False


def compare(full_img,slice_img):
    left = 0
    for i in range(full_img.size[0]):
        for j in range(full_img.size[1]):
            if not compare_pixel(full_img,slice_img,i,j):
                return i
    return left
In [6]:
distance = 100
# 移动轨迹
track = []
# 当前位移
current = 0
# 减速阈值
mid = distance * 4 / 5
# 计算间隔
t = 0.2
# 初速度
v = 0

while current < distance:
    if current < mid:
        # 加速度为正 2
        a = 4
    else:
        # 加速度为负 3
        a = -3
    # 初速度 v0
    v0 = v
    # 当前速度 v = v0 + at
    v = v0 + a * t
    # 移动距离 x = v0t + 1/2 * a * t^2
    move = v0 * t + 1 / 2 * a * t * t
    # 当前位移
    current += move
    # 加入轨迹
    track.append(round(current))
In [8]:
track
Out[8]:
[0,
 0,
 1,
 1,
 2,
 3,
 4,
 5,
 6,
 8,
 10,
 12,
 14,
 16,
 18,
 20,
 23,
 26,
 29,
 32,
 35,
 39,
 42,
 46,
 50,
 54,
 58,
 63,
 67,
 72,
 77,
 82,
 87,
 92,
 97,
 101]
In [1]:
def get_track(distance):
    """
    根据偏移量获取移动轨迹
    :param distance: 偏移量
    :return: 移动轨迹
    """
    # 移动轨迹
    track = []
    # 当前位移
    current = 0
    # 减速阈值
    mid = distance * 4 / 5
    # 计算间隔
    t = 0.2
    # 初速度
    v = 0

    while current < distance:
        if current < mid:
            # 加速度为正 2
            a = 4
        else:
            # 加速度为负 3
            a = -3
        # 初速度 v0
        v0 = v
        # 当前速度 v = v0 + at
        v = v0 + a * t
        # 移动距离 x = v0t + 1/2 * a * t^2
        move = v0 * t + 1 / 2 * a * t * t
        # 当前位移
        current += move
        # 加入轨迹
        # track.append(round(move))
        track.append(round(current))
    return track
In [ ]:
def move_to_gap(browser,slider, tracks):
    """
    拖动滑块到缺口处
    :param slider: 滑块
    :param tracks: 轨迹
    :return:
    """
    ActionChains(browser).click_and_hold(slider).perform()
    for x in tracks:
        ActionChains(browser).move_by_offset(xoffset=x, yoffset=0).perform()
    time.sleep(0.5)
    ActionChains(browser).release().perform()
In [ ]:
if __name__ == '__main__':
    browser = get_url('https://account.zbj.com/login','11111111111','********')  #此函数的定义在第3点
    time.sleep(1)
    slice_img_label = browser.find_element_by_css_selector('div.geetest_slicebg') #找到滑动图片标签
    browser.execute_script("document.getElementsByClassName('geetest_canvas_slice')[0].style['display'] = 'none'") #将小块隐藏
    full_img_label = browser.find_element_by_css_selector('canvas.geetest_canvas_fullbg') #原始图片的标签
    position = get_position(slice_img_label) #获取滑动验证图片的位置,此函数的定义在第4点
    screenshot = get_screenshot(browser) # 截取整个浏览器图片,此函数的定义在第5点
    position_scale = get_position_scale(browser,screenshot) #获取截取图片宽高和浏览器宽高的比例,此函数的定义在第6点
    slice_img = get_slideimg_screenshot(screenshot,position,position_scale) #截取有缺口的滑动验证图片,此函数的定义在第7点
   

    browser.execute_script("document.getElementsByClassName('geetest_canvas_fullbg')[0].style['display'] = 'block'") #在浏览器中显示原图
    screenshot = get_screenshot(browser) #获取整个浏览器图片
    full_img = get_slideimg_screenshot(screenshot,position,position_scale) # 截取滑动验证原图
    browser.execute_script("document.getElementsByClassName('geetest_canvas_slice')[0].style['display'] = 'block'")  #将小块重新显示
    left = compare(full_img,slice_img) #将原图与有缺口图片进行比对,获得缺口的最左端的位置,此函数定义在第8点
    left = left / position_scale[0] #将该位置还原为浏览器中的位置

    slide_btn = browser.find_element_by_css_selector('.geetest_slider_button') #获取滑动按钮
    track = get_track(left) #获取滑动的轨迹,此函数定义在第9点
    move_to_gap(browser,slide_btn,track) #进行滑动,此函数定义在第10点
    success = browser.find_element_by_css_selector('.geetest_success_radar_tip') #获取显示结果的标签
    time.sleep(2)
    if success.text == "验证成功":
        login_btn = browser.find_element_by_css_selector('button.j-login-btn') #如果验证成功,则点击登录按钮
        login_btn.click()
    else:
        print(success.text)
        print('失败')

上下文管理工具¶

In [1]:
import contextlib
import logging as log
from urllib.request import urlopen

log.basicConfig(level=log.DEBUG,
                datefmt="%Y-%m-%d %H:%M:%S",
                format="[%(asctime)s] [%(levelname)s] %(lineno)d: %(message)s"
                )

log.info("##########################################")
with contextlib.closing(urlopen('https://www.python.org')) as page:
    ind = 0
    for line in page:
        if ind > 3:
            break
        log.info(line)
        ind += 1

log.info("##########################################")


class Test(object):
    def __init__(self, name, *var, **kv):
        self.name = name


@contextlib.contextmanager
def create_test(*var, **kv):
    test = Test(var, kv)
    log.info("进入 Test[%s]..." % test.name)
    try:
        yield test
    except Exception as s:
        log.error(s)
        ...
    log.info("退出 Test[%s]..." % test.name)


with create_test("李元芳") as test:
    log.info("test Test", 0/0)

log.info("##########################################")


class TestAutoClose(object):
    def __init__(self, name, *var, **kv):
        self.name = name

    def __enter__(self):
        log.info("进入 TestAutoClose[%s]..." % self.name)

    def __exit__(self, exc_type, exc_value, traceback):
        if traceback:
            for i in dir(traceback):
                log.warning(i, eval('traceback.' + i))
        log.info("退出 TestAutoClose[%s]..." % self.name)


with TestAutoClose("狄仁杰") as test:
    log.info("test TestAutoClose")
[2022-01-13 18:39:48] [INFO] 10: ##########################################
[2022-01-13 18:39:48] [INFO] 16: b'<!doctype html>\n'
[2022-01-13 18:39:48] [INFO] 16: b'<!--[if lt IE 7]>   <html class="no-js ie6 lt-ie7 lt-ie8 lt-ie9">   <![endif]-->\n'
[2022-01-13 18:39:48] [INFO] 16: b'<!--[if IE 7]>      <html class="no-js ie7 lt-ie8 lt-ie9">          <![endif]-->\n'
[2022-01-13 18:39:48] [INFO] 16: b'<!--[if IE 8]>      <html class="no-js ie8 lt-ie9">                 <![endif]-->\n'
[2022-01-13 18:39:48] [INFO] 19: ##########################################
[2022-01-13 18:39:48] [INFO] 27: 进入 Test[李元芳]...
[2022-01-13 18:39:48] [ERROR] 31: division by zero
[2022-01-13 18:39:48] [INFO] 33: 退出 Test[李元芳]...
[2022-01-13 18:39:48] [INFO] 38: ##########################################
[2022-01-13 18:39:48] [INFO] 44: 进入 TestAutoClose[狄仁杰]...
[2022-01-13 18:39:48] [INFO] 53: test TestAutoClose
[2022-01-13 18:39:48] [INFO] 50: 退出 TestAutoClose[狄仁杰]...

操作 sqlite 数据库¶

In [1]:
import sqlite3

conn = sqlite3.connect("test.db")
cursor = conn.cursor()

cursor.execute("""
create table user(
    id varchar(20) primary key,
    name varchar(20)
)
""")

# 插入数据
cursor.execute("""
insert into user (id, name)
    values ('1', '狄仁杰'), ('2', '李元芳'), ('3', '曾泰'), ('4', '张环')
""")
conn.commit()


# 查询数据
cursor.execute("""
select * from user;
""")
print(cursor.fetchall())
cursor.close()
conn.close()
[('1', '狄仁杰'), ('2', '李元芳'), ('3', '曾泰'), ('4', '张环')]
In [2]:
import struct
In [11]:
img = r"D:\DongDongsFiles\image\2022-01\1e8ad5e4-8d95-461c-a15e-4d77da9ad13f.jpg"
with open(img, "rb") as fr:
    res = struct.unpack(">" + "c" * 8, fr.read(8))
In [12]:
print(res)
(b'\xff', b'\xd8', b'\xff', b'\xe0', b'\x00', b'\x10', b'J', b'F')
In [15]:
for c in res:
    print(c.decode("unicode-escape"))
ÿ
Ø
ÿ
à


J
F