提交 edc0ae0e 作者: 薛凌堃

1/24

上级 7fe3a354
import pymysql
class Province():
    """Thin data-access wrapper around the ``cityinfo`` MySQL table.

    Relies on the module-level ``cnx`` (pymysql connection) and ``cursor``
    being created before instantiation (done in the ``__main__`` block).
    """

    def __init__(self):
        # Bind the module-level connection/cursor so methods can use the
        # instance attributes instead of reaching back into globals.
        self.cnx = cnx
        self.cursor = cursor

    def Insert(self, line_info):
        """Insert one city row unless a row with the same cityName exists.

        line_info: sequence of (cityName, GDP, population, area, introduction).
        """
        # Parameterized query instead of f-string interpolation: avoids SQL
        # injection and quoting bugs on city names.
        select_sql = "SELECT cityName FROM cityinfo WHERE cityName=%s"
        self.cursor.execute(select_sql, (line_info[0],))
        if self.cursor.fetchone():
            print('该信息已存在')
            return
        insert_sql = "INSERT INTO cityinfo values (%s,%s,%s,%s,%s)"
        self.cursor.execute(insert_sql, (line_info[0], line_info[1], line_info[2], line_info[3], line_info[4]))
        self.cnx.commit()

    def Query(self, parts_text):
        """Fuzzy-search the introduction column; returns all matching rows."""
        select_sql = "SELECT * FROM cityinfo WHERE introduction LIKE %s"
        # Bind the LIKE pattern as a parameter (tuple) rather than a bare string.
        self.cursor.execute(select_sql, (f"%{parts_text}%",))
        return self.cursor.fetchall()

    def Update(self, field, value):
        """Set ``field`` to ``value`` on every row (no WHERE clause, as before)."""
        # Column identifiers cannot be bound as parameters; quote with
        # backticks — the original single quotes ('field'=) are invalid MySQL.
        # NOTE: ``field`` must be a trusted identifier, never user input.
        update_sql = f"UPDATE cityinfo SET `{field}`=%s"
        self.cursor.execute(update_sql, (value,))
        self.cnx.commit()

    def Delete(self, field, value):
        """Delete rows where ``field`` equals ``value``."""
        delete_sql = f"DELETE FROM cityinfo WHERE `{field}`=%s"
        self.cursor.execute(delete_sql, (value,))
        self.cnx.commit()
# def create_table():
# sql = '''
# CREATE TABLE IF NOT EXISTS cityinfo (
# id INT AUTO_INCREMENT PRIMARY KEY,
# cityname VARCHAR(255) NOT NULL,
# GDP FLOAT,
# population INT,
# area FLOAT,
# introduction VARCHAR(255)
# )
# '''
#
# cursor.execute(sql)
#
# cnx.commit()
# cnx.close()
def search_city(dict_info, cityname):
    """Return the cached info dict for ``cityname``; raises KeyError if absent."""
    return dict_info[cityname]
if __name__ == "__main__":
    # NOTE(review): hard-coded local MySQL credentials; move to config/env.
    cnx = pymysql.connect(host='localhost', user='root', password='123456', database='db1', charset='utf8mb4')
    cursor = cnx.cursor()
    Tool = Province()
    dict_info = {}
    try:
        with open('file.txt', 'r') as f:
            info_lists = f.readlines()
        # First line is the header row; data starts at index 1.
        for line in info_lists[1:]:
            line_info = line.split(',')
            key = line_info[0]
            # Cache each city's fields in a dict keyed by city name.
            dict_info[key] = {
                'GDP': line_info[1],
                'population': line_info[2],
                'area': line_info[3],
                'induction': line_info[4],
            }
            Tool.Insert(line_info)
    except FileNotFoundError:
        # Only the missing-file case should produce this message; the
        # original bare ``except`` also hid database/parse errors behind it.
        print('该文件不存在')
    # Quick lookup of one province by name; guard against the empty dict
    # that results when the input file was missing (the original call
    # raised KeyError in that case).
    cityname = '河北'
    city_info = search_city(dict_info, cityname) if cityname in dict_info else None
# (1) total passenger flow in each direction
data_list = [
    {"city1": "北京", "city2": "纽约", "go": 50.3, "come": 60.5},
    {"city1": "上海", "city2": "洛杉矶", "go": 60, "come": 40},
    {"city1": "广州", "city2": "芝加哥", "go": 30, "come": 20},
]
# Sum each direction with generator expressions instead of a manual loop.
to_america = sum(record["go"] for record in data_list)
to_china = sum(record["come"] for record in data_list)
# (2) build a city1 x city2 matrix of (go - come) differences
import pandas as pd

data_list = [
    {"city1": "北京", "city2": "纽约", "go": 50.3, "come": 60.5},
    {"city1": "上海", "city2": "洛杉矶", "go": 60, "come": 40},
    {"city1": "广州", "city2": "芝加哥", "go": 30, "come": 20},
    {"city1": "北京", "city2": "芝加哥", "go": 30, "come": 20},
]


def calculate_go_come(city1, city2):
    """Return go - come for the first record matching (city1, city2), else None."""
    for data in data_list:
        if data["city1"] == city1 and data["city2"] == city2:
            return data["go"] - data["come"]
    return None


# print(calculate_go_come("上海", "洛杉矶"))
index = set(data["city1"] for data in data_list)
col = set(data["city2"] for data in data_list)
data_frame = pd.DataFrame(index=list(index), columns=list(col), data=0.0)
# Visit every (departure, arrival) cell exactly once instead of the original
# O(n^2) double loop over data_list, which recomputed duplicate pairs many
# times. The cell set is identical (index/col are derived from data_list),
# so the resulting frame is unchanged: pairs absent from data_list become
# NaN (None assigned into a float frame), present pairs get go - come.
for city1 in index:
    for city2 in col:
        data_frame.loc[city1, city2] = calculate_go_come(city1, city2)
print(data_frame)
\ No newline at end of file
import pyautogui
from bs4 import BeautifulSoup
from retry import retry
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
from bson import ObjectId
import pymongo
from base import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# MongoDB handle: database ``中科软``, collection '数据源_0106'.
# NOTE(review): credentials are hard-coded in source; move to config/env.
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').中科软[
    '数据源_0106']
# 获取当前活动窗口的标题
# Get the title of the currently active window
def get_active_window_title():
    """Return the active window's title, or None when no window is active.

    The original logged ``window.title`` before the None check, raising
    AttributeError whenever getActiveWindow() returned None.
    """
    window = pyautogui.getActiveWindow()
    if window is None:
        return None
    log.info(f'当前活动窗口的标题是:{window.title}')
    return window.title
