提交 222110f7 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

...@@ -11,7 +11,6 @@ from bs4 import BeautifulSoup ...@@ -11,7 +11,6 @@ from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from requests.packages import urllib3 from requests.packages import urllib3
from retry import retry from retry import retry
from base import BaseCore from base import BaseCore
urllib3.disable_warnings() urllib3.disable_warnings()
...@@ -20,6 +19,7 @@ log = baseCore.getLogger() ...@@ -20,6 +19,7 @@ log = baseCore.getLogger()
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji', cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4') charset='utf8mb4')
cursor = cnx.cursor() cursor = cnx.cursor()
r = baseCore.r
URL = 'https://www.nasdaq.com/' URL = 'https://www.nasdaq.com/'
session = requests.session() session = requests.session()
session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100)) session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
...@@ -65,6 +65,7 @@ def add_date(com_code, date_list): ...@@ -65,6 +65,7 @@ def add_date(com_code, date_list):
# 数据发送端口 # 数据发送端口
def sendData(start_time, social_code, gpdm, dic_info): def sendData(start_time, social_code, gpdm, dic_info):
data = json.dumps(dic_info) data = json.dumps(dic_info)
# print(data)
url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk' url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk'
for nnn in range(0, 3): for nnn in range(0, 3):
try: try:
...@@ -86,7 +87,7 @@ def getUnit(gpdm): ...@@ -86,7 +87,7 @@ def getUnit(gpdm):
req.encoding = req.apparent_encoding req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'lxml') soup = BeautifulSoup(req.text, 'lxml')
unit = soup.find('div', class_='financials__note').text.split(' ')[1].lstrip().strip() unit = soup.find('div', class_='financials__note').text.split(' ')[1].lstrip().strip()
unit = f'(千){unit}' unit = f'{unit}(千)'
req.close() req.close()
return unit return unit
...@@ -104,9 +105,11 @@ def getlist(table, tableName): ...@@ -104,9 +105,11 @@ def getlist(table, tableName):
value = re.sub(r"[^\d+-]", "", value) value = re.sub(r"[^\d+-]", "", value)
else: else:
value = '-' value = '-'
date = years[f'value{i}'].split('/')[2] + '-' + years[f'value{i}'].split('/')[0] + '-' + \ date_ = years[f'value{i}']
years[f'value{i}'].split('/')[1] if date_ :
list.append({f'{tableName}': name, 'value': value, 'date': date, }) date = date_.split('/')[2] + '-' + date_.split('/')[0] + '-' + \
date_.split('/')[1]
list.append({f'{tableName}': name, 'value': value, 'date': date, })
return list return list
...@@ -162,6 +165,7 @@ def getYear(start_time, session, social_code, gpdm): ...@@ -162,6 +165,7 @@ def getYear(start_time, session, social_code, gpdm):
# 判断该报告期是否已采过 # 判断该报告期是否已采过
panduan = check_date(social_code, date + '-year') panduan = check_date(social_code, date + '-year')
if panduan: if panduan:
log.info(f'{social_code}=={gpdm}=={date}年度数据采集过')
continue continue
xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll') xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz') zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
...@@ -177,6 +181,7 @@ def getYear(start_time, session, social_code, gpdm): ...@@ -177,6 +181,7 @@ def getYear(start_time, session, social_code, gpdm):
"ynFirst": ynFirst, "ynFirst": ynFirst,
} }
sendData(start_time, social_code, gpdm, dic_info) sendData(start_time, social_code, gpdm, dic_info)
log.info(f'{social_code}=={gpdm}=={date}年度财务数据采集成功')
date_list.append(date + '-year') date_list.append(date + '-year')
else: else:
log.error(f'找不到{social_code}=={gpdm}年度财务数据') log.error(f'找不到{social_code}=={gpdm}年度财务数据')
...@@ -184,6 +189,7 @@ def getYear(start_time, session, social_code, gpdm): ...@@ -184,6 +189,7 @@ def getYear(start_time, session, social_code, gpdm):
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据') baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
except: except:
log.error(f'{social_code}===年度财务数据访问失败')
state = 0 state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败') baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
...@@ -217,6 +223,7 @@ def getQuarter(start_time, session, social_code, gpdm): ...@@ -217,6 +223,7 @@ def getQuarter(start_time, session, social_code, gpdm):
# 判断该报告期是否已采过 # 判断该报告期是否已采过
panduan = check_date(social_code, date + '-quarter') panduan = check_date(social_code, date + '-quarter')
if panduan: if panduan:
log.info(f'{social_code}=={gpdm}=={date}季度数据采集过')
continue continue
xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll') xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz') zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
...@@ -236,13 +243,15 @@ def getQuarter(start_time, session, social_code, gpdm): ...@@ -236,13 +243,15 @@ def getQuarter(start_time, session, social_code, gpdm):
if panduan_flag: if panduan_flag:
dic_info['dateFlag'] = 'year' dic_info['dateFlag'] = 'year'
sendData(start_time, social_code, gpdm, dic_info) sendData(start_time, social_code, gpdm, dic_info)
log.info(f'{social_code}=={gpdm}=={date}季度财务数据采集成功')
date_list.append(date + '-quarter') date_list.append(date + '-quarter')
else: else:
log.error(f'{social_code}=={gpdm}无季度财务数据')
state = 0 state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据') baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
except: except:
log.error(f'{social_code}===季度财务数据访问失败')
state = 0 state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败') baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
...@@ -250,36 +259,52 @@ def getQuarter(start_time, session, social_code, gpdm): ...@@ -250,36 +259,52 @@ def getQuarter(start_time, session, social_code, gpdm):
return date_list return date_list
# Seed the work queue: select the xydm column of every row in mgzqyjwyh_list
# with state=2 and exchange='Nasdaq', then push each value onto the list
# 'FinanceFromNasdaq:nasdaqfinance_socialCode' through `r` (baseCore.r).
# doJob() pulls its social codes from that same key.
# NOTE(review): indentation was lost in this view, so it is unclear whether the
# final print runs once after the loop or once per pushed item - confirm.
def FinanceFromNasdaq():
sql = "select xydm from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
cursor.execute(sql)
finance = cursor.fetchall()
# Each fetched row is indexed with [0]: the single selected column.
finance_list = [item[0] for item in finance]
for item in finance_list:
r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', item)
print('redis放入成功')
def getInfomation(social_code):
    """Return the mgzqyjwyh_list row (state=2) whose xydm equals *social_code*.

    The value is bound as a query parameter instead of being formatted into
    the SQL text, so a code containing quotes cannot alter the statement.

    :param social_code: value matched against the ``xydm`` column.
    :return: the first matching row as returned by ``cursor.fetchone()``,
        or ``None`` when no row matches.
    """
    sql = "select * from mgzqyjwyh_list where state=2 and xydm=%s;"
    cursor.execute(sql, (social_code,))
    return cursor.fetchone()
