Commit b271b47d by 薛凌堃

9/8 US company annual reports: add deduplication logic

Parent f89f6cc9
@@ -22,7 +22,8 @@ from kafka import KafkaProducer
 from base.BaseCore import BaseCore
 baseCore = BaseCore()
 log = baseCore.getLogger()
+cnx_ = baseCore.cnx
+cursor_ = baseCore.cursor
 def paserUrl(html,listurl):
     # soup = BeautifulSoup(html, 'html.parser')
     # grab all the <a> and <img> tags
@@ -71,7 +72,7 @@ def get_news(news_url,ip_dic):
     return result
-def spider(com_name,cik):
+def spider(com_name,cik,up_okCount):
     header = {
         'Host':'data.sec.gov',
         'Connection':'keep-alive',
@@ -98,9 +99,18 @@ def spider(com_name,cik):
     # url_json = 'https://data.sec.gov/submissions/CIK0001395064.json'
     # parse the page
-    req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
-    # ,proxies=ip_dic)
-    data = req.json()
+    for nnn in range(0,4):
+        try:
+            req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
+            # ,proxies=ip_dic)
+        except:
+            time.sleep(2)
+            continue
+    try:
+        data = req.json()
+    except:
+        baseCore.rePutIntoR('AnnualEnterprise:usqy_socialCode',social_code)
+        return
     info = data['filings']['recent']
     form_type_list = info['form']
     accessionNumber_list = info['accessionNumber']
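Note that the new retry loop never breaks out on success, so even a successful request is re-sent up to four times; and if all four attempts fail, req is left unbound and the later req.json() raises a NameError, which the bare except turns into a Redis re-queue. A minimal sketch of a bounded-retry helper that returns on the first success (the helper name and the None-on-failure convention are assumptions, not part of the commit):

    import time
    import requests

    def fetch_json_with_retry(url, headers, proxies, attempts=4, backoff=2):
        # Hypothetical helper: returns parsed JSON on the first successful
        # request, or None once every attempt has failed.
        for _ in range(attempts):
            try:
                resp = requests.get(url, headers=headers, proxies=proxies,
                                    verify=False, timeout=30)
                return resp.json()
            except Exception:
                time.sleep(backoff)  # brief pause before retrying
        return None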
@@ -121,6 +131,18 @@ def spider(com_name,cik):
         u_2 = accessionNumber.replace('-','')
         u_3 = primaryDocument_list[i]
         news_url = 'https://www.sec.gov/Archives/edgar/data/' + u_1 + '/' + u_2 + '/' + u_3
+        try:
+            sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='1' '''
+            cursor_.execute(sel_sql, (news_url, social_code))
+        except Exception as e:
+            print(e)
+        selects = cursor_.fetchone()
+        if selects:
+            log.info(f'{cik}-----{social_code}----{news_url}: already exists')
+            # used during the full crawl
+            continue
+        else:
+            pass
         soup = get_news(news_url,ip_dic)
         if soup:
             pass
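This SELECT-before-fetch check is the deduplication logic the commit title announces: a filing URL already recorded for this company (type '1') is skipped before any download happens. Condensed into a helper for readability (the function name is mine; the SQL is the commit's):

    def already_collected(cursor, news_url, social_code):
        # True if this filing URL was already stored for this company.
        sel_sql = '''select social_credit_code from brpa_source_article
                     where source_address = %s and social_credit_code = %s and type = '1' '''
        cursor.execute(sel_sql, (news_url, social_code))
        return cursor.fetchone() is not None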
@@ -132,6 +154,27 @@ def spider(com_name,cik):
         time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
         title = f'{com_name}: {year} Annual Report'
+        try:
+            insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
+            # dynamic info list
+            up_okCount = up_okCount + 1
+            list_info = [
+                social_code,
+                news_url,
+                'SEC',
+                '1',
+            ]
+            cursor_.execute(insert_sql, tuple(list_info))
+            cnx_.commit()
+            # log each article as it is stored, so we know how many this company yielded
+            log.info(f'{social_code}----{news_url}: added one record')
+        except Exception as e:
+            log.error(f'insert failed: {social_code}----{news_url}-----{e}')
+            e = 'database insert failed'
+            state = 0
+            takeTime = baseCore.getTimeCost(start_time, time.time())
+            baseCore.recordLog(social_code, taskType, state, takeTime, news_url, e)
+            continue
         log.info(f'---{title}----collected----sending data----')
         dic_news = {
             'attachmentIds': '',
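One caveat: the SELECT-then-INSERT pair is not atomic, so two workers handling the same company can both pass the existence check and insert the same URL. If duplicates must be impossible, a unique index plus INSERT IGNORE closes the gap; a sketch under the assumption that a unique index on (source_address, social_credit_code, type) exists or can be added:

    # Assumes a hypothetical unique index:
    #   unique key uq_src (source_address, social_credit_code, type)
    insert_sql = '''insert ignore into brpa_source_article
                    (social_credit_code, source_address, origin, type, create_time)
                    values (%s, %s, %s, %s, now())'''
    cursor_.execute(insert_sql, (social_code, news_url, 'SEC', '1'))
    cnx_.commit()
    if cursor_.rowcount == 0:
        log.info(f'{social_code}----{news_url}: duplicate, skipped')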
@@ -156,7 +199,7 @@ def spider(com_name,cik):
             # print(dic_news)
             # send the fields to Kafka for storage
             try:
-                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],batch_size=54788160,max_request_size=102400000)
+                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],batch_size=10240000,buffer_memory=33554432)
                 kafka_result = producer.send("researchReportTopic",
                                              json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
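The old settings asked for ~54 MB batches against kafka-python's default 32 MB buffer_memory; the new line shrinks batch_size to ~10 MB and states the 32 MB buffer explicitly, so a single batch can no longer exceed the total send buffer. Separately, this code builds a fresh KafkaProducer for every message, which opens a new broker connection each time; a sketch of a reusable module-level producer (the value_serializer wiring is my addition, not the commit's):

    import json
    from kafka import KafkaProducer

    # Created once and reused for every filing instead of per message.
    producer = KafkaProducer(
        bootstrap_servers=['114.115.159.144:9092'],
        batch_size=10240000,      # max bytes batched per partition
        buffer_memory=33554432,   # total send buffer (the 32 MB default)
        value_serializer=lambda d: json.dumps(d, ensure_ascii=False).encode('utf8'),
    )

    def send_report(dic_news):
        # send() is async; .get() blocks until the broker acks or errors.
        producer.send("researchReportTopic", dic_news).get(timeout=30)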
@@ -257,7 +300,7 @@ if __name__ == '__main__':
         start_time = time.time()
         # get the company info
         # social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode')
-        social_code = 'ZZSN230711140539000'
+        social_code = 'ZZSN22080900000024'
         if not social_code:
             time.sleep(20)
             continue
@@ -305,8 +348,9 @@ if __name__ == '__main__':
                 # print(cik)
                 # break
             # break
-            spider(com_name,cik)
-            break
+            up_okCount = 0
+            spider(com_name,cik,up_okCount)
+            # break
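A small point about the new up_okCount parameter: Python passes integers by value, so the up_okCount = up_okCount + 1 inside spider() rebinds a local and the caller's counter stays 0. If the per-company count is meant to be read back here, having spider() return it is the usual fix (a sketch of the calling convention, not what the commit does):

    def spider(com_name, cik, up_okCount=0):
        # ... crawl loop; increment on each successful insert ...
        up_okCount += 1
        return up_okCount

    up_okCount = spider(com_name, cik, 0)
    log.info(f'{com_name}: {up_okCount} new filings recorded')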