Commit 687dbf5e  Author: 薛凌堃

12/07

Parent 87f47a13
@@ -127,7 +127,7 @@ def NoticeDF():
    cnx, cursor = connectSql()
    # 获取美股企业
    # # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
-   # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=3 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+   # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
    # cursor.execute(mg_query)
    # cnx.commit()
    # mg_result = cursor.fetchall()
@@ -141,7 +141,7 @@ def NoticeDF():
    # # r.rpush('NoticeEnterprise:mgqy_socialCode_add', item)
    # 获取港股企业
-   gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
+   gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 And SecuritiesCode like '%.HK'"
    cursor.execute(gg_query)
    cnx.commit()
    gg_result = cursor.fetchall()
@@ -588,6 +588,15 @@ def global100():
    for item in com_namelist:
        r.rpush('global100:baseinfo', item)
def ipo_code():
    cnx, cursor = connectSql()
    query = "SELECT gpdm FROM gpdm1"
    cursor.execute(query)
    result = cursor.fetchall()
    cnx.commit()
    com_namelist = [item[0] for item in result]
    for item in com_namelist:
        r.rpush('gpdm:info', item)
if __name__ == "__main__":
    start = time.time()
@@ -597,7 +606,7 @@ if __name__ == "__main__":
    # shuangbaiqiye()
    # zhuangjingtexind()
    # NoticeEnterprise()
-   NoticeDF()
+   # NoticeDF()
    # AnnualEnterpriseIPO()
    # AnnualEnterprise()
    # BaseInfoEnterprise()
@@ -621,5 +630,6 @@ if __name__ == "__main__":
    # NoticeEnterprise_task()
    # AnnualEnterprise_task()
    # FinanceFromEast()
    ipo_code()
    log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
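The ipo_code() task added above fills the Redis list 'gpdm:info', which the new EastMoney collection script further down drains through baseCore.redicPullData. That helper lives in BaseCore and is not shown in this commit; a minimal sketch, assuming it simply pops one entry from the named list and decodes it:

import redis

r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)

def redicPullData(key):
    # assumption: pop one queued value and return it as str, or None when the list is empty
    item = r.lpop(key)
    return item.decode() if item else None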
@@ -47,7 +47,7 @@ if __name__ == "__main__":
    redis_conn = redis.Redis(connection_pool=pool)
    while True:
        # 从redis中读取数据,去附件表中根据title查询,更新查到的附件id
-       item = redis_conn.lpop('YanBao:up')
+       item = redis_conn.lpop('YanBao:aa')
        if item:
            log.info(item)
            id = item.decode()
...
@@ -52,8 +52,8 @@ class EsMethod(object):
                    {
                        "range": {
                            "createDate": {
-                               "gte": "2023-11-28T10:00:00",
-                               "lte": "2023-11-29T10:00:00"
+                               "gte": "2023-11-29T10:00:00",
+                               "lte": "2023-12-01T10:00:00"
                            }
                        }
                    }
@@ -85,7 +85,11 @@ def main(page, p, esMethod):
    if total == 0:
        log.info('++++已没有数据+++++')
        return
-   msglist = result['hits']['hits']
+   try:
+       msglist = result['hits']['hits']
+   except:
+       log.info(f'error-----{result}')
+       return
    log.info(f'---第{page}页{len(msglist)}条数据----共{total}条数据----')
    for mms in msglist:
@@ -118,7 +122,7 @@ def run_threads(num_threads,esMethod,j):
if __name__ == "__main__":
    j = 0
-   for i in range(5):
+   for i in range(2):
        esMethod = EsMethod()
        # result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
        # total = result['hits']['total']['value']
...
import json
from kafka import KafkaProducer
import requests, time, fitz
import urllib3
from base import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
cnx = baseCore.cnx
cursor = baseCore.cursor
def sendKafka(dic_news):
try: # 114.116.116.241
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
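# max_request_size is raised to 20 MiB (1024*1024*20), presumably so the full extracted report text fits in a single Kafka message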
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.info(dic_result)
return False
#state1
def secrchATT(type_id, xydm, att_id):
sel_sql = '''select * from clb_sys_attachment where item_id=%s and type_id=%s and id=%s'''
cursor_.execute(sel_sql, (xydm, type_id, att_id))
selects = cursor_.fetchall()
return selects
if __name__ == '__main__':
header = {
'Connection': 'keep-alive',
'Cache-Control': 'max-age=0',
'sec-ch-ua': '"Chromium";v="112", "Microsoft Edge";v="112", "Not:A-Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36 Edg/112.0.1722.64',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
}
info_list = [
# 'ZZSN230824151202408|1724390910695550977|2017',
'ZZSN230912210754179|18703841781|2022'
]
for info in info_list:
xydm = info.split('|')[0]
att_id = info.split('|')[1]
year = info.split('|')[2]
selects = secrchATT('1', xydm, att_id)
if len(selects) > 1:
pass
elif len(selects) == 1:
select = selects[0]
file_name = select[1]
origin = select[18]
create_time = select[13]
publishDate = select[21]
if year == '2023':
publishDate = '2023-08-31'
full_path = 'http://zzsn.luyuen.com/' + str(select[19])
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
create_time = time_now
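# note: create_time is overwritten with the current timestamp, so createDate in the Kafka message records this re-push rather than the original attachment record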
content = ''
for i in range(0, 3):
try:
response = requests.get(url=full_path, headers=header, timeout=30)
break
except Exception as e:
time.sleep(3)
continue
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
log.info(f'当前页码----{page_size}')
for page in doc.pages():
content += page.get_text()
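# accumulate the plain text of every page; the combined text feeds the language detection below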
detect_language = baseCore.detect_language(content)
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': str(create_time),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': detect_language,
'origin': origin,
# 'origin': '雪球网',
'publishDate': publishDate,
'sid': '1684032033495392257',
'sourceAddress': '', # 原文链接
'summary': '',
'title': file_name.replace('.pdf','').replace('.PDF',''),
'type': 1,
'socialCreditCode': xydm,
'year': year
}
if sendKafka(dic_news):
# 100表示成功
log.info(f'==========={xydm}成功============')
cnx.close()
cursor_.close()
baseCore.close()
@@ -29,7 +29,7 @@ type_id = 1
create_by = 'XueLingKun'
taskType = '企业年报'
-file_path = 'D:\\年报\\福布斯2000年报PDF下载-207'
+file_path = 'D:\\年报\\'
log.info(f'=============当前pid为{baseCore.getPID()}==============')
def sendKafka(dic_news):
@@ -133,22 +133,26 @@ if __name__=='__main__':
        start_time = time.time()
        pdf_path = file_path + '/'+file
-       file_rank = int(file.split('-')[0])
-       file_year = file.split('-')[1]
-       if file_year== '2023':
-           print(pdf_path)
-           continue
+       # file_rank = int(file.split('-')[0])
+       # file_year = file.split('-')[1]
+       file_year = '2021'
+       # if file_year== '2023':
+       #     print(pdf_path)
+       #     continue
        #file_rank 对应上企业信用代码
-       selectsql = f"select * from rankandcode where id = {file_rank}"
-       cursor.execute(selectsql)
-       data = cursor.fetchone()
-       cnx.commit()
-       social_code = data[1]
-       ename = data[2]
-       cname = data[3]
-       file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
+       # selectsql = f"select * from rankandcode where id = {file_rank}"
+       # cursor.execute(selectsql)
+       # data = cursor.fetchone()
+       # cnx.commit()
+       # social_code = data[1]
+       # ename = data[2]
+       # cname = data[3]
+       # file_name = cname + ':' + file_year + '年年度报告' + '.pdf'
        content = ''
-       origin = cname + '官网'
+       # origin = cname + '官网'
+       social_code = 'ZZSN22080900000177'
+       file_name = '好事达:2021年年度报告.pdf'
+       origin = '好事达官网'
        #解析文件页数和内容
        log.info(f"-----------正在处理{file_name}--------------")
        with open(pdf_path, 'rb') as file:
@@ -163,7 +167,7 @@ if __name__=='__main__':
                content += page.get_text()
                # print(content)
            except Exception as e:
-               log.info(f'文件已损坏:{ename}')
+               # log.info(f'文件已损坏:{ename}')
                continue
        #解析文件大小
        file_size = os.path.getsize(pdf_path)
...
"""
采集上市信息企业信用代码,跟数据库对比
"""
import json
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from base.BaseCore import BaseCore
baseCore = BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '上市信息/东方财富网'
log = baseCore.getLogger()
error_list = []
list_all_info = []
def createDriver():
chrome_driver = r'D:\cmd100\chromedriver.exe'
path = Service(chrome_driver)
chrome_options = webdriver.ChromeOptions()
chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
# 设置代理
# proxy = "127.0.0.1:8080" # 代理地址和端口
# chrome_options.add_argument('--proxy-server=http://' + proxy)
driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
return driver
# 需要提供股票代码、企业信用代码
while True:
start = time.time()
com_code1 = baseCore.redicPullData('gpdm:info')
# com_code1 = '837592'
if com_code1:
pass
else:
log.info('已没有数据')
break
# 股票代码0、2、3开头的为深圳交易所,6、9开头的为上海交易所,8开头的为北京交易所
if com_code1[0] == '2' or com_code1[0] == '0' or com_code1[0] == '3':
com_code = 'sz' + com_code1
if com_code1[0] == '9' or com_code1[0] == '6':
com_code = 'sh' + com_code1
if com_code1[0] == '8' or com_code1[0] == '4':
com_code = 'bj' + com_code1
if com_code1[0] == 'A':
com_code = ''
log.info(f'======开始采集{com_code1}======')
if 'bj' in com_code:
url = f'https://quote.eastmoney.com/bj/{com_code1}.html'
else:
url = f'https://quote.eastmoney.com/{com_code}.html'
url_1 = f'https://emweb.eastmoney.com/PC_HSF10/CompanySurvey/PageAjax?code={com_code}'
url_2 = f'https://emweb.eastmoney.com/PC_HSF10/BusinessAnalysis/PageAjax?code={com_code}'
browser = createDriver()
browser.get(url)
time.sleep(8)
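# fixed wait, presumably to give the JS-rendered quote data time to load before page_source is parsed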
page_source = browser.page_source
soup_t = BeautifulSoup(page_source, 'html.parser')
try:
result = soup_t.find('div',class_='quote_quotenums').text
# print(f'result:{result}')
# if result=='未上市'or result=='已退市':
if result == '未上市':
tag = 2
elif result == '已退市':
tag = 0
else:
tag = 1
except Exception as e:
error_list.append(com_code)
log.info(f'={com_code}===解析上市状态失败=====')
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog('', taskType, state, takeTime, '', f'{com_code}解析上市状态失败--e:{e}')
print('error')
continue
requests.adapters.DEFAULT_RETRIES = 5
json_1 = requests.get(url_1,verify=False).json()
json_2 = requests.get(url_2,verify=False).json()
zhengquan_type = json_1['jbzl'][0]['SECURITY_TYPE']
# print(zhengquan_type)
if 'A' in zhengquan_type:
# print(zhengquan_type)
category = '1'
if 'B' in zhengquan_type:
category = '2'
if '新三板' in zhengquan_type:
category = '3'
if 'H' in zhengquan_type:
category = '4'
id_code = json_1['jbzl'][0]['REG_NUM']
zhuyingfanwei = json_2['zyfw'][0]['BUSINESS_SCOPE']
dic_cwsj = {
"category": category, # 股票类型(1-A股;2-B股;3-新三板;4-H股)
'listed':tag,
"securitiesCode": com_code[2:],
"securitiesType": zhengquan_type,
"socialCreditCode": id_code,
}
list_all_info.append(dic_cwsj)
log.info(f'======{com_code}====采集成功=====')
updateSql = f"update gpdm1 set xydm = '{id_code}',tag = '{tag}',category = '{category}' where gpdm = '{com_code1}'"
cursor_.execute(updateSql)
cnx_.commit()
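The UPDATE above interpolates the scraped values straight into the SQL string. A parameterized variant is the safer pattern; a minimal sketch against the same gpdm1 table and cursor used above:

updateSql = "update gpdm1 set xydm = %s, tag = %s, category = %s where gpdm = %s"
cursor_.execute(updateSql, (id_code, tag, category, com_code1))
cnx_.commit()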
# 核心工具包
import os
import random
import socket
import sys
import time
import uuid
import fitz
import logbook
import logbook.more
import pandas as pd
import requests
import zhconv
import pymysql
import redis
from docx import Document
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from openpyxl import Workbook
import langid
# 创建连接池
import pymysql
from pymysql import connections
from DBUtils.PooledDB import PooledDB
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client
-tracker_conf = get_tracker_conf('D:\\zzsn_spider\\comData\\policylaw\\client.conf')
+from DBUtils.PooledDB import PooledDB
client = Fdfs_client(tracker_conf)
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
from obs import ObsClient
import fitz
from urllib.parse import unquote
obsClient = ObsClient(
    access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
@@ -263,6 +250,7 @@ class BaseCore:
        self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
                                    charset='utf8mb4')
        self.cursor_ = self.cnx_.cursor()
        # 连接到Redis
        self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
@@ -453,82 +441,11 @@ class BaseCore:
    # 解析word文件页数
def doc_page(self, file_path):
doc = Document(file_path)
return len(doc.sections)
def pdf_content(self, resp_content):
# 解析pdf文件内容
content = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
with fitz.open(stream=resp_content, filetype='pdf') as doc:
# page_size = doc.page_count
for page in doc.pages():
content += page.get_text()
break
except:
time.sleep(3)
continue
return content
    def getuuid(self):
        get_timestamp_uuid = uuid.uuid1()  # 根据 时间戳生成 uuid , 保证全球唯一
        return get_timestamp_uuid
# 替换为绝对路径之后,解析出来a.href
def uploadToserver(self, file_href, item_id):
category = os.path.splitext(file_href)[1]
# 上传至文件服务器
headers = {}
retData = {'state': False, 'type_id': 7, 'item_id': item_id, 'group_name': 'group1', 'path': '',
'full_path': '',
'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = self.getRandomUserAgent()
resp_content = ''
for i in range(0, 3):
try:
resp_content = requests.get(file_href, headers=headers, verify=False, timeout=20).content
break
except:
time.sleep(3)
continue
if resp_content:
pass
else:
return retData
# page_size = 0
# if category == '.doc' or category == '.docx':
# # page_size = self.doc_page(file_href)
# return retData
# if category == '.pdf' or category == '.PDF':
# page_size = self.pdf_page(resp_content)
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name=category.replace('.', ''))
self.getLogger().info('-------文件上传成功------')
break
except:
time.sleep(3)
continue
# if page_size>0:
# pass
# else:
# self.getLogger().info(f'======解析失败=====')
# return retData
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = bytes.decode(result['Remote file_id']).replace('group1', '')
retData['full_path'] = bytes.decode(result['Remote file_id'])
retData['file_size'] = result['Uploaded size']
retData['create_time'] = time_now
# retData['page_size'] = page_size
return retData
    def secrchATT(self, item_id, retData, type_id, order_by):
        sel_sql = '''select id from clb_sys_attachment where item_id = %s and path = %s and type_id=%s and order_by=%s '''
        self.cursor_.execute(sel_sql, (item_id, retData['path'], type_id, order_by))
@@ -549,20 +466,21 @@ class BaseCore:
        page_size = retData['page_size']
        create_time = retData['create_time']
        order_by = num
        object_key = full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1]
        Upsql = '''insert into clb_sys_attachment(name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,object_key,bucket_name,publish_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
        values = (
            file_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
            status, create_by,
-           create_time, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn', publishDate)
+           create_time, object_key, 'zzsn', publishDate)
        self.cursor_.execute(Upsql, values)  # 插入
        self.cnx_.commit()  # 提交
        self.getLogger().info("更新完成:{}".format(Upsql))
        selects = self.secrchATT(item_id, retData, type_id, order_by)
        id = selects[0]
-       return id, full_path
+       return id, object_key
    # 获取文件大小
    def convert_size(self, size_bytes):
@@ -600,18 +518,17 @@ class BaseCore:
            except:
                time.sleep(3)
                continue
-       else:
-           try:
-               time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-               retData['state'] = True
-               retData['path'] = result['body']['objectUrl'].split('.com')[1]
-               retData['full_path'] = result['body']['objectUrl']
-               retData['file_size'] = self.convert_size(file_size)
-               retData['create_time'] = time_now
-           except Exception as e:
-               print(f'error:{e}')
-               return retData
+       try:
+           time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+           retData['state'] = True
+           retData['path'] = result['body']['objectUrl'].split('.com')[1]
+           retData['full_path'] = result['body']['objectUrl']
+           retData['file_size'] = self.convert_size(file_size)
+           retData['create_time'] = time_now
+           return retData
+       except Exception as e:
+           print(f'error:{e}')
        return retData
@@ -626,3 +543,5 @@ class BaseCore:
# 根据链接删除链接重复的数据
import json
import threading
import time
import uuid
import redis
import requests
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
from obs import ObsClient
import fitz
from urllib.parse import unquote
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'policy'
def queryatt(self,index_name,pnum):
body = {
"size":0,
"aggs":{
"duplicate_titles":{
"terms":{
"field":"sourceAddress.keyword",
"min_doc_count":2,
"size":1000
},
"aggs":{
"duplicate_docs":{
"top_hits":{
"_source":{
"includes":["id","title","subjectId","sourceAddress","createDate"]
},
"size":10
}
}
}
}
}
}
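# the aggregation buckets documents that share the same sourceAddress (min_doc_count=2),
# returning up to 1000 duplicate groups per query and up to 10 documents from each group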
# filter_path = ['hits.aggregations.duplicate_titles',
# 'hits.total.value',
# 'hits.hits._source.title',
# 'hits.hits._source.sourceAddress',
# 'hits.hits._source.createDate',
# ] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
# , filter_path=filter_path
, body=body)
# log.info(result)
return result
def main(page, p, esMethod):
result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
total = result['hits']['total']['value']
if total == 0:
log.info('++++已没有数据+++++')
return
documents = result["aggregations"]["duplicate_titles"]["buckets"]
for bucket in documents:
info_list = bucket["duplicate_docs"]["hits"]["hits"]
for info in info_list:
att_id_list = info['_source'].get('attachmentIds', [])
if len(att_id_list)==0:
unique_document_ids = info["_id"]
log.info(f'==={unique_document_ids}===')
# # 删除重复的文档
# for doc_id in unique_document_ids:
# esMethod.delete(index="policy", id=doc_id)
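The commented-out cleanup above calls esMethod.delete, which EsMethod does not define in this file. A minimal sketch of such a method, assuming the standard elasticsearch-py delete call on the same client:

    def delete(self, index_name, doc_id):
        # remove a single duplicate document from the index by its _id
        self.es.delete(index=index_name, id=doc_id)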
def run_threads(num_threads,esMethod,j):
threads = []
for i in range(num_threads):
page = j + i + 1
p = j + i * 200
thread = threading.Thread(target=main, args=(page, p, esMethod))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
j = 0
for i in range(5):
esMethod = EsMethod()
# result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
# total = result['hits']['total']['value']
# if total == 0:
# log.info('++++已没有数据+++++')
# break
start = time.time()
num_threads = 5
run_threads(num_threads, esMethod, j)
j += 1000
log.info(f'5线程 每个处理200条数据 总耗时{time.time() - start}秒')
\ No newline at end of file