提交 c1b41f41 作者: 薛凌堃

新三板基本信息

上级 2697722b
"""
打开SEC网址——【FILINGS】——【Company Filing】——输入证券代码——选10-K和20-F为年报
"""
import json
import re
import time
from base.BaseCore import BaseCore
baseCore = BaseCore()
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from selenium import webdriver
def spider(com_name,cik):
    """Scrape the latest 20-F annual report for one company from SEC EDGAR.

    Navigates the EDGAR filing browser for the given CIK, finds the first
    filing row whose form type is '20-F', resolves the document link,
    loads the filing page, and assembles a news dict intended for Kafka.

    Args:
        com_name: company display name, used only to build the Chinese report title.
        cik: SEC Central Index Key identifying the company on EDGAR.

    Side effects: drives the module-level selenium `browser`; reads module
    globals `pattern` (date regex) and `social_code` (set by the caller's
    loop — NOTE(review): relies on a global, verify it is set before calling).
    """
    url = f'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'
    browser.get(url)
    time.sleep(3)  # crude wait for the JS-rendered filing table to load
    page_source = browser.page_source
    soup = BeautifulSoup(page_source, 'html.parser')
    # print(soup)
    # Filing rows: only 'odd' rows are selected — presumably enough to find
    # the latest 20-F; TODO confirm 'even' rows never hold the target filing.
    select_ann = soup.find_all('tr', class_='odd')
    for tr in select_ann:
        form_type = tr.find('td').text
        if form_type == '20-F':
            # print(tr)
            # Extract the original filing document link
            href = tr.find('a', class_='document-link')['href']
            print(href)
            if 'ix?doc' in href:
                # Inline-XBRL viewer URL — strip the viewer prefix to get the raw doc
                href = 'https://www.sec.gov/' + href.split('/ix?doc=/')[1]
            else:
                href = 'https://www.sec.gov' + href
            print(href)
            # Extract the publish date: first anchor text matching YYYY-MM-DD
            a_list = tr.find_all('a')
            # print(a_list)
            for a in a_list:
                text = a.text
                match = re.search(pattern, text)
                if match:
                    pub_date = match.group(0)
                    # print(pub_date)
                    year = pub_date[:4]
                    break
            else:
                # no date found in any anchor — leave both fields empty
                pub_date = ''
                year = ''
            # Fetch the report body from its link; no file-server upload needed,
            # the content is sent straight to Kafka.
            browser.get(href)
            time.sleep(3)  # crude wait for the filing page to render
            i_page_source = browser.page_source
            i_soup = BeautifulSoup(i_page_source, 'html.parser')
            # print(i_page_source)
            content = i_soup.text
            # Body text collected — build the message to push to Kafka
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            title = f'{com_name}:{year}年年度报告'
            dic_news = {
                'attachmentIds': '',
                'author': '',
                'content': content,
                'contentWithTag': i_page_source,
                'createDate': time_now,
                'deleteFlag': '0',
                'id': '',
                'keyWords': '',
                'lang': 'zh',
                'origin': 'SEC美国证券交易委员会',
                'publishDate': pub_date,
                'sid': '1684032033495392257',
                'sourceAddress': href,  # original filing URL
                'summary': '',
                'title': title,
                'type': 1,
                'socialCreditCode': social_code,
                'year': year
            }
            # print(dic_news)
            # Persist the fields by sending them through Kafka
            # try:
            #     producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            #     kafka_result = producer.send("researchReportTopic",
            #                                  json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
            #
            #     print(kafka_result.get(timeout=10))
            #
            #     dic_result = {
            #         'success': 'ture',
            #         'message': '操作成功',
            #         'code': '200',
            #     }
            #     print(dic_result)
            #
            # except Exception as e:
            #     dic_result = {
            #         'success': 'false',
            #         'message': '操作失败',
            #         'code': '204',
            #         'e': e
            #     }
def getrequest(social_code, url, headers, data):
    """POST to the SEC full-text-search index and return the parsed JSON.

    Args:
        social_code: unified social credit code, used only for failure logging.
        url: endpoint to POST to (the EDGAR search-index URL).
        headers: HTTP headers for the request.
        data: JSON-encoded request body string.

    Returns:
        The decoded JSON response dict on HTTP 200, or '' on any other
        status code (after recording the failure via `baseCore.recordLog`).

    Side effects: reads module globals `start_time` and `taskType` on the
    failure path to compute and log the elapsed time.
    """
    # Fix: original call had no timeout, so a stalled connection would hang
    # the whole scraping loop forever. 30s is generous for this endpoint.
    response = requests.post(url=url, headers=headers, data=data, timeout=30)  # ,proxies=ip)
    response.encoding = response.apparent_encoding
    if response.status_code == 200:
        # Success — decode the JSON payload
        # print(response.text)
        result = response.json()
        # print(result)
    else:
        # Failure — log it and fall back to an empty sentinel result
        print('请求失败:', response.status_code, response.text)
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, '请求失败')
        result = ''
    return result
