提交 2c5329d9 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

# 国研网 国外政策法规
import datetime
import json
import os
import time
import uuid
import pymongo
import pymysql
import requests
from kafka import KafkaProducer
from base import BaseCore
from requests.packages import urllib3
from retry import retry
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium import webdriver
from obs import ObsClient
# Module-level setup: silences urllib3 warnings (getCountry() below requests
# with verify=False) and creates the shared logger, MongoDB collection, MySQL
# connection/cursor, OBS client and HTTP headers used by the functions below.
# NOTE(review): database and object-storage credentials are hard-coded in this
# block; consider loading them from configuration or environment variables.
urllib3.disable_warnings()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# MongoDB collection (database ZZSN) used for URL de-duplication and for the
# per-document summary written by save_data().
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN[
'国务院_国资委_copy1']
# MySQL connection/cursor (database clb_project) used for the
# clb_sys_attachment table.
cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
cursor_ = cnx_.cursor()
# Huawei Cloud OBS client used by uptoOBS() to store attachments.
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # Huawei Cloud access key (AK)
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # Huawei Cloud secret key (SK)
server='https://obs.cn-north-1.myhuaweicloud.com' # OBS endpoint for the bucket
)
# Request headers sent with every call to the ydyl.drcnet.com.cn API.
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
# Cookie: ASP.NET_SessionId=3qlxjr4vlarmswlqxdbi3gpj; Bf3%2bnvGZzxV%2bBqdrme8NTA%3d%3d=qXb%2ft%2bC55Mvy4%2bEud0fWBw%3d%3d; myfinger=a72c512593731801a0e41384bf13c95f; 8b0806eb2bb3468084a0c7707e7804d5=be41fd851789e9068499c450a33f3201092821702a
'Host': 'ydyl.drcnet.com.cn',
'Pragma': 'no-cache',
'Referer': 'https://ydyl.drcnet.com.cn/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0',
'sec-ch-ua': '"Not A(Brand";v="99", "Microsoft Edge";v="121", "Chromium";v="121"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
def create_driver():
    """Build and return an Edge WebDriver configured for unattended downloads.

    Files are saved to a fixed download directory without a prompt, the
    ``enable-automation`` switch is excluded and the Blink
    ``AutomationControlled`` feature is disabled.
    """
    driver_binary = r'F:\spider\117\msedgedriver.exe'
    save_dir = r"F:\spider\gzw"
    download_prefs = {
        "download.default_directory": save_dir,
        "download.prompt_for_download": False,
        "download.directory_upgrade": True,
        "safebrowsing.enabled": True
    }

    opts = Options()
    opts.add_experimental_option("prefs", download_prefs)
    # Exclude the "enable-automation" switch from the browser start-up flags.
    opts.add_experimental_option('excludeSwitches', ['enable-automation'])
    # Turn off the Blink "AutomationControlled" runtime feature.
    opts.add_argument('--disable-blink-features=AutomationControlled')
    return webdriver.Edge(executable_path=driver_binary, options=opts)
def getuuid():
    """Return a new version-1 UUID object created by ``uuid.uuid1()``."""
    return uuid.uuid1()
def secrchATT(item_id, retData, type_id, order_by):
    """Look up an attachment row in ``clb_sys_attachment``.

    The row is matched on ``item_id``, ``retData['path']``, ``type_id`` and
    ``order_by``.

    Returns:
        Whatever ``cursor_.fetchone()`` yields for the query: the first
        matching row (its only column is ``id``), or ``None`` when there is
        no match.
    """
    query = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
    params = (item_id, retData['path'], type_id, order_by)
    cursor_.execute(query, params)
    return cursor_.fetchone()
# Insert a row into the attachment table and return the attachment id.
def tableUpdate(retData, com_name, file_name, num, publishDate):
    """Insert one attachment record into ``clb_sys_attachment``.

    Args:
        retData: Upload description as built by ``uploadAttachment``; the keys
            ``item_id``, ``type_id``, ``group_name``, ``path``, ``full_path``,
            ``category``, ``file_size``, ``status``, ``create_by`` and
            ``create_time`` are read.
        com_name: Not used by this function; kept so existing callers work.
        file_name: Stored in the ``name`` column.
        num: Stored in the ``order_by`` column.
        publishDate: Stored in the ``publish_time`` column.

    Returns:
        tuple: ``(attachment_id, object_key)`` where ``object_key`` is the
        part of ``full_path`` after the bucket URL prefix.

    Raises:
        IndexError: If ``full_path`` does not contain the bucket URL prefix.
        LookupError: If the row cannot be found again after the insert.
    """
    item_id = retData['item_id']
    type_id = retData['type_id']
    path = retData['path']
    full_path = retData['full_path']
    order_by = num
    # The object key is everything after the bucket's public URL prefix.
    object_key = full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1]

    Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
    values = (
        file_name, type_id, item_id, retData['group_name'], path, full_path,
        retData['category'], retData['file_size'], order_by,
        retData['status'], retData['create_by'],
        retData['create_time'], object_key, 'zzsn', publishDate)
    cursor_.execute(Upsql, values)
    cnx_.commit()
    log.info("更新完成:{}".format(Upsql))

    # Read the row back to obtain its id.
    row = secrchATT(item_id, retData, type_id, order_by)
    if row is None:
        # Fail with a clear error instead of a TypeError from indexing None.
        raise LookupError(f'attachment row not found after insert: path={path}')
    att_id = row[0]
    return att_id, object_key
