提交 632d5a17 作者: 薛凌堃

企业公告维护

上级 8cf6e366
import os import os
...@@ -46,22 +46,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -46,22 +46,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun', 'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''} 'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent() headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3): if category == '.pdf':
try: try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20) response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
if response.status_code != 200: if response.status_code != 200:
return retData return retData
file_size = int(response.headers.get('Content-Length')) file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text with fitz.open(stream=response.content, filetype='pdf') as doc:
#todo:判断内容是否成功 page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
# todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']: if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData return retData
else: else:
break pass
except: except:
time.sleep(3) log.error(f'文件损坏')
continue return retData
page_size = 1 else:
for i in range(0, 3):
try:
page_size = 1
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
#todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
except:
time.sleep(3)
continue
name = str(getuuid()) + category name = str(getuuid()) + category
try: try:
result = getOBSres(pathType, name, response) result = getOBSres(pathType, name, response)
...@@ -85,7 +104,7 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -85,7 +104,7 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except Exception as e: except Exception as e:
state = 0 state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}') #baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData return retData
return retData return retData
......
"""
"""
港股公告-更换采用接口的方式
"""
import os
import subprocess
import sys
import uuid
import fitz
import requests
from bs4 import BeautifulSoup
import time, json
from kafka import KafkaProducer
from obs import ObsClient
from urllib.parse import unquote
from retry import retry
from base import BaseCore
# Shared crawler framework object: logging, DB connections, UA rotation, proxies, etc.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# Primary MySQL connection/cursor from BaseCore (used for brpa_source_article;
# per a comment below this is the "144" database — confirm against BaseCore).
cnx = baseCore.cnx
cursor = baseCore.cursor
# OBS object-key prefix for enterprise announcement attachments.
pathType = 'QYNotice/'
# Secondary MySQL connection/cursor (used for clb_sys_attachment).
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
# NOTE(review): hard-coded Huawei Cloud credentials committed to source —
# these should be moved to config/environment and rotated.
obsClient = ObsClient(
    access_key_id='VEHN7D0TJ9316H8AHCAV',  # Huawei Cloud access key (AK)
    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # Huawei Cloud secret key (SK)
    server='https://obs.cn-north-1.myhuaweicloud.com'  # OBS bucket endpoint
)
# Format a file size for display.
def convert_size(size_bytes):
    """Return *size_bytes* as a human-readable string, e.g. ``'1.50 KB'``.

    Divides by 1024 until the value drops below one unit step, capping at TB.
    """
    units = ('bytes', 'KB', 'MB', 'GB', 'TB')
    idx = 0
    value = size_bytes
    while value >= 1024 and idx < len(units) - 1:
        value /= 1024
        idx += 1
    return f"{value:.2f} {units[idx]}"
def getuuid():
    """Return a timestamp-based UUID1 (unique across hosts via time + node id)."""
    return uuid.uuid1()
def uptoOBS(pdf_url, pdf_name, type_id, social_code):
    """Download the announcement attachment at *pdf_url* and upload it to OBS.

    :param pdf_url: direct URL of the attachment (PDF or an HTML page).
    :param pdf_name: attachment display name (unused here; kept for the caller's interface).
    :param type_id: attachment type id stored with the record.
    :param social_code: unified social credit code of the company.
    :return: ``retData`` dict; ``retData['state']`` is True only on full success,
             in which case path/full_path/file_size/page_size/content are filled in.
    """
    headers = {}
    category = os.path.splitext(pdf_url)[1]
    retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
               'full_path': '',
               'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
               'create_time': '', 'page_size': '', 'content': ''}
    headers['User-Agent'] = baseCore.getRandomUserAgent()
    if category == '.pdf':
        try:
            response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
            if response.status_code != 200:
                return retData
            # Content-Length may be absent; default to 0 instead of int(None) raising.
            file_size = int(response.headers.get('Content-Length') or 0)
            with fitz.open(stream=response.content, filetype='pdf') as doc:
                page_size = doc.page_count
                for page in doc.pages():
                    retData['content'] += page.get_text()
            # An error page served instead of the PDF shows up in the extracted text.
            if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
                return retData
        except Exception:
            log.error('文件损坏')
            return retData
    else:
        page_size = 1  # non-PDF attachments count as a single "page"
        for i in range(0, 3):
            try:
                response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
                if response.status_code != 200:
                    return retData
                file_size = int(response.headers.get('Content-Length') or 0)
                retData['content'] = response.text
                if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
                    return retData
                break
            except Exception:
                time.sleep(3)
        else:
            # All three attempts failed: previously execution fell through with
            # `response` (and `file_size`) unbound and crashed with NameError below.
            return retData
    name = str(getuuid()) + category
    try:
        result = getOBSres(pathType, name, response)
    except Exception:
        log.error('OBS发送失败')
        return retData
    if page_size < 1:
        # PDF parsed to zero pages — treat as failure.
        return retData
    try:
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData['state'] = True
        retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
        retData['full_path'] = unquote(result['body']['objectUrl'])
        retData['file_size'] = convert_size(file_size)
        retData['create_time'] = time_now
        retData['page_size'] = page_size
    except Exception as e:
        # OBS result was missing the expected objectUrl shape; report failure.
        log.info(e)
        return retData
    return retData
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
    """Upload ``response.content`` to the 'zzsn' OBS bucket under ``pathType + name``.

    Retried up to 3 times (1s delay) by the decorator; returns the OBS result dict.
    """
    object_key = pathType + name
    # putFile with a local path would also work; we stream the bytes directly.
    return obsClient.putContent('zzsn', object_key, content=response.content)
def secrchATT(item_id, retData, type_id):
    """Look up an existing attachment row by (item_id, path, type_id).

    Returns the first matching ``(id,)`` row, or None if no row exists.
    """
    sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s '''
    params = (item_id, retData['path'], type_id)
    cursor_.execute(sel_sql, params)
    return cursor_.fetchone()
# 插入到att表 返回附件id
def tableUpdate(retData, com_name, year, pdf_name, num):
    """Insert the attachment metadata into clb_sys_attachment and return its id.

    :param retData: dict produced by ``uptoOBS`` (must have state True).
    :param com_name: company name (currently unused; kept for the caller's interface).
    :param year: announcement year stored with the row.
    :param pdf_name: attachment name (stored as name + extension).
    :param num: ordering index for the attachment.
    :return: the attachment row id, or '' when the insert/lookup failed.
    """
    item_id = retData['item_id']
    type_id = retData['type_id']
    group_name = retData['group_name']
    path = retData['path']
    full_path = retData['full_path']
    category = retData['category']
    file_size = retData['file_size']
    status = retData['status']
    create_by = retData['create_by']
    page_size = retData['page_size']
    create_time = retData['create_time']
    order_by = num
    try:
        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
        values = (
            year, pdf_name + category, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
            status, create_by,
            create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn')
        cursor_.execute(Upsql, values)  # 插入
        cnx_.commit()  # 提交
    except Exception as e:
        # Insert failures are tolerated; the lookup below decides success.
        log.info(e)
    log.info(f"更新完成:{item_id}===={pdf_name+category}")
    try:
        selects = secrchATT(item_id, retData, type_id)
    except Exception as e:
        log.info(e)
        return ''
    if not selects:
        # Row never made it into the table (insert above failed and was swallowed);
        # previously `selects[0]` raised TypeError on None here.
        return ''
    id = selects[0]
    return id
def InsterInto(social_code, pdf_url, pub_time, pdf_name):
    """Insert one crawled announcement into brpa_source_article.

    :param social_code: unified social credit code.
    :param pdf_url: source address stored with the row (the caller passes the info page URL).
    :param pub_time: publish timestamp; only the date part (first 10 chars) is stored.
    :param pdf_name: announcement title.
    :return: True if the row was committed, False on any DB error.
    """
    insert = False
    # 信息插入数据库
    try:
        insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
        list_info = [
            social_code,
            pdf_url,
            '东方财富网',
            '1',
            pub_time[:10],
            pdf_name
        ]
        # 144数据库
        cursor.execute(insert_sql, tuple(list_info))
        cnx.commit()
        insert = True
        return insert
    except Exception:
        # Was a bare `except:` — that also swallowed SystemExit/KeyboardInterrupt.
        # NOTE(review): start_time/taskType are module globals set in __main__.
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
        return insert
def ifInstert(short_name, social_code, title, info_date):
    """Return True when this announcement is NOT yet in brpa_source_article.

    Dedup key: (social_code, title, publish date) for origin 东方财富网 / type 1.
    Logs and returns False when a matching row already exists.
    """
    aa = info_date[:10]
    sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and title = %s and publish_time = %s and origin='东方财富网' and type='1' '''
    cursor.execute(sel_sql, (social_code, title, aa))
    if cursor.fetchone():
        # Already crawled — skip it.
        log.info(f'com_name:{short_name}、{title}, {info_date}已存在')
        return False
    return True
def sendKafka(social_code, newsUrl, dic_news):
    """Push one announcement dict to the researchReportNoticeTopic Kafka topic.

    :param social_code: unified social credit code (used only for failure logging).
    :param newsUrl: URL recorded in the failure log entry.
    :param dic_news: payload dict, serialized as UTF-8 JSON.
    :return: True when the send was acknowledged within 10s, False otherwise.
    """
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024*1024*20)
        payload = json.dumps(dic_news, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("researchReportNoticeTopic", payload)
        # Block until the broker acknowledges (or 10s timeout raises).
        print(kafka_result.get(timeout=10))
        dic_result = {
            'success': 'ture',
            'message': '操作成功',
            'code': '200',
        }
        log.info(dic_result)
        return True
    except Exception as e:
        dic_result = {
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        }
        # NOTE(review): start_time/taskType are module globals set in __main__.
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, newsUrl, 'Kafka操作失败')
        log.info(dic_result)
        return False
def GetContent(pdf_url, info_url, pdf_name, social_code, year, pub_time, start_time, com_name, num):
    """Download and parse one announcement, record its attachment, push it to Kafka.

    :param pdf_url: attachment download URL.
    :param info_url: announcement detail-page URL (stored as sourceAddress).
    :param pdf_name: announcement title / attachment name.
    :param social_code: unified social credit code.
    :param year: announcement year.
    :param pub_time: publish timestamp.
    :param start_time: crawl start time (shadowed module global; passed through for logging).
    :param com_name: company name (forwarded to tableUpdate).
    :param num: attachment ordering counter.
    :return: True on full success, False otherwise.
    """
    # 上传至华为云服务器 (also extracts the text content into retData['content'])
    retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
    if not retData['state']:
        log.info(f'====pdf解析失败====')
        # NOTE(review): on any parse failure the crawler relaunches itself and
        # SIGKILLs the current process — confirm this drastic recovery is intended.
        current_pid = baseCore.getPID()
        subprocess.Popen([sys.executable] + sys.argv)
        os.kill(current_pid, 9)
        return False
    num = num + 1
    # 附件插入att数据库; an empty id means the insert/lookup failed.
    att_id = tableUpdate(retData, com_name, year, pdf_name, num)
    if not att_id:
        return False
    content = retData['content']
    lang = baseCore.detect_language(content)
    if lang == 'cn':
        lang = 'zh'
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': att_id,
        'author': '',
        'content': content,
        'contentWithTag': '',
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': lang,
        'origin': '东方财富网',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': info_url,  # 原文链接
        'summary': '',
        'title': pdf_name.replace('.pdf', ''),
        'type': 3,
        'socialCreditCode': social_code,
        'year': year
    }
    # 将相应字段通过kafka传输保存 — reuse the shared sender instead of
    # duplicating the whole KafkaProducer block inline (same topic, payload,
    # logging, and recordLog arguments as before).
    return sendKafka(social_code, pdf_url, dic_news)
def gonggao_info(dic_info):
    """Crawl all HK-board announcements for one company and process each one.

    :param dic_info: company row from baseCore.getInfomation
                     (index 1 = name, 2 = social code, 3 = stock code).
    Exits the process (sys.exit) when the list/detail API stays unreachable.
    Relies on module globals ``start_time``, ``taskType`` and ``num`` set in __main__.
    """
    code = dic_info[3]
    com_name = dic_info[1]
    social_code = dic_info[2]
    # Only HK-listed codes are handled by this crawler.
    if 'HK' not in code:
        return
    # https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index=1&ann_type=H&client_source=web&stock_list=00175&f_node=0
    url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index=1&ann_type=H&client_source=web&stock_list={code.split(".HK")[0]}&f_node=0'
    for n1 in range(0, 3):
        try:
            res = requests.get(url, verify=False)
            break
        except Exception:
            if n1 == 2:
                sys.exit(0)
            time.sleep(5)
    res_json = res.json()
    total_hits = res_json['data']['total_hits']
    # total_hits counts announcements, not pages; each page holds 50 items.
    # The previous version iterated range(1, total_hits + 1), requesting one
    # page per announcement and hammering the API with mostly-empty pages.
    total_pages = (total_hits + 49) // 50
    for page1 in range(1, total_pages + 1):
        url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=H&client_source=web&stock_list={code.split(".HK")[0]}&f_node=0'
        for n1 in range(0, 3):
            try:
                res = requests.get(url, verify=False)
                break
            except Exception:
                if n1 == 2:
                    sys.exit(0)
                time.sleep(5)
        res_json = res.json()
        list_all = res_json['data']['list']
        if not list_all:
            # Past the last page of results — stop paging.
            break
        for one_info in list_all:
            title = one_info['title']
            info_date = one_info['notice_date']
            year = info_date[:4]
            try:
                info_type = one_info['columns'][0]['column_name']
            except Exception:
                info_type = ''
            art_code = one_info['art_code']
            info_url = 'https://data.eastmoney.com/notices/detail/' + code + '/' + art_code + '.html'
            t = int(time.time() * 1000)
            # https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code=AN202308221595478274&client_source=web&page_index=1&_=1708918810986
            json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
            for n1 in range(0, 3):
                try:
                    ip = baseCore.get_proxy()
                    json_2 = requests.get(json_url, proxies=ip, verify=False).json()
                    break
                except Exception:
                    if n1 == 2:
                        sys.exit(0)
                    time.sleep(60)
            try:
                pdf_url = json_2['data']['attach_url']
            except Exception:
                pdf_url = ''
            try:
                info_content = json_2['data']['notice_content']
            except Exception:
                info_content = ''
            ifexist = ifInstert(com_name, social_code, title, info_date)
            if ifexist:
                # 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
                result = GetContent(pdf_url, info_url, title, social_code, year, info_date, start_time, com_name, num)
                if result:
                    # 公告信息列表
                    log.info(f'{com_name}==============解析传输操作成功')
                    state = 1
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
                    # 发送kafka成功之后 再插入数据库
                    insert = InsterInto(social_code, info_url, info_date, title)
                    if insert:
                        log.info(f'===={social_code}========{title}=====插入库成功')
            else:
                log.info(f'======={com_name}========{code}===已存在')
if __name__ == '__main__':
    # 从redis中读取social_code
    list_c = []
    list_all_info_1 = []
    num = 0
    taskType = '企业公告/东方财富网'
    while True:
        start_time = time.time()
        # 获取企业信息
        # social_code = baseCore.redicPullData('NoticeEnterprise:ggqy_socialCode_add')
        # NOTE(review): social_code is hard-coded (debug leftover?); the redis pull
        # above is commented out, so the empty/placeholder guards below never fire.
        social_code = '91330000747735638J'
        # Empty string, None object, or the literal string 'None' → wait and retry.
        if not social_code or social_code == 'None':
            time.sleep(20)
            continue
        dic_info = baseCore.getInfomation(social_code)
        # count = dic_info[15]
        code = dic_info[3]
        com_name = dic_info[1]
        log.info(f'-----开始处理{com_name}----{social_code}------')
        try:
            gonggao_info(dic_info)
        except:
            # Deliberately bare: also swallows SystemExit raised inside gonggao_info.
            log.info(f'-----error:{com_name}----{social_code}------')
        break
import os import os
...@@ -48,6 +48,7 @@ def getuuid(): ...@@ -48,6 +48,7 @@ def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一 get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid return get_timestamp_uuid
def uptoOBS(pdf_url,pdf_name,type_id,social_code): def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers = {} headers = {}
category = os.path.splitext(pdf_url)[1] category = os.path.splitext(pdf_url)[1]
...@@ -56,16 +57,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -56,16 +57,41 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun', 'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''} 'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent() headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3): if category == '.pdf':
try: try:
ip = baseCore.get_proxy() response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
response = requests.get(pdf_url, headers=headers,verify=False,proxies=ip, timeout=20) if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length')) file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text with fitz.open(stream=response.content, filetype='pdf') as doc:
break page_size = doc.page_count
except Exception as e: for page in doc.pages():
time.sleep(60) retData['content'] += page.get_text()
continue # todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
pass
except:
log.error(f'文件损坏')
return retData
else:
for i in range(0, 3):
try:
page_size = 1
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
#todo:判断内容是否成功
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
except:
time.sleep(3)
continue
name = str(getuuid()) + category name = str(getuuid()) + category
try: try:
...@@ -73,12 +99,6 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -73,12 +99,6 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except: except:
log.error(f'OBS发送失败') log.error(f'OBS发送失败')
return retData return retData
try:
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
except:
log.error(f'文件损坏')
return retData
if page_size < 1: if page_size < 1:
# pdf解析失败 # pdf解析失败
# print(f'======pdf解析失败=====') # print(f'======pdf解析失败=====')
...@@ -95,11 +115,12 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -95,11 +115,12 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
except Exception as e: except Exception as e:
state = 0 state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}') #baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData return retData
return retData return retData
@retry(tries=3, delay=1) @retry(tries=3, delay=1)
def getOBSres(pathType,name, response): def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content) result = obsClient.putContent('zzsn', pathType + name, content=response.content)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论