提交 c3f4587f 作者: 薛凌堃

证监会年报

上级 2d7e135f
import json

from fdfs_client.client import get_tracker_conf, Fdfs_client
from kafka import KafkaProducer
......@@ -9,6 +12,7 @@ from base import BaseCore
# Suppress urllib3's InsecureRequestWarning — presumably the HTTP requests in
# this module are issued with verify=False; confirm at the call sites.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Project helper object: supplies the logger plus timing/bookkeeping used
# below (getLogger, getTimeCost, recordLog).
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
# MySQL connection used for the attachment dedup queries in this module.
# NOTE(review): credentials are hardcoded in source — move them to
# configuration/environment variables.
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
......@@ -18,28 +22,6 @@ client = Fdfs_client(tracker_conf)
# Task label written into every recordLog() entry produced by this spider
# (meaning: "enterprise annual report / CSRC").
taskType = '企业年报/证监会'
# NOTE(review): dead code below — a commented-out proxy-pool helper that built
# http/https proxy dicts from rows of the clb_proxy table. Kept for reference;
# consider deleting it (version control preserves history).
# def get_proxy():
#     cursor = cnx_ip.cursor()
#     sql = "select proxy from clb_proxy"
#     cursor.execute(sql)
#     proxy_lists = cursor.fetchall()
#     ip_list = []
#     for proxy_ in proxy_lists:
#         ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
#     proxy_list = []
#     for str_ip in ip_list:
#         str_ip_list = str_ip.split('-')
#         proxyMeta = "http://%(host)s:%(port)s" % {
#             "host": str_ip_list[0],
#             "port": str_ip_list[1],
#         }
#         proxy = {
#             "HTTP": proxyMeta,
#             "HTTPS": proxyMeta
#         }
#         proxy_list.append(proxy)
#     return proxy_list
def RequestUrl(url, payload, item_id, start_time):
# ip = get_proxy()[random.randint(0, 3)]
......@@ -118,7 +100,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
# Parse the three arguments out of the anchor's
# onclick="downloadPdf1('<url>','<title>','<date>')" attribute.
# NOTE(review): str.strip('downloadPdf1(') strips a *character set*, not the
# literal prefix — it happens to work for these values, but
# removeprefix()/a regex would be safer.
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
# print(name)
# Fifth table cell holds the report type (e.g. annual report).
report_type = td_list[4].text.strip()
# print(report_type)
......@@ -129,11 +111,11 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
# Derive the report year from a "XXXX年" token in the PDF title; fall back to
# (publish year - 1) when the title has no such token.
# NOTE(review): the diff view has stripped leading indentation in this
# fragment — indentation below does not reflect the real nesting.
# NOTE(review): '\d{4}\s*年' works, but a raw string r'\d{4}\s*年' is the
# preferred regex form.
try:
year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
except Exception as e:
# NOTE(review): this clobbers pub_time with just its first 4 characters
# (the year digits) — confirm later uses of pub_time expect that.
pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
# pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
year = int(pub_time) - 1
year = str(year)
page_size = 0
# page_size = 0
# Dedup check: has an attachment row for (item_id, year) already been saved?
# Parameterized query — safe against SQL injection.
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s'''
cursor_.execute(sel_sql, (item_id, year))
......@@ -142,77 +124,137 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
# Duplicate guard: a row for (item_id, year) already exists in
# clb_sys_attachment — skip re-downloading this report.
# NOTE(review): per the diff, this whole branch is the PRE-change
# implementation (removed in this commit; a commented copy follows it).
# The diff view has also stripped the real indentation here.
print(f'com_name:{short_name}、{year}已存在')
continue
else:
# For an annual report, download and parse the PDF, then store it.
# Retry the download/parse up to 3 times.
for i in range(0, 3):
try:
resp_content = requests.request("GET", pdf_url).content
# Count the PDF's pages with PyMuPDF (also validates the bytes are a PDF).
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
break
except Exception as e:
print(e)
time.sleep(3)
continue
if page_size < 1:
# PDF parse failed — record the failure and move on.
print(f'==={short_name}、{year}===pdf解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
continue
# Upload the PDF bytes to FastDFS, retrying up to 3 times.
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
print(e)
time.sleep(3)
continue
if result == '':
e = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
# FastDFS returns a dict-like result; success is detected by these keys.
if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
type_id = '1'
item_id = dic_info['social_code']
group_name = 'group1'
path = bytes.decode(result['Remote file_id']).replace('group1', '')
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
# NOTE(review): no-op self-assignment — can be deleted.
page_size = page_size
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
category, file_size, order_by, status, create_by, create_time, page_size)
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
# NOTE(review): bare except hides the real error — catch Exception and
# log it instead.
except:
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
num = num + 1
time.sleep(2)
# # 类型为年报的话就解析该年报pdf,并入库
# for i in range(0, 3):
# try:
# resp_content = requests.request("GET", pdf_url).content
# # 获取pdf页数
# with fitz.open(stream=resp_content, filetype='pdf') as doc:
# page_size = doc.page_count
# break
# except Exception as e:
# print(e)
# time.sleep(3)
# continue
# if page_size < 1:
# # pdf解析失败
# print(f'==={short_name}、{year}===pdf解析失败')
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
# continue
# result = ''
# for i in range(0, 3):
# try:
# result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
# break
# except Exception as e:
# print(e)
# time.sleep(3)
# continue
# if result == '':
# e = '上传服务器失败'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
# continue
#
# if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
#
# time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
#
# type_id = '1'
# item_id = dic_info['social_code']
# group_name = 'group1'
#
# path = bytes.decode(result['Remote file_id']).replace('group1', '')
# full_path = bytes.decode(result['Remote file_id'])
# category = 'pdf'
# file_size = result['Uploaded size']
# order_by = num
# status = 1
# create_by = 'XueLingKun'
# create_time = time_now
# page_size = page_size
# try:
# tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
# category, file_size, order_by, status, create_by, create_time, page_size)
# state = 1
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
# except:
# e = '数据库传输失败'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
# num = num + 1
# time.sleep(2)
# else:
# e = '采集失败'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
# continue
# Upload the PDF to the file server.
# NOTE(review): the diff view has stripped the real indentation here.
retData = baseCore.upLoadToServe(pdf_url, 1, social_code)
# Insert the attachment row into the database and get its att_id back.
num = num + 1
att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num)
content = retData['content']
# NOTE(review): the upload, tableUpdate and content read above all happen
# BEFORE this state check — confirm that ordering is intended; it looks like
# the failure check should come first.
if retData['state']:
pass
else:
# NOTE(review): 'e' is assigned but never used on this path.
e = '采集失败'
log.info(f'====pdf解析失败====')
return False
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# Record shipped downstream: sid identifies the research-report source feed.
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '证监会',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': '',  # original article link
'summary': '',
'title': name_pdf,
'type': 1,
'socialCreditCode': social_code,
'year': year
}
# print(dic_news)
# Ship the assembled record to Kafka for persistence downstream.
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
# NOTE(review): typo — 'ture' should be 'true' (runtime string, left
# unchanged here; fix deliberately with downstream consumers in mind).
'success': 'ture',
'message': '操作成功',
'code': '200',
}
print(dic_result)
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
continue
# NOTE(review): the three lines below appear unreachable after the
# 'continue' above — confirm against the original indentation (the diff
# view flattened it); either the continue or these lines is likely wrong.
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
print(dic_result)
return False
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论