Commit 3ad4e1eb  Author: 薛凌堃

11/10

Parent ce4c997a
# 核心工具包
......@@ -524,7 +524,7 @@ class BaseCore:
if category in file_name:
pass
else:
file_name = file_name + '.' + category
file_name = file_name + category
result = obsClient.putContent('zzsn', 'PolicyDocuments/' + file_name, content=response.content)
break
except:
......
import os
import re
import fitz
import openpyxl
import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import time
from openpyxl import load_workbook
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
......@@ -89,18 +94,27 @@ class Policy():
return att_id,full_path
def downloadfile(self,file_name,file_href,path):
def downloadfile(self,file_href,path):
response = requests.get(file_href)
with open(path,"wb") as file:
file.write(response.content)
pass
def createfile(self,file_path):
file_exist = baseCore.check_excel_file(file_path)
if file_exist:
wb = load_workbook(file_path)
return wb
else:
wb = openpyxl.Workbook()
wb.save(file_path)
log.info("Excel文件已创建")
return wb
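# Usage (see __main__ at the bottom of this script): wb = policy.createfile(file_path);
# each collector, e.g. zhengquanqihuo(wb, file_path), then writes its own sheet into that workbook.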
policy = Policy()
#国家发展和改革委员会 https://www.ndrc.gov.cn/xxgk/wjk/index.html?tab=all&qt=
def reform():
def reform(wb,file_path):
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate, br',
......@@ -117,95 +131,148 @@ def reform():
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
url = 'https://fwfx.ndrc.gov.cn/api/query?qt=REITs&tab=all&page=1&pageSize=20&siteCode=bm04000fgk&key=CAB549A94CF659904A7D6B0E8FC8A7E9&startDateStr=&endDateStr=&timeOption=0&sort=dateDesc'
result = policy.getrequest_json(headers, url)
data_list = result['data']['resultList']
DataList = []
num = 0
for info in data_list:
num += 1
# info = data_list[1]
publishDate = info['docDate']
title = info['title']
summary = info['summary']
newsUrl = info['url']
header = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'Cookie': 'Hm_lvt_6c8165462fd93121348afe212168341f=1699338341; yfx_c_g_u_id_10005970=_ck23110714254113251712738304141; http_waf_cookie=05e8486c-c47f-4927291823a10f5e24ceed45b1eaa3eb7354; SF_cookie_3=21321202; Hm_lpvt_6c8165462fd93121348afe212168341f=1699422316; yfx_f_l_v_t_10005970=f_t_1699338341317__r_t_1699412780356__v_t_1699422316031__r_c_1',
'Host': 'www.ndrc.gov.cn',
'Referer': 'https://www.ndrc.gov.cn/xxgk/wjk/index.html?tab=all&qt=',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
newssoup = policy.getrequest_soup(header, newsUrl)
# print(newssoup)
try:
pubHao = ''
source = ''
path = 'data/国家发展和改革委员会'
if not os.path.exists(path):
os.makedirs(path)
for page in range(1,3):
url = f'https://fwfx.ndrc.gov.cn/api/query?qt=REITs&tab=all&page={page}&pageSize=20&siteCode=bm04000fgk&key=CAB549A94CF659904A7D6B0E8FC8A7E9&startDateStr=&endDateStr=&timeOption=0&sort=dateDesc'
result = policy.getrequest_json(headers, url)
data_list = result['data']['resultList']
for info in data_list:
num += 1
# info = data_list[1]
publishDate_ = info['docDate']
title = info['title']
summary = info['summary'].replace('<em>','').replace('</em>','')
newsUrl = info['url']
header = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'max-age=0',
'Connection': 'keep-alive',
'Cookie': 'Hm_lvt_6c8165462fd93121348afe212168341f=1699338341; yfx_c_g_u_id_10005970=_ck23110714254113251712738304141; http_waf_cookie=05e8486c-c47f-4927291823a10f5e24ceed45b1eaa3eb7354; SF_cookie_3=21321202; Hm_lpvt_6c8165462fd93121348afe212168341f=1699422316; yfx_f_l_v_t_10005970=f_t_1699338341317__r_t_1699412780356__v_t_1699422316031__r_c_1',
'Host': 'www.ndrc.gov.cn',
'Referer': 'https://www.ndrc.gov.cn/xxgk/wjk/index.html?tab=all&qt=',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36',
'sec-ch-ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
newssoup = policy.getrequest_soup(header, newsUrl)
# print(newssoup)
try:
# article_con article_con_title
contentWithTag = newssoup.select('div[class="article_con article_con_notitle"]')[0]
except:
pubHao = ''
source = ''
try:
contentWithTag = newssoup.select('div[class="article_con article_con_title"]')[0]
# article_con article_con_title
contentWithTag = newssoup.select('div[class="article_con article_con_notitle"]')[0]
except:
continue
try:
pubHao_ = newssoup.select('div[class="article_con article_con_notitle"]>span')[0].text
if '〔' in pubHao_:
pubHao = pubHao_
except:
pass
policy.deletep(contentWithTag, 3, 'div', 'style', 'text-align: center;')
policy.deletek(contentWithTag)
content = contentWithTag.text
try:
source = newssoup.select('div[class="ly laiyuantext"]>span')[0].text
except:
pass
dic_info = {
'序号':num,
'标题': title,
'时间': publishDate,
'来源': source,
'原文链接':newsUrl,
'发文字号': pubHao,
'摘要':summary,
'正文': content,
'附件名称':'',
'附件链接':'',
}
DataList.append(dic_info)
file_name = f'../data/REITs专题数据.xlsx'
sheet_name = "国家发展和改革委员会"
file_exist = baseCore.check_excel_file(file_name)
if file_exist:
pass
else:
wb = openpyxl.Workbook()
wb.save(file_name)
log.info("Excel文件已创建")
baseCore.writerToExcel(DataList, file_name, sheet_name)
try:
contentWithTag = newssoup.select('div[class="article_con article_con_title"]')[0]
except:
continue
policy.deletek(contentWithTag)
try:
for pubHao_ in newssoup.find_all('div', style='text-align: center;')[:10]:
# print(pubHao_)
if '〔' in pubHao_.text:
pubHao = pubHao_.text
print(pubHao)
else:
continue
except:
pass
try:
source = newssoup.select('div[class="ly laiyuantext"]>span')[0].text
except:
pass
except:
log.info(f"error!!!{newsUrl}")
log.info(f'=============处理结束,已采集{num}条数据=================')
policy.deletek(newssoup)
try:
publishDate = newssoup.find('div',class_="article_con article_con_notitle").find_all('span')[-1].text
except:
try:
publishDate = newssoup.find('div',class_="article_con article_con_title").find_all('span')[-1].text
except:
publishDate = ''
pattern = r"\d{4}年\d{1,2}月\d{1,2}日"
match = re.match(pattern, publishDate)
if match:
pass
else:
publishDate = ''
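# re.match only accepts strings that start with a date like '2023年11月10日'; anything else clears publishDate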
policy.deletep(contentWithTag, 3, 'div', 'style', 'text-align: center;')
policy.deletek(contentWithTag)
content = contentWithTag.text
try:
policy.paserUrl(newssoup,newsUrl)
att = newssoup.find('div', class_='attachment_r')
fu_jian_name = ''
fu_jian_href = ''
except:
fu_jian_name = ''
fu_jian_href = ''
att = ''
if att:
for a in att.find_all('a'):
file_name = a.text.replace('.', '、')
if '<' in file_name or '>' in file_name:
file_name = file_name.replace('<', '').replace('>', '')
file_href = a['href']
category = os.path.splitext(file_href)[1]
if category in file_name:
pass
else:
file_name = file_name + category
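# os.path.splitext keeps the leading dot in the suffix (e.g. '.pdf'), so it is appended without an extra '.'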
rename_file = f'{str(num)}_{publishDate}_{file_name}'
fu_jian_name += rename_file + '\n'
fu_jian_href += file_href + '\n'
policy.downloadfile(file_href, f'{path}/{rename_file}')
dic_info = {
'序号': num,
'标题': title,
'发布时间': publishDate_,
'来源': source,
'原文链接': newsUrl,
'发文时间': publishDate,
'发文机构': '',
'发文字号': pubHao,
'摘要': summary,
'正文': content,
'附件名称': fu_jian_name,
'附件链接': fu_jian_href,
}
DataList.append(dic_info)
sheet_name = "国家发展和改革委员会"
if sheet_name in wb.sheetnames:
log.info(f"{sheet_name}工作表已存在!")
else:
# 创建新工作表
wb.create_sheet(sheet_name)
print(f"{sheet_name}新工作表创建完成!")
# 保存Excel文件
wb.save(file_path)
baseCore.writerToExcel(DataList, file_path, sheet_name)
except Exception as e:
log.info(f"error!!!{newsUrl}")
log.info(f'{e}')
log.info(f'====第{page}页====处理结束,已采集{num}条数据=================')
#证券期货 https://neris.csrc.gov.cn/falvfagui/multipleFindController/indexJsp
def zhengquanqihuo():
def zhengquanqihuo(wb,file_path):
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Encoding': 'gzip, deflate, br',
......@@ -245,7 +312,12 @@ def zhengquanqihuo():
total = pageUtil['rowCount']
page_size = pageUtil['pageSize']
Max_page = int(total / page_size)
for page in range(0, Max_page):
DataList = []
num = 0
path = 'data/证监会'
if not os.path.exists(path):
os.makedirs(path)
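# Max_page = int(total / page_size) floors the page count, so the loop below runs to Max_page + 1 to cover a final partial page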
for page in range(0, Max_page+1):
payload_page = {
'pageNo': page + 1,
'secFutrsLawName': '',
......@@ -262,32 +334,55 @@ def zhengquanqihuo():
data_page = policy.requestPost(headers, url, payload_page)
info_list = data_page['pageUtil']['pageList']
for info in info_list:
title = info['secFutrsLawName']
pubHao = info['fileno']
source = info['lawPubOrgName']
publish_ = datetime.strptime(info['secFutrsLawVersion'], "%Y%m%d")
publishDate = datetime.strftime(publish_, "%Y-%m-%d")
# print(publishDate)
secFutrsLawId = info['secFutrsLawId']
newsUrl = f'https://neris.csrc.gov.cn/falvfagui/rdqsHeader/mainbody?navbarId=3&secFutrsLawId={secFutrsLawId}&body=REITs'
browser = policy.createDriver()
browser.get(newsUrl)
time.sleep(1)
page_source = browser.page_source
newssoup = BeautifulSoup(page_source, 'html.parser')
# print(newssoup)
contentWithTag = newssoup.find('div', class_='law_text mainBody catalog')
content = contentWithTag.text
print(content)
dic_info = {
'title': title,
'publishDate': publishDate,
'source': source,
'pub_hao': pubHao,
'contentWithTag': contentWithTag,
'content': content
}
print(dic_info)
num += 1
try:
title = info['secFutrsLawName']
pubHao = info['fileno']
source = info['lawPubOrgName']
publish_ = datetime.strptime(info['secFutrsLawVersion'], "%Y%m%d")
publishDate = datetime.strftime(publish_, "%Y-%m-%d")
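# 'secFutrsLawVersion' is a compact date, e.g. '20231110' -> '2023-11-10'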
# print(publishDate)
secFutrsLawId = info['secFutrsLawId']
newsUrl = f'https://neris.csrc.gov.cn/falvfagui/rdqsHeader/mainbody?navbarId=3&secFutrsLawId={secFutrsLawId}&body=REITs'
browser = policy.createDriver()
browser.get(newsUrl)
time.sleep(1)
page_source = browser.page_source
newssoup = BeautifulSoup(page_source, 'html.parser')
# print(newssoup)
contentWithTag = newssoup.find('div', class_='law_text mainBody catalog')
content = contentWithTag.text.replace('显示注释', '')
# print(content)
dic_info = {
'序号': num,
'标题': title,
'发布时间': publishDate,
'来源': source,
'原文链接': newsUrl,
'发文时间': publishDate,
'发文机构': source,
'发文字号': pubHao,
'摘要': '',
'正文': content,
'附件名称': '',
'附件链接': '',
}
DataList.append(dic_info)
sheet_name = "证监会"
if sheet_name in wb.sheetnames:
log.info(f"{sheet_name}工作表已存在!")
else:
# 创建新工作表
wb.create_sheet(sheet_name)
print(f"{sheet_name}新工作表创建完成!")
# 保存Excel文件
wb.save(file_path)
baseCore.writerToExcel(DataList, file_path, sheet_name)
except Exception as e:
log.info(f"error!!!{num}")
log.info(f'{e}')
log.info(f'====第{page}页====处理结束,已采集{num}条数据=================')
#深圳交易所 http://www.szse.cn/lawrules/index.html
......@@ -307,15 +402,20 @@ def sse():
}
result = policy.getrequest_json(headers,url)
total_page = result['data']['totalPage']
DataList = []
num = 0
path = 'data/上海交易所'
if not os.path.exists(path):
os.makedirs(path)
for page in range(0, int(total_page)):
url_page = f'http://query.sse.com.cn/search/getESSearchDoc.do?page={page}&limit=10&publishTimeEnd=&publishTimeStart=&orderByDirection=DESC&orderByKey=score&searchMode=fuzzy&spaceId=3&keyword=REITs&siteName=sse&keywordPosition=title%2Cpaper_content&channelId=10001&channelCode=8640%2C8641%2C8642%2C8643%2C8644%2C8645%2C8646%2C8647%2C8648%2C8649%2C8650%2C8651%2C8652%2C8653%2C8654%2C8655%2C8656%2C8657%2C8658%2C8659%2C8660%2C8661%2C8685%2C9348%2C12632%2C12768%2C12769%2C12770%2C12771%2C12772%2C12773%2C12774%2C12775%2C12776%2C12777%2C12778%2C12779%2C12780%2C12781%2C12782%2C12783%2C12784%2C12785%2C12786%2C12787%2C12788%2C12789%2C12790%2C12791%2C12792%2C12793%2C12794%2C12795%2C12796%2C12797%2C12798%2C12799%2C12800%2C12801%2C12802%2C12803%2C12804%2C12805%2C12806%2C12807%2C12808%2C12809%2C12810%2C12811%2C12812%2C13061%2C13282%2C13283%2C13284%2C13285%2C13286%2C13287%2C13288%2C13289%2C13294%2C13364%2C13365%2C13366%2C13367%2C14595%2C14596%2C14597%2C14598%2C14599%2C14600%2C14601%2C14602%2C14603%2C14604%2C14605%2C14606&trackId=50619067167713018335655119683810&_=1699508921761'
data = policy.getrequest_json(headers, url_page)
newslist = data['data']['knowledgeList']
# print(newslist)
for news in newslist[:1]:
title = news['title']
num += 1
title = news['title'].replace("<em>",'').replace('</em>','')
publishDate = news['createTime']
newsUrl = 'http://www.sse.com.cn' + news['extend'][4]['value']
# print(newsUrl)
summary = news['rtfContent']
source = news['spaceName']
......@@ -331,44 +431,91 @@ def sse():
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
}
newssoup = policy.getrequest_soup(header, newsUrl)
# print(newssoup)
content_ = newssoup.find('div', class_='allZoom')
# print(content_)
# # 将链接替换为绝对路径
contentWithTag = policy.paserUrl(content_, newsUrl)
pubHao = contentWithTag.find('p',style='text-align: center;').text.strip(' ')
if '〔' in pubHao:
pass
newsUrl = 'http://www.sse.com.cn' + news['extend'][4]['value']
if '.pdf' in newsUrl:
fu_jian_name = ''
fu_jian_href = ''
content = ''
response = requests.get(newsUrl, timeout=20)
with fitz.open(stream=response.content, filetype='pdf') as doc:
for page in doc.pages():
content += page.get_text()
file_href = newsUrl
file_name = title
rename_file = f'{str(num)}_{publishDate}_{file_name}'
fu_jian_name += rename_file + '\n'
fu_jian_href += file_href + '\n'
policy.downloadfile(file_href, f'{path}/{rename_file}')
dic_info = {
'序号': num,
'标题': title,
'发布时间': publishDate,
'来源': source,
'原文链接': newsUrl,
'发文时间': '',
'发文机构': '',
'发文字号': pubHao,
'摘要': summary,
'正文': content,
'附件名称': fu_jian_name,
'附件链接': fu_jian_href,
}
DataList.append(dic_info)
else:
pubHao = ''
# print(contentWithTag)
content = contentWithTag.text
fujian_list = contentWithTag.find_all('a')
id_list = []
for fujian in fujian_list:
try:
num = 1
newssoup = policy.getrequest_soup(header, newsUrl)
# print(newssoup)
content_ = newssoup.find('div', class_='allZoom')
# print(content_)
# # 将链接替换为绝对路径
contentWithTag = policy.paserUrl(content_, newsUrl)
pubHao = contentWithTag.find('p',style='text-align: center;').text.strip(' ')
if '〔' in pubHao:
pass
else:
pubHao = ''
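# 发文字号 (document numbers) normally contain the full-width bracket '〔', e.g. '××发〔2023〕×号'; text without it is discarded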
# print(contentWithTag)
content = contentWithTag.text
fujian_list = contentWithTag.find_all('a')
fu_jian_name = ''
fu_jian_href = ''
for fujian in fujian_list:
file_href = fujian['href']
file_name = fujian.text.strip(' ')
# 下载附件到本地,并上传文件服务器
att_id, full_path = policy.attuributefile(file_name,file_href,num,publishDate)
num += 1
if att_id and full_path:
id_list.append(att_id)
dic_info = {
'attachmentIds':id_list,
'title': title,
'summary':summary,
'publishDate': publishDate,
'source': source,
'pub_hao': pubHao,
'contentWithTag': contentWithTag,
'content': content
}
rename_file = f'{str(num)}_{publishDate}_{file_name}'
fu_jian_name += rename_file + '\n'
fu_jian_href += file_href + '\n'
policy.downloadfile(file_href, f'{path}/{rename_file}')
dic_info = {
'序号': num,
'标题': title,
'发布时间': publishDate,
'来源': source,
'原文链接': newsUrl,
'发文时间': '',
'发文机构': '',
'发文字号': pubHao,
'摘要': summary,
'正文': content,
'附件名称': fu_jian_name,
'附件链接': fu_jian_href,
}
DataList.append(dic_info)
sheet_name = "上海交易所"
if sheet_name in wb.sheetnames:
log.info(f"{sheet_name}工作表已存在!")
else:
# 创建新工作表
wb.create_sheet(sheet_name)
print(f"{sheet_name}新工作表创建完成!")
# 保存Excel文件
wb.save(file_path)
baseCore.writerToExcel(DataList, file_path, sheet_name)
except:
continue
#北京市人民政府 https://www.beijing.gov.cn/so/s?siteCode=1100000088&tab=zcfg&qt=REITs
......@@ -468,6 +615,8 @@ def beijing():
if __name__=="__main__":
reform()
file_path = f'data/REITs专题数据.xlsx'
wb = policy.createfile(file_path)
# reform(wb,file_path)
zhengquanqihuo(wb,file_path)
# zhengquanqihuo()
\ No newline at end of file
......@@ -564,13 +564,13 @@ if __name__ == "__main__":
# kegaishifan()
# shuangbaiqiye()
# zhuangjingtexind()
NoticeEnterprise()
# NoticeEnterprise()
# AnnualEnterpriseIPO()
# AnnualEnterprise()
# BaseInfoEnterprise()
# BaseInfoEnterpriseAbroad()
# NewsEnterprise_task()
# NewsEnterprise()
NewsEnterprise()
# CorPerson()
# china100()
# global100()
......@@ -585,8 +585,8 @@ if __name__ == "__main__":
# dujioashou()
# omeng()
# AnnualEnterpriseUS()
NoticeEnterprise_task()
AnnualEnterprise_task()
# NoticeEnterprise_task()
# AnnualEnterprise_task()
# FinanceFromEast()
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
"""模拟扫码登录"""
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
def createDriver():
chrome_driver = r'D:\cmd100\chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
return driver
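# A headless session could be enabled with chrome_options.add_argument('--headless=new'); it is left interactive here
# because the QR-code login flow below needs a visible window.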
def flushAndGetToken():
log.info('======刷新浏览器=====')
browser.refresh()
cookie_list = browser.get_cookies()
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
print(cookies)
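# The cookies dict collected above can be replayed outside Selenium, e.g. requests.get(url, cookies=cookies).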
def getrequest_soup(headers,url):
req = requests.get(headers=headers,url=url)
result = BeautifulSoup(req.content,'html.parser')
return result
def dojob():
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cookie': 'qcc_did=046d99c9-566e-4046-9094-689901b79748; UM_distinctid=18aac5b8c21810-046f8431aecf58-26031f51-1fa400-18aac5b8c22efd; CNZZDATA1254842228=109635008-1695108795-https%253A%252F%252Fwww.qcc.com%252F%7C1695113473; _uab_collina=169935323766710839405007; QCCSESSID=1d489139eea4830a062c3a1240; acw_tc=db9062ad16994955552435350e3b43e7e5cee64c77d9f807936897ab1f',
'Host': 'www.qcc.com',
'Referer': 'https://www.qcc.com/',
'Sec-Ch-Ua': '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
}
url = 'https://www.qcc.com/web/search?key=%E5%B0%8F%E7%B1%B3%E9%80%9A%E8%AE%AF%E6%8A%80%E6%9C%AF%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8'
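# 'key' is the URL-encoded company name 小米通讯技术有限公司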
soup = getrequest_soup(headers,url)
pass
if __name__ == "__main__":
urlqcc = 'https://www.qcc.com/'
browser = createDriver()
browser.get(urlqcc)
wait = WebDriverWait(browser, 10)
wait.until(EC.presence_of_element_located((By.CLASS_NAME, "nav-item")))
# page_source = browser.page_source
# soup = BeautifulSoup(page_source,'html.parser')
# print(soup)
browser.find_element(By.CLASS_NAME, 'nav-item').click()
time.sleep(20)
flushAndGetToken()
#企业动态 从redis中获取数据
import json
import os
import random
import subprocess
import requests, time, pymysql
import jieba
......@@ -50,7 +52,7 @@ headers = {
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '企业动态/天眼查/补采20W+'
taskType = '企业动态/天眼查/补采专精特新'
def reqDetailmsg(url,headers):
......@@ -76,7 +78,7 @@ def beinWork(tyc_code, social_code,start_time):
t = time.time()
url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
try:
for m in range(0, 3):
for m in range(0,3):
ip = baseCore.get_proxy()
headers['User-Agent'] = baseCore.getRandomUserAgent()
response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
......@@ -85,11 +87,18 @@ def beinWork(tyc_code, social_code,start_time):
if (response.status_code == 200):
pass
except Exception as e:
#todo:重新放入redis中
baseCore.rePutIntoR('NoticeEnterprise:gnqy_socialCode',social_code)
log.error(f"{tyc_code}-----获取总数接口失败")
error = '获取总数接口失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{error}----{e}')
#获取当前进程pid
current_pid = baseCore.getPID()
#todo: 重新启动新进程,杀死当前进程
subprocess.Popen([sys.executable] + sys.argv)
os.kill(current_pid,9)
return retData
try:
json_1 = json.loads(response.content.decode('utf-8'))
......@@ -126,7 +135,7 @@ def beinWork(tyc_code, social_code,start_time):
ip = baseCore.get_proxy()
headers['User-Agent'] = baseCore.getRandomUserAgent()
response_page = requests.get(url=url_page, headers=headers, proxies=ip, verify=False)
time.sleep(1)
# time.sleep(3)
break
except:
pass
......@@ -172,43 +181,25 @@ def beinWork(tyc_code, social_code,start_time):
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
#return retData
continue
return retData
try:
time_struct = time.localtime(int(info_page['rtm'] / 1000)) # 首先把时间戳转换为结构化时间
time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct) # 把结构化时间转换为格式化时间
except:
time_format = baseCore.getNowTime(1)
#记录时间 对比时间
#if time_format > '2023-09-25' and time_format < '2023-10-01':
#pass
#else:
#continue
try:
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
# req = requests.get(url=link,headers=headers,timeout=10)
# html = BeautifulSoup(req.content,'html.parser')
raw_html = reqDetailmsg(link,headers)
if raw_html:
# soup = BeautifulSoup(raw_html, 'html.parser')
try:
article = smart.extract_by_html(raw_html)
content = article.cleaned_text
contentText = article.text
except Exception as e:
log.info(f'抽取失败!!{e}')
# #带标签正文
# contentText = smart.extract_by_url(link).text
# #不带标签正文
# content = smart.extract_by_url(link).cleaned_text
# # time.sleep(3)
#带标签正文
contentText = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
if len(content) < 300:
continue
# time.sleep(3)
except Exception as e:
contentText = ''
if contentText == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
......@@ -253,7 +244,7 @@ def beinWork(tyc_code, social_code,start_time):
'lang': 'zh',
'origin': source,
'publishDate': time_format,
#'sid': '1684032033495392257',
# 'sid': '1684032033495392257',
'sid': '1714852232679067649',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
......@@ -286,7 +277,7 @@ def beinWork(tyc_code, social_code,start_time):
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '成功')
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
......@@ -312,7 +303,7 @@ def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
# social_code = '913205007764477744'
# social_code = '912301001275921118'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
......@@ -376,16 +367,12 @@ def doJob():
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
#break
cursor.close()
cnx.close()
# 释放资源
baseCore.close()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
log.info(f'当前进程id为{baseCore.getPID()}')
doJob()
#企业动态 从redis中获取数据
import json
import os
import random
import subprocess
import requests, time, pymysql
import jieba
import sys
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
# from base.BaseCore import BaseCore
# from base.smart import smart_extractor
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jieba.cut("必须加载jieba")
# 初始化,设置中文分词
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
pageSize = 10
log.info(f'======================当前脚本进程为{baseCore.getPID()}==============================')
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'Host': 'capi.tianyancha.com',
'Origin': 'https://www.tianyancha.com',
'Referer': 'https://www.tianyancha.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36',
'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxODcwMzc1MjYwMCIsImlhdCI6MTY5NzE5MDMwMywiZXhwIjoxNjk5NzgyMzAzfQ.awXuS-59RzK35r0aUJq4Rj83JzyAOvsdUfL_ojp66CVQMjlLv_ZDD9g5gCoZKE21LN1JYRMLNZhuWsHhxapROw',
'X-TYCID': '6f6298905d3011ee96146793e725899d',
'sec-ch-ua': '"Google Chrome";v="117", "Not;A=Brand";v="8", "Chromium";v="117"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'version': 'TYC-Web'
}
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '企业动态/天眼查/补采专精特新'
def reqDetailmsg(url,headers):
# proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
for i in range(0,1):
try:
response=requests.get(url=url,headers=headers,timeout=8,verify=False)
response.encoding = response.apparent_encoding
htmltext=response.text
except Exception as e:
htmltext=''
log.info(f'{url}---详情请求失败--{e}')
if htmltext:
log.info(f'{url}---详情请求成功')
break
return htmltext
def beinWork(tyc_code, social_code,start_time):
time.sleep(3)
# retData={'up_state':False,'total':0,'up_okCount':0,'up_errorCount':0,'up_repetCount':0}
retData = {'total': 0, 'up_okCount': 0, 'up_errorCount': 0, 'up_repetCount': 0}
t = time.time()
url = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={t}&id={tyc_code}&ps={pageSize}&pn=1&emotion=-100&event=-100'
try:
for m in range(0,3):
ip = baseCore.get_proxy()
headers['User-Agent'] = baseCore.getRandomUserAgent()
response = requests.get(url=url, headers=headers, proxies=ip, verify=False)
time.sleep(random.randint(3, 5))
break
if (response.status_code == 200):
pass
except Exception as e:
#todo:重新放入redis中
baseCore.rePutIntoR('NoticeEnterprise:gnqy_socialCode',social_code)
log.error(f"{tyc_code}-----获取总数接口失败")
error = '获取总数接口失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{error}----{e}')
#获取当前进程pid
current_pid = baseCore.getPID()
#todo: 重新启动新进程,杀死当前进程
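# subprocess.Popen([sys.executable] + sys.argv) starts a fresh copy of this script with the same command line,
# and os.kill(current_pid, 9) then force-terminates the current process.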
subprocess.Popen([sys.executable] + sys.argv)
os.kill(current_pid,9)
return retData
try:
json_1 = json.loads(response.content.decode('utf-8'))
total = json_1['data']['total']
except:
log.error(f"{tyc_code}-----获取总数失败")
e = '获取总数失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
return retData
if (total > 0):
if (total % pageSize == 0):
totalPage = total // pageSize
else:
totalPage = total // pageSize + 1
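# manual ceiling division, e.g. total = 25, pageSize = 10 -> totalPage = 3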
else:
log.error(f"{tyc_code}--------总数为0")
retData['state'] = True
return retData
log.info(f"{tyc_code}-------总数:{total}----总页数:{totalPage}")
retData['total'] = total
up_okCount = 0
up_errorCount = 0
up_repetCount = 0
for num in range(1, totalPage + 1):
time.sleep(3)
log.info(f"获取分页数据--{tyc_code}----分页{num}----开始")
start_page = time.time()
url_page = f'https://capi.tianyancha.com/cloud-yq-news/company/detail/publicmsg/news/webSimple?_={time.time()}&id={tyc_code}&ps={pageSize}&pn={num}&emotion=-100&event=-100'
for m in range(0, 3):
try:
ip = baseCore.get_proxy()
headers['User-Agent'] = baseCore.getRandomUserAgent()
response_page = requests.get(url=url_page, headers=headers, proxies=ip, verify=False)
# time.sleep(3)
break
except:
pass
if (response_page.status_code == 200):
pass
else:
log.error(f"{tyc_code}--{num}页---获取分页数据失败")
e = '获取分页数据失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_page, e)
up_errorCount = up_errorCount + pageSize
continue
try:
json_page = json.loads(response_page.content.decode('utf-8'))
info_list_page = json_page['data']['items']
except:
log.error(f"{tyc_code}--{num}页---获取分页数据失败")
e = '获取分页数据失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url_page, e)
up_errorCount = up_errorCount + pageSize
continue
pageIndex = 0
for info_page in info_list_page:
pageIndex = pageIndex + 1
title = info_page['title']
source = info_page['website']
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article_news where source_address = %s and social_credit_code=%s and type='2' '''
cursor_.execute(sel_sql, (link, social_code))
except Exception as e:
print(e)
selects = cursor_.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
# todo:如果该条数据存在则说明该条数据之后的都已经采集完成,就可以跳出函数,执行下一个企业
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
return retData
try:
time_struct = time.localtime(int(info_page['rtm'] / 1000)) # 首先把时间戳转换为结构化时间
time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct) # 把结构化时间转换为格式化时间
except:
time_format = baseCore.getNowTime(1)
try:
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
#带标签正文
contentText = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
if len(content) < 300:
continue
# time.sleep(3)
except Exception as e:
contentText = ''
if contentText == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
up_errorCount = up_errorCount + 1
try:
insert_err_sql = f"insert into dt_err(xydm,`from`,url,title,pub_date,zhaiyao,create_date,state,pageNo,pageIndex) values('{social_code}','{source}','{link}','{title}','{time_format}','{info_page['abstracts']}',now(),1,{num},{pageIndex})"
cursor.execute(insert_err_sql)
cnx.commit()
except:
pass
continue
try:
insert_sql = '''insert into brpa_source_article_news(social_credit_code,source_address,origin,type,publish_time,content,create_time) values(%s,%s,%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
link,
f'天眼查-{source}',
'2',
time_format,
content[:500]
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': source,
'publishDate': time_format,
# 'sid': '1684032033495392257',
'sid': '1714852232679067649',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
error = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, f'{error}----{e}')
continue
# print(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
return retData
# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gnqybc_socialCode')
# social_code = '912301001275921118'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
#数据重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode',social_code)
continue
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
if retData['tycData'] and retData['reput']:
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
elif not retData['tycData'] and retData['reput']:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode', social_code)
continue
elif not retData['reput'] and not retData['tycData']:
continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
# 开始采集企业动态
retData = beinWork(tycid, xydm,start_time)
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount']
log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
except Exception as e:
log.info(f'==={social_code}=====获取企业信息失败====')
#重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqybc_socialCode',social_code)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
cursor.close()
cnx.close()
# 释放资源
baseCore.close()
if __name__ == '__main__':
log.info(f'当前进程id为{baseCore.getPID()}')
doJob()
"""
"""
Elasticsearch 安装
pip install elasticsearch==7.8.1 版本的
使用时参考文章
https://blog.csdn.net/yangbisheng1121/article/details/128528112
https://blog.csdn.net/qiuweifan/article/details/128610083
"""
import json
import time
import uuid
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYNotice/'
taskType = '企业公告/证监会'
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name):
body = {
"_source": ["attachmentIds", "createDate", "sourceAddress", "labels.relationId", "title", "year",
"publishDate", "createDate"],
"query": {
"bool": {
"must": [
{
"match": {
"type": "3"
}
},
{
"wildcard": {
"attachmentIds.keyword": "911*"
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "asc"
}
}
],
"track_total_hits": True,
"size": 200
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.attachmentIds', # 字段1
'hits.hits._source.title',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.labels.relationId',
'hits.hits._source.publishDate',
'hits.hits._source.year',
'hits.hits._source.createDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
log.info(result)
return result
def updateaunn(self,index_name,id,u_attid):
body = {
'doc': {
'attachmentIds': [str(u_attid)]
}
}
result = self.es.update(index=index_name
,id=id
,body=body)
log.info('更新结果:%s' % result)
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
#获取文件大小
def convert_size(size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = getOBSres(pathType, name, response)
except:
log.error(f'OBS发送失败')
return retData
try:
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
except:
log.error(f'文件损坏')
return retData
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
# resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
return result
def secrchATT(item_id, retData, type_id,order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
selects = cursor_.fetchone()
return selects
# 插入到att表 返回附件id
def tableUpdate(retData, year, pdf_name, num,pub_time,origin):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
# selects = secrchATT(item_id, pdf_name, type_id)
#
# if selects:
# log.info(f'pdf_name:{pdf_name}已存在')
# id = ''
# return id
# else:
try:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time, origin)
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
except Exception as e:
log.info(e)
log.info(f"更新完成:{item_id}===={pdf_name}")
selects = secrchATT(item_id, retData, type_id,order_by)
id = selects[0]
return id
def upload(sourceAddress,num):
# todo:链接上传obs
retData = uptoOBS(sourceAddress, title + '.pdf', 8, social_code)
# 附件插入att数据库
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return None
num = num + 1
origin = '证监会'
att_id = tableUpdate(retData, year, title + '.pdf', num, publishDate, origin)
if att_id:
return att_id
else:
return None
if __name__ == '__main__':
esMethod = EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
page = 1
while True:
result = esMethod.queryatt(index_name=esMethod.index_name)
total = result['hits']['total']['value']
if total==0:
log.info('++++已没有数据+++++')
break
msglist = result['hits']['hits']
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
# print(msglist)
num = 0
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
att_id = upload(sourceAddress,num)
u_attid = att_id
esMethod.updateaunn(esMethod.index_name, str(id), u_attid)
page+=1
# # esMethod.delete(esMethod.index_name,str(id))
# print('更新成功!!')
"""
"""
Elasticsearch 安装
pip install elasticsearch==7.8.1 版本的
使用时参考文章
https://blog.csdn.net/yangbisheng1121/article/details/128528112
https://blog.csdn.net/qiuweifan/article/details/128610083
"""
import json
import time
import uuid
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYNotice/'
taskType = '企业公告/证监会'
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name):
body = {
"_source": ["attachmentIds", "createDate", "sourceAddress", "labels.relationId", "title", "year",
"publishDate", "createDate"],
"query": {
"bool": {
"must": [
{
"match": {
"type": "3"
}
},
{
"wildcard": {
"attachmentIds.keyword": "None"
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.attachmentIds', # 字段1
'hits.hits._source.title',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.labels.relationId',
'hits.hits._source.publishDate',
'hits.hits._source.year',
'hits.hits._source.createDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
log.info(result)
return result
def updateaunn(self,index_name,id,u_attid):
body = {
'doc': {
'attachmentIds': [str(u_attid)]
}
}
result = self.es.update(index=index_name
,id=id
,body=body)
log.info('更新结果:%s' % result)
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
#获取文件大小
def convert_size(size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = getOBSres(pathType, name, response)
except:
log.error(f'OBS发送失败')
return retData
try:
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
except:
log.error(f'文件损坏')
return retData
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
# resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
return result
def secrchATT(item_id, retData, type_id,order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
selects = cursor_.fetchone()
return selects
# 插入到att表 返回附件id
def tableUpdate(retData, year, pdf_name, num,pub_time,origin):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
# selects = secrchATT(item_id, pdf_name, type_id)
#
# if selects:
# log.info(f'pdf_name:{pdf_name}已存在')
# id = ''
# return id
# else:
try:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time, origin)
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
except Exception as e:
log.info(e)
log.info(f"更新完成:{item_id}===={pdf_name}")
selects = secrchATT(item_id, retData, type_id,order_by)
id = selects[0]
return id
def upload(sourceAddress,num):
# todo:链接上传obs
retData = uptoOBS(sourceAddress, title + '.pdf', 8, social_code)
# 附件插入att数据库
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return None
num = num + 1
origin = '证监会'
att_id = tableUpdate(retData, year, title + '.pdf', num, publishDate, origin)
if att_id:
return att_id
else:
return None
if __name__ == '__main__':
esMethod = EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
page = 1
while True:
result = esMethod.queryatt(index_name=esMethod.index_name)
total = result['hits']['total']['value']
if total==0:
log.info('++++已没有数据+++++')
break
msglist = result['hits']['hits']
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
# print(msglist)
num = 0
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
att_id = upload(sourceAddress,num)
u_attid = att_id
esMethod.updateaunn(esMethod.index_name, str(id), u_attid)
page+=1
# # esMethod.delete(esMethod.index_name,str(id))
# print('更新成功!!')
......@@ -313,7 +313,7 @@ def get_content1():
end_time = time.time()
log.info(f'共抓取国务院文件{num}条数据,共耗时{end_time-start_time}')
# 国务院部文件
def get_content2():
pathType = 'policy/gwybmwj/'
def getTotalpage(bmfl,headers,session):
......