@retry(tries=3, delay=1)
def Translate(_id, driver):
    """Translate the fixed local HTML page with Edge's built-in translator.

    Loads file:///C:/Users/EDY/Desktop/aaa.html (written by doJob), triggers
    the context-menu translate entry via keyboard automation, scrolls so the
    in-place translation completes, then stores the translated HTML into
    MongoDB under ``_id`` with postCode '18'. Raises when the body text is
    unchanged; @retry then re-runs the whole procedure up to 3 times.
    """
    driver.get('file:///C:/Users/EDY/Desktop/aaa.html')
    # Snapshot of the untranslated body text, used below to detect whether
    # the translation actually changed the page.
    flag = driver.find_element(By.TAG_NAME, 'body').text
    driver.maximize_window()
    # Switch to the Edge browser window
    driver.switch_to.window(driver.current_window_handle)
    # Wait a while to make sure the page has finished loading
    time.sleep(5)
    # Remember the Edge window handle
    edge_handle = driver.current_window_handle
    # driver.refresh()
    # time.sleep(5)
    # Right-click and choose "translate" from the context menu
    rightClick = ActionChains(driver)
    position_element = driver.find_element(By.TAG_NAME, 'body')
    rightClick.context_click(position_element).perform()
    time.sleep(1)
    # The translate entry is assumed to be the 6th context-menu item
    # (keyboard-driven, so the menu layout must not change).
    pyautogui.typewrite(['down'] * 6)
    pyautogui.typewrite(["enter"])
    js = "return action=document.body.scrollHeight"
    new_height = driver.execute_script(js)
    # Scroll through the page in 300px steps so lazily-translated sections load.
    for i in range(0, new_height, 300):
        # js = "var q=document.documentElement.scrollTop=300"
        driver.execute_script(js)
        driver.execute_script('window.scrollTo(0, %s)' % (i))
        time.sleep(1)
    time.sleep(2)
    if driver.find_element(By.TAG_NAME, 'body').text[:500] in flag:
        # Body text unchanged -> translation did not happen.
        log.error(f'{_id}---翻译失败,重试')
        # Simulate Alt+Tab with pyautogui until the Edge window is frontmost
        # while get_a
        # ctive_window_title() != "Edge浏览器":
        while 'Microsoft​ Edge' not in get_active_window_title():
            pyautogui.hotkey('alt', 'tab')
            log.info('窗口切换操作')
        # pyautogui.hotkey('alt', 'tab')
        # Switch back to the Edge browser window
        driver.switch_to.window(edge_handle)
        driver.refresh()
        # NOTE(review): bare ``raise`` with no active exception raises
        # RuntimeError here; it still triggers @retry, but an explicit
        # exception type would be clearer.
        raise
    page_source = driver.page_source
    contentWithTag = BeautifulSoup(page_source, 'html.parser')
    # postCode '18' marks the record as translated.
    db_storage.update_one({'_id':ObjectId(_id)},{'$set':{'postCode':'18','richText':str(contentWithTag)}})
    # with open(rf'C:\Users\EDY\Desktop\{_id}.html', 'w', encoding='utf-8') as f:
    #     f.write(str(contentWithTag))
    # print(str(contentWithTag))
def doJob():
    """Poll MongoDB for untranslated records (postCode '2') and translate each.

    Writes every record's ``richTextForeign`` to the fixed local HTML file
    that Translate() reads, then runs Translate() on it. Loops forever.
    """
    driver = webdriver.Edge()
    while True:
        datas = db_storage.find({'postCode': '2'}).limit(10)
        for data in datas:
            now = time.time()
            _id = str(data['_id'])
            richTextForeign = data['richTextForeign']
            # Translate() always reads from this fixed path.
            with open(r'C:\Users\EDY\Desktop\aaa.html', 'w', encoding='utf-8') as f:
                f.write(str(richTextForeign))
            try:
                Translate(_id, driver)
                log.info(f'{_id}翻译用时--{time.time() - now}')
            except Exception:
                # Narrowed from a bare ``except`` so KeyboardInterrupt can
                # still stop the loop; failures are logged and skipped.
                log.error(f'{_id}翻译失败')
if __name__ == "__main__":
    doJob()
    # NOTE(review): doJob() loops forever, so this close() is unreachable.
    baseCore.close()
\ No newline at end of file
import os
from flask import Flask, request, jsonify
app = Flask(__name__)
import pyautogui
from bs4 import BeautifulSoup
from retry import retry
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
# from selenium.webdriver.support.wait import WebDriverWait
# from selenium.webdriver.support import expected_conditions as EC
import time
# import datetime
# from bson import ObjectId
import pymongo
import sys
sys.path.append('D:/kkwork/zzsn_spider/base')
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
pyautogui.FAILSAFE = False
pyautogui.PAUSE = 1
import redis
# from selenium.webdriver.edge.options import Options
# from concurrent.futures import ThreadPoolExecutor
# executor = ThreadPoolExecutor(2)
import concurrent.futures
import threading
thread_local = threading.local()
from tempfile import TemporaryFile
r = redis.StrictRedis(host='114.115.221.202', port=6379, db=1, decode_responses=True, password='clbzzsn')
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').中科软[
'数据源_0504']
# path = r'D:\soft\msedgedriver.exe'
#
# driver = webdriver.Edge(executable_path=path)
def create_driver():
    """Build an Edge WebDriver with a maximized window.

    NOTE(review): ``executable_path``/``capabilities`` keyword arguments were
    removed in Selenium 4 — this call requires Selenium 3.x; confirm the
    pinned version before upgrading.
    """
    path = r'D:\soft\msedgedriver.exe'
    # options = webdriver.EdgeOptions()
    options = {
        "browserName": "MicrosoftEdge",
        "ms:edgeOptions": {
            "extensions": [], "args": ["--start-maximized"]  # start the window maximized
        }
    }
    driver_ = webdriver.Edge(executable_path=path, capabilities=options)
    return driver_
def get_driver():
    """Return this thread's WebDriver, creating it lazily (one per thread)."""
    driver = getattr(thread_local, 'driver', None)
    if driver is None:
        driver = create_driver()
        thread_local.driver = driver
    return driver
# 获取当前活动窗口的标题
# Get the title of the currently active window
def get_active_window_title():
    """Return the active window's title, or None when no window is active.

    The original logged ``window.title`` before the None check, raising
    AttributeError whenever getActiveWindow() returned None.
    """
    window = pyautogui.getActiveWindow()
    if window is None:
        return None
    log.info(f'当前活动窗口的标题是:{window.title}')
    return window.title
def shiftwindow(n):
    """Hold Alt and press Tab ``n`` times to cycle ``n`` windows forward."""
    pyautogui.keyDown('alt')
    presses_left = n
    while presses_left > 0:
        pyautogui.press('tab')
        presses_left -= 1
    pyautogui.keyUp('alt')
    # Brief pause so the window manager can finish the switch.
    pyautogui.sleep(1)
    # n += 1
def click(type_name, driver):
    """Scroll to a link-free element and right-click it to open the context menu.

    For type_name '正文' (body) it scans <p> elements, otherwise <div>
    elements, looking for the first one without an <a> child (right-clicking
    a link would show the link menu instead). Returns None early only in the
    non-body branch when nothing clickable is found.
    """
    # Park the mouse at a neutral spot before right-clicking
    pyautogui.moveTo(500, 400, duration=1)
    if type_name == '正文':
        try:
            # If an image is present and already rendered, skip the extra wait.
            if driver.find_element(By.TAG_NAME, 'img').is_displayed():
                pass
            else:
                time.sleep(5)
        except:
            pass
        time.sleep(1)
        position_elements = driver.find_elements(By.TAG_NAME, 'p')
        for e in position_elements:
            driver.execute_script("arguments[0].scrollIntoView();", e)
            time.sleep(1)
            try:
                # find_element() either returns an element (truthy -> this <p>
                # contains a link, keep looking) or raises (no link -> use it);
                # the explicit else-branch is therefore never reached.
                if e.find_element(By.TAG_NAME, 'a'):
                    continue
                else:
                    break
            except:
                break
        else:
            # Every <p> contained a link (or there were none): move the mouse
            # near the bottom-right and right-click whatever is there.
            # NOTE(review): if there were no <p> elements at all, ``e`` below
            # is unbound; the NameError is swallowed by the try/except around
            # context_click, which then clicks at the current mouse position.
            pyautogui.moveTo(1700, 900, duration=1)
            # log.error(f'{type_name}----未找到可点击的元素')
            # return None
    else:
        time.sleep(1)
        position_elements = driver.find_elements(By.TAG_NAME, 'div')
        for e in position_elements:
            try:
                if e.find_element(By.TAG_NAME, 'a'):
                    continue
                else:
                    break
            except:
                break
        else:
            log.error(f'{type_name}----未找到可点击的元素')
            return None
    rightClick = ActionChains(driver)
    try:
        rightClick.context_click(e).perform()
    except:
        # Fall back to right-clicking at the current mouse position.
        rightClick.context_click().perform()
