Commit b931eea2    Author: 薛凌堃

12/13

Parent 1f595f59
@@ -74,7 +74,7 @@ def NewsEnterprise():
     print('=======')
     #将数据插入到redis中
     for item in gn_social_list:
-        r.rpush('NewsEnterprise:gnqy_socialCode', item)
+        r.rpush('NewsResend:newsInfo', item)
         # r.rpush('NewsEnterprise:gnqybc_socialCode', item)
     # for item in gw_social_list:
@@ -126,30 +126,69 @@ def NoticeEnterprise_task():
 def NoticeDF():
     cnx, cursor = connectSql()
     # 获取美股企业
-    # # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
-    # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
-    # cursor.execute(mg_query)
-    # cnx.commit()
-    # mg_result = cursor.fetchall()
-    # mg_social_list = [item[0] for item in mg_result]
-    # print('=======')
-    # for item in mg_social_list:
-    #     if r.lrem('NoticeEnterprise:mgqy_socialCode_add', 0, item) == 0:
-    #         r.lpush('NoticeEnterprise:mgqy_socialCode_add', item)
-    #     else:
-    #         continue
-    # # r.rpush('NoticeEnterprise:mgqy_socialCode_add', item)
+    om_mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+    cursor.execute(om_mg_query)
+    cnx.commit()
+    om_mg_result = cursor.fetchall()
+    om_mg_social_list = [item[0] for item in om_mg_result]
+    print('欧盟美股企业=======')
+    for item in om_mg_social_list:
+        if r.lrem('NoticeEnterprise:mgqy_socialCode_add', 0, item) == 0:
+            r.lpush('NoticeEnterprise:mgqy_socialCode_add', item)
+        else:
+            continue
+    fq_mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+    cursor.execute(fq_mg_query)
+    cnx.commit()
+    fq_mg_result = cursor.fetchall()
+    fq_mg_social_list = [item[0] for item in fq_mg_result]
+    print('500强美股企业=======')
+    for item in fq_mg_social_list:
+        if r.lrem('NoticeEnterprise:mgqy_socialCode_add', 0, item) == 0:
+            r.lpush('NoticeEnterprise:mgqy_socialCode_add', item)
+        else:
+            continue
+    fbs_mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=3 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+    cursor.execute(fbs_mg_query)
+    cnx.commit()
+    fbs_mg_result = cursor.fetchall()
+    fbs_mg_social_list = [item[0] for item in fbs_mg_result]
+    print('福布斯美股企业=======')
+    for item in fbs_mg_social_list:
+        if r.lrem('NoticeEnterprise:mgqy_socialCode_add', 0, item) == 0:
+            r.lpush('NoticeEnterprise:mgqy_socialCode_add', item)
+        else:
+            continue
     # 获取港股企业
-    gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 And SecuritiesCode like '%.HK'"
-    cursor.execute(gg_query)
+    om_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=6 And SecuritiesCode like '%.HK'"
+    cursor.execute(om_gg_query)
     cnx.commit()
-    gg_result = cursor.fetchall()
-    gg_social_list = [item[0] for item in gg_result]
-    print('=======')
-    for item in gg_social_list:
+    om_gg_result = cursor.fetchall()
+    om_gg_social_list = [item[0] for item in om_gg_result]
+    print('欧盟港股企业=======')
+    for item in om_gg_social_list:
+        r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
+    fq_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
+    cursor.execute(fq_gg_query)
+    cnx.commit()
+    fq_gg_result = cursor.fetchall()
+    fq_gg_social_list = [item[0] for item in fq_gg_result]
+    print('500强港股企业=======')
+    for item in fq_gg_social_list:
         r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
+    fbs_gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
+    cursor.execute(fbs_gg_query)
+    cnx.commit()
+    fbs_gg_result = cursor.fetchall()
+    fbs_gg_social_list = [item[0] for item in fbs_gg_result]
+    print('500强港股企业=======')
+    for item in fbs_gg_social_list:
+        r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
     closeSql(cnx, cursor)
@@ -612,7 +651,7 @@ if __name__ == "__main__":
     # BaseInfoEnterprise()
     # BaseInfoEnterpriseAbroad()
    # NewsEnterprise_task()
-    # NewsEnterprise()
+    NewsEnterprise()
    # CorPerson()
    # china100()
    # global100()
@@ -630,6 +669,6 @@ if __name__ == "__main__":
    # NoticeEnterprise_task()
    # AnnualEnterprise_task()
    # FinanceFromEast()
-    ipo_code()
+    # ipo_code()
    log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
import traceback
import urllib
import uuid
from selenium import webdriver
from bs4 import BeautifulSoup
from urllib import parse
import requests, re, time, pymysql, json, redis
from kafka import KafkaProducer
import urllib3
urllib3.disable_warnings()
from obs import ObsClient
import fitz
import sys
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
# tracker_conf = get_tracker_conf('./client.conf')
# client = Fdfs_client(tracker_conf)
chromedriver = 'D:/chrome/113/chromedriver.exe'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36'
}
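# OBS object-key prefixes: pathType_a for company research reports (QY), pathHYType for industry research reports (HY);
# download() picks one of the two based on the record's origin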
pathType_a = 'QYResearchReport/'
pathHYType = 'HYResearchReport/'
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
def clean_text(text):
"""
清理多余空行
:param text:
:return:
"""
soup = BeautifulSoup(text, 'html.parser')
# log.info(soup.get_text())
text = soup.get_text()
# str1 = re.sub('[\n]+', '\n', 'dfadf d\n \n\n \nfa ds ')
text_ = re.sub('\n+', '\n', text.replace('\t', '').replace('\r', ''))
return text_
# 获取文件大小
def convert_size(size_bytes):
# 定义不同单位的转换值
units = ['bytes', 'KB', 'MB', 'GB', 'TB']
i = 0
while size_bytes >= 1024 and i < len(units) - 1:
size_bytes /= 1024
i += 1
return f"{size_bytes:.2f} {units[i]}"
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
# 数据入库,返回主键id传到kafka中
def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by, create_time, come, page_size):
with cnx.cursor() as cursor:
Upsql = '''insert into clb_sys_attachment_copy2(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,source,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status, create_by,
create_time, come, page_size, full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1], 'zzsn')
# log.info(values)
cursor.execute(Upsql, values) # 插入
cnx.commit() # 提交
querySql = '''select id from clb_sys_attachment_copy2 where type_id=4 and full_path = %s''' # and stock_code = "01786.HK"
cursor.execute(querySql, full_path)
selects = cursor.fetchone()
pdf_id = selects[0]
# cnx.close()
# log.info("更新完成:{}".format(pdf_id))
return pdf_id
# redis去重
def add_check_url(article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
# res = r.sadd(f'report_pdf_two_history', article_url,3)
res = r.sadd(f'report_pdf_three_history_2', article_url, 3) # 注意是 保存set的方式
if res == 0: # 若返回0,说明插入不成功,表示有重复
return True
else:
return False
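# uptoOBS: fetch the PDF over HTTP (up to 3 retries), upload the raw bytes to the 'zzsn' OBS bucket under pathType,
# and parse it with fitz (PyMuPDF) to record page count and extracted text in retData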
def uptoOBS(pdf_url, name_pdf, type_id, pathType):
retData = {'state': False, 'type_id': type_id, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
for i in range(0, 3):
try:
response = requests.get(pdf_url, headers=headers, verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length'))
break
except:
time.sleep(3)
continue
page_size = 0
for i in range(0, 3):
try:
name = str(getuuid()) + '.pdf'
now_time = time.strftime("%Y-%m")
result = obsClient.putContent('zzsn', pathType + name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except Exception as e:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
# log.info(f'======pdf解析失败=====')
return retData
else:
try:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = result['body']['objectUrl'].split('.com')[1]
retData['full_path'] = result['body']['objectUrl']
retData['file_size'] = convert_size(file_size)
retData['create_time'] = time_now
retData['page_size'] = page_size
except Exception as e:
return retData
return retData
# 下载pdf文件,上传至服务器
def download(data, order_by):
url_pdf = data['url_pdf']
name_pdf = data['title']
if '.pdf' not in name_pdf:
name_pdf = name_pdf + '.pdf'
year = int(data['year'])
type_id = data['type_id']
item_id = data['item_id']
category = data['category']
create_by = data['create_by']
publishDate = data['publishDate']
origin = data['origin']
if origin == '行业研报':
pathType = pathHYType
else:
pathType = pathType_a
sourceAddress = data['sourceAddress']
content = data['content']
summary = str(data['summary'])
sid = data['sid']
try:
come = data['come']
except:
come = ''
tf_url = add_check_url(sourceAddress)
if tf_url:
dic_result = {
'success': 'ture',
'message': '数据已存在',
'code': '200',
}
log.info(dic_result)
return
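# No PDF link: push a text-only record straight to Kafka; otherwise download the PDF, upload it to OBS and attach its database id below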
if url_pdf:
pass
else:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': '',
'origin': origin,
'publishDate': publishDate,
'sid': sid,
'sourceAddress': sourceAddress,
'summary': summary,
'title': name_pdf,
'type': '0'
}
# 将相应字段通过kafka传输保存
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
try:
kafka_result = producer.send("researchReportStudyTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
log.info(kafka_result.get(timeout=10))
except:
log.info(f'无pdf链接:{dic_news["title"]}、{dic_news["createDate"]}')
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return
if 'http' not in url_pdf:
url_pdf = 'https://' + url_pdf
# 文件上传到obs
retData = uptoOBS(url_pdf, name_pdf, 4, pathType)
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
page_size = retData['page_size']
path = retData['path']
full_path = retData['full_path']
file_size = retData['file_size']
status = 1
create_time = time_now
# year,name_pdf,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,come,page_size):
att_id = tableUpdate(year, name_pdf, type_id, item_id, '', path, full_path, category, file_size, order_by, status,
create_by, create_time, come, page_size)
if att_id:
pass
else:
return
# except:
# # log.info('下载失败')
# dic_result = {
# 'success':'false',
# 'message':'下载失败',
# 'code':'204',
# }
# log.info(dic_result)
# return
# # 将本地文件连接保存进MySQL表用于查询
# table_id = tableUpdate(year,name_pdf,type_id,item_id,group_name,path,full_path,
# category,file_size,order_by,status,create_by,create_time,come,page_size)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': '',
'origin': origin,
'publishDate': publishDate,
'sid': sid,
'sourceAddress': sourceAddress,
'summary': summary,
'title': name_pdf.split('.pdf')[0],
'type': '0'
}
# log.info(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportStudyTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
log.info(kafka_result.get(timeout=10))
except Exception as e:
msg = traceback.format_exc()
log.info(msg)
log.info(f'传输失败:{dic_news["title"]}、{dic_news["publishDate"]}')
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
return
# 东方财富网
def dongfangcaifu():
cnx2 = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
charset='utf8mb4')
list_short_name = []
list_social_code = []
with cnx2.cursor() as cursor:
sel_sql = '''select securities_short_name,social_credit_code from sys_base_enterprise_ipo'''
cursor.execute(sel_sql)
selects = cursor.fetchall()
for row in selects:
short_name = row[0]
social_code = row[1]
if short_name:
list_short_name.append(short_name)
list_social_code.append(social_code)
for key_word_num in range(0, len(list_short_name)):
log.info(key_word_num)
key_word = list_short_name[key_word_num]
social_code = list_social_code[key_word_num]
log.info(key_word)
page = 1
# for page in range(1,500):
# log.info(page)
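# Build the eastmoney search-api-web payload: fuzzy search on the securities short name, researchReport results only, 10 items per page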
param = {
"uid": "",
"keyword": key_word,
"type": ["researchReport"],
"client": "web",
"clientVersion": "curr",
"clientType": "web",
"param": {"researchReport": {"client": "web", "pageSize": 10, "pageIndex": page}}
}
param_url = parse.quote(str(param).replace(" ", ""))
# param_url = f'%7B"uid"%3A""%2C"keyword"%3A"{key_word}"%2C"type"%3A%5B"researchReport"%5D%2C"client"%3A"web"%2C"clientVersion"%3A"curr"%2C"clientType"%3A"web"%2C"param"%3A%7B"researchReport"%3A%7B"client"%3A"web"%2C"pageSize"%3A10%2C"pageIndex"%3A{page}%7D%7D%7D'
t = int(time.time() * 1000)
url = f'https://search-api-web.eastmoney.com/search/jsonp?cb=&param={param_url}&_={t}'
res = requests.get(url).text[1:-1]
res_json = json.loads(res)
list_all = res_json['result']['researchReport']
if list_all:
pass
else:
continue
for one_news in list_all:
news_title = one_news['title']
news_title = news_title.replace('<em>', '').replace('</em>', '')
news_date = one_news['date'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_come = one_news['source']
news_code = one_news['code']
news_url = f'https://data.eastmoney.com/report/zw_stock.jshtml?infocode={news_code}'
news_res = requests.get(news_url)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
try:
news_content = news_soup.find('div', {'class': 'newsContent'}).text.strip()
except:
news_content = news_soup.find('div', {'class': 'ctx-content'}).text.strip()
try:
news_pdf = news_soup.find('div', {'class': 'detail-header'}).find_all('a')[-1].get('href')
except:
news_pdf = news_soup.find('span', {'class': 'to-link'}).a.get('href')
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': social_code, # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'TangYuHang', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '东方财富网-研报中心', # 来源
'sourceAddress': news_url, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
'sid': '1662008733005160449', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(page,dic_post)
# url = 'http://114.115.155.139:5002/report_download'
# # report-list
# res = requests.post(url, data=json.dumps(dic_post))
# log.info(res.json())
# dic_news = {
# '关键字':key_word,
# '标题':news_title,
# '时间':news_date,
# '来源':news_come,
# '摘要':news_content,
# '原文链接':news_url,
# 'PDF链接':news_pdf,
# }
# list_all_info.append(dic_news)
# if len(list_all) != 10:
# break
# 东方财富网2
def dongfangcaifu2():
list_short_name = ['新', '的', '电', '能']
for key_word_num in range(0, len(list_short_name)):
log.info(key_word_num)
key_word = list_short_name[key_word_num]
social_code = ''
log.info(key_word)
page = 1
# for page in range(1,500):
# log.info(page)
param = {
"uid": "",
"keyword": key_word,
"type": ["researchReport"],
"client": "web",
"clientVersion": "curr",
"clientType": "web",
"param": {"researchReport": {"client": "web", "pageSize": 10, "pageIndex": page}}
}
param_url = parse.quote(str(param).replace(" ", ""))
# param_url = f'%7B"uid"%3A""%2C"keyword"%3A"{key_word}"%2C"type"%3A%5B"researchReport"%5D%2C"client"%3A"web"%2C"clientVersion"%3A"curr"%2C"clientType"%3A"web"%2C"param"%3A%7B"researchReport"%3A%7B"client"%3A"web"%2C"pageSize"%3A10%2C"pageIndex"%3A{page}%7D%7D%7D'
t = int(time.time() * 1000)
url = f'https://search-api-web.eastmoney.com/search/jsonp?cb=&param={param_url}&_={t}'
res = requests.get(url).text[1:-1]
res_json = json.loads(res)
list_all = res_json['result']['researchReport']
if list_all:
pass
else:
continue
for one_news in list_all:
news_title = one_news['title']
news_title = news_title.replace('<em>', '').replace('</em>', '')
news_date = one_news['date'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_come = one_news['source']
news_code = one_news['code']
news_url = f'https://data.eastmoney.com/report/zw_stock.jshtml?infocode={news_code}'
news_res = requests.get(news_url)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
try:
news_content = news_soup.find('div', {'class': 'newsContent'}).text.strip()
except:
news_content = news_soup.find('div', {'class': 'ctx-content'}).text.strip()
try:
news_pdf = news_soup.find('div', {'class': 'detail-header'}).find_all('a')[-1].get('href')
except:
news_pdf = news_soup.find('span', {'class': 'to-link'}).a.get('href')
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': social_code, # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'TangYuHang', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '东方财富网-研报中心', # 来源
'sourceAddress': news_url, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
'sid': '1662008733005160449', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# 东方财富网3 个股研报
def dongfangcaifu3():
t = str(int(time.time()) * 1000)
import datetime
now = datetime.datetime.now()
# 将日期格式化为“%Y-%m-%d”格式
# 当前日期
formatted_date = now.strftime("%Y-%m-%d")
pre_year = int(time.strftime('%Y', time.localtime(time.time()))) - 2
month_day = time.strftime('%m-%d', time.localtime(time.time()))
pre_date = '{}-{}'.format(pre_year, month_day)
# log.info("格式化后的日期为:", formatted_date)
# for i in range(1,1349):
for i in range(1, 15):
url = f'https://reportapi.eastmoney.com/report/list?industryCode=*&pageSize=50&industry=*&rating=&ratingChange=&beginTime={pre_date}&endTime={formatted_date}&pageNo={i}&fields=&qType=0&orgCode=&code=*&rcode=&p={i}&pageNum={i}&pageNumber={i}&_={t}'
# url = 'https://reportapi.eastmoney.com/report/list?industryCode=*&pageSize=50&industry=*&rating=&ratingChange=&beginTime=2021-06-13&endTime=2023-06-13&pageNo=1&fields=&qType=0&orgCode=&code=*&rcode=&p=1&pageNum=1&pageNumber=1&_=1686645164397'
res = requests.get(url).text
# log.info(res)
res_json = json.loads(res)
list_all = res_json['data']
# log.info(list_all)
for one_news in list_all:
news_title = one_news['title']
# log.info(news_title)
news_date = one_news['publishDate'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_come = one_news['orgSName']
news_href = 'https://data.eastmoney.com/report/info/' + one_news['infoCode'] + '.html'
news_res = requests.get(news_href)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
# log.info(news_soup)
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
# todo:摘要去标签
news_content = news_soup.find(id='ContentBody').text.replace(' ', '').strip()
news_content = clean_text(news_content)
news_result = news_soup.find(class_='report-infos')
# log.info(news_result)
news_pdf = news_result.find_all('span')[4].find('a')['href']
# log.info(news_pdf)
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': '', # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'XueLingKun', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '个股研报', # 来源
'sourceAddress': news_href, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
# 1673152967405879298
'sid': '1673152967405879298', # 信息源id
'come': news_come,
}
# list_quchong.append(dic_post)
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(dic_post['title'],dic_post['publishDate'])
# 东方财富网4 行业研报
def dongfangcaifu4():
t = str(int(time.time()) * 1000)
import datetime
now = datetime.datetime.now()
# 将日期格式化为“%Y-%m-%d”格式
# 当前日期
formatted_date = now.strftime("%Y-%m-%d")
pre_year = int(time.strftime('%Y', time.localtime(time.time()))) - 2
month_day = time.strftime('%m-%d', time.localtime(time.time()))
pre_date = '{}-{}'.format(pre_year, month_day)
# log.info("格式化后的日期为:", formatted_date)
for i in range(1, 15):
url = f'https://reportapi.eastmoney.com/report/list?&industryCode=*&pageSize=50&industry=*&rating=*&ratingChange=*&beginTime={pre_date}&endTime={formatted_date}&pageNo={i}&fields=&qType=1&orgCode=&rcode=&p={i}&pageNum={i}&pageNumber={i}&_={t}'
# url = "https://reportapi.eastmoney.com/report/list?&industryCode=*&pageSize=50&industry=*&rating=*&ratingChange=*&beginTime=2021-06-27&endTime=2023-06-27&pageNo=6&fields=&qType=1&orgCode=&rcode=&p=6&pageNum=6&pageNumber=6&_=1687831020493"
res = requests.get(url).text
# log.info(res)
res_json = json.loads(res)
list_all = res_json['data']
# log.info(list_all)
for one_news in list_all:
news_title = one_news['title']
# log.info(news_title)
news_date = one_news['publishDate'][:10]
news_come = one_news['orgSName']
news_date = one_news['publishDate'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_href = 'https://data.eastmoney.com/report/info/' + one_news['infoCode'] + '.html'
news_res = requests.get(news_href)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
# log.info(news_soup)
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
# news_content = news_soup.find(id='ContentBody')
try:
news_content = news_soup.find(id='ContentBody').text.replace(' ', '').strip()
news_content = clean_text(news_content)
except:
# ctx-content
# log.info(news_href,news_title)
news_content = news_soup.find(id='ctx-content').text.replace(' ', '').strip()
news_content = clean_text(news_content)
try:
news_result = news_soup.find(class_='report-infos')
# log.info(news_result)
news_pdf = news_result.find_all('span')[4].find('a')['href']
# log.info(news_pdf)
except:
news_pdf = news_soup.find('span', class_='to-link').find('a')['href']
# log.info(news_soup)
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': '', # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'XueLingKun', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '行业研报', # 来源
'sourceAddress': news_href, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
'sid': '1673154805203091457', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
# 东方财富网5 新股研报
def dongfangcaifu5():
t = str(int(time.time()) * 1000)
import datetime
now = datetime.datetime.now()
# 将日期格式化为“%Y-%m-%d”格式
# 当前日期
formatted_date = now.strftime("%Y-%m-%d")
pre_year = int(time.strftime('%Y', time.localtime(time.time()))) - 2
month_day = time.strftime('%m-%d', time.localtime(time.time()))
pre_date = '{}-{}'.format(pre_year, month_day)
# log.info("格式化后的日期为:", formatted_date)
for i in range(1, 5):
url = f'https://reportapi.eastmoney.com/report/newStockList?pageSize=50&beginTime={pre_date}&endTime={formatted_date}&pageNo={i}&fields=&qType=4&p={i}&pageNum={i}&pageNumber={i}&_={t}'
res = requests.get(url).text
log.info(res)
res_json = json.loads(res)
list_all = res_json['data']
# log.info(list_all)
for one_news in list_all:
news_title = one_news['title']
# log.info(news_title)
news_date = one_news['publishDate'][:10]
news_come = one_news['orgSName']
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_href = 'https://data.eastmoney.com/report/info/' + one_news['infoCode'] + '.html'
news_res = requests.get(news_href)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
# log.info(news_soup)
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
news_content = news_soup.find(id='ContentBody')
# news_content=str(news_content)
try:
news_result = news_soup.find(class_='report-infos')
# log.info(news_result)
news_pdf = news_result.find_all('span')[4].find('a')['href']
# log.info(news_pdf)
except:
news_pdf = news_soup.find('span', class_='to-link').find('a')['href']
# log.info(news_soup)
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': '', # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'XueLingKun', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '新股研报', # 来源
'sourceAddress': news_href, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
# 1673155535028764674
'sid': '1673155535028764674', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
# 东方财富网6 宏观研究
def dongfangcaifu6():
t = str(int(time.time()) * 1000)
import datetime
now = datetime.datetime.now()
# 将日期格式化为“%Y-%m-%d”格式
# 当前日期
formatted_date = now.strftime("%Y-%m-%d")
pre_year = int(time.strftime('%Y', time.localtime(time.time()))) - 2
month_day = time.strftime('%m-%d', time.localtime(time.time()))
pre_date = '{}-{}'.format(pre_year, month_day)
# log.info("格式化后的日期为:", formatted_date)
for i in range(1, 15):
url = f'https://reportapi.eastmoney.com/report/jg?pageSize=50&beginTime={pre_date}&endTime={formatted_date}&pageNo={i}&fields=&qType=3&orgCode=&author=&p={i}&pageNum={i}&pageNumber={i}&_={t}'
res = requests.get(url).text
# log.info(res)
res_json = json.loads(res)
list_all = res_json['data']
# log.info(list_all)
for one_news in list_all:
news_title = one_news['title']
# log.info(news_title)
news_date = one_news['publishDate'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_come = one_news['orgSName']
# https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl=H5bb92gpAmVPNj5wTfzCzue8aSv0RJSrltfLaEk8UoQ=
# H5bb92gpAmVPNj5wTfzCzue8aSv0RJSrltfLaEk8UoQ
news_href = 'https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl=' + one_news[
'encodeUrl'] + '='
news_res = requests.get(news_href)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
# log.info(news_soup)
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
# news_content = news_soup.find(id='ContentBody').text.replace(' ','').strip()
try:
news_content = news_soup.find(id='ContentBody').text.replace(' ', '').strip()
news_content = clean_text(news_content)
except:
# ctx-content
news_content = news_soup.find(id='ctx-content').text.replace(' ', '').strip()
news_content = clean_text(news_content)
# log.info(news_href)
try:
news_result = news_soup.find(class_='report-infos')
# log.info(news_result)
news_pdf = news_result.find_all('span')[4].find('a')['href']
# log.info(news_pdf)
except:
news_pdf = news_soup.find('span', class_='to-link').find('a')['href']
# log.info(news_soup)
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': '', # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'XueLingKun', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '宏观研究', # 来源
'sourceAddress': news_href, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
'sid': '1673155789744652290', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
# 东方财富网7 策略报告
def dongfangcaifu7():
t = str(int(time.time()) * 1000)
import datetime
now = datetime.datetime.now()
# 将日期格式化为“%Y-%m-%d”格式
# 当前日期
formatted_date = now.strftime("%Y-%m-%d")
pre_year = int(time.strftime('%Y', time.localtime(time.time()))) - 2
month_day = time.strftime('%m-%d', time.localtime(time.time()))
pre_date = '{}-{}'.format(pre_year, month_day)
# log.info("格式化后的日期为:", formatted_date)
for i in range(1, 3):
url = f'https://reportapi.eastmoney.com/report/jg?pageSize=50&beginTime={pre_date}&endTime={formatted_date}&pageNo={i}&fields=&qType=2&orgCode=&author=&p={i}&pageNum={i}&pageNumber={i}&_={t}'
res = requests.get(url).text
# log.info(res)
res_json = json.loads(res)
list_all = res_json['data']
# log.info(list_all)
for one_news in list_all:
news_title = one_news['title']
# log.info(news_title)
news_date = one_news['publishDate'][:10]
comparison_date = "2023-12-08"
# 比较发布日期是否小于2023-10-06
if news_date < comparison_date:
continue
else:
pass
news_come = one_news['orgSName']
# https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl=H5bb92gpAmVPNj5wTfzCzue8aSv0RJSrltfLaEk8UoQ=
# H5bb92gpAmVPNj5wTfzCzue8aSv0RJSrltfLaEk8UoQ
news_href = 'https://data.eastmoney.com/report/zw_macresearch.jshtml?encodeUrl=' + one_news[
'encodeUrl'] + '='
news_res = requests.get(news_href)
news_soup = BeautifulSoup(news_res.content, 'html.parser')
# log.info(news_soup)
try:
if '抱歉,您访问的页面不存在或已删除!' in news_soup.title.text:
continue
except:
continue
# news_content = news_soup.find(id='ContentBody')
try:
news_content = news_soup.find(id='ctx-content').text.replace(' ', '').strip()
news_content = clean_text(news_content)
except:
news_content = news_soup.find(id='ContentBody').text.replace(' ', '').strip()
news_content = clean_text(news_content)
# log.info(news_href,news_title)
try:
news_result = news_soup.find(class_='report-infos')
# log.info(news_result)
news_pdf = news_result.find_all('span')[4].find('a')['href']
# log.info(news_pdf)
except:
news_pdf = news_soup.find('span', class_='to-link').find('a')['href']
# log.info(news_soup)
dic_post = {
'title': news_title, # 报告名称
'url_pdf': news_pdf, # 报告链接
'year': news_date[:4], # 报告年份
'type_id': '4', # 报告种类,(年报:1,季报:2,月报:3,研报:4)
'item_id': '', # 关联记录id,如:企业信用代码
'category': 'pdf', # 文件后缀名,如:pdf
'create_by': 'XueLingKun', # 创建人,使用驼峰命名,如:TangYuHang
'publishDate': news_date, # 时间
'origin': '策略报告', # 来源
'sourceAddress': news_href, # 原文链接
'content': '', # 内容
'summary': news_content, # 摘要
'sid': '1673155946779394050', # 信息源id
'come': news_come,
}
order_by = 1
download(dic_post, order_by)
order_by += 1
# log.info(f'成功:{dic_post["title"]},{dic_post["publishDate"]}')
if __name__ == '__main__':
# try:
# log.info('mob')
# Mob()
# except:
# pass
# try:
# log.info('yidong_guanxiangtai')
# yidong_guanxiangtai()
# except:
# pass
# try:
# log.info('juliangsuanshu')
# juliangsuanshu()
# except:
# pass
# try:
# log.info('ke36')
# ke36()
# except:
# pass
# try:
# log.info('qianyanzhishiku')
# qianyanzhishiku()
# except:
# pass
# try:
# log.info('shijiejingjiluntan')
# shijiejingjiluntan()
# except:
# pass
try:
log.info('dongfangcaifu')
dongfangcaifu()
except:
pass
try:
log.info('dongfangcaifu2')
dongfangcaifu2()
except:
pass
try:
log.info('dongfangcaifu3')
dongfangcaifu3()
except Exception as e:
pass
try:
log.info('dongfangcaifu4')
dongfangcaifu4()
except:
pass
try:
log.info('dongfangcaifu5')
dongfangcaifu5()
except:
pass
try:
log.info('dongfangcaifu6')
dongfangcaifu6()
except:
pass
try:
log.info('dongfangcaifu7')
dongfangcaifu7()
except:
pass
@@ -23,10 +23,10 @@ es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn998
 index_name = 'researchreportdata'
 pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)

-def searchATT(title):
-    sql = "select id from clb_sys_attachment where type_id=4 and name=%s "
+def searchATT():
+    sql = "select id from clb_sys_attachment where type_id=4 and create_time>'2023-12-08' "
     # lock.acquire()
-    cursor_.execute(sql, title+'.pdf')
+    cursor_.execute(sql)
     selects = cursor_.fetchone()
     # lock.release()
     return selects
...
@@ -45,7 +45,7 @@ def sendKafka(dic_news,xydm):
     start_time = time.time()
     try:  # 114.116.116.241
         producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
-        kafka_result = producer.send("researchReportTopic",
+        kafka_result = producer.send("researchReportYearTopic",
                                      json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
         print(kafka_result.get(timeout=10))
...
@@ -31,7 +31,7 @@ pathType = 'QYYearReport/'
 def sendKafka(dic_news):
     try:  # 114.116.116.241
         producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],max_request_size=1024*1024*20)
-        kafka_result = producer.send("researchReportTopic",
+        kafka_result = producer.send("researchReportYearTopic",
                                      json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
         print(kafka_result.get(timeout=10))
@@ -93,7 +93,11 @@ def main():
     redis_conn = redis.Redis(connection_pool=pool)
     # info_ = redis_conn.lpop("NoIPO:info")
-    info_ = '91310000132206289R|1725799077425945040|2022'
-    if info_:
-        pass
-    else:
+    info_list = ['91130100236018805C|18703781588|2018', '915203002147892034|18703781589|2013',
+                 '913200007455797746|18703781592|2018', '91440500723817938W|18703781594|2019',
+                 '91340000704920454F|18703781596|2021']
+    for info_ in info_list:
+        if info_:
+            pass
+        else:
@@ -178,7 +182,7 @@ def run_threads(num_threads):
 if __name__ == "__main__":
     while True:
         start = time.time()
-        num_threads = 5
+        num_threads =1
         run_threads(num_threads)
         log.info(f'5线程 总耗时{time.time() - start}秒')
\ No newline at end of file
"""
将需要新增的企业入redis
"""
import json
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
import urllib3
from base.BaseCore import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore()
chromedriver = "./chromedriver"
browser = webdriver.Chrome(chromedriver)
taskType = '上市信息/东方财富网'
log = baseCore.getLogger()
error_list = []
list_all_info = []
# 需要提供股票代码、企业信用代码
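# Pull stock codes one at a time from the Redis queue 'Ipo_newsAdd:comCode' and enrich each via the eastmoney PC_HSF10 ajax endpoints below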
while True:
com_code1 = baseCore.redicPullData('Ipo_newsAdd:comCode')
start = time.time()
# 股票代码0、2、3开头的为深圳交易所,6、9开头的为上海交易所,8开头的为北京交易所
if com_code1[0] == '2' or com_code1[0] == '0' or com_code1[0] == '3':
com_code = 'sz' + com_code1
if com_code1[0] == '9' or com_code1[0] == '6':
com_code = 'sh' + com_code1
if com_code1[0] == '8' or com_code1[0] == '4':
com_code = 'bj' + com_code1
if com_code1[0] == 'A':
com_code = ''
log.info(f'======开始采集{com_code}======')
url = f'https://quote.eastmoney.com/{com_code}.html'
url_1 = f'https://emweb.eastmoney.com/PC_HSF10/CompanySurvey/PageAjax?code={com_code}'
url_2 = f'https://emweb.eastmoney.com/PC_HSF10/BusinessAnalysis/PageAjax?code={com_code}'
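# url_1: CompanySurvey ajax (profile, listing date, registration number); url_2: BusinessAnalysis ajax (main business scope)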
# browser.get(url)
# time.sleep(8)
# page_source = browser.page_source
# soup_t = BeautifulSoup(page_source, 'html.parser')
# try:
# result = soup_t.find('div',class_='quote_quotenums').text
# # print(f'result:{result}')
# # if result=='未上市'or result=='已退市':
# if result == '未上市' :
# continue
# if result == '已退市':
# tag = 0
# else:
# tag = 1
# except Exception as e:
# error_list.append(com_code)
# log.info(f'={com_code}===解析上市状态失败=====')
# state = 0
# takeTime = baseCore.getTimeCost(start, time.time())
# baseCore.recordLog('', taskType, state, takeTime, '', f'{com_code}解析上市状态失败--e:{e}')
# print('error')
requests.adapters.DEFAULT_RETRIES = 5
json_1 = requests.get(url_1,verify=False).json()
json_2 = requests.get(url_2,verify=False).json()
# SECURITY_TYPE
try:
jys = json_1['jbzl'][0]['TRADE_MARKET']
except Exception as e:
log.info(f'====={com_code}=====解析交易所失败======')
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog('', taskType, state, takeTime, '', f'{com_code}解析交易所失败--e:{e}')
continue
try:
if "上海" in jys:
jys_code = '2'
if "深圳" in jys:
jys_code = '3'
except:
jys = json_1['jbzl'][0]['SECURITY_TYPE']
if "北京" in jys:
jys_code = '1'
short_name = json_1['jbzl'][0]['STR_NAMEA']
zhengquan_type = json_1['jbzl'][0]['SECURITY_TYPE']
# print(zhengquan_type)
if 'A' in zhengquan_type:
# print(zhengquan_type)
category = '1'
if 'B' in zhengquan_type:
category = '2'
if '新三板' in zhengquan_type:
category = '3'
if 'H' in zhengquan_type:
category = '4'
id_code = json_1['jbzl'][0]['REG_NUM']
dongcai = json_1['jbzl'][0]['EM2016']
zhengjian = json_1['jbzl'][0]['INDUSTRYCSRC1']
try:
shangshishijian = json_1['fxxg'][0]['LISTING_DATE'][:10]
except:
shangshishijian = ''
zhuyingfanwei = json_2['zyfw'][0]['BUSINESS_SCOPE']
dic_cwsj = {
"exchange": jys_code,
"category": category, # 股票类型(1-A股;2-B股;3-新三板;4-H股)
'listed': '1',
"listingDate": shangshishijian,
"securitiesCode": com_code[2:],
"securitiesShortName": short_name,
"securitiesType": zhengquan_type,
"socialCreditCode": id_code,
"businessScope": zhuyingfanwei,
"eastIndustry": dongcai,
"csrcIndustry": zhengjian
}
list_all_info.append(dic_cwsj)
log.info(f'======{com_code}====采集成功=====')
# 通过接口将数据保存进数据库
for num in range(0, len(list_all_info),100):
json_updata = json.dumps(list_all_info[num:num+100])
# print(json_updata)
try:
response = requests.post('http://114.115.236.206:8088/sync/enterpriseIpo', data=json_updata, timeout=300,
verify=False)
except Exception as e:
print(e)
print("{}:到:{}".format(num, num + 100))
print(response.text)
@@ -31,11 +31,35 @@ class EsMethod(object):
     def queryatt(self,index_name,pnum):
         body = {
+            "query": {
+                "bool": {
+                    "must": [
+                        {
+                            "nested" : {
+                                "query" : {
+                                    "bool" : {
+                                        "must" : [
+                                            {
+                                                "match_phrase" : {
+                                                    "labels.relationId" : {
+                                                        "query" : "1677"
+                                                    }
+                                                }
+                                            }
+                                        ]
+                                    }
+                                },
+                                "path" : "labels"
+                            }
+                        }
+                    ]
+                }
+            },
             "size":0,
             "aggs":{
                 "duplicate_titles":{
                     "terms":{
-                        "field":"sourceAddress.keyword",
+                        "field":"title.keyword",
                         "min_doc_count":2,
                         "size":1000
                     },
@@ -43,7 +67,7 @@ class EsMethod(object):
                     "duplicate_docs":{
                         "top_hits":{
                             "_source":{
-                                "includes":["id","title","subjectId","sourceAddress","createDate"]
+                                "includes":["id","title","subjectId","sourceAddress","createDate","labels.relationId","attachmentIds"]
                             },
                             "size":10
                         }
@@ -66,6 +90,12 @@ class EsMethod(object):
         # log.info(result)
         return result

+    def delete(self, index_name, id):
+        result = self.es.delete(index=index_name
+                                , doc_type="_doc"
+                                , id=id)
+        log.info('删除结果 %s' % result)
+
 def main(page, p, esMethod):
     result = esMethod.queryatt(index_name=esMethod.index_name, pnum=p)
@@ -75,18 +105,10 @@ def main(page, p, esMethod):
         log.info('++++已没有数据+++++')
         return
     documents = result["aggregations"]["duplicate_titles"]["buckets"]
-    for bucket in documents:
-        info_list = bucket["duplicate_docs"]["hits"]["hits"]
-        for info in info_list:
-            att_id_list = info['_source']['attachmentIds']
-            if len(att_id_list)==0:
-                unique_document_ids = info["_id"]
-                log.info(f'==={unique_document_ids}===')
-    # # 删除重复的文档
-    # for doc_id in unique_document_ids:
-    #     esMethod.delete(index="policy", id=doc_id)
+    unique_document_ids = [bucket["duplicate_docs"]["hits"]["hits"][-1]["_id"] for bucket in documents]
+    # 删除重复的文档
+    for doc_id in unique_document_ids:
+        esMethod.delete(index_name="policy", id=doc_id)
...