王景浩 / zzsn_spider / Commits

Commit a741a5e1
Authored Aug 29, 2023 by 薛凌堃
Commit message: 微信公众号 (WeChat official accounts)
Parent: 300d75bb

Showing 1 changed file, with 74 additions and 71 deletions:
comData/weixin_solo/oneWeixin.py (+74, -71)

In brief, the diff reuses the module-level Redis connection (r = baseCore.r) instead of opening a new one inside check_url/add_url, comments out the duplicate-URL early return and both Kafka pushes in get_info, hardcodes a single test account ('IN-20230630-0010') in job() in place of the Redis-fed source queue, and threads a page offset i through RequestUrl so the article-list endpoint is queried with begin={i} rather than always begin=0, returning immediately on a successful response.
@@ -26,7 +26,7 @@ r = baseCore.r
 urllib3.disable_warnings()

 def check_url(sid, article_url):
-    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
+    # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
     res = r.sismember(f'wx_url_{sid}', article_url)
     if res == True:
         return True
@@ -34,7 +34,7 @@ def check_url(sid, article_url):
     return False

 def add_url(sid, article_url):
-    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
+    # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
     res = r.sadd(f'wx_url_{sid}', article_url, 3)  # 注意是 保存set的方式
     if res == 0:  # 若返回0,说明插入不成功,表示有重复
         return True
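The helpers above lean on Redis set semantics for URL dedup: sismember answers "seen before?", and sadd returns the number of members it actually inserted, so 0 signals a duplicate. (Note the stray literal 3 passed to sadd in the original, which Redis simply stores as one more set member.) A minimal sketch of the pattern, assuming a reachable Redis instance; the localhost connection and function names here are illustrative, not the project's:

import redis

r = redis.Redis(host="localhost", port=6379)  # illustrative connection, not the project's

def seen_before(sid, url):
    # True if this article URL is already in the per-account set
    return bool(r.sismember(f'wx_url_{sid}', url))

def mark_seen(sid, url):
    # sadd returns 0 when the member already existed
    added = r.sadd(f'wx_url_{sid}', url)
    return added == 0  # True means duplicate, mirroring add_url() above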
@@ -88,10 +88,10 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
         url_news = one_news['link']
-        url_ft = check_url(sid, url_news)
-        if url_ft:
-            log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
-            return list_all_info, num_caiji
+        # url_ft = check_url(sid, url_news)
+        # if url_ft:
+        #     log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
+        #     return list_all_info,num_caiji
         try:
             ip = baseCore.get_proxy()
             res_news = requests.get(url_news, timeout=20, proxies=ip)
@@ -163,56 +163,59 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
             time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
             #将信息传输到kafka中
-            dic_info = {
-                'sid': sid,
-                'title': news_title,
-                'content': news_content,
-                'contentWithtag': str(news_html),
-                'summary': '',
-                'author': '',
-                'origin': origin,
-                'publishDate': news_date,
-                'sourceAddress': url_news,
-                'source': '11',
-                'createDate': time_now
-            }
-            for nnn in range(0, 3):
-                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
-                try:
-                    kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
-                    kafka_time_out = kafka_result.get(timeout=10)
-                    add_url(sid, url_news)
-                    break
-                except:
-                    time.sleep(5)
-                    continue
-                finally:
-                    producer.close()
-            num_caiji = num_caiji + 1
-            list_all_info.append(dic_info)
-            time.sleep(5)
-        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-        dic_info2 = {
-            'infoSourceId': sid,
-            'code': info_source_code,
-            'num': num_caiji,
-            'collectTime': kaishi_time,
-            'dispatcherTime': time_now,
-            'dispatcherStatus': '1',
-            'source': '1',
-        }
-        for nnn2 in range(0, 3):
-            try:
-                producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
-                kafka_result2 = producer2.send("collectionAndDispatcherInfo",
-                                               json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
-                break
-            except:
-                time.sleep(5)
-                continue
+            # dic_info = {
+            #     'sid': sid,
+            #     'title': news_title,
+            #     'content': news_content,
+            #     'contentWithtag': str(news_html),
+            #     'summary': '',
+            #     'author': '',
+            #     'origin': origin,
+            #     'publishDate': news_date,
+            #     'sourceAddress': url_news,
+            #     'source': '11',
+            #     'createDate': time_now
+            # }
+            # for nnn in range(0, 3):
+            #     producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+            #     try:
+            #         kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
+            #         kafka_time_out = kafka_result.get(timeout=10)
+            #         add_url(sid, url_news)
+            #         break
+            #     except:
+            #         time.sleep(5)
+            #         continue
+            #     finally:
+            #         producer.close()
+            # num_caiji = num_caiji + 1
+            # list_all_info.append(dic_info)
+            # time.sleep(5)
+        # time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        # dic_info2 = {
+        #     'infoSourceId': sid,
+        #     'code': info_source_code,
+        #     'num': num_caiji,
+        #     'collectTime': kaishi_time,
+        #     'dispatcherTime': time_now,
+        #     'dispatcherStatus': '1',
+        #     'source': '1',
+        # }
+        # for nnn2 in range(0, 3):
+        #     producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        #     try:
+        #         # producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        #         kafka_result2 = producer2.send("collectionAndDispatcherInfo",
+        #                                        json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
+        #         break
+        #     except:
+        #         time.sleep(5)
+        #         continue
+        #     finally:
+        #         producer2.close()
     return list_all_info, num_caiji

-def RequestUrl(dic_url,token,key):
+def RequestUrl(dic_url,token,key,i):
     start_ = time.time()
     url_ = dic_url['url_']
     origin = dic_url['name']
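The block this hunk disables follows a common send-with-retry shape: build the payload dict, then try up to three times to push it to the crawlerInfo topic, confirming delivery by blocking on the send future. A sketch of that shape when active, using kafka-python and the broker address from the diff; send_with_retry and its payload argument are illustrative names, not the project's:

import json
import time
from kafka import KafkaProducer  # kafka-python package

def send_with_retry(dic_info, retries=3):
    # Try up to `retries` times to push one record to the crawlerInfo topic.
    for _ in range(retries):
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        try:
            future = producer.send("crawlerInfo",
                                   json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
            future.get(timeout=10)  # block until the broker acks, or raise
            return True
        except Exception:
            time.sleep(5)  # back off before the next attempt
        finally:
            producer.close()
    return False

Opening a fresh producer on every attempt mirrors the original loop; reusing one producer per process would be cheaper, but the retry and finally/close structure is the part the commit toggles off.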
@@ -220,14 +223,13 @@ def RequestUrl(dic_url,token,key):
     sid = dic_url['sid']
     biz = dic_url['biz']
     fakeid = biz + '=='
-    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
+    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
     ret = -1
     json_search = ''
     # 获取页数
     try:
-        # ip = baseCore.get_proxy()
-        json_search = s.get(url_search, headers=headers,
-                            verify=False).json()  # , proxies=ip, verify=False
+        ip = baseCore.get_proxy()
+        json_search = s.get(url_search, headers=headers, proxies=ip, verify=False).json()  # , proxies=ip, verify=False
         str_t = json.dumps(json_search)
         time.sleep(1)
     except Exception as e:
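The new i parameter plugs into begin=, the offset of the appmsg list_ex endpoint, which returns count=5 articles per call; the caller (see the job() hunks below) steps i through range(0, Max_data, 5) so successive requests walk the account's article list. A small sketch of how the offsets line up; fakeid and token here are placeholders:

fakeid = 'MzA3Mjc2NTgzMw=='  # placeholder: biz + '=='
token = '1234567890'         # placeholder: logged-in session token

def page_urls(max_data):
    # one list_ex URL per page of five articles, mirroring RequestUrl
    for i in range(0, max_data, 5):
        yield (f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}'
               f'&count=5&fakeid={fakeid}&type=9&query=&token={token}'
               f'&lang=zh_CN&f=json&ajax=1')

# e.g. max_data = 12 -> begin=0, begin=5, begin=10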
@@ -243,7 +245,7 @@ def RequestUrl(dic_url,token,key):
     # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
     # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
     if ret == 0:
-        pass
+        return json_search, ret
     elif ret == 200013:
         # 重新放入redis
         # time.sleep(3600)
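Per the comments above, the endpoint's base_resp.ret distinguishes success (0), frequency control (200013), and an invalid biz (200002); the one-line change makes the success branch hand the JSON straight back instead of falling through. A condensed sketch of that dispatch, with the non-success branches stubbed because their bodies sit outside this hunk:

def handle_ret(ret, json_search):
    # 0: ok -> return the payload to the caller immediately
    if ret == 0:
        return json_search, ret
    # 200013: frequency control -> requeue the account / wait (stubbed)
    elif ret == 200013:
        pass
    # 200002: invalid args, bad biz -> mark the source unusable (stubbed)
    elif ret == 200002:
        pass
    return '', ret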
@@ -315,17 +317,17 @@ def job(count,key):
     log.info('===========获取公众号============')
     start_ = time.time()
     #todo:redis中数据 pop一条
-    infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
-    if infoSourceCode == 'None' or infoSourceCode == None:
-        #当一次采集完之后,重新插入数据并等待插入完成
-        getFromSql()
-        time.sleep(20)
-        log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
-        return count
-    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
+    # infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
+    # if infoSourceCode == 'None' or infoSourceCode == None:
+    #     #当一次采集完之后,重新插入数据并等待插入完成
+    #     getFromSql()
+    #     time.sleep(20)
+    #     log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
+    #     return count
+    # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
     # '一带一路百人论坛'
-    # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
+    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20230630-0010' "
     cursor.execute(sql)
     row = cursor.fetchone()
@@ -362,7 +364,8 @@ def job(count,key):
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
         return count
-    json_search, ret = RequestUrl(dic_url, token, key)
+    i = 0
+    json_search, ret = RequestUrl(dic_url, token, key, i)
     if ret == 0:
         try:
             Max_data = int(json_search['app_msg_cnt'])
@@ -376,7 +379,7 @@ def job(count,key):
             Max_data = 5
     log.info(f'开始采集{origin}-----共{Max_page}页---{Max_data}条数据-----')
     for i in range(0, Max_data, 5):
-        json_search, ret = RequestUrl(dic_url, token, key)
+        json_search, ret = RequestUrl(dic_url, token, key, i)
         if ret == 0:
             pass
         else: