Commit 127b1931 Author: 薛凌堃

Xueqiu annual reports

Parent 335b0090
# -*- coding: utf-8 -*-
"""
从数据库中读取年报缺失年份,采集对应网站上的年报,存在两种情况,标题中有年份,标题中无年份。
如果标题中有年份的话,按照原方式命名,有年份的应该都已经采过,跳过不插入更新
如果标题中无年份的话,则解析正文内容,正则表达式匹配年份,
采集一条,state 加1 如果报错的话就将state改为100,单独处理。
"""
import json
from kafka import KafkaProducer
from base.BaseCore import BaseCore
baseCore = BaseCore()
import requests, re, time, pymysql, fitz
from bs4 import BeautifulSoup as bs
from selenium import webdriver
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
from fdfs_client.client import get_tracker_conf, Fdfs_client
log = baseCore.getLogger()
requests.adapters.DEFAULT_RETRIES = 3
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# cnx_ = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
# # cnx_ip = pymysql.connect(host='114.115.159.144',user='root', password='zzsn9988', db='clb_project', charset='utf8mb4')
# cursor_ = cnx_.cursor()
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
}

def clean_text(text):
    """
    Strip HTML tags and collapse redundant blank lines.
    :param text: raw HTML text
    :return: plain text with tabs removed and runs of newlines collapsed
    """
    soup = bs(text, 'html.parser')
    text = soup.get_text()
    text_ = re.sub('\n+', '\n', text.replace('\t', '').replace('\r', ''))
    return text_
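# Illustrative example (assuming BeautifulSoup's default text extraction):
#     clean_text('<p>a</p>\r\n\n\n<p>b</p>')  ->  'a\nb'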

def spider_annual_report(dict_info, num):
    social_code = dict_info['social_code']
    com_name = dict_info['com_name']
    code = dict_info['code']
    url_1 = f'https://vip.stock.finance.sina.com.cn/corp/go.php/vCB_Bulletin/stockid/{code}/page_type/ndbg.phtml'
    browser.get(url_1)
    time.sleep(3)
    page_source = browser.page_source
    soup = bs(page_source, 'html.parser')
    # res_1 = requests.get(url_1, proxies=ip)
    # soup = bs(res_1.content, 'html.parser')
    try:
        list_all = soup.find('div', {'class': 'datelist'}).find_all('a')
    except:
        log.info(f'{social_code}.........年度报告列表为空')
        exception = '年度报告列表为空'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
        return
    for i in list_all:
        # ip = get_proxy()[random.randint(0, 3)]
        pdf_name_a = i.text
        year_url = 'https://vip.stock.finance.sina.com.cn' + i.get('href')
        year_name = i.text
        browser.get(year_url)
        time.sleep(5)
        page_source_2 = browser.page_source
        # res_2 = requests.get(year_url, proxies=ip)
        soup_2 = bs(page_source_2, 'html.parser')
        try:
            pdf_url = soup_2.find('th', {'style': 'text-align:center'}).find('a').get('href')
        except:
            # TODO: no download link, but the body text is still present
            log.error(f'{social_code}....{year_url}....无下载链接')
            exception = '无下载链接'
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
            continue
        # announcement date
        pub_time = soup_2.find('td', {'class': 'head'}).text.split('公告日期')[1]
        try:
            # Case 1: the title contains the year
            year = re.findall(r'\d{4}', year_name)[0]
            if com_name != 'null':
                name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
            else:
                name_pdf = pdf_name_a + '.pdf'
        except:
            # Case 2: no year in the title, so parse the body text instead
            content = soup_2.find('div', {'id': 'content'}).text
            # collapse redundant blank lines
            content_c = clean_text(content)
            name_pdf = None
            # scan the first four lines of the body for the year
            # (loop variable renamed so it no longer shadows the outer `i`)
            for line_no in range(0, 4):
                try:
                    line = content_c.split('\n')[line_no]
                    try:
                        # match the year with a regular expression
                        year_ = re.findall(r'\d{4}\s*年年度报告', line)[0]
                        year = re.findall(r'\d{4}', year_)[0]
                        if com_name != '':
                            name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
                        else:
                            name_pdf = pdf_name_a + '.pdf'
                        break
                    except:
                        try:
                            # fall back to the announcement header: a report
                            # published in year N covers fiscal year N - 1
                            result = soup_2.find('td', class_='head').text
                            year = str(int(re.findall(r'\d{4}', result)[0]) - 1)
                            if com_name != '':
                                name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
                            else:
                                name_pdf = pdf_name_a + '.pdf'
                        except:
                            continue
                except:
                    # the body has fewer lines than expected: derive the year
                    # from the announcement date instead
                    year = str(int(re.findall(r'\d{4}', pub_time)[0]) - 1)
                    if com_name != '':
                        name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
                    else:
                        name_pdf = pdf_name_a + '.pdf'
            if name_pdf is None:
                # none of the scanned lines yielded a name; fall back to the
                # announcement date so year/name_pdf are always defined below
                year = str(int(re.findall(r'\d{4}', pub_time)[0]) - 1)
                if com_name != '':
                    name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
                else:
                    name_pdf = pdf_name_a + '.pdf'
        with cnx.cursor() as cursor:
            sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s and type_id="1" '''
            cursor.execute(sel_sql, (social_code, int(year)))
            selects = cursor.fetchone()
        if selects:
            print(f'com_name:{com_name}、{year}已存在')
            continue
        else:
            page_size = 0
            # upload the file to the file server
            retData = baseCore.upLoadToServe(pdf_url, 1, social_code)
            num = num + 1
            try:
                att_id = baseCore.tableUpdate(retData, com_name, year, name_pdf, num)
                content = retData['content']
                if retData['state']:
                    pass
                else:
                    log.info('====pdf解析失败====')
                    return False
                state = 1
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '')
            except:
                exception = '数据库传输失败'
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
                # att_id/content are undefined on this path, so skip the Kafka step
                continue
            # send the record to Kafka
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            dic_news = {
                'attachmentIds': att_id,
                'author': '',
                'content': content,
                'contentWithTag': '',
                'createDate': time_now,
                'deleteFlag': '0',
                'id': '',
                'keyWords': '',
                'lang': 'zh',
                'origin': '雪球网',
                'publishDate': pub_time,
                'sid': '1684032033495392257',
                'sourceAddress': year_url,  # link to the original announcement
                'summary': '',
                'title': name_pdf,
                'type': 1,
                'socialCreditCode': social_code,
                'year': year
            }
            # persist the record by publishing the fields to Kafka
            try:
                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                kafka_result = producer.send("researchReportTopic",
                                             json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
                print(kafka_result.get(timeout=10))
                dic_result = {
                    'success': 'true',
                    'message': '操作成功',
                    'code': '200',
                }
                print(dic_result)
                return True
            except Exception as e:
                dic_result = {
                    'success': 'false',
                    'message': '操作失败',
                    'code': '204',
                    'e': e
                }
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
                print(dic_result)
                return False
        # num = num + 1
        time.sleep(2)
    # browser.quit()
#state1
if __name__ == '__main__':
    num = 0
    taskType = '企业年报/雪球网/福布斯'
    while True:
        start_time = time.time()
        # fetch the company profile
        social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
        # social_code = '911100007109288314'
        if not social_code or social_code == 'None':
            time.sleep(20)
            continue
        dic_info = baseCore.getInfomation(social_code)
        count = dic_info[15]
        code = dic_info[3]
        com_name = dic_info[4]
        if code is None:
            exception = '股票代码为空'
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
            continue
        # left-pad the stock code to six digits, e.g. '1' -> '000001'
        code = code.zfill(6)
        # years = tuple(call_year)
        dict_info = {
            'social_code': social_code,
            'com_name': com_name,
            'code': code,
        }
        # list_info.append(dict_info)
        spider_annual_report(dict_info, num)
        count += 1
        runType = 'AnnualReportCount'
        baseCore.updateRun(social_code, runType, count)
    # cursor.close()
    cnx_.close()
    # release resources
    baseCore.close()