# Commit ce65474b, author: LiuLiYuan
#
# NASDAQ financial data collection 9/28
#
# Parent commit: ca6cfd8d
import json
import re
import time
from itertools import groupby
from operator import itemgetter
import pymysql
import redis
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages import urllib3
from retry import retry
from base import BaseCore
# Silence urllib3 warnings; the HTTP requests in this file are made with verify=False.
urllib3.disable_warnings()
# Project helper object and the logger it provides; both are used by every function below.
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# Module-level MySQL connection, opened at import time and closed at the end of doJob().
# NOTE(review): host, user and password are hard-coded in source - consider moving them to configuration.
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4')
cursor = cnx.cursor()
# Site root; doJob() requests it once through the shared session before calling the API.
URL = 'https://www.nasdaq.com/'
# Shared HTTP session with enlarged connection pools for both URL schemes.
session = requests.session()
session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
session.mount('http://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
# Request headers sent with every outgoing page/API request (a desktop Chrome User-Agent).
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
# Task label passed to baseCore.recordLog for every log record written by this script.
taskType = '财务数据/纳斯达克'
# Check whether this company has been collected before
@retry(tries=3, delay=1)
def check_code(com_code):
    """Tell whether this is the first collection run for ``com_code``.

    Looks up the Redis key ``caiwu_nasdaq_code::<com_code>``. The call is
    retried up to 3 times, 1 second apart, by the ``retry`` decorator.

    :param com_code: company identifier appended to the Redis key prefix.
    :return: ``True`` when the key does not exist (first collection),
        ``False`` when it already exists.
    """
    client = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    key_present = client.exists('caiwu_nasdaq_code::' + com_code)
    # An existing key means the company was collected before, so it is not a first run.
    return not key_present
# Check whether a reporting period has already been collected
@retry(tries=3, delay=1)
def check_date(com_code, info_date):
    """Tell whether ``info_date`` is already recorded for ``com_code``.

    The recorded periods are stored as a Redis set under
    ``caiwu_nasdaq_code::<com_code>``. The call is retried up to 3 times,
    1 second apart, by the ``retry`` decorator.

    :param com_code: company identifier appended to the Redis key prefix.
    :param info_date: period marker to look up in the set.
    :return: ``True`` if the marker is a member of the set, else ``False``.
    """
    client = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    # Membership test against a Redis set (the periods are stored with SADD).
    return bool(client.sismember('caiwu_nasdaq_code::' + com_code, info_date))
# Record the collected reporting periods of a company in Redis
@retry(tries=3, delay=1)
def add_date(com_code, date_list):
    """Add every entry of ``date_list`` to the company's Redis set.

    The set lives under ``caiwu_nasdaq_code::<com_code>``. The whole function
    is retried up to 3 times, 1 second apart, by the ``retry`` decorator.

    :param com_code: company identifier appended to the Redis key prefix.
    :param date_list: iterable of period markers to store.
    :return: ``None``.
    """
    client = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    redis_key = 'caiwu_nasdaq_code::' + com_code
    # One SADD per period marker, in the order given.
    for report_period in date_list:
        client.sadd(redis_key, report_period)
# Send one reporting period to the save endpoint
def sendData(start_time, social_code, gpdm, dic_info):
    """POST one reporting period to the finance save endpoint, with retries.

    The payload is serialised to JSON once and posted up to 3 times. An
    attempt counts as successful only when the request completes and the
    server answers with a non-error HTTP status; each failed attempt is
    logged, written to the task log through ``baseCore.recordLog`` and
    followed by a 1 second pause.

    :param start_time: job start timestamp, used to compute the elapsed time
        written to the task log on failure.
    :param social_code: company identifier used in log messages.
    :param gpdm: stock ticker used in log messages.
    :param dic_info: JSON-serialisable payload to send.
    :return: ``True`` once an attempt succeeds, ``False`` if all attempts fail.
    """
    data = json.dumps(dic_info)
    url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk'
    # NOTE(review): no timeout is passed to requests.post, so a stalled
    # connection blocks this call indefinitely.
    for _ in range(3):
        try:
            res_baocun = requests.post(url_baocun, data=data)
            # An HTTP error status means the data was not accepted; treat it
            # like a transport failure instead of reporting success.
            res_baocun.raise_for_status()
        except requests.RequestException as e:
            log.error(f'{social_code}=={gpdm}财务数据保存接口失败: {e!r}')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{social_code}===财务数据保存接口失败')
            time.sleep(1)
        else:
            log.info(f'{social_code}=={gpdm}财务数据保存接口成功')
            return True
    return False
# Get the reporting unit
def getUnit(gpdm):
    """Scrape the unit label shown on a stock's NASDAQ financials page.

    Downloads ``/market-activity/stocks/<gpdm>/financials``, reads the text of
    the ``div.financials__note`` element and takes its second
    space-separated token, which is returned prefixed with ``(千)``.

    :param gpdm: stock ticker used to build the page URL.
    :return: the unit string, e.g. ``'(千)' + <second token of the note>``.
    :raises ValueError: if the note element is missing or has fewer than two
        space-separated tokens.
    """
    url = f'https://www.nasdaq.com/market-activity/stocks/{gpdm}/financials'
    req = requests.get(url, headers=headers, verify=False)
    try:
        req.encoding = req.apparent_encoding
        soup = BeautifulSoup(req.text, 'lxml')
        note = soup.find('div', class_='financials__note')
        if note is None:
            raise ValueError(f'financials note not found on {url}')
        tokens = note.text.split(' ')
        if len(tokens) < 2:
            raise ValueError(f'unexpected financials note format on {url}: {note.text!r}')
        # presumably the second token is a currency code - TODO confirm against the page
        unit = tokens[1].strip()
        return f'(千){unit}'
    finally:
        # Release the connection even when parsing fails.
        req.close()
# Flatten one financial table into per-cell records
def getlist(table, tableName):
    """Convert one API table into a flat list of ``{label, value, date}`` records.

    ``table['headers']`` and every entry of ``table['rows']`` are dicts keyed
    ``value1``, ``value2``, ...; ``value1`` of a row is the line-item label and
    the remaining columns hold one figure per reporting date. Header dates are
    read as ``MM/DD/YYYY`` and emitted as ``YYYY-MM-DD``.

    A cell that contains at least one digit is reduced to digits, sign
    characters and the decimal point (currency symbols and thousands
    separators are removed); any other cell becomes ``'-'``.

    :param table: dict with ``'headers'`` (dict) and ``'rows'`` (list of dicts).
    :param tableName: key under which the line-item label is stored in each
        record.
    :return: list of dicts ``{tableName: label, 'value': str, 'date': str}``,
        ordered by row and then by column.
    :raises KeyError: if a row or the header lacks an expected ``valueN`` key.
    :raises IndexError: if a header date has fewer than three ``/`` parts.
    """
    records = []
    years = table['headers']
    # Each column's header date is the same for every row: convert it once.
    date_by_column = {}
    for row in table['rows']:
        name = row['value1']
        # Columns are numbered from 1; column 1 is the label, so figures start at 2.
        for i in range(2, len(row) + 1):
            column = f'value{i}'
            value = row[column]
            if any(char.isdigit() for char in value):
                # Keep the decimal point so fractional figures are not corrupted.
                value = re.sub(r"[^\d+\-.]", "", value)
            else:
                value = '-'
            if column not in date_by_column:
                parts = years[column].split('/')
                date_by_column[column] = parts[2] + '-' + parts[0] + '-' + parts[1]
            records.append({f'{tableName}': name, 'value': value, 'date': date_by_column[column]})
    return records
# Group financial records by reporting date
def combanBydate(balance_list):
    """Group records by their ``'date'`` key, in ascending date order.

    The input is sorted into a new list first because ``itertools.groupby``
    only merges adjacent items; the caller's list is left untouched. The sort
    is stable, so records of the same date keep their original relative order.

    :param balance_list: list of dicts, each containing a ``'date'`` key.
    :return: list of ``[date, records]`` pairs, one per distinct date.
    """
    by_date = itemgetter('date')
    ordered = sorted(balance_list, key=by_date)
    return [[date, list(group)] for date, group in groupby(ordered, key=by_date)]
# Build the normalised list of financial line items
def reviseData(lists, unit, tableName):
    """Reshape raw records into the ``{enName, value, unit}`` payload format.

    :param lists: iterable of dicts holding the line-item label under the
        ``tableName`` key and the figure under ``'value'``.
    :param unit: unit string attached to every output entry.
    :param tableName: key in each input dict that stores the label.
    :return: list of dicts with keys ``'enName'``, ``'value'`` and ``'unit'``,
        in input order.
    """
    label_key = f'{tableName}'
    return [
        {'enName': entry[label_key], 'value': entry['value'], 'unit': unit}
        for entry in lists
    ]
# Collect annual financial data
def getYear(start_time, session, social_code, gpdm):
    """Fetch the annual financial statements of one company and submit new periods.

    Requests ``financials?frequency=1`` from the NASDAQ API, flattens the
    income statement (``lrb``), balance sheet (``zcfz``) and cash flow
    (``xjll``) tables, groups the records by reporting date and sends every
    date that is not yet recorded in Redis as ``<date>-year`` through
    ``sendData``.

    Failures never propagate: a missing ``data`` section or any exception is
    logged and written to the task log, and the periods handled so far are
    returned.

    :param start_time: job start timestamp, used for elapsed-time logging.
    :param session: HTTP session used for the API request.
    :param social_code: company identifier used for Redis look-ups, the
        payload and log records.
    :param gpdm: stock ticker used to build the API URL.
    :return: list of ``'<date>-year'`` markers for the periods submitted in
        this call; the caller is expected to store them in Redis.
    """
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=1'
    try:
        req = session.get(url, headers=headers, verify=False)
        req.encoding = req.apparent_encoding
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
            all_list = (
                getlist(data['incomeStatementTable'], 'lrb')
                + getlist(data['balanceSheetTable'], 'zcfz')
                + getlist(data['cashFlowTable'], 'xjll')
            )
            for date, final_list in combanBydate(all_list):
                # Skip reporting periods that were already collected.
                if check_date(social_code, date + '-year'):
                    continue
                # Each record carries its table name as a key, which is how
                # the three statements are separated again here.
                xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
                zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
                lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
                dic_info = {
                    "socialCreditCode": social_code,
                    "securitiesCode": gpdm,
                    "date": date,
                    "debt": zcfz_list_f,
                    "profit": lrb_list_f,
                    "cash": xjll_list_f,
                    'dateFlag': 'year',
                    "ynFirst": ynFirst,
                }
                # Only an explicit False (a reported save failure) keeps the
                # period from being marked as collected; any other result,
                # including None, is treated as a completed send.
                if sendData(start_time, social_code, gpdm, dic_info) is False:
                    continue
                date_list.append(date + '-year')
        else:
            log.error(f'找不到{social_code}=={gpdm}年度财务数据')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
    except Exception as e:
        # Keep the best-effort behaviour, but record what actually went wrong
        # and let KeyboardInterrupt/SystemExit through.
        log.error(f'{social_code}=={gpdm}年度财务数据访问失败: {e!r}')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
    return date_list
# Collect quarterly financial data; when a quarter's date coincides with an
# annual report date, the dateFlag field is changed to 'year'
def getQuarter(start_time, session, social_code, gpdm):
    """Fetch the quarterly financial statements of one company and submit new periods.

    Requests ``financials?frequency=2`` from the NASDAQ API, flattens the
    income statement (``lrb``), balance sheet (``zcfz``) and cash flow
    (``xjll``) tables, groups the records by reporting date and sends every
    date that is not yet recorded in Redis as ``<date>-quarter`` through
    ``sendData``. If the same date is already recorded as ``<date>-year``, the
    payload's ``dateFlag`` is sent as ``'year'`` instead of ``'quarter'``.

    Failures never propagate: a missing ``data`` section or any exception is
    logged and written to the task log, and the periods handled so far are
    returned.

    :param start_time: job start timestamp, used for elapsed-time logging.
    :param session: HTTP session used for the API request.
    :param social_code: company identifier used for Redis look-ups, the
        payload and log records.
    :param gpdm: stock ticker used to build the API URL.
    :return: list of ``'<date>-quarter'`` markers for the periods submitted
        in this call; the caller is expected to store them in Redis.
    """
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=2'
    try:
        req = session.get(url, headers=headers, verify=False)
        req.encoding = req.apparent_encoding
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
            all_list = (
                getlist(data['incomeStatementTable'], 'lrb')
                + getlist(data['balanceSheetTable'], 'zcfz')
                + getlist(data['cashFlowTable'], 'xjll')
            )
            for date, final_list in combanBydate(all_list):
                # Skip reporting periods that were already collected.
                if check_date(social_code, date + '-quarter'):
                    continue
                # Each record carries its table name as a key, which is how
                # the three statements are separated again here.
                xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
                zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
                lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
                dic_info = {
                    "socialCreditCode": social_code,
                    "securitiesCode": gpdm,
                    "date": date,
                    "debt": zcfz_list_f,
                    "profit": lrb_list_f,
                    "cash": xjll_list_f,
                    'dateFlag': 'quarter',
                    "ynFirst": ynFirst,
                }
                # A quarter whose date is already stored as an annual period
                # is flagged as 'year'.
                if check_date(social_code, date + '-year'):
                    dic_info['dateFlag'] = 'year'
                # Only an explicit False (a reported save failure) keeps the
                # period from being marked as collected; any other result,
                # including None, is treated as a completed send.
                if sendData(start_time, social_code, gpdm, dic_info) is False:
                    continue
                date_list.append(date + '-quarter')
        else:
            # Logged for parity with the annual collector's no-data branch.
            log.error(f'找不到{social_code}=={gpdm}季度财务数据')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
    except Exception as e:
        # Keep the best-effort behaviour, but record what actually went wrong
        # and let KeyboardInterrupt/SystemExit through.
        log.error(f'{social_code}=={gpdm}季度财务数据访问失败: {e!r}')
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
    return date_list
def doJob():
    """Run one collection pass for a single, hard-coded company.

    Requests the NASDAQ home page once through the shared session, collects
    the annual and then the quarterly data, stores the submitted period
    markers in Redis after each step, and writes a success record with the
    elapsed time. The annual markers are stored before the quarterly
    collection starts because the quarterly step looks them up.

    The module-level database cursor and connection are closed when the pass
    ends, whether it succeeded or raised.
    """
    try:
        # while True:
        # social_code = baseCore.redicPullData('')
        # datas_enterprise = baseCore.getInfomation(social_code)
        session.get(URL, headers=headers)
        # sql = "select * from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
        # cursor.execute(sql)
        # datas_enterprise = cursor.fetchall()
        # for data_enterprise in datas_enterprise:
        start_time = time.time()
        # gpdm = data_enterprise[3]
        # social_code = data_enterprise[6]
        social_code = 'ZD0CN0012309000172'
        gpdm = 'NTES'
        # Collect the annual data
        date_list_year = getYear(start_time, session, social_code, gpdm)
        # Store the annual period markers in Redis
        add_date(social_code, date_list_year)
        # Collect the quarterly data
        date_list_quarter = getQuarter(start_time, session, social_code, gpdm)
        # Store the quarterly period markers in Redis
        add_date(social_code, date_list_quarter)
        timeCost = baseCore.getTimeCost(start_time, time.time())
        state = 1
        baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
        log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
        # break
    finally:
        # Release the database handles even if a step above raised.
        try:
            cursor.close()
        finally:
            cnx.close()


# Script entry point
if __name__ == '__main__':
    doJob()
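# (Leftover text from the code-hosting web page; not part of the program.)
# Markdown is supported
# 0%
# You are about to add 0 people to the discussion. Proceed with caution.
# Finish editing this message first!
# Please register or sign in to comment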