# Commit c8758796 - author: LiuLiYuan
#
# Commit message: 欧洲联盟 (European Union) 02/05
#
# Parent commit: 785f3d85
import json
import time
import pymongo
import requests
import datetime
import pytz
from bs4 import BeautifulSoup
from kafka import KafkaProducer
from retry import retry
from base import BaseCore
# Shared project helper object; this script uses it for logging (getLogger),
# language detection (detect_language) and a final close().
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# MongoDB collection '国外智库' in database ZZSN. The script uses it to look up
# already-collected pages by their '网址' field and to insert a summary record
# for every page it forwards.
# NOTE(review): host, user name and password are hard-coded in source —
# consider loading them from configuration or the environment.
db_storage = pymongo.MongoClient('mongodb://114.115.221.202:27017', username='admin', password='ZZsn@9988').ZZSN[
'国外智库']
# HTTP request headers sent with every requests.get() call in this script.
# They mirror an Edge 121 browser session on ec.europa.eu (captured cookie included).
headers = {
    # Content negotiation
    'Accept': '*/*',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh-TW;q=0.9,zh;q=0.8',
    # Connection / caching
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    # Captured session cookie (three cookies joined by '; ')
    'Cookie': (
        '_pk_ses.a441974e-4574-4f0a-9ed2-f90fb8547bb7.9d4d=*; '
        '_pk_id.a441974e-4574-4f0a-9ed2-f90fb8547bb7.9d4d=27c6cb9ba359af3e.1707027768.1.1707028382.1707027769.; '
        'cck1=%7B%22cm%22%3Atrue%2C%22all1st%22%3Atrue%2C%22closed%22%3Atrue%7D'
    ),
    'Host': 'ec.europa.eu',
    'Pragma': 'no-cache',
    'Referer': (
        'https://ec.europa.eu/eurostat/databrowser/explore/all/tb_eu'
        '?lang=en&display=list&sort=category'
    ),
    # Fetch metadata
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    # Browser identification
    'User-Agent': (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
        'AppleWebKit/537.36 (KHTML, like Gecko) '
        'Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0'
    ),
    'sec-ch-ua': '"Not A(Brand";v="99", "Microsoft Edge";v="121", "Chromium";v="121"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}
@retry(tries=2, delay=5)
def sendKafka(dic):
    """Publish *dic* as UTF-8 JSON to the ``research_center_fourth`` Kafka topic.

    The call blocks until the broker acknowledges the record, so a delivery
    failure raises here (and is retried once by ``@retry``) instead of being
    logged as a success.

    :param dic: JSON-serialisable record; must contain ``sourceAddress``,
        which is used in the success log line.
    :raises Exception: whatever the producer / future raises after the final
        retry attempt (callers catch broadly).
    """
    producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], max_request_size=1024 * 1024 * 20)
    try:
        payload = json.dumps(dic, ensure_ascii=False).encode('utf8')
        kafka_result = producer.send("research_center_fourth", payload)
        # send() only queues the record; wait on the returned future so that
        # broker-side or network failures surface as exceptions.
        kafka_result.get(timeout=30)
    finally:
        # A new producer is created per call, so always release its
        # connections and background thread.
        producer.close()
    log.info(f'{dic["sourceAddress"]}传输成功')
def get1stCode():
    """Fetch the navigation tree and return the first-level ``tb_eu`` category codes.

    Reads ``numFlowInCategories`` from the ``getFullTree`` response and, for
    every entry containing ``'tb_eu.'``, keeps the text after the last dot.

    :return: list of category code strings (empty if none match).
    :raises requests.RequestException: on network failure or timeout.
    :raises ValueError, KeyError: if the response is not the expected JSON.
    """
    url = 'https://ec.europa.eu/eurostat/databrowser-backend/api/public/navtree/en/getFullTree?showProduct=all'
    # A timeout keeps a stalled connection from hanging the whole job.
    req = requests.get(url, headers=headers, timeout=60)
    try:
        req.encoding = req.apparent_encoding
        datas = req.json()['numFlowInCategories']
        return [data.split('.')[-1] for data in datas if 'tb_eu.' in data]
    finally:
        # Release the connection even when JSON decoding or the key lookup fails.
        req.close()
def get2ndCode(code):
    """Return the data-product codes listed under first-level category *code*.

    Each item of the response is expected to carry
    ``dataProduct.dataBrowserLink``; the last path segment of that link is
    the product code. Parsing is best-effort: on a malformed response the
    codes gathered so far are returned and the problem is logged.

    :param code: first-level category code, as returned by ``get1stCode``.
    :return: list of product code strings (possibly empty).
    :raises requests.RequestException: on network failure or timeout
        (handled by the caller).
    """
    codeList = []
    url = f'https://ec.europa.eu/eurostat/databrowser-backend/api/public/navtree/en/getProductPositionForCategory/tb_eu/{code}?notFullId=true&showProduct=all'
    # A timeout keeps a stalled connection from hanging the whole job.
    req = requests.get(url, headers=headers, timeout=60)
    try:
        req.encoding = req.apparent_encoding
        for data in req.json():
            dataBrowserLink = data['dataProduct']['dataBrowserLink']
            codeList.append(dataBrowserLink.split('/')[-1])
    except Exception as e:
        # Keep the best-effort behaviour, but leave a trace instead of
        # silently swallowing the error (and let Ctrl-C propagate).
        log.error(f'{code}===2ndCode解析失败==={e}')
    finally:
        req.close()
    # Throttle between navigation-tree requests.
    time.sleep(3)
    return codeList
def save_data(dic_news):
    """Insert a summary document for *dic_news* into the ``db_storage`` collection.

    The stored document keeps the page URL, creation time, the first 100
    characters of the tagged content, the publish time and the title; the
    attachment id and ``tid`` fields are left empty.

    :param dic_news: record dict with ``sourceAddress``, ``createDate``,
        ``contentWithTag``, ``publishDate`` and ``title`` keys.
    """
    record = {'附件id': '', '网址': dic_news['sourceAddress'], 'tid': '', '来源': '欧洲联盟'}
    record['创建时间'] = dic_news['createDate']
    # Only a short preview of the tagged content is stored.
    preview = dic_news['contentWithTag'][:100]
    record['带标签内容'] = preview
    record['发布时间'] = dic_news['publishDate']
    record['标题'] = dic_news['title']
    db_storage.insert_one(record)
def doJob():
    """Crawl every ``tb_eu`` data product and forward new ones to Kafka and MongoDB.

    For each first-level category the product codes are fetched, then each
    product's "flow" card is downloaded, converted into a record (publish
    time converted to Asia/Shanghai), skipped if its URL is already stored
    in ``db_storage``, otherwise sent with ``sendKafka`` and, on success,
    recorded with ``save_data``. Failures of a single product are logged
    and do not stop the crawl.
    """
    # Loop-invariant: the target time zone for publish dates.
    shanghai_tz = pytz.timezone("Asia/Shanghai")
    codeList = get1stCode()
    for code in codeList:
        try:
            codeList_ = get2ndCode(code)
        except Exception:
            log.error('2ndCode获取失败')
            time.sleep(2)
            continue
        for code_ in codeList_:
            url = f'https://ec.europa.eu/eurostat/databrowser-backend/api/card/1.0/LIVE/en/{code_}/flow'
            try:
                # A timeout keeps a stalled connection from hanging the job.
                req = requests.get(url, headers=headers, timeout=60)
                try:
                    req.encoding = req.apparent_encoding
                    dataJson = req.json()
                finally:
                    # The body is fully parsed here, so the connection can be
                    # released on every path (including the "already
                    # collected" skip below).
                    req.close()
            except Exception:
                log.error(f'{code}==={url}===数据请求失败')
                time.sleep(5)
                continue
            try:
                updateDate = datetime.datetime.strptime(dataJson['updateData'], "%Y-%m-%dT%H:%M:%S%z")
                publishDate = updateDate.astimezone(shanghai_tz).strftime('%Y-%m-%d %H:%M:%S')
                title = dataJson['label']
                content = dataJson['description']
                contentWithTag = str(BeautifulSoup(content, 'html.parser'))
                href = f'https://ec.europa.eu/eurostat/databrowser/view/{code_}/default/table?lang=en'
                if db_storage.find_one({'网址': href}):
                    log.info(f'{href}===已采集')
                    continue
                now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                lang = baseCore.detect_language(content)
                dic = {
                    'id': f'1620244462491893761{int(time.time())}',
                    'subjectId': '1620244462491893761',
                    'checkStatus': 1,
                    'deleteFlag': 0,
                    'topNum': 0,
                    'content': content,
                    'contentWithTag': contentWithTag,
                    'createDate': now,
                    'labels': [{'labelMark': 'organization', 'relationId': '1619903623295033346',
                                'relationName': '欧洲联盟'}],
                    'lang': lang,
                    'origin': '欧洲联盟',
                    'publishDate': publishDate,
                    'sourceAddress': href,
                    'title': title,
                    'updateDate': now,
                }
            except Exception as e:
                # Still skip the product as before, but record why instead of
                # silently discarding the error.
                log.error(f'{code_}===数据解析失败==={e}')
                time.sleep(3)
                continue
            try:
                sendKafka(dic)
            except Exception:
                log.error(f'{href}===kafka发送失败')
            else:
                # Only mark the page as collected once Kafka accepted it.
                try:
                    save_data(dic)
                except Exception:
                    log.error(f'{href}===数据库保存失败')
            # Throttle between product requests.
            time.sleep(3)
        log.info(f'{code}===已采集完')
# Script entry point: run the crawl, then release whatever baseCore holds.
if __name__ == '__main__':
    try:
        doJob()
    finally:
        # Run the cleanup even when the crawl aborts with an exception.
        baseCore.close()
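# --- Residue of the code-hosting web page (comment form), not part of the script ---
# Markdown format
# 0%
# You added 0 people to this discussion. Please proceed with caution.
# Please finish editing this comment first!
# Register or sign in to post a comment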