Commit 0b2f4ac1 Author: 薛凌堃

WeChat official accounts

Parent fe8d1cca
@@ -481,7 +481,7 @@ class BaseCore:
    def writerToExcel(self,detailList,filename):
        # filename='baidu搜索.xlsx'
        # 读取已存在的xlsx文件
        existing_data = pd.read_excel(filename,engine='openpyxl',dtype=str)
        # 创建新的数据
        new_data = pd.DataFrame(data=detailList)
        # 将新数据添加到现有数据的末尾
......
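Note: the append step that follows this hunk (visible in the commented-out test file at the end of this commit) relies on DataFrame.append, which was deprecated in pandas 1.4 and removed in 2.0. A minimal sketch of the same read-append-write cycle using pd.concat, assuming the target file already exists with a header row:

import pandas as pd

def append_rows_to_excel(detail_list, filename):
    # Read everything as strings so codes such as '01109.HK' keep leading zeros
    existing_data = pd.read_excel(filename, engine='openpyxl', dtype=str)
    new_data = pd.DataFrame(data=detail_list)
    # pd.concat replaces the removed DataFrame.append
    combined_data = pd.concat([existing_data, new_data], ignore_index=True)
    combined_data.to_excel(filename, index=False)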
# -*- coding: utf-8 -*-
'''
记录一天能采多少公众号,建一个数据库表 更新公众号的状态
'''
import requests, time, random, json, pymysql, redis
@@ -17,13 +18,17 @@ from base.BaseCore import BaseCore
import os
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()

def check_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    res = r.sismember(f'wx_url_{sid}', article_url)
    if res == 1:
        return True
    else:
@@ -63,7 +68,7 @@ def get_proxy():
    return proxy_list
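check_url tests a per-account Redis set (wx_url_{sid}) for links that were already collected; the matching writer only appears as a commented-out call, add_url(sid, url_news), further down in get_info. A minimal sketch of what such a helper might look like, assuming the same Redis instance and key scheme — the body is illustrative, not code from this commit:

def add_url(sid, article_url):
    # Mirror of check_url: record a collected link in the per-account set
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    # sadd returns 1 if the member was newly added, 0 if it already existed
    return r.sadd(f'wx_url_{sid}', article_url) == 1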
def get_info(sid,json_search):
    num_caiji = 0
    kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    obsClient = ObsClient(
@@ -81,9 +86,9 @@ def get_info(json_search):
        url_news = one_news['link']
        url_ft = check_url(sid, url_news)
        if url_ft:
            return list_all_info,num_caiji
        try:
            res_news = requests.get(url_news, timeout=20)
        except:
@@ -97,10 +102,24 @@ def get_info(json_search):
            del news_html['class']
        except:
            pass
        try:
            news_content = news_html.text
        except:
            log.info(f'--------内容为空--------{url_news}--------')
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            false = [
                news_title,
                url_news,
                news_html,
                '文章内容为空',
                time_now
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(false))
            cnx_.commit()
            continue
        list_img = news_html.find_all('img')
        for num_img in range(len(list_img)):
            img_one = list_img[num_img]
@@ -149,18 +168,19 @@ def get_info(json_search):
            'source': '11',
            'createDate': time_now
        }
        for nnn in range(0, 3):
            try:
                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
                kafka_time_out = kafka_result.get(timeout=10)
                # add_url(sid, url_news)
                break
            except:
                time.sleep(5)
                continue
        num_caiji = num_caiji + 1
        list_all_info.append(dic_info)
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        dic_info2 = {
            'infoSourceId': sid,
@@ -171,16 +191,45 @@ def get_info(json_search):
            'dispatcherStatus': '1',
            'source': '1',
        }
        for nnn2 in range(0, 3):
            try:
                producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                kafka_result2 = producer2.send("collectionAndDispatcherInfo",
                                               json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
                break
            except:
                time.sleep(5)
                continue
    return list_all_info,num_caiji
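The two Kafka blocks above repeat the same three-attempt send, differing only in the topic. If that pattern is ever factored out, a minimal sketch of a shared helper, assuming the kafka-python client already used here (the helper name send_with_retry is not part of the commit):

import json, time
from kafka import KafkaProducer

def send_with_retry(topic, payload, attempts=3, bootstrap='114.115.159.144:9092'):
    # Try a few times, pausing briefly between failures, and report success
    for _ in range(attempts):
        try:
            producer = KafkaProducer(bootstrap_servers=[bootstrap])
            future = producer.send(topic, json.dumps(payload, ensure_ascii=False).encode('utf8'))
            future.get(timeout=10)  # raises if the broker did not acknowledge
            return True
        except Exception:
            time.sleep(5)
    return False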
#定时
def getFromSql():
    selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
    cursor.execute(selectSql)
    results = cursor.fetchall()
    result_list = [item[0] for item in results]
    #放入redis
    for item in result_list:
        r.rpush('WeiXinGZH:infoSourceCode', item)

#刷新浏览器并获得token
def flushAndGetToken(list_b):
    browser_run = list_b[0]
    log.info('======刷新浏览器=====')
    browser_run.refresh()
    cookie_list = browser_run.get_cookies()
    cur_url = browser_run.current_url
    token = cur_url.split('token=')[1]
    log.info(f'===========当前token为:{token}============')
    cookies = {}
    for cookie in cookie_list:
        cookies[cookie['name']] = cookie['value']
    return token,cookies

#采集失败的公众号 重新放入redis
def rePutIntoR(item):
    r.rpush('WeiXinGZH:infoSourceCode', item)
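getFromSql and rePutIntoR push info_source_code values onto the Redis list WeiXinGZH:infoSourceCode, and the main loop drains it through baseCore.redicPullData, treating the string 'None' as "queue empty". That helper is not shown in this diff; a minimal sketch of what it presumably does, assuming redis-py — the body below is an assumption, not the library's code:

def redicPullData(key, r):
    # Hypothetical sketch of BaseCore.redicPullData: pop one code from the left
    # of the Redis list and return the string 'None' when the list is empty,
    # which is the sentinel the main loop checks for.
    item = r.lpop(key)
    if item is None:
        return 'None'
    return item.decode('utf-8') if isinstance(item, bytes) else item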
if __name__=="__main__":
@@ -195,288 +244,212 @@ if __name__=="__main__":
    opt = webdriver.ChromeOptions()
    opt.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
    # opt.add_argument(f"--proxy-server={ip}")
    opt.add_argument("--ignore-certificate-errors")
    opt.add_argument("--ignore-ssl-errors")
    opt.add_experimental_option("excludeSwitches", ["enable-automation"])
    opt.add_experimental_option('excludeSwitches', ['enable-logging'])
    opt.add_experimental_option('useAutomationExtension', False)
    # opt.binary_location =r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
    # chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
    chromedriver = r'D:/chrome/chromedriver.exe'
    browser1 = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
    list_b = [browser1]
    url = "https://mp.weixin.qq.com/"
    browser1.get(url)
    # browser2.get(url)
    # browser3.get(url)
    # 可改动
    time.sleep(30)
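browser1 is created with the chrome_options/executable_path keywords, which Selenium 4 no longer accepts. If the environment is ever upgraded, an equivalent construction would look roughly like this (same driver path as above; sketch only, not part of the commit):

from selenium import webdriver
from selenium.webdriver.chrome.service import Service

opt = webdriver.ChromeOptions()
service = Service(r'D:/chrome/chromedriver.exe')
browser1 = webdriver.Chrome(service=service, options=opt)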
    num_b = 0
    # todo:从数据库里读信息,放入redis,定时任务 每天放入数据
    # getFromSql()
    s = requests.session()
    # 记录运行公众号的个数
    count = 0
    while True:
        # 刷新浏览器并获取当前token和cookie
        token, cookies = flushAndGetToken(list_b)
        list_all_info = []
        log.info('===========获取公众号============')
        start_ = time.time()
        # #todo:redis中数据 pop一条
        infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
        if infoSourceCode == 'None':
            #当一次采集完之后,重新插入数据并等待插入完成
            getFromSql()
            time.sleep(20)
            log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
            continue
        sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
        # '一带一路百人论坛'
        # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where site_name = '一带一路百人论坛' "
        cursor.execute(sql)
        row = cursor.fetchone()
        dic_url = {
            'url_': row[0],
            'sid': row[1],
            'name': row[2],
            'info_source_code': row[3],
            'biz': ''
        }
        log.info('===========获取biz==========')
        s.cookies.update(cookies)
        url_ = dic_url['url_']
        origin = dic_url['name']
        info_source_code = dic_url['info_source_code']
        sid = dic_url['sid']
        try:
            biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
            dic_url['biz'] = biz
        except Exception as e:
            log.info(f'---公众号--{origin}---biz错误')
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            error = [
                origin,
                url_,
                info_source_code,
                e,
                'biz错误',
                time_now
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(error))
            cnx_.commit()
            continue
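The biz value is pulled out of the account URL with chained split calls on '__biz='. For reference, a hedged sketch of the same extraction using urllib.parse, which also tolerates reordered query parameters (the helper name extract_biz is illustrative):

from urllib.parse import urlparse, parse_qs

def extract_biz(account_url):
    # Return the __biz query parameter without its trailing '==', or '' if absent
    qs = parse_qs(urlparse(account_url).query)
    biz = qs.get('__biz', [''])[0]
    return biz.rstrip('=')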
        fakeid = biz + '=='
        url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=5&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
        try:
            ip = get_proxy()[random.randint(0, 3)]
            json_search = s.get(url_search, headers=headers, proxies=ip,
                                verify=False).json()  # , proxies=ip, verify=False
            str_t = json.dumps(json_search)
            time.sleep(2)
        except:
            log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}===')
            rePutIntoR(info_source_code)
            continue
        # {"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
        # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
        # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
        # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
        # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
        ret = json_search['base_resp']['ret']
        if ret == 0:
            pass
        elif ret == 200013:
            # 重新放入redis
            # time.sleep(3600)
            # 刷新 暂时用一下方法
            rePutIntoR(info_source_code)
            log.info(f'======该账号被封=======')
            #600,1200,1800,2400,3000,3600
            #刷新
            wait_time = 600
            time.sleep(wait_time)
            log.info(f'=======等待时间{wait_time}秒=====刷新浏览器=====')
            browser_run = list_b[0]
            browser_run.refresh()
            continue
        elif ret == 200002:
            # 公众号链接错误 保存库里 记录错误信息及错误类型
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            error = [
                origin,
                url_,
                info_source_code,
                str_t,
                '无效biz参数',
                time_now
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(error))
            cnx_.commit()
            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
            continue
        elif ret == 200003:
            # 无效的session
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            error = [
                origin,
                url_,
                info_source_code,
                str_t,
                '无效session',
                time_now
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(error))
            cnx_.commit()
            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
            continue
        else:
            log.info(f'----其他情况-----{json_search}---公众号{origin}------')
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            error = [
                origin,
                url_,
                info_source_code,
                str_t,
                '其他错误',
                time_now
            ]
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(error))
            cnx_.commit()
            continue
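The base_resp.ret branches above all follow the same shape (log, optionally record to WeixinGZH, re-queue or skip). If the list of codes grows, a small lookup table keeps the mapping in one place; a hedged sketch using only the codes documented in the comments above (the structure is illustrative, not part of the commit):

# ret codes observed in base_resp, per the comments above
RET_ACTIONS = {
    0: 'ok',               # normal response, keep going
    200002: '无效biz参数',   # invalid args: bad biz/link, record and skip
    200003: '无效session',   # invalid session, record and skip
    200013: 'freq control', # frequency control / account blocked, re-queue and wait
}

def classify_ret(ret):
    # Anything not listed falls into the existing "其他错误" branch
    return RET_ACTIONS.get(ret, '其他错误')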
        list_all = json_search['app_msg_list']
        try:
            list_all_info,num_caiji = get_info(sid,json_search)
            time.sleep(2)
            if len(list_all_info) != 0:
                count += 1
                time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                success = [
                    origin,
                    url_,
                    info_source_code,
                    '采集成功',
                    num_caiji,
                    time_now
                ]
                #成功信息保存
                insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,success_info,success_num,create_time) values (%s,%s,%s,%s,%s,%s)"
                cursor_.execute(insertSql, tuple(success))
                cnx_.commit()
                # 该公众号的所有文章采集完成
                log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号、耗时{baseCore.getTimeCost(start_,time.time())}')
            else:
                log.info(f'{fakeid}、公众号{origin}、网址已存在!耗时{baseCore.getTimeCost(start_,time.time())}')
        except Exception as e:
            # json解析该公众号成功但采集数据失败
            count += 1
            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            false = [
                origin,
                url_,
                info_source_code,
                e,
                '采集失败',
                time_now
            ]
            # 失败信息保存
            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
            cursor_.execute(insertSql, tuple(false))
            cnx_.commit()
            log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
        time.sleep(2)
    #关闭资源
    cnx.close()
    cursor.close()
    baseCore.close()
import pandas as pd
# def writeaa():
#     detailList=[]
#     aa={
#         'id':3,
#         'name':'qqqwe'
#     }
#     detailList.append(aa)
#     writerToExcel(detailList)
# 将数据追加到excel
# def writerToExcel(detailList):
#     # filename='baidu搜索.xlsx'
#     # 读取已存在的xlsx文件
#     existing_data = pd.read_excel(filename,engine='openpyxl')
#     # 创建新的数据
#     new_data = pd.DataFrame(data=detailList)
#     # 将新数据添加到现有数据的末尾
#     combined_data = existing_data.append(new_data, ignore_index=True)
#     # 将结果写入到xlsx文件
#     combined_data.to_excel(filename, index=False)
#
# from openpyxl import Workbook
#
# if __name__ == '__main__':
#     filename='test1.xlsx'
#     # # 创建一个工作簿
#     workbook = Workbook(filename)
#     workbook.save(filename)
#     writeaa()

gpdm = '01109.HK'
if 'HK' in str(gpdm):
    tmp_g = str(gpdm).split('.')[0]
    if len(tmp_g) == 5:
        gpdm = str(gpdm)[1:]
        print(gpdm)
    else:
        pass
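The snippet above trims a five-digit Hong Kong ticker such as '01109.HK' down to four digits by dropping the leading character. Wrapped as a small reusable function with a usage check, as a hedged sketch (the name normalize_hk_code is illustrative, not from the commit):

def normalize_hk_code(gpdm):
    # Drop the leading digit of 5-digit HK codes, e.g. '01109.HK' -> '1109.HK'
    if 'HK' in str(gpdm):
        tmp_g = str(gpdm).split('.')[0]
        if len(tmp_g) == 5:
            return str(gpdm)[1:]
    return gpdm

print(normalize_hk_code('01109.HK'))  # 1109.HK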