@retry(tries=3, delay=1)
def Translate(type_name, file_name, driver):
    """Translate the HTML file at ``file_name`` via the browser's translator.

    Brings the browser window to the front (Alt+Tab cycling until the window
    title contains the file's basename), opens the context menu via click(),
    selects the translate entry by keyboard, scrolls the page so the in-place
    translation completes, and returns the translated HTML (for '正文') or
    plain text (title/summary). Returns None when translation never loads;
    raises (retried 3x by @retry) when the text is still unchanged.
    """
    # driver.get('file:///C:/Users/Administrator/Desktop/aaa.html')
    driver.get(f'file:///{file_name}')
    # The temp file's basename appears in the browser window title; used
    # below to detect when that window is frontmost.
    window_title = file_name.split('\\')[-1]
    # Snapshot of the untranslated body text for change detection.
    flag = driver.find_element(By.TAG_NAME, 'body').text
    driver.maximize_window()
    edge_handle = driver.window_handles[0]
    driver.switch_to.window(edge_handle)
    # click(type_name, driver)
    # time.sleep(1)
    n = 1
    # while 'Edge' not in get_active_window_title():
    # NOTE(review): get_active_window_title() may return None, which would
    # make the ``in`` test raise TypeError — confirm a window always exists.
    while window_title not in get_active_window_title():
        # print(n)
        time.sleep(1)
        log.info('窗口切换操作')
        # Cycle one window further on each attempt.
        shiftwindow(n)
        n += 1
        # if n>20:
        #     break
        time.sleep(1)
    # if n > 20:
    #     log.error(f'{type_name}未找到浏览器窗口')
    #     raise
    # driver.refresh()
    click(type_name, driver)
    time.sleep(1)
    # The translate entry is assumed to be the 6th context-menu item.
    pyautogui.typewrite(['down'] * 6)
    pyautogui.typewrite(["enter"])
    time.sleep(1)
    js = "window.scrollTo(0,0)"
    driver.execute_script(js)
    time.sleep(1)
    # Poll up to ~20s for the body text to change (translation applied).
    count_ = 0
    while driver.find_element(By.TAG_NAME, 'body').text[:500] in flag and count_ < 10:
        time.sleep(2)
        count_ += 1
    if driver.find_element(By.TAG_NAME, 'body').text[:500] in flag:
        log.error(f'{type_name}---翻译加载失败')
        return None
    js = "return action=document.body.scrollHeight"
    new_height = driver.execute_script(js)
    # Scroll in 300px steps so lazily-translated sections load.
    for i in range(0, new_height, 300):
        # js = "var q=document.documentElement.scrollTop=300"
        driver.execute_script(js)
        driver.execute_script('window.scrollTo(0, %s)' % (i))
        time.sleep(1)
    # time.sleep(2)
    if driver.find_element(By.TAG_NAME, 'body').text[:500] in flag:
        # Still untranslated after scrolling: refocus the window and retry.
        log.error(f'{type_name}---翻译失败,重试')
        count = 0
        # while 'Edge' not in get_active_window_title():
        while window_title not in get_active_window_title():
            time.sleep(1)
            shiftwindow(count)
            log.info('窗口切换操作')
            count += 1
            # if count>5:
            #     break
        # log.info('窗口切换操作')
        # pyautogui.hotkey('alt', 'tab')
        # driver.switch_to.window(edge_handle)
        # driver.refresh()
        # NOTE(review): bare ``raise`` with no active exception raises
        # RuntimeError; @retry still catches and retries it.
        raise
    page_source = driver.page_source
    contentWithTag = BeautifulSoup(page_source, 'html.parser')
    if type_name == '正文':
        # Body keeps its HTML tags; title/summary return plain text only.
        translate_type = str(contentWithTag)
    else:
        translate_type = contentWithTag.text
    # db_storage.update_one({'_id':ObjectId(_id)},{'$set':{'postCode':'18','richText':str(contentWithTag),'postTime':datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}})
    # with open(rf'C:\Users\EDY\Desktop\{_id}.html', 'w', encoding='utf-8') as f:
    #     f.write(str(contentWithTag))
    # print(str(contentWithTag))
    return translate_type
def save_file(text, driver, num):
    """Write ``text`` to a temp HTML file and translate it via the browser.

    num selects the field type: 1=标题 (title), 2=摘要 (summary), anything
    else=正文 (body). Title/summary are wrapped in a <div> so the page has a
    clickable element. Returns the translated string, or '' on failure.
    The temp file is always removed.
    """
    if num == 1:
        type_name = '标题'
        text = f'<div>{text}</div>'
    elif num == 2:
        type_name = '摘要'
        text = f'<div>{text}</div>'
    else:
        type_name = '正文'
    now = time.time()
    result = ''
    # NamedTemporaryFile, not TemporaryFile: TemporaryFile() accepts no
    # ``delete=`` argument (the original call raised TypeError) and is not
    # guaranteed to have a filesystem path in ``.name``. delete=False lets
    # the browser open the file by path; we remove it ourselves below.
    from tempfile import NamedTemporaryFile
    with NamedTemporaryFile(mode='w+t', delete=False, encoding='utf-8', suffix='.html') as f:
        f.write(str(text))
        file_name = f.name
    try:
        result = Translate(type_name, file_name, driver)
        if result:
            log.info(f'{type_name}翻译用时--{time.time() - now}')
        else:
            log.error(f'{type_name}翻译失败')
    except Exception as e:
        log.error(f'{type_name}翻译失败--{e}')
    finally:
        # Clean up even if Translate() raised something unexpected.
        os.remove(file_name)
    return result
@app.route('/translate', methods=['POST'])
def doJob(data):
    """Translate title, summary and contentWithTag of one record dict.

    NOTE(review): this is registered as a Flask route but takes a required
    ``data`` argument with no matching URL variable, so a direct POST to
    /translate would fail at dispatch time; in practice it is invoked from
    run_flask_app(), which passes the parsed request JSON.
    """
    driver = get_driver()
    start = time.time()
    log.info('开始翻译')
    # path = r'D:\soft\msedgedriver.exe'
    # driver = webdriver.Edge(executable_path=path)
    # data = request.get_json()  # originally read the JSON from the POST body here
    title = data['title']
    summary = data['summary']
    contentWithTag = data['contentWithTag']
    # Title and summary are translated as tagged fragments (nums 1 and 2);
    # the body (num 3) keeps its full HTML.
    title_result = save_file(title, driver, 1)
    summary_result = save_file(summary, driver, 2)
    contentWithTag_result = save_file(contentWithTag, driver, 3)
    if title_result and summary_result and contentWithTag_result:
        translate_result = {
            'status': 'success',
            'title': title_result,
            'summary': summary_result,
            'contentWithTag': contentWithTag_result
        }
    else:
        # Any empty field marks the whole job failed but still returns
        # whatever partial results were produced.
        translate_result = {
            'status': 'failed',
            'title': title_result,
            'summary': summary_result,
            'contentWithTag': contentWithTag_result
        }
    log.info(f'翻译完成,耗时--{time.time() - start}')
    return jsonify(translate_result)
@app.route('/thread_translate', methods=['POST'])
def run_flask_app():
    """Accept a translation job as JSON and run it on a small thread pool.

    NOTE(review): the same payload is submitted twice (range(2)), so every
    request gets translated twice in parallel; and despite the name, the
    function blocks until both tasks complete before responding.
    """
    # app.run('0.0.0.0', 5000)
    # future = executor.submit(doJob)
    data = request.get_json()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Submit the job to the thread pool
        futures = [executor.submit(doJob, data) for _ in range(2)]
        # Wait for the tasks to finish
        concurrent.futures.wait(futures)
    return 'Job submitted'
if __name__ == "__main__":
    # doJob()
    # baseCore.close()
    # Serve the translation endpoints on all interfaces, port 5000.
    app.run('0.0.0.0', 5000)
    # executor.submit(run_flask_app)
