Commit a5dbff9a  Author: 刘伟刚

Merge remote-tracking branch 'origin/master'

@@ -4,9 +4,12 @@ import random
import socket
import sys
import time
import fitz
import logbook
import logbook.more
import pandas as pd
import requests
import zhconv
import pymysql
import redis
@@ -20,16 +23,21 @@ from pymysql import connections
from DBUtils.PooledDB import PooledDB
import pymysql
from fdfs_client.client import get_tracker_conf, Fdfs_client
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
# 注意 程序退出前 调用BaseCore.close() 关闭相关资源
class BaseCore:
# 序列号
__seq = 0
# 代理池 数据库连接
-__cnx_proxy =None
-__cursor_proxy = None
+# __cnx_proxy =None
+# __cursor_proxy = None
cnx = None
cursor = None
cnx_ = None
cursor_ = None
r = None
# agent 池
__USER_AGENT_LIST = [
@@ -228,13 +236,18 @@ class BaseCore:
__USER_PHONE_AGENT_LIST = ['Mozilla/5.0 (Linux; Android 7.1.1; OPPO R9sk) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.111 Mobile Safari/537.36']
def __init__(self):
-self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
-charset='utf8mb4')
-self.__cursor_proxy = self.__cnx_proxy.cursor()
+# self.__cnx_proxy = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='clb_project',
+# charset='utf8mb4')
+# self.__cursor_proxy = self.__cnx_proxy.cursor()
self.cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
charset='utf8mb4')
self.cursor = self.cnx.cursor()
#11数据库
self.cnx_ = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='clb_project',
charset='utf8mb4')
self.cursor_ = self.cnx_.cursor()
# 连接到Redis
self.r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=6)
@@ -246,7 +259,7 @@ class BaseCore:
blocking=True,
host='114.115.159.144',
port=3306,
-user='root',
+user='caiji',
password='zzsn9988',
database='caiji',
charset='utf8mb4'
@@ -254,8 +267,6 @@ class BaseCore:
def close(self):
try:
-self.__cursor_proxy.close()
-self.__cnx_proxy.close()
self.cursor.close()
self.cnx.close()
except :
@@ -345,8 +356,8 @@ class BaseCore:
# 获取代理
def get_proxy(self):
sql = "select proxy from clb_proxy"
-self.__cursor_proxy.execute(sql)
-proxy_lists = self.__cursor_proxy.fetchall()
+self.cursor.execute(sql)
+proxy_lists = self.cursor.fetchall()
ip_list = []
for proxy_ in proxy_lists:
ip_list.append(str(proxy_).replace("('", '').replace("',)", ''))
@@ -369,7 +380,7 @@ class BaseCore:
if beginStr=='':
pass
else:
-begin=str.find(beginStr)
+begin=str.rfind(beginStr)
if begin==-1:
begin=0
str=str[begin:]
@@ -425,11 +436,18 @@ class BaseCore:
IP = socket.gethostbyname(socket.gethostname())
return IP
def mkPath(self,path):
folder = os.path.exists(path)
if not folder: # 判断是否存在文件夹如果不存在则创建为文件夹
os.makedirs(path) # makedirs 创建文件时如果路径不存在会创建这个路径
else:
pass
# 生成google模拟浏览器 必须传入值为googledriver位置信息
# headless用于决定是否为无头浏览器,初始默认为无头浏览器
# 正常浏览器可用于开始对页面解析使用或一些网站无头时无法正常采集
# 无头浏览器用于后续对信息采集时不会有浏览器一直弹出,
def buildDriver(self, path, headless=True):
service = Service(path)
chrome_options = webdriver.ChromeOptions()
if headless:
@@ -442,7 +460,7 @@ class BaseCore:
chrome_options.add_argument('user-agent=' + self.getRandomUserAgent())
# 'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36')
-driver = webdriver.Chrome(chrome_options=chrome_options, service=service)
+driver = webdriver.Chrome(options=chrome_options, service=service)
# with open(r'F:\zzsn\zzsn_spider\base\stealth.min.js') as f:
# js = f.read()
#
@@ -468,6 +486,7 @@ class BaseCore:
except:
log = self.getLogger()
log.info('=========数据库操作失败========')
return data
# 更新企业采集次数
@@ -520,6 +539,13 @@ class BaseCore:
token = self.cursor.fetchone()[0]
return token
#获取天眼查token
def GetTYCToken(self):
query = 'select token from TYC_token'
self.cursor.execute(query)
token = self.cursor.fetchone()[0]
return token
#检测语言
def detect_language(self, text):
# 使用langid.py判断文本的语言
@@ -565,6 +591,91 @@ class BaseCore:
self.r.set(key, 0)
self.r.expire(key, 3600)
time.sleep(2)
#上传至文件服务器,并解析pdf的内容和页数
def upLoadToServe(self,pdf_url,type_id,social_code):
headers = {}
retData = {'state':False,'type_id':type_id,'item_id':social_code,'group_name':'group1','path':'','full_path':'',
'category':'pdf','file_size':'','status':1,'create_by':'XueLingKun',
'create_time':'','page_size':'','content':''}
headers['User-Agent'] = self.getRandomUserAgent()
for i in range(0, 3):
try:
resp_content = requests.get(pdf_url, headers=headers, verify=False, timeout=20).content
break
except:
time.sleep(3)
continue
page_size = 0
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
with fitz.open(stream=resp_content, filetype='pdf') as doc:
page_size = doc.page_count
for page in doc.pages():
retData['content'] += page.get_text()
break
except:
time.sleep(3)
continue
if page_size < 1:
# pdf解析失败
print(f'======pdf解析失败=====')
return retData
else:
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retData['state'] = True
retData['path'] = bytes.decode(result['Remote file_id']).replace('group1', '')
retData['full_path'] = bytes.decode(result['Remote file_id'])
retData['file_size'] = result['Uploaded size']
retData['create_time'] = time_now
retData['page_size'] = page_size
return retData
def secrchATT(self,item_id,year,type_id):
sel_sql = '''select id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
self.cursor_.execute(sel_sql, (item_id, year, type_id))
selects = self.cursor_.fetchone()
return selects
#插入到att表 返回附件id
def tableUpdate(self,retData,com_name,year,pdf_name,num):
item_id = retData['item_id']
type_id = retData['type_id']
group_name = retData['group_name']
path = retData['path']
full_path = retData['full_path']
category = retData['category']
file_size = retData['file_size']
status = retData['status']
create_by = retData['create_by']
page_size = retData['page_size']
create_time = retData['create_time']
order_by = num
selects = self.secrchATT(item_id,year,type_id)
# sel_sql = '''select id,item_id from clb_sys_attachment where item_id = %s and year = %s and type_id=%s '''
# self.cursor.execute(sel_sql, (item_id, year,type_id))
# selects = self.cursor.fetchone()
if selects:
self.getLogger().info(f'com_name:{com_name}已存在')
id = selects[0]
return id
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time,page_size) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, pdf_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by,
status, create_by,
create_time, page_size)
self.cursor_.execute(Upsql, values) # 插入
self.cnx_.commit() # 提交
self.getLogger().info("更新完成:{}".format(Upsql))
selects = self.secrchATT(item_id,year,type_id)
id = selects[0]
return id
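A minimal usage sketch of the two helpers above (not part of the committed code): the PDF URL, company name and credit code are made-up placeholders, and it assumes BaseCore is importable as base.BaseCore with client.conf next to the script, as in the spiders later in this commit.

from base.BaseCore import BaseCore

baseCore = BaseCore()
# download the PDF, push it to FastDFS and parse its text and page count
retData = baseCore.upLoadToServe('http://example.com/annual_report.pdf', 8, '91110000XXXXXXXXXX')
if retData['state']:
    # register the attachment in clb_sys_attachment and get back its id
    att_id = baseCore.tableUpdate(retData, '示例公司', '2022', 'annual_report.pdf', 1)
    print(att_id, retData['page_size'])
baseCore.close()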
......
@@ -301,9 +301,9 @@ def BaseInfoAbroad_task():
def FBS():
cnx,cursor = connectSql()
# todo:调整为获取福布斯的数据库
-gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
-cursor.execute(gw_query)
-gw_result = cursor.fetchall()
+# gw_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=2"
+# cursor.execute(gw_query)
+# gw_result = cursor.fetchall()
#获取国内企业
gn_query = "select a.SocialCode from EnterpriseInfo a,EnterpriseType b where a.SocialCode=b.SocialCode and b.type=3 and a.Place=1 "
@@ -311,16 +311,18 @@ def FBS():
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
-gw_social_list = [item[0] for item in gw_result]
-for item in gw_social_list:
-r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
-r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
+# gw_social_list = [item[0] for item in gw_result]
+# for item in gw_social_list:
+# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
+# r.rpush('BaseInfoEnterpriseFbs:gwqy_social_code',item)
for item in gn_social_list:
if not r.exists(item):
-r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
+# r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
# r.rpush('CorPersonEnterpriseFbs:gnqy_socialCode', item)
r.rpush('NoticeEnterpriseFbs:gnqy_socialCode',item)
-r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
+# r.rpush('BaseInfoEnterpriseFbs:gnqy_social_code',item)
# r.rpush('FinanceFromEast:eastfinance_socialCode',item)
closeSql(cnx,cursor)
#将IPO的国外股票代码放到redis中
......
# connect timeout in seconds
# default value is 30s
connect_timeout=300
# network timeout in seconds
# default value is 30s
network_timeout=600
# the base path to store log files
#base_path=/home/tarena/django-project/cc_shop1/cc_shop1/logs
# tracker_server can occur more than once, and tracker_server format is
# "host:port", host can be hostname or ip address
tracker_server=114.115.215.96:22122
#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
# connections whose the idle time exceeds this time will be closed
# unit: second
# default value is 3600
# since V4.05
connection_pool_max_idle_time = 3600
# if load FastDFS parameters from tracker server
# since V4.05
# default value is false
load_fdfs_parameters_from_tracker=false
# if use storage ID instead of IP address
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# default value is false
# since V4.05
use_storage_id = false
# specify storage ids filename, can use relative or absolute path
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# since V4.05
storage_ids_filename = storage_ids.conf
#HTTP settings
http.tracker_server_port=80
#use "#include" directive to include other HTTP settings
##include http.conf
\ No newline at end of file
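A hedged sketch of how this conf file is consumed by the fdfs_client code that follows; the byte string below is only a stand-in for real PDF content.

from fdfs_client.client import get_tracker_conf, Fdfs_client

tracker_conf = get_tracker_conf('./client.conf')   # parsed by Fdfs_ConfigParser (see utils.py below)
client = Fdfs_client(tracker_conf)
result = client.upload_by_buffer(b'%PDF-1.4 placeholder bytes', file_ext_name='pdf')
print(result['Remote file_id'], result['Uploaded size'])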
# __init__.py
__version__ = '2.2.0'
VERSION = tuple(map(int, __version__.split('.')))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: connection.py
import socket
import os
import sys
import time
import random
from itertools import chain
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# start class Connection
class Connection(object):
'''Manage TCP communication to and from Fastdfs Server.'''
def __init__(self, **conn_kwargs):
self.pid = os.getpid()
self.host_tuple = conn_kwargs['host_tuple']
self.remote_port = conn_kwargs['port']
self.remote_addr = None
self.timeout = conn_kwargs['timeout']
self._sock = None
def __del__(self):
try:
self.disconnect()
except:
pass
def connect(self):
'''Connect to fdfs server.'''
if self._sock:
return
try:
sock = self._connect()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = sock
# print '[+] Create a connection success.'
# print '\tLocal address is %s:%s.' % self._sock.getsockname()
# print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
def _connect(self):
'''Create TCP socket. The host is random one of host_tuple.'''
self.remote_addr = random.choice(self.host_tuple)
# print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.settimeout(self.timeout)
sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
return sock
def disconnect(self):
'''Disconnect from fdfs server.'''
if self._sock is None:
return
try:
self._sock.close()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = None
def get_sock(self):
return self._sock
def _errormessage(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message" '''
if len(exception.args) == 1:
return "[-] Error: connect to %s:%s. %s." % (self.remote_addr, self.remote_port, exception.args[0])
else:
return "[-] Error: %s connect to %s:%s. %s." % \
(exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
# end class Connection
# start ConnectionPool
class ConnectionPool(object):
'''Generic Connection Pool'''
def __init__(self, name='', conn_class=Connection,
max_conn=None, **conn_kwargs):
self.pool_name = name
self.pid = os.getpid()
self.conn_class = conn_class
self.max_conn = max_conn or 2 ** 31
self.conn_kwargs = conn_kwargs
self._conns_created = 0
self._conns_available = []
self._conns_inuse = set()
# print '[+] Create a connection pool success, name: %s.' % self.pool_name
def _check_pid(self):
if self.pid != os.getpid():
self.destroy()
self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
def make_conn(self):
'''Create a new connection.'''
if self._conns_created >= self.max_conn:
raise ConnectionError('[-] Error: Too many connections.')
num_try = 10
while True:
try:
if num_try <= 0:
sys.exit()
conn_instance = self.conn_class(**self.conn_kwargs)
conn_instance.connect()
self._conns_created += 1
break
except ConnectionError as e:
print(e)
num_try -= 1
conn_instance = None
return conn_instance
def get_connection(self):
'''Get a connection from pool.'''
self._check_pid()
try:
conn = self._conns_available.pop()
# print '[+] Get a connection from pool %s.' % self.pool_name
# print '\tLocal address is %s:%s.' % conn._sock.getsockname()
# print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
except IndexError:
conn = self.make_conn()
self._conns_inuse.add(conn)
return conn
def remove(self, conn):
'''Remove connection from pool.'''
if conn in self._conns_inuse:
self._conns_inuse.remove(conn)
self._conns_created -= 1
if conn in self._conns_available:
self._conns_available.remove(conn)
self._conns_created -= 1
def destroy(self):
'''Disconnect all connections in the pool.'''
all_conns = chain(self._conns_inuse, self._conns_available)
for conn in all_conns:
conn.disconnect()
# print '[-] Destroy connection pool %s.' % self.pool_name
def release(self, conn):
'''Release the connection back to the pool.'''
self._check_pid()
if conn.pid == self.pid:
self._conns_inuse.remove(conn)
self._conns_available.append(conn)
# print '[-] Release connection back to pool %s.' % self.pool_name
# end ConnectionPool class
def tcp_recv_response(conn, bytes_size, buffer_size=4096):
'''Receive response from server.
It does not include the tracker header.
arguments:
@conn: connection
@bytes_size: int, will be received byte_stream size
@buffer_size: int, receive buffer size
@Return: tuple,(response, received_size)
'''
recv_buff = []
total_size = 0
try:
while bytes_size > 0:
resp = conn._sock.recv(buffer_size)
recv_buff.append(resp)
total_size += len(resp)
bytes_size -= len(resp)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: (%s)' % e.args)
return (b''.join(recv_buff), total_size)
def tcp_send_data(conn, bytes_stream):
'''Send buffer to server.
It does not include the tracker header.
arguments:
@conn: connection
@bytes_stream: transmit buffer
@Return bool
'''
try:
conn._sock.sendall(bytes_stream)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: (%s)' % e.args)
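An illustrative sketch of the pool API above; the tracker host and port are placeholders and the ten zero bytes are meaningless filler.

pool = ConnectionPool(name='tracker_pool', host_tuple=('192.168.0.1',), port=22122, timeout=30)
conn = pool.get_connection()                  # reuse an idle connection or dial a new one
try:
    tcp_send_data(conn, b'\x00' * 10)         # push raw bytes to the server
    resp, size = tcp_recv_response(conn, 10)  # read exactly 10 bytes back
finally:
    pool.release(conn)                        # hand the connection back for reuse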
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: exceptions.py
'''Core exceptions raised by fdfs client'''
class FDFSError(Exception):
pass
class ConnectionError(FDFSError):
pass
class ResponseError(FDFSError):
pass
class InvaildResponse(FDFSError):
pass
class DataError(FDFSError):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: fdfs_protol.py
import struct
import socket
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# define FDFS protol constans
TRACKER_PROTO_CMD_STORAGE_JOIN = 81
FDFS_PROTO_CMD_QUIT = 82
TRACKER_PROTO_CMD_STORAGE_BEAT = 83 # storage heart beat
TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE = 84 # report disk usage
TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG = 85 # repl new storage servers
TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ = 86 # src storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ = 87 # dest storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY = 88 # sync done notify
TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT = 89 # report src last synced time as dest server
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY = 79 # dest storage query sync src storage server
TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED = 78 # storage server report it's ip changed
TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ = 77 # storage server request storage server's changelog
TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS = 76 # report specified storage server status
TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ = 75 # storage server request parameters
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE = 74 # storage report trunk free space
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID = 73 # storage report current trunk file id
TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID = 72 # storage get current trunk file id
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START = 61 # start of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END = 62 # end of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE = 63 # tracker get a system data file
TRACKER_PROTO_CMD_TRACKER_GET_STATUS = 64 # tracker get status of other tracker
TRACKER_PROTO_CMD_TRACKER_PING_LEADER = 65 # tracker ping leader
TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER = 66 # notify next leader to other trackers
TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER = 67 # commit next leader to other trackers
TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP = 90
TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS = 91
TRACKER_PROTO_CMD_SERVER_LIST_STORAGE = 92
TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE = 93
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE = 101
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE = 102
TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE = 103
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE = 104
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL = 105
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL = 106
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL = 107
TRACKER_PROTO_CMD_RESP = 100
FDFS_PROTO_CMD_ACTIVE_TEST = 111 # active test, tracker and storage both support since V1.28
STORAGE_PROTO_CMD_REPORT_CLIENT_IP = 9 # ip as tracker client
STORAGE_PROTO_CMD_UPLOAD_FILE = 11
STORAGE_PROTO_CMD_DELETE_FILE = 12
STORAGE_PROTO_CMD_SET_METADATA = 13
STORAGE_PROTO_CMD_DOWNLOAD_FILE = 14
STORAGE_PROTO_CMD_GET_METADATA = 15
STORAGE_PROTO_CMD_SYNC_CREATE_FILE = 16
STORAGE_PROTO_CMD_SYNC_DELETE_FILE = 17
STORAGE_PROTO_CMD_SYNC_UPDATE_FILE = 18
STORAGE_PROTO_CMD_SYNC_CREATE_LINK = 19
STORAGE_PROTO_CMD_CREATE_LINK = 20
STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE = 21
STORAGE_PROTO_CMD_QUERY_FILE_INFO = 22
STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE = 23 # create appender file
STORAGE_PROTO_CMD_APPEND_FILE = 24 # append file
STORAGE_PROTO_CMD_SYNC_APPEND_FILE = 25
STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG = 26 # fetch binlog of one store path
STORAGE_PROTO_CMD_RESP = TRACKER_PROTO_CMD_RESP
STORAGE_PROTO_CMD_UPLOAD_MASTER_FILE = STORAGE_PROTO_CMD_UPLOAD_FILE
STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE = 27 # since V3.00
STORAGE_PROTO_CMD_TRUNK_ALLOC_CONFIRM = 28 # since V3.00
STORAGE_PROTO_CMD_TRUNK_FREE_SPACE = 29 # since V3.00
STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG = 30 # since V3.00
STORAGE_PROTO_CMD_TRUNK_GET_BINLOG_SIZE = 31 # since V3.07
STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS = 32 # since V3.07
STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE = 33 # since V3.07
STORAGE_PROTO_CMD_MODIFY_FILE = 34 # since V3.08
STORAGE_PROTO_CMD_SYNC_MODIFY_FILE = 35 # since V3.08
STORAGE_PROTO_CMD_TRUNCATE_FILE = 36 # since V3.08
STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE = 37 # since V3.08
# for overwrite all old metadata
STORAGE_SET_METADATA_FLAG_OVERWRITE = 'O'
STORAGE_SET_METADATA_FLAG_OVERWRITE_STR = "O"
# for replace, insert when the meta item not exist, otherwise update it
STORAGE_SET_METADATA_FLAG_MERGE = 'M'
STORAGE_SET_METADATA_FLAG_MERGE_STR = "M"
FDFS_RECORD_SEPERATOR = '\x01'
FDFS_FIELD_SEPERATOR = '\x02'
# common constants
FDFS_GROUP_NAME_MAX_LEN = 16
IP_ADDRESS_SIZE = 16
FDFS_PROTO_PKG_LEN_SIZE = 8
FDFS_PROTO_CMD_SIZE = 1
FDFS_PROTO_STATUS_SIZE = 1
FDFS_PROTO_IP_PORT_SIZE = (IP_ADDRESS_SIZE + 6)
FDFS_MAX_SERVERS_EACH_GROUP = 32
FDFS_MAX_GROUPS = 512
FDFS_MAX_TRACKERS = 16
FDFS_DOMAIN_NAME_MAX_LEN = 128
FDFS_MAX_META_NAME_LEN = 64
FDFS_MAX_META_VALUE_LEN = 256
FDFS_FILE_PREFIX_MAX_LEN = 16
FDFS_LOGIC_FILE_PATH_LEN = 10
FDFS_TRUE_FILE_PATH_LEN = 6
FDFS_FILENAME_BASE64_LENGTH = 27
FDFS_TRUNK_FILE_INFO_LEN = 16
FDFS_FILE_EXT_NAME_MAX_LEN = 6
FDFS_SPACE_SIZE_BASE_INDEX = 2 # storage space size based (MB)
FDFS_UPLOAD_BY_BUFFER = 1
FDFS_UPLOAD_BY_FILENAME = 2
FDFS_UPLOAD_BY_FILE = 3
FDFS_DOWNLOAD_TO_BUFFER = 1
FDFS_DOWNLOAD_TO_FILE = 2
FDFS_NORMAL_LOGIC_FILENAME_LENGTH = (
FDFS_LOGIC_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_FILE_EXT_NAME_MAX_LEN + 1)
FDFS_TRUNK_FILENAME_LENGTH = (
FDFS_TRUE_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_TRUNK_FILE_INFO_LEN + 1 + FDFS_FILE_EXT_NAME_MAX_LEN)
FDFS_TRUNK_LOGIC_FILENAME_LENGTH = (FDFS_TRUNK_FILENAME_LENGTH + (FDFS_LOGIC_FILE_PATH_LEN - FDFS_TRUE_FILE_PATH_LEN))
FDFS_VERSION_SIZE = 6
TRACKER_QUERY_STORAGE_FETCH_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE)
TRACKER_QUERY_STORAGE_STORE_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
# status code, order is important!
FDFS_STORAGE_STATUS_INIT = 0
FDFS_STORAGE_STATUS_WAIT_SYNC = 1
FDFS_STORAGE_STATUS_SYNCING = 2
FDFS_STORAGE_STATUS_IP_CHANGED = 3
FDFS_STORAGE_STATUS_DELETED = 4
FDFS_STORAGE_STATUS_OFFLINE = 5
FDFS_STORAGE_STATUS_ONLINE = 6
FDFS_STORAGE_STATUS_ACTIVE = 7
FDFS_STORAGE_STATUS_RECOVERY = 9
FDFS_STORAGE_STATUS_NONE = 99
class Storage_server(object):
'''Class storage server for upload.'''
def __init__(self):
self.ip_addr = None
self.port = None
self.group_name = ''
self.store_path_index = 0
# Class tracker_header
class Tracker_header(object):
'''
Class for Pack or Unpack tracker header
struct tracker_header{
char pkg_len[FDFS_PROTO_PKG_LEN_SIZE],
char cmd,
char status,
}
'''
def __init__(self):
self.fmt = '!QBB' # pkg_len[FDFS_PROTO_PKG_LEN_SIZE] + cmd + status
self.st = struct.Struct(self.fmt)
self.pkg_len = 0
self.cmd = 0
self.status = 0
def _pack(self, pkg_len=0, cmd=0, status=0):
return self.st.pack(pkg_len, cmd, status)
def _unpack(self, bytes_stream):
self.pkg_len, self.cmd, self.status = self.st.unpack(bytes_stream)
return True
def header_len(self):
return self.st.size
def send_header(self, conn):
'''Send Tracker header to server.'''
header = self._pack(self.pkg_len, self.cmd, self.status)
try:
conn._sock.sendall(header)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: %s' % (e.args,))
def recv_header(self, conn):
'''Receive response from server.
if success, class members (pkg_len, cmd, status) hold the response.
'''
try:
header = conn._sock.recv(self.header_len())
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: %s' % (e.args,))
self._unpack(header)
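For illustration, the tracker header is a fixed 10-byte frame (8-byte package length, 1-byte command, 1-byte status) packed with the '!QBB' struct format.

th = Tracker_header()
print(th.header_len())   # 10
raw = th._pack(pkg_len=16, cmd=TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE)
th._unpack(raw)          # restores pkg_len=16, cmd=101, status=0 from the 10 bytes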
def fdfs_pack_metadata(meta_dict):
ret = ''
for key in meta_dict:
ret += '%s%c%s%c' % (key, FDFS_FIELD_SEPERATOR, meta_dict[key], FDFS_RECORD_SEPERATOR)
return ret[0:-1]
def fdfs_unpack_metadata(bytes_stream):
li = bytes_stream.split(FDFS_RECORD_SEPERATOR)
return dict([item.split(FDFS_FIELD_SEPERATOR) for item in li])
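A quick round-trip of the two metadata helpers above; fields are joined with \x02 and records with \x01.

meta = {'width': '1024', 'height': '768'}
packed = fdfs_pack_metadata(meta)            # 'width\x021024\x01height\x02768'
assert fdfs_unpack_metadata(packed) == meta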
#!/usr/bin/env python
# -*- coding = utf-8 -*-
# filename: utils.py
import io
import os
import sys
import stat
import platform
import configparser
SUFFIX = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
__os_sep__ = "/" if platform.system() == 'Windows' else os.sep
def appromix(size, base=0):
'''Convert bytes stream size to human-readable format.
Keyword arguments:
size: int, bytes stream size
base: int, suffix index
Return: string
'''
multiples = 1024
if size < 0:
raise ValueError('[-] Error: number must be non-negative.')
if size < multiples:
return '{0:d}{1}'.format(size, SUFFIX[base])
for suffix in SUFFIX[base:]:
if size < multiples:
return '{0:.2f}{1}'.format(size, suffix)
size = size / float(multiples)
raise ValueError('[-] Error: number too big.')
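For example:

print(appromix(512))         # '512B'
print(appromix(123456789))   # '117.74MB' (123456789 / 1024 / 1024 ≈ 117.74)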
def get_file_ext_name(filename, double_ext=True):
li = filename.split(os.extsep)
if len(li) <= 1:
return ''
else:
if li[-1].find(__os_sep__) != -1:
return ''
if double_ext:
if len(li) > 2:
if li[-2].find(__os_sep__) == -1:
return '%s.%s' % (li[-2], li[-1])
return li[-1]
class Fdfs_ConfigParser(configparser.RawConfigParser):
"""
Extends ConfigParser to allow files without sections.
This is done by wrapping read files and prepending them with a placeholder
section, which defaults to '__config__'
"""
def __init__(self, default_section=None, *args, **kwargs):
configparser.RawConfigParser.__init__(self, *args, **kwargs)
self._default_section = None
self.set_default_section(default_section or '__config__')
def get_default_section(self):
return self._default_section
def set_default_section(self, section):
self.add_section(section)
# move all values from the previous default section to the new one
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
except configparser.NoSectionError:
pass
else:
for (key, value) in default_section_items:
self.set(section, key, value)
self._default_section = section
def read(self, filenames):
if isinstance(filenames, str):
filenames = [filenames]
read_ok = []
for filename in filenames:
try:
with open(filename) as fp:
self.readfp(fp)
except IOError:
continue
else:
read_ok.append(filename)
return read_ok
def readfp(self, fp, *args, **kwargs):
stream = io.StringIO()
try:
stream.name = fp.name
except AttributeError:
pass
stream.write('[' + self._default_section + ']\n')
stream.write(fp.read())
stream.seek(0, 0)
return self._read(stream, stream.name)
def write(self, fp):
# Write the items from the default section manually and then remove them
# from the data. They'll be re-added later.
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
for (key, value) in default_section_items:
fp.write("{0} = {1}\n".format(key, value))
fp.write("\n")
except configparser.NoSectionError:
pass
configparser.RawConfigParser.write(self, fp)
self.add_section(self._default_section)
for (key, value) in default_section_items:
self.set(self._default_section, key, value)
def _read(self, fp, fpname):
"""Parse a sectioned setup file.
The sections in setup file contains a title line at the top,
indicated by a name in square brackets (`[]'), plus key/value
options lines, indicated by `name: value' format lines.
Continuations are represented by an embedded newline then
leading whitespace. Blank lines, lines beginning with a '#',
and just about everything else are ignored.
"""
cursect = None # None, or a dictionary
optname = None
lineno = 0
e = None # None, or an exception
while True:
line = fp.readline()
if not line:
break
lineno = lineno + 1
# comment or blank line?
if line.strip() == '' or line[0] in '#;':
continue
if line.split(None, 1)[0].lower() == 'rem' and line[0] in "rR":
# no leading whitespace
continue
# continuation line?
if line[0].isspace() and cursect is not None and optname:
value = line.strip()
if value:
cursect[optname] = "%s\n%s" % (cursect[optname], value)
# a section header or option header?
else:
# is it a section header?
mo = self.SECTCRE.match(line)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
cursect = self._sections[sectname]
elif sectname == DEFAULTSECT:
cursect = self._defaults
else:
cursect = self._dict()
cursect['__name__'] = sectname
self._sections[sectname] = cursect
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self.OPTCRE.match(line)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if vi in ('=', ':') and ';' in optval:
# ';' is a comment delimiter only if it follows
# a spacing character
pos = optval.find(';')
if pos != -1 and optval[pos - 1].isspace():
optval = optval[:pos]
optval = optval.strip()
# allow empty values
if optval == '""':
optval = ''
optname = self.optionxform(optname.rstrip())
if optname in cursect:
if not isinstance(cursect[optname], list):
cursect[optname] = [cursect[optname]]
cursect[optname].append(optval)
else:
cursect[optname] = optval
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
if not e:
e = ParsingError(fpname)
e.append(lineno, repr(line))
# if any parsing errors occurred, raise an exception
if e:
raise e
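A small example of the section-less parsing, assuming the client.conf shown earlier in this commit is on disk:

parser = Fdfs_ConfigParser()
parser.read('client.conf')                         # file has no [section] headers
print(parser.get('__config__', 'tracker_server'))  # '114.115.215.96:22122'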
def split_remote_fileid(remote_file_id):
'''
Split remote_file_id into (group_name, remote_file_name)
arguments:
@remote_file_id: string
@return tuple, (group_name, remote_file_name)
'''
index = remote_file_id.find(b'/')
if -1 == index:
return None
return (remote_file_id[0:index], remote_file_id[(index + 1):])
def fdfs_check_file(filename):
ret = True
errmsg = ''
if not os.path.isfile(filename):
ret = False
errmsg = '[-] Error: %s is not a file.' % filename
elif not stat.S_ISREG(os.stat(filename).st_mode):
ret = False
errmsg = '[-] Error: %s is not a regular file.' % filename
return (ret, errmsg)
if __name__ == '__main__':
print(get_file_ext_name('/bc.tar.gz'))
@@ -37,6 +37,11 @@ def find_id_by_name(start,token,name):
time.sleep(5)
continue
time.sleep(2)
#{'status': 40101, 'message': '无效的sessionToken!'}
if resp_dict['status']==40101:
KeyNo = False
log.info(f'====token失效====时间{baseCore.getTimeCost(start, time.time())}')
return KeyNo
try:
if resp_dict['result']['Result']:
result_dict = resp_dict['result']['Result'][0]
......
""" """
@@ -27,7 +27,7 @@ def check_code(com_code):
def check_date(com_code,info_date):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn', db=3)
-res = r.sismember('com_caiwushuju_date::'+com_code, info_date) # 注意是 保存set的方式
+res = r.sismember('com_caiwushuju_code::'+com_code, info_date) # 注意是 保存set的方式
if res:
return True
else:
@@ -437,15 +437,16 @@ def getReportTime():
# timeNow = baseCore.getNowTime(1)[:10]
list_date = []
# 2023-04-01
#todo:正式任务
# 获取当前日期和时间
-current_date = datetime.now()
+# current_date = datetime.now()
# 计算昨天的日期
-yesterday = current_date - timedelta(days=1)
+# yesterday = current_date - timedelta(days=1)
# 格式化昨天的日期
-report_date = yesterday.strftime('%Y-%m-%d')
-list_date.append(report_date)
-year = int(current_date.strftime('%Y'))
-# list_date = ['2023-03-31']
+# report_date = yesterday.strftime('%Y-%m-%d')
+# list_date.append(report_date)
+# year = int(current_date.strftime('%Y'))
+list_date = ['2023-03-31']
list_month = ['-12-31', '-09-30', '-06-30', '-03-31']
for year in range(2022, 2018, -1):
@@ -459,7 +460,8 @@ def job(taskType):
# 需要提供股票代码、企业信用代码
while True:
#从redis中获取企业信用代码
-social_code = baseCore.redicPullData('FinanceFromEast:finance_socialCode')
+social_code = baseCore.redicPullData('FinanceFromEast:eastfinance_socialCode')
# social_code = '91100000100003962T'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
@@ -468,8 +470,12 @@ def job(taskType):
sql_sel = f'''select securities_code,exchange from sys_base_enterprise_ipo where category = '1' and social_credit_code='{social_code}' '''
cursor.execute(sql_sel)
row = cursor.fetchone()
try:
securities_code = row[0]
pass
except:
log.info(f'======{social_code}没有股票代码======')
continue
exchange = row[1]
# for code in list_code:
# social_code = rows[0]
@@ -510,18 +516,17 @@ def job(taskType):
time.sleep(1)
print(res_baocun.text)
-for nnn in range(0, 3):
-try:
-add_date(com_code, date_list)
-break
-except:
-time.sleep(1)
if len(info_date_list) != 0:
for date in info_date_list:
date_list.append(date)
print(date_list)
-date_list = str(date_list)
+# date_list = str(date_list)
for nnn in range(0, 3):
try:
add_date(com_code,date_list)
break
except:
time.sleep(1)
end_time = time.time()
log.info(f'===={com_code}====该企业耗时{end_time-start_time}===')
cnx.close()
@@ -529,7 +534,7 @@ def job(taskType):
baseCore.close()
if __name__=='__main__':
-task_type = '财务数据/东方财富网'
+task_type = '财务数据/东方财富网/福布斯'
job(task_type)
......
""" """
@@ -15,20 +15,18 @@ from bs4 import BeautifulSoup
from kafka import KafkaProducer
from datetime import datetime
from base import BaseCore
-from fdfs_client.client import get_tracker_conf, Fdfs_client
+# from fdfs_client.client import get_tracker_conf, Fdfs_client
baseCore = BaseCore.BaseCore()
log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
-cnx_ = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
-# cnx_ip = pymysql.connect(host='114.115.159.144',user='root', password='zzsn9988', db='clb_project', charset='utf8mb4')
-cursor = cnx.cursor()
-cursor_ = cnx_.cursor()
-tracker_conf = get_tracker_conf('./client.conf')
-client = Fdfs_client(tracker_conf)
-taskType = '企业公告/证监会'
+cnx = baseCore.cnx
+cursor = baseCore.cursor
+# tracker_conf = get_tracker_conf('./client.conf')
+# client = Fdfs_client(tracker_conf)
+taskType = '企业公告/证监会/福布斯'
def RequestUrl(url, payload, social_code,start_time):
# ip = get_proxy()[random.randint(0, 3)]
@@ -138,30 +136,25 @@ def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type
inster = False
sel_sql = '''select social_credit_code,source_address from brpa_source_article where social_credit_code = %s and source_address = %s'''
-cursor_.execute(sel_sql, (social_code, pdf_url))
-selects = cursor_.fetchone()
+cursor.execute(sel_sql, (social_code, pdf_url))
+selects = cursor.fetchone()
if selects:
print(f'com_name:{short_name}、{pdf_url}已存在')
return inster
# 信息插入数据库
try:
-insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
list_info = [
social_code,
-name_pdf,
-'', # 摘要
-'', # 正文
-pub_time, # 发布时间
pdf_url,
'证监会',
-report_type,
'1',
-'zh'
]
-cursor_.execute(insert_sql, tuple(list_info))
-cnx_.commit()
+#144数据库
+cursor.execute(insert_sql, tuple(list_info))
+cnx.commit()
insert = True
return insert
except:
@@ -171,34 +164,42 @@ def InsterInto(short_name, social_code, name_pdf, pub_time, pdf_url, report_type
return insert
-def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time):
-sel_sql = "select article_id from brpa_source_article where source_address = %s"
-cursor_.execute(sel_sql, pdf_url)
-row = cursor_.fetchone()
-id = row[0]
-# 先获取PDF链接下载pdf,在解析内容
-try:
-res = requests.get(pdf_url)
-content = ''
-# 读取文件内容,
-with fitz.open(stream=res.content, filetype='pdf') as doc:
-for page in doc.pages():
-content += page.get_text()
-except:
-# print('解析失败')
-dic_result = {
-'success': 'false',
-'message': 'PDF解析失败',
-'code': '204',
-}
-print(dic_result)
-state = 0
-takeTime = baseCore.getTimeCost(start_time, time.time())
-baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
+def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time,com_name,num):
+#上传至文件服务器
+retData = baseCore.upLoadToServe(pdf_url,8,social_code)
+#附件插入att数据库
+num = num + 1
+att_id = baseCore.tableUpdate(retData,com_name,year,pdf_name,num)
+content = retData['content']
+if retData['state']:
+pass
+else:
+log.info(f'====pdf解析失败====')
return False
# 先获取PDF链接下载pdf,在解析内容
# try:
# res = requests.get(pdf_url)
# content = ''
# # 读取文件内容,解析内容
# with fitz.open(stream=res.content, filetype='pdf') as doc:
# for page in doc.pages():
# content += page.get_text()
# except:
# # print('解析失败')
# dic_result = {
# 'success': 'false',
# 'message': 'PDF解析失败',
# 'code': '204',
# }
# log.info(dic_result)
# state = 0
# takeTime = baseCore.getTimeCost(start_time, time.time())
# baseCore.recordLog(social_code, taskType, state, takeTime, pdf_url, dic_result['message'])
# return False
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_news = {
-'attachmentIds': id,
+'attachmentIds': att_id,
'author': '',
'content': content,
'contentWithTag': '',
@@ -247,11 +248,12 @@ def GetContent(pdf_url, pdf_name, social_code, year, pub_time, start_time):
# 采集信息
-def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获取到的基本信息
+def SpiderByZJH(url, payload, dic_info, start_time,num): # dic_info 数据库中获取到的基本信息
okCount = 0
errorCount = 0
social_code = dic_info[2]
short_name = dic_info[4]
com_name = dic_info[1]
soup = RequestUrl(url, payload, social_code, start_time)
if soup == '':
@@ -298,9 +300,9 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获
pdf_url_info = td_list[2]
# print(pdf_url)
pdf_url = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[0].strip('\'')
-name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[1].strip('\'')
-pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split(',')[2].strip('\'')
+name_pdf = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[1].strip('\'')
+pub_time = pdf_url_info['onclick'].strip('downloadPdf1(').split('\',')[2].strip('\'')
year = pub_time[:4]
report_type = td_list[4].text.strip()
@@ -311,7 +313,7 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获
# # 公告信息列表
# okCount = okCount + 1
# 解析PDF内容,先获取PDF链接 下载 解析成功,解析失败 ,传输成功,传输失败
-result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time)
+result = GetContent(pdf_url, name_pdf, social_code, year, pub_time, start_time,com_name,num)
if result:
# 公告信息列表
@@ -335,6 +337,7 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获
if __name__ == '__main__':
num = 0
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate',
@@ -370,7 +373,8 @@ if __name__ == '__main__':
while True:
start_time = time.time()
# 获取企业信息
-social_code = baseCore.redicPullData('NoticeEnterprise:gnqy_socialCode')
+# social_code = baseCore.redicPullData('NoticeEnterpriseFbs:gnqy_socialCode')
social_code = '9110000071092841XX'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
@@ -391,15 +395,16 @@ if __name__ == '__main__':
# 股票代码0、2、3开头的为深圳交易所,6、9开头的为上海交易所,4、8开头的为北京交易所
code = dic_info[3]
short_name = dic_info[4]
com_name = dic_info[1]
dic_parms = getUrl(code, url_parms, Catagory2_parms)
dic_parms_ls = getUrl(code, url_parms_ls, Catagory2_parms_ls)
if len(dic_parms) > 0:
start_time_cj = time.time()
-SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, start_time)
+SpiderByZJH(dic_parms["url"], dic_parms["payload"], dic_info, start_time,num)
-log.info(f'{code}==========={short_name},发行公告,耗时{baseCore.getTimeCost(start_time_cj, time.time())}')
+log.info(f'{code}==========={short_name},{com_name},发行公告,耗时{baseCore.getTimeCost(start_time_cj, time.time())}')
start_time_ls = time.time()
-SpiderByZJH(dic_parms_ls['url'], dic_parms_ls['payload'], dic_info, start_time)
+SpiderByZJH(dic_parms_ls['url'], dic_parms_ls['payload'], dic_info, start_time,num)
-log.info(f'{code}==========={short_name},临时报告,耗时{baseCore.getTimeCost(start_time_ls, time.time())}')
+log.info(f'{code}==========={short_name},{com_name},临时报告,耗时{baseCore.getTimeCost(start_time_ls, time.time())}')
# UpdateInfoSql(retData,retData_ls,social_code)
# log.info(f'{code}================更新成功')
end_time = time.time()
@@ -410,7 +415,7 @@ if __name__ == '__main__':
cursor.close()
cnx.close()
-cursor_.close()
-cnx_.close()
+# cursor_.close()
+# cnx_.close()
# 释放资源
baseCore.close()
@@ -19,7 +19,7 @@ jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
-cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor= cnx.cursor()
cnx_ = baseCore.cnx
@@ -37,7 +37,7 @@ headers = {
'Referer': 'https://www.tianyancha.com/',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.51'
}
-taskType = '企业动态/天眼查/福布斯'
+taskType = '企业动态/天眼查'
def beinWork(tyc_code, social_code):
start_time = time.time()
time.sleep(3)
@@ -154,11 +154,14 @@ def beinWork(tyc_code, social_code):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
-contentText = smart.extract_by_url(link).text
+#带标签正文
contentWithTag = smart.extract_by_url(link).text
#不带标签正文
content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
-contentText = ''
+contentWithTag = ''
-if contentText == '':
+if contentWithTag == '':
log.error(f'获取正文失败:--------{tyc_code}--------{num}--------{link}')
e = '获取正文失败'
state = 0
@@ -174,7 +177,7 @@ def beinWork(tyc_code, social_code):
continue
try:
#todo:更换插入的库
-insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,author,type) values(%s,%s,%s,%s,%s)'''
+insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
@@ -182,14 +185,73 @@ def beinWork(tyc_code, social_code):
social_code,
link,
'天眼查',
-source,
'2',
]
cursor_.execute(insert_sql, tuple(list_info))
cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
'attachmentIds': '',
'author': '',
'content': content,
'contentWithTag': contentWithTag,
'createDate': time_now,
'deleteFlag': '0',
'id': '',
'keyWords': '',
'lang': 'zh',
'origin': '天眼查',
'publishDate': time_format,
'sid': '1684032033495392257',
'sourceAddress': link, # 原文链接
'summary': info_page['abstracts'],
'title': title,
'type': 2,
'socialCreditCode': social_code,
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
# e = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
continue
try:
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("researchReportTopic",
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
'code': '200',
}
log.info(dic_result)
# 传输成功,写入日志中
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
'code': '204',
'e': e
}
log.error(dic_result)
e = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, link, e)
@@ -205,8 +267,9 @@ def doJob():
while True:
start = time.time()
-# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
+# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息 天眼查ID19276488
-social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
+# social_code = baseCore.redicPullData('NewsEnterpriseFbs:gnqy_socialCode')
social_code = '912301001275921118'
if social_code == None:
time.sleep(20)
continue
@@ -222,19 +285,25 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
-if tycid == None:
+if tycid == None or tycid== '':
try:
retData = getTycIdByXYDM(xydm)
-tycid = retData['tycData']['id']
-# todo:写入数据库
-updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
-cursor_.execute(updateSql)
-cnx_.commit()
+if retData:
+tycid = retData['id']
+# todo:写入数据库
+updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
+cursor_.execute(updateSql)
+cnx_.commit()
+else:
+state = 0
+takeTime = baseCore.getTimeCost(start, time.time())
+baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
+baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
-baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
+baseCore.rePutIntoR('NewsEnterpriseFbs:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
@@ -242,8 +311,10 @@ def doJob():
# 开始采集企业动态
retData = beinWork(tycid, xydm)
# 信息采集完成后将该企业的采集次数更新
-# baseCore.updateRun(xydm, runType, count)
+runType = 'NewsRunCount'
+count += 1
+baseCore.updateRun(xydm, runType, count)
total = retData['total']
up_okCount = retData['up_okCount']
up_errorCount = retData['up_errorCount']
@@ -257,7 +328,7 @@ def doJob():
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', f'获取企业信息失败--{e}')
time.sleep(5)
# break
cursor.close()
cnx.close()
# 释放资源
......
@@ -10,9 +10,15 @@ from base.BaseCore import BaseCore
requests.adapters.DEFAULT_RETRIES = 5
baseCore = BaseCore()
log = baseCore.getLogger()
-headers={
-'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
-'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
+# headers={
+# 'X-AUTH-TOKEN':'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzY4MzgxNjk4NCIsImlhdCI6MTY5MDE3ODYyOCwiZXhwIjoxNjkyNzcwNjI4fQ.VV3Zoa4RM5nVN8UXBc0-81KMGqLzTOme6rButeETGfFQi7p5h4ydg8CFrEsizr_iFwB3_BVaKR2o2xR-M4ipbQ',
+# 'X-TYCID':'77e997401d5f11ee9e91d5a0fd3c0b83',
# 'version':'TYC-Web',
# 'Content-Type':'application/json;charset=UTF-8'
# }
headers = {
'X-TYCID':'30c1289042f511ee9182cd1e1bcaa517',
# 'X-AUTH-TOKEN': 'eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiIxMzU5MjQ4MTgzOSIsImlhdCI6MTY5MjkzMzIxMiwiZXhwIjoxNjk1NTI1MjEyfQ.BKxDem8fpgeDHrIgm3qCoF76ueHtQSG1DggiTl4FAaoNKt4gem6NTX1XYndPXqVj9TXfl-8yp2kKE3jY66dyig',
'version':'TYC-Web',
'Content-Type':'application/json;charset=UTF-8'
}
@@ -27,6 +33,7 @@ def getTycIdByXYDM(xydm):
paramJsonData = {'keyword':xydm}
try:
headers['User-Agent'] = baseCore.getRandomUserAgent()
headers['X-AUTH-TOKEN'] = baseCore.GetTYCToken()
response = requests.post(url,json=paramJsonData,headers=headers,verify=False, proxies=ip)
time.sleep(random.randint(3, 5))
retJsonData =json.loads(response.content.decode('utf-8'))
@@ -35,14 +42,14 @@ def getTycIdByXYDM(xydm):
retData['state'] = True
retData['tycData'] = retJsonData['data'][0]
response.close()
-return retData
+return retData['tycData']
else:
log.error(f"{xydm}------{retJsonData}")
response.close()
-return retData
+return retData['tycData']
-except Exception as e:
+except:
-log.error(f"{xydm}---exception---{e}")
+log.error(f"---{xydm}--天眼查token失效---")
-return retData
+return retData['tycData']
# 更新天眼查企业基本信息
......
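With this change, getTycIdByXYDM hands back the tycData entry itself (retJsonData['data'][0]) rather than the whole retData wrapper, and hands back whatever retData['tycData'] was initialized to (presumably None) when the lookup or the token fails. A minimal sketch of the resulting caller contract, matching how doJob consumes it later in this commit:

# Sketch of the new contract (assumes the failure value of retData['tycData'] is falsy, e.g. None).
retData = getTycIdByXYDM(xydm)
if retData:
    tycid = retData['id']  # Tianyancha company id, later passed to beinWork
else:
    # no id obtained: record the failure and push the credit code back onto the Redis queue
    baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', xydm)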
@@ -3,7 +3,6 @@ import json
import requests, time, pymysql
import jieba
import sys
from kafka import KafkaProducer
from getTycId import getTycIdByXYDM
from base.BaseCore import BaseCore
@@ -12,15 +11,15 @@ from base.smart import smart_extractor
# import BaseCore
# from smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 初始化,设置中文分词
jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
- cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
+ cnx = pymysql.connect(host='114.116.44.11', user='caiji', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
cursor = cnx.cursor()
pageSize = 10
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
@@ -134,10 +133,10 @@ def beinWork(tyc_code, social_code,start_time):
link = info_page['uri']
try:
sel_sql = '''select social_credit_code from brpa_source_article where source_address = %s and social_credit_code=%s and type='2' '''
- cursor.execute(sel_sql, (link, social_code))
+ cursor_.execute(sel_sql, (link, social_code))
except Exception as e:
print(e)
- selects = cursor.fetchone()
+ selects = cursor_.fetchone()
if selects:
log.info(f'{tyc_code}-----{social_code}----{link}:已经存在')
@@ -156,7 +155,10 @@ def beinWork(tyc_code, social_code,start_time):
# 开始进行智能解析
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
+ #带标签正文
contentText = smart.extract_by_url(link).text
+ #不带标签正文
+ content = smart.extract_by_url(link).cleaned_text
# time.sleep(3)
except Exception as e:
contentText = ''
@@ -175,36 +177,25 @@ def beinWork(tyc_code, social_code,start_time):
pass
continue
try:
- insert_sql = '''insert into brpa_source_article(social_credit_code,title,summary,content,publish_date,source_address,origin,author,type,lang) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
+ insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time) values(%s,%s,%s,%s,now())'''
# 动态信息列表
up_okCount = up_okCount + 1
list_info = [
social_code,
- title,
- info_page['abstracts'], # 摘要
- contentText, # 正文
- time_format, # 发布时间
link,
'天眼查',
- source,
'2',
- 'zh'
]
- cursor.execute(insert_sql, tuple(list_info))
- cnx.commit()
+ cursor_.execute(insert_sql, tuple(list_info))
+ cnx_.commit()
# 采集一条资讯记录一条,记录该企业采到了多少的资讯
log.info(f'{social_code}----{link}:新增一条')
- sel_sql = "select article_id from brpa_source_article where source_address = %s and social_credit_code = %s"
- cursor.execute(sel_sql, (link, social_code))
- row = cursor.fetchone()
- id = row[0]
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# todo:插入一条数据,并传入kafka
dic_news = {
- 'attachmentIds': id,
+ 'attachmentIds': '',
'author': '',
- 'content': contentText,
+ 'content': content,
'contentWithTag': contentText,
'createDate': time_now,
'deleteFlag': '0',
@@ -222,7 +213,6 @@ def beinWork(tyc_code, social_code,start_time):
'year': time_format[:4]
}
except Exception as e:
log.info(f'传输失败:{social_code}----{link}')
e = '数据库传输失败'
state = 0
@@ -237,7 +227,6 @@ def beinWork(tyc_code, social_code,start_time):
json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))
dic_result = {
'success': 'ture',
'message': '操作成功',
@@ -250,7 +239,6 @@ def beinWork(tyc_code, social_code,start_time):
baseCore.recordLog(social_code, taskType, state, takeTime, link, '')
# return True
except Exception as e:
dic_result = {
'success': 'false',
'message': '操作失败',
@@ -269,12 +257,12 @@ def beinWork(tyc_code, social_code,start_time):
retData['up_repetCount'] = up_repetCount
return retData
# 日志信息保存至现已创建好数据库中,因此并没有再对此前保存日志信息数据库进行保存
def doJob():
while True:
# 根据从Redis中拿到的社会信用代码,在数据库中获取对应基本信息
- social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
+ # social_code = baseCore.redicPullData('NewsEnterprise:gnqy_socialCode')
+ social_code = '912301001275921118'
# 判断 如果Redis中已经没有数据,则等待
if social_code == None:
time.sleep(20)
@@ -291,28 +279,31 @@ def doJob():
id = data[0]
xydm = data[2]
tycid = data[11]
- if tycid == None:
+ if tycid == None or tycid == '':
try:
retData = getTycIdByXYDM(xydm)
- tycid = retData['tycData']['id']
- #todo:写入数据库
- updateSql = f"update Enterprise set TYCID = '{tycid}' where SocialCode = '{xydm}'"
- cursor_.execute(updateSql)
- cnx_.commit()
+ if retData:
+     tycid = retData['id']
+     # todo:写入数据库
+     updateSql = f"update EnterpriseInfo set TYCID = '{tycid}' where SocialCode = '{xydm}'"
+     cursor_.execute(updateSql)
+     cnx_.commit()
+ else:
+     state = 0
+     takeTime = baseCore.getTimeCost(start, time.time())
+     baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
+     log.info(f'======={social_code}====重新放入redis====')
+     baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
+     continue
except:
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', '获取天眼查id失败')
- baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode',social_code)
+ baseCore.rePutIntoR('NewsEnterprise:gnqy_socialCode', social_code)
continue
count = data[17]
log.info(f"{id}---{xydm}----{tycid}----开始处理")
start_time = time.time()
- # updateBeginSql = f"update ssqy_tyc set update_state=2,date_time=now() where id={id}"
- # cursor.execute(updateBeginSql)
- # cnx.commit()
# 开始采集企业动态
retData = beinWork(tycid, xydm,start_time)
# 信息采集完成后将该企业的采集次数更新
...
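The reworked beinWork keeps only a bookkeeping row in MySQL (via the second connection cursor_/cnx_) and ships the article text in the Kafka message instead of storing it. A condensed sketch of that split, using the column list and payload keys from the hunks above; producer, the topic name, and the remaining payload fields are defined outside this excerpt:

# Dedup/bookkeeping row only; the article body is no longer written to brpa_source_article.
insert_sql = '''insert into brpa_source_article(social_credit_code,source_address,origin,type,create_time)
                values(%s,%s,%s,%s,now())'''
cursor_.execute(insert_sql, (social_code, link, '天眼查', '2'))
cnx_.commit()

# The full text travels in the Kafka payload: 'content' is the cleaned text,
# 'contentWithTag' keeps the HTML tags; 'attachmentIds' is now left empty.
dic_news = {'attachmentIds': '', 'content': content, 'contentWithTag': contentText}
kafka_result = producer.send(topic, json.dumps(dic_news, ensure_ascii=False).encode('utf8'))
print(kafka_result.get(timeout=10))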
#下载pdf文件
import os
import pymysql
import requests
import urllib3
from pymysql.converters import escape_string
from base.BaseCore import BaseCore

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
    'cache-control': 'max-age=0',
    # 'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
    'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': "Windows",
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'same-origin',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.115.159.144', user='caiji', password='zzsn9988', db='caiji',
                      charset='utf8mb4')
cursor = cnx.cursor()
def get_file_name(headers):
    filename = ''
    if 'Content-Disposition' in headers and headers['Content-Disposition']:
        disposition_split = headers['Content-Disposition'].split(';')
        if len(disposition_split) > 1:
            if disposition_split[1].strip().lower().startswith('filename='):
                file_name = disposition_split[1].split('=')
                if len(file_name) > 1:
                    # strip whitespace and surrounding quotes, e.g. filename="report.pdf"
                    filename = file_name[1].strip().strip('"')
    if not filename:
        # fall back to a sequence-based name when the server does not advertise one
        return str(baseCore.getNextSeq()) + ".pdf"
    return filename
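get_file_name prefers the file name advertised in the Content-Disposition response header and otherwise falls back to a sequence-based name. An illustration with made-up header values:

# Illustrative only: made-up headers to show the expected behaviour.
print(get_file_name({'Content-Disposition': 'attachment; filename="sanctions_2022.pdf"'}))
# -> sanctions_2022.pdf
print(get_file_name({'Content-Type': 'application/pdf'}))
# -> falls back to the next sequence number, e.g. 17.pdf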
def downFile(url, path):
    try:
        baseCore.mkPath(path)
        proxy = {'https': 'http://127.0.0.1:1080', 'http': 'http://127.0.0.1:1080'}
        response = requests.get(url, proxies=proxy, headers=headers, verify=False, timeout=10)
        fileName = get_file_name(response.headers)
        with open(os.path.join(path, fileName), "wb") as pyFile:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    pyFile.write(chunk)
    except Exception as e:
        log.error(f"出错了----------{e}")
        return False
    return fileName
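downFile creates the target directory with baseCore.mkPath, fetches the URL through the local proxy at 127.0.0.1:1080, and returns the saved file name, or False on any error. A usage sketch with illustrative arguments:

# Illustrative values; the proxy at 127.0.0.1:1080 must be running for the request to succeed.
name = downFile('https://example.com/doc/report.pdf', r'D:\美国VS俄罗斯制裁\OFAC')
if name:
    log.info(f"saved as {name}")
else:
    log.info("download failed; the row would be marked state=2 by the main loop")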
def getPath(str):
    # drop characters that are unsafe or awkward in Windows directory names
    str = str.replace(':', '')
    str = str.replace(': ', '')
    str = str.replace(' ', '')
    str = str.replace('"', '')
    str = str.replace("'", '')
    str = str.replace("/", '')
    return str
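getPath simply removes path-hostile characters, so a category label collapses into one flat directory component, for example:

# Illustrative input/output for getPath: colons, quotes, spaces and slashes are dropped.
print(getPath('Executive Order: 14024 / "OFAC"'))   # -> ExecutiveOrder14024OFAC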
if __name__ == '__main__':
    while True:
        selectSql = f"select id,url,website,ftype,stype,ttype from usvsrussia where state=0 order by id asc limit 1"
        cursor.execute(selectSql)
        data = cursor.fetchone()
        if data:
            id = data[0]
            url = data[1]
            website = data[2]
            ftype = data[3]
            stype = data[4]
            ttype = data[5]
            path = r'D:\美国VS俄罗斯制裁'
            log.info(f"开始处理{url}----")
            if website:
                path = os.path.join(path, getPath(website))
            if ftype:
                path = os.path.join(path, getPath(ftype))
            if stype:
                path = os.path.join(path, getPath(stype))
            if ttype:
                path = os.path.join(path, getPath(ttype))
            fileName = downFile(url, path)
            if fileName:
                updateSql = f"update usvsrussia set state=1,pdf_name='{fileName}' ,pdf_path='{escape_string(path)}' where id={id}"
                log.info(f"开始处理{url}----处理ok")
            else:
                updateSql = f"update usvsrussia set state=2 where id={id}"
                log.info(f"开始处理{url}----处理error")
            cursor.execute(updateSql)
            cnx.commit()
        else:
            log.info("数据处理完毕,程序退出")
            break
    baseCore.close()
    cursor.close()
    cnx.close()
\ No newline at end of file
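The update statements above interpolate fileName and path directly into the SQL text and only run path through escape_string; a parameterized query would let the driver quote both values. A possible alternative, shown only as a sketch against the same table:

# Sketch: let pymysql handle quoting instead of building the SQL string by hand.
updateSql = "update usvsrussia set state=1, pdf_name=%s, pdf_path=%s where id=%s"
cursor.execute(updateSql, (fileName, path, id))
cnx.commit()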
import pandas as pd
import pymysql
import requests
from bs4 import BeautifulSoup
from pymysql.converters import escape_string
from selenium.webdriver.common.by import By
from base.BaseCore import BaseCore

baseCore = BaseCore()
log = baseCore.getLogger()
headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
    'cache-control': 'max-age=0',
    # 'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
    'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': "Windows",
    'sec-fetch-dest': 'document',
    'sec-fetch-mode': 'navigate',
    'sec-fetch-site': 'same-origin',
    'sec-fetch-user': '?1',
    'upgrade-insecure-requests': '1',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
cnx = baseCore.cnx
cursor = baseCore.cursor

def job_2():
    log.info('----开始采集---俄罗斯国家杂志----')
    path = 'D:/chrome/chromedriver.exe'
    driverContent = baseCore.buildDriver(path, headless=False)
    url = 'http://publication.pravo.gov.ru/documents/block/president'
    req = requests.get(url, headers=headers)
    soup = BeautifulSoup(req.content, 'html.parser')
    container = soup.find('div', class_='documents-container')
    web_list = container.find_all('div', class_='documents-table-row')
    for web in web_list[:1]:
        web_href = web.find('a')['href']
        web_url = 'http://publication.pravo.gov.ru/' + web_href
        title = web.find('a').text
        print(title)