def doJob(): def doJob():
# while True:
# social_code = baseCore.redicPullData('')
# datas_enterprise = baseCore.getInfomation(social_code)
session.get(URL, headers=headers) session.get(URL, headers=headers)
# sql = "select * from mgzqyjwyh_list where state=2 and exchange='Nasdaq';" while True:
# cursor.execute(sql) social_code = baseCore.redicPullData('FinanceFromNasdaq:nasdaqfinance_socialCode')
# datas_enterprise = cursor.fetchall() if not social_code or social_code == None:
# for data_enterprise in datas_enterprise: log.info('============已没有数据============等待===============')
start_time = time.time() time.sleep(600)
# gpdm = data_enterprise[3] continue
# social_code = data_enterprise[6] data_enterprise = getInfomation(social_code)
social_code = 'ZD0CN0012309000172' start_time = time.time()
gpdm = 'NTES' gpdm = data_enterprise[3]
# 采集年度数据 social_code = data_enterprise[6]
date_list_year = getYear(start_time, session, social_code, gpdm) # print(gpdm,social_code)
# 保存年度数据到redis # 采集年度数据
add_date(social_code, date_list_year) date_list_year = getYear(start_time, session, social_code, gpdm)
# 采集季度数据 # 保存年度数据到redis
date_list_quarter = getQuarter(start_time, session, social_code, gpdm) add_date(social_code, date_list_year)
# 保存季度数据到redis # 采集季度数据
add_date(social_code, date_list_quarter) date_list_quarter = getQuarter(start_time, session, social_code, gpdm)
timeCost = baseCore.getTimeCost(start_time, time.time()) # 保存季度数据到redis
state = 1 add_date(social_code, date_list_quarter)
baseCore.recordLog(social_code, taskType, state, timeCost, '', '') timeCost = baseCore.getTimeCost(start_time, time.time())
log.info(f'{social_code}=={gpdm}==耗时{timeCost}') state = 1
# break baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
cursor.close() log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
cnx.close()
if __name__ == '__main__': if __name__ == '__main__':
# 财务数据采集
doJob() doJob()
# 企业股票代码放入redis
# FinanceFromNasdaq()
cursor.close()
cnx.close()
\ No newline at end of file
# created by virtualenv automatically
*
import gc
from flask import Flask, render_template, request, current_app
import configparser
from controller.Main import Main # 导入全部蓝图变量
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
from dao.Conn import ConnMySql
import sys
import io
# Clear stale login state
def clearLoginStateIn24H():
    """Reset the login state of accounts that have been logged in for over 24h.

    ``ConnMySql`` reads its settings from ``current_app.config``, which is only
    available inside an application context. This function is meant to run
    from a scheduler job (outside any request), so the context is pushed
    explicitly. The connection is always closed, even if the update fails.
    """
    with app.app_context():
        conn = ConnMySql()
        try:
            conn.userClearLoginStateIn24H()
        finally:
            conn.close()
    print("清除登录状态-" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# Create the Flask application and attach every blueprint exported by Main.
app = Flask(__name__)
app.register_blueprint(Main)

# Uploaded files may be at most 16 MiB.
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

# Flatten static/conf/sys.ini into app.config using "<section>.<key>" names,
# e.g. db.port=3306.
cfg = configparser.ConfigParser()
cfg.optionxform = str  # keep the key case exactly as written in the ini file
cfg.read("static/conf/sys.ini", encoding='utf-8')
for section in cfg.sections():
    for key, val in cfg.items(section):
        app.config[f"{section}.{key}"] = val

# A few values need a non-string type.
app.config['db.port'] = int(app.config['db.port'])
app.config['sys.useProxy'] = app.config['sys.useProxy'] != "0"
app.config['sys.proxyid'] = 0  # id of the proxy currently in use
app.config['sys.userid'] = 0  # id of the account currently in use

# Force UTF-8 on stdout so the Chinese log output prints on any console.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')

if __name__ == '__main__':
    # webbrowser.open("0.0.0.0:5000")
    # NOTE(review): debug=True while listening on 0.0.0.0 exposes the
    # interactive debugger to the network - confirm this is development only.
    app.run(host='0.0.0.0', port=5201, debug=True)  # entry point
    # Scheduled job that clears abnormal login state every half hour (disabled)
    # sched = BlockingScheduler()
    # sched.add_job(clearLoginStateIn24H, 'interval', seconds=1800, id='task-clearLoginStateIn24H')
    # sched.start()
import gc
from flask import Blueprint, request, current_app, make_response, send_file # 导入蓝图
import datetime
import re
import os
import logging
import sys
import io
import tempfile
import openpyxl
import string
import json
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium import webdriver
from selenium.webdriver.common.by import By
from util import UtilDate
from service.Service02 import Service02
Main = Blueprint('Main', __name__) # 初始化一个蓝图,而不是Flask对象
# Accept the request, read the JSON parameters from the body and run the scrape.
# {"from":"1900-01-01","last":<last x days>, "orgs":["org 1 full name","org 2 full name",...]}
@Main.route('/Main/getData', methods=["POST"])
def getData():
    """Handle POST /Main/getData.

    Body keys (all optional; a missing or null key is treated like ""):
      * ``from``: start date string; when empty it is derived from ``last``.
      * ``last``: number of most recent days to cover; empty means offset 0.
      * ``orgs``: list of organisation names to scrape.

    :return: whatever ``Service02.getData`` returns for the resolved start
        date and organisation list.
    """
    print("POST /Main/getData")
    paras = request.get_json(force=True) or {}
    dateFrom = paras.get('from') or ""
    lastDays = paras.get('last')
    orgs = paras.get('orgs') or []
    if dateFrom == "":
        # "last N days" includes today, hence the offset of -(N - 1) days.
        offset = 0 if lastDays in ("", None) else -(int(lastDays) - 1)
        dateFrom = UtilDate.dateAdd("", "d", offset)
    service02 = Service02()
    return service02.getData(dateFrom, orgs)  # "https://wenshu.court.gov.cn/website/wenshu/181029CR4M5A62CH/index.html"
import json
import configparser
# import mysql.connector
from flask import current_app
import pymysql
from pymssql import Cursor
from vo.ProxyInfo import ProxyInfo
from vo.LoginInfo import LoginInfo
class Conn(object):
    """Small data-access helper wrapping one DB-API connection.

    Values are always passed to the driver as query parameters (``%s``
    placeholders). Table names, column names and caller-supplied ``where``
    fragments cannot be parameterized and are inserted verbatim, so they must
    come from trusted code, never from user input.
    """

    def __init__(self, conn):
        # May be None when a subclass failed to connect; methods that catch
        # their own errors then just print the resulting AttributeError.
        self._conn: "pymysql.Connect" = conn

    def close(self) -> None:
        """Close the underlying connection if there is one."""
        if self._conn is not None:
            self._conn.close()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _run(self, sqlstring: str, params=None) -> None:
        """Execute a write statement, commit, and always close the cursor."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(sqlstring, params)
            self._conn.commit()
        finally:
            cursor.close()

    def _query(self, sqlstring: str, params=None) -> list:
        """Execute a read statement and return its rows via transToJson."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(sqlstring, params)
            return self.transToJson(cursor)
        finally:
            cursor.close()

    def _run_logged(self, sqlstring: str, params=None) -> None:
        """Like _run, but print errors instead of raising (best effort)."""
        try:
            self._run(sqlstring, params)
        except Exception as err:
            print('error:', err)

    def transToJson(self, cursor) -> list:
        """Return the cursor's remaining rows as a list of {column: value} dicts.

        NOTE(review): the original class called ``self.transToJson`` without
        defining it anywhere in this file; the row format chosen here (one
        dict per row, inside a list) is an assumption - confirm with callers.
        """
        names = [field[0] for field in (cursor.description or ())]
        return [dict(zip(names, row)) for row in cursor.fetchall()]

    def genDict(self, oCursor: "Cursor") -> dict:
        """Map each result column name of *oCursor* to its positional index."""
        try:
            return {field[0]: idx for idx, field in enumerate(oCursor.description)}
        except (AttributeError, TypeError, IndexError) as err:
            # No description (nothing executed yet) or an unexpected shape.
            print('error:', err)
            return {}

    # ------------------------------------------------------------------
    # proxies
    # ------------------------------------------------------------------
    # Fetch all proxies
    def proxyGetAll(self) -> "Cursor":
        """Run the "all proxies" query and return the open cursor.

        :return: cursor positioned on the (id, proxy) rows, or ``None`` when
            the cursor could not be created. The caller owns the cursor and
            must close it.
        """
        cursor = None
        try:
            sql = "SELECT id, proxy FROM caiji.clb_proxy"
            cursor = self._conn.cursor()
            cursor.execute(sql)
        except Exception as err:
            print('error:', err)
        return cursor

    # Fetch the next proxy
    def proxyGetNext(self, id: int) -> "ProxyInfo":
        """Return the proxy with the smallest id greater than *id*.

        The ``proxy`` column is parsed as ``ip-port-user-password``.

        :return: a populated ``ProxyInfo``, or ``None`` when no such row
            exists or the query fails (the error is printed).
        """
        ret = None
        sql = "SELECT id, proxy FROM caiji.clb_proxy where id>%s order by id asc limit 1"
        cursor = None
        try:
            cursor = self._conn.cursor()
            cursor.execute(sql, (id,))
            results = cursor.fetchall()
            if results:
                ret = ProxyInfo()
                fields = self.genDict(cursor)
                for row in results:
                    ret.id = row[fields["id"]]
                    # maxsplit keeps '-' characters inside the password intact
                    proxyInfos = row[fields["proxy"]].split('-', 3)
                    ret.ip = proxyInfos[0]
                    ret.port = proxyInfos[1]
                    ret.user_name = proxyInfos[2]
                    ret.user_passwd = proxyInfos[3]
        except Exception as err:
            print('error:', err)
        finally:
            if cursor is not None:
                cursor.close()
        return ret

    # ------------------------------------------------------------------
    # login accounts
    # ------------------------------------------------------------------
    # Fetch one free account
    def userGetFree(self, userGroup: str, id: int) -> "LoginInfo":
        """Return the next account of *userGroup* that is not logged in.

        Only rows with ``id`` greater than *id* and ``login_time is null``
        are considered; the one with the smallest id wins.

        :return: a populated ``LoginInfo``, or ``None`` when no account is
            free or the query fails (the error is printed).
        """
        ret = None
        sql = ("SELECT * FROM caiji.login_info where user_group=%s and id > %s "
               "and login_time is null order by id asc limit 1")
        cursor = None
        try:
            cursor = self._conn.cursor()
            cursor.execute(sql, (userGroup, id))
            results = cursor.fetchall()
            if results:
                ret = LoginInfo()
                fields = self.genDict(cursor)
                for row in results:
                    ret.id = row[fields["id"]]
                    ret.user_group = row[fields["user_group"]]
                    ret.user_name = row[fields["user_name"]]
                    ret.user_passwd = row[fields["user_passwd"]]
        except Exception as err:
            print('error:', err)
        finally:
            if cursor is not None:
                cursor.close()
        return ret

    # Clear the login state of users who did not log out within 24 hours
    def userClearLoginStateIn24H(self):
        """Set ``login_time`` to null for rows logged in more than 86400s ago."""
        sql = "update caiji.login_info set login_time=null where TIME_TO_SEC(TIMEDIFF(now(), login_time))>86400"
        self._run_logged(sql)

    # Mark an account as logged in
    def userSetLoginStateByID(self, id: int):
        """Set ``login_time`` of account *id* to the current database time."""
        self._run_logged("update caiji.login_info set login_time=now() where id=%s", (id,))

    # Release an account so it can be handed out again
    def userClearLoginStateByID(self, id: int):
        """Set ``login_time`` of account *id* back to null."""
        self._run_logged("update caiji.login_info set login_time=null where id=%s", (id,))

    # ------------------------------------------------------------------
    # generic helpers
    # ------------------------------------------------------------------
    def doSelectByColumns(self, tbname: str, *columns: str) -> list:
        """Select the given columns (or ``*`` when none are given) from *tbname*.

        :param tbname: table name (trusted).
        :param columns: column names to select (trusted).
        :return: the rows, as produced by ``transToJson``.
        """
        col = ", ".join(columns) if columns else "*"
        return self._query(f"select {col} from {tbname}")

    def doSelectWhere(self, tbname: str, where: str) -> list:
        """Select all columns from *tbname* filtered by a raw ``where`` clause.

        :param where: SQL condition inserted verbatim (trusted).
        """
        return self._query(f"select * from {tbname} where {where}")

    def doInsertRecord(self, tbname: str, *values) -> None:
        """Insert one row given the values of all columns, in table order.

        A single list/tuple argument is accepted as the row itself.
        """
        if len(values) == 1 and isinstance(values[0], (list, tuple)):
            values = tuple(values[0])
        placeholders = ", ".join(["%s"] * len(values))
        sqlstring = f"insert into {tbname} values ({placeholders})"
        print(sqlstring)
        self._run(sqlstring, tuple(values))

    def doInsertByKV(self, tbname: str, **keyvalues) -> None:
        """Insert one row from ``column=value`` keyword arguments."""
        columns = ", ".join(keyvalues)
        placeholders = ", ".join(["%s"] * len(keyvalues))
        sqlstring = f"insert into {tbname} ({columns}) values ({placeholders})"
        self._run(sqlstring, tuple(keyvalues.values()))

    def doDeleteByKV(self, tbname: str, **keyvalues) -> None:
        """Delete the rows matching every ``column=value`` pair (AND-ed).

        :raises ValueError: when no pair is given, to avoid emitting a
            statement with an empty condition.
        """
        if not keyvalues:
            raise ValueError("doDeleteByKV needs at least one column=value pair")
        condition = " and ".join(f"{key}=%s" for key in keyvalues)
        self._run(f"delete from {tbname} where {condition}", tuple(keyvalues.values()))

    def doDeleteWhere(self, tbname: str, where: str) -> None:
        """Delete the rows of *tbname* matching a raw ``where`` clause (trusted)."""
        self._run(f"delete from {tbname} where {where}")

    def doUpdateKV(self, tbname: str, expression: str, **keyvalues) -> None:
        """Set ``column=value`` pairs on the rows selected by *expression*.

        :param expression: raw where clause inserted verbatim (trusted).
        :param keyvalues: columns to change and their new values.
        """
        assignments = ", ".join(f"{key}=%s" for key in keyvalues)
        # The driver %-formats the statement once parameters are supplied,
        # so literal percent signs in the raw clause have to be doubled.
        condition = expression.replace("%", "%%")
        sqlstring = f"update {tbname} set {assignments} where {condition}"
        self._run(sqlstring, tuple(keyvalues.values()))

    # Run a statement that yields a single value, e.g. a row count
    def selectCount(self, sqlstring):
        """Return the first column of the first result row, or ``None``."""
        cursor = self._conn.cursor()
        try:
            cursor.execute(sqlstring)
            row = cursor.fetchone()
            return row[0] if row else None
        finally:
            cursor.close()

    # Build a {column title: column type} dictionary
    def MSSQL_GetTitleDict(self, cursor):
        """Map the first element of each ``cursor.get_header()`` entry to the second.

        NOTE(review): ``get_header`` is not a DB-API cursor method; presumably
        this expects an MSSQL-specific object - confirm before use.
        Remember to close the connection once done with it.
        """
        titleDict = {}
        for rows in cursor.get_header():
            titleDict[rows[0]] = rows[1]
        return titleDict

    def createtable(self, tbname: str, *args: list) -> None:
        """Create a table from per-column lists.

        Example: ``createtable("TB_Test", ["ID", "nchar(10)"],
        ["Password", "nchar(20)", "NOT NULL"])``. Each list is
        ``[name, type, *constraints]``; its items are joined with spaces, and
        the columns with commas, so types such as ``decimal(10,2)`` survive.
        """
        columns = ", ".join(" ".join(str(part) for part in column) for column in args)
        self._run(f"create table {tbname} ({columns})")
class ConnMySql(Conn):
    """Conn backed by a PyMySQL connection configured from ``current_app.config``.

    Reads the ``db.host``, ``db.user``, ``db.passwd``, ``db.db``, ``db.port``
    and ``db.charset`` entries. A failed connection attempt is only printed;
    the instance is then initialised with ``None`` as its connection.
    """

    def __init__(self):
        connection = None
        try:
            settings = current_app.config
            connection = pymysql.Connect(
                host=settings["db.host"],
                user=settings["db.user"],
                passwd=settings["db.passwd"],
                db=settings["db.db"],
                port=int(settings["db.port"]),
                charset=settings["db.charset"],
            )
        except Exception as err:
            print('error:', err)
        super().__init__(connection)
class MySqlTemp(Conn):
    """Conn with fixed connection settings, usable without a Flask app.

    NOTE(review): host, user and password are hard-coded in source - consider
    moving them to configuration.
    """

    def __init__(self):
        settings = dict(
            host="114.115.159.144",
            user="caiji",
            passwd="zzsn9988",
            db="caiji",
            port=3306,
            charset="utf8",
        )
        super().__init__(pymysql.Connect(**settings))
# 测试
# conn=MySqlTemp()
# o=conn.userGetFree("wenshu")
# print (o.user_name)
# for row in results:
# id = row[0]
# proxy = row[1]
# print(id, proxy)
# proxyInfos=proxy.split('-')
# for i in range(0,4): #proxyInfos:
# print("----",proxyInfos[i])
class ProxyDao:
    """Placeholder DAO; it currently offers a single no-op method."""

    def t(self):
        """Do nothing and return ``None``."""
        return None
# 基本信息
from util import UtilDate
from util import UtilNumber
class BaseInfo:
    """Basic information about one judgment document.

    Every field defaults to an empty string so that an instance is usable
    before (or without) all fields being filled in. Previously the fields were
    only annotated, so reading one that was never assigned, as ``toString``
    does, raised ``AttributeError``.
    """

    info_title: str = ""  # title
    key_word: str = ""  # keyword
    info_bianhao: str = ""  # case number
    info_address: str = ""  # court with jurisdiction
    info_time: str = ""  # publication date, yyyy-mm-dd
    info_id: str = ""  # case ID
    info_yuanyou: str = ""  # grounds of the judgment
    info_content: str = ""  # body text

    # Tell whether this entry is dated on or after the given date
    def isAfter(self, sDate: str) -> bool:
        """Return True when ``info_time`` is on or after *sDate*.

        Dates are compared as strings, which orders correctly for the
        zero-padded ``yyyy-mm-dd`` format. An empty *sDate* yields False.
        """
        if not sDate:
            return False
        return (self.info_time or "") >= sDate

    def toString(self):
        """Return title, keyword, case number, court, date and ID, tab-separated."""
        parts = (
            self.info_title,
            self.key_word,
            self.info_bianhao,
            self.info_address,
            self.info_time,
            self.info_id,
        )
        # None (e.g. a missing page attribute) is rendered as an empty field.
        return "\t".join(part or "" for part in parts)
home = C:\Program Files\Python
implementation = CPython
version_info = 3.8.0.final.0
virtualenv = 20.13.0
include-system-site-packages = true
base-prefix = C:\Program Files\Python
base-exec-prefix = C:\Program Files\Python
base-executable = C:\Program Files\Python\python.exe
# 裁判文书抓取
from datetime import datetime, timedelta
import json
import time
from flask import current_app as app
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from typing import List
import io
import sys
from dao.Conn import ConnMySql
from util import UtilBrowser
from util import UtilCaptcha
from entity.BaseInfo import BaseInfo
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import jsonpickle
from util.UtilCaptcha import getCaptchaMode1
from vo.LoginInfo import LoginInfo
class Service02:
browser: WebDriver
url = ""
dateFrom = ""
loginInfo: LoginInfo
#在浏览器的命令行直接指定打开的url时,chrome会保留原来的默认打开的标签页,此时有2个标签页。通过driver.get打开网页,则直接在默认的标签页打开,此时有1个标签页
tab1=1 # 主页、列表页;driver.get打开时为0
tab2=2 # 裁判文书网页;driver.get打开时为1
baseInfo = []
nRetry = 100 # 重试次数,暂未使用
lstRet = []
# Main procedure
def getData(self, sDateFrom: str, orgs: List[str]):
    """Scrape every organisation in *orgs* and return the results as JSON.

    For each organisation the browser is (re)opened until ``openBrowser``
    succeeds, then ``getData1`` is run; the whole attempt is repeated until
    ``getData1`` reports completion (returns 1).

    The result buffers are created per call and per organisation. They used
    to be class-level lists, so entries leaked between organisations and
    between separate ``Service02`` instances.

    :param sDateFrom: only entries dated on or after this date are collected.
    :param orgs: full names of the organisations to search for.
    :return: JSON text encoding a list of ``{"org": ..., "baseInfo": [...]}``.
    """
    self.dateFrom = sDateFrom
    self.lstRet = []
    # sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    print("getData...", flush=True)
    for org in orgs:
        # Kept across retries of the same organisation so getData1 can skip
        # documents that were already collected.
        self.baseInfo = []
        ok1 = 0
        print(org, flush=True)
        # Keep scraping this organisation until everything has been collected.
        while ok1 != 1:
            # Open the browser and navigate to the organisation's results;
            # on failure discard the browser and try again.
            while not self.openBrowser(org):
                self.quitBrowser()
            try:
                ok1 = self.getData1()
            finally:
                # Every attempt opens a new browser, so always release this one.
                self.quitBrowser()
        conn = ConnMySql()
        try:
            if self.loginInfo is not None:
                conn.userClearLoginStateByID(self.loginInfo.id)
        finally:
            conn.close()
        # Store this organisation's data in the result buffer.
        self.lstRet.append({"org": org, "baseInfo": self.baseInfo})
    # All organisations are done: encode and hand the data back to the caller.
    retData = jsonpickle.encode(self.lstRet, unpicklable=False)
    print(json.loads(retData))
    return retData
# Open the browser, log in if configured, search for the organisation `org`
# and sort the result list by date. Returns True only when the search box was
# found and the search was submitted; False otherwise.
# NOTE(review): indentation was lost in this view, so the exact nesting of the
# login steps below cannot be confirmed from here; only comments were changed.
def openBrowser(self, org: str) -> bool:
ret = False
print("openBrowser...", flush=True)
conn = ConnMySql()
# Pick the next free "wenshu" account after the last used id; when none is
# left, start over from id 0.
self.loginInfo = conn.userGetFree("wenshu", app.config['sys.userid'])
if self.loginInfo is None:
app.config['sys.userid'] = 0
self.loginInfo = conn.userGetFree("wenshu", app.config['sys.userid'])
# NOTE(review): if the second lookup also returns None, the next line raises
# AttributeError - confirm whether that can happen.
app.config['sys.userid'] = self.loginInfo.id
#conn.userSetLoginStateByID(self.loginInfo.id)
conn.close()
self.browser = UtilBrowser.newChrome(app.config['sys.mainUrl'], False , app.config['sys.useProxy'])
# After opening the browser and the main page, close any extra window tabs. Currently unused
# if len(self.browser.window_handles)>1:
# self.browser.switch_to.window(self.browser.window_handles[0])
# self.browser.close()
# self.browser.switch_to.window(self.browser.window_handles[0])
loginMode = app.config['sys.loginMode']
# Login; to be moved into a dedicated procedure later
if loginMode == "0":
pass
elif loginMode == "1":
# Account login is required; accounts are stored in the database and rotated
objLogin = UtilBrowser.waitElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginButton'], 30, 1)
if objLogin != None:
objLogin.click()
# A standalone image-captcha page (4 mixed-case letters and digits) may appear at random
nTry = 0
hasPass = False
objLoginCaptchaButton0 = UtilBrowser.waitElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginCaptchaButton'], 10, 1)
if objLoginCaptchaButton0 is not None:
# Retry the captcha (the loop below allows up to 10 attempts; the original
# comment said 5); continue once it passes, otherwise give up so that another
# account is tried
while nTry < 10:
nTry = nTry + 1
objLoginCaptchaButton = UtilBrowser.waitElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginCaptchaButton'], 10, 1)
# Get the captcha image URL
# url = self.browser.find_element(By.CSS_SELECTOR, app.config['sys.loginCaptchaImage']).get_attribute('src')
strCaptcha = UtilCaptcha.getCaptchaMode1(self.browser, app.config['sys.loginCaptchaImage'])
self.browser.find_element(By.CSS_SELECTOR, app.config['sys.loginCaptchaInput']).send_keys(strCaptcha)
objLoginCaptchaButton.click()
time.sleep(1)
# The captcha counts as passed once its confirm button is no longer found
objLoginCaptchaButton = UtilBrowser.waitElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginCaptchaButton'], 10, 1)
if objLoginCaptchaButton is None:
hasPass = True
break
if hasPass:
nTry = 0
else:
# 4 is not below 3, so the iframe wait loop below is skipped entirely
nTry = 4
iframe = None # login form frame
while nTry < 3:
iframe = UtilBrowser.waitElement(self.browser, By.CSS_SELECTOR, "#contentIframe", 20, 1)
# WebDriverWait(driver=driver, timeout=20, poll_frequency=1, ignored_exceptions=None).until(expected_conditions.presence_of_element_located((By.ID,'contentIframe')))
if iframe is None:
self.browser.refresh()
else:
break
nTry = nTry + 1
if iframe is not None:
self.browser.switch_to.frame(iframe)
objUser = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginUser'])
if objUser is not None:
objUser.send_keys(self.loginInfo.user_name)
objPass = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginPasswd'])
objPass.send_keys(self.loginInfo.user_passwd)
objLogin = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['sys.loginOk'])
# An SMS code may be required at login; not handled yet (an empty string is sent)
if app.config['sys.loginSMSCode'] != "":
objSMS = UtilBrowser.hasElement(self.browser, By.CSS_SELECTOR,
app.config['sys.loginSMSCode'])
if objSMS is not None:
self.browser.find_element(By.CSS_SELECTOR, app.config['sys.loginSMSCode']).send_keys("")
objLogin.click()
time.sleep(5)
self.browser.refresh()
#ret = True # should only return True once a specific element is confirmed present
# If credentials were filled in, log in automatically; otherwise wait for manual login
# self.browser.find_element(By.CSS_SELECTOR, app.config['sys.loginButton']).click()
elif loginMode == "2":
# Cookie login, not handled yet
pass
# self.browser.get(app.config['sys.mainUrl'])
# Fill the organisation name into the search box and submit
objSearchInput = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['css.searchInput'])
# When login failed the page has no search box
if objSearchInput is None:
ret = False
else:
objSearchInput.send_keys(org)
time.sleep(2)
# Simulate a click on the search button
objSearchButton = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['css.searchButton'])
objSearchButton.click()
time.sleep(5)
self.browser.refresh()
# Sort by date, newest first
objDateSort = UtilBrowser.getElement(self.browser, By.CSS_SELECTOR, app.config['css.listDateSort'])
if objDateSort is not None:
objDateSort.click()
time.sleep(5)
ret = True
return ret
# Quit the browser
def quitBrowser(self):
    """Quit the current browser, if any, and forget it.

    ``self.browser`` is reset to ``None`` even when ``quit()`` fails, so a
    broken driver is never reused. Failures are reported rather than silently
    swallowed, and this method itself never raises.
    """
    browser = getattr(self, "browser", None)
    self.browser = None
    if browser is None:
        return
    try:
        browser.quit()
    except Exception as err:
        print("quitBrowser error:", err, flush=True)