\ No newline at end of file
......@@ -581,7 +581,7 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
# company_field = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
company_field = '913300007125582210||'
company_field = '91440600708114839J||'
if company_field == 'end':
# 本轮处理完毕,需要发送邮件,并且进入下一轮
baseCore.sendEmail(file_name)
......
......@@ -81,6 +81,7 @@ def checklogin(key):
# ip = baseCore.get_proxy()
# req = requests.get(headers=headers, url=url, proxies=ip)
req = requests.get(headers=headers, url=url)
time.sleep(1)
soup = BeautifulSoup(req.content, 'html.parser')
if soup.find('title').text == '会员登录 - 企查查':
log.info('状态---未登录')
......@@ -629,7 +630,7 @@ if __name__ == '__main__':
# exchange = ''
count = redaytowork(com_name, social_code, securitiesCode, securitiesShortName, listingDate, category, exchange,listType, ynDomestic, countryName, file_name)
time.sleep(2)
time.sleep(10)
# break
# baseCore.r.close()
# baseCore.sendEmail(file_name)
......
......@@ -23,7 +23,7 @@ class File():
sheet.append(["企业名称", "社会信用代码"])
# 创建另一个sheet
sheet2 = wb.create_sheet("获取基本信息成功企业")
sheet2.append(["企业名称", "社会信用代码", "采到的信用代码"])
sheet2.append(["企业名称", "采到的企业名称", "社会信用代码", "采到的信用代码"])
wb.save(file_name)
wb.close()
......
......@@ -33,6 +33,7 @@ def flushAndGetToken():
browser.refresh()
cookie_list = browser.get_cookies()
cookies = {}
print(cookie_list)
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
print(cookies)
......@@ -48,7 +49,7 @@ if __name__ == "__main__":
# soup = BeautifulSoup(page_source,'html.parser')
# print(soup)
browser.find_element(By.CLASS_NAME, 'nav-item').click()
time.sleep(70)
time.sleep(20)
cookies = flushAndGetToken()
cookies = json.dumps(cookies)
insert = f"insert into QCC_token (cookies,create_time,fenghao_time,update_time) values ('{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),now())"
......
......@@ -137,7 +137,7 @@ def uptoOBS(pdf_url, name_pdf, type_id, pathType, header):
if header:
response = requests.get(pdf_url, headers=header, verify=False, timeout=20)
else:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
response = requests.get(pdf_url, verify=False, timeout=20)
if response.status_code == 200:
pass
else:
......@@ -182,7 +182,7 @@ def uptoOBS(pdf_url, name_pdf, type_id, pathType, header):
# 下载pdf文件,上传至服务器
def download(data, order_by, header):
def download(data, order_by,header):
url_pdf = data['url_pdf']
name_pdf = data['title']
if '.pdf' not in name_pdf:
......@@ -902,7 +902,7 @@ def qianyanzhishiku():
def shijiejingjiluntan():
allnum = {'一': '01', '二': '02', '三': '03', '四': '04', '五': '05', '六': '06', '七': '07', '八': '08', '九': '09', '十': '10', '十一': '11', '十二': '12'}
for i in range(1, 2):
for i in range(2, 3):
# res = requests.get(url)
# soup = BeautifulSoup(res.content,'html.parser')
......@@ -910,40 +910,47 @@ def shijiejingjiluntan():
url = f'https://cn.weforum.org/publications/?page={i}'
browser.get(url) # 跳到指定页面
time.sleep(5)
time.sleep(1)
# 输出浏览器头部的cookies
cookie_list = browser.get_cookies()
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
# print(cookies)
cookies = json.dumps(cookies)
cookies_ = json.loads(cookies)
s = requests.session()
s.cookies.update(cookies_)
wait = WebDriverWait(browser, 30)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "wef-184hs11")))
wait.until(EC.presence_of_element_located((By.XPATH, "/html/body/div[2]/div/section/div/div/main/div[4]")))
page_source = browser.page_source # 获取页面信息
soup = BeautifulSoup(page_source, 'html.parser')
soup = BeautifulSoup(page_source, 'lxml')
from lxml import etree
root = etree.HTML(str(soup))
time.sleep(2)
list_all = soup.find('div', {'class':'wef-qrllub'}).find_all('div',{'class':'wef-184hs11'})
# list_all = soup.find('div', {'class':'wef-qrllub'}).find_all('div',{'class':'wef-184hs11'})
list_all = root.xpath('/html/body/div[2]/div/section/div/div/main/div[4]/div/div')
time.sleep(2)
for one_info in list_all:
tag = one_info.find('div', class_='wef-wx6hgt').find_all('div',class_='wef-0')[1]
info_title = tag.find('a').text.strip()
info_date = one_info.find('div',{'class':'wef-1nvfeoy'}).find('time')['datetime']
for one_info in list_all[1:]:
info_title = one_info.xpath('./article/div[2]/div[2]/p/a/text()')[0]
info_href = one_info.xpath('./article/div[2]/div[2]/p/a/@href')[0]
info_date = one_info.xpath('./article/div[2]/div[4]/time/@datetime')[0]
datetime_obj = datetime.strptime(info_date, '%Y-%m-%dT%H:%M:%SZ')
info_date = datetime_obj.strftime('%Y-%m-%d')
# if info_date >= '2022-07-21':
# continue
try:
info_zhaiyao = one_info.find('div', {'class': 'wef-8xl60i'}).text.strip()
info_zhaiyao = one_info.xpath('./article/div[2]/div[3]/p/text()')[0]
except:
info_zhaiyao = ''
try:
info_pdf = one_info.find('div',{'class':'wef-1nvfeoy'}).find('a').get('href')
info_pdf = one_info.xpath('./article/div[2]/div[4]/a/@href')[0]
except:
info_pdf = ''
info_href = tag.find('a').get('href')
# info_href = tag.find('a').get('href')
header ={
header ={
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
......@@ -962,10 +969,7 @@ def shijiejingjiluntan():
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
cookies = json.dumps(cookies)
cookies_ = json.loads(cookies)
s = requests.session()
s.cookies.update(cookies_)
reponse = s.get(url=info_href, headers=header, verify=False)
# jar = requests.cookies.RequestsCookieJar() # 先构建RequestsCookieJar对象
......@@ -1011,7 +1015,23 @@ def shijiejingjiluntan():
'sid': '1662008019231088642', # 信息源id
}
order_by = 1
download(dic_post, order_by, header)
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Pragma': 'no-cache',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Microsoft Edge";v="120"',
'sec-ch-ua-mobile': '?0',
}
download(dic_post, order_by, headers)
order_by += 1
# print(page,dic_post)
# url = 'http://114.115.155.139:5002/report_download'
......@@ -1148,7 +1168,7 @@ def dongfangcaifu():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# 东方财富网2
......@@ -1236,7 +1256,7 @@ def dongfangcaifu2():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
......@@ -1316,7 +1336,7 @@ def dongfangcaifu3():
}
# list_quchong.append(dic_post)
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# log.info(dic_post['title'],dic_post['publishDate'])
......@@ -1409,7 +1429,7 @@ def dongfangcaifu4():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
......@@ -1491,7 +1511,7 @@ def dongfangcaifu5():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
......@@ -1585,7 +1605,7 @@ def dongfangcaifu6():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
......@@ -1674,7 +1694,7 @@ def dongfangcaifu7():
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
download(dic_post, order_by,'')
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
......
# connect timeout in seconds
# default value is 30s
connect_timeout=300
# network timeout in seconds
# default value is 30s
network_timeout=600
# the base path to store log files
#base_path=/home/tarena/django-project/cc_shop1/cc_shop1/logs
# "tracker_server" can occur more than once, and tracker_server format is
# "host:port", host can be hostname or ip address
tracker_server=114.115.215.96:22122
#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
# connections whose the idle time exceeds this time will be closed
# unit: second
# default value is 3600
# since V4.05
connection_pool_max_idle_time = 3600
# if load FastDFS parameters from tracker server
# since V4.05
# default value is false
load_fdfs_parameters_from_tracker=false
# if use storage ID instead of IP address
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# default value is false
# since V4.05
use_storage_id = false
# specify storage ids filename, can use relative or absolute path
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# since V4.05
storage_ids_filename = storage_ids.conf
#HTTP settings
http.tracker_server_port=80
#use "#include" directive to include other HTTP settings
##include http.conf
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
import policy
import tingtype
import BaseCore
from apscheduler.schedulers.blocking import BlockingScheduler
basecore = BaseCore.BaseCore()
log = basecore.getLogger()
def policylaw_task():
    """Schedule the policy/tingtype collection jobs to run daily at 00:00."""
    # Instantiate a blocking scheduler
    scheduler = BlockingScheduler()
    # Run once per day at midnight.
    # NOTE(review): ``policy`` and ``tingtype`` are imported *modules*, not
    # callables — APScheduler's add_job() needs a function; confirm these
    # should be e.g. policy.main / tingtype.main.
    scheduler.add_job(policy, 'cron', hour=0,minute=0)
    scheduler.add_job(tingtype, 'cron', hour=0, minute=0)
    try:
        scheduler.start()
    except Exception as e:
        # NOTE(review): assumes the logger accepts extra positional args like
        # stdlib logging's %-style — there is no placeholder in the message,
        # so ``e`` is likely dropped; verify BaseCore's logger semantics.
        log.info('定时采集异常', e)
        pass


