提交 2aca66c0 作者: 薛凌堃

es操作

上级 61eecac3
...@@ -22,8 +22,7 @@ cnx_ = baseCore.cnx_ ...@@ -22,8 +22,7 @@ cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor() cursor_ = cnx_.cursor()
lock = threading.Lock() lock = threading.Lock()
pathType_ = 'QYResearchReport/'
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6) pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
......
import pandas as pd
import urllib3 import urllib3
import BaseCore import BaseCore
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch
...@@ -11,20 +10,25 @@ es = Elasticsearch([{'host': '114.115.215.250', 'port': '9700'}], http_auth=('el ...@@ -11,20 +10,25 @@ es = Elasticsearch([{'host': '114.115.215.250', 'port': '9700'}], http_auth=('el
baseCore = BaseCore.BaseCore() baseCore = BaseCore.BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
r = baseCore.r r = baseCore.r
cnx = baseCore.cnx
cursor = baseCore.cursor
cnx_11 = baseCore.cnx_ cnx_11 = baseCore.cnx_
cursor_11 = baseCore.cursor_ cursor_11 = baseCore.cursor_
def getList():
    """Queue the annual-report attachments of every company in the workbook.

    Reads sheet ``Sheet1`` of ``./中国500强榜单年报.xlsx`` and, for each row,
    selects the ``clb_sys_attachment`` rows with ``type_id = 1`` whose
    ``item_id`` and ``year`` equal the row's ``企业信用代码`` and ``年报年份``
    cells. Every match is pushed onto the redis list ``NianBao:info`` as
    ``item_id|id|year``; rows without any match are only logged.
    """
    query = 'Select id,item_id,year from clb_sys_attachment where type_id = 1 and item_id = %s and year = %s'
    df = pd.read_excel('./中国500强榜单年报.xlsx', sheet_name='Sheet1')
    for row_no in range(len(df)):
        social_code = df['企业信用代码'][row_no]
        year = df['年报年份'][row_no]
        cursor_11.execute(query, (social_code, year))
        datas = cursor_11.fetchall()
        if not datas:
            log.info(f'{social_code}|{year}======不存在')
            continue
        # Each fetched row is (id, item_id, year); the queue entry puts item_id first.
        entries = ["|".join((str(row[1]), str(row[0]), str(row[2]))) for row in datas]
        print('=======')
        for entry in entries:
            r.rpush("NianBao:info", entry)
if __name__ == "__main__": if __name__ == "__main__":
getList() getList()
"""
年报上传到es
content 需要重新解析
lang语言
origin 来源 从数据库中获取
title 从数据库中获取
dic_info = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': origin,
'publishDate': datetime_string,
'sid': '1684032033495392257',
'sourceAddress': year_url, # 原文链接
'summary': '',
'title': name_pdf.replace('.pdf', ''),
'type': 1,
'socialCreditCode': social_code,
'year': year
}
"""
# 1. Records without a year or a credit code: store them under a separate redis key.
# 2. Records that have a credit code, an id and a year: (1) the id in es must be updated to the id in the attachment table.
# (2) Use the credit code to count the records with the same name; if there are two, one of them is not in es - fetch the id that is in es and delete the record that is not in es.
...@@ -32,11 +6,14 @@ dic_info = { ...@@ -32,11 +6,14 @@ dic_info = {
# TODO: update the records that match exactly one row first; save the others to another redis key for now.
import json
import threading import threading
import redis import redis
import requests, re, time, pymysql, fitz import requests, re, time, pymysql, fitz
import urllib3 import urllib3
from kafka import KafkaProducer
from base import BaseCore from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
...@@ -51,45 +28,139 @@ lock = threading.Lock() ...@@ -51,45 +28,139 @@ lock = threading.Lock()
taskType = '企业年报' taskType = '企业年报'
pathType = 'QYYearReport/' pathType = 'QYYearReport/'
def sendKafka(dic_news):
    """Publish one document to the ``researchReportTopic`` Kafka topic.

    Args:
        dic_news: JSON-serialisable dict; it is dumped with
            ``ensure_ascii=False`` and sent UTF-8 encoded.

    Returns:
        bool: True when the send was acknowledged within 10 seconds,
        False when anything in the send path raised (the error is logged).
    """
    producer = None
    try:  # alternate address noted in the original: 114.116.116.241
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],
                                 max_request_size=1024 * 1024 * 20)
        payload = json.dumps(dic_news, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("researchReportTopic", payload)
        # Block until the send is acknowledged so failures surface here.
        print(kafka_result.get(timeout=10))
        dic_result = {
            'success': 'true',
            'message': '操作成功',
            'code': '200',
        }
        log.info(dic_result)
        return True
    except Exception as e:
        dic_result = {
            'success': 'false',
            'message': '操作失败',
            'code': '204',
            'e': e
        }
        log.info(dic_result)
        return False
    finally:
        # A producer is created per call, so it must be closed per call;
        # the timeout keeps an unreachable broker from blocking the worker.
        if producer is not None:
            producer.close(timeout=10)
def getContent(file_href):
    """Download a PDF and return the concatenated text of all its pages.

    The download is attempted up to three times, waiting 3 seconds between
    attempts. TLS verification is disabled and each request times out after
    20 seconds.

    Args:
        file_href: URL of the PDF file.

    Returns:
        str: text of every page, in page order.

    Raises:
        requests.RequestException: when the third download attempt also
            fails (previously this surfaced as an UnboundLocalError).
    """
    headers = {'User-Agent': baseCore.getRandomUserAgent()}
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(file_href, headers=headers, verify=False, timeout=20)
            break
        except requests.RequestException:
            if attempt == max_attempts:
                # Out of retries: propagate the real network error.
                raise
            time.sleep(3)
    with fitz.open(stream=response.content, filetype='pdf') as doc:
        page_size = doc.page_count
        log.info(f'当前页码----{page_size}')
        return ''.join(page.get_text() for page in doc.pages())
def secrchATT(type_id, xydm, year):
    """Fetch the attachment rows of one company for one type and year.

    Args:
        type_id: value matched against ``clb_sys_attachment.type_id``.
        xydm: value matched against ``clb_sys_attachment.item_id``.
        year: value matched against ``clb_sys_attachment.year``.

    Returns:
        All matching rows, as returned by ``cursor_.fetchall()``.
    """
    sel_sql = '''select * from clb_sys_attachment where item_id=%s and type_id=%s and year=%s'''
    # ``with`` releases the lock even when the query raises, so a failed
    # query can no longer leave the other threads blocked on ``lock``.
    with lock:
        cursor_.execute(sel_sql, (xydm, type_id, year))
        return cursor_.fetchall()
def selectShortName(xydm):
    """Fetch the ``sys_base_enterprise`` row for a social credit code.

    Args:
        xydm: value matched against ``social_credit_code``.

    Returns:
        The first matching row as returned by ``cursor_.fetchone()``.
    """
    sel_sql = "select * from sys_base_enterprise where social_credit_code = %s"
    # ``with`` releases the lock even when the query raises.
    with lock:
        cursor_.execute(sel_sql, (xydm,))
        return cursor_.fetchone()
def main():
    """Take one annual-report record from redis and publish it to Kafka.

    A record is popped from the redis list ``NianBao:id`` and is expected to
    look like ``credit_code|attachment_id|year``.

    * Records that are malformed or lack a credit code or year are pushed
      to the redis list ``info``.
    * Records matching more than one attachment row are pushed to
      ``NianBao:info``; records matching none are dropped.
    * For exactly one match the PDF is downloaded, its text extracted and
      the assembled document is sent with ``sendKafka``. A match without a
      file name is pushed to ``Noname:info`` instead.
    """
    redis_conn = redis.Redis(connection_pool=pool)
    info_ = redis_conn.lpop("NianBao:id")
    if not info_:
        log.info("++++已没有数据++++")
        return
    # The popped value is used as-is; a hard-coded debug record used to
    # overwrite it here. Accept both bytes and str from the redis client.
    info = info_.decode() if isinstance(info_, bytes) else info_

    parts = info.split('|')
    if len(parts) < 3 or not parts[0] or not parts[2]:
        redis_conn.lpush('info', info)
        return
    xydm, att_id, year = parts[:3]

    selects = secrchATT('1', xydm, year)
    if len(selects) > 1:
        redis_conn.lpush('NianBao:info', info)
        return
    if len(selects) != 1:
        return

    # NOTE(review): the numeric indexes below depend on the column order of
    # ``select *`` on clb_sys_attachment - confirm against the table schema.
    select = selects[0]
    name = select[1]
    if not name:
        redis_conn.lpush('Noname:info', info)
        return
    # Title is the stored name up to its first dot.
    file_name = name.split('.')[0]
    log.info(f'-----------{file_name}-----------')

    origin = select[18]
    create_time = select[13]
    publishDate = select[21]
    # NOTE(review): this only matches when the column comes back as a str;
    # the reason for rewriting this particular date is not visible here.
    if publishDate == '2023-12-31':
        publishDate = '2023-08-31'

    file_href = 'http://zzsn.luyuen.com' + str(select[5])
    content = getContent(file_href)
    lang = baseCore.detect_language(content)
    if lang == 'cn':
        lang = 'zh'

    dic_info = {
        'attachmentIds': att_id,
        'author': '',
        'content': content,
        'contentWithTag': '',
        'createDate': str(create_time),
        'deleteFlag': '0',
        'id': '',
        'keyWords': '',
        'lang': lang,
        'origin': origin,
        'publishDate': publishDate,
        'sid': '1684032033495392257',
        'sourceAddress': '',  # link to the original article
        'summary': '',
        'title': file_name,
        'type': 1,
        'socialCreditCode': xydm,
        'year': year
    }
    sendKafka(dic_info)
    time.sleep(1)
def run_threads(num_threads): def run_threads(num_threads):
threads = [] threads = []
...@@ -105,8 +176,9 @@ def run_threads(num_threads): ...@@ -105,8 +176,9 @@ def run_threads(num_threads):
thread.join() thread.join()
if __name__ == "__main__": if __name__ == "__main__":
start = time.time() while True:
num_threads = 5 start = time.time()
run_threads(num_threads) num_threads = 5
run_threads(num_threads)
log.info(f'5线程 总耗时{time.time() - start}秒') log.info(f'5线程 总耗时{time.time() - start}秒')
\ No newline at end of file \ No newline at end of file
...@@ -162,11 +162,11 @@ class EsMethod(object): ...@@ -162,11 +162,11 @@ class EsMethod(object):
'query': { 'query': {
'bool': { 'bool': {
'should':[ 'should':[
# {'term':{'origin' : '雪球网'}}, # {'term':{'origin': '证监会'}},
{'term':{'type' : 1}}, {'term':{'type': 1}},
], ],
'must': [ 'must': [
{'match': {'title': '.pdf'}} {'match': {'title': '.PDF'}}
] ]
} }
}, },
...@@ -221,11 +221,11 @@ if __name__ == '__main__': ...@@ -221,11 +221,11 @@ if __name__ == '__main__':
for mms in msglist: for mms in msglist:
id=mms['_source']['id'] id=mms['_source']['id']
title=mms['_source']['title'] title=mms['_source']['title']
utitle=title.replace('.pdf','') utitle=title.replace('.PDF','')
print(f'id:{id}---title:{title}--utitle:{utitle}') print(f'id:{id}---title:{title}--utitle:{utitle}')
esMethod.updateaunn(esMethod.index_name,str(id),utitle) esMethod.updateaunn(esMethod.index_name,str(id),utitle)
# esMethod.delete(esMethod.index_name,str(id)) # esMethod.delete(esMethod.index_name,str(id))
print('跟新成功!!') # print('更新成功!!')
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论