Commit 9d49a0cd  Author: XveLingKun

Google search

Parent 252c04d3
import datetime
import os
import random
import redis
import sys
import time
import logbook
......@@ -211,12 +213,18 @@ class BaseCore:
try:
self.__cursor_proxy.close()
self.__cnx_proxy.close()
self.cursor_.close()
self.cnx_.close()
except :
pass
def __init__(self):
self.r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor()
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
self.cursor_ = self.cnx_.cursor()
pass
# Elapsed-time calculation
......@@ -348,3 +356,41 @@ class BaseCore:
ip_list.append(proxy)
return ip_list
# Pop and remove one element from a Redis list
def redicPullData(self, key):
try:
self.r.ping()
except:
self.r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
item = self.r.lpop(key)
return item.decode() if item else None
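For context, a minimal seeding sketch (not part of this commit) that pushes company items in the social_code|中文名|英文名|rank format which redicPullData later pops; the key name and the sample item are taken from GoogleTaskJob further below.
import redis

# Push task items onto the list that redicPullData consumes (illustration only).
r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
companies = [
    'ZZSN22080900000001|沃尔玛|WMT|1',   # example item quoted in get_comname below
]
for item in companies:
    r.rpush('GOOGLE_KEYWORDS:COMPANY_NAME:2023_500', item)
print(r.llen('GOOGLE_KEYWORDS:COMPANY_NAME:2023_500'))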
def getSidName(self, sid):
sqlSelect = f"SELECT words_name FROM `key_words` WHERE id = '{sid}'"
self.cursor_.execute(sqlSelect)
data = self.cursor_.fetchone()[0]
return data
# Get the PID of the current script process
def getPID(self):
PID = os.getpid()
return PID
def getUniqueCode(self, abbr, serverId, threadId):
while True:
timeCode = self.r.blpop(['timeCode:google'], 2)
if timeCode:
timeCode = timeCode[1]
timeCode = timeCode.decode('utf-8')
break
else:
time.sleep(2)
pid = str(self.getPID())
if len(pid) < 4:
pid = pid.zfill(4)
elif len(pid) > 4:
pid = pid[0:4]
uniqueCode = abbr + str(datetime.datetime.now().strftime('%Y%m%d'))[2:] + serverId + pid + str(threadId) + str(timeCode)
return uniqueCode
\ No newline at end of file
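For illustration, a standalone sketch of the layout getUniqueCode produces; 'GG' and '195' mirror the commented-out call in get_detail_html further below, and the timeCode here is a stand-in for the value normally blpop'ed from the timeCode:google list.
import datetime
import os

abbr, serverId, threadId = 'GG', '195', 1
timeCode = '0001'                      # placeholder for the Redis-issued suffix
pid = str(os.getpid()).zfill(4)[:4]    # pad to 4 digits, then truncate to 4
uniqueCode = abbr + datetime.datetime.now().strftime('%Y%m%d')[2:] + serverId + pid + str(threadId) + timeCode
print(uniqueCode)                      # e.g. GG240516195123410001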
......@@ -2,7 +2,7 @@ from urllib.parse import urljoin
import langid
import pymysql
from gne import GeneralNewsExtractor
from retry import retry
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
......@@ -15,11 +15,12 @@ import threading
import time
from lxml import etree
from queue import Queue
import re,sys
import re, sys
import datetime
import redis
from kafka import KafkaProducer
import json
import uuid
from baseCore import BaseCore
import configparser
......@@ -31,14 +32,15 @@ import requests
# Extract plain text from HTML
from bs4 import BeautifulSoup
class GoogleSpider(object):
def __init__(self,searchkw,wordsCode,sid):
def __init__(self, threadId, searchkw, wordsCode, sid, item, bangdan_name):
# Create a ConfigParser object
self.config = configparser.ConfigParser()
# Read the config file
self.config.read('config.ini')
baseCore=BaseCore()
self.logger=baseCore.getLogger()
self.baseCore = BaseCore()
self.logger = self.baseCore.getLogger()
# self.url = f'https://www.google.com/search?q={searchkw}&tbm=nws&source=lnms&sa=X&ved=2ahUKEwicke6y37OAAxWJGIgKHQWAASUQ0pQJegQIDRAB&biw=1366&bih=372&dpr=1'
# self.url = f'https://www.google.com.hk/search?q={searchkw}&sca_esv=555819424&tbs=sbd:1&tbm=nws&ei=CezVZPaGCaqC4-EPqZi_oAk&start=90&sa=N&ved=2ahUKEwi2r_qGk9SAAxUqwTgGHSnMD5QQ8tMDegQIAhAU&biw=1366&bih=619&dpr=1'
self.url = f'https://www.google.com.hk'
......@@ -46,12 +48,15 @@ class GoogleSpider(object):
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
self.page_num = 1
chrome_driver =self.config.get('selenium', 'chrome_driver')
chrome_driver = self.config.get('selenium', 'chrome_driver')
self.kafka_bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
chrome_options.add_argument(rf'user-data-dir=D:\seleniumTmp\baidu{uuid.uuid1()}')
chrome_options.add_argument("--disable-component-update")
chrome_options.add_argument("--disable-extensions")
self.driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
# driver = webdriver.Chrome(chrome_options=chrome_options)
self.qtitle = Queue()
self.qurl = Queue()
......@@ -59,33 +64,41 @@ class GoogleSpider(object):
self.searchkw = searchkw
self.wordsCode = wordsCode
self.sid = sid
self.threadId = threadId
self.item = item
self.bangdan_name = bangdan_name
def createDriver(self):
chrome_driver =self.config.get('selenium', 'chrome_driver')
chrome_driver = self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =self.config.get('selenium', 'binary_location')
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
# Set up a proxy
# proxy = "127.0.0.1:8080"  # proxy address and port
# chrome_options.add_argument('--proxy-server=http://' + proxy)
self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
#Insert the list data into the table baidu_search_result
def itemInsertToTable(self,items):
itemdata=[]
conx,cursorM=self.connMysql()
self.driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
# Insert the collected items into the Company_layoff table
def itemInsertToTable(self, items):
itemdata = []
conx, cursorM = self.connMysql()
companyinfo = self.item
social_code = str(companyinfo.split('|')[0])
ch_name = companyinfo.split('|')[1]
en_name = companyinfo.split('|')[2]
rank = self.bangdan_name + '|' + str(companyinfo.split('|')[3])
for item in items:
nowtime=self.getNowDate()
data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
nowtime = self.getNowDate()
data = (social_code, en_name, ch_name, rank, item['title'], item['content'], item['detailurl'], item['publishtime'], item['source'], nowtime)
itemdata.append(data)
sql ="INSERT into google_search_result (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
sql = "INSERT into Company_layoff (企业信用代码,企业英文名称,企业中文名称,所在榜单排名,标题,内容,链接,发布时间,来源,创建时间) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
self.logger.info("数据插入数据库成功!")
# SQL statement used for the insert (defined above)
# Execute the insert
conx.commit()
self.closeSql(conx,cursorM)
self.closeSql(conx, cursorM)
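The INSERT above assumes a Company_layoff table with these Chinese column names; a rough DDL sketch follows, where the column types are assumptions and not part of this commit.
# Rough shape of the Company_layoff table that itemInsertToTable writes to;
# the column names come from the INSERT above, the types are guesses.
ddl = """
CREATE TABLE IF NOT EXISTS Company_layoff (
    企业信用代码 VARCHAR(64),
    企业英文名称 VARCHAR(255),
    企业中文名称 VARCHAR(255),
    所在榜单排名 VARCHAR(64),
    标题 VARCHAR(512),
    内容 LONGTEXT,
    链接 VARCHAR(1024),
    发布时间 VARCHAR(32),
    来源 VARCHAR(255),
    创建时间 VARCHAR(32)
) DEFAULT CHARSET=utf8mb4;
"""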
def connMysql(self):
# Create the MySQL connection
......@@ -95,62 +108,63 @@ class GoogleSpider(object):
database=self.config.get('mysql', 'database'))
# Create a cursor object
cursorM = conx.cursor()
return conx,cursorM
return conx, cursorM
def closeSql(self,conx,cursorM):
def closeSql(self, conx, cursorM):
# Close the cursor and the connection
cursorM.close()
conx.close()
# Parse the results page
def parse_page(self):
try:
response = self.driver.page_source
html = etree.HTML(response)
lists=self.xpath_paser(html)
lists = self.xpath_paser(html)
flag = html.xpath('//tr[@jsname="TeSSVd"]//a[last()]//@class')[0]
except:
lists=[]
flag=''
lists = []
flag = ''
return flag, lists
def xpath_paser(self,html):
lists=[]
itemTag=html.xpath('//div[@class="SoaBEf"]')
def xpath_paser(self, html):
lists = []
itemTag = html.xpath('//div[@class="SoaBEf"]')
for itemTag in itemTag:
try:
title=itemTag.xpath('.//div[@class="n0jPhd ynAwRc MBeuO nDgy9d"]/text()')[0]
title=str(title)
title = itemTag.xpath('.//div[@class="n0jPhd ynAwRc MBeuO nDgy9d"]/text()')[0]
title = str(title)
except Exception as e:
title=''
title = ''
try:
detailUrl=itemTag.xpath('.//a[@class="WlydOe"]/@href')[0]
detailUrl=str(detailUrl)
detailUrl = itemTag.xpath('.//a[@class="WlydOe"]/@href')[0]
detailUrl = str(detailUrl)
except Exception as e:
detailUrl=''
detailUrl = ''
try:
sourceTag=itemTag.xpath('.//div[@class="MgUUmf NUnG9d"]//text()')[0]
sourceTag=str(sourceTag)
sourceTag = itemTag.xpath('.//div[@class="MgUUmf NUnG9d"]//text()')[0]
sourceTag = str(sourceTag)
except Exception as e:
print(e)
sourceTag=''
sourceTag = ''
try:
publishTag=itemTag.xpath('.//div[@class="OSrXXb rbYSKb LfVVr"]/span/text()')[0]
publishTag=str(publishTag)
publishtime=self.paserTime(publishTag)
publishTag=publishtime.strftime("%Y-%m-%d %H:%M:%S")
publishTag = itemTag.xpath('.//div[@class="OSrXXb rbYSKb LfVVr"]/span/text()')[0]
publishTag = str(publishTag)
publishtime = self.paserTime(publishTag)
publishTag = publishtime.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
publishTag=''
detailmsg={
'title':title,
'detailUrl':detailUrl,
'sourceTag':sourceTag,
'publishTag':publishTag
publishTag = ''
detailmsg = {
'title': title,
'detailUrl': detailUrl,
'sourceTag': sourceTag,
'publishTag': publishTag
}
lists.append(detailmsg)
return lists
#Get the current time
# Get the current time
def getNowDate(self):
# Get the current time
......@@ -159,13 +173,13 @@ class GoogleSpider(object):
currentdate = current_time.strftime("%Y-%m-%d %H:%M:%S")
return currentdate
def webDriver(self,url):
chrome_driver =self.config.get('selenium', 'chrome_driver')
def webDriver(self, url):
chrome_driver = self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =self.config.get('selenium', 'binary_location')
driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
html=''
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
html = ''
try:
driver.get(url)
# Wait for the page to finish loading
......@@ -173,7 +187,7 @@ class GoogleSpider(object):
driver.refresh()
wait = WebDriverWait(driver, 20)
wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
html=driver.page_source
html = driver.page_source
except Exception as e:
self.logger.info('请求失败')
finally:
......@@ -182,77 +196,81 @@ class GoogleSpider(object):
return html
def extractorMsg(self,url,title):
content=''
contentWithTag=''
lang=''
def extractorMsg(self, url, title):
content = ''
contentWithTag = ''
lang = ''
try:
lang=self.detect_language(title)
sm=SmartExtractor(lang)
article=sm.extract_by_url(url=url)
content=article.cleaned_text
contentWithTag=article.text
lang = self.detect_language(title)
sm = SmartExtractor(lang)
article = sm.extract_by_url(url=url)
content = article.cleaned_text
contentWithTag = article.text
except Exception as e:
try:
raw_html=self.webDriver(url)
sm=SmartExtractor(lang)
article=sm.extract_by_html(raw_html)
content=article.cleaned_text
contentWithTag=article.text
raw_html = self.webDriver(url)
sm = SmartExtractor(lang)
article = sm.extract_by_html(raw_html)
content = article.cleaned_text
contentWithTag = article.text
except Exception as e:
print('抽取失败!!')
return content,contentWithTag
return content, contentWithTag
def paserTime(self,publishtime):
timeType=['年前','月前','周前','前天','昨天','天前','今天','小时前','分钟前']
def paserTime(self, publishtime):
timeType = ['年前', '月前', '周前', '前天', '昨天', '天前', '今天', '小时前', '分钟前']
current_datetime = datetime.datetime.now()
publishtime=publishtime.strip()
publishtime = publishtime.strip()
print(publishtime)
try:
if '年前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
day = int(numbers[0])
delta = datetime.timedelta(days=365 * day)
publishtime = current_datetime - delta
elif '月前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days= 30*day)
day = int(numbers[0])
delta = datetime.timedelta(days=30 * day)
publishtime = current_datetime - delta
elif '周前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(weeks= day)
day = int(numbers[0])
delta = datetime.timedelta(weeks=day)
publishtime = current_datetime - delta
elif '天前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days= day)
day = int(numbers[0])
delta = datetime.timedelta(days=day)
publishtime = current_datetime - delta
elif '前天' in publishtime:
delta = datetime.timedelta(days= 2)
delta = datetime.timedelta(days=2)
publishtime = current_datetime - delta
elif '昨天' in publishtime:
current_datetime = datetime.datetime.now()
delta = datetime.timedelta(days= 1)
delta = datetime.timedelta(days=1)
publishtime = current_datetime - delta
elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime :
delta = datetime.timedelta(hours= 5)
elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime:
delta = datetime.timedelta(hours=5)
publishtime = current_datetime - delta
elif '年' in publishtime and '月' in publishtime :
elif '年' in publishtime and '月' in publishtime:
time_format = '%Y年%m月%d日'
publishtime = datetime.datetime.strptime(publishtime, time_format)
elif '月' in publishtime and '日' in publishtime :
elif '月' in publishtime and '日' in publishtime:
current_year = current_datetime.year
time_format = '%Y年%m月%d日'
publishtime=str(current_year)+'年'+publishtime
publishtime = str(current_year) + '年' + publishtime
publishtime = datetime.datetime.strptime(publishtime, time_format)
except Exception as e:
print('时间解析异常!!')
return publishtime
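A minimal standalone mirror of two paserTime branches, handy for spot-checking the relative-date arithmetic; it is a sketch, not the full parser above.
import datetime
import re

def parse_relative(publishtime: str) -> datetime.datetime:
    # Mirrors only the "N天前" and "M月D日" branches of paserTime.
    now = datetime.datetime.now()
    publishtime = publishtime.strip()
    if '天前' in publishtime:                        # "N days ago"
        days = int(re.findall(r'\d+', publishtime)[0])
        return now - datetime.timedelta(days=days)
    if '月' in publishtime and '日' in publishtime:  # "M月D日" -> assume current year
        return datetime.datetime.strptime(f'{now.year}年{publishtime}', '%Y年%m月%d日')
    raise ValueError(publishtime)

print(parse_relative('3天前'))
print(parse_relative('5月1日'))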
@retry(tries=3, delay=3)
def get_buket_news(self):
self.driver.find_element('xpath', '//div[contains(@class, "YmvwI") and contains(text(), "新闻")]').click()
@retry(tries=3, delay=3)
# Fetch every results page and start crawling.
def get_page_html(self):
self.logger.info(f"{self.searchkw}...进入google首页...")
......@@ -266,33 +284,44 @@ class GoogleSpider(object):
search_input.send_keys(self.searchkw)
search_input.submit()
try:
time.sleep(3)
wait = WebDriverWait(self.driver, 20)
wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "crJ18e")))
time.sleep(3)
for i in range(3):
try:
self.driver.find_element('xpath', '//div[contains(@class, "YmvwI") and contains(text(), "新闻")]').click()
except:
self.logger.info('点击新闻按钮失效')
self.get_buket_news()
break
except Exception as e:
self.logger.info(f'点击新闻按钮失效')
self.driver.refresh()
time.sleep(3)
if i < 2:  # give up after the third failed attempt
continue
else:
return
time.sleep(3)
self.driver.find_element('xpath', '//div[@id="hdtb-tls"]').click()
time.sleep(2)
self.driver.find_element('xpath', '//div[@class="hdtb-mn-hd"]/div[text()="按相关性排序"]').click()
# self.driver.find_element('xpath', '//div[@class="hdtb-mn-hd"]/div[text()="按相关性排序"]').click()
self.driver.find_element('xpath',
'//*[@id="tn_1"]/span[3]/g-popup/div[1]/div/div/div[text()="按相关性排序"]').click()
time.sleep(2)
self.driver.find_element('xpath', '//div[@class="YpcDnf OSrXXb HG1dvd"]/a[text()="按日期排序"]').click()
# self.driver.find_element('xpath', '//div[@class="YpcDnf OSrXXb HG1dvd"]/a[text()="按日期排序"]').click()
self.driver.find_element('xpath', '//*[@id="lb"]/div/g-menu/g-menu-item[2]/div/a[text()="按日期排序"]').click()
except Exception as e:
self.logger.info(f'--{self.searchkw}--点击按钮失效----{e}')
self.logger.info(f'--{self.searchkw}--点击按钮失效')
return
self.logger.info(f"{self.searchkw}...开始抓取首页...")
time.sleep(5)
flag, lists = self.parse_page()
if len(lists)<1:
if len(lists) < 1:
time.sleep(6)
repeatCounts = 0
for detail in lists:
durl=detail['detailUrl']
is_member = self.r.sismember('pygoogle_'+self.wordsCode, durl)
durl = detail['detailUrl']
is_member = self.r.sismember('pygoogle_' + self.wordsCode, durl)
if is_member:
repeatCounts += 1
if repeatCounts / len(lists) > 0.5:
......@@ -310,8 +339,8 @@ class GoogleSpider(object):
hasnext = ''
timeFlag = False
while hasnext == '下一页':
if self.page_num==5:
break
# if self.page_num == 5:
# break
self.page_num = self.page_num + 1
self.logger.info(f"{self.searchkw}...开始抓取第{self.page_num}页...")
try:
......@@ -323,7 +352,7 @@ class GoogleSpider(object):
repeated_counts = 0
for detail in lists:
durl = detail['detailUrl']
is_member = self.r.sismember('pygoogle_'+self.wordsCode, durl)
is_member = self.r.sismember('pygoogle_' + self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
repeated_counts += 1
......@@ -331,14 +360,14 @@ class GoogleSpider(object):
self.logger.info(f"{self.searchkw}第{self.page_num}页已存在过多,跳出循环")
return
continue
publishTag=detail['publishTag']
# if publishTag:
# pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
# needDate='2022-01-01 00:00:00'
# needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
# if pubtime < needTime:
# timeFlag = True
# break
publishTag = detail['publishTag']
if publishTag:
pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
needDate = '2022-01-01 00:00:00'
needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
if pubtime < needTime:
timeFlag = True
break
self.detailList.put(detail)
if timeFlag:
break
......@@ -349,37 +378,74 @@ class GoogleSpider(object):
hasnext = hasnext.strip()
self.logger.info(hasnext)
except Exception as e:
hasnext=''
hasnext = ''
self.logger.info(f"{self.searchkw}...列表抓取完毕")
def getRequest(self,url):
html=''
def getRequest(self, url):
html = ''
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
}
try:
print(url)
res=requests.get(url=url,timeout=30)
if res.status_code==200:
res = requests.get(url=url, timeout=30)
if res.status_code == 200:
res.encoding = res.apparent_encoding # 使用自动检测的编码方式
html=res.text
html = res.text
else:
html=''
if html=='':
for i in range(1,3):
html = ''
if html == '':
for i in range(1, 3):
time.sleep(1)
html=self.getRequest(url)
html = self.getRequest(url)
except Exception as e:
print(e)
return html
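getRequest retries by calling itself when the body comes back empty; a flatter variant using the retry decorator this module already imports could look like the sketch below (an alternative, not the committed implementation).
import requests
from retry import retry

@retry(tries=3, delay=1)
def get_html(url: str) -> str:
    # Raising on a bad status triggers the decorator's retry.
    res = requests.get(url, timeout=30)
    res.raise_for_status()
    res.encoding = res.apparent_encoding  # use the auto-detected encoding
    return res.text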
def sendMonitor(self, processitem):
self.logger.info(processitem['uniqueCode'])
sidName = self.baseCore.getSidName(processitem['sid'])
monitor = {
"title": processitem['title'],  # title
"sourceAddress": processitem['sourceAddress'],  # original article URL
"uniqueCode": processitem['uniqueCode'],  # unique code: crawler type + 6-digit date + server serial + thread serial + custom number
"operateType": "DATA_CRAWLER",  # operation type, fixed value
"handlerBody": {
"success": True,  # success/failure flag, fixed value
"handlerStatus": "CRAWLED"  # handling status, fixed value
},
"source": {
"sourceId": processitem['sid'],  # source id
"sourceName": sidName,  # source name
"sourceType": 4,  # source type, see the sourceType enum
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),  # processing time, yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195",  # server IP
"serverHostName": "数据采集服务",  # server name
"processId": self.baseCore.getPID()  # process id
}
}
producer = KafkaProducer(bootstrap_servers=[self.kafka_bootstrap_servers], max_request_size=1024 * 1024 * 20,
api_version=(2, 7, 0))
try:
kafka_result = producer.send("crawlerInfo", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
self.logger.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
self.baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
self.logger.info('数据监控发送Kafka失败,已放置Redis中')
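When Kafka is unreachable, sendMonitor parks the payload in a Redis stream via xadd; a sketch of how a downstream worker might read that stream back (the worker itself is not part of this commit, and the connection settings are copied from BaseCore).
import json
import redis

# Drain a few entries from the fallback stream written by sendMonitor.
r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
entries = r.xrange('data_lifecycle_log_data_crawler-redis', count=10)
for entry_id, fields in entries:
    monitor = json.loads(fields[b'lifecycle_data_crawler'])
    print(entry_id, monitor.get('uniqueCode'))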
# Fetch and parse the detail pages
def get_detail_html(self):
while True:
if self.detailList.qsize() != 0:
detailmsg=self.detailList.get()
detailmsg = self.detailList.get()
title = detailmsg['title']
detailUrl = detailmsg['detailUrl']
self.logger.info("%s:%s开始解析详情数据\n" % (title, detailUrl))
......@@ -392,12 +458,12 @@ class GoogleSpider(object):
# self.driver.get(detailUrl)
# response = self.driver.page_source
try:
bdetail=self.getDetailmsg(detailmsg)
bdetail = self.getDetailmsg(detailmsg)
# 'content':content,
# 'contentHtml':contentWithTag,
content=bdetail['content']
contentHtml=bdetail['contentHtml']
if len(content)<100:
content = bdetail['content']
contentHtml = bdetail['contentHtml']
if len(content) < 100:
continue
soup = BeautifulSoup(contentHtml, "html.parser")
# Find all elements that carry a class attribute
......@@ -405,57 +471,62 @@ class GoogleSpider(object):
# Iterate over the elements and drop the class attribute
for element in elements_with_class:
del element.attrs["class"]
contentHtml=str(soup)
bdetail['content']=content
bdetail['contentHtml']=contentHtml
contentHtml = str(soup)
bdetail['content'] = content
bdetail['contentHtml'] = contentHtml
except Exception as e:
self.logger.info('详情解析失败')
continue
processitem=self.getProcessitem(bdetail)
processitem = self.getProcessitem(bdetail)
# uniqueCode = self.baseCore.getUniqueCode('GG', '195', self.threadId)
# processitem['uniqueCode'] = uniqueCode
try:
self.sendkafka(processitem)
self.r.sadd('pygoogle_'+self.wordsCode, processitem['sourceAddress'])
# flg = self.sendkafka(processitem)
flg = True
if flg:
self.r.sadd('pygoogle_' + self.wordsCode, processitem['sourceAddress'])
# Insert into the database
try:
items = []
items.append(bdetail)
self.itemInsertToTable(items)
except Exception as e:
self.logger.info(f"插入数据库失败!{bdetail['kword']}===={detailUrl}")
self.logger.info(f"放入kafka成功!{bdetail['kword']}===={detailUrl}")
self.logger.info(f"插入数据库失败!{bdetail['kword']}===={e}")
# self.logger.info(f"放入kafka成功!{bdetail['kword']}===={detailUrl}")
# self.sendMonitor(processitem)
except Exception as e:
self.logger.info(f"放入kafka失败!{bdetail['kword']}===={detailUrl}")
self.logger.info(f"{e}{bdetail['kword']}===={detailUrl}")
# Close the current window
# self.driver.close()
time.sleep(1)
except Exception as e:
time.sleep(5)
self.logger.info("详情页解析异常!"+detailUrl)
self.logger.info("详情页解析异常!" + detailUrl)
else:
break
# time.sleep(5)
def rmTagattr(self,html,url):
def rmTagattr(self, html, url):
# Parse the page content with BeautifulSoup
# soup = BeautifulSoup(html, 'html.parser')
soup = self.paserUrl(html,url)
soup = self.paserUrl(html, url)
# Iterate over all tags and strip their attributes
for tag in soup.find_all(True):
if tag.name == 'img':
tag.attrs = {key: value for key, value in tag.attrs.items() if key == 'src'}
elif tag.name !='img':
elif tag.name != 'img':
tag.attrs = {key: value for key, value in tag.attrs.items() if key == 'src'}
else:
tag.attrs = {key: value for key, value in tag.attrs.items()}
# Print the page content after the attributes are stripped
# print(soup.prettify())
html=soup.prettify()
html = soup.prettify()
return html
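Both branches of the loop above keep only the src attribute, so the effective behavior of rmTagattr can be reproduced with this tiny standalone sketch.
from bs4 import BeautifulSoup

snippet = '<div class="x" id="y"><img src="a.png" width="10"><a href="b.html">t</a></div>'
soup = BeautifulSoup(snippet, 'html.parser')
for tag in soup.find_all(True):
    # Keep only the src attribute, mirroring rmTagattr's effect.
    tag.attrs = {k: v for k, v in tag.attrs.items() if k == 'src'}
print(soup.prettify())   # only the <img src> attribute survives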
# Convert relative URLs in the HTML to absolute URLs
def paserUrl(self,html,listurl):
def paserUrl(self, html, listurl):
soup = BeautifulSoup(html, 'html.parser')
# Collect all <a> and <img> tags
links = soup.find_all(['a', 'img'])
......@@ -468,73 +539,76 @@ class GoogleSpider(object):
return soup
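The middle of paserUrl is elided in this hunk; presumably it rewrites href/src values with urljoin, roughly as in this standalone sketch (an assumption based on the imports and the tag list above).
from urllib.parse import urljoin
from bs4 import BeautifulSoup

def absolutize(html: str, base_url: str) -> BeautifulSoup:
    # Rewrite relative href/src attributes against the listing URL.
    soup = BeautifulSoup(html, 'html.parser')
    for tag in soup.find_all(['a', 'img']):
        attr = 'href' if tag.name == 'a' else 'src'
        if tag.get(attr):
            tag[attr] = urljoin(base_url, tag[attr])
    return soup

print(absolutize('<a href="/news/1.html">n</a>', 'https://example.com/list'))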
#Extract the article content details
def getDetailmsg(self,detailmsg):
# Extract the article content details
def getDetailmsg(self, detailmsg):
try:
detailurl=detailmsg['detailUrl']
detailurl = detailmsg['detailUrl']
title = detailmsg['title']
content,contentWithTag=self.extractorMsg(detailurl,title)
contentWithTag=self.rmTagattr(contentWithTag,detailurl)
content, contentWithTag = self.extractorMsg(detailurl, title)
contentWithTag = self.rmTagattr(contentWithTag, detailurl)
except Exception as e:
content=''
contentWithTag=''
content = ''
contentWithTag = ''
currentdate=self.getNowDate()
kword=self.searchkw
publishDate=detailmsg['publishTag']
publishDate=publishDate+''
currentdate = self.getNowDate()
kword = self.searchkw
publishDate = detailmsg['publishTag']
publishDate = publishDate + ''
# publishtime=self.paserTime(publishtime)
# publishDate=publishtime.strftime("%Y-%m-%d %H:%M:%S")
detailmsg={
'title':detailmsg['title'],
'source':detailmsg['sourceTag'],
'detailurl':detailurl,
'content':content,
'contentHtml':contentWithTag,
'publishtime':publishDate,
'currentdate':currentdate,
'kword':kword
detailmsg = {
'title': detailmsg['title'],
'source': detailmsg['sourceTag'],
'detailurl': detailurl,
'content': content,
'contentHtml': contentWithTag,
'publishtime': publishDate,
'currentdate': currentdate,
'kword': kword
}
return detailmsg
def getProcessitem(self,bdetail):
nowDate=self.getNowDate()
content=bdetail['content']
if content!='':
processitem={
"sid":self.sid,
"source":"4",
"title":bdetail['title'],
"content":bdetail['content'],
"contentWithtag":bdetail['contentHtml'],
"origin":bdetail['source'],
"publishDate":bdetail['publishtime'],
"sourceAddress":bdetail['detailurl'],
"createDate":nowDate
def getProcessitem(self, bdetail):
nowDate = self.getNowDate()
content = bdetail['content']
if content != '':
processitem = {
"sid": self.sid,
"source": "4",
"title": bdetail['title'],
"content": bdetail['content'],
"contentWithtag": bdetail['contentHtml'],
"origin": bdetail['source'],
"publishDate": bdetail['publishtime'],
"sourceAddress": bdetail['detailurl'],
"createDate": nowDate
}
return processitem
def sendkafka(self,processitem):
def sendkafka(self, processitem):
try:
producer = KafkaProducer(bootstrap_servers=[self.kafka_bootstrap_servers])
content=processitem['content']
publishDate=str(processitem['publishDate'])
title=processitem['title']
if title =='':
content = processitem['content']
publishDate = str(processitem['publishDate'])
title = processitem['title']
if title == '':
return
if content=='':
if content == '':
return
if publishDate=='':
if publishDate == '':
return
kafka_result = producer.send("crawlerInfo", json.dumps(processitem, ensure_ascii=False).encode('utf8'))
# self.logger.info("数据发送kafka成功")
self.logger.info(kafka_result.get(timeout=10))
flg = True
except Exception as e:
flg = False
pass
# self.logger.info('发送kafka异常')
finally:
producer.close()
return flg
def run(self):
# Fetch the URLs of each page
......@@ -545,38 +619,37 @@ class GoogleSpider(object):
t = threading.Thread(target=self.get_detail_html)
t.start()
def detect_language(self,html):
def detect_language(self, html):
soup = BeautifulSoup(html, "html.parser")
text = soup.get_text()
# Use langid to detect the language of the text
lang, confidence = langid.classify(text)
return lang
if __name__ == '__main__':
searchkw='kw'
wordsCode='wordsCode'
sid='sid'
zhuce=GoogleSpider(searchkw,wordsCode,sid)
searchkw = 'kw'
wordsCode = 'wordsCode'
sid = 'sid'
# placeholder values for a manual test run, matching the new constructor signature
threadId = 1
item = 'ZZSN22080900000001|沃尔玛|WMT|1'
bangdan_name = '2023年世界500强'
zhuce = GoogleSpider(threadId, searchkw, wordsCode, sid, item, bangdan_name)
# zhuce.run()
url='https://vostok.today/46962-fesco-i-rzhd-rasshirjat-propusknuju-sposobnost-vladivostokskogo-morskogo-torgovogo-porta.html'
url = 'https://vostok.today/46962-fesco-i-rzhd-rasshirjat-propusknuju-sposobnost-vladivostokskogo-morskogo-torgovogo-porta.html'
zhuce.driver.get(url)
time.sleep(20)
html=zhuce.driver.page_source
html = zhuce.driver.page_source
print(html)
lang=zhuce.detect_language(html)
lang = zhuce.detect_language(html)
print(lang)
print('++++++++++++++++++')
sm=SmartExtractor(lang)
article=sm.extract_by_html(html)
sm = SmartExtractor(lang)
article = sm.extract_by_html(html)
# article=sm.extract_by_url(url)
content=article.cleaned_text
text=article.text
content = article.cleaned_text
text = article.text
print(content)
print(text)
# raw_html = article.raw_html
# html=zhuce.getRequest(url)
# article_content=zhuce.extract_article(html,url)
# print(article_content)
......@@ -27,6 +27,9 @@ class GoogleTaskJob(object):
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
self.r_6 = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=6)
def getkafka(self):
# Kafka集群的地址
......@@ -108,35 +111,36 @@ class GoogleTaskJob(object):
def paserKeyMsg(self,keymsg):
num = 1
logger.info('----------')
wordsCode=keymsg['wordsCode']
id=keymsg['id']
try:
searchEngines=keymsg['searchEngines']
if 'java.util.ArrayList' in searchEngines:
searchEngines=searchEngines[1]
except Exception as e:
searchEngines=[]
kwList=[]
if searchEngines:
if '4' in searchEngines:
keyword=keymsg['keyWord']
kwList = []
keymsglist=self.getkeywords(keyword)
for kw in keymsglist:
kwmsg={
'kw':kw,
'wordsCode':wordsCode,
'sid':id
}
kwList.append(kwmsg)
kwList.append((num,kwmsg))
num += 1
return kwList
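paserKeyMsg now yields (num, kwmsg) pairs so each spider gets a threadId; a small sketch of how those pairs feed the thread pool at the bottom of this file (run_spider and the sample values are placeholders).
import concurrent.futures

kwList = [(1, {'kw': 'layoff', 'wordsCode': 'KW-20240516-0002', 'sid': 'sid'})]

def run_spider(thread_id, kwmsg):          # stand-in for googleTaskJob.runSpider
    return thread_id, kwmsg['kw']

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(run_spider, num, data) for num, data in kwList]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())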
def runSpider(self,kwmsg):
def runSpider(self,threadId,kwmsg, item, bangdan_name):
if 'lay' in kwmsg['kw']:
com_name = item.split('|')[2]
else:
com_name = item.split('|')[1]
searchkw = com_name + ' ' + kwmsg['kw']
searchkw=kwmsg['kw']
wordsCode=kwmsg['wordsCode']
sid=kwmsg['sid']
googleSpider=GoogleSpider(searchkw,wordsCode,sid)
print(f'======拼接的关键词是{searchkw}=={com_name}====')
wordsCode = kwmsg['wordsCode']
sid = kwmsg['sid']
googleSpider = GoogleSpider(threadId, searchkw, wordsCode, sid, item, bangdan_name)
try:
googleSpider.get_page_html()
......@@ -151,7 +155,28 @@ class GoogleTaskJob(object):
finally:
googleSpider.driver.quit()
logger.info("关键词采集结束!"+searchkw)
import random
def get_comname(self):
# TODO: read company names from Redis and attach them to the keywords
# ZZSN22080900000001|沃尔玛|WMT|1
item = baseCore.redicPullData('GOOGLE_KEYWORDS:COMPANY_NAME:2023_500')
# item = 'ZZSN22080900000001|沃尔玛|WMT|1'
if item:
return item
else:
logger.info('====已无企业===')
return None
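For reference, a sketch of how the pipe-separated item returned by get_comname is split elsewhere in this commit (itemInsertToTable and runSpider); the sample values come from the comment above and the bangdan_name used in __main__.
# Split the "social_code|中文名|英文名|rank" item into its fields.
item = 'ZZSN22080900000001|沃尔玛|WMT|1'
social_code, ch_name, en_name, rank = item.split('|')
bangdan_name = '2023年世界500强'
print(social_code, ch_name, en_name, bangdan_name + '|' + rank)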
# Pop and remove one element from a Redis list
def redicPullData(key, r):
try:
r.ping()
except:
r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
item = r.lpop(key)
return item.decode() if item else None
if __name__ == '__main__':
# ss='道地西洋参+(销售市场|交易市场|直播带货|借助大会平台|网店|微商|电商|农民博主|推介宣传|高品质定位|西洋参产品经营者加盟|引进龙头企业|西洋参冷风库|建设农旅中心|农产品展销中心|精品民宿|温泉)'
# keymsglist=getkeywords(ss)
......@@ -164,14 +189,28 @@ if __name__ == '__main__':
print('---------------')
while True:
try:
codeids=[]
# codeid='KW-20230727-0001'
codeids.append('KW-20240318-0001')
for codeid in codeids:
# try:
# googleTaskJob.r.ping()
# except:
# googleTaskJob.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
# all_keys = 'GOOGLE_KEYWORDS:COMPANY_NAME*'
# keys = googleTaskJob.r.scan_iter(f"{key}*")
# for key in keys:
item = googleTaskJob.get_comname()
bangdan_name = '2023年世界500强'
if item:
pass
else:
break
codeList = [
'KW-20240516-0002'
]
for codeid in codeList:
try:
# keymsg=baiduTaskJob.getkafka()
keymsg=googleTaskJob.getkeyFromredis(codeid)
kwList=googleTaskJob.paserKeyMsg(keymsg)
#keymsg=baiduTaskJob.getkafka()
keymsg = googleTaskJob.getkeyFromredis(codeid)
kwList = googleTaskJob.paserKeyMsg(keymsg)
# kwList=reversed(kwList)
# Randomly pick 5 entries from the list
# kwList = random.sample(kwList, 4)
......@@ -182,9 +221,9 @@ if __name__ == '__main__':
continue
if kwList:
# Create a thread pool with 4 worker threads
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# Submit tasks to the pool, one item per task
results = [executor.submit(googleTaskJob.runSpider, data) for data in kwList]
results = [executor.submit(googleTaskJob.runSpider, num, data, item, bangdan_name) for num, data in kwList]
# Collect the results of the tasks
for future in concurrent.futures.as_completed(results):
try:
......@@ -195,5 +234,5 @@ if __name__ == '__main__':
# Handle exceptions raised during task execution
logger.info(f"任务执行exception: {e}")
except Exception as e:
logger.info('采集异常')
logger.info(f'采集异常{e}')