Commit 7da84ece Author: Ding Shuangbo

Merge remote-tracking branch 'origin/master'

......@@ -19,12 +19,14 @@ from openpyxl import Workbook
import langid
# create the connection pool
import pymysql
from pymysql import connections
from DBUtils.PooledDB import PooledDB
import pymysql
# import sys
# sys.path.append('D://zzsn_spider//base//fdfs_client')
from fdfs_client.client import get_tracker_conf, Fdfs_client
-tracker_conf = get_tracker_conf('./client.conf')
+tracker_conf = get_tracker_conf('E:\\kkwork\\zzsn_spider\\base\\client.conf')
client = Fdfs_client(tracker_conf)
# Note: call BaseCore.close() before the program exits to release the related resources
......
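The imports above wire pymysql into a DBUtils connection pool. For reference, a minimal sketch of how such a pool might be constructed (pool size, host, and credentials are illustrative placeholders, not values from this commit):

import pymysql
from DBUtils.PooledDB import PooledDB  # in DBUtils 2.x this moved to dbutils.pooled_db

pool = PooledDB(
    creator=pymysql,       # driver used to open the underlying connections
    maxconnections=5,      # assumed cap on simultaneously open connections
    host='127.0.0.1',      # placeholder host
    user='caiji',
    password='***',
    db='clb_project',
    charset='utf8mb4',
)
conn = pool.connection()   # borrow a connection from the pool
cursor = conn.cursor()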
......@@ -12,7 +12,7 @@ r = basecore.r
def cnn11():
    # database on host 114.116.44.11
-    cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+    cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
    cursor_ = cnx_.cursor()
    return cnx_, cursor_
def close11(cnx_, cursor_):
......@@ -22,7 +22,7 @@ def close11(cnx_,cursor_):
# # connect to Redis
# r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
#
-# cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
+# cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
#                       charset='utf8mb4')
# cursor = cnx.cursor()
......@@ -320,7 +320,7 @@ def FBS():
        if not r.exists(item):
            # r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
            # r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
-            r.rpush('NoticeEnterpriseFbs:gnqy_socialCode', item)
+            r.rpush('AnnualEnterprise:gnshqy_socialCode', item)
            # r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code', item)
            # r.rpush('FinanceFromEast:eastfinance_socialCode', item)
    closeSql(cnx, cursor)
......
"""
Open the SEC site -> [FILINGS] -> [Company Filing] -> enter the ticker/CIK -> select 10-K and 20-F as the annual-report form types
"""
import time
import requests
from bs4 import BeautifulSoup
from selenium import webdriver

url = 'https://www.sec.gov/edgar/browse/?CIK=1815846&owner=exclude'
# drive a real browser so the filings table is rendered
chromedriver = "D:/chrome/chromedriver.exe"
browser = webdriver.Chrome(chromedriver)
browser.get(url)
time.sleep(3)
page_source = browser.page_source
soup = BeautifulSoup(page_source, 'html.parser')
print(soup)
select_ann = soup.find_all('tr', class_='odd')
for tr in select_ann:
    want_type = tr.find('td').text
    if want_type == '20-F':
        print('yes')
        # get the link to the original filing ('href' here; the original read the
        # nonexistent 'title_href' attribute, which raises a KeyError at runtime)
        td = tr.find('td').find('a', class_='document-link')['href']
        print(td)
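A browser is not strictly required here: EDGAR also serves the filing index as JSON from data.sec.gov. A sketch under the assumption that the public submissions endpoint keeps its current shape (the User-Agent value is a placeholder; SEC's fair-access policy asks for a real contact string):

import requests

cik = '1815846'.zfill(10)  # EDGAR pads CIK numbers to ten digits
resp = requests.get(f'https://data.sec.gov/submissions/CIK{cik}.json',
                    headers={'User-Agent': 'your-name your-email@example.com'})
recent = resp.json()['filings']['recent']  # parallel arrays, one entry per filing
for form, doc in zip(recent['form'], recent['primaryDocument']):
    if form in ('10-K', '20-F'):  # the two annual-report form types
        print(form, doc)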
import requests, re, time, pymysql
......@@ -7,7 +7,7 @@ baseCore = BaseCore.BaseCore()
requests.adapters.DEFAULT_RETRIES = 3
log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
......@@ -131,7 +131,8 @@ def begin():
    while True:
        start_time = time.time()
        # fetch the enterprise to process
-        social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
+        # social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
+        social_code = '91100000100003962T'
        if not social_code:
            time.sleep(20)
            continue
......@@ -157,7 +158,7 @@ def begin():
            count += 1
            runType = 'AnnualReportCount'
            baseCore.updateRun(social_code, runType, count)
            break
if __name__ == '__main__':
    begin()
......
......@@ -10,7 +10,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
cnx_ = baseCore.cnx
......
import json
......@@ -14,7 +14,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor_ = cnx.cursor()
tracker_conf = get_tracker_conf('./client.conf')
......@@ -124,78 +124,6 @@ def SpiderByZJH(url, payload, dic_info, num, start_time):
            print(f'com_name:{short_name}、{year}已存在')
            continue
        else:
-            # # if the filing type is an annual report, parse the PDF and store it in the DB
-            # for i in range(0, 3):
-            #     try:
-            #         resp_content = requests.request("GET", pdf_url).content
-            #         # get the PDF page count
-            #         with fitz.open(stream=resp_content, filetype='pdf') as doc:
-            #             page_size = doc.page_count
-            #         break
-            #     except Exception as e:
-            #         print(e)
-            #         time.sleep(3)
-            #         continue
-            # if page_size < 1:
-            #     # PDF parsing failed
-            #     print(f'==={short_name}、{year}===pdf解析失败')
-            #     state = 0
-            #     takeTime = baseCore.getTimeCost(start_time, time.time())
-            #     baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, 'pdf解析失败')
-            #     continue
-            # result = ''
-            # for i in range(0, 3):
-            #     try:
-            #         result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
-            #         break
-            #     except Exception as e:
-            #         print(e)
-            #         time.sleep(3)
-            #         continue
-            # if result == '':
-            #     e = '上传服务器失败'
-            #     state = 0
-            #     takeTime = baseCore.getTimeCost(start_time, time.time())
-            #     baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-            #     continue
-            #
-            # if 'Remote file_id' in str(result) and 'Uploaded size' in str(result):
-            #
-            #     time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
-            #
-            #     type_id = '1'
-            #     item_id = dic_info['social_code']
-            #     group_name = 'group1'
-            #
-            #     path = bytes.decode(result['Remote file_id']).replace('group1', '')
-            #     full_path = bytes.decode(result['Remote file_id'])
-            #     category = 'pdf'
-            #     file_size = result['Uploaded size']
-            #     order_by = num
-            #     status = 1
-            #     create_by = 'XueLingKun'
-            #     create_time = time_now
-            #     page_size = page_size
-            #     try:
-            #         tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path,
-            #                     category, file_size, order_by, status, create_by, create_time, page_size)
-            #         state = 1
-            #         takeTime = baseCore.getTimeCost(start_time, time.time())
-            #         baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, '')
-            #     except:
-            #         e = '数据库传输失败'
-            #         state = 0
-            #         takeTime = baseCore.getTimeCost(start_time, time.time())
-            #         baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-            #     num = num + 1
-            #     time.sleep(2)
-            # else:
-            #     e = '采集失败'
-            #     state = 0
-            #     takeTime = baseCore.getTimeCost(start_time, time.time())
-            #     baseCore.recordLog(item_id, taskType, state, takeTime, pdf_url, e)
-            #     continue
+            # upload to the file server
+            retData = baseCore.upLoadToServe(pdf_url, 1, social_code)
+            # insert into the DB to obtain the att_id
            num = num + 1
......
# -*- coding: utf-8 -*-
......@@ -23,12 +23,12 @@ log = baseCore.getLogger()
requests.adapters.DEFAULT_RETRIES = 3
# conn = cx_Oracle.connect('cis/ZZsn9988_1qaz@114.116.91.1:1521/orcl')
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
-# cnx_ = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
-# # cnx_ip = pymysql.connect(host='114.115.159.144',user='root', password='zzsn9988', db='clb_project', charset='utf8mb4')
+# cnx_ = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji', charset='utf8mb4')
+# # cnx_ip = pymysql.connect(host='114.115.159.144',user='caiji', password='zzsn9988', db='clb_project', charset='utf8mb4')
# cursor_ = cnx_.cursor()
headers = {
......@@ -229,7 +229,7 @@ def spider_annual_report(dict_info,num):
#state1
if __name__ == '__main__':
    num = 0
-    taskType = '企业年报/雪球网/福布斯'
+    taskType = '企业年报/雪球网'
    while True:
        start_time = time.time()
        # fetch the enterprise to process
......
......@@ -222,10 +222,10 @@ class BaseCore:
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
    def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                           charset='utf8mb4')
        self.__cursor_proxy = self.__cnx_proxy.cursor()
-        self.cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',
+        self.cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                                   charset='utf8mb4')
        self.cursor = self.cnx.cursor()
......
# -*- coding: utf-8 -*-
......@@ -143,7 +143,7 @@ class YahooCaiwu(object):
        return driver
    def conn11(self):
-        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='root', passwd='f7s0&7qqtK', db='clb_project',
+        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
                               charset='utf8')
        cursor = conn.cursor()
        return conn, cursor
......
# -*- coding: utf-8 -*-
......@@ -46,7 +46,7 @@ class Shizhi(object):
        return driver
    def conn11(self):
-        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='root', passwd='f7s0&7qqtK', db='clb_project',
+        conn = pymysql.Connect(host='114.116.44.11', port=3306, user='caiji', passwd='f7s0&7qqtK', db='clb_project',
                               charset='utf8')
        cursor = conn.cursor()
        return conn, cursor
......
"""
"""
......@@ -8,7 +8,7 @@ import pandas as pd
from bs4 import BeautifulSoup
from base.BaseCore import BaseCore
baseCore = BaseCore()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
......@@ -439,14 +439,14 @@ def getReportTime():
    # 2023-04-01
    # todo: production task
    # get the current date and time
-    # current_date = datetime.now()
+    current_date = datetime.now()
    # compute yesterday's date
-    # yesterday = current_date - timedelta(days=1)
+    yesterday = current_date - timedelta(days=1)
    # format yesterday's date
-    # report_date = yesterday.strftime('%Y-%m-%d')
-    # list_date.append(report_date)
-    # year = int(current_date.strftime('%Y'))
-    list_date = ['2023-03-31']
+    report_date = yesterday.strftime('%Y-%m-%d')
+    list_date.append(report_date)
+    year = int(current_date.strftime('%Y'))
+    # list_date = ['2023-03-31']
    list_month = ['-12-31', '-09-30', '-06-30', '-03-31']
    for year in range(2022, 2018, -1):
......
......@@ -20,7 +20,7 @@ class Gpdm(object):
        'version': 'TYC-Web',
        'Content-Type': 'application/json;charset=UTF-8'
    }
-    cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji', charset='utf8mb4')
+    cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji', charset='utf8mb4')
    cursor = cnx.cursor()
    taskType = '股票代码/东方财富网'
......
import requests, pymysql, re, time, json, sys
import pandas as pd
from bs4 import BeautifulSoup
from selenium import webdriver
from concurrent.futures.thread import ThreadPoolExecutor
from base.BaseCore import BaseCore
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = baseCore.cnx
cursor = baseCore.cursor
def InsterInto(short_name, social_code, pdf_url):
    inserted = False  # (renamed: the original mixed the names inster/insert)
    sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s'''
    cursor.execute(sel_sql, (social_code, pdf_url))
    selects = cursor.fetchone()
    if selects:
        print(f'com_name:{short_name}、{pdf_url}已存在')
        return inserted
    # insert the record into the database
    try:
        insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
        list_info = [
            social_code,
            pdf_url,
            '东方财富网',
            '1',
        ]
        # the 144 database
        cursor.execute(insert_sql, tuple(list_info))
        cnx.commit()
        inserted = True
        return inserted
    except:
        state = 0
        takeTime = baseCore.getTimeCost(start_time, time.time())
        baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, '数据库传输失败')
        return inserted
def gonggao_info(dic_info):
    list_all_info = []
    code = dic_info[3]
    com_name = dic_info[4]
    social_code = dic_info[2]  # (renamed from the original's unused social__code)
    if 'HK' in code:
        # browser.quit()
        return
    # zero-pad the stock code to six digits
    code1 = str(code)
    while True:
        if len(code1) < 6:
            code1 = '0' + code1
        else:
            break
    # map the leading digit to the exchange prefix
    if code1[0] == '0' or code1[0] == '3' or code1[0] == '2':  # (fixed: the original tested code[0] here)
        com_code = 'SZ' + code1
    elif code1[0] == '6' or code1[0] == '9':
        com_code = 'SH' + code1
    elif code1[0] == '8' or code1[0] == '4':
        com_code = 'BJ' + code1
    break_id = 0
    for page1 in range(1, 2):
        if break_id == 1:
            break
        url = f'https://np-anotice-stock.eastmoney.com/api/security/ann?sr=-1&page_size=50&page_index={page1}&ann_type=A&client_source=web&stock_list={code1}&f_node=0&s_node=0'
        for n1 in range(0, 3):
            try:
                res = requests.get(url, verify=False)
                break
            except:
                if n1 == 2:
                    sys.exit(0)
                time.sleep(5)
                continue
        res_json = res.json()
        list_all = res_json['data']['list']
        if list_all:
            for one_info in list_all:
                title = one_info['title']
                info_date = one_info['notice_date']
                if page1 > 1 and '2022' in info_date:
                    break_id = 1
                    break
                if '2021' in info_date:  # only collect announcements from 2022 onward
                    break_id = 1
                    break
                try:
                    info_type = one_info['columns'][0]['column_name']
                except:
                    info_type = ''
                art_code = one_info['art_code']
                info_url = 'https://data.eastmoney.com/notices/detail/' + com_code + '/' + art_code + '.html'
                t = int(time.time() * 1000)
                json_url = f'https://np-cnotice-stock.eastmoney.com/api/content/ann?art_code={art_code}&client_source=web&page_index=1&_={t}'
                for n1 in range(0, 3):
                    try:
                        json_2 = requests.get(json_url, verify=False).json()
                        break
                    except:
                        if n1 == 2:
                            sys.exit(0)
                        time.sleep(5)
                        continue
                try:
                    pdf_url = json_2['data']['attach_url']
                except:
                    pdf_url = ''
                # look the URL up in the database: skip the record if it is already there
                sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and type='1' '''
                cursor.execute(sel_sql, info_url)
                selects = cursor.fetchall()
                if selects:
                    return
                else:
                    pass
                try:
                    info_content = json_2['data']['notice_content']
                except:
                    info_content = ''
                list_info = [
                    social_code,
                    title,
                    info_content[:2000],
                    info_date,
                    info_url,
                    pdf_url,
                    '东方财富网',
                    '1',
                    'zh'
                ]
                # list_all_info.append(tuple(list_info))
                # (renamed the with-target: the original reused the name cursor, which made
                # every earlier cursor.execute in this function an UnboundLocalError)
                with cnx.cursor() as cursor_new:
                    sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s '''
                    cursor_new.execute(sel_sql, info_url)
                    selects = cursor_new.fetchall()
                    if selects:
                        break
                    else:
                        # todo: the DB-insert step is slated for removal
                        insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,publish_date,source_address,pdf_address,origin,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
                        cursor_new.execute(insert_sql, tuple(list_info))
                        cnx.commit()
        else:
            break
    print(f'{code}:传输完成')
    # list_all_info_1.append(list_all_info)
    list_c.append(code)
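For readability, the zero-padding and leading-digit branching above could live in a small helper. A sketch (the digit-to-exchange rules are copied from the code above; the helper name is mine):

def exchange_prefix(code: str) -> str:
    code = code.zfill(6)        # zero-pad to six digits
    if code[0] in '023':
        return 'SZ' + code      # Shenzhen
    if code[0] in '69':
        return 'SH' + code      # Shanghai
    if code[0] in '48':
        return 'BJ' + code      # Beijing
    raise ValueError(f'unrecognised A-share code: {code}')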
if __name__ == '__main__':
    # read social_code values from redis
    list_c = []
    list_all_info_1 = []
    num = 0
    taskType = '企业公告/东方财富网'
    while True:
        start_time = time.time()
        # fetch the enterprise to process
        social_code = baseCore.redicPullData('NoticeEnterpriseEasteFinance:gnshqy_socialCode')
        # social_code = '911100007109288314'
        if not social_code:
            time.sleep(20)
            continue
        if social_code == 'None':
            time.sleep(20)
            continue
        if social_code == '':
            time.sleep(20)
            continue
        dic_info = baseCore.getInfomation(social_code)
        count = dic_info[15]
        code = dic_info[3]
        com_name = dic_info[4]
        gonggao_info(dic_info)
"""
"""
......@@ -168,6 +168,11 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
    # upload to the file server
    retData = baseCore.upLoadToServe(pdf_url, 8, social_code)
    # insert the attachment into the att table
+    if retData['state']:
+        pass
+    else:
+        log.info(f'====pdf解析失败====')
+        return False
    num = num + 1
    att_id = baseCore.tableUpdate(retData, com_name, year, pdf_name, num)
    content = retData['content']
......@@ -176,27 +181,7 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_na
    else:
        log.info(f'====pdf解析失败====')
        return False
-    # first download the PDF from its link, then parse the content
-    # try:
-    #     res = requests.get(pdf_url)
-    #     content = ''
-    #     # read the file and parse its content
-    #     with fitz.open(stream=res.content, filetype='pdf') as doc:
-    #         for page in doc.pages():
-    #             content += page.get_text()
-    # except:
-    #     # print('解析失败')
-    #     dic_result = {
-    #         'success': 'false',
-    #         'message': 'PDF解析失败',
-    #         'code': '204',
-    #     }
-    #     log.info(dic_result)
-    #     state = 0
-    #     takeTime = baseCore.getTimeCost(start_time, time.time())
-    #     baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
-    #     return False
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    dic_news = {
        'attachmentIds': att_id,
......@@ -373,8 +358,8 @@ if __name__ == '__main__':
    while True:
        start_time = time.time()
        # fetch the enterprise to process
-        # social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
-        social_code = '9110000071092841XX'
+        social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
+        # social_code = '9110000071092841XX'
        # if Redis has no more data, wait
        if social_code == None:
            time.sleep(20)
......
(One file's source diff was too large to display and is omitted here.)
# __init__.py
__version__ = '2.2.0'
VERSION = tuple(map(int, __version__.split('.')))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: connection.py
import socket
import os
import sys
import time
import random
from itertools import chain
from fdfs_client.exceptions import (
    FDFSError,
    ConnectionError,
    ResponseError,
    InvaildResponse,
    DataError
)
# start class Connection
class Connection(object):
    '''Manage TCP communication to and from the FastDFS server.'''
    def __init__(self, **conn_kwargs):
        self.pid = os.getpid()
        self.host_tuple = conn_kwargs['host_tuple']
        self.remote_port = conn_kwargs['port']
        self.remote_addr = None
        self.timeout = conn_kwargs['timeout']
        self._sock = None
    def __del__(self):
        try:
            self.disconnect()
        except:
            pass
    def connect(self):
        '''Connect to the fdfs server.'''
        if self._sock:
            return
        try:
            sock = self._connect()
        except socket.error as e:
            raise ConnectionError(self._errormessage(e))
        self._sock = sock
        # print '[+] Create a connection success.'
        # print '\tLocal address is %s:%s.' % self._sock.getsockname()
        # print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
    def _connect(self):
        '''Create a TCP socket. The host is a random member of host_tuple.'''
        self.remote_addr = random.choice(self.host_tuple)
        # print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
        # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # sock.settimeout(self.timeout)
        sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
        return sock
    def disconnect(self):
        '''Disconnect from the fdfs server.'''
        if self._sock is None:
            return
        try:
            self._sock.close()
        except socket.error as e:
            raise ConnectionError(self._errormessage(e))
        self._sock = None
    def get_sock(self):
        return self._sock
    def _errormessage(self, exception):
        # args for socket.error can either be (errno, "message")
        # or just "message"
        if len(exception.args) == 1:
            return "[-] Error: connect to %s:%s. %s." % (self.remote_addr, self.remote_port, exception.args[0])
        else:
            return "[-] Error: %s connect to %s:%s. %s." % \
                   (exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
# end class Connection
# start ConnectionPool
class ConnectionPool(object):
    '''Generic connection pool.'''
    def __init__(self, name='', conn_class=Connection,
                 max_conn=None, **conn_kwargs):
        self.pool_name = name
        self.pid = os.getpid()
        self.conn_class = conn_class
        self.max_conn = max_conn or 2 ** 31
        self.conn_kwargs = conn_kwargs
        self._conns_created = 0
        self._conns_available = []
        self._conns_inuse = set()
        # print '[+] Create a connection pool success, name: %s.' % self.pool_name
    def _check_pid(self):
        if self.pid != os.getpid():
            self.destroy()
            # (fixed: the original passed conn_class and max_conn positionally,
            # so conn_class landed in the name parameter)
            self.__init__(name=self.pool_name, conn_class=self.conn_class,
                          max_conn=self.max_conn, **self.conn_kwargs)
    def make_conn(self):
        '''Create a new connection.'''
        if self._conns_created >= self.max_conn:
            raise ConnectionError('[-] Error: Too many connections.')
        num_try = 10
        while True:
            try:
                if num_try <= 0:
                    sys.exit()
                conn_instance = self.conn_class(**self.conn_kwargs)
                conn_instance.connect()
                self._conns_created += 1
                break
            except ConnectionError as e:
                print(e)
                num_try -= 1
                conn_instance = None
        return conn_instance
    def get_connection(self):
        '''Get a connection from the pool.'''
        self._check_pid()
        try:
            conn = self._conns_available.pop()
            # print '[+] Get a connection from pool %s.' % self.pool_name
            # print '\tLocal address is %s:%s.' % conn._sock.getsockname()
            # print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
        except IndexError:
            conn = self.make_conn()
        self._conns_inuse.add(conn)
        return conn
    def remove(self, conn):
        '''Remove a connection from the pool.'''
        if conn in self._conns_inuse:
            self._conns_inuse.remove(conn)
            self._conns_created -= 1
        if conn in self._conns_available:
            self._conns_available.remove(conn)
            self._conns_created -= 1
    def destroy(self):
        '''Disconnect all connections in the pool.'''
        all_conns = chain(self._conns_inuse, self._conns_available)
        for conn in all_conns:
            conn.disconnect()
        # print '[-] Destroy connection pool %s.' % self.pool_name
    def release(self, conn):
        '''Release the connection back to the pool.'''
        self._check_pid()
        if conn.pid == self.pid:
            self._conns_inuse.remove(conn)
            self._conns_available.append(conn)
        # print '[-] Release connection back to pool %s.' % self.pool_name
# end ConnectionPool class
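For orientation, a minimal usage sketch of this pool (the tracker address and timeout are placeholders; 22122 is FastDFS's conventional tracker port):

pool = ConnectionPool(name='tracker_pool', max_conn=10,
                      host_tuple=('192.0.2.10',), port=22122, timeout=30)
conn = pool.get_connection()
try:
    sock = conn.get_sock()  # talk to the tracker over the raw socket
finally:
    pool.release(conn)      # hand the connection back for reuse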
def tcp_recv_response(conn, bytes_size, buffer_size=4096):
    '''Receive a response from the server.
    Does not include the tracker header.
    arguments:
    @conn: connection
    @bytes_size: int, number of bytes to receive
    @buffer_size: int, receive buffer size
    @Return: tuple, (response, received_size)
    '''
    recv_buff = []
    total_size = 0
    try:
        while bytes_size > 0:
            resp = conn._sock.recv(buffer_size)
            if not resp:
                # (added: recv() returning b'' means the peer closed the socket;
                # without this check the loop would spin forever)
                raise ConnectionError('[-] Error: connection closed while reading from socket.')
            recv_buff.append(resp)
            total_size += len(resp)
            bytes_size -= len(resp)
    except (socket.error, socket.timeout) as e:
        raise ConnectionError('[-] Error: while reading from socket: (%s)' % e.args)
    return (b''.join(recv_buff), total_size)
def tcp_send_data(conn, bytes_stream):
    '''Send a buffer to the server.
    Does not include the tracker header.
    arguments:
    @conn: connection
    @bytes_stream: transmit buffer
    @Return bool
    '''
    try:
        conn._sock.sendall(bytes_stream)
    except (socket.error, socket.timeout) as e:
        raise ConnectionError('[-] Error: while writing to socket: (%s)' % e.args)
......@@ -22,7 +22,7 @@ headers = {
    'version': 'TYC-Web',
    'Content-Type': 'application/json;charset=UTF-8'
}
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
# look up the Tianyancha id, company name, etc. by the credit code
......
......@@ -12,7 +12,7 @@ jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
pageSize = 10
headers = {
......
......@@ -58,7 +58,7 @@ if __name__=="__main__":
url = "https://mp.weixin.qq.com/"
browser.get(url)
# 可改动
time.sleep(30)
time.sleep(70)
s = requests.session()
#获取到token和cookies
......
......@@ -163,19 +163,19 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
    time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # push the record to kafka
-    # dic_info = {
-    #     'sid': sid,
-    #     'title': news_title,
-    #     'content': news_content,
-    #     'contentWithtag': str(news_html),
-    #     'summary': '',
-    #     'author': '',
-    #     'origin': origin,
-    #     'publishDate': news_date,
-    #     'sourceAddress': url_news,
-    #     'source': '11',
-    #     'createDate': time_now
-    # }
+    dic_info = {
+        'sid': sid,
+        'title': news_title,
+        'content': news_content,
+        'contentWithtag': str(news_html),
+        'summary': '',
+        'author': '',
+        'origin': origin,
+        'publishDate': news_date,
+        'sourceAddress': url_news,
+        'source': '11',
+        'createDate': time_now
+    }
    # for nnn in range(0, 3):
    #     producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
    #     try:
......@@ -188,9 +188,9 @@ def get_info(sid,json_search,origin,url_,info_source_code,page):
    #         continue
    #     finally:
    #         producer.close()
-    # num_caiji = num_caiji + 1
-    # list_all_info.append(dic_info)
-    # time.sleep(5)
+    num_caiji = num_caiji + 1
+    list_all_info.append(dic_info)
+    time.sleep(5)
    # time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # dic_info2 = {
    #     'infoSourceId': sid,
......
......@@ -20,8 +20,8 @@ baseCore = BaseCore()
log = baseCore.getLogger()
cnx_ = baseCore.cnx
cursor_ = baseCore.cursor
cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
cursor = cnx.cursor()
# cnx = pymysql.connect(host="114.116.44.11", user="root", password="f7s0&7qqtK", db="clb_project", charset="utf8mb4")
# cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
......@@ -177,7 +177,11 @@ def get_info(dict_json):
        url_img = resp['body']['objectUrl']
        str_url_img = f'<img src="{url_img}">'
-        img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
+        try:
+            img_one.replace_with(BeautifulSoup(str_url_img, 'lxml').img)
+        except Exception as e:
+            log.info(f'----{url_news}-----------{e}')
+            return False
    for tag in news_html.descendants:
        try:
......@@ -208,7 +212,7 @@ def get_info(dict_json):
            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
            kafka_time_out = kafka_result.get(timeout=10)
-            add_url(sid, url_news)
+            # add_url(sid, url_news)
            break
        except:
            time.sleep(5)
......
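The hunk above sends each record to Kafka inside a retry loop. A self-contained sketch of that pattern (broker address and topic are taken from the code above; the retry count of 3 matches the bound used elsewhere in this repo):

import json
import time
from kafka import KafkaProducer

def send_with_retry(dic_info, retries=3):
    for _ in range(retries):
        try:
            producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
            future = producer.send('crawlerInfo',
                                   json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
            future.get(timeout=10)  # raises if the broker does not acknowledge in time
            return True
        except Exception:
            time.sleep(5)
    return False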
......@@ -88,7 +88,7 @@ chrome_options.add_argument('--headless')
executable_path = r'D:\chrome\chromedriver.exe'
driver = webdriver.Chrome(options=chrome_options, executable_path=executable_path)
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
def scroll(driver):
    for i in range(0, 30):
......
Yahoo Finance: information collection for companies listed abroad
Yahoo Finance company-news collector, deployed on the Hong Kong server
-# fetch company news from Yahoo Finance
# -*- coding: utf-8 -*-
+# fetch company news from Yahoo Finance
import json
import time
......@@ -62,7 +63,7 @@ def getZx(xydm, url, title, cnx, path):
        '雅虎财经',
        author,
        '2',
-        'zh'
+        'en'
    ]
    try:
try:
......@@ -180,15 +181,15 @@ def scroll(xydm,name,gpdm):
            break
        last_url_ = last_url
-# requeue the WeChat accounts whose collection failed
+# requeue the companies whose collection failed
def rePutIntoR(item):
    r.rpush('NewsEnterprise:gwqy_socialCode', item)
if __name__ == "__main__":
-    path = r'F:\spider\115\chromedriver.exe'
+    path = r'D:\chrome\chromedriver.exe'
    driver = baseCore.buildDriver(path)
-    cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+    cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
    cursor = cnx.cursor()
    while True:
......@@ -197,9 +198,10 @@ if __name__ == "__main__":
        # if Redis has no more data, wait
        if not social_code:
            log.info('============已没有数据============等待===============')
            time.sleep(20)
            continue
-        if social_code == 'None':
+        if social_code == None:
            time.sleep(20)
            continue
        data = baseCore.getInfomation(social_code)
......
......@@ -214,7 +214,7 @@ class BaseCore:
        except:
            pass
    def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                           charset='utf8mb4')
        self.__cursor_proxy = self.__cnx_proxy.cursor()
        pass
......
......@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql]
host=114.115.159.144
-username=root
+username=caiji
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......
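A sketch of consuming this [mysql] section from Python with the standard library (the file name config.ini is an assumption; the section and key names come from the file above):

import configparser

cfg = configparser.ConfigParser()
cfg.read('config.ini', encoding='utf-8')
mysql = cfg['mysql']
conn_kwargs = dict(host=mysql['host'], user=mysql['username'],
                   password=mysql['password'], db=mysql['database'])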
......@@ -215,7 +215,7 @@ class BaseCore:
        except:
            pass
    def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                           charset='utf8mb4')
        self.__cursor_proxy = self.__cnx_proxy.cursor()
        pass
......
......@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql]
host=114.115.159.144
-username=root
+username=caiji
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......
import os
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from pymysql.converters import escape_string
from selenium.webdriver.common.by import By
from base.BaseCore import BaseCore
import downPdf
from BaseCore import BaseCore  # note: re-imports BaseCore and shadows the import above
from datetime import datetime

baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
......@@ -28,21 +29,72 @@ headers = {
cnx = baseCore.cnx
cursor = baseCore.cursor
def downFile(url, path, pdf_name):
    try:
        baseCore.mkPath(path)
        # proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
        response = requests.get(url, headers=headers, verify=False, timeout=10)
        # response = requests.get(url, proxies=proxy, headers=headers, verify=False, timeout=10)
        pdf_name = pdf_name + '.pdf'
        with open(os.path.join(path, pdf_name), "wb") as pyFile:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    pyFile.write(chunk)
    except Exception as e:
        log.error(f"出错了----------{e}")
        return False
    return pdf_name
def job_2():
    log.info('----开始采集---俄罗斯国家杂志----')
    path = 'D:/chrome/chromedriver.exe'  # (fixed: the original 'D:chrome/...' was missing the path separator)
    driverContent = baseCore.buildDriver(path, headless=False)
    url = 'http://publication.pravo.gov.ru/documents/block/president'
    req = requests.get(url, headers=headers)  # (fixed: headers was passed positionally, i.e. as params)
    soup = BeautifulSoup(req.content, 'html.parser')
    container = soup.find('div', class_='documents-container')
    web_list = container.find_all('div', class_='documents-table-row')
    for web in web_list[:1]:
        web_href = web.find('a')['href']
        web_url = 'http://publication.pravo.gov.ru/' + web_href
        title = web.find('a').text
        print(title)
    # path = 'D:chrome/chromedriver.exe'
    # driverContent = baseCore.buildDriver(path, headless=False)
    for i in range(68, 200):  # resumes from results page 68
        if i == 1:
            url = 'http://publication.pravo.gov.ru/documents/block/president'
        else:
            url = f'http://publication.pravo.gov.ru/documents/block/president?index={i}&pageSize=30'
        req = requests.get(url, headers=headers)
        soup = BeautifulSoup(req.content, 'html.parser')
        container = soup.find('div', class_='documents-container')
        web_list = container.find_all('div', class_='documents-table-row')
        for web in web_list:
            title = web.find_all('a')[1].text
            pdftitle = title.strip()  # (added default: the original left pdftitle unbound when neither marker matched)
            if '"О' in title:
                pdftitle = title.strip().split('"О')[0]
            if '-рп' in title:
                pdftitle = title.strip().split('-рп')[0] + '-рп'
            pdfUrl = 'http://publication.pravo.gov.ru' + web.find('div', class_='notforprint pt-2').find('a')['href']
            # pdfTitle = aa.find('a')['title']
            print(pdfUrl)
            selectCountSql = f"select * from usvsrussia where url = '{pdfUrl}' "
            cursor.execute(selectCountSql)
            url = cursor.fetchone()
            if url:
                log.info("已采集,跳过")
                continue
            else:
                pass
            date_string = web.find('div', class_='infoindocumentlist').find_all('div')[1].find('span', class_='info-data').text
            # normalize the date format
            date_object = datetime.strptime(date_string, "%d.%m.%Y")
            pub_time = date_object.strftime("%Y-%m-%d")
            print(pub_time)
            pdf_name = web.find('div', class_='infoindocumentlist').find_all('div')[0].find('span', class_='info-data').text
            # download the PDF
            path = r'D:\美国VS俄罗斯制裁'
            path = os.path.join(path, downPdf.getPath(pdftitle))
            downFile(pdfUrl, path, pdf_name)
            insertSql = f"insert into usvsrussia (website,url,title,pub_time,state,pdf_name,pdf_path,create_time) values ('总统令文件','{pdfUrl}','{escape_string(pdftitle)}','{pub_time}',0,'{pdf_name}','{path}',now() )"
            # log.info(insertSql)
            cursor.execute(insertSql)
            cnx.commit()
        # break
job_2()
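The interpolated INSERT above needs escape_string and still breaks on stray quotes in titles. A parameterized variant (same table and columns as above) would let the driver handle quoting:

insert_sql = ("insert into usvsrussia "
              "(website,url,title,pub_time,state,pdf_name,pdf_path,create_time) "
              "values (%s,%s,%s,%s,%s,%s,%s,now())")
cursor.execute(insert_sql, ('总统令文件', pdfUrl, pdftitle, pub_time, 0, pdf_name, path))
cnx.commit()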
# -*- coding: utf-8 -*-
......@@ -215,7 +215,7 @@ class BaseCore:
        except:
            pass
    def __init__(self):
-        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='clb_project',
+        self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
                                           charset='utf8mb4')
        self.__cursor_proxy = self.__cnx_proxy.cursor()
        pass
......
[redis]
......@@ -5,7 +5,7 @@ pass=clbzzsn
[mysql]
host=114.115.159.144
-username=root
+username=caiji
password=zzsn9988
database=caiji
url=jdbc:mysql://114.115.159.144:3306/caiji?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai&useSSL=false
......