提交 2ea9c487 作者: LiuLiYuan

微软翻译 02/27

上级 369b5d9f
from flask import Flask, request, jsonify
app = Flask(__name__)
import os
import pyautogui
from bs4 import BeautifulSoup
from retry import retry
from selenium import webdriver
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import datetime
from bson import ObjectId
import pymongo
from base import BaseCore
# Project helper object; sqlflg=False is forwarded to BaseCore
# (presumably skips SQL connection setup — TODO confirm in base/BaseCore).
baseCore = BaseCore.BaseCore(sqlflg=False)
# Module-wide logger supplied by the project helper.
log = baseCore.getLogger()
# Disable PyAutoGUI's fail-safe so automated mouse moves never abort the run.
pyautogui.FAILSAFE = False
# PyAutoGUI pause setting; assumed to be seconds waited after each PyAutoGUI call.
pyautogui.PAUSE = 1
import redis
from selenium.webdriver.edge.options import Options
from tempfile import TemporaryFile
# Alternative driver location kept for reference:
# path = r'D:\soft\msedgedriver.exe'
# Location of the msedgedriver binary used to start Edge.
path = r'F:\spider\117\msedgedriver.exe'
# Edge session created at import time; the /translate handler reuses it
# (and replaces it via ``global driver``) across requests.
driver = webdriver.Edge(executable_path=path)
def get_active_window_title():
    """Return the title of the currently active desktop window.

    The title is also written to the log.

    Returns:
        The active window's title, or ``None`` when PyAutoGUI reports no
        active window.
    """
    window = pyautogui.getActiveWindow()
    # Resolve the title before logging: dereferencing ``window.title`` on a
    # missing window raised AttributeError and made the None return unreachable.
    title = window.title if window else None
    log.info(f'当前活动窗口的标题是:{title}')
    return title
def shiftwindow(n):
    """Switch desktop focus by holding Alt and pressing Tab ``n`` times.

    Args:
        n: Number of Tab presses sent while Alt is held down. With ``0`` the
            Alt key is pressed and released without switching windows.
    """
    pyautogui.keyDown('alt')
    try:
        for _ in range(n):
            pyautogui.press('tab')
    finally:
        # Always release Alt so a failed key press cannot leave it stuck down.
        pyautogui.keyUp('alt')
    # Optional wait so the window switch can settle before the next action.
    pyautogui.sleep(1)
def _first_link_free(elements, driver=None, scroll=False):
    """Scan ``elements`` for the first one whose ``<a>`` lookup fails.

    Args:
        elements: Candidate WebElements, scanned in order.
        driver: WebDriver used for scrolling; only needed when ``scroll``.
        scroll: When true, scroll each candidate into view (and wait one
            second) before inspecting it.

    Returns:
        ``(element, found)``. ``found`` is True when the scan stopped on an
        element whose link lookup raised. Otherwise ``element`` is the last
        element scanned (``None`` for an empty list) and ``found`` is False.
    """
    candidate = None
    for candidate in elements:
        if scroll:
            driver.execute_script("arguments[0].scrollIntoView();", candidate)
            time.sleep(1)
        try:
            # find_element either returns an element or raises, so reaching
            # the next line means this candidate contains a link: skip it.
            candidate.find_element(By.TAG_NAME, 'a')
        except Exception:
            # Lookup failed (normally: no link inside) -> use this element.
            return candidate, True
    return candidate, False


