提交 156d4509 作者: 薛凌堃

补充年报修改

上级 584826b4
...@@ -29,7 +29,7 @@ type_id = 1 ...@@ -29,7 +29,7 @@ type_id = 1
create_by = 'XueLingKun' create_by = 'XueLingKun'
taskType = '企业年报' taskType = '企业年报'
#付俊雪的需要改为巨潮资讯网 #付俊雪的需要改为巨潮资讯网
file_path = 'D:\\BaiduNetdiskDownload\\Belgium' file_path = 'D:\\BaiduNetdiskDownload\\1_福布斯2000_PDF_50_马'
log.info(f'=============当前pid为{baseCore.getPID()}==============') log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news): def sendKafka(dic_news):
...@@ -137,7 +137,7 @@ if __name__=='__main__': ...@@ -137,7 +137,7 @@ if __name__=='__main__':
social_code = data[1] social_code = data[1]
ename = data[2] ename = data[2]
cname = data[3] cname = data[3]
file_name = ename + ':' + file_year + '年年度报告' + '.pdf' file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
content = '' content = ''
#解析文件页数和内容 #解析文件页数和内容
log.info(f"-----------正在处理{file_name}--------------") log.info(f"-----------正在处理{file_name}--------------")
......
"""
读取文件 path = 'D:\kkwork\zzsn_spider\data\'
"""
import json
import os
import time
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
from retry import retry
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
import requests
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
pathType = 'QYYearReport/'
type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
#付俊雪的需要改为巨潮资讯网
file_path = 'D:\\BaiduNetdiskDownload\\1_福布斯2000_PDF_50_郭'
log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news):
    """Send one annual-report record to the 'researchReportTopic' Kafka topic.

    Args:
        dic_news: payload dict; must contain a 'title' key (used when a
            failure is recorded via baseCore.recordLog). Relies on the
            module-level globals social_code and taskType for failure logging.

    Returns:
        True if the broker acknowledged the message within 10 s, else False.
    """
    start_time = time.time()
    producer = None
    try:  # 114.116.116.241
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
        kafka_result = producer.send("researchReportTopic",
                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))  # blocks until the send is acknowledged
        dic_result = {
            'success': 'true',  # was misspelled 'ture'
            'message': '操作成功',
            'code': '200',
        }
        log.info(dic_result)
        # sent successfully
        state = 1
        takeTime = baseCore.getTimeCost(start_time, time.time())
        return True
    except Exception as e:
        dic_result = {
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        }
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, dic_news['title'], 'Kafka操作失败')
        log.info(dic_result)
        return False
    finally:
        # The original leaked one producer (and its sockets) per call.
        if producer is not None:
            producer.close()
def uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_path):
    """Upload a local PDF to Huawei OBS and return an updated metadata dict.

    Args:
        retData: metadata dict with keys state/type_id/item_id/group_name/path/
            full_path/category/file_size/status/create_by/create_time/
            page_size/content ('item_id' is the social credit code).
        pathType: OBS object-key prefix (e.g. 'QYYearReport/').
        taskType: task label used when recording a failure.
        start_time: epoch seconds when processing started, for cost logging.
        file_name: object name stored under pathType.
        pdf_path: local filesystem path of the file to upload.

    Returns:
        The original retData unchanged if the OBS upload fails (state stays
        False); otherwise a copy with state/path/full_path/create_time filled
        in (path fields left as-is if the OBS response cannot be parsed).
    """
    social_code = retData['item_id']
    # Work on a shallow copy so the caller's dict is never mutated.
    # (The original manually unpacked and repacked all 13 keys.)
    retData_f = dict(retData)
    try:
        result = getOBSres(pathType, file_name, pdf_path)
    except Exception:  # was a bare except that also shadowed the global log
        log.error('OBS发送失败')
        return retData
    try:
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData_f['state'] = True
        # objectUrl looks like https://<bucket>...com/<key>; keep only the key as 'path'
        retData_f['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
        retData_f['full_path'] = unquote(result['body']['objectUrl'])
        retData_f['create_time'] = time_now
    except Exception as e:
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_path, f'{e}')
    return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
    """Upload the local file at *response* to the 'zzsn' bucket as pathType+name.

    Retries up to 3 times, 1 s apart, on any exception; returns the OBS SDK
    result object.
    """
    # previously: obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
    return obsClient.putFile('zzsn', pathType + name, file_path=response)
