Commit 9dbcd59d by LiuLiYuan

Merge remote-tracking branch 'origin/master'

......@@ -49,8 +49,8 @@ class File():
class Token():
    # fetch a token
def getToken(self):
cursor.execute(f"select id,cookies from QCC_token where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
# cursor.execute(f" select id, cookies from QCC_token")
# cursor.execute(f"select id,cookies from QCC_token where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
cursor.execute(f" select id, cookies from QCC_token where id = 63")
# rows = cursor.fetchall()
# cnx.commit()
# if rows:
......
# -*- coding: utf-8 -*-
import json
import re
import time
import pymongo
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import urllib3
from selenium.webdriver.support.wait import WebDriverWait
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
from classtool import Token, File, Tag
token = Token()
file = File()
tag = Tag()
from selenium import webdriver
from selenium.webdriver.common.by import By
def create_driver():
path = r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
session = webdriver.Edge(executable_path=path, capabilities=options)
return session
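# note: executable_path/capabilities is the Selenium 3 calling convention; under
# Selenium 4 this would instead be webdriver.Edge(service=Service(path), options=EdgeOptions())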
# send one record to Kafka
def sendkafka(post_data):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("enterpriseInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
    except:
        # note: start_time, social_code, taskType and com_name are module-level globals set in the main loop
        exception = 'kafka传输失败'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
        log.info(f"{com_name}--{social_code}--kafka传输失败")
# merge the basic-info dict with the business-registration dict
def getinfo(dict1, dict2):
    # collect the key sets of both dicts
    keys1 = set(dict1.keys())
    keys2 = set(dict2.keys())
    # take their union
    union_keys = keys1 | keys2
    # for every key in the union, prefer dict1's value and fall back to dict2's
    result_dict = {key: dict1.get(key, None) or dict2.get(key, None) for key in union_keys}
return result_dict
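# Illustrative only -- because of the `or` above, dict1 wins unless its value is falsy:
#   getinfo({'电话': None, '企业名称': '甲公司'}, {'电话': '010-1', '简介': 'x'})
#   -> {'电话': '010-1', '企业名称': '甲公司', '简介': 'x'}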
# scrape the basic-info block at the top of the company page
def baseinfo(com_soup):
    baseinfo = com_soup.find('div', class_='index_detail__JSmQM')
    cominfo_list = baseinfo.find_all('div', class_='index_detail-info-item__oAOqL')
    data = {}
    # fields whose value lives in a child tag: field name -> tag to read
    sub_tags = {'法定代表人': 'a', '电话': 'span', '邮箱': 'a', '网址': 'a', '地址': 'span'}
    for cominfo in cominfo_list:
        name = cominfo.find('span', class_='index_detail-label__oRf2J').text.replace(':', '').replace(' ', '')
        tag.deletep(cominfo, 'span', 'class', 'index_detail-label__oRf2J')
        tag.deletep(cominfo, 'i', 'class', 'index_detail-text-desc__myXYK')
        # strip the icon-font placeholder characters left over from the page
        value = cominfo.text.replace('\ue657', '').replace('\ue655', '')
        if name in sub_tags:
            try:
                value = cominfo.find(sub_tags[name]).text
            except:
                value = None
        data[name] = value
# print("==================")
briefTag = baseinfo.find('div', class_='index_detail-linewrap__AKtCa index_-intro__ma3Qd')
span_list = briefTag.find_all('span')
for span in span_list:
if len(span.attrs) == 0:
data['简介'] = span.text.split('通过天眼查大数据分析')[0]
break
return data
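# For reference, a plausible (hypothetical) return value -- the exact field set
# depends on what the page exposes:
#   {'法定代表人': '张三', '电话': '010-12345678', '邮箱': None,
#    '网址': 'www.example.com', '地址': '北京市...', '简介': '...'}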
def dic_handle(result_dic):
    # municipalities, used by the (currently disabled) region parsing below
    zxss = ['北京市', '天津市', '上海市', '重庆市']
    # missing keys simply fall back to None
    company_name = result_dic.get('企业名称')
    CreditCode = result_dic.get('统一社会信用代码')
    OperName = result_dic.get('法定代表人')
    PhoneNumber = result_dic.get('电话')
    WebSite = result_dic.get('网址')
    Email = result_dic.get('邮箱')
    Desc = result_dic.get('简介')
    Status = result_dic.get('经营状态', result_dic.get('公司现状'))
    StartDate = result_dic.get('成立日期')
    RecCap = result_dic.get('实缴资本')
    RegistCapi = result_dic.get('注册资本')
    CheckDate = result_dic.get('核准日期')
    OrgNo = result_dic.get('组织机构代码')
    No = result_dic.get('工商注册号')
    taxpayerNo = result_dic.get('纳税人识别号')
    EconKind = result_dic.get('企业类型')
    term = result_dic.get('营业期限')
    TermStart = term.split('至')[0] if term is not None else None
    TeamEnd = term.split('至')[1] if term is not None and '至' in term else None
    TaxpayerType = result_dic.get('纳税人资质')
    SubIndustry = result_dic.get('国标行业')
    # region/province/city/county parsing is currently disabled:
    # try:
    #     region = result_dic['所属地区']
    # except:
    #     region = None
    # try:
    #     pattern = r'^(.*?省|.*?自治区)?(.*?市|.*?自治州)?(.*?区|.*?县|.*?自治县|.*?市辖区)?(.*?区|.*?县|.*?自治县|.*?市辖区)?$'
    #     matches = re.match(pattern, region)
    #     Province = matches.group(1)
    #     City = matches.group(2)
    #     County = matches.group(3)
    #     if Province is None:
    #         for zxs in zxss:
    #             if zxs in region:
    #                 Province = zxs
    #                 break
    # except:
    #     Province = None
    #     City = None
    #     County = None
    BelongOrg = result_dic.get('登记机关')
    Info = result_dic.get('人员规模')
    can_bao = result_dic.get('参保人数')
    OriginalName = result_dic.get('曾用名')
    EnglishName = result_dic.get('英文名称')
    IxCode = result_dic.get('进出口企业代码')
    Address = result_dic.get('地址')
    Scope = result_dic.get('经营范围')
aa_dict = {
'name': company_name, # 企业名称
'shortName': None, # 企业简称
'socialCreditCode': CreditCode, # 统一社会信用代码
'legalPerson': OperName, # 法定代表人
'officialPhone': PhoneNumber, # 电话
'officialUrl': WebSite, # 官网
'officialEmail': Email, # 邮箱
'briefInfo': Desc, # 简介
'registerStatus': Status, # 登记状态
'incorporationDate': StartDate, # 成立日期
'capital': RegistCapi, # 注册资本
'paidCapital': RecCap, # 实缴资本
'approvalDate': CheckDate, # 核准日期
'organizationCode': OrgNo, # 组织机构代码
'registerNo': No, # 工商注册号
'taxpayerNo': taxpayerNo, # 纳税人识别号
'type': EconKind, # 企业类型
'businessStartDate': TermStart, # 营业期限自
'businessEndDate': TeamEnd, # 营业期限至
'taxpayerQualification': TaxpayerType, # 纳税人资质
'industry': SubIndustry, # 所属行业
'region': None,
'province': None, # 所属省
'city': None, # 所属市
'county': None, # 所属县
'registerDepartment': BelongOrg, # 登记机关
'scale': Info, # 人员规模
'insured': can_bao, # 参保人数
'beforeName': OriginalName, # 曾用名
'englishName': EnglishName, # 英文名
'importExportEnterpriseCode': IxCode, # 进出口企业代码
'address': Address, # 地址
'businessRange': Scope, # 经营范围
'status': 0, # 状态
}
return aa_dict
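# Illustrative only -- a hypothetical input/output pair:
#   dic_handle({'企业名称': '甲公司', '统一社会信用代码': '91110000XXXXXXXXXX',
#               '营业期限': '2000-01-01至2030-01-01'})
#   -> {'name': '甲公司', 'socialCreditCode': '91110000XXXXXXXXXX',
#       'businessStartDate': '2000-01-01', 'businessEndDate': '2030-01-01',
#       ..., 'status': 0}  (all other fields None)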
# check the login state
def checklogin(key):
t = int(time.time())
# url = 'https://www.tianyancha.com/search?key=%E4%B8%AD%E5%9B%BD%E7%9F%B3%E6%B2%B9%E5%8C%96%E5%B7%A5%E9%9B%86%E5%9B%A2%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8&sessionNo=1706594186.22975563'
url = f'https://www.tianyancha.com/search?key={key}&sessionNo={t}'
# ip = baseCore.get_proxy()
# req = requests.get(headers=headers, url=url, proxies=ip)
req = s.get(headers=headers, url=url)
time.sleep(1)
soup = BeautifulSoup(req.content, 'html.parser')
    # TODO: detect the logged-out state
# if soup.find('title').text == '会员登录 - 企查查':
# log.info('状态---未登录')
# soup = ''
# return soup
return soup
# pre-collection checks
def redaytowork(com_name,social_code,securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
log.info(f'----当前企业{social_code}-{com_name}--开始处理---')
count = 0
    # search by credit code when one is available, otherwise by company name
if social_code and 'ZZSN' not in social_code and 'ZD' not in social_code:
soup = checklogin(social_code)
else:
soup = checklogin(com_name)
if not soup:
log.info("登录失效===重新放入redis")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
# log.info('=====已重新放入redis,失效cookies已删除======')
time.sleep(20)
return count
else:
try:
searchinfo = soup.find('div', class_='index_content-tool-title__K1Z6C').find('span', class_='index_title-count__lDSjB').text
except:
log.info("登录失效===重新放入redis")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
time.sleep(20)
return count
if searchinfo == '0':
log.info('=====搜索不到该企业====')
data = [com_name, social_code]
            # TODO: companies that cannot be found are written back to a spreadsheet
file.appenddata(file_name, '需处理企业', data)
return count
else:
            # start collecting
try:
if spiderwork(soup, com_name, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
count += 1
log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
token.updateTokeen(id_cookie,3)
return count
else:
return count
except Exception as e:
log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
return count
def ifbeforename(company_url):
req_ = s.get(headers=headers, url=company_url)
com_soup = BeautifulSoup(req_.content, 'html.parser')
try:
businessinfo = com_soup.find('table', {'class': 'index_tableBox__ZadJW'})
except:
businessinfo = ''
if businessinfo:
try:
name = businessinfo.find('span', class_='index_history-gray-tags__o8mkl').text
value = businessinfo.find('span', class_='index_copy-text__ri7W6').text.replace('展开', '').replace(' ', '').replace('…','').replace('\n', '').replace('复制', '').split('(')[0]
except:
name = '曾用名'
value = ''
return value
else:
return ''
# scrape the basic-information and business-registration sections
def spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
qccid = company_url.split('company/')[1]
req_ = s.get(headers=headers, url=company_url)
com_soup = BeautifulSoup(req_.content, 'html.parser')
try:
businessinfo = com_soup.find('table', {'class': 'index_tableBox__ZadJW'})
except:
businessinfo = ''
if businessinfo:
data_baseinfo = baseinfo(com_soup)
# print(data_baseinfo)
tr_list = businessinfo.find_all('tr')
dic_buseniss = {}
for tr in tr_list:
# td_count = len(tr.find_all('td'))
# print(td_count)
td_list = tr.find_all('td')
td_count = len(td_list)
name_list = [td_list[i].text for i in range(td_count) if i % 2 == 0]
# print(name_list)
# value_list = [td_list[i].text for i in range(td_count) if i % 2 != 0]
value_list = []
for i in range(td_count):
if i % 2 != 0:
value_tag = td_list[i]
# print(value_tag)
# print("==============")
tag.deletep(value_tag, 'span', 'class', 'index_history-operate__t3kjv')
tag.deletep(value_tag, 'div', 'class', '_efcb8')
tag.deletep(value_tag, 'span', 'class', 'index_legal-bottom-info__bYvYZ')
tag.deletep(value_tag, 'a', 'class', 'ml8 link-click')
tag.deletep(value_tag, 'span', 'class', 'index_report-jump__z__UW')
tag.deletep(value_tag, 'span', 'class', 'index_branch-report__Nyf_Y')
# for value_tag in value_tag_list:
value_list.append(value_tag.text.replace('\xa0', ''))
# print(value_list)
if len(name_list) == len(value_list):
for i in range(len(name_list)):
dic_buseniss[name_list[i]] = value_list[i]
if '曾用名' in value_list[i]:
dic_buseniss['曾用名'] = value_list[i].split('曾用名')[1].split('更多')[0]
dic_buseniss[name_list[i]] = value_list[i].split('曾用名')[0]
if name_list[i] == '法定代表人':
value_list[i] = value_list[i].split('任职')[0]
dic_buseniss[name_list[i]] = value_list[i]
try:
del dic_buseniss['天眼评分']
except:
pass
# print(dic_buseniss)
result_dict = getinfo(dic_buseniss, data_baseinfo)
        # mainly for Hong Kong/Taiwan companies: fall back to the given social credit code
try:
result_dict['统一社会信用代码']
except:
# log.info('未获取到统一社会信用代码')
if social_code:
result_dict['统一社会信用代码'] = social_code
else:
                # bail out if no social credit code was supplied either
return False
if result_dict['企业名称'].startswith('(') and result_dict['企业名称'].endswith(')'):
result_dict['企业名称'] = result_dict['企业名称'][1:-1]
if result_dict['企业名称'] == '-' and com_name:
result_dict['企业名称'] = com_name
elif not com_name:
return False
else:
pass
# print(result_dict)
        # record the successfully collected company
data = [com_name, result_dict['企业名称'], social_code, result_dict['统一社会信用代码']]
file.appenddata(file_name, '获取基本信息成功企业', data)
        # map the Chinese field names onto camelCase keys
aa_dic = dic_handle(result_dict)
aa_dic['qccId'] = qccid
aa_dic['ynDomestic'] = ynDomestic
aa_dic['countryName'] = countryName
aa_dic['securitiesCode'] = securitiesCode
aa_dic['securitiesShortName'] = securitiesShortName
aa_dic['listingDate'] = listingDate
aa_dic['category'] = category
aa_dic['exchange'] = exchange
aa_dic['listingType'] = listType
# print(aa_dic)
sendkafka(aa_dic)
# print(aa_dic)
else:
data_baseinfo = baseinfo(com_soup)
        # mainly for Hong Kong/Taiwan companies: fall back to the given social credit code
try:
data_baseinfo['统一社会信用代码']
except:
log.info('未获取到统一社会信用代码')
if social_code:
data_baseinfo['统一社会信用代码'] = social_code
else:
                # bail out if no social credit code was supplied either
return False
if data_baseinfo['企业名称'].startswith('(') and data_baseinfo['企业名称'].endswith(')'):
data_baseinfo['企业名称'] = data_baseinfo['企业名称'][1:-1]
if data_baseinfo['企业名称'] == '-' and com_name:
data_baseinfo['企业名称'] = com_name
elif not com_name:
return False
else:
pass
        # record the successfully collected company
data = [com_name, data_baseinfo['企业名称'], social_code, data_baseinfo['统一社会信用代码']]
file.appenddata(file_name, '获取基本信息成功企业', data)
        # map the Chinese field names onto camelCase keys
aa_dic = dic_handle(data_baseinfo)
aa_dic['qccId'] = qccid
aa_dic['ynDomestic'] = ynDomestic
aa_dic['countryName'] = countryName
aa_dic['securitiesCode'] = securitiesCode
aa_dic['securitiesShortName'] = securitiesShortName
aa_dic['listingDate'] = listingDate
aa_dic['category'] = category
aa_dic['exchange'] = exchange
aa_dic['listingType'] = listType
sendkafka(aa_dic)
def remove_parentheses(text):
    # strip full-width (Chinese) parentheses
    text = re.sub(r'(|)', '', text)
    # strip half-width (ASCII) parentheses
    text = re.sub(r'\(|\)', '', text)
    return text.replace(' ', '')
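# Illustrative only -- both bracket styles and spaces are stripped:
#   remove_parentheses('小米(北京) 科技(集团)') -> '小米北京科技集团'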
# check whether the returned company name matches the requested one
def spiderwork(soup, receptname, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
company_url = ''
try:
company_list = soup.find_all('div', class_='index_search-box__7YVh6')
except:
log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
token.updateTokeen(id_cookie,2)
log.info('=====已重新放入redis,cookies已封号======')
return False
# receptname = '小米通讯技术有限公司'
    for company in company_list:
        info_t = company.find('div', class_='index_name__qEdWi')
getname = info_t.find('span').text
log.info(f'接收到的企业名称--{receptname}---采到的企业名称--{getname}')
if receptname and getname == receptname:
company_url = info_t.find('a')['href']
break
elif not receptname:
company_url = info_t.find('a')['href']
break
else:
jian_name = remove_parentheses(baseCore.hant_2_hans(getname))
if remove_parentheses(receptname) == jian_name:
log.info(f'接收到的企业名称--{receptname}---转化成简体字的企业名称--{jian_name}')
company_url = info_t.find('a')['href']
break
else:
continue
if company_url:
# company_url = 'https://www.qcc.com/firm/80af5085726bb6b9c7770f1e4d0580f4.html'
# company_url = 'https://www.qcc.com/firm/50f75e8a8859e609ec37976f8abe827d.html'
        # scrape the basic-information and business-registration sections
spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name)
else:
        # the hit may be listed under a former name
getname = ''
for child in company_list[0].find_all():
if child.has_attr('class'):
print(child['class'])
if 'index_name' in child['class'][0]:
getname = child.text
company_url = child.find('a')['href']
break
# tr = company_list[:1][0]
# info_t = tr.find('div', class_='index_name__qEdWi')
# getname = info_t.find('span').text
if getname:
log.info(f'------可能是曾用名------接收到的企业名称--{receptname}---采到的企业名称--{getname}')
beforename = ifbeforename(company_url)
if beforename == receptname:
spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType,
ynDomestic, countryName, file_name)
else:
                # no identically named company was found
data = [com_name, social_code]
file.appenddata(file_name, '需处理企业', data)
time.sleep(2)
return False
else:
            # no identically named company was found
data = [com_name, social_code]
file.appenddata(file_name, '需处理企业', data)
time.sleep(2)
return False
return True
if __name__ == '__main__':
taskType = '基本信息/天眼查'
# driver, id_cookie = login()
while True:
nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
file_name = f'./data/国内企业基本信息采集情况.xlsx'
file.createFile(file_name)
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
}
cookies_list, id_cookie = token.get_cookies()
cookies = {}
for cookie in cookies_list:
cookies[cookie['name']] = cookie['value']
s = requests.Session()
s.cookies.update(cookies)
start_time = time.time()
        # pull the next company record from redis
company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
# company_field = '|北京华信瑞德信息技术有限公司|北京华信瑞德信息技术有限公司|||||||||||||1|中国内地|||||||'
if company_field == 'end':
            # this round is finished: send the report email and move on to the next round
baseCore.sendEmail(file_name)
time.sleep(20)
file.deleteFile(file_name)
continue
if company_field == '' or company_field is None:
            # no new companies were queued after this round
file.deleteFile(file_name)
flag = True
while flag:
log.info('--------已没有数据---------')
time.sleep(30)
                if not baseCore.check_mysql_conn(cnx_):
                    # reconnect to the 144 database
                    cnx_ = baseCore.cnx
                    cursor_ = cnx_.cursor()
                    log.info('===144数据库重新连接成功===')
company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
if company_field:
flag = False
log.info("-----已添加数据------")
baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
continue
continue
# company_field_ = f'|{company_field}'
social_code = company_field.split('|')[0]
com_name = company_field.split('|')[2].replace(' ', '')
ynDomestic = company_field.split('|')[15]
countryName = company_field.split('|')[16]
securitiesCode = company_field.split('|')[17]
securitiesShortName = company_field.split('|')[18]
listingDate = company_field.split('|')[21]
category = company_field.split('|')[19]
exchange = company_field.split('|')[20]
        listType = company_field.split('|')[21]  # note: reads the same field as listingDate above
# ynDomestic = None
# countryName = None
# securitiesCode = None
# securitiesShortName = None
# listingDate = None
# category = None
# exchange = None
# listType = None
count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange,
listType, ynDomestic, countryName, file_name)
time.sleep(10)
# break
# baseCore.r.close()
# baseCore.sendEmail(file_name)
        # after collection finishes, update this company's run count
# runType = 'BaseInfoRunCount'
# baseCore.updateRun(social_code, runType, count)
# break
baseCore.close()
\ No newline at end of file
import datetime
import json
import os.path
import random
import pymongo
from bson import ObjectId
from openpyxl import Workbook, load_workbook
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
class File():
    # create the workbook if it does not exist yet
    def createFile(self, file_name):
        if os.path.exists(file_name):
            return
        else:
            wb = Workbook()
            sheet = wb.active
            # rename the default sheet
            sheet.title = "需处理企业"
            sheet.append(["企业名称", "社会信用代码"])
            # add a second sheet for successful collections
            sheet2 = wb.create_sheet("获取基本信息成功企业")
            sheet2.append(["企业名称", "采到的企业名称", "社会信用代码", "采到的信用代码"])
            wb.save(file_name)
            wb.close()

    # delete the file if present
    def deleteFile(self, file_name):
        if os.path.exists(file_name):
            os.remove(file_name)
        else:
            pass

    # append one row to the given sheet
    def appenddata(self, file_name, sheet, data):
        # open the existing workbook
        wb = load_workbook(file_name)
        # pick the target sheet
        sheet = wb[sheet]
        sheet.append(data)
        # save and close
        wb.save(file_name)
        wb.close()
class Token():
    # fetch the least recently used cookie set whose ban time predates its last use
    def get_cookies(self):
        query = {
            # ban time earlier than update time; comparing two fields requires $expr
            '$expr': {'$lt': ['$fenghaoTime', '$updateTime']},
        }
        result = db_storage.find_one(query, sort=[('updateTime', 1)])
        cookies = result['cookies']
        id_token = result['_id']
        return cookies, id_token
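    # Illustrative only -- a hypothetical document this query matches, mirroring
    # the shape written by the login script further down:
    #   {'_id': ObjectId('...'), 'name': 'user1', 'cookies': [...],
    #    'fenghaoTime': '2024-01-01 10:00:00', 'updateTime': '2024-01-01 12:00:00'}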
    # delete an invalid token (leftover from the MySQL/QCC variant of this spider)
    def delete_token(self, cookie_):
        deletesql = f"delete from QCC_token where id='{cookie_}' "
        cursor.execute(deletesql)
        cnx.commit()

    # token bookkeeping
    def updateTokeen(self, id_token, type):
        if type == 1:
            # session expired: drop the token (MySQL leftover)
            cursor.execute(f"delete from QCC_token where id={id_token}")
        if type == 2:
            # account banned: stamp the ban time
            filter = {'_id': ObjectId(id_token)}
            update = {'$set': {'fenghaoTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}}
            db_storage.update_one(filter, update)
        if type == 3:
            # normal use: stamp the last-use time
            filter = {'_id': ObjectId(id_token)}
            update = {'$set': {'updateTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}}
            db_storage.update_one(filter, update)
        cnx.commit()
class Tag():
    # remove tags matching a given attribute value
def deletep(self, soup, tag_, attribute_to_delete, value_to_delete):
if attribute_to_delete and value_to_delete:
            # find tags carrying the given attribute value and drop them
tags = soup.find_all(tag_, {attribute_to_delete: value_to_delete})
for tag in tags:
# print(tag)
tag.decompose()
else:
tags = soup.find_all(tag_)
for tag in tags:
# print(tag)
tag.decompose()
    # remove empty tags
    def deletek(self, soup):
        # drop blank tags (e.g. <p></p>, <p><br></p>); img, video and br are kept
for i in soup.find_all(lambda tag: len(tag.get_text()) == 0 and tag.name not in ["img", "video",
"br"] and tag.name != "br" or tag.get_text() == ' ' or tag.get_text() == ' '):
for j in i.descendants:
if j.name in ["img", "video", "br"]:
break
else:
i.decompose()
    # remove decorative span tags (copy buttons, trend links, report links)
def deletespan(self, td):
spans = td.find_all('span', class_='app-copy copy-button-item')
for span in spans:
if '复制' in span.text:
                span.extract()  # drop the copy button
spans2 = td.find_all('span', slot='content')
for span2 in spans2:
if '趋势图' in span2.text:
span2.extract()
spans3 = td.find_all('span', class_='m-l-r-10')
for span3 in spans3:
if '年报' in span3.text:
span3.extract()
spans4 = td.find_all('span', class_='text-span')
for span4 in spans4:
span4.extract()
\ No newline at end of file
import datetime
import time
from selenium import webdriver
import pymongo
from selenium.webdriver.common.by import By
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
url = 'https://www.tianyancha.com/'
def create_driver():
path = r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options = {
"browserName": "MicrosoftEdge",
"ms:edgeOptions": {
"extensions": [], "args": ["--start-maximized"] # 添加最大化窗口运作参数
}
}
driver = webdriver.Edge(executable_path=path, capabilities=options)
return driver
if __name__ == "__main__":
name = input('所属用户:')
driver = create_driver()
driver.get(url)
    time.sleep(60)  # give the operator a minute to log in manually before harvesting cookies
cookies = driver.get_cookies()
# print(driver.get_cookies())
    # the document to store
create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
fenghaoTime = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
data = {
'name': name,
'cookies': cookies,
'createTime': create_time,
'fenghaoTime': fenghaoTime,
'updateTime': create_time,
}
    # insert the document
result = db_storage.insert_one(data)
    # print the inserted document's ID
print(result.inserted_id)
\ No newline at end of file
import os
......@@ -49,9 +49,15 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
if response.status_code != 200:
return retData
file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
break
            # TODO: verify the fetched content is the real file
if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
return retData
else:
break
except:
time.sleep(3)
continue
......@@ -339,7 +345,8 @@ def gonggao_info(dic_info):
info_content = json_2['data']['notice_content']
except:
info_content = ''
ifexist = ifInstert(com_name, social_code, info_url)
# ifexist = ifInstert(com_name, social_code, info_url)
    ifexist = True  # duplicate check disabled for now
if ifexist:
        # parse the PDF: fetch the link, download it, then record parse success/failure and transfer success/failure
result = GetContent(pdf_url, info_url,title, social_code, year, info_date, start_time, com_name, num)
......
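Note: the +/- line markers in the uptoOBS hunk above were lost in rendering, so the two breaks read ambiguously. A minimal sketch of the retry-then-validate loop the new side appears to implement inside uptoOBS (names taken from the hunk; assumes requests and time are imported):

    for i in range(0, 3):
        try:
            response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
            if response.status_code != 200:
                return retData
            file_size = int(response.headers.get('Content-Length'))
            retData['content'] = response.text
            # an error page (403 / "Error Times") counts as a failed fetch
            if '<div class="K">403</div>' in retData['content'] or 'Error Times: ' in retData['content']:
                return retData
            else:
                break
        except:
            time.sleep(3)
            continue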