提交 610b0b53 作者: LiuLiYuan

Merge remote-tracking branch 'origin/master'

...@@ -7,21 +7,17 @@ import logbook ...@@ -7,21 +7,17 @@ import logbook
import logbook.more import logbook.more
# 核心工具包 # 核心工具包
import pymysql import pymysql
import redis
from selenium import webdriver from selenium import webdriver
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.service import Service
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore: class BaseCore:
# 序列号 # 序列号
__seq = 0 __seq = 0
# 代理池 数据库连接 # 代理池 数据库连接
__cnx_proxy = None __cnx_proxy =None
__cursor_proxy = None __cursor_proxy = None
# 基本信息 数据库连接
__cnx_infomation = None
__cursor_infomation = None
# agent 池 # agent 池
__USER_AGENT_LIST = [ __USER_AGENT_LIST = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
...@@ -215,26 +211,29 @@ class BaseCore: ...@@ -215,26 +211,29 @@ class BaseCore:
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5' 'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
] ]
# 连接到Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
def close(self): def close(self):
try: try:
self.__cursor_proxy.close() self.__cursor_proxy.close()
self.__cnx_proxy.close() self.__cnx_proxy.close()
self.__cursor_infomation.close() self.cursor.close()
self.__cnx_infomation.close() self.cnx.close()
except: except :
pass pass
def __init__(self): def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project', self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_proxy = self.__cnx_proxy.cursor() self.__cursor_proxy = self.__cnx_proxy.cursor()
self.__cnx_infomation = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', self.cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_infomation = self.__cnx_infomation.cursor()
self.cursor = self.cnx.cursor()
pass pass
# 计算耗时 # 计算耗时
def getTimeCost(self, start, end): def getTimeCost(self,start, end):
seconds = int(end - start) seconds = int(end - start)
m, s = divmod(seconds, 60) m, s = divmod(seconds, 60)
h, m = divmod(m, 60) h, m = divmod(m, 60)
...@@ -247,7 +246,6 @@ class BaseCore: ...@@ -247,7 +246,6 @@ class BaseCore:
else: else:
ms = int((end - start) * 1000) ms = int((end - start) * 1000)
return "%d毫秒" % (ms) return "%d毫秒" % (ms)
# 当前时间格式化 # 当前时间格式化
# 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S # 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
# 2 : 010101120000 %y%m%d%H%M%S # 2 : 010101120000 %y%m%d%H%M%S
...@@ -277,7 +275,7 @@ class BaseCore: ...@@ -277,7 +275,7 @@ class BaseCore:
return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3) return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3)
# 日志格式 # 日志格式
def logFormate(self, record, handler): def logFormate(self,record, handler):
formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format( formate = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format(
date=record.time, # 日志时间 date=record.time, # 日志时间
level=record.level_name, # 日志等级 level=record.level_name, # 日志等级
...@@ -287,9 +285,8 @@ class BaseCore: ...@@ -287,9 +285,8 @@ class BaseCore:
msg=record.message # 日志内容 msg=record.message # 日志内容
) )
return formate return formate
# 获取logger # 获取logger
def getLogger(self, fileLogFlag=True, stdOutFlag=True): def getLogger(self,fileLogFlag=True, stdOutFlag=True):
dirname, filename = os.path.split(os.path.abspath(sys.argv[0])) dirname, filename = os.path.split(os.path.abspath(sys.argv[0]))
dirname = os.path.join(dirname, "logs") dirname = os.path.join(dirname, "logs")
filename = filename.replace(".py", "") + ".log" filename = filename.replace(".py", "") + ".log"
...@@ -338,25 +335,48 @@ class BaseCore: ...@@ -338,25 +335,48 @@ class BaseCore:
proxy_list.append(proxy) proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)] return proxy_list[random.randint(0, 3)]
# 字符串截取 #字符串截取
def getSubStr(self, str, beginStr, endStr): def getSubStr(self,str,beginStr,endStr):
if beginStr == '': if beginStr=='':
pass pass
else: else:
begin = str.find(beginStr) begin=str.find(beginStr)
if begin == -1: if begin==-1:
begin = 0 begin=0
str = str[begin:] str=str[begin:]
if endStr == '': if endStr=='':
pass pass
else: else:
end = str.rfind(endStr) end=str.rfind(endStr)
if end == -1: if end==-1:
pass pass
else: else:
str = str[0:end + 1] str = str[0:end+1]
return str return str
# def pullDateFromSql(self):
# query = "select SocialCode from EnterpriseInfo "
# self.cursor.execute(query)
# result = self.cursor.fetchall()
# social_list = list(result)
# return social_list
#
# def redisPushData(self,social_list):
#
# #将数据插入到redis中
# for item in social_list:
# self.r.rpush('qy_socialCode', item)
# 从Redis的List中获取并移除一个元素
def redicPullData(self,type):
if type == 1:
gn_item = self.r.lpop('gnqy_socialCode')
return gn_item.decode() if gn_item else None
if type == 2:
gw_item = self.r.lpop('gwqy_socialCode')
return gw_item.decode() if gw_item else None
# 获得脚本进程PID # 获得脚本进程PID
def getPID(self): def getPID(self):
PID = os.getpid() PID = os.getpid()
...@@ -384,7 +404,7 @@ class BaseCore: ...@@ -384,7 +404,7 @@ class BaseCore:
chrome_options.add_argument( chrome_options.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36') 'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
driver = webdriver.Chrome(chrome_options=chrome_options, service=service) driver = webdriver.Chrome(chrome_options=chrome_options, service=service)
with open('./stealth.min.js') as f: with open('../../base/stealth.min.js') as f:
js = f.read() js = f.read()
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
...@@ -395,15 +415,15 @@ class BaseCore: ...@@ -395,15 +415,15 @@ class BaseCore:
# 根据社会信用代码获取企业信息 # 根据社会信用代码获取企业信息
def getInfomation(self, social_code): def getInfomation(self, social_code):
sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'" sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
self.__cursor_infomation.execute(sql) self.cursor.execute(sql)
data = self.__cursor_infomation.fetchone() data = self.cursor.fetchone()
return data return data
# 更新企业采集次数 # 更新企业采集次数
def updateRun(self, social_code, runType, count): def updateRun(self, social_code, runType, count):
sql_update = f"UPDATE EnterpriseInfo SET {runType} = {count} WHERE SocialCode = '{social_code}'" sql_update = f"UPDATE EnterpriseInfo SET {runType} = {count} WHERE SocialCode = '{social_code}'"
self.__cursor_infomation.excute(sql_update) self.cursor.execute(sql_update)
self.__cnx_infomation.commit() self.cnx.commit()
# 保存日志入库 # 保存日志入库
def recordLog(self, xydm, taskType, state, takeTime, url, e): def recordLog(self, xydm, taskType, state, takeTime, url, e):
...@@ -412,5 +432,10 @@ class BaseCore: ...@@ -412,5 +432,10 @@ class BaseCore:
pid = self.getPID() pid = self.getPID()
sql = "INSERT INTO LogTable(SocialCode,TaskType,state,TakeTime,url,CreateTime,ProcessIp,PID,Exception) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)" sql = "INSERT INTO LogTable(SocialCode,TaskType,state,TakeTime,url,CreateTime,ProcessIp,PID,Exception) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)"
values = [xydm, taskType, state, takeTime, url, createTime, ip, pid, e] values = [xydm, taskType, state, takeTime, url, createTime, ip, pid, e]
self.__cursor_infomation.excute(sql, values) try:
self.__cnx_infomation.commit() self.cursor.execute(sql, values)
except Exception as e:
print(e)
self.cnx.commit()
import time
import pymysql
import redis
from base import BaseCore
from apscheduler.schedulers.blocking import BlockingScheduler
basecore = BaseCore.BaseCore()
log = basecore.getLogger()
# 连接到Redis
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
charset='utf8mb4')
cursor = cnx.cursor()
def pullDateFromSql():
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 1 "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gw_query = "select SocialCode from EnterpriseInfo where Place = '2' limit 1 "
cursor.execute(gw_query)
gw_result = cursor.fetchall()
gw_social_list = [item[0] for item in gw_result]
gn_social_list = [item[0] for item in gn_result]
return gn_social_list,gw_social_list
def redisPushData():
print('=======')
gn_social_list,gw_social_list = pullDateFromSql()
#将数据插入到redis中
for item in gn_social_list:
r.rpush('gnqy_socialCode', item)
for item in gw_social_list:
r.rpush('gwqy_socialCode', item)
# 从Redis的List中获取并移除一个元素
def redicPullData(type):
gn_item = r.lpop('gn_socialCode')
gw_item = r.lpop('gw_socialCode')
#1 表示国内 2 表示国外
if type==1:
return gn_item.decode() if gn_item else None
if type==2:
return gw_item.decode() if gw_item else None
def task(task_time):
# 实例化一个调度器
scheduler = BlockingScheduler()
# 每半分钟执行一次
scheduler.add_job(redisPushData, 'cron', second=task_time, max_instances=3)
# 每天早上9点执行一次
# scheduler.add_job(self.auto_tb(), 'cron', day='*', hour=12, minute=5, start_date='2021-12-16 09:00:00',end_date='2023-11-30 23:59:59')
try:
# redisPushData # 定时开始前执行一次
scheduler.start()
except Exception as e:
print('定时采集异常', e)
pass
if __name__ == "__main__":
start = time.time()
task_time = '*/10'
task(task_time)
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
\ No newline at end of file
...@@ -52,11 +52,6 @@ class SmartExtractor: ...@@ -52,11 +52,6 @@ class SmartExtractor:
# 支持语言 # 支持语言
self.goose = Goose({'stopwords_class': StopWordsChinese}) self.goose = Goose({'stopwords_class': StopWordsChinese})
def get_extraction_result(self, article, link_text=''): def get_extraction_result(self, article, link_text=''):
""" """
获取采集结果: 获取采集结果:
......
...@@ -68,6 +68,9 @@ def beinWork(tyc_code, social_code): ...@@ -68,6 +68,9 @@ def beinWork(tyc_code, social_code):
break break
except Exception as e: except Exception as e:
log.error(f"request请求异常----{m}-----{e}") log.error(f"request请求异常----{m}-----{e}")
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, e)
pass pass
if (response.status_code == 200): if (response.status_code == 200):
...@@ -154,10 +157,9 @@ def beinWork(tyc_code, social_code): ...@@ -154,10 +157,9 @@ def beinWork(tyc_code, social_code):
selects = cursor.fetchone() selects = cursor.fetchone()
if selects: if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在') log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
# up_repetCount = up_repetCount + 1
# continue
# todo:如果该条数据存在则说明该条数据之后的都已经采集完成,就可以跳出函数,执行下一个企业 # todo:如果该条数据存在则说明该条数据之后的都已经采集完成,就可以跳出函数,执行下一个企业
# retData['up_state'] = True
retData['up_okCount'] = up_okCount retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount retData['up_repetCount'] = up_repetCount
...@@ -274,7 +276,7 @@ def beinWork(tyc_code, social_code): ...@@ -274,7 +276,7 @@ def beinWork(tyc_code, social_code):
baseCore.recordLog(social_code, taskType, state, takeTime, link, e) baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}") log.info(f"获取分页数据--{tyc_code}----分页{num},耗时{baseCore.getTimeCost(start_page, time.time())}")
# retData['up_state'] = True
retData['up_okCount'] = up_okCount retData['up_okCount'] = up_okCount
retData['up_errorCount'] = up_errorCount retData['up_errorCount'] = up_errorCount
retData['up_repetCount'] = up_repetCount retData['up_repetCount'] = up_repetCount
...@@ -286,9 +288,9 @@ def beinWork(tyc_code, social_code): ...@@ -286,9 +288,9 @@ def beinWork(tyc_code, social_code):
def doJob(): def doJob():
while True: while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息 # 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
social_code = '' social_code = baseCore.redicPullData(1)
# 判断 如果Redis中已经没有数据,则等待 # 判断 如果Redis中已经没有数据,则等待
if social_code == '': if social_code == 'None':
time.sleep(20) time.sleep(20)
continue continue
data = baseCore.getInfomation(social_code) data = baseCore.getInfomation(social_code)
...@@ -309,30 +311,10 @@ def doJob(): ...@@ -309,30 +311,10 @@ def doJob():
runType = 'NewsRunCount' runType = 'NewsRunCount'
count += 1 count += 1
baseCore.updateRun(social_code, runType, count) baseCore.updateRun(social_code, runType, count)
# up_state = retData['up_state']
total = retData['total'] total = retData['total']
up_okCount = retData['up_okCount'] up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount'] up_errorCount = retData['up_errorCount']
up_repetCount = retData['up_repetCount'] up_repetCount = retData['up_repetCount']
# if up_state:
# stateNum = 1
# else:
# stateNum = 4
#
# # 取出数据库中okCount errorCount repetCount 并更新
# selectOrginSql = f"select okCount,errorCount,repetCount,total from ssqy_tyc where id={id}"
# cursor.execute(selectOrginSql)
# count_info = cursor.fetchone()
# okCount = count_info[0]
# errorCount = count_info[1]
# repetCount = count_info[2]
#
# updateEndSql = f"update ssqy_tyc set update_state={stateNum},up_okCount={up_okCount},up_errorCount={up_errorCount},up_repetCount={up_repetCount} ,date_time=now(),okCount={okCount+up_okCount},errorCount={errorCount+up_errorCount},repetCount={repetCount+up_repetCount},total={total} where id={id}"
# cursor.execute(updateEndSql)
# cnx.commit()
log.info( log.info(
f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}") f"{id}---{xydm}----{tycid}----结束处理,耗时{baseCore.getTimeCost(start_time, time.time())}---总数:{total}---成功数:{up_okCount}----失败数:{up_errorCount}--重复数:{up_repetCount}")
...@@ -341,19 +323,7 @@ def doJob(): ...@@ -341,19 +323,7 @@ def doJob():
# 释放资源 # 释放资源
baseCore.close() baseCore.close()
# Press the green button in the gutter to run the script. # Press the green button in the gutter to run the script.
if __name__ == '__main__': if __name__ == '__main__':
doJob() doJob()
# link = 'https://m.thepaper.cn/newsDetail_forward_24049067'
# social_code = '915101006653023886'
# try:
# sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code = %s and type='2' '''
# print(sel_sql)
# cursor.execute(sel_sql, (link,social_code))
# aa = cursor.fetchone()
# print(aa)
# except Exception as e:
# print(e)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论