提交 456ba4fa 作者: 薛凌堃

国内外企业动态自动化

上级 f4dce399
......@@ -7,21 +7,17 @@ import logbook
import logbook.more
# 核心工具包
import pymysql
import redis
from selenium import webdriver
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
from selenium.webdriver.chrome.service import Service
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
# 序列号
__seq = 0
# 代理池 数据库连接
__cnx_proxy = None
__cnx_proxy =None
__cursor_proxy = None
# 基本信息 数据库连接
__cnx_infomation = None
__cursor_infomation = None
# agent 池
__USER_AGENT_LIST = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
......@@ -215,26 +211,29 @@ class BaseCore:
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
]
# 连接到Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
def close(self):
try:
self.__cursor_proxy.close()
self.__cnx_proxy.close()
self.__cursor_infomation.close()
self.__cnx_infomation.close()
except:
self.cursor.close()
self.cnx.close()
except :
pass
def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
charset='utf8mb4')
self.__cursor_proxy = self.__cnx_proxy.cursor()
self.__cnx_infomation = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
charset='utf8mb4')
self.__cursor_infomation = self.__cnx_infomation.cursor()
self.cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
charset='utf8mb4')
self.cursor = self.cnx.cursor()
pass
# 计算耗时
def getTimeCost(self, start, end):
def getTimeCost(self,start, end):
seconds = int(end - start)
m, s = divmod(seconds, 60)
h, m = divmod(m, 60)
......@@ -247,7 +246,6 @@ class BaseCore:
else:
ms = int((end - start) * 1000)
return "%d毫秒" % (ms)
# 当前时间格式化
# 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
# 2 : 010101120000 %y%m%d%H%M%S
......@@ -277,7 +275,7 @@ class BaseCore:
return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3)
# 日志格式
def logFormate(self, record, handler):
def logFormate(self,record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间
level=record.level_name, # 日志等级
......@@ -287,9 +285,8 @@ class BaseCore:
msg=record.message # 日志内容
)
return formate
# 获取logger
def getLogger(self, fileLogFlag=True, stdOutFlag=True):
def getLogger(self,fileLogFlag=True, stdOutFlag=True):
dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log"
......@@ -338,25 +335,48 @@ class BaseCore:
proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)]
# 字符串截取
def getSubStr(self, str, beginStr, endStr):
if beginStr == '':
#字符串截取
def getSubStr(self,str,beginStr,endStr):
if beginStr=='':
pass
else:
begin = str.find(beginStr)
if begin == -1:
begin = 0
str = str[begin:]
if endStr == '':
begin=str.find(beginStr)
if begin==-1:
begin=0
str=str[begin:]
if endStr=='':
pass
else:
end = str.rfind(endStr)
if end == -1:
end=str.rfind(endStr)
if end==-1:
pass
else:
str = str[0:end + 1]
str = str[0:end+1]
return str
# def pullDateFromSql(self):
# query = "select SocialCode from EnterpriseInfo "
# self.cursor.execute(query)
# result = self.cursor.fetchall()
# social_list = list(result)
# return social_list
#
# def redisPushData(self,social_list):
#
# #将数据插入到redis中
# for item in social_list:
# self.r.rpush('qy_socialCode', item)
# 从Redis的List中获取并移除一个元素
def redicPullData(self,type):
if type == 1:
gn_item = self.r.lpop('gnqy_socialCode')
return gn_item.decode() if gn_item else None
if type == 2:
gw_item = self.r.lpop('gwqy_socialCode')
return gw_item.decode() if gw_item else None
# 获得脚本进程PID
def getPID(self):
PID = os.getpid()
......@@ -384,7 +404,7 @@ class BaseCore:
chrome_options.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
driver = webdriver.Chrome(chrome_options=chrome_options, service=service)
with open('./stealth.min.js') as f:
with open('../../base/stealth.min.js') as f:
js = f.read()
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
......@@ -395,15 +415,15 @@ class BaseCore:
# 根据社会信用代码获取企业信息
def getInfomation(self, social_code):
sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
self.__cursor_infomation.execute(sql)
data = self.__cursor_infomation.fetchone()
self.cursor.execute(sql)
data = self.cursor.fetchone()
return data
# 更新企业采集次数
def updateRun(self, social_code, runType, count):
sql_update = f"UPDATE EnterpriseInfo SET {runType} = {count} WHERE SocialCode = '{social_code}'"
self.__cursor_infomation.excute(sql_update)
self.__cnx_infomation.commit()
self.cursor.execute(sql_update)
self.cnx.commit()
# 保存日志入库
def recordLog(self, xydm, taskType, state, takeTime, url, e):
......@@ -412,5 +432,10 @@ class BaseCore:
pid = self.getPID()
sql = "INSERT INTO LogTable(SocialCode,TaskType,state,TakeTime,url,CreateTime,ProcessIp,PID,Exception) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)"
values = [xydm, taskType, state, takeTime, url, createTime, ip, pid, e]
self.__cursor_infomation.excute(sql, values)
self.__cnx_infomation.commit()
try:
self.cursor.execute(sql, values)
except Exception as e:
print(e)
self.cnx.commit()
import time
import pymysql
import redis
from base import BaseCore
from apscheduler.schedulers.blocking import BlockingScheduler
basecore = BaseCore.BaseCore()
log = basecore.getLogger()
# 连接到Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
charset='utf8mb4')
cursor = cnx.cursor()
def pullDateFromSql():
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 1 "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gw_query = "select SocialCode from EnterpriseInfo where Place = '2' limit 1 "
cursor.execute(gw_query)
gw_result = cursor.fetchall()
gw_social_list = [item[0] for item in gw_result]
gn_social_list = [item[0] for item in gn_result]
return gn_social_list,gw_social_list
def redisPushData():
print('=======')
gn_social_list,gw_social_list = pullDateFromSql()
#将数据插入到redis中
for item in gn_social_list:
r.rpush('gnqy_socialCode', item)
for item in gw_social_list:
r.rpush('gwqy_socialCode', item)
# 从Redis的List中获取并移除一个元素
def redicPullData(type):
gn_item = r.lpop('gn_socialCode')
gw_item = r.lpop('gw_socialCode')
#1 表示国内 2 表示国外
if type==1:
return gn_item.decode() if gn_item else None
if type==2:
return gw_item.decode() if gw_item else None
def task(task_time):
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每半分钟执行一次
scheduler.add_job(redisPushData, 'cron', second=task_time, max_instances=3)
# 每天早上9点执行一次
# scheduler.add_job(self.auto_tb(), 'cron', day='*', hour=12, minute=5, start_date='2021-12-16 09:00:00',end_date='2023-11-30 23:59:59')
try:
# redisPushData # 定时开始前执行一次
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
if __name__ == "__main__":
start = time.time()
task_time = '*/10'
task(task_time)
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
\ No newline at end of file
......@@ -52,11 +52,6 @@ class SmartExtractor:
# 支持语言
self.goose = Goose({'stopwords_class': StopWordsChinese})
def get_extraction_result(self, article, link_text=''):
"""
获取采集结果:
......
......@@ -68,6 +68,9 @@ def beinWork(tyc_code, social_code):
break
except Exception as e:
log.error(f"request请求异常----{m}-----{e}")
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
pass
if (response.status_code == 200):
......@@ -154,10 +157,9 @@ def beinWork(tyc_code, social_code):
selects = cursor.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
# up_repetCount = up_repetCount + 1
# continue
# todo:如果该条数据存在则说明该条数据之后的都已经采集完成,就可以跳出函数,执行下一个企业
# retData['up_state'] = True
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
......@@ -274,7 +276,7 @@ def beinWork(tyc_code, social_code):
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
# retData['up_state'] = True
retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount
......@@ -286,9 +288,9 @@ def beinWork(tyc_code, social_code):
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = ''
social_code = baseCore.redicPullData(1)
# 判断 如果Redis中已经没有数据,则等待
if social_code == '':
if social_code == 'None':
time.sleep(20)
continue
data = baseCore.getInfomation(social_code)
......@@ -309,30 +311,10 @@ def doJob():
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code, runType, count)
# up_state = retData['up_state']
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount']
# if up_state:
# stateNum = 1
# else:
# stateNum = 4
#
# # 取出数据库中okCount errorCount repetCount 并更新
# selectOrginSql = f"select okCount,errorCount,repetCount,total from ssqy_tyc where id={id}"
# cursor.execute(selectOrginSql)
# count_info = cursor.fetchone()
# okCount = count_info[0]
# errorCount = count_info[1]
# repetCount = count_info[2]
#
# updateEndSql = f"update ssqy_tyc set update_state={stateNum},up_okCount={up_okCount},up_errorCount={up_errorCount},up_repetCount={up_repetCount} ,date_time=now(),okCount={okCount+up_okCount},errorCount={errorCount+up_errorCount},repetCount={repetCount+up_repetCount},total={total} where id={id}"
# cursor.execute(updateEndSql)
# cnx.commit()
log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
......@@ -341,19 +323,7 @@ def doJob():
# 释放资源
baseCore.close()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
doJob()
# link = 'https://m.thepaper.cn/newsDetail_forward_24049067'
# social_code = '915101006653023886'
# try:
# sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code = %s and type='2' '''
# print(sel_sql)
# cursor.execute(sel_sql, (link,social_code))
# aa = cursor.fetchone()
# print(aa)
# except Exception as e:
# print(e)
# 雅虎财经企业动态获取
# 雅虎财经企业动态获取
# 雅虎财经企业动态获取
import time
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium import webdriver
from base.BaseCore import BaseCore
baseCore = BaseCore()
log= BaseCore.getLogger()
log= baseCore.getLogger()
#获取资讯详情
def getZx(xydm,url,title,cnx):
def getZx(xydm,url,title,cnx,path):
start_time_content= time.time()
try:
path = r'E:\chromedriver_win32\chromedriver.exe'
driverContent = baseCore.buildDriver(path)
driverContent.get(url)
try:
......@@ -71,12 +64,6 @@ def getZx(xydm,url,title,cnx):
return e
return ''
path = r'E:\chromedriver_win32\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
# 拖拽30次获取企业新闻
def scroll(driver):
for i in range(0,30):
......@@ -85,89 +72,92 @@ def scroll(driver):
driver.execute_script(js)
time.sleep(0.1)
if __name__ == "__main__":
path = r'D:\chrome\chromedriver.exe'
driver = baseCore.buildDriver(path)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code= baseCore.redicPullData(2)
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = ''
# 判断 如果Redis中已经没有数据,则等待
if social_code == '':
time.sleep(20)
continue
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
xydm = data[2]
taskType = '企业动态'
# 获取该企业对应项目的采集次数
count = data[17]
start_time = time.time()
if(gpdm==''):
log.error(f"{name}--股票代码为空 跳过")
e = '.......股票代码为空 跳过'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,'',e)
continue
url=f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
scroll(driver)
try:
news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
except Exception as e:
log.error(f"{name}--{gpdm}--没找到新闻元素")
e = str(e) + '.......没找到新闻元素'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,url,e)
continue
news_lis = news_div.find_elements(By.XPATH,"./ul/li")
log.info(f"{name}--{gpdm}--{len(news_lis)}条信息")
for i in range(0,len(news_lis)):
# 判断 如果Redis中已经没有数据,则等待
if social_code == 'None':
time.sleep(20)
continue
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
xydm = data[2]
taskType = '企业动态'
# 获取该企业对应项目的采集次数
count = data[17]
start_time = time.time()
if(gpdm==''):
log.error(f"{name}--股票代码为空 跳过")
e = '.......股票代码为空 跳过'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,'',e)
continue
url=f"https://finance.yahoo.com/quote/{gpdm}/press-releases?p={gpdm}"
driver.get(url)
scroll(driver)
try:
a_ele= news_lis[i].find_element(By.XPATH,"./div[1]/div[1]/div[2]/h3[1]/a")
news_div = driver.find_element(By.ID, 'summaryPressStream-0-Stream')
except Exception as e:
log.error(f"{name}--{gpdm}--{i}----a标签没找到")
e = str(e) + '.......a标签没找到'
log.error(f"{name}--{gpdm}--没找到新闻元素")
e = str(e) + '.......没找到新闻元素'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,url,e)
continue
news_url = a_ele.get_attribute("href").lstrip().strip().replace("'","''")
if(news_url.startswith("https://finance.yahoo.com")):
pass
else:
continue
#判断url是否已经存在
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
e = '网址已存在'
news_lis = news_div.find_elements(By.XPATH,"./ul/li")
log.info(f"{name}--{gpdm}--{len(news_lis)}条信息")
for i in range(0,len(news_lis)):
try:
a_ele= news_lis[i].find_element(By.XPATH,"./div[1]/div[1]/div[2]/h3[1]/a")
except Exception as e:
log.error(f"{name}--{gpdm}--{i}----a标签没找到")
e = str(e) + '.......a标签没找到'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
baseCore.recordLog(xydm,taskType,state,takeTime,url,e)
continue
title = a_ele.text.lstrip().strip().replace("'","''")
e = getZx(xydm,news_url,title,cnx)
if e == '':
state = 1
else:
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
log.info(f"{name}--{gpdm}--{i}----{news_url}----------{news_url}")
news_url = a_ele.get_attribute("href").lstrip().strip().replace("'","''")
if(news_url.startswith("https://finance.yahoo.com")):
pass
else:
continue
#判断url是否已经存在
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s '''
cursor.execute(sel_sql, (news_url,xydm))
selects = cursor.fetchall()
if selects:
log.error(f"{name}--{gpdm}--网址已经存在----{news_url}")
e = '网址已存在'
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
continue
title = a_ele.text.lstrip().strip().replace("'","''")
e = getZx(xydm,news_url,title,cnx,path)
if e == '':
state = 1
else:
state = 0
takeTime = baseCore.getTimeCost(start_time,time.time())
baseCore.recordLog(xydm,taskType,state,takeTime,news_url,e)
log.info(f"{name}--{gpdm}--{i}----{news_url}")
log.info(f"{name}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time,time.time())}")
log.info(f"{name}--{gpdm}--企业整体,耗时{baseCore.getTimeCost(start_time,time.time())}")
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code,runType,count)
# 信息采集完成后将该企业的采集次数更新
runType = 'NewsRunCount'
count += 1
baseCore.updateRun(social_code,runType,count)
#释放资源
baseCore.close()
\ No newline at end of file
#释放资源
baseCore.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论