Commit 0227ff97  Author: 薛凌堃

11/15

Parent: bed1863a
@@ -286,6 +286,20 @@ class BaseCore:
            charset='utf8mb4'
        )
+        self.pool_11 = PooledDB(
+            creator=pymysql,
+            maxconnections=5,
+            mincached=2,
+            maxcached=5,
+            blocking=True,
+            host='114.116.44.11',
+            port=3306,
+            user='caiji',
+            password='f7s0&7qqtK',
+            database='clb_project',
+            charset='utf8mb4'
+        )
+
    def check_mysql_conn(self,conn):
        try:
            conn.ping()
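The added pool_11 is a DBUtils PooledDB wrapping pymysql; elsewhere in this commit it is consumed through .connection() (see the 证监会公告 script further below). A minimal usage sketch, assuming a BaseCore instance named baseCore — not code from the commit itself:

    # Sketch only: borrow a connection from the new pool and hand it back when done.
    conn = baseCore.pool_11.connection()      # pooled pymysql connection
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
            print(cursor.fetchone())
    finally:
        conn.close()                          # close() returns the connection to the pool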
@@ -864,7 +878,7 @@ class BaseCore:
        # 发送邮箱地址
        sender = '1195236739@qq.com'
        # 接收邮箱地址
-        receiver = '1074481431@qq.com'
+        receiver = '1007765445@qq.com'
        smtpserver = 'smtp.qq.com'
        # 发送邮箱登录 账户 密码
        username = '1195236739@qq.com'
......
# -*- coding: utf-8 -*-
import json
+import os.path
import openpyxl
import re
@@ -23,20 +24,30 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from openpyxl import Workbook, load_workbook

-#创建文件
+# 创建文件
def createFile(file_name):
-    wb = Workbook()
-    sheet = wb.active
-    # 更改默认的sheet名称
-    sheet.title = "需处理企业"
-    sheet.append(["企业名称", "社会信用代码"])
-    # 创建另一个sheet
-    sheet2 = wb.create_sheet("获取基本信息成功企业")
-    sheet2.append(["企业名称", "社会信用代码", "采到的信用代码"])
-    wb.save(file_name)
-    wb.close()
+    if os.path.exists(file_name):
+        return
+    else:
+        wb = Workbook()
+        sheet = wb.active
+        # 更改默认的sheet名称
+        sheet.title = "需处理企业"
+        sheet.append(["企业名称", "社会信用代码"])
+        # 创建另一个sheet
+        sheet2 = wb.create_sheet("获取基本信息成功企业")
+        sheet2.append(["企业名称", "社会信用代码", "采到的信用代码"])
+        wb.save(file_name)
+        wb.close()

-#追加数据
+# 删除文件
+def deleteFile(file_name):
+    if os.path.exists(file_name):
+        os.remove(file_name)
+    else:
+        pass
+
+# 追加数据
def appenddata(file_name,sheet,data):
    # 打开现有的Excel文件
    wb = load_workbook(file_name)
@@ -48,10 +59,11 @@ def appenddata(file_name,sheet,data):
    wb.save(file_name)
    wb.close()

+# 发送数据
def sendkafka(post_data):
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
-        kafka_result = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
+        kafka_result = producer.send("enterpriseInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
    except:
        exception = 'kafka传输失败'
@@ -60,6 +72,7 @@ def sendkafka(post_data):
        baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
        log.info(f"{com_name}--{social_code}--kafka传输失败")

+# 删除特定属性标签
def deletep(soup,tag_,attribute_to_delete,value_to_delete):
    if attribute_to_delete and value_to_delete:
        # 查找带有指定属性的P标签并删除
@@ -73,6 +86,7 @@ def deletep(soup,tag_,attribute_to_delete,value_to_delete):
            # print(tag)
            tag.decompose()

+# 删除空标签
def deletek(soup):
    # 删除空白标签(例如<p></p>、<p><br></p>, img、video、hr除外)
    for i in soup.find_all(lambda tag: len(tag.get_text()) == 0 and tag.name not in ["img", "video", "br"] and tag.name != "br" or tag.get_text()==' ' or tag.get_text()==' '):
@@ -82,6 +96,7 @@ def deletek(soup):
        else:
            i.decompose()

+# 删除span标签
def deletespan(td):
    spans = td.find_all('span', class_='app-copy copy-button-item')
    for span in spans:
@@ -96,7 +111,11 @@ def deletespan(td):
    for span3 in spans3:
        if '年报' in span3.text:
            span3.extract()
+    spans4 = td.find_all('span', class_='text-span')
+    for span4 in spans4:
+        span4.extract()

+# 合并基本信息和工商信息字段
def getinfo(dict1,dict2):
    # 取出两个字典的key值集合
    keys1 = set(dict1.keys())
@@ -108,6 +127,7 @@ def getinfo(dict1,dict2):
    result_dict = {key: dict1.get(key, None) or dict2.get(key, None) for key in union_keys}
    return result_dict
+# 获取基本信息
def baseinfo(com_soup):
    baseinfo = com_soup.find('div', class_='contact-info')
    cominfo_list = baseinfo.find_all('span', class_='f')
@@ -132,6 +152,7 @@ def baseinfo(com_soup):
        data[name] = value
    return data

+# 检查登陆状态
def checklogin(key):
    # url = f'https://www.qcc.com/web/search?key=91110108558521630L'
@@ -144,19 +165,225 @@ def checklogin(key):
        return soup
    return soup
+# 处理要发送的字段
+def dic_handle(result_dic):
+    zxss = ['北京市', '天津市', '上海市', '重庆市']
+    try:
+        company_name = result_dic['企业名称']
+    except:
+        company_name = ''
+    try:
+        CreditCode = result_dic['统一社会信用代码']
+    except:
+        CreditCode = ''
+    try:
+        OperName = result_dic['法定代表人']
+    except:
+        OperName = ''
+    try:
+        PhoneNumber = result_dic['电话']
+    except:
+        PhoneNumber = ''
+    try:
+        WebSite = result_dic['官网']
+    except:
+        WebSite = ''
+    try:
+        Email = result_dic['邮箱']
+    except:
+        Email = ''
+    try:
+        Desc = result_dic['简介']
+    except:
+        Desc = ''
+    try:
+        Status = result_dic['登记状态']
+    except:
+        Status = ''
+    try:
+        StartDate = result_dic['成立日期']
+    except:
+        StartDate = ''
+    try:
+        RecCap = result_dic['实缴资本']
+    except:
+        RecCap = ''
+    try:
+        RegistCapi = result_dic['注册资本']
+    except:
+        RegistCapi = ''
+    try:
+        CheckDate = result_dic['核准日期']
+    except:
+        CheckDate = ''
+    try:
+        OrgNo = result_dic['组织机构代码']
+    except:
+        OrgNo = ''
+    try:
+        No = result_dic['工商注册号']
+    except:
+        No = ''
+    try:
+        taxpayerNo = result_dic['纳税人识别号']
+    except:
+        taxpayerNo = ''
+    try:
+        EconKind = result_dic['企业类型']
+    except:
+        EconKind = ''
+    try:
+        TermStart = result_dic['营业期限'].split('至')[0]
+    except:
+        TermStart = ''
+    try:
+        TeamEnd = result_dic['营业期限'].split('至')[1]
+    except:
+        TeamEnd = ''
+    try:
+        TaxpayerType = result_dic['纳税人资质']
+    except:
+        TaxpayerType = ''
+    try:
+        SubIndustry = result_dic['国标行业']
+    except:
+        SubIndustry = ''
+    try:
+        region = result_dic['所属地区']
+    except:
+        region = ''
+    try:
+        pattern = r'^(.*?省|.*?自治区)?(.*?市|.*?自治州)?(.*?区|.*?县|.*?自治县|.*?市辖区)?(.*?区|.*?县|.*?自治县|.*?市辖区)?$'
+        matches = re.match(pattern, region)
+        Province = matches.group(1)
+        City = matches.group(2)
+        County = matches.group(3)
+        if Province is None:
+            for zxs in zxss:
+                if zxs in region:
+                    Province = zxs
+                    break
+    except:
+        Province = ''
+        City = ''
+        County = ''
+    try:
+        BelongOrg = result_dic['登记机关']
+    except:
+        BelongOrg = ''
+    try:
+        Info = result_dic['人员规模']
+    except:
+        Info = ''
+    try:
+        can_bao = result_dic['参保人数']
+    except:
+        can_bao = ''
+    try:
+        OriginalName = result_dic['曾用名']
+    except:
+        OriginalName = ''
+    try:
+        EnglishName = result_dic['英文名']
+    except:
+        EnglishName = ''
+    try:
+        IxCode = result_dic['进出口企业代码']
+    except:
+        IxCode = ''
+    try:
+        Address = result_dic['地址']
+    except:
+        Address = ''
+    try:
+        Scope = result_dic['经营范围']
+    except:
+        Scope = ''
+    aa_dict = {
+        'name': company_name,  # 企业名称
+        'shortName': '',  # 企业简称
+        'socialCreditCode': CreditCode,  # 统一社会信用代码
+        'legalPerson': OperName,  # 法定代表人
+        'officialPhone': PhoneNumber,  # 电话
+        'officialUrl': WebSite,  # 官网
+        'officialEmail': Email,  # 邮箱
+        'briefInfo': Desc,  # 简介
+        'registerStatus': Status,  # 登记状态
+        'incorporationDate': StartDate,  # 成立日期
+        'capital': RegistCapi,  # 注册资本
+        'paidCapital': RecCap,  # 实缴资本
+        'approvalDate': CheckDate,  # 核准日期
+        'organizationCode': OrgNo,  # 组织机构代码
+        'registerNo': No,  # 工商注册号
+        'taxpayerNo': taxpayerNo,  # 纳税人识别号
+        'type': EconKind,  # 企业类型
+        'businessStartDate': TermStart,  # 营业期限自
+        'businessEndDate': TeamEnd,  # 营业期限至
+        'taxpayerQualification': TaxpayerType,  # 纳税人资质
+        'industry': SubIndustry,  # 所属行业
+        'region': region,
+        'province': Province,  # 所属省
+        'city': City,  # 所属市
+        'county': County,  # 所属县
+        'registerDepartment': BelongOrg,  # 登记机关
+        'scale': Info,  # 人员规模
+        'insured': can_bao,  # 参保人数
+        'beforeName': OriginalName,  # 曾用名
+        'englishName': EnglishName,  # 英文名
+        'importExportEnterpriseCode': IxCode,  # 进出口企业代码
+        'address': Address,  # 地址
+        'businessRange': Scope,  # 经营范围
+        'status': 0,  # 状态
+    }
+    return aa_dict
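The repeated try/except blocks above all follow the same pattern (missing key falls back to ''), and the 所属地区 regex splits a region string into province/city/county groups. A short illustration, not part of the commit, with a hypothetical sample record (dict.get shown only as a possible one-line equivalent of each try/except):

    import re

    # Hypothetical sample shaped like result_dic above; values invented for the example.
    sample = {'企业名称': '某某科技有限公司', '所属地区': '浙江省杭州市西湖区'}

    company_name = sample.get('企业名称', '')      # present -> value
    CreditCode = sample.get('统一社会信用代码', '')  # absent  -> ''

    # Same pattern used in dic_handle, applied to the sample region string.
    pattern = r'^(.*?省|.*?自治区)?(.*?市|.*?自治州)?(.*?区|.*?县|.*?自治县|.*?市辖区)?(.*?区|.*?县|.*?自治县|.*?市辖区)?$'
    matches = re.match(pattern, sample.get('所属地区', ''))
    print(matches.group(1), matches.group(2), matches.group(3))
    # 浙江省 杭州市 西湖区  (for 北京/天津/上海/重庆 the first group is None and the zxss loop fills it)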
+# 采集准备
-def redaytowork(com_name,social_code):
-    if social_code:
-        dic_info = baseCore.getInfomation(social_code)
-    elif not social_code:
-        dic_info = baseCore.getBYnameInfomation(com_name)
-    else:
-        dic_info = ''
-    if dic_info:
-        pass
+def redaytowork(com_name,social_code,securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
+    # if social_code:
+    #     dic_info = baseCore.getInfomation(social_code)
+    # elif not social_code:
+    #     dic_info = baseCore.getBYnameInfomation(com_name)
+    # else:
+    #     dic_info = ''
+    # if dic_info:
+    #     pass
    log.info(f'----当前企业{social_code}-{com_name}--开始处理---')
-    count = dic_info[14]
+    # count = dic_info[14]
+    count = 0
    # 企查查id
-    company_id = dic_info[12]
+    # company_id = dic_info[12]
    # 如果没有信用代码 就通过名字搜索 如果有信用代码 就通过信用代码
    if social_code:
        soup = checklogin(social_code)
@@ -164,7 +391,7 @@ def redaytowork(com_name,social_code):
        soup = checklogin(com_name)
    if not soup:
        log.info("登录失效===重新放入redis")
-        baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode', company_field)
+        baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
        # baseCore.delete_token(token)
        log.info('=====已重新放入redis,失效token已删除======')
        time.sleep(20)
@@ -180,29 +407,177 @@ def redaytowork(com_name,social_code):
    else:
        # 开始采集
        try:
-            spiderwork(soup, com_name)
-            count += 1
-            log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
-            return count
-        except:
+            if spiderwork(soup, com_name, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
+                count += 1
+                log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
+                return count
+            else:
+                return count
+        except Exception as e:
            log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
-            baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_social_code', social_code)
+            baseCore.r.lpush('BaseInfoEnterprise:gnqy_social_code', company_field)
            # baseCore.delete_token(token)
            log.info('=====已重新放入redis,失效token已删除======')
            return count
+def ifbeforename(company_url):
+    req_ = requests.get(headers=headers, url=company_url)
+    com_soup = BeautifulSoup(req_.content, 'html.parser')
+    try:
+        businessinfo = com_soup.find('div', class_='cominfo-normal')
+    except:
+        businessinfo = ''
+    if businessinfo:
+        try:
+            name = businessinfo.find('div', class_='ntag text-gray original-tag').text
+            value = businessinfo.find('div', class_='original-name-list').text.replace('展开', '').replace(' ', '').replace('…', '').replace('\n', '').replace('复制', '').split('(')[0]
+        except:
+            name = '曾用名'
+            value = ''
+        return value
+    else:
+        return ''

+# 采集基本信息和工商信息
+def spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
+    qccid = company_url.split('firm/')[1].split('.html')[0]
+    # 将采集到的企查查id更新
+    updateSql = f"update EnterpriseInfo set QCCID = '{qccid}' where SocialCode = '{social_code}'"
+    cursor_.execute(updateSql)
+    cnx_.commit()
+    req_ = requests.get(headers=headers, url=company_url)
+    com_soup = BeautifulSoup(req_.content, 'html.parser')
+    try:
+        businessinfo = com_soup.find('div', class_='cominfo-normal')
+    except:
+        businessinfo = ''
+    if businessinfo:
+        data_businfo = {}
+        data_baseinfo = baseinfo(com_soup)
+        # print(data_baseinfo)
+        try:
+            name = businessinfo.find('div', class_='ntag text-gray original-tag').text
+            value = businessinfo.find('div', class_='original-name-list').text.replace('展开', '').replace(' ', '').replace('…', '').replace('\n', '').replace('复制', '').split('(')[0]
+        except:
+            name = '曾用名'
+            value = ''
+        data_businfo[name] = value
+        td_tags = businessinfo.find_all('td')
+        # print(td_tags)
+        for td in td_tags:
+            if 'class' in td.attrs and 'tb' in td['class']:
+                div_tags = td.find_all('div')
+                texts = [div.text for div in div_tags]
+                if len(texts) > 0:
+                    for text in texts[::-1]:
+                        data_businfo[text.replace('复制', '').replace('\n', '').strip(' ')] = None
+                else:
+                    data_businfo[td.text.replace('复制', '').replace('\n', '').strip(' ')] = None
+            else:
+                # 没有class='tb'属性的标签
+                att_list = ['inline-block', 'ntag-v2', 'm-l-r-10', 'm-l-sm']
+                for att in att_list:
+                    deletep(td, 'a', 'class', att)
+                deletek(td)
+                deletep(td, 'div', 'class', 'text-gray clearfix original-name-part')
+                deletespan(td)
+                # if len(result_dict) <= len(td_tags) // 2:
+                div_tags = td.find_all('div')
+                texts = [div.text for div in div_tags if len(div.attrs) == 0]
+                if len(texts) > 0:
+                    i = 1
+                    for text in texts:
+                        if text == ' ':
+                            continue
+                        data_businfo[list(data_businfo.keys())[-i]] = text.replace('复制', '').replace('\n', '').replace(' ', '')
+                        i += 1
+                else:
+                    if '实缴资本' in td.text:
+                        # pattern = r"\d+万美元"
+                        # match = re.search(pattern, td.text.replace('复制', '').replace('\n', '').replace(' ', ''))
+                        # if match:
+                        #     value = match.group()
+                        value = td.text.replace('复制', '').replace('\n', '').replace(' ', '').split('实缴资本')[0]
+                        data_businfo[list(data_businfo.keys())[-1]] = value
+                    else:
+                        data_businfo[list(data_businfo.keys())[-1]] = td.text.replace('复制', '').replace('\n', '').replace(' ', '')
+        result_dict = getinfo(data_businfo, data_baseinfo)
+        # print(result_dict)
+        # 采集成功的企业
+        data = [com_name, social_code, result_dict['统一社会信用代码']]
+        appenddata(file_name, '获取基本信息成功企业', data)
+        # 将字段转化成英文驼峰
+        aa_dic = dic_handle(result_dict)
+        aa_dic['qccId'] = qccid
+        aa_dic['ynDomestic'] = ynDomestic
+        aa_dic['countryName'] = countryName
+        aa_dic['securitiesCode'] = securitiesCode
+        aa_dic['securitiesShortName'] = securitiesShortName
+        aa_dic['listingDate'] = listingDate
+        aa_dic['category'] = category
+        aa_dic['exchange'] = exchange
+        print(aa_dic)
+        sendkafka(aa_dic)
+    else:
+        data_baseinfo = baseinfo(com_soup)
+        # 采集成功的企业
+        data = [com_name, social_code, data_baseinfo['统一社会信用代码']]
+        appenddata(file_name, '获取基本信息成功企业', data)
+        # 将字段转化成英文驼峰
+        aa_dic = dic_handle(data_baseinfo)
+        aa_dic['qccId'] = qccid
+        aa_dic['ynDomestic'] = ynDomestic
+        aa_dic['countryName'] = countryName
+        aa_dic['securitiesCode'] = securitiesCode
+        aa_dic['securitiesShortName'] = securitiesShortName
+        aa_dic['listingDate'] = listingDate
+        aa_dic['category'] = category
+        aa_dic['exchange'] = exchange
+        sendkafka(aa_dic)
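For reference, the message pushed to the enterpriseInfo topic is the camelCase dictionary built by dic_handle plus the fields appended above. A hypothetical, abbreviated example (values invented; real messages carry every key dic_handle produces), not part of the commit:

    post_data = {
        'name': '某某科技有限公司',
        'socialCreditCode': '91330000XXXXXXXXXX',
        'legalPerson': '张三',
        'registerStatus': '存续',
        'status': 0,
        'qccId': '80af5085726bb6b9c7770f1e4d0580f4',
        'ynDomestic': '1',
        'countryName': '中国',
        'securitiesCode': '',
        'securitiesShortName': '',
        'listingDate': '',
        'category': '',
        'exchange': '',
    }
    sendkafka(post_data)   # sendkafka serializes with json.dumps(..., ensure_ascii=False)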
+# 判断名称是否统一
-def spiderwork(soup,receptname):
+def spiderwork(soup, receptname, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name):
    company_url = ''
-    company_list = soup.find('table',class_='app-ltable ntable ntable-list ntable ntable-list')
-    tr_list = company_list.find_all('tr',class_='tsd0')
+    try:
+        company_list = soup.find('table', class_='app-ltable ntable ntable-list ntable ntable-list')
+        tr_list = company_list.find_all('tr', class_='tsd0')
+    except:
+        log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
+        baseCore.r.lpush('BaseInfoEnterprise:gnqy_social_code', company_field)
+        # baseCore.delete_token(token)
+        log.info('=====已重新放入redis,失效token已删除======')
+        return False
    # receptname = '小米通讯技术有限公司'
    for tr in tr_list:
        info_t = tr.find('span',class_='copy-title')
        getname = info_t.find('span').text
-        log.info(f'接收到的企业名称--{com_name}---采到的企业名称--{getname}')
-        if getname == receptname:
+        log.info(f'接收到的企业名称--{receptname}---采到的企业名称--{getname}')
+        if receptname and getname == receptname:
+            company_url = info_t.find('a')['href']
+            break
+        elif not receptname:
            company_url = info_t.find('a')['href']
            break
        else:
@@ -210,95 +585,33 @@ def spiderwork(soup,receptname):
    if company_url:
        # company_url = 'https://www.qcc.com/firm/80af5085726bb6b9c7770f1e4d0580f4.html'
        # company_url = 'https://www.qcc.com/firm/50f75e8a8859e609ec37976f8abe827d.html'
-        qccid = company_url.split('firm/')[1].split('.html')[0]
-        #将采集到的企查查id更新
-        updateSql = f"update EnterpriseInfo set QCCID = '{qccid}' where SocialCode = '{social_code}'"
-        cursor_.execute(updateSql)
-        cnx_.commit()
-        req_ = requests.get(headers=headers,url=company_url)
-        com_soup = BeautifulSoup(req_.content,'html.parser')
-        try:
-            businessinfo = com_soup.find('div', class_='cominfo-normal')
-        except:
-            businessinfo = ''
-        if businessinfo:
-            data_businfo = {}
-            data_baseinfo = baseinfo(com_soup)
-            # print(data_baseinfo)
-            try:
-                name = businessinfo.find('div', class_='ntag text-gray original-tag').text
-                value = businessinfo.find('div', class_='original-name-list').text.replace('展开', '').replace(' ', '').replace('…', '').replace('\n', '').replace('复制', '')
-            except:
-                name = '曾用名'
-                value = ''
-            data_businfo[name] = value
-            td_tags = businessinfo.find_all('td')
-            # print(td_tags)
-            for td in td_tags:
-                if 'class' in td.attrs and 'tb' in td['class']:
-                    div_tags = td.find_all('div')
-                    texts = [div.text for div in div_tags]
-                    if len(texts) > 0:
-                        for text in texts[::-1]:
-                            data_businfo[text.replace('复制', '').replace('\n', '').strip(' ')] = None
-                    else:
-                        data_businfo[td.text.replace('复制', '').replace('\n', '').strip(' ')] = None
-                else:
-                    # 没有class='tb'属性的标签
-                    att_list = ['inline-block', 'ntag-v2', 'm-l-r-10', 'm-l-sm']
-                    for att in att_list:
-                        deletep(td, 'a', 'class', att)
-                    deletek(td)
-                    deletep(td,'div','class','text-gray clearfix original-name-part')
-                    deletespan(td)
-                    # if len(result_dict) <= len(td_tags) // 2:
-                    div_tags = td.find_all('div')
-                    texts = [div.text for div in div_tags if len(div.attrs) == 0]
-                    if len(texts) > 0:
-                        i = 1
-                        for text in texts:
-                            if text == ' ':
-                                continue
-                            data_businfo[list(data_businfo.keys())[-i]] = text.replace('复制', '').replace('\n', '').replace(' ','')
-                            i += 1
-                    else:
-                        if '实缴资本' in td.text:
-                            # pattern = r"\d+万美元"
-                            # match = re.search(pattern, td.text.replace('复制', '').replace('\n', '').replace(' ', ''))
-                            # if match:
-                            #     value = match.group()
-                            value = td.text.replace('复制', '').replace('\n', '').replace(' ', '').split('实缴资本')[0]
-                            data_businfo[list(data_businfo.keys())[-1]] = value
-                        else:
-                            data_businfo[list(data_businfo.keys())[-1]] = td.text.replace('复制', '').replace('\n', '').replace(' ', '')
-            result_dict = getinfo(data_businfo,data_baseinfo)
-            print(result_dict)
-            #采集成功的企业
-            data = [com_name,social_code,result_dict['统一社会信用代码']]
-            appenddata(file_name,'获取基本信息成功企业',data)
-            # sendkafka(result_dict)
-        else:
-            data_baseinfo = baseinfo(com_soup)
+        # 采集基本信息和工商信息
+        spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name)
    else:
-        #没有搜到相同的企业名称
-        data = [com_name, social_code]
-        appenddata(file_name, '需处理企业',data)
+        # 判断是否是曾用名
+        tr = tr_list[:1][0]
+        info_t = tr.find('span', class_='copy-title')
+        getname = info_t.find('span').text
+        log.info(f'------可能是曾用名------接收到的企业名称--{receptname}---采到的企业名称--{getname}')
+        company_url = info_t.find('a')['href']
+        beforename = ifbeforename(company_url)
+        if beforename == receptname:
+            spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name)
+        else:
+            #没有搜到相同的企业名称
+            data = [com_name, social_code]
+            appenddata(file_name, '需处理企业',data)
+            time.sleep(2)
+            return False
+    return True
if __name__ == '__main__':
    taskType = '基本信息/企查查'
+    # 从redis里拿数据
+    nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
+    file_name = f'./data/企业基本信息采集情况_{nowtime}.xlsx'
+    createFile(file_name)
    while True:
-        nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
-        file_name = f'./data/国内企业基本信息采集情况_{nowtime}.xlsx'
-        createFile(file_name)
        # TODO:需要隔两个小时左右抓包修改,token从数据库中获得
        # token = baseCore.GetToken()
        # if token:
@@ -312,7 +625,7 @@ if __name__ == '__main__':
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
-            'Cookie': 'qcc_did=046d99c9-566e-4046-9094-689901b79748; UM_distinctid=18aac5b8c21810-046f8431aecf58-26031f51-1fa400-18aac5b8c22efd; CNZZDATA1254842228=109635008-1695108795-https%253A%252F%252Fwww.qcc.com%252F%7C1695113473; _uab_collina=169935323766710839405007; QCCSESSID=4e595fd804c28ae43780e55183; acw_tc=7522281e16999324472113552e97729806c88361a71c9bc96f8d5ff1c0',
+            'Cookie': 'qcc_did=046d99c9-566e-4046-9094-689901b79748; UM_distinctid=18aac5b8c21810-046f8431aecf58-26031f51-1fa400-18aac5b8c22efd; CNZZDATA1254842228=109635008-1695108795-https%253A%252F%252Fwww.qcc.com%252F%7C1695113473; _uab_collina=169935323766710839405007; acw_tc=db9062a717000200596487102e63dac7bed6aad2a049361c973816fabf; QCCSESSID=3c95642bd6445b7681c8fc6411',
            'Host': 'www.qcc.com',
            'Referer': 'https://www.qcc.com/',
            'Sec-Ch-Ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
@@ -327,28 +640,46 @@ if __name__ == '__main__':
        }
        start_time = time.time()
        # 获取企业信息
-        # company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
-        company_field = '小米通讯技术有限公司|91110108558521630L'
+        company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
+        # company_field = '||浙江绿脉农业科技有限公司'
        if company_field == 'end':
            # 本轮处理完毕,需要发送邮件,并且进入下一轮
            baseCore.sendEmail(file_name)
            time.sleep(20)
+            deleteFile(file_name)
+            # 创建下一轮的文件
+            nowtime = baseCore.getNowTime(1).replace('-', '')[:10]
+            file_name = f'./企业基本信息采集情况_{nowtime}.xlsx'
+            createFile(file_name)
            continue
        if company_field == '' or company_field is None:
            # 本轮结束后没有新增的企业要采集
+            deleteFile(file_name)
            time.sleep(20)
            continue
-        com_name = company_field.split('|')[0]
-        social_code = company_field.split('|')[1]
-        count = redaytowork(com_name,social_code)
+        social_code = company_field.split('|')[0]
+        com_name = company_field.split('|')[2]
+        ynDomestic = company_field.split('|')[15]
+        countryName = company_field.split('|')[16]
+        securitiesCode = company_field.split('|')[17]
+        securitiesShortName = company_field.split('|')[18]
+        listingDate = company_field.split('|')[21]
+        category = company_field.split('|')[19]
+        exchange = company_field.split('|')[20]
+        # ynDomestic = ''
+        # countryName = ''
+        # securitiesCode = ''
+        # securitiesShortName = ''
+        # listingDate = ''
+        # category = ''
+        # exchange = ''
+        count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange, ynDomestic, countryName, file_name)
+        # baseCore.r.close()
+        # break
+        # baseCore.sendEmail(file_name)
        # 信息采集完成后将该企业的采集次数更新
-        runType = 'BaseInfoRunCount'
-        baseCore.updateRun(social_code, runType, count)
\ No newline at end of file
+        # runType = 'BaseInfoRunCount'
+        # baseCore.updateRun(social_code, runType, count)
+    baseCore.close()
\ No newline at end of file
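The reworked __main__ loop above expects each item in the BaseInfoEnterprise:gnqy_socialCode Redis list to be a pipe-delimited record: index 0 is the social credit code, index 2 the company name, and indices 15–21 carry ynDomestic, countryName, securitiesCode, securitiesShortName, category, exchange and listingDate; the other positions are not read here. A hypothetical parsing sketch (field values invented, unused slots left empty), not from the commit:

    # 22 pipe-separated fields; only the indices consumed above are filled in.
    fields = [''] * 22
    fields[0] = '91110108558521630L'       # social_code
    fields[2] = '小米通讯技术有限公司'       # com_name
    fields[15], fields[16] = '1', '中国'    # ynDomestic, countryName
    company_field = '|'.join(fields)

    social_code = company_field.split('|')[0]
    com_name = company_field.split('|')[2]
    ynDomestic = company_field.split('|')[15]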
@@ -13,6 +13,8 @@ from selenium.webdriver.support import expected_conditions as EC
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
+cnx_ = baseCore.cnx
+cursor_ = baseCore.cursor

def createDriver():
    chrome_driver = r'D:\cmd100\chromedriver.exe'
    path = Service(chrome_driver)
@@ -32,10 +34,16 @@ def flushAndGetToken():
    for cookie in cookie_list:
        cookies[cookie['name']] = cookie['value']
    print(cookies)
+    insert = f"insert into QCC_token (token,cookies,create_time,fenghao_time,user_name,update_time) values ('{token}','{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),'{user_name}',now())"
+    cursor_.execute(insert)
+    cnx_.commit()
+    baseCore.close()

def getrequest_soup(headers,url):
-    req = requests.get(headers=headers,url=url)
-    result = BeautifulSoup(req.content,'html.parser')
+    req = requests.get(headers=headers, url=url)
+    result = BeautifulSoup(req.content, 'html.parser')
    return result

def dojob():
@@ -57,7 +65,7 @@ def dojob():
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
    }
-    url = 'https://www.qcc.com/web/search?key=%E5%B0%8F%E7%B1%B3%E9%80%9A%E8%AE%AF%E6%8A%80%E6%9C%AF%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8'
+    url = 'https://www.qcc.com/api/userCenter/getAuthInfo'
    soup = getrequest_soup(headers,url)
    pass
......
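The token insert added to flushAndGetToken above builds the SQL by interpolating the cookie dict through escape_string. As a hedged alternative sketch (same table and columns as in the commit, but using pymysql parameter binding and storing the cookies as JSON — an assumption, not what the commit does):

    import json

    insert = (
        "insert into QCC_token (token, cookies, create_time, fenghao_time, user_name, update_time) "
        "values (%s, %s, now(), DATE_SUB(NOW(), INTERVAL 1 DAY), %s, now())"
    )
    cursor_.execute(insert, (token, json.dumps(cookies, ensure_ascii=False), user_name))
    cnx_.commit()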
""" """
...@@ -19,12 +19,12 @@ from urllib.parse import unquote ...@@ -19,12 +19,12 @@ from urllib.parse import unquote
baseCore = BaseCore.BaseCore() baseCore = BaseCore.BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
baseCore = BaseCore.BaseCore() baseCore = BaseCore.BaseCore()
local = threading.local()
cnx = baseCore.cnx # 使用连接池
cursor = baseCore.cursor cnx_ = baseCore.pool_11.connection()
cursor_ = cnx_.cursor()
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYNotice/' pathType = 'QYNotice/'
taskType = '企业公告/证监会' taskType = '企业公告/证监会'
obsClient = ObsClient( obsClient = ObsClient(
...@@ -102,6 +102,14 @@ class EsMethod(object): ...@@ -102,6 +102,14 @@ class EsMethod(object):
log.info('更新结果:%s' % result) log.info('更新结果:%s' % result)
# 获取当前线程的数据库连接
def get_connection():
# 检查当前线程是否已经有连接对象
if not hasattr(local, 'conn'):
# 如果没有,则创建一个连接对象并保存到thread-local中
local.conn = baseCore.pool_11.connection()
return local.conn
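get_connection gives each worker thread its own lazily created connection from pool_11 and reuses it on later calls in that thread. A minimal usage sketch (worker function invented for illustration):

    # Sketch only: every thread ends up with exactly one pooled connection of its own.
    def worker():
        conn = get_connection()
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
            print(cursor.fetchone())

    threading.Thread(target=worker).start()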
def getuuid():
    get_timestamp_uuid = uuid.uuid1()  # 根据 时间戳生成 uuid , 保证全球唯一
    return get_timestamp_uuid
@@ -211,6 +219,7 @@ def tableUpdate(retData, year, pdf_name, num,pub_time,origin):
                  status, create_by,
                  create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
                  pub_time, origin)
        cursor_.execute(Upsql, values)  # 插入
        cnx_.commit()  # 提交
    except Exception as e:
@@ -236,7 +245,7 @@ def upload(sourceAddress,num,title,social_code,year,publishDate,createDate):
        return att_id
    else:
        return None
+from multiprocessing import Process, Queue

def main(page,p,esMethod):
    # esMethod = EsMethod()
    # esMethod.getFileds(index_name=esMethod.index_name)
......
""" """
...@@ -53,7 +53,7 @@ class EsMethod(object): ...@@ -53,7 +53,7 @@ class EsMethod(object):
}, },
{ {
"wildcard": { "wildcard": {
"attachmentIds.keyword": "None" "attachmentIds.keyword": "911*"
} }
} }
] ]
...@@ -62,7 +62,7 @@ class EsMethod(object): ...@@ -62,7 +62,7 @@ class EsMethod(object):
"sort": [ "sort": [
{ {
"createDate": { "createDate": {
"order": "desc" "order": "asc"
} }
} }
], ],
......