Commit 9d6175f9  Author: 薛凌堃

11/17

Parent 6173700f
import os
......@@ -68,6 +68,12 @@ class Policy():
        for tag_ in tags:
            tag_.extract()  # remove the specified tags
    def deletetext(self, soup, tag, text):  # remove tags whose text contains a given string
        tags = soup.find_all(tag)[:10]
        for tag_ in tags:
            text_ = tag_.text
            if text in text_:
                tag_.extract()
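    # Usage sketch (the sample HTML is an illustration, not from the repo): deletetext()
    # drops every matching tag whose text contains the given substring, e.g.
    #   soup = BeautifulSoup('<div><p>标题行</p><p>正文</p></div>', 'html.parser')
    #   policy.deletetext(soup, 'p', '标题行')   # the first <p> is extracted, the body <p> is kept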
    def deletek(self, soup):
        # remove empty tags (e.g. <p></p>, <p><br></p>); img, video and hr are kept
......@@ -850,7 +856,11 @@ def guangdong():
# Guizhou Provincial People's Government
def guizhou():
    url = "https://www.guizhou.gov.cn/irs/front/search"
    num = 0
    path = 'data/贵州省人民政府'
    if not os.path.exists(path):
        os.makedirs(path)
    DataList = []
    payload = "{\"tenantId\":\"186\",\"configTenantId\":\"\",\"tenantIds\":\"\",\"searchWord\":\"REITs\",\"historySearchWords\":[\"REITs\"],\"dataTypeId\":\"965\",\"orderBy\":\"related\",\"searchBy\":\"all\",\"appendixType\":\"\",\"granularity\":\"ALL\",\"beginDateTime\":\"\",\"endDateTime\":\"\",\"isSearchForced\":0,\"filters\":[],\"pageNo\":1,\"pageSize\":9}"
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
......@@ -875,22 +885,82 @@ def guizhou():
    jsonData = policy.requestPost(headers, url, payload)
    result_list = jsonData['data']['middle']["list"]
    for datainfo in result_list:
        num += 1
        title = datainfo['title']
        publishDate = datainfo['time']
        source = datainfo['source']
        summary = datainfo['content']
        newsUrl = datainfo['url']
        soup = policy.getrequest_soup(headers, newsUrl)
        # print(soup)
        pub_hao_ = soup.find('head').find('title').text
        start_index = pub_hao_.find("(") + 1
        end_index = pub_hao_.find(")")
        pub_hao = pub_hao_[start_index:end_index]
        print(pub_hao)
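        # Caveat: str.find() returns -1 when the <title> has no "(", which silently turns
        # pub_hao into most of the title. A more defensive sketch (an assumption, not the
        # repo's code) that also accepts fullwidth （） would be:
        #   m = re.search(r'[(（]([^)）]*)[)）]', pub_hao_)   # needs `import re`
        #   pub_hao = m.group(1) if m else ''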
        # remove tags that repeat specific text (title / document number)
        contentWithTag = soup.find('div', class_='Zoom Box')
        policy.deletetext(contentWithTag, 'p', title)
        policy.deletetext(contentWithTag, 'p', pub_hao)
        content = contentWithTag.text
        # attachments:
        fu_jian_name = ''
        fu_jian_href = ''
        try:
            fujian_href = contentWithTag.find_all('a')
            policy.paserUrl(contentWithTag, newsUrl)
            for file_href_ in fujian_href:
                file_href = file_href_['href']
                file_name = file_href_.text
                category = os.path.splitext(file_href)[1]
                if category not in file_name:
                    file_name = file_name + category
                rename_file = f'{str(num)}_{publishDate.replace("-", "")[:8]}_{file_name}'
                fu_jian_name += rename_file + '\n'
                fu_jian_href += file_href + '\n'
                policy.downloadfile(file_href, f'{path}/{rename_file}')
        except:
            pass
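        # Each attachment ends up on disk as "<num>_<YYYYMMDD>_<original file name>",
        # e.g. "3_20231117_附件.pdf" (example value only); the same name is also recorded
        # in 附件名称 below.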
        dic_info = {
            '序号': num,
            '标题': title.replace('\n', ''),
            '发布时间': publishDate,
            '来源': source,
            '原文链接': newsUrl,
            '发文时间': publishDate,
            '发文机构': '',
            '发文字号': pub_hao,
            '摘要': summary.replace('\n', '').replace('<em>', '').replace('</em>', ''),
            '正文': content,
            '附件名称': fu_jian_name,
            '附件链接': fu_jian_href,
        }
        print(dic_info)
        DataList.append(dic_info)
    sheet_name = '贵州省人民政府政策文件'
    if sheet_name in wb.sheetnames:
        log.info(f"{sheet_name}工作表已存在!")
    else:
        # create a new worksheet
        wb.create_sheet(sheet_name)
        print(f"{sheet_name}新工作表创建完成!")
    # save the Excel file
    wb.save(file_path)
    baseCore.writerToExcel(DataList, file_path, sheet_name)
    pass
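    # writerToExcel() comes from BaseCore; a rough sketch of the equivalent write,
    # assuming pandas + openpyxl (illustrative only, not BaseCore's implementation):
    #   import pandas as pd
    #   with pd.ExcelWriter(file_path, engine='openpyxl', mode='a', if_sheet_exists='replace') as writer:
    #       pd.DataFrame(DataList).to_excel(writer, sheet_name=sheet_name, index=False)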
if __name__ == "__main__":
    file_path = f'data/REITs贵州省人民政府.xlsx'
    wb = policy.createfile(file_path)
    # reform(wb,file_path)
    # shenzhen()
......
import json
import threading
import time
import uuid
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# use the connection pool
# cnx_ = baseCore.pool_11.connection()
# cursor_ = cnx_.cursor()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pathType = 'QYNotice/'
taskType = '企业公告/证监会'
class EsMethod(object):
    def __init__(self):
        # create the Elasticsearch client with its account credentials
        self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
        self.index_name = 'researchreportdata'

    def queryatt(self, index_name, pnum):
        body = {
            # "labels" and "year" are read by the main loop below, so they are requested here as well
            "_source": ["attachmentIds", "sourceAddress", "title",
                        "publishDate", "createDate", "labels", "year"],
            "query": {
                "bool": {
                    "must": [
                        {
                            "match": {
                                "type": "0"
                            }
                        }
                    ]
                }
            },
            "sort": [
                {
                    "createDate": {
                        "order": "asc"
                    }
                }
            ],
            "track_total_hits": True,
            "size": 200,
            "from": pnum
        }
        filter_path = ['hits.hits._id',
                       'hits.total.value',
                       'hits.hits._source.attachmentIds',
                       'hits.hits._source.title',
                       'hits.hits._source.sourceAddress',
                       'hits.hits._source.createDate',
                       'hits.hits._source.publishDate',
                       'hits.hits._source.labels',
                       'hits.hits._source.year',
                       ]
        result = self.es.search(index=index_name
                                , doc_type='_doc'
                                , filter_path=filter_path
                                , body=body)
        # log.info(result)
        return result

    def updateaunn(self, index_name, id, u_attid):
        body = {
            'doc': {
                'attachmentIds': [str(u_attid)]
            }
        }
        result = self.es.update(index=index_name
                                , id=id
                                , body=body)
        log.info('更新结果:%s' % result)
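# Usage sketch (mirrors the __main__ block below; new_att_id is a placeholder value):
#   esMethod = EsMethod()
#   result = esMethod.queryatt(esMethod.index_name, pnum=0)
#   for hit in result['hits']['hits']:
#       esMethod.updateaunn(esMethod.index_name, hit['_id'], new_att_id)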
# Look up the attachment id from ES in the attachment table; for reports not found there,
# find the attachment id by title and write it back to ES.
def secrchATT(item_id, retData, type_id, order_by):
    sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
    lock.acquire()
    cursor_.execute(sel_sql, (item_id, retData['path'], type_id, order_by))
    selects = cursor_.fetchone()
    lock.release()
    return selects
if __name__ == "__main__":
    esMethod = EsMethod()
    p = 200
    result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
    total = result['hits']['total']['value']
    page = 0
    if total == 0:
        log.info('++++已没有数据+++++')
    msglist = result['hits']['hits']
    print(msglist)
    log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
    num = 0
    for mms in msglist:
        start_time = time.time()
        id = mms['_id']
        title = mms['_source']['title']
        sourceAddress = mms['_source']['sourceAddress']
        social_code = mms['_source']['labels'][0]['relationId']
        year = mms['_source']['year']
        publishDate = mms['_source']['publishDate']
        createDate = mms['_source']['createDate']
        log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
"""
"""
......@@ -291,7 +291,7 @@ def run_threads(num_threads,esMethod):
        thread.join()
if __name__ == '__main__':
    for i in range(0, 100):
        esMethod = EsMethod()
        p = 0
        result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
......