提交 b8689932 作者: XveLingKun

微信公众号日志添加

上级 976e2fb4
...@@ -11,6 +11,7 @@ from bs4 import BeautifulSoup ...@@ -11,6 +11,7 @@ from bs4 import BeautifulSoup
from obs import ObsClient from obs import ObsClient
from kafka import KafkaProducer from kafka import KafkaProducer
from retry import retry
from base.BaseCore import BaseCore from base.BaseCore import BaseCore
baseCore = BaseCore() baseCore = BaseCore()
...@@ -64,6 +65,15 @@ def getjsonInfo(): ...@@ -64,6 +65,15 @@ def getjsonInfo():
cnx_.commit() cnx_.commit()
return dict_json return dict_json
@retry(tries=3, delay=2)
def getrequest(url_news):
    """GET ``url_news`` through a rotating proxy; retried up to 3 times with a 2s delay.

    Returns the ``requests.Response`` on HTTP 200.  Any non-200 status raises
    ``requests.exceptions.HTTPError`` so the ``@retry`` decorator re-attempts
    the download; after 3 failed tries the exception propagates to the caller.
    """
    ip = baseCore.get_proxy()
    res_news = requests.get(url_news, proxies=ip, timeout=20)
    if res_news.status_code != 200:
        # Bug fix: the original used a bare `raise` with no active exception,
        # which itself raises "RuntimeError: No active exception to re-raise".
        # Raise an explicit HTTPError carrying the status for diagnostics.
        raise requests.exceptions.HTTPError(
            f'status {res_news.status_code} for {url_news}')
    # Bug fix: the original fell through without returning, so callers doing
    # `res_news = getrequest(url_news)` received None and crashed on
    # `res_news.content`.
    return res_news
def get_info(dict_json): def get_info(dict_json):
# list_all_info = [] # list_all_info = []
# num_caiji = 0 # num_caiji = 0
...@@ -80,23 +90,32 @@ def get_info(dict_json): ...@@ -80,23 +90,32 @@ def get_info(dict_json):
origin = dict_json['site_name'] origin = dict_json['site_name']
url_news = dict_json['link'] url_news = dict_json['link']
info_source_code = dict_json['info_source_code'] info_source_code = dict_json['info_source_code']
# url_ft = check_url(sid, url_news)
# if url_ft: # try:
# return list_all_info,num_caiji # ip = baseCore.get_proxy()
# res_news = requests.get(url_news, proxies=ip, timeout=20)
# except:
# #400请求失败
# updatewxLink(url_news, info_source_code, 400)
# return False
# 修改请求方法,retry 3次
try: try:
ip = baseCore.get_proxy() res_news = getrequest(url_news)
res_news = requests.get(url_news, proxies=ip,timeout=20)
except: except:
#400请求失败 try:
updatewxLink(url_news,info_source_code,400) res_news = requests.get(url_news, timeout=20)
return False except:
# 400请求失败
updatewxLink(url_news, info_source_code, 400)
return False
soup_news = BeautifulSoup(res_news.content, 'html.parser') soup_news = BeautifulSoup(res_news.content, 'html.parser')
try: try:
news_html = soup_news.find('div', {'id': 'js_content'}) news_html = soup_news.find('div', {'id': 'js_content'})
news_html['style'] = 'width: 814px ; margin: 0 auto;' news_html['style'] = 'width: 814px ; margin: 0 auto;'
#del news_html['style'] #del news_html['style']
news_html=rm_style_attr(news_html) news_html = rm_style_attr(news_html)
del news_html['id'] del news_html['id']
del news_html['class'] del news_html['class']
except: except:
......
...@@ -41,32 +41,6 @@ import pandas as pd ...@@ -41,32 +41,6 @@ import pandas as pd
# pass # pass
import redis import redis
from base.BaseCore import BaseCore
baseCore = BaseCore()
r = baseCore.r
key = 'counter'
expiration_time = 10 # 设置过期时间 60秒
# # 设置自增
# r.incr(key)
# # #自增并设置过期时间
# while True:
# # 设置自增
# r.incr(key)
# value = int(r.get(key).decode())
#
# if value > 10:
# print(value)
# # 设置过期时间
# r.expire(key, expiration_time)
# time.sleep(20)
# print('------------------')
# continue
# # print(value)
# time.sleep(5)
# print(value)
# print("==========")
def check_url(): def check_url():
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn',db=6) r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn',db=6)
...@@ -76,7 +50,22 @@ def check_url(): ...@@ -76,7 +50,22 @@ def check_url():
print('True') print('True')
else: else:
print('False') print('False')
check_url() # check_url()
def test(dic_user_count):
dic_user_count["A"] += 1
# print(dic_user.items())
for key,value in dic_user_count.items():
print(key,value)
return dic_user_count
def test1():
    """Run test() three times against a fresh counter dict and print the final state."""
    counters = {"A": 0}
    for _ in range(3):
        counters = test(counters)
    print(counters)


