提交 7449d79d 作者: 薛凌堃

雪球网年报

上级 17e7120f
...@@ -663,7 +663,7 @@ class BaseCore: ...@@ -663,7 +663,7 @@ class BaseCore:
return selects return selects
# 插入到att表 返回附件id # 插入到att表 返回附件id
def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time): def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time,origin):
item_id = retData['item_id'] item_id = retData['item_id']
type_id = retData['type_id'] type_id = retData['type_id']
group_name = retData['group_name'] group_name = retData['group_name']
...@@ -683,13 +683,13 @@ class BaseCore: ...@@ -683,13 +683,13 @@ class BaseCore:
id = '' id = ''
return id return id
else: else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = ( values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by, status, create_by,
create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn', create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time) pub_time,origin)
self.cursor_.execute(Upsql, values) # 插入 self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交 self.cnx_.commit() # 提交
......
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
...@@ -6,7 +6,9 @@ ...@@ -6,7 +6,9 @@
采集一条,state 加1 如果报错的话就将state改为100,单独处理。 采集一条,state 加1 如果报错的话就将state改为100,单独处理。
""" """
import json import json
from datetime import datetime
from kafka import KafkaProducer from kafka import KafkaProducer
from base.BaseCore import BaseCore from base.BaseCore import BaseCore
...@@ -31,19 +33,13 @@ opt.add_experimental_option('useAutomationExtension', False) ...@@ -31,19 +33,13 @@ opt.add_experimental_option('useAutomationExtension', False)
opt.binary_location = r'D:/Google/Chrome/Application/chrome.exe' opt.binary_location = r'D:/Google/Chrome/Application/chrome.exe'
chromedriver = r'D:/cmd100/chromedriver.exe' chromedriver = r'D:/cmd100/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver) browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
from fdfs_client.client import get_tracker_conf, Fdfs_client
log = baseCore.getLogger() log = baseCore.getLogger()
requests.adapters.DEFAULT_RETRIES = 3 requests.adapters.DEFAULT_RETRIES = 3
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4') cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cnx_ = baseCore.cnx cnx_ = baseCore.cnx
cursor_ = baseCore.cursor cursor_ = baseCore.cursor
# cnx_ = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji', charset='utf8mb4')
# # cnx_ip = pymysql.connect(host='114.115.159.144',user='caiji', password='zzsn9988', db='clb_project', charset='utf8mb4')
# cursor_ = cnx_.cursor()
headers = { headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
...@@ -107,15 +103,20 @@ def spider_annual_report(dict_info,num): ...@@ -107,15 +103,20 @@ def spider_annual_report(dict_info,num):
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception) baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue continue
#公告日期 #日期
pub_time = soup_2.find('td',{'class':'head'}).text.split('公告日期:')[1] pub_time = soup_2.find('td',{'class':'head'}).text.split('公告日期:')[1]
# 将时间年月日字符串转换为datetime对象
date_object = datetime.strptime(pub_time, "%Y-%m-%d")
# 将datetime对象转换为年月日时分秒字符串
datetime_string = date_object.strftime("%Y-%m-%d %H:%M:%S")
try: try:
# 标题中有年份, # 标题中有年份,
year = re.findall('\d{4}\s*年', year_name)[0] year = re.findall('\d{4}\s*年', year_name)[0]
if com_name != 'null': if com_name != 'null':
name_pdf = f"{com_name}:{year}年.pdf".replace('*', '') name_pdf = f"{com_name}:{year}年度报告.pdf".replace('*', '')
else: else:
name_pdf = pdf_name_a + '.pdf' name_pdf = pdf_name_a + '.pdf'
except: except:
...@@ -134,7 +135,7 @@ def spider_annual_report(dict_info,num): ...@@ -134,7 +135,7 @@ def spider_annual_report(dict_info,num):
year = re.findall('\d{4}', year_)[0] year = re.findall('\d{4}', year_)[0]
if com_name != '': if com_name != '':
name_pdf = f"{com_name}:{year}年年.pdf".replace('*', '') name_pdf = f"{com_name}:{year}年年度报告.pdf".replace('*', '')
else: else:
name_pdf = pdf_name_a + '.pdf' name_pdf = pdf_name_a + '.pdf'
break break
...@@ -143,7 +144,7 @@ def spider_annual_report(dict_info,num): ...@@ -143,7 +144,7 @@ def spider_annual_report(dict_info,num):
result = soup_2.find('td', class_='head').text result = soup_2.find('td', class_='head').text
year = str(int(re.findall('\d{4}', result)[0]) - 1) year = str(int(re.findall('\d{4}', result)[0]) - 1)
if com_name != '': if com_name != '':
name_pdf = f"{com_name}:{year}年年.pdf".replace('*', '') name_pdf = f"{com_name}:{year}年年度报告.pdf".replace('*', '')
else: else:
name_pdf = pdf_name_a + '.pdf' name_pdf = pdf_name_a + '.pdf'
except: except:
...@@ -152,7 +153,7 @@ def spider_annual_report(dict_info,num): ...@@ -152,7 +153,7 @@ def spider_annual_report(dict_info,num):
# result = soup_2.find('td', class_='head').text # result = soup_2.find('td', class_='head').text
year = str(int(re.findall('\d{4}', pub_time)[0]) - 1) year = str(int(re.findall('\d{4}', pub_time)[0]) - 1)
if com_name != '': if com_name != '':
name_pdf = f"{com_name}:{year}年年.pdf".replace('*', '') name_pdf = f"{com_name}:{year}年年度报告.pdf".replace('*', '')
else: else:
name_pdf = pdf_name_a + '.pdf' name_pdf = pdf_name_a + '.pdf'
# name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '') # name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
...@@ -179,7 +180,8 @@ def spider_annual_report(dict_info,num): ...@@ -179,7 +180,8 @@ def spider_annual_report(dict_info,num):
return False return False
num = num + 1 num = num + 1
try: try:
att_id = baseCore.tableUpdate(retData,com_name,year,name_pdf,num,pub_time) origin = '雪球网'
att_id = baseCore.tableUpdate(retData,com_name,year,name_pdf,num,pub_time,origin)
content = retData['content'] content = retData['content']
state = 1 state = 1
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
...@@ -191,6 +193,9 @@ def spider_annual_report(dict_info,num): ...@@ -191,6 +193,9 @@ def spider_annual_report(dict_info,num):
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, f'{exception} - --{e}') baseCore.recordLog(social_code, taskType, state, takeTime, year_url, f'{exception} - --{e}')
return False return False
#发送数据到kafka #发送数据到kafka
lang = baseCore.detect_language(content)
if lang == 'cn':
lang = 'zh'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = { dic_news = {
'attachmentIds': att_id, 'attachmentIds': att_id,
...@@ -201,20 +206,20 @@ def spider_annual_report(dict_info,num): ...@@ -201,20 +206,20 @@ def spider_annual_report(dict_info,num):
'deleteFlag': '0', 'deleteFlag': '0',
'id': '', 'id': '',
'keyWords': '', 'keyWords': '',
'lang': 'zh', 'lang': lang,
'origin': '雪球网', 'origin': origin,
'publishDate': pub_time, 'publishDate': datetime_string,
'sid': '1684032033495392257', 'sid': '1684032033495392257',
'sourceAddress': year_url, # 原文链接 'sourceAddress': year_url, # 原文链接
'summary': '', 'summary': '',
'title': name_pdf.split('.pdf')[0], 'title': name_pdf.replace('.pdf', ''),
'type': 1, 'type': 1,
'socialCreditCode': social_code, 'socialCreditCode': social_code,
'year': year 'year': year
} }
# 将相应字段通过kafka传输保存 # 将相应字段通过kafka传输保存
try: try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092']) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
kafka_result = producer.send("researchReportTopic", kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8')) json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
...@@ -254,8 +259,8 @@ if __name__ == '__main__': ...@@ -254,8 +259,8 @@ if __name__ == '__main__':
while True: while True:
start_time = time.time() start_time = time.time()
# 获取企业信息 # 获取企业信息
# social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode') social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
social_code = '91100000100003962T' # social_code = '91100000100003962T'
if not social_code: if not social_code:
time.sleep(20) time.sleep(20)
continue continue
...@@ -292,7 +297,7 @@ if __name__ == '__main__': ...@@ -292,7 +297,7 @@ if __name__ == '__main__':
count += 1 count += 1
runType = 'AnnualReportCount' runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count) baseCore.updateRun(social_code, runType, count)
break # break
# cursor.close() # cursor.close()
cnx_.close() cnx_.close()
# 释放资源 # 释放资源
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论