提交 d4632531 作者: 薛凌堃

研报处理

上级 481d2a7b
......@@ -79,16 +79,16 @@ class Token():
# 删除失效的token
def delete_token(self, cookie_):
deletesql = f"delete from QCC_token where cookies='{cookie_}' "
deletesql = f"delete from QCC_token where id='{cookie_}' "
cursor.execute(deletesql)
cnx.commit()
# token的处理
def updateTokeen(self,id_token, type):
if type == 2:
if type == 1:
# session失效,删除token
cursor.execute(f"delete from QCC_token where id={id_token}")
if type == 1:
if type == 2:
# 封号了 修改封号时间
cursor.execute(f"update QCC_token set fenghao_time=now() where id={id_token}")
if type == 3:
......
# -*- coding: utf-8 -*-
import json
import re
import threading
import time
import uuid
import fitz
import redis
import requests
from bs4 import BeautifulSoup
from obs import ObsClient
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
# --- module-level setup --------------------------------------------------
# BUGFIX: the original instantiated BaseCore twice; the second instance
# replaced the first and leaked its database connection. One is enough.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx_          # MySQL connection provided by BaseCore
cursor_ = cnx_.cursor()
lock = threading.Lock()       # serializes access to the shared cursor across threads
pathType_ = 'QYResearchReport/'   # default OBS key prefix (company research reports)
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
class EsMethod(object):
    """Thin Elasticsearch wrapper for the research-report index."""

    def __init__(self):
        # Build the ES client with the cluster's basic-auth credentials.
        self.es = Elasticsearch(['http://114.116.19.92:9700'],
                                http_auth=('elastic', 'zzsn9988'),
                                timeout=300)
        self.index_name = 'researchreportdata'

    def delete(self, index_name, id):
        """Delete document *id* from *index_name* and log the ES response."""
        outcome = self.es.delete(index=index_name, doc_type="_doc", id=id)
        log.info('删除结果 %s' % outcome)
if __name__ == "__main__":
    esMethod = EsMethod()
    redis_conn = redis.Redis(connection_pool=pool)
    # Drain the 'YanBao:up' list: each entry is an ES document id to delete.
    while True:
        item = redis_conn.lpop('YanBao:up')
        if not item:
            log.info('已删除完毕')
            break
        log.info(item)
        doc_id = item.decode()
        esMethod.delete(esMethod.index_name, doc_id)
# -*- coding: utf-8 -*-
import json
import re
import threading
import time
import uuid
import fitz
import redis
import requests
from bs4 import BeautifulSoup
from obs import ObsClient
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
# --- module-level setup --------------------------------------------------
# BUGFIX: BaseCore was instantiated twice in the original; the extra
# instance served no purpose and leaked its database connection.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx_          # MySQL connection provided by BaseCore
cursor_ = cnx_.cursor()
lock = threading.Lock()       # guards the shared DB cursor across worker threads
pathType_ = 'QYResearchReport/'   # default OBS key prefix (company research reports)
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
# SECURITY NOTE(review): the Huawei OBS access/secret keys are hard-coded
# here; they should be moved to environment variables or a secrets store.
obsClient = ObsClient(
    access_key_id='VEHN7D0TJ9316H8AHCAV',          # Huawei Cloud access key (AK)
    secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # secret key (SK)
    server='https://obs.cn-north-1.myhuaweicloud.com'              # bucket endpoint
)
class EsMethod(object):
    """Elasticsearch helper for querying and patching research-report documents."""

    def __init__(self):
        # Connect with the cluster's basic-auth credentials.
        self.es = Elasticsearch(['http://114.116.19.92:9700'],
                                http_auth=('elastic', 'zzsn9988'),
                                timeout=300)
        self.index_name = 'researchreportdata'

    def queryatt(self, index_name, id):
        """Return the search response for documents whose `id` field matches *id*.

        The response is trimmed via filter_path to just the fields callers use.
        """
        query_body = {"query": {"match": {"id": id}}}
        wanted_fields = [
            'hits.hits._id',
            'hits.total.value',
            'hits.hits._source.title',
            'hits.hits._source.origin',
            'hits.hits._source.sourceAddress',
            'hits.hits._source.createDate',
            'hits.hits._source.publishDate',
        ]
        return self.es.search(index=index_name,
                              doc_type='_doc',
                              filter_path=wanted_fields,
                              body=query_body)

    def updateaunn(self, index_name, id, u_attid):
        """Overwrite the document's attachmentIds with the single id *u_attid*."""
        patch = {'doc': {'attachmentIds': [str(u_attid)]}}
        outcome = self.es.update(index=index_name, id=id, body=patch)
        log.info('更新结果:%s' % outcome)
