Commit ed571f70 Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

@@ -421,15 +421,15 @@ class BaseCore:
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('lang=zh-CN,zh,zh-TW,en-US,en')
chrome_options.add_argument(self.getRandomUserAgent())
chrome_options.add_argument('user-agent=' + self.getRandomUserAgent())
# 'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
driver = webdriver.Chrome(chrome_options=chrome_options, service=service)
with open('../../base/stealth.min.js') as f:
js = f.read()
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": js
})
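# stealth.min.js (typically extracted from puppeteer-extra-plugin-stealth) is
# injected via the DevTools Protocol so it runs before any page script,
# masking navigator.webdriver and similar automation fingerprints.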
# with open(r'F:\zzsn\zzsn_spider\base\stealth.min.js') as f:
# js = f.read()
#
# driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
# "source": js
# })
return driver
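# Typical usage (see the test script at the end of this diff):
#   driver = baseCore.buildDriver(path, headless=False)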
# Fetch enterprise info by social credit code
@@ -458,6 +458,7 @@ class BaseCore:
print(e)
self.cnx.commit()
# Get the Qichacha (qcc.com) token
def GetToken(self):
# Get the Qichacha (qcc.com) token
query = "select token from QCC_token "
@@ -476,6 +477,7 @@ class BaseCore:
return 'cn'
return result[0]
# Append data to an Excel file
def writerToExcel(self,detailList,filename):
# filename='baidu搜索.xlsx'
# Read the existing xlsx file
@@ -488,4 +490,8 @@ class BaseCore:
combined_data.to_excel(filename, index=False)
# return combined_data
# Re-queue enterprises that failed or dropped mid-crawl back into Redis
def rePutIntoR(self,item):
self.r.rpush('NewsEnterprise:gwqy_socialCode', item)
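# Callers invoke rePutIntoR(social_code) after a failed crawl so the
# enterprise is retried on a later pass (see the Tianyancha job below).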
@@ -29,28 +29,34 @@ r = basecore.r
# gn_social_list = [item[0] for item in gn_result]
# return gn_social_list,gw_social_list
# Enterprise news
def NewsEnterprise():
# Get domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
# # Get domestic enterprises
# gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
# cursor.execute(gn_query)
# gn_result = cursor.fetchall()
# Get overseas enterprises
gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
gw_social_list = [item[0] for item in gw_result]
gn_social_list = [item[0] for item in gn_result]
# return gn_social_list, gw_social_list
# TODO: print the count
print(len(gw_social_list))
# gn_social_list = [item[0] for item in gn_result]
print('=======')
# gn_social_list,gw_social_list = pullDateFromSql()
# Push the data into Redis
for item in gn_social_list:
r.rpush('NewsEnterprise:gnqy_socialCode', item)
# Push the data into Redis
# for item in gn_social_list:
# r.rpush('NewsEnterprise:gnqy_socialCode', item)
count = 0
for item in gw_social_list:
r.rpush('NewsEnterprise:gwqy_socialCode', item)
count+=1
print(item)
print(count)
# Scheduled task for enterprise news
def NewsEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
@@ -63,6 +69,7 @@ def NewsEnterprise_task():
print('定时采集异常', e)
pass
# Enterprise announcements
def NoticeEnterprise():
# Get domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 1 "
@@ -72,7 +79,7 @@ def NoticeEnterprise():
print('=======')
for item in gn_social_list:
r.rpush('NoticeEnterprise:gnqy_socialCode', item)
# Scheduled task for enterprise announcements
def NoticeEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
@@ -85,6 +92,7 @@ def NoticeEnterprise_task():
print('定时采集异常', e)
pass
# Enterprise annual reports
def AnnualEnterprise():
# Get domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
@@ -94,7 +102,7 @@ def AnnualEnterprise():
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
# Scheduled task for enterprise annual reports
def AnnualEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
@@ -107,6 +115,7 @@ def AnnualEnterprise_task():
print('定时采集异常', e)
pass
# Enterprise basic info
def BaseInfoEnterprise():
# Get domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 1 "
@@ -117,7 +126,7 @@ def BaseInfoEnterprise():
for item in gn_social_list:
r.rpush('BaseInfoEnterprise:gnqy_socialCode', item)
# Enterprise basic info
# Scheduled task for enterprise basic info
def BaseInfoEnterprise_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
@@ -130,12 +139,58 @@ def BaseInfoEnterprise_task():
print('定时采集异常', e)
pass
# WeChat official accounts
def WeiXingetFromSql():
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql)
results = cursor.fetchall()
result_list = [item[0] for item in results]
# Push into Redis
for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item)
# Scheduled task for WeChat official accounts
def weixin_task():
# Instantiate a scheduler
scheduler = BlockingScheduler()
# Run once per day
scheduler.add_job(WeiXingetFromSql, 'cron', hour=12,minute=0)
try:
# redisPushData # run once before the schedule starts
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
## Forbes ===== read info from the database and push it into Redis
def FBS():
# TODO: switch to the Forbes database
gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
# # Get domestic enterprises
gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
gw_social_list = [item[0] for item in gw_result]
for item in gw_social_list:
r.rpush('NewsEnterprise:gwqy_socialCode', item)
for item in gn_social_list:
r.rpush('NewsEnterprise:gnqy_socialCode', item)
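# Note: FBS pushes into the same NewsEnterprise:gnqy/gwqy_socialCode queues
# that the news collectors consume, so Forbes enterprises are handled by the
# existing news jobs.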
if __name__ == "__main__":
start = time.time()
# NewsEnterprise_task()
# NewsEnterprise()
FBS()
# NoticeEnterprise_task()
AnnualEnterprise_task()
# AnnualEnterprise_task()
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
# cnx.close()
# cursor.close()
@@ -7,7 +7,7 @@ import pymysql
import requests
from base.BaseCore import BaseCore
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
headers={
@@ -34,13 +34,17 @@ def getTycIdByXYDM(xydm):
if matchType=='信用代码匹配':
retData['state'] = True
retData['tycData'] = retJsonData['data'][0]
response.close()
return retData
else:
log.error(f"{xydm}------{retJsonData}")
response.close()
return retData
except Exception as e:
log.error(f"{xydm}---exception---{e}")
return retData
# Update Tianyancha enterprise basic info
def updateTycInfo(id,retData):
state= retData['state']
"""
增量采集:
取state为3、update_state为空的企业 表示上次采集成功的企业,
新增update_state字段,取一个企业更新为2,表示该企业正在采集。
采集完毕更新为1.
表示已经采集完成。跟据date_time 来排列 每次就不会拿到重复的数据。
okCount
errorCount
repectCount
新增三个字段分别对应更新的up_okCount up_errorCount up_repectCount ,
记录这些更新的数据 然后加到原来的数据上表示该企业已采集多少动态
8.8日改版,企业动态也传kafka
"""
import json
import requests, time, pymysql
import jieba
import sys
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
from base.BaseCore import BaseCore
from base.smart import smart_extractor
# sys.path.append('D:\\KK\\zzsn_spider\\base')
@@ -49,12 +34,13 @@ headers = {
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
taskType = '企业动态/天眼查'
def beinWork(tyc_code, social_code):
start_time = time.time()
def beinWork(tyc_code, social_code,start_time):
time.sleep(3)
# retData={'up_state':False,'total':0,'up_okCount':0,'up_errorCount':0,'up_repetCount':0}
retData = {'total': 0, 'up_okCount': 0, 'up_errorCount': 0, 'up_repetCount': 0}
@@ -230,12 +216,13 @@ def beinWork(tyc_code, social_code):
'sid': '1684032033495392257',
'sourceAddress': link, # original article URL
'summary': info_page['abstracts'],
'title': contentText,
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
e = '数据库传输失败'
state = 0
@@ -263,6 +250,7 @@ def beinWork(tyc_code, social_code):
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
@@ -276,8 +264,6 @@ def beinWork(tyc_code, social_code):
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
@@ -295,10 +281,26 @@ def doJob():
if social_code == 'None':
time.sleep(20)
continue
start = time.time()
try:
data = baseCore.getInfomation(social_code)
id = data[0]
xydm = data[2]
tycid = data[11]
if tycid == None:
try:
retData = getTycIdByXYDM(xydm)
tycid = retData['tycData']['id']
# TODO: write it to the database
updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
cursor_.execute(updateSql)
cnx_.commit()
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
baseCore.rePutIntoR(social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
@@ -308,7 +310,7 @@ def doJob():
# cnx.commit()
# Start collecting enterprise news
retData = beinWork(tycid, xydm)
retData = beinWork(tycid, xydm,start_time)
# After collection finishes, update this enterprise's crawl count
runType = 'NewsRunCount'
count += 1
@@ -319,6 +321,12 @@ def doJob():
up_repetCount = retData['up_repetCount']
log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
except:
log.info(f'==={social_code}=====获取企业信息失败====')
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取企业信息失败')
cursor.close()
cnx.close()
@@ -328,4 +336,6 @@ def doJob():
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
doJob()
import pandas as pd
def writeaa():
detailList=[]
aa={
'id':3,
'name':'qqqwe'
}
detailList.append(aa)
writerToExcel(detailList)
# def writeaa():
# detailList=[]
# aa={
# 'id':3,
# 'name':'qqqwe'
# }
# detailList.append(aa)
# writerToExcel(detailList)
# Append the data to Excel
def writerToExcel(detailList):
# filename='baidu搜索.xlsx'
# Read the existing xlsx file
existing_data = pd.read_excel(filename,engine='openpyxl')
# Build the new data
new_data = pd.DataFrame(data=detailList)
# Append the new data to the end of the existing data
combined_data = existing_data.append(new_data, ignore_index=True)
# Write the result to the xlsx file
combined_data.to_excel(filename, index=False)
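# Note: DataFrame.append is deprecated and was removed in pandas 2.0; on newer
# pandas use pd.concat([existing_data, new_data], ignore_index=True) instead.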
# def writerToExcel(detailList):
# # filename='baidu搜索.xlsx'
# # Read the existing xlsx file
# existing_data = pd.read_excel(filename,engine='openpyxl')
# # Build the new data
# new_data = pd.DataFrame(data=detailList)
# # Append the new data to the end of the existing data
# combined_data = existing_data.append(new_data, ignore_index=True)
# # Write the result to the xlsx file
# combined_data.to_excel(filename, index=False)
#
# from openpyxl import Workbook
#
# if __name__ == '__main__':
# filename='test1.xlsx'
# # # Create a workbook
# workbook = Workbook(filename)
# workbook.save(filename)
# writeaa()
from openpyxl import Workbook
if __name__ == '__main__':
filename='test1.xlsx'
# # Create a workbook
workbook = Workbook(filename)
workbook.save(filename)
writeaa()
gpdm = '01109.HK'
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
gpdm = str(gpdm)[1:]
print(gpdm)
else:
pass
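# i.e. '01109.HK' becomes '1109.HK': Yahoo Finance uses 4-digit .HK symbols,
# so a zero-padded 5-digit code drops its leading digit (assumption based on
# the snippet above).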
# Yahoo Finance enterprise news collection
@@ -5,14 +5,18 @@ import pymysql
from kafka import KafkaProducer
from selenium.webdriver.common.by import By
import sys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from base import BaseCore
from base.smart import smart_extractor
sys.path.append('D:/zzsn_spider/base')
import BaseCore
from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
r = baseCore.r
taskType = '企业动态/雅虎财经'
smart =smart_extractor.SmartExtractor('cn')
@@ -45,6 +49,8 @@ def getZx(xydm, url, title, cnx, path):
content = contentElement.replace("'", "''")
driverContent.close()
# driverContent.quit()
# List of news item fields
list_info = [
xydm,
@@ -85,8 +91,8 @@ def getZx(xydm, url, title, cnx, path):
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'lang': 'en',
'origin': '雅虎财经',
'publishDate': pub_time,
'sid': '1684032033495392257',
'sourceAddress': url, # 原文链接
@@ -155,40 +161,44 @@ def getLastUrl():
def scroll(xydm,name,gpdm):
last_url_ = ''
try:
last_url = getLastUrl()
except:
log.error(f"{name}--{gpdm}--获取不到最后一条链接")
while True:
js = "var q=document.documentElement.scrollTop=100000"
driver.execute_script(js)
time.sleep(1)
try:
last_url_ = getLastUrl()
last_url = getLastUrl()
except Exception as e:
log.error(f"{name}--{gpdm}--获取不到最后一条链接")
break
try:
selects = selectUrl(last_url_,xydm)
except:
break
if selects:
break
# try:
# selects = selectUrl(last_url_,xydm)
# except:
# break
# if selects:
# break
if last_url_ == last_url:
break
last_url = last_url_
last_url_ = last_url
# Re-queue enterprises whose collection failed back into Redis
def rePutIntoR(item):
r.rpush('NewsEnterprise:gwqy_socialCode', item)
if __name__ == "__main__":
path = r'D:\zzsn_spider\comData\cmd6\chromedriver.exe'
path = r'F:\spider\115\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
while True:
# Use the social credit code pulled from Redis to fetch the matching basic info from the database
social_code = baseCore.redicPullData('NewsEnterprise:gwqy_socialCode')
# If Redis has run out of data, wait
if not social_code :
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
@@ -214,10 +224,13 @@ if __name__ == "__main__":
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
continue
try:
url = f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
try:
WebDriverWait(driver, 15).until(EC.visibility_of_element_located((By.ID, 'summaryPressStream-0-Stream')))
news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
news_div.find_element(By.TAG_NAME, 'a')
except Exception as e:
log.error(f"{name}--{gpdm}--没找到新闻元素")
exception = '没找到新闻元素'
@@ -232,16 +245,30 @@ if __name__ == "__main__":
log.error(f"{name}--{gpdm}--拖拽出现问题")
news_lis = news_div.find_elements(By.XPATH, "./ul/li")
log.info(f"{name}--{gpdm}--{len(news_lis)}条信息")
# Flag that marks whether the script has lost its connection to the site
flag = 0
for i in range(0, len(news_lis)):
try:
try:
a_ele = news_lis[i].find_element(By.XPATH, "./div[1]/div[1]/div[2]/h3[1]/a")
except:
a_ele = news_lis[i].find_element(By.XPATH, "./div[1]/div[1]/div[1]/h3[1]/a")
except Exception as e:
if news_lis[i].is_displayed():
log.error(f"{name}--{gpdm}--{i}----a标签没找到")
exception = 'a标签没找到'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exception)
continue
else:
log.error(f"{name}--{gpdm}--{i}----与网站断开连接")
# TODO: re-queue into Redis
rePutIntoR(xydm)
time.sleep(300)
flag = 1
break
news_url = a_ele.get_attribute("href").lstrip().strip().replace("'", "''")
if (news_url.startswith("https://finance.yahoo.com")):
pass
@@ -257,6 +284,9 @@ if __name__ == "__main__":
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, news_url, exception)
# used for incremental collection
# break
# used for full collection
continue
title = a_ele.text.lstrip().strip().replace("'", "''")
exception = getZx(xydm, news_url, title, cnx, path)
@@ -268,12 +298,25 @@ if __name__ == "__main__":
baseCore.recordLog(xydm, taskType, state, takeTime, news_url, exception)
log.info(f"{name}--{gpdm}--{i}----{news_url}")
if flag==1:
continue
log.info(f"{name}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time, time.time())}")
# After collection finishes, update this enterprise's crawl count
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code, runType, count)
except:
rePutIntoR(xydm)
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '远程主机强迫关闭了一个现有的连接。')
log.info(f"-------{name}--{gpdm}---'远程主机强迫关闭了一个现有的连接。'--------")
log.info('===========连接已被关闭========等待重新连接===========')
driver.quit()
driver = baseCore.buildDriver(path)
time.sleep(5)
continue
cursor.close()
cnx.close()
import time
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from base.BaseCore import BaseCore
baseCore = BaseCore()
@@ -6,13 +8,25 @@ log =baseCore.getLogger()
if __name__ == '__main__':
log.info("ok")
# Get a sequence number
print(baseCore.getNextSeq())
print(baseCore.getNextSeq())
# Get a random user agent
print(baseCore.getRandomUserAgent())
# Get a proxy from the pool
print(baseCore.get_proxy())
# Release resources
baseCore.close()
\ No newline at end of file
path = r'F:\spider\115\chromedriver.exe'
driver = baseCore.buildDriver(path,headless=False)
# service = Service(r'F:\spider\115\chromedriver.exe')
# chrome_options = webdriver.ChromeOptions()
# # chrome_options.add_argument('--headless')
# # chrome_options.add_argument('--disable-gpu')
# chrome_options.add_experimental_option(
# "excludeSwitches", ["enable-automation"])
# chrome_options.add_experimental_option('useAutomationExtension', False)
# chrome_options.add_argument('lang=zh-CN,zh,zh-TW,en-US,en')
# chrome_options.add_argument('user-agent='+baseCore.getRandomUserAgent())
#
# bro = webdriver.Chrome(chrome_options=chrome_options, service=service)
# with open('stealth.min.js') as f:
# js = f.read()
#
# bro.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
# "source": js
# })
gpdm = '9021.T'
url = f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
\ No newline at end of file