Commit 8f9f0213 authored by 丁双波

Add Yahoo scraping scripts

Parent 42cc2e60
Yahoo Finance: collecting information on foreign listed companies
# Yahoo Finance: fetch company news (press releases)
import time
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium import webdriver
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()

# Fetch one article's detail page and save it to the database
def getZx(xydm, url, title, cnx):
    start_time_content = time.time()
    try:
        chrome_options_content = webdriver.ChromeOptions()
        chrome_options_content.add_argument('--disable-gpu')
        chrome_options_content.add_argument('--ignore-certificate-errors')
        chrome_options_content.add_experimental_option('excludeSwitches', ['enable-automation'])
        chrome_options_content.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options_content.add_argument("--start-maximized")
        prefs_content = {'profile.managed_default_content_settings.images': 2}  # do not load images
        chrome_options_content.add_experimental_option('prefs', prefs_content)
        chrome_options_content.add_argument('--headless')
        executable_path = r'E:\chromedriver_win32\chromedriver.exe'
        driverContent = webdriver.Chrome(options=chrome_options_content, executable_path=executable_path)
        driverContent.get(url)
        # Expand the collapsed article body if a "read more" button is present
        try:
            clickButton = driverContent.find_element(By.CLASS_NAME, "collapse-button")
            clickButton.click()
        except Exception:
            pass
        time.sleep(0.5)
        authorElement = driverContent.find_element(By.CLASS_NAME, "caas-author-byline-collapse")
        timeElement = driverContent.find_element(By.CLASS_NAME, "caas-attr-time-style").find_element(By.TAG_NAME, "time")
        contentElement = driverContent.find_element(By.CLASS_NAME, "caas-body")
        author = authorElement.text.strip().replace("'", "''")
        pub_time = timeElement.get_attribute("datetime").strip().replace("'", "''").replace("T", " ")
        pub_time = pub_time[0:19]  # keep only "YYYY-MM-DD HH:MM:SS"
        content = contentElement.text.strip().replace("'", "''")
        driverContent.close()
        # Field list for one news record
        list_info = [
            xydm,
            title,
            '',
            content,
            pub_time,
            url,
            '雅虎财经',  # origin: Yahoo Finance
            author,
            '2',
            'zh'
        ]
        with cnx.cursor() as cursor:
            try:
                insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
                cursor.execute(insert_sql, tuple(list_info))
                cnx.commit()
            except Exception as e1:
                log.error("failed to save article to database")
        log.info(f"article fetch took {baseCore.getTimeCost(start_time_content, time.time())}")
    except Exception as e:
        log.error("failed to fetch article body")
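# A hypothetical standalone invocation, for illustration only (the credit code is the
# sample value used further below; the URL and title are made up; `cnx` is the live
# pymysql connection opened after this function):
#   getZx('ZZSN230710201009006', 'https://finance.yahoo.com/news/some-article.html',
#         'Some headline', cnx)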
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
prefs = {'profile.managed_default_content_settings.images': 2}
chrome_options.add_experimental_option('prefs',prefs)
chrome_options.add_argument('--headless')
executable_path = r'E:\chromedriver_win32\chromedriver.exe'
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# Scroll down repeatedly so the lazy-loaded news stream renders more items
def scroll(driver):
    for i in range(0, 30):
        # js = "window.scrollTo(0,document.body.scrollHeight)"
        js = "var q=document.documentElement.scrollTop=100000"
        driver.execute_script(js)
        time.sleep(0.1)
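# A common alternative (not used here) is to keep scrolling until the page height
# stops growing; a sketch, assuming the same driver object:
#   last_h = 0
#   while True:
#       h = driver.execute_script("return document.body.scrollHeight")
#       if h == last_h:
#           break
#       last_h = h
#       driver.execute_script("window.scrollTo(0, document.body.scrollHeight)")
#       time.sleep(0.5)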
# Read the company list from Excel
df_all = pd.read_excel(r'.\data\国外企业.xlsx', sheet_name=0, keep_default_na=False)
# Start at row 718 (presumably resuming an earlier run)
for num in range(718, len(df_all)):
    start_time = time.time()
    country = df_all['国别'][num]
    if country != '国外':
        continue
    enname = df_all['英文名称'][num]
    gpdm = df_all['股票代码'][num]
    xydm = df_all['信用代码'][num]
    if gpdm == '':
        log.error(f"{num}--{gpdm}--ticker is empty, skipping")
        continue
    if xydm == '':
        log.error(f"{num}--{gpdm}--credit code is empty, skipping")
        continue
    count = int(df_all['企业动态数量(7.15)'][num])
    # if count > 0:
    #     log.error(f"{num}--{gpdm}--already has news, skipping")
    #     continue
    # e.g. https://finance.yahoo.com/quote/GOOG/press-releases?p=GOOG
    url = f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
    driver.get(url)
    scroll(driver)
    try:
        news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
    except Exception as e:
        log.error(f"{num}--{gpdm}--news stream element not found")
        continue
    news_lis = news_div.find_elements(By.XPATH, "./ul/li")
    log.info(f"{num}--{gpdm}--{len(news_lis)} items")
    for i in range(0, len(news_lis)):
        try:
            a_ele = news_lis[i].find_element(By.XPATH, "./div[1]/div[1]/div[2]/h3[1]/a")
        except Exception:
            log.error(f"{num}--{gpdm}--{i}----<a> tag not found")
            continue
        news_url = a_ele.get_attribute("href").strip().replace("'", "''")
        # Keep only links that stay on finance.yahoo.com
        if not news_url.startswith("https://finance.yahoo.com"):
            continue
        # Skip URLs that are already in the database
        with cnx.cursor() as cursor:
            sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
            cursor.execute(sel_sql, (news_url, xydm))
            selects = cursor.fetchall()
        if selects:
            log.error(f"{num}--{gpdm}--URL already exists----{news_url}")
            continue
        title = a_ele.text.strip().replace("'", "''")
        getZx(xydm, news_url, title, cnx)
        log.info(f"{num}--{gpdm}--{i}----{news_url}")
    log.info(f"{num}--{gpdm}--whole company took {baseCore.getTimeCost(start_time, time.time())}")
# Release resources
baseCore.close()
import json
import time
import pandas as pd
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
    'cache-control': 'max-age=0',
    #'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
    'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': "Windows",
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'same-origin',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
# Look up a ticker symbol by company name via Yahoo's search API
def getGpdm(name):
    start = time.time()
    gpdm = ""
    try:
        url = f'https://query1.finance.yahoo.com/v1/finance/search?q={name}&lang=en-US&region=US&quotesCount=6&newsCount=2&listsCount=2&enableFuzzyQuery=false&quotesQueryId=tss_match_phrase_query&multiQuoteQueryId=multi_quote_single_token_query&newsQueryId=news_cie_vespa&enableCb=true&enableNavLinks=true&enableEnhancedTrivialQuery=true&enableResearchReports=true&enableCulturalAssets=true&enableLogoUrl=true&researchReportsCount=2'
        response = requests.get(url, headers=headers, verify=False, timeout=(3.05, 3))
        time.sleep(3)
    except:
        return gpdm
    if response.status_code != 200:
        log.error(f"{name}------ticker search API returned {response.status_code}")
        return gpdm
    retJson = json.loads(response.content.decode('utf-8'))
    try:
        gpdm = retJson['quotes'][0]['symbol']
    except:
        log.error(f"{name}---failed to parse ticker symbol")
        return gpdm
    log.info(f"ticker lookup--{name}, took {baseCore.getTimeCost(start, time.time())}")
    return gpdm
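# Illustrative call (the actual return value depends on the live API; the ".DE"
# suffix marks a German listing in Yahoo's symbol conventions):
#   getGpdm("Volkswagen")   # expected to yield a symbol such as "VOW3.DE"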
# Fetch company basic info and executive info from Yahoo by ticker symbol
def getInfo(name, gpdm, xydm):
    start = time.time()
    retData = {}
    retData['base_info'] = {
        '公司名称': name,
        '信用代码': xydm,
        '股票代码': gpdm,
        '地址': '',
        '电话': '',
        '公司网站': '',
        '部门': '',
        '行业': '',
        '员工人数': '',
        '公司简介': ''
    }
    retData['people_info'] = []
    # e.g. https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
    url = f'https://finance.yahoo.com/quote/{gpdm}/profile?p={gpdm}'
    time.sleep(3)
    response = None  # guard against all three attempts raising
    for i in range(0, 3):
        try:
            response = requests.get(url, headers=headers, verify=False)
            time.sleep(1)
            if response.status_code == 200:
                break
            else:
                log.error(f"{gpdm}---attempt {i}---profile request returned {response.status_code}")
        except:
            continue
    if response is None or response.status_code != 200:
        log.error(f"{gpdm}------profile request still failing after retries")
        return retData
    soup = BeautifulSoup(response.content, 'html.parser')
    page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
    try:
        com_info = page.find('div', {'class': 'Mb(25px)'})
    except:
        com_info = ''
    try:
        com_phone = com_info.find_all('p')[0].find('a').text
    except:
        com_phone = ''
    try:
        com_url = com_info.find_all('p')[0].find('a', {'target': '_blank'}).text
    except:
        com_url = ''
    try:
        com_address = com_info.find_all('p')[0].text.replace(com_phone, '').replace(com_url, '')
    except:
        com_address = ''
    try:
        com_bumen = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[0].text
    except:
        com_bumen = ''
    try:
        com_hangye = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[1].text
    except:
        com_hangye = ''
    try:
        com_people = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[2].text
    except:
        com_people = ''
    try:
        com_jianjie = page.find('p', {'class': 'Mt(15px) Lh(1.6)'}).text
    except:
        com_jianjie = ''
    dic_com_info = {
        '公司名称': name,
        '信用代码': xydm,
        '股票代码': gpdm,
        '地址': com_address,
        '电话': com_phone,
        '公司网站': com_url,
        '部门': com_bumen,
        '行业': com_hangye,
        '员工人数': com_people,
        '公司简介': com_jianjie
    }
    retData['base_info'] = dic_com_info
    # Executive info
    retPeople = []
    try:
        list_people = page.find('table', {'class': 'W(100%)'}).find_all('tr')[1:]
    except:
        list_people = []
    for one_people in list_people:
        try:
            p_name = one_people.find_all('td')[0].text
        except:
            continue  # no name cell, skip this row
        try:
            p_zhiwu = one_people.find_all('td')[1].text
        except:
            p_zhiwu = ''
        try:
            p_money = one_people.find_all('td')[2].text
        except:
            p_money = ''
        try:
            p_xingshi = one_people.find_all('td')[3].text
        except:
            p_xingshi = ''
        try:
            p_year = one_people.find_all('td')[4].text
        except:
            p_year = ''
        # Normalize Yahoo's "N/A" placeholders to empty strings
        if p_zhiwu == "N/A":
            p_zhiwu = ""
        if p_money == "N/A":
            p_money = ""
        if p_xingshi == "N/A":
            p_xingshi = ""
        if p_year == "N/A":
            p_year = ""
        dic_main_people = {
            '公司名称': name,
            '股票代码': gpdm,
            '信用代码': xydm,
            '姓名': p_name,
            '职务': p_zhiwu,
            '薪资': p_money,
            '行使': p_xingshi,
            '出生年份': p_year
        }
        retPeople.append(dic_main_people)
    retData['people_info'] = retPeople
    log.info(f"profile fetch--{gpdm}, took {baseCore.getTimeCost(start, time.time())}")
    return retData
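# Shape of the returned dict, with illustrative values (the keys are the fixed
# Chinese field names defined above):
#   {'base_info': {'公司名称': 'Volkswagen', '股票代码': 'VOW3.DE', '行业': '...', ...},
#    'people_info': [{'姓名': '...', '职务': '...', '薪资': '...', '出生年份': '...'}, ...]}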
# Save the basic company info
def saveBaseInfo(info):
    start = time.time()
    # Publish the basic info to Kafka
    company_dict = {
        'name': info['base_info']['公司名称'],  # company name
        'shortName': info['base_info']['公司名称'],  # short name
        'socialCreditCode': info['base_info']['信用代码'],  # unified social credit code
        'officialPhone': info['base_info']['电话'],  # phone
        'officialUrl': info['base_info']['公司网站'],  # official website
        'briefInfo': info['base_info']['公司简介'],  # company profile
        'industry': info['base_info']['行业'],  # industry
        'englishName': info['base_info']['公司名称'],  # English name
        'address': info['base_info']['地址'],  # address
        'status': 0,  # status
    }
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
    kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
    kafka_result.get(timeout=10)
    log.info(f"saved basic info--{info['base_info']['信用代码']}, took {baseCore.getTimeCost(start, time.time())}")
# Save the executive info
def savePeopleInfo(info):
    start = time.time()
    # Push executive info to the sync API
    list_people = info['people_info']
    list_one_info = []
    for i in range(0, len(list_people)):
        dic_json = {
            "socialCreditCode": list_people[i]['信用代码'],
            "name": list_people[i]['姓名'],
            "sex": '',
            "education": '',
            "position": list_people[i]['职务'],
            "salary": list_people[i]['薪资'],
            "birthYear": list_people[i]['出生年份'],
            "shareNum": '',
            "shareRatio": '',
            "benefitShare": '',
            "currentTerm": '',
            "personInfo": '',
            "sort": str(i + 1)
        }
        list_one_info.append(dic_json)
    json_updata = json.dumps(list_one_info)
    if json_updata != '[]':
        for i in range(0, 3):
            response = requests.post('http://114.115.236.206:9988/datapull/sync/executive', data=json_updata, timeout=300, verify=False)
            if response.status_code == 200:
                retJson = json.loads(response.content.decode('utf-8'))
                if retJson['success'] or retJson['success'] == 'true':
                    break
        if response.status_code == 200:
            retJson = json.loads(response.content.decode('utf-8'))
            if retJson['success'] or retJson['success'] == 'true':
                pass
            else:
                log.error(f"executive sync API failed---{retJson}")
        else:
            log.error(f"executive sync API failed---{response.status_code}")
    log.info(f"saved executive info--{info['base_info']['信用代码']}, took {baseCore.getTimeCost(start, time.time())}")
# Look up the ticker by name (must be the English name); skip this step if the
# source data already contains a ticker
# gpdm = getGpdm("Volkswagen")
# Generate a new credit code; if the original data already has one, do not
# generate a new code
# xydm = baseCore.getNextXydm()
# xydm = 'ZZSN230710201009006'
# retData = getInfo("Volkswagen", "VOW3.DE", xydm)
# saveBaseInfo(retData)
# savePeopleInfo(retData)
# print(retData)
# Main collection job
def beginWork():
    # Given an Excel company list, look up and save ticker symbols
    okCount = 0
    errorCount = 0
    df_all = pd.read_excel('./data/96-22的500强企业清单.xlsx', dtype=str, keep_default_na=False)
    # Start at row 300 (presumably resuming an earlier run)
    for i in range(300, len(df_all)):
        log.info(f"{i}----------start")
        country = df_all['国内外'][i]
        if country != '国外':
            log.info(f"{i}----------domestic company, skipping")
            continue
        gpdm = df_all['股票代码'][i]
        if gpdm != '':
            log.info(f"{i}----------ticker already present, skipping")
            continue
        enname = df_all['英文名称'][i]
        if enname == '':
            log.info(f"{i}----------English name is empty, skipping")
            continue
        log.info(f"{i}----------looking up ticker")
        gpdm = getGpdm(enname)
        if gpdm != '':
            okCount = okCount + 1
        else:
            errorCount = errorCount + 1
        log.info(f"{i}-------ok {okCount}--failed {errorCount}")
        if gpdm == '':
            continue
        df_all['股票代码'][i] = gpdm
        # Checkpoint the results every 10 rows
        if i % 10 == 0:
            df_all.to_excel(r'.\data\96-22的500强企业清单_ret.xlsx', sheet_name='Sheet1', index=False, header=True)
    df_all.to_excel(r'.\data\96-22的500强企业清单_ret.xlsx', sheet_name='Sheet1', index=False, header=True)
    # Release resources
    baseCore.close()
if __name__ == '__main__':
    # gpdm = getGpdm("Volkswagen")
    # print(gpdm)
    beginWork()