提交 ad1e6d7b 作者: 薛凌堃

公告补采

上级 71253052
"""
"""
Elasticsearch 安装
pip install elasticsearch==7.8.1 版本的
使用时参考文章
https://blog.csdn.net/yangbisheng1121/article/details/128528112
https://blog.csdn.net/qiuweifan/article/details/128610083
"""
import json
import time
import uuid
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYNotice/'
taskType = '企业公告/证监会'
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name):
body = {
"_source": ["attachmentIds", "createDate", "sourceAddress", "labels.relationId", "title", "year",
"publishDate", "createDate"],
"query": {
"bool": {
"must": [
{
"match": {
"type": "3"
}
},
{
"wildcard": {
"attachmentIds.keyword": "911*"
}
}
]
}
},
"sort": [
{
"createDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 200
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.attachmentIds', # 字段1
'hits.hits._source.title',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.labels.relationId',
'hits.hits._source.publishDate',
'hits.hits._source.year',
'hits.hits._source.createDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
log.info(result)
return result
def updateaunn(self,index_name,id,u_attid):
body = {
'doc': {
'attachmentIds': [str(u_attid)]
}
}
result = self.es.update(index=index_name
,id=id
,body=body)
log.info('更新结果:%s' % result)
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
#获取文件大小
def convert_size(size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units)-1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
try:
result = getOBSres(pathType, name, response)
except:
log.error(f'OBS发送失败')
return retData
try:
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
except:
log.error(f'文件损坏')
return retData
if page_size < 1:
# pdf解析失败
# print(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, f'{e}')
return retData
return retData
@retry(tries=3, delay=1)
def getOBSres(pathType,name, response):
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
# resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
return result
def secrchATT(item_id, retData, type_id,order_by):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
cursor_.execute(sel_sql, (item_id, retData['path'], type_id,order_by))
selects = cursor_.fetchone()
return selects
# 插入到att表 返回附件id
def tableUpdate(retData, year, pdf_name, num,pub_time,origin):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
# selects = secrchATT(item_id, pdf_name, type_id)
#
# if selects:
# log.info(f'pdf_name:{pdf_name}已存在')
# id = ''
# return id
# else:
try:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
pub_time, origin)
cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交
except Exception as e:
log.info(e)
log.info(f"更新完成:{item_id}===={pdf_name}")
selects = secrchATT(item_id, retData, type_id,order_by)
id = selects[0]
return id
def upload(sourceAddress,num):
# todo:链接上传obs
retData = uptoOBS(sourceAddress, title + '.pdf', 8, social_code)
# 附件插入att数据库
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return None
num = num + 1
origin = '证监会'
att_id = tableUpdate(retData, year, title + '.pdf', num, publishDate, origin)
if att_id:
return att_id
else:
return None
if __name__ == '__main__':
esMethod = EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
page = 1
while True:
result = esMethod.queryatt(index_name=esMethod.index_name)
total = result['hits']['total']['value']
if total==0:
log.info('++++已没有数据+++++')
break
msglist = result['hits']['hits']
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
# print(msglist)
num = 0
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
att_id = upload(sourceAddress,num)
u_attid = att_id
esMethod.updateaunn(esMethod.index_name, str(id), u_attid)
page+=1
# # esMethod.delete(esMethod.index_name,str(id))
# print('跟新成功!!')
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论