提交 de21c2fe 作者: LiuLiYuan

fanyi 01/12

上级 b7d2cc8d
#coding:utf-8
#coding:utf-8
# 百度翻译 不登录翻译1000字 登录翻译5000字
import re
import string
import time
from urllib.parse import quote
import pymongo
from bs4 import BeautifulSoup
from bson import ObjectId
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
# from selenium.webdriver.chrome.service import Service
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from base.BaseCore import BaseCore
# Shared project helper instance; Translate.gethtml calls its detect_language().
baseCore = BaseCore()
class Translate():
    """Translate foreign-language HTML to Chinese through the Baidu Translate web page.

    A Selenium-driven Firefox loads fanyi.baidu.com and the translation is
    read from the result pane.  Text longer than ``CHUNK_LIMIT`` characters is
    cut into several requests (the file header notes that the site accepts
    1000 characters per request when not logged in, 5000 when logged in).
    """

    # Maximum number of characters sent in one request.
    CHUNK_LIMIT = 1000
    # Marker appended to each text node so the translated string can be split
    # back into per-node segments.
    SEPARATOR = '😊'
    # Characters after which a long text may be cut.
    # NOTE(review): the original tested ',' twice; the second test was
    # presumably meant to be the full-width comma, so it is listed here.
    SPLIT_MARKS = ('.', '。', ',', ',')
    # Element the translated text is read from.
    RESULT_XPATH = '//*[@id="main-outer"]/div/div/div[1]/div[2]/div[1]/div[2]/div/div/div[1]/p[2]'
    # Element whose ``data-lang`` attribute get_input_language_type() returns.
    LANGUAGE_XPATH = '//*[@id="main-outer"]/div/div/div[1]/div[1]/div[1]/a[1]/span/span'
    # Footer appended to every translated document.
    # NOTE(review): the label names Microsoft although the text comes from
    # Baidu; the worker script strips this exact text, so it is left unchanged.
    FOOTER = '<p/><br>译文来源:微软自动翻译<br></p>'

    def __init__(self):
        """Start the browser and open the MongoDB collection handle."""
        # Fragment layout: #<source language>/<target language>/<text>.
        # The original template had no placeholders, so format() was a no-op.
        self.url = "https://fanyi.baidu.com/#{}/{}/{}"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        self.browser = self.createDriver()
        # NOTE(review): credentials are hard-coded; consider moving them to configuration.
        self.db_storage = \
            pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').中科软[
                '数据源_0106']

    def close(self):
        """Quit the browser owned by this instance."""
        self.browser.quit()

    def createDriver(self):
        """Create and return a Firefox WebDriver with an overridden user agent."""
        service = Service(r'F:\spider\firefox\geckodriver_1.exe')
        options = Options()
        options.set_preference("general.useragent.override",
                               "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3")
        return webdriver.Firefox(options=options, service=service)

    def _restart_browser(self):
        """Replace the current browser with a fresh one."""
        try:
            self.browser.quit()
        except Exception:
            pass  # the old session may already be dead
        self.browser = self.createDriver()

    def _translate_once(self, sentence, lang):
        """Run a single translation attempt and return the translated text."""
        wait = WebDriverWait(self.browser, 20)
        detected = self.get_input_language_type(sentence, wait)
        # The caller's language wins; the detected one is only a fallback.
        source_lang = lang or detected
        # Escape the text on its own so '/' and '#' inside it cannot be
        # mistaken for fragment separators.
        url = self.url.format(source_lang, 'zh', quote(sentence, safe=''))
        self.browser.set_page_load_timeout(10)
        self.browser.get(url)
        wait.until(EC.presence_of_element_located((By.XPATH, self.RESULT_XPATH)))
        return self.browser.find_element(By.XPATH, self.RESULT_XPATH).text.strip()

    def translate(self, sentence, lang, max_retries=3):
        """Translate ``sentence`` to Chinese.

        Args:
            sentence: text to translate.
            lang: source language code used in the URL.
            max_retries: number of additional attempts after the first failure.

        Returns:
            The translated text.

        Raises:
            RuntimeError: when every attempt failed.  The original retried by
                unbounded recursion, starting a new browser per failure.
        """
        last_error = None
        for _ in range(max_retries + 1):
            try:
                return self._translate_once(sentence, lang)
            except Exception as exc:
                last_error = exc
                # Start from a clean browser, as the original retry did.
                self._restart_browser()
        raise RuntimeError(f'translation failed after {max_retries + 1} attempts') from last_error

    def get_input_language_type(self, word, wait):
        """Type ``word`` into the Baidu Translate input box and return the ``data-lang`` value shown."""
        self.browser.get("https://fanyi.baidu.com/")
        wait.until(EC.presence_of_element_located((By.ID, "baidu_translate_input")))
        input_word = self.browser.find_element(By.ID, "baidu_translate_input")
        input_word.send_keys(word)
        wait.until(EC.presence_of_element_located((By.XPATH, self.LANGUAGE_XPATH)))
        word_type = self.browser.find_element(By.XPATH, self.LANGUAGE_XPATH)
        return word_type.get_attribute("data-lang")

    def is_punctuation(self, char):
        """Return True when ``char`` is a substring of the punctuation table (the empty string included)."""
        punctuation = string.punctuation + '、' + '(' + '…' + ')' + '《' + '》' + '“' + '”' + ':' + ';' + '!' + ' ' + '。'
        return char in punctuation

    def sentence_split_sentence(self, contentWithTag):
        """Split text on newlines and return ``(start, end, line)`` tuples for every non-empty line."""
        pattern = re.compile(r'[^\n]+(?=\n)|[^\n]+$')
        sentences = [(m.start(), m.end(), m.group()) for m in pattern.finditer(contentWithTag)]
        if (not sentences) and (len(contentWithTag) >= 4):
            sentences.append((0, len(contentWithTag), contentWithTag))
        return sentences

    def jionstr(self, html):
        """Join the text nodes of ``html`` into paragraphs of at most 1000 characters.

        Whitespace-only nodes, punctuation-only nodes and nodes starting with
        a URL prefix are skipped.
        """
        paragraphs = []
        current_sentence = ''
        for tag in html.find_all(text=True):
            sentence = str(tag)
            if sentence in ('\n', '\t', ' '):
                continue
            if self.is_punctuation(sentence):
                continue
            if sentence.startswith(('https://', 'http://', 'www.')):
                continue
            # Start a new paragraph once the joined text would exceed 1000 characters.
            if len(current_sentence) + len(sentence) <= 1000:
                current_sentence += sentence
            else:
                if current_sentence.strip():
                    paragraphs.append(current_sentence.strip())
                current_sentence = sentence
        # The original never emitted the paragraph still being accumulated.
        if current_sentence.strip():
            paragraphs.append(current_sentence.strip())
        return paragraphs

    @staticmethod
    def _cut_index(text, limit=1000):
        """Return the length of the prefix of ``text`` to send as one request.

        ``text`` must be longer than ``limit``.  The cut is placed right after
        the last split mark found at index 1 .. limit-1; without one the text
        is cut at ``limit``.
        """
        for pos in range(limit - 1, 0, -1):
            if text[pos] in Translate.SPLIT_MARKS:
                return pos + 1
        return limit

    def gethtml(self, contentWithTag):
        """Translate every text node of an HTML fragment to Chinese.

        Args:
            contentWithTag: HTML markup (or plain text) to translate.

        Returns:
            ``contentWithTag`` unchanged when the detected language is 'zh';
            otherwise the prettified translated markup followed by FOOTER.

        Raises:
            ValueError: when the translation comes back with fewer segments
                than there are text nodes.
            RuntimeError: propagated from translate() when a request keeps failing.
        """
        html = BeautifulSoup(contentWithTag, 'html.parser')
        lang = baseCore.detect_language(html.text)
        if lang == 'zh':
            return contentWithTag
        # Remember exactly the nodes whose text is sent, so the replacement
        # below walks the same nodes.  The original skipped blank nodes when
        # building the text but not when replacing, which shifted segments.
        nodes = [node for node in html.find_all(text=True) if str(node).strip()]
        if not nodes:
            return str(html.prettify()) + self.FOOTER
        pending = ''.join(f'{str(node).strip()}{self.SEPARATOR}' for node in nodes)
        translated = ''
        while pending:
            if len(pending) > self.CHUNK_LIMIT:
                cut = self._cut_index(pending, self.CHUNK_LIMIT)
            else:
                cut = len(pending)
            piece = pending[:cut].strip()
            pending = pending[cut:]
            if piece:
                translated += self.translate(piece, lang)
        time.sleep(2)
        segments = translated.split(self.SEPARATOR)
        # A trailing separator leaves one empty segment behind.
        if segments and not segments[-1].strip():
            segments.pop()
        if len(segments) < len(nodes):
            raise ValueError(
                f'translation returned {len(segments)} segments for {len(nodes)} text nodes')
        for node, segment in zip(nodes, segments):
            node.replace_with(segment)
        return str(html.prettify()) + self.FOOTER
