提交 e96f6a29 作者: 薛凌堃

国外企业基本信息-高管信息-企业动态

上级 456ba4fa
# 雅虎财经企业动态获取
# 雅虎财经企业动态获取
# 雅虎财经企业动态获取
import json
import time
import pymysql
from kafka import KafkaProducer
from selenium.webdriver.common.by import By
from base.BaseCore import BaseCore
......@@ -46,23 +48,86 @@ def getZx(xydm,url,title,cnx,path):
'2',
'zh'
]
with cnx.cursor() as cursor:
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
except Exception as e1:
log.error("保存数据库失败")
e1 = str(e1) + '.........保存数据库失败'
return e1
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content,time.time())}")
except Exception as e:
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
except Exception as e1:
log.error("保存数据库失败")
e1 = str(e1) + '.........保存数据库失败'
return e1
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content, time.time())}")
try:
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (url, social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': id,
'author': '',
'content': content,
'contentWithTag': content,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': url, # 原文链接
'summary': '',
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': pub_time[:4]
}
# print(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = str(e) + '操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
except Exception as e:
log.info(f'传输失败:{social_code}----{url}')
e = '传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
except Exception as e:
log.error("获取正文失败")
e = str(e)+'.........获取正文失败'
e = str(e) + '.........获取正文失败'
return e
return ''
# 拖拽30次获取企业新闻
def scroll(driver):
......@@ -76,7 +141,7 @@ if __name__ == "__main__":
path = r'D:\chrome\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code= baseCore.redicPullData(2)
......@@ -131,17 +196,16 @@ if __name__ == "__main__":
else:
continue
#判断url是否已经存在
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
e = '网址已存在'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
continue
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
e = '网址已存在'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
continue
title = a_ele.text.lstrip().strip().replace("'","''")
e = getZx(xydm,news_url,title,cnx,path)
if e == '':
......@@ -159,5 +223,7 @@ if __name__ == "__main__":
count += 1
baseCore.updateRun(social_code,runType,count)
cursor.close()
cnx.close()
#释放资源
baseCore.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论