提交 07b5b32c 作者: 刘伟刚

Merge remote-tracking branch 'origin/master'

...@@ -228,10 +228,10 @@ class BaseCore: ...@@ -228,10 +228,10 @@ class BaseCore:
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36'] __USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
def __init__(self): def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project', self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_proxy = self.__cnx_proxy.cursor() self.__cursor_proxy = self.__cnx_proxy.cursor()
self.cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', self.cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4') charset='utf8mb4')
self.cursor = self.cnx.cursor() self.cursor = self.cnx.cursor()
......
...@@ -171,18 +171,40 @@ def BaseInfoEnterprise_task(): ...@@ -171,18 +171,40 @@ def BaseInfoEnterprise_task():
pass pass
#企业核心人员 #企业核心人员
def CorPerson():
    """Queue enterprise social codes for the core-personnel crawl.

    Selects ``SocialCode`` from ``EnterpriseInfo`` where ``Place = '1'``
    and appends every code to the ``CorPersonEnterprise:gnqy_socialCode``
    list through the module-level ``r`` client (presumably Redis; verify
    against where ``r`` is created).

    The connection obtained from ``connectSql()`` is always handed back to
    ``closeSql()``, even when the query or a push raises.
    """
    cnx, cursor = connectSql()
    try:
        gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
        cursor.execute(gn_query)
        gn_result = cursor.fetchall()
        # Each row is a one-column tuple; keep only the code itself.
        gn_social_list = [item[0] for item in gn_result]
        print('=======')
        for item in gn_social_list:
            r.rpush('CorPersonEnterprise:gnqy_socialCode', item)
    finally:
        # Release the DB resources regardless of how the block above ended.
        closeSql(cnx, cursor)
#企业核心人员定时任务:
def CorPerson_task():
    """Run ``CorPerson`` on a monthly cron schedule.

    Registers ``CorPerson`` with a ``BlockingScheduler`` to fire at 00:00
    on day 1 of every month, then starts the scheduler. Any ``Exception``
    raised by ``start()`` is printed rather than propagated.
    """
    # Create the scheduler instance.
    monthly_scheduler = BlockingScheduler()
    # Fire once a month: day 1 at 00:00.
    monthly_scheduler.add_job(
        CorPerson,
        trigger='cron',
        day='1',
        hour=0,
        minute=0,
    )
    try:
        monthly_scheduler.start()
    except Exception as e:
        print('定时采集异常', e)
