提交 265f7dd1 作者: Xiancai

外国企业动态自动化

上级 f7c06cc2
import os
import random
import socket
import sys
import time
import logbook
import logbook.more
# 核心工具包
import pymysql
from selenium import webdriver
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
from selenium.webdriver.chrome.service import Service
class BaseCore:
# 序列号
__seq = 0
# 代理池 数据库连接
__cnx_proxy =None
__cursor_proxy = None
# 基本信息 数据库连接
__cnx_infomation = None
__cursor_infomation = None
# agent 池
__USER_AGENT_LIST = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
......@@ -211,12 +220,16 @@ class BaseCore:
try:
self.__cursor_proxy.close()
self.__cnx_proxy.close()
self.__cursor_infomation.close()
self.__cnx_infomation.close()
except :
pass
def __init__(self):
    """Open the two MySQL connections used by this helper and create a cursor on each.

    One connection targets the `clb_project` database (the class-level
    comments describe it as the proxy-pool connection) and the other targets
    the `caiji` database (described as the basic-information connection).
    Both use the utf8mb4 charset.
    """
    # NOTE(review): host, user and password are hard-coded in source; consider
    # loading them from configuration or the environment instead.
    self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
                                       charset='utf8mb4')
    self.__cursor_proxy= self.__cnx_proxy.cursor()
    self.__cnx_infomation = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
    self.__cursor_infomation = self.__cnx_infomation.cursor()
# 计算耗时
......@@ -339,4 +352,54 @@ class BaseCore:
pass
else:
str = str[0:end+1]
return str
\ No newline at end of file
return str
# 获得脚本进程PID
def getPID(self):
    """Return the ID of the current process, as reported by os.getpid()."""
    return os.getpid()
# 获取本机IP
def getIP(self):
    """Return the address that this machine's hostname resolves to.

    The hostname comes from socket.gethostname() and is resolved with
    socket.gethostbyname(); any resolution error propagates to the caller.
    """
    hostname = socket.gethostname()
    return socket.gethostbyname(hostname)
# 生成模拟浏览器 必须传入值为googledriver位置信息
# headless用于决定是否为无头浏览器,初始默认为无头浏览器
# 正常浏览器可用于开始对页面解析使用或一些网站无头时无法正常采集
# 无头浏览器用于后续对信息采集时不会有浏览器一直弹出,
def buildDriver(self,path,headless=True):
    """Create a Chrome WebDriver configured for scraping.

    Args:
        path: location of the chromedriver executable; passed to selenium's
            Service.
        headless: when True (the default) Chrome is started with
            `--headless`; pass False to get a visible browser window.

    Returns:
        The started `webdriver.Chrome` instance. The caller is responsible
        for quitting it.
    """
    service = Service(path)
    chrome_options = webdriver.ChromeOptions()
    if headless:
        chrome_options.add_argument('--headless')
    chrome_options.add_argument('--disable-gpu')
    # Drop the "enable-automation" switch and the automation extension.
    chrome_options.add_experimental_option(
        "excludeSwitches", ["enable-automation"])
    chrome_options.add_experimental_option('useAutomationExtension', False)
    chrome_options.add_argument('lang=zh-CN,zh,zh-TW,en-US,en')
    chrome_options.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
    # Selenium 4 takes the options object through `options=`; the legacy
    # `chrome_options=` keyword is deprecated and rejected by recent releases.
    return webdriver.Chrome(options=chrome_options, service=service)
def getInfomation(self,social_code):
    """Fetch one row of EnterpriseInfo for the given social credit code.

    Args:
        social_code: value matched against the SocialCode column.

    Returns:
        The first matching row as returned by the cursor's fetchone(), or
        None when no row matches.
    """
    # Bind the value as a query parameter instead of formatting it into the
    # SQL text, so quotes in social_code cannot alter the statement.
    sql = "SELECT * FROM EnterpriseInfo WHERE SocialCode = %s"
    self.__cursor_infomation.execute(sql, (social_code,))
    return self.__cursor_infomation.fetchone()