if __name__ == "__main__":
    # Manual smoke test: translate the rich text of one stored document and print it.
    test = Translate()
    try:
        db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').中科软[
            '数据源_0504']
        data = db_storage.find_one({'_id': ObjectId('656f14e84d6d77428c713271')})
        a = data['richTextForeign']
        result = test.gethtml(a)
        print(result)
    finally:
        # Always release the browser, even when the lookup or the translation fails.
        test.close()
\ No newline at end of file
# D:\Program Files\Python36
# D:\Program Files\Python36
# -*- coding: utf-8 -*-
# @Time : 2022/2/19 14:20
from pyquery import PyQuery as pq
import xlrd
from bson.objectid import ObjectId
import json
import time
import requests
import datetime
import pymongo
import pymysql
import warnings
import random
from hashlib import md5
import redis
import re
import sys
from base import BaseCore
import baidufanyi
# Suppress DeprecationWarning output for the whole process.
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Project helper instance and the logger obtained from it.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# Words that disqualify an article: the worker loop in this file marks a record
# as filtered (postCode '10', blanked content) when its translated title
# contains any of them.  Some entries are listed more than once.
filterWords = ['纽约州', '折扣推荐', '股指上涨', '涨跌不一', '二手', '免费发帖', '超市特价', '爱尔兰移民', '藏独', '疆独', '台独', '英语广播', '高通胀', '苹果官网',
'人才招聘', '工作机会', 'GAZOO', '尸体', '美通社日历', '苹果日报', 'HPV', '皇家马德里', '安阳', '柏林区', '皇家马德里', '傅首尔', '???',
'? ? ?', '火灾', '偷盗', '盗抢', '死亡', '性侵', '毒品', '犯罪', '报名', '美剧', '男子', '女子', '嫌犯', '摘要', '要闻', 'LGBT',
'跨性别', '火灾', '马拉松', '球员', '偷盗', '着火', '梅西', '盗抢', '餐馆', '新冠疫情', '银行', '篮球', '疫苗', '死亡', '三人篮球', '新冠',
'三人女篮', '谋杀', '欧罗巴联赛', '肇事逃逸', '性侵', '新冠病例', '赛车公司', '人权捍卫', '内马尔', '吸毒', '失踪者', '超级明星', '毒品', '遗体',
'球星', '犯罪', '致命袭击', '遇袭', '报名', '巨额报酬', '群殴', '美剧', '链接', '持刀袭击', '男子', '大卖场', '纵火', '白人', '开枪打死',
'泽连斯基', '女子', '新冠变异株', '特朗普', '嫌犯', '大火', '利率', '嫌疑人', '起火', '脸书', '世界杯', '枪击', '马德里大师赛', '四强', '抄袭',
'中国网球', '八强', '虐待', '无人机袭击', '音乐厅', '患者', '加冕典礼', '楼市', '性丑闻', '加冕仪式', '股票', '监狱', '枪杀', '股市', '牢房',
'枪手', '警察', '百元店', '票房', '警方', '橄榄球联盟', '娱乐圈', 'WSL', '超级碗', '唱片', '超级联赛', '美妆', '枪杀', '化妆', '暴击', '韩妆',
'验尸官', '拐卖', '大麻', '买卖人口', '儿童、孩子们', '足球裁判', '持刀', '死于', '毒贩', '车祸', '入狱', '华裔', '抢劫', '种族歧视', '法庭',
'辱骂', '谋杀', '婚礼', '价格', '车辆着火', '板球运动员', '招聘', '重返赛场', '明星', '赛场', '醉汉', '乌克兰战争', '持枪', '纳粹', '皇家空军',
'纳粹分子', '强迫', '被枪杀', '残疾', '命案', '读博', '洋基队', '航班', '意甲冠军', '高尔夫球', '国际米兰', '商店', 'LGBT', '陪审团', '虐待',
'运动员', '妻子', '新型冠状病毒', '新冠', '年薪', '房市', '房地产', '地产', '星级主厨', '骗局', '老年人', '餐厅', '球队', '诈骗', '维权',
'耐力锦标赛', '拉力赛', 'WRC', 'WTRC', '耐力赛', 'TCR', '河南', '排位赛', '色情', '阴道', '凯尔特人', '天安门']
# Maps a record's `columns` value to a city name.  The worker loop in this file
# filters a record out when its `columns` value is a key here and the mapped
# city does not appear in the translated title.
judgmentWords = {'lejournaldugrandparis-成功的故事': '巴黎', 'lejournaldugrandparis-大巴黎': '巴黎',
'lejournaldugrandparis-大巴黎报革新': '巴黎', 'lejournaldugrandparis-地点': '巴黎',
'lejournaldugrandparis-公共市场': '巴黎', 'lejournaldugrandparis-规划': '巴黎',
'lejournaldugrandparis-国际的': '巴黎', 'lejournaldugrandparis-机构': '巴黎',
'lejournaldugrandparis-基础设施': '巴黎', 'lejournaldugrandparis-社区': '巴黎', 'lejournaldugrandparis-生长': '巴黎',
'lejournaldugrandparis-托儿所': '巴黎', 'lejournaldugrandparis-吸引力': '巴黎', 'lemonde-世界报巴黎': '巴黎',
'mylondon-北伦敦': '伦敦', 'mylondon-东伦敦': '伦敦', 'mylondon-怀旧之情': '伦敦', 'mylondon-驾驶': '伦敦',
'mylondon-交通出行': '伦敦', 'mylondon-教育': '伦敦', 'mylondon-伦敦商业': '伦敦', 'mylondon-伦敦市中心': '伦敦',
'mylondon-南伦敦': '伦敦', 'mylondon-西伦敦': '伦敦', 'mylondon-新闻观点': '伦敦', 'mylondon-政治': '伦敦',
'nydailynews-2021 年纽约市选举': '纽约', 'nydailynews-布朗克斯': '纽约', 'nydailynews-布鲁克林': '纽约',
'nydailynews-观点': '纽约', 'nydailynews-皇后区': '纽约', 'nydailynews-教育': '纽约', 'nydailynews-曼哈顿': '纽约',
'nydailynews-纽约': '纽约', 'nydailynews-纽约每日新闻新冠病毒': '纽约', 'nydailynews-纽约政治': '纽约', 'OTTAWAZINE': '渥太华',
'ouest-france-法兰西西部报法兰西岛': '巴黎', 'standard-华为': '伦敦', 'standard-健康': '伦敦', 'standard-教育': '伦敦',
'standard-可持续标准': '伦敦', 'standard-可持续发展': '伦敦', 'standard-伦敦': '伦敦', 'standard-伦敦市长': '伦敦',
'standard-商业': '伦敦', 'standard-世界疫苗': '伦敦', 'standard-新冠病毒': '伦敦', 'standard-运输': '伦敦',
'straitstimes-工作': '新加坡', 'straitstimes-公司与市场': '新加坡', 'straitstimes-环境': '新加坡',
'straitstimes-健康': '新加坡', 'straitstimes-经济': '新加坡', 'straitstimes-科技新闻': '新加坡',
'straitstimes-社区': '新加坡', 'straitstimes-消费者': '新加坡', 'straitstimes-育儿与教育': '新加坡',
'straitstimes-运输': '新加坡', 'straitstimes-政治': '新加坡', 'straitstimes-住房': '新加坡', 'tokyo-东京': '东京',
'tokyo-东京新闻新冠': '东京', 'tokyo-教育': '东京', 'tokyo-经济': '东京', 'tokyo-社会': '东京', 'tokyo-社论': '东京',
'tokyo-首都圈新闻': '东京', 'tokyo-政治': '东京', 'xxxx-sssss': '北京', '爱岛文化': '都柏林', '爱尔兰吧': '都柏林', '柏林伙伴': '柏林',
'朝日东京': '东京', '朝日世論調査': '东京', '朝日小池都政': '东京', '城市知道温哥华': '温哥华', '大华府华人资讯网': '温哥华', '德国华人街': '柏林',
'德国热线': '柏林', '德中网': '柏林', '东京在线': '东京', '俄罗斯龙报': '莫斯科', '法国巴黎大区': '巴黎', '法国中文网': '巴黎', '华人志': '都柏林',
'加都人': '渥太华', '伦敦发展促进署(公众号)': '伦敦', '每日俄罗斯在线': '莫斯科', '纽约华人资讯网': '纽约', '纽约时间': '纽约', '企航新加坡': '新加坡',
'瞧纽约': '纽约', '温哥华头条': '温哥华', '渥太华CFC': '渥太华', '新加坡红蚂蚁': '新加坡', '新加坡华人圈': '新加坡', '新加坡圈': '新加坡',
'新加坡鱼尾文': '新加坡', '英伦投资客': '伦敦', '遇见纽约': '纽约', '中国驻英国大使馆': '伦敦', '英国驻华大使馆': '伦敦', '法国驻华大使馆': '巴黎',
'法国旅游发展署': '巴黎', '德国印象': '柏林', '德国联邦外贸与投资署': '柏林', '赴德研究': '柏林', '投资德国北威州': '科隆', '中国驻杜塞尔多夫总领馆': '科隆',
'凤凰欧洲': '罗马', '中国驻欧盟使团': '罗马', '马德里投资代表处': '马德里', '中国驻罗马尼亚大使馆': '布加勒斯特', '以色列经济与产业部': '特拉维夫',
'朝鲜日报': '首尔', '首尔日报': '首尔', '首尔中国文化中心': '首尔', '中国驻日本大使馆': '东京', '中国驻新加坡大使馆': '新加坡', '中国驻纽约总领馆': '纽约',
'中国驻法国大使馆网': '巴黎', 'CBS新闻': '洛杉矶', '洛杉矶城市观察': '洛杉矶', '今日洛杉矶': '洛杉矶', '旧金山湾区华人资讯': '旧金山',
'旧金山市政厅新闻': '旧金山', '旧金山新闻': '旧金山', '旧金山政治新闻': '旧金山', '旧金山商业科技新闻': '旧金山', '多伦多城市新闻': '多伦多',
'莫斯科时报': '莫斯科', 'goToronto': '多伦多', '多伦多时间': '多伦多', '西闻': '马德里', '马德里市20分钟报': '马德里', '马德里新闻': '马德里',
'柏林晨报-柏林新闻': '柏林', '柏林晨报-初创企业新闻': '柏林', '东京观光指南 GOTOKYO': '东京', 'BBC-英格兰': '伦敦', 'BBC-伦敦': '伦敦',
'metro-伦敦': '伦敦', 'metro-英国': '伦敦', '马德里自治区-好日报': '马德里自治区', 'SurMadrid-马德里自治区': '马德里自治区',
'SurMadrid-马德里': '马德里自治区', '欧华集团': '马德里自治区', '西班牙华人街': '马德里自治区', 'ABC纽约': '纽约', 'FOX5纽约': '纽约',
'CBS纽约': '纽约', '东亚日报-文化': '首尔', '东亚日报-政治': '首尔', '韩联社-政治': '首尔', '建设新闻': '大阪府', '朝日新闻_大阪府': '大阪府',
'客观日本': '大阪府', '波士顿中文网': '波士顿', '波士顿留学生网': '波士顿', '波士顿发展新闻': '波士顿', '今日波士顿': '波士顿', '韩国亚洲经济': '首尔',
'亚洲日报': '首尔', '奥斯汀纪事报': '奥斯汀', '奥斯汀城市化': '奥斯汀', '休斯顿星空网': '奥斯汀', '芝加哥城市化': '芝加哥', '芝加哥建筑新闻特刊': '芝加哥',
'芝加哥一手资讯': '芝加哥', '旧金山纪事报': '旧金山', '中国驻匈牙利大使馆': '布达佩斯', '中国驻德国大使馆': '柏林', '中国驻比利时大使馆': '布鲁塞尔',
'中国驻荷兰王国大使馆': '阿姆斯特丹', '丹麦投资促进局': '哥本哈根', '丹麦研究中心': '哥本哈根', '丹麦科技创业中心': '哥本哈根', '今日芬兰': '赫尔辛基',
'芬兰创新商业资讯': '赫尔辛基', '赫尔辛基Helsinki Times': '赫尔辛基', '赫尔辛基华人生活': '赫尔辛基', '首尔新闻': '首尔', 'NBC波士顿新闻': '波士顿',
'中国驻法国大使馆': '巴黎', '哥本哈根中国文化中心': '哥本哈根', '罗马建筑师协会新闻': '罗马', '今日罗马环境新闻': '罗马', '今日罗马经济新闻': '罗马',
'今日罗马政治新闻': '罗马', '24小时报罗马新闻': '罗马', 'KBH新闻': '哥本哈根', '数字赫尔辛基新闻': '赫尔辛基', '赫尔辛基本地新闻': '赫尔辛基',
'布鲁塞尔新闻': '布鲁塞尔', '布鲁塞尔大区城市新闻': '布鲁塞尔', '布鲁塞尔统计分析新闻': '布鲁塞尔', '休斯顿社区发展新闻': '休斯顿', '休斯顿日报-经济新闻': '休斯顿',
'今日悉尼': '悉尼', '悉尼印象': '悉尼', '商务投资布鲁塞尔': '布鲁塞尔'}
# Check whether a string contains Chinese characters
def contains_chinese(text):
    """Return True when ``text`` has at least one character in U+4E00..U+9FA5."""
    return bool(re.search(r'[\u4e00-\u9fa5]', text))
