提交 fdc2816c 作者: LiuLiYuan

BaseCore 10/13

上级 31e25a8d
...@@ -11,28 +11,33 @@ import pandas as pd ...@@ -11,28 +11,33 @@ import pandas as pd
import requests import requests
import zhconv import zhconv
import redis import redis
from retry import retry
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.service import Service
import langid import langid
#创建连接池 # 创建连接池
import pymysql import pymysql
from DBUtils.PooledDB import PooledDB from DBUtils.PooledDB import PooledDB
# import sys # import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client') # sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client from fdfs_client.client import get_tracker_conf, Fdfs_client
tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf') tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf')
client = Fdfs_client(tracker_conf) client = Fdfs_client(tracker_conf)
from obs import ObsClient from obs import ObsClient
import fitz import fitz
from urllib.parse import unquote from urllib.parse import unquote
obsClient = ObsClient( obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码 access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址 server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
) )
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源 # 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore: class BaseCore:
...@@ -239,8 +244,9 @@ class BaseCore: ...@@ -239,8 +244,9 @@ class BaseCore:
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5' 'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
] ]
#Android agent池 # Android agent池
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36'] __USER_PHONE_AGENT_LIST = [
'Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
def __init__(self): def __init__(self):
# self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project', # self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
...@@ -251,9 +257,9 @@ class BaseCore: ...@@ -251,9 +257,9 @@ class BaseCore:
self.cursor = self.cnx.cursor() self.cursor = self.cnx.cursor()
#11数据库 # 11数据库
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.cursor_ = self.cnx_.cursor() self.cursor_ = self.cnx_.cursor()
# 连接到Redis # 连接到Redis
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6) self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
...@@ -276,11 +282,11 @@ class BaseCore: ...@@ -276,11 +282,11 @@ class BaseCore:
try: try:
self.cursor.close() self.cursor.close()
self.cnx.close() self.cnx.close()
except : except:
pass pass
# 计算耗时 # 计算耗时
def getTimeCost(self,start, end): def getTimeCost(self, start, end):
seconds = int(end - start) seconds = int(end - start)
m, s = divmod(seconds, 60) m, s = divmod(seconds, 60)
h, m = divmod(m, 60) h, m = divmod(m, 60)
...@@ -293,6 +299,7 @@ class BaseCore: ...@@ -293,6 +299,7 @@ class BaseCore:
else: else:
ms = int((end - start) * 1000) ms = int((end - start) * 1000)
return "%d毫秒" % (ms) return "%d毫秒" % (ms)
# 当前时间格式化 # 当前时间格式化
# 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S # 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
# 2 : 010101120000 %y%m%d%H%M%S # 2 : 010101120000 %y%m%d%H%M%S
...@@ -322,7 +329,7 @@ class BaseCore: ...@@ -322,7 +329,7 @@ class BaseCore:
return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3) return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3)
# 日志格式 # 日志格式
def logFormate(self,record, handler): def logFormate(self, record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format( formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间 date=record.time, # 日志时间
level=record.level_name, # 日志等级 level=record.level_name, # 日志等级
...@@ -332,8 +339,9 @@ class BaseCore: ...@@ -332,8 +339,9 @@ class BaseCore:
msg=record.message # 日志内容 msg=record.message # 日志内容
) )
return formate return formate
# 获取logger # 获取logger
def getLogger(self,fileLogFlag=True, stdOutFlag=True): def getLogger(self, fileLogFlag=True, stdOutFlag=True):
dirname, filename = os.path.split(os.path.abspath(sys.argv[0])) dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs") dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log" filename = filename.replace(".py", "") + ".log"
...@@ -382,34 +390,34 @@ class BaseCore: ...@@ -382,34 +390,34 @@ class BaseCore:
proxy_list.append(proxy) proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)] return proxy_list[random.randint(0, 3)]
#字符串截取 # 字符串截取
def getSubStr(self, str, beginStr, endStr):
    """Trim ``str`` using the last occurrences of two marker strings.

    The parameter name ``str`` shadows the builtin; it is kept unchanged so
    that existing keyword callers continue to work.

    :param str: text to trim.
    :param beginStr: start marker. The result begins at the LAST occurrence
        of this marker (the marker itself is kept). If the marker is empty,
        ``None`` or not found, the text is kept from its beginning.
    :param endStr: end marker, searched in the already-trimmed text. The
        result ends one character past the index where the LAST occurrence
        starts, so only the first character of the marker is kept. If the
        marker is empty, ``None`` or not found, the text is kept to its end.
    :return: the trimmed text.
    """
    text = str
    if beginStr:
        start = text.rfind(beginStr)
        # rfind returns -1 when the marker is missing; fall back to index 0.
        text = text[max(start, 0):]
    if endStr:
        stop = text.rfind(endStr)
        if stop != -1:
            text = text[:stop + 1]
    return text
