提交 c0830237 作者: 薛凌堃

微信公众号

上级 540a101a
......@@ -12,9 +12,10 @@ from base.smart import smart_extractor
# import BaseCore
# from smart import smart_extractor
import urllib3
from getTycId import getTycIdByXYDM
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jieba.cut("必须加载jieba")
# 初始化,设置中文分词
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
......@@ -36,7 +37,7 @@ headers = {
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
taskType = '企业动态/天眼查'
taskType = '企业动态/天眼查/福布斯'
def beinWork(tyc_code, social_code):
start_time = time.time()
time.sleep(3)
......@@ -132,28 +133,27 @@ def beinWork(tyc_code, social_code):
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
cursor.execute(sel_sql, (link, social_code))
cursor_.execute(sel_sql, (link, social_code))
except Exception as e:
print(e)
selects = cursor.fetchone()
selects = cursor_.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
# continue
# todo:如果该条数据存在则说明该条数据之后的都已经采集完成,就可以跳出函数,执行下一个企业
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
return retData
try:
time_struct = time.localtime(int(info_page['rtm'] / 1000)) # 首先把时间戳转换为结构化时间
time_format = time.strftime("%Y-%m-%d %H-%M-%S", time_struct) # 把结构化时间转换为格式化时间
time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct) # 把结构化时间转换为格式化时间
except:
time_format = baseCore.getNowTime(1)
try:
# 开始进行智能解析
lang = baseCore.detect_language(title)
smart = smart_extractor.SmartExtractor(lang)
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
contentText = smart.extract_by_url(link).text
# time.sleep(3)
except Exception as e:
......@@ -173,95 +173,29 @@ def beinWork(tyc_code, social_code):
pass
continue
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
#todo:更换插入的库
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,author,type) values(%s,%s,%s,%s,%s)'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
title,
info_page['abstracts'], # 摘要
contentText, # 正文
time_format, # 发布时间
link,
'天眼查',
source,
'2',
'zh'
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (link, social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': id,
'author': '',
'content': contentText,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': time_format,
'sid': '1684032033495392257',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
'title': contentText,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
continue
# print(dic_news)
# 将相应字段通过kafka传输保存
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
......@@ -270,56 +204,65 @@ def beinWork(tyc_code, social_code):
def doJob():
while True:
# 获取企业信息
query = "SELECT * FROM Tfbs_bak where col3 is not null and length(col3)>3 and col6 not like '%HK%' and col3 not like 'ZZSN%' and state3 is null limit 1 "
# 兴业银行
# query = "SELECT * FROM Tfbs where col3 is not null and length(col3)>3 and col3 not like 'ZZSN%' and col5='兴业银行'"
cursor_.execute(query)
row = cursor_.fetchone()
if row:
pass
else:
print('没有数据了,结束脚本')
break
start = time.time()
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
if social_code == None:
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
# 数据重新塞入redis
baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
continue
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None:
try:
retData = getTycIdByXYDM(xydm)
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
# 开始采集企业动态
retData = beinWork(tycid, xydm)
tycid = row[16]
com_name = row[6]
xydm = row[4]
code = row[7]
count = 0
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
#0 表示拿取数据
updateBeginSql = f"update Tfbs_bak set state3='0',date3='{time_now}' where col3='{xydm}' "
# print(updateBeginSql)
cursor_.execute(updateBeginSql)
cnx_.commit()
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
# baseCore.updateRun(xydm, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount']
log.info(f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
except Exception as e:
log.info(f'==={social_code}=====获取企业信息失败====')
# 重新塞入redis
baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# 开始采集企业动态
retData = beinWork(tycid, xydm)
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
# baseCore.updateRun(xydm, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount']
log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
# 200 表示成功
updateBeginSql = f"update Tfbs_bak set state3='200',date3='{time_now}' where col3='{xydm}' "
# print(updateBeginSql)
cursor_.execute(updateBeginSql)
cnx_.commit()
cursor.close()
cnx.close()
# 释放资源
baseCore.close()
# Press the green button in the gutter to run the script.
#state3
if __name__ == '__main__':
doJob()
......
......@@ -149,7 +149,7 @@ def beinWork(tyc_code, social_code,start_time):
return retData
try:
time_struct = time.localtime(int(info_page['rtm'] / 1000)) # 首先把时间戳转换为结构化时间
time_format = time.strftime("%Y-%m-%d %H-%M-%S", time_struct) # 把结构化时间转换为格式化时间
time_format = time.strftime("%Y-%m-%d %H:%M:%S", time_struct) # 把结构化时间转换为格式化时间
except:
time_format = baseCore.getNowTime(1)
try:
......@@ -270,20 +270,24 @@ def beinWork(tyc_code, social_code,start_time):
return retData
# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
if social_code == 'None':
if social_code == None:
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
#数据重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
continue
id = data[0]
xydm = data[2]
tycid = data[11]
......@@ -299,7 +303,7 @@ def doJob():
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR(social_code)
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
......@@ -321,20 +325,20 @@ def doJob():
up_repetCount = retData['up_repetCount']
log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
except:
except Exception as e:
log.info(f'==={social_code}=====获取企业信息失败====')
#重新塞入redis
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取企业信息失败')
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
cursor.close()
cnx.close()
# 释放资源
baseCore.close()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
doJob()
......
from base import BaseCore
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
import time
# One-off seeding script: pushes a fixed batch of identifiers onto the work
# queue key 'NewsEnterpriseFbs:gnqy_socialCode' through the client object
# exposed by BaseCore.
baseCore = BaseCore.BaseCore()
# log = baseCore.getLogger()
r = baseCore.r  # presumably a Redis client (only rpush is used below) - confirm in BaseCore

# Leftover experiment, kept for reference: it checked which "empty" value
# redicPullData returns once the queue has been drained.
# social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
#
# # Decide whether the queue is empty; if so the worker should wait
# if not social_code:
#
#     print('1:')
# if social_code == 'None':
#
#     print('2')
#
# if social_code == None:
#     print('3:')
# Sample values used during manual testing:
# 9131000010000595XD 91100000100003962T
# item = '91110000101690725E'

# Key the values are appended to.
QUEUE_KEY = 'NewsEnterpriseFbs:gnqy_socialCode'

# Values to enqueue, in push order.
# NOTE(review): the key name says "socialCode", yet these look like short
# numeric row ids rather than 18-character credit codes - confirm that the
# consumer of this queue expects ids.
PENDING_IDS = (
    '86', '122', '142', '168', '270', '282', '364', '407', '416', '563',
    '566', '588', '595', '597', '737', '750', '810', '838', '860', '875',
    '885', '886', '1003', '1250', '1272', '1362', '1379', '1381', '1382', '1392',
    '1476', '1583', '1639', '1748', '1764', '1775', '1801', '1839', '2018', '2260',
    '2356', '2471', '2563', '2703', '2800', '2815', '2934', '3162', '3376', '3474',
    '3737', '3782', '3939', '4118', '4509', '4675', '4801', '4818', '4943', '5149',
    '5195', '5429', '7023', '7025', '7026', '7039', '7053', '7058', '7059', '7060',
    '7062', '7066', '7067', '7069', '7073', '7077', '7083', '7091', '7095', '7103',
    '7105', '7107', '7109', '7110', '7113', '7131', '7135', '7136', '7138', '7140',
    '7141', '7142', '7143', '7144', '7145', '7147', '7150', '7151', '7152', '7156',
    '7157', '7160', '7162', '7165', '7169', '7174', '7180', '7187', '7193', '7197',
    '7198', '7199', '7201', '7202', '7203', '7204', '7205', '7206', '7208', '7209',
    '7211', '7212', '7213', '7214',
)

# One rpush call per value, exactly as before, so the sequence of calls made
# on the client is unchanged.
for pending_id in PENDING_IDS:
    r.rpush(QUEUE_KEY, pending_id)
\ No newline at end of file
Markdown 格式
0%
您将添加 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
请 注册 或者 登录 后发表评论