提交 eaa6815d 作者: LiuLiYuan

雅虎企业动态

上级 623b6803
import json
import json
import json
import time
import numpy as np
import pandas as pd
import pymysql
import requests
import sys
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from NewsYahoo import news
from base.BaseCore import BaseCore
sys.path.append(r'F:\zzsn\zzsn_spider\base')
import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore()
baseCore = BaseCore.BaseCore()
r = baseCore.r
log = baseCore.getLogger()
headers = {
......@@ -38,7 +34,7 @@ headers = {
# 根据股票代码 获取企业基本信息 高管信息
def getInfo(name,enname,gpdm, xydm, start):
def getInfo(enname, gpdm, xydm, start):
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
......@@ -49,17 +45,9 @@ def getInfo(name,enname,gpdm, xydm, start):
gpdm_ = gpdm
retData = {}
retData['base_info'] = {
'公司名称': name,
'公司名称': enname,
'英文名': enname,
'信用代码': xydm,
'股票代码': gpdm,
'地址': '',
'电话': '',
'公司网站': '',
'部门': '',
'行业': '',
'员工人数': '',
'公司简介': ''
}
retData['people_info'] = []
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
......@@ -76,22 +64,36 @@ def getInfo(name,enname,gpdm, xydm, start):
log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}")
except:
continue
if (response.status_code == 200):
pass
else:
try:
if 'lookup' in response.url:
log.error(f"{gpdm}------股票代码错误:{response.status_code}")
exeception = '股票代码错误'
state = 1
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, 0, takeTime, url, exeception)
return [state, retData]
elif response.status_code != 200:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
exeception = '获取基本信息接口返回失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
return [state, retData]
except:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
exeception = '获取基本信息接口返回失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
rePutIntoR('')
return [state,retData]
baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
return [state, retData]
state = 1
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
name = page.find('h3',{'class':'Fz(m) Mb(10px)'}).text
name = page.find('h3', {'class': 'Fz(m) Mb(10px)'}).text
try:
com_info = page.find('div', {'class': 'Mb(25px)'})
except:
......@@ -126,7 +128,7 @@ def getInfo(name,enname,gpdm, xydm, start):
com_jianjie = ''
dic_com_info = {
'公司名称': name,
'英文名': enname,
'英文名': name,
'信用代码': xydm,
'股票代码': gpdm,
'地址': com_address,
......@@ -189,24 +191,31 @@ def getInfo(name,enname,gpdm, xydm, start):
retData['people_info'] = retPeople
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
response.close()
return [state,retData]
return [state, retData]
# 保存基本信息
def saveBaseInfo(info,start):
def saveBaseInfo(info, start):
# 基本信息发送到kafka
company_dict = {
'name': info['base_info']['公司名称'], # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialPhone': info['base_info']['电话'], # 电话
'officialUrl': info['base_info']['公司网站'], # 官网
'briefInfo': info['base_info']['公司简介'], # 简介
'industry': info['base_info']['行业'], # 所属行业
'englishName': info['base_info']['英文名'], # 英文名
'address': info['base_info']['地址'], # 地址
'status': 0, # 状态
}
try:
company_dict = {
'name': info['base_info']['公司名称'], # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialPhone': info['base_info']['电话'], # 电话
'officialUrl': info['base_info']['公司网站'], # 官网
'briefInfo': info['base_info']['公司简介'], # 简介
'industry': info['base_info']['行业'], # 所属行业
'englishName': info['base_info']['英文名'], # 英文名
'address': info['base_info']['地址'], # 地址
'status': 0, # 状态
}
except:
company_dict = {
'name': info['base_info']['公司名称'], # 企业名称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'englishName': info['base_info']['英文名'], # 英文名
}
# print(company_dict)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
......@@ -216,7 +225,7 @@ def saveBaseInfo(info,start):
# 保存高管信息
def savePeopleInfo(info,start):
def savePeopleInfo(info, start):
# 高管信息调用接口
list_people = info['people_info']
list_one_info = []
......@@ -240,6 +249,7 @@ def savePeopleInfo(info,start):
json_updata = json.dumps(list_one_info)
# print(json_updata)
if json_updata == '[]':
log.info("没有高管")
pass
else:
for i in range(0, 3):
......@@ -274,18 +284,6 @@ def savePeopleInfo(info,start):
return state
def rePutIntoR(item):
r.rpush('BaseInfoEnterprise:gwqy_socialCode', item)
# def getInfomation(social_code):
# sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
# cursor.execute(sql)
# data = cursor.fetchone()
# return data
# 采集工作
def beginWork():
while True:
......@@ -298,65 +296,66 @@ def beginWork():
continue
# 数据库中获取基本信息
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
gpdm = '0123'
xydm = data[2]
# 获取该企业对应项目的采集次数
count = data[13]
start_time = time.time()
# 股票代码为空跳过
if gpdm is None:
log.error(f"{name}--股票代码为空 跳过")
exception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
continue
try:
retData = getInfo(name,enname,gpdm, xydm, start_time)
# 基本信息采集成功 进行数据入库,否则不入库
if retData[0] == 1:
# 企业基本信息入库
try:
saveBaseInfo(retData[1],start_time)
except:
log.error(f'{name}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1],start_time)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
if gpdm == '':
info = {"base_info": {'公司名称': enname,'英文名': enname,'信用代码': xydm, }}
log.error(f'{xydm}....股票代码为空')
try:
saveBaseInfo(info, start_time)
except:
log.error(f'{enname}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
else:
try:
retData = getInfo(enname, gpdm, xydm, start_time)
# 基本信息采集成功 进行数据入库,否则不入库
if retData[0] == 1:
# 企业基本信息入库
try:
saveBaseInfo(retData[1], start_time)
except:
log.error(f'{enname}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1], start_time)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
else:
pass
else:
pass
else:
pass
except Exception as e:
# 若出现尚未发现的错误,则保存错误信息以及出错位置
ee = e.__traceback__.tb_lineno
log.error(f'{name}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
except Exception as e:
# 若出现尚未发现的错误,则保存错误信息以及出错位置
ee = e.__traceback__.tb_lineno
log.error(f'{enname}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
# 企业数据采集完成,采集次数加一
count += 1
runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code,runType,count)
baseCore.updateRun(social_code, runType, count)
# 释放资源
baseCore.close()
if __name__ == '__main__':
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',charset='utf8mb4')
cursor = cnx.cursor()
beginWork()
cursor.close()
cnx.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论