# 繁体字转简体字 # 繁体字转简体字
def hant_2_hans(self, hant_str: str):
    """Convert ``hant_str`` from Traditional Chinese to Simplified Chinese.

    Delegates to ``zhconv.convert`` with the ``'zh-hans'`` target and returns
    its result unchanged.
    """
    simplified = zhconv.convert(hant_str, 'zh-hans')
    return simplified
# 判断字符串里是否含数字 # 判断字符串里是否含数字
def str_have_num(self,str_num): def str_have_num(self, str_num):
panduan = False panduan = False
for str_1 in str_num: for str_1 in str_num:
...@@ -429,7 +437,7 @@ class BaseCore: ...@@ -429,7 +437,7 @@ class BaseCore:
# return gw_item.decode() if gw_item else None # return gw_item.decode() if gw_item else None
# 从Redis的List中获取并移除一个元素 # 从Redis的List中获取并移除一个元素
def redicPullData(self, key):
    """Pop the head element of the Redis list ``key`` and return it decoded.

    :param key: name of the Redis list to pop from (via ``self.r.lpop``).
    :return: the popped element decoded with ``.decode()``, or ``None`` when
        ``lpop`` returns ``None``. An empty element is returned as ``''``
        rather than being reported as ``None``.
    """
    raw = self.r.lpop(key)
    # Compare against None explicitly: b'' is a real (empty) element and must
    # not be mistaken for "nothing was popped".
    if raw is None:
        return None
    return raw.decode()
...@@ -443,12 +451,13 @@ class BaseCore: ...@@ -443,12 +451,13 @@ class BaseCore:
IP = socket.gethostbyname(socket.gethostname()) IP = socket.gethostbyname(socket.gethostname())
return IP return IP
def mkPath(self, path):
    """Create directory ``path`` (including missing parents) if it is absent.

    Nothing is done when ``path`` already exists, whether as a directory or
    as a file.

    :param path: directory path to ensure.
    :return: None.
    """
    if not os.path.exists(path):
        # exist_ok=True closes the window between the existence check and the
        # creation: another process creating the directory in between no
        # longer raises FileExistsError.
        os.makedirs(path, exist_ok=True)
# 生成google模拟浏览器 必须传入值为googledriver位置信息 # 生成google模拟浏览器 必须传入值为googledriver位置信息
# headless用于决定是否为无头浏览器,初始默认为无头浏览器 # headless用于决定是否为无头浏览器,初始默认为无头浏览器
# 正常浏览器可用于开始对页面解析使用或一些网站无头时无法正常采集 # 正常浏览器可用于开始对页面解析使用或一些网站无头时无法正常采集
...@@ -528,7 +537,7 @@ class BaseCore: ...@@ -528,7 +537,7 @@ class BaseCore:
# self.cnx.commit() # self.cnx.commit()
cnn = self.pool_caiji.connection() cnn = self.pool_caiji.connection()
cursor = cnn.cursor() cursor = cnn.cursor()
cursor.execute(sql,values) cursor.execute(sql, values)
cnn.commit() cnn.commit()
cursor.close() cursor.close()
cnn.close() cnn.close()
...@@ -536,11 +545,10 @@ class BaseCore: ...@@ -536,11 +545,10 @@ class BaseCore:
log = self.getLogger() log = self.getLogger()
log.info('======保存日志失败=====') log.info('======保存日志失败=====')
# 获取企查查token
#获取企查查token
def GetToken(self): def GetToken(self):
#获取企查查token # 获取企查查token
query = "select token from QCC_token " query = "select token from QCC_token "
# token = '67ec7402166df1da84ae83c4b95cefc0' # 需要隔两个小时左右抓包修改 # token = '67ec7402166df1da84ae83c4b95cefc0' # 需要隔两个小时左右抓包修改
self.cursor.execute(query) self.cursor.execute(query)
...@@ -548,7 +556,7 @@ class BaseCore: ...@@ -548,7 +556,7 @@ class BaseCore:
self.cnx.commit() self.cnx.commit()
return token return token
#获取天眼查token # 获取天眼查token
def GetTYCToken(self): def GetTYCToken(self):
query = 'select token from TYC_token' query = 'select token from TYC_token'
self.cursor.execute(query) self.cursor.execute(query)
...@@ -556,7 +564,7 @@ class BaseCore: ...@@ -556,7 +564,7 @@ class BaseCore:
self.cnx.commit() self.cnx.commit()
return token return token
#检测语言 # 检测语言
def detect_language(self, text): def detect_language(self, text):
# 使用langid.py判断文本的语言 # 使用langid.py判断文本的语言
result = langid.classify(text) result = langid.classify(text)
...@@ -566,11 +574,11 @@ class BaseCore: ...@@ -566,11 +574,11 @@ class BaseCore:
return 'cn' return 'cn'
return result[0] return result[0]
#追加接入excel # 追加接入excel
def writerToExcel(self,detailList,filename): def writerToExcel(self, detailList, filename):
# filename='baidu搜索.xlsx' # filename='baidu搜索.xlsx'
# 读取已存在的xlsx文件 # 读取已存在的xlsx文件
existing_data = pd.read_excel(filename,engine='openpyxl',dtype=str) existing_data = pd.read_excel(filename, engine='openpyxl', dtype=str)
# 创建新的数据 # 创建新的数据
new_data = pd.DataFrame(data=detailList) new_data = pd.DataFrame(data=detailList)
# 将新数据添加到现有数据的末尾 # 将新数据添加到现有数据的末尾
...@@ -579,19 +587,19 @@ class BaseCore: ...@@ -579,19 +587,19 @@ class BaseCore:
combined_data.to_excel(filename, index=False) combined_data.to_excel(filename, index=False)
# return combined_data # return combined_data
#对失败或者断掉的企业 重新放入redis # 对失败或者断掉的企业 重新放入redis
def rePutIntoR(self, key, item):
    """Append ``item`` to the tail of the Redis list ``key``.

    Used to put a failed or interrupted item back into Redis. The result of
    ``rpush`` is discarded; the method returns None.
    """
    self.r.rpush(key, item)
