提交 13609e3d 作者: 薛凌堃

雪球网年报

上级 d30620e6
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -35,8 +35,10 @@ chromedriver = r'D:/cmd100/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
log = baseCore.getLogger()
requests.adapters.DEFAULT_RETRIES = 3
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
#11数据库
cnx = baseCore.cnx_
cursor = baseCore.cursor_
#144数据库
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
......@@ -159,94 +161,92 @@ def spider_annual_report(dict_info,num):
# name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
# name_pdf = pdf_name_a + '.pdf'
with cnx.cursor() as cursor:
if '年' in year:
year = year.split('年')[0]
else:
if '年' in year:
year = year.split('年')[0]
else:
pass
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s and type_id="1" '''
cursor.execute(sel_sql, (social_code, int(year)))
selects = cursor.fetchone()
if selects:
log.info(f'com_name:{com_name}、{year}已存在')
continue
else:
#上传文件至obs服务器
retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time,'XueLingKun')
if retData['state']:
pass
sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s and type_id="1" '''
cursor.execute(sel_sql, (social_code, int(year)))
selects = cursor.fetchone()
if selects:
log.info(f'com_name:{com_name}、{year}已存在')
continue
else:
#上传文件至obs服务器
retData = baseCore.uptoOBS(pdf_url,name_pdf,1,social_code,pathType,taskType,start_time,'XueLingKun')
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
continue
num = num + 1
try:
origin = '雪球网'
att_id = baseCore.tableUpdate(retData,com_name,year,name_pdf,num,pub_time,origin)
content = retData['content']
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '成功')
except Exception as e:
exception = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, f'{exception} - --{e}')
return False
#发送数据到kafka
lang = baseCore.detect_language(content)
if lang == 'cn':
lang = 'zh'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': origin,
'publishDate': datetime_string,
'sid': '1684032033495392257',
'sourceAddress': year_url, # 原文链接
'summary': '',
'title': name_pdf.replace(',pdf', ''),
'type': 1,
'socialCreditCode': social_code,
'year': year
}
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
log.info(f'====pdf解析失败====')
continue
num = num + 1
try:
origin = '雪球网'
att_id = baseCore.tableUpdate(retData,com_name,year,name_pdf,num,pub_time,origin)
content = retData['content']
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '成功')
except Exception as e:
exception = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, f'{exception} - --{e}')
return False
#发送数据到kafka
lang = baseCore.detect_language(content)
if lang == 'cn':
lang = 'zh'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': origin,
'publishDate': datetime_string,
'sid': '1684032033495392257',
'sourceAddress': year_url, # 原文链接
'summary': '',
'title': name_pdf.replace(',pdf', ''),
'type': 1,
'socialCreditCode': social_code,
'year': year
}
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
log.info(dic_result)
return False
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
log.info(dic_result)
return False
# num = num + 1
time.sleep(2)
time.sleep(2)
# browser.quit()
return True
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论