Commit 344f8a7b, author: liuweigang

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	baidu_search/src/main/java/com/zzsn/CrawlerMateSearchApplication.java
@@ -252,6 +252,67 @@
<version>4.0.0-rc-2</version>
</dependency>
<!--WebMagic crawler framework-->
<dependency>
<groupId>us.codecraft</groupId>
<artifactId>webmagic-core</artifactId>
<version>0.7.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>us.codecraft</groupId>
<artifactId>webmagic-extension</artifactId>
<version>0.7.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- The dependencies below reference local jars built from the patched WebMagic source -->
<!-- <dependency>-->
<!-- <groupId>us.codecraft</groupId>-->
<!-- <artifactId>xsoup</artifactId>-->
<!-- <version>0.3.4</version>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>us.codecraft</groupId>-->
<!-- <artifactId>webmagic-core</artifactId>-->
<!-- <version>0.7.5</version>-->
<!-- <scope>system</scope>-->
<!-- <systemPath>${pom.basedir}/lib/webmagic-core-0.7.5.jar</systemPath>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>us.codecraft</groupId>-->
<!-- <artifactId>webmagic-extension</artifactId>-->
<!-- <version>0.7.5</version>-->
<!-- <systemPath>${pom.basedir}/lib/webmagic-extension-0.7.5.jar</systemPath>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-log4j12</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- End of the local-jar references for the patched WebMagic source -->
<dependency>
<groupId>com.github.detro</groupId>
<artifactId>phantomjsdriver</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
@@ -387,7 +448,12 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<!-- http 工具 -->
<dependency>
@@ -400,11 +466,11 @@
<artifactId>protobuf-java</artifactId>
<version>3.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.burgstaller</groupId>-->
<!-- <artifactId>okhttp-digest</artifactId>-->
<!-- <version>2.0</version>-->
<!-- </dependency>-->
<!-- spring scheduled tasks -->
<dependency>
......
@@ -8,6 +8,7 @@ import com.zzsn.search.MetaBaiduSearchThread;
import com.zzsn.search.entity.KeywordMsg;
import com.zzsn.search.util.SpringContextUtil;
import com.zzsn.utility.index.Constants;
import com.zzsn.webMagic.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -34,26 +35,40 @@ import java.util.Properties;
@Slf4j
@SpringBootApplication(scanBasePackages = "com.zzsn")
public class CrawlerMateSearchApplication extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(CrawlerMateSearchApplication.class);
}
public static void main(String[] args) {
SpringApplication.run(CrawlerMateSearchApplication.class, args);
}
/**
* Crawl with the WebMagic framework.
*/
public void webMagic(){
new LinksReadThread().start();
new BaiduContentThread().start();
}
// System.out.println("——————++++++++++++——————===");
// try {
// consumerNoPartition();
// } catch (Exception e) {
// consumerNoPartition();
// }
/**
* Legacy crawl entry point.
*/
public void ordinary() {
System.out.println("——————++++++++++++——————===");
try {
consumerPartition();
} catch (Exception e) {
consumerPartition();
}
loadSiteMsgLoc();
}
public void consumerPartition (){
log.info("Scheduled fetch of MQ messages");
//1. create the consumer
@@ -85,33 +100,6 @@ public class CrawlerMateSearchApplication extends SpringBootServletInitializer i
}
}
public void consumerNoPartition (){
log.info("Scheduled fetch of MQ messages");
//1. create the consumer
KafkaConsumer<String, String> consumer = createConsumer();
consumer.subscribe(Arrays.asList(Constants.KAFKA_CONSUMER_TOPIC));
while(true){
//The consumer is a long-running program that keeps polling Kafka for data; calling consumer.wakeup() from another thread exits the loop
//The timeout tells poll how long it may wait for the broker; it returns after that time whether or not data is available
ConsumerRecords<String, String> records = consumer.poll(0);
consumer.commitSync();
for(ConsumerRecord<String, String> record : records){
try {
KeywordMsg keywordMsg = new Gson().fromJson(record.value(), KeywordMsg.class);
log.info("Parsed keywordMsg successfully");
consumer.commitSync();
MetaBaiduSearchThread metaSearchThread = new MetaBaiduSearchThread();
metaSearchThread.keywordMsg = keywordMsg;
metaSearchThread.crawler();
log.info("Keyword request finished++++");
}catch (Exception e){
log.info("Failed to parse keyword message: "+record.value());
}
}
}
}
public void loadSiteMsgLoc() {
String filepath= Constants.META_SEARCH_KEYWORDPATH;
System.out.println(filepath);
......
@@ -113,4 +113,12 @@ public class Constants {
public static final String KAFKA_CONSUMER_PARTITION= prop.getProperty("KAFKA_CONSUMER_PARTITION");
public static final String KAFKA_PRODUCT_PARTITION= prop.getProperty("KAFKA_PRODUCT_PARTITION");
public static final int KAFKA_COUNT=Integer.valueOf(prop.getProperty("whiles"));
public static final String testBaidu=prop.getProperty("KAFKA_test_TOPIC");
public static final Integer PAGESIZE=Integer.valueOf(prop.getProperty("pageSize"));
public static final Integer AVERGER=Integer.valueOf(prop.getProperty("averger"));
}
package com.zzsn.utils;
import com.zzsn.webMagic.KafkaConsumers;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.proxy.Proxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class GrabUtil {
private final static Logger logger = LoggerFactory.getLogger(GrabUtil.class);
/**
* Checks whether a single proxy IP is valid
*
*/
public static boolean checkProxyIp(Proxy proxyx) {
//create the HttpGet instance
// HttpGet httpGet = new HttpGet("http://www.baidu.com");
HttpGet httpGet = new HttpGet("https://www.163.com/dy/article/GDT03TFD05158K7T.html");
//set the proxy IP plus the connect, socket-read and connection-manager timeouts
HttpHost proxy = new HttpHost(proxyx.getHost(),proxyx.getPort());
CredentialsProvider provider = new BasicCredentialsProvider();
//proxy credentials (username and password)
provider.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials(proxyx.getUsername(), proxyx.getPassword()));
//create the httpClient instance
CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(provider).build();
RequestConfig requestConfig = RequestConfig.custom()
.setProxy(proxy)
.setConnectTimeout(2000)
.setSocketTimeout(2000)
.setConnectionRequestTimeout(2000)
.build();
httpGet.setConfig(requestConfig);
//set request headers
httpGet.setHeader("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36");
CloseableHttpResponse response=null;
try {
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
HttpEntity entity = response.getEntity(); //get the response entity
if (entity != null){
System.out.println("Page content: "+ EntityUtils.toString(entity,"utf-8"));
}
if (statusCode ==200){
return true;
}
}catch ( Exception e){
e.printStackTrace();
// logger.error("Failed to verify whether the proxy ip is usable, error:",e);
}finally {
if (response != null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (httpClient != null){
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return false;
}
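// Hedged usage sketch (hypothetical helper, not part of the original file):
// keep only the proxies that pass checkProxyIp before handing them to a spider.
public static List<Proxy> filterAliveProxies(List<Proxy> candidates) {
List<Proxy> alive = new ArrayList<>();
for (Proxy p : candidates) {
if (checkProxyIp(p)) { //issues a real http request through the proxy
alive.add(p);
}
}
return alive;
}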
public static String httpGet(String url,Proxy myproxy){
HttpGet httpGet = new HttpGet(url);
//set the proxy IP plus the connect, socket-read and connection-manager timeouts
HttpHost proxy = new HttpHost(myproxy.getHost(),myproxy.getPort());
CredentialsProvider provider = new BasicCredentialsProvider();
//proxy credentials (username and password)
provider.setCredentials(new AuthScope(proxy), new UsernamePasswordCredentials(myproxy.getUsername(), myproxy.getPassword()));
//create the httpClient instance
CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(provider).build();
RequestConfig requestConfig = RequestConfig.custom()
.setProxy(proxy)
.setConnectTimeout(5000)
.setSocketTimeout(5000)
.setConnectionRequestTimeout(5000)
.build();
httpGet.setConfig(requestConfig);
//set request headers
httpGet.setHeader("Accept",
"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3");
httpGet.setHeader("Accept-Encoding", "gzip, deflate");
httpGet.setHeader("Cache-Control", "max-age=0");
httpGet.setHeader("Connection", "keep-alive");
httpGet.setHeader("User-Agent","Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36");
CloseableHttpResponse response=null;
try {
response = httpClient.execute(httpGet);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode ==200){
logger.info(" page sucess by ["+proxy.getHostName()+"],"+url+" ");
HttpEntity entity = response.getEntity(); //获取返回实体
return EntityUtils.toString(entity,"utf-8");
}else {
logger.info("page error by ["+proxy.getHostName()+"],"+url+" code:"+statusCode);
}
}catch ( Exception e){
logger.info("page error by ["+proxy.getHostName()+"],"+url+" code:500 ",e);
}finally {
if (response != null){
try {
response.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (httpClient != null){
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}
/**
* Splits a list into packets
* @param list the list to split
* @param iPacket packet size
*/
public static List<List<Request>> AverageList(List<Request>list, int iPacket){
int iCount=list.size()/iPacket;
int iRemit=list.size()%iPacket;
List<List<Request>> Average=new ArrayList<>();
for (int i=0;i<iCount;i++){
Average.add(list.subList(i*iPacket,(i+1)*iPacket));
}
if (iRemit>0){
Average.add(list.subList(iCount*iPacket,(iCount*iPacket+iRemit)));
}
return Average;
}
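// Hedged example (hypothetical sizes, not part of the original file):
// AverageList on a 23-element list with a packet size of 10 yields three
// packets of 10, 10 and 3. Note that subList returns views backed by the
// original list, so the source list should not be mutated afterwards.
public static void averageListDemo() {
List<Request> requests = new ArrayList<>();
for (int i = 0; i < 23; i++) {
Request r = new Request();
r.setUrl("https://example.com/page/" + i); //illustrative urls only
requests.add(r);
}
List<List<Request>> packets = AverageList(requests, 10);
System.out.println("packets=" + packets.size()); //prints packets=3
}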
/**
* Fetches a proxy IP from the Shenlong API
* Returns one IP per call, valid for 3 minutes
*/
// public static MyProxy getShenlongIp(){
// MyProxy proxy=null;
// String url="http://api.shenlongip.com/ip?key=jy0tr2q0&pattern=json&count=1&need=1100&protocol=1&sign=d5654a620e9b424ca87fe1802f4c9e88";
// String result=HttpUtil.get(url);
// logger.info("调用代理IP接口返回结果:"+result);
// if (result!=null && result.length()>0){
// JSONObject re=JSONObject.parseObject(result);
// if (200== re.getInteger("code")){
// JSONArray datas= re.getJSONArray("data");
// if (datas.size()>0){
// for (Object data:datas){
// try {
// JSONObject o=(JSONObject) data;
// long expire= DateUtil.convertDate(o.getString("expire"),"yyyy-MM-dd HH:mm:ss").getTime();
// proxy=new MyProxy(o.getString("ip"),o.getInteger("port"),"hys_81310170_41c8","12345678",expire);
// }catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
// }else {
// logger.info("获取代理IP出错,返回信息:"+result);
// }
// }
// return proxy;
// }
public static void main(String[] args) {
String url="http://baijiahao.baidu.com/s?id=1662021416338923958&wfr=spider&for=pc;";
Proxy proxy= KafkaConsumers.getProxy().get(0);
System.out.println(httpGet(url,proxy));
}
// 3 175.10.141.61-40013-hys_81310170_41c8-12345678
// 1 125.119.174.226-40032-hys_81310170_41c8-12345678
// 2 122.230.139.103-40004-hys_81310170_41c8-12345678
// 4 183.165.248.64-40007-hys_81310170_41c8-12345678
}
package com.zzsn.webMagic;
import com.alibaba.fastjson.JSONObject;
import com.zzsn.cache.JedisUtil;
import com.zzsn.search.entity.ClbAnsProcessitem;
import com.zzsn.utility.index.Constants;
import com.zzsn.utils.GrabUtil;
import com.zzsn.webMagic.downloader.SeleniumDownloader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.SimpleProxyProvider;
import java.util.ArrayList;
import java.util.List;
/**
* Crawls Baidu article content
*/
public class BaiduContentThread extends Thread {
private final static Logger logger = LoggerFactory.getLogger(BaiduContentThread.class);
@Override
public void run() {
//take links from the queue and extract their content
List<Request> requestsList=new ArrayList<>();
Request request=null;
while (true){
try {
//blocking queue of list items
ClbAnsProcessitem entity=(ClbAnsProcessitem) BaiduTask.baiduUrlQueue.take();
if (entity !=null){
logger.info("Content link received; processing list links.....");
// check redis for the url; if it already exists, drop it
boolean sismember = JedisUtil.sismember("baidu_web_test5::"+entity.getOrgId(), entity.getSourceAddress());
if (!sismember){
request=new Request();
request.setUrl(entity.getSourceAddress());
//pass the related info on to the next stage
request.putExtra("model", JSONObject.toJSONString(entity));
requestsList.add(request);
}else {
BaiduTask.doublep++;
logger.info("Link: "+entity.getSourceAddress()+" already exists.");
}
}
if (requestsList.size()<5){
continue;
}
//clear the JVM DNS cache
java.security.Security.setProperty("networkaddress.cache.ttl" , "0");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
if ("1".equals(Constants.PROXY)){
Proxy p=new Proxy("",3232); //ProxyMap.getProxy();
logger.info("获取资讯内容url,启动代理ip,进行下一步内容处理");
if (p !=null){
HttpClientDownloader httpClientDownloader=new HttpClientDownloader();
httpClientDownloader.setProxyProvider(new SimpleProxyProvider(KafkaConsumers.getProxy()));
Spider.create(new BaiduTxtProcessor())
.setDownloader(httpClientDownloader)
.startRequest(requestsList)
.thread(1)
.runAsync();
}else {
Thread.sleep(1000*30);
continue;
}
}else {
logger.info("获取资讯内容url,组装好了http请求信息,提交到WebMgic..................");
Spider.create(new BaiduTxtProcessor())
.startRequest(requestsList)
.thread(1)
.runAsync();
}
requestsList=new ArrayList<>();
try {
Thread.sleep(1000*5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
package com.zzsn.webMagic;
import com.zzsn.search.entity.ClbAnsProcessitem;
import com.zzsn.utility.util.DateUtil;
import org.apache.http.HttpHeaders;
import org.jsoup.select.Elements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.SimpleProxyProvider;
import us.codecraft.webmagic.selector.Html;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Processes the result list returned by a Baidu keyword search
*/
public class BaiduKeyWordProcessor implements PageProcessor {
private final static Logger log = LoggerFactory.getLogger(BaiduKeyWordProcessor.class);
private ClbAnsProcessitem processitem=null;
private List<ClbAnsProcessitem> processitemList=null;
private Site site;
@Override
public void process(Page page) {
log.info("监听到百度关键字搜索,返回结果");
List<String> links=page.getHtml().xpath("//div[@id=content_left]/div[@tpl='news-normal']").all();
processitem=new ClbAnsProcessitem();
processitemList=new ArrayList<>();
Request request=page.getRequest();
log.info("获取到列表Url数:"+(links.size()));
for (String s:links){
processitem=new ClbAnsProcessitem();
//parse out the title and publish time
String link="keyword list";
try {
Html html=Html.create(s);
link=html.xpath("//div/div/h3").links().toString();
if (link == null || link.contains(".pdf") || link.trim().length()==0|| link.contains(".PDF")||link.contains("download")) {
BaiduTask.invalidUrl++;
continue;
}
Elements title=html.getDocument().select("a[data-click]");
Elements time=html.getDocument().select("span.c-color-gray2");
Elements org=html.getDocument().select("span.c-color-gray");
if (title !=null && title.get(0)!=null){
processitem.setTitle(title.get(0).text());
}
if (time!=null && time.size()>0){
processitem.setPublishDate(DateUtil.getPublishDate(time.get(0).text()));
}
if (org !=null && org.size()>0){
processitem.setSourceSite(org.get(0).text());
}
processitem.setSourceAddress(link);
processitem.setSid(request.getExtra("sid").toString());
processitem.setOrgId(request.getExtra("tid").toString());
processitem.setTid(request.getExtra("tid").toString());
processitem.setKeyWords(request.getExtra("words").toString());
//assemble the list item's attributes and put it on the global queue for content extraction
BaiduTask.baiduUrlQueue.put(processitem); //blocks if the queue is full
BaiduTask.linksUrl++;
}catch (Exception e){
log.error("解析列表Url["+page.getRequest().getUrl()+"]下的"+link+"+出错,错误信息:",e);
}
}
processitemList=null;
}
@Override
public Site getSite() {
if (site==null){
site=Site.me().setSleepTime(3000); //pause 3 seconds between requests
site.addHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");
site.addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9");
site.setUserAgent("Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36");
site.setCycleRetryTimes(3);//number of retry cycles
site.setTimeOut(8000);
}
return site;
}
// public static void main(String[] args) {
// List<Request> list=new ArrayList<>();
// Request request=new Request();
// Map<String,Object>map=new HashMap<>();
// map.put("aa","这是测试传参");
// request.setExtras(map);
// String url="https://sichuan.scol.com.cn/ggxw/202209/58604583.html";
// String url1="https://www.baidu.com/s?ie=utf-8&medium=0&rtt=1&bsst=1&rsv_dl=news_t_sk&cl=2&wd=%E5%9F%83%E5%A1%9E%E4%BF%84%E6%AF%94%E4%BA%9A%2B%E5%AE%A2%E6%9C%BA%2B%E9%81%87%E9%9A%BE&tn=news&rsv_bp=1&oq=&rsv_btype=t&f=8&pn=0";
// request.setUrl(url);
// list.add(request);
//// Request r2=new Request();
//// r2.setExtras(map);
//// r2.setUrl(url1);
//// list.add(r2);
// // Proxy proxy=new Proxy("113.243.178.169",40016,"hys_81310170_41c8","12345678");
// // HttpClientDownloader httpClientDownloader=new HttpClientDownloader();
// // httpClientDownloader.setProxyProvider(SimpleProxyProvider.from(proxy));
// Spider.create(new BaiduKeyWordProcessor()).thread(1)
// .startRequest(list)
// // .setDownloader(new SeleniumDownloader("E:\\chromdriver\\chromedriver.exe"))
// // .setDownloader(httpClientDownloader)
// // .addUrl(url)
// .runAsync();
//
// }
}
package com.zzsn.webMagic;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.zzsn.cache.JedisUtil;
import com.zzsn.cache.MemcachedUtils;
import com.zzsn.job.KafkaConsumerTask;
import com.zzsn.search.entity.ClbAnsProcessitem;
import com.zzsn.search.entity.KeywordMsg;
import com.zzsn.search.util.SplitKeyword;
import com.zzsn.utility.index.Constants;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Baidu keyword search crawler task
*/
@Component
@EnableScheduling
public class BaiduTask {
private final static Logger logger = LoggerFactory.getLogger(BaiduTask.class);
//FIFO queue buffering the url list items
public static LinkedBlockingQueue<ClbAnsProcessitem> baiduUrlQueue=new LinkedBlockingQueue<>();
/**
* Keyword queue
*/
public static LinkedBlockingQueue<KeywordMsg> keyWordsQueue=new LinkedBlockingQueue<>();
private final String BAIDU_KEY_URL="https://www.baidu.com/s?ie=utf-8&medium=0&rtt=1&bsst=1&rsv_dl=news_t_sk&cl=2&wd=%s&tn=news&rsv_bp=1&oq=&rsv_btype=t&f=8&pn=%d";
/**
* Number of keyword groups processed
*/
static int keyWordsArrey=0;
/**
* Number of keywords processed
*/
static int keyWordsSigin=0;
/**
* Valid Baidu links generated
*/
static int linksUrl=0;
/**
* Invalid Baidu links
*/
static int invalidUrl=0;
/**
* Duplicate links
*/
static int doublep=0;
static int toKafka=0;
/**
* Crawl the link list by searching each keyword group
*/
@Scheduled(initialDelay = 40000,fixedRate = 1000*60*10)
public void getFormKeyWordsArry(){
//create the consumer
logger.info("Runs every 10 minutes.....");
KafkaConsumer<String, String> consumer = KafkaConsumers.createConsumer();
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
String kafkaConsumerPartition = Constants.KAFKA_CONSUMER_PARTITION;
String[] partitions = kafkaConsumerPartition.split(",");
for (int i = 0; i < partitions.length; i++) {
topicPartitions.add(new TopicPartition(Constants.KAFKA_CONSUMER_TOPIC, Integer.parseInt(partitions[i])));
}
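// e.g. KAFKA_CONSUMER_PARTITION="0,1,2" pins this consumer to partitions 0-2
// of the topic explicitly, bypassing consumer-group rebalancing.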
consumer.assign(topicPartitions);
KeywordMsg keywordMsg=null; //keyword object from kafka
KeywordMsg nextKeyWords=null; //keyword object for the next stage
List<String> keyword=null; //expanded keyword strings
List<KeywordMsg> keywordMsgsList=new ArrayList<>(); //holds all keyword objects
for (int i=0;i<Constants.KAFKA_COUNT;i++){
ConsumerRecords<String, String> records = consumer.poll(100);
consumer.commitSync();
logger.info("百度关键字搜索消费消息开始。。。。。。。。。。。。");
for (ConsumerRecord consumerRecord:records){
//build the keyword object
try {
keywordMsg = new Gson().fromJson(consumerRecord.value().toString(), KeywordMsg.class);
/***
* Take the keyword group and expand it into individual keywords,
* wrapping each one in a KeywordMsg that goes onto the queue.
* The next step crawls the links in packets through proxy IPs.
*/
if (keywordMsg !=null){
keyWordsArrey++;
//expand the keyword group into individual keyword objects
keyword= SplitKeyword.transForm(keywordMsg.getKeyWord());
if (keyword !=null && keyword.size()>0){
for (String keys:keyword){
keyWordsSigin++;
nextKeyWords=new KeywordMsg();
nextKeyWords.setId(keywordMsg.getId());
nextKeyWords.setCrawlerType(keywordMsg.getCrawlerType());
nextKeyWords.setKeyWord(keys);
nextKeyWords.setStatus(keywordMsg.getStatus());
nextKeyWords.setExclusionWord(keywordMsg.getExclusionWord());
nextKeyWords.setSubjectId(keywordMsg.getSubjectId());
nextKeyWords.setWordsCode(keywordMsg.getWordsCode());
nextKeyWords.setWordsName(keywordMsg.getWordsName());
keyWordsQueue.put(nextKeyWords);
}
}
}else {
logger.error("百度搜索关键字,未解析到关键字");
}
}catch (Exception e){
logger.error("从kafka获取关键字出错,错误信息:",e);
}
}
}
}
}
package com.zzsn.webMagic;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zzsn.cache.JedisUtil;
import com.zzsn.cache.MemcachedUtils;
import com.zzsn.docinfo.DocInfo;
import com.zzsn.entity.SiteTemplate;
import com.zzsn.paser.SourceTemplateByTag;
import com.zzsn.search.entity.ClbAnsProcessitem;
import com.zzsn.search.extractor.ContentFileFinder;
import com.zzsn.search.extractor.StandardWebExtractorHandler;
import com.zzsn.search.util.SpringContextUtil;
import com.zzsn.utility.index.Constants;
import com.zzsn.utility.model.ContentFileResult;
import com.zzsn.utility.util.RequestUtil;
import com.zzsn.utility.util.Utility;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.ibatis.annotations.Param;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.processor.PageProcessor;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* Extracts article body content
*/
public class BaiduTxtProcessor implements PageProcessor {
private final static Logger log = LoggerFactory.getLogger(BaiduTxtProcessor.class);
private Site site;
public KafkaTemplate kafkaTemplate= SpringContextUtil.getBean(KafkaTemplate.class);
private ClbAnsProcessitem processitem=null;
private List<ClbAnsProcessitem> processitemList=null;
@Override
public void process(Page page) {
log.info("监听到进入内容解析中。。。。。。。。。。。。。");
Request request=page.getRequest();
processitem=new ClbAnsProcessitem();
processitemList=new ArrayList<>();
processitem=JSONObject.parseObject(request.getExtra("model").toString(),ClbAnsProcessitem.class);
processitem.setSource("3");
DocInfo docInfo = new DocInfo();
try {
String link=page.getUrl().toString(); //current page link
String infodata=page.getHtml().toString(); //page html
//detect the page encoding
String contentCharset = Utility.getWebEncodingByStr(infodata);
if(link.contains("toutiao.com") &&(null == infodata || infodata.length() < 50)){
infodata = RequestUtil.getTaotiaoData(link );
}
// check whether a parsing template exists for this domain
if(link.contains("qq.com") && !link.contains("://new.qq.com")){
link= KafkaConsumers.transqqURl(link);
}
String domainurl = new URL(link).getHost();
Object siteTempObj = MemcachedUtils.get("domainUri_"+domainurl);
SiteTemplate siteTemplate=new SiteTemplate();
if (siteTempObj != null && !"null".equals(siteTempObj)) {
com.zzsn.entity.Site site=(com.zzsn.entity.Site)siteTempObj;
siteTemplate.setMatchTitle(site.getMatchTitle());
siteTemplate.setMatchAuthor(site.getMatchAuthor());
siteTemplate.setMatchContent(site.getMatchContent());
siteTemplate.setMatchOrigin(site.getMatchOrigin());
siteTemplate.setMatchPublishDate(site.getMatchPublishDate());
siteTemplate.setMatchSummary(site.getMatchSummary());
docInfo= SourceTemplateByTag.doPaserByTag(infodata, docInfo, siteTemplate);
}
if(null!=docInfo.getContentWithTag()) {
System.out.println("Template-based parsing succeeded for "+domainurl);
log.info("Template-based parsing succeeded for "+domainurl);
}
//fall back to rule-based content extraction
if(null==docInfo.getContentWithTag() || docInfo.getContentWithTag().trim().length() == 0) {
new StandardWebExtractorHandler().doHandler(infodata, docInfo);
}
//convert relative image paths in the content to absolute paths
if (docInfo.getTitle() != null
&& docInfo.getTitle().trim().length() > 0
&& docInfo.getContentNoTag() != null
&& docInfo.getContentNoTag().trim().length() > 0){
ContentFileResult contentFileResult = new ContentFileResult();
contentFileResult =KafkaConsumers.getContentFile(docInfo.getContentWithTag(),docInfo.getSourceaddress());
docInfo.setContentWithTag(ContentFileFinder.rmHtmlImgOrAtag(contentFileResult.getContentImgCvtTag()));
docInfo.setContentImgCvtTag(contentFileResult.getContentImgCvtTag());
}
if (StringUtils.isNotBlank(docInfo.getPublishDate())){
processitem.setPublishDate(docInfo.getPublishDate());
}
//only non-empty content goes to the next step
if (StringUtils.isNotBlank(docInfo.getContentNoTag())){
processitem.setContent(docInfo.getContentNoTag());
processitem.setContentWithtag(docInfo.getContentWithTag());
processitem.setSummary(docInfo.getSummary());
processitem.setAuthor(docInfo.getAuthor());
processitem.setCreateDate(DateUtil.now());
String msg=new ObjectMapper().writeValueAsString(processitem);
//publish
kafkaTemplate.send(Constants.testBaidu, msg);
BaiduTask.toKafka++;
log.info("Sent to kafka...........");
// add to the cache pool
// JedisUtil.sadd("baidu::"+processitem.getOrgId(), processitem.getSourceAddress());
//redis set used for testing
JedisUtil.sadd("baidu_web_test5::"+processitem.getOrgId(), processitem.getSourceAddress());
}else {
log.info("No article content extracted, link: "+link);
}
}catch (Exception e){
log.error("解析内容逻辑出错,错误信息:",e);
}
log.info("程序处理:关键词组总数:"+BaiduTask.keyWordsArrey+";关键词总数:"+BaiduTask.keyWordsSigin+";有效URL连接数:"+BaiduTask.linksUrl+";无效URL连接数:"+BaiduTask.invalidUrl+";重复连接个数:"+BaiduTask.doublep+";入kafak数量:"+BaiduTask.toKafka);
}
@Override
public Site getSite() {
if (site==null){
site=Site.me().setSleepTime(4000);
site.addHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");
site.addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
site.addHeader(HttpHeaders.CONNECTION, "close");
site.setUserAgent("Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36");
site.setTimeOut(8000);
site.setCycleRetryTimes(3);
}
return site;
}
}
package com.zzsn.webMagic;
import com.zzsn.search.entity.ClbAnsProcessitem;
import com.zzsn.utility.util.DateUtil;
import com.zzsn.utils.GrabUtil;
import com.zzsn.webMagic.downloader.SeleniumDownloader;
import org.apache.http.HttpHeaders;
import org.jsoup.select.Elements;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.SimpleProxyProvider;
import us.codecraft.webmagic.selector.Html;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.util.List;
public class Expaler implements PageProcessor {
Site site;
@Override
public void process(Page page) {
// List<String> links=page.getHtml().xpath("//div[@id=content_left]/div[@tpl='news-normal']").all();
// links.forEach(t-> System.out.println(t));
System.out.println(page.getHtml().toString());
}
@Override
public Site getSite() {
if (site==null){
site=Site.me().setSleepTime(0);
site.addHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");
site.addHeader("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8");
site.setUserAgent("Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36");
site.addHeader(HttpHeaders.CONNECTION, "close");
}
return site;
}
public static void main(String[] args) {
String url="https://www.163.com/dy/article/GTOJV0AF0551M1ZM.html";
Proxy proxy=new Proxy("114.102.52.200",61798,"hys_81310170_41c8","12345678");
HttpClientDownloader httpClientDownloader=new HttpClientDownloader();
httpClientDownloader.setProxyProvider(SimpleProxyProvider.from(KafkaConsumers.getProxy().get(0)));
SeleniumDownloader seleniumDownloader=new SeleniumDownloader("E:\\chromdriver\\chromedriver.exe");
seleniumDownloader.setProxyProvider(new SimpleProxyProvider(KafkaConsumers.getProxy()));
Spider.create(new Expaler()).thread(1)
// .startRequest(list)
.setDownloader(seleniumDownloader)
// .setDownloader(httpClientDownloader)
.addUrl(url)
.runAsync();
}
}
package com.zzsn.webMagic;
import cn.hutool.core.util.RandomUtil;
import com.zzsn.search.extractor.ContentFileFinder;
import com.zzsn.search.oracledb.OracleDBManager;
import com.zzsn.search.oracledb.OracleDataTable;
import com.zzsn.utility.index.Constants;
import com.zzsn.utility.model.ContentFileResult;
import com.zzsn.utility.model.FileTag;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import us.codecraft.webmagic.proxy.Proxy;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Kafka consumer factory and crawler helper utilities
*/
public class KafkaConsumers {
public static org.apache.kafka.clients.consumer.KafkaConsumer<String, String> createConsumer() {
Properties properties = new Properties();
System.out.println(Constants.KAFKA_CONSUMER_SERVERS);
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_CONSUMER_SERVERS);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.KAFKA_CONSUMER_GROUP_ID);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
//offset reset strategy for reading kafka data
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,Constants.KAFKA_CONSUMER_AUTO_OFFSET_RESET);
// latest earliest
//set the max poll interval to 1 hour
properties.put("max.poll.interval.ms", 60*60*1000);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
return new KafkaConsumer<>(properties);
}
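// Hedged usage sketch (hypothetical method, not called by the crawler): shows
// the subscribe/poll/commit cycle the settings above imply. Auto-commit is
// disabled in createConsumer, so offsets must be committed manually.
public static void pollOnceDemo() {
KafkaConsumer<String, String> consumer = createConsumer();
consumer.subscribe(Collections.singletonList(Constants.KAFKA_CONSUMER_TOPIC));
org.apache.kafka.clients.consumer.ConsumerRecords<String, String> records = consumer.poll(1000);
for (org.apache.kafka.clients.consumer.ConsumerRecord<String, String> record : records) {
System.out.println(record.value()); //each record carries one keyword message
}
consumer.commitSync();
consumer.close();
}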
/**
* Fetches proxy IPs from Oracle
* @return a list of proxies, or null when none are found
*/
public static List<Proxy> getProxy(){
List<Proxy> proxyList=null;
String searchSql = "select proxy from CIS_sys_Proxy where id="+(RandomUtil.randomInt(4)+1);
// String searchSql = "select proxy from CIS_sys_Proxy ";
String proxy=null;
OracleDBManager dm = new OracleDBManager();
try {
OracleDataTable dt = dm.getResultData(null, null, searchSql);
if(dt != null && dt.getRowCount()> 0){
proxyList=new ArrayList<>();
for(int i = 0; i<dt.getRowCount(); i++){
for(int j = 0; j<dt.getColCoun(); j++){
if(dt.getRow()[i][j].length()>5){
proxy=dt.getRow()[i][j];
//stored as host-port-username-password
String[] ps=proxy.split("-");
proxyList.add(new Proxy(ps[0],Integer.valueOf(ps[1]),ps[2],ps[3]));
}
}
}
}
} catch (SQLException e) {
e.printStackTrace();
}
return proxyList;
}
//convert a qq news link to the new.qq.com format
public static String transqqURl(String oldurl){
String patt="https://new.qq.com/omn/[date]/[params].html";
String b1=oldurl.substring(oldurl.lastIndexOf("/")+1);
String b2=getNumbers(b1);
String curl=patt.replace("[date]",b2).replace("[params]",b1);
return curl;
}
public static String getNumbers(String content) {
Pattern pattern = Pattern.compile("\\d+");
Matcher matcher = pattern.matcher(content);
if (matcher.find()) {
return matcher.group(0); //first run of digits
}
return "";
}
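// Hedged example (the article id below is made up): transqqURl takes the last
// path segment, uses its leading digits as the date directory and rebuilds the url:
// transqqURl("https://xw.qq.com/cmsid/20200901A0ABCD00")
//   -> "https://new.qq.com/omn/20200901/20200901A0ABCD00.html"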
public static ContentFileResult getContentFile(String contentWithTag, String sourceaddress)throws Exception{
String contentImgCvtTag = contentWithTag;
String formatImgContent= contentWithTag;
Map<String, FileTag> imgDataMap = ContentFileFinder.getContentFileTag(contentWithTag,sourceaddress);
//key: the image's crawl path, value: the image's save path
Map<String, FileTag> imgMap = new HashMap<String, FileTag>();
for (String key : imgDataMap.keySet()) {
FileTag fileTag = imgDataMap.get(key);
while (contentImgCvtTag.contains(key)) {
//path prefixed with IMG_SERVER
contentImgCvtTag = contentImgCvtTag.replace(key, fileTag.getSaveTag());
}
imgMap.put(fileTag.getAbsolutePath(), fileTag);
}
ContentFileResult cis = new ContentFileResult();
cis.setContentAbsoulute(formatImgContent);
cis.setContentImgCvtTag(contentImgCvtTag);
cis.setFileMap(imgMap);
return cis;
}
}
package com.zzsn.webMagic;
import com.zzsn.search.entity.KeywordMsg;
import com.zzsn.utility.index.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Spider;
import us.codecraft.webmagic.downloader.HttpClientDownloader;
import us.codecraft.webmagic.proxy.Proxy;
import us.codecraft.webmagic.proxy.SimpleProxyProvider;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
/**
* Reads keywords from the queue and turns them into search requests
*/
public class LinksReadThread extends Thread {
private final static Logger logger = LoggerFactory.getLogger(LinksReadThread.class);
private final String BAIDU_KEY_URL="https://www.baidu.com/s?rtt=1&bsst=1&cl=2&tn=news&ie=utf-8&word=%s&pn=%d";
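// Example expansion (hypothetical keyword "测试", second page, pn=10):
// https://www.baidu.com/s?rtt=1&bsst=1&cl=2&tn=news&ie=utf-8&word=%E6%B5%8B%E8%AF%95&pn=10
// The keyword is URL-encoded with URLEncoder.encode(...) before String.format is applied.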
@Override
public void run() {
List<Request> requestByMcList=new ArrayList<>(); //request list
Request webReq=null;
while (true){
try {
KeywordMsg keywordMsg= BaiduTask.keyWordsQueue.take();
logger.info("等到关键字来了。。。。。。");
/****
* 根据关键字词组再次拼装成需要搜索的关键字
* 一个关键字词组能生成N个关键字,一个关键字词组启动一个webMagic爬虫任务
* 同时把关键字词组的基本属性传递给下一步
* 处理流程:1、初始化 webMagic
* 2、 代理获取:增加代理验证
* 3、由关键字拼接百度url,每个关键词只处理一页(10),后续可修改
*/
//组装下一步用 webMagic进行多线程处理,组装url
try {
for (int i=0;i<Constants.PAGESIZE;i++){
//根据配置文件配置抓取页数,此方法只对百度元搜索有用
webReq=new Request();
webReq.putExtra("sid",keywordMsg.getId());
webReq.putExtra("tid",keywordMsg.getWordsCode());
webReq.putExtra("words",keywordMsg.getKeyWord());
webReq.setUrl(String.format(BAIDU_KEY_URL, URLEncoder.encode(keywordMsg.getKeyWord(),"UTF-8"),(i*10)));
requestByMcList.add(webReq);
}
}catch (Exception e){
logger.error("根据关键词组生成request对象出错,错误信息:",e);
}
// if (requestByMcList.size()<10){
// continue;
// }
//clear the JVM DNS cache
java.security.Security.setProperty("networkaddress.cache.ttl" , "0");
java.security.Security.setProperty("networkaddress.cache.negative.ttl", "0");
if (Constants.PROXY.equals("1")){
Proxy p=new Proxy("",32323);//ProxyMap.getProxy();
logger.info("获取待处理一批url,获取代理ip请等待进行下步处理");
if (p!=null){
HttpClientDownloader httpClientDownloader=new HttpClientDownloader();
httpClientDownloader.setProxyProvider(new SimpleProxyProvider(KafkaConsumers.getProxy()));
Spider.create(new BaiduKeyWordProcessor())
.setDownloader(httpClientDownloader)
.startRequest(requestByMcList)
.thread(1)
.runAsync();
}else {
Thread.sleep(1000*10);
continue;
}
}else {
//no proxy configured: start crawling directly
logger.info("Submitting pending urls to WebMagic, count: "+requestByMcList.size());
Spider.create(new BaiduKeyWordProcessor())
.startRequest(requestByMcList)
.thread(1)
.runAsync();
}
requestByMcList=new ArrayList<>();
try {
Thread.sleep(1000*8);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package com.zzsn.webMagic.downloader;
import org.openqa.selenium.*;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.chrome.ChromeOptions;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Request;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.downloader.Downloader;
import us.codecraft.webmagic.proxy.ProxyProvider;
import us.codecraft.webmagic.selector.Html;
import us.codecraft.webmagic.selector.PlainText;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Renders pages by driving a real browser through Selenium. Currently only Chrome is supported.<br>
* Requires the Selenium driver binary to be downloaded.<br>
*
* @author code4crafter@gmail.com <br>
* Date: 13-7-26 <br>
* Time: 13:37 <br>
*/
public class SeleniumDownloader implements Downloader, Closeable {
private volatile WebDriverPool webDriverPool;
private Logger logger = LoggerFactory.getLogger(getClass());
private int sleepTime = 0;
private int poolSize = 1;
private volatile ChromeDriver webDriver;
private static final String DRIVER_PHANTOMJS = "phantomjs";
Dimension targetSize = new Dimension(600, 600);// window size
private ProxyProvider proxyProvider;
/**
* Creates a downloader that uses the given chromedriver binary.
*
* @param chromeDriverPath chromeDriverPath
*/
public SeleniumDownloader(String chromeDriverPath) {
System.getProperties().setProperty("webdriver.chrome.driver",
chromeDriverPath);
}
/**
* Constructor without any filed. Construct PhantomJS browser
*
* @author bob.li.0718@gmail.com
*/
public SeleniumDownloader() {
// System.setProperty("phantomjs.binary.path",
// "/Users/Bingo/Downloads/phantomjs-1.9.7-macosx/bin/phantomjs");
}
public void setProxyProvider(ProxyProvider proxyProvider) {
this.proxyProvider = proxyProvider;
}
/**
* set sleep time to wait until load success
*
* @param sleepTime sleepTime
* @return this
*/
public SeleniumDownloader setSleepTime(int sleepTime) {
this.sleepTime = sleepTime;
return this;
}
@Override
public Page download(Request request, Task task) {
if (webDriver !=null){
webDriver.quit();
}
try {
ChromeOptions options=new ChromeOptions();
//run Chrome in headless mode
options.addArguments("--headless");
options.addArguments("--disable-gpu");
options.addArguments("--no-sandbox");
options.addArguments("--disable-dev-shm-usage");
options.addArguments("--start-maximized");
//report pages only render fully when scrolled, so give the window a very large height
options.addArguments("--window-size=1280,4300");
if (proxyProvider !=null){
us.codecraft.webmagic.proxy.Proxy p= proxyProvider.getProxy(task);
String proxyServer =p.getHost()+":"+p.getPort();
org.openqa.selenium.Proxy proxy = new Proxy().setHttpProxy(proxyServer).setSslProxy(proxyServer);
proxy.setSocksUsername("hys_81310170_41c8");
proxy.setSocksPassword("1234567");
options.setProxy(proxy);
}
webDriver = new ChromeDriver(options);
} catch (Exception e) {
logger.warn("interrupted", e);
return null;
}
webDriver.get(request.getUrl());
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
/*
* TODO You can add mouse event or other processes
*
* @author: bob.li.0718@gmail.com
*/
WebElement webElement = webDriver.findElement(By.xpath("/html"));
String content = webElement.getAttribute("outerHTML");
Page page = new Page();
page.setRawText(content);
page.setHtml(new Html(content, request.getUrl()));
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
webDriver.quit();
return page;
}
// private void checkInit() {
// if (webDriver == null) {
// synchronized (this) {
// webDriverPool = new WebDriverPool(poolSize);
// }
// }
// }
@Override
public void setThread(int thread) {
this.poolSize = thread;
}
@Override
public void close() throws IOException {
webDriver.quit();
}
}
package com.zzsn.webMagic.downloader;
import org.openqa.selenium.Proxy;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.chrome.ChromeDriver;
import org.openqa.selenium.chrome.ChromeOptions;
import org.openqa.selenium.firefox.FirefoxDriver;
import org.openqa.selenium.phantomjs.PhantomJSDriver;
import org.openqa.selenium.phantomjs.PhantomJSDriverService;
import org.openqa.selenium.remote.DesiredCapabilities;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileReader;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author code4crafter@gmail.com <br>
* Date: 13-7-26 <br>
* Time: 13:41 <br>
*/
class WebDriverPool {
private Logger logger = LoggerFactory.getLogger(getClass());
private final static int DEFAULT_CAPACITY = 5;
private final int capacity;
private final static int STAT_RUNNING = 1;
private final static int STAT_CLOSED = 2;
private AtomicInteger stat = new AtomicInteger(STAT_RUNNING);
/*
* new fields for configuring phantomJS
*/
private WebDriver mDriver = null;
private boolean mAutoQuitDriver = true;
private static final String DEFAULT_CONFIG_FILE = "E:/chromdriver/config.ini";
private static final String DRIVER_FIREFOX = "firefox";
private static final String DRIVER_CHROME = "chrome";
private static final String DRIVER_PHANTOMJS = "phantomjs";
protected static Properties sConfig;
protected static DesiredCapabilities sCaps;
/**
* Configure the GhostDriver, and initialize a WebDriver instance. This part
* of code comes from GhostDriver.
* https://github.com/detro/ghostdriver/tree/master/test/java/src/test/java/ghostdriver
*
* @author bob.li.0718@gmail.com
* @throws IOException
*/
public void configure() throws IOException {
// Read config file
sConfig = new Properties();
String configFile = DEFAULT_CONFIG_FILE;
if (System.getProperty("selenuim_config")!=null){
configFile = System.getProperty("selenuim_config");
}
sConfig.load(new FileReader(configFile));
// Prepare capabilities
sCaps = new DesiredCapabilities();
sCaps.setJavascriptEnabled(true);
sCaps.setCapability("takesScreenshot", false);
String driver = sConfig.getProperty("driver", DRIVER_PHANTOMJS);
// Fetch PhantomJS-specific configuration parameters
if (driver.equals(DRIVER_PHANTOMJS)) {
// "phantomjs_exec_path"
if (sConfig.getProperty("phantomjs_exec_path") != null) {
sCaps.setCapability(
PhantomJSDriverService.PHANTOMJS_EXECUTABLE_PATH_PROPERTY,
sConfig.getProperty("phantomjs_exec_path"));
} else {
throw new IOException(
String.format(
"Property '%s' not set!",
PhantomJSDriverService.PHANTOMJS_EXECUTABLE_PATH_PROPERTY));
}
// "phantomjs_driver_path"
if (sConfig.getProperty("phantomjs_driver_path") != null) {
System.out.println("Test will use an external GhostDriver");
sCaps.setCapability(
PhantomJSDriverService.PHANTOMJS_GHOSTDRIVER_PATH_PROPERTY,
sConfig.getProperty("phantomjs_driver_path"));
} else {
System.out
.println("Test will use PhantomJS internal GhostDriver");
}
}
// Disable "web-security", enable all possible "ssl-protocols" and
// "ignore-ssl-errors" for PhantomJSDriver
// sCaps.setCapability(PhantomJSDriverService.PHANTOMJS_CLI_ARGS, new
// String[] {
// "--web-security=false",
// "--ssl-protocol=any",
// "--ignore-ssl-errors=true"
// });
ArrayList<String> cliArgsCap = new ArrayList<String>();
cliArgsCap.add("--web-security=false");
cliArgsCap.add("--ssl-protocol=any");
cliArgsCap.add("--ignore-ssl-errors=true");
sCaps.setCapability(PhantomJSDriverService.PHANTOMJS_CLI_ARGS,
cliArgsCap);
// Control LogLevel for GhostDriver, via CLI arguments
sCaps.setCapability(
PhantomJSDriverService.PHANTOMJS_GHOSTDRIVER_CLI_ARGS,
new String[] { "--logLevel="
+ (sConfig.getProperty("phantomjs_driver_loglevel") != null ? sConfig
.getProperty("phantomjs_driver_loglevel")
: "INFO") });
// String driver = sConfig.getProperty("driver", DRIVER_PHANTOMJS);
// Start appropriate Driver
if (isUrl(driver)) {
sCaps.setBrowserName("phantomjs");
mDriver = new RemoteWebDriver(new URL(driver), sCaps);
} else if (driver.equals(DRIVER_FIREFOX)) {
mDriver = new FirefoxDriver(sCaps);
} else if (driver.equals(DRIVER_CHROME)) {
ChromeOptions options=new ChromeOptions();
//run Chrome in headless mode
options.addArguments("--headless");
options.addArguments("--disable-gpu");
options.addArguments("--no-sandbox");
options.addArguments("--disable-dev-shm-usage");
options.addArguments("--start-maximized");
//report pages only render fully when scrolled, so give the window a very large height
options.addArguments("--window-size=1280,4300");
String proxyServer = "1.83.251.72:40013";
Proxy proxy = new Proxy().setHttpProxy(proxyServer).setSslProxy(proxyServer);
options.setProxy(proxy);
mDriver = new ChromeDriver(options);
} else if (driver.equals(DRIVER_PHANTOMJS)) {
mDriver = new PhantomJSDriver(sCaps);
}
}
/**
* check whether input is a valid URL
*
* @author bob.li.0718@gmail.com
* @param urlString urlString
* @return true means yes, otherwise no.
*/
private boolean isUrl(String urlString) {
try {
new URL(urlString);
return true;
} catch (MalformedURLException mue) {
return false;
}
}
/**
* store webDrivers created
*/
private List<WebDriver> webDriverList = Collections
.synchronizedList(new ArrayList<WebDriver>());
/**
* store webDrivers available
*/
private BlockingDeque<WebDriver> innerQueue = new LinkedBlockingDeque<WebDriver>();
public WebDriverPool(int capacity) {
this.capacity = capacity;
}
public WebDriverPool() {
this(DEFAULT_CAPACITY);
}
/**
* Borrows a WebDriver from the pool, lazily creating one while under capacity.
* @return an available WebDriver
* @throws InterruptedException if interrupted while waiting for a driver
*/
public WebDriver get() throws InterruptedException {
checkRunning();
WebDriver poll = innerQueue.poll();
if (poll != null) {
return poll;
}
if (webDriverList.size() < capacity) {
synchronized (webDriverList) {
if (webDriverList.size() < capacity) {
// add new WebDriver instance into pool
try {
configure();
innerQueue.add(mDriver);
webDriverList.add(mDriver);
} catch (IOException e) {
e.printStackTrace();
}
// ChromeDriver e = new ChromeDriver();
// WebDriver e = getWebDriver();
// innerQueue.add(e);
// webDriverList.add(e);
}
}
}
return innerQueue.take();
}
public void returnToPool(WebDriver webDriver) {
checkRunning();
innerQueue.add(webDriver);
}
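// Hedged usage sketch (hypothetical method, not part of the original pool):
// borrow a driver, render a page, and always return the driver so the pool
// is not drained when rendering throws.
public String renderOnce(String url) throws InterruptedException {
WebDriver driver = get();
try {
driver.get(url);
return driver.getPageSource();
} finally {
returnToPool(driver);
}
}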
protected void checkRunning() {
if (!stat.compareAndSet(STAT_RUNNING, STAT_RUNNING)) {
throw new IllegalStateException("Already closed!");
}
}
public void closeAll() {
boolean b = stat.compareAndSet(STAT_RUNNING, STAT_CLOSED);
if (!b) {
throw new IllegalStateException("Already closed!");
}
for (WebDriver webDriver : webDriverList) {
logger.info("Quit webDriver" + webDriver);
webDriver.quit();
webDriver = null;
}
}
}
@@ -34,7 +34,7 @@ kafka.producer.servers=114.115.159.144:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=4096000
# slf4j logging
logging.level.root=info
......
@@ -23,9 +23,19 @@ THREAD_SIZE=1
#kafka servers
KAFKA_CONSUMER_SERVERS=114.115.159.144:9092
#consumer topic
#KAFKA_CONSUMER_TOPIC=keyWordsInfo
#test topic
KAFKA_CONSUMER_TOPIC=baiduTest
#consumer group
#KAFKA_CONSUMER_GROUP_ID=baidu-web-test
#test consumer group
KAFKA_CONSUMER_GROUP_ID=baidu-wemagic
#outgoing message topic
KAFKA_test_TOPIC=baidu-bind2-test
#KAFKA_test_TOPIC=baidu-new-test
#kafka offset reset mode
KAFKA_CONSUMER_AUTO_OFFSET_RESET=earliest
#message publishing topic
@@ -40,7 +50,6 @@ KAFKA_PRODUCT_GOOGLE_URLLIST_TOPIC=google_crawler_urlList
#KAFKA_PRODUCT_PASERURL_TOPIC=baidu_crawler_paserurl
#search url
#META_SEARCH_URL=https://www.google.com.hk/search?q=[keyword]&newwindow=1&tbs=cdr:1,cd_min:[startTime],cd_max:[endTime]&tbm=nws&ei=fYBfYp-CHffo2roPhoOPsA4&start=[pn]
#META_SEARCH_URL=https://www.baidu.com/s?rtt=1&bsst=1&cl=2&tn=news&ie=utf-8&word=
@@ -65,3 +74,10 @@ redis.maxTotal=600
# milliseconds
redis.maxWaitMillis=1000
redis.testOnBorrow=false
#number of kafka polling loops
whiles=10
#pages to crawl; only applies to the Baidu meta search crawler
pageSize=5
#averaging (split) factor
averger=2000
@@ -17,13 +17,18 @@
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<!-- log file name pattern -->
<FileNamePattern>${LOG_HOME}/cmppv2-%d{yyyy-MM-dd}.%i.log</FileNamePattern>
<!-- days of log history to keep -->
<!-- <MaxHistory>-1</MaxHistory> -->
<!-- split by size: once a file exceeds maxFileSize it is compressed and archived as *.log.gz -->
<maxFileSize>50MB</maxFileSize>
<maxHistory>30</maxHistory>
<!-- total size of all archived files is at most 1GB -->
<!--<totalSizeCap>1GB</totalSizeCap>-->
</rollingPolicy>
<!-- append to the existing log file -->
<append>true</append>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- output pattern: %d date, %thread thread name, %-5level level padded to 5 chars, %msg log message, %n newline -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50}:%L - %msg%n</pattern>
<charset>utf-8</charset>
</encoder>
</appender>
......