#增加计数器的值并返回增加后的值 # 增加计数器的值并返回增加后的值
def incrSet(self, key):
    """Increment the Redis counter ``key`` and return the new value.

    The new value is also printed to stdout.
    """
    counter = self.r.incr(key)
    print("增加后的值:", counter)
    return counter
#获取key剩余的过期时间 # 获取key剩余的过期时间
def getttl(self,key): def getttl(self, key):
# 获取key的剩余过期时间 # 获取key的剩余过期时间
ttl = self.r.ttl(key) ttl = self.r.ttl(key)
print("剩余过期时间:", ttl) print("剩余过期时间:", ttl)
...@@ -601,12 +609,14 @@ class BaseCore: ...@@ -601,12 +609,14 @@ class BaseCore:
self.r.set(key, 0) self.r.set(key, 0)
self.r.expire(key, 3600) self.r.expire(key, 3600)
time.sleep(2) time.sleep(2)
#上传至文件服务器,并解析pdf的内容和页数
def upLoadToServe(self,pdf_url,type_id,social_code): # 上传至文件服务器,并解析pdf的内容和页数
def upLoadToServe(self, pdf_url, type_id, social_code):
headers = {} headers = {}
retData = {'state':False,'type_id':type_id,'item_id':social_code,'group_name':'group1','path':'','full_path':'', retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'category':'pdf','file_size':'','status':1,'create_by':'XueLingKun', 'full_path': '',
'create_time':'','page_size':'','content':''} 'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent() headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3): for i in range(0, 3):
try: try:
...@@ -643,49 +653,49 @@ class BaseCore: ...@@ -643,49 +653,49 @@ class BaseCore:
return retData return retData
def secrchATT(self, item_id, year, type_id):
    """Look up an attachment row in ``clb_sys_attachment``.

    Runs a parameterized select on ``self.cursor_`` filtered by ``item_id``,
    ``year`` and ``type_id``.

    :return: whatever ``fetchone()`` yields for the ``id`` column query.
    """
    query = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
    params = (item_id, year, type_id)
    self.cursor_.execute(query, params)
    return self.cursor_.fetchone()
#插入到att表 返回附件id # 插入到att表 返回附件id
def tableUpdate(self, retData, com_name, year, pdf_name, num):
    """Insert an attachment row into ``clb_sys_attachment`` and return its id.

    :param retData: dict providing ``item_id``, ``type_id``, ``group_name``,
        ``path``, ``full_path``, ``category``, ``file_size``, ``status``,
        ``create_by``, ``page_size`` and ``create_time``.
    :param com_name: company name, used only in the "already exists" log line.
    :param year: value stored in the ``year`` column and used for the lookup.
    :param pdf_name: value stored in the ``name`` column.
    :param num: value stored in the ``order_by`` column.
    :return: ``''`` when a row for (item_id, year, type_id) already exists;
        otherwise the first column of the row found by ``secrchATT`` after
        the insert has been committed.
    """
    # Read every field up front so a missing key fails before any query runs.
    (item_id, type_id, group_name, path, full_path, category, file_size,
     status, create_by, page_size, create_time) = (
        retData[field] for field in (
            'item_id', 'type_id', 'group_name', 'path', 'full_path',
            'category', 'file_size', 'status', 'create_by', 'page_size',
            'create_time',
        )
    )

    # Guard clause: do not insert a duplicate attachment.
    if self.secrchATT(item_id, year, type_id):
        self.getLogger().info(f'com_name:{com_name}--{year}已存在')
        return ''

    insert_sql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
    params = (
        year, pdf_name, type_id, item_id, group_name, path, full_path,
        category, file_size, num, status, create_by, create_time, page_size,
    )
    self.cursor_.execute(insert_sql, params)
    self.cnx_.commit()
    self.getLogger().info("更新完成:{}".format(insert_sql))

    # Re-query to obtain the id of the row that was just written.
    inserted = self.secrchATT(item_id, year, type_id)
    return inserted[0]
