Commit 25796701 by LiuLiYuan, parent fae5a5c1: Sina Finance news updates, 10/08
"""
新浪财经美股企业动态
"""
import json
import time
import jieba
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base.smart import smart_extractor
from base.BaseCore import BaseCore
# Initialization: warm up jieba so the Chinese tokenizer is loaded before crawling starts
jieba.cut("必须加载jieba")
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    'Cache-Control': 'no-cache',
    'Pragma': 'no-cache'
}
taskType = '新浪财经/天眼查'

# Look up company information for a given social credit code
def getinfomation(social_code):
    selectSql = f"select * from mgzqjywyh_list where state = '2' and xydm='{social_code}' "
    cursor.execute(selectSql)
    # ... (rest of the function is collapsed in this diff)
    cnx.close()
    return data

# Fetch a page and return its parsed BeautifulSoup document
@retry(tries=3, delay=1)
def getrequests(url):
    req = requests.get(url, headers=headers)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'html.parser')
    return soup
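
# Illustrative sketch only (not called anywhere in this script): a slightly more
# defensive variant of getrequests. The 10-second timeout and the raise_for_status()
# call are assumptions, not part of the original crawler; they make hangs and HTTP
# error pages surface as exceptions so the @retry decorator can retry them.
@retry(tries=3, delay=1)
def getrequests_safe(url):
    req = requests.get(url, headers=headers, timeout=10)  # assumed timeout value
    req.raise_for_status()                                # retry on HTTP error status
    req.encoding = req.apparent_encoding
    return BeautifulSoup(req.text, 'html.parser')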

# Parse one news <li> element, extract the article body and push it downstream
def getDic(social_code, li):
    start_time = time.time()
    title = li.find('a').text
    href = li.find('a').get('href')
    tag_at = li.find('span', class_='xb_list_r').text
    author = tag_at.split('|')[0].strip()
    pub_time = tag_at.split('|')[1].strip()
    pub_time = pub_time.split(' ')[0].replace('年', '-').replace('月', '-').replace('日', '')
    if 'http' not in href:
        href = 'https://finance.sina.com.cn' + href
    href_ = href.replace('https', 'http')
    try:
        # Extract the article once and reuse the result
        article = smart.extract_by_url(href_)
        # Body with HTML tags
        contentText = article.text
        # Plain-text body
        content = article.cleaned_text
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    except:
        log.error(f'{href}===页面解析失败')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
        return
    dic_news = {
        'attachmentIds': '',
        'author': author,
        'content': content,
        'contentWithTag': contentText,
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'zh',
        'origin': '新浪财经',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': href,  # link to the original article
        'summary': '',
        'title': title,
        'type': 2,
        'socialCreditCode': social_code,
        'year': pub_time[:4]
    }
    # print(dic_news)
    try:
        sendKafka(dic_news, start_time)
        log.info('Kafka发送成功')
        try:
            insertMysql(social_code, href)
            log.info('数据库保存成功')
        except:
            log.error(f'{href}===数据入库失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
    except:
        log.error(f'{href}===发送Kafka失败')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')

# Send a record to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    kafka_result = producer.send("researchReportTopic",
                                 json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': '操作成功',
        'code': '200',
    }
    log.info(dic_result)
    # Delivery succeeded; record it in the task log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
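
# Illustrative sketch, not used by the crawler above: sendKafka() creates a new
# KafkaProducer for every article. A common alternative (an assumption here, not
# something this codebase is known to do) is to reuse one producer, created lazily
# so that importing the module stays cheap and connection-free.
_shared_producer = None

def sendKafkaShared(dic_news):
    """Sketch: send one record through a shared, lazily created producer."""
    global _shared_producer
    if _shared_producer is None:
        _shared_producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    future = _shared_producer.send("researchReportTopic",
                                   json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    # Block until the broker acknowledges the message, mirroring the original get(timeout=10)
    future.get(timeout=10)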

# Save the record to MySQL; this table is later used for de-duplication
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # Row values for the news record
    list_info = [
        social_code,
        link,
        '新浪财经',
        '2',
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()
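
# Design note (assumption only; the real brpa_source_article schema is not shown here):
# with a unique key on (social_credit_code, source_address, type), the selectUrl()
# check below could be folded into the insert itself, e.g.:
#   insert ignore into brpa_source_article
#       (social_credit_code, source_address, origin, type, create_time)
#   values (%s, %s, %s, %s, now())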

# Check whether this news item has already been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
    cursor.execute(sel_sql, (url, social_code))
    selects = cursor.fetchone()
    return selects

def doJob():
    # while True:
    #     social_code = ''
    #     # Pull a company social credit code from Redis
    #     try:
    #         data = getinfomation(social_code)
    #         com_code = data[6]
    com_code = 'AAPL'
    social_code = 'ZZSN22080900000004'
    log.info(f'{social_code}==={com_code}===开始采集')
    start_time = time.time()
    pageIndex = 1
    while True:
        # Build the list-page URL, e.g.
        # http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1
        url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
        soup_home = getrequests(url)
        li_list = soup_home.select('body > div > div.xb_news > ul > li')
        # The first selection of <li> tags can come back empty, so retry a few times
        for i in range(5):
            if len(li_list) == 0:
                li_list = soup_home.select('body > div > div.xb_news > ul > li')
            else:
                break
        for li in li_list:
            title = li.find('a').text
            if title == '':
                continue
            href = li.find('a').get('href')
            selects = selectUrl(href, social_code)
            if selects:
                log.info(f'{url}==已采集过')
            else:
                getDic(social_code, li)
            break  # NOTE: stops after the first item; looks like test scaffolding
        break  # NOTE: stops after the first page; the pagination code below is skipped
        # If an already-collected item shows up, everything newer has been collected.
        # For incremental runs:
        # if selects:
        #     break
        next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
        for i in range(5):
            if len(next) == 0:
                next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
            else:
                break
        # Only two pagination links left is treated as the last page
        if len(next) == 2:
            break
        pageIndex += 1
        time.sleep(2)
    log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time, time.time())}')
    # except:
    #     log.info(f'==={social_code}=====获取企业信息失败====')
    #     # Push the social credit code back into Redis
    #     baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
    #     state = 0
    #     takeTime = baseCore.getTimeCost(start, time.time())
    #     baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
    #     time.sleep(5)


if __name__ == "__main__":
    doJob()