提交 afb6cfd1 作者: 薛凌堃

企业公告脚本测试

上级 3e7c3b28
import requests, pymysql, re, time, json, sys import requests,time, json, sys
import requests, pymysql, re, time, json, sys import requests,time, json, sys
import pandas as pd from kafka import KafkaProducer
from bs4 import BeautifulSoup from base import BaseCore
from selenium import webdriver from obs import ObsClient
from concurrent.futures.thread import ThreadPoolExecutor import fitz
from urllib.parse import unquote
from base.BaseCore import BaseCore baseCore = BaseCore.BaseCore()
baseCore = BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
cnx = baseCore.cnx cnx = baseCore.cnx
cursor = baseCore.cursor cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
def InsterInto(short_name, social_code, pdf_url): obsClient = ObsClient(
inster = False access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s''' #获取文件大小
cursor.execute(sel_sql, (social_code, pdf_url)) def convert_size(size_bytes):
selects = cursor.fetchone() # 定义不同单位的转换值
if selects: units = ['bytes', 'KB', 'MB', 'GB', 'TB']
print(f'com_name:{short_name}、{pdf_url}已存在') i = 0
return inster while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url, pdf_name, type_id, social_code):
    """Download a PDF, upload it to Huawei OBS and extract its text.

    Args:
        pdf_url: direct URL of the PDF to fetch.
        pdf_name: object name stored under the 'QYNotice/' prefix on OBS.
        type_id: attachment type id recorded in the result dict.
        social_code: unified social credit code of the company (item_id).

    Returns:
        A retData dict; ``state`` is True only when the PDF was uploaded and
        parsed successfully (page count >= 1). On any failure the dict is
        returned with ``state`` False so the caller treats it as a parse error.
    """
    retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
               'full_path': '',
               'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
               'create_time': '', 'page_size': '', 'content': ''}
    headers = {'User-Agent': baseCore.getRandomUserAgent()}

    # Download with up to 3 attempts. Keep a None sentinel so a total failure
    # is detected explicitly instead of leaving `response` unbound (the
    # original raised a swallowed NameError in that case).
    response = None
    file_size = 0
    for _ in range(3):
        try:
            response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
            # Content-Length may be absent; fall back to the actual body size
            # rather than failing on int(None).
            file_size = int(response.headers.get('Content-Length') or len(response.content))
            break
        except Exception:
            response = None
            time.sleep(3)
    if response is None:
        # Download never succeeded; state stays False.
        return retData

    page_size = 0
    result = None
    for _ in range(3):
        try:
            result = obsClient.putContent('zzsn', 'QYNotice/' + pdf_name, content=response.content)
            with fitz.open(stream=response.content, filetype='pdf') as doc:
                page_size = doc.page_count
                for page in doc.pages():
                    retData['content'] += page.get_text()
            break
        except Exception:
            time.sleep(3)
    if page_size < 1:
        # PDF could not be parsed (or the upload failed on every attempt).
        return retData
    try:
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData['state'] = True
        retData['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
        retData['full_path'] = unquote(result['body']['objectUrl'])
        retData['file_size'] = convert_size(file_size)
        retData['create_time'] = time_now
        retData['page_size'] = page_size
    except Exception as e:
        # NOTE(review): start_time / taskType are module-level globals set by
        # the caller's main loop — confirm they are always defined here.
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
        return retData
    return retData
def secrchATT(item_id, name, type_id, order_by):
    """Return the last clb_sys_attachment row matching the given keys.

    The lookup appends '.pdf' to *name* before matching. Raises IndexError
    when no row matches (callers rely on a successful prior insert).
    """
    sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s '''
    params = (item_id, name + '.pdf', type_id, order_by)
    cursor_.execute(sel_sql, params)
    rows = cursor_.fetchall()
    return rows[-1]
# Insert into the att table; returns the attachment id.
def tableUpdate(retData, com_name, year, pdf_name, num):
    """Persist one attachment record into clb_sys_attachment and return its id.

    Args:
        retData: dict produced by uptoOBS() with path/size/page metadata.
        com_name: company name (not used in this function; kept for callers).
        year: announcement year stored on the row.
        pdf_name: attachment name without the '.pdf' suffix.
        num: order_by value distinguishing multiple attachments of one item.

    Returns:
        The id column of the inserted row, re-read via secrchATT(). If the
        insert failed, secrchATT() raises IndexError on the empty result.
    """
    item_id = retData['item_id']
    type_id = retData['type_id']
    group_name = retData['group_name']
    path = retData['path']
    full_path = retData['full_path']
    category = retData['category']
    file_size = retData['file_size']
    status = retData['status']
    create_by = retData['create_by']
    page_size = retData['page_size']
    create_time = retData['create_time']
    order_by = num
    # selects = secrchATT(item_id, pdf_name, type_id)
    #
    # if selects:
    #     log.info(f'pdf_name:{pdf_name}已存在')
    #     id = ''
    #     return id
    # else:
    try:
        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
        # object_key is derived by stripping the bucket URL prefix off full_path.
        values = (
            year, pdf_name+'.pdf', type_id, item_id, group_name, path, full_path, category, file_size, order_by,
            status, create_by,
            create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn')
        cursor_.execute(Upsql, values)  # insert
        cnx_.commit()  # commit
    except Exception as e:
        # NOTE(review): an insert failure is only printed; the success log
        # below still runs and secrchATT() will then raise IndexError.
        print(e)
    log.info(f"更新完成:{item_id}===={pdf_name+'.pdf'}")
    selects = secrchATT(item_id, pdf_name, type_id, order_by)
    id = selects[0]
    return id
def InsterInto(social_code, pdf_url,pub_time,pdf_name):
insert = False
# 信息插入数据库 # 信息插入数据库
try: try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())''' insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,publish_time,title,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
list_info = [ list_info = [
social_code, social_code,
pdf_url, pdf_url,
'东方财富网', '东方财富网',
'1', '1',
pub_time,
pdf_name
] ]
#144数据库 #144数据库
cursor.execute(insert_sql, tuple(list_info)) cursor.execute(insert_sql, tuple(list_info))
...@@ -43,8 +149,90 @@ def InsterInto(short_name, social_code, pdf_url): ...@@ -43,8 +149,90 @@ def InsterInto(short_name, social_code, pdf_url):
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败') baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
return insert return insert
def ifInstert(short_name, social_code, pdf_url):
    """Return True when this announcement is not yet in brpa_source_article.

    Args:
        short_name: company short name, used only for the duplicate log line.
        social_code: unified social credit code to match.
        pdf_url: source_address to match.

    Returns:
        True if no matching row exists (safe to collect), False otherwise.
    """
    # NOTE(review): the filter pins origin='证监会' (CSRC) although this script
    # inserts rows with origin '东方财富网' — confirm the intended dedup scope.
    sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' '''
    cursor.execute(sel_sql, (social_code, pdf_url))
    # Already recorded: skip it. (Original had a redundant if/else that
    # returned the same flag on both branches.)
    if cursor.fetchone():
        log.info(f'com_name:{short_name}、{pdf_url}已存在')
        return False
    return True
def GetContent(pdf_url, info_url, pdf_name, social_code, year, pub_time, start_time, com_name, num):
    """Process one announcement end-to-end: upload PDF, record attachment, publish to Kafka.

    Pipeline: uptoOBS() downloads/parses/uploads the PDF; tableUpdate() writes
    the attachment row and yields att_id; the assembled news dict is then sent
    to Kafka.

    Args:
        pdf_url: direct PDF download link.
        info_url: announcement detail page URL (stored as sourceAddress).
        pdf_name: announcement title, used as the attachment/object name.
        social_code: unified social credit code of the company.
        year: announcement year.
        pub_time: publish date string.
        start_time: loop start timestamp, used for recordLog timing.
        com_name: company name (passed through to tableUpdate).
        num: attachment ordering base; incremented before the insert.

    Returns:
        True when the Kafka send succeeded, False on any failure along the way.
    """
    # Upload to the Huawei Cloud OBS server.
    retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
    # Insert the attachment into the att database table.
    if retData['state']:
        pass
    else:
        log.info(f'====pdf解析失败====')
        return False
    num = num + 1
    att_id = tableUpdate(retData, com_name, year, pdf_name, num)
    if att_id:
        pass
    else:
        return False
    content = retData['content']
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': att_id,
        'author': '',
        'content': content,
        'contentWithTag': '',
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'zh',
        'origin': '东方财富网',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': info_url,  # original article link
        'summary': '',
        'title': pdf_name.replace('.pdf', ''),
        'type': 3,
        'socialCreditCode': social_code,
        'year': year
    }
    # print(dic_news)
    # Send the assembled fields to Kafka for persistence downstream.
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        # NOTE(review): topic name 'researchReportTopicaaaas' looks like a
        # test/debug topic — confirm before deploying to production.
        kafka_result = producer.send("researchReportTopicaaaas",
                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
        dic_result = {
            'success': 'ture',
            'message': '操作成功',
            'code': '200',
        }
        log.info(dic_result)
        return True
    except Exception as e:
        dic_result = {
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        }
        # taskType is a module-level global set by the main loop.
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
        log.info(dic_result)
        return False
def gonggao_info(dic_info): def gonggao_info(dic_info):
list_all_info = []
code = dic_info[3] code = dic_info[3]
com_name = dic_info[4] com_name = dic_info[4]
social__code = dic_info[2] social__code = dic_info[2]
...@@ -67,7 +255,7 @@ def gonggao_info(dic_info): ...@@ -67,7 +255,7 @@ def gonggao_info(dic_info):
com_code = 'BJ' + code1 com_code = 'BJ' + code1
break_id = 0 break_id = 0
for page1 in range(1, 2): for page1 in range(1, 100):
if break_id == 1: if break_id == 1:
break break
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0' url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
...@@ -83,12 +271,12 @@ def gonggao_info(dic_info): ...@@ -83,12 +271,12 @@ def gonggao_info(dic_info):
continue continue
res_json = res.json() res_json = res.json()
list_all = res_json['data']['list'] list_all = res_json['data']['list']
if list_all: if list_all:
for one_info in list_all: for one_info in list_all:
title = one_info['title'] title = one_info['title']
info_date = one_info['notice_date'] info_date = one_info['notice_date']
year = info_date[:4]
if page1 > 1 and '2022' in info_date: if page1 > 1 and '2022' in info_date:
break_id = 1 break_id = 1
break break
...@@ -114,59 +302,62 @@ def gonggao_info(dic_info): ...@@ -114,59 +302,62 @@ def gonggao_info(dic_info):
sys.exit(0) sys.exit(0)
time.sleep(5) time.sleep(5)
continue continue
try: try:
pdf_url = json_2['data']['attach_url'] pdf_url = json_2['data']['attach_url']
except: except:
pdf_url = '' pdf_url = ''
#拿到pdfurl去数据库中查找,如果有该条信息 则跳过,否则继续采集
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and type='1' '''
cursor.execute(sel_sql, info_url)
selects = cursor.fetchall()
if selects:
return
else:
pass
try: try:
info_content = json_2['data']['notice_content'] info_content = json_2['data']['notice_content']
except: except:
info_content = '' info_content = ''
ifexist = ifInstert(com_name, social_code, info_url)
if ifexist:
# 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
result = GetContent(pdf_url, info_url,title, social_code, year, info_date, start_time, com_name, num)
if result:
# 公告信息列表
log.info(f'{com_name}==============解析传输操作成功')
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
list_info = [ # 发送kafka成功之后 再插入数据库
social_code, insert = InsterInto(social_code, pdf_url, info_date, title)
title, if insert:
info_content[:2000], log.info(f'===={social_code}========{title}=====插入库成功')
info_date, pass
info_url,
pdf_url,
'东方财富网',
'1',
'zh'
]
# list_all_info.append(tuple(list_info))
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s '''
cursor.execute(sel_sql, info_url)
selects = cursor.fetchall()
if selects:
break
else: else:
#todo:取消入库操作 continue
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,publish_date,source_address,pdf_address,origin,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)''' else:
cursor.execute(insert_sql, tuple(list_info)) log.info(f'======={com_name}========{code}===已存在')
cnx.commit() continue
else:
break
print(f'{code}:传输完成')
# list_all_info_1.append(list_all_info)
list_c.append(code)
# list_info = [
# social_code,
# title,
# info_content[:2000],
# info_date,
# info_url,
# pdf_url,
# '东方财富网',
# '1',
# 'zh'
# ]
# # list_all_info.append(tuple(list_info))
# with cnx.cursor() as cursor:
# sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s '''
#
# cursor.execute(sel_sql, info_url)
# selects = cursor.fetchall()
# if selects:
# break
# else:
# #todo:取消入库操作
# insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,publish_date,source_address,pdf_address,origin,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
# cursor.execute(insert_sql, tuple(list_info))
# cnx.commit()
# else:
# break
if __name__ =='__main__': if __name__ =='__main__':
#从redis中读取social_code' #从redis中读取social_code'
...@@ -178,8 +369,8 @@ if __name__ =='__main__': ...@@ -178,8 +369,8 @@ if __name__ =='__main__':
while True: while True:
start_time = time.time() start_time = time.time()
# 获取企业信息 # 获取企业信息
social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode') # social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode')
# social_code = '911100007109288314' social_code = '911100007109288314'
if not social_code: if not social_code:
time.sleep(20) time.sleep(20)
continue continue
...@@ -193,6 +384,7 @@ if __name__ =='__main__': ...@@ -193,6 +384,7 @@ if __name__ =='__main__':
count = dic_info[15] count = dic_info[15]
code = dic_info[3] code = dic_info[3]
com_name = dic_info[4] com_name = dic_info[4]
log.info(f'-----开始处理{com_name}----{social_code}------')
gonggao_info(dic_info) gonggao_info(dic_info)
......
import json import json
...@@ -120,7 +120,7 @@ def tableUpdate(retData, com_name, year, pdf_name, num): ...@@ -120,7 +120,7 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
values = ( values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by, status, create_by,
create_time, page_size,path,'zzsn') create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn')
cursor_.execute(Upsql, values) # 插入 cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交 cnx_.commit() # 提交
except Exception as e: except Exception as e:
...@@ -283,14 +283,14 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -283,14 +283,14 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
#判断文件是否已经存在obs服务器中 #判断文件是否已经存在obs服务器中
# file_path = 'QYNotice//浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告' # file_path = 'QYNotice//浙江国祥股份有限公司首次公开发行股票并在主板上市暂缓发行公告'
now_time = time.strftime("%Y-%m") now_time = time.strftime("%Y-%m")
file_path = 'QYNotice/'+pdf_name # file_path = 'QYNotice/'+pdf_name
response = obsClient.getObjectMetadata('zzsn', file_path) # response = obsClient.getObjectMetadata('zzsn', file_path)
if response.status >= 300: # if response.status >= 300:
log.info('=====文件不存在obs=====') # log.info('=====文件不存在obs=====')
pass # pass
else: # else:
log.info(f'=====文件存在obs========{file_path}') # log.info(f'=====文件存在obs========{file_path}')
return False # return False
#上传至华为云服务器 #上传至华为云服务器
retData = uptoOBS(pdf_url,pdf_name,8,social_code) retData = uptoOBS(pdf_url,pdf_name,8,social_code)
#附件插入att数据库 #附件插入att数据库
...@@ -323,7 +323,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -323,7 +323,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
'sid': '1684032033495392257', 'sid': '1684032033495392257',
'sourceAddress': pdf_url, # 原文链接 'sourceAddress': pdf_url, # 原文链接
'summary': '', 'summary': '',
'title': pdf_name, 'title': pdf_name.replace('.pdf',''),
'type': 3, 'type': 3,
'socialCreditCode': social_code, 'socialCreditCode': social_code,
'year': year 'year': year
...@@ -332,7 +332,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -332,7 +332,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
# 将相应字段通过kafka传输保存 # 将相应字段通过kafka传输保存
try: try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092']) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8')) kafka_result = producer.send("researchReportTopicaaaas", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10)) print(kafka_result.get(timeout=10))
...@@ -430,6 +430,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库 ...@@ -430,6 +430,7 @@ def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库
# 判断数据库中是否有该条资讯 # 判断数据库中是否有该条资讯
ifexist = ifInstert(short_name, social_code, pdf_url) ifexist = ifInstert(short_name, social_code, pdf_url)
#如果不存在 ifexist = True #如果不存在 ifexist = True
# ifexist = True
if ifexist: if ifexist:
# 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败 # 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num) result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论