Commit 36713e81  Author: 丁双波

Merge remote-tracking branch 'origin/master'

@@ -475,6 +475,7 @@ class BaseCore:
except:
log = self.getLogger()
log.info('=========数据库操作失败========')
return data
# 更新企业采集次数
@@ -527,6 +528,13 @@ class BaseCore:
token = self.cursor.fetchone()[0]
return token
#获取天眼查token
def GetTYCToken(self):
query = 'select token from TYC_token'
self.cursor.execute(query)
token = self.cursor.fetchone()[0]
return token
#检测语言
def detect_language(self, text):
# 使用langid.py判断文本的语言
......
@@ -315,12 +315,13 @@ def FBS():
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
- r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
- for item in gn_social_list:
- if not r.exists(item):
- r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
- r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
- r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
+ # r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
+ # for item in gn_social_list:
+ # if not r.exists(item):
+ # r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
+ # r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
+ # r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
+ # r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
closeSql(cnx,cursor)
#将IPO的国外股票代码放到redis中
......
# -*- coding: utf-8 -*-
import pandas as pd
import time
import requests
import json
from kafka import KafkaProducer
from base.BaseCore import BaseCore
from getQccId import find_id_by_name
baseCore = BaseCore()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
log = baseCore.getLogger()
# 通过企查查id获取企业基本信息
def info_by_id(com_id,com_name,social_code):
aa_dict_list = []
t = str(int(time.time()) * 1000)
headers['Qcc-Timestamp'] = t
url = "https://xcx.qcc.com/mp-weixin/forwardApp/v1/ent/detail?token={}&t={}&unique={}".format(token, t, com_id)
resp_dict = requests.get(url=url, headers=headers, verify=False).json()
time.sleep(2)
com_jc_name = ''
try:
result_dict = resp_dict['result']['Company']
except:
log.info(com_name + ":获取失败===========重新放入redis")
baseCore.rePutIntoR('BaseInfoEnterpriseFbs:gnqy_social_code',social_code)
return aa_dict_list
company_name = result_dict['Name']
CreditCode = result_dict['CreditCode']
if CreditCode is None:
CreditCode = ''
try:
OperName = result_dict['Oper']['Name']
except:
OperName = ''
if OperName is None:
OperName = ''
if baseCore.str_have_num(OperName):
OperName = ''
try:
Status = result_dict['ShortStatus']
except:
Status = ''
if Status is None:
Status = ''
try:
StartDate = result_dict['StartDate']
except:
StartDate = ''
if StartDate is None:
StartDate = ''
try:
RegistCapi = result_dict['RegistCapi']
except:
RegistCapi = ''
if RegistCapi is None:
RegistCapi = ''
RecCap = '' # result_dict['RecCap'] #实际缴纳金额,现已没有显示
if RecCap is None:
RecCap = ''
try:
OrgNo = result_dict['CreditCode'][8:-2] + '-' + result_dict['CreditCode'][-2] # 组织机构代码,现已没有显示
except:
OrgNo = ''
if OrgNo is None:
OrgNo = ''
try:
TaxNo = result_dict['TaxNo']
except:
TaxNo = ''
if TaxNo is None:
TaxNo = ''
try:
EconKind = result_dict['EconKind']
except:
EconKind = ''
if EconKind is None:
EconKind = ''
TermStart = '' # result_dict['TermStart'] 营业期限自,现已没有显示
if TermStart is None:
TermStart = ''
TeamEnd = '' # result_dict['TeamEnd']营业期限至,现已没有显示
if TeamEnd is None:
TeamEnd = ''
try:
SubIndustry = result_dict['Industry']['SubIndustry']
except:
SubIndustry = ''
if SubIndustry is None:
SubIndustry = ''
try:
Province = result_dict['Area']['Province']
except:
Province = ''
try:
City = result_dict['Area']['City']
except:
City = ''
try:
County = result_dict['Area']['County']
except:
County = ''
try:
region = Province + City + County
except:
region = ''
BelongOrg = '' # result_dict['BelongOrg']登记机关,现已没有显示
can_bao = ''
CommonList = [] # result_dict['CommonList']参保人数,现已没有显示
for Common_dict in CommonList:
try:
KeyDesc = Common_dict['KeyDesc']
except:
continue
if KeyDesc == '参保人数':
can_bao = Common_dict['Value']
if can_bao == '0':
can_bao = ''
OriginalName = ''
try:
OriginalName_lists = result_dict['OriginalName']
for OriginalName_dict in OriginalName_lists:
OriginalName += OriginalName_dict['Name'] + ' '
except:
OriginalName = ''
OriginalName = OriginalName.strip()
EnglishName = '' # result_dict['EnglishName']企业英文名,现已没有显示
if EnglishName is None:
EnglishName = ''
IxCode = '' # result_dict['IxCode']进出口企业代码,现已没有显示
if IxCode is None:
IxCode = ''
Address = result_dict['Address']
if Address is None:
Address = ''
Scope = '' # result_dict['Scope']经营范围,现已没有显示
if Scope is None:
Scope = ''
try:
PhoneNumber = result_dict['companyExtendInfo']['Tel']
except:
PhoneNumber = ''
if PhoneNumber is None:
PhoneNumber = ''
try:
WebSite = result_dict['companyExtendInfo']['WebSite']
except:
WebSite = None
if WebSite is None:
try:
WebSite = result_dict['ContactInfo']['WebSite'][0]['Url']
except:
WebSite = ''
try:
Email = result_dict['companyExtendInfo']['Email']
except:
Email = ''
if Email is None:
Email = ''
try:
Desc = result_dict['companyExtendInfo']['Desc']
except:
Desc = ''
if Desc is None:
Desc = ''
try:
Info = result_dict['companyExtendInfo']['Info']
except:
Info = ''
if Info is None:
Info = ''
company_name = baseCore.hant_2_hans(company_name)
t = str(int(time.time()) * 1000)
headers['Qcc-Timestamp'] = t
url = "https://xcx.qcc.com/mp-weixin/forwardApp/v6/base/getEntDetail?token={}&t={}&unique={}".format(token, t,
com_id)
resp_dict2 = requests.get(url=url, headers=headers, verify=False).json()
time.sleep(1)
try:
com2 = resp_dict2['result']['Company']
except:
com2 = ''
try:
Scope = com2['Scope']
except:
Scope = ''
try:
CheckDate = com2['CheckDate']
except:
CheckDate = ''
if CheckDate is None:
CheckDate = ''
try:
TaxpayerType = com2['TaxpayerType'] #纳税人资质
except:
TaxpayerType = ''
if TaxpayerType is None:
TaxpayerType = ''
try:
No = com2['No']
except:
No = ''
if No is None:
No = ''
try:
IxCode = com2['IxCode']
except:
IxCode = ''
try:
OrgNo = com2['OrgNo']
except:
OrgNo = ''
try:
for Common_t in com2['CommonList']:
try:
if Common_t['KeyDesc'] == '参保人数':
can_bao = Common_t['Value']
except:
pass
except:
can_bao = ''
try:
TermStart = com2['TermStart']
except:
TermStart = ''
try:
TeamEnd = com2['TeamEnd']
except:
TeamEnd = ''
try:
RecCap = com2['RecCap']
except:
RecCap = ''
try:
No = com2['No']
except:
No = ''
try:
SubIndustry = com2['IndustryArray'][-1]
except:
SubIndustry = ''
try:
BelongOrg = com2['BelongOrg']
except:
BelongOrg = ''
try:
EnglishName = com2['EnglishName']
except:
EnglishName = ''
aa_dict = {
'qccId': com_id, # 企查查企业id
'name': company_name, # 企业名称
'shortName': com_jc_name, # 企业简称
'socialCreditCode': CreditCode, # 统一社会信用代码
'legalPerson': OperName, # 法定代表人
'officialPhone': PhoneNumber, # 电话
'officialUrl': WebSite, # 官网
'officialEmail': Email, # 邮箱
'briefInfo': Desc, # 简介
'registerStatus': Status, # 登记状态
'incorporationDate': StartDate, # 成立日期
'capital': RegistCapi, # 注册资本
'paidCapital': RecCap, # 实缴资本
'approvalDate': CheckDate, # 核准日期
'organizationCode': OrgNo, # 组织机构代码
'registerNo': No, # 工商注册号
'taxpayerNo': CreditCode, # 纳税人识别号
'type': EconKind, # 企业类型
'businessStartDate': TermStart, # 营业期限自
'businessEndDate': TeamEnd, # 营业期限至
'taxpayerQualification': TaxpayerType, # 纳税人资质
'industry': SubIndustry, # 所属行业
'region': region,
'province': Province, # 所属省
'city': City, # 所属市
'county': County, # 所属县
'registerDepartment': BelongOrg, # 登记机关
'scale': Info, # 人员规模
'insured': can_bao, # 参保人数
'beforeName': OriginalName, # 曾用名
'englishName': EnglishName, # 英文名
'importExportEnterpriseCode': IxCode, # 进出口企业代码
'address': Address, # 地址
'businessRange': Scope, # 经营范围
'status': 0, # 状态
}
aa_dict_list.append(aa_dict)
print(company_name + ":爬取完成")
return aa_dict_list
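Nearly every field in info_by_id above is read with the same try/except plus None check. A minimal sketch of a helper that collapses that pattern is shown below; safe_get and the example keys are illustrative only and not part of this commit:

# Illustrative helper (not in this commit): collapses the repeated
# try/except + None-check pattern used for each field in info_by_id.
def safe_get(d, *keys, default=''):
    # Walk nested keys; return default on a missing key, bad index, or None value.
    cur = d
    for key in keys:
        try:
            cur = cur[key]
        except (KeyError, IndexError, TypeError):
            return default
    return default if cur is None else cur

# Hypothetical usage against the same result_dict structure:
# OperName = safe_get(result_dict, 'Oper', 'Name')
# Province = safe_get(result_dict, 'Area', 'Province')
# PhoneNumber = safe_get(result_dict, 'companyExtendInfo', 'Tel')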
if __name__ == '__main__':
taskType = '基本信息/企查查'
headers = {
'Host': 'xcx.qcc.com',
'Connection': 'keep-alive',
'Qcc-Platform': 'mp-weixin',
'Qcc-Timestamp': '',
'Qcc-Version': '1.0.0',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36 MicroMessenger/7.0.9.501 NetType/WIFI MiniProgramEnv/Windows WindowsWechat',
'content-type': 'application/json',
'Referer': 'https://servicewechat.com/wx395200814fcd7599/166/page-frame.html',
'Accept-Encoding': 'gzip, deflate, br,'
}
#从redis里拿数据
while True:
# TODO:需要隔两个小时左右抓包修改,token从数据库中获得
token = baseCore.GetToken()
list_weicha = []
list_all_info = []
name_list = []
start_time = time.time()
# 获取企业信息
# social_code = baseCore.redicPullData('BaseInfoEnterprise:gnqy_socialCode')
social_code = '91110000802100433B'
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
log.info(f'----当前企业{social_code}--开始处理---')
count = dic_info[13]
com_name = dic_info[1]
social_code = dic_info[2]
#企查查id
company_id = dic_info[12]
#如果没有信用代码 就通过名字搜索 如果有信用代码 就通过信用代码
if company_id == '' or company_id == None:
if social_code:
company_id = find_id_by_name(start_time,token,social_code)
else:
company_id = find_id_by_name(start_time,token,com_name)
if not company_id:
log.info(com_name + ":企业ID获取失败===重新放入redis")
list_weicha.append(com_name + ":企业ID获取失败")
baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_socialCode',social_code)
time.sleep(20)
continue
else:
log.info(f'====={social_code}===={company_id}=====获取企业id成功=====')
# todo:写入数据库
updateSql = f"update EnterpriseInfo set QCCID = '{company_id}' where SocialCode = '{social_code}'"
cursor_.execute(updateSql)
cnx_.commit()
try:
post_data_list = info_by_id(company_id, com_name,social_code)
except:
log.info(f'====={social_code}=====获取基本信息失败,重新放入redis=====')
baseCore.rePutIntoR('BaseInfoEnterprise:gnqy_social_code', social_code)
continue
if post_data_list:
pass
else:
log.info(f'======{social_code}====企查查token失效====')
time.sleep(20)
continue
for post_data in post_data_list:
list_all_info.append(post_data)
if post_data is None:
print(com_name + ":企业信息获取失败")
list_weicha.append(com_name + ":企业信息获取失败")
continue
get_name = post_data['name']
get_socialcode = post_data['socialCreditCode']
name_compile = {
'yuan_name':com_name,
'get_name':get_name
}
name_list.append(name_compile)
log.info(f'采集{com_name}成功=======耗时{baseCore.getTimeCost(start_time,time.time())}')
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
except:
exception = 'kafka传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(get_socialcode, taskType, state, takeTime, '', exception)
log.info(f"{get_name}--{get_socialcode}--kafka传输失败")
# 信息采集完成后将该企业的采集次数更新
runType = 'BaseInfoRunCount'
count += 1
baseCore.updateRun(social_code, runType, count)
nowtime = baseCore.getNowTime(1).replace('-','_')[:10]
companyName = pd.DataFrame(name_list)
companyName.to_excel(f'./data/企业名称对比_{nowtime}.xlsx',index=False)
false_com = pd.DataFrame(list_weicha)
false_com.to_excel(f'./data/采集失败企业名单_{nowtime}.xlsx',index=False)
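The main loop above constructs a new KafkaProducer for every record it sends. A sketch of creating the producer once, outside the while loop, and reusing it; it assumes the same bootstrap server and topic already used in this script, and send_region_info is an illustrative name:

# Sketch only: reuse a single KafkaProducer instead of building one per record.
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))

def send_region_info(post_data):
    # Serialize one company dict and wait for the broker acknowledgement.
    future = producer.send("regionInfo", json.dumps(post_data, ensure_ascii=False).encode('utf8'))
    return future.get(timeout=10)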
@@ -37,6 +37,11 @@ def find_id_by_name(start,token,name):
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
......
#补充剩余核心人员信息
#先采集天眼查id,再通过id采集核心人员信息
import datetime
import json
import requests,time,random
import pandas as pd
from bs4 import BeautifulSoup
import urllib3
from base.BaseCore import BaseCore
from getTycId import getTycIdByXYDM
baseCore = BaseCore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log = baseCore.getLogger()
headers = {
'Cookie':'HWWAFSESID=0e10b77869899be8365; HWWAFSESTIME=1688781923708; csrfToken=VeTF4UIZKJ0q6yWmgfC_FLqv; TYCID=e7cec7501d3311eea9dcb9fb7af79aad; ssuid=3142278034; sajssdk_2015_cross_new_user=1; bannerFlag=true; _ga=GA1.2.1006597844.1688781929; _gid=GA1.2.146077413.1688781929; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1688781929; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2217103123002%22}; tyc-user-info-save-time=1688781977329; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNzEwMzEyMzAwMiIsImlhdCI6MTY4ODc4MTk3NiwiZXhwIjoxNjkxMzczOTc2fQ.Luw0DCFul8WxRNOM8X5-NCmy_z3BwJC5JBvofWqWkSQOleJ6zJU0SRbqwAobPfOfVyGFDUBqmxxWd4YKCeCWeQ; tyc-user-phone=%255B%252217103123002%2522%255D; searchSessionId=1688778331.16177575; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22302953956%22%2C%22first_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E7%9B%B4%E6%8E%A5%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC_%E7%9B%B4%E6%8E%A5%E6%89%93%E5%BC%80%22%2C%22%24latest_referrer%22%3A%22%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg5MzMzZjM4Y2I5NDctMGZiOWIyNTI3NDJhNmMtMjYwMzFkNTEtOTIxNjAwLTE4OTMzM2YzOGNkY2RkIiwiJGlkZW50aXR5X2xvZ2luX2lkIjoiMzAyOTUzOTU2In0%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22302953956%22%7D%2C%22%24device_id%22%3A%22189333f38cb947-0fb9b252742a6c-26031d51-921600-189333f38cdcdd%22%7D; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1688781980',
# 'Cookie': 'TYCID=82cbe530204b11ed9f23298cecec1c60; ssuid=3927938144; _ga=GA1.2.1842488970.1670638075; jsid=SEO-BAIDU-ALL-SY-000001; tyc-user-info={%22state%22:%220%22%2C%22vipManager%22:%220%22%2C%22mobile%22:%2215565837784%22}; tyc-user-info-save-time=1678953978429; auth_token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxNTU2NTgzNzc4NCIsImlhdCI6MTY3ODk1Mzk3OCwiZXhwIjoxNjgxNTQ1OTc4fQ.wsNxLWMkZVrtOEvo_CCDPD38R7F23c5yk7dFAdHkwFPkZhEEvmiv0nlt7UD0ZWfo3t8aYxc4qvu4ueEgMubJ5g; tyc-user-phone=%255B%252215565837784%2522%255D; sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%22284710084%22%2C%22first_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfbG9naW5faWQiOiIyODQ3MTAwODQiLCIkaWRlbnRpdHlfY29va2llX2lkIjoiMTgyYjljYTU4NWVhZC0wODk1OThjMWQ3Zjc5MjgtMjYwMjFkNTEtMTMyNzEwNC0xODJiOWNhNTg1ZjdmMSJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22284710084%22%7D%2C%22%24device_id%22%3A%22182b9ca585ead-089598c1d7f7928-26021d51-1327104-182b9ca585f7f1%22%7D; HWWAFSESID=fa776898fa88a6520ea; HWWAFSESTIME=1679899464128; csrfToken=m3cB6mHsznwIuppkT-S8oYc6; Hm_lvt_e92c8d65d92d534b0fc290df538b4758=1679016180,1679471093,1679732923,1679899468; bdHomeCount=28; bannerFlag=true; show_activity_id_92=92; searchSessionId=1679899783.48494979; Hm_lpvt_e92c8d65d92d534b0fc290df538b4758=1679899783',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
list_all_1 = []
list_all_2 = []
taskType = '天眼查/核心人员'
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('CorPersonEnterprise:gnqy_socialCode')
# 判断 如果Redis中已经没有数据,则等待
# social_code = 'ZZSN23011300000004'
if social_code == None:
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
if len(data) != 0:
pass
else:
#数据重新塞入redis
baseCore.rePutIntoR('CorPersonEnterprise:gnqy_socialCode',social_code)
continue
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
if retData:
tycid = retData['id']
# todo:写入数据库
updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
else:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
log.info(f'======={social_code}====重新放入redis====')
baseCore.rePutIntoR('CorPersonEnterprise:gnqy_socialCode', social_code)
continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('CorPersonEnterprise:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始采集核心人员")
list_one_info = []
num = 1
for page in range(1,2):
t = int(time.time()*1000)
#https://capi.tianyancha.com/cloud-listed-company/listed/getHkNoRepeatSeniorExecutive?_=1692929256462&gid=209370942&pageSize=20&pageNum=1
url = f'https://capi.tianyancha.com/cloud-listed-company/listed/noRepeatSeniorExecutive?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url,headers=headers,proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['dataList']
if list_all:
for one_info in list_all:
name = one_info['name']
sex = one_info['sex']
education = one_info['education']
position = one_info['position']
Salary = one_info['salary']
#todo:获取当前年份
now = datetime.datetime.now()
year = now.year
try:
birthYear = year - int(one_info['age'])
except:
birthYear = ''
StockKeepings = one_info['numberOfShares']
currentTerm = one_info['term']
personInfo = one_info['resume']
try:
person_img = one_info['logo']
except:
person_img = '--'
dic_json = {
"socialCreditCode":social_code,
"name":name,
"sex":sex,
"education":education,
"position":position,
"salary":Salary,
"birthYear":birthYear,
"shareNum":StockKeepings,
"shareRatio":'',
"benefitShare":'',
"currentTerm":currentTerm,
"personInfo":personInfo,
"sort":str(num)
}
dic_json_img = {
"socialCreditCode":social_code,
"name":name,
"sex":sex,
"education":education,
"position":position,
"salary":Salary,
"birthYear":birthYear,
"shareNum":StockKeepings,
"shareRatio":'',
"benefitShare":'',
"currentTerm":currentTerm,
"personInfo":personInfo,
"头像":person_img,
"sort":str(num)
}
num = num+1
list_one_info.append(dic_json)
# list_all_2.append(dic_json_img)
else:
t = int(time.time() * 1000)
url = f'https://capi.tianyancha.com/cloud-listed-company/listed/getHkNoRepeatSeniorExecutive?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['dataList']
if list_all:
for one_info in list_all:
name = one_info['personal_name']
sex = one_info['gender2']
education = ''
position = one_info['position_name']
Salary = ''
birthYear = ''
personInfo = one_info['resume_cn']
dic_json = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": '',
"benefitShare": '',
"currentTerm": '',
"personInfo": personInfo,
"sort": str(num)
}
num = num + 1
list_one_info.append(dic_json)
else:
t = int(time.time() * 1000)
url = f'https://capi.tianyancha.com/cloud-company-background/company/dim/staff?_={t}&gid={tycid}&pageSize=20&pageNum={page}'
ip = baseCore.get_proxy()
res = requests.get(url, headers=headers, proxies=ip) # ,verify=False
time.sleep(1)
list_all = res.json()['data']['result']
# todo:增加一种情况
if list_all:
for one_info in list_all:
name = one_info['name']
try:
sex = one_info['sex']
except:
sex = ''
try:
education = one_info['education']
except:
education = ''
try:
position = one_info['typeSore']
except:
position = ''
try:
Salary = one_info['salary']
except:
Salary = ''
birthYear = ''
try:
shareRatio = one_info['percent']
except:
shareRatio = ''
try:
benefitShare = one_info['finalBenefitShares']
except:
benefitShare = ''
try:
currentTerm = one_info['term']
except:
currentTerm = ''
person_id = one_info['id']
person_url = f'https://www.tianyancha.com/human/{person_id}-c{tycid}'
person_res = requests.get(person_url, headers=headers, proxies=ip)
person_soup = BeautifulSoup(person_res.content, 'html.parser')
try:
personInfo = person_soup.find('span', {'class': '_56d0a'}).text.strip()
except:
personInfo = ''
try:
person_img = one_info['logo']
except:
person_img = '--'
dic_json = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": shareRatio,
"benefitShare": benefitShare,
"currentTerm": currentTerm,
"personInfo": personInfo,
"sort": str(num)
}
dic_json_img = {
"socialCreditCode": social_code,
"name": name,
"sex": sex,
"education": education,
"position": position,
"salary": Salary,
"birthYear": birthYear,
"shareNum": '',
"shareRatio": shareRatio,
"benefitShare": benefitShare,
"currentTerm": '',
"personInfo": personInfo,
"头像": person_img,
"sort": str(num)
}
num = num + 1
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
if json_updata == '[]':
continue
else:
pass
response = requests.post('http://114.115.236.206:8088/sync/executive',data=json_updata,timeout=300, verify=False)
print(response.text)
log.info('=========成功======')
except Exception as e:
log.info(f'==={social_code}=====企业核心人员采集失败===重新放入redis====')
# 重新塞入redis
baseCore.rePutIntoR('CorPersonEnterprise:gnqy_socialCode', social_code)
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
# df_img = pd.DataFrame(list_all_2)
# df_img.to_excel('企业主要人员-头像.xlsx',index=False)
if __name__ == "__main__":
doJob()
\ No newline at end of file
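doJob above tries three Tianyancha endpoints in turn (noRepeatSeniorExecutive, getHkNoRepeatSeniorExecutive, then company/dim/staff), each with its own parsing branch. A sketch of the same fallback expressed as an ordered endpoint list; the URLs and the 'dataList'/'result' keys are taken from the code above, while fetch_executives itself is illustrative:

import time
import requests

# Sketch only: the three endpoints doJob() tries, in order, for one company gid.
ENDPOINT_TEMPLATES = [
    'https://capi.tianyancha.com/cloud-listed-company/listed/noRepeatSeniorExecutive?_={t}&gid={gid}&pageSize=20&pageNum={page}',
    'https://capi.tianyancha.com/cloud-listed-company/listed/getHkNoRepeatSeniorExecutive?_={t}&gid={gid}&pageSize=20&pageNum={page}',
    'https://capi.tianyancha.com/cloud-company-background/company/dim/staff?_={t}&gid={gid}&pageSize=20&pageNum={page}',
]

def fetch_executives(gid, headers, proxies, page=1):
    # Return the first non-empty record list from the ordered endpoints, else [].
    for template in ENDPOINT_TEMPLATES:
        url = template.format(t=int(time.time() * 1000), gid=gid, page=page)
        data = requests.get(url, headers=headers, proxies=proxies).json().get('data') or {}
        # The listed-company endpoints return 'dataList'; the staff endpoint returns 'result'.
        records = data.get('dataList') or data.get('result') or []
        if records:
            return records
        time.sleep(1)
    return []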
@@ -19,7 +19,7 @@ jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
- cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+ cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor= cnx.cursor()
cnx_ = baseCore.cnx
@@ -37,7 +37,7 @@ headers = {
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
- taskType = '企业动态/天眼查/福布斯'
+ taskType = '企业动态/天眼查'
def beinWork(tyc_code, social_code):
start_time = time.time()
time.sleep(3)
@@ -154,11 +154,14 @@ def beinWork(tyc_code, social_code):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
- contentText = smart.extract_by_url(link).text
#带标签正文
contentWithTag = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
- contentText = ''
+ contentWithTag = ''
- if contentText == '':
+ if contentWithTag == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
state = 0
@@ -174,7 +177,7 @@ def beinWork(tyc_code, social_code):
continue
try:
#todo:更换插入的库
- insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,author,type) values(%s,%s,%s,%s,%s)'''
+ insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
@@ -182,14 +185,73 @@ def beinWork(tyc_code, social_code):
social_code,
link,
'天眼查',
- source,
'2',
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': contentWithTag,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': time_format,
'sid': '1684032033495392257',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
# e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
continue
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
@@ -205,8 +267,9 @@ def doJob():
while True:
start = time.time()
- # 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
+ # 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息 天眼查ID19276488
- social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
+ # social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
social_code = '912301001275921118'
if social_code == None:
time.sleep(20)
continue
@@ -222,19 +285,25 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
- if tycid == None:
+ if tycid == None or tycid== '':
try:
retData = getTycIdByXYDM(xydm)
- tycid = retData['tycData']['id']
- # todo:写入数据库
- updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
- cursor_.execute(updateSql)
- cnx_.commit()
+ if retData:
+ tycid = retData['id']
+ # todo:写入数据库
+ updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
+ cursor_.execute(updateSql)
+ cnx_.commit()
+ else:
+ state = 0
+ takeTime = baseCore.getTimeCost(start, time.time())
+ baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
+ baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
- baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
+ baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
@@ -242,8 +311,10 @@ def doJob():
# 开始采集企业动态
retData = beinWork(tycid, xydm)
# 信息采集完成后将该企业的采集次数更新
- # baseCore.updateRun(xydm, runType, count)
+ runType = 'NewsRunCount'
count += 1
baseCore.updateRun(xydm, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
@@ -257,7 +328,7 @@ def doJob():
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
cursor.close()
cnx.close()
# 释放资源
......
@@ -10,9 +10,15 @@ from base.BaseCore import BaseCore
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
- headers={
- 'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
- 'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
+ # headers={
+ # 'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
+ # 'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
# 'version':'TYC-Web',
# 'Content-Type':'application/json;charset=UTF-8'
# }
headers = {
'X-TYCID':'30c1289042f511ee9182cd1e1bcaa517',
# 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzU5MjQ4MTgzOSIsImlhdCI6MTY5MjkzMzIxMiwiZXhwIjoxNjk1NTI1MjEyfQ.BKxDem8fpgeDHrIgm3qCoF76ueHtQSG1DggiTl4FAaoNKt4gem6NTX1XYndPXqVj9TXfl-8yp2kKE3jY66dyig',
'version':'TYC-Web',
'Content-Type':'application/json;charset=UTF-8'
}
@@ -27,6 +33,7 @@ def getTycIdByXYDM(xydm):
paramJsonData = {'keyword':xydm}
try:
headers['User-Agent'] = baseCore.getRandomUserAgent()
headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
response = requests.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
time.sleep(random.randint(3, 5))
retJsonData =json.loads(response.content.decode('utf-8'))
@@ -35,14 +42,14 @@ def getTycIdByXYDM(xydm):
retData['state'] = True
retData['tycData'] = retJsonData['data'][0]
response.close()
- return retData
+ return retData['tycData']
else:
log.error(f"{xydm}------{retJsonData}")
response.close()
- return retData
+ return retData['tycData']
- except Exception as e:
- log.error(f"{xydm}---exception---{e}")
- return retData
+ except:
+ log.error(f"---{xydm}--天眼查token失效---")
+ return retData['tycData']
# 更新天眼查企业基本信息
......
@@ -3,7 +3,6 @@ import json
import requests, time, pymysql
import jieba
import sys
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
from base.BaseCore import BaseCore
@@ -12,15 +11,15 @@ from base.smart import smart_extractor
# import BaseCore
# from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 初始化,设置中文分词
jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
- cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+ cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
pageSize = 10
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
@@ -134,10 +133,10 @@ def beinWork(tyc_code, social_code,start_time):
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
- cursor.execute(sel_sql, (link, social_code))
+ cursor_.execute(sel_sql, (link, social_code))
except Exception as e:
print(e)
- selects = cursor.fetchone()
+ selects = cursor_.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
@@ -156,7 +155,10 @@ def beinWork(tyc_code, social_code,start_time):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
#带标签正文
contentText = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
contentText = ''
@@ -175,36 +177,25 @@ def beinWork(tyc_code, social_code,start_time):
pass
continue
try:
- insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+ insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
- title,
- info_page['abstracts'], # 摘要
- contentText, # 正文
- time_format, # 发布时间
link,
'天眼查',
- source,
'2',
- 'zh'
]
- cursor.execute(insert_sql, tuple(list_info))
- cnx.commit()
+ cursor_.execute(insert_sql, tuple(list_info))
+ cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
- sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
- cursor.execute(sel_sql, (link, social_code))
- row = cursor.fetchone()
- id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
- 'attachmentIds': id,
+ 'attachmentIds': '',
'author': '',
- 'content': contentText,
+ 'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
@@ -222,7 +213,6 @@ def beinWork(tyc_code, social_code,start_time):
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
e = '数据库传输失败'
state = 0
@@ -237,7 +227,6 @@ def beinWork(tyc_code, social_code,start_time):
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
@@ -250,7 +239,6 @@ def beinWork(tyc_code, social_code,start_time):
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
@@ -269,12 +257,12 @@ def beinWork(tyc_code, social_code,start_time):
retData['up_repetCount'] = up_repetCount
return retData
# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
- social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
+ # social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
social_code = '912301001275921118'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
@@ -291,28 +279,31 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
- if tycid == None:
+ if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
- tycid = retData['tycData']['id']
- #todo:写入数据库
- updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
- cursor_.execute(updateSql)
- cnx_.commit()
+ if retData:
+ tycid = retData['id']
+ # todo:写入数据库
+ updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
+ cursor_.execute(updateSql)
+ cnx_.commit()
+ else:
+ state = 0
+ takeTime = baseCore.getTimeCost(start, time.time())
+ baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
+ log.info(f'======={social_code}====重新放入redis====')
+ baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
+ continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
- # updateBeginSql = f"update ssqy_tyc set update_state=2,date_time=now() where id={id}"
- # cursor.execute(updateBeginSql)
- # cnx.commit()
# 开始采集企业动态
retData = beinWork(tycid, xydm,start_time)
# 信息采集完成后将该企业的采集次数更新
......