class Handler():
    """Client for the tagging, publishing and translation HTTP services used by the worker."""

    # Seconds to wait for the services that previously had no timeout; without
    # one, requests waits indefinitely and a stalled service blocks the caller.
    REQUEST_TIMEOUT = 300

    def __init__(self):
        """Open a client for the local MongoDB instance."""
        self.clientLocal = pymongo.MongoClient('mongodb://localhost:27017/', username='admin', password='ZZsn@9988')

    # mongodb Tag1
    def getcityTag1(self, text):
        """Return ``resultData.data`` from the get_city service for ``text``, or '' on any failure."""
        url = "http://39.105.62.235:7006/get_city/"
        payload = {'text': text}
        try:
            response = requests.request("POST", url, data=payload, timeout=300)
            return response.json()['resultData']['data']
        except Exception:
            # Best effort: network, JSON and schema problems all yield an empty tag.
            return ''

    # mongodb Tag2
    def getKeywordsTags2(self, text):
        """Return ``resultData.data`` from the get_phrase service (topK 20) for ``text``, or '' on any failure."""
        url = "http://39.105.62.235:7006/get_phrase/"
        payload = {'topK': '20', 'text': text, 'name': 'phrase'}
        try:
            response = requests.request("POST", url, data=payload, timeout=self.REQUEST_TIMEOUT)
            return response.json()['resultData']['data']
        except Exception:
            return ''

    # Push to MySQL
    def postData(self, data):
        """POST ``data`` as JSON to the news_info endpoint.

        Returns:
            The ``code`` field of the JSON reply, or ``status`` when there is no ``code``.

        Raises:
            requests.RequestException: on network failure or timeout.
            KeyError: when the reply has neither field.
        """
        url = "http://111.62.12.163:18481/cms/dataPlatform/news_info"
        payload = json.dumps(data)
        headers = {
            'Content-Type': 'application/json'
        }
        response = requests.request("POST", url, headers=headers, data=payload, timeout=self.REQUEST_TIMEOUT)
        r = response.json()
        if 'code' in r:
            return r['code']
        return r['status']

    # Baidu HTML translation
    def postTrans(self, html):
        """Send ``html`` to the tag-aware translate service and return the reply text plus the source footer."""
        # url = "http://114.116.19.92:8088/api/translate/getTranslateInfoWithTagContent"
        url = "http://114.116.116.241:8008/api/translate/getTranslateInfoWithTagContent"
        payload = {'co': html}
        files = []
        headers = {}
        response = requests.request("POST", url, headers=headers, data=payload, files=files, timeout=180)
        return response.text + '<p/><br>译文来源:微软自动翻译<br></p>'

    # Microsoft HTML translation
    def postTransMicrotrans(self, html):
        """Send ``html`` to the Microsoft translator service (target 'zh-CN') and return the reply text plus the source footer."""
        url = "http://114.116.19.92:8088/api/translator/microsoftTranslatorByContent"
        payload = {'co': html, 'from': '0dccf3f02b264edc9f3e2b2ac71c22d7', 'to': 'zh-CN'}
        files = []
        headers = {}
        response = requests.request("POST", url, headers=headers, data=payload, files=files, timeout=600)
        return response.text + '<p/><br>译文来源:微软自动翻译<br></p>'
