提交 35396c0b 作者: 薛凌堃

11/01

上级 bf1890e2
This source diff could not be displayed because it is too large. You can view the blob instead.
...@@ -666,7 +666,7 @@ class BaseCore: ...@@ -666,7 +666,7 @@ class BaseCore:
self.cnx_.commit() self.cnx_.commit()
# 插入到att表 返回附件id # 插入到att表 返回附件id
def tableUpdate(self, retData, com_name, year, pdf_name, num, pub_time,origin): def tableUpdate(self, retData, year, pdf_name, num, pub_time,origin):
item_id = retData['item_id'] item_id = retData['item_id']
type_id = retData['type_id'] type_id = retData['type_id']
group_name = retData['group_name'] group_name = retData['group_name']
......
...@@ -28,8 +28,8 @@ pathType = 'QYYearReport/' ...@@ -28,8 +28,8 @@ pathType = 'QYYearReport/'
type_id = 1 type_id = 1
create_by = 'XueLingKun' create_by = 'XueLingKun'
taskType = '企业年报' taskType = '企业年报'
#付俊雪的需要改为巨潮资讯网1_福布斯2000_PDF_60_付
file_path = 'D:\\年报\\失败' file_path = 'D:\\年报\\欧盟记分牌2500_年报补充_718_20231018'
log.info(f'=============当前pid为{baseCore.getPID()}==============') log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news): def sendKafka(dic_news):
...@@ -146,9 +146,9 @@ if __name__=='__main__': ...@@ -146,9 +146,9 @@ if __name__=='__main__':
social_code = data[1] social_code = data[1]
ename = data[2] ename = data[2]
cname = data[3] cname = data[3]
file_name = cname + ':' + file_year + '年年度报告' + '.pdf' file_name = ename + ':' + file_year + '年年度报告' + '.pdf'
content = '' content = ''
origin = cname + '官网' origin = ename + '官网'
#解析文件页数和内容 #解析文件页数和内容
log.info(f"-----------正在处理{file_name}--------------") log.info(f"-----------正在处理{file_name}--------------")
with open(pdf_path, 'rb') as file: with open(pdf_path, 'rb') as file:
......
...@@ -3,7 +3,9 @@ ...@@ -3,7 +3,9 @@
""" """
import json import json
import os import os
import re
import time import time
import uuid
from kafka import KafkaProducer from kafka import KafkaProducer
from obs import ObsClient from obs import ObsClient
...@@ -24,12 +26,15 @@ baseCore = BaseCore.BaseCore() ...@@ -24,12 +26,15 @@ baseCore = BaseCore.BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
cnx = baseCore.cnx cnx = baseCore.cnx
cursor = baseCore.cursor cursor = baseCore.cursor
cnx_ = baseCore.cnx_
cursor_ = baseCore.cursor_
pathType = 'QYYearReport/' pathType = 'QYYearReport/'
type_id = 1 type_id = 1
create_by = 'XueLingKun' create_by = 'XueLingKun'
taskType = '企业年报' taskType = '企业年报'
#付俊雪的需要改为巨潮资讯网 #付俊雪的需要改为巨潮资讯网
file_path = 'D:\\BaiduNetdiskDownload\\1_福布斯2000_PDF_50_郑' file_path = 'D:\\年报\\年度报告\\中石化炼化工程年度报告'
log.info(f'=============当前pid为{baseCore.getPID()}==============') log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news): def sendKafka(dic_news):
...@@ -66,6 +71,10 @@ def sendKafka(dic_news): ...@@ -66,6 +71,10 @@ def sendKafka(dic_news):
log.info(dic_result) log.info(dic_result)
return False return False
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path): def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
""" """
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '', retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
...@@ -92,7 +101,8 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path): ...@@ -92,7 +101,8 @@ def uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path):
'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by, 'category': category, 'file_size': file_size, 'status': status, 'create_by': create_by,
'create_time': create_time, 'page_size': page_size, 'content': content} 'create_time': create_time, 'page_size': page_size, 'content': content}
try: try:
result = getOBSres(pathType, file_name, pdf_path) name = str(getuuid()) + '.pdf'
result = getOBSres(pathType, name, pdf_path)
except: except:
log = baseCore.getLogger() log = baseCore.getLogger()
log.error(f'OBS发送失败') log.error(f'OBS发送失败')
...@@ -117,6 +127,13 @@ def getOBSres(pathType, name, response): ...@@ -117,6 +127,13 @@ def getOBSres(pathType, name, response):
result = obsClient.putFile('zzsn', pathType+name, file_path=response) result = obsClient.putFile('zzsn', pathType+name, file_path=response)
return result return result
def secrchATT(item_id, year, type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s'''
cursor_.execute(sel_sql, (item_id, year, type_id))
selects = cursor_.fetchone()
return selects
if __name__=='__main__': if __name__=='__main__':
log.info(f'-----------当前文件{file_path}---------------') log.info(f'-----------当前文件{file_path}---------------')
file_list = os.listdir(file_path) file_list = os.listdir(file_path)
...@@ -126,19 +143,27 @@ if __name__=='__main__': ...@@ -126,19 +143,27 @@ if __name__=='__main__':
start_time = time.time() start_time = time.time()
pdf_path = file_path + '/'+file pdf_path = file_path + '/'+file
file_rank = int(file.split('-')[0]) # file_name_ = file.split('-')[0].replace('公司','')
file_year = file.split('-')[1] file_year = re.findall('\d{4}', file)[0]
file_name_ = file.split(file_year)[0]
#file_rank 对应上企业信用代码 #file_rank 对应上企业信用代码
selectsql = f"select * from rankandcode where id = {file_rank}" selectsql = f"select * from 500Report where com_name = '{file_name_}'"
cursor.execute(selectsql) cursor.execute(selectsql)
data = cursor.fetchone() data = cursor.fetchone()
cnx.commit() cnx.commit()
social_code = data[1] social_code = data[2]
ename = data[2]
cname = data[3] file_name = file_name_ + ':' + file_year + '年年度报告' + '.pdf'
file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
content = '' content = ''
origin = file_name_ + '官网'
selects = secrchATT(social_code,file_year,1)
if selects:
# self.getLogger().info(f'com_name:{com_name}--{year}已存在')
log.info(f'===={file_name}--年报已存在===')
continue
#解析文件页数和内容 #解析文件页数和内容
log.info(f"-----------正在处理{file_name}--------------") log.info(f"-----------正在处理{file_name}--------------")
with open(pdf_path, 'rb') as file: with open(pdf_path, 'rb') as file:
...@@ -153,7 +178,7 @@ if __name__=='__main__': ...@@ -153,7 +178,7 @@ if __name__=='__main__':
content += page.get_text() content += page.get_text()
# print(content) # print(content)
except Exception as e: except Exception as e:
log.info(f'文件已损坏:{cname}') log.info(f'文件已损坏:{file_name}')
continue continue
#解析文件大小 #解析文件大小
file_size = os.path.getsize(pdf_path) file_size = os.path.getsize(pdf_path)
...@@ -168,8 +193,9 @@ if __name__=='__main__': ...@@ -168,8 +193,9 @@ if __name__=='__main__':
retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path) retData_f = uptoOBS(retData, pathType, taskType, start_time,file_name,pdf_path)
if retData_f['state']: if retData_f['state']:
#retData, com_name, year, pdf_name, num, pub_time #retData, com_name, year, pdf_name, num, pub_time
att_id= baseCore.tableUpdate(retData_f, cname,file_year,file_name, num,file_year+'-12-31') att_id= baseCore.tableUpdate(retData_f,file_year,file_name, num,file_year+'-12-31',origin)
if att_id: if att_id:
detect_language = baseCore.detect_language(content)
dic_news = { dic_news = {
'attachmentIds': att_id, 'attachmentIds': att_id,
'author': '', 'author': '',
...@@ -179,8 +205,8 @@ if __name__=='__main__': ...@@ -179,8 +205,8 @@ if __name__=='__main__':
'deleteFlag': '0', 'deleteFlag': '0',
'id': '', 'id': '',
'keyWords': '', 'keyWords': '',
'lang': 'zh', 'lang': detect_language,
'origin': '企业官网', 'origin': origin,
'publishDate': file_year + '-12-31', 'publishDate': file_year + '-12-31',
'sid': '1684032033495392257', 'sid': '1684032033495392257',
'sourceAddress': '', # 原文链接 'sourceAddress': '', # 原文链接
...@@ -191,13 +217,15 @@ if __name__=='__main__': ...@@ -191,13 +217,15 @@ if __name__=='__main__':
'year': file_year 'year': file_year
} }
if sendKafka(dic_news): if sendKafka(dic_news):
log.info(f'成功-{file_rank}--{file_name}----{att_id}---{social_code}') log.info(f'成功---{file_name}----{att_id}---{social_code}')
num += 1 num += 1
else: else:
log.info(f'失败-{file_rank}--{file_name}----{att_id}---{social_code}') log.info(f'失败---{file_name}----{att_id}---{social_code}')
# 删除插入的数据 400表示发送数据失败 # 删除插入的数据 400表示发送数据失败
baseCore.deliteATT(att_id) baseCore.deliteATT(att_id)
log.info(f'已删除插入附件表的数据---{file_name}-----{social_code}') log.info(f'已删除插入附件表的数据---{file_name}-----{social_code}')
else:
log.info(f'-----年报已存在--{social_code}--{file_name}-----')
except Exception as e: except Exception as e:
log.info(f'error------{e}') log.info(f'error------{e}')
\ No newline at end of file
from obs import ObsClient
from base.BaseCore import BaseCore
baseCore = BaseCore()
cnx_=baseCore.cnx_
cursor_=baseCore.cursor_
# 创建ObsClient对象
obs_client = ObsClient(access_key_id='VEHN7D0TJ9316H8AHCAV', secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', server='https://obs.cn-north-1.myhuaweicloud.com')
def delete(object_keys):
# 指定要删除的文件名列表
bucket_name = 'zzsn'
# object_keys = ['QYNotice/8921b4b0-7853-11ee-bcc0-000c29312880.pdf']
# 批量删除文件
for object_key in object_keys:
resp = obs_client.deleteObject(bucket_name, object_key)
if resp.status >= 200 and resp.status < 300:
print(f"文件 {object_key} 删除成功!")
else:
print(f"文件 {object_key} 删除失败! 错误码:{resp.errorCode},错误信息:{resp.errorMessage}")
# 关闭ObsClient对象
obs_client.close()
if __name__=='__main__':
query = "SELECT object_key FROM clb_sys_attachment WHERE type_id=8 AND source = '证监会' AND create_time >= '2023-10-30 16:46:09' AND create_time <= '2023-11-01 09:11:12'"
cursor_.execute(query)
results = cursor_.fetchall()
object_keys = [item[0] for item in results]
delete(object_keys)
\ No newline at end of file
...@@ -56,7 +56,7 @@ if __name__=="__main__": ...@@ -56,7 +56,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/" url = "https://mp.weixin.qq.com/"
browser.get(url) browser.get(url)
# 可改动 # 可改动
time.sleep(20) time.sleep(40)
s = requests.session() s = requests.session()
#获取到token和cookies #获取到token和cookies
......
...@@ -149,10 +149,10 @@ class EsMethod(object): ...@@ -149,10 +149,10 @@ class EsMethod(object):
''' '''
删除 删除
''' '''
def delete(self,index_name): def delete(self,index_name,id):
result = self.es.delete(index=index_name result = self.es.delete(index=index_name
,doc_type="_doc" ,doc_type="_doc"
,id='20220901-XXXXXX') ,id=id)
print('删除结果 %s' % result) print('删除结果 %s' % result)
''' '''
...@@ -163,16 +163,50 @@ class EsMethod(object): ...@@ -163,16 +163,50 @@ class EsMethod(object):
'query': { 'query': {
'bool': { 'bool': {
'should':[ 'should':[
{'term':{'origin' : '雪球网'}}, {'term':{'labels.relationId' : '91110108740053589U'}},
{'term':{'type' : 1}}, {'term':{'type' : 3}},
], ],
'must': [ # 'must': [
{'match': {'title': '.pdf'}} # {'match': {'title': '.pdf'}}
] # ]
} }
}, },
'from' : pnum, 'from' : pnum,
'size' : 6000, 'size' : 600,
}
body = {
"query": {
"bool": {
"must": [
{
"nested": {
"path": "labels",
"query": {
"match": {
"labels.relationId": "91110108740053589U"
}
}
}
},
{
"term": {
"type.keyword": {
"value": "3"
}
}
}
]
}
},
"sort": [
{
"publishDate": {
"order": "desc"
}
}
],
"track_total_hits": True,
"size": 212
} }
filter_path=['hits.hits._source.title', # 字段1 filter_path=['hits.hits._source.title', # 字段1
...@@ -215,13 +249,14 @@ if __name__ == '__main__': ...@@ -215,13 +249,14 @@ if __name__ == '__main__':
print(f'第{pnum}页数据') print(f'第{pnum}页数据')
result=esMethod.multi_should(index_name=esMethod.index_name,pnum=p) result=esMethod.multi_should(index_name=esMethod.index_name,pnum=p)
msglist=result['hits']['hits'] msglist=result['hits']['hits']
print(msglist) # print(msglist)
for mms in msglist: for mms in msglist:
id=mms['_source']['id'] id=mms['_source']['id']
title=mms['_source']['title'] title=mms['_source']['title']
utitle=title.replace('.pdf','') utitle=title.replace('.pdf','')
print(f'id:{id}---title:{title}--utitle:{utitle}') print(f'id:{id}---title:{title}--utitle:{utitle}')
esMethod.updateaunn(esMethod.index_name,str(id),utitle) # esMethod.updateaunn(esMethod.index_name,str(id),utitle)
esMethod.delete(esMethod.index_name,str(id))
print('跟新成功!!') print('跟新成功!!')
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论