# Start scheduling immediately when the module runs.
policylaw_task()
\ No newline at end of file
......@@ -56,7 +56,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(40)
time.sleep(20)
s = requests.session()
#获取到token和cookies
......
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -12,7 +12,7 @@ from kafka import KafkaProducer
from requests.packages import urllib3
from datetime import datetime, timedelta
urllib3.disable_warnings()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='zzsn@9988').ZZSN['人民网-习讲话数据库_copy']
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN['人民网-习讲话数据库_copy']
def newsdata(art_content_dict,art_type_dict,dic_lables):
for key, value in art_content_dict.items():
......@@ -124,7 +124,8 @@ def get_content():
continue
for data_dict in data_list[::-1]:
article_id = data_dict['article_id']
is_article_id = db_storage.find_one({'id': article_id})
print(type(article_id))
is_article_id = db_storage.find_one({'id': f"1534423014825668610{article_id}"})
if is_article_id:
continue
title = data_dict['title']
......
# 微软翻译 不太好用 刚测试就要输入验证码
# 微软翻译 不太好用 刚测试就要输入验证码
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
class Translate():
    """Interactive Bing Translator driven through a real Chrome browser."""

    def __init__(self):
        # Target-language codes offered to the user, with display names in
        # the same order.
        self._lang_list = ['zh-Hans', 'zh-hant', 'en', 'ko', 'fr', 'jp', 'el', 'ru']
        self._lang_list_original = ["中文(简体)", "中文(繁体)", "英语", "韩语", "法语", "日语", "希腊语", "俄语"]
        self._num = len(self._lang_list)
        self.url = "https://cn.bing.com/translator?"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # Machine-specific chromedriver and Chrome binary locations.
        chrome_driver = r'D:\cmd100\chromedriver.exe'
        path = Service(chrome_driver)
        chrome_options = webdriver.ChromeOptions()
        chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the automation fingerprint from the site.
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument(
        #     'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        self.browser = webdriver.Chrome(service=path, chrome_options=chrome_options)
        self.wait = WebDriverWait(self.browser, 20)

    def translate(self):
        """Prompt for a word and target language on stdin, print the translation."""
        word = input("Input your word: ")
        self.get_input_language_type(word)
        # 1-based index into self._lang_list.
        yournum = int(input("Input the language you want to translate: "))
        self.wait.until(EC.presence_of_element_located(
            (By.ID, 'tta_tgtsl')))
        # Select the requested target language in the dropdown.
        sl = Select(self.browser.find_element(By.ID, 'tta_tgtsl'))
        sl.select_by_value(f'{self._lang_list[int(yournum) - 1]}')
        result = self.browser.find_element(By.CLASS_NAME, 'tta_outtxt')
        print(result.text)
        return

    def get_input_language_type(self, word):
        """Open Bing Translator, type ``word``, leave source language on auto-detect."""
        self.browser.get("https://cn.bing.com/translator?")
        self.wait.until(EC.presence_of_element_located((By.ID, "tta_input_ta")))
        input_word = self.browser.find_element(By.ID, "tta_input_ta")
        input_word.send_keys(word)
        sl = Select(self.browser.find_element(By.ID, 'tta_srcsl'))
        sl.select_by_value('auto-detect')
if __name__ == "__main__":
    # Manual smoke test: prompts on stdin and prints the translation.
    test = Translate()
    test.translate()
\ No newline at end of file
#有道翻译 不用登录翻译5000字 网页加载速度慢
#有道翻译 不用登录翻译5000字 网页加载速度慢
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
class Translate():
    """Interactive Youdao web translator driven through a real Chrome browser."""

    def __init__(self):
        self.url = "https://fanyi.youdao.com/index.html#/"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # Machine-specific chromedriver and Chrome binary locations.
        self.chrome_driver = r'D:\cmd100\chromedriver.exe'
        self.path = Service(self.chrome_driver)
        self.chrome_options = webdriver.ChromeOptions()
        self.chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        self.chrome_options.add_argument('--disable-gpu')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the automation fingerprint from the site.
        self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        self.chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument(
        #     'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        self.browser = webdriver.Chrome(service=self.path, chrome_options=self.chrome_options)
        self.wait = WebDriverWait(self.browser, 20)

    def translate(self):
        """Prompt for a word on stdin and print Youdao's translation."""
        word = input("Input your word: ")
        self.get_input_language_type(word)
        self.wait.until(EC.presence_of_element_located((By.ID, "js_fanyi_output_resultOutput")))
        result = self.browser.find_element(By.ID, 'js_fanyi_output_resultOutput')
        print(result.text)
        return

    def get_input_language_type(self, word):
        """Open the Youdao page and type ``word`` into the input box."""
        self.browser.get("https://fanyi.youdao.com/index.html#/")
        time.sleep(2)
        # NOTE(review): presumably the SPA needs one reload before the input
        # element exists — confirm the refresh is actually required.
        self.browser.refresh()
        self.wait.until(EC.presence_of_element_located((By.ID, "js_fanyi_input")))
        input_word = self.browser.find_element(By.ID, "js_fanyi_input")
        input_word.send_keys(word)
        return
if __name__ == "__main__":
    # Manual smoke test: prompts on stdin, prints the translation, closes Chrome.
    test = Translate()
    test.translate()
    test.browser.close()
# 腾讯翻译君 不用登陆字符5000
# 腾讯翻译君 不用登陆字符5000
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
class Translate():
    """Interactive Tencent (fanyi.qq.com) translator driven through Chrome."""

    def __init__(self):
        self.url = "https://fanyi.qq.com/"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # Machine-specific chromedriver and Chrome binary locations.
        self.chrome_driver = r'D:\cmd100\chromedriver.exe'
        self.path = Service(self.chrome_driver)
        self.chrome_options = webdriver.ChromeOptions()
        self.chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        self.chrome_options.add_argument('--disable-gpu')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the automation fingerprint from the site.
        self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        self.chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument(
        #     'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        self.browser = webdriver.Chrome(service=self.path, chrome_options=self.chrome_options)
        self.wait = WebDriverWait(self.browser, 20)

    def translate(self):
        """Prompt for a word on stdin and print the translation (text-dst pane)."""
        word = input("Input your word: ")
        self.get_input_language_type(word)
        self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, "text-dst")))
        result = self.browser.find_element(By.CLASS_NAME, 'text-dst')
        print(result.text)
        return

    def get_input_language_type(self, word):
        """Open fanyi.qq.com and type ``word`` into the input box."""
        self.browser.get("https://fanyi.qq.com/")
        time.sleep(2)
        # NOTE(review): presumably one reload is needed before the input
        # element exists — confirm the refresh is actually required.
        self.browser.refresh()
        self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, "textinput")))
        input_word = self.browser.find_element(By.CLASS_NAME, "textinput")
        input_word.send_keys(word)
        return
