Commit 1a56ae4f  Author: 薛凌堃

Merge remote-tracking branch 'origin/master'

@@ -8,7 +8,6 @@ import pymysql
import redis
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages import urllib3
from retry import retry
from base import BaseCore
@@ -20,10 +19,6 @@ cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988',
charset='utf8mb4')
cursor = cnx.cursor()
r = baseCore.r
URL = 'https://www.nasdaq.com/'
session = requests.session()
session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
session.mount('http://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
@@ -65,7 +60,6 @@ def add_date(com_code, date_list):
# 数据发送端口
def sendData(start_time, social_code, gpdm, dic_info):
data = json.dumps(dic_info)
# print(data)
url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk'
for nnn in range(0, 3):
try:
@@ -106,7 +100,7 @@ def getlist(table, tableName):
else:
value = '-'
date_ = years[f'value{i}']
if date_ :
if date_:
date = date_.split('/')[2] + '-' + date_.split('/')[0] + '-' + \
date_.split('/')[1]
list.append({f'{tableName}': name, 'value': value, 'date': date, })
@@ -139,13 +133,12 @@ def reviseData(lists, unit, tableName):
# 获取年度财务数据
def getYear(start_time, session, social_code, gpdm):
def getYear(start_time, social_code, gpdm):
ynFirst = check_code(social_code)
date_list = []
url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=1'
try:
req = session.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
req = requests.get(url, headers=headers, verify=False)
data = req.json()['data']
if data:
unit = getUnit(gpdm)
@@ -188,8 +181,8 @@ def getYear(start_time, session, social_code, gpdm):
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
except:
log.error(f'{social_code}===年度财务数据访问失败')
except Exception as e:
r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', social_code)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
@@ -198,13 +191,12 @@ def getYear(start_time, session, social_code, gpdm):
# 获取季度财务数据 需要判断日期是否取与年度数据日期重合,重合需要修改类型为dateFlag字段为year
def getQuarter(start_time, session, social_code, gpdm):
def getQuarter(start_time, social_code, gpdm):
ynFirst = check_code(social_code)
date_list = []
url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=2'
try:
req = session.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
req = requests.get(url, headers=headers, verify=False, timeout=60)
data = req.json()['data']
if data:
unit = getUnit(gpdm)
@@ -250,8 +242,9 @@ def getQuarter(start_time, session, social_code, gpdm):
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
except:
log.error(f'{social_code}===季度财务数据访问失败')
except Exception as e:
r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', social_code)
log.error(f'{social_code}=={gpdm}===季度财务数据访问失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
@@ -259,8 +252,9 @@ def getQuarter(start_time, session, social_code, gpdm):
return date_list
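The comment above getQuarter notes that a quarterly reporting date which coincides with an annual reporting date must have its dateFlag switched to 'year'. A minimal sketch of that rule, assuming the row layout produced by getlist; the function name and the 'quarter' default value are illustrative, not the committed implementation:

def mark_overlapping_quarters(quarter_rows, year_dates):
    # Quarterly rows whose date also appears in the annual date list are re-tagged as 'year'.
    for row in quarter_rows:
        row['dateFlag'] = 'year' if row['date'] in year_dates else 'quarter'
    return quarter_rows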
# 信用代码放入redis中
def FinanceFromNasdaq():
sql = "select xydm from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
sql = "select xydm from mgzqyjwyh_list where state=2 and exchange='Nasdaq;"
cursor.execute(sql)
finance = cursor.fetchall()
finance_list = [item[0] for item in finance]
@@ -268,14 +262,15 @@ def FinanceFromNasdaq():
r.rpush('FinanceFromNasdaq:nasdaqfinance_socialCode', item)
print('redis放入成功')
def getInfomation(social_code):
sql = f"select * from mgzqyjwyh_list where state=2 and xydm='{social_code}';"
cursor.execute(sql)
data = cursor.fetchone()
return data
def doJob():
session.get(URL, headers=headers)
while True:
social_code = baseCore.redicPullData('FinanceFromNasdaq:nasdaqfinance_socialCode')
if not social_code or social_code == None:
@@ -288,17 +283,18 @@ def doJob():
social_code = data_enterprise[6]
# print(gpdm,social_code)
# 采集年度数据
date_list_year = getYear(start_time, session, social_code, gpdm)
date_list_year = getYear(start_time, social_code, gpdm)
# 保存年度数据到redis
add_date(social_code, date_list_year)
# 采集季度数据
date_list_quarter = getQuarter(start_time, session, social_code, gpdm)
date_list_quarter = getQuarter(start_time, social_code, gpdm)
# 保存季度数据到redis
add_date(social_code, date_list_quarter)
timeCost = baseCore.getTimeCost(start_time, time.time())
state = 1
baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
time.sleep(2)
if __name__ == '__main__':
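The changes to this first crawler drop the mounted HTTPAdapter session in favour of plain requests calls and, when a Nasdaq API request fails, push the company's credit code back onto the Redis queue so a later loop iteration can retry it. A minimal sketch of that fetch-and-requeue pattern, assuming a reachable Redis instance; fetch_financials and the local headers dict are illustrative, not part of the commit:

import redis
import requests

QUEUE_KEY = 'FinanceFromNasdaq:nasdaqfinance_socialCode'
r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True)
headers = {'User-Agent': 'Mozilla/5.0'}

def fetch_financials(social_code, gpdm, frequency=1):
    # frequency=1 requests annual data, frequency=2 quarterly data.
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency={frequency}'
    try:
        req = requests.get(url, headers=headers, verify=False, timeout=60)
        return req.json()['data']
    except Exception:
        # On any failure, requeue the company so a later worker pass retries it.
        r.rpush(QUEUE_KEY, social_code)
        return None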
"""
新浪财经美股企业动态
"""
import json
import time
import jieba
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base.smart import smart_extractor
from base.BaseCore import BaseCore
# 初始化,设置中文分词
jieba.cut("必须加载jieba")
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache'
}
taskType = '新浪财经/天眼查'
# 获取企业信息
def getinfomation(social_code):
selectSql = f"select * from mgzqjywyh_list where state = '2' and xydm='{social_code}' "
cursor.execute(selectSql)
@@ -18,17 +39,182 @@ def getinfomation(social_code):
cnx.close()
return data
# 获取响应页面
@retry(tries=3, delay=1)
def getrequests(url):
req = requests.get(url)
soup = BeautifulSoup(req.text,'html.parser')
req = requests.get(url, headers=headers)
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
return soup
if __name__ == "__main__":
social_code = ''
#从redis中获取企业信用代码
data = getinfomation(social_code)
com_code = data[6]
#拼接链接
# url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?symbol={com_code}'
url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?symbol=AAPL'
soup = getrequests(url)
# 解析内容
def getDic(social_code, li):
start_time = time.time()
title = li.find('a').text
href = li.find('a').get('href')
tag_at = li.find('span', class_='xb_list_r').text
author = tag_at.split('|')[0].lstrip().strip()
pub_time = tag_at.split('|')[1].lstrip().strip()
pub_time = pub_time.split(' ')[0].replace('年', '-').replace('月', '-').replace('日', '')
if 'http' not in href:
href = 'https://finance.sina.com.cn' + href
href_ = href.replace('https', 'http')
try:
# 带标签正文
contentText = smart.extract_by_url(href_).text
# 不带标签正文
content = smart.extract_by_url(href_).cleaned_text
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
except:
log.error(f'{href}===页面解析失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===页面解析失败')
return
dic_news = {
'attachmentIds': '',
'author': author,
'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '新浪财经',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': href, # 原文链接
'summary': '',
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': pub_time[:4]
}
# print(dic_news)
try:
sendKafka(dic_news, start_time)
log.info(f'Kafka发送成功')
try:
insertMysql(social_code, href)
log.info(f'数据库保存成功')
except:
log.error(f'{href}===数据入库失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===数据入库失败')
except:
log.error(f'{href}===发送Kafka失败')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, href, f'{href}===发送Kafka失败')
# 数据发送至Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
# 数据保存入库,用于判重
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
list_info = [
social_code,
link,
'新浪财经',
'2',
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
# 判断动态是否采集过
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
cursor.execute(sel_sql, (url, social_code))
selects = cursor.fetchone()
return selects
def doJob():
# while True:
# social_code = ''
# # 从redis中获取企业信用代码
# try:
# data = getinfomation(social_code)
# com_code = data[6]
com_code = 'AAPL'
social_code = 'ZZSN22080900000004'
log.info(f'{social_code}==={com_code}===开始采集')
start_time = time.time()
pageIndex = 1
while True:
# 拼接链接
# url = 'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex=1&symbol=AAPL&type=1'
url = f'http://biz.finance.sina.com.cn/usstock/usstock_news.php?pageIndex={pageIndex}&symbol={com_code}&type=1'
soup_home = getrequests(url)
li_list = soup_home.select('body > div > div.xb_news > ul > li')
# 有可能第一次获取的li标签列表为空
for i in range(5):
if len(li_list) == 0:
li_list = soup_home.select('body > div > div.xb_news > ul > li')
else:
break
for li in li_list:
title = li.find('a').text
if title == '':
continue
href = li.find('a').get('href')
selects = selectUrl(href, social_code)
if selects:
log.info(f'{url}==已采集过')
else:
getDic(social_code, li)
break
break
# # 如果采集到已采集过动态,证明最新发布动态已经全部采集过
# 增量使用
# if selects:
# break
next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
for i in range(5):
if len(next) == 0:
next = soup_home.select('body > div > div.xb_news > div.xb_pages > a')
else:
break
if len(next) == 2:
break
pageIndex += 1
time.sleep(2)
log.info(f'{social_code}==={com_code}===企业整体耗时{baseCore.getTimeCost(start_time,time.time())}')
# except:
# log.info(f'==={social_code}=====获取企业信息失败====')
# #重新塞入redis
# baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
# state = 0
# takeTime = baseCore.getTimeCost(start, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
# time.sleep(5)
if __name__ == "__main__":
doJob()
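A minimal sketch of the per-article flow that getDic and doJob implement above, reusing the functions defined in this file: a URL already present in brpa_source_article is skipped, a new article is sent to Kafka first, and its URL is recorded in MySQL only after the send succeeds, so a failed send stays retryable on a later run. process_article is a hypothetical wrapper, not part of the commit:

def process_article(social_code, href, dic_news, start_time):
    # Skip anything already collected for this company.
    if selectUrl(href, social_code):
        return False
    # sendKafka is retry-decorated and raises if all three attempts fail.
    sendKafka(dic_news, start_time)
    # Record the URL only after a successful send, keeping failures retryable.
    insertMysql(social_code, href)
    return True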