if __name__ == "__main__":
    test1()
...@@ -9,10 +9,12 @@ import requests ...@@ -9,10 +9,12 @@ import requests
import urllib3 import urllib3
from pymysql.converters import escape_string from pymysql.converters import escape_string
import sys import sys
sys.path.append('D:\\kkwork\\zzsn_spider\\base')
sys.path.append('D:\\zzsn_spider\\base')
import BaseCore import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore =BaseCore.BaseCore() baseCore = BaseCore.BaseCore()
log = baseCore.getLogger() log = baseCore.getLogger()
headers = { headers = {
...@@ -26,26 +28,30 @@ cnx_ = baseCore.cnx ...@@ -26,26 +28,30 @@ cnx_ = baseCore.cnx
cursor_ = baseCore.cursor cursor_ = baseCore.cursor
r = baseCore.r r = baseCore.r
def resHtml(token,url,cookies):
def resHtml(token, url, cookies):
try: try:
ip = baseCore.get_proxy() ip = baseCore.get_proxy()
# s=requests.session()
cookie_jar = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
s = requests.session() s = requests.session()
cookie_jar = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
s.cookies = cookie_jar s.cookies = cookie_jar
# json_search = s.get(url, headers=headers, proxies=ip, verify=False).json() # json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
json_search = s.get(url, headers=headers, proxies=ip,verify=False).json() json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
aa=s.cookies.get_dict() aa = s.cookies.get_dict()
updateCookieToken(token, json.dumps(aa)) updateCookieToken(token, json.dumps(aa))
except Exception as e: except Exception as e:
json_search= {} json_search = {}
return json_search return json_search
#采集失败的公众号 重新放入redis
# 采集失败的公众号 重新放入redis
def rePutIntoR(item): def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item) r.rpush('WeiXinGZH:infoSourceCode', item)
#获取公众号信息
# 获取公众号信息
def getSourceInfo(infoSourceCode): def getSourceInfo(infoSourceCode):
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' " sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
cursor.execute(sql) cursor.execute(sql)
...@@ -79,42 +85,45 @@ def getSourceInfo(infoSourceCode): ...@@ -79,42 +85,45 @@ def getSourceInfo(infoSourceCode):
cnx_.commit() cnx_.commit()
return False return False
return dic_url return dic_url
#保存错误日志
# 保存错误日志
def insertBadSql(error): def insertBadSql(error):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())" insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
cursor_.execute(insertSql, tuple(error)) cursor_.execute(insertSql, tuple(error))
cnx_.commit() cnx_.commit()
#保存文章列表数据
def insertWxList(dic_url,json_search,page):
# 保存文章列表数据
def insertWxList(dic_url, json_search, page, user_name):
list_all_news = json_search['app_msg_list'] list_all_news = json_search['app_msg_list']
listCount=0 listCount = 0
repetCount=0 repetCount = 0
insertCount=0 insertCount = 0
for one_news in list_all_news: for one_news in list_all_news:
listCount=listCount+1 listCount = listCount + 1
news_title = one_news['title'] news_title = one_news['title']
timestamp = one_news['update_time'] timestamp = one_news['update_time']
time_local = time.localtime(timestamp) time_local = time.localtime(timestamp)
news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local) news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
url_news = one_news['link'] url_news = one_news['link']
selectCountSql=f"select count(1) from wx_link where link='{escape_string(url_news)}'" selectCountSql = f"select count(1) from wx_link where link='{escape_string(url_news)}'"
cursor_.execute(selectCountSql) cursor_.execute(selectCountSql)
count = cursor_.fetchone()[0] count = cursor_.fetchone()[0]
if count > 0: if count > 0:
repetCount=repetCount+1 repetCount = repetCount + 1
continue continue
else: else:
insertCount=insertCount+1 insertCount = insertCount + 1
try: try:
insertSql=f"insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) values " \ insertSql = f"insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) values " \
f"('{dic_url['sid']}','{dic_url['url_']}','{dic_url['name']}','{dic_url['info_source_code']}','{escape_string(news_title)}','{escape_string(news_date)}','{escape_string(url_news)}',0,now())" f"('{dic_url['sid']}','{dic_url['url_']}','{dic_url['name']}','{dic_url['info_source_code']}','{escape_string(news_title)}','{escape_string(news_date)}','{escape_string(url_news)}',0,now())"
cursor_.execute(insertSql) cursor_.execute(insertSql)
cnx_.commit() cnx_.commit()
except Exception as e: except Exception as e:
log.error(f"保存数据库失败:{e}") log.error(f"保存数据库失败:{e}")
# 查询放入之后的id # 查询放入之后的id
selectIdSql = f"select id from wx_link where sid='{dic_url['sid']}' and link='{escape_string(url_news)}'" selectIdSql = f"select id from wx_link where sid='{dic_url['sid']}' and link='{escape_string(url_news)}'"
cursor_.execute(selectIdSql) cursor_.execute(selectIdSql)
...@@ -123,54 +132,60 @@ def insertWxList(dic_url,json_search,page): ...@@ -123,54 +132,60 @@ def insertWxList(dic_url,json_search,page):
try: try:
r.ping() r.ping()
except: except:
r_ = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6) r_ = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
r_.lpush('WeiXinGZH:url', linkid) r_.lpush('WeiXinGZH:url', linkid)
continue continue
r.lpush('WeiXinGZH:linkid',linkid) r.lpush('WeiXinGZH:linkid', linkid)
log.info(f"---{dic_url['name']}--第{page}页----总数:{listCount}---重复数:{repetCount}---新增数:{insertCount}-------------") log.info(f"---{dic_url['name']}--第{page}页----总数:{listCount}---重复数:{repetCount}---新增数:{insertCount}-----使用账号{user_name}--------")
if listCount==0: if listCount == 0:
#列表为空认为结束 # 列表为空认为结束
return True return True
if repetCount>= listCount/2: if repetCount >= listCount / 2:
#重复数量大于等于一半认为结束 # 重复数量大于等于一半认为结束
return True return True
#没有结束 # 没有结束
return False return False
#token的处理
def updateTokeen(token,type): # token的处理
if type==2: def updateTokeen(token, type):
#session失效,删除token if type == 2:
# session失效,删除token
cursor_.execute(f"delete from weixin_tokenCookies where token={token}") cursor_.execute(f"delete from weixin_tokenCookies where token={token}")
if type ==1: if type == 1:
#封号了 修改封号时间 # 封号了 修改封号时间
cursor_.execute(f"update weixin_tokenCookies set fenghao_time=now() where token={token}") cursor_.execute(f"update weixin_tokenCookies set fenghao_time=now() where token={token}")
if type ==3: if type == 3:
#封号了 修改封号时间 # 封号了 修改封号时间
cursor_.execute(f"update weixin_tokenCookies set update_time=now() where token={token}") cursor_.execute(f"update weixin_tokenCookies set update_time=now() where token={token}")
cnx_.commit() cnx_.commit()
#token的处理
def updateCookieToken(token,cookies):
# token的处理
def updateCookieToken(token, cookies):
cursor_.execute(f"update weixin_tokenCookies set cookies='{escape_string(cookies)}' where token={token}") cursor_.execute(f"update weixin_tokenCookies set cookies='{escape_string(cookies)}' where token={token}")
cnx_.commit() cnx_.commit()
#获取token
# 获取token
def getToken(): def getToken():
# cursor_.execute(f"select token,cookies from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1") cursor_.execute(
cursor_.execute(f"select token,cookies from weixin_tokenCookies where user_name = 'wahaha2'") f"select token, cookies, user_name from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
row = cursor_.fetchall() row = cursor_.fetchall()
cnx_.commit() cnx_.commit()
if row: if row:
pass pass
else: else:
#没有查到token # 没有查到token
log.info("没有拿到token") log.info("没有拿到token")
return False return False
return row[0] return row[0]
#获取列表数据
def getPageData(dic_url,page): # 获取列表数据
def getPageData(dic_url, page, dic_user_count):
url_ = dic_url['url_'] url_ = dic_url['url_']
origin = dic_url['name'] origin = dic_url['name']
info_source_code = dic_url['info_source_code'] info_source_code = dic_url['info_source_code']
...@@ -182,14 +197,17 @@ def getPageData(dic_url,page): ...@@ -182,14 +197,17 @@ def getPageData(dic_url,page):
else: else:
log.info("没有拿到token,开始递归") log.info("没有拿到token,开始递归")
while True: while True:
log.info("没有拿到token,开始休眠") log.info("没有拿到token,开始休眠")
time.sleep(60) time.sleep(60)
log.info("没有拿到token,结束休眠") log.info("没有拿到token,结束休眠")
tokenAndCookie = getToken() tokenAndCookie = getToken()
if tokenAndCookie: if tokenAndCookie:
break break
user_name = tokenAndCookie[2]
token = tokenAndCookie[0] token = tokenAndCookie[0]
log.info(f"获取token到----{token}") log.info(f"获取token到----{token}----{user_name}")
dic_user_count[user_name] = 0
cookies = json.loads(tokenAndCookie[1]) cookies = json.loads(tokenAndCookie[1])
# s.cookies.update(cookies) # s.cookies.update(cookies)
...@@ -199,40 +217,41 @@ def getPageData(dic_url,page): ...@@ -199,40 +217,41 @@ def getPageData(dic_url,page):
# newcookies = requests.utils.dict_from_cookiejar(reponse.cookies, cookiejar=None, overwrite=True) # newcookies = requests.utils.dict_from_cookiejar(reponse.cookies, cookiejar=None, overwrite=True)
# s.cookies = newcookies # s.cookies = newcookies
# updateCookieToken(token,json.dumps(s.cookies)) # updateCookieToken(token,json.dumps(s.cookies))
#调用方法 # 调用方法
json_search=resHtml(token,url,cookies) json_search = resHtml(token, url, cookies)
str_t = json.dumps(json_search) str_t = json.dumps(json_search)
ret = json_search['base_resp']['ret'] ret = json_search['base_resp']['ret']
if ret == 0: if ret == 0:
dic_user_count[user_name] += 1
pass pass
elif ret == 200013: elif ret == 200013:
log.info(f'======{origin}-----{biz}----账号被封=======') log.info(f'======{origin}-----{biz}----{user_name}账号被封=======')
#封号修改token # 封号修改token
updateTokeen(token,1) updateTokeen(token, 1)
return getPageData(dic_url,page) return getPageData(dic_url, page, dic_user_count)
elif ret == 200002: elif ret == 200002:
log.info(f'======{origin}-----{biz}----该号biz错误,请检查=======') log.info(f'======{origin}-----{biz}----该公众号号biz错误,请检查=======')
error = [origin, url_, info_source_code, str_t, '无效biz参数'] error = [origin, url_, info_source_code, str_t, '无效biz参数']
insertBadSql(error) insertBadSql(error)
return True return True
elif ret == 200003: elif ret == 200003:
log.info(f'======{origin}-----{biz}----账号无效session=======') log.info(f'======{origin}-----{biz}----{user_name}账号无效session=======')
# session失效修改token # session失效修改token
updateTokeen(token, 2) updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, '无效session'] error = [origin, url_, info_source_code, str_t, '无效session']
insertBadSql(error) insertBadSql(error)
return getPageData(dic_url, page) return getPageData(dic_url, page, dic_user_count)
elif ret == 200074: elif ret == 200074:
#{"base_resp": {"ret": 200074, "err_msg": "default"}} # {"base_resp": {"ret": 200074, "err_msg": "default"}}
log.info(f'======{origin}-----{biz}----账号未登录成功=======') log.info(f'======{origin}-----{biz}----{user_name}账号未登录成功=======')
# session失效修改token # session失效修改token
updateTokeen(token, 2) updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, '该账号未登录成功'] error = [origin, url_, info_source_code, str_t, f'{user_name}账号未登录成功']
insertBadSql(error) insertBadSql(error)
return getPageData(dic_url, page) return getPageData(dic_url, page, dic_user_count)
else: else:
log.info(f'======{origin}-----{biz}----账号其他错误=======') log.info(f'======{origin}-----{biz}----{user_name}账号其他错误=======')
error = [origin, url_, info_source_code, str_t, '其他错误'] error = [origin, url_, info_source_code, str_t, '其他错误']
insertBadSql(error) insertBadSql(error)
updateTokeen(token, 2) updateTokeen(token, 2)
...@@ -240,9 +259,10 @@ def getPageData(dic_url,page): ...@@ -240,9 +259,10 @@ def getPageData(dic_url,page):
# 修改token使用时间 # 修改token使用时间
updateTokeen(token, 3) updateTokeen(token, 3)
# 保存数据到数据库 # 保存数据到数据库
return insertWxList(dic_url,json_search,page) return insertWxList(dic_url, json_search, page, user_name), dic_user_count
#获取微信公众号数据
# 获取微信公众号数据
def getWxList(infoSourceCode): def getWxList(infoSourceCode):
dic_url = getSourceInfo(infoSourceCode) dic_url = getSourceInfo(infoSourceCode)
...@@ -250,21 +270,24 @@ def getWxList(infoSourceCode): ...@@ -250,21 +270,24 @@ def getWxList(infoSourceCode):
if dic_url: if dic_url:
pass pass
else: else:
log.info(f'======{infoSourceCode}---------该号biz错误,请检查=======') log.info(f'======{infoSourceCode}---------该公众号biz错误,请检查=======')
error = ['', '', infoSourceCode, '', '该号biz错误'] error = ['', '', infoSourceCode, '', '该公众号biz错误']
insertBadSql(error) insertBadSql(error)
return return
origin = dic_url['name'] origin = dic_url['name']
biz = dic_url['biz'] biz = dic_url['biz']
for page in range(1,6): dic_user_count = {}
retFlag = getPageData(dic_url, page) for page in range(1, 6):
time.sleep(random.randint(60,181)) retFlag, dic_user_count = getPageData(dic_url, page, dic_user_count)
time.sleep(random.randint(60, 181))
if retFlag: if retFlag:
#结束 跳出该公众号 # 结束 跳出该公众号
break break
else: else:
#没有结束 # 没有结束
pass pass
for key, value in dic_user_count.items():
log.info(f"====账号{key},采集公众号个数{value}")
log.info(f"======{origin}-----{biz}----结束采集=======") log.info(f"======{origin}-----{biz}----结束采集=======")
...@@ -274,18 +297,37 @@ def getFromSql(): ...@@ -274,18 +297,37 @@ def getFromSql():
results = cursor.fetchall() results = cursor.fetchall()
cnx.commit() cnx.commit()
result_list = [item[0] for item in results] result_list = [item[0] for item in results]
time.sleep(20)
# 放入redis # 放入redis
for item in result_list: for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item) r.rpush('WeiXinGZH:infoSourceCode', item)
if __name__=="__main__":
# Count of official-account codes still queued in redis for this batch
def getnumber_redis():
    """Return the length of the 'WeiXinGZH:infoSourceCode' redis list."""
    return r.llen('WeiXinGZH:infoSourceCode')
if __name__ == "__main__":
numbers = getnumber_redis()
log.info("当前批次采集公众号个数{}".format(numbers))
time.sleep(3)
while True: while True:
start = time.time()
log.info(f"开始时间{baseCore.getNowTime(1)}")
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode') infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
if infoSourceCode == 'None' or infoSourceCode == None: if infoSourceCode == 'None' or infoSourceCode == None:
log.info("redis已经没有数据了,重新放置数据") log.info("redis已经没有数据了,重新放置数据")
getFromSql() log.info(f"采集完一轮公众号耗时{baseCore.getTimeCost(start, time.time())}")
time.sleep(60) # getFromSql()
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode') # time.sleep(60)
# numbers = getnumber_redis()
# log.info("当前批次采集公众号个数{}".format(numbers))
# break
# infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
continue
getWxList(infoSourceCode) getWxList(infoSourceCode)
# infoSourceCode = 'IN-20220917-0159' # infoSourceCode = 'IN-20220917-0159'
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论