if __name__ == "__main__":
    # Manual smoke test: prompts on stdin, prints the translation, closes Chrome.
    test = Translate()
    test.translate()
    test.browser.close()
# Deepl 注册字符5000 测试失败
# Deepl 注册字符5000 测试失败
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
from base import BaseCore
baseCore = BaseCore.BaseCore()
class Translate():
    """Interactive DeepL web translator driven through a real Chrome browser.

    NOTE(review): marked "测试失败" (test failed) in the source header — the
    XPaths below anchor on id "headlessui-tabs-panel-7", which looks
    session-generated and is likely brittle; verify before relying on this.
    """

    def __init__(self):
        self.url = "https://www.deepl.com/translator"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # Machine-specific chromedriver and Chrome binary locations.
        self.chrome_driver = r'D:\cmd100\chromedriver.exe'
        self.path = Service(self.chrome_driver)
        self.chrome_options = webdriver.ChromeOptions()
        self.chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        self.chrome_options.add_argument('--disable-gpu')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the automation fingerprint from the site.
        self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        self.chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument(
        #     'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        self.browser = webdriver.Chrome(service=self.path, chrome_options=self.chrome_options)
        self.wait = WebDriverWait(self.browser, 20)

    def translate(self):
        """Prompt for a word on stdin and print DeepL's translation."""
        word = input("Input your word: ")
        self.get_input_language_type(word)
        time.sleep(5)
        self.wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="headlessui-tabs-panel-7"]/div/div[1]/section/div/div[2]/div[3]/section/div[1]/d-textarea/div')))
        result = self.browser.find_element(By.XPATH, '//*[@id="headlessui-tabs-panel-7"]/div/div[1]/section/div/div[2]/div[3]/section/div[1]/d-textarea/div')
        print(result.text)
        return

    def get_input_language_type(self, word):
        """Open DeepL and type ``word`` into the source textarea."""
        self.browser.get("https://www.deepl.com/translator")
        time.sleep(2)
        self.browser.refresh()
        self.wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="headlessui-tabs-panel-7"]/div/div[1]/section/div/div[2]/div[1]/section/div/div[1]/d-textarea')))
        input_word = self.browser.find_element(By.XPATH, '//*[@id="headlessui-tabs-panel-7"]/div/div[1]/section/div/div[2]/div[1]/section/div/div[1]/d-textarea')
        input_word.send_keys(word)
        return
# Entry point: call DeepL's private LMT_handle_jobs JSON-RPC endpoint directly.
if __name__ == "__main__":
    import json  # local import: only needed by this script path

    # test = Translate()
    # test.translate()
    # test.browser.close()
    headers = {
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        # NOTE: do not hard-code Content-Length — the original pinned it to
        # '914', which is wrong for any other payload; requests computes it.
        'Content-Type': 'application/json',
        # NOTE(review): session cookie captured from a browser; it expires and
        # must be refreshed for the request to succeed.
        'Cookie': 'INGRESSCOOKIE=5af7d91f1e42627f47a49d13c8df6af3|a6d4ac311669391fc997a6a267dc91c0; userCountry=CN; releaseGroups=5418.AAEXP-4408.1.1_5421.AAEXP-4411.1.1_863.DM-601.2.2_2413.DWFA-524.2.4_2465.DWFA-636.2.3_4476.DWFA-695.2.2_5408.AAEXP-4398.2.1_975.DM-609.2.3_5410.AAEXP-4400.1.1_5424.AAEXP-4414.1.1_4121.WDW-356.1.3_4475.DWFA-693.2.2_5409.AAEXP-4399.2.1_5431.AAEXP-4421.1.1_5436.AAEXP-4426.1.1_5560.DWFA-638.2.1_976.DM-667.2.3_1577.DM-594.2.3_2459.TC-850.2.3_5422.AAEXP-4412.1.1_5425.AAEXP-4415.1.1_220.DF-1925.1.9_1119.B2B-251.2.4_5407.AAEXP-4397.1.1_5411.AAEXP-4401.2.1_5419.AAEXP-4409.1.1_3939.B2B-596.1.1_4297.DF-3763.2.2_4322.DWFA-689.2.2_1444.DWFA-362.2.2_1483.DM-821.2.2_2455.DPAY-2828.2.2_2964.DWFA-616.2.2_3283.DWFA-661.2.2_5412.AAEXP-4402.2.1_5428.AAEXP-4418.1.1_5435.AAEXP-4425.1.1_4478.SI-606.2.1_5375.WDW-385.2.1_5416.AAEXP-4406.2.1_866.DM-592.2.2_1583.DM-807.2.5_2962.DF-3552.1.5_3127.DM-1032.2.2_3613.WDW-267.2.2_5432.AAEXP-4422.1.1_5562.DWFA-732.1.1_5561.WDW-395.1.1_1780.DM-872.2.2_2345.DM-1001.2.2_2373.DM-1113.2.4_3614.DWFA-95.2.2_5420.AAEXP-4410.1.1_5433.AAEXP-4423.1.1_1571.DM-791.2.4_1585.DM-900.2.3_1997.DM-941.2.3_2973.DWFA-588.2.2_5426.AAEXP-4416.1.1_2393.DWFA-595.2.4_3788.TACO-41.2.2_5413.AAEXP-4403.2.1_5414.AAEXP-4404.2.1_5430.AAEXP-4420.1.1_5417.AAEXP-4407.1.1_5423.AAEXP-4413.1.1_2274.DM-952.2.2_2399.WDW-164.1.1_2499.DWFA-657.2.3_2974.DWFA-587.2.3_3585.ACL-523.1.1_2055.DM-814.2.3_2464.DM-1175.2.2_2656.DM-1177.2.2_5434.AAEXP-4424.1.1_5427.AAEXP-4417.1.1_2356.B2B-515.2.2_3586.DF-3635.1.4_4321.B2B-679.2.2_4828.TACO-68.2.3_4829.TACO-91.2.3_3587.DWFA-653.2.2_4831.WDW-341.2.2_4853.DF-3503.1.1_5376.WDW-360.2.1_5415.AAEXP-4405.2.1_2497.WDW-209.2.2_5429.AAEXP-4419.1.1; dapUid=c2d67210-6f35-4bc5-b9a1-40d512bbb878; dapVn=1; LMTBID=v2|27b68f9f-2191-450b-80fd-391db06237fc|bccba0a1bf17e78d03c14ada051c38bb; privacySettings=%7B%22v%22%3A%221%22%2C%22t%22%3A1704758400%2C%22m%22%3A%22LAX%22%2C%22consent%22%3A%5B%22NECESSARY%22%2C%22PERFORMANCE%22%2C%22COMFORT%22%2C%22MARKETING%22%5D%7D; __cf_bm=m_ZHpHuOYEKKo.9dwf6BGpiuajZvYkpB.XXH5YxuwtU-1704791184-1-AetRDdjia/Uo3ASo/W7XwDF9GDBTD9ysjZkssiUL5d2XR3gHF4cmFbldDC/k1TAxxwfiYoP4TAk4rTGw7QYkoXM=; dapSid=%7B%22sid%22%3A%2231048f73-f6fc-4804-8ffb-8897514b36e2%22%2C%22lastUpdate%22%3A1704791875%7D',
        'Origin': 'https://www.deepl.com',
        'Referer': 'https://www.deepl.com/',
        'Sec-Ch-Ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
        'Sec-Ch-Ua-Mobile': '?0',
        'Sec-Ch-Ua-Platform': '"Windows"',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    text = '''In compliance with the provisions of the Financial Services '''
    # JSON-RPC payload for the LMT_handle_jobs method (one sentence job).
    aa = {"jsonrpc":"2.0","method": "LMT_handle_jobs","params":{"jobs":[{"kind":"default","sentences":[{"text":f"{text}","id":1,"prefix":" "}],"raw_en_context_before":[],"raw_en_context_after":[],"preferred_num_beams":4,"quality":"fast"}],"lang":{"target_lang":"ZH","preference":{"weight":{"DE":0.15016,"EN":5.99281,"ES":0.08125,"FR":0.12516,"IT":0.0273,"JA":0.03235,"NL":0.02295,"PL":0.01412,"PT":0.01891,"RU":0.01285,"ZH":0.32337,"BG":0,"CS":0.00495,"DA":0.00216,"EL":0,"ET":0.0067,"FI":0.0028,"HU":0.00912,"ID":0.00738,"LV":0.00172,"LT":0.00103,"RO":0.09677,"SK":0.00349,"SL":0.0124,"SV":0.01817,"TR":0.00596,"UK":0,"KO":0.00021,"NB":0.01005},"default":"default"},"source_lang_user_selected":"auto"},"priority":-1,"commonJobParams":{"mode":"translate","textType":"plaintext","browserType":1},"timestamp":1704791876600},"id":17330022}
    url = 'https://www2.deepl.com/jsonrpc?method=LMT_handle_jobs'
    ip = baseCore.get_proxy()
    # BUG FIX: passing the dict via data= form-encodes it, contradicting the
    # declared application/json Content-Type; serialize the payload as JSON.
    req = requests.post(url=url, headers=headers, data=json.dumps(aa), proxies=ip)
    bb = req.json()
    print(bb)
# 谷歌翻译 不用登陆字符5000
# 谷歌翻译 不用登陆字符5000
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import time
from selenium.webdriver.support.ui import Select
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
class Translate():
    """Drive translate.google.com.hk through Selenium (no login, ~5000-char limit)."""
    def __init__(self):
        # Target URL and a browser-like UA; the header dict is stored but never
        # applied to the Selenium session in this class.
        self.url = "https://translate.google.cn/"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        self.chrome_driver = r'D:\cmd100\chromedriver.exe'
        self.path = Service(self.chrome_driver)
        self.chrome_options = webdriver.ChromeOptions()
        self.chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        self.chrome_options.add_argument('--disable-gpu')
        self.chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the "automation controlled" flag from basic bot detection.
        self.chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        self.chrome_options.add_argument("--start-maximized")
        # chrome_options.add_argument(
        # 'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        self.browser = webdriver.Chrome(service=self.path, chrome_options=self.chrome_options)
        self.wait = WebDriverWait(self.browser, 20)
    def translate(self):
        """Prompt for a word, submit it, and print the translated text."""
        word = input("Input your word: ")
        self.get_input_language_type(word)
        # NOTE(review): "text-dst" is the result class on fanyi.qq.com, not on
        # Google Translate -- this looks like a copy-paste leftover and will
        # likely time out here; confirm the selector against the live page.
        self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, "text-dst")))
        result = self.browser.find_element(By.CLASS_NAME, 'text-dst')
        print(result.text)
        return
    def get_input_language_type(self,word):
        """Open Google Translate and type *word* into the source pane."""
        self.browser.get("https://translate.google.com.hk/?hl=zh-CN&sourceid=cnhp")
        time.sleep(2)
        self.browser.refresh()
        # NOTE(review): this XPath targets a <c-wiz> container rather than the
        # textarea itself; send_keys may silently do nothing -- verify.
        self.wait.until(EC.presence_of_element_located((By.XPATH, '//*[@id="yDmH0d"]/c-wiz/div/div[2]/c-wiz/div[2]/c-wiz/div[1]/div[2]/div[3]/c-wiz[1]')))
        input_word = self.browser.find_element(By.XPATH, '//*[@id="yDmH0d"]/c-wiz/div/div[2]/c-wiz/div[2]/c-wiz/div[1]/div[2]/div[3]/c-wiz[1]')
        input_word.send_keys(word)
        return
