提交 1cbee20b 作者: 丁双波

工具包提交

上级 c196e8ce
...@@ -20,6 +20,7 @@ parts/ ...@@ -20,6 +20,7 @@ parts/
sdist/ sdist/
var/ var/
logs logs
*.log
wheels/ wheels/
*.egg-info/ *.egg-info/
.installed.cfg .installed.cfg
......
import logging
import os import os
import random import random
import sys import sys
import time import time
import uuid import logbook
from base import config import logbook.more
# 核心工具包
import pymysql
#计算耗时 # 注意 程序退出前 调用BaseCore.close() 关闭相关资源
def getTimeCost(start,end): class BaseCore:
seconds=int(end-start) # 序列号
m, s = divmod(seconds, 60) __seq = 0
h, m = divmod(m, 60) # 代理池 数据库连接
if(h>0): __cnx_proxy =None
return "%d小时%d分钟%d秒" % (h, m, s) __cursor_proxy = None
elif(m>0): # agent 池
return "%d分钟%d秒" % (m, s) __USER_AGENT_LIST = [
elif(seconds>0):
return "%d秒" % ( s)
else:
ms=int((end - start) * 1000)
return "%d毫秒" % (ms)
#获取日志
def getLogger():
    """Return the shared ``"logger"`` logger with console and file output.

    The log file is ``logs/<script file name>.log`` inside the directory of
    the running script (taken from ``sys.argv[0]``); the ``logs`` directory
    is created when it does not exist yet.

    Handlers are attached only on the first call. ``logging.getLogger``
    returns the same object for the same name, so attaching a new pair of
    handlers on every call would emit each record once more per call.

    Returns:
        logging.Logger: the logger, set to INFO level.
    """
    logger = logging.getLogger("logger")
    logger.setLevel(logging.INFO)
    if logger.handlers:
        # Already configured by an earlier call; do not stack more handlers.
        return logger

    script_dir, script_name = os.path.split(os.path.abspath(sys.argv[0]))
    log_dir = os.path.join(script_dir, "logs")
    # exist_ok avoids the check-then-create race between parallel scripts.
    os.makedirs(log_dir, exist_ok=True)
    log_file_path = os.path.join(log_dir, script_name + ".log")

    formatter = logging.Formatter(
        fmt="%(asctime)s %(filename)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %X",
    )
    # Build both handlers before attaching either, so a failure opening the
    # log file does not leave the logger half configured.
    handlers = (
        logging.StreamHandler(),
        logging.FileHandler(log_file_path, encoding="UTF-8"),
    )
    for handler in handlers:
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
#获取流水号
def getNextSeq():
    """Return the next serial number: a 12-digit timestamp plus a 3-digit counter.

    The counter lives in ``config.seq`` and is advanced on every call.

    ``zfill(3)`` only pads up to a minimum width, so the counter has to wrap
    before it reaches 1000; otherwise the suffix would grow to four digits
    and the serial number would change length.

    Returns:
        str: ``getNowTime(2)`` followed by the zero-padded counter.
    """
    config.seq += 1
    if config.seq > 999:
        config.seq = 0
    return getNowTime(2) + str(config.seq).zfill(3)
#当前时间格式化
# 1 : 2001-01-01 12:00:00
# 2 : 010101120000
# 时间戳 3:1690179526555
def getNowTime(type):
    """Return the current local time in the representation selected by *type*.

    Args:
        type: 1 for ``"%Y-%m-%d %H:%M:%S"`` text, 2 for compact
            ``"%y%m%d%H%M%S"`` text, 3 for an integer millisecond timestamp.

    Returns:
        str | int: the formatted time, or an empty string for any other
        selector.
    """
    if type == 1:
        return time.strftime("%Y-%m-%d %H:%M:%S")
    if type == 2:
        return time.strftime("%y%m%d%H%M%S")
    if type == 3:
        return int(time.time() * 1000)
    return ""
#获取代理
def get_proxy(cursor):
    """Pick one random proxy from the ``clb_proxy`` table.

    Each row's first column is read as ``"<host>-<port>"`` and turned into a
    ``{"HTTP": url, "HTTPS": url}`` mapping with ``url`` being
    ``http://<host>:<port>``.

    Args:
        cursor: a DB-API cursor whose ``fetchall()`` yields sequence rows.

    Returns:
        dict: the proxy mapping for one randomly chosen row.

    Raises:
        IndexError: if the table is empty or a value has no ``-`` separator.
    """
    # NOTE(review): the keys are upper-case; HTTP clients such as `requests`
    # look proxies up by lower-case scheme. Left unchanged because callers may
    # index this mapping by these exact keys - confirm before changing.
    cursor.execute("select proxy from clb_proxy")
    proxy_list = []
    for row in cursor.fetchall():
        # Read the column value directly instead of stripping the tuple's
        # repr, which breaks on values containing quotes or escapes.
        parts = str(row[0]).split('-')
        proxy_url = "http://%(host)s:%(port)s" % {
            "host": parts[0],
            "port": parts[1],
        }
        proxy_list.append({"HTTP": proxy_url, "HTTPS": proxy_url})
    # Choose over the whole pool; a fixed 0..3 index fails with fewer than
    # four rows and never selects anything beyond the fourth.
    return random.choice(proxy_list)
USER_AGENT_LIST = [
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36',
'Mozilla/5.0 (Windows; U; Windows NT 5.2; en-US) AppleWebKit/525.13 (KHTML, like Gecko) Chrome/0.2.149.29 Safari/525.13', 'Mozilla/5.0 (Windows; U; Windows NT 5.2; en-US) AppleWebKit/525.13 (KHTML, like Gecko) Chrome/0.2.149.29 Safari/525.13',
'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/531.4 (KHTML, like Gecko) Chrome/3.0.194.0 Safari/531.4', 'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US) AppleWebKit/531.4 (KHTML, like Gecko) Chrome/3.0.194.0 Safari/531.4',
...@@ -280,22 +205,105 @@ USER_AGENT_LIST = [ ...@@ -280,22 +205,105 @@ USER_AGENT_LIST = [
'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/534.8 (KHTML, like Gecko) Chrome/7.0.521.0 Safari/534.8', 'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/534.8 (KHTML, like Gecko) Chrome/7.0.521.0 Safari/534.8',
'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.1b2pre) Gecko/20081015 Fennec/1.0a1', 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.1b2pre) Gecko/20081015 Fennec/1.0a1',
'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5' 'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
] ]
def getRandomUserAgent(headers): def close(self):
headers['User-Agent']= random.choice(USER_AGENT_LIST) try:
return headers self.__cursor_proxy.close()
#获取信用代码 self.__cnx_proxy.close()
def getNextXydm(): except :
config.seq+=1 pass
if(config.seq>1000): def __init__(self):
config.seq=0 self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
return "ZZSN"+getNowTime(2)+str(config.seq).zfill(3) charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor()
pass
def getUUID(): # 当前时间格式化
uid = str(uuid.uuid4()) # 1 : 2001-01-01 12:00:00 %Y-%m-%d %H:%M:%S
uid = ''.join(uid.split('-')) # 2 : 010101120000 %y%m%d%H%M%S
print(uid) # 时间戳 3:1690179526555 精确到秒
def getNowTime(self, type):
    """Format the current local time according to the *type* selector.

    Args:
        type: 1 -> ``"%Y-%m-%d %H:%M:%S"`` string,
            2 -> ``"%y%m%d%H%M%S"`` string,
            3 -> integer timestamp in milliseconds.

    Returns:
        str | int: the requested representation; an empty string when the
        selector is none of the above.
    """
    if type == 1:
        result = time.strftime("%Y-%m-%d %H:%M:%S")
    elif type == 2:
        result = time.strftime("%y%m%d%H%M%S")
    elif type == 3:
        result = int(time.time() * 1000)
    else:
        result = ""
    return result
# 获取流水号
def getNextSeq(self):
    """Return the next serial number: a 12-digit timestamp plus a 3-digit counter.

    The counter is kept in ``self.__seq`` and advanced on every call.

    ``zfill(3)`` only pads up to a minimum width, so the counter has to wrap
    before it reaches 1000; otherwise the suffix would grow to four digits
    and the serial number would change length.

    Returns:
        str: ``self.getNowTime(2)`` followed by the zero-padded counter.
    """
    self.__seq += 1
    if self.__seq > 999:
        self.__seq = 0
    return self.getNowTime(2) + str(self.__seq).zfill(3)
# 获取信用代码
def getNextXydm(self):
    """Return the next generated credit code: ``"ZZSN"`` + timestamp + counter.

    The counter is kept in ``self.__seq`` and advanced on every call.

    ``zfill(3)`` only pads up to a minimum width, so the counter has to wrap
    before it reaches 1000; otherwise the suffix would grow to four digits
    and the code would change length.

    Returns:
        str: ``"ZZSN"``, then ``self.getNowTime(2)``, then the zero-padded
        counter.
    """
    self.__seq += 1
    if self.__seq > 999:
        self.__seq = 0
    return "ZZSN" + self.getNowTime(2) + str(self.__seq).zfill(3)
# 日志格式
def logFormate(self,record, handler):
    """Render a log record as one bracketed line.

    Layout: ``[time] [level] [file name] [function] [line number] message``.
    Only the last path component of ``record.filename`` is shown.

    Args:
        record: the log record supplying ``time``, ``level_name``,
            ``filename``, ``func_name``, ``lineno`` and ``message``.
        handler: the handler requesting the formatting (not used).

    Returns:
        str: the formatted line.
    """
    source_file = os.path.split(record.filename)[-1]
    fields = (
        record.time,        # timestamp of the record
        record.level_name,  # severity name
        source_file,        # file name without its directory
        record.func_name,   # emitting function
        record.lineno,      # emitting line
        record.message,     # log text
    )
    return "[{}] [{}] [{}] [{}] [{}] {}".format(*fields)
# 获取logger
def getLogger(self,fileLogFlag=True, stdOutFlag=True):
    """Build a logbook logger named after the running script.

    The log file is ``logs/<script name>.log`` in the directory of the
    running script (taken from ``sys.argv[0]``), with a trailing ``.py``
    dropped from the script name. The ``logs`` directory is created when it
    does not exist yet.

    Args:
        fileLogFlag: when true, attach a ``TimedRotatingFileHandler`` writing
            to the log file.
        stdOutFlag: when true, attach a ``ColorizedStderrHandler``.

    Returns:
        logbook.Logger: the logger carrying the selected handlers, each using
        ``self.logFormate`` as its formatter.
    """
    script_dir, script_name = os.path.split(os.path.abspath(sys.argv[0]))
    log_dir = os.path.join(script_dir, "logs")
    # Strip only a trailing ".py"; a blanket replace would also eat ".py"
    # occurring in the middle of the name (e.g. "app.pyw" -> "appw").
    if script_name.endswith(".py"):
        script_name = script_name[:-len(".py")]
    log_name = script_name + ".log"
    # exist_ok avoids the check-then-create race between parallel scripts.
    os.makedirs(log_dir, exist_ok=True)

    logbook.set_datetime_format('local')
    logger = logbook.Logger(log_name)
    logger.handlers = []
    if fileLogFlag:  # write records to the log file
        file_handler = logbook.TimedRotatingFileHandler(
            os.path.join(log_dir, log_name),
            date_format='%Y-%m-%d',
            bubble=True,
            encoding='utf-8',
        )
        file_handler.formatter = self.logFormate
        logger.handlers.append(file_handler)
    if stdOutFlag:  # echo records to the console
        console_handler = logbook.more.ColorizedStderrHandler(bubble=True)
        console_handler.formatter = self.logFormate
        logger.handlers.append(console_handler)
    return logger
# 获取随机的userAgent
def getRandomUserAgent(self):
    """Return one entry of the User-Agent pool, chosen at random.

    Returns:
        str: a randomly selected User-Agent string.
    """
    agent_pool = self.__USER_AGENT_LIST
    return random.choice(agent_pool)
if __name__ == '__main__': # 获取代理
print(getNowTime(3)) def get_proxy(self):
\ No newline at end of file sql = "select proxy from clb_proxy"
self.__cursor_proxy.execute(sql)
proxy_lists = self.__cursor_proxy.fetchall()
ip_list = []
for proxy_ in proxy_lists:
ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
proxy_list = []
for str_ip in ip_list:
str_ip_list = str_ip.split('-')
proxyMeta = "http://%(host)s:%(port)s" % {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
proxy = {
"HTTP": proxyMeta,
"HTTPS": proxyMeta
}
proxy_list.append(proxy)
return proxy_list[random.randint(0, 3)]
\ No newline at end of file
seq=0
\ No newline at end of file
from base import BaseTools
log =BaseTools.getLogger() from base.BaseCore import BaseCore
baseCore = BaseCore()
log =baseCore.getLogger()
if __name__ == '__main__': if __name__ == '__main__':
log.info("ok") log.info("ok")
#获取流水号
print(baseCore.getNextSeq())
print(baseCore.getNextSeq())
# 获取随机agent
print(baseCore.getRandomUserAgent())
# 获取代理池
print(baseCore.get_proxy())
# 释放相关资源
baseCore.close()
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论