Commit 4a8ab091  Author: 刘伟刚

Baidu crawler collection changes

Parent c5576d56
...
@@ -202,8 +202,8 @@ if __name__=="__main__":
     opt.add_experimental_option("excludeSwitches", ["enable-automation"])
     opt.add_experimental_option('excludeSwitches', ['enable-logging'])
     opt.add_experimental_option('useAutomationExtension', False)
     opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
-    chromedriver = 'D:/chrome/chromedriver.exe'
+    chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
     browser1 = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
...
# -*- coding: utf-8 -*-
# Smart extraction request
# 1. Decision: when requesting smart extraction, entity classes are no longer used
#    a. Instead: the HTML source file is passed directly in the raw HTTP request body, and the lang-code and link-text parameters are passed as query parameters
#    b. Reason: an entity class is inconvenient to test in Postman, where a pasted HTML source file cannot be used
# 2. Not adopted: using an entity class, even though its advantages outweigh its drawbacks
#    a. An entity class makes it easy to extend the parameter fields
#    b. It is convenient for API documentation: calling json_parameter_utility.get_json_parameters can display the request entity class
class ExtractionRequest:
# Language code
# 1. Required when extracting non-Chinese articles
lang_code = ""
# Link text
# 1. Used for extracting the title; if not provided, title accuracy drops
link_text = ""
# Article page source file
# 1. Used for extracting the title, publish time, content, etc.
article_html = ""
@staticmethod
def from_dict(dictionary: dict):
extraction_request = ExtractionRequest()
# Alternative approach:
# 1. Update the internal __dict__ object with the dictionary
# extraction_request.__dict__.update(dictionary)
# Set the dictionary values on the current object
for key in dictionary:
setattr(extraction_request, key, dictionary[key])
return extraction_request
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
# Extraction result
class ExtractionResult:
# Title
title = ""
# Publish date
publish_date = ""
# Body text (keeps all HTML tags, e.g. br, img)
text = ""
# URL
url = ""
# Summary
meta_description = ""
# Clean body text (without HTML)
cleaned_text = ""
# Source (currently only supported when extracting the "source" field from Chinese sites)
# source = ""
# Top image (top_image never yields any content, so this attribute is no longer used)
# top_image = ""
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
class UrlPickingRequest:
# Response URL of the list page
# 1. Used as the Base URL for joining extracted relative URLs
# 2. Base URL: the response URL must be used
# 3. Example: in Python, after requesting a URL with requests.get(url), resp.url must be used as the Base URL
list_page_resp_url = ""
# List page source file
# 1. Used for extracting article URLs
list_page_html = ""
@staticmethod
def from_dict(dictionary: dict):
url_picking_request = UrlPickingRequest()
# Set the dictionary values on the current object
for key in dictionary:
setattr(url_picking_request, key, dictionary[key])
return url_picking_request
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
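A minimal usage sketch for these entity classes, following the json.dumps hint in the to_dict comments above; the field values below are illustrative and not taken from the project:

import json

# Build a request object from a plain dict (for example, one assembled from an HTTP request)
request_dict = {"lang_code": "en", "link_text": "Example title", "article_html": "<html><body><p>...</p></body></html>"}
extraction_request = ExtractionRequest.from_dict(request_dict)

# Serialize a result to a JSON string using to_dict as the default hook
extraction_result = ExtractionResult()
extraction_result.title = "Example title"
print(json.dumps(extraction_result, default=ExtractionResult.to_dict, ensure_ascii=False))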
"""
任务集成测试
1、连接redis做取出
2、连接kafka做信息的获取,与存储
"""
import time
import redis
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import itertools
from googleSpider import GoogleSpider
import concurrent.futures
from baseCore import BaseCore
from queue import Queue
import configparser
class GoogleTaskJob(object):
def __init__(self):
# Create the ConfigParser object
self.config = configparser.ConfigParser()
# Read the configuration file
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getkafka(self):
# Address of the Kafka cluster
bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
# Topic to subscribe to
topic = self.config.get('kafka', 'topic')
groupId=self.config.get('kafka', 'groupId')
consumer = KafkaConsumer(topic, group_id=groupId,
bootstrap_servers=[bootstrap_servers],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
try:
for record in consumer:
try:
logger.info("value:",record.value)
keymsg=record.value
if keymsg:
break
else:
continue
#print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except Exception as e:
logger.info("msg.value error:",e)
except KeyboardInterrupt as e:
keymsg={}
finally:
consumer.close()
return keymsg
def getkeyFromredis(self,codeid):
kvalue=self.r.get('KEY_WORDS_TO_REDIS::'+codeid)
kvalue=kvalue.decode('utf-8')
kvalue=json.loads(kvalue)
return kvalue
def getkeywords(self,keywords):
kwList=[]
if ')+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
elif len(kk2)==3:
result = list(itertools.product(kk2[0], kk2[1],kk2[2]))
elif len(kk2)==4:
result = list(itertools.product(kk2[0], kk2[1],kk2[2],kk2[3]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
elif '+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
else:
k3=keywords.split("|")
kwList=k3
return kwList
def paserKeyMsg(self,keymsg):
logger.info('----------')
wordsCode=keymsg['wordsCode']
id=keymsg['id']
try:
searchEngines=keymsg['searchEngines']
except Exception as e:
searchEngines=[]
kwList=[]
if searchEngines:
if '3' in searchEngines:
keyword=keymsg['keyWord']
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
else:
logger.info('+++++')
keyword=keymsg['keyWord']
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
return kwList
def runSpider(self,kwmsg):
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
googleSpider=GoogleSpider(searchkw,wordsCode,sid)
try:
googleSpider.get_page_html()
googleSpider.get_detail_html()
except Exception as e:
logger.info('google搜索异常'+searchkw)
finally:
googleSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw)
if __name__ == '__main__':
# ss='道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
# keymsglist=getkeywords(ss)
# print(keymsglist)
# Create the Redis connection
googleTaskJob=GoogleTaskJob()
baseCore=BaseCore()
logger=baseCore.getLogger()
print('---------------')
while True:
try:
try:
keymsg=googleTaskJob.getkafka()
kwList=googleTaskJob.paserKeyMsg(keymsg)
except Exception as e:
logger.info("从kafka拿取信息失败!")
time.sleep(5)
continue
if kwList:
# Create a thread pool (the actual size is set by max_workers below)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Submit tasks to the thread pool; each task processes one item
results = [executor.submit(googleTaskJob.runSpider, data) for data in kwList]
# Collect the results of the tasks
for future in concurrent.futures.as_completed(results):
try:
result = future.result()
# Handle the task result
logger.info(f"任务执行结束: {result}")
except Exception as e:
# Handle exceptions raised while the task was running
logger.info(f"任务执行exception: {e}")
except Exception as e:
logger.info('采集异常')
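For local testing of getkafka, a hedged sketch of publishing one keyword message with the KafkaProducer that is imported above but never used; the broker address and topic come from the [kafka] section of config.ini later in this commit, and the id/wordsCode/keyWord values are made up:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],
value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))
# Field names mirror what paserKeyMsg reads; '3' is the engine code this job reacts to
test_msg = {'id': 'KW-TEST-0001', 'wordsCode': 'KW-TEST-0001', 'keyWord': 'A+(B|C)', 'searchEngines': ['3']}
producer.send('keyWordsInfo', test_msg)
producer.flush()
producer.close()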
"""
任务集成测试
1、连接redis做取出
2、连接kafka做信息的获取,与存储
"""
import time
import redis
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import itertools
from googleSpider import GoogleSpider
import concurrent.futures
from baseCore import BaseCore
from queue import Queue
import configparser
class GoogleTaskJob(object):
def __init__(self):
# Create the ConfigParser object
self.config = configparser.ConfigParser()
# Read the configuration file
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getkafka(self):
# Address of the Kafka cluster
bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
# Topic to subscribe to
topic = self.config.get('kafka', 'topic')
groupId=self.config.get('kafka', 'groupId')
consumer = KafkaConsumer(topic, group_id=groupId,
bootstrap_servers=[bootstrap_servers],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
try:
for record in consumer:
try:
logger.info("value:",record.value)
keymsg=record.value
if keymsg:
break
else:
continue
#print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except Exception as e:
logger.info("msg.value error:",e)
except KeyboardInterrupt as e:
keymsg={}
finally:
consumer.close()
return keymsg
def getkeyFromredis(self,codeid):
kvalue=self.r.get('KEY_WORDS_TO_REDIS::'+codeid)
kvalue=kvalue.decode('utf-8')
kvalue=json.loads(kvalue)
return kvalue
def getkeywords(self,keywords):
kwList=[]
if ')+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
elif len(kk2)==3:
result = list(itertools.product(kk2[0], kk2[1],kk2[2]))
elif len(kk2)==4:
result = list(itertools.product(kk2[0], kk2[1],kk2[2],kk2[3]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
elif '+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
else:
k3=keywords.split("|")
kwList=k3
return kwList
def paserKeyMsg(self,keymsg):
logger.info('----------')
wordsCode=keymsg['wordsCode']
id=keymsg['id']
try:
searchEngines=keymsg['searchEngines']
except Exception as e:
searchEngines=[]
kwList=[]
if searchEngines:
if '4' in searchEngines:
keyword=keymsg['keyWord']
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
else:
logger.info('+++++')
keyword=keymsg['keyWord']
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
return kwList
def runSpider(self,kwmsg):
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
googleSpider=GoogleSpider(searchkw,wordsCode,sid)
try:
googleSpider.get_page_html()
except Exception as e:
logger.info('google搜索异常'+searchkw)
finally:
googleSpider.driver.quit()
try:
googleSpider.get_detail_html()
except Exception as e:
logger.info('google搜索异常'+searchkw)
finally:
googleSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw)
import random
if __name__ == '__main__':
# ss='道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
# keymsglist=getkeywords(ss)
# print(keymsglist)
# Create the Redis connection
googleTaskJob=GoogleTaskJob()
baseCore=BaseCore()
logger=baseCore.getLogger()
print('---------------')
while True:
try:
codeids=[]
# codeid='KW-20230727-0001'
codeids.append('KW-20230814-0001')
codeids.append('KW-20230814-0005')
for codeid in codeids:
try:
# keymsg=baiduTaskJob.getkafka()
keymsg=googleTaskJob.getkeyFromredis(codeid)
kwList=googleTaskJob.paserKeyMsg(keymsg)
# kwList=reversed(kwList)
# Randomly select a few items from the list
# kwList = random.sample(kwList, 4)
logger.info(f"需要搜索的关键词:{kwList}")
except Exception as e:
logger.info("从kafka拿取信息失败!")
time.sleep(5)
continue
if kwList:
# Create a thread pool (the actual size is set by max_workers below)
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Submit tasks to the thread pool; each task processes one item
results = [executor.submit(googleTaskJob.runSpider, data) for data in kwList]
# Collect the results of the tasks
for future in concurrent.futures.as_completed(results):
try:
result = future.result()
# Handle the task result
logger.info(f"任务执行结束: {result}")
except Exception as e:
# Handle exceptions raised while the task was running
logger.info(f"任务执行exception: {e}")
except Exception as e:
logger.info('采集异常')
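Since this variant reads keywords from redis rather than kafka, here is a hedged sketch of seeding one of the codeids used above. The payload fields mirror what paserKeyMsg expects; the keyWord value is made up, and the connection details are taken from the [redis] section of config.ini later in this commit:

import json
import redis

r = redis.Redis(host='114.115.236.206', port=6379, password='clbzzsn', db=0)
# '4' is the engine code this redis-driven variant schedules
payload = {'id': 'KW-20230814-0001', 'wordsCode': 'KW-20230814-0001', 'keyWord': 'A+(B|C)', 'searchEngines': ['4']}
r.set('KEY_WORDS_TO_REDIS::KW-20230814-0001', json.dumps(payload, ensure_ascii=False))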
import pandas as pd
import requests
from bs4 import BeautifulSoup
from langid import langid
from openpyxl import Workbook
from requests.packages import urllib3
from smart_extractor import SmartExtractor
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
def detect_language(html):
soup = BeautifulSoup(html, "html.parser")
text = soup.get_text()
# Use langid.py to detect the language of the text
lang, confidence = langid.classify(text)
return lang
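# Illustrative example (not part of the original script): for a page whose visible text is mostly
# Chinese, langid is expected to classify it as 'zh', e.g.
#   detect_language('<html><body><p>百度采集测试页面</p></body></html>')  # -> 'zh'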
def webDriver(url):
chrome_driver =f'C:/Users/WIN10/DataspellProjects/crawlerProjectDemo/tmpcrawler/cmd100/chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =f'D:/crawler/baidu_crawler/tool/Google/Chrome/Application/chrome.exe'
driver = webdriver.Chrome(service=path, options=chrome_options)
html=''
try:
driver.get(url)
# Wait for the page to finish loading
wait = WebDriverWait(driver, 10)
wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
html=driver.page_source
except Exception as e:
print('请求失败')
finally:
# Close the driver
driver.quit()
return html
def extractorMsg(url,title):
content=''
contentWithTag=''
try:
lang=detect_language(title)
sm=SmartExtractor(lang)
article=sm.extract_by_url(url=url)
content=article.cleaned_text
contentWithTag=article.text
except Exception as e:
try:
raw_html=webDriver(url)
lang=detect_language(title)
sm=SmartExtractor(lang)
article=sm.extract_by_html(raw_html)
content=article.cleaned_text
contentWithTag=article.text
except Exception as e:
print('抽取失败!!')
return content,contentWithTag
def readExcel():
detailmsgList=[]
# Read the Excel file
data = pd.read_excel('./xls/sid_google.xlsx', dtype=str)
# Iterate over the data with iterrows()
for index, row in data.iterrows():
# Iterate over the values in each row
detailmsg={}
for column in data.columns:
# value = row[column]
detailmsg[column]=row[column]
detailmsgList.append(detailmsg)
return detailmsgList
def extratorMsg(detailmsgList):
for detail in detailmsgList:
detailurl=detail['detailurl']
title=detail['title']
content,contentWithTag=extractorMsg(detailurl,title)
if content=='' or content is None:
content=detail['content']
contentWithTag=detail['content_with_tag']
detail['content']=content
detail['content_with_tag']=contentWithTag
# Convert numeric columns to strings
print(detail['id'])
detailList=[]
detailList.append(detail)
writerToExcel(detailList)
# Append the data to the Excel file
def writerToExcel(detailList):
# filename='google.xlsx'
# Read the existing xlsx file
existing_data = pd.read_excel(filename,dtype=str)
# Create the new data
new_data = pd.DataFrame(data=detailList)
# Append the new data to the end of the existing data
# (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
combined_data = pd.concat([existing_data, new_data], ignore_index=True)
# Write the result back to the xlsx file
combined_data.to_excel(filename, index=False)
if __name__ == '__main__':
filename='google.xlsx'
# Create a workbook
workbook = Workbook()
workbook.save(filename)
detailmsgList=readExcel()
extratorMsg(detailmsgList)
\ No newline at end of file
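# Input assumptions derived from the code above: ./xls/sid_google.xlsx is read with dtype=str,
# and extratorMsg expects at least these columns per row:
#   id, detailurl, title, content, content_with_tag
# Rows whose body cannot be re-extracted keep their original content / content_with_tag values.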
title google_comm
chcp 65001
cd /d %~dp0
python baidutaskJob.py
\ No newline at end of file
import datetime
from goose3 import Goose
from goose3.text import StopWordsArabic, StopWordsChinese
from smart_extractor import SmartExtractor
import jieba
if __name__ == '__main__':
# # url='https://www.eet-china.com/news/202302025307.html'
# url='https://www.finam.ru/publications/item/rzhd-gotovy-predlozhit-bespilotnye-tekhnologii-afrikanskim-partneram-20230728-1403/'
# # url='https://arka.am/en/news/business/rosgosstrakh_armenia_signs_more_than_one_million_mandatory_vehicle_insurance_contracts/'
# # jieba.cut('张三')
# sm=SmartExtractor('en')
# # g = Goose({'stopwords_class': StopWordsChinese})
# # article = g.extract(url)
# # print(article.cleaned_text)
# print('+++++++++++++++++++')
# print(sm.extract_by_url(url).cleaned_text)
needDate='2022-01-01 00:00:00'
needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
print(needTime)
\ No newline at end of file
""" # -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
""" """
任务集成测试 任务集成测试
1、连接redis做取出 1、连接redis做取出
...@@ -142,17 +143,17 @@ class BaiduTaskJob(object): ...@@ -142,17 +143,17 @@ class BaiduTaskJob(object):
return kwList return kwList
def runSpider(self,kwmsg): def runSpider(self,kwmsg):
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
baiduSpider=BaiduSpider(searchkw,wordsCode,sid)
try: try:
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
baiduSpider=BaiduSpider(searchkw,wordsCode,sid)
baiduSpider.get_page_html() baiduSpider.get_page_html()
baiduSpider.get_detail_html() baiduSpider.get_detail_html()
except Exception as e: except Exception as e:
logger.info('百度搜索异常'+searchkw) logger.info('百度搜索异常'+searchkw)
finally: finally:
time.sleep(3)
baiduSpider.driver.quit() baiduSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw) logger.info("关键词采集结束!"+searchkw)
......
# -*- coding: utf-8 -*-
"""
任务集成测试
1、连接redis做取出
2、连接kafka做信息的获取,与存储
"""
import time
import redis
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
import itertools
from baiduSpider import BaiduSpider
import concurrent.futures
from baseCore import BaseCore
from queue import Queue
import configparser
class BaiduTaskJob(object):
def __init__(self):
# Create the ConfigParser object
self.config = configparser.ConfigParser()
# Read the configuration file
self.config.read('config.ini')
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
def getkafka(self):
# Address of the Kafka cluster
bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
# Topic to subscribe to
topic = self.config.get('kafka', 'topic')
groupId=self.config.get('kafka', 'groupId')
consumer = KafkaConsumer(topic, group_id=groupId,
bootstrap_servers=[bootstrap_servers],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
try:
for record in consumer:
try:
logger.info("value:",record.value)
keymsg=record.value
if keymsg:
break
else:
continue
#print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except Exception as e:
logger.info("msg.value error:",e)
except KeyboardInterrupt as e:
keymsg={}
finally:
consumer.close()
return keymsg
def getkeyFromredis(self,codeid):
kvalue=self.r.get('KEY_WORDS_TO_REDIS::'+codeid)
kvalue=kvalue.decode('utf-8')
kvalue=json.loads(kvalue)
return kvalue
def getkeywords(self,keywords):
kwList=[]
if ')+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
elif len(kk2)==3:
result = list(itertools.product(kk2[0], kk2[1],kk2[2]))
elif len(kk2)==4:
result = list(itertools.product(kk2[0], kk2[1],kk2[2],kk2[3]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
elif '+(' in keywords:
k1List=keywords.split('+')
kk2=[]
for k2 in k1List:
k2=k2.strip("()")
k2List=k2.split('|')
kk2.append(k2List)
if len(kk2)==2:
result = list(itertools.product(kk2[0], kk2[1]))
for res in result:
kwstr=''
for kw in res:
kwstr+=kw+"+"
kwList.append(kwstr.strip('+'))
else:
k3=keywords.split("|")
kwList=k3
return kwList
def paserKeyMsg(self,keymsg):
logger.info('----------')
wordsCode=keymsg['wordsCode']
id=keymsg['id']
try:
searchEngines=keymsg['searchEngines']
except Exception as e:
searchEngines=[]
kwList=[]
if searchEngines:
if '3' in searchEngines:
keyword=keymsg['keyWord']
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
else:
pass
# logger.info('+++++')
# keyword=keymsg['keyWord']
# keymsglist=self.getkeywords(keyword)
# for kw in keymsglist:
# kwmsg={
# 'kw':kw,
# 'wordsCode':wordsCode,
# 'sid':id
# }
# kwList.append(kwmsg)
return kwList
def runSpider(self,kwmsg):
try:
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
baiduSpider=BaiduSpider(searchkw,wordsCode,sid)
baiduSpider.get_page_html()
baiduSpider.get_detail_html()
except Exception as e:
logger.info('百度搜索异常'+searchkw)
finally:
baiduSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw)
if __name__ == '__main__':
# ss='道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
# keymsglist=getkeywords(ss)
# print(keymsglist)
# Create the Redis connection
baiduTaskJob=BaiduTaskJob()
baseCore=BaseCore()
logger=baseCore.getLogger()
print('---------------')
while True:
try:
try:
keymsg=baiduTaskJob.getkafka()
kwList=baiduTaskJob.paserKeyMsg(keymsg)
except Exception as e:
logger.info("从kafka拿取信息失败!")
time.sleep(5)
continue
if kwList:
# Create a thread pool (the actual size is set by max_workers below)
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Submit tasks to the thread pool; each task processes one item
results = [executor.submit(baiduTaskJob.runSpider, data) for data in kwList]
# Collect the results of the tasks
for future in concurrent.futures.as_completed(results):
try:
result = future.result()
# Handle the task result
logger.info(f"任务执行结束: {result}")
except Exception as e:
# Handle exceptions raised while the task was running
logger.info(f"任务执行exception: {e}")
except Exception as e:
logger.info('采集异常')
[redis]
host=114.115.236.206
port=6379
pass=clbzzsn
[mysql]
host=114.115.159.144
username=root
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
[kafka]
bootstrap_servers=114.115.159.144:9092
topic=keyWordsInfo
groupId=python_baidu_test
[selenium]
chrome_driver=C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe
binary_location=D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe
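The [selenium] section above is not consumed by the scripts shown earlier, which hard-code the chromedriver and Chrome binary paths; a minimal sketch of wiring those config values in instead, assuming the selenium 4.x API (selenium==4.9.1 appears in this commit's pip notes):

import configparser
from selenium import webdriver
from selenium.webdriver.chrome.service import Service

config = configparser.ConfigParser()
config.read('config.ini')
# Paths come from the [selenium] section instead of being hard-coded in each script
service = Service(config.get('selenium', 'chrome_driver'))
options = webdriver.ChromeOptions()
options.binary_location = config.get('selenium', 'binary_location')
driver = webdriver.Chrome(service=service, options=options)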
# -*- coding: utf-8 -*-
# Smart extraction request
# 1. Decision: when requesting smart extraction, entity classes are no longer used
#    a. Instead: the HTML source file is passed directly in the raw HTTP request body, and the lang-code and link-text parameters are passed as query parameters
#    b. Reason: an entity class is inconvenient to test in Postman, where a pasted HTML source file cannot be used
# 2. Not adopted: using an entity class, even though its advantages outweigh its drawbacks
#    a. An entity class makes it easy to extend the parameter fields
#    b. It is convenient for API documentation: calling json_parameter_utility.get_json_parameters can display the request entity class
class ExtractionRequest:
# Language code
# 1. Required when extracting non-Chinese articles
lang_code = ""
# Link text
# 1. Used for extracting the title; if not provided, title accuracy drops
link_text = ""
# Article page source file
# 1. Used for extracting the title, publish time, content, etc.
article_html = ""
@staticmethod
def from_dict(dictionary: dict):
extraction_request = ExtractionRequest()
# Alternative approach:
# 1. Update the internal __dict__ object with the dictionary
# extraction_request.__dict__.update(dictionary)
# Set the dictionary values on the current object
for key in dictionary:
setattr(extraction_request, key, dictionary[key])
return extraction_request
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
# Extraction result
class ExtractionResult:
# Title
title = ""
# Publish date
publish_date = ""
# Body text (keeps all HTML tags, e.g. br, img)
text = ""
# URL
url = ""
# Summary
meta_description = ""
# Clean body text (without HTML)
cleaned_text = ""
# Source (currently only supported when extracting the "source" field from Chinese sites)
# source = ""
# Top image (top_image never yields any content, so this attribute is no longer used)
# top_image = ""
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
class UrlPickingRequest:
# Response URL of the list page
# 1. Used as the Base URL for joining extracted relative URLs
# 2. Base URL: the response URL must be used
# 3. Example: in Python, after requesting a URL with requests.get(url), resp.url must be used as the Base URL
list_page_resp_url = ""
# List page source file
# 1. Used for extracting article URLs
list_page_html = ""
@staticmethod
def from_dict(dictionary: dict):
url_picking_request = UrlPickingRequest()
# Set the dictionary values on the current object
for key in dictionary:
setattr(url_picking_request, key, dictionary[key])
return url_picking_request
def to_dict(self):
# Convert to a dictionary object:
# 1. This method needs to be called when serializing to JSON
# 2. Convert to a JSON string: json.dumps(extraction_result, default=ExtractionResult.to_dict)
data = {}
# Use the internal __dict__ object
# 1. Update the new dictionary object with the internal __dict__ object
data.update(self.__dict__)
return data
pip install langid -i https://mirrors.aliyun.com/pypi/simple/
pip install redis==4.3.5 -i https://pypi.douban.com/simple
pip install kafka-python==2.0.2 -i https://pypi.douban.com/simple
pip install PyMySQL -i https://pypi.douban.com/simple
pip install gne==0.3.0 -i https://pypi.douban.com/simple
pip install selenium==4.9.1 -i https://pypi.douban.com/simple
pip install logbook -i https://pypi.douban.com/simple
pip install tqdm -i https://pypi.douban.com/simple
pip install goose3 -i https://mirrors.aliyun.com/pypi/simple
pip install Beautifulsoup4 -i https://mirrors.aliyun.com/pypi/simple
pip install langid -i https://mirrors.aliyun.com/pypi/simple/
selenium==3.141.0
selenium-wire==5.1.0
pip install --upgrade selenium
pip install --upgrade urllib3
pip3 uninstall urllib3
ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'OpenSSL 1.1.0i 14 Aug 2018'. See: https://github.com/urllib3/urllib3/issues/2168
\ No newline at end of file
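The ImportError above is raised by urllib3 2.x, which requires OpenSSL 1.1.1+. Besides upgrading or uninstalling urllib3 as listed, a common workaround when the system OpenSSL cannot be upgraded is to pin urllib3 below 2.0:
pip install "urllib3<2" -i https://pypi.douban.com/simple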
title baidu_comm
chcp 65001
cd /d %~dp0
python baidutaskJob.py
\ No newline at end of file
Servers where the Baidu crawler is deployed
114.116.48.72
114.115.235.92
114.116.122.247
114.115.153.6
114.116.122.247
192.168.1.150