def updateRun(self,social_code,runType,count):
    """Set one counter column of EnterpriseInfo for a company and commit.

    Args:
        social_code: value matched against the SocialCode column.
        runType: name of the column to update (e.g. 'NewsRunCount').
        count: new value stored in that column.

    Raises:
        ValueError: if runType is not a plain identifier. Column names cannot
            be bound as query parameters, so the name is validated before it
            is placed into the SQL text.
    """
    if not str(runType).isidentifier():
        raise ValueError(f"invalid column name: {runType!r}")
    sql_update = f"UPDATE EnterpriseInfo SET {runType} = %s WHERE SocialCode = %s"
    # The cursor method is `execute`; the previous `excute` spelling raised
    # AttributeError, so the update never ran.
    self.__cursor_infomation.execute(sql_update, (count, social_code))
    self.__cnx_infomation.commit()
def recordLog(self,xydm,taskType,state,takeTime,url,e):
    """Insert one row into LogTable describing a collection attempt, then commit.

    Args:
        xydm: stored in the SocialCode column.
        taskType: stored in the TaskType column.
        state: stored in the state column (callers in this file pass 1 for
            success and 0 for failure).
        takeTime: stored in the TakeTime column.
        url: stored in the url column.
        e: stored in the Exception column; callers pass an error string, or
            '' when nothing failed.

    CreateTime, ProcessIp and PID are filled from self.getNowTime(1),
    self.getIP() and self.getPID() respectively.
    """
    sql = "INSERT INTO LogTable(SocialCode,TaskType,state,TakeTime,url,CreateTime,ProcessIp,PID,Exception) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    values = [xydm,taskType,state,takeTime,url,self.getNowTime(1),self.getIP(),self.getPID(),e]
    # The cursor method is `execute`; the previous `excute` spelling raised
    # AttributeError, so no log row was ever written.
    self.__cursor_infomation.execute(sql,values)
    self.__cnx_infomation.commit()
