王景浩 / zzsn_spider · Commits

Commit f30a6dae, authored Aug 02, 2023 by 刘伟刚
Parent: ac997c9c

Showing 1 changed file with 195 additions and 0 deletions.

百度采集/baidutaskJob.py (new file, mode 100644) +195 −0
"""
"""
任务集成测试
1、连接redis做取出
2、连接kafka做信息的获取,与存储
"""
import
time
import
redis
from
kafka
import
KafkaProducer
from
kafka
import
KafkaConsumer
import
json
import
itertools
from
baiduSpider
import
BaiduSpider
import
concurrent.futures
from
baseCore
import
BaseCore
from
queue
import
Queue
import
configparser
class BaiduTaskJob(object):

    def __init__(self):
        # Create a ConfigParser and read the configuration file
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.r = redis.Redis(host=self.config.get('redis', 'host'),
                             port=self.config.getint('redis', 'port'),
                             password=self.config.get('redis', 'pass'),
                             db=0)
    def getkafka(self):
        """Block until a non-empty task message arrives from Kafka, then return it."""
        # Kafka cluster address, topic and consumer group come from config.ini
        bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
        topic = self.config.get('kafka', 'topic')
        groupId = self.config.get('kafka', 'groupId')
        consumer = KafkaConsumer(topic,
                                 group_id=groupId,
                                 bootstrap_servers=[bootstrap_servers],
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')))
        keymsg = {}
        try:
            for record in consumer:
                try:
                    logger.info("value: %s", record.value)
                    keymsg = record.value
                    if keymsg:
                        break
                except Exception as e:
                    logger.info("msg.value error: %s", e)
        except KeyboardInterrupt:
            keymsg = {}
        finally:
            consumer.close()
        return keymsg
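
The constructor and getkafka read the [redis] and [kafka] sections of a config.ini next to the script. A minimal sketch of that file: the section and key names are exactly the ones the code queries; all values below are invented placeholders.

[redis]
host = 127.0.0.1
port = 6379
pass = your_redis_password

[kafka]
bootstrap_servers = 127.0.0.1:9092
topic = keyWordsTopic
groupId = baidu-task-group
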
    def getkeyFromredis(self, codeid):
        """Fetch and decode the keyword record cached in Redis for a code id."""
        kvalue = self.r.get('KEY_WORDS_TO_REDIS::' + codeid)
        kvalue = kvalue.decode('utf-8')
        kvalue = json.loads(kvalue)
        return kvalue
    def getkeywords(self, keywords):
        """Expand a keyword expression into a flat list of search phrases.

        '|' separates alternatives; '+' joins parenthesised groups, whose
        alternatives are combined as a cross product.
        """
        if '+(' in keywords:
            # e.g. '(A|B)+(C|D)' -> groups [['A', 'B'], ['C', 'D']].
            # itertools.product handles any number of groups, so the original
            # hard-coded branches for 2, 3 and 4 groups collapse into one
            # expression (and no longer leave the result undefined for other
            # group counts).
            groups = [part.strip('()').split('|') for part in keywords.split('+')]
            return ['+'.join(combo) for combo in itertools.product(*groups)]
        # No grouping: the expression is a plain list of alternatives.
        return keywords.split('|')
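
For illustration, a hypothetical call showing the expected expansion (constructing BaiduTaskJob requires a readable config.ini, so this is a sketch rather than a doctest):

job = BaiduTaskJob()
job.getkeywords('(新能源|光伏)+(政策|补贴)')
# -> ['新能源+政策', '新能源+补贴', '光伏+政策', '光伏+补贴']
job.getkeywords('贸易|外贸')
# -> ['贸易', '外贸']
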
    def paserKeyMsg(self, keymsg):
        """Turn one task message into a list of per-keyword job dicts."""
        logger.info('----------')
        wordsCode = keymsg['wordsCode']
        sid = keymsg['id']
        try:
            searchEngines = keymsg['searchEngines']
        except Exception:
            searchEngines = []
        kwList = []
        # Engine code '3' appears to select Baidu; a message without an
        # engine list is also accepted. Any other engine list is skipped.
        if searchEngines and '3' not in searchEngines:
            return kwList
        for kw in self.getkeywords(keymsg['keyWord']):
            kwList.append({'kw': kw, 'wordsCode': wordsCode, 'sid': sid})
        return kwList
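
The key accesses above imply a task message of roughly this shape; all values here are invented placeholders:

keymsg = {
    'id': '1001',                            # becomes 'sid' in each job dict
    'wordsCode': 'WC0001',
    'keyWord': '(新能源|光伏)+(政策|补贴)',
    'searchEngines': ['3'],                  # optional; '3' selects this Baidu job
}
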
    def runSpider(self, kwmsg):
        """Run the Baidu spider for one keyword job and always release the driver."""
        searchkw = kwmsg['kw']
        wordsCode = kwmsg['wordsCode']
        sid = kwmsg['sid']
        baiduSpider = None
        try:
            baiduSpider = BaiduSpider(searchkw, wordsCode, sid)
            baiduSpider.get_page_html()
            baiduSpider.get_detail_html()
        except Exception:
            logger.info('Baidu search failed: ' + searchkw)
        finally:
            # Guard against the constructor itself failing, in which case
            # there is no driver to quit.
            if baiduSpider is not None:
                baiduSpider.driver.quit()
        logger.info('Keyword crawl finished: ' + searchkw)
if __name__ == '__main__':
    # ss = '道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
    # keymsglist = getkeywords(ss)
    # print(keymsglist)
    # Create the task job (opens the Redis connection in __init__)
    baiduTaskJob = BaiduTaskJob()
    print('---------------')
    while True:
        try:
            try:
                keymsg = baiduTaskJob.getkafka()
                kwList = baiduTaskJob.paserKeyMsg(keymsg)
            except Exception:
                logger.info('Failed to fetch a message from Kafka!')
                time.sleep(5)
                continue
            if kwList:
                # Thread pool with two workers; each task crawls one keyword.
                with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
                    results = [executor.submit(baiduTaskJob.runSpider, data)
                               for data in kwList]
                    # Collect results as the tasks complete.
                    for future in concurrent.futures.as_completed(results):
                        try:
                            result = future.result()
                            logger.info(f"Task finished: {result}")
                        except Exception as e:
                            logger.info(f"Task raised an exception: {e}")
        except Exception:
            logger.info('Crawl error')
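
Assuming config.ini sits next to the script and the baiduSpider and baseCore modules are importable, the job would be started with python baidutaskJob.py; it then loops forever, pulling one task message at a time and fanning the expanded keywords out to the thread pool.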