提交 1de3a9f5 作者: 薛凌堃

Merge remote-tracking branch 'origin/master'

# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
......@@ -162,12 +162,12 @@ def scroll(xydm,name,gpdm):
log.error(f"{name}--{gpdm}--获取不到最后一条链接")
break
# todo:增量时 需打开注释
# try:
# selects = selectUrl(last_url_,xydm)
# except:
# break
# if selects:
# break
try:
selects = selectUrl(last_url_,xydm)
except:
break
if selects:
break
if last_url_ == last_url:
break
last_url_ = last_url
......@@ -178,7 +178,7 @@ def rePutIntoR(item):
if __name__ == "__main__":
path = r'D:\chrome\chromedriver.exe'
path = r'F:\spider\1\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = baseCore.cnx
cursor = baseCore.cursor
......@@ -186,7 +186,7 @@ if __name__ == "__main__":
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = baseCore.redicPullData('NewsEnterpriseFbs:gwqy_socialCode')
social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
# social_code = 'ZZSN22080900000046'
# 判断 如果Redis中已经没有数据,则等待
......@@ -207,6 +207,8 @@ if __name__ == "__main__":
gpdm = str(gpdm)[1:]
else:
pass
elif str(gpdm)[-2:] == '.N' or str(gpdm)[-2:] == '.O':
gpdm = gpdm[:-2]
xydm = data[2]
# 获取该企业对应项目的采集次数
......@@ -280,9 +282,9 @@ if __name__ == "__main__":
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, news_url, exception)
# 增量使用
# break
break
# 全量使用
continue
# continue
title = a_ele.text.lstrip().strip().replace("'", "''")
exception = getZx(xydm, news_url, title, cnx, path)
if exception == '':
......
import json
import json
......@@ -4,8 +4,9 @@ import requests
import sys
from bs4 import BeautifulSoup
from kafka import KafkaProducer
sys.path.append(r'F:\zzsn\zzsn_spider\base')
import BaseCore
# sys.path.append(r'F:\zzsn\zzsn_spider\base')
# import BaseCore
from base import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
......@@ -20,7 +21,6 @@ headers = {
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
# 'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': "Windows",
......@@ -41,6 +41,8 @@ def getInfo(enname, gpdm, xydm, start):
gpdm_ = str(gpdm)[1:]
else:
pass
elif str(gpdm)[-2:] == '.N' or str(gpdm)[-2:] == '.O':
gpdm_ = gpdm[:-2]
else:
gpdm_ = gpdm
retData = {}
......@@ -50,7 +52,6 @@ def getInfo(enname, gpdm, xydm, start):
'信用代码': xydm,
}
retData['people_info'] = []
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
url = f'https://finance.yahoo.com/quote/{gpdm_}/profile?p={gpdm_}'
time.sleep(3)
......@@ -78,7 +79,7 @@ def getInfo(enname, gpdm, xydm, start):
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
baseCore.rePutIntoR('BaseInfoEnterpriseFbs:gwqy_social_code', xydm)
return [state, retData]
except:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
......@@ -86,7 +87,7 @@ def getInfo(enname, gpdm, xydm, start):
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
baseCore.rePutIntoR('BaseInfoEnterpriseFbs:gwqy_social_code', xydm)
return [state, retData]
state = 1
......@@ -216,7 +217,6 @@ def saveBaseInfo(info, start):
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'englishName': info['base_info']['英文名'], # 英文名
}
# print(company_dict)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
kafka_result.get(timeout=10)
......@@ -247,7 +247,6 @@ def savePeopleInfo(info, start):
}
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
# print(json_updata)
if json_updata == '[]':
log.info("没有高管")
pass
......@@ -265,14 +264,14 @@ def savePeopleInfo(info, start):
if (retJson['success'] or retJson['success'] == 'true'):
pass
else:
log.error("保存高管接口失败---{retJson}")
log.error(f"保存高管接口失败---{retJson}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(dic_json['socialCreditCode'], taskType, state, takeTime, '', exception)
return state
else:
log.error("保存高管接口失败---{response.status_code}")
log.error(f"保存高管接口失败---{response.status_code}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
......@@ -288,6 +287,7 @@ def savePeopleInfo(info, start):
def beginWork():
while True:
social_code = baseCore.redicPullData('BaseInfoEnterprise:gwqy_socialCode')
# social_code = 'ZZSN230824151229535'
if not social_code:
time.sleep(20)
continue
......@@ -297,7 +297,7 @@ def beginWork():
# 数据库中获取基本信息
data = baseCore.getInfomation(social_code)
enname = data[5]
gpdm = '0123'
gpdm = data[3]
xydm = data[2]
# 获取该企业对应项目的采集次数
......@@ -305,7 +305,7 @@ def beginWork():
start_time = time.time()
# 股票代码为空跳过
if gpdm == '':
info = {"base_info": {'公司名称': enname,'英文名': enname,'信用代码': xydm, }}
info = {"base_info": {'公司名称': enname, '英文名': enname, '信用代码': xydm, }}
log.error(f'{xydm}....股票代码为空')
try:
saveBaseInfo(info, start_time)
......@@ -323,7 +323,7 @@ def beginWork():
# 企业基本信息入库
try:
saveBaseInfo(retData[1], start_time)
time.sleep(1)
except:
log.error(f'{enname}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
......@@ -332,6 +332,7 @@ def beginWork():
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1], start_time)
time.sleep(1)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
......@@ -342,16 +343,26 @@ def beginWork():
pass
except Exception as e:
# 若出现尚未发现的错误,则保存错误信息以及出错位置
info = {"base_info": {'公司名称': enname,'英文名': enname,'信用代码': xydm, }}
try:
saveBaseInfo(info, start_time)
log.info(f'{enname}.....股票代码出错只保存基本信息')
except:
log.error(f'{enname}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
ee = e.__traceback__.tb_lineno
log.error(f'{enname}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
# 企业数据采集完成,采集次数加一
count += 1
runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code, runType, count)
# 企业数据采集完成,采集次数加一
count += 1
runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code, runType, count)
# 释放资源
baseCore.close()
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论