Commit 7fc3194e  Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	tmp/usVsRussia/pravo.py
@@ -19,12 +19,14 @@ from openpyxl import Workbook
 import langid
 #创建连接池
+import pymysql
 from pymysql import connections
 from DBUtils.PooledDB import PooledDB
-import pymysql
+# import sys
+# sys.path.append('D://zzsn_spider//base//fdfs_client')
 from fdfs_client.client import get_tracker_conf, Fdfs_client
-tracker_conf = get_tracker_conf('./client.conf')
+tracker_conf = get_tracker_conf('E:\\kkwork\\zzsn_spider\\base\\client.conf')
 client = Fdfs_client(tracker_conf)
 # 注意 程序退出前 调用BaseCore.close() 关闭相关资源
......
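The hunk above pulls in DBUtils' PooledDB next to pymysql, but the pool construction itself sits outside the visible lines. A minimal sketch of what such a pool usually looks like with this pair of libraries; the host, credentials and pool sizes below are placeholders, not values from this repository:

# Hypothetical sketch only -- the real pool settings live outside this diff.
import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(
    creator=pymysql,        # driver used to open the underlying connections
    maxconnections=5,       # cap on simultaneously open connections (placeholder)
    blocking=True,          # wait for a free connection instead of raising
    host='127.0.0.1',       # placeholder host
    user='caiji',
    password='********',    # placeholder, never hard-code real secrets
    database='clb_project',
    charset='utf8mb4',
)

conn = pool.connection()    # borrow a pooled connection
cursor = conn.cursor()      # use like a normal pymysql cursor; conn.close() returns it to the pool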
@@ -12,7 +12,7 @@ r = basecore.r
 def cnn11():
     #11数据库
-    cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+    cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
     cursor_ = cnx_.cursor()
     return cnx_,cursor_
 def close11(cnx_,cursor_):
@@ -22,7 +22,7 @@ def close11(cnx_,cursor_):
 # # 连接到Redis
 # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
 # #
-# cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
+# cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
 #                       charset='utf8mb4')
 # cursor = cnx.cursor()
@@ -320,7 +320,7 @@ def FBS():
         if not r.exists(item):
             # r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
             # r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
-            r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
+            r.rpush('AnnualEnterprise:gnshqy_socialCode',item)
             # r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
             # r.rpush('FinanceFromEast:eastfinance_socialCode',item)
     closeSql(cnx,cursor)
......
"""
打开SEC网址——【FILINGS】——【Company Filing】——输入证券代码——选10-K和20-F为年报
"""
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
url = 'https://www.sec.gov/edgar/browse/?CIK=1815846&owner=exclude'
#模拟浏览器
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
print(soup)
select_ann = soup.find_all('tr',class_='odd')
for tr in select_ann:
want_type = tr.find('td').text
if want_type=='20-F':
print('yes')
#获取原文链接
td = tr.find('td').find('a',class_='document-link')['title_href']
print(td)
 import requests, re, time, pymysql
@@ -7,7 +7,7 @@ baseCore = BaseCore.BaseCore()
 requests.adapters.DEFAULT_RETRIES = 3
 log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
 cursor = cnx.cursor()
 tracker_conf = get_tracker_conf('./client.conf')
 client = Fdfs_client(tracker_conf)
@@ -131,7 +131,8 @@ def begin():
     while True:
         start_time = time.time()
         # 获取企业信息
-        social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
+        # social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
+        social_code = '91100000100003962T'
         if not social_code:
             time.sleep(20)
             continue
@@ -157,7 +158,7 @@ def begin():
         count += 1
         runType = 'AnnualReportCount'
         baseCore.updateRun(social_code, runType, count)
+        break
 if __name__ == '__main__':
     begin()
......
@@ -10,7 +10,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 baseCore = BaseCore.BaseCore()
 # conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
 cursor_ = cnx.cursor()
 cnx_ = baseCore.cnx
......
+import json
+import json
+from kafka import KafkaProducer
 from fdfs_client.client import get_tracker_conf, Fdfs_client
@@ -9,8 +12,9 @@ from base import BaseCore
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 baseCore = BaseCore.BaseCore()
+log = baseCore.getLogger()
 # conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
 cursor_ = cnx.cursor()
 tracker_conf = get_tracker_conf('./client.conf')
@@ -18,28 +22,6 @@ client = Fdfs_client(tracker_conf)
 taskType = '企业年报/证监会'
-# def get_proxy():
-#     cursor = cnx_ip.cursor()
-#     sql = "select proxy from clb_proxy"
-#     cursor.execute(sql)
-#     proxy_lists = cursor.fetchall()
-#     ip_list = []
-#     for proxy_ in proxy_lists:
-#         ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
-#     proxy_list = []
-#     for str_ip in ip_list:
-#         str_ip_list = str_ip.split('-')
-#         proxyMeta = "http://%(host)s:%(port)s" % {
-#             "host": str_ip_list[0],
-#             "port": str_ip_list[1],
-#         }
-#         proxy = {
-#             "HTTP": proxyMeta,
-#             "HTTPS": proxyMeta
-#         }
-#         proxy_list.append(proxy)
-#     return proxy_list
 def RequestUrl(url, payload, item_id, start_time):
     # ip = get_proxy()[random.randint(0, 3)]
@@ -118,7 +100,7 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
             pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
             name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
-            # pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
+            pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
             # print(name)
             report_type = td_list[4].text.strip()
             # print(report_type)
@@ -129,11 +111,11 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
                 try:
                     year = re.findall('\d{4}\s*年', name_pdf)[0].replace('年', '')
                 except Exception as e:
-                    pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
+                    # pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')[:4]
                     year = int(pub_time) - 1
                     year = str(year)
-                page_size = 0
+                # page_size = 0
                 sel_sql = '''select item_id,year from clb_sys_attachment where item_id = %s and year = %s'''
                 cursor_.execute(sel_sql, (item_id, year))
@@ -142,77 +124,65 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
                     print(f'com_name:{short_name}、{year}已存在')
                     continue
                 else:
-                    # 类型为年报的话就解析该年报pdf,并入库
-                    for i in range(0, 3):
-                        try:
-                            resp_content = requests.request("GET", pdf_url).content
-                            # 获取pdf页数
-                            with fitz.open(stream=resp_content, filetype='pdf') as doc:
-                                page_size = doc.page_count
-                            break
-                        except Exception as e:
-                            print(e)
-                            time.sleep(3)
-                            continue
-                    if page_size < 1:
-                        # pdf解析失败
-                        print(f'==={short_name}、{year}===pdf解析失败')
-                        state = 0
-                        takeTime = baseCore.getTimeCost(start_time, time.time())
-                        baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
-                        continue
-                    result = ''
-                    for i in range(0, 3):
-                        try:
-                            result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
-                            break
-                        except Exception as e:
-                            print(e)
-                            time.sleep(3)
-                            continue
-                    if result == '':
-                        e = '上传服务器失败'
-                        state = 0
-                        takeTime = baseCore.getTimeCost(start_time, time.time())
-                        baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-                        continue
-                    if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
-                        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-                        type_id = '1'
-                        item_id = dic_info['social_code']
-                        group_name = 'group1'
-                        path = bytes.decode(result['Remote file_id']).replace('group1', '')
-                        full_path = bytes.decode(result['Remote file_id'])
-                        category = 'pdf'
-                        file_size = result['Uploaded size']
-                        order_by = num
-                        status = 1
-                        create_by = 'XueLingKun'
-                        create_time = time_now
-                        page_size = page_size
-                        try:
-                            tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
-                                        category, file_size, order_by, status, create_by, create_time, page_size)
-                            state = 1
-                            takeTime = baseCore.getTimeCost(start_time, time.time())
-                            baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
-                        except:
-                            e = '数据库传输失败'
-                            state = 0
-                            takeTime = baseCore.getTimeCost(start_time, time.time())
-                            baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-                        num = num + 1
-                        time.sleep(2)
-                    else:
-                        e = '采集失败'
-                        state = 0
-                        takeTime = baseCore.getTimeCost(start_time, time.time())
-                        baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-                        continue
+                    retData = baseCore.upLoadToServe(pdf_url, 1, social_code)
+                    #插入数据库获取att_id
+                    num = num + 1
+                    att_id = baseCore.tableUpdate(retData, short_name, year, name_pdf, num)
+                    content = retData['content']
+                    if retData['state']:
+                        pass
+                    else:
+                        log.info(f'====pdf解析失败====')
+                        return False
+                    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+                    dic_news = {
+                        'attachmentIds': att_id,
+                        'author': '',
+                        'content': content,
+                        'contentWithTag': '',
+                        'createDate': time_now,
+                        'deleteFlag': '0',
+                        'id': '',
+                        'keyWords': '',
+                        'lang': 'zh',
+                        'origin': '证监会',
+                        'publishDate': pub_time,
+                        'sid': '1684032033495392257',
+                        'sourceAddress': '',  # 原文链接
+                        'summary': '',
+                        'title': name_pdf,
+                        'type': 1,
+                        'socialCreditCode': social_code,
+                        'year': year
+                    }
+                    # print(dic_news)
+                    # 将相应字段通过kafka传输保存
+                    try:
+                        producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+                        kafka_result = producer.send("researchReportTopic",
+                                                     json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
+                        print(kafka_result.get(timeout=10))
+                        dic_result = {
+                            'success': 'ture',
+                            'message': '操作成功',
+                            'code': '200',
+                        }
+                        print(dic_result)
+                        return True
+                    except Exception as e:
+                        dic_result = {
+                            'success': 'false',
+                            'message': '操作失败',
+                            'code': '204',
+                            'e': e
+                        }
+                        state = 0
+                        takeTime = baseCore.getTimeCost(start_time, time.time())
+                        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, 'Kafka操作失败')
+                        print(dic_result)
+                        return False
             else:
                 continue
......
@@ -222,10 +222,10 @@ class BaseCore:
     __USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
     def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                            charset='utf8mb4')
         self.__cursor_proxy = self.__cnx_proxy.cursor()
-        self.cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
+        self.cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                                    charset='utf8mb4')
         self.cursor = self.cnx.cursor()
......
 # -*- coding: utf-8 -*-
@@ -143,7 +143,7 @@ class YahooCaiwu(object):
         return driver
     def conn11(self):
-        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='root', passwd='f7s0&7qqtK', db='clb_project',
+        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
                                charset='utf8')
         cursor = conn.cursor()
         return conn,cursor
......
 # -*- coding: utf-8 -*-
@@ -46,7 +46,7 @@ class Shizhi(object):
         return driver
     def conn11(self):
-        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='root', passwd='f7s0&7qqtK', db='clb_project',
+        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
                                charset='utf8')
         cursor = conn.cursor()
         return conn,cursor
......
""" """
...@@ -8,7 +8,7 @@ import pandas as pd ...@@ -8,7 +8,7 @@ import pandas as pd
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from base.BaseCore import BaseCore from base.BaseCore import BaseCore
baseCore = BaseCore() baseCore = BaseCore()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4') cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor() cursor = cnx.cursor()
cnx_ = baseCore.cnx cnx_ = baseCore.cnx
cursor_ = baseCore.cursor cursor_ = baseCore.cursor
...@@ -439,14 +439,14 @@ def getReportTime(): ...@@ -439,14 +439,14 @@ def getReportTime():
# 2023-04-01 # 2023-04-01
#todo:正式任务 #todo:正式任务
# 获取当前日期和时间 # 获取当前日期和时间
# current_date = datetime.now() current_date = datetime.now()
# 计算昨天的日期 # 计算昨天的日期
# yesterday = current_date - timedelta(days=1) yesterday = current_date - timedelta(days=1)
# 格式化昨天的日期 # 格式化昨天的日期
# report_date = yesterday.strftime('%Y-%m-%d') report_date = yesterday.strftime('%Y-%m-%d')
# list_date.append(report_date) list_date.append(report_date)
# year = int(current_date.strftime('%Y')) year = int(current_date.strftime('%Y'))
list_date = ['2023-03-31'] # list_date = ['2023-03-31']
list_month = ['-12-31', '-09-30', '-06-30', '-03-31'] list_month = ['-12-31', '-09-30', '-06-30', '-03-31']
for year in range(2022, 2018, -1): for year in range(2022, 2018, -1):
......
@@ -20,7 +20,7 @@ class Gpdm(object):
         'version':'TYC-Web',
         'Content-Type':'application/json;charset=UTF-8'
     }
-    cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',charset='utf8mb4')
+    cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',charset='utf8mb4')
     cursor= cnx.cursor()
     taskType = '股票代码/东方财富网'
......
import requests, pymysql, re, time, json, sys
import requests, pymysql, re, time, json, sys
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from concurrent.futures.thread import ThreadPoolExecutor
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
def InsterInto(short_name, social_code, pdf_url):
insert = False
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s'''
cursor.execute(sel_sql, (social_code, pdf_url))
selects = cursor.fetchone()
if selects:
print(f'com_name:{short_name}、{pdf_url}已存在')
return insert
# 信息插入数据库
try:
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
list_info = [
social_code,
pdf_url,
'东方财富网',
'1',
]
#144数据库
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
insert = True
return insert
except:
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
return insert
def gonggao_info(dic_info):
list_all_info = []
code = dic_info[3]
com_name = dic_info[4]
social_code = dic_info[2]
if 'HK' in code:
# browser.quit()
return
code1 = str(code)
while True:
if len(code1) < 6:
code1 = '0' + code1
else:
break
if code1[0] == '0' or code1[0] == '3' or code1[0] == '2':
com_code = 'SZ' + code1
elif code1[0] == '6' or code1[0] == '9':
com_code = 'SH' + code1
elif code1[0] == '8' or code1[0] == '4':
com_code = 'BJ' + code1
break_id = 0
for page1 in range(1, 2):
if break_id == 1:
break
url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
for n1 in range(0, 3):
try:
res = requests.get(url, verify=False)
break
except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
res_json = res.json()
list_all = res_json['data']['list']
if list_all:
for one_info in list_all:
title = one_info['title']
info_date = one_info['notice_date']
if page1 > 1 and '2022' in info_date:
break_id = 1
break
if '2021' in info_date: # 只采集22年以后的数据
break_id = 1
break
try:
info_type = one_info['columns'][0]['column_name']
except:
info_type = ''
art_code = one_info['art_code']
info_url = 'https://data.eastmoney.com/notices/detail/' + com_code + '/' + art_code + '.html'
t = int(time.time() * 1000)
json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
for n1 in range(0, 3):
try:
json_2 = requests.get(json_url, verify=False).json()
break
except:
if n1 == 2:
sys.exit(0)
time.sleep(5)
continue
try:
pdf_url = json_2['data']['attach_url']
except:
pdf_url = ''
#拿到pdfurl去数据库中查找,如果有该条信息 则跳过,否则继续采集
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and type='1' '''
cursor.execute(sel_sql, info_url)
selects = cursor.fetchall()
if selects:
return
else:
pass
try:
info_content = json_2['data']['notice_content']
except:
info_content = ''
list_info = [
social_code,
title,
info_content[:2000],
info_date,
info_url,
pdf_url,
'东方财富网',
'1',
'zh'
]
# list_all_info.append(tuple(list_info))
with cnx.cursor() as cursor:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s '''
cursor.execute(sel_sql, info_url)
selects = cursor.fetchall()
if selects:
break
else:
#todo:取消入库操作
insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,publish_date,source_address,pdf_address,origin,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
cursor.execute(insert_sql, tuple(list_info))
cnx.commit()
else:
break
print(f'{code}:传输完成')
# list_all_info_1.append(list_all_info)
list_c.append(code)
if __name__ =='__main__':
#从redis中读取social_code'
list_c = []
list_all_info_1 = []
num = 0
taskType = '企业公告/东方财富网'
while True:
start_time = time.time()
# 获取企业信息
social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode')
# social_code = '911100007109288314'
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
code = dic_info[3]
com_name = dic_info[4]
gonggao_info(dic_info)
""" """
...@@ -168,6 +168,11 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -168,6 +168,11 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
#上传至文件服务器 #上传至文件服务器
retData = baseCore.upLoadToServe(pdf_url,8,social_code) retData = baseCore.upLoadToServe(pdf_url,8,social_code)
#附件插入att数据库 #附件插入att数据库
if retData['state']:
pass
else:
log.info(f'====pdf解析失败====')
return False
num = num + 1 num = num + 1
att_id = baseCore.tableUpdate(retData,com_name,year,pdf_name,num) att_id = baseCore.tableUpdate(retData,com_name,year,pdf_name,num)
content = retData['content'] content = retData['content']
...@@ -176,27 +181,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na ...@@ -176,27 +181,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
else: else:
log.info(f'====pdf解析失败====') log.info(f'====pdf解析失败====')
return False return False
# 先获取PDF链接下载pdf,在解析内容
# try:
# res = requests.get(pdf_url)
# content = ''
# # 读取文件内容,解析内容
# with fitz.open(stream=res.content, filetype='pdf') as doc:
# for page in doc.pages():
# content += page.get_text()
# except:
# # print('解析失败')
# dic_result = {
# 'success': 'false',
# 'message': 'PDF解析失败',
# 'code': '204',
# }
# log.info(dic_result)
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
# return False
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = { dic_news = {
'attachmentIds': att_id, 'attachmentIds': att_id,
...@@ -373,8 +358,8 @@ if __name__ == '__main__': ...@@ -373,8 +358,8 @@ if __name__ == '__main__':
while True: while True:
start_time = time.time() start_time = time.time()
# 获取企业信息 # 获取企业信息
# social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode') social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
social_code = '9110000071092841XX' # social_code = '9110000071092841XX'
# 判断 如果Redis中已经没有数据,则等待 # 判断 如果Redis中已经没有数据,则等待
if social_code == None: if social_code == None:
time.sleep(20) time.sleep(20)
......
This source diff could not be displayed because it is too large. You can view the blob instead.
# __init__.py
__version__ = '2.2.0'
VERSION = tuple(map(int, __version__.split('.')))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: connection.py
import socket
import os
import sys
import time
import random
from itertools import chain
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# start class Connection
class Connection(object):
'''Manage TCP comunication to and from Fastdfs Server.'''
def __init__(self, **conn_kwargs):
self.pid = os.getpid()
self.host_tuple = conn_kwargs['host_tuple']
self.remote_port = conn_kwargs['port']
self.remote_addr = None
self.timeout = conn_kwargs['timeout']
self._sock = None
def __del__(self):
try:
self.disconnect()
except:
pass
def connect(self):
'''Connect to fdfs server.'''
if self._sock:
return
try:
sock = self._connect()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = sock
# print '[+] Create a connection success.'
# print '\tLocal address is %s:%s.' % self._sock.getsockname()
# print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
def _connect(self):
'''Create TCP socket. The host is random one of host_tuple.'''
self.remote_addr = random.choice(self.host_tuple)
# print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.settimeout(self.timeout)
sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
return sock
def disconnect(self):
'''Disconnect from fdfs server.'''
if self._sock is None:
return
try:
self._sock.close()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = None
def get_sock(self):
return self._sock
def _errormessage(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message" '''
if len(exception.args) == 1:
return "[-] Error: connect to %s:%s. %s." % (self.remote_addr, self.remote_port, exception.args[0])
else:
return "[-] Error: %s connect to %s:%s. %s." % \
(exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
# end class Connection
# start ConnectionPool
class ConnectionPool(object):
'''Generic Connection Pool'''
def __init__(self, name='', conn_class=Connection,
max_conn=None, **conn_kwargs):
self.pool_name = name
self.pid = os.getpid()
self.conn_class = conn_class
self.max_conn = max_conn or 2 ** 31
self.conn_kwargs = conn_kwargs
self._conns_created = 0
self._conns_available = []
self._conns_inuse = set()
# print '[+] Create a connection pool success, name: %s.' % self.pool_name
def _check_pid(self):
if self.pid != os.getpid():
self.destroy()
self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
def make_conn(self):
'''Create a new connection.'''
if self._conns_created >= self.max_conn:
raise ConnectionError('[-] Error: Too many connections.')
num_try = 10
while True:
try:
if num_try <= 0:
sys.exit()
conn_instance = self.conn_class(**self.conn_kwargs)
conn_instance.connect()
self._conns_created += 1
break
except ConnectionError as e:
print(e)
num_try -= 1
conn_instance = None
return conn_instance
def get_connection(self):
'''Get a connection from pool.'''
self._check_pid()
try:
conn = self._conns_available.pop()
# print '[+] Get a connection from pool %s.' % self.pool_name
# print '\tLocal address is %s:%s.' % conn._sock.getsockname()
# print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
except IndexError:
conn = self.make_conn()
self._conns_inuse.add(conn)
return conn
def remove(self, conn):
'''Remove connection from pool.'''
if conn in self._conns_inuse:
self._conns_inuse.remove(conn)
self._conns_created -= 1
if conn in self._conns_available:
self._conns_available.remove(conn)
self._conns_created -= 1
def destroy(self):
'''Disconnect all connections in the pool.'''
all_conns = chain(self._conns_inuse, self._conns_available)
for conn in all_conns:
conn.disconnect()
# print '[-] Destroy connection pool %s.' % self.pool_name
def release(self, conn):
'''Release the connection back to the pool.'''
self._check_pid()
if conn.pid == self.pid:
self._conns_inuse.remove(conn)
self._conns_available.append(conn)
# print '[-] Release connection back to pool %s.' % self.pool_name
# end ConnectionPool class
def tcp_recv_response(conn, bytes_size, buffer_size=4096):
'''Receive response from server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_size: int, will be received byte_stream size
@buffer_size: int, receive buffer size
@Return: tuple,(response, received_size)
'''
recv_buff = []
total_size = 0
try:
while bytes_size > 0:
resp = conn._sock.recv(buffer_size)
recv_buff.append(resp)
total_size += len(resp)
bytes_size -= len(resp)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: (%s)' % e.args)
return (b''.join(recv_buff), total_size)
def tcp_send_data(conn, bytes_stream):
'''Send buffer to server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_stream: trasmit buffer
@Return bool
'''
try:
conn._sock.sendall(bytes_stream)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: (%s)' % e.args)
@@ -22,7 +22,7 @@ headers = {
     'version':'TYC-Web',
     'Content-Type':'application/json;charset=UTF-8'
 }
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
 cursor= cnx.cursor()
 #根据信用代码获取天眼查id 企业名字等信息
......
@@ -12,7 +12,7 @@ jieba.cut("必须加载jieba")
 smart =smart_extractor.SmartExtractor('cn')
 baseCore = BaseCore()
 log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
 cursor= cnx.cursor()
 pageSize = 10
 headers = {
......
import json
import time
import requests
from pymysql.converters import escape_string
from selenium import webdriver
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
def flushAndGetToken(browser):
log.info('======刷新浏览器=====')
browser.refresh()
cookie_list = browser.get_cookies()
cur_url = browser.current_url
token = cur_url.split('token=')[1]
log.info(f'===========当前token为:{token}============')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
browser.get(cur_url)
info = browser.page_source
# res_2 = requests.get(year_url, proxies=ip)
soup = BeautifulSoup(info, 'html.parser')
user_name = soup.find('div', class_='weui-desktop_name').text
return token,cookies,user_name
if __name__=="__main__":
requests.DEFAULT_RETRIES = 5
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.info(f'开始时间为:{time_start}')
requests.adapters.DEFAULT_RETRIES = 3
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# opt.binary_location =r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
# chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
chromedriver = r'D:/chrome/chromedriver.exe'
browser = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(70)
s = requests.session()
#获取到token和cookies
token, cookies,user_name = flushAndGetToken(browser)
print(token,cookies)
cookies = json.dumps(cookies)
# loadinfo = [token,cookies]
#保存到数据库中
insert = f"insert into weixin_tokenCookies (token,cookies,create_time,fenghao_time,user_name,update_time) values ('{token}','{escape_string(cookies)}',now(),DATE_SUB(NOW(), INTERVAL 1 DAY),'{user_name}',now())"
cursor_.execute(insert)
cnx_.commit()
baseCore.close()
# s.cookies.update(cookies)
# s.keep_alive = False
@@ -20,13 +20,13 @@ baseCore = BaseCore()
 log = baseCore.getLogger()
 cnx_ = baseCore.cnx
 cursor_ = baseCore.cursor
-cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
+cnx = pymysql.connect(host="114.116.44.11", user="caiji", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
 cursor = cnx.cursor()
 r = baseCore.r
 urllib3.disable_warnings()
 def check_url(sid, article_url):
-    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
+    # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
     res = r.sismember(f'wx_url_{sid}',article_url)
     if res == True:
         return True
@@ -34,7 +34,7 @@ def check_url(sid, article_url):
         return False
 def add_url(sid, article_url):
-    r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
+    # r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
     res = r.sadd(f'wx_url_{sid}', article_url, 3)  # 注意是 保存set的方式
     if res == 0:  # 若返回0,说明插入不成功,表示有重复
         return True
@@ -88,10 +88,10 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
         url_news = one_news['link']
-        url_ft = check_url(sid, url_news)
-        if url_ft:
-            log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
-            return list_all_info,num_caiji
+        # url_ft = check_url(sid, url_news)
+        # if url_ft:
+        #     log.info(f'-----{origin}--第{page}页--已采过该篇文章--文章链接--{url_news}-----')
+        #     return list_all_info,num_caiji
         try:
             ip = baseCore.get_proxy()
             res_news = requests.get(url_news, timeout=20,proxies=ip)
@@ -176,43 +176,46 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
             'source': '11',
             'createDate': time_now
         }
-        for nnn in range(0, 3):
-            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
-            try:
-                kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
-                kafka_time_out = kafka_result.get(timeout=10)
-                add_url(sid, url_news)
-                break
-            except:
-                time.sleep(5)
-                continue
-            finally:
-                producer.close()
+        # for nnn in range(0, 3):
+        #     producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        #     try:
+        #         kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
+        #         kafka_time_out = kafka_result.get(timeout=10)
+        #         add_url(sid, url_news)
+        #         break
+        #     except:
+        #         time.sleep(5)
+        #         continue
+        #     finally:
+        #         producer.close()
         num_caiji = num_caiji + 1
         list_all_info.append(dic_info)
         time.sleep(5)
-        time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-        dic_info2 = {
-            'infoSourceId': sid,
-            'code': info_source_code,
-            'num': num_caiji,
-            'collectTime': kaishi_time,
-            'dispatcherTime': time_now,
-            'dispatcherStatus': '1',
-            'source': '1',
-        }
-        for nnn2 in range(0, 3):
-            try:
-                producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
-                kafka_result2 = producer2.send("collectionAndDispatcherInfo",
-                                               json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
-                break
-            except:
-                time.sleep(5)
-                continue
+        # time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
+        # dic_info2 = {
+        #     'infoSourceId': sid,
+        #     'code': info_source_code,
+        #     'num': num_caiji,
+        #     'collectTime': kaishi_time,
+        #     'dispatcherTime': time_now,
+        #     'dispatcherStatus': '1',
+        #     'source': '1',
+        # }
+        # for nnn2 in range(0, 3):
+        #     producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        #     try:
+        #         # producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
+        #         kafka_result2 = producer2.send("collectionAndDispatcherInfo",
+        #                                        json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
+        #         break
+        #     except:
+        #         time.sleep(5)
+        #         continue
+        #     finally:
+        #         producer2.close()
     return list_all_info,num_caiji
-def RequestUrl(dic_url,token,key):
+def RequestUrl(dic_url,token,key,i):
     start_ = time.time()
     url_ = dic_url['url_']
     origin = dic_url['name']
@@ -220,14 +223,13 @@ def RequestUrl(dic_url,token,key):
     sid = dic_url['sid']
     biz = dic_url['biz']
     fakeid = biz + '=='
-    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
+    url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={i}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
     ret = -1
     json_search = ''
     # 获取页数
     try:
-        # ip = baseCore.get_proxy()
-        json_search = s.get(url_search, headers=headers,
-                            verify=False).json()  # , proxies=ip, verify=False
+        ip = baseCore.get_proxy()
+        json_search = s.get(url_search, headers=headers,proxies=ip, verify=False).json()  # , proxies=ip, verify=False
         str_t = json.dumps(json_search)
         time.sleep(1)
     except Exception as e:
@@ -243,7 +245,7 @@ def RequestUrl(dic_url,token,key):
     # {'base_resp': {'err_msg': 'invalid args', 'ret': 200002}} 公众号biz错误 链接
     # 'base_resp': {'err_msg': 'ok', 'ret': 0} 正常
     if ret == 0:
-        pass
+        return json_search,ret
     elif ret == 200013:
         # 重新放入redis
         # time.sleep(3600)
@@ -315,17 +317,17 @@ def job(count,key):
     log.info('===========获取公众号============')
     start_ = time.time()
     #todo:redis中数据 pop一条
-    infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
-    if infoSourceCode == 'None' or infoSourceCode == None:
-        #当一次采集完之后,重新插入数据并等待插入完成
-        getFromSql()
-        time.sleep(20)
-        log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
-        return count
-    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
+    # infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
+    # if infoSourceCode == 'None' or infoSourceCode == None:
+    #     #当一次采集完之后,重新插入数据并等待插入完成
+    #     getFromSql()
+    #     time.sleep(20)
+    #     log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
+    #     return count
+    # sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
     # '一带一路百人论坛'
-    # sql = f"-- SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20220609-57436' "
+    sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = 'IN-20230630-0010' "
     cursor.execute(sql)
     row = cursor.fetchone()
@@ -362,7 +364,8 @@ def job(count,key):
         cursor_.execute(insertSql, tuple(error))
         cnx_.commit()
         return count
-    json_search,ret = RequestUrl(dic_url,token,key)
+    i = 0
+    json_search,ret = RequestUrl(dic_url,token,key,i)
     if ret == 0:
         try:
             Max_data = int(json_search['app_msg_cnt'])
@@ -376,7 +379,7 @@ def job(count,key):
             Max_data = 5
         log.info(f'开始采集{origin}-----共{Max_page}页---{Max_data}条数据-----')
         for i in range(0, Max_data, 5):
-            json_search,ret = RequestUrl(dic_url,token,key)
+            json_search,ret = RequestUrl(dic_url,token,key,i)
             if ret == 0:
                 pass
             else:
......
# -*- coding: utf-8 -*-
'''
成功100 发送数据失败200 请求失败400 文章内容为空500
'''
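The docstring above lists the status codes this script writes back into wx_link.state through updatewxLink() further down (100, 200, 400, 500). A small illustrative sketch of that mapping; the constant names are ours and do not exist in the source:

# Illustrative names for the wx_link.state codes documented above (hypothetical, not defined in this repo).
STATE_SUCCESS        = 100  # 成功: article fetched and pushed to Kafka
STATE_KAFKA_FAILED   = 200  # 发送数据失败: Kafka send failed after retries
STATE_REQUEST_FAILED = 400  # 请求失败: HTTP request for the article failed
STATE_EMPTY_CONTENT  = 500  # 文章内容为空: article body came back empty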
import requests, time, random, json, pymysql, redis
import pandas as pd
import urllib3
from bs4 import BeautifulSoup
from openpyxl import Workbook
from selenium import webdriver
from obs import ObsClient
from kafka import KafkaProducer
# logging.basicConfig(filename='example.log', level=logging.INFO)
from base.BaseCore import BaseCore
import os
baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
# cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
# cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
def check_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sismember(f'wx_url_{sid}',article_url)
if res == 1:
return True
else:
return False
def add_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sadd(f'wx_url_{sid}', article_url, 3) # 注意是 保存set的方式
if res == 0: # 若返回0,说明插入不成功,表示有重复
return True
else:
return False
# #定时
# def getFromSql():
# selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
# cursor.execute(selectSql)
# results = cursor.fetchall()
# result_list = [item[0] for item in results]
#
# #放入redis
# for item in result_list:
# r.rpush('WeiXinGZH:infoSourceCode', item)
#
# #刷新浏览器并获得token
# def flushAndGetToken(list_b):
# browser_run = list_b[0]
# log.info('======刷新浏览器=====')
# browser_run.refresh()
# cookie_list = browser_run.get_cookies()
# cur_url = browser_run.current_url
# token = cur_url.split('token=')[1]
# log.info(f'===========当前token为:{token}============')
# cookies = {}
# for cookie in cookie_list:
# cookies[cookie['name']] = cookie['value']
# return token,cookies
#采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
def updatewxLink(link,info_source_code,state):
updateSuccess = f"update wx_link set state= {state} where link='{link}' and info_source_code='{info_source_code}' "
cursor_.execute(updateSuccess)
cnx_.commit()
def getjsonInfo():
#从数据库中获取信息 一条
select_sql = "select * from wx_link where state=0 order by id asc limit 1"
cursor_.execute(select_sql)
row = cursor_.fetchone()
if row:
pass
else:
log.info('-----没有数据了-----')
return False
dict_json = {
'sid':row[1],
'site_uri':row[2],
'site_name':row[3],
'info_source_code':row[4],
'title':row[5],
'publish_time':row[6],
'link':row[7]
}
# 拿到一条数据 更新状态
update_sql = f"update wx_link set state=1 where link='{row[7]}' and info_source_code='{row[4]}' "
cursor_.execute(update_sql)
cnx_.commit()
return dict_json
def get_info(dict_json):
# list_all_info = []
# num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
obsClient = ObsClient(
access_key_id='VEHN7D0TJ9316H8AHCAV', # 你的华为云的ak码
secret_access_key='heR353lvSWVPNU8pe2QxDtd8GDsO5L6PGH5eUoQY', # 你的华为云的sk
server='https://obs.cn-north-1.myhuaweicloud.com' # 你的桶的地址
)
news_title = dict_json['title']
sid = dict_json['sid']
news_date = dict_json['publish_time']
origin = dict_json['site_name']
url_news = dict_json['link']
info_source_code = dict_json['info_source_code']
# url_ft = check_url(sid, url_news)
# if url_ft:
# return list_all_info,num_caiji
try:
ip = baseCore.get_proxy()
res_news = requests.get(url_news, proxies=ip,timeout=20)
except:
#400请求失败
updatewxLink(url_news,info_source_code,400)
return False
soup_news = BeautifulSoup(res_news.content, 'html.parser')
news_html = soup_news.find('div', {'id': 'js_content'})
try:
del news_html['style']
del news_html['id']
del news_html['class']
except:
pass
try:
news_content = news_html.text
except:
log.info(f'--------内容为空--------{url_news}--------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
false = [
news_title,
url_news,
news_html,
'文章内容为空',
time_now
]
insertSql = f"insert into WeixinGZH (site_name,site_url,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(false))
cnx_.commit()
updatewxLink(url_news,info_source_code,500)
return False
list_img = news_html.find_all('img')
for num_img in range(len(list_img)):
img_one = list_img[num_img]
url_src = img_one.get('data-src')
# print(url_src)
if 'gif' in url_src:
url_img = ''
img_one.extract()
else:
try:
name_img = url_src.split('/')[-2] + '.' + url_src.split('wx_fmt=')[1]
except:
img_one.extract()
continue
try:
res = requests.get(url_src, timeout=20)
except:
img_one.extract()
continue
resp = obsClient.putContent('zzsn', name_img, content=res.content)
url_img = resp['body']['objectUrl']
str_url_img = f'<img src="{url_img}">'
try:
img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
except Exception as e:
log.info(f'----{url_news}-----------{e}')
return False
for tag in news_html.descendants:
try:
del tag['style']
except:
pass
list_section = news_html.find_all('section')
for section in list_section:
section.name = 'div'
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info = {
'sid': sid,
'title': news_title,
'content': news_content,
'contentWithtag': str(news_html),
'summary': '',
'author': '',
'origin': origin,
'publishDate': news_date,
'sourceAddress': url_news,
'source': '11',
'createDate': time_now
}
for nnn in range(0, 3):
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
kafka_time_out = kafka_result.get(timeout=10)
# add_url(sid, url_news)
break
except:
time.sleep(5)
log.info('------数据发送kafka失败------')
updatewxLink(url_news,info_source_code,200)
continue
list_all_info.append(dic_info)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info2 = {
'infoSourceId': sid,
'code': info_source_code,
'num': num_caiji,
'collectTime': kaishi_time,
'dispatcherTime': time_now,
'dispatcherStatus': '1',
'source': '1',
}
for nnn2 in range(0, 3):
try:
producer2 = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result2 = producer2.send("collectionAndDispatcherInfo",
json.dumps(dic_info2, ensure_ascii=False).encode('utf8'))
break
except:
time.sleep(5)
continue
updatewxLink(url_news,info_source_code,100)
return True
if __name__=="__main__":
num_caiji = 0
list_all_info = []
while True:
#一次拿取一篇文章
dict_json =getjsonInfo()
if dict_json:
if get_info(dict_json):
num_caiji = num_caiji + 1
log.info(f'-----已采集{num_caiji}篇文章---来源{dict_json["site_name"]}----')
else:
break
baseCore.close()
\ No newline at end of file
# 微信采集列表数据
import json
import time
import random
import pymysql
import requests
import urllib3
from pymysql.converters import escape_string
from base.BaseCore import BaseCore
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
s = requests.session()
cnx = pymysql.connect(host="114.116.44.11", user="caiji", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
r = baseCore.r
def resHtml(token,url,cookies):
try:
ip = baseCore.get_proxy()
s=requests.session()
cookie_jar = requests.utils.cookiejar_from_dict(cookies, cookiejar=None, overwrite=True)
s.cookies = cookie_jar
# json_search = s.get(url, headers=headers, proxies=ip, verify=False).json()
json_search = s.get(url, headers=headers, proxies=ip,verify=False).json()
aa=s.cookies.get_dict()
updateCookieToken(token, json.dumps(aa))
except Exception as e:
json_search= {}
return json_search
#采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
#获取公众号信息
def getSourceInfo(infoSourceCode):
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
cursor.execute(sql)
row = cursor.fetchone()
dic_url = {
'url_': row[0],
'sid': row[1],
'name': row[2],
'info_source_code': row[3],
'biz': ''
}
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
sid = dic_url['sid']
try:
biz = url_.split('__biz=')[1].split('==&')[0].split('=')[0]
dic_url['biz'] = biz
except Exception as e:
log.info(f'---公众号--{origin}---biz错误')
error = [
origin,
url_,
info_source_code,
e,
'biz错误'
]
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
return False
return dic_url
#保存错误日志
def insertBadSql(error):
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,now())"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
#保存文章列表数据
def insertWxList(dic_url,json_search,page):
list_all_news = json_search['app_msg_list']
listCount=0
repetCount=0
insertCount=0
for one_news in list_all_news:
listCount=listCount+1
news_title = one_news['title']
timestamp = one_news['create_time']
time_local = time.localtime(timestamp)
news_date = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
url_news = one_news['link']
selectCountSql=f"select count(1) from wx_link where link='{escape_string(url_news)}'"
cursor_.execute(selectCountSql)
count = cursor_.fetchone()[0]
if count > 0:
repetCount=repetCount+1
continue
else:
insertCount=insertCount+1
try:
insertSql=f"insert into wx_link(sid,site_uri,site_name,info_source_code,title,publish_time,link,state,create_time) values " \
f"('{dic_url['sid']}','{dic_url['url_']}','{dic_url['name']}','{dic_url['info_source_code']}','{escape_string(news_title)}','{escape_string(news_date)}','{escape_string(url_news)}',0,now())"
cursor_.execute(insertSql)
cnx_.commit()
except Exception as e:
log.error(f"保存数据库失败:{e}")
log.info(f"---{dic_url['name']}--第{page}页----总数:{listCount}---重复数:{repetCount}---新增数:{insertCount}-------------")
if listCount==0:
#列表为空认为结束
return True
if repetCount>= listCount/2:
#重复数量大于等于一半认为结束
return True
#没有结束
return False
#token的处理
def updateTokeen(token,type):
if type==2:
#session失效,删除token
cursor_.execute(f"delete from weixin_tokenCookies where token={token}")
if type ==1:
#封号了 修改封号时间
cursor_.execute(f"update weixin_tokenCookies set fenghao_time=now() where token={token}")
if type ==3:
#封号了 修改封号时间
cursor_.execute(f"update weixin_tokenCookies set update_time=now() where token={token}")
cnx_.commit()
#token的处理
def updateCookieToken(token,cookies):
cursor_.execute(f"update weixin_tokenCookies set cookies='{escape_string(cookies)}' where token={token}")
cnx_.commit()
#获取token
def getToken():
cursor_.execute(f"select token,cookies from weixin_tokenCookies where fenghao_time < DATE_SUB(NOW(), INTERVAL 2 HOUR) order by update_time asc limit 1")
row = cursor_.fetchall()
if row:
pass
else:
#没有查到token
return False
return row[0]
#获取列表数据
def getPageData(dic_url,page):
url_ = dic_url['url_']
origin = dic_url['name']
info_source_code = dic_url['info_source_code']
biz = dic_url['biz']
fakeid = biz + '=='
tokenAndCookie = getToken()
if tokenAndCookie:
pass
else:
while True:
time.sleep(60)
tokenAndCookie = getToken()
if tokenAndCookie:
break
token = tokenAndCookie[0]
log.info(f"获取token到----{token}")
cookies = json.loads(tokenAndCookie[1])
# s.cookies.update(cookies)
url = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin={(page - 1) * 5}&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# reponse = s.get(url, headers=headers, proxies=ip, verify=False)
# json_search = reponse.json()
# newcookies = requests.utils.dict_from_cookiejar(reponse.cookies, cookiejar=None, overwrite=True)
# s.cookies = newcookies
# updateCookieToken(token,json.dumps(s.cookies))
#调用方法
json_search=resHtml(token,url,cookies)
str_t = json.dumps(json_search)
ret = json_search['base_resp']['ret']
if ret == 0:
pass
elif ret == 200013:
log.info(f'======{origin}-----{biz}----该账号被封=======')
#封号修改token
updateTokeen(token,1)
return getPageData(dic_url,page)
elif ret == 200002:
log.info(f'======{origin}-----{biz}----该账号biz错误,请检查=======')
error = [origin, url_, info_source_code, str_t, '无效biz参数']
insertBadSql(error)
return True
elif ret == 200003:
log.info(f'======{origin}-----{biz}----该账号无效session=======')
# session失效修改token
updateTokeen(token, 2)
error = [origin, url_, info_source_code, str_t, '无效session']
insertBadSql(error)
return getPageData(dic_url, page)
else:
log.info(f'======{origin}-----{biz}----该账号其他错误=======')
error = [origin, url_, info_source_code, str_t, '其他错误']
insertBadSql(error)
return True
# 修改token使用时间
updateTokeen(token, 3)
# 保存数据到数据库
return insertWxList(dic_url,json_search,page)
#获取微信公众号数据
def getWxList(infoSourceCode):
dic_url = getSourceInfo(infoSourceCode)
log.info(f"======{infoSourceCode}----开始采集=======")
if dic_url:
pass
else:
log.info(f'======{infoSourceCode}---------该账号biz错误,请检查=======')
error = ['', '', infoSourceCode, '', '该账号biz错误']
insertBadSql(error)
return
origin = dic_url['name']
biz = dic_url['biz']
for page in range(1,2):
retFlag = getPageData(dic_url, page)
time.sleep(random.randint(60,181))
if retFlag:
#结束 跳出该公众号
break
else:
#没有结束
pass
log.info(f"======{origin}-----{biz}----结束采集=======")
def getFromSql():
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql)
results = cursor.fetchall()
result_list = [item[0] for item in results]
#放入redis
for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item)
if __name__=="__main__":
while True:
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
if infoSourceCode == 'None' or infoSourceCode == None:
log.info("redis已经没有数据了,重新放置数据")
getFromSql()
time.sleep(10)
infoSourceCode = baseCore.redicPullData('WeiXinGZH:infoSourceCode')
getWxList(infoSourceCode)
# infoSourceCode = 'IN-20220917-0159'
# getWxList(infoSourceCode)
@@ -88,7 +88,7 @@ chrome_options.add_argument('--headless')
 executable_path = r'D:\chrome\chromedriver.exe'
 driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
 def scroll(driver):
     for i in range(0,30):
......
雅虎财经 国外上市企业信息采集 雅虎财经企业动态 部署到香港服务器上
-# 雅虎财经企业动态获取
 # -*- coding: utf-8 -*-
 # 雅虎财经企业动态获取
 import json
 import time
@@ -62,7 +63,7 @@ def getZx(xydm, url, title, cnx, path):
         '雅虎财经',
         author,
         '2',
-        'zh'
+        'en'
     ]
     try:
@@ -180,15 +181,15 @@ def scroll(xydm,name,gpdm):
             break
         last_url_ = last_url
-#采集失败的公众号 重新放入redis
+#采集失败的企业 重新放入redis
 def rePutIntoR(item):
     r.rpush('NewsEnterprise:gwqy_socialCode', item)
 if __name__ == "__main__":
-    path = r'F:\spider\115\chromedriver.exe'
+    path = r'D:\chrome\chromedriver.exe'
     driver = baseCore.buildDriver(path)
-    cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+    cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
     cursor = cnx.cursor()
     while True:
@@ -197,9 +198,10 @@ if __name__ == "__main__":
         # 判断 如果Redis中已经没有数据,则等待
         if not social_code :
+            log.info('============已没有数据============等待===============')
             time.sleep(20)
             continue
-        if social_code == 'None':
+        if social_code == None:
             time.sleep(20)
             continue
         data = baseCore.getInfomation(social_code)
......
...@@ -214,7 +214,7 @@ class BaseCore: ...@@ -214,7 +214,7 @@ class BaseCore:
except : except :
pass pass
def __init__(self): def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project', self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor() self.__cursor_proxy= self.__cnx_proxy.cursor()
pass pass
......
...@@ -5,7 +5,7 @@ pass=clbzzsn ...@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql] [mysql]
host=114.115.159.144 host=114.115.159.144
username=root username=caiji
password=zzsn9988 password=zzsn9988
database=caiji database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......
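The [mysql] block above is a plain INI section; presumably BaseCore (or the Java side, given the jdbc url) reads it at startup. A minimal sketch of loading it from Python with configparser and opening a pymysql connection; the file name 'config.ini' is an assumption:

# Minimal sketch (assumption): parse the [mysql] section of an INI file like the one
# above and open a connection; 'config.ini' is a placeholder file name.
import configparser
import pymysql

config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')
mysql_cfg = config['mysql']

cnx = pymysql.connect(
    host=mysql_cfg['host'],
    user=mysql_cfg['username'],
    password=mysql_cfg['password'],
    db=mysql_cfg['database'],
    charset='utf8mb4',
)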
...@@ -215,7 +215,7 @@ class BaseCore: ...@@ -215,7 +215,7 @@ class BaseCore:
except : except :
pass pass
def __init__(self): def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project', self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor() self.__cursor_proxy= self.__cnx_proxy.cursor()
pass pass
......
...@@ -5,7 +5,7 @@ pass=clbzzsn ...@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql] [mysql]
host=114.115.159.144 host=114.115.159.144
username=root username=caiji
password=zzsn9988 password=zzsn9988
database=caiji database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......
import os
import pandas as pd import pandas as pd
import pymysql import pymysql
import requests import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from pymysql.converters import escape_string from pymysql.converters import escape_string
from selenium.webdriver.common.by import By import downPdf
from BaseCore import BaseCore
from base.BaseCore import BaseCore from datetime import datetime
baseCore = BaseCore() baseCore = BaseCore()
log =baseCore.getLogger() log =baseCore.getLogger()
headers = { headers = {
...@@ -28,20 +29,72 @@ headers = { ...@@ -28,20 +29,72 @@ headers = {
cnx = baseCore.cnx cnx = baseCore.cnx
cursor = baseCore.cursor cursor = baseCore.cursor
def downFile(url, path, pdf_name):
try:
baseCore.mkPath(path)
# proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
# stream=True avoids buffering the whole PDF in memory before writing it to disk
response = requests.get(url, headers=headers, verify=False, timeout=10, stream=True)
# response = requests.get(url, proxies=proxy, headers=headers, verify=False, timeout=10)
# treat HTTP error codes as failures instead of saving an error page as a PDF
response.raise_for_status()
pdf_name = pdf_name + '.pdf'
with open(os.path.join(path, pdf_name), "wb") as pyFile:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
pyFile.write(chunk)
except Exception as e:
log.error(f"出错了----------{e}")
return False
return pdf_name
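A call mirrors the one job_2 makes further down; the URL and directory below are illustrative placeholders, and the return value is the final file name on success or False on failure:

# Illustrative only: the URL and target directory are placeholders, not real project values.
saved = downFile(
    'http://publication.pravo.gov.ru/file/example.pdf',  # hypothetical PDF link
    r'D:\pdf_out\example',                                # target folder, created by baseCore.mkPath
    '0001202300000001',                                   # publication number used as the file name
)
if saved:
    log.info(f'saved as {saved}')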
def job_2(): def job_2():
log.info('----开始采集---俄罗斯国家杂志----') log.info('----开始采集---俄罗斯国家杂志----')
path = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe' # path = 'D:chrome/chromedriver.exe'
driverContent = baseCore.buildDriver(path, headless=False) # driverContent = baseCore.buildDriver(path, headless=False)
url = 'http://publication.pravo.gov.ru/documents/block/president' for i in range(68,200):
req = requests.get(url,headers) if i == 1:
soup = BeautifulSoup(req.content,'html.parser') url = 'http://publication.pravo.gov.ru/documents/block/president'
container = soup.find('div',class_='documents-container') else:
web_list = container.find_all('div',class_='documents-table-row') url = f'http://publication.pravo.gov.ru/documents/block/president?index={i}&pageSize=30'
for web in web_list[:1]: req = requests.get(url,headers)
web_href = web.find('a')['href'] soup = BeautifulSoup(req.content,'html.parser')
web_url = 'http://publication.pravo.gov.ru/' + web_href container = soup.find('div',class_='documents-container')
title = web.find('a').text web_list = container.find_all('div',class_='documents-table-row')
print(title) for web in web_list:
title = web.find_all('a')[1].text
# default to the full stripped title so pdftitle is always defined
pdftitle = title.strip()
if '"О' in title:
pdftitle = title.strip().split('"О')[0]
if '-рп' in title:
pdftitle = title.strip().split('-рп')[0] + '-рп'
pdfUrl = 'http://publication.pravo.gov.ru' + web.find('div',class_='notforprint pt-2').find('a')['href']
# pdfTitle = aa.find('a')['title']
print(pdfUrl)
selectCountSql = f"select * from usvsrussia where url = '{pdfUrl}' "
cursor.execute(selectCountSql)
# avoid reusing the name `url` (the listing-page URL); fetchone() returns the duplicate row, if any
row = cursor.fetchone()
if row:
log.info("已采集,跳过")
continue
date_string = web.find('div',class_='infoindocumentlist').find_all('div')[1].find('span',class_='info-data').text
# convert the date format (dd.mm.yyyy -> yyyy-mm-dd)
date_object = datetime.strptime(date_string, "%d.%m.%Y")
pub_time = date_object.strftime("%Y-%m-%d")
print(pub_time)
pdf_name = web.find('div',class_='infoindocumentlist').find_all('div')[0].find('span',class_='info-data').text
# download the PDF
path=r'D:\美国VS俄罗斯制裁'
path = os.path.join(path, downPdf.getPath(pdftitle))
downFile(pdfUrl,path,pdf_name)
# escape the Windows path and file name too, so backslashes survive the SQL string literal
insertSql = f"insert into usvsrussia (website,url,title,pub_time,state,pdf_name,pdf_path,create_time) values ('总统令文件','{pdfUrl}','{escape_string(pdftitle)}','{pub_time}',0,'{escape_string(pdf_name)}','{escape_string(path)}',now() )"
# log.info(insertSql)
cursor.execute(insertSql)
cnx.commit()
# break
job_2()
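job_2 also leans on a helper module, downPdf, whose getPath is not shown in this commit; it presumably turns the decree title into a file-system-safe directory name. A hypothetical sketch of such a helper (the name and behaviour are assumptions, not the project's actual code):

# Hypothetical sketch of a title -> safe directory name helper, similar in spirit to
# what downPdf.getPath is assumed to do; not the project's actual implementation.
import re

def get_path(title: str, max_len: int = 80) -> str:
    # strip characters Windows forbids in file and directory names
    safe = re.sub(r'[\\/:*?"<>|\r\n]', '_', title).strip()
    # collapse runs of whitespace and cap the length to keep the full path short
    safe = re.sub(r'\s+', ' ', safe)
    return safe[:max_len] or 'untitled'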
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
...@@ -215,7 +215,7 @@ class BaseCore: ...@@ -215,7 +215,7 @@ class BaseCore:
except : except :
pass pass
def __init__(self): def __init__(self):
self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project', self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
charset='utf8mb4') charset='utf8mb4')
self.__cursor_proxy= self.__cnx_proxy.cursor() self.__cursor_proxy= self.__cnx_proxy.cursor()
pass pass
......
[redis] [redis]
...@@ -5,7 +5,7 @@ pass=clbzzsn ...@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql] [mysql]
host=114.115.159.144 host=114.115.159.144
username=root username=caiji
password=zzsn9988 password=zzsn9988
database=caiji database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......