Commit 51c06fd7 Author: 薛凌堃

Merge remote-tracking branch 'origin/master'

import pandas as pd
import numpy as np
import pymysql
import time
import requests
import certifi
from bs4 import BeautifulSoup
from base import BaseCore
baseCore = BaseCore.BaseCore()
log= baseCore.getLogger()
# cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
# cursor = cnx.cursor()
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
cursor = cnx.cursor()
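# MySQL connection used to read the company list (Tfbs table) from the 'caiji' database; the clb_project connection above is kept only as a commented-out alternative.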
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
#'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'same-origin',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
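# Request headers that imitate a desktop Chrome browser so Yahoo Finance serves the normal HTML profile page; the cookie entry is deliberately left commented out.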
def getInfo(gpdm,xydm):
print('开始')
gpdm_ = gpdm
if 'HK' in gpdm_:
gpdm_ = gpdm_[1:]
start = time.time()
retData={}
retData['base_info'] = {
'公司名称': '',
'英文名':'',
'信用代码': xydm,
'股票代码': gpdm_,
'地址': '',
'电话': '',
'公司网站': '',
'部门': '',
'行业': '',
'员工人数': '',
'公司简介': ''
}
retData['people_info']=[]
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
url = f'https://finance.yahoo.com/quote/{gpdm}/profile?p={gpdm}'
time.sleep(3)
for i in range(0,3):
try:
response = requests.get(url, headers=headers, verify=False)
time.sleep(1)
if (response.status_code == 200):
break
else:
log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}")
except :
continue
if (response.status_code == 200):
pass
else:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
return retData
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
name = page.find('h3',{'class':'Fz(m) Mb(10px)'}).text
try:
com_info = page.find('div', {'class': 'Mb(25px)'})
except:
com_info = ''
try:
com_phone = com_info.find_all('p')[0].find('a').text
except:
com_phone = ''
try:
com_url = com_info.find_all('p')[0].find('a', {'target': '_blank'}).text
except:
com_url = ''
try:
com_address = com_info.find_all('p')[0].text.replace(com_phone, '').replace(com_url, '')
except:
com_address = ''
try:
com_bumen = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[0].text
except:
com_bumen = ''
try:
com_hangye = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[1].text
except:
com_hangye = ''
try:
com_people = com_info.find_all('p')[1].find_all('span', {'class': 'Fw(600)'})[2].text
except:
com_people = ''
try:
com_jianjie = page.find('p', {'class': 'Mt(15px) Lh(1.6)'}).text
except:
com_jianjie = ''
dic_com_info = {
'公司名称':'',
'英文名':name,
'信用代码': xydm,
'股票代码': gpdm_,
'地址': com_address,
'电话': com_phone,
'公司网站': com_url,
'部门': com_bumen,
'行业': com_hangye,
'员工人数': com_people,
'公司简介': com_jianjie
}
retData['base_info']=dic_com_info
# Executive (senior management) info
retPeople = []
try:
list_people = page.find('table', {'class': 'W(100%)'}).find_all('tr')[1:]
except:
list_people = []
for one_people in list_people:
try:
p_name = one_people.find_all('td')[0].text
except:
p_name = ''
continue
try:
p_zhiwu = one_people.find_all('td')[1].text
except:
p_zhiwu = ''
try:
p_money = one_people.find_all('td')[2].text
except:
p_money = ''
try:
p_xingshi = one_people.find_all('td')[3].text
except:
p_xingshi = ''
try:
p_year = one_people.find_all('td')[4].text
except:
p_year = ''
if(p_zhiwu=="N/A"):
p_zhiwu=""
if (p_money == "N/A"):
p_money = ""
if (p_xingshi == "N/A"):
p_xingshi = ""
if (p_year == "N/A"):
p_year = ""
dic_main_people = {
'公司名称': name,
'股票代码': gpdm_,
'信用代码': xydm,
'姓名': p_name,
'职务': p_zhiwu,
'薪资': p_money,
'行使': p_xingshi,
'出生年份': p_year
}
retPeople.append(dic_main_people)
retData['people_info'] = retPeople
# df_a = pd.DataFrame(retData['base_info'])
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
return retData
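# Shape of the value returned by getInfo, for reference (derived from the fields filled in above):
#   retData['base_info']   -> dict with keys 公司名称/英文名/信用代码/股票代码/地址/电话/公司网站/部门/行业/员工人数/公司简介
#   retData['people_info'] -> list of dicts, one per executive row (公司名称/股票代码/信用代码/姓名/职务/薪资/行使/出生年份)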
# # Fetch the company's gpdm (ticker symbol) and xydm (credit code) from the database
sql_select = "SELECT * FROM Tfbs where col3 is not null and length(col3)>3 and col6 is not null and state1=1 and col3 like 'ZZSN%' order by date1 ,id LIMIT 1"
cursor.execute(sql_select)
data = cursor.fetchone()
id = data[0]
# Update the company's collection status
# sql_update = f"UPDATE Tfbs set state1 = 2 WHERE id = {id}"
# curosr.execute(sql_update)
# cnx.commit()
xydm = data[4]
gpdm = data[7]
# Fetch the company's basic info and executive info
retData = getInfo(gpdm,xydm)
print(retData)
cursor.close()
cnx.close()
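# Standalone smoke test: fetch one known profile page (Volkswagen, VOW3.DE) and print the HTTP status code.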
url = 'https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE'
req = requests.get(url=url,headers=headers,verify=False)
print(req.status_code)
\ No newline at end of file
import json
import time
import numpy as np
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
@@ -11,7 +13,8 @@ from base.BaseCore import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
baseCore = BaseCore()
log= baseCore.getLogger()
headers = {
@@ -31,37 +34,41 @@ headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
# Look up the stock ticker symbol
def getGpdm(name):
start=time.time()
gpdm=""
try:
url = f'https://query1.finance.yahoo.com/v1/finance/search?q={name}&lang=en-US&region=US&quotesCount=6&newsCount=2&listsCount=2&enableFuzzyQuery=false&quotesQueryId=tss_match_phrase_query&multiQuoteQueryId=multi_quote_single_token_query&newsQueryId=news_cie_vespa&enableCb=true&enableNavLinks=true&enableEnhancedTrivialQuery=true&enableResearchReports=true&enableCulturalAssets=true&enableLogoUrl=true&researchReportsCount=2'
response = requests.get(url, headers=headers, verify=False,timeout=(3.05, 3))
time.sleep(3)
except:
return gpdm
if (response.status_code == 200):
pass
else:
log.error(f"{name}------获取股票接口返回失败:{response.status_code}")
return gpdm
retJson = json.loads(response.content.decode('utf-8'))
try:
gpdm =retJson['quotes'][0]['symbol']
except:
log.error(f"{name}---获取股票代码异常")
return gpdm
log.info(f"获取股票代码--{name},耗时{baseCore.getTimeCost(start, time.time())}")
return gpdm
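# Illustrative use of getGpdm (the company name below is only an example, not taken from the source data):
#   gpdm = getGpdm('Volkswagen')   # might return e.g. 'VOW3.DE' if Yahoo's symbol search finds a match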
# def getGpdm(name):
# start=time.time()
# gpdm=""
# try:
#
# url = f'https://query1.finance.yahoo.com/v1/finance/search?q={name}&lang=en-US&region=US&quotesCount=6&newsCount=2&listsCount=2&enableFuzzyQuery=false&quotesQueryId=tss_match_phrase_query&multiQuoteQueryId=multi_quote_single_token_query&newsQueryId=news_cie_vespa&enableCb=true&enableNavLinks=true&enableEnhancedTrivialQuery=true&enableResearchReports=true&enableCulturalAssets=true&enableLogoUrl=true&researchReportsCount=2'
# response = requests.get(url, headers=headers, verify=False,timeout=(3.05, 3))
# time.sleep(3)
# except:
# return gpdm
# if (response.status_code == 200):
# pass
# else:
# log.error(f"{name}------获取股票接口返回失败:{response.status_code}")
# return gpdm
# retJson = json.loads(response.content.decode('utf-8'))
# try:
# gpdm =retJson['quotes'][0]['symbol']
# except:
# log.error(f"{name}---获取股票代码异常")
# return gpdm
# log.info(f"获取股票代码--{name},耗时{baseCore.getTimeCost(start, time.time())}")
#
# return gpdm
# Use the ticker symbol to fetch company basic info and executive info
def getInfo(name,gpdm,xydm):
def getInfo(gpdm,xydm):
gpdm_ = gpdm
if 'HK' in gpdm_:
gpdm_ = gpdm_
start = time.time()
retData={}
retData['base_info'] = {
'公司名称': name,
'公司名称': '',
'英文名':'',
'信用代码': xydm,
'股票代码': gpdm,
'地址': '',
@@ -128,7 +135,8 @@ def getInfo(name,gpdm,xydm):
except:
com_jianjie = ''
dic_com_info = {
'公司名称':name,
'公司名称':'',
'英文名':'',
'信用代码': xydm,
'股票代码': gpdm,
'地址': com_address,
@@ -178,7 +186,7 @@ def getInfo(name,gpdm,xydm):
if (p_year == "N/A"):
p_year = ""
dic_main_people = {
'公司名称': name,
'公司名称': '',
'股票代码': gpdm,
'信用代码': xydm,
'姓名': p_name,
@@ -191,52 +199,55 @@ def getInfo(name,gpdm,xydm):
retData['people_info'] = retPeople
df_retData = pd.DataFrame(retPeople)
# df_a = pd.DataFrame(retData['base_info'])
df_retData.to_excel('采集高管结果1.xlsx',index=False)
df_retData.to_excel('./data/采集高管结果1.xlsx',index=False)
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
return retData
def Nongpdm(xydm,name,officialUrl,industry,englishName,address):
# def Nongpdm(xydm,name):
# start = time.time()
# company_dict = {
# 'name': name, # 企业名称
# 'shortName': '', # 企业简称
# 'socialCreditCode': xydm, # 统一社会信用代码
# 'officialPhone': '', # 电话
# 'officialUrl': '', # 官网
# 'briefInfo': '', # 简介
# 'industry': '', # 所属行业
# 'englishName': name, # 英文名
# 'address': '', # 地址
# 'status': 0, # 状态
# }
# # print(company_dict)
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
# kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
# kafka_result.get(timeout=10)
# # log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
# log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
# return company_dict
#
# Save basic info
def saveBaseInfo(info):
start = time.time()
# Send the basic info to Kafka
company_dict = {
'name': name, # 企业名称
'name': '', # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': xydm, # 统一社会信用代码
'officialPhone': '', # 电话
'officialUrl': officialUrl, # 官网
'briefInfo': '', # 简介
'industry': industry, # 所属行业
'englishName': englishName, # 英文名
'address': address, # 地址
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialPhone': info['base_info']['电话'], # 电话
'officialUrl': info['base_info']['公司网站'], # 官网
'briefInfo': info['base_info']['公司简介'], # 简介
'industry': info['base_info']['行业'], # 所属行业
'englishName': info['base_info']['英文名'], # 英文名
'address': info['base_info']['地址'], # 地址
'status': 0, # 状态
}
# print(company_dict)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
kafka_result.get(timeout=10)
# log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
return company_dict
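# saveBaseInfo serializes company_dict to UTF-8 JSON, publishes it to the 'regionInfo' Kafka topic on
# 114.115.159.144:9092, and blocks up to 10 seconds waiting for the broker acknowledgement.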
# Save basic info
# def saveBaseInfo(info):
# start = time.time()
# #基本信息发送到kafka
# company_dict = {
# 'name': info['base_info']['公司名称'], # 企业名称
# 'shortName': info['base_info']['公司名称'], # 企业简称
# 'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
# 'officialPhone': info['base_info']['电话'], # 电话
# 'officialUrl': info['base_info']['公司网站'], # 官网
# 'briefInfo': info['base_info']['公司简介'], # 简介
# 'industry': info['base_info']['行业'], # 所属行业
# 'englishName': info['base_info']['公司名称'], # 英文名
# 'address': info['base_info']['地址'], # 地址
# 'status': 0, # 状态
# }
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
# kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
# kafka_result.get(timeout=10)
# log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
# # log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
# Save executive info
def savePeopleInfo(info):
@@ -262,8 +273,9 @@ def savePeopleInfo(info):
}
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
# print(json_updata)
if json_updata == '[]':
pass
pass
else:
for i in range(0,3):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive',data=json_updata,timeout=300, verify=False)
@@ -298,74 +310,99 @@ def beginWork():
# For a given Excel list, save the stock ticker symbols
okCount=0
errorCount=0
df_all_xydm = pd.read_excel('../../data/工作簿1.xlsx',dtype=str,keep_default_na=False)
df_all = pd.read_excel('../../data/23年500强企业新榜股票代码.xlsx',dtype=str, keep_default_na=False)
for i in range(len(df_all_xydm)):
# name = df_all['中文名称'][i]
# rank = df_all['排名'][i]
# officialUrl = df_all['企业官网'][i]
# industry = df_all['行业'][i]
# englishName = df_all['英文名称'][i]
# address = df_all['企业总部地址'][i]
xydm_name = df_all_xydm['名称'][i]
# print(xydm_name)
for j in range(len(df_all)):
name = df_all['中文名称'][j]
if name == xydm_name:
print(name,xydm_name)
xydm = df_all_xydm['信用代码'][i]
if i>=22:
pass
else:
continue
log.info(f"{i}----------开始")
# country = df_all['企业所属国家'][i]
# if country=='中国':
# continue
# else:
# log.info(f"{i}----------为国外企业 继续")
gpdm = df_all['股票代码'][j]
# If there is no ticker symbol, just save the data from the ranking list
if gpdm == '':
continue
# xydm = baseCore.getNextXydm()
# Nongpdm(xydm,name,officialUrl,industry,englishName,address)
else:
log.info(f"{j}----------为股票代码不为空 继续")
pass
enname = df_all['英文名称'][j]
if enname != '':
pass
else:
log.info(f"{j}----------英文名字为空 跳过")
continue
# log.info(f"{i}----------开始股票代码")
# gpdm = getGpdm(enname)
# xydm=baseCore.getNextXydm()
retData = getInfo(enname,gpdm,xydm)
# saveBaseInfo(retData)
savePeopleInfo(retData)
# Company news can also be collected here
news(j,gpdm,xydm)
if gpdm!='':
okCount=okCount+1
else:
errorCount=errorCount+1
log.info(f"{j}-------成功{okCount}--失败-{errorCount}")
if gpdm == '':
continue
else:
pass
df_all['股票代码'][j]=gpdm
else:
continue
if (i % 10 == 0):
df_all.to_excel(r'..\..\data\23年500强企业新上榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
df_all.to_excel(r'..\..\data\23年500强企业新榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# Release resources
df_all = pd.read_excel('./data/福布斯美国企业股票代码.xlsx',dtype=str,keep_default_na=False)
a = []
b = []
# df_all = pd.read_excel('../../data/23年500强企业新榜股票代码.xlsx',dtype=str, keep_default_na=False)
for i in range(len(df_all)):
# # name = df_all['中文名称'][i]
# # rank = df_all['排名'][i]
# # officialUrl = df_all['企业官网'][i]
# # industry = df_all['行业'][i]
# # englishName = df_all['英文名称'][i]
# # address = df_all['企业总部地址'][i]
#
# xydm_name = df_all_xydm['名称'][i]
# # print(xydm_name)
# for j in range(len(df_all)):
# name = df_all['中文名称'][j]
# if name == xydm_name:
# print(name,xydm_name)
# xydm = df_all_xydm['信用代码'][i]
# if i>=22:
# pass
# else:
# continue
# log.info(f"{i}----------开始")
# # country = df_all['企业所属国家'][i]
# # if country=='中国':
# # continue
# # else:
# # log.info(f"{i}----------为国外企业 继续")
name = df_all['name'][i]
ename = df_all['companyName'][i]
gpdm= df_all['code'][i]
xydm = df_all['xydm'][i]
# if gpdm == '':
# Nongpdm(xydm,name)
# continue
# # print(f'名字:{name},股票代码:{gpdm},信用代码:{xydm}')
# # #没有股票代码,就保存榜单中的数据
# if gpdm == '':
# Nongpdm(gpdm,name)
# continue
# # xydm = baseCore.getNextXydm()
# # # Nongpdm(xydm,name,officialUrl,industry,englishName,address)
# # else:
# # log.info(f"{j}----------为股票代码不为空 继续")
# # pass
# # enname = df_all['英文名称'][j]
# # if enname != '':
# # pass
# # else:
# # log.info(f"{j}----------英文名字为空 跳过")
# # continue
# # # log.info(f"{i}----------开始股票代码")
# # # gpdm = getGpdm(enname)
# # # xydm=baseCore.getNextXydm()
# #
try:
retData = getInfo(gpdm,xydm)
saveBaseInfo(retData)
savePeopleInfo(retData)
# Collect company news
news(i,gpdm,xydm)
okCount += 1
b.append([name,gpdm,xydm])
print(name,'.......采集成功')
except:
a.append([name,gpdm,xydm])
errorCount += 1
print(name,'......采集失败')
print(f'成功{okCount}个,失败{errorCount}个')
df_a = pd.DataFrame(np.array(a))
df_a.columns = ['name','gpdm','xydm']
df_a.to_excel('./data/没有采集到.xlsx',index=False)
df_b = pd.DataFrame(np.array(b))
df_b.columns = ['name','gpdm','xydm']
df_b.to_excel('./data/采集到.xlsx',index=False)
#
# if gpdm!='':
# okCount=okCount+1
# else:
# errorCount=errorCount+1
# log.info(f"{i}-------成功{okCount}--失败-{errorCount}")
# # if gpdm == '':
# # continue
# # else:
# # pass
# # df_all['股票代码'][j]=gpdm
# # else:
# # continue
# # if (i % 10 == 0):
# # df_all.to_excel(r'./data/23年500强企业新上榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # df_all.to_excel(r'./data/23年500强企业新榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # 释放资源
baseCore.close()