Commit f0246e72  Author: 薛凌堃

微信公众号 (WeChat official accounts)

Parent 0bf17a9a
@@ -41,30 +41,6 @@ def add_url(sid, article_url):
     else:
         return False
-def get_proxy():
-    cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
-    with cnx.cursor() as cursor:
-        sql = "select proxy from clb_proxy"
-        cursor.execute(sql)
-        proxy_lists = cursor.fetchall()
-    ip_list = []
-    for proxy_ in proxy_lists:
-        ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
-    proxy_list = []
-    for str_ip in ip_list:
-        str_ip_list = str_ip.split('-')
-        proxyMeta = "http://%(host)s:%(port)s" % {
-            "host": str_ip_list[0],
-            "port": str_ip_list[1],
-        }
-        proxy = {
-            "HTTP": proxyMeta,
-            "HTTPS": proxyMeta
-        }
-        proxy_list.append(proxy)
-    return proxy_list
-#定时
 def getFromSql():
     selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
     cursor.execute(selectSql)
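This hunk deletes the module-local get_proxy() helper, which opened its own MySQL connection on every call and recovered each proxy string by str()-formatting the row tuple and trimming "('" and "',)" off it; the call sites below switch to baseCore.get_proxy(). A minimal sketch of an equivalent centralized helper, assuming the same clb_proxy table of "host-port" strings and a reused connection; the name and signature are illustrative, not the actual baseCore API:

import pymysql

def fetch_proxy_list(cnx):
    # Read "host-port" rows from clb_proxy and build requests-style mappings.
    with cnx.cursor() as cursor:
        cursor.execute("select proxy from clb_proxy")
        rows = cursor.fetchall()
    proxy_list = []
    for (raw,) in rows:  # unpack the 1-tuple instead of str()-parsing it
        host, port = raw.split('-', 1)
        meta = f"http://{host}:{port}"
        # requests matches proxy keys against the lowercased URL scheme,
        # so lowercase "http"/"https" keys are the safe choice here
        # (the deleted helper used uppercase keys, which requests ignores).
        proxy_list.append({"http": meta, "https": meta})
    return proxy_list

Reusing one connection and lowercasing the scheme keys are the two behavioral differences from the deleted helper.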
@@ -117,7 +93,7 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
                 log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
                 return list_all_info,num_caiji
             try:
-                ip = get_proxy()[random.randint(0, 3)]
+                ip = baseCore.get_proxy()
                 res_news = requests.get(url_news, timeout=20,proxies=ip)
                 time.sleep(2)
             except:
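The replaced call indexed a random position in [0, 3] into the full proxy list, which raises IndexError whenever clb_proxy holds fewer than four rows and never uses rows past the fourth; baseCore.get_proxy() presumably returns a single requests-style proxies dict directly. A defensive sketch of such a picker (hypothetical helper, not the actual baseCore code):

import random

def pick_proxy(proxy_list):
    # Sample uniformly over however many proxies exist;
    # requests treats proxies=None as a direct connection.
    if not proxy_list:
        return None
    return random.choice(proxy_list)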
@@ -201,8 +177,8 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
                 'createDate': time_now
             }
             for nnn in range(0, 3):
+                producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                 try:
-                    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
                     kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
                     kafka_time_out = kafka_result.get(timeout=10)
                     add_url(sid, url_news)
@@ -210,6 +186,8 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
                 except:
                     time.sleep(5)
                     continue
+                finally:
+                    producer.close()
             num_caiji = num_caiji + 1
             list_all_info.append(dic_info)
             time.sleep(5)
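Hoisting the KafkaProducer construction above the try block is what makes the new finally clause safe: had construction stayed inside try and failed, finally would raise NameError on the unbound producer. A self-contained sketch of the resulting retry shape with kafka-python (bootstrap address and topic taken from the diff; the function name is illustrative):

import json
import time
from kafka import KafkaProducer

def send_with_retry(dic_info, retries=3):
    for _ in range(retries):
        # Bind producer before try so finally can always close it.
        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
        try:
            fut = producer.send("crawlerInfo",
                                json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
            fut.get(timeout=10)  # block until the broker acks the record
            return True
        except Exception:
            time.sleep(5)
            continue
        finally:
            producer.close()  # runs on success, failure, and continue alike
    return False

Constructing a producer per attempt is heavier than reusing one across the loop, but it guarantees every connection opened is also closed, which is what this commit is after.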
@@ -241,9 +219,9 @@ def job(count,key):
     log.info('===========获取公众号============')
     start_ = time.time()
-    # #todo:redis中数据 pop一条
+    #todo:redis中数据 pop一条
     infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
-    if infoSourceCode == 'None':
+    if infoSourceCode == 'None' or infoSourceCode == None:
         #当一次采集完之后,重新插入数据并等待插入完成
         getFromSql()
         time.sleep(20)
@@ -252,7 +230,7 @@ def job(count,key):
     sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
     # '一带一路百人论坛'
-    # sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where site_name = '一带一路百人论坛' "
+    # sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
     cursor.execute(sql)
     row = cursor.fetchone()
@@ -289,18 +267,90 @@ def job(count,key):
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
         return count
     fakeid = biz + '=='
     url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
     #获取页数
     try:
-        ip = get_proxy()[random.randint(0, 3)]
-        json_search = s.get(url_search, headers=headers, proxies=ip,
-                            verify=False).json()
+        # ip = baseCore.get_proxy()
+        json_search = s.get(url_search, headers=headers,
+                            verify=False).json()  # , proxies=ip, verify=False
+        str_t = json.dumps(json_search)
         time.sleep(1)
     except Exception as e:
         log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
         rePutIntoR(info_source_code)
+        time.sleep(20)
+        return count
+    ret = json_search['base_resp']['ret']
+    # {"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
+    # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
+    # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
+    # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
+    # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
+    if ret == 0:
+        pass
+    elif ret == 200013:
+        # 重新放入redis
+        # time.sleep(3600)
+        # 刷新 暂时用一下方法
+        rePutIntoR(info_source_code)
+        log.info(f'======该账号被封=======')
+        # for i in range(0,6): #600,1200,1800,2400,3000,3600
+        #     #刷新
+        #     time.sleep(600)
+        #     log.info('=======等待时间600秒=====刷新浏览器=====')
+        #     browser_run = list_b[0]
+        #     browser_run.refresh()
+        r.set(key, 50)
+        r.expire(key, 5400)
+        return count
+    elif ret == 200002:
+        # 公众号链接错误 保存库里 记录错误信息及错误类型
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '无效biz参数',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
+        return count
+    elif ret == 200003:
+        # 无效的session
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '无效session',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
+        log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
+        return count
+    else:
+        log.info(f'----其他情况-----{json_search}---公众号{origin}------')
+        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        error = [
+            origin,
+            url_,
+            info_source_code,
+            str_t,
+            '其他错误',
+            time_now
+        ]
+        insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
+        cursor_.execute(insertSql, tuple(error))
+        cnx_.commit()
         return count
     try:
         Max_data = int(json_search['app_msg_cnt'])
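The new base_resp handling distinguishes success (0) from rate-limiting (200013, requeue and park the counter), a bad biz parameter (200002, record and skip), and an expired login session (200003, record and skip). A compact sketch of the same dispatch written table-driven, per the ret codes documented in the diff's comments (the helper name and action tags are illustrative):

# Map WeChat appmsg-API ret codes to an action tag:
# 0 ok, 200013 freq control, 200002 invalid args (bad biz),
# 200003 invalid session.
RET_ACTIONS = {
    0: 'ok',
    200013: 'requeue_and_pause',    # account rate-limited: requeue, back off
    200002: 'log_invalid_biz',      # bad fakeid/biz: record error and skip
    200003: 'log_invalid_session',  # login expired: record error and skip
}

def classify_ret(json_search):
    ret = json_search.get('base_resp', {}).get('ret')
    return RET_ACTIONS.get(ret, 'log_other')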
@@ -319,87 +369,15 @@ def job(count,key):
         # url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
         # https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
         try:
-            ip = get_proxy()[random.randint(0, 3)]
-            json_search = s.get(url_search, headers=headers, proxies=ip,
+            # ip = get_proxy()[random.randint(0, 3)]
+            json_search = s.get(url_search, headers=headers,
                                 verify=False).json()  # , proxies=ip, verify=False
             str_t = json.dumps(json_search)
             time.sleep(2)
         except Exception as e:
             log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
             rePutIntoR(info_source_code)
             return count
-        #{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
-        # TODO:需要判断返回值,根据返回值判断是封号还是biz错误
-        # {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
-        # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
-        # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
-        ret = json_search['base_resp']['ret']
-        if ret == 0:
-            pass
-        elif ret == 200013:
-            # 重新放入redis
-            # time.sleep(3600)
-            # 刷新 暂时用一下方法
-            rePutIntoR(info_source_code)
-            log.info(f'======该账号被封=======')
-            # for i in range(0,6): #600,1200,1800,2400,3000,3600
-            #     #刷新
-            #     time.sleep(600)
-            #     log.info('=======等待时间600秒=====刷新浏览器=====')
-            #     browser_run = list_b[0]
-            #     browser_run.refresh()
-            r.set(key, 50)
-            r.expire(key, 3600)
-            return count
-        elif ret == 200002:
-            # 公众号链接错误 保存库里 记录错误信息及错误类型
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '无效biz参数',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
-            return count
-        elif ret == 200003:
-            # 无效的session
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '无效session',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
-            return count
-        else:
-            log.info(f'----其他情况-----{json_search}---公众号{origin}------')
-            time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            error = [
-                origin,
-                url_,
-                info_source_code,
-                str_t,
-                '其他错误',
-                time_now
-            ]
-            insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
-            cursor_.execute(insertSql, tuple(error))
-            cnx_.commit()
-            return count
         list_all = json_search['app_msg_list']
         try:
@@ -431,7 +409,6 @@ def job(count,key):
             return count
         except Exception as e:
             # json解析该公众号成功但采集数据失败
-
             time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
             false = [
                 origin,
@@ -491,8 +468,8 @@ if __name__=="__main__":
         # 设置计数器的初始值为0
         key = baseCore.getNextSeq()
         r.set(key, 0)
-        # 设置key的过期时间为10秒
-        r.expire(key, 3600)
+        # 设置key的过期时间为一个半小时
+        r.expire(key, 5400)
         while True:
             time.sleep(2)
             new_value = baseCore.incrSet(key)
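The counter key now expires after 5400 s (an hour and a half), matching the corrected comment and the 200013 branch above, which parks the counter at 50 under the same TTL. A sketch of the underlying pattern with redis-py, assuming a plain counter key; the key name, cap, and connection details here are illustrative:

import time
import redis

r = redis.Redis(host='127.0.0.1', port=6379)  # connection details assumed

def run_with_rate_cap(key='wx:job:counter', cap=50, ttl=5400):
    # A per-window request counter that Redis garbage-collects
    # by itself once the TTL elapses.
    r.set(key, 0)
    r.expire(key, ttl)
    while True:
        time.sleep(2)
        if int(r.incr(key)) >= cap:  # atomic increment-and-read
            time.sleep(ttl)          # back off for the rest of the window
            break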