#东方财富网财务数据 #东方财富网财务数据
def FinanceFromEast(): def FinanceFromEast():
cnx_,cursor_ = cnn11() cnx_,cursor_ = cnn11()
#从上市企业库中读取数据 #从上市企业库中读取数据
sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' limit 10 ''' sql_sel = '''select social_credit_code from sys_base_enterprise_ipo where category = '1' '''
cursor_.execute(sql_sel) cursor_.execute(sql_sel)
finance = cursor_.fetchall() finance = cursor_.fetchall()
finance_list = [item[0] for item in finance] finance_list = [item[0] for item in finance]
print('=======') print('=======')
for item in finance_list: for item in finance_list:
r.rpush('FinanceFromEast:finance_socialCode', item) r.rpush('FinanceFromEast:eastfinance_socialCode', item)
close11(cnx_,cursor_) close11(cnx_,cursor_)
#东方财富网财务数据定时任务 #东方财富网财务数据定时任务
...@@ -192,7 +214,6 @@ def FinanceFromEase_task(): ...@@ -192,7 +214,6 @@ def FinanceFromEase_task():
# 每个季度执行一次 # 每个季度执行一次
scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1',hour=0, minute=0) scheduler.add_job(FinanceFromEast, 'cron', month='1-12/3', day='1',hour=0, minute=0)
try: try:
# 定时开始前执行一次
scheduler.start() scheduler.start()
except Exception as e: except Exception as e:
print('定时采集异常', e) print('定时采集异常', e)
...@@ -250,11 +271,11 @@ def AnnualEnterpriseXueQ_task(): ...@@ -250,11 +271,11 @@ def AnnualEnterpriseXueQ_task():
print('定时采集异常', e) print('定时采集异常', e)
pass pass
#国外企业基本信息 #国外企业基本信息 redis中放入id
def BaseInfoEnterpriseAbroad(): def BaseInfoEnterpriseAbroad():
cnx,cursor = connectSql() cnx,cursor = connectSql()
# 获取国外企业 # 获取国外企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '2' " gn_query = "select id from EnterpriseInfo where Place = '2' "
cursor.execute(gn_query) cursor.execute(gn_query)
gn_result = cursor.fetchall() gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
...@@ -280,24 +301,26 @@ def BaseInfoAbroad_task(): ...@@ -280,24 +301,26 @@ def BaseInfoAbroad_task():
def FBS(): def FBS():
cnx,cursor = connectSql() cnx,cursor = connectSql()
# todo:调整为获取福布斯的数据库 # todo:调整为获取福布斯的数据库
# gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2" gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
# cursor.execute(gw_query) cursor.execute(gw_query)
# gw_result = cursor.fetchall() gw_result = cursor.fetchall()
#获取国内企业 #获取国内企业
gn_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=1" gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
cursor.execute(gn_query) cursor.execute(gn_query)
gn_result = cursor.fetchall() gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
# gw_social_list = [item[0] for item in gw_result] gw_social_list = [item[0] for item in gw_result]
# for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
for item in gw_social_list:
r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
for item in gn_social_list: for item in gn_social_list:
if not r.exists(item): if not r.exists(item):
r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item) r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
closeSql(cnx,cursor) closeSql(cnx,cursor)
#将IPO的国外股票代码放到redis中 #将IPO的国外股票代码放到redis中
...@@ -310,7 +333,7 @@ def yahooCodeFromSql(): ...@@ -310,7 +333,7 @@ def yahooCodeFromSql():
gn_social_list = [item[0] for item in gn_result] gn_social_list = [item[0] for item in gn_result]
print('=======') print('=======')
for item in gn_social_list: for item in gn_social_list:
r.rpush('NoticeEnterprise:securities_code', item) r.rpush('FinanceFromEast:yahoo_securities_code', item)
except Exception as e: except Exception as e:
log.info("数据查询异常") log.info("数据查询异常")
finally: finally:
...@@ -337,11 +360,11 @@ if __name__ == "__main__": ...@@ -337,11 +360,11 @@ if __name__ == "__main__":
# NoticeEnterprise() # NoticeEnterprise()
# AnnualEnterpriseIPO() # AnnualEnterpriseIPO()
# AnnualEnterprise() # AnnualEnterprise()
BaseInfoEnterpriseAbroad() # BaseInfoEnterpriseAbroad()
# NewsEnterprise_task() # NewsEnterprise_task()
# NewsEnterprise() # NewsEnterprise()
# BaseInfoEnterprise() # BaseInfoEnterprise()
# FBS() FBS()
# NoticeEnterprise_task() # NoticeEnterprise_task()
# AnnualEnterprise_task() # AnnualEnterprise_task()
# NoticeEnterprise() # NoticeEnterprise()
......
import json import json
import json import json
import time import time
import numpy as np
import pandas as pd
import pymysql
import requests import requests
import sys
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from kafka import KafkaProducer from kafka import KafkaProducer
from NewsYahoo import news sys.path.append(r'F:\zzsn\zzsn_spider\base')
import BaseCore
from base.BaseCore import BaseCore
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
taskType = '企业基本信息/雅虎财经' taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore() baseCore = BaseCore.BaseCore()
r = baseCore.r r = baseCore.r
log = baseCore.getLogger() log = baseCore.getLogger()
headers = { headers = {
...@@ -38,7 +34,7 @@ headers = { ...@@ -38,7 +34,7 @@ headers = {
# 根据股票代码 获取企业基本信息 高管信息 # 根据股票代码 获取企业基本信息 高管信息
def getInfo(name,enname,gpdm, xydm, start): def getInfo(enname, gpdm, xydm, start):
if 'HK' in str(gpdm): if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0] tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5: if len(tmp_g) == 5:
...@@ -49,17 +45,9 @@ def getInfo(name,enname,gpdm, xydm, start): ...@@ -49,17 +45,9 @@ def getInfo(name,enname,gpdm, xydm, start):
gpdm_ = gpdm gpdm_ = gpdm
retData = {} retData = {}
retData['base_info'] = { retData['base_info'] = {
'公司名称': name, '公司名称': enname,
'英文名': enname, '英文名': enname,
'信用代码': xydm, '信用代码': xydm,
'股票代码': gpdm,
'地址': '',
'电话': '',
'公司网站': '',
'部门': '',
'行业': '',
'员工人数': '',
'公司简介': ''
} }
retData['people_info'] = [] retData['people_info'] = []
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE # https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
...@@ -76,22 +64,36 @@ def getInfo(name,enname,gpdm, xydm, start): ...@@ -76,22 +64,36 @@ def getInfo(name,enname,gpdm, xydm, start):
log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}") log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}")
except: except:
continue continue
try:
if (response.status_code == 200): if 'lookup' in response.url:
pass log.error(f"{gpdm}------股票代码错误:{response.status_code}")
else: exeception = '股票代码错误'
state = 1
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, 0, takeTime, url, exeception)
return [state, retData]
elif response.status_code != 200:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
exeception = '获取基本信息接口返回失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
return [state, retData]
except:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}") log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
exeception = '获取基本信息接口返回失败' exeception = '获取基本信息接口返回失败'
state = 0 state = 0
takeTime = baseCore.getTimeCost(start, time.time()) takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception) baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
rePutIntoR('') baseCore.rePutIntoR('BaseInfoEnterprise:gwqy_socialCode', xydm)
return [state,retData] return [state, retData]
state = 1 state = 1
soup = BeautifulSoup(response.content, 'html.parser') soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'}) page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
name = page.find('h3',{'class':'Fz(m) Mb(10px)'}).text name = page.find('h3', {'class': 'Fz(m) Mb(10px)'}).text
try: try:
com_info = page.find('div', {'class': 'Mb(25px)'}) com_info = page.find('div', {'class': 'Mb(25px)'})
except: except:
...@@ -126,7 +128,7 @@ def getInfo(name,enname,gpdm, xydm, start): ...@@ -126,7 +128,7 @@ def getInfo(name,enname,gpdm, xydm, start):
com_jianjie = '' com_jianjie = ''
dic_com_info = { dic_com_info = {
'公司名称': name, '公司名称': name,
'英文名': enname, '英文名': name,
'信用代码': xydm, '信用代码': xydm,
'股票代码': gpdm, '股票代码': gpdm,
'地址': com_address, '地址': com_address,
...@@ -189,24 +191,31 @@ def getInfo(name,enname,gpdm, xydm, start): ...@@ -189,24 +191,31 @@ def getInfo(name,enname,gpdm, xydm, start):
retData['people_info'] = retPeople retData['people_info'] = retPeople
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}") log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
response.close() response.close()
return [state,retData] return [state, retData]
# 保存基本信息 # 保存基本信息
def saveBaseInfo(info,start): def saveBaseInfo(info, start):
# 基本信息发送到kafka # 基本信息发送到kafka
company_dict = { try:
'name': info['base_info']['公司名称'], # 企业名称 company_dict = {
'shortName': '', # 企业简称 'name': info['base_info']['公司名称'], # 企业名称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码 'shortName': '', # 企业简称
'officialPhone': info['base_info']['电话'], # 电话 'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialUrl': info['base_info']['公司网站'], # 官网 'officialPhone': info['base_info']['电话'], # 电话
'briefInfo': info['base_info']['公司简介'], # 简介 'officialUrl': info['base_info']['公司网站'], # 官网
'industry': info['base_info']['行业'], # 所属行业 'briefInfo': info['base_info']['公司简介'], # 简介
'englishName': info['base_info']['英文名'], # 英文名 'industry': info['base_info']['行业'], # 所属行业
'address': info['base_info']['地址'], # 地址 'englishName': info['base_info']['英文名'], # 英文名
'status': 0, # 状态 'address': info['base_info']['地址'], # 地址
} 'status': 0, # 状态
}
except:
company_dict = {
'name': info['base_info']['公司名称'], # 企业名称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'englishName': info['base_info']['英文名'], # 英文名
}
# print(company_dict) # print(company_dict)
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2)) producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8')) kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
...@@ -216,7 +225,7 @@ def saveBaseInfo(info,start): ...@@ -216,7 +225,7 @@ def saveBaseInfo(info,start):
# 保存高管信息 # 保存高管信息
def savePeopleInfo(info,start): def savePeopleInfo(info, start):
# 高管信息调用接口 # 高管信息调用接口
list_people = info['people_info'] list_people = info['people_info']
list_one_info = [] list_one_info = []
...@@ -240,6 +249,7 @@ def savePeopleInfo(info,start): ...@@ -240,6 +249,7 @@ def savePeopleInfo(info,start):
json_updata = json.dumps(list_one_info) json_updata = json.dumps(list_one_info)
# print(json_updata) # print(json_updata)
if json_updata == '[]': if json_updata == '[]':
log.info("没有高管")
pass pass
else: else:
for i in range(0, 3): for i in range(0, 3):
...@@ -274,18 +284,6 @@ def savePeopleInfo(info,start): ...@@ -274,18 +284,6 @@ def savePeopleInfo(info,start):
return state return state
def rePutIntoR(item):
    """Append *item* to the ``BaseInfoEnterprise:gwqy_socialCode`` list via ``r``."""
    queue_key = 'BaseInfoEnterprise:gwqy_socialCode'
    r.rpush(queue_key, item)
