Commit 80e7804c  Author: 薛凌堃

Yahoo Finance corporate news

Parent e34838cc
# Fetch Yahoo Finance corporate news
@@ -5,6 +5,8 @@ import pymysql
from kafka import KafkaProducer
from selenium.webdriver.common.by import By
import sys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
sys.path.append('D:/zzsn_spider/base')
import BaseCore
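The two selenium imports added in this hunk (`expected_conditions` and `WebDriverWait`) support the explicit wait introduced further down, which replaces an immediate `find_element` call on the press-release stream. A minimal, self-contained sketch of that pattern, assuming a locally available chromedriver and using AAPL purely as an example ticker:

# Explicit-wait sketch: block (up to 15 s) until the press-release stream container
# is visible, instead of calling find_element right away and catching the failure.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

driver = webdriver.Chrome()  # assumes chromedriver is available locally
driver.get("https://finance.yahoo.com/quote/AAPL/press-releases?p=AAPL")  # AAPL is only an example ticker
try:
    # until() returns the element once it is visible, or raises TimeoutException after 15 s
    stream = WebDriverWait(driver, 15).until(
        EC.visibility_of_element_located((By.ID, 'summaryPressStream-0-Stream'))
    )
    print(f"{len(stream.find_elements(By.XPATH, './ul/li'))} items loaded")
finally:
    driver.quit()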
@@ -13,6 +15,8 @@ import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
r = baseCore.r
taskType = '企业动态/雅虎财经'
smart = smart_extractor.SmartExtractor('cn')
@@ -178,6 +182,10 @@ def scroll(xydm,name,gpdm):
            break
        last_url = last_url_

# Re-queue items whose collection failed back into Redis
def rePutIntoR(item):
    r.rpush('NewsEnterprise:gwqy_socialCode', item)
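`rePutIntoR` pushes a failed social credit code back onto the same Redis list that the main loop drains with `baseCore.redicPullData`. The `BaseCore` internals are not shown in this diff, so the sketch below only illustrates the assumed push/pull pairing, using a plain redis-py client and the list key from the code above:

# Sketch of the assumed retry-queue behaviour; the project's BaseCore wrapper may differ.
import redis

client = redis.Redis(host='127.0.0.1', port=6379, db=0)  # connection details are placeholders
QUEUE_KEY = 'NewsEnterprise:gwqy_socialCode'

def re_put_into_r(item):
    # Failed codes go to the tail of the list ...
    client.rpush(QUEUE_KEY, item)

def pull_data():
    # ... and the worker pops from the head, so a re-queued company is only
    # retried after the rest of the backlog has been processed.
    raw = client.lpop(QUEUE_KEY)
    return raw.decode('utf-8') if raw else None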
if __name__ == "__main__":
    path = r'D:\zzsn_spider\comData\cmd6\chromedriver.exe'
@@ -185,6 +193,7 @@ if __name__ == "__main__":
    cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
    cursor = cnx.cursor()
    while True:
        # Use the social credit code pulled from Redis to look up the company's basic information in the database
        social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
@@ -214,66 +223,94 @@ if __name__ == "__main__":
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
            continue
        try:
            url = f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
            driver.get(url)
            try:
                # Explicit wait for the press-release stream, then require at least one link inside it
                WebDriverWait(driver, 15).until(EC.visibility_of_element_located((By.ID, 'summaryPressStream-0-Stream')))
                news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
                news_div.find_element(By.TAG_NAME, 'a')
            except Exception as e:
                log.error(f"{name}--{gpdm}--没找到新闻元素")
                exception = '没找到新闻元素'
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(xydm, taskType, state, takeTime, url, exception)
                continue
            try:
                scroll(xydm, name, gpdm)
            except Exception as e:
                print(e)
                log.error(f"{name}--{gpdm}--拖拽出现问题")
            news_lis = news_div.find_elements(By.XPATH, "./ul/li")
            log.info(f"{name}--{gpdm}--{len(news_lis)}条信息")
            # Flag marking whether the script has lost its connection to the site
            flag = 0
            for i in range(0, len(news_lis)):
                try:
                    try:
                        a_ele = news_lis[i].find_element(By.XPATH, "./div[1]/div[1]/div[2]/h3[1]/a")
                    except:
                        a_ele = news_lis[i].find_element(By.XPATH, "./div[1]/div[1]/div[1]/h3[1]/a")
                except Exception as e:
                    if news_lis[i].is_displayed():
                        log.error(f"{name}--{gpdm}--{i}----a标签没找到")
                        exception = 'a标签没找到'
                        state = 0
                        takeTime = baseCore.getTimeCost(start_time, time.time())
                        baseCore.recordLog(xydm, taskType, state, takeTime, url, exception)
                        continue
                    else:
                        log.error(f"{name}--{gpdm}--{i}----与网站断开连接")
                        # TODO: put the company back into Redis for retry
                        rePutIntoR(xydm)
                        time.sleep(300)
                        flag = 1
                        break
                news_url = a_ele.get_attribute("href").lstrip().strip().replace("'", "''")
                if (news_url.startswith("https://finance.yahoo.com")):
                    pass
                else:
                    continue
                # Check whether the URL already exists
                sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
                cursor.execute(sel_sql, (news_url, xydm))
                selects = cursor.fetchall()
                if selects:
                    log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
                    exception = '网址已存在'
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(xydm, taskType, state, takeTime, news_url, exception)
                    break
                title = a_ele.text.lstrip().strip().replace("'", "''")
                exception = getZx(xydm, news_url, title, cnx, path)
                if exception == '':
                    state = 1
                else:
                    state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(xydm, taskType, state, takeTime, news_url, exception)
                log.info(f"{name}--{gpdm}--{i}----{news_url}")
            if flag == 1:
                continue
            log.info(f"{name}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time, time.time())}")
            # After collection finishes, update this company's run count
            runType = 'NewsRunCount'
            count += 1
            baseCore.updateRun(social_code, runType, count)
        except:
            rePutIntoR(xydm)
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(xydm, taskType, state, takeTime, '', '远程主机强迫关闭了一个现有的连接。')
            log.info(f"-------{name}--{gpdm}---'远程主机强迫关闭了一个现有的连接。'--------")
            log.info('===========连接已被关闭========等待重新连接===========')
            time.sleep(1200)
            continue
    cursor.close()
    cnx.close()