Commit bdf0bc3e by LiuLiYuan

Merge remote-tracking branch 'origin/master'

@@ -217,6 +217,9 @@ class BaseCore:
        'Mozilla/5.0 (iPod; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5'
    ]
    # Android user-agent pool
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
    def __init__(self):
        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
                                           charset='utf8mb4')
@@ -461,6 +464,7 @@ class BaseCore:
        self.cursor.execute(query)
        token = self.cursor.fetchone()[0]
    # Detect language
    def detect_language(self, text):
        # Use langid.py to determine the language of the text
        result = langid.classify(text)
@@ -471,3 +475,5 @@ class BaseCore:
        return result[0]
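# Hedged aside (not part of the diff): the langid call above behaves roughly like
#   import langid
#   lang, score = langid.classify('这是一段中文文本')   # -> ('zh', <score>)
# where the first element of the returned tuple is the detected language code.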
'''
Supplement the official accounts for which the think-tank news feed has no official-account data.
Read the source records from the database, pick out the links that belong to official accounts by domain,
and use time.sleep to wait so that the job runs once per day.
'''
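# Hedged sketch (not part of the original script) of the "time.sleep until the next daily run"
# idea mentioned in the docstring; seconds_until_next_run() is a hypothetical helper, while the
# loop below simply sleeps for fixed intervals.
def seconds_until_next_run(hour=8, minute=0):
    """Seconds from now until the next occurrence of hour:minute."""
    import datetime as _dt
    now = _dt.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += _dt.timedelta(days=1)
    return (target - now).total_seconds()
# Example (hypothetical): time.sleep(seconds_until_next_run())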
import requests, time, re, datetime, random, json, pymysql, redis
import pandas as pd
import urllib3
from bs4 import BeautifulSoup
from selenium import webdriver
from obs import ObsClient
from kafka import KafkaProducer
# logging.basicConfig(filename='example.log', level=logging.INFO)
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
urllib3.disable_warnings()
def check_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    # URLs are stored in a Redis set per account; sismember returns 1 when the URL is already
    # in the set, i.e. the article was collected before
    res = r.sismember(f'wx_url_{sid}', article_url)
    if res == 1:
        return True
    else:
        return False
def add_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    # Record the URL in the per-account Redis set; sadd takes set members only, so pass just the URL
    res = r.sadd(f'wx_url_{sid}', article_url)
    if res == 0:  # 0 means nothing new was added, i.e. the URL is a duplicate
        return True
    else:
        return False
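# Hedged sketch (not used by the script below): how check_url/add_url are meant to pair up.
# `collect` stands in for the real fetch-clean-push-to-Kafka logic in get_info(); add_url is
# only called after a successful collect, so a failed article can be retried on a later run.
def dedup_demo(sid, article_url, collect=lambda url: True):
    if check_url(sid, article_url):
        return False              # already collected earlier, skip it
    if collect(article_url):      # crawl the article and push it downstream
        add_url(sid, article_url)
        return True
    return False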
def get_proxy():
cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
with cnx.cursor() as cursor:
sql = "select proxy from clb_proxy"
cursor.execute(sql)
proxy_lists = cursor.fetchall()
ip_list = []
for proxy_ in proxy_lists:
ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
proxy_list = []
for str_ip in ip_list:
str_ip_list = str_ip.split('-')
proxyMeta = "http://%(host)s:%(port)s" % {
"host": str_ip_list[0],
"port": str_ip_list[1],
}
        proxy = {
            # requests matches proxies by lowercase scheme, so use "http"/"https" keys
            "http": proxyMeta,
            "https": proxyMeta
        }
proxy_list.append(proxy)
return proxy_list
def get_info(json_search):
num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    obsClient = ObsClient(
        access_key_id='VEHN7D0TJ9316H8AHCAV',  # Huawei Cloud OBS access key (AK)
        secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # Huawei Cloud OBS secret key (SK)
        server='https://obs.cn-north-1.myhuaweicloud.com'  # OBS endpoint where the bucket lives
    )
list_all_news = json_search['app_msg_list']
for one_news in list_all_news:
news_title = one_news['title']
timestamp = one_news['create_time']
time_local = time.localtime(timestamp)
news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
url_news = one_news['link']
        # Stop as soon as a URL that has already been collected is encountered
        url_ft = check_url(sid, url_news)
        if url_ft:
            return list_all_info, url_news, news_title
try:
res_news = requests.get(url_news, timeout=20)
except:
continue
soup_news = BeautifulSoup(res_news.content, 'html.parser')
news_html = soup_news.find('div', {'id': 'js_content'})
try:
del news_html['style']
del news_html['id']
del news_html['class']
except:
pass
news_content = news_html.text
list_img = news_html.find_all('img')
for num_img in range(len(list_img)):
img_one = list_img[num_img]
url_src = img_one.get('data-src')
# print(url_src)
            # Skip images that have no data-src as well as gif images
            if not url_src or 'gif' in url_src:
url_img = ''
img_one.extract()
else:
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
                except:
                    # Download failed: drop the image and move on instead of uploading stale content
                    img_one.extract()
                    continue
resp = obsClient.putContent('zzsn', name_img, content=res.content)
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
for tag in news_html.descendants:
try:
del tag['style']
except:
pass
list_section = news_html.find_all('section')
for section in list_section:
section.name = 'div'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info = {
'sid': sid,
'title': news_title,
'content': news_content,
'contentWithtag': str(news_html),
'summary': '',
'author': '',
'origin': origin,
'publishDate': news_date,
'sourceAddress': url_news,
'source': '11',
'createDate': time_now
}
for nnn in range(0, 3):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
kafka_time_out = kafka_result.get(timeout=10)
add_url(sid, url_news)
break
except:
time.sleep(5)
continue
num_caiji = num_caiji + 1
list_all_info.append(dic_info)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info2 = {
'infoSourceId': sid,
'code': info_source_code,
'num': num_caiji,
'collectTime': kaishi_time,
'dispatcherTime': time_now,
'dispatcherStatus': '1',
'source': '1',
}
for nnn2 in range(0, 3):
try:
producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result2 = producer2.send("collectionAndDispatcherInfo",
json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
break
except:
time.sleep(5)
continue
return list_all_info,url_news,news_title
if __name__=="__main__":
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f'开始时间为:{time_start}')
requests.adapters.DEFAULT_RETRIES = 3
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
# opt.add_argument(f"--proxy-server={ip}")
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
chromedriver = 'D:/chrome/chromedriver.exe'
browser1 = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
list_b = [browser1]
url = "https://mp.weixin.qq.com/"
browser1.get(url)
# browser2.get(url)
# browser3.get(url)
    # Adjustable: time left to log in to mp.weixin.qq.com manually
    time.sleep(50)
num_b = 0
browser_run = list_b[0]
log.info('======刷新浏览器=====')
browser_run.refresh()
cookie_list = browser_run.get_cookies()
cur_url = browser_run.current_url
token = cur_url.split('token=')[1]
log.info(f'===========当前token为:{token}============')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
s = requests.session()
while True:
all = []
list_all_info = []
list_error_url = []
list_laiyuan = []
cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
log.info('===========获取公众号============')
start_ = time.time()
with cnx.cursor() as cursor:
sql = "SELECT site_uri,id,site_name,info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(sql)
rows = cursor.fetchall()
        # Split the data from the database into two parts
for row in rows:
# print(len(rows[:945]))
# if row[2]=='南方周末':
dic_url = {
'url': row[0],
'sid': row[1],
'name': row[2],
'info_source_code': row[3],
'biz': ''
}
list_laiyuan.append(dic_url)
log.info(f'===========获取公众号完成,耗时{baseCore.getTimeCost(start_,time.time())}============')
# list_laiyuan.reverse()
log.info('===========获取biz==========')
start__ = time.time()
for dic_one in list_laiyuan:
url = dic_one['url']
try:
biz = url.split('__biz=')[1].split('==&')[0].split('=')[0]
dic_one['biz'] = biz
except:
continue
log.info(f'==========获取biz完成,耗时{baseCore.getTimeCost(start__,time.time())}==========')
# list_biz.append(biz)
# list_laiyuan.reverse()
        # Track biz values that errored, plus related info
biz_error_biz = []
biz_error_origin = []
biz_error_code = []
        # Track accounts whose JSON parsed but whose articles failed to collect
get_error_biz = []
get_error_origin = []
get_error_code = []
        # Track accounts whose JSON failed to parse
json_error_biz = []
json_error_origin = []
json_error_code = []
for num_biz in range(0, len(list_laiyuan)):
browser_run.refresh()
cookie_list = browser_run.get_cookies()
cur_url = browser_run.current_url
token = cur_url.split('token=')[1]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.info(f'=========刷新时间:{time_now}=========')
log.info(f'=========当前token为:{token}=========')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
list_url = []
s.cookies.update(cookies)
sid = list_laiyuan[num_biz]['sid']
origin = list_laiyuan[num_biz]['name']
info_source_code = list_laiyuan[num_biz]['info_source_code']
biz = list_laiyuan[num_biz]['biz']
        if not biz:
            continue
fakeid = biz + '=='
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=5&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
        # Count how many official accounts have been processed
count = 0
        try:
            # Pick a random proxy from the pool rather than a hard-coded index range
            ip = random.choice(get_proxy())
            json_search = s.get(url_search, headers=baseCore.getRandomUserAgent(), proxies=ip,
                                verify=False).json()  # , proxies=ip, verify=False
            time.sleep(2)
        except:
            log.info(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}===')
            json_search = ''
            # time.sleep() returns None, so keep the wait length in a variable for the log message
            wait_seconds = 600
            time.sleep(wait_seconds)
            log.info(f'======等待时间{wait_seconds}秒=======')
            break
try:
list_all = json_search['app_msg_list']
except:
            # Case where the response JSON could not be parsed
count += 1
# (f'{fakeid}:biz错误!')
log.info(f'{fakeid}:biz错误!、公众号为{origin}=====当前时间:{baseCore.getNowTime(1)}')
biz_error_biz.append(biz)
biz_error_origin.append(origin)
biz_error_code.append(info_source_code)
df_error_biz = pd.DataFrame({'公众号': biz_error_origin,
'code': biz_error_code,
'错误biz': biz_error_biz,
})
excel_name = time.strftime("%Y-%m-%d", time.localtime())
            # Originally:
            # df_error_biz.to_excel(f'./错误biz/{excel_name}.xlsx', index=False)
            # Changed to: write via ExcelWriter with strings_to_urls disabled
with pd.ExcelWriter(f'./错误biz/{excel_name}2.xlsx', engine='xlsxwriter',
options={'strings_to_urls': False}) as writer:
df_error_biz.to_excel(writer, index=False)
            # time.sleep() returns None, so keep the wait length in a variable for the log message
            wait_block = 3600
            time.sleep(wait_block)
            log.info(f'========当前账号可能被封,等待时长{wait_block}秒======')
            # Refresh the browser and pick up a new token
log.info(f'=============刷新浏览器=============')
browser_run.refresh()
cookie_list = browser_run.get_cookies()
cur_url = browser_run.current_url
token = cur_url.split('token=')[1]
log.info(f'=========当前token:{token}=========')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
continue
if list_all:
str_t = json.dumps(json_search)
try:
list_all_info,url_news,news_title = get_info(json_search)
time.sleep(10)
count += 1
if len(list_all_info):
for dic_one in list_all_info:
all.append(dic_one)
df_info = pd.DataFrame(all)
excel_name = time.strftime("%Y-%m-%d", time.localtime())
try:
# df_info.to_excel(f'./运行结果/{excel_name}_实时数据.xlsx', index=False)
with pd.ExcelWriter(f'./运行结果/{excel_name}_实时数据.xlsx', engine='xlsxwriter',
options={'strings_to_urls': False}) as writer:
df_info.to_excel(writer, index=False)
except:
# df_info.to_excel(f'./运行结果/{excel_name}_2_实时数据.xlsx', index=False)
with pd.ExcelWriter(f'./运行结果/{excel_name}_2_实时数据.xlsx', engine='xlsxwriter',
options={'strings_to_urls': False}) as writer:
df_info.to_excel(writer, index=False)
                    # All articles for this official account have been collected
# print(f'{fakeid}:采集成功!')
log.info(f'{fakeid}、公众号{origin}:采集成功!、已采集{count}个公众号')
else:
log.info(f'{fakeid}、公众号{origin}:{url_news},{news_title}已采集过该文章!、已采集{count}个公众号')
except:
                # The JSON for this account parsed, but collecting its articles failed
count += 1
log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!已采集{count}个公众号')
# print(f'{fakeid}:解析失败!!!!!!')
list_error_url.append(str_t)
get_error_origin.append(origin)
get_error_code.append(info_source_code)
excel_name = time.strftime("%Y-%m-%d", time.localtime())
df_error_url = pd.DataFrame({'公众号:': get_error_origin,
'code': get_error_code,
'信息': list_error_url})
# df_error_url.to_excel(f'./保存失败/{excel_name}.xlsx', index=False)
with pd.ExcelWriter(f'./保存失败/{excel_name}.xlsx',engine='xlsxwriter',options={'strings_to_urls':False}) as writer:
df_error_url.to_excel(writer,index=False)
time.sleep(1)
else:
            # list_all came back empty
count += 1
time_end = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
# print(f'{fakeid}:运行出错!时间为:{time_end}')
log.info(f'{fakeid}、公众号{origin}:list_all为空!已采集{count}个公众号、时间为:{time_end}')
json_error_biz.append(fakeid)
json_error_origin.append(origin)
json_error_code.append(info_source_code)
df_error_json = pd.DataFrame({'公众号:': json_error_origin,
'code': json_error_code,
'信息': json_error_biz})
# df_error_json.to_excel(f'./错误文件/{time_end}.xlsx', index=False)
with pd.ExcelWriter(f'./错误文件/{time_end}.xlsx', engine='xlsxwriter',
options={'strings_to_urls': False}) as writer:
df_error_json.to_excel(writer, index=False)
# error_text_txt = fakeid
# with open(f'./错误文件/{time_end}.txt', 'w') as f:
# f.write(error_text_txt)
# time.sleep(2)
# browser_run = list_b[0]
# browser_run.refresh()
# cookie_list = browser_run.get_cookies()
# cur_url = browser_run.current_url
# token = cur_url.split('token=')[1]
# print(token)
# cookies = {}
# for cookie in cookie_list:
# cookies[cookie['name']] = cookie['value']
time_end = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.info(f'运行结束,时间为:{time_end}')
print(f'运行结束,时间为:{time_end}')
df_info = pd.DataFrame(list_all_info)
excel_name = time.strftime("%Y-%m-%d", time.localtime())
df_info.to_excel(f'./运行结果/{excel_name}_总数据.xlsx', index=False)
list_b[0].refresh()
time.sleep(2)