Commit 693e1afb by LiuLiYuan

Nasdaq company news 10/08

Parent 25796701
import datetime
import json
import time
import requests
from kafka import KafkaProducer
from retry import retry
from bs4 import BeautifulSoup
from requests.packages import urllib3
from base import BaseCore
urllib3.disable_warnings()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
    'Accept': 'application/json, text/plain, */*',
}
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
log = baseCore.getLogger()
r = baseCore.r
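# r is the shared Redis connection exposed by BaseCore; doJob pushes failed social codes
# back onto the 'NewsEnterprise:gnqy_socialCode' queue via baseCore.rePutIntoR, which is
# presumably backed by this connection.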
taskType = '纳斯达克/企业动态'
# Fetch a company's basic information from MySQL
def getInfomation(social_code):
    sql = f"select * from mgzqyjwyh_list where state=2 and xydm='{social_code}';"
    cursor.execute(sql)
    data = cursor.fetchone()
    return data
# Convert a Nasdaq date string to YYYY-MM-DD
def conversionTime(time):
    try:
        date_obj = datetime.datetime.strptime(time, "%B %d, %Y")
    except:
        date_obj = datetime.datetime.strptime(time, "%b%d,%Y")
    pub_time = date_obj.strftime("%Y-%m-%d")
    return pub_time
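# Example: conversionTime("October 10, 2023") -> "2023-10-10".
# The fallback format "%b%d,%Y" covers the space-stripped form built in getDicB, e.g. "Oct10,2023".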
# 获取总页数
@retry(tries=3, delay=1)
def getTotal(gpdm):
url = f'https://api.nasdaq.com/api/news/topic/articlebysymbol?q={gpdm}|stocks&offset=0&limit=100&fallback=false'
req = requests.get(url, headers=headers, verify=False)
req.encoding = req.apparent_encoding
total = req.json()['data']['totalrecords']
req.close()
return total
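# Expected (abridged) shape of the API response consumed by getTotal and getDataList,
# based only on the fields accessed in this script:
# {"data": {"totalrecords": <int>, "rows": [{"title": ..., "publisher": ..., "url": ...}, ...]}}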
# Get one page of the news list
@retry(tries=3, delay=1)
def getDataList(gpdm, offest, social_code):
    data_list = []
    url = f'https://api.nasdaq.com/api/news/topic/articlebysymbol?q={gpdm}|stocks&offset={offest}&limit=100&fallback=false'
    # print(url)
    req = requests.get(url, headers=headers, verify=False)
    req.encoding = req.apparent_encoding
    datas = req.json()['data']['rows']
    if datas != []:
        for data in datas:
            title = data['title']
            author = data['publisher']
            url = data['url']
            if 'http' not in url:
                url = 'https://www.nasdaq.com' + url
            data_list.append([url, title, author, social_code])
    req.close()
    return data_list
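# Each data_list entry is [url, title, author, social_code]; downstream parsing code
# reads it positionally (data[0]=url, data[1]=title, data[2]=author, data[3]=social_code).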
@retry(tries=3, delay=1)
def getsoup(url):
    req = requests.get(url, headers=headers, verify=False)
    # req = session.get(url)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'lxml')
    return soup
# Parse layout-A article pages
def getDicA(data, soup):
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    url = data[0]
    pub_time = soup.find('p', class_='jupiter22-c-author-byline__timestamp').text.split('—')[0].lstrip().strip()
    pub_time = conversionTime(pub_time)
    contentWithTag = soup.find('div', class_='nsdq-l-grid__item syndicated-article-body')
    try:
        contentWithTag.find('div', class_='jupiter22-c-tags jupiter22-c-tags-default').decompose()
    except:
        pass
    try:
        contentWithTag.find('div', class_='taboola-placeholder').decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('div', class_='ads__inline')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('script')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    content = contentWithTag.text
    dic_news = {
        'attachmentIds': '',
        'author': data[2],
        'content': content,
        'contentWithTag': str(contentWithTag),
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'en',
        'origin': '纳斯达克',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': url,  # link to the original article
        'summary': '',
        'title': data[1],
        'type': 2,
        'socialCreditCode': data[3],
        'year': pub_time[:4]
    }
    return dic_news
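# Nasdaq serves article pages in two layouts: getDicA above handles the syndicated-article
# layout (jupiter22-* / syndicated-article-body classes), while getDicB below handles the
# timestamp / body__content layout. doJob tries getDicA first and falls back to getDicB.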
# Parse layout-B article pages
def getDicB(data, soup):
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    url = data[0]
    pub_time = soup.find('div', class_='timestamp').find('time').text
    pub_time = pub_time.split(' ')[0] + pub_time.split(' ')[1] + pub_time.split(' ')[2]
    pub_time = conversionTime(pub_time)
    contentWithTag = soup.find('div', class_='body__content')
    try:
        divs_del = contentWithTag.find_all('div', class_='ads__inline')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    try:
        divs_del = contentWithTag.find_all('script')
        for div_del in divs_del:
            div_del.decompose()
    except:
        pass
    content = contentWithTag.text
    imgs = contentWithTag.find_all('img')
    for img in imgs:
        src = img.get('src')
        src_ = 'https://www.nasdaq.com' + src
        contentWithTag = str(contentWithTag).replace(src, src_)
    dic_news = {
        'attachmentIds': '',
        'author': data[2],
        'content': content,
        'contentWithTag': str(contentWithTag),
        'createDate': time_now,
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': 'en',
        'origin': '纳斯达克',
        'publishDate': pub_time,
        'sid': '1684032033495392257',
        'sourceAddress': url,  # link to the original article
        'summary': '',
        'title': data[1],
        'type': 2,
        'socialCreditCode': data[3],
        'year': pub_time[:4]
    }
    return dic_news
# Send the article record to Kafka
@retry(tries=3, delay=1)
def sendKafka(dic_news, start_time):
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    kafka_result = producer.send("researchReportTopic",
                                 json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
    print(kafka_result.get(timeout=10))
    dic_result = {
        'success': 'true',
        'message': '操作成功',
        'code': '200',
    }
    log.info(dic_result)
    # Send succeeded; write to the task log
    state = 1
    takeTime = baseCore.getTimeCost(start_time, time.time())
    baseCore.recordLog(dic_news['socialCreditCode'], taskType, state, takeTime, dic_news['sourceAddress'], '')
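# kafka_result.get(timeout=10) blocks for the broker acknowledgement; a send failure or
# timeout raises, which the @retry decorator turns into up to 3 attempts before the
# exception propagates to doJob and is logged there as a Kafka failure.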
# Save the record to MySQL, used for deduplication
@retry(tries=3, delay=1)
def insertMysql(social_code, link):
    insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
    # field list for the news record
    list_info = [
        social_code,
        link,
        '纳斯达克',
        '2',
    ]
    cursor.execute(insert_sql, tuple(list_info))
    cnx.commit()
# Check whether this article has already been collected
@retry(tries=3, delay=1)
def selectUrl(url, social_code):
    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
    cursor.execute(sel_sql, (url, social_code))
    selects = cursor.fetchone()
    return selects
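# Deduplication: selectUrl looks for an existing (source_address, social_credit_code, type='2')
# row in brpa_source_article; insertMysql only runs after a successful Kafka send, so articles
# whose send failed are retried on the next crawl.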
def doJob():
    while True:
        social_code = ''
        data_enterprise = getInfomation(social_code)
        gpdm = data_enterprise[3]
        social_code = data_enterprise[6]
        # gpdm = 'GOOGL'
        # social_code = 'ZZSN22080900000013'
        start_time = time.time()
        try:
            total = getTotal(gpdm)
        except:
            log.error(f'{social_code}==={gpdm}===获取总数失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{social_code}==={gpdm}===获取总数失败')
            baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
            continue
        for offest in range(0, total + 1, 100):
            try:
                data_list = getDataList(gpdm, offest, social_code)
            except:
                log.error(f'{social_code}==={gpdm}===获取信息列表失败({offest}~{offest + 100}条)')
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, '',
                                   f'{social_code}==={gpdm}===获取信息列表失败({offest}~{offest + 100}条)')
                continue
            # the API only returns the first 10000 records
            if data_list != []:
                for data in data_list:
                    start_time = time.time()
                    url = data[0]
                    selects = selectUrl(url, social_code)
                    if selects:
                        log.info(f'{url}===已采集过')
                        # full crawl: skip this article and keep going
                        continue
                        # incremental crawl: stop at the first already-collected article
                        # break
                    try:
                        soup = getsoup(url)
                        try:
                            try:
                                dic_info = getDicA(data, soup)
                            except:
                                dic_info = getDicB(data, soup)
                        except:
                            log.error(f'{url}===正文解析失败')
                            state = 0
                            takeTime = baseCore.getTimeCost(start_time, time.time())
                            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===正文解析失败')
                            continue
                        try:
                            sendKafka(dic_info, start_time)
                            try:
                                insertMysql(social_code, url)
                            except:
                                log.error(f'{url}===数据入库失败')
                                state = 0
                                takeTime = baseCore.getTimeCost(start_time, time.time())
                                baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===数据入库失败')
                        except Exception as e:
                            print(e)
                            log.error(f'{url}===发送kafka失败')
                            state = 0
                            takeTime = baseCore.getTimeCost(start_time, time.time())
                            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===发送kafka失败')
                        time.sleep(1)
                    except:
                        log.error(f'{url}===页面访问失败')
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{url}===页面访问失败')
                        break
            else:
                break
        break
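# Note: social_code is currently an empty placeholder and the trailing break exits the while
# loop after a single company; a production run would presumably pull social codes from the
# 'NewsEnterprise:gnqy_socialCode' Redis queue that rePutIntoR writes back to.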
if __name__ == "__main__":
    doJob()
    baseCore.close()