Commit fcae0da4 by 薛凌堃

Upload CSRC announcement files to OBS

Parent 1a56ae4f
import json
@@ -18,12 +18,23 @@ cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
taskType = '企业公告/证监会'
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # your Huawei Cloud access key (AK)
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # your Huawei Cloud secret key (SK)
server='https://obs.cn-north-1.myhuaweicloud.com' # endpoint of your OBS bucket
)
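# shared OBS client used for every upload in this script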
# Convert a file size in bytes to a human-readable string
def convert_size(size_bytes):
# unit labels; each step divides by 1024
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': 'group1', 'path': '',
@@ -33,7 +44,8 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
resp_content = requests.get(pdf_url, headers=headers, verify=False, timeout=20).content
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
# Content-Length may be missing from the response headers; fall back to the downloaded size
file_size = int(response.headers.get('Content-Length', 0)) or len(response.content)
break
except:
time.sleep(3)
@@ -42,8 +54,9 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
for i in range(0, 3):
try:
name = pdf_name + '.pdf'
result = obsClient.putContent('zzsn', 'ZJH/'+name, content=resp_content)
with fitz.open(stream=resp_content, filetype='pdf') as doc:
now_time = time.strftime("%Y-%m")
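# uploads are grouped under a year-month prefix in the bucket, e.g. ZJH/2023-09/<name>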
result = obsClient.putContent('zzsn', f'ZJH/{now_time}/'+name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
@@ -60,12 +73,15 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('/ZJH')[0]
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = unquote(result['body']['objectUrl'])
retData['file_size'] = result['Uploaded size']
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except:
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
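# on success retData carries state, path, full_path, file_size, create_time, page_size and the extracted text in content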
@@ -76,7 +92,6 @@ def secrchATT(item_id, name, type_id):
selects = cursor_.fetchone()
return selects
# Insert into the attachment (att) table and return the attachment id
def tableUpdate(retData, com_name, year, pdf_name, num):
item_id = retData['item_id']
@@ -94,8 +109,8 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
selects = secrchATT(item_id, pdf_name, type_id)
if selects:
log.info(f'com_name:{com_name}已存在')
id = selects[0]
log.info(f'pdf_name:{pdf_name}已存在')
id = ''
return id
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
@@ -125,13 +140,20 @@ def RequestUrl(url, payload, social_code,start_time):
pass
# Check the response status code
if response.status_code == 200:
# Request succeeded; parse the response body
# print(response.text)
soup = BeautifulSoup(response.text, 'html.parser')
pass
else:
# Request failed; log the error
try:
if response.status_code == 200:
# Request succeeded; parse the response body
# print(response.text)
soup = BeautifulSoup(response.text, 'html.parser')
pass
else:
# Request failed; log the error
log.error(f'请求失败:{url}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
soup = ''
except:
log.error(f'请求失败:{url}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
@@ -215,7 +237,6 @@ def getUrl(code, url_parms, Catagory2_parms):
}
return dic_parms
def ifInstert(short_name, social_code, pdf_url):
ifexist = True
@@ -229,16 +250,19 @@ def ifInstert(short_name, social_code, pdf_url):
return ifexist
else:
return ifexist
def InsterInto(short_name, social_code, pdf_url):
def InsterInto(social_code, pdf_url,pub_time):
insert = False
# Insert the record into the database
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,create_time) values(%s,%s,%s,%s,%s,now())'''
list_info = [
social_code,
pdf_url,
'证监会',
'1',
pub_time,
]
# the 144 database
cursor.execute(insert_sql, tuple(list_info))
@@ -251,7 +275,6 @@ def InsterInto(short_name, social_code, pdf_url):
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
return insert
def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
# Upload to Huawei Cloud (OBS)
retData = uptoOBS(pdf_url,pdf_name,8,social_code)
@@ -263,12 +286,11 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
return False
num = num + 1
att_id = tableUpdate(retData,com_name,year,pdf_name,num)
content = retData['content']
if retData['state']:
if att_id:
pass
else:
log.info(f'====pdf解析失败====')
return False
content = retData['content']
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
@@ -304,7 +326,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
'message': '操作成功',
'code': '200',
}
print(dic_result)
log.info(dic_result)
return True
except Exception as e:
dic_result = {
@@ -316,14 +338,11 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
print(dic_result)
log.info(dic_result)
return False
# Crawl announcement info
def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info: basic company info fetched from the database
okCount = 0
errorCount = 0
social_code = dic_info[2]
short_name = dic_info[4]
com_name = dic_info[1]
@@ -335,26 +354,26 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
try:
is_exist = soup.find('div',class_='con').text
if is_exist == '没有查询到数据':
state = 1
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '')
baseCore.recordLog(social_code, taskType, state, takeTime, url, '没有查询到数据')
return
except:
pass
# First get the total number of pages
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
total = re.findall(r'\d+', page)[0]
r_page = int(total) % 15
if r_page == 0:
Maxpage = int(total) // 15
else:
Maxpage = int(total) // 15 + 1
log.info(f'{short_name}====={code}===========一共{total}条,{Maxpage}页')
# The first page uses a different URL from the other pages; adjust the link when iterating
for i in range(1, Maxpage + 1):
# # First get the total number of pages
# page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
#
# total = re.findall(r'\d+', page)[0]
#
# r_page = int(total) % 15
# if r_page == 0:
# Maxpage = int(total) // 15
# else:
# Maxpage = int(total) // 15 + 1
# log.info(f'{short_name}====={code}===========一共{total}条,{Maxpage}页')
# # The first page uses a different URL from the other pages; adjust the link when iterating
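# crawl at most the first 50 result pages instead of computing the total from the pager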
for i in range(1,51):
log.info(f'==========正在采集第{i}页=========')
if i == 1:
href = url
@@ -366,9 +385,9 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
if soup == '':
continue
tr_list = soup.find('div', id='txt').find_all('tr')
pageIndex = 0
# pageIndex = 0
for tr in tr_list[1:]:
pageIndex += 1
# pageIndex += 1
td_list = tr.find_all('td')
pdf_url_info = td_list[2]
# print(pdf_url)
@@ -376,6 +395,12 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
#todo: check that the publish date is in date format
pattern = r"^\d{4}-\d{2}-\d{2}$" # regex matching dates in YYYY-MM-DD format
if re.match(pattern, pub_time):
pass
else:
continue
year = pub_time[:4]
report_type = td_list[4].text.strip()
@@ -383,30 +408,22 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
ifexist = ifInstert(short_name, social_code, pdf_url)
# ifexist is True when the announcement is not yet in the database
if ifexist:
# # announcement info list
# okCount = okCount + 1
# Parse the PDF: first get the PDF link, download it, then record parse success/failure and transfer success/failure
log.info(f'======={short_name}========{code}===插入公告库成功')
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
if result:
# announcement info list
okCount = okCount + 1
log.info(f'{short_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '')
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
# Insert into the database only after the Kafka send succeeds
insert = InsterInto(social_code,pdf_url,pub_time)
if insert:
log.info(f'===={social_code}========{name_pdf}=====插入库成功')
pass
else:
errorCount += 1
# time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.error(f'{short_name}=============解析或传输操作失败')
# try:
# insert_err_sql = f"insert into dt_err(xydm,`from`,url,title,pub_date,zhaiyao,create_date,state,pageNo,pageIndex,type) values('{social_code}','证监会','{pdf_url}','{name_pdf}','{pub_time}',' ',now(),1,{i},{pageIndex},'1')"
# cursor_.execute(insert_err_sql)
# cnx_.commit()
# except:
# pass
continue
else:
log.info(f'======={short_name}========{code}===已存在')
@@ -449,14 +466,15 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# Fetch company info
social_code = baseCore.redicPullData('NoticeEnterprise:gnqy_socialCode')
# social_code = '9110000071092841XX'
# social_code = baseCore.redicPullData('NoticeEnterprise:gnqy_socialCode')
social_code = '91440500617540496Q'
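# NOTE: the Redis pull above is commented out and a fixed credit code is used here, presumably for a single-company/debug run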
# If Redis has no more data, wait and retry
if social_code == None:
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[16]
count = dic_info[17]
# Shanghai http://eid.csrc.gov.cn/101111/index.html  Shenzhen http://eid.csrc.gov.cn/101811/index.html  Beijing Stock Exchange http://eid.csrc.gov.cn/102611/index.html
# url variable, pagination, column: http://eid.csrc.gov.cn/101811/index_3_f.html
@@ -474,11 +492,14 @@ if __name__ == '__main__':
com_name = dic_info[1]
dic_parms = getUrl(code, url_parms, Catagory2_parms)
dic_parms_ls = getUrl(code, url_parms_ls, Catagory2_parms_ls)
if dic_parms:
start_time_cj = time.time()
log.info(f'======开始处理{com_name}=====发行公告=======')
SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, start_time,num)
log.info(f'{code}==========={short_name},{com_name},发行公告,耗时{baseCore.getTimeCost(start_time_cj, time.time())}')
start_time_ls = time.time()
log.info(f'======开始处理{com_name}=====临时报告=======')
SpiderByZJH(dic_parms_ls['url'], dic_parms_ls['payload'], dic_info, start_time,num)
log.info(f'{code}==========={short_name},{com_name},临时报告,耗时{baseCore.getTimeCost(start_time_ls, time.time())}')
# UpdateInfoSql(retData,retData_ls,social_code)
@@ -488,10 +509,6 @@ if __name__ == '__main__':
count += 1
runType = 'NoticeReportCount'
baseCore.updateRun(code, runType, count)
cursor.close()
cnx.close()
# cursor_.close()
# cnx_.close()
# Release resources
baseCore.close()