Commit 7dcd1a4c Author: 薛凌堃

9/8

Parent 7baf2215
@@ -116,33 +116,6 @@ def NoticeEnterprise_task():
print('Scheduled collection error:', e)
pass
# Enterprise annual reports
def AnnualEnterprise():
cnx,cursor = connectSql()
# Fetch domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
# Scheduled task: enterprise annual reports
def AnnualEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
# Intended to run once a year (the cron below fires every 10 seconds, presumably for testing)
scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
try:
# Run once before the schedule starts
AnnualEnterprise()
scheduler.start()
except Exception as e:
print('Scheduled collection error:', e)
pass
# Enterprise basic info
def BaseInfoEnterprise():
cnx,cursor = connectSql()
@@ -245,6 +218,33 @@ def weixin_task():
print('Scheduled collection error:', e)
pass
# Enterprise annual reports (CSRC)
def AnnualEnterprise():
cnx,cursor = connectSql()
# Fetch domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
closeSql(cnx,cursor)
# Scheduled task: enterprise annual reports
def AnnualEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
# Intended to run once a year (the cron below fires every 10 seconds, presumably for testing)
scheduler.add_job(AnnualEnterprise, 'cron', second='*/10')
try:
# Run once before the schedule starts
AnnualEnterprise()
scheduler.start()
except Exception as e:
print('Scheduled collection error:', e)
pass
# Enterprise annual reports — Xueqiu
def AnnualEnterpriseXueQ():
cnx,cursor = connectSql()
@@ -271,6 +271,21 @@ def AnnualEnterpriseXueQ_task():
print('Scheduled collection error:', e)
pass
# Enterprise annual reports — U.S. Securities and Exchange Commission
def AnnualEnterpriseUS():
cnx,cursor = connectSql()
# Fetch US-listed enterprises ('美股' = "US stocks"; kept as-is to match the DB values)
us_query = "select SocialCode from EnterpriseInfo where Place = '2' and SecuritiesType = '美股' and SecuritiesCode is not null"
# us_query = "select SocialCode from EnterpriseInfo where Place = '2' and SecuritiesType = '美股' and SecuritiesCode = 'BP' "
#ZZSN22080900000025
cursor.execute(us_query)
us_result = cursor.fetchall()
us_social_list = [item[0] for item in us_result]
print('=======')
for item in us_social_list:
r.rpush('AnnualEnterprise:usqy_socialCode', item)
closeSql(cnx,cursor)
# Foreign enterprise basic info; push ids into Redis
def BaseInfoEnterpriseAbroad():
cnx,cursor = connectSql()
@@ -383,7 +398,8 @@ if __name__ == "__main__":
# NewsEnterprise()
# BaseInfoEnterprise()
# FBS()
MengZhi()
# MengZhi()
AnnualEnterpriseUS()
# NoticeEnterprise_task()
# AnnualEnterprise_task()
# NoticeEnterprise()
......
@@ -9,54 +9,165 @@
import json
import re
import time
from urllib.parse import urljoin
from base.BaseCore import BaseCore
baseCore = BaseCore()
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from selenium import webdriver
# from selenium import webdriver
def paserUrl(html,listurl):
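# Rewrite every relative href/src in the parsed page to an absolute URL,
# using the listing page's URL as the base.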
# soup = BeautifulSoup(html, 'html.parser')
# Collect all <a> and <img> tags
links = html.find_all(['a', 'img'])
# Walk the tags, converting relative addresses to absolute ones
for link in links:
if 'href' in link.attrs:
link['href'] = urljoin(listurl, link['href'])
elif 'src' in link.attrs:
link['src'] = urljoin(listurl, link['src'])
return html
def get_news(news_url,ip_dic):
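# Fetch a filing document page and return it as a BeautifulSoup object,
# or '' on failure. Note: the failure branch relies on module-level names
# (start_time, social_code, taskType, url) that are set in __main__.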
header = {
'Host': 'www.sec.gov',
'Connection': 'keep-alive',
'sec-ch-ua': '"Not/A)Brand";v="99", "Google Chrome";v="115", "Chromium";v="115"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-User': '?1',
'Sec-Fetch-Dest': 'document',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cookie': '_gid=GA1.2.385814648.1694135927; _ga_300V1CHKH1=GS1.1.1694135927.6.1.1694136598.0.0.0; _ga=GA1.1.733439486.1693211261; _4c_=%7B%22_4c_s_%22%3A%22dZJbj9owEIX%2FCvJDngj4EowTKaqqVKq20vbe7SMK9pBYC3HkGLwU8d9rQ%2Bh2V61fEn9z5vjInhPyLXSoIDzPCOMcYyHwFD3CcUDFCVmt4ueACqRqlinOcMprxtOsZos0ZwpSIYQUQi0WFDCaoqfgtcQ4F0vKCRX0PEWqu3lYUDDopnupE5xSHnS6d6MwpGEsx8Ez4%2BKmJYTzK4nam2WN%2Flm3%2FmZ1Kyxyxl9KIwnS3r4%2B9b9S2Y%2FSE5JGQTie5DMiZjjdDCGH%2BxVIJuI19NaovXQrd%2ByjzMN6MqjHUFBw0BJWXivXXvopfqYt6KZ1EeOLi4rZEAl%2FXnfK%2BNdtI%2F3TlrOoXVvjB4idVWvNDiaELAI24UXRz0tHDGthA9ZeZK1z%2FVDM59772QBy1pjDXDY6XetufjVLQTW1fSPNrq%2B7Y%2Fnh832yq51sy8HV1g2p165NNnoL3X5XJt9c7aBMKrPvnD2G%2FV1VJruj8R3YEp7kdq8gqaXTpisbcKNryDRoF29rzDCCMItXll7Zg45UTb5XXwP%2F%2BBf5Un26H9H7t6sfd%2B%2FCZslYxvJM8Fl8XkpIGEt0vr5umHlKaR5WFqbMuS0qBM9wXOfz%2BTc%3D%22%7D'
}
response = requests.get(url=news_url,headers=header,verify=False,timeout=30)
# response = requests.get(url=news_url, verify=False, proxies=ip_dic, timeout=30)
if response.status_code == 200:
# Request succeeded; parse the response
# print(response.text)
result = BeautifulSoup(response.content,'html.parser')
# print(result)
pass
else:
# Request failed; log the error
print('Request failed:', response.status_code, response.text)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, 'request failed')
result = ''
return result
def spider(com_name,cik):
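# Crawl a company's 10-K / 20-F filings from EDGAR by CIK and push each
# report to Kafka.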
url = f'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
# print(soup)
select_ann = soup.find_all('tr', class_='odd')
for tr in select_ann:
form_type = tr.find('td').text
if form_type == '20-F':
# print(tr)
# Get the original document link
href = tr.find('a', class_='document-link')['href']
print(href)
if 'ix?doc' in href:
href = 'https://www.sec.gov/' + href.split('/ix?doc=/')[1]
header = {
'Host':'data.sec.gov',
'Connection':'keep-alive',
'Pragma':'no-cache',
'Cache-Control':'no-cache',
'sec-ch-ua':'"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
'sec-ch-ua-mobile':'?0',
'sec-ch-ua-platform':'"Windows"',
'Upgrade-Insecure-Requests':'1',
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
'Accept':'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Sec-Fetch-Site':'none',
'Sec-Fetch-Mode':'navigate',
'Sec-Fetch-User':'?1',
'Sec-Fetch-Dest':'document',
'Accept-Encoding':'gzip, deflate, br',
'Accept-Language':'zh-CN,zh;q=0.9',
'Cookie':'_4c_=%7B%22_4c_s_%22%3A%22fVPLbtswEPwVg2fLJimSonwrUqDoIS1apO0xYMi1JcSRBIqx4hr%2B9%2B5act6pLiKHM8PR7urAhgoathKmzFWhpZFG2Dm7hX3PVgcW60CvHVsx4Zz2XOiMB6czJUXIrHZlBrAuxFob73PP5uwBvQoupNJalIXUxznz3eRxYL4NQF7lQtgFz9Y9KtJfRJTluOxiG%2B59uk77jmgD3Mz6cIsHAXa1h%2BuhDqkifW7ME1pBvakSwoWxhHaRKLga6ia0w2vVhD6qjCoRvYnt0AMpL6rY3sFMCCK3WAb256SgrBHWEOOJhru%2BThSzB7%2FYtLsJwNKNWDZiv2tCw%2Bzq4ifi354hPy6%2BX05QRxXOcbFtvduSKTZlzr58uv719fMpellqxctcLk6dMqUUVLD7uMXTKqWuXy2XwzAspjBLCBsXlz246Ktx7du7zjX7EUItNHRpFwMFB5%2FqthmD4%2F4q1psNxEtIVYsTgHsXamK4LVWYiBEC9PWGYgYqI%2B5uU9s9wsdxFjCtNsIYrqXEXifMa43i9BzH7z6NRv7E1kZyYXnxlj32KKPaQvMfqX0rDbA%2BD7IFl6t1YTLBwWaqUDIrC5Nn%2FMaALVTgXjj20lNK855nc7Z8Voun%2BbcoKxTy6i5NxKl3luc8z19yCSHu2dKxd8%2FjcLY6HyhFP%2BzDK4RqG1%2Ff%2BgH1ePwH%22%7D; _ga_300V1CHKH1=GS1.1.1694142118.3.0.1694142118.0.0.0; _ga=GA1.2.1399540932.1693469210; _gid=GA1.2.1824845345.1694142136; ak_bmsc=CB437E1B69906A01E58692EFBAA8A225~000000000000000000000000000000~YAAQ8BQgFyY6AFaKAQAAbKy9chWzUG2FvPYSvQ1oaw2RdgKemipNBxwFJPC71bps8Pe4B7LG80Yn8Gg+yVD84WX1d+lVZqdaPr8pbsd3N8NWzwiWUcN7PSoKK1Ej/G2WgOv8Nl0s2E8E8x/5XVYtGyFwKSl5mUGNsfsL4WYI++6imjaYHtyTDxtmKhvnWHMwXCMiJgqvRCr9yf5CeXKJuhpRrSZV/GZa8qlDr5PmF1LPu2RKv1jNRfLqq+BKaO4jKN8ETA0RUxhvXEpI1cc0bxFp9t/mD6iTVhzbxJ17qiBn9DLPcXoX1yheRONu9M//SyeHfETezU2RagRHONIPZXB2oN/8Qlu+Rjz9NIZk532RTj0qCSRu48EH8nmYFcwvGXb8YNhotygum3P+ELZSCzlgolFBQp+qciKBTsuJ3JL99/HMDHO9OyheN5yw6RH/hu6/xVW95acmV925q/yjoXITR+mcZWkrH4iRncHGQmwWQR+d+pNqeBYUNNm2; bm_sv=2C2708DF01ED851C6C481514DDA7F381~YAAQ8BQgF409AFaKAQAAFsm9chW4u/u6J8XhmAzFpGSqZr1ktVU8veuhu+tJ9h+G3Lf52nquY6mUDlkG1ZBMRAkAB3WCPBGWiKSbGR6sB29QOE9LOosBZKzL742Z5a0k6rOWyoByvjl75i7j68RIqGt0h87YwwLLqnH6gx6H0uqCkg+J405BKwHjvVhnQOF3eAD5CCbaJY5GQdS8bKDjOaX7e1WVr5aqdlNdEciyrs9hxhPZSPLLXuCFIDH+~1'
}
ip_dic = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
# Production
url_json = f'https://data.sec.gov/submissions/CIK{cik}.json'
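# Note: the submissions endpoint expects a 10-digit zero-padded CIK
# (e.g. CIK0001395064.json), so an unpadded cik may need str(cik).zfill(10).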
# Test
# url_json = 'https://data.sec.gov/submissions/CIK0001395064.json'
# Parse the page
req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
# ,proxies=ip_dic)
data = req.json()
info = data['filings']['recent']
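# 'recent' holds parallel arrays: form[i], accessionNumber[i],
# primaryDocument[i] and filingDate[i] all describe the same filing.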
form_type_list = info['form']
accessionNumber_list = info['accessionNumber']
primaryDocument_list = info['primaryDocument']
filingDate_list = info['filingDate']
# enumerate keeps the loop index aligned with the current filing
for i, form in enumerate(form_type_list):
    if form == '10-K' or form == '20-F':
        print(form, i)
accessionNumber = accessionNumber_list[i]
# Filing date
filingDate = filingDate_list[i]
year = filingDate[:4]
u_1 = cik  # unpadded CIK
# u_1 = '1395064'  # test value
u_2 = accessionNumber.replace('-','')
u_3 = primaryDocument_list[i]
news_url = 'https://www.sec.gov/Archives/edgar/data/' + u_1 + '/' + u_2 + '/' + u_3
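# Archives URL pattern: /Archives/edgar/data/{cik}/{accession number
# without dashes}/{primary document filename}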
soup = get_news(news_url,ip_dic)
if soup:
pass
else:
href = 'https://www.sec.gov' + href
print(href)
# Get the publish date
a_list = tr.find_all('a')
# print(a_list)
for a in a_list:
text = a.text
match = re.search(pattern, text)
if match:
pub_date = match.group(0)
# print(pub_date)
year = pub_date[:4]
break
else:
pub_date = ''
year = ''
# Fetch the report body via its link; no file-server upload needed, send straight to Kafka
browser.get(href)
time.sleep(3)
i_page_source = browser.page_source
i_soup = BeautifulSoup(i_page_source, 'html.parser')
# print(i_page_source)
content = i_soup.text
continue
# Convert relative paths to absolute
soup = paserUrl(soup,news_url)
content = soup.text.strip()
# url = f'https://www.sec.gov/edgar/browse/?CIK={cik}&owner=exclude'
# browser.get(url)
# time.sleep(3)
# page_source = browser.page_source
# soup = BeautifulSoup(page_source, 'html.parser')
# # print(soup)
# select_ann = soup.find_all('tr', class_='odd')
#
# for tr in select_ann:
# form_type = tr.find('td').text
# if form_type == '20-F':
# # print(tr)
# # Get the original document link
# href = tr.find('a', class_='document-link')['href']
# print(href)
# if 'ix?doc' in href:
# href = 'https://www.sec.gov/' + href.split('/ix?doc=/')[1]
# else:
# href = 'https://www.sec.gov' + href
# print(href)
# # Get the publish date
# a_list = tr.find_all('a')
# # print(a_list)
# for a in a_list:
# text = a.text
# match = re.search(pattern, text)
# if match:
# pub_date = match.group(0)
# # print(pub_date)
# year = pub_date[:4]
# break
# else:
# pub_date = ''
# year = ''
# # Fetch the report body via its link; no file-server upload needed, send straight to Kafka
# browser.get(href)
# time.sleep(3)
# i_page_source = browser.page_source
# i_soup = BeautifulSoup(i_page_source, 'html.parser')
# # print(i_page_source)
# content = i_soup.text
# Send the collected body text straight to Kafka
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@@ -65,45 +176,45 @@ def spider(com_name,cik):
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': i_page_source,
'contentWithTag': str(soup),  # str(): a BeautifulSoup object is not JSON serializable
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': 'SEC美国证券交易委员会',  # source label kept as stored: "SEC (U.S. Securities and Exchange Commission)"
'publishDate': pub_date,
'publishDate': filingDate,
'sid': '1684032033495392257',
'sourceAddress': href, # 原文链接
'sourceAddress': news_url, # 原文链接
'summary': '',
'title': title,
'type': 1,
'socialCreditCode': social_code,
'socialCreditCode': '',
'year': year
}
# print(dic_news)
print(dic_news)
# Send the fields through Kafka for persistence
# try:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
# kafka_result = producer.send("researchReportTopic",
# json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
#
# print(kafka_result.get(timeout=10))
#
# dic_result = {
# 'success': 'true',
# 'message': 'operation succeeded',
# 'code': '200',
# }
# print(dic_result)
#
# except Exception as e:
# dic_result = {
# 'success': 'false',
# 'message': 'operation failed',
# 'code': '204',
# 'e': e
# }
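# Send the assembled record to Kafka; kafka_result.get(timeout=10) blocks
# until the broker acknowledges the write or raises on failure.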
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'true',
'message': 'operation succeeded',
'code': '200',
}
print(dic_result)
except Exception as e:
dic_result = {
'success': 'false',
'message': 'operation failed',
'code': '204',
'e': e
}
def getrequest(social_code,url,headers,data):
@@ -126,11 +237,6 @@ def getrequest(social_code,url,headers,data):
result = ''
return result
# Simulate a browser
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
pattern = r"\d{4}-\d{2}-\d{2}"  # matches ISO dates like 2023-12-31
if __name__ == '__main__':
headers = {
'authority': 'efts.sec.gov',
@@ -154,44 +260,56 @@ if __name__ == '__main__':
taskType = '企业年报/SEC'  # stored task label: "enterprise annual report / SEC"
while True:
start_time = time.time()
social_code = ''
# if not social_code:
# time.sleep(20)
# continue
# if social_code == 'None':
# time.sleep(20)
# continue
# if social_code == '':
# time.sleep(20)
# continue
# dic_info = baseCore.getInfomation(social_code)
# count = dic_info[15]
# code = dic_info[3]
# com_name = dic_info[4]
# if code is None:
# exception = 'stock code is empty'
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
# continue
code = 'BP'
#"MNSO" post请求 获取企业CIK
# payload = {"keysTyped":f"{code}","narrow":flag}
payload = {"keysTyped": "BP", "narrow":True}
data = json.dumps(payload)
result = getrequest(social_code,url,headers,data)
# print(result)
# Determine which returned record matches this company, by ticker code
tickers = result['hits']['hits']
for ticker in tickers:
i_t = ticker['_source']['tickers']
if i_t == code:
cik = ticker['_id']
print(cik)
break
# Fetch company info
social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode')
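# The 'AnnualEnterprise:usqy_socialCode' queue is filled by AnnualEnterpriseUS() in the dispatcher script.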
# social_code = ''
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
code = dic_info[3]
com_name = dic_info[4]
cik = dic_info[13]
if code is None:
exception = 'stock code is empty'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
continue
if cik is None:
exception = 'CIK is empty'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
continue
# code = 'BP'
# com_name = 'British Petroleum'
# cik = ''
#"MNSO" post请求 获取企业CIK 正式
# payload = {"keysTyped":f"{code}","narrow":True}
# # Test
# # payload = {"keysTyped": "BP", "narrow":True}
# data = json.dumps(payload)
# result = getrequest(social_code,url,headers,data)
# # print(result)
# # Determine which returned record matches this company, by ticker code
# tickers = result['hits']['hits']
# for ticker in tickers:
# i_t = ticker['_source']['tickers']
# if i_t == code:
# cik = ticker['_id']
# print(cik)
# break
# break
spider(com_name,cik)
break
# break
......