# def getInfomation(social_code):
# sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
# cursor.execute(sql)
# data = cursor.fetchone()
# return data
# 采集工作 # 采集工作
def beginWork(): def beginWork():
while True: while True:
...@@ -298,65 +296,66 @@ def beginWork(): ...@@ -298,65 +296,66 @@ def beginWork():
continue continue
# 数据库中获取基本信息 # 数据库中获取基本信息
data = baseCore.getInfomation(social_code) data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5] enname = data[5]
gpdm = data[3] gpdm = '0123'
xydm = data[2] xydm = data[2]
# 获取该企业对应项目的采集次数 # 获取该企业对应项目的采集次数
count = data[13] count = data[13]
start_time = time.time() start_time = time.time()
# 股票代码为空跳过 # 股票代码为空跳过
if gpdm is None: if gpdm == '':
log.error(f"{name}--股票代码为空 跳过") info = {"base_info": {'公司名称': enname,'英文名': enname,'信用代码': xydm, }}
exception = '股票代码为空' log.error(f'{xydm}....股票代码为空')
state = 0 try:
takeTime = baseCore.getTimeCost(start_time, time.time()) saveBaseInfo(info, start_time)
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception) except:
continue log.error(f'{enname}....企业基本信息Kafka操作失败')
try: exception = 'Kafka操作失败'
retData = getInfo(name,enname,gpdm, xydm, start_time) state = 0
# 基本信息采集成功 进行数据入库,否则不入库 takeTime = baseCore.getTimeCost(start_time, time.time())
if retData[0] == 1: baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业基本信息入库 else:
try: try:
saveBaseInfo(retData[1],start_time) retData = getInfo(enname, gpdm, xydm, start_time)
except: # 基本信息采集成功 进行数据入库,否则不入库
log.error(f'{name}....企业基本信息Kafka操作失败') if retData[0] == 1:
exception = 'Kafka操作失败' # 企业基本信息入库
state = 0 try:
takeTime = baseCore.getTimeCost(start_time, time.time()) saveBaseInfo(retData[1], start_time)
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库 except:
state = savePeopleInfo(retData[1],start_time) log.error(f'{enname}....企业基本信息Kafka操作失败')
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功 exception = 'Kafka操作失败'
if state == 1: state = 0
takeTime = baseCore.getTimeCost(start_time, time.time()) takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '') baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1], start_time)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
else:
pass
else: else:
pass pass
else: except Exception as e:
pass # 若出现尚未发现的错误,则保存错误信息以及出错位置
except Exception as e: ee = e.__traceback__.tb_lineno
# 若出现尚未发现的错误,则保存错误信息以及出错位置 log.error(f'{enname}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
ee = e.__traceback__.tb_lineno state = 0
log.error(f'{name}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}') takeTime = baseCore.getTimeCost(start_time, time.time())
state = 0 baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
# 企业数据采集完成,采集次数加一 # 企业数据采集完成,采集次数加一
count += 1 count += 1
runType = 'BaseInfoRunCount' runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code,runType,count) baseCore.updateRun(social_code, runType, count)
# 释放资源 # 释放资源
baseCore.close() baseCore.close()
if __name__ == '__main__': if __name__ == '__main__':
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',charset='utf8mb4')
cursor = cnx.cursor()
beginWork() beginWork()
cursor.close()
cnx.close()
Markdown 格式
0%
您即将添加 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
请注册或登录后发表评论