if __name__ == '__main__':
    # For every PDF named "<rank>-<year>-..." in file_path: look up the company
    # by rank, extract text/page count, upload to OBS, record the attachment,
    # and announce it on Kafka (rolling back the attachment row on send failure).
    log.info(f'-----------当前文件{file_path}---------------')
    file_list = os.listdir(file_path)
    # print(pdf_list)
    num = 1
    for file_entry in file_list:  # renamed: the original later shadowed `file` with a file handle
        start_time = time.time()
        pdf_path = file_path + '/' + file_entry
        try:
            # filename convention: "<rank>-<year>-....pdf"
            file_rank = int(file_entry.split('-')[0])
            file_year = file_entry.split('-')[1]
        except (ValueError, IndexError):
            # malformed filename: skip instead of crashing the whole run
            log.info(f'文件名格式错误,跳过:{file_entry}')
            continue
        # file_rank maps to the company's credit code in rankandcode;
        # parameterized query instead of f-string interpolation
        selectsql = "select * from rankandcode where id = %s"
        cursor.execute(selectsql, (file_rank,))
        data = cursor.fetchone()
        cnx.commit()
        if data is None:
            # no mapping row: the original crashed on data[1]
            log.info(f'未找到对应企业信息,跳过:{file_rank}')
            continue
        social_code = data[1]
        ename = data[2]
        cname = data[3]
        file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
        content = ''
        # parse page count and full text
        log.info(f"-----------正在处理{file_name}--------------")
        with open(pdf_path, 'rb') as pdf_file:
            byte_stream = pdf_file.read()
        # print(byte_stream)
        try:
            with fitz.open(stream=byte_stream, filetype='pdf') as doc:
                # page_size = doc.pageCount
                page_size = doc.page_count
                print(page_size)
                for page in doc.pages():
                    content += page.get_text()
                # print(content)
        except Exception:
            log.info(f'文件已损坏:{cname}')
            continue
        # file size, human readable
        file_size = os.path.getsize(pdf_path)
        file_size = baseCore.convert_size(file_size)
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
                   'full_path': '',
                   'category': 'pdf', 'file_size': file_size, 'status': 1, 'create_by': create_by,
                   'create_time': time_now, 'page_size': page_size, 'content': content}
        # upload to the file server (OBS)
        try:
            retData_f = uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_path)
            if retData_f['state']:
                # retData, com_name, year, pdf_name, num, pub_time
                att_id = baseCore.tableUpdate(retData_f, cname, file_year, file_name, num, file_year + '-12-31')
                if att_id:
                    dic_news = {
                        'attachmentIds': att_id,
                        'author': '',
                        'content': content,
                        'contentWithTag': '',
                        'createDate': time_now,
                        'deleteFlag': '0',
                        'id': '',
                        'keyWords': '',
                        'lang': 'zh',
                        'origin': '企业官网',
                        'publishDate': file_year + '-12-31',
                        'sid': '1684032033495392257',
                        'sourceAddress': '',  # original article URL
                        'summary': '',
                        'title': file_name,
                        'type': 1,
                        'socialCreditCode': social_code,
                        'year': file_year
                    }
                    if sendKafka(dic_news):
                        log.info(f'成功-{file_rank}--{file_name}----{att_id}---{social_code}')
                        num += 1
                    else:
                        log.info(f'失败-{file_rank}--{file_name}----{att_id}---{social_code}')
                        # Kafka send failed: delete the attachment row we just inserted
                        baseCore.deliteATT(att_id)
                        log.info(f'已删除插入附件表的数据---{file_name}-----{social_code}')
        except Exception as e:
            log.info(f'error------{e}')
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论