Commit a9946e9a  Author: 薛凌堃

Foreign-company basic information - executive information - company news

Parent 8f9f0213
@@ -41,7 +41,7 @@ def beinWork(tyc_code,social_code):
 # time.sleep(random.randint(3, 5))
 break
 except Exception as e :
-log.error("request请求异常----m-----{e}")
+log.error(f"request请求异常----m-----{e}")
 pass
 if (response.status_code == 200):
...
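The hunk above adds the missing f prefix so the caught exception is actually interpolated into the log message. A minimal standalone illustration of the difference (plain Python, not part of the commit):

e = ValueError("boom")
print("request请求异常----m-----{e}")   # without the f prefix the braces are printed literally
print(f"request请求异常----m-----{e}")  # with the f prefix the exception text is inserted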
"""
增量采集:
取state为3、update_state为空的企业 表示上次采集成功的企业,
新增update_state字段,取一个企业更新为2,表示该企业正在采集。
采集完毕更新为1.
表示已经采集完成。跟据date_time 来排列 每次就不会拿到重复的数据。
okCount
errorCount
repectCount
新增三个字段分别对应更新的up_okCount up_errorCount up_repectCount ,
记录这些更新的数据 然后加到原来的数据上表示该企业已采集多少动态
8.8日改版,企业动态也传kafka
"""
import json
import requests,time,pymysql
import jieba
import sys
from kafka import KafkaProducer
from base.BaseCore import BaseCore
from base.smart import smart_extractor
# sys.path.append('D:\\KK\\zzsn_spider\\base')
# import BaseCore
# from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jieba.cut("必须加载jieba")
# initialisation: set up Chinese word segmentation
smart = smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor= cnx.cursor()
pageSize = 10
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Cookie':'jsid=SEO-BAIDU-ALL-SY-000001; TYCID=77e997401d5f11ee9e91d5a0fd3c0b89; ssuid=6450041974; _ga=GA1.2.858826166.1688800641; _gid=GA1.2.2142449376.1689575510; tyc-user-info-save-time=1689764135027; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22309757777%22%2C%22first_id%22%3A%22189345cb10257d-0cfee05327f673-26031d51-1327104-189345cb10375b%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg5MzQ1Y2IxMDI1N2QtMGNmZWUwNTMyN2Y2NzMtMjYwMzFkNTEtMTMyNzEwNC0xODkzNDVjYjEwMzc1YiIsIiRpZGVudGl0eV9sb2dpbl9pZCI6IjMwOTc1Nzc3NyJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22309757777%22%7D%2C%22%24device_id%22%3A%22189345cb10257d-0cfee05327f673-26031d51-1327104-189345cb10375b%22%7D; bannerFlag=true; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1689752829,1689821665,1689831487,1689845884; searchSessionId=1689845917.81838207; HWWAFSESID=146bb1d25b1515339d3; HWWAFSESTIME=1689858023324; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1689859758',
'Host': 'capi.tianyancha.com',
'Origin': 'https://www.tianyancha.com',
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
def beinWork(tyc_code,social_code):
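# beinWork: pull the Tianyancha news list for one company (tyc_code), store each new article in
# brpa_source_article and push it to Kafka, and return the up_* counters for this run.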
time.sleep(3)
retData={'up_state':False,'total':0,'up_okCount':0,'up_errorCount':0,'up_repetCount':0}
t=time.time()
url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
for m in range(0, 3):
try:
ip = baseCore.get_proxy()
headers['User-Agent']=baseCore.getRandomUserAgent()
response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
# time.sleep(random.randint(3, 5))
break
except Exception as e :
log.error("request请求异常----m-----{e}")
pass
if (response.status_code == 200):
pass
else:
log.error(f"{tyc_code}-----获取总数接口失败")
return retData
try:
json_1 = json.loads(response.content.decode('utf-8'))
total = json_1['data']['total']
except:
log.error(f"{tyc_code}-----获取总数失败")
return retData
if (total > 0):
if (total % pageSize == 0):
totalPage = total // pageSize
else:
totalPage = total // pageSize + 1
else:
log.error(f"{tyc_code}--------总数为0")
retData['up_state']=True
return retData
log.info(f"{tyc_code}-------总数:{total}----总页数:{totalPage}")
retData['total']=total
up_okCount = 0
up_errorCount = 0
up_repetCount = 0
for num in range(1, totalPage+1):
time.sleep(3)
log.info(f"获取分页数据--{tyc_code}----分页{num}----开始")
start_page = time.time()
url_page = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={time.time()}&id={tyc_code}&ps={pageSize}&pn={num}&emotion=-100&event=-100'
ip = baseCore.get_proxy()
for m in range(0, 3):
try:
headers['User-Agent']=baseCore.getRandomUserAgent()
response_page = requests.get(url=url_page,headers=headers, proxies=ip, verify=False)
# time.sleep(3)
break
except:
pass
if (response_page.status_code == 200):
pass
else:
log.error(f"{tyc_code}--{num}页---获取分页数据失败")
up_errorCount = up_errorCount+pageSize
continue
try:
json_page = json.loads(response_page.content.decode('utf-8'))
info_list_page = json_page['data']['items']
except:
log.error(f"{tyc_code}--{num}页---获取分页数据失败")
up_errorCount = up_errorCount + pageSize
continue
pageIndex=0
for info_page in info_list_page:
pageIndex=pageIndex+1
title = info_page['title']
source = info_page['website']
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
cursor.execute(sel_sql,(link,social_code))
except Exception as e:
print(e)
selects = cursor.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
# up_repetCount = up_repetCount + 1
# continue
#todo: if this record already exists, everything after it was collected earlier, so exit the function and move on to the next company
retData['up_state'] = True
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
return retData
try:
time_struct = time.localtime(int(info_page['rtm'] / 1000))  # convert the millisecond timestamp to struct_time
time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct)  # then format it as a datetime string
except:
time_format = baseCore.getNowTime(1)
try:
# run the smart extractor on the article url to pull the main text
contentText = smart.extract_by_url(link).text
# time.sleep(3)
except Exception as e:
contentText = ''
if contentText == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
up_errorCount = up_errorCount + 1
try:
insert_err_sql = f"insert into dt_err(xydm,`from`,url,title,pub_date,zhaiyao,create_date,state,pageNo,pageIndex) values('{social_code}','{source}','{link}','{title}','{time_format}','{info_page['abstracts']}',now(),1,{num},{pageIndex})"
cursor.execute(insert_err_sql)
cnx.commit()
except:
pass
continue
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
# field list for the news record
up_okCount = up_okCount + 1
list_info = [
social_code,
title,
info_page['abstracts'], # summary
contentText, # body text
time_format, # publish time
link,
'天眼查',
source,
'2',
'zh'
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
# log every article as it is collected, so we can tell how many items were gathered for this company
log.info(f'{social_code}----{link}:新增一条')
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (link,social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
#todo: insert the record and push it to Kafka
dic_news = {
'attachmentIds': id,
'author': '',
'content': contentText,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': time_format,
'sid': '1684032033495392257',
'sourceAddress': link, # link to the original article
'summary': info_page['abstracts'],
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
# print(dic_news)
# send the relevant fields to Kafka for storage
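# note: a new KafkaProducer connection is created here for every article that is sent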
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
retData['up_state'] = True
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
return retData
def doJob():
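# doJob: claim one company from ssqy_tyc, collect its news with beinWork, then write the updated
# up_* counters and running totals back to the table.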
while True:
# selectSql = f"select id,xydm,tycid from ssqy_tyc where state=3 and update_state =1 order by date_time asc limit 1"
selectSql = "select id,xydm,tycid from ssqy_tyc where xydm = '91520200214409696J' "
cursor.execute(selectSql)
data = cursor.fetchone()
if (data):
pass
else:
log.info("没有数据了,结束脚本")
break
data_list = list(data)
id = data_list[0]
xydm = data_list[1]
tycid = data_list[2]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
updateBeginSql = f"update ssqy_tyc set update_state=2,date_time=now() where id={id}"
cursor.execute(updateBeginSql)
cnx.commit()
# start collecting this company's news
retData = beinWork(tycid, xydm)
up_state = retData['up_state']
total= retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount']
if up_state:
stateNum = 1
else:
stateNum = 4
# updateEndSql = f"update ssqy_tyc set update_state={stateNum},up_okCount={up_okCount},up_errorCount={up_errorCount},up_repetCount={up_repetCount} ,date_time=now() where id={id}"
# cursor.execute(updateEndSql)
# cnx.commit()
# read okCount/errorCount/repetCount from the database and add this run's counts to them
selectOrginSql = f"select okCount,errorCount,repetCount,total from ssqy_tyc where id={id}"
cursor.execute(selectOrginSql)
count_info = cursor.fetchone()
okCount = count_info[0]
errorCount = count_info[1]
repetCount = count_info[2]
updateEndSql = f"update ssqy_tyc set update_state={stateNum},up_okCount={up_okCount},up_errorCount={up_errorCount},up_repetCount={up_repetCount} ,date_time=now(),okCount={okCount+up_okCount},errorCount={errorCount+up_errorCount},repetCount={repetCount+up_repetCount},total={total} where id={id}"
cursor.execute(updateEndSql)
cnx.commit()
log.info(f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
cursor.close()
cnx.close()
# release resources
baseCore.close()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
doJob()
# link = 'https://m.thepaper.cn/newsDetail_forward_24049067'
# social_code = '915101006653023886'
# try:
# sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code = %s and type='2' '''
# print(sel_sql)
# cursor.execute(sel_sql, (link,social_code))
# aa = cursor.fetchone()
# print(aa)
# except Exception as e:
# print(e)
# Yahoo Finance company news collection
import time
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium import webdriver
from base.BaseCore import BaseCore
baseCore = BaseCore()
log= baseCore.getLogger()
# fetch the article detail page
def getZx(xydm,url,title,cnx):
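# getZx: open the article url in headless Chrome, read the author, publish time and body text,
# and insert the record into brpa_source_article.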
start_time_content= time.time()
try:
chrome_options_content = webdriver.ChromeOptions()
chrome_options_content.add_argument('--disable-gpu')
chrome_options_content.add_argument('--ignore-certificate-errors')
chrome_options_content.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options_content.add_argument("--disable-blink-features=AutomationControlled")
chrome_options_content.add_argument("--start-maximized")
prefs_content = {'profile.managed_default_content_settings.images': 2}
chrome_options_content.add_experimental_option('prefs', prefs_content)
chrome_options_content.add_argument('--headless')
executable_path = r'D:\chrome\chromedriver.exe'
driverContent = webdriver.Chrome(options=chrome_options_content, executable_path=executable_path)
driverContent.get(url)
try:
clickButton = driverContent.find_element(By.CLASS_NAME,"collapse-button")
clickButton.click()
except Exception as e:
pass
time.sleep(0.5)
authorElement = driverContent.find_element(By.CLASS_NAME,"caas-author-byline-collapse")
timeElement = driverContent.find_element(By.CLASS_NAME,"caas-attr-time-style").find_element(By.TAG_NAME,"time")
contentElement = driverContent.find_element(By.CLASS_NAME,"caas-body")
author = authorElement.text.lstrip().strip().replace("'","''")
pub_time = timeElement.get_attribute("datetime").lstrip().strip().replace("'","''").replace("T"," ")
pub_time = pub_time[0:19]
content = contentElement.text.lstrip().strip().replace("'","''")
driverContent.close()
# field list for the news record
list_info = [
xydm,
title,
'',
content,
pub_time,
url,
'雅虎财经',
author,
'2',
'zh'
]
with cnx.cursor() as cursor:
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
except Exception as e1:
log.error("保存数据库失败")
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content,time.time())}")
except Exception as e:
log.error("获取正文失败")
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
prefs = {'profile.managed_default_content_settings.images': 2}
chrome_options.add_experimental_option('prefs',prefs)
chrome_options.add_argument('--headless')
executable_path = r'D:\chrome\chromedriver.exe'
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
def scroll(driver):
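# scroll the page to the bottom repeatedly (30 times, 0.1 s apart) so lazily loaded items can appear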
for i in range(0,30):
#js = "window.scrollTo(0,document.body.scrollHeight)"
js = "var q=document.documentElement.scrollTop=100000"
driver.execute_script(js)
time.sleep(0.1)
# #读取excel数据
# df_all = pd.read_excel(r'./../data/2023年500强新上榜名单.xlsx', sheet_name='500强23年国外', keep_default_na=False)
# for num in range(len(df_all)):
# start_time = time.time()
# # country = df_all['国别'][num]
# # if(country!='国外'):
# # continue
# enname=df_all['英文名称'][num]
# gpdm = df_all['股票票代码'][num]
# xydm = df_all['信用代码'][num]
# if(gpdm==''):
# log.error(f"{num}--{gpdm}--股票代码为空 跳过")
# continue
# if (xydm == ''):
# log.error(f"{num}--{gpdm}--信用代码为空 跳过")
# continue
# count = int(df_all['企业动态数量(7.15)'][num])
# # if(count>0):
# # log.error(f"{num}--{gpdm}--动态大于0 跳过")
# # continue
#https://finance.yahoo.com/quote/GOOG/press-releases?p=GOOG
def news(num,gpdm,xydm):
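# news: open the Yahoo Finance press-releases page for the ticker (gpdm), scroll to load the list,
# and collect every article link that is not yet stored for the company (xydm).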
start_time = time.time()
url=f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
scroll(driver)
# if True:
# continue
try:
news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
except Exception as e:
log.error(f"{num}--{gpdm}--没找到新闻元素")
return
news_lis = news_div.find_elements(By.XPATH,"./ul/li")
log.info(f"{num}--{gpdm}--{len(news_lis)}条信息")
for i in range(0,len(news_lis)):
try:
a_ele= news_lis[i].find_element(By.XPATH,"./div[1]/div[1]/div[2]/h3[1]/a")
except Exception :
log.error(f"{num}--{gpdm}--{i}----a标签没找到")
continue
news_url = a_ele.get_attribute("href").lstrip().strip().replace("'","''")
if(news_url.startswith("https://finance.yahoo.com")):
pass
else:
continue
# check whether this url has already been collected
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{num}--{gpdm}--网址已经存在----{news_url}")
continue
title = a_ele.text.lstrip().strip().replace("'","''")
getZx(xydm,news_url,title,cnx)
log.info(f"{num}--{gpdm}--{i}----{news_url}----------{news_url}")
log.info(f"{num}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time,time.time())}")
# release resources
baseCore.close()
\ No newline at end of file
 # 雅虎财经企业动态获取
@@ -100,12 +100,12 @@ def scroll(driver):
 #读取excel数据
-df_all = pd.read_excel(r'.\data\国外企业.xlsx', sheet_name=0, keep_default_na=False)
+df_all = pd.read_excel(r'./../data/2023年500强新上榜名单.xlsx', sheet_name='500强23年国外', keep_default_na=False)
-for num in range(718,len(df_all)):
+for num in range(len(df_all)):
 start_time = time.time()
-country = df_all['国别'][num]
+# country = df_all['国别'][num]
-if(country!='国外'):
+# if(country!='国外'):
-continue
+# continue
 enname=df_all['英文名称'][num]
 gpdm = df_all['股票票代码'][num]
 xydm = df_all['信用代码'][num]
@@ -121,6 +121,7 @@ for num in range(718,len(df_all)):
 # continue
 #https://finance.yahoo.com/quote/GOOG/press-releases?p=GOOG
+# def news(i,gpdm):
 url=f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
 driver.get(url)
 scroll(driver)
...
 import json
@@ -5,11 +5,15 @@ import pandas as pd
 import requests
 from bs4 import BeautifulSoup
 from kafka import KafkaProducer
+from NewsYahoo import news
 from base.BaseCore import BaseCore
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 baseCore = BaseCore()
-log= BaseCore.getLogger()
+log= baseCore.getLogger()
 headers = {
 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
 'accept-encoding': 'gzip, deflate, br',
@@ -185,29 +189,54 @@ def getInfo(name,gpdm,xydm):
 }
 retPeople.append(dic_main_people)
 retData['people_info'] = retPeople
+df_retData = pd.DataFrame(retPeople)
+# df_a = pd.DataFrame(retData['base_info'])
+df_retData.to_excel('采集高管结果1.xlsx',index=False)
 log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
 return retData
-#保存基本信息
+def Nongpdm(xydm,name,officialUrl,industry,englishName,address):
-def saveBaseInfo(info):
 start = time.time()
-#基本信息发送到kafka
 company_dict = {
-'name': info['base_info']['公司名称'], # 企业名称
+'name': name, # 企业名称
-'shortName': info['base_info']['公司名称'], # 企业简称
+'shortName': '', # 企业简称
-'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
+'socialCreditCode': xydm, # 统一社会信用代码
-'officialPhone': info['base_info']['电话'], # 电话
+'officialPhone': '', # 电话
-'officialUrl': info['base_info']['公司网站'], # 官网
+'officialUrl': officialUrl, # 官网
-'briefInfo': info['base_info']['公司简介'], # 简介
+'briefInfo': '', # 简介
-'industry': info['base_info']['行业'], # 所属行业
+'industry': industry, # 所属行业
-'englishName': info['base_info']['公司名称'], # 英文名
+'englishName': englishName, # 英文名
-'address': info['base_info']['地址'], # 地址
+'address': address, # 地址
 'status': 0, # 状态
 }
 producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
 kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
 kafka_result.get(timeout=10)
-log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
+# log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
+log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
+return company_dict
+#保存基本信息
+# def saveBaseInfo(info):
+# start = time.time()
+# #基本信息发送到kafka
+# company_dict = {
+# 'name': info['base_info']['公司名称'], # 企业名称
+# 'shortName': info['base_info']['公司名称'], # 企业简称
+# 'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
+# 'officialPhone': info['base_info']['电话'], # 电话
+# 'officialUrl': info['base_info']['公司网站'], # 官网
+# 'briefInfo': info['base_info']['公司简介'], # 简介
+# 'industry': info['base_info']['行业'], # 所属行业
+# 'englishName': info['base_info']['公司名称'], # 英文名
+# 'address': info['base_info']['地址'], # 地址
+# 'status': 0, # 状态
+# }
+# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
+# kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
+# kafka_result.get(timeout=10)
+# log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
+# # log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
 #保存高管信息
 def savePeopleInfo(info):
@@ -269,43 +298,73 @@ def beginWork():
 #给定excel名单 保存股票代码
 okCount=0
 errorCount=0
-df_all = pd.read_excel('./data/96-22的500强企业清单.xlsx', dtype=str, keep_default_na=False)
-for i in range(300, len(df_all)):
-log.info(f"{i}----------开始")
-country = df_all['国内外'][i]
-if country=='国外':
-pass
-else:
-log.info(f"{i}----------为国内企业 跳过")
-continue
-gpdm = df_all['股票代码'][i]
-if gpdm == '':
-pass
-else:
-log.info(f"{i}----------为股票代码不为空 跳过")
-continue
-enname = df_all['英文名称'][i]
-if enname != '':
-pass
-else:
-log.info(f"{i}----------英文名字为空 跳过")
-continue
-log.info(f"{i}----------开始股票代码")
-gpdm = getGpdm(enname)
-if gpdm!='':
-okCount=okCount+1
-else:
-errorCount=errorCount+1
-log.info(f"{i}-------成功{okCount}--失败-{errorCount}")
-if gpdm == '':
-continue
-else:
-pass
-df_all['股票代码'][i]=gpdm
+df_all_xydm = pd.read_excel('../../data/工作簿1.xlsx',dtype=str,keep_default_na=False)
+df_all = pd.read_excel('../../data/23年500强企业新榜股票代码.xlsx',dtype=str, keep_default_na=False)
+for i in range(len(df_all_xydm)):
+# name = df_all['中文名称'][i]
+# rank = df_all['排名'][i]
+# officialUrl = df_all['企业官网'][i]
+# industry = df_all['行业'][i]
+# englishName = df_all['英文名称'][i]
+# address = df_all['企业总部地址'][i]
+xydm_name = df_all_xydm['名称'][i]
+# print(xydm_name)
+for j in range(len(df_all)):
+name = df_all['中文名称'][j]
+if name == xydm_name:
+print(name,xydm_name)
+xydm = df_all_xydm['信用代码'][i]
+if i>=22:
+pass
+else:
+continue
+log.info(f"{i}----------开始")
+# country = df_all['企业所属国家'][i]
+# if country=='中国':
+# continue
+# else:
+# log.info(f"{i}----------为国外企业 继续")
+gpdm = df_all['股票代码'][j]
+#没有股票代码,就保存榜单中的数据
+if gpdm == '':
+continue
+# xydm = baseCore.getNextXydm()
+# Nongpdm(xydm,name,officialUrl,industry,englishName,address)
+else:
+log.info(f"{j}----------为股票代码不为空 继续")
+pass
+enname = df_all['英文名称'][j]
+if enname != '':
+pass
+else:
+log.info(f"{j}----------英文名字为空 跳过")
+continue
+# log.info(f"{i}----------开始股票代码")
+# gpdm = getGpdm(enname)
+# xydm=baseCore.getNextXydm()
+retData = getInfo(enname,gpdm,xydm)
+# saveBaseInfo(retData)
+savePeopleInfo(retData)
+#也可以去采集企业动态
+news(j,gpdm,xydm)
+if gpdm!='':
+okCount=okCount+1
+else:
+errorCount=errorCount+1
+log.info(f"{j}-------成功{okCount}--失败-{errorCount}")
+if gpdm == '':
+continue
+else:
+pass
+df_all['股票代码'][j]=gpdm
+else:
+continue
 if (i % 10 == 0):
-df_all.to_excel(r'.\data\96-22的500强企业清单_ret.xlsx', sheet_name='Sheet1', index=False, header=True)
+df_all.to_excel(r'..\..\data\23年500强企业新上榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
-df_all.to_excel(r'.\data\96-22的500强企业清单_ret.xlsx', sheet_name='Sheet1', index=False, header=True)
+df_all.to_excel(r'..\..\data\23年500强企业新榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
 # 释放资源
 baseCore.close()
...