# Simulate a real browser via selenium so EDGAR's JS-rendered pages load.
chromedriver = "D:/chrome/chromedriver.exe"  # local chromedriver path (Windows)
browser = webdriver.Chrome(chromedriver)
# Date pattern (YYYY-MM-DD) used by spider() to pull publish dates from anchors.
pattern = r"\d{4}-\d{2}-\d{2}"
if __name__ == '__main__':
    # Headers mimic a browser POST to the EDGAR full-text-search index.
    headers = {
        'authority': 'efts.sec.gov',
        'method': 'POST',
        'path': '/LATEST/search-index',
        'scheme': 'https',
        'accept': '*/*',
        # Fix: comma separators restored in the three header values below —
        # they had been stripped, producing malformed HTTP header values.
        'accept-encoding': 'gzip, deflate, br',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
        # NOTE(review): hard-coded content-length may conflict with the body
        # requests actually sends — consider dropping it.
        'content-length': '34',
        'content-type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'origin': 'https://www.sec.gov',
        'referer': 'https://www.sec.gov/',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-site',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36'
    }
    url = 'https://efts.sec.gov/LATEST/search-index'
    num = 0
    taskType = '企业年报/雪球网'
    while True:
        start_time = time.time()
        social_code = ''
        # if not social_code:
        #     time.sleep(20)
        #     continue
        # if social_code == 'None':
        #     time.sleep(20)
        #     continue
        # if social_code == '':
        #     time.sleep(20)
        #     continue
        # dic_info = baseCore.getInfomation(social_code)
        # count = dic_info[15]
        # code = dic_info[3]
        # com_name = dic_info[4]
        # if code is None:
        #     exeception = '股票代码为空'
        #     state = 0
        #     takeTime = baseCore.getTimeCost(start_time, time.time())
        #     baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
        #     continue
        code = 'BP'
        com_name = ''  # TODO: populate from baseCore.getInfomation once re-enabled
        # "MNSO" POST request to look up the company's CIK
        # payload = {"keysTyped":f"{code}","narrow":flag}
        payload = {"keysTyped": "BP", "narrow": True}
        data = json.dumps(payload)
        result = getrequest(social_code, url, headers, data)
        # print(result)
        # Fix: getrequest returns '' on failure; indexing it would raise.
        if not result:
            break
        # Decide which hit is this company, matched by ticker symbol
        cik = ''  # Fix: ensure cik is bound even when no ticker matches
        tickers = result['hits']['hits']
        for ticker in tickers:
            i_t = ticker['_source']['tickers']
            if i_t == code:
                cik = ticker['_id']
                print(cik)
                break
        # break
        # Fix: spider() takes (com_name, cik); the original passed only cik,
        # which raised TypeError on every run.
        spider(com_name, cik)
        break
......@@ -327,13 +327,13 @@ if __name__ == '__main__':
#从redis里拿数据
while True:
# TODO:需要隔两个小时左右抓包修改,token从数据库中获得
token = baseCore.GetToken()
token = '027ea02da6d901a724ecca47930379b4'
list_weicha = []
list_all_info = []
name_list = []
start_time = time.time()
# 获取企业信息
com_code = baseCore.redicPullData('EnterpriseIpoqccid:nq_gpdm')
com_code = baseCore.redicPullData('EnterpriseIpo:nq_gpdm')
if '.NQ' in com_code:
com_code1 = com_code
else:
......@@ -344,7 +344,7 @@ if __name__ == '__main__':
if not company_id:
log.info(com_code + ":企业ID获取失败===重新放入redis")
list_weicha.append(com_code + ":企业ID获取失败")
baseCore.rePutIntoR('EnterpriseIpoqccid:nq_gpdm',com_code)
baseCore.rePutIntoR('EnterpriseIpo:nq_gpdm',com_code)
log.info('-----已重新放入redis-----')
time.sleep(20)
continue
......
......@@ -57,7 +57,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(20)
time.sleep(60)
s = requests.session()
#获取到token和cookies
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论