if __name__ == '__main__':
Handler = Handler()
Translate = baidufanyi.Translate()
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017/', username='admin', password='ZZsn@9988').中科软[
'数据源_0504']
r = redis.StrictRedis(host='114.115.221.202', port=6379, db=1, decode_responses=True, password='clbzzsn')
while True:
db_id = r.spop('db_sid1')
if db_id is None:
log.info('暂无翻译数据, 等待10分钟')
time.sleep(600)
continue
a_dict = db_storage.find_one({'_id': ObjectId(db_id)})
log.info(f'{db_id}==={datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
t1 = int(time.time())
try:
titleForeign = a_dict['titleForeign'].strip()
LANG = a_dict['LANG'].strip()
except:
continue
errorNum = a_dict['errorNum']
flgA = False
flgB = False
try:
columns = a_dict['columns'].strip()
except:
columns = a_dict['columns']
try:
lang = a_dict['LANG'].strip()
except:
lang = a_dict['LANG']
try:
title = Translate.gethtml(titleForeign)
title = pq(title).text().replace('译文来源:微软自动翻译', '').strip()
if title == titleForeign:
# 存在语言识别错误情况
# 判断翻以前题目中是否包含中文,如果包含,则认为语言识别错误,修改为中文
if contains_chinese(title):
try:
richTextForeign = a_dict['richTextForeign'].strip()
except:
richTextForeign = a_dict['richTextForeign']
content = pq(richTextForeign).text()
tag1 = ''
try:
tag2 = ';'.join(Handler.getKeywordsTags2(content))
except Exception as e:
continue
db_storage.update_one({'_id': a_dict['_id']}, {
'$set': {'title': title, 'LANG': 'zh-cn', 'richText': richTextForeign, 'content': content,
'titleForeign': '', 'richTextForeign': '', 'contentForeign': '', 'tag1': tag1,
'tag2': tag2, 'postCode': '1'}})
log.info(f'{title}===语种已修改')
continue
else:
errorNum += 1
log.error(f'{titleForeign}===标题翻译失败')
db_storage.update_one({'_id': a_dict['_id']}, {'$set': {'postCode': '2', 'errorNum': errorNum}})
continue
except Exception as e:
log.error(f'{e}')
continue
for filterWord in filterWords:
if filterWord in title:
flgA = True
try:
if judgmentWords[columns] not in title:
flgB = True
except:
pass
if flgA or flgB:
flg_word = '                                                                                                                                                                                                                                                                                                                                                                                                                '
db_storage.update_one({'_id': a_dict['_id']}, {
'$set': {'title': title, 'richText': flg_word, 'content': flg_word, 'tag1': flg_word, 'tag2': flg_word,
'postCode': '10'}})
log.info(f'{title}===包含过滤词或缺少城市信息')
continue
log.info(title)
try:
richTextForeign = a_dict['richTextForeign'].strip()
except:
richTextForeign = a_dict['richTextForeign']
log.info('百度翻译')
try:
# 百度翻译
richText = Translate.gethtml(richTextForeign)
except:
log.error('正文翻译失败')
continue
# else:
# log.info('微软翻译')
content = pq(richText).text()
try:
tag1 = Handler.getcityTag1(content)
except:
tag1 = ''
tag1 = ';'.join(tag1)
try:
tag2 = Handler.getKeywordsTags2(content)
tag2 = ';'.join(tag2)
except:
continue
db_storage.update_one({'_id': a_dict['_id']}, {
'$set': {'title': title, 'richText': richText, 'content': content, 'tag1': tag1, 'tag2': tag2,
'postCode': '1'}})
t2 = int(time.time())
log.info(f'{db_id}翻译用时:{int(t2 - t1)}')
Translate.close()
#百度翻译 不登录翻译1000字 登录翻译5000字 ++ /dev/null
#百度翻译 不登录翻译1000字 登录翻译5000字
import re
import string
import time
import pymongo
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.chrome.service import Service
from base.BaseCore import BaseCore
# Shared project helper instance; the Translate class below calls its
# get_proxy() and detect_language().
baseCore = BaseCore()
class Translate():
    """Batch-translate stored articles to Chinese through the Baidu Translate web page.

    A Selenium-driven Chrome (behind a proxy from ``baseCore.get_proxy()``)
    loads fanyi.baidu.com and the translation is read from the result pane.
    """

    # Characters after which a long text may be cut.
    # NOTE(review): the original tested ',' twice; the second test was
    # presumably meant to be the full-width comma, so it is listed here.
    SPLIT_MARKS = ('.', '。', ',', ',')
    # Element the translated text is read from.
    RESULT_XPATH = '//*[@id="main-outer"]/div/div/div[1]/div[2]/div[1]/div[2]/div/div/div[1]/p[2]'
    # Element whose ``data-lang`` attribute get_input_language_type() returns.
    LANGUAGE_XPATH = '//*[@id="main-outer"]/div/div/div[1]/div[1]/div[1]/a[1]/span/span'

    def __init__(self):
        """Set the URL template, the request header and the MongoDB collection handle."""
        # Fragment layout: #<source language>/<target language>/<text>.
        self.url = "https://fanyi.baidu.com/#{}/{}/{}"
        self.header = {
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.163 Safari/535.1"}
        # NOTE(review): credentials are hard-coded; consider moving them to configuration.
        self.db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').中科软['数据源_0106']

    def createDriver(self):
        """Create and return a Chrome WebDriver configured with a proxy and a fixed user agent."""
        chrome_driver = r'D:\cmd100\chromedriver.exe'
        path = Service(chrome_driver)
        chrome_options = webdriver.ChromeOptions()
        chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--ignore-certificate-errors')
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--start-maximized")
        proxy = baseCore.get_proxy()
        chrome_options.add_argument('--proxy-server=' + proxy['http'].split('://')[1])
        chrome_options.add_argument(
            'user-agent=' + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # ``options`` is the keyword that goes together with ``service``; the
        # legacy ``chrome_options`` keyword is not accepted by current Selenium.
        return webdriver.Chrome(service=path, options=chrome_options)

    def translate(self, sentence, browser, lang, max_retries=3):
        """Translate ``sentence`` to Chinese.

        Args:
            sentence: text to translate.
            browser: WebDriver to use; it is replaced after a failed attempt.
            lang: source language code used in the URL.
            max_retries: number of additional attempts after the first failure.

        Returns:
            ``(translated_text, browser)`` where ``browser`` is the driver that
            is alive after the call.

        Raises:
            RuntimeError: when every attempt failed (the last browser has been
                quit by then).  The original retried by unbounded recursion.
        """
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                wait = WebDriverWait(browser, 20)
                detected = self.get_input_language_type(sentence, browser, wait)
                # The caller's language wins; the detected one is only a fallback.
                source_lang = lang or detected
                browser.set_page_load_timeout(10)
                browser.get(self.url.format(source_lang, 'zh', sentence))
                wait.until(EC.presence_of_element_located((By.XPATH, self.RESULT_XPATH)))
                result = browser.find_element(By.XPATH, self.RESULT_XPATH).text.strip()
                print(f'翻译后的句子:{result}')
                return result, browser
            except Exception as exc:
                last_error = exc
                try:
                    browser.quit()
                except Exception:
                    pass  # the session may already be dead
                print(f'翻译失败,重新翻译。当前句子为{sentence}')
                if attempt < max_retries:
                    browser = self.createDriver()
        raise RuntimeError(f'translation failed after {max_retries + 1} attempts') from last_error

    def get_input_language_type(self, word, browser, wait):
        """Type ``word`` into the Baidu Translate input box and return the ``data-lang`` value shown."""
        browser.get("https://fanyi.baidu.com/")
        wait.until(EC.presence_of_element_located((By.ID, "baidu_translate_input")))
        input_word = browser.find_element(By.ID, "baidu_translate_input")
        input_word.send_keys(word)
        wait.until(EC.presence_of_element_located((By.XPATH, self.LANGUAGE_XPATH)))
        word_type = browser.find_element(By.XPATH, self.LANGUAGE_XPATH)
        return word_type.get_attribute("data-lang")

    def is_punctuation(self, char):
        """Return True when ``char`` is a substring of the punctuation table (the empty string included)."""
        punctuation = string.punctuation + '、' + '(' + '…' + ')' + '《' + '》' + '“' + '”' + ':' + ';' + '!' + ' ' + '。'
        return char in punctuation

    def sentence_split_sentence(self, contentWithTag):
        """Split text on newlines and return ``(start, end, line)`` tuples for every non-empty line."""
        pattern = re.compile(r'[^\n]+(?=\n)|[^\n]+$')
        sentences = [(m.start(), m.end(), m.group()) for m in pattern.finditer(contentWithTag)]
        if (not sentences) and (len(contentWithTag) >= 4):
            sentences.append((0, len(contentWithTag), contentWithTag))
        return sentences

    def jionstr(self, html):
        """Join the text nodes of ``html`` into paragraphs of at most 1000 characters.

        Whitespace-only and punctuation-only nodes are skipped.
        """
        paragraphs = []
        current_sentence = ''
        for tag in html.find_all(text=True):
            sentence = str(tag)
            if sentence in ('\n', '\t', ' '):
                continue
            if self.is_punctuation(sentence):
                continue
            # Start a new paragraph once the joined text would exceed 1000 characters.
            if len(current_sentence) + len(sentence) <= 1000:
                current_sentence += sentence
            else:
                if current_sentence.strip():
                    paragraphs.append(current_sentence.strip())
                current_sentence = sentence
        # The original never emitted the paragraph still being accumulated.
        if current_sentence.strip():
            paragraphs.append(current_sentence.strip())
        return paragraphs

    @staticmethod
    def _cut_index(text, limit):
        """Return the length of the prefix of ``text`` to send as one request.

        ``text`` must be longer than ``limit``.  The cut is placed right after
        the last split mark found at index 1 .. limit-1; without one the text
        is cut at ``limit``.
        """
        for pos in range(limit - 1, 0, -1):
            if text[pos] in Translate.SPLIT_MARKS:
                return pos + 1
        return limit

    def _translate_text(self, text, browser, lang, limit):
        """Translate ``text`` of any length in pieces of at most ``limit`` characters.

        Returns ``(translated_text, browser)``.  The original sent everything
        after the first cut in one request and, when no split mark was found,
        dropped the remainder altogether.
        """
        if len(text) <= limit:
            return self.translate(text, browser, lang)
        parts = []
        remaining = text
        while remaining:
            cut = self._cut_index(remaining, limit) if len(remaining) > limit else len(remaining)
            piece = remaining[:cut].strip()
            remaining = remaining[cut:]
            if piece:
                translated, browser = self.translate(piece, browser, lang)
                parts.append(translated)
        return ''.join(parts), browser

    def gethtml(self, chunk_limit=50):
        """Translate up to ten stored articles and print each translated document.

        The articles are the records with postCode '2' and a newsTime on
        2024-01-01.  Records detected as Chinese are skipped.

        Args:
            chunk_limit: maximum characters per request.  50 is the value the
                original code was running with (its commented-out lines used 1000).
        """
        try:
            browser = self.createDriver()
        except Exception:
            # One more attempt, as in the original.
            browser = self.createDriver()
        try:
            datas = self.db_storage.find({'postCode': '2', 'newsTime': {'$gte': '2024-01-01', '$lt': '2024-01-02'}}).limit(10)
            for data in datas:
                contentWithTag = data['richTextForeign']
                html = BeautifulSoup(contentWithTag, 'html.parser')
                lang = baseCore.detect_language(html.text)
                if lang == 'zh':
                    # The original returned here, which aborted the whole batch.
                    continue
                for tag in html.find_all(text=True):
                    sentence = str(tag)
                    if sentence in ('\n', '\t', ' '):
                        continue
                    if self.is_punctuation(sentence):
                        continue
                    result, browser = self._translate_text(sentence, browser, lang, chunk_limit)
                    tag.replace_with(result)
                    time.sleep(2)
                print(html.prettify())
        finally:
            # The original never released the browser.
            try:
                browser.quit()
            except Exception:
                pass
if __name__ == "__main__":
    # Script entry point: run one translation batch.
    Translate().gethtml()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论