# Judgment documents. New documents may have appeared since the browser was
# last opened, so every attempt starts from the first page instead of resuming.
def getData1(self) -> int:
    """Collect the judgment list (and each new document's body) page by page.

    Walks the result list five rows per page. Rows dated on or after
    ``self.dateFrom`` that are not yet in ``self.baseInfo`` are opened in a
    new tab to read the body text, then appended to ``self.baseInfo``.

    :return: 1 when collection is complete (no results, a row older than the
        start date was reached, or there is no further page); 0 when it was
        interrupted by a captcha or block page and should be retried.
    """
    ret = 0
    print("getData1...", flush=True)
    # List-page selectors contain a "?" placeholder for the row's nth-child.
    selector_title = app.config['css.listTitle']
    # Number of judgment documents
    n = self.toInt(self.getAttr(By.CSS_SELECTOR, app.config['css.listCount'], "textContent"))
    if n == 0:
        # Nothing to collect; do not retry this organisation.
        return 1
    pageNo = 0
    while True:
        pageNo = pageNo + 1
        for i in range(1, 6):  # 5 rows per page
            print(f"----文书数量:{n},每页文书个数:5,当前页号:{pageNo},当前序号:{i}", flush=True)
            row = str(i + 2)  # document rows start at nth-child(3)
            title_selector = selector_title.replace("?", row, 1)
            # A short last page: stop at the first row that does not exist.
            if not UtilBrowser.hasElement(self.browser, By.CSS_SELECTOR, title_selector):
                break
            baseInfo1 = BaseInfo()
            baseInfo1.info_title = self.getAttr(By.CSS_SELECTOR, title_selector, "textContent")
            baseInfo1.info_bianhao = self.getAttr(
                By.CSS_SELECTOR, app.config['css.listBianhao'].replace("?", row, 1), "textContent")
            baseInfo1.info_address = self.getAttr(
                By.CSS_SELECTOR, app.config['css.listAddress'].replace("?", row, 1), "textContent")
            baseInfo1.info_time = self.getAttr(
                By.CSS_SELECTOR, app.config['css.listTime'].replace("?", row, 1), "textContent")
            baseInfo1.info_yuanyou = self.getAttr(
                By.CSS_SELECTOR, app.config['css.listYuanyou'].replace("?", row, 1), "textContent")
            # The document ID is everything after the first "=" of the link,
            # e.g. .../index.html?docId=<id>. Fall back to the whole link when
            # there is no "=" instead of raising.
            link = self.getAttr(By.CSS_SELECTOR, title_selector, "href") or ""
            _, sep, doc_id = link.partition("=")
            baseInfo1.info_id = doc_id if sep else link
            if not baseInfo1.isAfter(self.dateFrom):
                # This row is older than the start date: collection is done.
                ret = 1
                break
            # Skip documents collected during an earlier attempt.
            if any(e.info_id == baseInfo1.info_id for e in self.baseInfo):
                continue
            # Default body, used when the detail tab never opens.
            baseInfo1.info_content = ""
            # Clicking the title opens the document body in a new, active tab.
            self.browser.find_element(By.CSS_SELECTOR, title_selector).click()
            deadline = time.time() + 60
            while len(self.browser.window_handles) <= self.tab2 and time.time() <= deadline:
                time.sleep(1)
            if len(self.browser.window_handles) > self.tab2:
                self.browser.switch_to.window(self.browser.window_handles[self.tab2])
                baseInfo1.info_content = self.getAttr(
                    By.CSS_SELECTOR, app.config['css.contContent'], "textContent") or ""
                time.sleep(5)
                # Close the document tab and return to the list tab.
                self.browser.close()
                self.browser.switch_to.window(self.browser.window_handles[self.tab1])
            print("--------当前文书长度:", len(baseInfo1.info_content), flush=True)
            self.baseInfo.append(baseInfo1)
        if ret == 0:
            # Continue with the next page if there is one. str.find() returns
            # -1 (truthy) when the text is absent, so test with "in".
            next_selector = app.config['css.listNextPage']
            next_class = self.getAttr(By.CSS_SELECTOR, next_selector, "class") or ""
            has_next = UtilBrowser.hasElement(self.browser, By.CSS_SELECTOR, next_selector)
            if not has_next or "disabled" in next_class:
                ret = 1
            else:
                self.browser.find_element(By.CSS_SELECTOR, next_selector).click()
                time.sleep(5)
        # A captcha window or an IP-block page means: leave and retry.
        if self.hasCaptcha() or self.hasBlock():
            break
        if ret == 1:
            break
    return ret
