Commit d53906a2  Author: 薛凌堃

Annual report upload

Parent 0be1bdd9
import requests
from bs4 import BeautifulSoup
from base import BaseCore
import os
import pandas as pd
import numpy as np

baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
}
......
# coding:utf-8
import pandas as pd
from datetime import datetime
"""
Reads files from path = 'D:\kkwork\zzsn_spider\data\'
"""
import json
import os
import re
import time
import uuid
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
from retry import retry
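# OBS client used to store the uploaded annual-report PDFs in the 'zzsn' bucket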
obsClient = ObsClient(
    access_key_id='VEHN7D0TJ9316H8AHCAV',  # your Huawei Cloud access key (AK)
    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud secret key (SK)
    server='https://obs.cn-north-1.myhuaweicloud.com'  # endpoint of your OBS bucket
)
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYYearReport/'
type_id = 1
create_by = 'LiuLiYuan'
taskType = '企业年报'
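# Serialize the report record as JSON and push it to the "researchReportTopic" Kafka topic;
# returns True on success, False (after recording the failure via recordLog) on error.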
def sendKafka(dic_news,xydm):
start_time = time.time()
try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# send succeeded, record it in the log
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, dic_news['title'], 'Kafka操作失败')
log.info(dic_result)
return False
def getuuid():
get_timestamp_uuid = uuid.uuid1()  # timestamp-based UUID, globally unique
return get_timestamp_uuid
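# Upload a local PDF to Huawei OBS under a random UUID file name and, on success,
# fill in path / full_path / create_time in the returned metadata dict.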
def uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_path):
"""
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': file_size, 'status': 1, 'create_by': create_by,
'create_time': '', 'page_size': page_size, 'content': content}
"""
state = retData['state']
type_id = retData['type_id']
social_code = retData['item_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
create_time = retData['create_time']
page_size = retData['page_size']
content = retData['content']
retData_f = {'state': state, 'type_id': type_id, 'item_id': social_code, 'group_name': group_name, 'path': path,
'full_path': full_path,
'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by,
'create_time': create_time, 'page_size': page_size, 'content': content}
try:
name = str(getuuid()) + '.pdf'
result = getOBSres(pathType, name, pdf_path)
except Exception:
log = baseCore.getLogger()
log.error('failed to upload the file to OBS')
return retData
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData_f['state'] = True
retData_f['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
retData_f['full_path'] = unquote(result['body']['objectUrl'])
retData_f['create_time'] = time_now
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_path, f'{e}')
return retData_f
return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
# result = obsClient.putContent('zzsn', f'{pathType}{now_time}/' + name, content=response.content)
result = obsClient.putFile('zzsn', pathType + name, file_path=response)
return result
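# Check clb_sys_attachment for an existing record with the same item_id / year / type_id,
# so annual reports that were already uploaded can be skipped.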
def secrchATT(item_id, year, type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s'''
cursor_.execute(sel_sql, (item_id, year, type_id))
selects = cursor_.fetchone()
return selects
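# Build a lookup dict from the Excel sheet: company English name and short name -> credit code,
# plus the publish date and source keyed by (credit code, year).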
def getXydm(df, xydm_list):
for i in range(df.shape[0]):
year = df.iloc[i]['年报年份']
if not pd.isna(year):
ename = str(df.iloc[i]['企业名称_英文']).lstrip().strip()
shortName = str(df.iloc[i]['企业简称']).lstrip().strip()
xydm = str(df.iloc[i]['企业信用代码']).lstrip().strip()
publishDate_ = str(df.iloc[i]['发布时间']).lstrip().strip()
source_ = str(df.iloc[i]['来源']).lstrip().strip()
xydm_list[f'{ename}'] = xydm
xydm_list[f'{shortName}'] = xydm
xydm_list[f'publishDate_{xydm}_{year}'] = publishDate_
xydm_list[f'source_{xydm}_{year}'] = source_
return xydm_list
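# Main flow: walk the local annual-report PDF directories, resolve each file to a company credit
# code and year, skip reports already in the attachment table, extract page count and text with
# fitz, upload the PDF to OBS, insert an attachment record, and push the metadata to Kafka.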
if __name__ == '__main__':
xydm_list = {}
df_1 = pd.read_excel('./中国500强榜单年报.xlsx', sheet_name='Sheet1')
# df_2 = pd.read_excel('./年报补充_20231120.xlsx', sheet_name='福布斯2000强')
xydm_list_ = getXydm(df_1, xydm_list)
# getXydm(df_2, xydm_list)
fileList = [r'D:\年报\中国500强_年报补充_97_20231206']
for file_ in fileList:
# file_ = r'D:/年报/欧盟2500_年报补充_71_20231123'
path = os.listdir(file_)
for one in path:
pdf_path = fr'{file_}/{one}'
start_time = time.time()
title = one.lstrip().strip()
file_name_ = title.split(':')[0].lstrip().strip()
year = re.findall(r'(\d{4})', title)[0]
xydm = xydm_list[f'{file_name_}']
print(xydm_list_[f'publishDate_{xydm}_{year}'])
if year == '2023' and '官网无' in xydm_list_[f'publishDate_{xydm}_{year}']:
publishDate = '2023-08-31'
elif '官网无' not in xydm_list_[f'publishDate_{xydm}_{year}']:
publishDate = xydm_list_[f'publishDate_{xydm}_{year}']
# pattern1 = r"\d{4}年\d{1,2}月\d{1,2}日"
# pattern2 = r"\d{4}/\d{1,2}/\d{1,2}"
# match1 = re.match(pattern1, xydm_list_[f'publishDate_{xydm}'])
# match2 = re.match(pattern2, xydm_list_[f'publishDate_{xydm}'])
# if match1:
# date1 = datetime.strptime(xydm_list_[f'publishDate_{xydm}'], "%Y年%m月%d日")
# publishDate = date1.strftime("%Y-%m-%d")
# if match2:
# date2 = datetime.strptime(xydm_list_[f'publishDate_{xydm}'], "%Y/%m/%d")
# publishDate = date2.strftime("%Y-%m-%d")
else:
publishDate = str(year) + '-12-31'
num = 1
content = ''
if xydm_list[f'source_{xydm}_{year}'] == '官网':
origin = file_name_ + '官网'
else:
origin = xydm_list[f'source_{xydm}_{year}']
test = {
'xydm' : xydm,
'origin' : origin,
'year' : year,
'publishDate' : publishDate,
'file_name_':file_name_
}
log.info(test)
selects = secrchATT(xydm, year, 1)
if selects:
# self.getLogger().info(f'com_name:{com_name}--{year}已存在')
log.info(f'===={title}--年报已存在===')
continue
# parse the PDF's page count and text content
log.info(f"-----------正在处理{title}--------------")
with open(pdf_path, 'rb') as file:
byte_stream = file.read()
# print(byte_stream)
try:
with fitz.open(stream=byte_stream, filetype='pdf') as doc:
# page_size = doc.pageCount
page_size = doc.page_count
print(page_size)
for page in doc.pages():
content += page.get_text()
# print(content)
except Exception as e:
log.info(f'文件已损坏:{title}==={e}')
continue
# get the file size
file_size = os.path.getsize(pdf_path)
file_size = baseCore.convert_size(file_size)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData = {'state': False, 'type_id': type_id, 'item_id': xydm, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': file_size, 'status': 1, 'create_by': create_by,
'create_time': time_now, 'page_size': page_size, 'content': content}
# upload the file to the file server (OBS)
try:
retData_f = uptoOBS(retData, pathType, taskType, start_time, title, pdf_path)
if retData_f['state']:
# retData, com_name, year, pdf_name, num, pub_time
att_id = baseCore.tableUpdate(retData_f, year, title, num, publishDate, origin)
if att_id:
detect_language = baseCore.detect_language(content)
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': detect_language,
'origin': origin,
'publishDate': publishDate,
'sid': '1684032033495392257',
'sourceAddress': '',  # original document URL
'summary': '',
'title': title.replace('.pdf', ''),
'type': 1,
'socialCreditCode': xydm,
'year': year
}
if sendKafka(dic_news, xydm):
log.info(f'成功---{title}----{att_id}---{xydm}')
num += 1
else:
log.info(f'失败---{title}----{att_id}---{xydm}')
# delete the inserted attachment record; 400 means the Kafka send failed
baseCore.deliteATT(att_id)
log.info(f'已删除插入附件表的数据---{title}-----{xydm}')
else:
log.info(f'-----年报已存在--{title}--{xydm}-----')
except Exception as e:
log.info(f'error------{e}')
baseCore.close()