# 更新企业的CIK # 更新企业的CIK
def updateCIK(self,social_code,cik): def updateCIK(self, social_code, cik):
try: try:
sql = f"UPDATE EnterpriseInfo SET CIK = '{cik}' WHERE SocialCode = '{social_code}'" sql = f"UPDATE EnterpriseInfo SET CIK = '{cik}' WHERE SocialCode = '{social_code}'"
cnn = self.pool_caiji.connection() cnn = self.pool_caiji.connection()
...@@ -698,9 +708,9 @@ class BaseCore: ...@@ -698,9 +708,9 @@ class BaseCore:
log = self.getLogger() log = self.getLogger()
log.info('======保存企业CIK失败=====') log.info('======保存企业CIK失败=====')
#上传至obs华为云服务器,并解析破地方的内容和页数 # 上传至obs华为云服务器,并解析破地方的内容和页数
# 获取文件大小 # 获取文件大小
def convert_size(self,size_bytes): def convert_size(self, size_bytes):
# 定义不同单位的转换值 # 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB'] units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0 i = 0
...@@ -709,7 +719,8 @@ class BaseCore: ...@@ -709,7 +719,8 @@ class BaseCore:
i += 1 i += 1
return f"{size_bytes:.2f} {units[i]}" return f"{size_bytes:.2f} {units[i]}"
def obsexist(self,file_path): # 查看obs文件是否已经上传
def obsexist(self, file_path):
# # 文件路径 # # 文件路径
# file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc' # file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
...@@ -718,10 +729,11 @@ class BaseCore: ...@@ -718,10 +729,11 @@ class BaseCore:
if response.status >= 300: if response.status >= 300:
self.getLogger().info('=====文件不存在obs=====') self.getLogger().info('=====文件不存在obs=====')
return True
else: else:
self.getLogger().info(f'=====文件存在obs========{file_path}') self.getLogger().info(f'=====文件存在obs========{file_path}')
def uptoOBS(self,pdf_url, name_pdf,type_id, social_code,pathType,taskType,start_time): def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time):
headers = {} headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '', retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'full_path': '', 'full_path': '',
...@@ -737,19 +749,19 @@ class BaseCore: ...@@ -737,19 +749,19 @@ class BaseCore:
time.sleep(3) time.sleep(3)
continue continue
page_size = 0 page_size = 0
for i in range(0, 3): name = name_pdf + '.pdf'
try: now_time = time.strftime("%Y-%m")
name = name_pdf + '.pdf' try:
now_time = time.strftime("%Y-%m") result = self.getOBSres(pathType, now_time, name, response)
result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content) except:
with fitz.open(stream=response.content, filetype='pdf') as doc: log = self.getLogger()
page_size = doc.page_count log.error(f'OBS发送失败')
for page in doc.pages(): return retData
retData['content'] += page.get_text()
break with fitz.open(stream=response.content, filetype='pdf') as doc:
except: page_size = doc.page_count
time.sleep(3) for page in doc.pages():
continue retData['content'] += page.get_text()
if page_size < 1: if page_size < 1:
# pdf解析失败 # pdf解析失败
...@@ -771,13 +783,8 @@ class BaseCore: ...@@ -771,13 +783,8 @@ class BaseCore:
return retData return retData
return retData return retData
@retry(tries=3, delay=1)
def getOBSres(self, pathType, now_time, name, response):
    """Upload ``response.content`` to the ``'zzsn'`` OBS bucket.

    The object key is ``f'{pathType}{now_time}/'`` followed by ``name``.
    Wrapped with ``retry(tries=3, delay=1)``; presumably a raised exception
    triggers up to three attempts one second apart (confirm against the
    ``retry`` package).

    :return: the value returned by ``obsClient.putContent``.
    """
    # NOTE: obsClient.putFile(bucket, key, file_path=...) is the alternative
    # for uploading a file from a local path instead of in-memory content.
    object_key = f'{pathType}{now_time}/' + name
    return obsClient.putContent('zzsn', object_key, content=response.content)
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论