def toInt(self, s) -> int:
    """Convert *s* to an int, returning 0 when it cannot be converted.

    Only conversion errors are absorbed; anything else (for example
    KeyboardInterrupt) propagates instead of being swallowed by a bare except.
    """
    try:
        return int(s)
    except (TypeError, ValueError):
        return 0
# Return the value of the given attribute of a page element, e.g. class
def getAttr(self, by: str, selector: str, attr: str) -> str:
    """Return an attribute (or the visible text) of the first matching element.

    :param by: locator strategy passed to ``find_element``.
    :param selector: locator value.
    :param attr: attribute name; the special value ``"text"`` reads the
        element's ``.text`` instead of calling ``get_attribute``.
    :return: the value, or ``""`` when the element cannot be found, the lookup
        fails, or the attribute is absent. Callers use the result as a
        string, so ``None`` is never returned.
    """
    try:
        element = self.browser.find_element(by, selector)
        value = element.text if attr == "text" else element.get_attribute(attr)
    except Exception:
        # Best effort: a missing element is an expected outcome here.
        return ""
    return "" if value is None else value
# Tell whether a captcha has appeared.
def hasCaptcha(self) -> bool:
    """Report whether a captcha is showing; detection is disabled, so always False.

    The disabled detection logic is kept below for reference.
    NOTE(review): it tests ``str.find(...)`` for truth, which is also true
    when the text is absent (-1) - fix before re-enabling.
    """
    # we: WebElement #
    # wes = self.browser.find_elements(By.TAG_NAME, "div")
    # for we in wes:
    # if we.get_attribute("class").find("geetest_box"): #
    # if we.get_attribute("style").find("display: block;"):
    # ret = True
    return False
