提交 422a49c9 作者: 薛凌堃

美国企业年报

上级 8fb1c602
......@@ -102,7 +102,7 @@ def spider(com_name,cik,up_okCount):
for nnn in range(0,4):
try:
req = requests.get(url=url_json,headers=header,proxies=ip_dic,verify=False,timeout=30)
# ,proxies=ip_dic)
break
except:
time.sleep(2)
continue
......@@ -124,12 +124,12 @@ def spider(com_name,cik,up_okCount):
log.info(form,i-1)
accessionNumber = accessionNumber_list[i-1]
#发布日期
filingDate = filingDate_list[i]
filingDate = filingDate_list[i-1]
year = filingDate[:4]
u_1 = cik
# u_1 = '1395064'
u_2 = accessionNumber.replace('-','')
u_3 = primaryDocument_list[i]
u_3 = primaryDocument_list[i-1]
news_url = 'https://www.sec.gov/Archives/edgar/data/' + u_1 + '/' + u_2 + '/' + u_3
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='1' '''
......@@ -154,27 +154,7 @@ def spider(com_name,cik,up_okCount):
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
title = f'{com_name}:{year}年年度报告'
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
news_url,
'SEC',
'1',
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{news_url}:新增一条')
except Exception as e:
log.error(f'传输失败:{social_code}----{news_url}-----{e}')
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, news_url, e)
continue
log.info(f'---{title}----采集完成----发送数据----')
dic_news = {
'attachmentIds': '',
......@@ -199,7 +179,7 @@ def spider(com_name,cik,up_okCount):
# print(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],batch_size=10240000,buffer_memory=33554432)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'],compression_type='gzip',batch_size=1638400,linger_ms=1,buffer_memory=33445532*2,max_request_size=8388608) #,batch_size=20480000,buffer_memory=64000000)
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
......@@ -211,7 +191,27 @@ def spider(com_name,cik,up_okCount):
'code': '200',
}
log.info(dic_result)
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
news_url,
'SEC',
'1',
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{news_url}:新增一条')
except Exception as e:
log.error(f'传输失败:{social_code}----{news_url}-----{e}')
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, news_url, e)
continue
except Exception as e:
dic_result = {
'success': 'false',
......@@ -299,8 +299,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode')
# social_code = 'ZZSN22080900000024'
# social_code = baseCore.redicPullData('AnnualEnterprise:usqy_socialCode')
social_code = 'ZZSN22080900000025'
if not social_code:
time.sleep(20)
continue
......@@ -353,7 +353,7 @@ if __name__ == '__main__':
spider(com_name,cik,up_okCount)
except Exception as e:
log.error(f'{social_code}----{e}--')
# break
break
......
"""
"""
......@@ -135,7 +135,7 @@ def getUrl(code, url_parms, Catagory2_parms):
def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type):
inster = False
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s'''
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s and origin='证监会' and type='1' '''
cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone()
if selects:
......
import json
import json
......@@ -218,7 +218,10 @@ def job():
list_news = []
href_type = type.find('a')['href']
ting_type = type.find('a').text
try:
relationId = mapId_dic[ting_type]
except:
continue
work(href_type,ting_type,relationId)
num += 1
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论