提交 59beffab 作者: 薛凌堃

工具包

上级 1e3071db
......@@ -5,29 +5,39 @@ import socket
import sys
import time
import fitz
import logbook
import logbook.more
import pandas as pd
import requests
import zhconv
import pymysql
import redis
from retry import retry
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from openpyxl import Workbook
import langid
#创建连接池
# 创建连接池
import pymysql
from pymysql import connections
from DBUtils.PooledDB import PooledDB
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client
tracker_conf = get_tracker_conf('D:\\kkwork\\zzsn_spider\\base\\client.conf')
client = Fdfs_client(tracker_conf)
from obs import ObsClient
import fitz
from urllib.parse import unquote
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
......@@ -234,8 +244,9 @@ class BaseCore:
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
]
#Android agent池
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
# Android agent池
__USER_PHONE_AGENT_LIST = [
'Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
def __init__(self):
# self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
......@@ -246,9 +257,9 @@ class BaseCore:
self.cursor = self.cnx.cursor()
#11数据库
# 11数据库
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
charset='utf8mb4')
self.cursor_ = self.cnx_.cursor()
# 连接到Redis
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
......@@ -267,29 +278,15 @@ class BaseCore:
charset='utf8mb4'
)
self.pool_11 = PooledDB(
creator=pymysql,
maxconnections=5,
mincached=2,
maxcached=5,
blocking=True,
host='114.116.44.11',
port=3306,
user='caiji',
password='f7s0&7qqtK',
database='clb_project',
charset='utf8mb4'
)
def close(self):
try:
self.cursor.close()
self.cnx.close()
except :
except:
pass
# 计算耗时
def getTimeCost(self,start, end):
def getTimeCost(self, start, end):
seconds = int(end - start)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
......@@ -302,6 +299,7 @@ class BaseCore:
else:
ms = int((end - start) * 1000)
return "%d毫秒" % (ms)
# 当前时间格式化
# 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
# 2 : 010101120000 %y%m%d%H%M%S
......@@ -331,7 +329,7 @@ class BaseCore:
return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3)
# 日志格式
def logFormate(self,record, handler):
def logFormate(self, record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间
level=record.level_name, # 日志等级
......@@ -341,11 +339,13 @@ class BaseCore:
msg=record.message # 日志内容
)
return formate
# 获取logger
def getLogger(self,fileLogFlag=True, stdOutFlag=True):
def getLogger(self, fileLogFlag=True, stdOutFlag=True):
pid = self.getPID()
dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log"
filename = filename.replace(".py", "") + f"{pid}.log"
if not os.path.exists(dirname):
os.mkdir(dirname)
......@@ -391,34 +391,34 @@ class BaseCore:
proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)]
#字符串截取
def getSubStr(self,str,beginStr,endStr):
if beginStr=='':
# 字符串截取
def getSubStr(self, str, beginStr, endStr):
if beginStr == '':
pass
else:
begin=str.rfind(beginStr)
if begin==-1:
begin=0
str=str[begin:]
if endStr=='':
begin = str.rfind(beginStr)
if begin == -1:
begin = 0
str = str[begin:]
if endStr == '':
pass
else:
end=str.rfind(endStr)
if end==-1:
end = str.rfind(endStr)
if end == -1:
pass
else:
str = str[0:end+1]
str = str[0:end + 1]
return str
# 繁体字转简体字
def hant_2_hans(self,hant_str: str):
def hant_2_hans(self, hant_str: str):
'''
Function: 将 hant_str 由繁体转化为简体
'''
return zhconv.convert(hant_str, 'zh-hans')
# 判断字符串里是否含数字
def str_have_num(self,str_num):
def str_have_num(self, str_num):
panduan = False
for str_1 in str_num:
......@@ -438,7 +438,7 @@ class BaseCore:
# return gw_item.decode() if gw_item else None
# 从Redis的List中获取并移除一个元素
def redicPullData(self,key):
def redicPullData(self, key):
item = self.r.lpop(key)
return item.decode() if item else None
......@@ -452,12 +452,13 @@ class BaseCore:
IP = socket.gethostbyname(socket.gethostname())
return IP
def mkPath(self,path):
def mkPath(self, path):
folder = os.path.exists(path)
if not folder: # 判断是否存在文件夹如果不存在则创建为文件夹
os.makedirs(path) # makedirs 创建文件时如果路径不存在会创建这个路径
else:
pass
# 生成google模拟浏览器 必须传入值为googledriver位置信息
# headless用于决定是否为无头浏览器,初始默认为无头浏览器
# 正常浏览器可用于开始对页面解析使用或一些网站无头时无法正常采集
......@@ -489,13 +490,14 @@ class BaseCore:
def getInfomation(self, social_code):
data = []
try:
sql = f"SELECT * FROM sys_base_enterprise_ipo WHERE social_credit_code = '{social_code}'and securities_type='新三板' and listed='1' "
sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
# self.cursor.execute(sql)
# data = self.cursor.fetchone()
conn = self.pool_11.connection()
conn = self.pool_caiji.connection()
cursor = conn.cursor()
cursor.execute(sql)
data = cursor.fetchone()
conn.commit()
data = list(data)
cursor.close()
conn.close()
......@@ -536,7 +538,7 @@ class BaseCore:
# self.cnx.commit()
cnn = self.pool_caiji.connection()
cursor = cnn.cursor()
cursor.execute(sql,values)
cursor.execute(sql, values)
cnn.commit()
cursor.close()
cnn.close()
......@@ -544,25 +546,26 @@ class BaseCore:
log = self.getLogger()
log.info('======保存日志失败=====')
#获取企查查token
# 获取企查查token
def GetToken(self):
#获取企查查token
# 获取企查查token
query = "select token from QCC_token "
# token = '67ec7402166df1da84ae83c4b95cefc0' # 需要隔两个小时左右抓包修改
self.cursor.execute(query)
token = self.cursor.fetchone()[0]
self.cnx.commit()
return token
#获取天眼查token
# 获取天眼查token
def GetTYCToken(self):
query = 'select token from TYC_token'
self.cursor.execute(query)
token = self.cursor.fetchone()[0]
self.cnx.commit()
return token
#检测语言
# 检测语言
def detect_language(self, text):
# 使用langid.py判断文本的语言
result = langid.classify(text)
......@@ -572,11 +575,11 @@ class BaseCore:
return 'cn'
return result[0]
#追加接入excel
def writerToExcel(self,detailList,filename):
# 追加接入excel
def writerToExcel(self, detailList, filename):
# filename='baidu搜索.xlsx'
# 读取已存在的xlsx文件
existing_data = pd.read_excel(filename,engine='openpyxl',dtype=str)
existing_data = pd.read_excel(filename, engine='openpyxl', dtype=str)
# 创建新的数据
new_data = pd.DataFrame(data=detailList)
# 将新数据添加到现有数据的末尾
......@@ -585,19 +588,19 @@ class BaseCore:
combined_data.to_excel(filename, index=False)
# return combined_data
#对失败或者断掉的企业 重新放入redis
def rePutIntoR(self,key,item):
# 对失败或者断掉的企业 重新放入redis
def rePutIntoR(self, key, item):
self.r.rpush(key, item)
#增加计数器的值并返回增加后的值
def incrSet(self,key):
# 增加计数器的值并返回增加后的值
def incrSet(self, key):
# 增加计数器的值并返回增加后的值
new_value = self.r.incr(key)
print("增加后的值:", new_value)
return new_value
#获取key剩余的过期时间
def getttl(self,key):
# 获取key剩余的过期时间
def getttl(self, key):
# 获取key的剩余过期时间
ttl = self.r.ttl(key)
print("剩余过期时间:", ttl)
......@@ -607,12 +610,14 @@ class BaseCore:
self.r.set(key, 0)
self.r.expire(key, 3600)
time.sleep(2)
#上传至文件服务器,并解析pdf的内容和页数
def upLoadToServe(self,pdf_url,type_id,social_code):
# 上传至文件服务器,并解析pdf的内容和页数
def upLoadToServe(self, pdf_url, type_id, social_code):
headers = {}
retData = {'state':False,'type_id':type_id,'item_id':social_code,'group_name':'group1','path':'','full_path':'',
'category':'pdf','file_size':'','status':1,'create_by':'XueLingKun',
'create_time':'','page_size':'','content':''}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
try:
......@@ -649,51 +654,55 @@ class BaseCore:
return retData
def secrchATT(self,item_id,year,type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
self.cursor_.execute(sel_sql, (item_id, year, type_id))
def secrchATT(self, item_id, year, type_id,path):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s and path=%s'''
self.cursor_.execute(sel_sql, (item_id, year, type_id,path))
selects = self.cursor_.fetchone()
return selects
def deliteATT(self,id):
delitesql = f"delete from clb_sys_attachment where id = '{id}' "
self.cursor_.execute(delitesql)
self.cnx_.commit()
# 插入到att表 返回附件id
def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
# selects = self.secrchATT(item_id, year, type_id)
#
# if selects:
# self.getLogger().info(f'com_name:{com_name}--{year}已存在')
# id = ''
# return id
# else:
#插入到att表 返回附件id
def tableUpdate(self,retData,com_name,year,pdf_name,num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
selects = self.secrchATT(item_id,year,type_id)
# sel_sql = '''select id,item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
# self.cursor.execute(sel_sql, (item_id, year,type_id))
# selects = self.cursor.fetchone()
if selects:
self.getLogger().info(f'com_name:{com_name}已存在')
id = selects[0]
return id
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size,path,'zzsn',create_time)
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
self.getLogger().info("更新完成:{}".format(Upsql))
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
self.getLogger().info("更新完成:{}".format(Upsql))
selects = self.secrchATT(item_id,year,type_id)
id = selects[0]
return id
selects = self.secrchATT(item_id, year, type_id,path)
id = selects[0]
return id
# 更新企业的CIK
def updateCIK(self,social_code,cik):
def updateCIK(self, social_code, cik):
try:
sql = f"UPDATE EnterpriseInfo SET CIK = '{cik}' WHERE SocialCode = '{social_code}'"
cnn = self.pool_caiji.connection()
......@@ -706,12 +715,84 @@ class BaseCore:
log = self.getLogger()
log.info('======保存企业CIK失败=====')
# 上传至obs华为云服务器,并解析破地方的内容和页数
# 获取文件大小
def convert_size(self, size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
# 查看obs文件是否已经上传
def obsexist(self, file_path):
# # 文件路径
# file_path = 'XQWAnnualReport/2023-10/浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告.doc'
# 检查文件是否存在
response = obsClient.getObjectMetadata('zzsn', file_path)
if response.status >= 300:
self.getLogger().info('=====文件不存在obs=====')
return True
else:
self.getLogger().info(f'=====文件存在obs========{file_path}')
def uptoOBS(self, pdf_url, name_pdf, type_id, social_code, pathType, taskType, start_time,create_by,file_path):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': create_by,
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
name = name_pdf + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = self.getOBSres(pathType, now_time, name, file_path)
except:
log = self.getLogger()
log.error(f'OBS发送失败')
return retData
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = self.convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = self.getTimeCost(start_time, time.time())
self.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(self, pathType, now_time, name, response):
# result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
result = obsClient.putFile('zzsn', f'{pathType}{now_time}/' + name, file_path=response)
return result
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论