# Tell whether the IP has been blocked.
def hasBlock(self) -> bool:
    """Report whether an IP-block page is showing; detection is disabled, so always False.

    The disabled check below looked for the site's "suspected attack
    behaviour" notice in ``body > div > p``.
    """
    # if self.getAttr(By.CSS_SELECTOR, "body > div > p", "text").find("夹带攻击行为"): # p.prom 您的地址(1.2.3.4)访问疑似夹带攻击行为,请稍后重试,或注册/登录
    # print("*******夹带攻击行为*******")
    # ret = True
    return False
#系统配置
[sys]
#文字识别Url,用于识别裁判文书网的验证码
ocrUrl=http://114.116.49.86:8013/wzsb_app?withCrLf=false
#登录模式,0-无需登录,1-账号登录(需要口令、短信、验证码相应的选择器不能为空),2-cookie登录
loginMode=1
#是否使用代理,0-不用,1-使用,需登录的一般不适用代理
useProxy=1
#验证码识别,0-不识别,1-识别,暂采用固定的方法识别验证码,后续扩展为不同的识别模式
verifiCode=0
#登录Url ?open=login
loginUrl=https://wenshu.court.gov.cn/website/wenshu/181010CARHS5BS3C/index.html
#正常Url,登录后可能会自动跳转到正常Url
mainUrl=https://wenshu.court.gov.cn
#登录-用户
loginUser=#root > div > form > div > div:nth-child(1) > div > div > div > input
#登录-口令
loginPasswd=#root > div > form > div > div:nth-child(2) > div > div > div > input
#裁判文书网的图形验证码在单独的页面,输入正确后返回到登录界面
#登录-图形验证码输入框,不为空时则需要识别验证码
loginCaptchaInput=body > div > div.card-body > div > form > div.captcha > input
#登录-图形验证码图片
loginCaptchaImage=#Image1
#登录-图形验证码确认按钮
loginCaptchaButton=body > div > div.card-body > div > form > div.warnbtn > input
#登录-短信验证码,可能和图形验证码同时需要,暂未处理
loginSMSCode=
#主界面登录按钮
loginButton=#loginLi > a
#登录界面确认登录按钮
loginOk=#root > div > form > div > div.login-button-container > span
#数据库配置
[db]
host=114.115.159.144
port=3306
user=caiji
passwd=zzsn9988
db=caiji
charset=utf8
#css选择器配置
[css]
#搜索-文本框
searchInput=#_view_1540966814000 > div > div.search-wrapper.clearfix > div.search-middle > input
#搜索-按钮
searchButton=#_view_1540966814000 > div > div.search-wrapper.clearfix > div.search-rightBtn.search-click
#列表-日期倒排按钮
listDateSort=#_view_1545184311000 > div.LM_tool.clearfix > div:nth-child(2) > a
#列表-案件数量
listCount=#_view_1545184311000 > div.LM_con.clearfix > div.fr.con_right > span
#列表-案件名称
listTitle=#_view_1545184311000 > div:nth-child(?) > div.list_title.clearfix > h4 > a
#列表-编号
listBianhao=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.ah
#列表-法院
listAddress=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.slfyName
#列表-审结日期
listTime=#_view_1545184311000 > div:nth-child(?) > div.list_subtitle > span.cprq
#列表-案由
listYuanyou=#_view_1545184311000 > div:nth-child(?) > div.list_reason > p
#下一页按钮
listNextPage=#_view_1545184311000 > div.left_7_3 > a:last-child
#正文-链接,一般和title相同
contLink=#_view_1545184311000 > div:nth-child(?) > div.list_title.clearfix > h4 > a
#正文-正文
contContent=#_view_1541573883000 > div > div.PDF_box > div.PDF_pox
from flask import current_app
from datetime import datetime, timedelta
import time
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.chrome.options import Options
from msedge.selenium_tools import EdgeOptions
from msedge.selenium_tools import Edge
from selenium.webdriver.chrome.webdriver import DesiredCapabilities
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.remote.webelement import WebElement
import seleniumwire.undetected_chromedriver.v2
from dao.Conn import ConnMySql
from vo.ProxyInfo import ProxyInfo
# 等待元素出现,timeout-等待时长,frequency等待期间检查频率,均为秒
def waitElement(browser: WebDriver, by: str, selecter: str, timeout: int = 20, frequency: int = 1) -> WebElement:
    """Poll the page until an element shows up or the timeout elapses.

    Args:
        browser: Driver (or any object exposing ``find_element(by, value)``).
        by: Locator strategy handed straight to ``find_element``.
        selecter: Locator value handed straight to ``find_element``.
        timeout: Maximum time to keep polling, in seconds.
        frequency: Pause between two polls, in seconds.

    Returns:
        The located element, or ``None`` if it never appeared in time.
    """
    # A monotonic deadline replaces ``(now - start).seconds``: ``.seconds`` is
    # only the seconds *component* of a timedelta and wall-clock time can jump.
    deadline = time.monotonic() + timeout
    while True:
        # Single lookup per poll; a separate "exists?" then "get" pair can
        # disagree when the page changes between the two calls.
        try:
            return browser.find_element(by, selecter)
        except Exception:
            pass
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(frequency, remaining))
# 获取元素,无则返回None
def getElement(browser: WebDriver, by: str, selecter: str) -> WebElement:
    """Look up a single element, returning ``None`` when the lookup fails.

    Args:
        browser: Driver (or any object exposing ``find_element(by, value)``).
        by: Locator strategy handed straight to ``find_element``.
        selecter: Locator value handed straight to ``find_element``.

    Returns:
        The element returned by ``find_element``, or ``None`` if it raised.
    """
    try:
        return browser.find_element(by, selecter)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # interpreter shutdown are not silently swallowed.
        return None
