提交 0f75587a 作者: 薛凌堃

美国证券交易委员会--基本信息和年报

上级 237f5376
...@@ -312,6 +312,20 @@ def BaseInfoAbroad_task(): ...@@ -312,6 +312,20 @@ def BaseInfoAbroad_task():
print('定时采集异常', e) print('定时采集异常', e)
pass pass
def SEC_CIK():
    """Queue every CIK with ``state=2`` for the SEC collectors.

    Reads the ``cik`` column of ``mgzqyjwyh_list`` and appends each value to
    two lists through ``r.rpush``: ``Sec_cik_US:uscik_baseinfo`` and
    ``Sec_cik_US:uscik_annualReport``.
    NOTE(review): ``r`` is presumably the module-level redis client -- confirm.
    """
    cnx, cursor = connectSql()
    try:
        cursor.execute("select cik from mgzqyjwyh_list where state=2")
        cik_list = [row[0] for row in cursor.fetchall()]
        print('=====')
        for cik in cik_list:
            r.rpush('Sec_cik_US:uscik_baseinfo', cik)
            r.rpush('Sec_cik_US:uscik_annualReport', cik)
    finally:
        # Always release the connection, even if the query or a push fails.
        closeSql(cnx, cursor)
#福布斯=====从数据库中读取信息放入redis #福布斯=====从数据库中读取信息放入redis
def FBS(): def FBS():
cnx,cursor = connectSql() cnx,cursor = connectSql()
...@@ -437,7 +451,8 @@ if __name__ == "__main__": ...@@ -437,7 +451,8 @@ if __name__ == "__main__":
# BaseInfoEnterprise() # BaseInfoEnterprise()
# FBS() # FBS()
# MengZhi() # MengZhi()
NQEnterprise() # NQEnterprise()
SEC_CIK()
# omeng() # omeng()
# AnnualEnterpriseUS() # AnnualEnterpriseUS()
# NoticeEnterprise_task() # NoticeEnterprise_task()
......
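"""
U.S. Securities and Exchange Commission (SEC) -- company basic information.
Builds each company's submissions URL from its CIK, fetches the JSON and parses the basic information, e.g.
https://data.sec.gov/submissions/CIK0000353278.json
Deployed on server 192.168.1.232.
"""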
import json
import time
import requests
from kafka import KafkaProducer
from base.BaseCore import BaseCore
import urllib3
urllib3.disable_warnings()
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
def fromcikgetinfo(cik):
    """Return the first ``mgzqyjwyh_list`` row whose ``cik`` column equals *cik*.

    Runs the lookup on the module-level ``cursor`` and returns whatever
    ``cursor.fetchone()`` yields.
    """
    # NOTE(review): the value is interpolated straight into the SQL text;
    # consider a parameterized query if cik can ever come from untrusted input.
    sql = f"select * from mgzqyjwyh_list where cik='{cik}' "
    cursor.execute(sql)
    return cursor.fetchone()
def getRequest(url):
    """GET *url* with browser-like headers and return the decoded JSON body.

    The request is attempted up to three times; each failed attempt is logged.

    Args:
        url: Address to fetch (the headers target host ``data.sec.gov``).

    Returns:
        The parsed JSON on an HTTP 200 response with a valid JSON body;
        ``False`` when every attempt raised, the status is not 200, or the
        body cannot be decoded as JSON.
    """
    headers = {
        'Host': 'data.sec.gov',
        'Connection': 'keep-alive',
        'Cache-Control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="116", "Not)A;Brand";v="24", "Google Chrome";v="116"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Sec-Fetch-Site': 'none',
        'Sec-Fetch-Mode': 'navigate',
        'Sec-Fetch-User': '?1',
        'Sec-Fetch-Dest': 'document',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Cookie': '_ga=GA1.2.784424676.1695174651; _4c_=%7B%22_4c_s_%22%3A%22lZFLT4QwFIX%2FyqRrILS0pbAzmBgXajQ%2BlhNpLwOZcUoKDo4T%2Fru3gMbHym5ov55zcjk9kaGGPcmpzARNuVRcxElAtnDsSH4irjH%2BcyA50awsDTUq1ElShZwZCMuKmbASSQUUKsYoIwF5w6w0ZpmIpeBKqTEgul0yTkRbA5hFs4iqKA6rDh39OxKuYty2zppX3a%2F7Y%2BtlA5SrzmzxwsCh0bAeGtPX3s8m%2BUJraDZ1jzhlE22dl0QC90OzN3b47Vvol0%2BkFGnp7NCB9xa1sy%2BwolQitlgEeZocfloHFTg3yfDUNb0ftAMdbexhAVjezMKZPTaemtV9cYf8%2Bhu5LW6uFtT6jv0YO6ufdz4UnyUgF2frh8tz%2F2%2BKc8ZlKqPPpxKUjHPfCJiksRAZldhnvyO5kjz2a5yTp%2FrpTzVXWfZXPbcQ%2Bulh%2Fx%2FrOH4A%22%7D; _ga_300V1CHKH1=GS1.1.1695174651.1.1.1695174684.0.0.0; ak_bmsc=91C6D28D093861656DB8C1FC1972DAB6~000000000000000000000000000000~YAAQlQ8kF2U6orCKAQAAgyl9uxX8kNk3C77pkMi6N6RxnsUqDbYEmIcNjtLSa8W6kfGL9cQMRHBUaYcbEA1+oXsvUwUF80G8hmH/F4S0ZOEnVCrlcBLx219N24l2qmoSKtVDH+VKe7c1bji9MHc7tO2R56R7juZJv9gceAdtKEuArkPfD8ijx/TyEgIrM+XruGtzCRmLnfq86UoJYP+j+tXcaWkc/qm1zHDReDNf/cHd6h2aRMs4lsES8+uh6YTjE7bfCp8h2DNJ2e07pm0ojcI/kdycUPHmuTqWPdTBEjUybad31E1hRNBAE8PbGjy2lvlPY/piuN3HX3Q5ifsmTqCNJzynN2kjGm6i4SHhmEAijUeIzNQXB11GrVmALJVV6pEjd/uu; bm_sv=FD8981426EA388050697DFB615BAFFE3~YAAQ1wcsF5K72ZSKAQAAsvl/uxUw0do3nknGCkllXH27UZBpM7kQUXm4crBNTAkhek5YSDKIrrm2uFWidfpBfyxbRSr+w7FH7Y0w4cXMAa7BELzcc/B9Uf8T6e2I2W29wjurKkBFtSseslHSqYD3BWx9/GidJMW+dFNrlzNUMd1dONUR9J1TDnYifPhE6A/zSLPHVrCTJl7xzg7VlW/05Ay0i+Bo7TynZdWgotfjET3vg2/ZVixVSGaWeQo4~1'
    }
    # Stays None when every attempt raises, so the status check below can
    # never touch an unbound name.
    response = None
    for _ in range(3):
        try:
            # A timeout keeps a stalled connection from hanging the worker.
            response = requests.get(url=url, headers=headers, verify=False, timeout=30)
            break
        except Exception as e:
            log.error(f"request请求异常-------{e}")
    # Only an HTTP 200 response is treated as success.
    if response is None or response.status_code != 200:
        return False
    try:
        return response.json()
    except ValueError as e:
        # A 200 response with a non-JSON body is reported like any other failure.
        log.error(f"response is not valid JSON-------{e}")
        return False
if __name__=='__main__':
    # Worker loop: take a CIK from redis, look up its database row, fetch the
    # SEC submissions JSON, and publish the company's basic info to Kafka.
    taskType = '基本信息/SEC'
    while True:
        # Must be a timestamp (call time.time), not the function object,
        # because it is passed to getTimeCost below.
        start_time = time.time()
        # Same key (case-sensitive) that SEC_CIK pushes the CIKs onto.
        cik = baseCore.redicPullData('Sec_cik_US:uscik_baseinfo')
        # cik = '320193'
        if not cik or cik == 'None':
            # Nothing queued: wait before polling again.
            time.sleep(20)
            continue
        # Look up the company's row by cik.
        data = fromcikgetinfo(cik)
        if data is None:
            log.error(f'{cik}---no row found in mgzqyjwyh_list, skipped')
            continue
        com_name = data[2]
        social_code = data[6]
        cname = data[11]
        # The cik embedded in the URL is zero-padded to ten characters.
        url = f'https://data.sec.gov/submissions/CIK{cik.zfill(10)}.json'
        jsonData = getRequest(url)
        if not jsonData:
            log.error(f'{cik}---{com_name}---failed to fetch {url}')
            continue
        sicDescription = jsonData['sicDescription']  # industry type
        address = jsonData['addresses']
        city = address['business']['city']
        business_address = address['business']['street1'] + ',' + city + ' ' + address['business']['stateOrCountryDescription']
        phone = jsonData['phone']  # phone number
        try:
            formerNames = jsonData['formerNames'][0]['name']  # former name
        except (KeyError, IndexError, TypeError):
            formerNames = ''
        com_dict = {
            'name': cname,  # company name
            'socialCreditCode': social_code,  # unified social credit code
            'officialPhone': phone,  # phone number
            'officialUrl': '',  # official website
            'officialEmail': '',  # e-mail
            'industry': sicDescription,  # industry
            'city': city,  # city
            'beforeName': formerNames,  # former name
            'englishName': com_name,  # English name
            'address': business_address,  # address
            'status': 0,  # status
        }
        print(com_dict)
        producer = None
        try:
            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
            kafka_result = producer.send("regionInfo", json.dumps(com_dict, ensure_ascii=False).encode('utf8'))
            print(kafka_result.get(timeout=10))
            log.info(f'{cik}---{com_name}---基本信息采集成功')
        except Exception:
            exception = 'kafka传输失败'
            state = 0
            takeTime = baseCore.getTimeCost(start_time, time.time())
            baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
            log.info(f"{com_name}--{social_code}--kafka传输失败")
        finally:
            # A producer is created per iteration; close it so connections
            # do not accumulate over the life of the loop.
            if producer is not None:
                producer.close()
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
2. 根据cik 拼接链接拿到json数据 2. 根据cik 拼接链接拿到json数据
3. 遍历json数组文件 拼接详情链接 3. 遍历json数组文件 拼接详情链接
4. 解析详情文章 通过kafka发送数据 4. 解析详情文章 通过kafka发送数据
部署在192.168.1.235服务器
""" """
import json import json
import re import re
...@@ -102,6 +102,7 @@ def spider(com_name,cik,up_okCount): ...@@ -102,6 +102,7 @@ def spider(com_name,cik,up_okCount):
for nnn in range(0,4): for nnn in range(0,4):
try: try:
req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30) req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
# req = requests.get(url=url_json, headers=header, verify=False, timeout=30)
break break
except: except:
time.sleep(2) time.sleep(2)
...@@ -109,7 +110,7 @@ def spider(com_name,cik,up_okCount): ...@@ -109,7 +110,7 @@ def spider(com_name,cik,up_okCount):
try: try:
data = req.json() data = req.json()
except: except:
baseCore.rePutIntoR('AnnualEnterprise:usqy_socialCode',social_code) baseCore.rePutIntoR('Sec_cik_US:uscik_annualReport',social_code)
return return
info = data['filings']['recent'] info = data['filings']['recent']
form_type_list = info['form'] form_type_list = info['form']
...@@ -274,6 +275,11 @@ def getCIK(social_code,code): ...@@ -274,6 +275,11 @@ def getCIK(social_code,code):
log.info(f'{code}....{social_code}....cik为{cik}') log.info(f'{code}....{social_code}....cik为{cik}')
return cik return cik
def fromcikgetinfo(cik):
    """Return the first ``mgzqyjwyh_list`` row whose ``cik`` column equals *cik*.

    Runs the lookup on the module-level ``cursor_`` and returns whatever
    ``cursor_.fetchone()`` yields.
    """
    # NOTE(review): the value is interpolated straight into the SQL text;
    # consider a parameterized query if cik can ever come from untrusted input.
    sql = f"select * from mgzqyjwyh_list where cik='{cik}' "
    cursor_.execute(sql)
    return cursor_.fetchone()
if __name__ == '__main__': if __name__ == '__main__':
headers = { headers = {
...@@ -299,58 +305,24 @@ if __name__ == '__main__': ...@@ -299,58 +305,24 @@ if __name__ == '__main__':
while True: while True:
start_time = time.time() start_time = time.time()
# 获取企业信息 # 获取企业信息
# social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode') # cik = baseCore.redicPullData('Sec_cik_US:uscik_annualReport')
social_code = 'ZZSN230912210643024' cik = '320193'
if not social_code: data = fromcikgetinfo(cik)
time.sleep(20) com_name = data[2]
continue com_code = data[3]
if social_code == 'None': exchange = data[4]
time.sleep(20) social_code = data[6]
continue cname = data[11]
if social_code == '': # 拼接链接的cik是十位数
time.sleep(20) url_cik = cik
continue while True:
dic_info = baseCore.getInfomation(social_code) if len(url_cik) < 10:
count = dic_info[15] url_cik = '0' + url_cik
code = dic_info[3] else:
com_name = dic_info[1] break
cik = dic_info[13]
if code is None:
exeception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
continue
if cik is None:
cik = getCIK(social_code,code)
if cik == '':
exeception = 'cik为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
continue
# code = 'BP'
# com_name = '英国石油公司'
# cik = ''
#"MNSO" post请求 获取企业CIK 正式
# payload = {"keysTyped":f"{code}","narrow":True}
# #测试
# # payload = {"keysTyped": "BP", "narrow":True}
# data = json.dumps(payload)
# result = getrequest(social_code,url,headers,data)
# # print(result)
# #判断接口返回的数据哪一条是该企业 根据股票代码
# tickers = result['hits']['hits']
# for ticker in tickers:
# i_t = ticker['_source']['tickers']
# if i_t == code:
# cik = ticker['_id']
# print(cik)
# break
# break
up_okCount = 0 up_okCount = 0
try: try:
spider(com_name,cik,up_okCount) spider(com_name,url_cik,up_okCount)
except Exception as e: except Exception as e:
log.error(f'{social_code}----{e}--') log.error(f'{social_code}----{e}--')
break break
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论