提交 5a10db80 作者: XveLingKun

Merge remote-tracking branch 'origin/master'

from base import BaseCore
baseCore = BaseCore.BaseCore()
r = baseCore.r
log = baseCore.getLogger()
cursor = baseCore.cursor_
sql = "SELECT social_credit_code FROM sys_base_enterprise_ipo WHERE securities_type='新三板' and listed='1'"
cursor.execute(sql)
results = cursor.fetchall()
nq_social_list = [item[0] for item in results]
for item in nq_social_list:
r.rpush('Jingyingfenxi:finance_socialCode_xsb', item)
baseCore.close()
import json
import time
import urllib.parse
from decimal import Decimal
import redis
from retry import retry
from base import BaseCore
import requests
baseCore = BaseCore.BaseCore()
cursor = baseCore.cursor_
r = baseCore.r
log = baseCore.getLogger()
headers = {
'Host': 'xinsanban.eastmoney.com',
'Connection': 'keep-alive',
'Accept': '*/*',
'Sec-Fetch-Dest': 'empty',
'X-Requested-With': 'XMLHttpRequest',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'cors',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
@retry(tries=2, delay=5)
def getRes(code, type):
ip = baseCore.get_proxy()
type = urllib.parse.quote(type)
url = f'https://xinsanban.eastmoney.com/api/F10/CompanyInfo/GetMainBusiness?code={code}&type={type}'
req = requests.get(url, headers=headers, proxies=ip)
req.encoding = req.apparent_encoding
# print(req.text)
results = req.json()['result']
return results
def sendData(dics, code):
jsonDics = json.dumps(dics)
# 192.168.1.77 114.115.236.206
try:
# response = requests.post('http://192.168.1.28:8088/sync/operationAnalysis', data=jsonDics,
# timeout=300,
# verify=False)
response = requests.post('http://114.115.236.206:8088/sync/operationAnalysis', data=jsonDics,
timeout=300,
verify=False)
log.info(f'{code}==={response.text}')
except Exception as e:
log.info(e)
log.info(f'数据疑似有重复===={code}')
def getDic(result, code, type):
mainComposition = result['ITEMNAME']
select_sql = f'''select stock_code from operation_analysis where main_composition = %s and stock_code = %s and date = %s and category = %s '''
cursor.execute(select_sql, (mainComposition, code, f"{result['REPORTDATE']}", f'按{type}分类'))
flg = cursor.fetchall()
if flg:
# log.info(f"{code}==={mainComposition}==={result['REPORTDATE']}==={type}===已采集")
return {}
if result['MBREVENUE']:
mainIncome = str(int(Decimal(result['MBREVENUE']) * Decimal(10000))) + '元'
else:
mainIncome = '--'
if result['MBCOST']:
mainCost = str(int(Decimal(result['MBCOST']) * Decimal(10000))) + '元'
else:
mainCost = '--'
if result['MBPROFIT']:
mainProfit = str(int(Decimal(result['MBPROFIT']) * Decimal(10000))) + '元'
else:
mainProfit = '--'
if result['DEC_MAOLILV']:
grossProfitMargin = str(result['DEC_MAOLILV']) + '%'
else:
grossProfitMargin = '--'
if result['DEC_SHOURUGOUCHENG']:
incomeRatio = str(result['DEC_SHOURUGOUCHENG']) + '%'
else:
incomeRatio = '--'
if result['DEC_LIRUNGOUCHENG']:
profitRatio = str(result['DEC_LIRUNGOUCHENG']) + '%'
else:
profitRatio = '--'
if result['DEC_CHENGBENGOUCHENG']:
costRatio = str(result['DEC_CHENGBENGOUCHENG']) + '%'
else:
costRatio = '--'
dic = {
'stockCode': code,
'date': result['REPORTDATE'],
'mainComposition': mainComposition,
'mainIncome': mainIncome,
'mainCost': mainCost,
'mainProfit': mainProfit,
'grossProfitMargin': grossProfitMargin,
'incomeRatio': incomeRatio,
'profitRatio': profitRatio,
'costRatio': costRatio,
'category': f'按{type}分类'
}
return dic
def doJob():
while True:
social_code = baseCore.redicPullData('Jingyingfenxi:finance_socialCode_xsb')
if not social_code:
log.info('======已没有数据==========等待=====')
time.sleep(20)
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where exchange = '1' or exchange = '2' or exchange = '3' and listed = '1' and social_credit_code='{social_code}' '''
cursor.execute(sql_sel)
row = cursor.fetchone()
try:
securities_code = row[0]
except:
log.info(f'======{social_code}没有股票代码======')
continue
log.info(f'{social_code}===开始采集')
dics = []
types = ['行业', '产品', '地区']
for type in types:
results = getRes(securities_code, type)
for result in results:
dic = getDic(result, securities_code, type)
if dic:
dics.append(dic)
if dics:
sendData(dics, securities_code)
else:
log.info(f'{securities_code}===没有数据')
log.info(f'{social_code}===采集结束')
if __name__ == '__main__':
doJob()
baseCore.close()
import pymysql
import pymysql
......@@ -152,7 +152,7 @@ def operate_analysis(i,com_code,social_code):
json_updata = json.dumps(list_updata)
#192.168.1.77 114.115.236.206
try:
response = requests.post('http://114.115.236.206:9988/datapull/sync/operationAnalysis', data=json_updata,
response = requests.post('http://114.115.236.206:8088/sync/operationAnalysis', data=json_updata,
timeout=300,
verify=False)
time.sleep(3)
......@@ -188,7 +188,7 @@ while True:
cnx = baseCore.cnx_
cursor = baseCore.cursor_
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where exchange = '1' or exchange = '2' or exchange = '3' and listed = '1' and social_credit_code='{social_code}' '''
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where (exchange = '1' or exchange = '2' or exchange = '3') and listed = '1' and social_credit_code='{social_code}' '''
cursor.execute(sql_sel)
row = cursor.fetchone()
try:
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论