Commit 4a3bd851 Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

@@ -449,6 +449,21 @@ def omeng():
         # r.rpush('gnOMEnterprise_socialcode:Notice', item)
     closeSql(cnx, cursor)
+# Single-item champion enterprises
+def danxiangguanjun():
+    pass
+
+# Sci-tech reform demonstration enterprises
+def kegaishifan():
+    pass
+
+# "Double hundred" enterprises
+def shuangbaiqiye():
+    pass
+
+# Specialized, refined, distinctive and innovative (SRDI) enterprises
+def zhuangjingtexind():
+    pass
 if __name__ == "__main__":
     start = time.time()
......
@@ -46,8 +46,8 @@ def doJob():
         if tycid == None or tycid == '':
             try:
                 retData = getTycIdByXYDM(xydm)
-                if retData:
-                    tycid = retData['id']
+                if retData['state']:
+                    tycid = retData['tycData']['id']
                 # todo: write back to the database
                 updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
                 cursor_.execute(updateSql)
......
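The change above implies that getTycIdByXYDM no longer returns the record itself but a status wrapper with 'state' and 'tycData' keys. A minimal sketch of the assumed return shape (not part of the commit; the lookup URL and response fields are hypothetical, only the wrapper keys follow the call site):

import requests

def getTycIdByXYDM(xydm):
    # Hypothetical sketch: resolve a social credit code (xydm) to a company
    # record, returning the status-wrapped dict the call site above expects.
    url = f'https://api.example.com/tyc/search?keyword={xydm}'  # assumed endpoint
    try:
        resp = requests.get(url, timeout=10)
        company = resp.json().get('data')
        if company and company.get('id'):
            return {'state': True, 'tycData': company}  # tycData carries 'id'
    except requests.RequestException:
        pass
    return {'state': False, 'tycData': None}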
@@ -101,8 +101,8 @@ def spider(com_name,cik,up_okCount):
     # parse the page
     for nnn in range(0,4):
         try:
-            req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
-            # req = requests.get(url=url_json, headers=header, verify=False, timeout=30)
+            # req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
+            req = requests.get(url=url_json, headers=header, verify=False, timeout=30)
             break
         except:
             time.sleep(2)
......
import json
import re
import time
from itertools import groupby
from operator import itemgetter
import pymysql
import redis
import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from requests.packages import urllib3
from retry import retry
from base import BaseCore
urllib3.disable_warnings()
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4')
cursor = cnx.cursor()
URL = 'https://www.nasdaq.com/'
session = requests.session()
session.mount('https://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
session.mount('http://', HTTPAdapter(pool_connections=20, pool_maxsize=100))
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
taskType = '财务数据/纳斯达克'
# Check whether this stock code has been collected before
@retry(tries=3, delay=1)
def check_code(com_code):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    res = r.exists('caiwu_nasdaq_code::' + com_code)
    # if the key exists (res = 1), this enterprise has been collected before
    if res:
        return False  # not the first collection
    else:
        return True  # first collection

# Check whether this report date has already been collected
@retry(tries=3, delay=1)
def check_date(com_code, info_date):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    res = r.sismember('caiwu_nasdaq_code::' + com_code, info_date)  # the dates are stored as a redis set
    if res:
        return True
    else:
        return False

# Save the collected report periods for a stock code into redis
@retry(tries=3, delay=1)
def add_date(com_code, date_list):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
    # add each period in date_list to the set
    for date in date_list:
        r.sadd('caiwu_nasdaq_code::' + com_code, date)

# Send the assembled financial data to the save endpoint
def sendData(start_time, social_code, gpdm, dic_info):
    data = json.dumps(dic_info)
    url_baocun = 'http://114.115.236.206:8088/sync/finance/nsdk'
    for nnn in range(0, 3):
        try:
            requests.post(url_baocun, data=data)
            log.info(f'{social_code}=={gpdm}财务数据保存接口成功')
            break
        except Exception:
            log.error(f'{social_code}=={gpdm}财务数据保存接口失败')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', f'{social_code}===财务数据保存接口失败')
            time.sleep(1)

# Fetch the reporting unit from the stock's financials page
def getUnit(gpdm):
    url = f'https://www.nasdaq.com/market-activity/stocks/{gpdm}/financials'
    req = requests.get(url, headers=headers, verify=False)
    req.encoding = req.apparent_encoding
    soup = BeautifulSoup(req.text, 'lxml')
    unit = soup.find('div', class_='financials__note').text.split(' ')[1].strip()
    # '(千)' marks that the figures are reported in thousands
    unit = f'(千){unit}'
    req.close()
    return unit

# Parse one financials table into a flat list of {name, value, date} records
def getlist(table, tableName):
    records = []
    years = table['headers']
    datas = table['rows']
    for data in datas:
        for i in range(2, len(data) + 1):
            name = data['value1']
            value = data[f'value{i}']
            if any(char.isdigit() for char in value):
                # strip currency symbols, commas etc., keeping only digits and signs
                value = re.sub(r"[^\d+-]", "", value)
            else:
                value = '-'
            # header dates are MM/DD/YYYY; normalise to YYYY-MM-DD
            date = years[f'value{i}'].split('/')[2] + '-' + years[f'value{i}'].split('/')[0] + '-' + \
                   years[f'value{i}'].split('/')[1]
            records.append({f'{tableName}': name, 'value': value, 'date': date})
    return records

# Group the flat financial records by report date
def combanBydate(balance_list):
    listbydates = []
    # groupby only groups consecutive items, so sort by date first
    balance_list.sort(key=itemgetter('date'))
    groups = groupby(balance_list, key=itemgetter('date'))
    for date, group in groups:
        # all records for one report date
        listbydate = [item for item in group]
        listbydates.append([date, listbydate])
    return listbydates

# Build the normalised financial-data list for one statement type
def reviseData(lists, unit, tableName):
    list_r = []
    for data in lists:
        item = {
            'enName': data[f'{tableName}'],
            'value': data['value'],
            'unit': unit
        }
        list_r.append(item)
    return list_r

# Collect annual financial data
def getYear(start_time, session, social_code, gpdm):
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=1'
    try:
        req = session.get(url, headers=headers, verify=False)
        req.encoding = req.apparent_encoding
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
            # lrb = income statement, zcfz = balance sheet, xjll = cash flow statement
            lrb_list = getlist(data['incomeStatementTable'], 'lrb')
            zcfz_list = getlist(data['balanceSheetTable'], 'zcfz')
            xjll_list = getlist(data['cashFlowTable'], 'xjll')
            all_list = lrb_list + zcfz_list + xjll_list
            all_group = combanBydate(all_list)
            for date, final_list in all_group:
                # skip report periods that have already been collected
                if check_date(social_code, date + '-year'):
                    continue
                xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
                zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
                lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
                dic_info = {
                    "socialCreditCode": social_code,
                    "securitiesCode": gpdm,
                    "date": date,
                    "debt": zcfz_list_f,
                    "profit": lrb_list_f,
                    "cash": xjll_list_f,
                    'dateFlag': 'year',
                    "ynFirst": ynFirst,
                }
                sendData(start_time, social_code, gpdm, dic_info)
                date_list.append(date + '-year')
        else:
            log.error(f'找不到{social_code}=={gpdm}年度财务数据')
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无年度财务数据')
    except Exception:
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===年度财务数据访问失败')
    return date_list

# Collect quarterly financial data. If a quarterly report date coincides with an
# annual report date, the dateFlag field must be changed to 'year'.
def getQuarter(start_time, session, social_code, gpdm):
    ynFirst = check_code(social_code)
    date_list = []
    url = f'https://api.nasdaq.com/api/company/{gpdm}/financials?frequency=2'
    try:
        req = session.get(url, headers=headers, verify=False)
        req.encoding = req.apparent_encoding
        data = req.json()['data']
        if data:
            unit = getUnit(gpdm)
            # lrb = income statement, zcfz = balance sheet, xjll = cash flow statement
            lrb_list = getlist(data['incomeStatementTable'], 'lrb')
            zcfz_list = getlist(data['balanceSheetTable'], 'zcfz')
            xjll_list = getlist(data['cashFlowTable'], 'xjll')
            all_list = lrb_list + zcfz_list + xjll_list
            all_group = combanBydate(all_list)
            for date, final_list in all_group:
                # skip report periods that have already been collected
                if check_date(social_code, date + '-quarter'):
                    continue
                xjll_list_f = reviseData([item for item in final_list if 'xjll' in item], unit, 'xjll')
                zcfz_list_f = reviseData([item for item in final_list if 'zcfz' in item], unit, 'zcfz')
                lrb_list_f = reviseData([item for item in final_list if 'lrb' in item], unit, 'lrb')
                dic_info = {
                    "socialCreditCode": social_code,
                    "securitiesCode": gpdm,
                    "date": date,
                    "debt": zcfz_list_f,
                    "profit": lrb_list_f,
                    "cash": xjll_list_f,
                    'dateFlag': 'quarter',
                    "ynFirst": ynFirst,
                }
                # if this date also appears as an annual report date, flag it as 'year'
                if check_date(social_code, date + '-year'):
                    dic_info['dateFlag'] = 'year'
                sendData(start_time, social_code, gpdm, dic_info)
                date_list.append(date + '-quarter')
        else:
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===无季度财务数据')
    except Exception:
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, url, f'{social_code}===季度财务数据访问失败')
    return date_list

def doJob():
    # while True:
    #     social_code = baseCore.redicPullData('')
    #     datas_enterprise = baseCore.getInfomation(social_code)
    # warm up the session so later API requests carry the site cookies
    session.get(URL, headers=headers)
    # sql = "select * from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
    # cursor.execute(sql)
    # datas_enterprise = cursor.fetchall()
    # for data_enterprise in datas_enterprise:
    start_time = time.time()
    # gpdm = data_enterprise[3]
    # social_code = data_enterprise[6]
    # hard-coded test entry (NetEase)
    social_code = 'ZD0CN0012309000172'
    gpdm = 'NTES'
    # collect annual data
    date_list_year = getYear(start_time, session, social_code, gpdm)
    # save the collected annual report periods to redis
    add_date(social_code, date_list_year)
    # collect quarterly data
    date_list_quarter = getQuarter(start_time, session, social_code, gpdm)
    # save the collected quarterly report periods to redis
    add_date(social_code, date_list_quarter)
    timeCost = baseCore.getTimeCost(start_time, time.time())
    state = 1
    baseCore.recordLog(social_code, taskType, state, timeCost, '', '')
    log.info(f'{social_code}=={gpdm}==耗时{timeCost}')
    # break
    cursor.close()
    cnx.close()

if __name__ == '__main__':
    doJob()
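For reference, the commented-out scaffolding in doJob() implies a batch loop over the mgzqyjwyh_list table rather than the single hard-coded NTES test entry. A minimal sketch under those assumptions (not part of the commit; doJob_loop is a hypothetical name, the redis queue key is left blank as in the source, and the column indexes follow the commented lines):

def doJob_loop():
    # Hypothetical reconstruction of the intended production loop.
    session.get(URL, headers=headers)  # warm up cookies once
    sql = "select * from mgzqyjwyh_list where state=2 and exchange='Nasdaq';"
    cursor.execute(sql)
    datas_enterprise = cursor.fetchall()
    for data_enterprise in datas_enterprise:
        start_time = time.time()
        gpdm = data_enterprise[3]         # ticker column, per the commented indexes
        social_code = data_enterprise[6]  # social credit code column, per the commented indexes
        add_date(social_code, getYear(start_time, session, social_code, gpdm))
        add_date(social_code, getQuarter(start_time, session, social_code, gpdm))
        baseCore.recordLog(social_code, taskType, 1, baseCore.getTimeCost(start_time, time.time()), '', '')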
@@ -41,6 +41,14 @@ type_map = {
     '9605':'公司公告',
     '9533':'公司公告',
 }
+type_id_map = {
+    '公司公告': '8',
+    '股转公告': '9',
+    '挂牌审核': '10',
+    '自律监管措施': '11',
+    '问询函': '12',
+    '纪律处分': '13'
+}
 def secrchATT(item_id, name, type_id):
     sel_sql = '''select id from clb_sys_attachment where item_id = %s and name = %s and type_id=%s '''
......
@@ -157,7 +165,7 @@ def InsterInto(short_name, social_code, pdf_url):
     return insert

-def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
+def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num,kfkid):
     # upload to the file server
     retData = baseCore.upLoadToServe(pdf_url,8,social_code)
     # insert the attachment into the att table
@@ -192,7 +200,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
         'sourceAddress': pdf_url,  # link to the original document
         'summary': '',
         'title': pdf_name,
-        'type': 3,
+        'type': kfkid,
         'socialCreditCode': social_code,
         'year': year
     }
@@ -241,6 +249,7 @@ def SpiderByZJH(url, dic_info, start_time,num):  # dic_info fetched from the database
             pdf_url = 'https://www.neeq.com.cn' + rp['destFilePath']
             name_pdf = rp['disclosureTitle']
             rp_type = type_map[rp['disclosureType']]
+            kfkid = type_id_map[rp_type]
             publishDate = rp['publishDate']
             year = publishDate[:4]
             # insert into the database
@@ -250,7 +259,7 @@ def SpiderByZJH(url, dic_info, start_time,num):  # dic_info fetched from the database
                 # okCount = okCount + 1
                 # Parse the PDF: first get the PDF link, then download; track parse success/failure and transfer success/failure
                 log.info(f'======={short_name}===========插入公告库成功')
-                result = GetContent(pdf_url, name_pdf, social_code, year, publishDate, start_time, com_name, num)
+                result = GetContent(pdf_url, name_pdf, social_code, year, publishDate, start_time, com_name, num,kfkid)
                 if result:
                     # notice info list
@@ -300,17 +309,18 @@ if __name__ == '__main__':
     while True:
         start_time = time.time()
         # # fetch enterprise info
-        # # social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
-        social_code = '9110000071092841XX'
-        com_code = '430045'
-        short_name = '超毅网络'
+        social_code = baseCore.redicPullData('NQEnterprise:nq_finance')
+        # social_code = '9110000071092841XX'
+        # com_code = '430045'
+        # short_name = '超毅网络'
         dic_info = {}
         # # if there is no more data in Redis, wait
-        # if social_code == None:
-        #     time.sleep(20)
-        #     continue
-        # dic_info = baseCore.getInfomation(social_code)
-        # count = dic_info[16]
+        if social_code == None:
+            time.sleep(20)
+            continue
+        data = baseCore.getInfomation(social_code)
+        com_code = data[3]
+        short_name = data[4]
         url = 'https://www.neeq.com.cn/disclosureInfoController/productInfoResult.do'
         # pagination: page 0 ~ 25 totalPages
......
@@ -56,7 +56,7 @@ if __name__=="__main__":
     url = "https://mp.weixin.qq.com/"
     browser.get(url)
     # adjustable
-    time.sleep(60)
+    time.sleep(20)
     s = requests.session()
     # obtain the token and cookies
......