提交 1e3071db 作者: 薛凌堃

福布斯 欧盟年报

上级 e7e291e0
"""
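Read files; path = 'D:/kkwork/zzsn_spider/data/'
NOTE(review): the code below pulls record ids from Redis and downloads each PDF by URL instead of reading this directory - confirm whether this header is stale.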
"""
import json
import os
import time
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from retry import retry
# SECURITY: this OBS key pair was committed in plain text. Each value can now
# be supplied through the environment; the literals remain only as a fallback
# so existing deployments keep working. Rotate the key pair and delete the
# fallbacks as soon as the environment variables are provisioned.
obsClient = ObsClient(
    access_key_id=os.environ.get('OBS_ACCESS_KEY_ID', 'VEHN7D0TJ9316H8AHCAV'),  # Huawei Cloud access key (AK)
    secret_access_key=os.environ.get(
        'OBS_SECRET_ACCESS_KEY', 'heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY'),  # Huawei Cloud secret key (SK)
    server=os.environ.get('OBS_SERVER', 'https://obs.cn-north-1.myhuaweicloud.com'),  # endpoint of the bucket
)
import requests
import BaseCore
# Project helper object; the script takes its logger and its database
# connection/cursor from it.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
# Prefix handed to getOBSres, which prepends it to the uploaded object name.
pathType = 'QYYearReport/'
# Values copied into every attachment record built in the main loop.
type_id = 1
create_by = 'XueLingKun'
# Task label passed to uptoOBS, which forwards it to baseCore.recordLog.
taskType = '企业年报'
# file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_50_郑'
def sendKafka(dic_news):
    """Publish one record to the Kafka topic ``policy``.

    The record is serialised as UTF-8 JSON (non-ASCII characters are kept
    verbatim) and the call waits up to 10 seconds for the send to complete.

    :param dic_news: JSON-serialisable dict describing the report.
    :return: True when the send completed, False on any error.
    """
    producer = None
    try:  # presumably an alternative broker address: 114.116.116.241
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        payload = json.dumps(dic_news, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("policy", payload)
        print(kafka_result.get(timeout=10))
        # Delivery confirmed: write the outcome to the log.
        log.info({
            'success': 'true',
            'message': '操作成功',
            'code': '200',
        })
        return True
    except Exception as e:
        log.error({
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        })
        return False
    finally:
        # A producer is created per call, so it must be released per call;
        # otherwise every message leaks its sockets and background thread.
        if producer is not None:
            try:
                producer.close(timeout=10)
            except Exception as close_error:
                log.error(f'Kafka producer close failed: {close_error}')
def uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_url):
    """Download a PDF, upload it to OBS and return the attachment metadata.

    The PDF is fetched from ``pdf_url`` (up to three attempts), uploaded
    through ``getOBSres`` and then opened with ``fitz`` to read the page
    count and the plain text of every page.

    :param retData: template dict holding the keys state, type_id, item_id,
        group_name, path, full_path, category, file_size, status, create_by,
        create_time, page_size and content. It is not modified.
    :param pathType: prefix forwarded to ``getOBSres``.
    :param taskType: task label forwarded to ``baseCore.recordLog``.
    :param start_time: ``time.time()`` value taken when the task started;
        used to compute the elapsed time written to the record log.
    :param file_name: object name forwarded to ``getOBSres``.
    :param pdf_url: URL the PDF is downloaded from.
    :return: a filled copy of ``retData`` with ``state`` set to True on
        success; the untouched ``retData`` on any failure.
    """
    social_code = retData['item_id']
    header = {
        'Connection': 'keep-alive',
        'Cache-Control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="112", "Microsoft Edge";v="112", "Not:A-Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-User': '?1',
        'Sec-Fetch-Dest': 'document',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
    }
    # Work on a copy restricted to the known keys so the caller's dict stays
    # intact and can be returned unchanged on failure.
    field_names = ('state', 'type_id', 'item_id', 'group_name', 'path', 'full_path',
                   'category', 'file_size', 'status', 'create_by', 'create_time',
                   'page_size', 'content')
    retData_f = {key: retData[key] for key in field_names}

    def _record_failure(message):
        # Every failure path writes the same kind of record-log entry (state 0).
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, 0, takeTime, pdf_url, message)

    # --- download -----------------------------------------------------
    response = None
    last_error = None
    for _ in range(3):
        try:
            response = requests.get(pdf_url, headers=header, verify=False, timeout=30)
            # Do not archive a 4xx/5xx error page as if it were the report.
            response.raise_for_status()
            break
        except Exception as e:
            response = None
            last_error = e
            time.sleep(3)
    if response is None:
        log.error(f'PDF下载失败---{pdf_url}')
        _record_failure(f'{last_error}---PDF下载失败')
        return retData

    pdf_bytes = response.content
    # Content-Length is optional; fall back to the size of the body received.
    content_length = response.headers.get('Content-Length')
    if content_length and content_length.isdigit():
        file_size = int(content_length)
    else:
        file_size = len(pdf_bytes)

    # --- upload -------------------------------------------------------
    try:
        result = getOBSres(pathType, file_name, response)
    except Exception as e:
        log.error(f'OBS发送失败')
        _record_failure(f'{e}---OBS发送失败')
        return retData

    # --- parse --------------------------------------------------------
    try:
        with fitz.open(stream=pdf_bytes, filetype='pdf') as doc:
            page_size = doc.page_count
            text = ''.join(page.get_text() for page in doc.pages())
    except Exception as e:
        log.error(f'PDF解析失败---{pdf_url}')
        _record_failure(f'{e}---PDF解析失败')
        return retData
    if page_size < 1:
        # PDF parsing failed: nothing usable in the document.
        return retData

    # --- metadata -----------------------------------------------------
    try:
        object_url = result['body']['objectUrl']
        # All values are computed before update() runs, so a failure here
        # cannot leave retData_f half filled.
        retData_f.update(
            path=unquote(object_url.split('.com')[1]),
            full_path=unquote(object_url),
            create_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            file_size=baseCore.convert_size(file_size),
            page_size=page_size,
            content=retData_f['content'] + text,
        )
    except Exception as e:
        _record_failure(f'{e}')
        return retData
    # Mark success last, only once every field is known to be valid.
    retData_f['state'] = True
    return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
    """Upload the downloaded body to the ``zzsn`` bucket as ``pathType + name``.

    :param pathType: prefix prepended to the object name.
    :param name: object name.
    :param response: HTTP response whose ``content`` bytes are uploaded.
    :return: the result object returned by ``obsClient.putContent``.
    :raises RuntimeError: when OBS answers with a status of 300 or above, so
        that the ``retry`` decorator gets a chance to repeat the upload.
    """
    result = obsClient.putContent('zzsn', pathType + name, content=response.content)
    # The SDK signals a rejected upload through the status field instead of
    # raising; without this check the retry decorator never triggers for it.
    status = getattr(result, 'status', None)
    if status is not None and status >= 300:
        raise RuntimeError(
            f'OBS upload failed: status={status}, '
            f'errorCode={getattr(result, "errorCode", None)}, '
            f'errorMessage={getattr(result, "errorMessage", None)}'
        )
    return result
if __name__ == '__main__':
    # Worker loop: take one row id from the Redis list, load the row, archive
    # the PDF in OBS, register the attachment and publish the text to Kafka.
    while True:
        start_time = time.time()
        record_id = baseCore.redicPullData('fbspdfinfo:id')
        if not record_id:
            # Nothing left to process: stop instead of querying with an empty id.
            log.info('已没有数据')
            break
        # Values are bound by the driver rather than pasted into the SQL text.
        # Assumes the %s paramstyle of the MySQL drivers - TODO confirm against BaseCore.
        cursor.execute("select * from fbspdfinfo where id=%s", (record_id,))
        data = cursor.fetchone()
        if not data:
            continue
        num = 1
        social_code = data[1]
        ename = data[2]
        cname = data[3]
        year = data[4]
        pdf_url = data[5]
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
                   'full_path': '',
                   'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': create_by,
                   'create_time': time_now, 'page_size': '', 'content': ''}
        log.info(f'开始处理{ename}---{social_code}')
        try:
            file_name = cname + year + '年年度报告' + '.pdf'
            # Upload the file to the file server.
            # (original note: "annualreports" is the source of the UK annual reports)
            retData_f = uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_url)
            if not retData_f['state']:
                continue
            # tableUpdate arguments: retData, com_name, year, pdf_name, num, pub_time
            att_id = baseCore.tableUpdate(retData_f, cname, year, file_name, num, time_now)
            if not att_id:
                continue
            dic_news = {
                'attachmentIds': att_id,
                'author': '',
                'content': retData_f['content'],
                'contentWithTag': '',
                'createDate': time_now,
                'deleteFlag': '0',
                'id': '',
                'keyWords': '',
                'lang': 'zh',
                'origin': '企业官网',
                'publishDate': time_now,
                'sid': '1684032033495392257',
                'sourceAddress': pdf_url,  # link to the original document
                'summary': '',
                'title': file_name,
                'type': 1,
                'socialCreditCode': social_code,
                'year': year
            }
            if sendKafka(dic_news):
                # 100 means success.
                new_state = 100
            else:
                # 400 means the data could not be sent: remove the attachment
                # row that was just inserted.
                baseCore.deliteATT(att_id)
                log.info(f'已删除插入附件表的数据---{pdf_url}-----{social_code}')
                new_state = 400
            cursor.execute("update fbspdfinfo set state=%s where pdf_url = %s", (new_state, pdf_url))
            cnx.commit()
        except Exception as e:
            log.error(f'error------{e}')
\ No newline at end of file
"""
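Read files; path = 'D:/kkwork/zzsn_spider/data/'
NOTE(review): the code below pulls record ids from Redis and downloads each PDF by URL instead of reading this directory - confirm whether this header is stale.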
"""
import json
import os
import time
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from retry import retry
# SECURITY: this OBS key pair was committed in plain text. Each value can now
# be supplied through the environment; the literals remain only as a fallback
# so existing deployments keep working. Rotate the key pair and delete the
# fallbacks as soon as the environment variables are provisioned.
obsClient = ObsClient(
    access_key_id=os.environ.get('OBS_ACCESS_KEY_ID', 'VEHN7D0TJ9316H8AHCAV'),  # Huawei Cloud access key (AK)
    secret_access_key=os.environ.get(
        'OBS_SECRET_ACCESS_KEY', 'heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY'),  # Huawei Cloud secret key (SK)
    server=os.environ.get('OBS_SERVER', 'https://obs.cn-north-1.myhuaweicloud.com'),  # endpoint of the bucket
)
import requests
import BaseCore
# Project helper object; the script takes its logger and its database
# connection/cursor from it.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
# Prefix handed to getOBSres, which prepends it to the uploaded object name.
pathType = 'QYYearReport/'
# Values copied into every attachment record built in the main loop.
type_id = 1
create_by = 'XueLingKun'
# Task label passed to uptoOBS and to baseCore.recordLog.
taskType = '企业年报'
# Local PDF directory; not referenced by the code visible in this script.
file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_50_郑'
def sendKafka(dic_news):
    """Publish one record to the Kafka topic ``policy``.

    The record is serialised as UTF-8 JSON (non-ASCII characters are kept
    verbatim) and the call waits up to 10 seconds for the send to complete.
    A failure is additionally written through ``baseCore.recordLog``.

    :param dic_news: JSON-serialisable dict describing the report. Its
        ``socialCreditCode`` and ``sourceAddress`` entries identify the
        record in the failure log.
    :return: True when the send completed, False on any error.
    """
    start_time = time.time()
    producer = None
    try:  # presumably an alternative broker address: 114.116.116.241
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        payload = json.dumps(dic_news, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("policy", payload)
        print(kafka_result.get(timeout=10))
        # Delivery confirmed: write the outcome to the log.
        log.info({
            'success': 'true',
            'message': '操作成功',
            'code': '200',
        })
        return True
    except Exception as e:
        log.error({
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        })
        takeTime = baseCore.getTimeCost(start_time, time.time())
        # Identify the record from the payload itself instead of relying on
        # globals that only exist while the script's main loop is running.
        baseCore.recordLog(dic_news.get('socialCreditCode'), taskType, 0, takeTime,
                           dic_news.get('sourceAddress'), 'Kafka操作失败')
        return False
    finally:
        # A producer is created per call, so it must be released per call;
        # otherwise every message leaks its sockets and background thread.
        if producer is not None:
            try:
                producer.close(timeout=10)
            except Exception as close_error:
                log.error(f'Kafka producer close failed: {close_error}')
def uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_url):
    """Download a PDF, upload it to OBS and return the attachment metadata.

    The PDF is fetched from ``pdf_url`` (up to three attempts, with a
    User-Agent obtained from ``baseCore.getRandomUserAgent``), uploaded
    through ``getOBSres`` and then opened with ``fitz`` to read the page
    count and the plain text of every page.

    :param retData: template dict holding the keys state, type_id, item_id,
        group_name, path, full_path, category, file_size, status, create_by,
        create_time, page_size and content. It is not modified.
    :param pathType: prefix forwarded to ``getOBSres``.
    :param taskType: task label forwarded to ``baseCore.recordLog``.
    :param start_time: ``time.time()`` value taken when the task started;
        used to compute the elapsed time written to the record log.
    :param file_name: object name forwarded to ``getOBSres``.
    :param pdf_url: URL the PDF is downloaded from.
    :return: a filled copy of ``retData`` with ``state`` set to True on
        success; the untouched ``retData`` on any failure.
    """
    social_code = retData['item_id']
    headers = {'User-Agent': baseCore.getRandomUserAgent()}
    # Work on a copy restricted to the known keys so the caller's dict stays
    # intact and can be returned unchanged on failure.
    field_names = ('state', 'type_id', 'item_id', 'group_name', 'path', 'full_path',
                   'category', 'file_size', 'status', 'create_by', 'create_time',
                   'page_size', 'content')
    retData_f = {key: retData[key] for key in field_names}

    def _record_failure(message):
        # Every failure path writes the same kind of record-log entry (state 0).
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, 0, takeTime, pdf_url, message)

    # --- download -----------------------------------------------------
    response = None
    last_error = None
    for _ in range(3):
        try:
            response = requests.get(pdf_url, headers=headers, verify=False, timeout=30)
            # Do not archive a 4xx/5xx error page as if it were the report.
            response.raise_for_status()
            break
        except Exception as e:
            response = None
            last_error = e
            time.sleep(3)
    if response is None:
        log.error(f'PDF下载失败---{pdf_url}')
        _record_failure(f'{last_error}---PDF下载失败')
        return retData

    pdf_bytes = response.content
    # Content-Length is optional; fall back to the size of the body received.
    content_length = response.headers.get('Content-Length')
    if content_length and content_length.isdigit():
        file_size = int(content_length)
    else:
        file_size = len(pdf_bytes)

    # --- upload -------------------------------------------------------
    try:
        result = getOBSres(pathType, file_name, response)
    except Exception as e:
        # The exception is bound here; the previous bare ``except`` formatted
        # a name that no longer existed at this point.
        log.error(f'OBS发送失败')
        _record_failure(f'{e}---OBS发送失败')
        return retData

    # --- parse --------------------------------------------------------
    try:
        with fitz.open(stream=pdf_bytes, filetype='pdf') as doc:
            page_size = doc.page_count
            text = ''.join(page.get_text() for page in doc.pages())
    except Exception as e:
        log.error(f'PDF解析失败---{pdf_url}')
        _record_failure(f'{e}---PDF解析失败')
        return retData
    if page_size < 1:
        # PDF parsing failed: nothing usable in the document.
        return retData

    # --- metadata -----------------------------------------------------
    try:
        object_url = result['body']['objectUrl']
        # All values are computed before update() runs, so a failure here
        # cannot leave retData_f half filled.
        retData_f.update(
            path=unquote(object_url.split('.com')[1]),
            full_path=unquote(object_url),
            create_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            file_size=baseCore.convert_size(file_size),
            page_size=page_size,
            content=retData_f['content'] + text,
        )
    except Exception as e:
        _record_failure(f'{e}')
        return retData
    # Mark success last, only once every field is known to be valid.
    retData_f['state'] = True
    return retData_f
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
    """Upload the downloaded body to the ``zzsn`` bucket as ``pathType + name``.

    :param pathType: prefix prepended to the object name.
    :param name: object name.
    :param response: HTTP response whose ``content`` bytes are uploaded.
    :return: the result object returned by ``obsClient.putContent``.
    :raises RuntimeError: when OBS answers with a status of 300 or above, so
        that the ``retry`` decorator gets a chance to repeat the upload.
    """
    result = obsClient.putContent('zzsn', pathType + name, content=response.content)
    # The SDK signals a rejected upload through the status field instead of
    # raising; without this check the retry decorator never triggers for it.
    status = getattr(result, 'status', None)
    if status is not None and status >= 300:
        raise RuntimeError(
            f'OBS upload failed: status={status}, '
            f'errorCode={getattr(result, "errorCode", None)}, '
            f'errorMessage={getattr(result, "errorMessage", None)}'
        )
    return result
if __name__ == '__main__':
    # Worker loop: take one row id from the Redis list, load the row, archive
    # the PDF in OBS, register the attachment and publish the text to Kafka.
    while True:
        start_time = time.time()
        record_id = baseCore.redicPullData('ompdfurlinfo:id')
        if not record_id:
            log.info('已没有数据')
            break
        # Values are bound by the driver rather than pasted into the SQL text.
        # Assumes the %s paramstyle of the MySQL drivers - TODO confirm against BaseCore.
        cursor.execute("select * from omengpdfinfo where id=%s", (record_id,))
        data = cursor.fetchone()
        if not data:
            continue
        num = 1
        # social_code and pdf_url stay module-level names on purpose: other
        # code in this script may read them as globals.
        social_code = data[1]
        ename = data[2]
        cname = data[3]
        year = data[4]
        pdf_url = data[5]
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        log.info(f'开始处理{ename}---{social_code}')
        retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
                   'full_path': '',
                   'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': create_by,
                   'create_time': time_now, 'page_size': '', 'content': ''}
        try:
            file_name = ename + year + '年年度报告' + '.pdf'
            # Upload the file to the file server.
            # (original note: "annualreports" is the source of the UK annual reports)
            retData_f = uptoOBS(retData, pathType, taskType, start_time, file_name, pdf_url)
            if not retData_f['state']:
                continue
            # tableUpdate arguments: retData, com_name, year, pdf_name, num, pub_time
            att_id = baseCore.tableUpdate(retData_f, cname, year, file_name, num, time_now)
            if not att_id:
                continue
            dic_news = {
                'attachmentIds': att_id,
                'author': '',
                'content': retData_f['content'],
                'contentWithTag': '',
                'createDate': time_now,
                'deleteFlag': '0',
                'id': '',
                'keyWords': '',
                'lang': 'zh',
                'origin': '企业官网',
                'publishDate': time_now,
                'sid': '1684032033495392257',
                'sourceAddress': pdf_url,  # link to the original document
                'summary': '',
                'title': file_name,
                'type': 1,
                'socialCreditCode': social_code,
                'year': year
            }
            if sendKafka(dic_news):
                # 100 means success.
                new_state = 100
            else:
                # 400 means the data could not be sent: remove the attachment
                # row that was just inserted.
                baseCore.deliteATT(att_id)
                log.info(f'已删除插入附件表的数据---{pdf_url}-----{social_code}')
                new_state = 400
            cursor.execute("update omengpdfinfo set state=%s where pdf_url = %s", (new_state, pdf_url))
            cnx.commit()
        except Exception as e:
            log.error(f'error------{e}')
\ No newline at end of file
import json
import os
import time
from kafka import KafkaProducer
from obs import ObsClient
import fitz
from urllib.parse import unquote
from retry import retry
import requests
import BaseCore
# Project helper object; the script takes its logger and both database
# connection/cursor pairs from it.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# First pair: used below to look up rows in rankandcode.
cnx = baseCore.cnx
cursor = baseCore.cursor
# Second pair: used below to update clb_sys_attachment.
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
# The next four values are not referenced by the visible part of this script.
pathType = 'QYYearReport/'
type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
# Directory whose entries are listed and processed by the loop below.
file_path = 'D:/kkwork/zzsn_spider/data/1_福布斯2000_PDF_50_郭'
# One-off repair: for every local PDF, recompute its real size and write it
# into the clb_sys_attachment rows that still carry the value '48.00 KB'.
file_list = os.listdir(file_path)
for file in file_list:
    pdf_path = file_path + '/' + file
    # Expected file name layout: "<rank>-<year>-<company>.pdf".
    name_parts = file.split('-')
    try:
        file_rank = int(name_parts[0])
        file_year = name_parts[1]
        company_part = name_parts[2]
    except (ValueError, IndexError):
        # Stray files in the directory must not abort the whole run.
        print(f'文件名不符合规则,已跳过:{file}')
        continue
    # NOTE(review): str.strip('.pdf') removes any of the characters '.', 'p',
    # 'd', 'f' from BOTH ends (e.g. 'fedex.pdf' -> 'edex'), and only the text
    # before the next '-' is used. Kept unchanged because the name has to
    # match existing rows, which presumably were written with this same
    # expression - confirm before correcting it.
    file_name = company_part.strip('.pdf') + file_year + '年年度报告' + '.pdf'

    # file_rank maps to the company's social credit code.
    # Values are bound by the driver rather than pasted into the SQL text.
    # Assumes the %s paramstyle of the MySQL drivers - TODO confirm against BaseCore.
    cursor.execute("select * from rankandcode where id = %s", (file_rank,))
    data = cursor.fetchone()
    cnx.commit()
    if not data:
        print(f'未找到对应企业,已跳过:{file}')
        continue
    social_code = data[1]
    cname = data[3]
    # This company is excluded; check before the expensive PDF parse.
    if social_code == 'ZZSN230824151210788':
        continue

    # Open the PDF and walk every page; a file that cannot be read is skipped.
    with open(pdf_path, 'rb') as pdf_file:
        byte_stream = pdf_file.read()
    try:
        with fitz.open(stream=byte_stream, filetype='pdf') as doc:
            page_size = doc.page_count
            print(page_size)
            for page in doc.pages():
                # The text itself is not needed; extraction is retained only
                # as the readability check the script has always performed.
                page.get_text()
    except Exception:
        print(f'文件已损坏:{cname}')
        continue

    # Size of the file on disk, formatted by the project helper.
    file_size = baseCore.convert_size(os.path.getsize(pdf_path))

    cursor_.execute(
        "update clb_sys_attachment set file_size=%s "
        "where file_size = '48.00 KB' and name=%s and item_id = %s "
        "and create_time>='2023-10-14' ",
        (file_size, file_name, social_code),
    )
    cnx_.commit()
    print('更新成功')
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论