提交 c0b05e59 作者: 薛凌堃

11/23

上级 d4632531
...@@ -122,6 +122,33 @@ def NoticeEnterprise_task(): ...@@ -122,6 +122,33 @@ def NoticeEnterprise_task():
print('定时采集异常', e) print('定时采集异常', e)
pass pass
# 东方财富网公告 美股 港股
def NoticeDF():
cnx, cursor = connectSql()
# 获取美股企业
mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
cursor.execute(mg_query)
cnx.commit()
mg_result = cursor.fetchall()
mg_social_list = [item[0] for item in mg_result]
print('=======')
for item in mg_social_list:
r.rpush('NoticeEnterprise:mgqy_socialCode_add', item)
# 获取港股企业
gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
cursor.execute(gg_query)
cnx.commit()
gg_result = cursor.fetchall()
gg_social_list = [item[0] for item in gg_result]
print('=======')
for item in gg_social_list:
r.rpush('NoticeEnterprise:ggqy_socialCode_add', item)
closeSql(cnx, cursor)
#企业基本信息 #企业基本信息
def BaseInfoEnterprise(): def BaseInfoEnterprise():
cnx,cursor = connectSql() cnx,cursor = connectSql()
...@@ -564,7 +591,8 @@ if __name__ == "__main__": ...@@ -564,7 +591,8 @@ if __name__ == "__main__":
# kegaishifan() # kegaishifan()
# shuangbaiqiye() # shuangbaiqiye()
# zhuangjingtexind() # zhuangjingtexind()
NoticeEnterprise() # NoticeEnterprise()
NoticeDF()
# AnnualEnterpriseIPO() # AnnualEnterpriseIPO()
# AnnualEnterprise() # AnnualEnterprise()
# BaseInfoEnterprise() # BaseInfoEnterprise()
......
...@@ -27,14 +27,15 @@ requests.adapters.DEFAULT_RETRIES = 5 ...@@ -27,14 +27,15 @@ requests.adapters.DEFAULT_RETRIES = 5
def doJob(): def doJob():
while True: while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息 # 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode') # social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待 # 判断 如果Redis中已经没有数据,则等待
# social_code = '92640302MA76KNNT0D' social_code = '91510000207312079C'
if 'ZZSN' in social_code:
continue
if social_code == None: if social_code == None:
time.sleep(20) time.sleep(20)
continue continue
if 'ZZSN' in social_code:
continue
start = time.time() start = time.time()
try: try:
# data = baseCore.getInfomation(social_code) # data = baseCore.getInfomation(social_code)
......
# -*- coding: utf-8 -*-
import json
import re
import threading
import time
import uuid
import fitz
import redis
import requests
from bs4 import BeautifulSoup
from obs import ObsClient
from retry import retry
from elasticsearch import Elasticsearch
from base import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
baseCore = BaseCore.BaseCore()
cnx_ = baseCore.cnx_
cursor_ = cnx_.cursor()
lock = threading.Lock()
pathType_ = 'QYResearchReport/'
taskType = '企业研报/东方财富网'
pool = redis.ConnectionPool(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
class EsMethod(object):
def __init__(self):
# 创建Elasticsearch对象,并提供账号信息
self.es = Elasticsearch(['http://114.116.19.92:9700'], http_auth=('elastic', 'zzsn9988'), timeout=300)
self.index_name = 'researchreportdata'
def queryatt(self,index_name,id):
body = {
"query": {
"match": {
"id": id
}
}
}
filter_path = ['hits.hits._id',
'hits.total.value',
'hits.hits._source.title',
'hits.hits._source.origin',
'hits.hits._source.sourceAddress',
'hits.hits._source.createDate',
'hits.hits._source.publishDate',
] # 字段2
result = self.es.search(index=index_name
, doc_type='_doc'
, filter_path=filter_path
, body=body)
# log.info(result)
return result
def updateaunn(self,index_name,id,u_publishDate):
body = {
'doc': {
'publishDate':u_publishDate
}
}
result = self.es.update(index=index_name
,id=id
,body=body)
log.info('更新结果:%s' % result)
if __name__ == "__main__":
esMethod = EsMethod()
redis_conn = redis.Redis(connection_pool=pool)
while True:
id_ = redis_conn.lpop('YanBao:up')
# id = "23112104300"
if id:
pass
else:
log.info('已无数据')
id = id_.decode()
result_ = esMethod.queryatt(index_name=esMethod.index_name, id=id)
result = result_['hits']['hits'][0]
num = 0
publishDate = result['_source']['publishDate']
u_publishDate = '2023-08-31' #+ publishDate.split('T')[1]
esMethod.updateaunn(esMethod.index_name, str(id), u_publishDate)
import requests,time, json, sys import os
import os
import uuid
import requests,time, json, sys import requests,time, json, sys
from kafka import KafkaProducer from kafka import KafkaProducer
from retry import retry
from base import BaseCore from base import BaseCore
from obs import ObsClient from obs import ObsClient
import fitz import fitz
...@@ -17,7 +22,7 @@ obsClient = ObsClient( ...@@ -17,7 +22,7 @@ obsClient = ObsClient(
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址 server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
) )
pathType = 'QYNotice/'
#获取文件大小 #获取文件大小
def convert_size(size_bytes): def convert_size(size_bytes):
# 定义不同单位的转换值 # 定义不同单位的转换值
...@@ -28,35 +33,34 @@ def convert_size(size_bytes): ...@@ -28,35 +33,34 @@ def convert_size(size_bytes):
i += 1 i += 1
return f"{size_bytes:.2f} {units[i]}" return f"{size_bytes:.2f} {units[i]}"
def getuuid():
get_timestamp_uuid = uuid.uuid1() # 根据 时间戳生成 uuid , 保证全球唯一
return get_timestamp_uuid
def uptoOBS(pdf_url,pdf_name,type_id,social_code): def uptoOBS(pdf_url,pdf_name,type_id,social_code):
headers = {} headers = {}
category = os.path.splitext(pdf_url)[1]
retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '', retData = {'state': False, 'type_id': type_id, 'item_id': social_code, 'group_name': '', 'path': '',
'full_path': '', 'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun', 'category': category, 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''} 'create_time': '', 'page_size': '', 'content': ''}
headers['User-Agent'] = baseCore.getRandomUserAgent() headers['User-Agent'] = baseCore.getRandomUserAgent()
for i in range(0, 3): for i in range(0, 3):
try: try:
response = requests.get(pdf_url, headers=headers,verify=False, timeout=20) response = requests.get(pdf_url, headers=headers,verify=False, timeout=20)
file_size = int(response.headers.get('Content-Length')) file_size = int(response.headers.get('Content-Length'))
retData['content'] = response.text
break break
except: except:
time.sleep(3) time.sleep(3)
continue continue
page_size = 0 page_size = 1
for i in range(0, 3): name = str(getuuid()) + category
try: try:
name = pdf_name result = getOBSres(pathType, name, response)
now_time = time.strftime("%Y-%m")
result = obsClient.putContent('zzsn', 'QYNotice/'+name, content=response.content)
with fitz.open(stream=response.content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except: except:
time.sleep(3) log.error(f'OBS发送失败')
continue return retData
if page_size < 1: if page_size < 1:
# pdf解析失败 # pdf解析失败
...@@ -79,11 +83,16 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code): ...@@ -79,11 +83,16 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
return retData return retData
def secrchATT(item_id, name, type_id,order_by): @retry(tries=3, delay=1)
sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s and order_by=%s ''' def getOBSres(pathType,name, response):
cursor_.execute(sel_sql, (item_id, name+'.pdf', type_id,order_by)) result = obsClient.putContent('zzsn', pathType + name, content=response.content)
select = cursor_.fetchall() # resp = obsClient.putFile('zzsn', pathType + name, file_path='要上传的那个文件的本地路径')
selects = select[-1] return result
def secrchATT(item_id, retData, type_id):
sel_sql = f"select id from clb_sys_attachment where item_id = '{item_id}' and path = '{retData['path']}' and type_id={type_id} "
cursor_.execute(sel_sql)
selects = cursor_.fetchone()
return selects return selects
# 插入到att表 返回附件id # 插入到att表 返回附件id
...@@ -111,15 +120,18 @@ def tableUpdate(retData, com_name, year, pdf_name, num): ...@@ -111,15 +120,18 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size,object_key,bucket_name) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = ( values = (
year, pdf_name+'.pdf', type_id, item_id, group_name, path, full_path, category, file_size, order_by, year, pdf_name+category, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by, status, create_by,
create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn') create_time, page_size,full_path.split('https://zzsn.obs.cn-north-1.myhuaweicloud.com/')[1],'zzsn')
cursor_.execute(Upsql, values) # 插入 cursor_.execute(Upsql, values) # 插入
cnx_.commit() # 提交 cnx_.commit() # 提交
except Exception as e: except Exception as e:
print(e) print(e)
log.info(f"更新完成:{item_id}===={pdf_name+'.pdf'}") log.info(f"更新完成:{item_id}===={pdf_name+category}")
selects = secrchATT(item_id, pdf_name, type_id,order_by) try:
selects = secrchATT(item_id, pdf_name, type_id)
except Exception as e:
log.info(e)
id = selects[0] id = selects[0]
return id return id
...@@ -153,7 +165,7 @@ def InsterInto(social_code, pdf_url,pub_time,pdf_name): ...@@ -153,7 +165,7 @@ def InsterInto(social_code, pdf_url,pub_time,pdf_name):
def ifInstert(short_name, social_code, pdf_url): def ifInstert(short_name, social_code, pdf_url):
ifexist = True ifexist = True
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' ''' sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='东方财富网' and type='1' '''
cursor.execute(sel_sql, (social_code, pdf_url)) cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone() selects = cursor.fetchone()
#如果数据库中存在 则跳过 #如果数据库中存在 则跳过
...@@ -234,32 +246,42 @@ def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_ti ...@@ -234,32 +246,42 @@ def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_ti
def gonggao_info(dic_info): def gonggao_info(dic_info):
code = dic_info[3] code = dic_info[3]
com_name = dic_info[4] com_name = dic_info[1]
social__code = dic_info[2] social__code = dic_info[2]
if 'HK' in code: if 'HK' in code:
# browser.quit() # browser.quit()
return return
code1 = str(code) # code1 = str(code)
while True: # while True:
if len(code1) < 6: # if len(code1) < 6:
code1 = '0' + code1 # code1 = '0' + code1
else: # else:
break # break
if code1[0] == '0' or code1[0] == '3' or code[0] == '2': # if code1[0] == '0' or code1[0] == '3' or code[0] == '2':
com_code = 'SZ' + code1 # com_code = 'SZ' + code1
elif code1[0] == '6' or code1[0] == '9': # elif code1[0] == '6' or code1[0] == '9':
com_code = 'SH' + code1 # com_code = 'SH' + code1
elif code1[0] == '8' or code1[0] == '4': # elif code1[0] == '8' or code1[0] == '4':
com_code = 'BJ' + code1 # com_code = 'BJ' + code1
break_id = 0 url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index=1&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
for page1 in range(1, 100): for n1 in range(0, 3):
if break_id == 1: try:
res = requests.get(url, verify=False)
break break
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0' except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
res_json = res.json()
total_hits = res_json['data']['total_hits']
for page1 in range(1,total_hits+1):
# url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
for n1 in range(0, 3): for n1 in range(0, 3):
try: try:
res = requests.get(url, verify=False) res = requests.get(url, verify=False)
...@@ -269,7 +291,6 @@ def gonggao_info(dic_info): ...@@ -269,7 +291,6 @@ def gonggao_info(dic_info):
sys.exit(0) sys.exit(0)
time.sleep(5) time.sleep(5)
continue continue
res_json = res.json() res_json = res.json()
list_all = res_json['data']['list'] list_all = res_json['data']['list']
if list_all: if list_all:
...@@ -277,19 +298,18 @@ def gonggao_info(dic_info): ...@@ -277,19 +298,18 @@ def gonggao_info(dic_info):
title = one_info['title'] title = one_info['title']
info_date = one_info['notice_date'] info_date = one_info['notice_date']
year = info_date[:4] year = info_date[:4]
if page1 > 1 and '2022' in info_date: # if page1 > 1 and '2022' in info_date:
break_id = 1 # break_id = 1
break # break
if '2021' in info_date: # 只采集22年以后的数据 # if '2021' in info_date: # 只采集22年以后的数据
break_id = 1 # break_id = 1
break # break
try: try:
info_type = one_info['columns'][0]['column_name'] info_type = one_info['columns'][0]['column_name']
except: except:
info_type = '' info_type = ''
art_code = one_info['art_code'] art_code = one_info['art_code']
info_url = 'https://data.eastmoney.com/notices/detail/' + com_code + '/' + art_code + '.html' info_url = 'https://data.eastmoney.com/notices/detail/' + code + '/' + art_code + '.html'
t = int(time.time() * 1000) t = int(time.time() * 1000)
json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}' json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
...@@ -332,33 +352,6 @@ def gonggao_info(dic_info): ...@@ -332,33 +352,6 @@ def gonggao_info(dic_info):
log.info(f'======={com_name}========{code}===已存在') log.info(f'======={com_name}========{code}===已存在')
continue continue
# list_info = [
# social_code,
# title,
# info_content[:2000],
# info_date,
# info_url,
# pdf_url,
# '东方财富网',
# '1',
# 'zh'
# ]
# # list_all_info.append(tuple(list_info))
# with cnx.cursor() as cursor:
# sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s '''
#
# cursor.execute(sel_sql, info_url)
# selects = cursor.fetchall()
# if selects:
# break
# else:
# #todo:取消入库操作
# insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,publish_date,source_address,pdf_address,origin,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
# cursor.execute(insert_sql, tuple(list_info))
# cnx.commit()
# else:
# break
if __name__ =='__main__': if __name__ =='__main__':
#从redis中读取social_code' #从redis中读取social_code'
...@@ -370,7 +363,7 @@ if __name__ =='__main__': ...@@ -370,7 +363,7 @@ if __name__ =='__main__':
start_time = time.time() start_time = time.time()
# 获取企业信息 # 获取企业信息
# social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode') # social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode')
social_code = '911100007109288314' social_code = 'ZZSN23030900000316'
if not social_code: if not social_code:
time.sleep(20) time.sleep(20)
continue continue
...@@ -383,9 +376,10 @@ if __name__ =='__main__': ...@@ -383,9 +376,10 @@ if __name__ =='__main__':
dic_info = baseCore.getInfomation(social_code) dic_info = baseCore.getInfomation(social_code)
count = dic_info[15] count = dic_info[15]
code = dic_info[3] code = dic_info[3]
com_name = dic_info[4] com_name = dic_info[1]
log.info(f'-----开始处理{com_name}----{social_code}------') log.info(f'-----开始处理{com_name}----{social_code}------')
gonggao_info(dic_info) gonggao_info(dic_info)
break
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论