def click(type_name, driver):
    """Right-click a link-free element so the browser context menu opens.

    For ``'正文'`` pages the ``<p>`` elements are scanned (each scrolled into
    view first); for any other type the ``<div>`` elements are scanned. The
    first element without an ``<a>`` descendant receives the right-click.

    Args:
        type_name: Label of the text being translated; ``'正文'`` selects
            the paragraph-based strategy.
        driver: Selenium WebDriver showing the page.

    Returns:
        Always ``None``. For non-``'正文'`` pages the function logs an error
        and returns without clicking when every ``<div>`` contains a link.
    """
    # Park the mouse pointer over the page before opening the context menu.
    pyautogui.moveTo(500, 400, duration=1)
    if type_name == '正文':
        try:
            # Give a not-yet-visible first image a few seconds to show up.
            if not driver.find_element(By.TAG_NAME, 'img').is_displayed():
                time.sleep(5)
        except Exception:
            # Best effort only: a page without images needs no extra wait.
            pass
        time.sleep(1)
        target, found = _first_link_free(
            driver.find_elements(By.TAG_NAME, 'p'), driver, scroll=True)
        if not found:
            # No suitable paragraph: move the pointer elsewhere and still
            # attempt the right-click below (on the last paragraph, if any).
            pyautogui.moveTo(1500, 900, duration=1)
    else:
        time.sleep(1)
        target, found = _first_link_free(
            driver.find_elements(By.TAG_NAME, 'div'))
        if not found:
            log.error(f'{type_name}----未找到可点击的元素')
            return None
    if target is not None:
        try:
            ActionChains(driver).context_click(target).perform()
            return None
        except Exception:
            # Fall through to a position-based right-click.
            pass
    # Use a fresh chain so the failed element click is not replayed with it.
    ActionChains(driver).context_click().perform()
    return None
def _focus_browser_window(window_title):
    """Alt+Tab until the active window looks like the browser.

    The loop stops once the active window title contains ``window_title`` or
    the text ``'Edge'``. Each round presses Tab one more time than the
    previous one.

    NOTE(review): the loop is unbounded, matching the original code where
    the iteration limit was commented out.
    """
    presses = 1  # shiftwindow(0) would press no Tab and switch nothing
    while True:
        # Treat "no active window" as an empty title instead of failing on
        # ``in None``.
        title = get_active_window_title() or ''
        if window_title in title or 'Edge' in title:
            return
        time.sleep(1)
        log.info('窗口切换操作')
        shiftwindow(presses)
        presses += 1


@retry(tries=3, delay=1)
def Translate(type_name, file_name, driver):
    """Translate a local HTML file through the browser's context menu.

    The file is opened in the browser, the browser window is brought to the
    foreground, the context menu is opened with ``click`` and an entry is
    chosen with six Down presses plus Enter (presumably the "Translate"
    entry — verify against the installed Edge version). The page is then
    scrolled through in 300px steps before its source is read back.

    Args:
        type_name: Label of the text being translated; ``'正文'`` returns
            the page HTML, any other value returns plain text.
        file_name: Path of the HTML file to open (backslash-separated).
        driver: Selenium WebDriver controlling the browser.

    Returns:
        The translated HTML or text, or ``None`` when the page text did not
        change within the polling window.

    Raises:
        RuntimeError: When the page still shows the original text after
            scrolling; the ``retry`` decorator uses this to try again.
    """
    driver.get(f'file:///{file_name}')
    window_title = file_name.split('\\')[-1]
    # Snapshot of the untranslated text, used to detect when it changes.
    original_text = driver.find_element(By.TAG_NAME, 'body').text
    driver.maximize_window()
    driver.switch_to.window(driver.window_handles[0])
    time.sleep(1)
    _focus_browser_window(window_title)
    time.sleep(1)
    driver.refresh()
    click(type_name, driver)
    time.sleep(1)
    # Walk the context menu with the keyboard and confirm the selection.
    pyautogui.typewrite(['down'] * 6)
    pyautogui.typewrite(["enter"])
    time.sleep(1)
    driver.execute_script("window.scrollTo(0,0)")
    time.sleep(1)

    def still_original():
        """Return True while the first 500 body characters are unchanged."""
        body_text = driver.find_element(By.TAG_NAME, 'body').text
        return body_text[:500] in original_text

    # Poll up to 10 times, two seconds apart, for the text to change.
    polls = 0
    while still_original() and polls < 10:
        time.sleep(2)
        polls += 1
    if still_original():
        log.error(f'{type_name}---翻译加载失败')
        return None
    page_height = driver.execute_script(
        "return action=document.body.scrollHeight")
    # Scroll through the page in steps (presumably so lazily translated
    # sections get processed — TODO confirm).
    for offset in range(0, page_height, 300):
        driver.execute_script('window.scrollTo(0, %s)' % (offset))
        time.sleep(1)
    if still_original():
        log.error(f'{type_name}---翻译失败,重试')
        # Bring the browser back to the foreground before the retry.
        _focus_browser_window(window_title)
        # Explicit error instead of a bare ``raise`` with no active exception.
        raise RuntimeError(f'{type_name}---翻译失败,重试')
    soup = BeautifulSoup(driver.page_source, 'html.parser')
    if type_name == '正文':
        return str(soup)
    return soup.text
