提交 ac997c9c 作者: 刘伟刚

上传新文件

上级 19ef2b48
import pymysql
import pymysql
from gne import GeneralNewsExtractor
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
import csv
import threading
import time
from lxml import etree
from queue import Queue
import re,sys
import datetime
import redis
from kafka import KafkaProducer
import json
from baseCore import BaseCore
import configparser
class BaiduSpider(object):
def __init__(self,searchkw,wordsCode,sid):
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
baseCore=BaseCore()
self.logger=baseCore.getLogger()
self.url = 'https://www.baidu.com/'
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
self.page_num = 1
chrome_driver =self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
# driver = webdriver.Chrome(chrome_options=chrome_options)
self.qtitle = Queue()
self.qurl = Queue()
self.detailList = Queue()
self.searchkw = searchkw
self.wordsCode = wordsCode
self.sid = sid
def createDriver(self):
chrome_driver =self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =self.config.get('selenium', 'binary_location')
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
#将列表数据插入到表中 baidu_search_result
def itemInsertToTable(self,items):
itemdata=[]
conx,cursorM=self.connMysql()
for item in items:
nowtime=self.getNowDate()
data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
itemdata.append(data)
sql ="INSERT into baidu_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
self.logger.info("数据插入数据库成功!")
# 定义插入数据的SQL语句
# 执行插入操作
conx.commit()
self.closeSql(conx,cursorM)
def connMysql(self):
# 创建MySQL连接
conx = pymysql.connect(host=self.config.get('mysql', 'host'),
user=self.config.get('mysql', 'username'),
password=self.config.get('mysql', 'password'),
database=self.config.get('mysql', 'database'))
# 创建一个游标对象
cursorM = conx.cursor()
return conx,cursorM
def closeSql(self,conx,cursorM):
# 关闭游标和连接
cursorM.close()
conx.close()
# 解析页面
def parse_page(self):
self.logger.info('解析百度列表页')
response = self.driver.page_source
response = response.replace('<em>', '')
response = response.replace('</em>', '')
html = etree.HTML(response)
lists=self.xpath_paser(html)
try:
flag = html.xpath('//div[@id="page"]//a[last()]//@class')[0]
except Exception as e:
flag=''
return flag, lists
def xpath_paser(self,html):
lists=[]
itemTag=html.xpath('//div[@class="result-op c-container xpath-log new-pmd"]')
for itemTag in itemTag:
try:
title=itemTag.xpath('.//h3[@class="news-title_1YtI1 "]/a/text()')[0]
except Exception as e:
title=''
try:
detailUrl=itemTag.xpath('.//h3[@class="news-title_1YtI1 "]/a/@href')[0]
except Exception as e:
detailUrl=''
try:
sourceTag=itemTag.xpath('.//div[@class="news-source_Xj4Dv"]/a/span[@class="c-color-gray"]/text()')[0]
except Exception as e:
sourceTag=''
try:
publishTag=itemTag.xpath('.//span[@class="c-color-gray2 c-font-normal c-gap-right-xsmall"]/text()')[0]
except Exception as e:
publishTag=''
detailmsg={
'title':title,
'detailUrl':detailUrl,
'sourceTag':sourceTag,
'publishTag':publishTag
}
lists.append(detailmsg)
self.logger.info(f'列表获取信息的条数{len(lists)}')
return lists
#获取当前时间
def getNowDate(self):
# 获取当前时间
current_time = datetime.datetime.now()
# 将时间转换为字符串
currentdate = current_time.strftime("%Y-%m-%d %H:%M:%S")
return currentdate
#智能抽取
def paserDetail(self,detailhtml,detailurl):
try:
extractor = GeneralNewsExtractor()
article_content = extractor.extract(detailhtml,host=detailurl,with_body_html=True)
# element = html2element(detailhtml)
except:
article_content={}
return article_content
#解析时间
def paserTime(self,publishtime):
timeType=['年前','月前','周前','前天','昨天','天前','今天','小时前','分钟前']
current_datetime = datetime.datetime.now()
publishtime=publishtime.strip()
print(publishtime)
try:
if '年前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days=365 * day)
publishtime = current_datetime - delta
elif '月前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(months= day)
publishtime = current_datetime - delta
elif '周前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(weeks= day)
publishtime = current_datetime - delta
elif '天前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days= day)
publishtime = current_datetime - delta
elif '前天' in publishtime:
delta = datetime.timedelta(days= 2)
publishtime = current_datetime - delta
elif '昨天' in publishtime:
current_datetime = datetime.datetime.now()
delta = datetime.timedelta(days= 1)
publishtime = current_datetime - delta
elif '今天' in publishtime or'小时前' in publishtime or '分钟前' in publishtime :
delta = datetime.timedelta(hours= 5)
publishtime = current_datetime - delta
elif '年' in publishtime and '月' in publishtime :
time_format = '%Y年%m月%d日'
publishtime = datetime.datetime.strptime(publishtime, time_format)
elif '月' in publishtime and '日' in publishtime :
current_year = current_datetime.year
time_format = '%Y年%m月%d日'
publishtime=str(current_year)+'年'+publishtime
publishtime = datetime.datetime.strptime(publishtime, time_format)
except Exception as e:
print('时间解析异常!!')
return publishtime
# 获取每一页数据, 开趴.
def get_page_html(self):
self.logger.info("进入百度首页...")
self.driver.get(self.url)
self.driver.find_element(By.NAME, 'wd').send_keys(self.searchkw)
self.driver.find_element(By.ID, 'su').click()
time.sleep(5)
self.driver.find_element(By.CLASS_NAME, 's-tab-news').click()
self.logger.info("开始抓取首页...")
try:
flag, lists = self.parse_page()
except Exception as e:
time.sleep(5)
return
if len(lists)==0:
time.sleep(5)
for detail in lists:
durl=detail['detailUrl']
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
continue
self.detailList.put(detail)
response = self.driver.page_source
html = etree.HTML(response)
hasnext = html.xpath('//div[@id="page"]//a[last()]//text()')[0]
hasnext = hasnext.strip()
while hasnext == '下一页 >':
try:
if self.page_num==5:
break
self.page_num = self.page_num + 1
self.logger.info("开始抓取第%s页..." % self.page_num)
try:
self.driver.find_element(By.XPATH, '//div[@id="page"]//a[last()]').click()
except Exception as e:
if self.page_num>1:
self.page_num = self.page_num - 1
continue
time.sleep(5)
flag, lists = self.parse_page()
for detail in lists:
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
continue
self.detailList.put(detail)
response = self.driver.page_source
html = etree.HTML(response)
hasnext = html.xpath('//div[@id="page"]//a[last()]//text()')[0]
hasnext = hasnext.strip()
except Exception as e:
time.sleep(5)
break
self.logger.info("抓取完毕")
# 获取详情页
def get_detail_html(self):
# 获取当前窗口的句柄
current_window = self.driver.current_window_handle
while True:
if self.detailList.qsize() != 0:
try:
detailmsg=self.detailList.get()
title = detailmsg['title']
detailUrl = detailmsg['detailUrl']
if 'baijiahao' in detailUrl:
continue
print("%s:%s\n" % (title, detailUrl))
# js = "window.open('"+detailUrl+"')"
# self.driver.execute_script(js)
try:
self.driver.get(detailUrl)
except Exception as e:
self.driver.quit()
self.driver=self.createDriver()
self.driver.get(detailUrl)
response = self.driver.page_source
bdetail=self.getDetailmsg(response,detailmsg)
processitem=self.getProcessitem(bdetail)
try:
self.sendkafka(processitem)
self.r.sadd('pybaidu_test_'+self.wordsCode, processitem['sourceAddress'])
except Exception as e:
self.logger.info("放入kafka失败!")
#插入数据库
try:
items=[]
items.append(bdetail)
self.itemInsertToTable(items)
except Exception as e:
self.logger.info("插入数据库失败!")
# 关闭当前新窗口
# self.driver.close()
time.sleep(1)
except Exception as e:
time.sleep(3)
self.logger.info("详情页解析异常!"+detailUrl)
else:
break
# time.sleep(5)
def getDetailmsg(self,detailhtml,detailmsg):
try:
detailurl=detailmsg['detailUrl']
article_content=self.paserDetail(detailhtml,detailurl)
content=article_content['content']
contentWithTag=article_content['body_html']
except Exception as e:
self.logger.info('内容抽取失败')
content=''
contentWithTag=''
currentdate=self.getNowDate()
kword=self.searchkw
publishtime=detailmsg['publishTag']
publishtime=self.paserTime(publishtime)
publishDate=publishtime.strftime("%Y-%m-%d %H:%M:%S")
detailmsg={
'title':detailmsg['title'],
'source':detailmsg['sourceTag'],
'detailurl':detailurl,
'content':content,
'contentHtml':contentWithTag,
'publishtime':publishDate,
'currentdate':currentdate,
'kword':kword
}
return detailmsg
def getProcessitem(self,bdetail):
nowDate=self.getNowDate()
content=bdetail['content']
if content!='':
processitem={
"sid":self.sid,
"source":"3",
"title":bdetail['title'],
"content":bdetail['content'],
"contentWithtag":bdetail['contentHtml'],
"origin":bdetail['source'],
"publishDate":bdetail['publishtime'],
"sourceAddress":bdetail['detailurl'],
"createDate":nowDate
}
return processitem
def sendkafka(self,processitem):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
content=processitem['content']
publishDate=str(processitem['publishDate'])
title=processitem['title']
if title =='':
return
if content=='':
return
if publishDate=='':
return
kafka_result = producer.send("crawlerInfo", json.dumps(processitem, ensure_ascii=False).encode('utf8'))
# kafka_result = producer.send("pythonCrawlerInfo", json.dumps(processitem, ensure_ascii=False).encode('utf8'))
self.logger.info("数据发送kafka成功")
self.logger.info(kafka_result.get(timeout=10))
except Exception as e:
self.logger.info('发送kafka异常')
finally:
producer.close()
def run(self):
# 获取每页URL
c = threading.Thread(target=self.get_page_html)
c.start()
c.join()
# 解析详情页
t = threading.Thread(target=self.get_detail_html)
t.start()
if __name__ == '__main__':
zhuce = BaiduSpider()
zhuce.run()
# zhuce.driver.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论