提交 32c31bd1 作者: 薛凌堃

天眼查

上级 623b6803
......@@ -468,6 +468,7 @@ class BaseCore:
except:
log = self.getLogger()
log.info('=========数据库操作失败========')
return data
# 更新企业采集次数
......@@ -520,6 +521,13 @@ class BaseCore:
token = self.cursor.fetchone()[0]
return token
#获取天眼查token
def GetTYCToken(self):
query = 'select token from TYC_token'
self.cursor.execute(query)
token = self.cursor.fetchone()[0]
return token
#检测语言
def detect_language(self, text):
# 使用langid.py判断文本的语言
......
......@@ -315,12 +315,13 @@ def FBS():
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
for item in gn_social_list:
if not r.exists(item):
r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
# for item in gn_social_list:
# if not r.exists(item):
# r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
# r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
closeSql(cnx,cursor)
#将IPO的国外股票代码放到redis中
......
#补充剩余核心人员信息
#先采集天眼查id,再通过id采集核心人员信息
import datetime
import json
import requests,time,random
import pandas as pd
from bs4 import BeautifulSoup
import urllib3
from base.BaseCore import BaseCore
from getTycId import getTycIdByXYDM
baseCore = BaseCore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
headers = {
'Cookie':'HWWAFSESID=0e10b77869899be8365; HWWAFSESTIME=1688781923708; csrfToken=VeTF4UIZKJ0q6yWmgfC_FLqv; TYCID=e7cec7501d3311eea9dcb9fb7af79aad; ssuid=3142278034; sajssdk_2015_cross_new_user=1; bannerFlag=true; _ga=GA1.2.1006597844.1688781929; _gid=GA1.2.146077413.1688781929; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1688781929; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2217103123002%22}; tyc-user-info-save-time=1688781977329; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNzEwMzEyMzAwMiIsImlhdCI6MTY4ODc4MTk3NiwiZXhwIjoxNjkxMzczOTc2fQ.Luw0DCFul8WxRNOM8X5-NCmy_z3BwJC5JBvofWqWkSQOleJ6zJU0SRbqwAobPfOfVyGFDUBqmxxWd4YKCeCWeQ; tyc-user-phone=%255B%252217103123002%2522%255D; searchSessionId=1688778331.16177575; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22302953956%22%2C%22first_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg5MzMzZjM4Y2I5NDctMGZiOWIyNTI3NDJhNmMtMjYwMzFkNTEtOTIxNjAwLTE4OTMzM2YzOGNkY2RkIiwiJGlkZW50aXR5X2xvZ2luX2lkIjoiMzAyOTUzOTU2In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22302953956%22%7D%2C%22%24device_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%7D; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1688781980',
# 'Cookie': 'TYCID=82cbe530204b11ed9f23298cecec1c60; ssuid=3927938144; _ga=GA1.2.1842488970.1670638075; jsid=SEO-BAIDU-ALL-SY-000001; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2215565837784%22}; tyc-user-info-save-time=1678953978429; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNTU2NTgzNzc4NCIsImlhdCI6MTY3ODk1Mzk3OCwiZXhwIjoxNjgxNTQ1OTc4fQ.wsNxLWMkZVrtOEvo_CCDPD38R7F23c5yk7dFAdHkwFPkZhEEvmiv0nlt7UD0ZWfo3t8aYxc4qvu4ueEgMubJ5g; tyc-user-phone=%255B%252215565837784%2522%255D; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22284710084%22%2C%22first_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfbG9naW5faWQiOiIyODQ3MTAwODQiLCIkaWRlbnRpdHlfY29va2llX2lkIjoiMTgyYjljYTU4NWVhZC0wODk1OThjMWQ3Zjc5MjgtMjYwMjFkNTEtMTMyNzEwNC0xODJiOWNhNTg1ZjdmMSJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22284710084%22%7D%2C%22%24device_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%7D; HWWAFSESID=fa776898fa88a6520ea; HWWAFSESTIME=1679899464128; csrfToken=m3cB6mHsznwIuppkT-S8oYc6; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1679016180,1679471093,1679732923,1679899468; bdHomeCount=28; bannerFlag=true; show_activity_id_92=92; searchSessionId=1679899783.48494979; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1679899783',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
list_all_1 = []
list_all_2 = []
taskType = '天眼查/核心人员/福布斯'
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('CorPersonEnterpriseFbs:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
# social_code = 'ZZSN23011300000004'
if social_code == None:
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
#数据重新塞入redis
baseCore.rePutIntoR('CorPersonEnterpriseFbs:gnqy_socialCode',social_code)
continue
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None:
try:
retData = getTycIdByXYDM(xydm)
tycid = retData['tycData']['id']
#todo:写入数据库
updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('CorPersonEnterpriseFbs:gnqy_socialCode',social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始采集核心人员")
list_one_info = []
num = 1
for page in range(1,2):
t = int(time.time()*1000)
#https://capi.tianyancha.com/cloud-listed-company/listed/getHkNoRepeatSeniorExecutive?_=1692929256462&gid=209370942&pageSize=20&pageNum=1
url = f'https://capi.tianyancha.com/cloud-listed-company/listed/noRepeatSeniorExecutive?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url,headers=headers,proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['dataList']
if list_all:
for one_info in list_all:
name = one_info['name']
sex = one_info['sex']
education = one_info['education']
position = one_info['position']
Salary = one_info['salary']
#todo:获取当前年份
now = datetime.datetime.now()
year = now.year
try:
birthYear = year - int(one_info['age'])
except:
birthYear = ''
StockKeepings = one_info['numberOfShares']
currentTerm = one_info['term']
personInfo = one_info['resume']
try:
person_img = one_info['logo']
except:
person_img = '--'
dic_json = {
"socialCreditCode":social_code,
"name":name,
"sex":sex,
"education":education,
"position":position,
"salary":Salary,
"birthYear":birthYear,
"shareNum":StockKeepings,
"shareRatio":'',
"benefitShare":'',
"currentTerm":currentTerm,
"personInfo":personInfo,
"sort":str(num)
}
dic_json_img = {
"socialCreditCode":social_code,
"name":name,
"sex":sex,
"education":education,
"position":position,
"salary":Salary,
"birthYear":birthYear,
"shareNum":StockKeepings,
"shareRatio":'',
"benefitShare":'',
"currentTerm":currentTerm,
"personInfo":personInfo,
"头像":person_img,
"sort":str(num)
}
num = num+1
list_one_info.append(dic_json)
# list_all_2.append(dic_json_img)
else:
t = int(time.time() * 1000)
url = f'https://capi.tianyancha.com/cloud-listed-company/listed/getHkNoRepeatSeniorExecutive?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['dataList']
if list_all:
for one_info in list_all:
name = one_info['personal_name']
sex = one_info['gender2']
education = ''
position = one_info['position_name']
Salary = ''
birthYear = ''
personInfo = one_info['resume_cn']
dic_json = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": '',
"benefitShare": '',
"currentTerm": '',
"personInfo": personInfo,
"sort": str(num)
}
num = num + 1
list_one_info.append(dic_json)
else:
t = int(time.time() * 1000)
url = f'https://capi.tianyancha.com/cloud-company-background/company/dim/staff?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['result']
# todo:增加一种情况
if list_all:
for one_info in list_all:
name = one_info['name']
try:
sex = one_info['sex']
except:
sex = ''
try:
education = one_info['education']
except:
education = ''
try:
position = one_info['typeSore']
except:
position = ''
try:
Salary = one_info['salary']
except:
Salary = ''
birthYear = ''
try:
shareRatio = one_info['percent']
except:
shareRatio = ''
try:
benefitShare = one_info['finalBenefitShares']
except:
benefitShare = ''
try:
currentTerm = one_info['term']
except:
currentTerm = ''
person_id = one_info['id']
person_url = f'https://www.tianyancha.com/human/{person_id}-c{tycid}'
person_res = requests.get(person_url, headers=headers, proxies=ip)
person_soup = BeautifulSoup(person_res.content, 'html.parser')
try:
personInfo = person_soup.find('span', {'class': '_56d0a'}).text.strip()
except:
personInfo = ''
try:
person_img = one_info['logo']
except:
person_img = '--'
dic_json = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": shareRatio,
"benefitShare": benefitShare,
"currentTerm": currentTerm,
"personInfo": personInfo,
"sort": str(num)
}
dic_json_img = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": shareRatio,
"benefitShare": benefitShare,
"currentTerm": '',
"personInfo": personInfo,
"头像": person_img,
"sort": str(num)
}
num = num + 1
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
if json_updata == '[]':
continue
else:
pass
response = requests.post('http://114.115.236.206:8088/sync/executive',data=json_updata,timeout=300, verify=False)
print(response.text)
log.info('=========成功======')
except Exception as e:
log.info(f'==={social_code}=====企业核心人员采集失败====')
# 重新塞入redis
baseCore.rePutIntoR('CorPersonEnterpriseFbs:gnqy_socialCode', social_code)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
# df_img = pd.DataFrame(list_all_2)
# df_img.to_excel('企业主要人员-头像.xlsx',index=False)
if __name__ == "__main__":
doJob()
\ No newline at end of file
......@@ -19,7 +19,7 @@ jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor= cnx.cursor()
cnx_ = baseCore.cnx
......@@ -37,7 +37,7 @@ headers = {
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
taskType = '企业动态/天眼查/福布斯'
taskType = '企业动态/天眼查'
def beinWork(tyc_code, social_code):
start_time = time.time()
time.sleep(3)
......@@ -154,11 +154,14 @@ def beinWork(tyc_code, social_code):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
contentText = smart.extract_by_url(link).text
#带标签正文
contentWithTag = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
contentText = ''
if contentText == '':
contentWithTag = ''
if contentWithTag == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
state = 0
......@@ -174,7 +177,7 @@ def beinWork(tyc_code, social_code):
continue
try:
#todo:更换插入的库
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,author,type) values(%s,%s,%s,%s,%s)'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
......@@ -182,14 +185,73 @@ def beinWork(tyc_code, social_code):
social_code,
link,
'天眼查',
source,
'2',
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': contentWithTag,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': time_format,
'sid': '1684032033495392257',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
# e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
continue
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
......@@ -205,8 +267,9 @@ def doJob():
while True:
start = time.time()
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息 天眼查ID19276488
# social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
social_code = '912301001275921118'
if social_code == None:
time.sleep(20)
continue
......@@ -222,19 +285,25 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None:
if tycid == None or tycid== '':
try:
retData = getTycIdByXYDM(xydm)
tycid = retData['tycData']['id']
# todo:写入数据库
updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
if retData:
tycid = retData['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
else:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
......@@ -242,8 +311,10 @@ def doJob():
# 开始采集企业动态
retData = beinWork(tycid, xydm)
# baseCore.updateRun(xydm, runType, count)
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(xydm, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
......@@ -257,7 +328,7 @@ def doJob():
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
cursor.close()
cnx.close()
# 释放资源
......
......@@ -10,9 +10,15 @@ from base.BaseCore import BaseCore
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
headers={
'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
# headers={
# 'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
# 'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
# 'version':'TYC-Web',
# 'Content-Type':'application/json;charset=UTF-8'
# }
headers = {
'X-TYCID':'30c1289042f511ee9182cd1e1bcaa517',
# 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzU5MjQ4MTgzOSIsImlhdCI6MTY5MjkzMzIxMiwiZXhwIjoxNjk1NTI1MjEyfQ.BKxDem8fpgeDHrIgm3qCoF76ueHtQSG1DggiTl4FAaoNKt4gem6NTX1XYndPXqVj9TXfl-8yp2kKE3jY66dyig',
'version':'TYC-Web',
'Content-Type':'application/json;charset=UTF-8'
}
......@@ -27,6 +33,7 @@ def getTycIdByXYDM(xydm):
paramJsonData = {'keyword':xydm}
try:
headers['User-Agent'] = baseCore.getRandomUserAgent()
headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
response = requests.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
time.sleep(random.randint(3, 5))
retJsonData =json.loads(response.content.decode('utf-8'))
......@@ -35,14 +42,14 @@ def getTycIdByXYDM(xydm):
retData['state'] = True
retData['tycData'] = retJsonData['data'][0]
response.close()
return retData
return retData['tycData']
else:
log.error(f"{xydm}------{retJsonData}")
response.close()
return retData
except Exception as e:
log.error(f"{xydm}---exception---{e}")
return retData
return retData['tycData']
except:
log.error(f"---{xydm}--天眼查token失效---")
return retData['tycData']
# 更新天眼查企业基本信息
......
......@@ -3,7 +3,6 @@ import json
import requests, time, pymysql
import jieba
import sys
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
from base.BaseCore import BaseCore
......@@ -12,15 +11,15 @@ from base.smart import smart_extractor
# import BaseCore
# from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 初始化,设置中文分词
jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
pageSize = 10
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
......@@ -134,10 +133,10 @@ def beinWork(tyc_code, social_code,start_time):
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
cursor.execute(sel_sql, (link, social_code))
cursor_.execute(sel_sql, (link, social_code))
except Exception as e:
print(e)
selects = cursor.fetchone()
selects = cursor_.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
......@@ -156,7 +155,10 @@ def beinWork(tyc_code, social_code,start_time):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
#带标签正文
contentText = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
contentText = ''
......@@ -175,36 +177,25 @@ def beinWork(tyc_code, social_code,start_time):
pass
continue
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
title,
info_page['abstracts'], # 摘要
contentText, # 正文
time_format, # 发布时间
link,
'天眼查',
source,
'2',
'zh'
]
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
cursor.execute(sel_sql, (link, social_code))
row = cursor.fetchone()
id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': id,
'attachmentIds': '',
'author': '',
'content': contentText,
'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
......@@ -222,7 +213,6 @@ def beinWork(tyc_code, social_code,start_time):
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
e = '数据库传输失败'
state = 0
......@@ -237,7 +227,6 @@ def beinWork(tyc_code, social_code,start_time):
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
......@@ -250,7 +239,6 @@ def beinWork(tyc_code, social_code,start_time):
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
......@@ -269,12 +257,12 @@ def beinWork(tyc_code, social_code,start_time):
retData['up_repetCount'] = up_repetCount
return retData
# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
# social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
social_code = '912301001275921118'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
......@@ -291,28 +279,31 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None:
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
tycid = retData['tycData']['id']
#todo:写入数据库
updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
if retData:
tycid = retData['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
else:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
# updateBeginSql = f"update ssqy_tyc set update_state=2,date_time=now() where id={id}"
# cursor.execute(updateBeginSql)
# cnx.commit()
# 开始采集企业动态
retData = beinWork(tycid, xydm,start_time)
# 信息采集完成后将该企业的采集次数更新
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论