\ No newline at end of file
# 雅虎财经企业动态获取
# 雅虎财经企业动态获取
......@@ -17,24 +17,14 @@ log= BaseCore.getLogger()
def getZx(xydm,url,title,cnx):
start_time_content= time.time()
try:
chrome_options_content = webdriver.ChromeOptions()
chrome_options_content.add_argument('--disable-gpu')
chrome_options_content.add_argument('--ignore-certificate-errors')
chrome_options_content.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options_content.add_argument("--disable-blink-features=AutomationControlled")
chrome_options_content.add_argument("--start-maximized")
prefs_content = {'profile.managed_default_content_settings.images': 2}
chrome_options_content.add_experimental_option('prefs', prefs_content)
chrome_options_content.add_argument('--headless')
executable_path = r'E:\chromedriver_win32\chromedriver.exe'
driverContent = webdriver.Chrome(options=chrome_options_content, executable_path=executable_path)
path = r'E:\chromedriver_win32\chromedriver.exe'
driverContent = baseCore.buildDriver(path)
driverContent.get(url)
try:
clickButton = driverContent.find_element(By.CLASS_NAME,"collapse-button")
clickButton.click()
except Exception as e:
pass
pass
time.sleep(0.5)
authorElement = driverContent.find_element(By.CLASS_NAME,"caas-author-byline-collapse")
......@@ -71,25 +61,23 @@ def getZx(xydm,url,title,cnx):
except Exception as e1:
log.error("保存数据库失败")
e1 = str(e1) + '.........保存数据库失败'
return e1
log.info(f"文章耗时,耗时{baseCore.getTimeCost(start_time_content,time.time())}")
except Exception as e:
log.error("获取正文失败")
e = str(e)+'.........获取正文失败'
return e
return ''
# Module-level setup: create the Chrome driver used to load each company's
# press-release page and open the MySQL connection that is passed to getZx.
# NOTE(review): `driver` is assigned twice in this span — first from
# webdriver.Chrome(...) and then from baseCore.buildDriver(path) — so the
# first browser instance is replaced without being quit. These look like the
# removed and added sides of a diff shown together; confirm which lines
# belong in the file.
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--start-maximized")
# Image content setting 2 — presumably turns image loading off; verify.
prefs = {'profile.managed_default_content_settings.images': 2}
chrome_options.add_experimental_option('prefs',prefs)
chrome_options.add_argument('--headless')
executable_path = r'E:\chromedriver_win32\chromedriver.exe'
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
path = r'E:\chromedriver_win32\chromedriver.exe'
driver = baseCore.buildDriver(path)
# NOTE(review): database host and credentials are hard-coded here.
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# 拖拽30次获取企业新闻
def scroll(driver):
for i in range(0,30):
#js = "window.scrollTo(0,document.body.scrollHeight)"
......@@ -99,46 +87,52 @@ def scroll(driver):
#读取excel数据
df_all = pd.read_excel(r'./../data/2023年500强新上榜名单.xlsx', sheet_name='500强23年国外', keep_default_na=False)
for num in range(len(df_all)):
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = ''
# 判断 如果Redis中已经没有数据,则等待
if social_code == '':
time.sleep(20)
continue
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
xydm = data[2]
# 获取该企业对应项目的采集次数
count = data[17]
start_time = time.time()
# country = df_all['国别'][num]
# if(country!='国外'):
# continue
enname=df_all['英文名称'][num]
gpdm = df_all['股票票代码'][num]
xydm = df_all['信用代码'][num]
if(gpdm==''):
log.error(f"{num}--{gpdm}--股票代码为空 跳过")
continue
log.error(f"{name}--股票代码为空 跳过")
if (xydm == ''):
log.error(f"{num}--{gpdm}--信用代码为空 跳过")
continue
count = int(df_all['企业动态数量(7.15)'][num])
# if(count>0):
# log.error(f"{num}--{gpdm}--动态大于0 跳过")
# continue
#https://finance.yahoo.com/quote/GOOG/press-releases?p=GOOG
# def news(i,gpdm):
log.error(f"{name}--信用代码为空 跳过")
url=f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
scroll(driver)
# if True:
# continue
try:
news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
except Exception as e:
log.error(f"{num}--{gpdm}--没找到新闻元素")
log.error(f"{name}--{gpdm}--没找到新闻元素")
e = str(e) + '.......没找到新闻元素'
taskType = '企业动态'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,url,e)
continue
news_lis = news_div.find_elements(By.XPATH,"./ul/li")
log.info(f"{num}--{gpdm}--{len(news_lis)}条信息")
log.info(f"{name}--{gpdm}--{len(news_lis)}条信息")
for i in range(0,len(news_lis)):
try:
a_ele= news_lis[i].find_element(By.XPATH,"./div[1]/div[1]/div[2]/h3[1]/a")
except Exception :
log.error(f"{num}--{gpdm}--{i}----a标签没找到")
except Exception as e:
log.error(f"{name}--{gpdm}--{i}----a标签没找到")
e = str(e) + '.......a标签没找到'
taskType = '企业动态'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,url,e)
continue
news_url = a_ele.get_attribute("href").lstrip().strip().replace("'","''")
if(news_url.startswith("https://finance.yahoo.com")):
......@@ -151,12 +145,30 @@ for num in range(len(df_all)):
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{num}--{gpdm}--网址已经存在----{news_url}")
log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
e = '网址已存在'
taskType = '企业动态'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
continue
title = a_ele.text.lstrip().strip().replace("'","''")
getZx(xydm,news_url,title,cnx)
log.info(f"{num}--{gpdm}--{i}----{news_url}----------{news_url}")
e = getZx(xydm,news_url,title,cnx)
taskType = '企业动态'
if e == '':
state = 1
else:
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
log.info(f"{name}--{gpdm}--{i}----{news_url}----------{news_url}")
log.info(f"{name}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time,time.time())}")
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code,runType,count)
log.info(f"{num}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time,time.time())}")
#释放资源
baseCore.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论