# 判断元素是否存在。
def hasElement(browser: WebDriver, by: str, selecter: str) -> bool:
    """Tell whether an element can currently be located.

    Args:
        browser: Driver (or any object exposing ``find_element(by, value)``).
        by: Locator strategy handed straight to ``find_element``.
        selecter: Locator value handed straight to ``find_element``.

    Returns:
        ``True`` if ``find_element`` succeeded, ``False`` if it raised.
    """
    try:
        browser.find_element(by, selecter)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that Ctrl-C and
        # interpreter shutdown are not reported as "element missing".
        return False
    return True
# 返回页面元素指定属性的值,如class,若未找到元素,则返回空串
def getAttr(brow: webdriver, by: str, selector: str, attr: str) -> str:
    """Return the value of one attribute (e.g. ``class``) of a page element.

    Args:
        brow: Driver (or any object exposing ``find_element(by, value)``).
        by: Locator strategy handed straight to ``find_element``.
        selector: Locator value handed straight to ``find_element``.
        attr: Name of the attribute to read via ``get_attribute``.

    Returns:
        The attribute value, or an empty string when the element cannot be
        located or ``get_attribute`` yields ``None``.
    """
    try:
        # The locator strategy must be ``by``; the previous code passed the
        # builtin ``str`` type and then discarded the value it read.
        value = brow.find_element(by, selector).get_attribute(attr)
    except Exception:
        return ""
    return "" if value is None else value
