Commit 887d3377 Author: LiuLiYuan

lly 2024-07-23

Parent aba8680d
......@@ -6,6 +6,7 @@
<component name="ChangeListManager">
<list default="true" id="bc1f53d8-47f4-4dbb-add3-cecaf77d3733" name="变更" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/zzsn.iml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/zzsn.iml" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
......@@ -16,7 +17,9 @@
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="ProjectId" id="2jJzRw0P9x2yKwi4olPMaOMjtRG" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true" />
<component name="ProjectLevelVcsManager" settingsEditedManually="true">
<ConfirmationsSetting value="1" id="Add" />
</component>
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
......@@ -25,11 +28,17 @@
<property name="RunOnceActivity.OpenProjectViewOnStart" value="true" />
<property name="RunOnceActivity.ShowReadmeOnStart" value="true" />
<property name="WebServerToolWindowFactoryState" value="false" />
<property name="last_opened_file_path" value="$PROJECT_DIR$/yjzx/zk" />
<property name="node.js.detected.package.eslint" value="true" />
<property name="node.js.detected.package.tslint" value="true" />
<property name="node.js.selected.package.eslint" value="(autodetect)" />
<property name="node.js.selected.package.tslint" value="(autodetect)" />
</component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="F:\zzsn\yjzx\zk" />
</key>
</component>
<component name="RunManager">
<configuration name="main" type="PythonConfigurationType" factoryName="Python" nameIsGenerated="true">
<module name="zzsn" />
......@@ -66,6 +75,7 @@
<workItem from="1721120377469" duration="1993000" />
<workItem from="1721124092579" duration="948000" />
<workItem from="1721132090791" duration="2000" />
<workItem from="1721723146118" duration="951000" />
</task>
<task id="LOCAL-00001" summary="lly 2024-07-16">
<created>1721124469886</created>
......@@ -74,7 +84,14 @@
<option name="project" value="LOCAL" />
<updated>1721124469886</updated>
</task>
<option name="localTasksCounter" value="2" />
<task id="LOCAL-00002" summary="lly 2024-07-23">
<created>1721723190158</created>
<option name="number" value="00002" />
<option name="presentableId" value="LOCAL-00002" />
<option name="project" value="LOCAL" />
<updated>1721723190158</updated>
</task>
<option name="localTasksCounter" value="3" />
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
......@@ -82,7 +99,8 @@
</component>
<component name="VcsManagerConfiguration">
<MESSAGE value="lly 2024-07-16" />
<option name="LAST_COMMIT_MESSAGE" value="lly 2024-07-16" />
<MESSAGE value="lly 2024-07-23" />
<option name="LAST_COMMIT_MESSAGE" value="lly 2024-07-23" />
</component>
<component name="XDebuggerManager">
<breakpoint-manager>
......
......@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
# -*- coding: utf-8 -*-
import datetime
import json
import re
import time
from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
import requests
from bs4 import BeautifulSoup
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from retry import retry
from base import BaseCore
from download import download, paserUrl, sendKafka
import BaseCore
from download import download, paserUrl, sendKafka, Monitor
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
# 'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Priority': 'u=1, i',
'Referer': 'https://www.cgdev.org/section/publications',
'Sec-Ch-Ua': '"Microsoft Edge";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
'Cache-Control': 'max-age=0',
'Priority': 'u=0, i',
'Cookie': 'visitor_id683263=1071035123; visitor_id683263-hash=74b5941f281dd4f40744d28330879d11882d02331f4a112c6e84db3df77cb144f6c30f3c54841cd502959c8b96479a50a70c4b8b; fpestid=E17SOVqAsTf227d5BfW0pcHp9pYALqjFJjqLrbn0xkoN02pouTer91bFKzte-3bfmq7lxA; _ga_4ELC9TB7D4=GS1.1.1718247124.1.1.1718248637.0.0.0; _ga_LS7XF9SSMB=GS1.1.1718247124.1.1.1718248639.0.0.0; _fbp=fb.1.1719975897499.22630470718587015; _ga=GA1.2.672074336.1718187029; _ga_HRVPCL33QJ=GS1.1.1719978739.5.0.1719978740.59.0.0',
'Sec-Ch-Ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Microsoft Edge";v="126"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36 Edg/125.0.0.0',
'X-Requested-With': 'XMLHttpRequest',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0',
}
types = ['document-disclaimer', 'photo_credit', 'topics-listing']
def buildDriver():
# ip = baseCore.get_proxy()
edge_service = Service(r'D:\zzsn_spider\zk\msedgedriver.exe')
edge_options = Options()
# 开启开发者模式
edge_options.add_experimental_option('excludeSwitches', ['enable-automation'])
# 禁用启用Blink运行时的功能
edge_options.add_argument('--disable-blink-features=AutomationControlled')
edge_options.add_argument('--silent')
# edge_options.add_argument('--proxy-server=%s' % ip['http'])
# prefs = {"profile.managed_default_content_settings.images": 2, 'permissions.default.stylesheet': 2}
# edge_options.add_experimental_option("prefs", prefs)
driver = webdriver.Edge(service=edge_service, options=edge_options)
return driver
def is_member_containing_string(string):
cursor = '0'
while True:
......@@ -47,19 +69,32 @@ def is_member_containing_string(string):
@retry(tries=5, delay=5)
def getDataJson(url):
req = requests.get(url, headers=headers)
datasJson = req.json()
req.close()
# req = requests.get(url, headers=headers)
# datasJson = req.json()
# print(req.text)
# datasText = req.text
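# Switched from requests to a Selenium-driven Edge session (presumably to get past the site's bot checks); when the browser renders the AJAX response, the JSON body ends up inside a <textarea>, which the regex below extracts before json.loads.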
driver = buildDriver()
driver.get(url)
time.sleep(3)
datasText = driver.page_source
log.info(datasText)
datasText = re.findall('<textarea>(.*)</textarea>', datasText)[0]
datasJson = json.loads(datasText)
driver.close()
return datasJson
@retry(tries=5, delay=5)
def getSoup(url):
req = requests.get(url, headers=headers)
if req.status_code != 200:
raise
req.encoding = req.apparent_encoding
soup = BeautifulSoup(req.text, 'html.parser')
# req = requests.get(url, headers=headers)
# if req.status_code != 200:
# raise
# req.encoding = req.apparent_encoding
driver = buildDriver()
driver.get(url)
time.sleep(5)
soup = BeautifulSoup(driver.page_source, 'html.parser')
driver.close()
return soup
......@@ -81,46 +116,45 @@ def getDic(url, title, publishDate):
pdfHref = soup.find('div', attrs={'id': 'node-title-links'}).find('a', class_='gold').get('href')
if 'https:' not in pdfHref:
pdfHref = 'https://www.cgdev.org' + pdfHref
attachmentId = download(pdfHref, title, publishDate, headers)
attachmentId = download(pdfHref, title, publishDate, headers, 'Center for Global Development')
if not attachmentId:
raise
attachmentIds.append(attachmentId)
except:
pass
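# uniqueCode is described elsewhere in this commit as collection type + 6-digit date + server serial + thread serial + custom digits; 'AA' and '232' appear to be this spider's prefix and server serial.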
uniqueCode = baseCore.getUniqueCode('AA', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Center for Global Development',
'publishDate': publishDate,
'sid': '1534719607123562498',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def doJob():
for page in range(321):
for page in range(3):
log.info(f'开始采集第{page + 1}页')
url = f'https://www.cgdev.org/views/ajax?_wrapper_format=drupal_ajax&view_name=publications_landing&view_display_id=page_1&view_args=&view_path=%2Fsection%2Fpublications&view_base_path=section%2Fpublications&view_dom_id=12f9a0cad5ad0bcc4e400e0826e10f0ac27f401991ecc8edf4ed9ef3af4ac942&pager_element=0&page={page}&_drupal_ajax=1&ajax_page_state%5Btheme%5D=cgdw3css&ajax_page_state%5Btheme_token%5D=&ajax_page_state%5Blibraries%5D=eJx9UEkOwyAM_BCEQy_5TURgQtMCjmK3JL8voqqUA-rF8iy2xp4hgn3CsRHDT8saK2QTkLHbqFzw5eaYza_RIdJ8FU5RjnYYv782GwdHMdqNob54nJprkjsSTJf043WxdpQ2ysjC_91EIUIvlEXjqJlzDdUbaA5bwJT6ocpNV6AW6yD8O-OLBneHe8506LL6AOmb3isKa_uwh-KTBcnMtj6g0abVoYlXIpF_RXwAh_aRbQ'
datasJson = getDataJson(url)
for dataJson in datasJson:
if dataJson['command'] == 'insert' and dataJson['data']:
break
else:
log.error(f'第{page + 1}页数据获取失败')
continue
data = dataJson['data']
soup = BeautifulSoup(data, 'html.parser')
# url = f'https://www.cgdev.org/views/ajax?_wrapper_format=drupal_ajax&view_name=publications_landing&view_display_id=page_1&view_args=&view_path=%2Fsection%2Fpublications&view_base_path=section%2Fpublications&view_dom_id=cecf55db6e8ed558ede9911ca173e7691799ab6ba91bf6a3bcd39bfca8d3e0e6&pager_element=0&page={page}&_drupal_ajax=1&ajax_page_state%5Btheme%5D=cgdw3css&ajax_page_state%5Btheme_token%5D=&ajax_page_state%5Blibraries%5D=eJx9UEkOwyAM_BCEQy_5TURgQtMCjmK3JL8voqqUA-rF8iy2xp4hgn3CsRHDT8saK2QTkLHbqFzw5eaYza_RIdJ8FU5RjnYYv782GwdHMdqNob54nJprkjsSTJf043WxdpQ2ysjC_91EIUIvlEXjqJlzDdUbaA5bwJT6ocpNV6AW6yD8O-OLBneHe8506LL6AOmb3isKa_uwh-KTBcnMtj6g0abVoYlXIpF_RXwAh_aRbQ'
# datasJson = getDataJson(url)
# for dataJson in datasJson:
# if dataJson['command'] == 'insert' and dataJson['data']:
# break
# else:
# log.error(f'第{page + 1}页数据获取失败')
# continue
# data = dataJson['data']
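# The Drupal views/ajax route is kept above for reference; the listing is now read from the plain publications page and parsed via getSoup (Selenium-rendered).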
url = f'https://www.cgdev.org/section/publications?page={page}'
soup = getSoup(url)
# soup = BeautifulSoup(data, 'html.parser')
divList = soup.find_all('div', class_='views-row')
for divTag in divList:
title = divTag.find('a').text.strip()
......@@ -141,6 +175,10 @@ def doJob():
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Center for Global Development', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(5)
......
# -*- coding: utf-8 -*-
import datetime
import json
import time
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from base import BaseCore
from download import download, sendKafka, paserUrl
import sys
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
from download import download, sendKafka, paserUrl, getSidName
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br, zstd',
#'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
'Priority': 'u=0, i',
'Referer': 'https://www.cnas.org/reports',
......@@ -30,6 +35,42 @@ headers = {
}
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
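# Kafka send failed: stash the serialized payload in the Redis stream as a fallback (presumably replayed into Kafka later).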
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
def is_member_containing_string(string):
cursor = '0'
while True:
......@@ -47,6 +88,7 @@ def is_member_containing_string(string):
@retry(tries=5, delay=5)
def getSoup(url):
req = requests.get(url, headers=headers, timeout=20)
req.encoding = req.apparent_encoding
if req.status_code != 200:
raise
soup = BeautifulSoup(req.text, 'html.parser')
......@@ -59,7 +101,7 @@ def getDic(url, title, publishDate):
soup = paserUrl(soup, url)
contentWithTag = soup.find('div', attrs={'id': 'mainbar'})
if not contentWithTag:
contentWithTag = soup.find('div',attrs={'id':'mainbar-toc'})
contentWithTag = soup.find('div', attrs={'id': 'mainbar-toc'})
if not contentWithTag:
return {}
try:
......@@ -77,36 +119,38 @@ def getDic(url, title, publishDate):
try:
if pdfHref:
attachmentId = download(pdfHref, title, publishDate, headers)
if not attachmentId:
return {}
attachmentIds.append(attachmentId)
except:
return {}
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AA', '195')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Center for a New American Security',
'publishDate': publishDate,
'sid': '1774678285700079617',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def doJob():
for page in range(1, 34):
url = 'https://www.cnas.org/reports/p1'
for page in range(1, 3):
url = f'https://www.cnas.org/reports/p{page}'
#url = f'https://www.cnas.org/reports/p1'
soup = getSoup(url)
#log.info(soup)
liList = soup.find_all('li', class_='-with-image')
#log.info(f'{page}==={len(liList)}')
for liTag in liList:
title = liTag.find('a', class_='margin-top-half-em').text.strip()
href = liTag.find('a', class_='margin-top-half-em').get('href')
......@@ -115,16 +159,18 @@ def doJob():
if is_member_containing_string(href):
log.info(f'{title}===已采集')
continue
log.info(f'开始采集==={title}')
publihsDate = liTag.find('ul', class_='entry-listing__meta').text.strip()
publihsDate = datetime.datetime.strptime(publihsDate, '%B %d, %Y').strftime('%Y-%m-%d %H:%M:%S')
try:
dic = getDic(href, title, publihsDate)
except:
log.info(f"{title}===数据获取失败")
continue
except Exception as e:
log.info(f"{title}===数据获取失败==={e}")
continue
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Center for a New American Security', href)
Monitor(dic)
time.sleep(5)
......
# -*- coding: utf-8 -*-
import datetime
import json
import re
import time
import requests
from bs4 import BeautifulSoup
from download import sendKafka, paserUrl, download
from base import BaseCore
from kafka import KafkaProducer
from download import sendKafka, paserUrl, download, getSidName
import sys
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
......@@ -26,6 +32,42 @@ headers = {
}
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
def is_member_containing_string(string):
cursor = '0'
while True:
......@@ -73,29 +115,26 @@ def getDic(url, title, publishDate):
return {}
except:
return {}
uniqueCode = baseCore.getUniqueCode('AB', '195')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Center for Security and Emerging Technology',
'publishDate': publishDate,
'sid': '1798649331272699906',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def doJob():
for page in range(1, 12):
for page in range(1, 3):
url = f'https://cset.georgetown.edu/publications/?fwp_content_type=analysis%2Cdata-brief%2Cformal-response&fwp_paged={page}&fwp_sort=date_desc'
dataPost = {"action": "facetwp_refresh", "data": {"facets": {"document_search": "", "topic": [], "content_type": ["analysis", "data-brief", "formal-response"]}, "frozen_facets": {},
"http_params": {"get": {"fwp_content_type": "analysis%2Cdata-brief%2Cformal-response", "fwp_paged": "2", "fwp_sort": "date_desc"},
......@@ -131,6 +170,7 @@ def doJob():
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Center for Security and Emerging Technology', href)
Monitor(dic)
time.sleep(1)
time.sleep(5)
......
# -*- coding: utf-8 -*-
import datetime
import json
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from base import BaseCore
from download import download, paserUrl, sendKafka
import sys
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
from download import download, paserUrl, sendKafka, getSidName
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
......@@ -42,6 +47,42 @@ def getSoup(url):
return soup
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
def is_member_containing_string(string):
cursor = '0'
while True:
......@@ -88,29 +129,26 @@ def getDic(url, title, publishDate):
divTag.decompose()
content = contentWithTag.text
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AC', '195')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Center for Strategic and International Studies(USA)',
'publishDate': publishDate,
'sid': '1572103917878022145',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def doJob():
for page in range(614):
for page in range(3):
url = f'https://www.csis.org/analysis?f[1]=date_published%3A2021&f[2]=date_published%3A2022&f[3]=date_published%3A2023&f[4]=date_published%3A2024&f[0]=date_published%3A2020&page={page}'
soup = getSoup(url)
divList = soup.find_all('div', class_='views-row')
......@@ -127,14 +165,13 @@ def doJob():
publishDate = datetime.datetime.strptime(publishDate, '%B %d, %Y').strftime('%Y-%m-%d %H:%M:%S')
try:
dic = getDic(href, title, publishDate)
except:
log.info(f'{title}===数据获取失败')
except Exception as e:
log.info(f'{title}===数据获取失败==={e}')
continue
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Center for Strategic and International Studies(USA)', href)
break
break
Monitor(dic)
if __name__ == '__main__':
......
# -*- coding: utf-8 -*-
import datetime
import json
import time
import traceback
......@@ -11,12 +12,10 @@ from kafka import KafkaProducer
from obs import ObsClient
from retry import retry
from base import BaseCore
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import BaseCore
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
......@@ -25,8 +24,8 @@ obsClient = ObsClient(
@retry(tries=5, delay=5)
def getReq(url, headers):
req = requests.get(url, headers=headers, verify=False, timeout=20)
def getReq(url,headers):
req = requests.get(url, headers=headers, verify=False,timeout=20)
return req
......@@ -54,20 +53,20 @@ def tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, c
return pdf_id
def uptoOBS(title, pdf_url, type_id, pathType, headers):
def uptoOBS(title, pdf_url, type_id, pathType,headers):
retData = {'state': False, 'type_id': type_id, 'group_name': '', 'path': '',
'full_path': '',
'category': 'pdf', 'file_size': '', 'status': 1, 'create_by': 'XueLingKun',
'create_time': '', 'page_size': '', 'content': ''}
try:
response = getReq(pdf_url, headers)
response = getReq(pdf_url,headers)
except:
log.info(f'{title}===附件请求失败')
return False
try:
file_size = int(response.headers.get('Content-Length'))
except:
file_size = int(len(response.content))
file_size = 0
page_size = 0
for i in range(0, 3):
try:
......@@ -157,8 +156,37 @@ def paserUrl(html, listurl):
return html
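# getSidName resolves a source id to its web_site_name in the info_source table; Monitor uses it to fill sourceName.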
def getSidName(sid):
sqlSelect = f"SELECT web_site_name FROM `info_source` WHERE id = '{sid}'"
baseCore.cursor_.execute(sqlSelect)
data = baseCore.cursor_.fetchone()
return data
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "192.168.1.232", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
......@@ -13,9 +13,9 @@ from bs4 import BeautifulSoup
from kafka import KafkaProducer
from obs import ObsClient
from retry import retry
from base import BaseCore
import BaseCore
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from download import download, paserUrl, sendKafka
from download import download, paserUrl, sendKafka, Monitor
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
......@@ -54,7 +54,7 @@ def is_member_containing_string(string):
@retry(tries=5, delay=5)
def getReq(url):
req = requests.get(url, headers=headers, verify=False, proxies=baseCore.get_proxy(), timeout=20)
req = requests.get(url, headers=headers, verify=False, timeout=20)
return req
......@@ -106,26 +106,27 @@ def dicNewAndStatements(sid, url, title, publishDate):
contentWithTag = soup.find('div', class_='l-column--large').find('section', class_='frame-type-textmedia').find('div', class_='ce-bodytext')
content = contentWithTag.text
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AB', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Kiel Institute for the World Economy',
'publishDate': publishDate,
'sid': sid,
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
if sendKafka(dic_news):
r.sadd('ZKresearchReport:Kiel Institute for the World Economy', url)
try:
Monitor(dic_news)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
else:
log.info(f'{title}===采集失败')
......@@ -138,7 +139,7 @@ def dic(sid, url, title, publishDate, pdfHref):
soup = paserUrl(soup, url)
contentWithTag = soup.find('div', class_='description')
if pdfHref:
attachmentId = download(pdfHref, title, publishDate,headers)
attachmentId = download(pdfHref, title, publishDate, headers, 'Kiel Institute for the World Economy')
if not attachmentId:
raise
attachmentIds.append(attachmentId)
......@@ -147,26 +148,28 @@ def dic(sid, url, title, publishDate, pdfHref):
contentWithTag.append(divTag)
content = contentWithTag.text
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AB', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Kiel Institute for the World Economy',
'publishDate': publishDate,
'sid': sid,
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
if sendKafka(dic_news):
r.sadd('ZKresearchReport:Kiel Institute for the World Economy', url)
try:
Monitor(dic_news)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
else:
log.info(f'{title}===采集失败')
......@@ -226,9 +229,12 @@ def policyContributions(sid, section):
liList = section.find_all('li', class_='teaserList__item')
for liTag in liList:
try:
title = liTag.find('a', class_='vertical-teaser__title').text.strip()
try:
title = liTag.find('a', class_='vertical-teaser__title').text.strip()
except:
title = liTag.find('h3', class_='vertical-teaser__title').text.strip()
except:
title = liTag.find('h3', class_='vertical-teaser__title').text.strip()
continue
href = liTag.find('a').get('href')
if 'https:' not in href:
href = 'https://www.ifw-kiel.de' + href
......@@ -1196,31 +1202,32 @@ class InternationalFinance():
href = soup.find('div', class_='document-title__sharing-bar-wrapper').find('a').get('href')
if 'https:' not in href:
href = 'https://www.ifw-kiel.de' + href
attachmentId = download(href, title, publishDate,headers)
attachmentId = download(href, title, publishDate, headers, 'Kiel Institute for the World Economy')
if not attachmentId:
raise
attachmentIds.append(attachmentId)
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AB', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Kiel Institute for the World Economy',
'publishDate': publishDate,
'sid': sid,
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
if sendKafka(dic_news):
r.sadd('ZKresearchReport:Kiel Institute for the World Economy', url)
try:
Monitor(dic_news)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
else:
log.info(f'{title}===采集失败')
......@@ -1243,7 +1250,7 @@ class InternationalFinance():
try:
self.dicDataSets(sid, href, title, publishDate)
except:
log.info(f'{title}===采失败')
log.info(f'{title}===采失败')
def doJob(self):
sidA = '1534699364544237569'
......@@ -1252,7 +1259,7 @@ class InternationalFinance():
url = 'https://www.ifw-kiel.de/institute/research-centers/international-finance'
req = getReq(url)
soup = BeautifulSoup(req.text, 'lxml')
section = soup.find('div', class_='document-topic__main').find('div', class_='document-topic__content').select('#c76450')[0]
section = soup.find('div', class_='document-topic__main').find('div', class_='document-topic__content').find('section', class_='coGridelements_pi1')
divList = section.find_all('div', class_='m-tab')
academicResearch(sidA, divList[0].select('section')[0])
#self.moreAcademicResearch(sidA)
......@@ -1261,6 +1268,26 @@ class InternationalFinance():
self.dataSets(sidC, divList[4])
def doJob():
# instances use lowercase names so the class names are not shadowed inside the function
china = China()
china.doJob()
germany = Germany()
germany.doJob()
europeanUnion = EuropeanUnion()
europeanUnion.doJob()
climateAndEnergy = ClimateAndEnergy()
climateAndEnergy.doJob()
africa = Africa()
africa.doJob()
economicOutlook = EconomicOutlook()
economicOutlook.doJob()
ukraine = Ukraine()
ukraine.doJob()
internationalTrade = InternationalTrade()
internationalTrade.doJob()
internationalFinance = InternationalFinance()
internationalFinance.doJob()
if __name__ == '__main__':
China = China()
China.doJob()
......
......@@ -6,7 +6,7 @@ import time
import traceback
import uuid
from urllib.parse import urljoin
from download import getSidName
import redis
from kafka import KafkaProducer
from obs import ObsClient
......@@ -19,11 +19,16 @@ from selenium.webdriver.edge.options import Options
from selenium.webdriver.edge.service import Service
import requests
from bs4 import BeautifulSoup
from base import BaseCore
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import sys
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
# 禁用安全请求警告
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
......@@ -32,15 +37,53 @@ obsClient = ObsClient(
)
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
def buildDriver():
edge_service = Service(r'F:\spider\117\msedgedriver.exe')
# ip = baseCore.get_proxy()
edge_service = Service(r'D:\soft\msedgedriver.exe')
edge_options = Options()
# 开启开发者模式
edge_options.add_experimental_option('excludeSwitches', ['enable-automation'])
# 禁用启用Blink运行时的功能
edge_options.add_argument('--disable-blink-features=AutomationControlled')
edge_options.add_argument('--start-maximized')
# edge_options.add_argument(f'--proxy-server={ip["http"]}')
# edge_options.add_argument('--proxy-server=%s' % ip['http'])
# prefs = {"profile.managed_default_content_settings.images": 2, 'permissions.default.stylesheet': 2}
# edge_options.add_experimental_option("prefs", prefs)
driver = webdriver.Edge(service=edge_service, options=edge_options)
return driver
......@@ -157,7 +200,7 @@ def uptoOBS(title, pdf_url, type_id, pathType, category):
try:
file_size = int(response.headers.get('Content-Length'))
except:
file_size = int(len(response.content))
file_size = 0
page_size = 0
for i in range(0, 3):
try:
......@@ -239,30 +282,27 @@ def getDic(url, title, publishDate):
return {}
attachmentIds.append(attachmentId)
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AD', '195')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'International Institute for Applied Systems Analysis',
'publishDate': publishDate,
'sid': '1798649911749525505',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def doJob():
driver = buildDriver()
for page in range(0, 57):
for page in range(0, 3):
log.info(f'开始采集第{page + 1}页数据')
driver.get(f'https://iiasa.ac.at/news?page={page}')
WebDriverWait(driver, 20).until(
......@@ -292,16 +332,14 @@ def doJob():
except:
log.info(f'{title}===数据获取失败')
continue
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:International Institute for Applied Systems Analysis', href)
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:International Institute for Applied Systems Analysis', href)
Monitor(dic)
time.sleep(3)
driver.close()
if __name__ == '__main__':
# doJob()
url = 'https://static1.squarespace.com/static/633458017a1ae214f3772c76/t/665ed1e2b9d34b2bf8e17c63/1717490167773/The-State-of-Carbon-Dioxide-Removal-2Edition.pdf'
req = getPDFReq(url)
with open('./d.pdf', 'wb') as f:
f.write(req.content)
doJob()
baseCore.close()
......@@ -5,10 +5,13 @@ import time
import requests
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
import sys
from base import BaseCore
from download import download, sendKafka, paserUrl
sys.path.append(r'D:\zzsn_spider\base')
import BaseCore
from download import download, sendKafka, paserUrl, getSidName
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
......@@ -26,6 +29,42 @@ headers = {
}
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "94.74.96.195", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
def is_member_containing_string(string):
cursor = '0'
while True:
......@@ -105,40 +144,37 @@ def getDic(url, title, publishDate, dataJson, buildId):
log.info(f'{title}===附件下载失败')
return {}
attachmentIds.append(attachmentId)
uniqueCode = baseCore.getUniqueCode('AE', '195')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'International Institute for Applied Systems Analysis',
'publishDate': publishDate,
'sid': '1782224433010618369',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
@retry(tries=5, delay=5)
def getBuildId():
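# The publications search runs on Next.js: the buildId is read from the page's __NEXT_DATA__ bootstrap JSON and handed to getDic, which presumably needs it for /_next/data/<buildId>/... detail requests.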
url = 'https://itif.org/publications/?spublicationTypes=features%2Cblogs%2Creports-briefings%2Ctestimonies-filings'
req = requests.get(url, headers=headers)
soup = BeautifulSoup(req.text, 'html.parser')
script = soup.find('body').find('script', attrs={'id': '__NEXT_DATA__'}).text
script = json.loads(script)
print(script)
buildId = script['buildId']
return buildId
def doJob():
for num in range(0, 3351, 50):
for num in range(0, 200, 50):
buildId = getBuildId()
url = f'https://03hnmfyj.apicdn.sanity.io/v2021-08-11/data/query/production?query=%0A%20%20%20%20%7B%0A%20%20%20%20%20%22searchResults%22%3A%20*%5B%0A%20%20%20%20%20%20_type%20in%20%5B%27publications%27%5D%20%26%26%0A%20%20%20%20%20%20!(_id%20in%20path(%22drafts.**%22))%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%26%26%20references(*%5B_type%3D%3D%22publication-types%22%20%26%26%20slug.current%20in%20%5B%27features%27%2C%27blogs%27%2C%27reports-briefings%27%2C%27testimonies-filings%27%5D%5D._id)%0A%20%20%20%20%20%20%5D%20%7C%20order(date%20desc%2C%20_createdAt%20desc)%20%0A%20%20%20%20%5B{num}...{num + 50}%5D%0A%20%20%20%20%7B_id%2C%20title%2C%20date%2C%20summary%2C%20externalURL%2C%20%22excerpt%22%3A%20pt%3A%3Atext(body%5B_type%20%3D%3D%20%22block%22%5D%5B0%5D)%2C%20slug%2C%20_type%2C%22publicationTypes%22%3A%20publicationTypes%5B%5D%20-%3E%20title%2C%20publicationBrand%20-%3E%20%7B_type%2C%20title%2C%20slug%7D%7D%2C%0A%20%20%20%20%22count%22%3A%20count(*%5B%0A%20%20%20%20%20%20_type%20in%20%5B%27publications%27%5D%20%26%26%0A%20%20%20%20%20%20!(_id%20in%20path(%22drafts.**%22))%20%0A%20%20%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%0A%20%20%20%20%20%20%20%26%26%20references(*%5B_type%3D%3D%22publication-types%22%20%26%26%20slug.current%20in%20%5B%27features%27%2C%27blogs%27%2C%27reports-briefings%27%2C%27testimonies-filings%27%5D%5D._id)%0A%20%20%20%20%20%20%5D%7B_id%7D)%0A%20%20%20%20%7D%0A%20%20%20%20'
datasJson = getDataJson(url)['result']['searchResults']
......@@ -157,9 +193,11 @@ def doJob():
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Information Technology and Innovation Foundation', href)
Monitor(dic)
time.sleep(1)
time.sleep(5)
if __name__ == '__main__':
doJob()
baseCore.close()
......@@ -11,8 +11,8 @@ from kafka import KafkaProducer
from obs import ObsClient
from retry import retry
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from base import BaseCore
from download import Monitor
import BaseCore
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
......@@ -97,7 +97,7 @@ def getReq(url):
def sendKafka(dic_news):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
kafka_result = producer.send("researchReportStudyTopic", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
kafka_result = producer.send("crawlerInfo", json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
log.info(kafka_result.get(timeout=10))
log.info(f'传输成功:{dic_news["title"]}==={dic_news["publishDate"]}')
......@@ -132,7 +132,7 @@ def uptoOBS(title, pdf_url, type_id, pathType):
try:
file_size = int(response.headers.get('Content-Length'))
except:
file_size = int(len(response.content))
file_size = 0
page_size = 0
for i in range(0, 3):
try:
......@@ -174,7 +174,7 @@ def download(url, title, publishDate):
create_time = time_now
try:
att_id = tableUpdate(year, title, 4, '', '', path, full_path, 'pdf', file_size, 1, status,
'LiuLiYuan', create_time, 'RAND Corporation', page_size)
'LiuLiYuan', create_time, 'Korea Institute for International Economic Policy', page_size)
except Exception as e:
log.info(f'{title}===附件插入sql失败=={e}')
return False
......@@ -218,23 +218,21 @@ def getDic(sid, url, title, publishDate, pdfHref, dataPost):
if not attachmentId:
raise
attachmentIds.append(attachmentId)
uniqueCode = baseCore.getUniqueCode('AC', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Korea Institute for International Economic Policy',
'publishDate': publishDate,
'sid': sid,
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
......@@ -250,30 +248,27 @@ def getDicWorldEconomyBrief(sid, url, title, publishDate, pdfHref):
if not attachmentId:
raise
attachmentIds.append(attachmentId)
uniqueCode = baseCore.getUniqueCode('AC', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': lang,
'origin': 'Korea Institute for International Economic Policy',
'publishDate': publishDate,
'sid': sid,
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
def WorldEconomyBrief():
sid = '1777266535681835010'
for page in range(1, 45):
for page in range(1, 3):
dataPost = {
'mid': 'a20301000000',
'bid': '0007',
......@@ -312,12 +307,16 @@ def WorldEconomyBrief():
continue
if sendKafka(dic):
baseCore.r.sadd(f'ZKresearchReport:Korea Institute for International Economic Policy', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(5)
def PolicyAnalyses():
sid = '1777267444407156738'
for page in range(49, 156):
for page in range(1, 3):
dataPost = {
'mid': 'a20303000000',
'bid': '0001',
......@@ -360,12 +359,16 @@ def PolicyAnalyses():
continue
if sendKafka(dic):
baseCore.r.sadd(f'ZKresearchReport:Korea Institute for International Economic Policy', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(3)
def PolicyReference():
sid = '1777269053975511042'
for page in range(1, 68):
for page in range(1, 3):
dataPost = {
'mid': 'a20304000000',
'bid': '0001',
......@@ -408,12 +411,16 @@ def PolicyReference():
continue
if sendKafka(dic):
baseCore.r.sadd(f'ZKresearchReport:Korea Institute for International Economic Policy', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(3)
def WorkingPapers():
sid = '1777269660161490945'
for page in range(1, 34):
for page in range(1, 3):
dataPost = {
'mid': 'a20305000000',
'bid': '0001',
......@@ -457,12 +464,16 @@ def WorkingPapers():
continue
if sendKafka(dic):
baseCore.r.sadd(f'ZKresearchReport:Korea Institute for International Economic Policy', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(3)
def ConferenceProceedings():
sid = '1777270152996405250'
for page in range(1, 10):
for page in range(1, 3):
dataPost = {
'mid': 'a20306000000',
'bid': '0001',
......@@ -506,11 +517,23 @@ def ConferenceProceedings():
continue
if sendKafka(dic):
baseCore.r.sadd(f'ZKresearchReport:Korea Institute for International Economic Policy', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(3)
def doJob():
WorldEconomyBrief()
PolicyAnalyses()
PolicyReference()
WorkingPapers()
ConferenceProceedings()
if __name__ == '__main__':
# WorldEconomyBrief()
WorldEconomyBrief()
PolicyAnalyses()
PolicyReference()
WorkingPapers()
......
......@@ -174,6 +174,7 @@ def doJob():
if dic:
if sendKafka(dic):
baseCore.r.sadd('ZKresearchReport:Observer Research Foundation', href)
Monitor(dic)
time.sleep(0.5)
time.sleep(2)
time.sleep(2)
......
......@@ -10,10 +10,11 @@ from kafka import KafkaProducer
from obs import ObsClient
from retry import retry
from base import BaseCore
import BaseCore
import datetime
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from download import Monitor
r = redis.Redis(host="114.116.90.53", port=6380, password='clbzzsn', db=6)
# 禁用安全请求警告
......@@ -223,23 +224,22 @@ def getDic(url, title, publishDate):
except:
pass
content = contentWithTag.text
lang = baseCore.detect_language(content)
uniqueCode = baseCore.getUniqueCode('AD', '232')
dic_news = {
'attachmentIds': attachmentIds,
'author': '',
'content': content,
'contentWithTag': str(contentWithTag),
'createDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': '',
'lang': lang,
'origin': 'RAND Corporation',
'publishDate': publishDate,
'sid': '1798266329902235650',
'sourceAddress': url,
'summary': '',
'title': title,
'type': '0'
'source': 16,
'uniqueCode': uniqueCode,
}
return dic_news
......@@ -278,6 +278,10 @@ def doJob():
if dic:
if sendKafka(dic):
r.sadd('ZKresearchReport:RAND Corporation', href)
try:
Monitor(dic)
except Exception as e:
log.error(f'数据监控发送失败==={e}')
time.sleep(3)
......
......@@ -8,8 +8,8 @@ from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from download import paserUrl, download, getSidName, sendKafka
from base import BaseCore
from download import paserUrl, download, sendKafka, Monitor
import BaseCore
baseCore = BaseCore.BaseCore(sqlflg=False)
log = baseCore.getLogger()
......@@ -59,40 +59,40 @@ def getSoup(url):
return soup
def Monitor(dic):
sidName = getSidName(dic['sid'])
monitor = {
"title": dic['title'], # 标题
"sourceAddress": dic['sourceAddress'], # 原文链接
"uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
"operateType": "DATA_CRAWLER", # 操作类型 写死
"handlerBody": {
"success": True, # 处理成功或失败状态 写死
"handlerStatus": "CRAWLED" # 处理状态 写死
},
"source": {
"sourceId": dic['sid'], # 信息源Id
"sourceName": sidName, # 信息源名称
"sourceType": 16, # 信息源类型 sourceType枚举字典
},
"processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
"server": {
"serverIp": "192.168.1.231", # 所在服务器IP
"serverHostName": "数据采集服务", # 服务器名称
"processId": baseCore.getPID() # 进程Id
}
}
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
try:
kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
log.info('监控数据发送Kafka成功')
except Exception as e:
monitor = json.dumps(monitor, ensure_ascii=False)
monitorDic = {
'lifecycle_data_crawler': monitor
}
baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
log.info('数据监控发送Kafka失败,已放置Redis中')
#def Monitor(dic):
# sidName = getSidName(dic['sid'])
# monitor = {
# "title": dic['title'], # 标题
# "sourceAddress": dic['sourceAddress'], # 原文链接
# "uniqueCode": dic['uniqueCode'], # 唯一编码 采集类型+6位日期+服务器序列+线程序列+自定义数字
# "operateType": "DATA_CRAWLER", # 操作类型 写死
# "handlerBody": {
# "success": True, # 处理成功或失败状态 写死
# "handlerStatus": "CRAWLED" # 处理状态 写死
# },
# "source": {
# "sourceId": dic['sid'], # 信息源Id
# "sourceName": sidName, # 信息源名称
# "sourceType": 16, # 信息源类型 sourceType枚举字典
# },
# "processTime": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), # 处理时间 yyyy-MM-dd HH:mm:ss
# "server": {
# "serverIp": "192.168.1.232", # 所在服务器IP
# "serverHostName": "数据采集服务", # 服务器名称
# "processId": baseCore.getPID() # 进程Id
# }
# }
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 7, 0))
# try:
# kafka_result = producer.send("data-crawler", json.dumps(monitor, ensure_ascii=False).encode('utf8'))
# log.info('监控数据发送Kafka成功')
# except Exception as e:
# monitor = json.dumps(monitor, ensure_ascii=False)
# monitorDic = {
# 'lifecycle_data_crawler': monitor
# }
# baseCore.r.xadd('data_lifecycle_log_data_crawler-redis', monitorDic, id='*')
# log.info('数据监控发送Kafka失败,已放置Redis中')
def getDic(url, title, publishDate):
......@@ -144,7 +144,7 @@ def getDic(url, title, publishDate):
def doJob():
for page in range(0, 87):
for page in range(0, 3):
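# Drupal views/ajax pager for the UNCTAD publications search; page= drives the pager, and the view_dom_id and library hashes appear to be captured from a browser session.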
url = f'https://unctad.org/views/ajax?f[0]=sitemap%3A910&_wrapper_format=drupal_ajax&view_name=unctad_publications_search&view_display_id=page_1&view_args=publication%2Fall%2Fall&view_path=%2Fpublications-search&view_base_path=publications-search&view_dom_id=b5cccd9d1ae1a3a43997ba344d149a54fcca3c4a85ff3c3a707fccbe4ae5b1e3&pager_element=0&f%5B0%5D=sitemap%3A910&page={page}&_drupal_ajax=1&ajax_page_state%5Btheme%5D=newyork_b5&ajax_page_state%5Btheme_token%5D=&ajax_page_state%5Blibraries%5D=eJxtkNGOhDAIRX-o2qf9ngYKdjq2xUAd179f4ySbXZ0XAueQGwKKdOsKS0BQzeJRGSjqWtHh1U2i9U5TEYQyWN9LbunuDzrbHVc2g8QWSk6P7uLMlLtogBhFKUvzv904qbTOjRzxK2CROIeGHui5Wg-P42DW_6pCLkfwYW1Fi5qR3QSRu3nSdYEyvqeRVBaSrQ1bpsT981Lixgrls3xl3myAJ3y7JJIKB2hQ9p6j-StwjbdddA74dX2b7da5egRjd0b6s45n8F9QhdbCPxnLrPo'
datasJson = getDataJson(url)
for dataJson in datasJson:
......