提交 bed1863a 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

"""
"""
......@@ -6,6 +6,7 @@ https://blog.csdn.net/yangbisheng1121/article/details/128528112
https://blog.csdn.net/qiuweifan/article/details/128610083
"""
import json
import threading
import time
import uuid
import requests
......@@ -39,7 +40,7 @@ class EsMethod(object):
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name):
def queryatt(self,index_name,pnum):
body = {
"_source": ["attachmentIds", "createDate", "sourceAddress", "labels.relationId", "title", "year",
"publishDate", "createDate"],
......@@ -67,7 +68,8 @@ class EsMethod(object):
}
],
"track_total_hits": True,
"size": 200
"size": 200,
"from": pnum
}
filter_path = ['hits.hits._id',
......@@ -114,13 +116,13 @@ def convert_size(size_bytes):
i += 1
return f"{size_bytes:.2f} {units[i]}"
def uptoOBS(pdf_url,pdf_name,type_id,social_code):
def uptoOBS(pdf_url,pdf_name,type_id,social_code,createDate):
start_time = time.time()
headers = {}
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
'create_time': createDate, 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3):
try:
......@@ -218,9 +220,9 @@ def tableUpdate(retData, year, pdf_name, num,pub_time,origin):
id = selects[0]
return id
def upload(sourceAddress,num):
def upload(sourceAddress,num,title,social_code,year,publishDate,createDate):
# todo:链接上传obs
retData = uptoOBS(sourceAddress, title + '.pdf', 8, social_code)
retData = uptoOBS(sourceAddress, title + '.pdf', 8, social_code,createDate)
# 附件插入att数据库
if retData['state']:
pass
......@@ -234,39 +236,56 @@ def upload(sourceAddress,num):
return att_id
else:
return None
from multiprocessing import Process, Queue
def main(page,p,esMethod):
# esMethod = EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
result = esMethod.queryatt(index_name=esMethod.index_name,pnum=p)
total = result['hits']['total']['value']
if total==0:
log.info('++++已没有数据+++++')
return
msglist = result['hits']['hits']
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
# print(msglist)
num = 0
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
att_id = upload(sourceAddress,num,title,social_code,year,publishDate,createDate)
u_attid = att_id
esMethod.updateaunn(esMethod.index_name, str(id), u_attid)
def run_threads(num_threads,esMethod):
threads = []
for i in range(num_threads):
page = i + 1
p = i * 200
thread = threading.Thread(target=main, args=(page,p,esMethod))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
if __name__ == '__main__':
esMethod = EsMethod()
# esMethod.getFileds(index_name=esMethod.index_name)
page = 1
while True:
result = esMethod.queryatt(index_name=esMethod.index_name)
esMethod = EsMethod()
p = 0
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
if total==0:
if total == 0:
log.info('++++已没有数据+++++')
break
msglist = result['hits']['hits']
log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
# print(msglist)
num = 0
for mms in msglist:
id = mms['_id']
title = mms['_source']['title']
sourceAddress = mms['_source']['sourceAddress']
social_code = mms['_source']['labels'][0]['relationId']
year = mms['_source']['year']
publishDate = mms['_source']['publishDate']
createDate = mms['_source']['createDate']
log.info(f'{id}---{title}--{sourceAddress}---{social_code}')
att_id = upload(sourceAddress,num)
u_attid = att_id
esMethod.updateaunn(esMethod.index_name, str(id), u_attid)
page+=1
# # esMethod.delete(esMethod.index_name,str(id))
# print('跟新成功!!')
num_threads = 5
run_threads(num_threads,esMethod)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论