Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
Z
zzsn_spider
概览
概览
详情
活动
周期分析
版本库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
统计图
问题
0
议题
0
列表
看板
标记
里程碑
合并请求
1
合并请求
1
CI / CD
CI / CD
流水线
作业
日程表
图表
维基
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
丁双波
zzsn_spider
Commits
8cf2bc43
提交
8cf2bc43
authored
8月 17, 2024
作者:
XveLingKun
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
天眼查-工具包
上级
617cfcb9
显示空白字符变更
内嵌
并排
正在显示
1 个修改的文件
包含
168 行增加
和
0 行删除
+168
-0
classtool.py
enterprise_tyc/classtool.py
+168
-0
没有找到文件。
enterprise_tyc/classtool.py
0 → 100644
浏览文件 @
8cf2bc43
# -*- coding: utf-8 -*-
import
datetime
import
json
import
os.path
import
random
import
time
import
pymongo
import
requests
from
bson
import
ObjectId
from
openpyxl
import
Workbook
,
load_workbook
from
retry
import
retry
from
base.BaseCore
import
BaseCore
from
selenium
import
webdriver
baseCore
=
BaseCore
()
log
=
baseCore
.
getLogger
()
cnx
=
baseCore
.
cnx
cursor
=
baseCore
.
cursor
db_storage
=
pymongo
.
MongoClient
(
'mongodb://114.115.221.202:27017/'
,
username
=
'admin'
,
password
=
'ZZsn@9988'
)
.
ZZSN
[
'天眼查登录信息'
]
class
File
():
# 创建文件
def
createFile
(
self
,
file_name
):
if
os
.
path
.
exists
(
file_name
):
return
else
:
wb
=
Workbook
()
sheet
=
wb
.
active
# 更改默认的sheet名称
sheet
.
title
=
"需处理企业"
sheet
.
append
([
"企业名称"
,
"社会信用代码"
])
# 创建另一个sheet
sheet2
=
wb
.
create_sheet
(
"获取基本信息成功企业"
)
sheet2
.
append
([
"企业名称"
,
"采到的企业名称"
,
"社会信用代码"
,
"采到的信用代码"
])
wb
.
save
(
file_name
)
wb
.
close
()
# 删除文件
def
deleteFile
(
self
,
file_name
):
if
os
.
path
.
exists
(
file_name
):
os
.
remove
(
file_name
)
else
:
pass
# 追加数据
def
appenddata
(
self
,
file_name
,
sheet
,
data
):
# 打开现有的Excel文件
wb
=
load_workbook
(
file_name
)
# 选择要追加数据的sheet
sheet
=
wb
[
sheet
]
sheet
.
append
(
data
)
# 保存Excel文件
wb
.
save
(
file_name
)
wb
.
close
()
class
Token
():
# 获取token
def
get_cookies
(
self
):
flg
=
False
query
=
{
"fenghaoTime"
:
{
"$lt"
:
str
(
datetime
.
datetime
.
now
()
-
datetime
.
timedelta
(
hours
=
2
))}}
result
=
db_storage
.
find_one
(
query
,
sort
=
[(
'updateTime'
,
1
)])
# results = db_storage.find({}, sort=[('updateTime', 1)])
if
result
:
flg
=
True
# for result in results:
# if result['fenghaoTime'] < result['updateTime']:
# flg = True
# break
if
flg
:
cookies
=
result
[
'cookies'
]
id_token
=
result
[
'_id'
]
user_name
=
result
[
'name'
]
return
cookies
,
id_token
,
user_name
else
:
return
''
,
''
,
''
# 删除失效的token
def
delete_token
(
self
,
cookie_
):
deletesql
=
f
"delete from QCC_token where id='{cookie_}' "
cursor
.
execute
(
deletesql
)
cnx
.
commit
()
# token的处理
def
updateTokeen
(
self
,
id_token
,
type
):
if
type
==
1
:
# session失效,删除token
cursor
.
execute
(
f
"delete from QCC_token where id={id_token}"
)
if
type
==
2
:
# 封号了 修改封号时间
filter
=
{
'_id'
:
ObjectId
(
id_token
)}
# 更新操作
update
=
{
'$set'
:
{
'fenghaoTime'
:
datetime
.
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)}}
# 执行更新操作
db_storage
.
update_one
(
filter
,
update
)
if
type
==
3
:
# 修改使用时间
filter
=
{
'_id'
:
ObjectId
(
id_token
)}
# 更新操作
update
=
{
'$set'
:
{
'updateTime'
:
datetime
.
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)}}
# 执行更新操作
db_storage
.
update_one
(
filter
,
update
)
# cnx.commit()
@retry
(
tries
=
2
,
delay
=
5
)
def
sendData
(
url
,
dics
):
dicJson
=
json
.
dumps
(
dics
)
req
=
requests
.
post
(
url
=
url
,
timeout
=
300
,
verify
=
False
,
data
=
dicJson
)
if
req
.
status_code
!=
200
:
raise
return
req
class
Driver
():
def
create_driver
(
self
):
path
=
r'D:\soft\msedgedriver.exe'
# options = webdriver.EdgeOptions()
options
=
{
"browserName"
:
"MicrosoftEdge"
,
"ms:edgeOptions"
:
{
"extensions"
:
[],
"args"
:
[
"--start-maximized"
]
# 添加最大化窗口运作参数
}
}
session
=
webdriver
.
Edge
(
executable_path
=
path
,
capabilities
=
options
)
return
session
class
Login
():
def
login
(
self
,
driver
):
cookies
=
{}
cookies_list
,
id_cookie
,
user_name
=
Token
()
.
get_cookies
()
if
cookies_list
:
pass
else
:
log
.
info
(
"没有账号了,等待30分钟"
)
time
.
sleep
(
30
*
60
)
return
''
,
''
,
''
log
.
info
(
f
'=====当前使用的是{user_name}的cookie======'
)
for
cookie
in
cookies_list
:
driver
.
add_cookie
(
cookie
)
time
.
sleep
(
3
)
driver
.
refresh
()
time
.
sleep
(
3
)
# jar = requests.cookies.RequestsCookieJar() # 创建一个Cookie Jar对象
update_headers
=
{}
for
cookie
in
cookies_list
:
cookies
[
cookie
[
'name'
]]
=
cookie
[
'value'
]
# jar.set(cookie['name'], cookie['value'])
if
cookie
[
'name'
]
==
'auth_token'
:
update_headers
[
'X-AUTH-TOKEN'
]
=
cookie
[
'value'
]
if
cookie
[
'name'
]
==
'TYCID'
:
update_headers
[
'X-TYCID'
]
=
cookie
[
'value'
]
s
=
requests
.
Session
()
s
.
cookies
.
update
(
cookies
)
# s.cookies.update(jar)
return
driver
,
id_cookie
,
s
,
update_headers
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论