def clean_text(text):
    """Strip HTML markup from *text* and collapse repeated blank lines.

    :param text: raw HTML string
    :return: plain text with tabs/CRs removed and newline runs squeezed to one
    """
    plain = BeautifulSoup(text, 'html.parser').get_text()
    plain = plain.replace('\t', '').replace('\r', '')
    return re.sub('\n+', '\n', plain)
def getNewspdf(sourceAddress):
    """Fetch the report page at *sourceAddress* and extract its PDF link.

    Returns the PDF href, or '' when the page was removed or no link was found.
    """
    # BUGFIX: pre-bind the result. The original could reach `return news_pdf`
    # (inside a `finally:`) with the name unbound, raising NameError; the
    # `return` in `finally` also silently swallowed parser exceptions.
    news_pdf = ''
    news_res = requests.get(sourceAddress)
    news_soup = BeautifulSoup(news_res.content, 'html.parser')
    # Removed pages show a fixed apology title; bail out early with ''.
    try:
        if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
            return news_pdf
    except AttributeError:
        # Page has no <title>; fall through and try the link selectors anyway.
        pass
    try:
        news_result = news_soup.find(class_='report-infos')
        news_pdf = news_result.find_all('span')[4].find('a')['href']
    except Exception:
        # Fallback layout: some pages carry the link in a 'to-link' span.
        try:
            news_pdf = news_soup.find('span', class_='to-link').find('a')['href']
        except Exception:
            news_pdf = ''  # neither layout matched
    return news_pdf
def getuuid():
    """Return a timestamp-based (uuid1) identifier — globally unique."""
    return uuid.uuid1()
# Human-readable file-size helper.
def convert_size(size_bytes):
    """Render a byte count as a human-readable string, e.g. '1.50 KB'.

    Scales by factors of 1024 through bytes/KB/MB/GB/TB.
    """
    for unit in ('bytes', 'KB', 'MB', 'GB'):
        if size_bytes < 1024:
            return f"{size_bytes:.2f} {unit}"
        size_bytes /= 1024
    # Anything >= 1024 GB stays expressed in TB.
    return f"{size_bytes:.2f} TB"
def uptoOBS(pdf_url, pdf_name, type_id, pathType, createDate):
    """Download *pdf_url*, upload it to OBS and extract its text with fitz.

    *pdf_name* is unused but kept for caller compatibility.
    Returns a retData dict; retData['state'] is True only when the PDF was
    downloaded, uploaded and parsed with at least one page.
    """
    start_time = time.time()
    retData = {'state': False, 'type_id': type_id, 'item_id': '', 'group_name': '', 'path': '',
               'full_path': '',
               'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
               'create_time': createDate, 'page_size': '', 'content': ''}
    headers = {'User-Agent': baseCore.getRandomUserAgent()}
    # Download with up to 3 attempts, 3 s apart.
    response = None
    file_size = 0
    for _ in range(3):
        try:
            response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
            # Fall back to the body length when Content-Length is absent
            # (the original crashed on int(None) and burned a retry).
            file_size = int(response.headers.get('Content-Length') or len(response.content))
            break
        except Exception:
            response = None
            time.sleep(3)
    if response is None:
        # BUGFIX: the original fell through with `response` unbound after three
        # failed attempts and crashed with NameError; report failure instead.
        log.error(f'下载失败:{pdf_url}')
        return retData
    name = str(getuuid()) + '.pdf'
    try:
        result = getOBSres(pathType, name, response)
    except Exception:
        log.error(f'OBS发送失败')
        return retData
    # Parse the PDF: page count plus concatenated plain text of every page.
    try:
        with fitz.open(stream=response.content, filetype='pdf') as doc:
            page_size = doc.page_count
            for page in doc.pages():
                retData['content'] += page.get_text()
    except Exception:
        log.error(f'文件损坏')
        return retData
    if page_size < 1:
        # Unparseable/empty PDF: leave state False.
        return retData
    try:
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        retData['state'] = True
        retData['path'] = result['body']['objectUrl'].split('.com')[1]
        retData['full_path'] = result['body']['objectUrl']
        retData['file_size'] = convert_size(file_size)
        retData['create_time'] = time_now
        retData['page_size'] = page_size
    except Exception as e:
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog('', taskType, 0, takeTime, pdf_url, f'{e}')
    return retData
@retry(tries=3, delay=1)
def getOBSres(pathType, name, response):
    """Upload the downloaded body to the 'zzsn' OBS bucket (retried up to 3x)."""
    object_key = pathType + str(name)
    return obsClient.putContent('zzsn', object_key, content=response.content)