# Format a byte count as a human-readable size string.
def convert_size(size_bytes):
    """Return *size_bytes* scaled to the largest fitting unit.

    The value is divided by 1024 while it is at least 1024 and a larger unit
    (up to TB) is available, then rendered with two decimals followed by the
    unit name, e.g. ``"1.50 KB"``.
    """
    labels = ('bytes', 'KB', 'MB', 'GB', 'TB')
    idx = 0
    # At most one step per unit above 'bytes'.
    for _ in labels[1:]:
        if not size_bytes >= 1024:
            break
        size_bytes = size_bytes / 1024
        idx += 1
    return '{:.2f} {}'.format(size_bytes, labels[idx])
# Upload content to OBS.
@retry(tries=2,delay=5)
def uptoOBS(category,content):
    """Store *content* in the ``zzsn`` bucket under ``PolicyDocuments/``.

    The object name is a fresh UUID followed by *category* (the caller passes
    the file extension including the leading dot).

    Args:
        category: Suffix appended to the generated object name.
        content: Data handed to ``obsClient.putContent``.

    Returns:
        The response object returned by ``obsClient.putContent``.

    Raises:
        RuntimeError: If the response reports an HTTP status of 300 or above.
            Raising here lets ``@retry`` repeat the upload and lets callers
            see the failure instead of a response without an object URL.
    """
    name = str(getuuid()) + category
    result = obsClient.putContent('zzsn', 'PolicyDocuments/' + name, content=content)
    # The SDK reports server-side rejections via the response status rather
    # than by raising, so check it explicitly.
    status = getattr(result, 'status', None)
    if status is not None and status >= 300:
        error_code = getattr(result, 'errorCode', None)
        error_message = getattr(result, 'errorMessage', None)
        raise RuntimeError(f'OBS putContent failed: status={status}, '
                           f'errorCode={error_code}, errorMessage={error_message}')
    return result
