提交 3646fd0b 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

import json
import json
......@@ -11,18 +11,19 @@ from NewsYahoo import news
from base.BaseCore import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore()
log= baseCore.getLogger()
log = baseCore.getLogger()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
#'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
# 'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': "Windows",
......@@ -33,42 +34,22 @@ headers = {
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
# 获取股票代码
# def getGpdm(name):
# start=time.time()
# gpdm=""
# try:
#
# url = f'https://query1.finance.yahoo.com/v1/finance/search?q={name}&lang=en-US&region=US&quotesCount=6&newsCount=2&listsCount=2&enableFuzzyQuery=false&quotesQueryId=tss_match_phrase_query&multiQuoteQueryId=multi_quote_single_token_query&newsQueryId=news_cie_vespa&enableCb=true&enableNavLinks=true&enableEnhancedTrivialQuery=true&enableResearchReports=true&enableCulturalAssets=true&enableLogoUrl=true&researchReportsCount=2'
# response = requests.get(url, headers=headers, verify=False,timeout=(3.05, 3))
# time.sleep(3)
# except:
# return gpdm
# if (response.status_code == 200):
# pass
# else:
# log.error(f"{name}------获取股票接口返回失败:{response.status_code}")
# return gpdm
# retJson = json.loads(response.content.decode('utf-8'))
# try:
# gpdm =retJson['quotes'][0]['symbol']
# except:
# log.error(f"{name}---获取股票代码异常")
# return gpdm
# log.info(f"获取股票代码--{name},耗时{baseCore.getTimeCost(start, time.time())}")
#
# return gpdm
# 根据股票代码 获取企业基本信息 高管信息
def getInfo(gpdm,xydm):
gpdm_ = gpdm
if 'HK' in gpdm_:
gpdm_ = gpdm_
start = time.time()
retData={}
def getInfo(name,enname,gpdm, xydm, start):
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
gpdm_ = str(gpdm)[1:]
else:
pass
else:
gpdm_ = gpdm
retData = {}
retData['base_info'] = {
'公司名称': '',
'英文名':'',
'公司名称': name,
'英文名': enname,
'信用代码': xydm,
'股票代码': gpdm,
'地址': '',
......@@ -79,12 +60,12 @@ def getInfo(gpdm,xydm):
'员工人数': '',
'公司简介': ''
}
retData['people_info']=[]
retData['people_info'] = []
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
url = f'https://finance.yahoo.com/quote/{gpdm}/profile?p={gpdm}'
url = f'https://finance.yahoo.com/quote/{gpdm_}/profile?p={gpdm_}'
time.sleep(3)
for i in range(0,3):
for i in range(0, 3):
try:
response = requests.get(url, headers=headers, verify=False)
time.sleep(1)
......@@ -92,14 +73,20 @@ def getInfo(gpdm,xydm):
break
else:
log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}")
except :
except:
continue
if (response.status_code == 200):
pass
else:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
return retData
exeception = '获取基本信息接口返回失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
return [state,retData]
state = 1
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
try:
......@@ -135,8 +122,8 @@ def getInfo(gpdm,xydm):
except:
com_jianjie = ''
dic_com_info = {
'公司名称':'',
'英文名':'',
'公司名称': name,
'英文名': enname,
'信用代码': xydm,
'股票代码': gpdm,
'地址': com_address,
......@@ -147,8 +134,8 @@ def getInfo(gpdm,xydm):
'员工人数': com_people,
'公司简介': com_jianjie
}
retData['base_info']=dic_com_info
#高管信息
retData['base_info'] = dic_com_info
# 高管信息
retPeople = []
try:
list_people = page.find('table', {'class': 'W(100%)'}).find_all('tr')[1:]
......@@ -177,8 +164,8 @@ def getInfo(gpdm,xydm):
except:
p_year = ''
if(p_zhiwu=="N/A"):
p_zhiwu=""
if (p_zhiwu == "N/A"):
p_zhiwu = ""
if (p_money == "N/A"):
p_money = ""
if (p_xingshi == "N/A"):
......@@ -186,7 +173,7 @@ def getInfo(gpdm,xydm):
if (p_year == "N/A"):
p_year = ""
dic_main_people = {
'公司名称': '',
'公司名称': name,
'股票代码': gpdm,
'信用代码': xydm,
'姓名': p_name,
......@@ -197,46 +184,20 @@ def getInfo(gpdm,xydm):
}
retPeople.append(dic_main_people)
retData['people_info'] = retPeople
df_retData = pd.DataFrame(retPeople)
# df_a = pd.DataFrame(retData['base_info'])
df_retData.to_excel('./data/采集高管结果1.xlsx',index=False)
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
return retData
return [state,retData]
# def Nongpdm(xydm,name):
# start = time.time()
# company_dict = {
# 'name': name, # 企业名称
# 'shortName': '', # 企业简称
# 'socialCreditCode': xydm, # 统一社会信用代码
# 'officialPhone': '', # 电话
# 'officialUrl': '', # 官网
# 'briefInfo': '', # 简介
# 'industry': '', # 所属行业
# 'englishName': name, # 英文名
# 'address': '', # 地址
# 'status': 0, # 状态
# }
# # print(company_dict)
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
# kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
# kafka_result.get(timeout=10)
# # log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
# log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
# return company_dict
#
#保存基本信息
def saveBaseInfo(info):
start = time.time()
#基本信息发送到kafka
# 保存基本信息
def saveBaseInfo(info,start):
# 基本信息发送到kafka
company_dict = {
'name': '', # 企业名称
'name': info['base_info']['公司名称'], # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialPhone': info['base_info']['电话'], # 电话
'officialUrl': info['base_info']['公司网站'], # 官网
'briefInfo': info['base_info']['公司简介'], # 简介
'briefInfo': info['base_info']['公司简介'], # 简介
'industry': info['base_info']['行业'], # 所属行业
'englishName': info['base_info']['英文名'], # 英文名
'address': info['base_info']['地址'], # 地址
......@@ -249,27 +210,27 @@ def saveBaseInfo(info):
log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
#保存高管信息
def savePeopleInfo(info):
start = time.time()
# 保存高管信息
def savePeopleInfo(info,start):
# 高管信息调用接口
list_people = info['people_info']
list_one_info = []
for i in range(0,len(list_people)):
for i in range(0, len(list_people)):
dic_json = {
"socialCreditCode": list_people[i]['信用代码'],
"name": list_people[i]['姓名'],
"sex": '',
"education": '',
"position": list_people[i]['职务'],
"salary": list_people[i]['薪资'],
"salary": list_people[i]['薪资'],
"birthYear": list_people[i]['出生年份'],
"shareNum": '',
"shareRatio": '',
"benefitShare": '',
"currentTerm": '',
"personInfo": '',
"sort": str(i+1)
"sort": str(i + 1)
}
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
......@@ -277,11 +238,12 @@ def savePeopleInfo(info):
if json_updata == '[]':
pass
else:
for i in range(0,3):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive',data=json_updata,timeout=300, verify=False)
for i in range(0, 3):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive', data=json_updata,
timeout=300, verify=False)
if (response.status_code == 200):
retJson = json.loads(response.content.decode('utf-8'))
if(retJson['success'] or retJson['success']=='true'):
if (retJson['success'] or retJson['success'] == 'true'):
break
if (response.status_code == 200):
......@@ -290,123 +252,91 @@ def savePeopleInfo(info):
pass
else:
log.error("保存高管接口失败---{retJson}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(dic_json['socialCreditCode'], taskType, state, takeTime, '', exception)
return state
else:
log.error("保存高管接口失败---{response.status_code}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(dic_json['socialCreditCode'], taskType, state, takeTime, '', exception)
return state
state = 1
log.info(f"保存高管信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
return state
#根据名字获取股票代码 必须是英文名字 如果提供的数据有股票代码 则跳过此步骤
#gpdm=getGpdm("Volkswagen")
#生成一个新的信用代码 如果提供的原始数据有信用代码 则不能生成新的信用代码
#xydm=baseCore.getNextXydm()
# xydm='ZZSN230710201009006'
# retData=getInfo("Volkswagen","VOW3.DE",xydm)
# saveBaseInfo(retData)
# savePeopleInfo(retData)
# print(retData)
#采集工作
# 采集工作
def beginWork():
#给定excel名单 保存股票代码
okCount=0
errorCount=0
df_all = pd.read_excel('./data/福布斯美国企业股票代码.xlsx',dtype=str,keep_default_na=False)
a = []
b = []
# df_all = pd.read_excel('../../data/23年500强企业新榜股票代码.xlsx',dtype=str, keep_default_na=False)
for i in range(len(df_all)):
# # name = df_all['中文名称'][i]
# # rank = df_all['排名'][i]
# # officialUrl = df_all['企业官网'][i]
# # industry = df_all['行业'][i]
# # englishName = df_all['英文名称'][i]
# # address = df_all['企业总部地址'][i]
#
# xydm_name = df_all_xydm['名称'][i]
# # print(xydm_name)
# for j in range(len(df_all)):
# name = df_all['中文名称'][j]
# if name == xydm_name:
# print(name,xydm_name)
# xydm = df_all_xydm['信用代码'][i]
# if i>=22:
# pass
# else:
# continue
# log.info(f"{i}----------开始")
# # country = df_all['企业所属国家'][i]
# # if country=='中国':
# # continue
# # else:
# # log.info(f"{i}----------为国外企业 继续")
name = df_all['name'][i]
ename = df_all['companyName'][i]
gpdm= df_all['code'][i]
xydm = df_all['xydm'][i]
# if gpdm == '':
# Nongpdm(xydm,name)
# continue
# # print(f'名字:{name},股票代码:{gpdm},信用代码:{xydm}')
# # #没有股票代码,就保存榜单中的数据
# if gpdm == '':
# Nongpdm(gpdm,name)
# continue
# # xydm = baseCore.getNextXydm()
# # # Nongpdm(xydm,name,officialUrl,industry,englishName,address)
# # else:
# # log.info(f"{j}----------为股票代码不为空 继续")
# # pass
# # enname = df_all['英文名称'][j]
# # if enname != '':
# # pass
# # else:
# # log.info(f"{j}----------英文名字为空 跳过")
# # continue
# # # log.info(f"{i}----------开始股票代码")
# # # gpdm = getGpdm(enname)
# # # xydm=baseCore.getNextXydm()
# #
while True:
social_code = baseCore.redicPullData('')
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
# 数据库中获取基本信息
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
xydm = data[2]
# 获取该企业对应项目的采集次数
count = data[13]
start_time = time.time()
# 股票代码为空跳过
if (gpdm == ''):
log.error(f"{name}--股票代码为空 跳过")
exception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
continue
try:
retData = getInfo(name,ename,gpdm,xydm)
saveBaseInfo(retData)
savePeopleInfo(retData)
#采集企业动态
news(i,gpdm,xydm)
okCount += 1
b.append([name,gpdm,xydm])
print(name,'.......采集成功')
except:
a.append([name,gpdm,xydm])
errorCount += 1
print(name,'......采集失败')
print(f'成功{okCount}个,失败{errorCount}个')
df_a = pd.DataFrame(np.array(a))
df_a.columns = ['name','gpdm','xydm']
df_a.to_excel('./data/没有采集到.xlsx',index=False)
df_b = pd.DataFrame(np.array(b))
df_b.columns = ['name','gpdm','xydm']
df_b.to_excel('./data/采集到.xlsx',index=False)
#
# if gpdm!='':
# okCount=okCount+1
# else:
# errorCount=errorCount+1
# log.info(f"{i}-------成功{okCount}--失败-{errorCount}")
# # if gpdm == '':
# # continue
# # else:
# # pass
# # df_all['股票代码'][j]=gpdm
# # else:
# # continue
# # if (i % 10 == 0):
# # df_all.to_excel(r'./data/23年500强企业新上榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # df_all.to_excel(r'./data/23年500强企业新榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # 释放资源
retData = getInfo(name,enname,gpdm, xydm, start_time)
# 基本信息采集成功 进行数据入库,否则不入库
if retData[0] == 1:
# 企业基本信息入库
try:
saveBaseInfo(retData[1],start_time)
except:
log.error(f'{name}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1],start_time)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
else:
pass
else:
pass
except Exception as e:
# 若出现尚未发现的错误,则保存错误信息以及出错位置
ee = e.__traceback__.tb_lineno
log.error(f'{name}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
# 企业数据采集完成,采集次数加一
count += 1
runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code,runType,count)
# 释放资源
baseCore.close()
if __name__ == '__main__':
#gpdm = getGpdm("Volkswagen")
#print(gpdm)
beginWork()
\ No newline at end of file
beginWork()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论