Commit 5a277057 by 薛凌堃

East Money (东方财富网) financial data

Parent f0246e72
"""
"""
修改东方财富网财务数据 存储redis的方式 修改成功
"""
import requests, json, time, re, random, pymysql, redis
from datetime import datetime,timedelta
import pandas as pd
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
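# Two MySQL handles are kept: cnx/cursor point at the clb_project business
# database queried in job(), while cnx_/cursor_ come from BaseCore
# (presumably used internally by recordLog and related bookkeeping).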
# Check whether the stock code already exists in redis
def check_code(com_code):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    res = r.exists('com_caiwushuju_code::' + com_code)
    # If the key exists, this company has been collected before (res = 1)
    if res:
        return False  # not the first collection
    else:
        return True  # the first collection
def check_date(com_code, info_date):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    # Note: the report periods are stored as a redis set
    res = r.sismember('com_caiwushuju_date::' + com_code, info_date)
    return bool(res)
# Save the report periods collected for this stock code into redis
def add_date(com_code, date_list):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    # Walk date_list and add each report period to the set
    for date in date_list:
        res = r.sadd('com_caiwushuju_code::' + com_code, date)
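# Redis key layout, as read from the helpers above:
#   com_caiwushuju_code::{com_code} -> set populated by add_date()
#   com_caiwushuju_date::{com_code} -> set read by check_date()
# The two prefixes differ, so the *_date set is presumably maintained by the
# downstream storage service rather than by this script.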
# Locate the embedded <script> template for one statement table. The F10 page
# ships one template per company category: qy = general, qs = securities,
# yh = bank, bx = insurance.
def find_table_soup(soup_name, prefix):
    for suffix in ('qy', 'qs', 'yh', 'bx'):
        script = soup_name.find('script', {'id': f'{prefix}_{suffix}'})
        if script:
            return BeautifulSoup(script.text.strip(), 'lxml')
    raise ValueError(f'no {prefix} template found')
# Walk the template rows and look up each field's value in the AJAX json.
# The '审计意见' (audit opinion) row is always skipped; for the cash flow
# statement, rows from '补充资料' (supplementary data) onward are dropped.
def parse_statement(soup, data_json, stop_at_supplement=False):
    items = []
    for one_info in soup.find_all('tr')[2:]:
        if stop_at_supplement and '补充资料' in one_info.text:
            break
        if 'value.' not in one_info.text:
            continue
        info_name = one_info.find('span').text
        if '审计意见' in info_name:
            continue
        info_name_en = re.findall(r'value\.(.*?)\)}}', one_info.text)[0]
        if not info_name_en:
            continue
        try:
            info_data = data_json[info_name_en]
        except:
            continue
        if not info_data:
            info_data = '--'
        items.append({"name": info_name, 'enName': info_name_en, "value": info_data})
    return items
# Collect the three statements for one credit code, stock code and report period
def get_info(social_code, com_code, info_date, delist_all, info_date_list, taskType):
    dic_info = {}
    # Work out whether this stock code is being collected for the first time
    for nnn in range(0, 3):
        try:
            ynFirst = check_code(com_code)
            break
        except:
            time.sleep(1)
    # Skip this report period if it has already been collected
    for nnn in range(0, 3):
        try:
            if check_date(com_code, info_date):
                return dic_info
            break
        except:
            time.sleep(1)
    # F10 page url, used to recover the field display names
    url_name = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/Index?type=web&code={com_code}'
    # print(f'url_name:{url_name}')
    # todo: record the codes that still fail after trying every companyType
    try:
        start_time = time.time()
        for i in range(1, 20):
            # Balance sheet: returns the balance sheet json
            url_data_zcfzb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/zcfzbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
            # Income statement: returns the income statement json
            url_data_lrb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/lrbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
            # Cash flow statement: returns the cash flow statement json
            url_data_xjllb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/xjllbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates={info_date}&code={com_code}'
            res_data_zcfzb = requests.get(url_data_zcfzb1)
            res_data_lrb = requests.get(url_data_lrb1)
            res_data_xjllb = requests.get(url_data_xjllb1)
            # If the response does not parse, try the next companyType
            try:
                data_json_zcfzb = res_data_zcfzb.json()['data'][0]
                print(f'{info_date} parsed successfully on companyType {i}')
            except:
                continue
            # Once the first table parses, the other two parse as well
            data_json_lrb = res_data_lrb.json()['data'][0]
            data_json_xjllb = res_data_xjllb.json()['data'][0]
            res_name = requests.get(url_name)
            soup_name = BeautifulSoup(res_name.content, 'html.parser')
            # First table: balance sheet template
            try:
                soup_zcfzb = find_table_soup(soup_name, 'zcfzb')
            except:
                log.info(f'---error: {social_code}, {com_code}---')
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} balance sheet failed')
            # Second table: income statement template
            try:
                soup_lrb = find_table_soup(soup_name, 'lrb')
            except:
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} income statement failed')
            # Third table: cash flow statement template
            try:
                soup_xjllb = find_table_soup(soup_name, 'xjllb')
            except:
                log.info(f'---error: {social_code}, {com_code}---')
                state = 0
                takeTime = baseCore.getTimeCost(start_time, time.time())
                baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} cash flow statement failed')
            list_zcfzb = parse_statement(soup_zcfzb, data_json_zcfzb)
            log.info(f'----list_zcfzb: collected {len(list_zcfzb)} rows----')
            list_lrb = parse_statement(soup_lrb, data_json_lrb)
            list_xjllb = parse_statement(soup_xjllb, data_json_xjllb, stop_at_supplement=True)
            dic_info = {
                "socialCreditCode": social_code,
                "securitiesCode": com_code[2:],
                "date": info_date,
                "debt": list_zcfzb,
                "profit": list_lrb,
                "cash": list_xjllb,
                "ynFirst": ynFirst,
            }
            # Record the current report period
            info_date_list.append(info_date)
            return dic_info
    except:
        # Fallback: retry with the five most recent quarters hard-coded into
        # the dates parameter instead of the single report period
        start_time = time.time()
        try:
            for i in range(1, 20):
                # Balance sheet: returns the balance sheet json
                url_data_zcfzb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/zcfzbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
                # Income statement: returns the income statement json
                url_data_lrb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/lrbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
                # Cash flow statement: returns the cash flow statement json
                url_data_xjllb1 = f'https://emweb.eastmoney.com/PC_HSF10/NewFinanceAnalysis/xjllbAjaxNew?companyType={i}&reportDateType=0&reportType=1&dates=2023-03-31%2C2022-12-31%2C2022-09-30%2C2022-06-30%2C2022-03-31&code={com_code}'
                res_data_zcfzb = requests.get(url_data_zcfzb1)
                res_data_lrb = requests.get(url_data_lrb1)
                res_data_xjllb = requests.get(url_data_xjllb1)
                # If the response does not parse, try the next companyType
                try:
                    data_json_zcfzb = res_data_zcfzb.json()['data'][0]
                    log.info(f'----{com_code}---{info_date}--parsed successfully on companyType {i}-----')
                except:
                    continue
                # Once the first table parses, the other two parse as well
                data_json_lrb = res_data_lrb.json()['data'][0]
                data_json_xjllb = res_data_xjllb.json()['data'][0]
                res_name = requests.get(url_name)
                soup_name = BeautifulSoup(res_name.content, 'html.parser')
                # First table: balance sheet template
                try:
                    soup_zcfzb = find_table_soup(soup_name, 'zcfzb')
                except:
                    log.info(f'---error: {social_code}, {com_code}---')
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} balance sheet failed')
                # Second table: income statement template
                try:
                    soup_lrb = find_table_soup(soup_name, 'lrb')
                except:
                    log.info(f'---error: {social_code}, {com_code}---')
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} income statement failed')
                # Third table: cash flow statement template
                try:
                    soup_xjllb = find_table_soup(soup_name, 'xjllb')
                except:
                    log.info(f'---error: {social_code}, {com_code}---')
                    state = 0
                    takeTime = baseCore.getTimeCost(start_time, time.time())
                    baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date} cash flow statement failed')
                list_zcfzb = parse_statement(soup_zcfzb, data_json_zcfzb)
                log.info(f'----list_zcfzb: collected {len(list_zcfzb)} rows----')
                list_lrb = parse_statement(soup_lrb, data_json_lrb)
                list_xjllb = parse_statement(soup_xjllb, data_json_xjllb, stop_at_supplement=True)
                dic_info = {
                    "socialCreditCode": social_code,
                    "securitiesCode": com_code[2:],
                    "date": info_date,
                    "debt": list_zcfzb,
                    "profit": list_lrb,
                    "cash": list_xjllb,
                    "ynFirst": ynFirst,
                }
                info_date_list.append(info_date)
                return dic_info
        except:
            # delist_json = {'info_date':info_date,'com_code': com_code, 'social_code': social_code}
            log.info(f'---no data for report period {info_date}, stock code: {com_code}----')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url_name, f'{info_date}--no data for this report period--{com_code}')
            # If this period has no data, record the date
            delist_all.append(info_date)
            # Return the still-empty dic_info so the caller's len() check works
            return dic_info
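# Shape of the record get_info() returns on success (values illustrative;
# the actual enName keys come from the East Money templates):
# {
#     "socialCreditCode": "91XXXXXXXXXXXXXXXX",
#     "securitiesCode": "600000",
#     "date": "2022-12-31",
#     "debt":   [{"name": "货币资金", "enName": "MONETARYFUNDS", "value": ...}, ...],
#     "profit": [...],
#     "cash":   [...],
#     "ynFirst": True,
# }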
def getReportTime():
    # timeNow = baseCore.getNowTime(1)[:10]
    list_date = []
    # Get the current date and time
    current_date = datetime.now()
    # Work out yesterday's date
    yesterday = current_date - timedelta(days=1)
    # Format yesterday's date, e.g. 2023-04-01
    report_date = yesterday.strftime('%Y-%m-%d')
    list_date.append(report_date)
    year = int(current_date.strftime('%Y'))
    # list_date = ['2023-03-31']
    list_month = ['-12-31', '-09-30', '-06-30', '-03-31']
    for year in range(2022, 2018, -1):
        for month in list_month:
            date = str(year) + month
            list_date.append(date)
    return list_date
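# getReportTime() returns yesterday's date followed by the quarter-end dates
# of 2022 back through 2019, e.g. when run on 2023-07-05:
# ['2023-07-04', '2022-12-31', '2022-09-30', ..., '2019-06-30', '2019-03-31']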
def job(taskType):
    # Work through the A-share codes in the listed-company table;
    # needs both the stock code and the company credit code
    while True:
        # Pull a company credit code from redis
        social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode')
        # If redis has run out of codes, wait and check again
        if social_code is None:
            time.sleep(20)
            continue
        sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where category = '1' and social_credit_code='{social_code}' '''
        cursor.execute(sql_sel)
        row = cursor.fetchone()
        securities_code = row[0]
        exchange = row[1]
        # for code in list_code:
        #     social_code = rows[0]
        #     exchange = rows[2]
        #     if code == rows[1]:
        #         securities_code = code
        #     else:
        #         continue
        # Map the exchange id to the East Money code prefix
        if exchange == 1:
            com_code = 'bj' + securities_code
        elif exchange == 2:
            com_code = 'sh' + securities_code
        elif exchange == 3:
            com_code = 'sz' + securities_code
        # if com_code == 'sz002163':
        list_date = getReportTime()
        delist = []  # report periods with no data for this company
        date_list = []  # report periods with data for this company
        start_time = time.time()
        # Collect each report period in turn
        for info_date in list_date:
            delist_all = []
            info_date_list = []
            dic_info = get_info(social_code, com_code, info_date, delist_all, info_date_list, taskType)
            # print(dic_info)
            if len(dic_info) != 0:
                # Push the record to the storage API
                data = json.dumps(dic_info)
                # print(data)
                url_baocun = 'http://114.115.236.206:8088/sync/finance/df'
                for nnn in range(0, 3):
                    try:
                        res_baocun = requests.post(url_baocun, data=data)
                        break
                    except:
                        time.sleep(1)
                print(res_baocun.text)
            # Save the collected report periods into redis
            if len(info_date_list) != 0:
                for date in info_date_list:
                    date_list.append(date)
                print(date_list)
                # add_date expects the list of dates (one sadd per period)
                for nnn in range(0, 3):
                    try:
                        add_date(com_code, date_list)
                        break
                    except:
                        time.sleep(1)
        end_time = time.time()
        log.info(f'===={com_code}====this company took {end_time - start_time}s===')
    # Unreachable while the loop above runs forever; kept for completeness
    cnx.close()
    cursor.close()
    baseCore.close()
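# The script is long-running by design: job() blocks on the redis queue and
# sleeps when it is empty, so it is presumably run under a process supervisor.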
if __name__ == '__main__':
    task_type = '财务数据/东方财富网'
    job(task_type)