# Commit d9c9ea4a, author: XveLingKun
# Commit message: 股东信息 (shareholder information)
# Parent commit: 12fce26b
import json
import requests, time
from bs4 import BeautifulSoup
import urllib3
from retry import retry
from base import BaseCore
from getTycId import getTycIdByXYDM
# Shared project helper: the functions below take their logger, DB handles,
# proxy settings and queue helpers (redicPullData / rePutIntoR) from it.
baseCore = BaseCore.BaseCore()
# Silence urllib3's InsecureRequestWarning output.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
# NOTE(review): the underscore suffixes are crossed relative to the BaseCore
# attribute names (cnx_ <- baseCore.cnx, cnx <- baseCore.cnx_) - confirm this
# is intentional. doJob reads sys_base_enterprise through `cursor` and writes
# EnterpriseInfo through `cursor_` / `cnx_`.
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = baseCore.cnx_
cursor = baseCore.cursor_
# Only referenced from commented-out code in the visible part of this file.
list_all_1 = []
list_all_2 = []
# Task label passed to baseCore.recordLog when processing a company fails.
taskType = '天眼查/股东信息'
from classtool import Token, Info
# token hands out login cookies (get_cookies) and records account usage
# (updateTokeen); info is only used by a commented-out call in doJob.
token = Token()
info = Info()
@retry(tries=3, delay=1)
def get_html(tycid, driver, dic_info):
    """Open a company's Tianyancha page and read its tags and shareholder count.

    Args:
        tycid: Tianyancha company id, interpolated into the company page URL.
        driver: Selenium driver used to load the page.
        dic_info: dict that receives the filtered tag titles under the
            ``'企业标签'`` key; it is mutated in place and also returned.

    Returns:
        Always a 3-tuple ``(total, dic_info, kind)``:

        * ``(-1, dic_info, -1)`` - looking up the holder section raised.
        * ``(-2, dic_info, -2)`` - the page has no ``data-dim="holder"`` div.
        * ``(n, dic_info, 1)`` - section titled "股东信息" with ``n`` entries.
        * ``(n, dic_info, 2)`` - section titled "主要股东" with ``n`` entries.
        * ``(0, dic_info, 0)`` - the section title or count could not be parsed.

    Raises:
        Whatever loading the page or decoding the ``__NEXT_DATA__`` script
        raises; the ``retry`` decorator re-invokes the function first.
    """
    url = f"https://www.tianyancha.com/company/{tycid}"
    driver.get(url=url)
    time.sleep(3)  # give the page time to render before reading its source
    soup = BeautifulSoup(driver.page_source, 'html.parser')

    # The company tags are embedded in the page's __NEXT_DATA__ JSON blob.
    next_data = json.loads(soup.find('script', attrs={'id': '__NEXT_DATA__'}).text)
    tags = next_data['props']['pageProps']['dehydratedState']['queries'][0]['state']['data']['data']['tagListV2']
    filter_list = ['存续', '曾用名', '竞争风险', '司法案件', '合作风险', '股权出质', '仍注册']
    # Skip the uninformative titles above and every tag coloured #FF463C.
    dic_info['企业标签'] = [
        tag['title']
        for tag in tags
        if tag['title'] not in filter_list and tag['color'] != '#FF463C'
    ]

    try:
        div_part = soup.find('div', attrs={'data-dim': 'holder'})
    except Exception:
        return -1, dic_info, -1
    if div_part is None:
        return -2, dic_info, -2

    # (title keyword, text that precedes the count in the tab label, kind code)
    section_kinds = (
        ('股东信息', '股东信息', 1),
        ('主要股东', '最新公示', 2),
    )
    try:
        tmp_field = div_part.find('h3', class_='dimHeader_main-title-txt__GPoaZ').text
        for keyword, separator, kind in section_kinds:
            if keyword in tmp_field:
                log.info(keyword)
                tab_text = div_part.find('div', class_='dim-tab-root').find('span').get_text()
                total = tab_text.split(separator)[1].replace(' ', '')
                return int(total), dic_info, kind
    except Exception as e:
        log.info(f'股东数量解析失败--{e}')
    # Parse failure or unrecognised title: keep the 3-tuple shape so callers
    # that unpack three values do not fail on the result.
    return 0, dic_info, 0
@retry(tries=5, delay=3)
def get_page(url, s, headers):
    """GET a JSON endpoint through a proxy and return its total and payload.

    Args:
        url: endpoint to request.
        s: requests session used to send the request.
        headers: HTTP headers sent with the request.

    Returns:
        ``(total, data_page)`` where ``total`` is ``data_page['data']['total']``
        and ``data_page`` is the decoded JSON body.

    Raises:
        RuntimeError: the response status is not 200.
        KeyError / TypeError: the body lacks ``data.total``.
        Any failure makes the ``retry`` decorator call the function again.
    """
    proxies = baseCore.get_proxy()
    res = s.get(url=url, headers=headers, proxies=proxies, timeout=(5, 10))
    if res.status_code != 200:
        # The original bare `raise` had no active exception and therefore
        # surfaced as RuntimeError; keep that type but say what went wrong.
        raise RuntimeError(f'unexpected HTTP status {res.status_code} for {url}')
    data_page = res.json()
    return data_page['data']['total'], data_page
@retry(tries=5, delay=3)
def get_page1(url, s, headers):
    """GET a JSON endpoint through a proxy and return its stock-holder total.

    The request is sent with the caller-supplied ``headers``; no credentials
    are embedded in this function.

    Args:
        url: endpoint to request.
        s: requests session used to send the request.
        headers: HTTP headers sent with the request.

    Returns:
        ``(total, data_page)`` where ``total`` is
        ``data_page['data']['stockHolder']['total']`` and ``data_page`` is the
        decoded JSON body.

    Raises:
        RuntimeError: the response status is not 200.
        KeyError / TypeError: the body lacks ``data.stockHolder.total``.
        Any failure makes the ``retry`` decorator call the function again.
    """
    proxies = baseCore.get_proxy()
    res = s.get(url=url, headers=headers, proxies=proxies, timeout=(5, 10))
    if res.status_code != 200:
        # The original bare `raise` had no active exception and therefore
        # surfaced as RuntimeError; keep that type but say what went wrong.
        raise RuntimeError(f'unexpected HTTP status {res.status_code} for {url}')
    data_page = res.json()
    return data_page['data']['stockHolder']['total'], data_page
@retry(tries=5, delay=3)
def post_page(url, s, headers, payload):
    """POST a JSON payload through a proxy and return the total and response.

    Args:
        url: endpoint to request.
        s: requests session used to send the request.
        headers: HTTP headers sent with the request.
        payload: object serialised with ``json.dumps`` as the request body.

    Returns:
        ``(total, json_info)`` where ``total`` is ``json_info['data']['total']``
        and ``json_info`` is the decoded JSON body.

    Raises:
        RuntimeError: the response status is not 200.
        KeyError / TypeError: the body lacks ``data.total``.
        Any failure makes the ``retry`` decorator call the function again.
    """
    proxies = baseCore.get_proxy()
    res = s.post(url=url, headers=headers, data=json.dumps(payload), proxies=proxies, timeout=(5, 10))
    if res.status_code != 200:
        # The original bare `raise` had no active exception and therefore
        # surfaced as RuntimeError; keep that type but say what went wrong.
        raise RuntimeError(f'unexpected HTTP status {res.status_code} for {url}')
    json_info = res.json()
    return json_info['data']['total'], json_info
from selenium import webdriver
def create_driver():
    """Start a Microsoft Edge WebDriver session and return it.

    The driver executable path is fixed, and the browser is asked to start
    maximized through the ``ms:edgeOptions`` capability.
    """
    edge_options = {"extensions": [], "args": ["--start-maximized"]}
    capabilities = {
        "browserName": "MicrosoftEdge",
        "ms:edgeOptions": edge_options,
    }
    return webdriver.Edge(
        executable_path=r'D:\soft\msedgedriver.exe',
        capabilities=capabilities,
    )
def login(driver):
    """Install an account's cookies into the browser and a requests session.

    Args:
        driver: Selenium driver already pointed at the target site.

    Returns:
        ``(driver, id_cookie, session)``. When ``token.get_cookies()`` yields
        no cookies, the function sleeps 30 minutes and returns
        ``(driver, '', '')``; callers detect this by the empty ``id_cookie``.
    """
    cookies_list, id_cookie, user_name = token.get_cookies()
    if not cookies_list:
        log.info("没有账号了,等待30分钟")
        time.sleep(30 * 60)
        # Hand the live driver back instead of '' so a caller that rebinds
        # its driver from this result does not lose the browser session.
        return driver, '', ''

    log.info(f'=====当前使用的是{user_name}的cookie======')
    for cookie in cookies_list:
        driver.add_cookie(cookie)
    time.sleep(3)
    driver.refresh()  # reload so the page picks up the new cookies
    time.sleep(3)

    # Mirror the same cookies into a requests session for the API calls.
    s = requests.Session()
    s.cookies.update({cookie['name']: cookie['value'] for cookie in cookies_list})
    return driver, id_cookie, s
def _extract_holder_list(data_page):
    """Return the shareholder rows from whichever payload shape was received."""
    data = data_page['data']
    try:
        return data['holderList']
    except (KeyError, TypeError):
        pass
    try:
        return data['result']
    except (KeyError, TypeError):
        return data['stockHolder']['result']


def _top_holder(list_all, flag):
    """Return ``(name, percent)`` of the first row, using the keys for ``flag``."""
    field_names = {
        1: ('shareHolderName', 'percent'),
        3: ('name', 'proportion'),
    }
    name_key, percent_key = field_names.get(flag, ('holder_name', 'longHeldRatioWithUnit'))
    holder_info = list_all[0]
    return holder_info[name_key], holder_info[percent_key]


def doJob():
    """Pull companies from the 'shareHolderInfo' queue and collect their top shareholder.

    For each queue item (``"<no>|<social credit code>"``) the loop:

    1. logs in with an account's cookies via ``login``;
    2. resolves the Tianyancha id, from ``baseCore.getInfomation`` or, failing
       that, via ``getTycIdByXYDM``;
    3. reads the shareholder count from the company page (``get_html``);
    4. queries three shareholder endpoints and keeps the one whose total
       matches the page count (falling back to the third endpoint);
    5. stores the first shareholder's name and percentage in ``dic_info``.

    Failures re-queue the item and are logged; nothing is returned.
    """
    headers = {
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Content-Type': 'application/json',
        'Connection': 'keep-alive',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'version': 'TYC-Web'
    }
    driver = create_driver()
    try:
        driver.get('https://www.tianyancha.com/')
        driver.maximize_window()
        for _ in range(1000):
            # TODO: decide how cookies should be rotated between iterations.
            dic_info = {}
            session_driver, id_cookie, s = login(driver)
            if not id_cookie:
                # No account available; keep the current driver and try again.
                continue
            driver = session_driver

            # Queue items look like "<no>|<social credit code>".
            item = baseCore.redicPullData('shareHolderInfo')
            if item is None:
                # Queue is empty: wait before polling again.
                time.sleep(30 * 60)
                continue
            start = time.time()
            parts = item.split('|')
            no, social_code = parts[0], parts[1]
            try:
                try:
                    data = baseCore.getInfomation(social_code)
                except Exception:
                    data = []
                if data:
                    xydm = data[2]
                    tycid = data[11]
                else:
                    # Not known to getInfomation: look it up in the base table.
                    # Parameterised so the code from the queue is never
                    # spliced into the SQL text.
                    cursor.execute(
                        "SELECT * FROM sys_base_enterprise WHERE social_credit_code = %s",
                        (social_code,),
                    )
                    row = cursor.fetchone()
                    if row:
                        com_name = row[3]
                        xydm = row[1]
                        insert = "INSERT INTO EnterpriseInfo(CompanyName, SocialCode) VALUES (%s, %s)"
                        cursor_.execute(insert, (com_name, xydm))
                        cnx_.commit()
                    else:
                        # Unknown everywhere: carry on with the raw code.
                        xydm = social_code
                    tycid = ''

                if not tycid:
                    # NOTE(review): the error queue key is spelled
                    # 'shareHorder:Error' - confirm it matches its consumer.
                    try:
                        retData = getTycIdByXYDM(xydm, s)
                        if retData['state']:
                            tycid = retData['tycData']['id']
                        else:
                            state = 0
                            takeTime = baseCore.getTimeCost(start, time.time())
                            baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
                            log.info(f'======={social_code}====重新放入redis====')
                            baseCore.rePutIntoR('shareHorder:Error', item)
                            continue
                    except Exception:
                        state = 0
                        takeTime = baseCore.getTimeCost(start, time.time())
                        baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
                        baseCore.rePutIntoR('shareHorder:Error', item)
                        continue

                log.info(f"{no}---{xydm}----{tycid}----开始采集股东信息")
                try:
                    charge, dic_info, _ = get_html(tycid, driver, dic_info)
                except Exception:
                    # Page load failed on every retry (or returned an
                    # unusable result): treat it as a request failure.
                    charge = -1

                if charge == -1:
                    token.updateTokeen(id_cookie, 2)
                    baseCore.rePutIntoR('shareHolderInfo', item)
                    log.info(f"{no}---{xydm}----{tycid}----请求失败----重新放入redis")
                    time.sleep(3)
                    continue
                if charge == -2:
                    # No holder section on the page (or a captcha is showing).
                    token.updateTokeen(id_cookie, 2)
                    baseCore.rePutIntoR('shareHolderInfo', item)
                    log.info(f"{no}---{xydm}----{tycid}----没有股东信息或需要滑动验证----重新放入redis")
                    time.sleep(5)
                    continue

                log.info(f"{no}---{xydm}----{tycid}")
                url2 = f'https://capi.tianyancha.com/cloud-company-background/companyV2/dim/holderV2'
                url3 = f'https://capi.tianyancha.com/cloud-listed-company/listed/holder/topTen?&gid={tycid}&pageSize=20&pageNum=1&percentLevel=-100&type=1'
                url1 = f'https://capi.tianyancha.com/cloud-listed-company/listed/holder/hk?date=&gid={tycid}&sortField=&sortType=-100&pageSize=10&pageNum=1&percentLevel=-100&keyword='
                payload = {"gid": f"{tycid}", "pageSize": 10, "pageNum": 1, "sortField": "", "sortType": "-100", "historyType": 1}
                try:
                    total_page2, data_page2 = post_page(url2, s, headers, payload)
                except Exception as e:
                    log.info(e)
                    total_page2, data_page2 = 0, {}
                time.sleep(1)
                try:
                    total_page3, data_page3 = get_page(url3, s, headers)
                except Exception:
                    total_page3, data_page3 = 0, {}
                try:
                    total_page1, data_page1 = get_page1(url1, s, headers)
                except Exception:
                    total_page1, data_page1 = 0, {}

                # Keep the endpoint whose total agrees with the page count;
                # flag records which field names its rows use.
                if total_page2 == charge:
                    total_page, data_page, flag = total_page2, data_page2, 1
                elif total_page3 == charge:
                    total_page, data_page, flag = total_page3, data_page3, 3
                else:
                    total_page, data_page, flag = total_page1, data_page1, 0

                if total_page == 0:
                    baseCore.rePutIntoR('shareHolderInfo', item)
                    log.info(f'==={social_code}=====总数请求失败===重新放入redis====')
                    continue

                log.info(f'总数为{total_page}')
                errorCode = data_page['errorCode']
                if errorCode != 0:
                    log.info(f'{no}---{xydm}----{tycid}--{data_page}--股东信息请求失败')
                    continue

                log.info(f'{no}---{xydm}----{tycid}---')
                list_all = _extract_holder_list(data_page) or []
                if not list_all:
                    log.info(f'{no}---{xydm}----{tycid}----没有股东信息')
                log.info(f'----flag:{flag}----')
                log.info(f'-----list_all:{len(list_all)}----')
                # Only index the first row when there is one; an empty list
                # previously raised IndexError here and re-queued the item.
                if list_all:
                    shareHolderName, percent = _top_holder(list_all, flag)
                    if shareHolderName and percent:
                        dic_info['最大持股名称'] = shareHolderName
                        dic_info['持股比例'] = percent
                        # TODO: persist the fields, e.g. info.update_holder(no, dic_info)
                log.info('=========成功======')
                token.updateTokeen(id_cookie, 3)
                time.sleep(5)
            except Exception as e:
                # On failure only the account's last-used time is updated
                # (status 3), not its ban time.
                token.updateTokeen(id_cookie, 3)
                log.info(f'==={social_code}=====企业核心人员采集失败===重新放入redis====')
                log.info(e)
                baseCore.rePutIntoR('shareHolderInfo', item)
                state = 0
                takeTime = baseCore.getTimeCost(start, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
                time.sleep(5)
            # NOTE(review): the original indentation of this `break` was lost;
            # it is placed at loop-body level, so the loop stops after the
            # first item that reaches the end of the try/except. Confirm
            # whether it belongs inside the except handler instead.
            break
    finally:
        # Always release the browser, even if the loop exits with an error.
        driver.quit()
# Run the scraping job only when this file is executed as a script.
if __name__ == "__main__":
    doJob()
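# Residual commit-viewer page text below; it is not part of the program.
# \ No newline at end of file
# Markdown 格式 (Markdown format)
# 0%
# 您添加了 0 到此讨论。请谨慎行事。 (You added 0 to this discussion. Proceed with caution.)
# 请先完成此评论的编辑! (Please finish editing this comment first!)
# 注册 或者 后发表评论 (Register or sign in to comment.)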