Commit 14beed77  Author: 薛凌堃

WeChat Official Account

Parent a741a5e1
import json
import time
import requests
from pymysql.converters import escape_string
from selenium import webdriver
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# refresh the browser and extract the token, cookies and account name
def flushAndGetToken(browser):
    log.info('======refreshing browser=====')
    browser.refresh()
    cookie_list = browser.get_cookies()
    cur_url = browser.current_url
    token = cur_url.split('token=')[1]
    log.info(f'===========current token: {token}============')
    cookies = {}
    for cookie in cookie_list:
        cookies[cookie['name']] = cookie['value']
    browser.get(cur_url)
    info = browser.page_source
    # res_2 = requests.get(year_url, proxies=ip)
    soup = BeautifulSoup(info, 'html.parser')
    user_name = soup.find('div', class_='weui-desktop_name').text
    return token, cookies, user_name
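# A more defensive way to pull the token out of the current URL (sketch using
# only the standard library; the split() above already works whenever the URL
# carries a 'token=' query parameter):
# from urllib.parse import urlparse, parse_qs
# token = parse_qs(urlparse(cur_url).query).get('token', [''])[0]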
if __name__ == "__main__":
    requests.DEFAULT_RETRIES = 5
    time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    log.info(f'start time: {time_start}')
    requests.adapters.DEFAULT_RETRIES = 3
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
    }
    opt = webdriver.ChromeOptions()
    opt.add_argument(
        'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
    opt.add_argument("--ignore-certificate-errors")
    opt.add_argument("--ignore-ssl-errors")
    opt.add_experimental_option("excludeSwitches", ["enable-automation"])
    opt.add_experimental_option('excludeSwitches', ['enable-logging'])
    opt.add_experimental_option('useAutomationExtension', False)
    # opt.binary_location = r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
    # chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
    chromedriver = r'D:/chrome/chromedriver.exe'
    browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
    url = "https://mp.weixin.qq.com/"
    browser.get(url)
    # adjustable: time left to complete the login in the opened browser
    time.sleep(30)
    s = requests.session()
    # obtain the token and cookies
    token, cookies, user_name = flushAndGetToken(browser)
    print(token, cookies)
    cookies = json.dumps(cookies)
    # loadinfo = [token,cookies]
    # save them to the database
    insert = f"insert into weixin_tokenCookies (token,cookies,create_time,fenghao_time,user_name) values ('{token}','{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),'{user_name}')"
    cursor_.execute(insert)
    cnx_.commit()
    baseCore.close()
    # s.cookies.update(cookies)
    # s.keep_alive = False
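# A minimal sketch of how a downstream worker might load the credentials saved
# above back into a requests session (hypothetical consumer; table and column
# names are taken from the INSERT statement above, everything else is assumed):
# cursor_.execute("select token, cookies from weixin_tokenCookies order by create_time desc limit 1")
# token, cookies_json = cursor_.fetchone()
# s = requests.session()
# s.cookies.update(json.loads(cookies_json))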
# -*- coding: utf-8 -*-
'''
link state codes: success 100, Kafka send failed 200, request failed 400, article content empty 500
'''
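# The state codes above could also be kept in one mapping for readability
# (hypothetical constant, not used anywhere in the original code):
# LINK_STATE = {
#     'success': 100,
#     'kafka_send_failed': 200,
#     'request_failed': 400,
#     'empty_content': 500,
# }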
import requests, time, random, json, pymysql, redis
import pandas as pd
import urllib3
from bs4 import BeautifulSoup
from openpyxl import Workbook
from selenium import webdriver
from obs import ObsClient
from kafka import KafkaProducer
# logging.basicConfig(filename='example.log', level=logging.INFO)
from base.BaseCore import BaseCore
import os
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
def check_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    res = r.sismember(f'wx_url_{sid}', article_url)
    if res == 1:
        return True
    else:
        return False
def add_url(sid, article_url):
    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
    res = r.sadd(f'wx_url_{sid}', article_url, 3)  # note: stored as a Redis set
    if res == 0:  # 0 means nothing was inserted, i.e. the URL is a duplicate
        return True
    else:
        return False
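# Minimal usage sketch of the two Redis helpers above (hypothetical flow; in
# this file add_url is only called from get_info after a successful Kafka send):
# if not check_url(sid, article_url):
#     ...  # fetch and process the article here
#     add_url(sid, article_url)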
# # scheduled task: load the WeChat MP source codes from MySQL
# def getFromSql():
#     selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
#     cursor.execute(selectSql)
#     results = cursor.fetchall()
#     result_list = [item[0] for item in results]
#
#     # push them into redis
#     for item in result_list:
#         r.rpush('WeiXinGZH:infoSourceCode', item)
#
# # refresh the browser and obtain the token
# def flushAndGetToken(list_b):
#     browser_run = list_b[0]
#     log.info('======refreshing browser=====')
#     browser_run.refresh()
#     cookie_list = browser_run.get_cookies()
#     cur_url = browser_run.current_url
#     token = cur_url.split('token=')[1]
#     log.info(f'===========current token: {token}============')
#     cookies = {}
#     for cookie in cookie_list:
#         cookies[cookie['name']] = cookie['value']
#     return token, cookies
# put official accounts that failed to be collected back into redis
def rePutIntoR(item):
    r.rpush('WeiXinGZH:infoSourceCode', item)
def updatewxLink(link, info_source_code, state):
    # update the collection state of a single link
    updateSuccess = f"update wx_link set state= {state} where link='{link}' and info_source_code='{info_source_code}' "
    cursor_.execute(updateSuccess)
    cnx_.commit()
def getjsonInfo():
    # fetch one pending record from the database
    select_sql = "select * from wx_link where state=0 order by id asc limit 1"
    cursor_.execute(select_sql)
    row = cursor_.fetchone()
    if not row:
        log.info('-----no more data-----')
        return False
    dict_json = {
        'sid': row[1],
        'site_uri': row[2],
        'site_name': row[3],
        'info_source_code': row[4],
        'title': row[5],
        'publish_time': row[6],
        'link': row[7]
    }
    # a record has been taken, mark it as in progress
    update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
    cursor_.execute(update_sql)
    cnx_.commit()
    return dict_json
def get_info(dict_json):
    # list_all_info = []
    # num_caiji = 0
    kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    obsClient = ObsClient(
        access_key_id='VEHN7D0TJ9316H8AHCAV',  # your Huawei Cloud access key (AK)
        secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY',  # your Huawei Cloud secret key (SK)
        server='https://obs.cn-north-1.myhuaweicloud.com'  # your bucket endpoint
    )
    news_title = dict_json['title']
    sid = dict_json['sid']
    news_date = dict_json['publish_time']
    origin = dict_json['site_name']
    url_news = dict_json['link']
    info_source_code = dict_json['info_source_code']
    # url_ft = check_url(sid, url_news)
    # if url_ft:
    #     return list_all_info, num_caiji
    try:
        ip = baseCore.get_proxy()
        res_news = requests.get(url_news, proxies=ip, timeout=20)
    except:
        # 400: request failed
        updatewxLink(url_news, info_source_code, 400)
        return False
    soup_news = BeautifulSoup(res_news.content, 'html.parser')
    news_html = soup_news.find('div', {'id': 'js_content'})
    try:
        del news_html['style']
        del news_html['id']
        del news_html['class']
    except:
        pass
    try:
        news_content = news_html.text
    except:
        log.info(f'--------article content is empty--------{url_news}--------')
        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        false = [
            news_title,
            url_news,
            news_html,
            'article content is empty',
            time_now
        ]
        insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
        cursor_.execute(insertSql, tuple(false))
        cnx_.commit()
        updatewxLink(url_news, info_source_code, 500)
        return False
    list_img = news_html.find_all('img')
    for num_img in range(len(list_img)):
        img_one = list_img[num_img]
        url_src = img_one.get('data-src')
        # print(url_src)
        if not url_src or 'gif' in url_src:
            url_img = ''
            img_one.extract()
        else:
            try:
                name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
            except:
                img_one.extract()
                continue
            try:
                res = requests.get(url_src, timeout=20)
            except:
                img_one.extract()
                continue
            # upload the image to OBS and point the tag at the rehosted address
            resp = obsClient.putContent('zzsn', name_img, content=res.content)
            url_img = resp['body']['objectUrl']
            str_url_img = f'<img src="{url_img}">'
            img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
    # strip inline styles and normalise <section> tags to <div>
    for tag in news_html.descendants:
        try:
            del tag['style']
        except:
            pass
    list_section = news_html.find_all('section')
    for section in list_section:
        section.name = 'div'
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_info = {
        'sid': sid,
        'title': news_title,
        'content': news_content,
        'contentWithtag': str(news_html),
        'summary': '',
        'author': '',
        'origin': origin,
        'publishDate': news_date,
        'sourceAddress': url_news,
        'source': '11',
        'createDate': time_now
    }
    # send the article itself to Kafka (up to 3 attempts)
    for nnn in range(0, 3):
        try:
            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
            kafka_time_out = kafka_result.get(timeout=10)
            add_url(sid, url_news)
            break
        except:
            time.sleep(5)
            log.info('------failed to send data to kafka------')
            updatewxLink(url_news, info_source_code, 200)
            continue
    list_all_info.append(dic_info)
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_info2 = {
        'infoSourceId': sid,
        'code': info_source_code,
        'num': num_caiji,
        'collectTime': kaishi_time,
        'dispatcherTime': time_now,
        'dispatcherStatus': '1',
        'source': '1',
    }
    # send the collection/dispatch record to Kafka (up to 3 attempts)
    for nnn2 in range(0, 3):
        try:
            producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            kafka_result2 = producer2.send("collectionAndDispatcherInfo",
                                           json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
            break
        except:
            time.sleep(5)
            continue
    updatewxLink(url_news, info_source_code, 100)
    return True
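# The two retry loops above create a new KafkaProducer on every attempt. A
# possible variant (sketch only, same bootstrap server as above) would build
# one producer up front and reuse it:
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
# for attempt in range(3):
#     try:
#         producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8')).get(timeout=10)
#         break
#     except Exception:
#         time.sleep(5)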
if __name__ == "__main__":
    num_caiji = 0
    list_all_info = []
    while True:
        # take one article at a time
        dict_json = getjsonInfo()
        if dict_json:
            if get_info(dict_json):
                num_caiji = num_caiji + 1
                log.info(f'-----{num_caiji} articles collected---source: {dict_json["site_name"]}----')
        else:
            break
    baseCore.close()