Commit f751b7bb by 薛凌堃

Tianyancha basic company information

Parent 862e97ab
# -*- coding: utf-8 -*-
import json
import re
import time
import pymongo
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
import urllib3
from selenium.webdriver.support.wait import WebDriverWait
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import sys
# sys.path.append('D:\\KK\\zzsn_spider\\base')
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
from classtool import Token, File, Tag
token = Token()
file = File()
tag = Tag()
from selenium import webdriver
from selenium.webdriver.common.by import By
def create_driver():
    path = r'D:\soft\msedgedriver.exe'
    # options = webdriver.EdgeOptions()
    options = {
        "browserName": "MicrosoftEdge",
        "ms:edgeOptions": {
            "extensions": [], "args": ["--start-maximized"]  # start with the window maximized
        }
    }
    session = webdriver.Edge(executable_path=path, capabilities=options)
    return session
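# Note: webdriver.Edge(executable_path=..., capabilities=...) is the Selenium 3
# calling convention. A rough Selenium 4 equivalent would be (a sketch, not
# used anywhere below):
#   from selenium.webdriver.edge.service import Service
#   options = webdriver.EdgeOptions()
#   options.add_argument('--start-maximized')
#   session = webdriver.Edge(service=Service(path), options=options)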
# Send the assembled record to Kafka
def sendkafka(post_data):
    # NOTE: start_time / social_code / com_name / taskType are module-level
    # globals set in the __main__ loop below
    try:
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
        kafka_result = producer.send("enterpriseInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
        print(kafka_result.get(timeout=10))
    except:
        exception = 'kafka传输失败'
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
        log.info(f"{com_name}--{social_code}--kafka传输失败")
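# Illustrative payload shape (values hypothetical), as assembled by
# dic_handle() + spiderinfo() below:
#   sendkafka({'name': '...', 'socialCreditCode': '91130000738711917Q',
#              'legalPerson': '...', 'qccId': '...', 'status': 0, ...})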
# Merge the basic-info and business-registration dicts into one
def getinfo(dict1, dict2):
    # collect the key sets of both dicts
    keys1 = set(dict1.keys())
    keys2 = set(dict2.keys())
    # take the union of the keys
    union_keys = keys1 | keys2
    # for every key, prefer dict1's value; fall back to dict2 when dict1's
    # value is missing *or falsy* (empty string, None, ...)
    result_dict = {key: dict1.get(key, None) or dict2.get(key, None) for key in union_keys}
    return result_dict
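# Worked example: because `or` is used, a falsy value in dict1 falls through --
#   getinfo({'电话': '', '地址': '北京'}, {'电话': '010-1234', '成立日期': '2001-01-01'})
#   -> {'电话': '010-1234', '地址': '北京', '成立日期': '2001-01-01'}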
# Scrape the basic-information block of the company page
def baseinfo(com_soup):
    baseinfo = com_soup.find('div', class_='index_detail__JSmQM')
    cominfo_list = baseinfo.find_all('div', class_='index_detail-info-item__oAOqL')  # one item per field
    data = {}
    for cominfo in cominfo_list:
        name = cominfo.find('span', class_='index_detail-label__oRf2J').text.replace(':', '').replace(' ', '')
        # print(name)
        tag.deletep(cominfo, 'span', 'class', 'index_detail-label__oRf2J')
        tag.deletep(cominfo, 'i', 'class', 'index_detail-text-desc__myXYK')
        # strip private-use icon glyphs left over from the page's icon font
        value = cominfo.text.replace('', '').replace('\ue657', '').replace('\ue655', '')
        if name == '法定代表人':
            value = cominfo.find('a').text
        if name == '电话':
            value = cominfo.find('span').text
        if name == '邮箱':
            value = cominfo.find('a').text
        if name == '网址':
            value = cominfo.find('a').text
        if name == '地址':
            value = cominfo.find('span').text
        data[name] = value
        # print("==================")
    # the company brief sits in an unadorned <span> inside the intro block
    briefTag = baseinfo.find('div', class_='index_detail-linewrap__AKtCa index_-intro__ma3Qd')
    span_list = briefTag.find_all('span')
    for span in span_list:
        if len(span.attrs) == 0:
            data['简介'] = span.text
            break
    return data
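# Typical shape of the dict returned by baseinfo() (values hypothetical):
#   {'法定代表人': '张某', '电话': '010-12345678', '邮箱': 'a@b.com',
#    '网址': 'www.example.com', '地址': '北京市...', '简介': '...'}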
# Map the Chinese field names onto the English camel-case schema
def dic_handle(result_dic):
    zxss = ['北京市', '天津市', '上海市', '重庆市']  # the four direct-administered municipalities
    try:
        company_name = result_dic['企业名称']
    except:
        company_name = ''
    try:
        CreditCode = result_dic['统一社会信用代码']
    except:
        CreditCode = ''
    try:
        OperName = result_dic['法定代表人']
    except:
        OperName = ''
    try:
        PhoneNumber = result_dic['电话']
    except:
        PhoneNumber = ''
    try:
        WebSite = result_dic['网址']
    except:
        WebSite = ''
    try:
        Email = result_dic['邮箱']
    except:
        Email = ''
    try:
        Desc = result_dic['简介']
    except:
        Desc = ''
    try:
        Status = result_dic['经营状态']
    except:
        Status = ''
    try:
        StartDate = result_dic['成立日期']
    except:
        StartDate = ''
    try:
        RecCap = result_dic['实缴资本']
    except:
        RecCap = ''
    try:
        RegistCapi = result_dic['注册资本']
    except:
        RegistCapi = ''
    try:
        CheckDate = result_dic['核准日期']
    except:
        CheckDate = ''
    try:
        OrgNo = result_dic['组织机构代码']
    except:
        OrgNo = ''
    try:
        No = result_dic['工商注册号']
    except:
        No = ''
    try:
        taxpayerNo = result_dic['纳税人识别号']
    except:
        taxpayerNo = ''
    try:
        EconKind = result_dic['企业类型']
    except:
        EconKind = ''
    try:
        TermStart = result_dic['营业期限'].split('至')[0]
    except:
        TermStart = ''
    try:
        TeamEnd = result_dic['营业期限'].split('至')[1]
    except:
        TeamEnd = ''
    try:
        TaxpayerType = result_dic['纳税人资质']
    except:
        TaxpayerType = ''
    try:
        SubIndustry = result_dic['国标行业']
    except:
        SubIndustry = ''
    try:
        region = result_dic['所属地区']
    except:
        region = ''
    try:
        # split the region string into province / city / county segments
        pattern = r'^(.*?省|.*?自治区)?(.*?市|.*?自治州)?(.*?区|.*?县|.*?自治县|.*?市辖区)?(.*?区|.*?县|.*?自治县|.*?市辖区)?$'
        matches = re.match(pattern, region)
        Province = matches.group(1)
        City = matches.group(2)
        County = matches.group(3)
        if Province is None:
            # municipalities carry no province segment, so fill it from the list above
            for zxs in zxss:
                if zxs in region:
                    Province = zxs
                    break
    except:
        Province = ''
        City = ''
        County = ''
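    # Worked example of the region split (assuming a well-formed region string):
    #   re.match(pattern, '河北省石家庄市长安区').groups()
    #   -> ('河北省', '石家庄市', '长安区', None)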
    try:
        BelongOrg = result_dic['登记机关']
    except:
        BelongOrg = ''
    try:
        Info = result_dic['人员规模']
    except:
        Info = ''
    try:
        can_bao = result_dic['参保人数']
    except:
        can_bao = ''
    try:
        OriginalName = result_dic['曾用名']
    except:
        OriginalName = ''
    try:
        EnglishName = result_dic['英文名称']
    except:
        EnglishName = ''
    try:
        IxCode = result_dic['进出口企业代码']
    except:
        IxCode = ''
    try:
        Address = result_dic['地址']
    except:
        Address = ''
    try:
        Scope = result_dic['经营范围']
    except:
        Scope = ''
    aa_dict = {
        'name': company_name,  # company name
        'shortName': '',  # short name
        'socialCreditCode': CreditCode,  # unified social credit code
        'legalPerson': OperName,  # legal representative
        'officialPhone': PhoneNumber,  # phone
        'officialUrl': WebSite,  # official website
        'officialEmail': Email,  # e-mail
        'briefInfo': Desc,  # brief introduction
        'registerStatus': Status,  # registration status
        'incorporationDate': StartDate,  # date of incorporation
        'capital': RegistCapi,  # registered capital
        'paidCapital': RecCap,  # paid-in capital
        'approvalDate': CheckDate,  # approval date
        'organizationCode': OrgNo,  # organization code
        'registerNo': No,  # business registration number
        'taxpayerNo': taxpayerNo,  # taxpayer identification number
        'type': EconKind,  # enterprise type
        'businessStartDate': TermStart,  # business term, from
        'businessEndDate': TeamEnd,  # business term, to
        'taxpayerQualification': TaxpayerType,  # taxpayer qualification
        'industry': SubIndustry,  # industry
        'region': region,
        'province': Province,  # province
        'city': City,  # city
        'county': County,  # county
        'registerDepartment': BelongOrg,  # registration authority
        'scale': Info,  # staff size
        'insured': can_bao,  # number of insured employees
        'beforeName': OriginalName,  # former name
        'englishName': EnglishName,  # English name
        'importExportEnterpriseCode': IxCode,  # import/export enterprise code
        'address': Address,  # address
        'businessRange': Scope,  # business scope
        'status': 0,  # record status
    }
    return aa_dict
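# A sketch of a more compact equivalent for the lookup ladder in dic_handle()
# (dict.get substitutes '' only when the key is absent, matching the try/except):
#   CreditCode = result_dic.get('统一社会信用代码', '')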
# Check the login state by running a search
def checklogin(key):
    t = int(time.time())
    # url = 'https://www.tianyancha.com/search?key=%E4%B8%AD%E5%9B%BD%E7%9F%B3%E6%B2%B9%E5%8C%96%E5%B7%A5%E9%9B%86%E5%9B%A2%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8&sessionNo=1706594186.22975563'
    url = f'https://www.tianyancha.com/search?key={key}&sessionNo={t}'
    # ip = baseCore.get_proxy()
    # req = requests.get(headers=headers, url=url, proxies=ip)
    req = s.get(headers=headers, url=url)
    time.sleep(1)
    soup = BeautifulSoup(req.content, 'html.parser')
    # TODO: detect the not-logged-in state; for now the soup is always returned,
    # so the `if not soup` branch in redaytowork() never triggers
    # (the commented check below is a leftover from the Qichacha variant):
    # if soup.find('title').text == '会员登录 - 企查查':
    #     log.info('状态---未登录')
    #     soup = ''
    #     return soup
    return soup
# Prepare one company for collection
def redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
    log.info(f'----当前企业{social_code}-{com_name}--开始处理---')
    count = 0
    # search by credit code when we have one, otherwise by company name
    if social_code:
        soup = checklogin(social_code)
    else:
        soup = checklogin(com_name)
    if not soup:
        log.info("登录失效===重新放入redis")
        baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
        token.updateTokeen(id_cookie, 2)
        # log.info('=====已重新放入redis,失效cookies已删除======')
        time.sleep(20)
        return count
    else:
        try:
            searchinfo = soup.find('div', class_='index_content-tool-title__K1Z6C').find('span', class_='index_title-count__lDSjB').text
        except:
            log.info("登录失效===重新放入redis")
            baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
            token.updateTokeen(id_cookie, 2)
            log.info('=====已重新放入redis,cookies已封号======')
            time.sleep(20)
            return count
        if searchinfo == '0':
            log.info('=====搜索不到该企业====')
            data = [com_name, social_code]
            # TODO: companies that cannot be found are written to the report sheet
            file.appenddata(file_name, '需处理企业', data)
            return count
        else:
            # start collecting
            try:
                if spiderwork(soup, com_name, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
                    count += 1
                    log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time, time.time())}')
                    token.updateTokeen(id_cookie, 3)
                    return count
                else:
                    return count
            except Exception as e:
                log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
                baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
                token.updateTokeen(id_cookie, 2)
                log.info('=====已重新放入redis,cookies已封号======')
                return count
# Fetch the company's former name ("曾用名") from its detail page
def ifbeforename(company_url):
    req_ = s.get(headers=headers, url=company_url)
    com_soup = BeautifulSoup(req_.content, 'html.parser')
    try:
        businessinfo = com_soup.find('table', {'class': 'index_tableBox__ZadJW'})
    except:
        businessinfo = ''
    if businessinfo:
        try:
            name = businessinfo.find('span', class_='index_history-gray-tags__o8mkl').text
            value = businessinfo.find('span', class_='index_copy-text__ri7W6').text.replace('展开', '').replace(' ', '').replace('…', '').replace('\n', '').replace('复制', '').split('(')[0]
        except:
            name = '曾用名'
            value = ''
        return value
    else:
        return ''
# Collect the basic information and the business-registration information
def spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
    qccid = company_url.split('company/')[1]
    req_ = s.get(headers=headers, url=company_url)
    com_soup = BeautifulSoup(req_.content, 'html.parser')
    try:
        businessinfo = com_soup.find('table', {'class': 'index_tableBox__ZadJW'})
    except:
        businessinfo = ''
    if businessinfo:
        data_baseinfo = baseinfo(com_soup)
        # print(data_baseinfo)
        tr_list = businessinfo.find_all('tr')
        dic_buseniss = {}
        for tr in tr_list:
            # even-numbered cells hold field names, odd-numbered cells hold values
            td_list = tr.find_all('td')
            td_count = len(td_list)
            name_list = [td_list[i].text for i in range(td_count) if i % 2 == 0]
            value_list = []
            for i in range(td_count):
                if i % 2 != 0:
                    value_tag = td_list[i]
                    # strip decorations (history tags, report links, ...) before reading the text
                    tag.deletep(value_tag, 'span', 'class', 'index_history-operate__t3kjv')
                    tag.deletep(value_tag, 'div', 'class', '_efcb8')
                    tag.deletep(value_tag, 'span', 'class', 'index_legal-bottom-info__bYvYZ')
                    tag.deletep(value_tag, 'a', 'class', 'ml8 link-click')
                    tag.deletep(value_tag, 'span', 'class', 'index_report-jump__z__UW')
                    tag.deletep(value_tag, 'span', 'class', 'index_branch-report__Nyf_Y')
                    value_list.append(value_tag.text.replace('\xa0', ''))
            if len(name_list) == len(value_list):
                for i in range(len(name_list)):
                    dic_buseniss[name_list[i]] = value_list[i]
                    if '曾用名' in value_list[i]:
                        dic_buseniss['曾用名'] = value_list[i].split('曾用名')[1].split('更多')[0]
                        dic_buseniss[name_list[i]] = value_list[i].split('曾用名')[0]
                    if name_list[i] == '法定代表人':
                        value_list[i] = value_list[i].split('任职')[0]
                        dic_buseniss[name_list[i]] = value_list[i]
        # drop the Tianyancha score row; pop() avoids a KeyError when it is absent
        dic_buseniss.pop('天眼评分', None)
        result_dict = getinfo(dic_buseniss, data_baseinfo)
        # companies collected successfully
        data = [com_name, result_dict['企业名称'], social_code, result_dict['统一社会信用代码']]
        file.appenddata(file_name, '获取基本信息成功企业', data)
        # map the fields onto the English camel-case schema
        aa_dic = dic_handle(result_dict)
        aa_dic['qccId'] = qccid
        aa_dic['ynDomestic'] = ynDomestic
        aa_dic['countryName'] = countryName
        aa_dic['securitiesCode'] = securitiesCode
        aa_dic['securitiesShortName'] = securitiesShortName
        aa_dic['listingDate'] = listingDate
        aa_dic['category'] = category
        aa_dic['exchange'] = exchange
        aa_dic['listingType'] = listType
        sendkafka(aa_dic)
    else:
        data_baseinfo = baseinfo(com_soup)
        # companies collected successfully
        data = [com_name, data_baseinfo['企业名称'], social_code, data_baseinfo['统一社会信用代码']]
        file.appenddata(file_name, '获取基本信息成功企业', data)
        # map the fields onto the English camel-case schema
        aa_dic = dic_handle(data_baseinfo)
        aa_dic['qccId'] = qccid
        aa_dic['ynDomestic'] = ynDomestic
        aa_dic['countryName'] = countryName
        aa_dic['securitiesCode'] = securitiesCode
        aa_dic['securitiesShortName'] = securitiesShortName
        aa_dic['listingDate'] = listingDate
        aa_dic['category'] = category
        aa_dic['exchange'] = exchange
        aa_dic['listingType'] = listType
        sendkafka(aa_dic)
# Check whether the searched name matches the received name
def spiderwork(soup, receptname, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name):
    company_url = ''
    try:
        company_list = soup.find('div', class_='index_search-box__7YVh6')
    except:
        log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
        baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
        token.updateTokeen(id_cookie, 2)
        log.info('=====已重新放入redis,cookies已封号======')
        return False
    # receptname = '小米通讯技术有限公司'
    for company in company_list:
        info_t = company.find('div', class_='index_name__qEdWi')
        getname = info_t.find('span').text
        log.info(f'接收到的企业名称--{receptname}---采到的企业名称--{getname}')
        if receptname and getname == receptname:
            company_url = info_t.find('a')['href']
            break
        elif not receptname:
            company_url = info_t.find('a')['href']
            break
        else:
            continue
    if company_url:
        # company_url = 'https://www.qcc.com/firm/80af5085726bb6b9c7770f1e4d0580f4.html'
        # company_url = 'https://www.qcc.com/firm/50f75e8a8859e609ec37976f8abe827d.html'
        # collect the basic information and business-registration information
        spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name)
    else:
        # the received name may be a former name; inspect the first search result
        for child in company_list[0].find_all():
            if child.has_attr('class'):
                print(child['class'])
                # class values look like 'index_name__qEdWi', so match on the prefix
                if any(c.startswith('index_name') for c in child['class']):
                    getname = child.text
                    company_url = child.find('a')['href']
                    break
        else:
            # no result with the same company name
            data = [com_name, social_code]
            file.appenddata(file_name, '需处理企业', data)
            time.sleep(2)
            return False
        log.info(f'------可能是曾用名------接收到的企业名称--{receptname}---采到的企业名称--{getname}')
        beforename = ifbeforename(company_url)
        if beforename == receptname:
            spiderinfo(company_url, securitiesCode, securitiesShortName, listingDate, category, exchange, listType, ynDomestic, countryName, file_name)
        else:
            # no result with the same company name
            data = [com_name, social_code]
            file.appenddata(file_name, '需处理企业', data)
            time.sleep(2)
            return False
    return True
if __name__ == '__main__':
    taskType = '基本信息/天眼查'
    # driver, id_cookie = login()
    while True:
        nowtime = baseCore.getNowTime(1).replace('-', '')[:8]
        file_name = f'./国内企业基本信息采集情况.xlsx'
        file.createFile(file_name)
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Cache-Control': 'max-age=0',
            'Connection': 'keep-alive',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        }
        cookies_list, id_cookie = token.get_cookies()
        cookies = {}
        for cookie in cookies_list:
            cookies[cookie['name']] = cookie['value']
        s = requests.Session()
        s.cookies.update(cookies)
        start_time = time.time()
        # pull one company record; the pipe-delimited format is 'socialCode|name|...'
        # company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
        company_field = '91130000738711917Q||'  # hard-coded test record
        if company_field == 'end':
            # this round is done: mail the report, then start the next round
            baseCore.sendEmail(file_name)
            time.sleep(20)
            file.deleteFile(file_name)
            continue
        if company_field == '' or company_field is None:
            # no new companies to collect after this round
            file.deleteFile(file_name)
            flag = True
            while flag:
                log.info('--------已没有数据---------')
                time.sleep(30)
                if not baseCore.check_mysql_conn(cnx_):
                    # reconnect to the 144 database
                    cnx_ = baseCore.cnx
                    cursor_ = cnx_.cursor()
                    log.info('===11数据库重新连接成功===')
                company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
                if company_field:
                    flag = False
                    log.info("-----已添加数据------")
                    baseCore.r.lpush('BaseInfoEnterprise:gnqy_socialCode', company_field)
                    continue
            continue
        social_code = company_field.split('|')[0]
        com_name = company_field.split('|')[1].replace(' ', '')
        # ynDomestic = company_field.split('|')[15]
        # countryName = company_field.split('|')[16]
        # securitiesCode = company_field.split('|')[17]
        # securitiesShortName = company_field.split('|')[18]
        # listingDate = company_field.split('|')[21]
        # category = company_field.split('|')[19]
        # exchange = company_field.split('|')[20]
        # listType = company_field.split('|')[21]
        ynDomestic = '1'
        countryName = ''
        securitiesCode = ''
        securitiesShortName = ''
        listingDate = ''
        category = ''
        exchange = ''
        listType = ''
        count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange,
                            listType, ynDomestic, countryName, file_name)
        time.sleep(10)
        # break
        # baseCore.r.close()
        # baseCore.sendEmail(file_name)
        # update the company's collection count once the run finishes
        # runType = 'BaseInfoRunCount'
        # baseCore.updateRun(social_code, runType, count)
        break
    baseCore.close()
\ No newline at end of file
import datetime
import json
import os.path
import random
import pymongo
from bson import ObjectId
from openpyxl import Workbook, load_workbook
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
class File():
    # create the report workbook
    def createFile(self, file_name):
        if os.path.exists(file_name):
            return
        else:
            wb = Workbook()
            sheet = wb.active
            # rename the default sheet
            sheet.title = "需处理企业"
            sheet.append(["企业名称", "社会信用代码"])
            # create the second sheet
            sheet2 = wb.create_sheet("获取基本信息成功企业")
            sheet2.append(["企业名称", "采到的企业名称", "社会信用代码", "采到的信用代码"])
            wb.save(file_name)
            wb.close()
    # delete the workbook
    def deleteFile(self, file_name):
        if os.path.exists(file_name):
            os.remove(file_name)
        else:
            pass
    # append a row of data
    def appenddata(self, file_name, sheet, data):
        # open the existing workbook
        wb = load_workbook(file_name)
        # pick the sheet to append to
        sheet = wb[sheet]
        sheet.append(data)
        # save the workbook
        wb.save(file_name)
        wb.close()
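# Illustrative use, mirroring the spider's report handling:
#   file = File()
#   file.createFile('./国内企业基本信息采集情况.xlsx')
#   file.appenddata('./国内企业基本信息采集情况.xlsx', '需处理企业', ['某公司', '91130000738711917Q'])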
class Token():
    # fetch a usable cookie set
    def get_cookies(self):
        query = {
            '$expr': {'$lt': ['$fenghaoTime', '$updateTime']},  # ban time earlier than last update time
        }
        result = db_storage.find_one(query, sort=[('updateTime', -1)])
        cookies = result['cookies']
        id_token = result['_id']
        return cookies, id_token
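    # The documents matched here are written by the login helper script below,
    # e.g. (values hypothetical):
    #   {'name': 'user1', 'cookies': [...], 'createTime': '2024-01-30 09:00:00',
    #    'fenghaoTime': '2024-01-29 09:00:00', 'updateTime': '2024-01-30 09:00:00'}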
    # delete an invalid token (QCC_token table)
    def delete_token(self, cookie_):
        deletesql = f"delete from QCC_token where id='{cookie_}' "
        cursor.execute(deletesql)
        cnx.commit()
    # update a token's state
    def updateTokeen(self, id_token, type):
        if type == 1:
            # session expired: delete the token
            cursor.execute(f"delete from QCC_token where id={id_token}")
        if type == 2:
            # account banned: record the ban time
            filter = {'_id': ObjectId(id_token)}
            update = {'$set': {'fenghaoTime': datetime.datetime.now()}}
            db_storage.update_one(filter, update)
        if type == 3:
            # record the last successful use
            filter = {'_id': ObjectId(id_token)}
            update = {'$set': {'updateTime': datetime.datetime.now()}}
            db_storage.update_one(filter, update)
        cnx.commit()
class Tag():
    # delete tags carrying a specific attribute value
    def deletep(self, soup, tag_, attribute_to_delete, value_to_delete):
        if attribute_to_delete and value_to_delete:
            # find the tags with the given attribute and remove them
            tags = soup.find_all(tag_, {attribute_to_delete: value_to_delete})
            for tag in tags:
                tag.decompose()
        else:
            tags = soup.find_all(tag_)
            for tag in tags:
                tag.decompose()
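    # Illustrative use on a fragment (class value hypothetical):
    #   cell = BeautifulSoup('<td>5000万<span class="copy">复制</span></td>', 'html.parser')
    #   Tag().deletep(cell, 'span', 'class', 'copy')
    #   cell.text  # -> '5000万'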
    # delete empty tags
    def deletek(self, soup):
        # remove blank tags such as <p></p> or <p><br></p>, keeping img/video/br;
        # a bare plain or non-breaking space also counts as blank
        for i in soup.find_all(lambda tag: (len(tag.get_text()) == 0 and tag.name not in ["img", "video", "br"])
                                           or tag.get_text() in (' ', '\xa0')):
            for j in i.descendants:
                if j.name in ["img", "video", "br"]:
                    break
            else:
                i.decompose()
    # delete decoration spans
    def deletespan(self, td):
        spans = td.find_all('span', class_='app-copy copy-button-item')
        for span in spans:
            if '复制' in span.text:
                span.extract()  # remove the "copy" button
        spans2 = td.find_all('span', slot='content')
        for span2 in spans2:
            if '趋势图' in span2.text:
                span2.extract()  # remove the "trend chart" link
        spans3 = td.find_all('span', class_='m-l-r-10')
        for span3 in spans3:
            if '年报' in span3.text:
                span3.extract()  # remove the "annual report" link
        spans4 = td.find_all('span', class_='text-span')
        for span4 in spans4:
            span4.extract()
\ No newline at end of file
import datetime
import time
from selenium import webdriver
import pymongo
from selenium.webdriver.common.by import By
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'天眼查登录信息']
url = 'https://www.tianyancha.com/'
def create_driver():
    path = r'D:\soft\msedgedriver.exe'
    # options = webdriver.EdgeOptions()
    options = {
        "browserName": "MicrosoftEdge",
        "ms:edgeOptions": {
            "extensions": [], "args": ["--start-maximized"]  # start with the window maximized
        }
    }
    driver = webdriver.Edge(executable_path=path, capabilities=options)
    return driver
if __name__ == "__main__":
name = input('所属用户:')
driver = create_driver()
driver.get(url)
time.sleep(100)
cookies = driver.get_cookies()
# print(driver.get_cookies())
# 要存储的数据
create_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
fenghaoTime = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S')
data = {
'name': name,
'cookies': cookies,
'createTime': create_time,
'fenghaoTime': fenghaoTime,
'updateTime': create_time,
}
# 插入数据
result = db_storage.insert_one(data)
# 打印插入的数据的 ID
print(result.inserted_id)
\ No newline at end of file