def save_file(text, driver, num):
    """Write ``text`` to a temporary HTML file and translate it.

    Args:
        text: Content to translate. It is wrapped in a ``<div>`` unless
            ``num`` is 3.
        driver: Selenium WebDriver passed on to ``Translate``.
        num: Content kind: 1 title, 2 summary, 3 body with tags, anything
            else plain body text.

    Returns:
        The value returned by ``Translate`` (``None`` when it reported a
        load failure), or ``''`` when ``Translate`` raised.
    """
    # TemporaryFile only accepts ``delete`` where it aliases
    # NamedTemporaryFile; call the named variant directly.
    from tempfile import NamedTemporaryFile

    type_name = {1: '标题', 2: '摘要', 3: '正文'}.get(num, '纯正文文本')
    if num != 3:
        text = f'<div>{text}</div>'
    started = time.time()
    result = ''
    # The file is closed (and flushed) before the browser opens it.
    with NamedTemporaryFile(mode='w+t', delete=False, encoding='utf-8',
                            suffix='.html') as f:
        f.write(str(text))
        file_name = f.name
    try:
        result = Translate(type_name, file_name, driver)
        if result:
            log.info(f'{type_name}翻译用时--{time.time() - started}')
        else:
            log.error(f'{type_name}翻译失败')
    except Exception as e:
        log.error(f'{type_name}翻译失败--{e}')
    finally:
        try:
            os.remove(file_name)
        except OSError as e:
            # A leftover temp file must not discard the translation result.
            log.error(f'{type_name}临时文件删除失败--{e}')
    return result
@app.route('/translate', methods=['POST'])
def doJob():
    """Handle ``POST /translate``: translate the fields of a JSON document.

    The JSON body may contain ``title``, ``summary``, ``contentWithTag`` and
    ``content``. Each non-empty field is translated with ``save_file``;
    missing or empty fields are reported as the string ``'null'``.

    A request counter is kept in the Redis set ``translate:num``; once it
    exceeds 50 the browser session is replaced with a new one.

    Returns:
        A JSON response with ``status`` (``'success'`` when every field
        result is truthy, otherwise ``'failed'``) and one entry per field.
    """
    global driver
    start = time.time()
    try:
        num = int(baseCore.r.spop('translate:num').decode())
    except Exception:
        # No stored counter (or an unreadable one): start counting again.
        num = 0
    if num > 50:
        try:
            # quit() ends the whole driver session; close() only closes the
            # current window and leaves the old session behind.
            driver.quit()
        except Exception as e:
            # A dead browser must not prevent starting a new one.
            log.error(f'关闭浏览器失败--{e}')
        driver = webdriver.Edge(executable_path=path)
        driver.maximize_window()
        ActionChains(driver).context_click().perform()
        num = 0
    num += 1
    baseCore.r.sadd('translate:num', num)
    log.info('开始翻译')
    data = request.get_json()  # JSON payload of the POST request
    if not isinstance(data, dict):
        # A missing or non-object body is treated as "no fields supplied".
        data = {}
    # Field name -> content kind understood by save_file, in processing order.
    field_kinds = (
        ('title', 1),
        ('summary', 2),
        ('contentWithTag', 3),
        ('content', 4),
    )
    results = {}
    for field, kind in field_kinds:
        value = data.get(field, '')
        results[field] = save_file(value, driver, kind) if value else 'null'
    status = 'success' if all(results.values()) else 'failed'
    translate_result = {'status': status, **results}
    log.info(f'翻译完成,耗时--{time.time() - start}')
    return jsonify(translate_result)
if __name__ == "__main__":
    # Script entry point: serve the Flask app on all interfaces, port 5001.
    app.run(host='0.0.0.0', port=5001)
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
请 注册 或者 登录 后发表评论