Commit f4a32ade by LiuLiYuan

Tianyancha basic info 2/27

Parent 2ea9c487
# -*- coding: utf-8 -*-
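"""Tianyancha (tianyancha.com) basic-company-info collector.

Pulls pipe-delimited company records from the redis list
'BaseInfoEnterprise:gnqy_socialCode', searches Tianyancha through a logged-in
Selenium Edge session, parses the detail page's __NEXT_DATA__ JSON into a
basic-info dict, and can push the result to the Kafka topic 'enterpriseInfo'.
Companies that cannot be matched are appended to an xlsx report that is
emailed once a round of the queue finishes.
"""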
import datetime
import json
import re
import time
import pymongo
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import urllib3
from selenium.webdriver.support.wait import WebDriverWait
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin',
                                 password='ZZsn@9988').ZZSN['天眼查登录信息']
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from dateutil.relativedelta import relativedelta
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
# sys.path.append('D:\\kkwork\\zzsn_spider\\base')
# import BaseCore
from base import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
from classtool import Token, File, Tag
token = Token()
file = File()
tag = Tag()
from selenium import webdriver
from selenium.webdriver.common.by import By
def create_driver():
    path = r'D:\soft\msedgedriver.exe'
    # options = webdriver.EdgeOptions()
    options = {
        "browserName": "MicrosoftEdge",
        "ms:edgeOptions": {
            "extensions": [], "args": ["--start-maximized"]  # start the window maximized
        }
    }
    session = webdriver.Edge(executable_path=path, capabilities=options)
    return session
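# Note: executable_path/capabilities is the Selenium 3 style of launching Edge;
# this script therefore appears to assume a selenium<4 environment (Selenium 4
# replaced these arguments with Service and EdgeOptions objects).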
# Send data
def sendkafka(post_data):
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
        kafka_result = producer.send("enterpriseInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
    except Exception:
        exception = 'kafka传输失败'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
        log.info(f"{com_name}--{social_code}--kafka传输失败")
def Lreputredis(company_field):
    # todo: put the record back into redis, keeping the 'end' sentinel last
    baseCore.r.lrem('BaseInfoEnterprise:gnqy_socialCode', 0, 'end')
    baseCore.r.rpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
    baseCore.r.rpush('BaseInfoEnterprise:gnqy_socialCode', 'end')
# Check login status by loading a search page
def checklogin(key):
    t = int(time.time())
    # url = 'https://www.tianyancha.com/search?key=%E4%B8%AD%E5%9B%BD%E7%9F%B3%E6%B2%B9%E5%8C%96%E5%B7%A5%E9%9B%86%E5%9B%A2%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8&sessionNo=1706594186.22975563'
    url = f'https://www.tianyancha.com/search?key={key}&sessionNo={t}'
    driver.get(url)
    time.sleep(2)
    page_source = driver.page_source
    soup = BeautifulSoup(page_source, 'html.parser')
    # todo: detect the logged-out state
    # if soup.find('title').text == '会员登录 - 企查查':
    #     log.info('状态---未登录')
    #     soup = ''
    #     return soup
    return soup
# Prepare for collection
def redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange, listType,
                ynDomestic, countryName, file_name):
    log.info(f'----当前企业{social_code}-{com_name}--开始处理---')
    count = 0
    # If there is no usable credit code, search by name; otherwise search by the credit code
    if social_code and 'ZZSN' not in social_code and 'ZD' not in social_code:
        soup = checklogin(social_code)
    else:
        soup = checklogin(com_name)
    if not soup:
        log.info("登录失效===重新放入redis")
        # baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
        Lreputredis(company_field)
        token.updateTokeen(id_cookie, 2)
        # log.info('=====已重新放入redis,失效cookies已删除======')
        time.sleep(20)
        return count
    else:
        try:
            searchinfo = soup.find('div', class_='index_content-tool-title__K1Z6C').find('span',
                                                                                         class_='index_title-count__lDSjB').text
        except Exception:
            try:
                # todo: the company may simply not be found by the search
                errormessage = soup.find('div', class_='index_no-data-reason-title__V3gFY').text
                if '抱歉' in errormessage:
                    log.info('=====搜索不到该企业====')
                    data = [com_name, social_code]
                    # todo: companies that cannot be found are appended to a spreadsheet
                    file.appenddata(file_name, '需处理企业', data)
                    return count
            except Exception:
                log.info("登录失效===重新放入redis")
                # baseCore.r.lpush('UpdateBasdeInfo:SocialCode_CompanyName', company_field)
                Lreputredis(company_field)
                token.updateTokeen(id_cookie, 2)
                # log.info('=====已重新放入redis,cookies已封号======')
                time.sleep(20)
                return count
        else:
            # Start collecting
            try:
                if spiderwork(soup, com_name, securitiesCode, securitiesShortName, listingDate, category, exchange,
                              listType, ynDomestic, countryName, file_name):
                    count += 1
                    log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
                    token.updateTokeen(id_cookie, 3)
                    return count
                else:
                    return count
            except Exception as e:
                log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
                # baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
                Lreputredis(company_field)
                token.updateTokeen(id_cookie, 2)
                log.info('=====已重新放入redis,cookies已封号======')
                return count
def ifbeforename(company_url):
    driver.get(company_url)
    time.sleep(2)
    com_soup = BeautifulSoup(driver.page_source, 'html.parser')
    try:
        businessinfo = com_soup.find('table', {'class': 'index_tableBox__ZadJW'})
    except Exception:
        businessinfo = ''
    if businessinfo:
        try:
            name = businessinfo.find('span', class_='index_history-gray-tags__o8mkl').text
            value = (businessinfo.find('span', class_='index_copy-text__ri7W6').text
                     .replace('展开', '').replace(' ', '').replace('…', '')
                     .replace('\n', '').replace('复制', '').split('(')[0])
        except Exception:
            name = '曾用名'
            value = ''
        return value
    else:
        return ''
# Collect basic information and business registration information
def spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic,
               countryName, file_name):
    qccid = company_url.split('company/')[1]
    log.info(f'====={qccid}=====')
    driver.get(company_url)
    page_source_detail = driver.page_source
    com_soup = BeautifulSoup(page_source_detail, 'html.parser')
    # The detail page embeds its data as JSON inside the __NEXT_DATA__ script tag
    script = com_soup.find('script', attrs={'id': '__NEXT_DATA__'}).text
    script = json.loads(script)
    script = script['props']['pageProps']['dehydratedState']['queries'][0]['state']['data']['data']
    companyName = script['name']
    updateTime = int(script['updateTimes'])
    updateTime = datetime.datetime.fromtimestamp(updateTime / 1000).strftime('%Y-%m-%d %H:%M:%S')
    creditCode = script['creditCode']
    operName = script['legalPersonName']
    phoneNumber = script['phoneNumber']
    webSite = script['websiteList']
    try:
        email = script['emailList'][0]
    except Exception:
        email = None
    desc = script['baseInfo']
    status = script['regStatus']
    startDate = int(script['estiblishTime'])
    startDate = datetime.datetime.fromtimestamp(startDate / 1000).strftime('%Y-%m-%d %H:%M:%S')
    registCapi = script['regCapital']
    recCap = script['actualCapital']
    checkDate = int(script['approvedTime'])
    checkDate = datetime.datetime.fromtimestamp(checkDate / 1000).strftime('%Y-%m-%d %H:%M:%S')
    orgNo = script['orgNumber']
    No = script['regNumber']
    taxpayerNo = script['taxNumber']
    econKind = script['companyOrgType']
    termStart = int(script['fromTime'])
    termStart = datetime.datetime.fromtimestamp(termStart / 1000).strftime('%Y-%m-%d %H:%M:%S')
    termEnd = int(script['toTime'])  # cast like fromTime above; assumed to also be epoch millis
    termEnd = datetime.datetime.fromtimestamp(termEnd / 1000).strftime('%Y-%m-%d %H:%M:%S')
    taxpayerType = script['taxQualification']
    subIndustry = script['industryInfo']['nameLevel3']
    belogOrg = script['regInstitute']
    info = script['staffNumRange']
    canbao = script['socialStaffNum']
    try:
        originalName = script['historyNames']
        originalName = originalName.split('\n')[0]
    except Exception:
        originalName = None
    englishName = script['property3']
    address = script['taxAddress']
    scope = script['businessScope']
    aa_dic = {
        'name': companyName,  # company name
        'shortName': None,  # company short name
        'socialCreditCode': creditCode,  # unified social credit code
        'legalPerson': operName,  # legal representative
        'officialPhone': phoneNumber,  # phone
        'officialUrl': webSite,  # official website
        'officialEmail': email,  # email
        'briefInfo': desc,  # brief introduction
        'registerStatus': status,  # registration status
        'incorporationDate': startDate,  # date of establishment
        'capital': registCapi,  # registered capital
        'paidCapital': recCap,  # paid-in capital
        'approvalDate': checkDate,  # approval date
        'organizationCode': orgNo,  # organization code
        'registerNo': No,  # business registration number
        'taxpayerNo': taxpayerNo,  # taxpayer identification number
        'type': econKind,  # enterprise type
        'businessStartDate': termStart,  # business term from
        'businessEndDate': termEnd,  # business term to
        'taxpayerQualification': taxpayerType,  # taxpayer qualification
        'industry': subIndustry,  # industry
        'region': None,
        'province': None,  # province
        'city': None,  # city
        'county': None,  # county
        'registerDepartment': belogOrg,  # registration authority
        'scale': info,  # staff size
        'insured': canbao,  # number of insured employees
        'beforeName': originalName,  # former name
        'englishName': englishName,  # English name
        'importExportEnterpriseCode': None,  # import/export enterprise code
        'address': address,  # address
        'businessRange': scope,  # business scope
        'status': 0,  # status
        'sourceUpdateTime': updateTime,  # source update time
        'qccId': qccid,
        'ynDomestic': ynDomestic,
        'countryName': countryName,
        'securitiesCode': securitiesCode,
        'securitiesShortName': securitiesShortName,
        'listingDate': listingDate,
        'category': category,
        'exchange': exchange,
        'listingType': listType,
    }
    # Normalize the literal string 'None' to a real None
    for key, value in aa_dic.items():
        if value == 'None':
            aa_dic[key] = None
    # Send to kafka
    # sendkafka(aa_dic)
def remove_parentheses(text):
    # Strip full-width (Chinese) parentheses
    text = re.sub(r'（|）', '', text)
    # Strip ASCII parentheses
    text = re.sub(r'\(|\)', '', text)
    return text.replace(' ', '')
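# Illustrative example (hypothetical input): both full-width and ASCII
# parentheses plus spaces are stripped before names are compared in spiderwork:
#   remove_parentheses('小米（武汉）科技 有限公司')  ->  '小米武汉科技有限公司'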
# Check whether the received name matches the collected one
def spiderwork(soup, receptname, securitiesCode, securitiesShortName, listingDate, category, exchange, listType,
               ynDomestic, countryName, file_name):
    company_url = ''
    try:
        company_list = soup.find_all('div', class_='index_search-box__7YVh6')
    except Exception:
        log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
        # baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
        Lreputredis(company_field)
        token.updateTokeen(id_cookie, 2)
        log.info('=====已重新放入redis,cookies已封号======')
        return False
    # receptname = '小米通讯技术有限公司'
    for company in company_list:
        info_t = company.find('div', class_='index_name__qEdWi')
        getname = info_t.find('span').text
        log.info(f'接收到的企业名称--{receptname}---采到的企业名称--{getname}')
        if receptname and getname == receptname:
            company_url = info_t.find('a')['href']
            break
        elif not receptname:
            company_url = info_t.find('a')['href']
            break
        else:
            # Compare again after converting traditional characters to simplified
            jian_name = remove_parentheses(baseCore.hant_2_hans(getname))
            if remove_parentheses(receptname) == jian_name:
                log.info(f'接收到的企业名称--{receptname}---转化成简体字的企业名称--{jian_name}')
                company_url = info_t.find('a')['href']
                break
            else:
                continue
    if company_url:
        # company_url = 'https://www.qcc.com/firm/80af5085726bb6b9c7770f1e4d0580f4.html'
        # company_url = 'https://www.qcc.com/firm/50f75e8a8859e609ec37976f8abe827d.html'
        # Collect basic info and business registration info
        spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType,
                   ynDomestic, countryName, file_name)
    else:
        # Check whether the received name is a former name
        getname = ''
        for child in company_list[0].find_all():
            if child.has_attr('class'):
                print(child['class'])
                if 'index_name' in child['class'][0]:
                    getname = child.text
                    company_url = child.find('a')['href']
                    break
        # tr = company_list[:1][0]
        # info_t = tr.find('div', class_='index_name__qEdWi')
        # getname = info_t.find('span').text
        if getname:
            log.info(f'------可能是曾用名------接收到的企业名称--{receptname}---采到的企业名称--{getname}')
            beforename = ifbeforename(company_url)
            if beforename == receptname:
                spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType,
                           ynDomestic, countryName, file_name)
            else:
                # No company with the same name was found
                data = [com_name, social_code]
                file.appenddata(file_name, '需处理企业', data)
                time.sleep(2)
                return False
        else:
            # No company with the same name was found
            data = [com_name, social_code]
            file.appenddata(file_name, '需处理企业', data)
            time.sleep(2)
            return False
    return True
def login():
    # time.sleep(10)
    cookies_list, id_cookie, user_name = token.get_cookies()
    log.info(f'=====当前使用的是{user_name}的cookie======')
    for cookie in cookies_list:
        driver.add_cookie(cookie)
    time.sleep(5)
    driver.refresh()
    # url_test = 'https://www.qcc.com/firm/a5f5bb3776867b3e273cd034d6fb4baa.html'
    # driver.get(url_test)
    # # driver.get('https://www.qcc.com/')
    time.sleep(5)
    return driver, id_cookie
if __name__ == '__main__':
    taskType = '基本信息/天眼查'
    # driver, id_cookie = login()
    driver = create_driver()
    url = 'https://www.tianyancha.com/'
    driver.get(url)
    driver.maximize_window()
    while True:
        driver, id_cookie = login()
        nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
        file_name = './data/国内企业基本信息采集情况.xlsx'
        file.createFile(file_name)
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Cookie': 'TYCID=6f6298905d3011ee96146793e725899d; ssuid=3467188160; _ga=GA1.2.1049062268.1697190322; HWWAFSESID=2eb035742bde209aa60; HWWAFSESTIME=1706586308439; csrfToken=bT_looAjInHGeAnvjjl12L9v; bannerFlag=true; jsid=SEO-BAIDU-ALL-SY-000001; bdHomeCount=0; tyc-user-phone=%255B%252216603863075%2522%252C%2522152%25203756%25200528%2522%252C%2522159%25200367%25203315%2522%255D; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22310689501%22%2C%22first_id%22%3A%2218ad696a2ef680-0ae5cd9293a1538-26031f51-921600-18ad696a2f0dc5%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMThhZDY5NmEyZWY2ODAtMGFlNWNkOTI5M2ExNTM4LTI2MDMxZjUxLTkyMTYwMC0xOGFkNjk2YTJmMGRjNSIsIiRpZGVudGl0eV9sb2dpbl9pZCI6IjMxMDY4OTUwMSJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22310689501%22%7D%2C%22%24device_id%22%3A%2218ad696a2ef680-0ae5cd9293a1538-26031f51-921600-18ad696a2f0dc5%22%7D; tyc-user-info=%7B%22state%22%3A%220%22%2C%22vipManager%22%3A%220%22%2C%22mobile%22%3A%2218703752600%22%2C%22userId%22%3A%22310689501%22%7D; tyc-user-info-save-time=1707008605562; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODcwMzc1MjYwMCIsImlhdCI6MTcwNzAwODYwNSwiZXhwIjoxNzA5NjAwNjA1fQ.i8WEUrXjG2X__SnGGlnjwNXyOEdXlslrnvzvKZ_xlVA0rdjdsYHdaieAzkmIjoKbuv6Lc4Eqpb70hWIlq2zeoQ; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1705286979,1706586312; searchSessionId=1707118324.99879267;'
        }
        # cookies_list, id_cookie = token.get_cookies()
        # cookies = {}
        # for cookie in cookies_list:
        #     cookies[cookie['name']] = cookie['value']
        # s = requests.Session()
        # s.cookies.update(cookies)
        start_time = time.time()
        # Get the next company record
        # company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
        company_field = '|北京华信瑞德信息技术有限公司|北京华信瑞德信息技术有限公司|||||||||||||1|中国内地|||||||'
        if company_field == 'end':
            # This round is finished: send the report email and start the next round
            baseCore.sendEmail(file_name)
            time.sleep(20)
            file.deleteFile(file_name)
            continue
        if company_field == '' or company_field is None:
            # No new companies to collect in this round
            file.deleteFile(file_name)
            flag = True
            while flag:
                log.info('--------已没有数据---------')
                time.sleep(30)
                if not baseCore.check_mysql_conn(cnx_):
                    # database on 144
                    cnx_ = baseCore.cnx
                    cursor_ = cnx_.cursor()
                    log.info('===11数据库重新连接成功===')
                company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
                if company_field:
                    flag = False
                    log.info("-----已添加数据------")
                    # baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
                    Lreputredis(company_field)
                    continue
            continue
        # company_field_ = f'|{company_field}'
        # Pipe-delimited record layout (by index): 0 social code, 2 company name,
        # 15 ynDomestic, 16 country, 17 securities code, 18 securities short name,
        # 19 category, 20 exchange, 21 listing date / list type
        social_code = company_field.split('|')[0]
        com_name = company_field.split('|')[2].replace(' ', '')
        ynDomestic = company_field.split('|')[15]
        countryName = company_field.split('|')[16]
        securitiesCode = company_field.split('|')[17]
        securitiesShortName = company_field.split('|')[18]
        listingDate = company_field.split('|')[21]
        category = company_field.split('|')[19]
        exchange = company_field.split('|')[20]
        listType = company_field.split('|')[21]  # note: same field index as listingDate in the source
        # ynDomestic = None
        # countryName = None
        # securitiesCode = None
        # securitiesShortName = None
        # listingDate = None
        # category = None
        # exchange = None
        # listType = None
        count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category,
                            exchange, listType, ynDomestic, countryName, file_name)
        time.sleep(10)
        break
        # baseCore.r.close()
        # baseCore.sendEmail(file_name)
        # After collection, update this company's collection count
        # runType = 'BaseInfoRunCount'
        # baseCore.updateRun(social_code, runType, count)
        # break
    baseCore.close()