提交 1c9e1d5d 作者: 王景浩

外汇交易财报读取excle

上级 bcf9dc87
import json
import os
import uuid
import time
import pandas as pd
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from retry import retry
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
import requests,pymysql
# cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
# charset='utf8mb4')
#
# cursor = cnx.cursor()
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
pathType = 'QYYearReport/'
type_id = 1
create_by = 'WangJingHao'
taskType = '企业年报'
# file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_50_郑'
def sendKafka(dic_news):
start_time = time.time()
try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['1.95.78.131:9092'])
kafka_result = producer.send("researchReportYearTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
return False
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_url):
state = retData['state']
type_id = retData['type_id']
social_code = retData['item_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
create_time = retData['create_time']
page_size = retData['page_size']
content = retData['content']
header = {
'Connection': 'keep-alive',
'Cache-Control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="112", "Microsoft Edge";v="112", "Not:A-Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
}
retData_f = {'state': state, 'type_id': type_id, 'item_id': social_code, 'group_name': group_name, 'path': path,
'full_path': full_path,
'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by,
'create_time': create_time, 'page_size': page_size, 'content': content}
# headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=header, timeout=30)
break
except Exception as e:
time.sleep(3)
continue
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
print("=-=-=-=-=-=-=-=-=-=-=-=")
print(pathType, now_time, name, response)
try:
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
print(name)
result = getOBSres(pathType, name, response)
except Exception as e:
log = baseCore.getLogger()
log.error(f'OBS发送失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}---OBS发送失败')
return retData
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
log = baseCore.getLogger()
log.info(f'当前页码----{page_size}')
for page in doc.pages():
retData_f['content'] += page.get_text()
try:
req = requests.head(pdf_url)
file_size = int(req.headers.get('Content-Length',0))
except:
file_size = 0
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData_f['state'] = True
retData_f['path'] = unquote(result['body']['objectUrl'].split('.com')[1])
retData_f['full_path'] = unquote(result['body']['objectUrl'])
retData_f['create_time'] = time_now
retData_f['file_size'] = baseCore.convert_size(file_size)
retData_f['page_size'] = page_size
print(retData_f['full_path'])
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData_f
print("--------------------------")
print(retData_f)
return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
# result = obsClient.putFile('zzsn', pathType+name, file_path=response)
print(result)
return result
if __name__=='__main__':
# # while True:
df = pd.read_excel('jytest.xlsx', header=0)
df.insert(15, "是否更新成功", '')
# 假设我们要筛选列名为'ColumnA'的值等于某个特定值,例如'ValueToFilter'
filter_value = 1
filtered_df = df[df['批次'] == filter_value]
start_time = time.time()
# 加载Excel文件
# df = pd.read_excel('20240904_YJZX_中央集团企业数据采集结果_中国外汇交易中心v1(1).xlsx')
# 遍历筛选后的DataFrame
row_list = []
for index, row in filtered_df.iterrows():
msg=row.tolist()
df.at[index, "是否更新成功"] = 1
# row_list.append(row.tolist())
sql="select SecuritiesShortName from EnterpriseInfo where SocialCode=%s"
cursor.execute(sql,msg[1])
result = cursor.fetchone()
try:
if not result:
data = [msg[0], msg[1], msg[2], msg[2], msg[6], msg[10], msg[4]]
elif not result[0]:
data=[msg[0],msg[1],msg[2],msg[2],msg[6],msg[10],msg[4]]
# print(data)
else:
data=[msg[0],msg[1],msg[2],result[0],msg[6],msg[10],msg[4]]
# 处理每一行的数据
except:
data = [msg[0], msg[1], msg[2], result[0], msg[6], msg[10], msg[4]]
num = 1
if data:
social_code = data[1]
ename = data[2]
cname = data[3]
year = str(int(data[4]))
pdf_url = data[5]
pulish_time= data[6]
file_name = cname +":"+ year +'年年度报告'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': create_by,
'create_time': time_now, 'page_size': '', 'content': ''}
log.info(f'开始处理{ename}---{social_code}')
print(retData)
#文件上传至文件服务器
#annualreports 英国的年报来源
try:
obsfilename=file_name +'.pdf'
retData_f = uptoOBS(retData, pathType, taskType, start_time,obsfilename,pdf_url)
if retData_f['state']:
content = retData_f['content']
page_size = retData_f['page_size']
log.info(f'当前页数为{page_size}')
#retData, com_name, year, pdf_name, num, pub_time
cname="中国外汇交易中心"
att_id= baseCore.tableUpdate(retData_f, year,obsfilename, num,pulish_time,cname)
if att_id:
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '企业官网',
'publishDate': year + '-12-31',
'sid': '1684032033495392257',
'sourceAddress': pdf_url, # 原文链接
'summary': '',
'title': file_name,
'type': 1,
'socialCreditCode': social_code,
'year': year
}
print("===============================")
if sendKafka(dic_news):
log.info(f'kafka发送成功')
#100表示成功
# updatesql = f"update fbspdfinfo set state=100 where pdf_url = '{pdf_url}'"
# cursor.execute(updatesql)
# cnx.commit()
df.at[index, "是否更新成功"] = 1
else:
#删除插入的数据 400表示发送数据失败
baseCore.deliteATT(att_id)
log.info(f'已删除插入附件表的数据---{pdf_url}-----{social_code}')
# updatesql = f"update fbspdfinfo set state=400 where pdf_url = '{pdf_url}'"
# cursor.execute(updatesql)
# cnx.commit()
df.at[index, "是否更新成功"] = 2
except Exception as e:
log.info(f'error------{e}')
df.to_excel('jytest.xlsx', index=False)
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论