# Entry point: run one interactive Google-Translate session, then close the window.
if __name__ == "__main__":
    test = Translate()
    test.translate()
    test.browser.close()
#百度翻译 不登录翻译1000字 登录翻译5000字
#百度翻译 不登录翻译1000字 登录翻译5000字
import re
import string
import time
from urllib.parse import quote
import pymongo
from bs4 import BeautifulSoup
from bson import ObjectId
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
from base.BaseCore import BaseCore
baseCore = BaseCore()
class Translate():
    """Translate Baidu-Fanyi (fanyi.baidu.com) via Selenium and pull source
    documents from MongoDB.

    Limits per the author's note: 1000 chars per request without login,
    5000 with login.
    """
    def __init__(self):
        """Initialize the class and its fundamental attributes."""
        # self._lang_list = ['zh', 'en', 'kor', 'fra', 'jp', 'el', 'ru']
        # self._lang_list_original = ["中文", "英语", "韩语", "法语", "日语", "希腊语", "俄语"]
        # self._num = len(self._lang_list)
        # URL template: #<source_lang>/<target_lang>/<sentence>
        self.url = "https://fanyi.baidu.com/#{}/{}/{}"
        # self.url = "https://fanyi.baidu.com/#"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # NOTE(review): credentials are hard-coded in source -- move to config.
        self.db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').中科软['数据源_0106']
    def createDriver(self):
        """Build a fresh Chrome session routed through a proxy from baseCore."""
        chrome_driver = r'D:\cmd100\chromedriver.exe'
        path = Service(chrome_driver)
        chrome_options = webdriver.ChromeOptions()
        chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--ignore-certificate-errors')
        # Hide the "automation controlled" flag from basic bot detection.
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--start-maximized")
        # Route through a rotating proxy; strip the scheme ('http://') prefix.
        proxy = baseCore.get_proxy()
        chrome_options.add_argument('--proxy-server=' + proxy['http'].split('://')[1])
        chrome_options.add_argument(
            'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # chrome_options.add_argument('--headless')
        browser = webdriver.Chrome(service=path, chrome_options=chrome_options)
        return browser
    def translate(self, sentence, browser, lang):
        """Translate *sentence* to Chinese; returns (result, browser).

        On any failure the browser is recreated and the call retries itself
        recursively.
        NOTE(review): the recursion has no depth bound -- a persistently
        failing sentence will recurse until RecursionError; consider a retry
        counter.
        """
        sentence_ = sentence
        # browser = self.createDriver()
        wait = WebDriverWait(browser, 20)
        try:
            word_type = self.get_input_language_type(sentence_, browser, wait)
        # NOTE(review): bare except also swallows KeyboardInterrupt/SystemExit.
        except:
            browser.quit()
            browser = self.createDriver()
            result, browser = self.translate(sentence_, browser, lang)
            return result, browser
        # Force the caller-supplied source language over the detected one.
        if word_type:
            if word_type == lang:
                pass
            else:
                word_type = lang
        url_ = self.url.format(word_type, 'zh', sentence_)
        # Percent-encode the sentence but keep the URL structure characters.
        url = quote(url_, safe='/:#')
        browser.set_page_load_timeout(10)
        try:
            browser.get(url)
            wait.until(EC.presence_of_element_located(
                (By.XPATH, '//*[@id="main-outer"]/div/div/div[1]/div[2]/div[1]/div[2]/div/div/div[1]/p[2]')))
            result_ = browser.find_element(By.XPATH,
                                           '//*[@id="main-outer"]/div/div/div[1]/div[2]/div[1]/div[2]/div/div/div[1]/p[2]')
            result = result_.text.strip()
            print(f'翻译后的句子:{result}')
            return result, browser
        except:
            # Any failure (timeout, missing node): rebuild the browser and retry.
            browser.quit()
            print(f'翻译失败,重新翻译。当前句子为{sentence_}')
            browser = self.createDriver()
            result, browser = self.translate(sentence_, browser, lang)
            return result, browser
    def get_input_language_type(self, word, browser, wait):
        """Type *word* into Baidu-Fanyi and return the auto-detected language code."""
        browser.get("https://fanyi.baidu.com/")
        wait.until(EC.presence_of_element_located((By.ID, "baidu_translate_input")))
        input_word = browser.find_element(By.ID, "baidu_translate_input")
        input_word.send_keys(word)
        wait.until(EC.presence_of_element_located(
            (By.XPATH, '//*[@id="main-outer"]/div/div/div[1]/div[1]/div[1]/a[1]/span/span')))
        word_type = browser.find_element(By.XPATH,
                                         '//*[@id="main-outer"]/div/div/div[1]/div[1]/div[1]/a[1]/span/span')
        word_type = word_type.get_attribute("data-lang")
        return word_type
    def is_punctuation(self, char):
        """Return True when *char* is ASCII or common Chinese punctuation."""
        punctuation = string.punctuation + '、' + '(' + '…' + ')' + '《' + '》' + '“' + '”' + ':' + ';' + '!' + ' ' + '。' + ': '
        return char in punctuation
    def getTags(self, sentence_tag):
        """Parse an HTML fragment and return the list of all tag names in it."""
        tag_list = []
        tree = etree.HTML(sentence_tag)
        root = tree
        # Select every element in the document with XPath.
        all_tags = root.xpath("//*")
        # Collect (and echo) each tag name.
        for tag in all_tags:
            print(tag.tag)
            tag_list.append(tag.tag)
        return tag_list
    def sentence_split_sentence(self, contentWithTag, pattern):
        """Split *contentWithTag* by *pattern*; return [(start, end, text), ...].

        Falls back to the whole string as a single span when nothing matched
        and the input is at least 4 characters long.
        """
        # pattern = re.compile(r'[^\n]+(?=\n)|[^\n]+$')
        match_group = pattern.finditer(contentWithTag)
        sentences = []
        if match_group:
            for _ in match_group:
                start_end_index = _.span()
                sentences.append((start_end_index[0], start_end_index[1], _.group()))
        if (not sentences) and (len(contentWithTag) >= 4):
            sentences.append((0, len(contentWithTag), contentWithTag))
        return sentences
    def jionstr(self, html):
        """Join the text nodes of *html* into paragraphs of at most 1000 chars.

        Skips URLs, whitespace-only nodes, and lone punctuation.
        NOTE(review): the final `current_sentence` buffer is never appended,
        so the trailing paragraph is silently dropped -- confirm intent.
        NOTE(review): `find_all(text=True)` is the deprecated spelling of
        `find_all(string=True)` in modern BeautifulSoup.
        """
        paragraphs = []
        current_sentence = ''
        for tag in html.find_all(text=True):
            sentence = str(tag)
            if sentence.startswith('www.') or sentence.startswith('http://') or sentence.startswith('https://'):
                continue
            if sentence == '\n' or sentence == '\t' or sentence == ' ':
                continue
            if self.is_punctuation(sentence):
                continue
            # Flush the buffer once appending would push it past 1000 chars.
            if len(current_sentence) + len(sentence) <= 1000:
                current_sentence += sentence
            else:
                paragraphs.append(current_sentence.strip())
                current_sentence = sentence
        return paragraphs
    def gethtml(self):
        """Fetch one document from MongoDB and dissect its foreign-language HTML.

        NOTE(review): `sentence_tag` is referenced below but never defined in
        this method (the code that produced it is commented out) -- this loop
        body raises NameError as written.
        """
        # data = self.db_storage.find_one({'titleForeign':{'$ne':''}})
        # try:
        # browser = self.createDriver()
        # except:
        #
        # browser = self.createDriver()
        # datas = self.db_storage.find({'postCode': '2', 'newsTime': {'$gte': '2024-01-01', '$lt': '2024-01-02'}}).limit(10)
        datas = self.db_storage.find({'_id':ObjectId('659bb28148168960c7cc09c3')})
        for data in datas:
            contentWithTag = data['richTextForeign']
            # html = BeautifulSoup(contentWithTag, 'html.parser')
            # content = html.text
            # lang = baseCore.detect_language(content)
            # if lang == 'zh':
            # return contentWithTag
            # for tag in html.find_all(text=True):
            # print(f'{tag}|{tag.parent.name}')
            # print('\n')
            tag_list = self.getTags(sentence_tag[2])
            # Split on text runs that precede an HTML tag (or end the string).
            pattern2 = re.compile(r'[^.*?]+(?=<.*?>)|[^.*?]+$')
            parts_content = self.sentence_split_sentence(sentence_tag[2], pattern2)
            sentence_xml = BeautifulSoup(sentence_tag[2], 'lxml')
            sentence = sentence_xml.text
# Entry point: run only the MongoDB-driven HTML pass (translation calls disabled).
if __name__ == "__main__":
    test = Translate()
    # test.translate()
    # print(test.gethtml())
    test.gethtml()
#coding=utf-8
#coding=utf-8
......@@ -232,7 +232,7 @@ class BaiduSpider(object):
current_datetime = datetime.datetime.now()
delta = datetime.timedelta(days= 1)
publishtime = current_datetime - delta
elif '今天' in publishtime or'小时前' in publishtime or '分钟前' in publishtime :
elif '今天' in publishtime or '小时前' in publishtime or '分钟前' in publishtime :
delta = datetime.timedelta(hours= 5)
publishtime = current_datetime - delta
elif '年' in publishtime and '月' in publishtime :
......@@ -286,11 +286,16 @@ class BaiduSpider(object):
if len(lists)==0:
time.sleep(5)
# self.logger.info(f"{self.searchkw}获取{len(lists)}条数据")
repeatCounts = 0
for detail in lists:
durl=detail['detailUrl']
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
repeatCounts += 1
if repeatCounts/len(lists)>0.5:
self.logger.info(f"{self.searchkw}首页已存在50%以上,结束抓取")
return
continue
self.detailList.put(detail)
......@@ -304,8 +309,8 @@ class BaiduSpider(object):
timeFlag=False
while hasnext == '下一页 >':
try:
# if self.page_num==5:
# break
if self.page_num==5:
break
self.page_num = self.page_num + 1
self.logger.info(f"{self.searchkw}开始抓取第{self.page_num}页...")
try:
......@@ -317,6 +322,7 @@ class BaiduSpider(object):
flag, lists = self.parse_page()
if len(lists)<1:
break
repeated_counts = 0
for detail in lists:
publishTag=detail['publishTag']
# if publishTag:
......@@ -330,6 +336,10 @@ class BaiduSpider(object):
is_member = self.r.sismember('pybaidu_baidu_'+self.wordsCode, durl)
if is_member:
self.logger.info(f"{self.searchkw}已存在{detail['title']}")
repeated_counts += 1
if repeated_counts/len(lists) > 0.5:
self.logger.info(f"{self.searchkw}第{self.page_num}页已存在过多,跳出循环")
return
continue
self.detailList.put(detail)
if timeFlag:
......
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -239,19 +239,19 @@ if __name__ == '__main__':
# #codeList.append('KW-20220108-0004')
# logger.info(f'开始采集{codeid}')
codeList = [
'KW-20220809-0004',
'KW-20220524-0004',
'KW-20220809-0005',
'KW-20220824-0001',
'KW-20220809-0002',
'KW-20220809-0003',
# 'KW-20220809-0004',
# 'KW-20220524-0004',
# 'KW-20220809-0005',
# 'KW-20220824-0001',
# 'KW-20220809-0002',
# 'KW-20220809-0003',
'KW-20220826-0001',
'KW-20220602-0003',
'KW-20220602-0002',
'KW-20220113-0007',
'KW-20220113-0006',
'KW-20220108-0004',
'KW-20220113-0004'
# 'KW-20220602-0003',
# 'KW-20220602-0002',
# 'KW-20220113-0007',
# 'KW-20220113-0006',
# 'KW-20220108-0004',
# 'KW-20220113-0004'
]
for codeid in codeList:
try:
......@@ -269,7 +269,7 @@ if __name__ == '__main__':
continue
if kwList:
# 创建一个线程池,指定线程数量为4
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# 提交任务给线程池,每个任务处理一个数据
results = [executor.submit(baiduTaskJob.runSpider, data) for data in kwList]
# 获取任务的执行结果
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论