Commit e00e2f5b  Author: 薛凌堃

11/28

Parent 119a9a33
import os
import time
import requests
from retry import retry
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f)XWEB/8461',
    'Content-Type': 'application/octet-stream',
}
@retry(tries=3, delay=5)
def getContent(url):
    # 下载文件内容,状态码异常时抛出异常以触发重试
    req = requests.get(url, headers=headers, timeout=120)
    if req.status_code != 200:
        raise requests.exceptions.RequestException(f'响应状态码异常:{req.status_code}')
    req.encoding = req.apparent_encoding
    content = req.content
    return content
if __name__ == '__main__':
    url_list = []
    name_list = []
    count_dict = {}
    file_name = ''
    while True:
        item = baseCore.redicPullData('Download:gwshrfe')
        if not item or item == 'None':
            log.info('已没有数据')
            time.sleep(10)
            continue
        if 'http' not in item:
            # 文件名字
            file_name_ = item
            if file_name_ in name_list:
                count_dict[file_name_] += 1
                file_name = file_name_ + '_' + str(count_dict[file_name_])
            else:
                count_dict[file_name_] = 1
                file_name = file_name_
                name_list.append(file_name_)
            continue
        else:
            # 说明是链接
            url = item
            if url in url_list:
                log.info(f'{url}该链接已处理过')
                continue
            log.info(f'{file_name}==={url}===开始采集')
            try:
                content = getContent(url)
            except:
                # r.sadd('gwshrfe', url)
                log.error(f'{file_name}==={url}===解析失败')
                time.sleep(2)
                continue
            # 需加上后缀
            category = os.path.splitext(url)[1]
            path = f'./文件1/{file_name}'
            if not os.path.exists(path):
                os.makedirs(path)
            file = f'{path}/{file_name}{category}'
            try:
                with open(file, 'wb') as f:
                    f.write(content)
                log.info(f'{url}===下载成功')
            except:
                log.error(f'{url}===下载失败')
            url_list.append(url)
            time.sleep(2)
    baseCore.close()
\ No newline at end of file
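A note on the feed: the loop above expects `Download:gwshrfe` to hold a file name immediately followed by that file's download URL. A minimal producer sketch under that assumption (the Redis connection settings and the `push_task` helper are illustrative only, and FIFO order additionally assumes `redicPullData` pops from the opposite end of the list):

import redis

# Hypothetical feeder for 'Download:gwshrfe': push the display name first, then its URL,
# matching the alternating name/URL pattern the consumer loop above relies on.
r = redis.Redis(host='127.0.0.1', port=6379, db=0)  # assumed connection settings

def push_task(file_name, file_url):
    r.rpush('Download:gwshrfe', file_name)
    r.rpush('Download:gwshrfe', file_url)

push_task('示例文件', 'https://example.com/files/demo.pdf')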
import functools
import http.server
import socketserver

PORT = 8001
DIRECTORY = r"D:\kkwork\zzsn_spider\SASAC"

# SimpleHTTPRequestHandler 的根目录由 __init__ 的 directory 参数决定(Python 3.7+),
# 启动后再修改类属性不会生效,这里用 functools.partial 把目录绑定到处理器上
Handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=DIRECTORY)

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("Serving at port", PORT)
    httpd.serve_forever()
\ No newline at end of file
@@ -728,6 +728,12 @@ class BaseCore:
     #
     #     return retData
+
+    def deliteATT(self, id):
+        delitesql = f"delete from clb_sys_attachment where id = '{id}' "
+        self.cursor_.execute(delitesql)
+        self.cnx_.commit()
+
     def secrchATT(self, item_id, year, type_id):
         sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
         self.cursor_.execute(sel_sql, (item_id, year, type_id))
...
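Since `id` is interpolated directly into the SQL string in the new `deliteATT`, a parameterized variant would be safer against malformed ids. A minimal sketch of the same delete with a bound placeholder, assuming `cursor_` comes from a MySQL driver that uses `%s` placeholders (pymysql/mysqlclient):

    def deliteATT(self, id):
        # Same delete as above, but the id travels as a bound parameter instead of an f-string.
        delitesql = "delete from clb_sys_attachment where id = %s"
        self.cursor_.execute(delitesql, (id,))
        self.cnx_.commit()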
@@ -126,14 +126,19 @@ def NoticeEnterprise_task():
 def NoticeDF():
     cnx, cursor = connectSql()
     # 获取美股企业
-    mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
-    cursor.execute(mg_query)
-    cnx.commit()
-    mg_result = cursor.fetchall()
-    mg_social_list = [item[0] for item in mg_result]
-    print('=======')
-    for item in mg_social_list:
-        r.rpush('NoticeEnterprise:mgqy_socialCode_add', item)
+    # # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+    # mg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=3 AND a.Place=0 AND SecuritiesCode is not null AND SecuritiesCode not LIKE '%.%'"
+    # cursor.execute(mg_query)
+    # cnx.commit()
+    # mg_result = cursor.fetchall()
+    # mg_social_list = [item[0] for item in mg_result]
+    # print('=======')
+    # for item in mg_social_list:
+    #     if r.lrem('NoticeEnterprise:mgqy_socialCode_add', 0, item) == 0:
+    #         r.lpush('NoticeEnterprise:mgqy_socialCode_add', item)
+    #     else:
+    #         continue
+    # # r.rpush('NoticeEnterprise:mgqy_socialCode_add', item)
     # 获取港股企业
     gg_query = "SELECT a.SocialCode From EnterpriseInfo a ,EnterpriseType b WHERE a.SocialCode = b.SocialCode AND b.type=2 And SecuritiesCode like '%.HK'"
...
 import os
 import os
 import uuid
+import fitz
-import requests,time, json, sys
+import requests
+from bs4 import BeautifulSoup
+import time, json
 from kafka import KafkaProducer
+from obs import ObsClient
+from urllib.parse import unquote
 from retry import retry
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
 from base import BaseCore
-from obs import ObsClient
-import fitz
-from urllib.parse import unquote
 baseCore = BaseCore.BaseCore()
 log = baseCore.getLogger()
 cnx = baseCore.cnx
 cursor = baseCore.cursor
+pathType = 'QYNotice/'
 cnx_ = baseCore.cnx_
 cursor_ = baseCore.cursor_
@@ -22,7 +33,7 @@ obsClient = ObsClient(
     secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # 你的华为云的sk
     server='https://obs.cn-north-1.myhuaweicloud.com'  # 你的桶的地址
 )
-pathType = 'QYNotice/'
 #获取文件大小
 def convert_size(size_bytes):
     # 定义不同单位的转换值
@@ -54,14 +65,19 @@ def uptoOBS(pdf_url,pdf_name,type_id,social_code):
         except:
             time.sleep(3)
             continue
-    page_size = 1
     name = str(getuuid()) + category
     try:
         result = getOBSres(pathType, name, response)
     except:
         log.error(f'OBS发送失败')
         return retData
+    try:
+        with fitz.open(stream=response.content, filetype='pdf') as doc:
+            page_size = doc.page_count
+    except:
+        log.error(f'文件损坏')
+        return retData
     if page_size < 1:
         # pdf解析失败
         # print(f'======pdf解析失败=====')
@@ -126,12 +142,13 @@ def tableUpdate(retData, com_name, year, pdf_name, num):
         cursor_.execute(Upsql, values)  # 插入
         cnx_.commit()  # 提交
     except Exception as e:
-        print(e)
+        log.info(e)
     log.info(f"更新完成:{item_id}===={pdf_name+category}")
     try:
         selects = secrchATT(item_id, retData, type_id)
     except Exception as e:
         log.info(e)
+        return ''
     id = selects[0]
     return id
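A brief note on the `return ''` added above: if `secrchATT` raises, `selects` is never assigned, so without the early return the next line would hit a NameError at `selects[0]`; returning an empty string instead lets callers treat a falsy attachment id as a failed upload, which is exactly what the spider below does with its `if att_id: ... else: return False` check.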
@@ -176,48 +193,10 @@ def ifInstert(short_name, social_code, pdf_url):
     else:
         return ifexist
 
-def GetContent(pdf_url,info_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
-    # 上传至华为云服务器
-    retData = uptoOBS(pdf_url, pdf_name, 8, social_code)
-    # 附件插入att数据库
-    if retData['state']:
-        pass
-    else:
-        log.info(f'====pdf解析失败====')
-        return False
-    num = num + 1
-    att_id = tableUpdate(retData, com_name, year, pdf_name, num)
-    if att_id:
-        pass
-    else:
-        return False
-    content = retData['content']
-    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-    dic_news = {
-        'attachmentIds': att_id,
-        'author': '',
-        'content': content,
-        'contentWithTag': '',
-        'createDate': time_now,
-        'deleteFlag': '0',
-        'id': '',
-        'keyWords': '',
-        'lang': 'zh',
-        'origin': '东方财富网',
-        'publishDate': pub_time,
-        'sid': '1684032033495392257',
-        'sourceAddress': info_url,  # 原文链接
-        'summary': '',
-        'title': pdf_name.replace('.pdf', ''),
-        'type': 3,
-        'socialCreditCode': social_code,
-        'year': year
-    }
-    # print(dic_news)
-    # 将相应字段通过kafka传输保存
+def sendKafka(social_code,newsUrl,dic_news):
     try:
-        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024*1024*20)
         kafka_result = producer.send("researchReportTopic",
                                      json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
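On the `max_request_size=1024*1024*20` added to the producer: this only raises the client-side limit for a single request; if announcement payloads genuinely approach 20 MB, the broker and topic must also accept messages that large (broker `message.max.bytes` / topic `max.message.bytes`), otherwise the send is still rejected on the server side.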
@@ -239,118 +218,150 @@
         }
         state = 0
         takeTime = baseCore.getTimeCost(start_time, time.time())
-        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
+        baseCore.recordLog(social_code, taskType, state, takeTime, newsUrl, 'Kafka操作失败')
         log.info(dic_result)
         return False
+def spider(browser, code, social_code, com_name):
+    num = 0
+    page_source = browser.page_source
+    soup = BeautifulSoup(page_source, 'html.parser')
+    now_page = int(soup.find_all('div', class_='mbox')[-1].find('span', class_='active').text)
+    li_list = soup.find('div', class_='notice').find_all('li')
+    log.info(f'----{com_name}--{code}--第{now_page}页开始处理-----')
+    for li in li_list:
+        publishDate = li.find('span').text
+        year = publishDate[:4]
+        newsUrl = 'https://np-info.eastmoney.com/pc/notice/?art_code=' + li.find('a')['data-code']
+        title = li.find('a').text
+        if ifInstert(com_name, social_code, title):
+            pass
+        else:
+            continue
+        browser2 = createDriver()
+        browser2.get(newsUrl)
+        wait = WebDriverWait(browser2, 30)
+        wait.until(EC.presence_of_element_located((By.ID, "render-html")))
+        page_source = browser2.page_source
+        soup_news = BeautifulSoup(page_source, 'html.parser')
+        contentWithTag = soup_news.find('div', id='render-html')
+        content = contentWithTag.text
+        # 判断有无附件
+        try:
+            browser2.find_element(By.CLASS_NAME, 'download-list').click()
+            time.sleep(0.5)
+            browser2.switch_to.window(browser2.window_handles[-1])
+            pdf_url = browser2.current_url
+            # 上传到obs
+            retData = uptoOBS(pdf_url, title, 8, social_code)
+            # 附件插入att数据库
+            if retData['state']:
+                pass
+            else:
+                log.info(f'====pdf解析失败====')
+                return False
+            num = num + 1
+            att_id = tableUpdate(retData, com_name, year, title, num)
+            if att_id:
+                pass
+            else:
+                return False
+            # content = retData['content']
+            # contentWithTag = ''
+        except:
+            att_id = ''
+        browser2.quit()
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        dic_news = {
+            'attachmentIds': att_id,
+            'author': '',
+            'content': content,
+            'contentWithTag': str(contentWithTag),
+            'createDate': time_now,
+            'deleteFlag': '0',
+            'id': '',
+            'keyWords': '',
+            'lang': 'zh',
+            'origin': '东方财富网',
+            'publishDate': publishDate,
+            'sid': '1684032033495392257',
+            'sourceAddress': newsUrl,  # 原文链接
+            'summary': '',
+            'title': title,
+            'type': 3,
+            'socialCreditCode': social_code,
+            'year': year
+        }
+        if sendKafka(social_code,newsUrl,dic_news):
+            log.info(f'---{com_name}---{code}---第{now_page}页----采集成功---{newsUrl}')
+            insert = InsterInto(social_code, newsUrl, publishDate, title)
+            if insert:
+                log.info('====插入数据库成功====')
+        else:
+            log.info(f'失败---{title}----{att_id}---{social_code}')
+            # 删除插入的数据 400表示发送数据失败
+            baseCore.deliteATT(att_id)
+            log.info(f'已删除插入附件表的数据---{title}-----{social_code}')
+        time.sleep(1)
+    # 翻页功能
+    try:
+        browser.find_element(By.CLASS_NAME, 'next').click()
+        # continue
+        return spider(browser, code, social_code, com_name)
+    except:
+        # span_tag = browser.find_element(By.CLASS_NAME,'mbox')
+        span_tag = browser.find_element(By.XPATH, '//div[@class="mbox"]/span[2]')
+        current_page = int(span_tag.text)
+        totalpage = int(soup.find_all('div', class_='mbox')[-1].find_all('a')[-1].text)
+        if current_page < totalpage:
+            # 说明还未到最后一页
+            span_tag.find_element(By.XPATH, './following-sibling::a[1]').click()
+            return spider(browser, code, social_code, com_name)
+        else:
+            # 已经到最后一页
+            return
+
+def createDriver():
+    chrome_driver = r'D:\cmd100\chromedriver.exe'
+    path = Service(chrome_driver)
+    chrome_options = webdriver.ChromeOptions()
+    chrome_options.binary_location = r'D:\Google\Chrome\Application\chrome.exe'
+    # 设置代理
+    # proxy = "127.0.0.1:8080"  # 代理地址和端口
+    # chrome_options.add_argument('--proxy-server=http://' + proxy)
+    driver = webdriver.Chrome(service=path, chrome_options=chrome_options)
+    return driver
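A version note on `createDriver`: passing `chrome_options=` to `webdriver.Chrome` works on Selenium 3.x and early 4.x (with a deprecation warning), but the keyword was removed in Selenium 4.10+; on newer versions the equivalent call would be `webdriver.Chrome(service=path, options=chrome_options)`. Which form applies depends on the Selenium version pinned in this environment.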
 def gonggao_info(dic_info):
+    # code = '00175.HK'
     code = dic_info[3]
     com_name = dic_info[1]
-    social__code = dic_info[2]
+    social_code = dic_info[2]
     if 'HK' in code:
-        # browser.quit()
+        pass
+    else:
         return
-    # code1 = str(code)
-    # while True:
-    #     if len(code1) < 6:
-    #         code1 = '0' + code1
-    #     else:
-    #         break
-    # if code1[0] == '0' or code1[0] == '3' or code[0] == '2':
-    #     com_code = 'SZ' + code1
-    # elif code1[0] == '6' or code1[0] == '9':
-    #     com_code = 'SH' + code1
-    # elif code1[0] == '8' or code1[0] == '4':
-    #     com_code = 'BJ' + code1
-    url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index=1&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
-    for n1 in range(0, 3):
-        try:
-            res = requests.get(url, verify=False)
-            break
-        except:
-            if n1 == 2:
-                sys.exit(0)
-            time.sleep(5)
-            continue
-    res_json = res.json()
-    total_hits = res_json['data']['total_hits']
-    for page1 in range(1,total_hits+1):
-        # url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
-        url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=U%2CU_Pink&client_source=web&stock_list={code}'
-        for n1 in range(0, 3):
-            try:
-                res = requests.get(url, verify=False)
-                break
-            except:
-                if n1 == 2:
-                    sys.exit(0)
-                time.sleep(5)
-                continue
+    # 模拟浏览器
+    url = f'https://emweb.securities.eastmoney.com/PC_HKF10/pages/home/index.html?code={code.split(".HK")[0]}&type=web&color=w#/CompanyNews'
+    browser = createDriver()
+    browser.get(url)
+    time.sleep(1)
+    try:
+        spider(browser, code, social_code, com_name)
+        return
+    except Exception as e:
+        log.info(f'error===={e}')
+        return
-        res_json = res.json()
-        list_all = res_json['data']['list']
-        if list_all:
-            for one_info in list_all:
-                title = one_info['title']
-                info_date = one_info['notice_date']
-                year = info_date[:4]
-                # if page1 > 1 and '2022' in info_date:
-                #     break_id = 1
-                #     break
-                # if '2021' in info_date:  # 只采集22年以后的数据
-                #     break_id = 1
-                #     break
-                try:
-                    info_type = one_info['columns'][0]['column_name']
-                except:
-                    info_type = ''
-                art_code = one_info['art_code']
-                info_url = 'https://data.eastmoney.com/notices/detail/' + code + '/' + art_code + '.html'
-                t = int(time.time() * 1000)
-                json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
-                for n1 in range(0, 3):
-                    try:
-                        json_2 = requests.get(json_url, verify=False).json()
-                        break
-                    except:
-                        if n1 == 2:
-                            sys.exit(0)
-                        time.sleep(5)
-                        continue
-                try:
-                    pdf_url = json_2['data']['attach_url']
-                except:
-                    pdf_url = ''
-                try:
-                    info_content = json_2['data']['notice_content']
-                except:
-                    info_content = ''
-                ifexist = ifInstert(com_name, social_code, info_url)
-                if ifexist:
-                    # 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
-                    result = GetContent(pdf_url, info_url,title, social_code, year, info_date, start_time, com_name, num)
-                    if result:
-                        # 公告信息列表
-                        log.info(f'{com_name}==============解析传输操作成功')
-                        state = 1
-                        takeTime = baseCore.getTimeCost(start_time, time.time())
-                        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '成功')
-                        # 发送kafka成功之后 再插入数据库
-                        insert = InsterInto(social_code, info_url, info_date, title)
-                        if insert:
-                            log.info(f'===={social_code}========{title}=====插入库成功')
-                            pass
-                        else:
-                            continue
-                else:
-                    log.info(f'======={com_name}========{code}===已存在')
-                    continue
 if __name__ =='__main__':
     #从redis中读取social_code'
@@ -362,8 +373,8 @@ if __name__ =='__main__':
     while True:
         start_time = time.time()
         # 获取企业信息
-        social_code = baseCore.redicPullData('NoticeEnterprise:mgqy_socialCode_add')
-        # social_code = 'ZZSN23030900000316'
+        # social_code = baseCore.redicPullData('NoticeEnterprise:ggqy_socialCode_add')
+        social_code = 'ZZSN23030800000022'
         if not social_code:
             time.sleep(20)
             continue
@@ -374,7 +385,7 @@ if __name__ =='__main__':
             time.sleep(20)
             continue
         dic_info = baseCore.getInfomation(social_code)
-        count = dic_info[15]
+        # count = dic_info[15]
         code = dic_info[3]
         com_name = dic_info[1]
         log.info(f'-----开始处理{com_name}----{social_code}------')
...