# 打开Edge浏览器
def newEdge(useProxy):
    """Launch an Edge browser in Chromium mode and return its driver.

    Args:
        useProxy: Accepted but not consulted anywhere in this function.

    Returns:
        The driver object created by ``webdriver.Edge``.
    """
    opts = EdgeOptions()
    opts.use_chromium = True
    # Turn off the Blink "AutomationControlled" feature flag.
    opts.add_argument('--disable-blink-features=AutomationControlled')
    return webdriver.Edge(options=opts)
# 打开Chrome浏览器
# url:浏览器启动时需要打开的url
# useProxy:是否使用代理,True-使用
# cookie:需要设置的cookie信息,如登录信息。
def newChrome(url: str = "", debugMode: bool = False, useProxy: bool = False, cookie: str = "") -> WebDriver:
    """Start (or attach to) a Chrome browser and return its driver.

    NOTE(review): this block was recovered from a listing that had lost its
    indentation. The nesting used here is an assumption to be confirmed:
    the three experimental options are applied only when ``debugMode`` is
    off, only ``maximize_window`` is skipped in debug mode, and ``refresh``
    runs only after a cookie was added.

    Args:
        url: When non-empty, appended to the ``--app`` command-line argument.
        debugMode: When true, attach to the browser listening on
            ``127.0.0.1:9222`` instead of configuring a fresh one.
        useProxy: When true, fetch the next proxy via
            ``ConnMySql.proxyGetNext`` (cursor kept in
            ``current_app.config['sys.proxyid']``) and register it as the
            Chrome proxy capability.
        cookie: Python-literal text of one cookie dict to install after
            start-up; empty string to skip.

    Returns:
        The driver created by ``webdriver.Chrome``.
    """
    option = webdriver.ChromeOptions()
    if not debugMode:
        # Keep the browser window open after the script finishes.
        option.add_experimental_option("detach", True)
        # Drop the automation switch so window.navigator.webdriver is undefined.
        option.add_experimental_option('excludeSwitches', ['enable-automation'])
        # Suppress the "Chrome is being controlled by automated software" hint.
        option.add_experimental_option('useAutomationExtension', False)
    option.add_argument("--disable-blink-features=AutomationControlled")
    option.add_argument('disable-infobars')
    # Per the original author's note, recommended by Google docs to dodge a bug.
    option.add_argument('--disable-gpu')
    option.add_argument('--ignore-certificate-errors')

    if useProxy:
        conn = ConnMySql()
        try:
            proxyInfo: ProxyInfo = conn.proxyGetNext(current_app.config['sys.proxyid'])
            # Wrap around to the first proxy once the end of the list is hit.
            # This check has to precede any attribute access on proxyInfo;
            # previously the log line below dereferenced it first.
            if proxyInfo is None:
                current_app.config['sys.proxyid'] = 0
                proxyInfo = conn.proxyGetNext(current_app.config['sys.proxyid'])
            print("proxy_id:"+str(current_app.config['sys.proxyid'])+","+proxyInfo.ip,flush=True)
            current_app.config['sys.proxyid'] = proxyInfo.id
            if proxyInfo.user_name == "":
                sProxy = f'http://{proxyInfo.ip}:{proxyInfo.port}'
            else:
                sProxy = f'http://{proxyInfo.user_name}:{proxyInfo.user_passwd}@{proxyInfo.ip}:{proxyInfo.port}'
            # NOTE(review): this writes into the module-level capability dict
            # while the driver below is built from ``option`` only — confirm
            # the proxy is actually picked up by the new browser.
            webdriver.DesiredCapabilities.CHROME['proxy'] = {
                "httpProxy": sProxy,
                "sslProxy": sProxy,
                "proxyType": "manual"
            }
        finally:
            # Release the DB connection even when the lookup above fails.
            conn.close()

    if url != "":
        # NOTE(review): Chrome's switch is normally spelled ``--app=<url>``;
        # the space-separated form is kept unchanged — confirm it works.
        option.add_argument('--app ' + url)
    if debugMode:
        option.add_experimental_option('debuggerAddress', '127.0.0.1:9222')

    driver = webdriver.Chrome(options=option)
    # Hide navigator.webdriver from every document before page scripts run.
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        """
    })
    if not debugMode:
        driver.maximize_window()
    driver.delete_all_cookies()

    if cookie != "":
        # SECURITY: the cookie text used to go through eval(), which executes
        # arbitrary code. literal_eval accepts the same dict literals only.
        import ast
        cookie_dict = ast.literal_eval(cookie)
        driver.add_cookie(cookie_dict)
        driver.refresh()
    return driver
# 返回代理IP及端口号,例:1.2.3.4:555
def getProxyIP():
    """Return the proxy as ``ip:port`` text; currently always an empty string."""
    placeholder = ""
    return placeholder
# 随机返回一个浏览器的UserAgent
def getUserAgent():
    """Pick one browser User-Agent string at random from a fixed pool.

    The pool intentionally keeps its duplicate entries so that the selection
    odds (and seeded ``random`` sequences) stay exactly as before.

    Returns:
        One User-Agent string chosen with ``random.choice``.
    """
    pool = (
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
        "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
        "Mozilla/5.0 (X11; U; Linux; en-US) AppleWebKit/527+ (KHTML, like Gecko, Safari/419.3) Arora/0.6",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.2pre) Gecko/20070215 K-Ninja/2.1.1",
        "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9) Gecko/20080705 Firefox/3.0 Kapiko/3.0",
        "Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5",
        "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
        "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.11 TaoBrowser/2.0 Safari/536.11",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.71 Safari/537.1 LBBROWSER",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER)",
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E; LBBROWSER)",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.84 Safari/535.11 LBBROWSER",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; QQBrowser/7.0.3698.400)",
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SV1; QQDownload 732; .NET4.0C; .NET4.0E; 360SE)",
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; QQDownload 732; .NET4.0C; .NET4.0E)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E)",
        "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1",
        "Mozilla/5.0 (iPad; U; CPU OS 4_2_1 like Mac OS X; zh-cn) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8C148 Safari/6533.18.5",
        "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:2.0b13pre) Gecko/20110307 Firefox/4.0b13pre",
        "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:16.0) Gecko/20100101 Firefox/16.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11",
        "Mozilla/5.0 (X11; U; Linux x86_64; zh-CN; rv:1.9.2.10) Gecko/20100922 Ubuntu/10.10 (maverick) Firefox/3.6.10",
    )
    return random.choice(pool)
# 验证码识别,暂只处理裁判文书网的验证码
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.webdriver import WebDriver
import requests
from flask import current_app
from pathlib import Path
import tempfile
import uuid
import hashlib
import os
import json
# selecter: 验证码图片css选择器
def getCaptchaMode1(browser: WebDriver,selecter: str):
    """Screenshot a captcha image and have the OCR service read it.

    The element matched by ``selecter`` is saved as a uniquely named PNG
    under ``./Temp_file``, posted to the URL in
    ``current_app.config['sys.ocrUrl']`` as the ``multiRequest`` file field,
    and the ``resultData`` field of the JSON reply is returned.

    Args:
        browser: Driver showing the page that contains the captcha.
        selecter: CSS selector of the captcha image element.

    Returns:
        The ``resultData`` value from the OCR reply, or ``""`` if any step
        failed (the error is printed, not raised).
    """
    ret = ""
    out_path = "./Temp_file"
    path_name = os.path.join(out_path, str(uuid.uuid4())) + ".png"
    try:
        Path(out_path).mkdir(parents=True, exist_ok=True)
        print(path_name)
        # Save the captcha exactly as rendered in the browser.
        img = browser.find_element(By.CSS_SELECTOR, selecter)
        img.screenshot(path_name)
        ocrUrl = current_app.config['sys.ocrUrl']
        # ``with`` guarantees the handle is closed even if the request fails.
        with open(path_name, "rb") as file:
            response = requests.post(ocrUrl, files={"multiRequest": file})
        # Expected reply shape:
        # {"code":200,"logs":null,"message":"success","resultData":"2rVK"}
        oRet = json.loads(response.text)
        ret = oRet["resultData"]
        print(ret)
    except Exception as err:
        print('getCaptchaMode1 error:', err)
    finally:
        # Remove the temporary screenshot on every path, not only on success.
        try:
            if os.path.exists(path_name):
                os.remove(path_name)
        except OSError as err:
            print('getCaptchaMode1 error:', err)
    return ret
from datetime import datetime,timedelta
from dateutil.relativedelta import relativedelta
#将yyyy年m月d日格式的日期转为yyyy-mm-dd格式的日期
def convertDate(sDate:str):
    """Normalise a Chinese-style date to zero-padded ``yyyy-mm-dd`` text.

    The year and month markers become ``-`` and the day marker is dropped;
    the remainder is parsed with ``%Y-%m-%d`` and formatted back out.

    Args:
        sDate: Date text such as ``2023年1月5日`` (or already ``2023-1-5``).

    Returns:
        The date formatted as ``%Y-%m-%d``.

    Raises:
        ValueError: If the cleaned text does not parse as ``%Y-%m-%d``.
    """
    marker_map = str.maketrans({"年": "-", "月": "-", "日": None})
    cleaned = sDate.translate(marker_map)
    return datetime.strptime(cleaned, '%Y-%m-%d').strftime('%Y-%m-%d')
#日期加减偏置,参数ymd为单位,y=年,m=月,d=日
def dateAdd(sDate:str,ymd:str="d",diff:int=1):
    """Shift a ``yyyy-mm-dd`` date by a signed number of years, months or days.

    Args:
        sDate: Start date as ``%Y-%m-%d`` text; ``""`` means today.
        ymd: Unit of the shift: ``"y"`` years, ``"m"`` months, ``"d"`` days.
            Any other value leaves the date unshifted.
        diff: Signed amount to add (negative moves backwards).

    Returns:
        The shifted date formatted as ``%Y-%m-%d``.
    """
    if sDate == "":
        sDate = datetime.now().strftime('%Y-%m-%d')
    moment = datetime.strptime(sDate, '%Y-%m-%d')
    # Adding a negative relativedelta equals subtracting its positive twin,
    # so one signed addition covers both directions.
    if ymd == "y":
        moment = moment + relativedelta(years=diff)
    elif ymd == "m":
        moment = moment + relativedelta(months=diff)
    elif ymd == "d":
        moment = moment + timedelta(days=diff)
    return moment.strftime('%Y-%m-%d')
\ No newline at end of file
#数值处理类
#将字符串的金额转换为数值型金额,字符串金额可能包含万元,人民币等
def convertMoney(sMoney:str):
    """Turn an amount written with Chinese currency words into a float.

    The tokens "万", "亿", "人民币" and "元" are deleted (in that order) and
    whatever is left is handed to ``float``.

    NOTE(review): the unit words are only removed, not applied as
    multipliers, so "100万" yields 100.0 — confirm callers expect that.

    Args:
        sMoney: Amount text, e.g. ``"100万元人民币"``.

    Returns:
        The remaining numeric text converted with ``float``.

    Raises:
        ValueError: If the remaining text is not a valid float literal.
    """
    digits = sMoney
    for token in ("万", "亿", "人民币", "元"):
        digits = digits.replace(token, "")
    return float(digits)
from selenium.webdriver.chrome.webdriver import WebDriver
from seleniumwire import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
#IP代理池
class UtilProxy:
    """One entry of the IP proxy pool.

    Only the attribute types are declared here; no values are assigned.
    """

    # Declared fields of a proxy entry.
    id: int
    ip: str
    port: str
    name: str
    password: str

    def alterIP(self, browser: WebDriver):
        """Switch the browser to another proxy IP (not implemented yet)."""
        return None
# 账号信息
from util import UtilDate
from util import UtilNumber
class LoginInfo:
    """Account record used for logging in.

    Only the attribute types are declared here; no values are assigned.
    """

    # The original inline note on ``id`` read "title", which looks like a
    # copy-paste leftover — presumably this is the record id; confirm.
    id: int
    user_group: str
    user_name: str
    user_passwd: str
# 代理IP信息
from util import UtilDate
from util import UtilNumber
class ProxyInfo:
    """Proxy server record: address plus optional credentials.

    Only the attribute types are declared here; no values are assigned.
    """

    # The original inline note on ``id`` read "title", which looks like a
    # copy-paste leftover — presumably this is the record id; confirm.
    id: int
    ip: str
    port: str
    user_name: str
    user_passwd: str
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论