def secrchATT(retData, type_id):
    """Look up the clb_sys_attachment row for this upload's type_id + OBS path.

    Returns the (id,) row tuple, or None when no row matches.
    """
    sel_sql = '''select id from clb_sys_attachment where type_id=%s and path = %s '''
    # BUGFIX: use the lock as a context manager so it is released even when
    # the query raises; the original acquire/release pair left the lock held
    # on error and deadlocked every other worker thread.
    with lock:
        cursor_.execute(sel_sql, (type_id, retData['path']))
        row = cursor_.fetchone()
    return row
# Insert into the attachment table and return the new attachment id.
def tableUpdate(retData, year, pdf_name, num, pub_time, origin):
    """Insert an attachment row into clb_sys_attachment and return its id.

    Returns '' when the insert fails or the inserted row cannot be found back.
    """
    item_id = retData['item_id']
    type_id = retData['type_id']
    group_name = retData['group_name']
    path = retData['path']
    full_path = retData['full_path']
    category = retData['category']
    file_size = retData['file_size']
    status = retData['status']
    create_by = retData['create_by']
    page_size = retData['page_size']
    create_time = retData['create_time']
    order_by = num
    try:
        Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name,publish_time,source) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
        values = (
            year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
            status, create_by,
            create_time, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn',
            pub_time, origin)
        # BUGFIX: hold the lock via a context manager so it is released even
        # when execute/commit raises; the original left it held after an
        # exception and deadlocked the other worker threads.
        with lock:
            cursor_.execute(Upsql, values)
            cnx_.commit()
    except Exception as e:
        log.info(e)
        return ''
    log.info(f"更新完成:{item_id}===={pdf_name}")
    selects = secrchATT(retData, type_id)
    # BUGFIX: the original indexed selects[0] unconditionally and raised
    # TypeError when the row was not found.
    if not selects:
        return ''
    return selects[0]
def upload(news_pdf, num, title, pathType, year, publishDate, createDate):
    """Mirror the PDF at *news_pdf* into OBS and register it in the attachment table.

    Returns the attachment id on success, otherwise None.
    """
    # Push the link's content to OBS (type_id 4).
    retData = uptoOBS(news_pdf, title + '.pdf', 4, pathType, createDate)
    if not retData['state']:
        log.info(f'====pdf解析失败====')
        return None
    # Record the attachment; origin is left blank by this pipeline.
    att_id = tableUpdate(retData, year, title + '.pdf', num + 1, publishDate, '')
    return att_id if att_id else None
def main(esMethod):
    """Process one id from the 'YanBao:up' redis list.

    Looks the report up in ES, downloads its PDF, uploads it to OBS, and
    writes the resulting attachment id back into the ES document.
    """
    redis_conn = redis.Redis(connection_pool=pool)
    id_ = redis_conn.lpop('YanBao:up')
    # BUGFIX: lpop returns None when the list is empty; the original called
    # .decode() first and crashed with AttributeError before this check.
    if not id_:
        log.info('已无数据')
        return
    id = id_.decode()
    result_ = esMethod.queryatt(index_name=esMethod.index_name, id=id)
    try:
        result = result_['hits']['hits'][0]
    except (KeyError, IndexError, TypeError):
        # No matching document came back from ES for this id.
        log.info(f'es中未查到数据:{id}')
        return
    num = 0
    title = result['_source']['title']
    origin = result['_source']['origin']
    # Industry reports live under a different OBS prefix than company reports.
    if origin == '行业研报':
        pathType = 'HYResearchReport/'
    else:
        pathType = pathType_
    sourceAddress = result['_source']['sourceAddress']
    news_pdf = getNewspdf(sourceAddress)
    if not news_pdf:
        return
    publishDate = result['_source']['publishDate']
    createDate = result['_source']['createDate']
    log.info(f'{id}---{title}--{sourceAddress}---{pathType}')
    # Year for the attachment row is taken from the publish date (YYYY).
    att_id = upload(news_pdf, num, title, pathType, publishDate[:4], publishDate, createDate)
    if not att_id:
        return
    esMethod.updateaunn(esMethod.index_name, str(id), att_id)
def run_threads(num_threads, esMethod):
    """Start *num_threads* worker threads running main(esMethod); wait for all."""
    workers = [threading.Thread(target=main, args=(esMethod,))
               for _ in range(num_threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
if __name__ == '__main__':
    THREAD_COUNT = 5
    # Launch batches of workers forever, logging each batch's wall time.
    while True:
        es_helper = EsMethod()
        batch_start = time.time()
        run_threads(THREAD_COUNT, es_helper)
        log.info(f'5线程 总耗时{time.time()-batch_start}秒')
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论