Commit c887e9d2  Author: XveLingKun

0717

Parent 57e944a7
This source diff could not be displayed because it is too large. You can view the blob instead.
@@ -52,8 +52,10 @@ if __name__ == "__main__":
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
-opt.binary_location = r'F:\spider\Google\Chrome\Application\chrome.exe'
-chromedriver = r'F:\spider\cmd100\chromedriver.exe'
+# opt.binary_location = r'F:\spider\Google\Chrome\Application\chrome.exe'
+# chromedriver = r'F:\spider\cmd100\chromedriver.exe'
+opt.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
+chromedriver = r'D:\cmd100\chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
browser.get(url)
...
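For reference, the webdriver.Chrome(chrome_options=..., executable_path=...) call in this hunk uses keyword arguments that current Selenium 4 releases no longer accept; there the driver path goes through a Service object and the options are passed as options=. A minimal sketch under that assumption, reusing the D:\ paths from the new lines above:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service

opt = webdriver.ChromeOptions()
opt.add_experimental_option("excludeSwitches", ["enable-automation", "enable-logging"])
opt.add_experimental_option('useAutomationExtension', False)
opt.binary_location = r'D:\Google\Chrome\Application\chrome.exe'

# Selenium 4 style: driver path via Service, options via the options= keyword
browser = webdriver.Chrome(service=Service(r'D:\cmd100\chromedriver.exe'), options=opt)
browser.get("https://mp.weixin.qq.com/")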
import json
@@ -3,7 +3,9 @@ import time
import pymongo
-url = "https://web.archive.org/web/20230702131549/https://www.forbes.com/lists/global2000/"
+# url = "https://web.archive.org/web/20230702131549/https://www.forbes.com/lists/global2000/"
+url = "https://web.archive.org/web/20220929184024/https://www.forbes.com/lists/global2000/"
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'福布斯企业人数']
headers = {
@@ -25,7 +27,7 @@ headers = {
import requests
from bs4 import BeautifulSoup
+requests.adapters.DEFAULT_RETRIES = 5
proxies = {
'https': 'http://127.0.0.1:1080',
'http': 'http://127.0.0.1:1080',
@@ -46,7 +48,7 @@ with open('./a.txt', 'r', encoding='utf-8') as f:
dataJson = f.read()
dataJson = json.loads(dataJson)
tableDates = dataJson['tableData']
-for tableDate in tableDates[894:]:
+for tableDate in tableDates:
uri = tableDate['uri']
rank = tableDate['rank']
@@ -79,4 +81,5 @@ for tableDate in tableDates[894:]:
db_storage.insert_one(dic)
print(f'{rank}==={organizationName}===已入库')
req.close()
-time.sleep(1)
\ No newline at end of file
+# time.sleep(1)
+break
\ No newline at end of file
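The requests.adapters.DEFAULT_RETRIES = 5 line added here raises the default retry count for HTTP adapters created afterwards. A more explicit alternative (a sketch, not part of this commit) is a Session with a urllib3 Retry policy, reusing the proxy settings from the script above:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retry))
session.mount('http://', HTTPAdapter(max_retries=retry))
session.proxies = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}

# same Wayback Machine list page as the script above
req = session.get('https://web.archive.org/web/20220929184024/https://www.forbes.com/lists/global2000/', timeout=30)
print(req.status_code)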
import json
import pandas as pd
import pymongo
import requests
from bs4 import BeautifulSoup
from retry import retry
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'2022年福布斯企业人数']
url = 'https://web.archive.org/web/20220929184024/https://www.forbes.com/lists/global2000/'
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control': 'max-age=0',
'Cookie': 'lux_uid=166447682647510727; donation-identifier=aab33e1c4e293a8fcd5490465688bb01; bafp=79fcddb0-4e71-11ee-8a81-b762f64bf85c',
'Priority': 'u=0, i',
'Sec-Ch-Ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Microsoft Edge";v="126"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0'
}
proxies = {
'https': 'http://127.0.0.1:1080',
'http': 'http://127.0.0.1:1080',
}
@retry(tries=5, delay=2)
def detail(href):
try:
req = requests.get(headers=headers, url=href, verify=False, proxies=proxies)
soup_ = BeautifulSoup(req.text, 'lxml')
scripts = soup_.find_all('script')
req.close()
return scripts
except:
raise
@retry(tries=3, delay=2)
def spider():
response = requests.get(url=url, headers=headers, proxies=proxies)
soup = BeautifulSoup(response.text, 'html.parser')
# print(soup)
tables = soup.find_all('div', class_="table-row-group")
print(len(tables))
for idx, table in enumerate(tables):
print(f'正在遍历第{idx}个table')
a_list = table.find_all('a', class_="table-row")
for a in a_list:
rank = a.find('div', class_="rank").text.replace('.', '')
print(f'排名: {rank}')
organizationName = a.find('div', class_="organizationName").text
href = a.get('href')
try:
scripts = detail(href)
except:
print(f'error--:{idx},{rank},{organizationName}')
item = str(idx) + ',' + rank + ',' + organizationName
with open('./error_2022.txt', 'a', encoding='utf-8')as f:
f.write(item)
continue
# print(scripts)
for script in scripts:
if 'numberOfEmployees' in script.text:
break
else:
continue
# print(f'{rank}--{uri}---not found')
try:
employeesJson = script.text
# print(employeesJson)
employeesJson = json.loads(employeesJson)
numberOfEmployees = employeesJson['numberOfEmployees'].replace(',', '')
except:
numberOfEmployees = '--'
dic = {
'排名': rank,
'企业名称': organizationName,
'员工人数': numberOfEmployees,
}
# print(dic)
db_storage.insert_one(dic)
print(f'{rank}==={organizationName}===已入库')
def spider2():
# 读取excel
df = pd.read_excel('./2022年福布斯榜单.xlsx', sheet_name='待补充')
# 获取数据
data = df.values.tolist()
for idx, row in enumerate(data):
# 获取排名、公司名称、链接
rank = row[1]
organizationName = row[2]
# 将名称转化成小写
organizationName = organizationName.lower().replace(' ', '-')
href = f'https://web.archive.org/web/20220929184024/https://www.forbes.com/companies/{organizationName}/?list=global2000'
# 调用爬虫
try:
scripts = detail(href)
except:
print(f'error--:{idx},{rank},{organizationName}')
item = str(idx) + ',' + rank + ',' + organizationName
with open('./error_2022.txt', 'a', encoding='utf-8') as f:
f.write(item)
continue
# print(scripts)
for script in scripts:
if 'numberOfEmployees' in script.text:
break
else:
continue
# print(f'{rank}--{uri}---not found')
try:
employeesJson = script.text
# print(employeesJson)
employeesJson = json.loads(employeesJson)
numberOfEmployees = employeesJson['numberOfEmployees'].replace(',', '')
except:
numberOfEmployees = '--'
dic = {
'排名': rank,
'企业名称': organizationName,
'员工人数': numberOfEmployees,
}
# print(dic)
db_storage.insert_one(dic)
print(f'{rank}==={organizationName}===已入库')
if __name__ == '__main__':
# spider()
spider2()
\ No newline at end of file
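The detail()/spider() pair above pulls the employee count out of a JSON-LD <script> block on each company page. A self-contained sketch of that parsing step, run against a hypothetical inline snippet instead of the live Wayback Machine pages fetched above:

import json
from bs4 import BeautifulSoup

# hypothetical stand-in for the req.text returned by detail()
html = ('<html><head><script type="application/ld+json">'
        '{"@type": "Organization", "numberOfEmployees": "164,000"}</script></head></html>')

soup_ = BeautifulSoup(html, 'lxml')
for script in soup_.find_all('script'):
    if 'numberOfEmployees' in script.text:
        employeesJson = json.loads(script.text)
        numberOfEmployees = employeesJson['numberOfEmployees'].replace(',', '')
        print(numberOfEmployees)   # -> 164000
        break
else:
    print('--')   # same fallback value the spider stores when the field is missing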
import pandas as pd
import pymongo
# 7649
data_list = []
db_stroage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN['全球企业资讯']
# datas = db_stroage.find({"内容": {"$ne": None, "$exists": True}})
# 导出标签是空的数据
datas = db_stroage.find({"标签": ""})
link = []
for data in datas:
del data['_id']
del data['id']
if data['标题'] not in link:
data_list.append(data)
link.append(data['标题'])
# print(data)
print(len(data_list))
df = pd.DataFrame(data_list)
df.to_excel('./不保留企业资讯.xlsx',index=False)
\ No newline at end of file
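The export above de-duplicates by title with a list, so each membership check rescans everything collected so far; a set keeps the same behaviour with O(1) lookups. A minimal sketch of that variant (same connection string, query, and output file as the script above):

import pandas as pd
import pymongo

coll = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin',
                           password='ZZsn@9988').ZZSN['全球企业资讯']
seen_titles = set()
rows = []
for doc in coll.find({"标签": ""}):   # export records whose label field is empty
    doc.pop('_id', None)
    doc.pop('id', None)               # pop() avoids a KeyError if the field is absent
    if doc['标题'] in seen_titles:
        continue
    seen_titles.add(doc['标题'])
    rows.append(doc)

print(len(rows))
pd.DataFrame(rows).to_excel('./不保留企业资讯.xlsx', index=False)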
import json
import re
import threading
import time
import uuid
import pymongo
import redis
import requests
from bs4 import BeautifulSoup
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore(sqlFlg=False)
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'智库-不保留222']
lock = threading.Lock()
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'subjectdatabase'
def queryatt(self,index_name,pnum):
body = {
"query": {
"bool": {
"must": [
{
"match": {
"subjectId": "1537739653432397825"
}
},
{
"match": {
"deleteFlag": "1"
}
},
{
"range": {
"createDate": {
"gte": "2023-12-31T00:00:00",
"lte": "2024-07-02T12:00:00"
}
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200,
"from": pnum
}
result = self.es.search(index=index_name
, doc_type='_doc'
, body=body)
# log.info(result)
return result
def clean_html_tag(content):
# todo: 考虑正式场景中是以</p>进行段落划分的
ori_text = re.sub("(<\/p\s*>)", "\t", content)
# 处理图片标签
ori_text = re.sub(r"<img.*?/>", "", ori_text)
tag_content_list = ori_text.split("\t") if "<p" in ori_text else ori_text
temp_content_list = []
if type(tag_content_list) is list:
for text in tag_content_list:
bs = BeautifulSoup(text, 'lxml')
ori_match_content = bs.text.strip()
temp_content_list.append(ori_match_content)
match_content = "\n".join(temp_content_list)
else:
bs1 = BeautifulSoup(tag_content_list, 'lxml')
match_content = bs1.text.strip()
# if "参考文献" not in tag_content_list:
# match_content = temp_content
# else:
# match_content = temp_content.split("参考文献")[0]
return match_content
def preprocess(text: str):
text = text.strip().strip('\n').strip()
text = re.sub(' +', '', text)
text = re.sub('\n+', '\n', text)
return text
def main(page, p, esMethod):
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# return
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
return
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
try:
content = mms['_source']['content']
except:
continue
try:
clean_content = clean_html_tag(content)
pre_content = preprocess(clean_content)
except:
pre_content = content
try:
summary = mms['_source']['summary']
except:
summary = ''
try:
clean_summary = clean_html_tag(summary)
pre_summary = preprocess(clean_summary)
except:
pre_summary = summary
try:
contentRaw = mms['_source']['contentRaw']
except:
contentRaw = ''
try:
clean_contentRaw = clean_html_tag(contentRaw)
pre_contentRaw = preprocess(clean_contentRaw)
except:
pre_contentRaw = contentRaw
try:
titleRaw = mms['_source']['titleRaw']
except:
titleRaw = ''
try:
summaryRaw = mms['_source']['summaryRaw']
except:
summaryRaw = ''
try:
clean_summaryRaw = clean_html_tag(summaryRaw)
pre_summaryRaw = preprocess(clean_summaryRaw)
except:
pre_summaryRaw = summaryRaw
contentWithTag = mms['_source']['contentWithTag']
log.info(f'{id}--{title}---')
# labels = mms['_source']['labels']
# tags = []
# for label in labels:
# label_name = label['labelMark']
# if label_name == "dynamic_tags":
# relationName = label['relationName']
# tags.append(relationName)
# else:
# continue
# info_tags = ','.join(tags)
# 存入数据库
dic = {
"id": id,
"标题": title,
"摘要": pre_summary,
"内容": pre_content,
"标题译文": titleRaw,
"摘要译文": pre_summaryRaw,
"内容译文": pre_contentRaw,
"正文html": contentWithTag,
"标签": '',
"状态": "通过",
}
db_storage.insert_one(dic)
def run_threads(num_threads,esMethod,j):
threads = []
for i in range(num_threads):
page = j + i + 1
p = j + i * 200
thread = threading.Thread(target=main, args=(page, p, esMethod))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
j = 0
for i in range(9):
esMethod = EsMethod()
# result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# break
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod, j)
j += 1000
log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file
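A paging note: this script walks the index with from/size up to from=8800, close to Elasticsearch's default 10,000 from+size ceiling (index.max_result_window). A sketch of the same query driven by search_after instead, assuming the same client object as EsMethod.es and the bool query used in queryatt(); the helper name is illustrative only:

def iter_all_hits(es, index_name, query_body):
    """Yield every hit for query_body using search_after instead of from/size paging."""
    body = {
        "query": query_body,
        "sort": [{"createDate": {"order": "desc"}}],  # a unique tiebreaker field in the sort would make this fully safe
        "size": 200,
        "track_total_hits": True,
    }
    last_sort = None
    while True:
        if last_sort is not None:
            body["search_after"] = last_sort          # resume after the last document of the previous page
        hits = es.search(index=index_name, body=body)['hits']['hits']
        if not hits:
            break
        for hit in hits:
            yield hit
        last_sort = hits[-1]['sort']                  # sort values returned with each hit

The per-document handling inside main() could then run over iter_all_hits(...) without any page arithmetic.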
import json
import re
import threading
import time
import uuid
import pymongo
import redis
import requests
from bs4 import BeautifulSoup
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore(sqlFlg=False)
log = baseCore.getLogger()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').ZZSN[
'全球企业资讯0710']
lock = threading.Lock()
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'subjectdatabase'
def queryatt(self,index_name,pnum):
body = {
"query": {
"bool": {
"must": [
{
"match": {
"subjectId": "1734030182269853697"
}
},
{
"range": {
"createDate": {
"gte": "2024-07-01T00:00:00",
"lte": "2024-07-11T00:00:00"
}
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200,
"from": pnum
}
result = self.es.search(index=index_name
, doc_type='_doc'
, body=body)
# log.info(result)
return result
def clean_html_tag(content):
# todo: 考虑正式场景中是以</p>进行段落划分的
ori_text = re.sub("(<\/p\s*>)", "\t", content)
# 处理图片标签
ori_text = re.sub(r"<img.*?/>", "", ori_text)
tag_content_list = ori_text.split("\t") if "<p" in ori_text else ori_text
temp_content_list = []
if type(tag_content_list) is list:
for text in tag_content_list:
bs = BeautifulSoup(text, 'lxml')
ori_match_content = bs.text.strip()
temp_content_list.append(ori_match_content)
match_content = "\n".join(temp_content_list)
else:
bs1 = BeautifulSoup(tag_content_list, 'lxml')
match_content = bs1.text.strip()
# if "参考文献" not in tag_content_list:
# match_content = temp_content
# else:
# match_content = temp_content.split("参考文献")[0]
return match_content
def preprocess(text: str):
text = text.strip().strip('\n').strip()
text = re.sub(' +', '', text)
text = re.sub('\n+', '\n', text)
return text
def main(page, p, esMethod):
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# return
try:
msglist = result['hits']['hits']
except:
log.info(f'error-----{result}')
return
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
try:
content = mms['_source']['content']
except:
continue
try:
contentWithTag = mms['_source']['contentWithTag']
except:
continue
try:
clean_content = clean_html_tag(content)
pre_content = preprocess(clean_content)
except:
pre_content = content
try:
summary = mms['_source']['summary']
except:
summary = ''
try:
clean_summary = clean_html_tag(summary)
pre_summary = preprocess(clean_summary)
except:
pre_summary = summary
try:
contentRaw = mms['_source']['contentRaw']
except:
contentRaw = ''
try:
clean_contentRaw = clean_html_tag(contentRaw)
pre_contentRaw = preprocess(clean_contentRaw)
except:
pre_contentRaw = contentRaw
try:
titleRaw = mms['_source']['titleRaw']
except:
titleRaw = ''
try:
summaryRaw = mms['_source']['summaryRaw']
except:
summaryRaw = ''
try:
clean_summaryRaw = clean_html_tag(summaryRaw)
pre_summaryRaw = preprocess(clean_summaryRaw)
except:
pre_summaryRaw = summaryRaw
log.info(f'{id}--{title}---')
labels = mms['_source']['labels']
tags = []
for label in labels:
label_name = label['labelMark']
if label_name == "dynamic_tags":
relationName = label['relationName']
tags.append(relationName)
else:
continue
info_tags = ','.join(tags)
# 存入数据库
dic = {
"id": id,
"标题": title,
"摘要": pre_summary,
"内容": pre_content,
"带标签内容": contentWithTag,
"标题译文": titleRaw,
"摘要译文": pre_summaryRaw,
"内容译文": pre_contentRaw,
"标签": info_tags,
}
db_storage.insert_one(dic)
def run_threads(num_threads,esMethod,j):
threads = []
for i in range(num_threads):
page = j + i + 1
p = j + i * 200
thread = threading.Thread(target=main, args=(page, p, esMethod))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
j = 0
for i in range(2):
esMethod = EsMethod()
# result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# break
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod, j)
j += 1000
log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file
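clean_html_tag() above splits the stored HTML on closing </p> tags, strips <img> tags, and extracts plain text per paragraph. A self-contained illustration of that transformation on a hypothetical snippet (the real input comes from the content/contentRaw fields queried above):

import re
from bs4 import BeautifulSoup

html = '<p>第一段内容</p><p>第二段内容<img src="pic.png"/></p>'   # hypothetical content field
text = re.sub(r"(<\/p\s*>)", "\t", html)        # paragraph breaks -> tab markers
text = re.sub(r"<img.*?/>", "", text)           # drop image tags
paragraphs = [BeautifulSoup(part, 'lxml').text.strip() for part in text.split("\t")]
print("\n".join(p for p in paragraphs if p))    # -> 第一段内容 \n 第二段内容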
#coding=utf-8
from urllib.parse import urljoin
#from base import BaseCore
import pymysql
from bs4 import BeautifulSoup
#from func_timeout import func_set_timeout
from gne import GeneralNewsExtractor
from langid import langid
from retry import retry
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import csv
import threading
import time
from lxml import etree
from queue import Queue
import re,sys
import datetime
import redis
from kafka import KafkaProducer
import json
from baseCore import BaseCore
import configparser
from smart_extractor import SmartExtractor
class BaiduSpider(object):
def __init__(self,threadId,searchkw,wordsCode,sid):
# 创建ConfigParser对象
self.config = configparser.ConfigParser()
# 读取配置文件
self.config.read('config.ini')
self.baseCore=BaseCore()
self.logger=self.baseCore.getLogger()
self.url = 'https://www.baidu.com/'
self.r = redis.Redis(host=self.config.get('redis', 'host'),
port=self.config.get('redis', 'port'),
password=self.config.get('redis', 'pass'), db=0)
self.page_num = 1
chrome_driver =self.config.get('selenium', 'chrome_driver')
self.kafka_bootstrap_servers = self.config.get('kafka', 'bootstrap_servers')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_experimental_option('useAutomationExtension',False)
chrome_options.binary_location = self.config.get('selenium', 'binary_location')
self.driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {'source': 'Object.defineProperty(navigator,"webdriver",{get:()=>undefined})'})
# driver = webdriver.Chrome(chrome_options=chrome_options)
self.qtitle = Queue()
self.qurl = Queue()
self.detailList = Queue()
self.searchkw = searchkw
self.wordsCode = wordsCode
self.sid = sid
self.threadId = threadId
def createDriver(self):
chrome_driver =self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =self.config.get('selenium', 'binary_location')
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
self.driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
#将列表数据插入到表中 meta_search_result
def itemInsertToTable(self,items):
try:
itemdata=[]
conx,cursorM=self.connMysql()
for item in items:
nowtime=self.getNowDate()
data=(self.sid,self.wordsCode,item['title'],item['detailurl'],item['source'],item['publishtime'],item['content'],item['contentHtml'],'1',item['kword'],nowtime)
itemdata.append(data)
sql ="INSERT into baidu_search_result_ (sid,wordsCode,title,detailurl,origin,publishdate,content,content_with_tag,state,keyword,create_time) VALUES (%s, %s,%s, %s, %s, %s, %s, %s, %s, %s, %s)"
cursorM.executemany(sql, itemdata)
self.logger.info("数据插入数据库成功!")
# 定义插入数据的SQL语句
# 执行插入操作
conx.commit()
except Exception as e:
self.logger.info("数据插入数据库失败!")
finally:
self.closeSql(conx,cursorM)
def connMysql(self):
# 创建MySQL连接
conx = pymysql.connect(host=self.config.get('mysql', 'host'),
user=self.config.get('mysql', 'username'),
password=self.config.get('mysql', 'password'),
database=self.config.get('mysql', 'database'))
# 创建一个游标对象
cursorM = conx.cursor()
return conx,cursorM
def closeSql(self,conx,cursorM):
# 关闭游标和连接
cursorM.close()
conx.close()
# 解析页面
def parse_page(self):
self.logger.info(f'{self.searchkw}解析百度列表页...')
response = self.driver.page_source
response = response.replace('<em>', '')
response = response.replace('</em>', '')
html = etree.HTML(response)
lists=self.xpath_paser(html)
try:
flag = html.xpath('//div[@id="page"]//a[last()]//@class')[0]
except Exception as e:
flag=''
lists=[]
return flag, lists
def xpath_paser(self,html):
lists=[]
itemTags=html.xpath('//div[@class="result-op c-container xpath-log new-pmd"]')
for itemTag in itemTags:
try:
title=itemTag.xpath('.//h3[@class="news-title_1YtI1 "]/a/text()')[0]
except Exception as e:
title=''
try:
detailUrl=itemTag.xpath('.//h3[@class="news-title_1YtI1 "]/a/@href')[0]
except Exception as e:
detailUrl=''
try:
sourceTag=itemTag.xpath('.//div[@class="news-source_Xj4Dv"]/a/span[@class="c-color-gray"]/text()')[0]
except Exception as e:
sourceTag=''
try:
publishTag=itemTag.xpath('.//span[@class="c-color-gray2 c-font-normal c-gap-right-xsmall"]/text()')[0]
publishTag=str(publishTag)
publishtime=self.paserTime(publishTag)
publishTag=publishtime.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
publishTag=''
detailmsg={
'title':title,
'detailUrl':detailUrl,
'sourceTag':sourceTag,
'publishTag':publishTag
}
lists.append(detailmsg)
self.logger.info(f'{self.searchkw}---列表获取信息的条数{len(lists)}')
return lists
#获取当前时间
def getNowDate(self):
# 获取当前时间
current_time = datetime.datetime.now()
# 将时间转换为字符串
currentdate = current_time.strftime("%Y-%m-%d %H:%M:%S")
return currentdate
def rmTagattr(self,html,url):
# 使用BeautifulSoup解析网页内容
# soup = BeautifulSoup(html, 'html.parser')
soup = self.paserUrl(html,url)
# 遍历所有标签,并去掉属性
for tag in soup.find_all(True):
if tag.name == 'img':
tag.attrs = {key: value for key, value in tag.attrs.items() if key == 'src'}
elif tag.name !='img':
tag.attrs = {key: value for key, value in tag.attrs.items() if key == 'src'}
else:
tag.attrs = {key: value for key, value in tag.attrs.items()}
# 打印去掉属性后的网页内容
# print(soup.prettify())
html=soup.prettify()
return html
# 将html中的相对地址转换成绝对地址
def paserUrl(self,html,listurl):
soup = BeautifulSoup(html, 'html.parser')
# 获取所有的<a>标签和<img>标签
links = soup.find_all(['a', 'img'])
# 遍历标签,将相对地址转换为绝对地址
for link in links:
if 'href' in link.attrs:
link['href'] = urljoin(listurl, link['href'])
elif 'src' in link.attrs:
link['src'] = urljoin(listurl, link['src'])
return soup
#智能抽取
def paserDetail(self,detailhtml,detailurl):
try:
extractor = GeneralNewsExtractor()
article_content = extractor.extract(detailhtml,host=detailurl,with_body_html=True)
# element = html2element(detailhtml)
except:
article_content={}
return article_content
#解析时间
def paserTime(self,publishtime):
timeType=['年前','月前','周前','前天','昨天','天前','今天','小时前','分钟前']
current_datetime = datetime.datetime.now()
publishtime=publishtime.strip()
print(publishtime)
try:
if '年前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days=365 * day)
publishtime = current_datetime - delta
elif '月前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days=30 * day)  # timedelta has no "months" argument; approximate a month as 30 days
publishtime = current_datetime - delta
elif '周前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(weeks= day)
publishtime = current_datetime - delta
elif '天前' in publishtime:
numbers = re.findall(r'\d+', publishtime)
day=int(numbers[0])
delta = datetime.timedelta(days= day)
publishtime = current_datetime - delta
elif '前天' in publishtime:
delta = datetime.timedelta(days= 2)
publishtime = current_datetime - delta
elif '昨天' in publishtime:
current_datetime = datetime.datetime.now()
delta = datetime.timedelta(days= 1)
publishtime = current_datetime - delta
elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime :
delta = datetime.timedelta(hours= 5)
publishtime = current_datetime - delta
elif '年' in publishtime and '月' in publishtime :
time_format = '%Y年%m月%d日'
publishtime = datetime.datetime.strptime(publishtime, time_format)
elif '月' in publishtime and '日' in publishtime :
current_year = current_datetime.year
time_format = '%Y年%m月%d日'
publishtime=str(current_year)+'年'+publishtime
publishtime = datetime.datetime.strptime(publishtime, time_format)
except Exception as e:
print('时间解析异常!!')
return publishtime
# 获取每一页数据, 开趴.
@retry(tries=3, delay=3)
# @func_set_timeout(100)
def get_page_html(self):
self.logger.info(f"{self.searchkw}进入百度首页...")
self.driver.get(self.url)
self.driver.find_element(By.NAME, 'wd').send_keys(self.searchkw)
time.sleep(3)
# self.driver.refresh()
self.driver.find_element(By.ID, 'su').click()
wait = WebDriverWait(self.driver, 30)
wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(3)
wait = WebDriverWait(self.driver, 30)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "s-tab-news")))
self.driver.find_element(By.CLASS_NAME, 's-tab-news').click()
wait = WebDriverWait(self.driver, 30)
wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(3)
wait = WebDriverWait(self.driver, 30)
wait.until(EC.presence_of_element_located(
(By.XPATH, '//div[@class="search_sort search_sort_news new-pmd"]/span[text()="按焦点排序"]')))
self.driver.find_element('xpath',
'//div[@class="search_sort search_sort_news new-pmd"]/span[text()="按焦点排序"]').click()
time.sleep(3)
wait = WebDriverWait(self.driver, 30)
wait.until(EC.presence_of_element_located((By.XPATH, '//a[text()="按时间排序"]')))
self.driver.find_element('xpath', '//a[text()="按时间排序"]').click()
self.logger.info(f"{self.searchkw}开始抓取首页...")
try:
flag, lists = self.parse_page()
if len(lists)<1:
return
except Exception as e:
time.sleep(5)
return
if len(lists)==0:
time.sleep(5)
# self.logger.info(f"{self.searchkw}获取{len(lists)}条数据")
repeatCounts = 0
for detail in lists:
durl=detail['detailUrl']
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
repeatCounts += 1
if repeatCounts/len(lists)>0.5:
self.logger.info(f"{self.searchkw}首页已存在50%以上,结束抓取")
return
continue
self.detailList.put(detail)
response = self.driver.page_source
html = etree.HTML(response)
try:
hasnext = html.xpath('//div[@id="page"]//a[last()]//text()')[0]
hasnext = hasnext.strip()
except:
hasnext = ''
timeFlag=False
while hasnext == '下一页 >':
try:
if self.page_num == 5:
break
self.page_num = self.page_num + 1
self.logger.info(f"{self.searchkw}开始抓取第{self.page_num}页...")
try:
self.driver.find_element(By.XPATH, '//div[@id="page"]//a[last()]').click()
except Exception as e:
time.sleep(5)
continue
time.sleep(5)
flag, lists = self.parse_page()
if len(lists)<1:
break
repeated_counts = 0
for detail in lists:
publishTag=detail['publishTag']
# if publishTag:
# pubtime = datetime.datetime.strptime(publishTag, "%Y-%m-%d %H:%M:%S")
# needDate='2022-01-01 00:00:00'
# needTime = datetime.datetime.strptime(needDate, "%Y-%m-%d %H:%M:%S")
# if pubtime < needTime:
# timeFlag = True
# break
durl = detail['detailUrl']
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
repeated_counts += 1
if repeated_counts/len(lists) > 0.5:
self.logger.info(f"{self.searchkw}第{self.page_num}页已存在过多,跳出循环")
return
continue
self.detailList.put(detail)
if timeFlag:
break
try:
response = self.driver.page_source
html = etree.HTML(response)
hasnext = html.xpath('//div[@id="page"]//a[last()]//text()')[0]
hasnext = hasnext.strip()
except Exception as e:
hasnext=''
except Exception as e:
time.sleep(5)
break
self.logger.info(f"{self.searchkw}列表抓取完毕")
#获取资讯内容信息
# @retry(tries=3, delay=2)
def getDetailmsg(self,detailmsg):
try:
detailurl=detailmsg['detailUrl']
title = detailmsg['title']
content,contentWithTag=self.extractorMsg(detailurl,title)
contentWithTag=self.rmTagattr(contentWithTag,detailurl)
except Exception as e:
content=''
contentWithTag=''
currentdate=self.getNowDate()
kword=self.searchkw
publishDate=detailmsg['publishTag']
publishDate=publishDate+''
# publishtime=self.paserTime(publishtime)
# publishDate=publishtime.strftime("%Y-%m-%d %H:%M:%S")
detailmsg={
'title':detailmsg['title'],
'source':detailmsg['sourceTag'],
'detailurl':detailurl,
'content':content,
'contentHtml':contentWithTag,
'publishtime':publishDate,
'currentdate':currentdate,
'kword':kword
}
return detailmsg
def webDriver(self,url):
chrome_driver =self.config.get('selenium', 'chrome_driver')
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location =self.config.get('selenium', 'binary_location')
driver = webdriver.Chrome(service=path,chrome_options=chrome_options)
html=''
try:
driver.get(url)
# 等待页面加载完成
# wait = WebDriverWait(self.driver, 20)
# wait.until(EC.presence_of_element_located((By.TAG_NAME, "body")))
time.sleep(2)
html=driver.page_source
except Exception as e:
self.logger.info('请求失败')
finally:
driver.quit()
return html
def extractorMsg(self,url,title):
content=''
contentWithTag=''
lang=''
try:
lang=self.detect_language(title)
raw_html=self.webDriver(url)
sm=SmartExtractor(lang)
article=sm.extract_by_html(raw_html)
content=article.cleaned_text
contentWithTag=article.text
except Exception as e:
try:
raw_html=self.webDriver(url)
sm=SmartExtractor(lang)
article=sm.extract_by_html(raw_html)
content=article.cleaned_text
contentWithTag=article.text
except Exception as e:
print('抽取失败!!')
return content,contentWithTag
def detect_language(self,html):
soup = BeautifulSoup(html, "html.parser")
text = soup.get_text()
# 使用langid.py判断文本的语言
lang, confidence = langid.classify(text)
return lang
def sendMonitor(self,processitem):
self.logger.info(processitem['uniqueCode'])
sidName = self.baseCore.getSidName(processitem['sid'])
monitor = {
"title": processitem['title'], # 标题
"sourceAddress": processitem['sourceAddress'], # 原文链接
"uniqueCode": processitem['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": processitem['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 3, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "192.168.1.150", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": self.baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=[self.kafka_bootstrap_servers],max_request_size=1024*1024*20,api_version=(2,7,0))
try:
kafka_result = producer.send("data_lifecycle_log_data_crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
self.logger.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor,ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler':monitor
}
self.baseCore.r.xadd('data_lifecycle_log_data_crawler-redis',monitorDic,id='*')
self.logger.info('数据监控发送Kafka失败,已放置Redis中')
# 获取详情页
def get_detail_html(self):
# 获取当前窗口的句柄
# current_window = self.driver.current_window_handle
while True:
if self.detailList.qsize() != 0:
detailmsg=self.detailList.get()
title = detailmsg['title']
detailUrl = detailmsg['detailUrl']
if 'baijiahao' in detailUrl:
continue
self.logger.info("%s:%s开始解析详情数据\n" % (title, detailUrl))
# # js = "window.open('"+detailUrl+"')"
# # self.driver.execute_script(js)
# try:
# self.driver.get(detailUrl)
# except Exception as e:
# self.driver.quit()
# self.driver=self.createDriver()
# self.driver.get(detailUrl)
#
# response = self.driver.page_source
# bdetail=self.getDetailmsg(response,detailmsg)
try:
bdetail=self.getDetailmsg(detailmsg)
processitem=self.getProcessitem(bdetail)
uniqueCode = self.baseCore.getUniqueCode('BS','150',self.threadId)
processitem['uniqueCode'] = uniqueCode
try:
flg = self.sendkafka(processitem)
if flg:
self.r.sadd('pybaidu_baidu_'+self.wordsCode, processitem['sourceAddress'])
# 插入数据库
try:
items = []
items.append(bdetail)
self.itemInsertToTable(items)
except Exception as e:
self.logger.info(f"插入数据库失败!{bdetail['kword']}===={detailUrl}")
self.logger.info(f"放入kafka成功!{bdetail['kword']}===={detailUrl}")
self.sendMonitor(processitem)
except Exception as e:
self.logger.info(f"放入kafka失败!{bdetail['kword']}===={detailUrl}")
# 关闭当前新窗口
# self.driver.close()
time.sleep(1)
except Exception as e:
time.sleep(3)
self.logger.info("详情页解析异常!"+detailUrl)
else:
break
# time.sleep(5)
def getProcessitem(self,bdetail):
nowDate=self.getNowDate()
content=bdetail['content']
if content!='':
processitem={
"sid":self.sid,
"source":"3",
"title":bdetail['title'],
"content":bdetail['content'],
"contentWithTag":bdetail['contentHtml'],
"origin":bdetail['source'],
"publishDate":bdetail['publishtime'],
"sourceAddress":bdetail['detailurl'],
"createDate":nowDate
}
return processitem
def sendkafka(self,processitem):
producer = KafkaProducer(bootstrap_servers=[self.kafka_bootstrap_servers],max_request_size=1024*1024*20,api_version=(2,7,0))
content=processitem['content']
publishDate=str(processitem['publishDate'])
title=processitem['title']
if title =='':
return
if content=='':
return
if publishDate=='':
return
try:
kafka_result = producer.send("crawlerInfo", json.dumps(processitem, ensure_ascii=False).encode('utf8'))
# self.logger.info(f"数据发送kafka成功")
self.logger.info(kafka_result.get(timeout=10))
flg = True
except Exception as e:
self.logger.info(f'发送kafka异常{e}')
flg = False
# self.logger.info('发送kafka异常')
finally:
producer.close()
if flg:
return True
else:
return False
def run(self):
# 获取每页URL
c = threading.Thread(target=self.get_page_html)
c.start()
c.join()
# 解析详情页
t = threading.Thread(target=self.get_detail_html)
t.start()
\ No newline at end of file
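A minimal driver sketch for the class above; the searchkw/wordsCode/sid values are placeholders, and config.ini is assumed to contain the redis, mysql, kafka and selenium sections read in __init__:

if __name__ == '__main__':
    # BaiduSpider is the class defined above; the arguments here are illustrative only
    spider = BaiduSpider(threadId=1, searchkw='新能源汽车', wordsCode='KW-TEST-001', sid='test-sid')
    spider.run()   # crawls the Baidu news list in one thread, then parses details in another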