Commit 727d72b9 authored by 薛凌堃

Enterprise announcement adjustments

Parent 5641214f
 import json
 import re
 import time
+import uuid
+from datetime import datetime
 import requests
 from bs4 import BeautifulSoup
 from kafka import KafkaProducer
+from retry import retry
 from base import BaseCore
 from obs import ObsClient
 import fitz
@@ -24,6 +30,11 @@ obsClient = ObsClient(
     secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud SK
     server='https://obs.cn-north-1.myhuaweicloud.com'  # your bucket endpoint
 )
+pathType = 'QYNotice/'
+def getuuid():
+    get_timestamp_uuid = uuid.uuid1()  # generate a uuid from the timestamp to guarantee global uniqueness
+    return get_timestamp_uuid
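
Editor's note: getuuid() wraps uuid.uuid1(), which derives a UUID from the host's timestamp and node id; the reworked upload path below stringifies it to build the OBS object key. A small sketch of the resulting key, assuming the pathType prefix defined above (the concrete UUID differs on every call):

    name = str(getuuid()) + '.pdf'   # e.g. '9f2c1d2e-3a4b-11ee-b962-0242ac120002.pdf'
    object_key = pathType + name     # 'QYNotice/9f2c1d2e-3a4b-11ee-b962-0242ac120002.pdf'
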
 # get the file size
 def convert_size(size_bytes):
@@ -44,26 +55,28 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
     headers['User-Agent'] = baseCore.getRandomUserAgent()
     for i in range(0, 3):
         try:
-            response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
+            response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
             file_size = int(response.headers.get('Content-Length'))
             break
         except:
             time.sleep(3)
             continue
     page_size = 0
-    for i in range(0, 3):
-        try:
-            name = pdf_name
-            now_time = time.strftime("%Y-%m")
-            result = obsClient.putContent('zzsn', 'QYNotice/'+name, content=response.content)
-            with fitz.open(stream=response.content, filetype='pdf') as doc:
-                page_size = doc.page_count
-                for page in doc.pages():
-                    retData['content'] += page.get_text()
-            break
-        except:
-            time.sleep(3)
-            continue
+    name = str(getuuid()) + '.pdf'
+    now_time = time.strftime("%Y-%m")
+    try:
+        result = getOBSres(pathType, name, response)
+    except:
+        log.error(f'OBS发送失败')
+        return retData
+    try:
+        with fitz.open(stream=response.content, filetype='pdf') as doc:
+            page_size = doc.page_count
+            for page in doc.pages():
+                retData['content'] += page.get_text()
+    except:
+        log.error(f'文件损坏')
+        return retData
     if page_size < 1:
         # pdf parsing failed
@@ -73,8 +86,8 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
     try:
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
         retData['state'] = True
-        retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
-        retData['full_path'] = unquote(result['body']['objectUrl'])
+        retData['path'] = result['body']['objectUrl'].split('.com')[1]
+        retData['full_path'] = result['body']['objectUrl']
         retData['file_size'] = convert_size(file_size)
         retData['create_time'] = time_now
         retData['page_size'] = page_size
@@ -86,15 +99,21 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
     return retData
-def secrchATT(item_id, name, type_id,order_by):
-    sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
-    cursor_.execute(sel_sql, (item_id, name, type_id,order_by))
-    select = cursor_.fetchall()
-    selects = select[-1]
+@retry(tries=3, delay=1)
+def getOBSres(pathType, name, response):
+    result = obsClient.putContent('zzsn', pathType + name, content=response.content)
+    # resp = obsClient.putFile('zzsn', pathType + name, file_path='local path of the file to upload')
+    return result
+def secrchATT(item_id, retData, type_id,order_by):
+    sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
+    cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
+    selects = cursor_.fetchone()
     return selects
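
Editor's note: the commit swaps the hand-rolled 3-attempt loop around obsClient.putContent for the retry package's decorator. getOBSres is re-invoked on any exception up to tries=3 with a one-second delay, and only after the final failure does the exception reach the caller's try/except in uptoOBS. A minimal standalone sketch of that behavior, with flaky_upload as a hypothetical stand-in for the OBS call:

    from retry import retry

    attempts = 0

    @retry(tries=3, delay=1)
    def flaky_upload():
        global attempts
        attempts += 1
        raise IOError('simulated OBS failure')  # every attempt fails here

    try:
        flaky_upload()
    except IOError:
        print(attempts)  # prints 3 -- two retries happened before the error propagated
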
 # insert into the att table and return the attachment id
-def tableUpdate(retData, com_name, year, pdf_name, num):
+def tableUpdate(retData, com_name, year, pdf_name, num,pub_time,origin):
     item_id = retData['item_id']
     type_id = retData['type_id']
     group_name = retData['group_name']
@@ -115,18 +134,19 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
     # return id
     # else:
     try:
-        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
         values = (
             year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
             status, create_by,
-            create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn')
+            create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
+            pub_time, origin)
         cursor_.execute(Upsql, values)  # insert
         cnx_.commit()  # commit
     except Exception as e:
         print(e)
     log.info(f"更新完成:{item_id}===={pdf_name}")
-    selects = secrchATT(item_id, pdf_name, type_id,order_by)
+    selects = secrchATT(item_id, retData, type_id,order_by)
     id = selects[0]
     return id
@@ -300,7 +320,8 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
         log.info(f'====pdf解析失败====')
         return False
     num = num + 1
-    att_id = tableUpdate(retData,com_name,year,pdf_name,num)
+    origin = '证监会'
+    att_id = tableUpdate(retData,com_name,year,pdf_name,num,pub_time,origin)
     if att_id:
         pass
     else:
@@ -318,7 +339,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
         'id': '',
         'keyWords': '',
         'lang': 'zh',
-        'origin': '证监会',
+        'origin': origin,
         'publishDate': pub_time,
         'sid': '1684032033495392257',
         'sourceAddress': pdf_url,  # link to the original text
@@ -394,7 +415,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num):  # dic_info 数据库
         pass
     else:
         Maxpage = 50
-        for i in range(1,Maxpage):
+        for i in range(1,Maxpage+1):
             log.info(f'==========正在采集第{i}页=========')
             if i == 1:
                 href = url
@@ -415,17 +436,22 @@ def SpiderByZJH(url, payload, dic_info, start_time,num):  # dic_info 数据库
                 pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
                 name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'') + '.pdf'
-                pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
+                pub_time_ = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
                 # todo: check whether the publish date is in a valid date format
                 pattern = r"^\d{4}-\d{2}-\d{2}$"  # regex matching dates in YYYY-MM-DD format
                 date_time_pattern = r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
-                if re.match(pattern, pub_time):
+                if re.match(pattern, pub_time_):
                     pass
                 else:
-                    if re.match(date_time_pattern, pub_time):
+                    if re.match(date_time_pattern, pub_time_):
                         pass
                     else:
                         continue
+                # convert the YYYY-MM-DD string into a datetime object
+                date_object = datetime.strptime(pub_time_, "%Y-%m-%d")
+                # format the datetime object as a YYYY-MM-DD HH:MM:SS string
+                pub_time = date_object.strftime("%Y-%m-%d %H:%M:%S")
                 year = pub_time[:4]
                 report_type = td_list[4].text.strip()
...