def uploadAttachment(content,category,href):
    """Upload one attachment to OBS and describe the outcome.

    Args:
        content: File content to upload; ``len(content)`` is used as its size.
        category: File extension including the leading dot; stored under
            ``'category'`` and passed to ``uptoOBS``.
        href: Source page URL, used only in log messages.

    Returns:
        dict: Attachment description. ``'state'`` is ``True`` only when the
        upload succeeded and the response contained an object URL; in that
        case ``'path'``, ``'full_path'``, ``'file_size'`` and
        ``'create_time'`` are filled in. On failure the dict is returned with
        ``'state'`` left ``False``.
    """
    retData = {'state': False, 'type_id': 7, 'item_id': '', 'group_name': '', 'path': '',
               'full_path': '',
               'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
               'create_time': '', 'page_size': '', 'content': ''}
    try:
        result = uptoOBS(category,content)
    except Exception as e:
        log.error(f'{href}===obs上传失败==={e}')
        return retData

    # A response without a usable object URL is treated as a failed upload
    # rather than raising into the caller.
    try:
        object_url = result['body']['objectUrl']
        relative_path = object_url.split('.com')[1]
    except (KeyError, TypeError, AttributeError, IndexError) as e:
        log.error(f'{href}===obs上传失败==={e}')
        return retData

    retData['state'] = True
    retData['path'] = relative_path
    retData['full_path'] = object_url
    try:
        retData['file_size'] = convert_size(len(content))
    except Exception:
        # Size is informational only; keep the upload result without it.
        retData['file_size'] = ''
    retData['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    log.info(f'{href}===obs上传成功')
    return retData
@retry(tries=3, delay=8)
def getCountry():
    """Fetch the list of countries offered by the ``getStateList`` endpoint.

    The request goes through a proxy obtained from ``baseCore.get_proxy()``
    with certificate verification disabled.

    Returns:
        list: The concatenation of the ``'states'`` entries of every item in
        the response's ``'data'`` array.
    """
    proxy = baseCore.get_proxy()
    endpoint = 'https://ydyl.drcnet.com.cn/ydylapi/documentController/getStateList'
    resp = requests.get(endpoint, headers=headers, proxies=proxy, verify=False)
    resp.encoding = resp.apparent_encoding

    countries = []
    for group in resp.json()['data']:
        countries.extend(group['states'])
    return countries
# countryList = getCountry()
# for country in countryList:
@retry(tries=3, delay=8)
def getTotalPage(country):
    """Return how many 5-item listing pages exist for *country*.

    The first listing page is requested (no proxy is passed) and the
    response's ``data.totlalCount`` is converted into a page count, rounding
    up when the total is not a multiple of 5.
    """
    listing_url = f'https://ydyl.drcnet.com.cn/ydylapi/documentController/getPageChannelDocumentsByUId?uid=8007&attr={country}&curPage=1&pageSize=5'
    resp = requests.get(listing_url, headers=headers)
    resp.encoding = resp.apparent_encoding
    count = int(resp.json()['data']['totlalCount'])
    return int(count / 5) if count % 5 == 0 else int(count / 5 + 1)
def getDatas(country, page):
    """Return the document entries of listing page *page* for *country*.

    The page is requested with a page size of 5 (no proxy is passed) and the
    ``data.data`` value of the JSON response is returned unchanged.
    """
    listing_url = f'https://ydyl.drcnet.com.cn/ydylapi/documentController/getPageChannelDocumentsByUId?uid=8007&attr={country}&curPage={page}&pageSize=5'
    resp = requests.get(listing_url, headers=headers)
    resp.encoding = resp.apparent_encoding
    payload = resp.json()
    return payload['data']['data']
def getDic(driver):
    """Extract the document body from the page currently loaded in *driver*.

    Returns:
        tuple: ``(outer_html, text)`` of the element with id ``docContent``;
        the text is stripped of surrounding whitespace.
    """
    locator = (By.ID, 'docContent')
    html_with_tags = driver.find_element(*locator).get_attribute('outerHTML')
    plain_text = driver.find_element(*locator).text.strip()
    return html_with_tags, plain_text
@retry(tries=2, delay=5)
def sendKafka(dic):
    """Publish *dic* as UTF-8 JSON to the ``research_center_fourth`` topic.

    Args:
        dic: JSON-serialisable record; ``dic['sourceAddress']`` is used in the
            success log line.

    Raises:
        Exception: Whatever the Kafka client raises when the producer cannot
            be created or the broker does not acknowledge the record within
            10 seconds. ``@retry`` repeats the call once before the error
            reaches the caller.
    """
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
    try:
        payload = json.dumps(dic, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("research_center_fourth", payload)
        # send() is asynchronous; wait for the acknowledgement so a delivery
        # failure raises here instead of being logged as a success.
        kafka_result.get(timeout=10)
    finally:
        # A producer is created per call, so release its resources each time.
        producer.close()
    log.info(f'{dic["sourceAddress"]}传输成功')
def save_data(dic_news):
    """Store a summary of *dic_news* in the ``db_storage`` MongoDB collection.

    The stored document keeps the attachment ids, source URL, creation and
    publish dates, title, the first 100 characters of the tagged content and
    a source label built from the first label's ``relationName``.
    """
    record = {}
    record['附件id'] = dic_news['attachmentIds']
    record['网址'] = dic_news['sourceAddress']
    record['tid'] = ''
    record['来源'] = f"国研网-{dic_news['labels'][0]['relationName']}"
    record['创建时间'] = dic_news['createDate']
    # Only a 100-character preview of the tagged content is kept.
    record['带标签内容'] = dic_news['contentWithTag'][:100]
    record['发布时间'] = dic_news['publishDate']
    record['标题'] = dic_news['title']
    db_storage.insert_one(record)
# Crawl entry point. From what is visible here it: creates an Edge driver,
# fetches the country list, and for each country walks the listing pages
# returned by getDatas(). For every listed document it builds the detail-page
# URL, skips URLs already stored in MongoDB (db_storage), opens the page in
# the browser, tries to download the attachment and upload it to OBS / the
# attachment table, extracts the body with getDic(), sends the record to
# Kafka and then stores a summary with save_data().
# NOTE(review): this copy of the file has lost all indentation, so the exact
# nesting of the statements below (in particular `num += 1`, the trailing
# `pageFlg = False` and the final `else: break`) cannot be confirmed from
# here -- restore it from the repository before changing any logic.
def doJob():
driver = create_driver()
counrtyList = getCountry()
time.sleep(2)
for country in counrtyList:
# num feeds the per-country "collected" log line at the end.
num = 0
log.info(f'{country}===开始采集')
# pageFlg is cleared to stop paging for the current country.
pageFlg = True
totalPage = getTotalPage(country)
time.sleep(1)
for page in range(1, totalPage + 1):
if pageFlg:
dataList = getDatas(country, page)
for data in dataList:
attachmentIds = []
title = data['title']
author = data['author']
source = data['source']
summary = data['summary']
publishdate = data['deliveddate']
# Plain string comparison against a fixed cut-off date; presumably
# 'deliveddate' is an ISO-style 'YYYY-MM-DD...' string -- TODO confirm.
if publishdate < '2023-05-25':
pageFlg = False
break
chnid = data['chnid']
leafid = data['leafid']
# The document id is the second '_'-separated segment of data['code'].
code = data['code'].split('_')[1]
href = f'https://d.drcnet.com.cn/eDRCnet.common.web/DocDetail.aspx?chnid={chnid}&leafid={leafid}&docid={code}&uid=8007&version=YDYL'
# Skip documents whose URL has already been recorded by save_data().
is_href = db_storage.find_one({'网址': href})
if is_href:
continue
driver.get(href)
# js = "return action=document.body.scrollHeight"
# new_height = driver.execute_script(js)
# for i in range(0, new_height, 300):
# driver.execute_script(js)
# driver.execute_script('window.scrollTo(0, %s)' % (i))
# time.sleep(1)
try:
# Wait up to 5 seconds for the attachment download button, then click it.
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.ID, 'AttachmentDownload'))
)
driver.find_element(By.ID, 'AttachmentDownload').click()
time.sleep(2)
upFlg = True
# Poll the download directory until it is non-empty and the extension of
# the first listed file no longer contains 'tmp' or 'crdownload'.
# Only the first directory entry is inspected.
while True:
fileNames = os.listdir(r"F:\spider\gzw")
if len(fileNames) == 0:
time.sleep(2)
else:
downloadFlg = fileNames[0].split('.')[-1]
if 'tmp' in downloadFlg or 'crdownload' in downloadFlg:
time.sleep(2)
else:
break
# Upload every file found in the download directory, register it in the
# attachment table and delete the local copy.
for fileName in fileNames:
path = rf"F:\spider\gzw\{fileName}"
category = '.' + fileName.split('.')[-1]
with open(path,'rb') as f:
content = f.read()
retData = uploadAttachment(content,category,href)
if retData['state']:
pass
else:
upFlg = False
break
try:
# order_by is always passed as 1 here.
att_id, full_path = tableUpdate(retData, f'国研网-{country}', fileName, 1, publishdate)
except:
log.error(f'{href}===附件表插入失败')
upFlg = False
break
attachmentIds.append(att_id)
os.remove(path)
if not upFlg:
time.sleep(5)
continue
# NOTE(review): this bare except silently ignores every error raised above,
# presumably so that pages without an attachment button are still
# processed -- confirm; it also hides genuine failures.
except:
pass
contentWithtag, content = getDic(driver)
lang = baseCore.detect_language(content)
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
dic = {
# The id is the subject id followed by the current epoch time in whole
# seconds (int(time.time()) has one-second resolution).
'id': f'1566349618153701378{int(time.time())}',
'subjectId': '1566349618153701378',
'checkStatus': 1,
'deleteFlag': 0,
'topNum': 0,
'content': content,
'contentWithTag': contentWithtag,
'createDate': now,
'labels': [{'labelMark': 'country', 'relationName': country}],
'lang': lang,
'origin': source,
'publishDate': publishdate,
'sourceAddress': href,
'title': title,
'summary': summary,
'updateDate': now,
'author': author,
'attachmentIds':attachmentIds
}
# save_data() is attempted only after sendKafka() returned without raising;
# both failures are logged and otherwise ignored.
try:
sendKafka(dic)
try:
save_data(dic)
except:
log.error(f'{href}===数据库保存失败')
except:
log.error(f'{href}===传输失败')
num += 1
time.sleep(5)
# NOTE(review): the nesting level of the next statement is not recoverable
# from this copy, so how many pages are processed per country cannot be
# determined here.
pageFlg = False
else:
break
log.info(f'{country}===共采集{num}条')
# Errors while closing the browser are ignored.
# NOTE(review): close() is used rather than quit() -- verify that the driver
# process is actually terminated.
try:
driver.close()
except:
pass
if __name__ == '__main__':
doJob()
baseCore.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论