提交 b05ef000 作者: 刘伟刚

Merge remote-tracking branch 'origin/master'

......@@ -241,7 +241,6 @@ class BaseCore:
except :
pass
# 计算耗时
def getTimeCost(self,start, end):
seconds = int(end - start)
......@@ -393,7 +392,7 @@ class BaseCore:
# 从Redis的List中获取并移除一个元素
def redicPullData(self,key):
item = self.r.lpop(key)
item = self.r.rpop(key)
return item.decode() if item else None
# 获得脚本进程PID
......@@ -494,4 +493,27 @@ class BaseCore:
def rePutIntoR(self,item):
self.r.rpush('NewsEnterprise:gwqy_socialCode', item)
#增加计数器的值并返回增加后的值
def incrSet(self,key):
# 增加计数器的值并返回增加后的值
new_value = self.r.incr(key)
print("增加后的值:", new_value)
return new_value
#获取key剩余的过期时间
def getttl(self,key):
# 获取key的剩余过期时间
ttl = self.r.ttl(key)
print("剩余过期时间:", ttl)
# 判断key是否已过期
if ttl < 0:
# key已过期,将key的值重置为0
self.r.set(key, 0)
self.r.expire(key, 3600)
time.sleep(2)
......@@ -72,7 +72,7 @@ def NewsEnterprise_task():
#企业公告
def NoticeEnterprise():
# 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 1 "
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 10 "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
......@@ -95,13 +95,14 @@ def NoticeEnterprise_task():
#企业年报
def AnnualEnterprise():
# 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null"
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null limit 10"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnqy_socialCode', item)
#企业年报定时任务
def AnnualEnterprise_task():
# 实例化一个调度器
......@@ -118,7 +119,7 @@ def AnnualEnterprise_task():
#企业基本信息
def BaseInfoEnterprise():
# 获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 1 "
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' limit 10 "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
......@@ -162,35 +163,66 @@ def weixin_task():
print('定时采集异常', e)
pass
# 企业年报——雪球网
def AnnualEnterpriseIPO():
# 获取国内上市企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1' and SecuritiesCode is not null and isIPO = 1 limit 10"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('AnnualEnterprise:gnshqy_socialCode', item)
#国外企业基本信息
def BaseInfoEnterpriseAbroad():
# 获取国外企业
gn_query = "select id from EnterpriseInfo where Place = '2' limit 10 "
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
print('=======')
for item in gn_social_list:
r.rpush('BaseInfoEnterprise:gwqy_socialCode', item)
##福布斯=====从数据库中读取信息放入redis
def FBS():
# todo:调整为获取福布斯的数据库
gw_query = "select SocialCode from EnterpriseInfo where Place = '2'"
cursor.execute(gw_query)
gw_result = cursor.fetchall()
# gw_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=2"
# cursor.execute(gw_query)
# gw_result = cursor.fetchall()
# #获取国内企业
gn_query = "select SocialCode from EnterpriseInfo where Place = '1'"
#获取国内企业
gn_query = "select id from EnterpriseInfo where ext1='fbs2000' and ext2='1' and Place=1"
cursor.execute(gn_query)
gn_result = cursor.fetchall()
gn_social_list = [item[0] for item in gn_result]
gw_social_list = [item[0] for item in gw_result]
for item in gw_social_list:
r.rpush('NewsEnterprise:gwqy_socialCode', item)
# gw_social_list = [item[0] for item in gw_result]
#
# for item in gw_social_list:
# r.rpush('NewsEnterpriseFbs:gwqy_socialCode', item)
for item in gn_social_list:
r.rpush('NewsEnterprise:gnqy_socialCode', item)
if not r.exists(item):
r.rpush('NewsEnterpriseFbs:gnqy_socialCode', item)
if __name__ == "__main__":
start = time.time()
# NoticeEnterprise()
# AnnualEnterpriseIPO()
# AnnualEnterprise()
# BaseInfoEnterpriseAbroad()
# NewsEnterprise_task()
# NewsEnterprise()
FBS()
# BaseInfoEnterprise()
# FBS()
# NoticeEnterprise_task()
# AnnualEnterprise_task()
NoticeEnterprise()
log.info(f'====={basecore.getNowTime(1)}=====添加数据成功======耗时:{basecore.getTimeCost(start,time.time())}===')
# cnx.close()
# cursor.close()
......
# connect timeout in seconds
# default value is 30s
connect_timeout=300
# network timeout in seconds
# default value is 30s
network_timeout=600
# the base path to store log files
#base_path=/home/tarena/django-project/cc_shop1/cc_shop1/logs
# tracker_server can ocur more than once, and tracker_server format is
# "host:port", host can be hostname or ip address
tracker_server=114.115.215.96:22122
#standard log level as syslog, case insensitive, value list:
### emerg for emergency
### alert
### crit for critical
### error
### warn for warning
### notice
### info
### debug
log_level=info
# if use connection pool
# default value is false
# since V4.05
use_connection_pool = false
# connections whose the idle time exceeds this time will be closed
# unit: second
# default value is 3600
# since V4.05
connection_pool_max_idle_time = 3600
# if load FastDFS parameters from tracker server
# since V4.05
# default value is false
load_fdfs_parameters_from_tracker=false
# if use storage ID instead of IP address
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# default value is false
# since V4.05
use_storage_id = false
# specify storage ids filename, can use relative or absolute path
# same as tracker.conf
# valid only when load_fdfs_parameters_from_tracker is false
# since V4.05
storage_ids_filename = storage_ids.conf
#HTTP settings
http.tracker_server_port=80
#use "#include" directive to include HTTP other settiongs
##include http.conf
\ No newline at end of file
# __init__.py
__version__ = '2.2.0'
VERSION = tuple(map(int, __version__.split('.')))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: client.py
'''
Client module for Fastdfs 3.08
author: scott yuan scottzer8@gmail.com
date: 2012-06-21
'''
import os
import sys
from fdfs_client.utils import *
from fdfs_client.tracker_client import *
from fdfs_client.storage_client import *
from fdfs_client.exceptions import *
def get_tracker_conf(conf_path='client.conf'):
cf = Fdfs_ConfigParser()
tracker = {}
try:
cf.read(conf_path)
timeout = cf.getint('__config__', 'connect_timeout')
tracker_list = cf.get('__config__', 'tracker_server')
if isinstance(tracker_list, str):
tracker_list = [tracker_list]
tracker_ip_list = []
for tr in tracker_list:
tracker_ip, tracker_port = tr.split(':')
tracker_ip_list.append(tracker_ip)
tracker['host_tuple'] = tuple(tracker_ip_list)
tracker['port'] = int(tracker_port)
tracker['timeout'] = timeout
tracker['name'] = 'Tracker Pool'
except:
raise
return tracker
class Fdfs_client(object):
'''
Class Fdfs_client implemented Fastdfs client protol ver 3.08.
It's useful upload, download, delete file to or from fdfs server, etc. It's uses
connection pool to manage connection to server.
'''
def __init__(self, trackers, poolclass=ConnectionPool):
self.trackers = trackers
self.tracker_pool = poolclass(**self.trackers)
self.timeout = self.trackers['timeout']
return None
def __del__(self):
try:
self.pool.destroy()
self.pool = None
except:
pass
def upload_by_filename(self, filename, meta_dict=None):
'''
Upload a file to Storage server.
arguments:
@filename: string, name of file that will be uploaded
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} meta_dict can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : local_file_name,
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_filename(tc, store_serv, filename, meta_dict)
def upload_by_file(self, filename, meta_dict=None):
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_file(tc, store_serv, filename, meta_dict)
def upload_by_buffer(self, filebuffer, file_ext_name=None, meta_dict=None):
'''
Upload a buffer to Storage server.
arguments:
@filebuffer: string, buffer
@file_ext_name: string, file extend name
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_buffer(tc, store_serv, filebuffer, file_ext_name, meta_dict)
def upload_slave_by_filename(self, filename, remote_file_id, prefix_name, meta_dict=None):
'''
Upload slave file to Storage server.
arguments:
@filename: string, local file name
@remote_file_id: string, remote file id
@prefix_name: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading slave)')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
if not prefix_name:
raise DataError('[-] Error: prefix_name can not be null.')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_with_group(group_name)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
try:
ret_dict = store.storage_upload_slave_by_filename(tc, store_serv, filename, prefix_name, remote_filename,
meta_dict=None)
except:
raise
ret_dict['Status'] = 'Upload slave file successed.'
return ret_dict
def upload_slave_by_file(self, filename, remote_file_id, prefix_name, meta_dict=None):
'''
Upload slave file to Storage server.
arguments:
@filename: string, local file name
@remote_file_id: string, remote file id
@prefix_name: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading slave)')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
if not prefix_name:
raise DataError('[-] Error: prefix_name can not be null.')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_with_group(group_name)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
try:
ret_dict = store.storage_upload_slave_by_file(tc, store_serv, filename, prefix_name, remote_filename,
meta_dict=None)
except:
raise
ret_dict['Status'] = 'Upload slave file successed.'
return ret_dict
def upload_slave_by_buffer(self, filebuffer, remote_file_id, meta_dict=None, file_ext_name=None):
'''
Upload slave file by buffer
arguments:
@filebuffer: string
@remote_file_id: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_slave_by_buffer(tc, store_serv, filebuffer, remote_filename, meta_dict,
file_ext_name)
def upload_appender_by_filename(self, local_filename, meta_dict=None):
'''
Upload an appender file by filename.
arguments:
@local_filename: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} Notice: it can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(uploading appender)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_filename(tc, store_serv, local_filename, meta_dict)
def upload_appender_by_file(self, local_filename, meta_dict=None):
'''
Upload an appender file by file.
arguments:
@local_filename: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} Notice: it can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(uploading appender)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_file(tc, store_serv, local_filename, meta_dict)
def upload_appender_by_buffer(self, filebuffer, file_ext_name=None, meta_dict=None):
'''
Upload a buffer to Storage server.
arguments:
@filebuffer: string
@file_ext_name: string, can be null
@meta_dict: dictionary, can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_buffer(tc, store_serv, filebuffer, meta_dict, file_ext_name)
def delete_file(self, remote_file_id):
'''
Delete a file from Storage server.
arguments:
@remote_file_id: string, file_id of file that is on storage server
@return tuple ('Delete file successed.', remote_file_id, storage_ip)
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in delete file)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_delete_file(tc, store_serv, remote_filename)
def download_to_file(self, local_filename, remote_file_id, offset=0, down_bytes=0):
'''
Download a file from Storage server.
arguments:
@local_filename: string, local name of file
@remote_file_id: string, file_id of file that is on storage server
@offset: long
@downbytes: long
@return dict {
'Remote file_id' : remote_file_id,
'Content' : local_filename,
'Download size' : downloaded_size,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in download file)')
group_name, remote_filename = tmp
if not offset:
file_offset = int(offset)
if not down_bytes:
download_bytes = int(down_bytes)
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_fetch(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_download_to_file(tc, store_serv, local_filename, file_offset, download_bytes,
remote_filename)
def download_to_buffer(self, remote_file_id, offset=0, down_bytes=0):
'''
Download a file from Storage server and store in buffer.
arguments:
@remote_file_id: string, file_id of file that is on storage server
@offset: long
@down_bytes: long
@return dict {
'Remote file_id' : remote_file_id,
'Content' : file_buffer,
'Download size' : downloaded_size,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in download file)')
group_name, remote_filename = tmp
if not offset:
file_offset = int(offset)
if not down_bytes:
download_bytes = int(down_bytes)
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_fetch(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
file_buffer = None
return store.storage_download_to_buffer(tc, store_serv, file_buffer, file_offset, download_bytes,
remote_filename)
def list_one_group(self, group_name):
'''
List one group information.
arguments:
@group_name: string, group name will be list
@return Group_info, instance
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_one_group(group_name)
def list_servers(self, group_name, storage_ip=None):
'''
List all storage servers information in a group
arguments:
@group_name: string
@return dictionary {
'Group name' : group_name,
'Servers' : server list,
}
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_servers(group_name, storage_ip)
def list_all_groups(self):
'''
List all group information.
@return dictionary {
'Groups count' : group_count,
'Groups' : list of groups
}
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_all_groups()
def get_meta_data(self, remote_file_id):
'''
Get meta data of remote file.
arguments:
@remote_fileid: string, remote file id
@return dictionary, meta data
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in get meta data)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_get_metadata(tc, store_serv, remote_filename)
def set_meta_data(self, remote_file_id, meta_dict, op_flag=STORAGE_SET_METADATA_FLAG_OVERWRITE):
'''
Set meta data of remote file.
arguments:
@remote_file_id: string
@meta_dict: dictionary
@op_flag: char, 'O' for overwrite, 'M' for merge
@return dictionary {
'Status' : status,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in set meta data)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
try:
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
status = store.storage_set_metadata(tc, store_serv, remote_filename, meta_dict)
except (ConnectionError, ResponseError, DataError):
raise
# if status == 2:
# raise DataError('[-] Error: remote file %s is not exist.' % remote_file_id)
if status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
ret_dict = {}
ret_dict['Status'] = 'Set meta data success.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def append_by_filename(self, local_filename, remote_fileid):
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(append)')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_filename(tc, store_serv, local_filename, appended_filename)
def append_by_file(self, local_filename, remote_fileid):
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(append)')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_file(tc, store_serv, local_filename, appended_filename)
def append_by_buffer(self, file_buffer, remote_fileid):
if not file_buffer:
raise DataError('[-] Error: file_buffer can not be null.')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_buffer(tc, store_serv, file_buffer, appended_filename)
def truncate_file(self, truncated_filesize, appender_fileid):
'''
Truncate file in Storage server.
arguments:
@truncated_filesize: long
@appender_fileid: remote_fileid
@return: dictionary {
'Status' : 'Truncate successed.',
'Storage IP' : storage_ip
}
'''
trunc_filesize = int(truncated_filesize)
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: appender_fileid is invalid.(truncate)')
group_name, appender_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_truncate_file(tc, store_serv, trunc_filesize, appender_filename)
def modify_by_filename(self, filename, appender_fileid, offset=0):
'''
Modify a file in Storage server by file.
arguments:
@filename: string, local file name
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(modify)')
filesize = os.stat(filename).st_size
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_filename(tc, store_serv, filename, file_offset, filesize, appender_filename)
def modify_by_file(self, filename, appender_fileid, offset=0):
'''
Modify a file in Storage server by file.
arguments:
@filename: string, local file name
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(modify)')
filesize = os.stat(filename).st_size
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_file(tc, store_serv, filename, file_offset, filesize, appender_filename)
def modify_by_buffer(self, filebuffer, appender_fileid, offset=0):
'''
Modify a file in Storage server by buffer.
arguments:
@filebuffer: string, file buffer
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
if not filebuffer:
raise DataError('[-] Error: filebuffer can not be null.(modify)')
filesize = len(filebuffer)
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_buffer(tc, store_serv, filebuffer, file_offset, filesize, appender_filename)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: connection.py
import socket
import os
import sys
import time
import random
from itertools import chain
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# start class Connection
class Connection(object):
'''Manage TCP comunication to and from Fastdfs Server.'''
def __init__(self, **conn_kwargs):
self.pid = os.getpid()
self.host_tuple = conn_kwargs['host_tuple']
self.remote_port = conn_kwargs['port']
self.remote_addr = None
self.timeout = conn_kwargs['timeout']
self._sock = None
def __del__(self):
try:
self.disconnect()
except:
pass
def connect(self):
'''Connect to fdfs server.'''
if self._sock:
return
try:
sock = self._connect()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = sock
# print '[+] Create a connection success.'
# print '\tLocal address is %s:%s.' % self._sock.getsockname()
# print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
def _connect(self):
'''Create TCP socket. The host is random one of host_tuple.'''
self.remote_addr = random.choice(self.host_tuple)
# print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.settimeout(self.timeout)
sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
return sock
def disconnect(self):
'''Disconnect from fdfs server.'''
if self._sock is None:
return
try:
self._sock.close()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = None
def get_sock(self):
return self._sock
def _errormessage(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message" '''
if len(exception.args) == 1:
return "[-] Error: connect to %s:%s. %s." % (self.remote_addr, self.remote_port, exception.args[0])
else:
return "[-] Error: %s connect to %s:%s. %s." % \
(exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
# end class Connection
# start ConnectionPool
class ConnectionPool(object):
'''Generic Connection Pool'''
def __init__(self, name='', conn_class=Connection,
max_conn=None, **conn_kwargs):
self.pool_name = name
self.pid = os.getpid()
self.conn_class = conn_class
self.max_conn = max_conn or 2 ** 31
self.conn_kwargs = conn_kwargs
self._conns_created = 0
self._conns_available = []
self._conns_inuse = set()
# print '[+] Create a connection pool success, name: %s.' % self.pool_name
def _check_pid(self):
if self.pid != os.getpid():
self.destroy()
self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
def make_conn(self):
'''Create a new connection.'''
if self._conns_created >= self.max_conn:
raise ConnectionError('[-] Error: Too many connections.')
num_try = 10
while True:
try:
if num_try <= 0:
sys.exit()
conn_instance = self.conn_class(**self.conn_kwargs)
conn_instance.connect()
self._conns_created += 1
break
except ConnectionError as e:
print(e)
num_try -= 1
conn_instance = None
return conn_instance
def get_connection(self):
'''Get a connection from pool.'''
self._check_pid()
try:
conn = self._conns_available.pop()
# print '[+] Get a connection from pool %s.' % self.pool_name
# print '\tLocal address is %s:%s.' % conn._sock.getsockname()
# print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
except IndexError:
conn = self.make_conn()
self._conns_inuse.add(conn)
return conn
def remove(self, conn):
'''Remove connection from pool.'''
if conn in self._conns_inuse:
self._conns_inuse.remove(conn)
self._conns_created -= 1
if conn in self._conns_available:
self._conns_available.remove(conn)
self._conns_created -= 1
def destroy(self):
'''Disconnect all connections in the pool.'''
all_conns = chain(self._conns_inuse, self._conns_available)
for conn in all_conns:
conn.disconnect()
# print '[-] Destroy connection pool %s.' % self.pool_name
def release(self, conn):
'''Release the connection back to the pool.'''
self._check_pid()
if conn.pid == self.pid:
self._conns_inuse.remove(conn)
self._conns_available.append(conn)
# print '[-] Release connection back to pool %s.' % self.pool_name
# end ConnectionPool class
def tcp_recv_response(conn, bytes_size, buffer_size=4096):
'''Receive response from server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_size: int, will be received byte_stream size
@buffer_size: int, receive buffer size
@Return: tuple,(response, received_size)
'''
recv_buff = []
total_size = 0
try:
while bytes_size > 0:
resp = conn._sock.recv(buffer_size)
recv_buff.append(resp)
total_size += len(resp)
bytes_size -= len(resp)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: (%s)' % e.args)
return (b''.join(recv_buff), total_size)
def tcp_send_data(conn, bytes_stream):
'''Send buffer to server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_stream: trasmit buffer
@Return bool
'''
try:
conn._sock.sendall(bytes_stream)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: (%s)' % e.args)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: exceptions.py
'''Core exceptions raised by fdfs client'''
class FDFSError(Exception):
pass
class ConnectionError(FDFSError):
pass
class ResponseError(FDFSError):
pass
class InvaildResponse(FDFSError):
pass
class DataError(FDFSError):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: fdfs_protol.py
import struct
import socket
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# define FDFS protol constans
TRACKER_PROTO_CMD_STORAGE_JOIN = 81
FDFS_PROTO_CMD_QUIT = 82
TRACKER_PROTO_CMD_STORAGE_BEAT = 83 # storage heart beat
TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE = 84 # report disk usage
TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG = 85 # repl new storage servers
TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ = 86 # src storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ = 87 # dest storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY = 88 # sync done notify
TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT = 89 # report src last synced time as dest server
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY = 79 # dest storage query sync src storage server
TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED = 78 # storage server report it's ip changed
TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ = 77 # storage server request storage server's changelog
TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS = 76 # report specified storage server status
TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ = 75 # storage server request parameters
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE = 74 # storage report trunk free space
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID = 73 # storage report current trunk file id
TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID = 72 # storage get current trunk file id
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START = 61 # start of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END = 62 # end of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE = 63 # tracker get a system data file
TRACKER_PROTO_CMD_TRACKER_GET_STATUS = 64 # tracker get status of other tracker
TRACKER_PROTO_CMD_TRACKER_PING_LEADER = 65 # tracker ping leader
TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER = 66 # notify next leader to other trackers
TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER = 67 # commit next leader to other trackers
TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP = 90
TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS = 91
TRACKER_PROTO_CMD_SERVER_LIST_STORAGE = 92
TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE = 93
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE = 101
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE = 102
TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE = 103
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE = 104
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL = 105
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL = 106
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL = 107
TRACKER_PROTO_CMD_RESP = 100
FDFS_PROTO_CMD_ACTIVE_TEST = 111 # active test, tracker and storage both support since V1.28
STORAGE_PROTO_CMD_REPORT_CLIENT_IP = 9 # ip as tracker client
STORAGE_PROTO_CMD_UPLOAD_FILE = 11
STORAGE_PROTO_CMD_DELETE_FILE = 12
STORAGE_PROTO_CMD_SET_METADATA = 13
STORAGE_PROTO_CMD_DOWNLOAD_FILE = 14
STORAGE_PROTO_CMD_GET_METADATA = 15
STORAGE_PROTO_CMD_SYNC_CREATE_FILE = 16
STORAGE_PROTO_CMD_SYNC_DELETE_FILE = 17
STORAGE_PROTO_CMD_SYNC_UPDATE_FILE = 18
STORAGE_PROTO_CMD_SYNC_CREATE_LINK = 19
STORAGE_PROTO_CMD_CREATE_LINK = 20
STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE = 21
STORAGE_PROTO_CMD_QUERY_FILE_INFO = 22
STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE = 23 # create appender file
STORAGE_PROTO_CMD_APPEND_FILE = 24 # append file
STORAGE_PROTO_CMD_SYNC_APPEND_FILE = 25
STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG = 26 # fetch binlog of one store path
STORAGE_PROTO_CMD_RESP = TRACKER_PROTO_CMD_RESP
STORAGE_PROTO_CMD_UPLOAD_MASTER_FILE = STORAGE_PROTO_CMD_UPLOAD_FILE
STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE = 27 # since V3.00
STORAGE_PROTO_CMD_TRUNK_ALLOC_CONFIRM = 28 # since V3.00
STORAGE_PROTO_CMD_TRUNK_FREE_SPACE = 29 # since V3.00
STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG = 30 # since V3.00
STORAGE_PROTO_CMD_TRUNK_GET_BINLOG_SIZE = 31 # since V3.07
STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS = 32 # since V3.07
STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE = 33 # since V3.07
STORAGE_PROTO_CMD_MODIFY_FILE = 34 # since V3.08
STORAGE_PROTO_CMD_SYNC_MODIFY_FILE = 35 # since V3.08
STORAGE_PROTO_CMD_TRUNCATE_FILE = 36 # since V3.08
STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE = 37 # since V3.08
# for overwrite all old metadata
STORAGE_SET_METADATA_FLAG_OVERWRITE = 'O'
STORAGE_SET_METADATA_FLAG_OVERWRITE_STR = "O"
# for replace, insert when the meta item not exist, otherwise update it
STORAGE_SET_METADATA_FLAG_MERGE = 'M'
STORAGE_SET_METADATA_FLAG_MERGE_STR = "M"
FDFS_RECORD_SEPERATOR = '\x01'
FDFS_FIELD_SEPERATOR = '\x02'
# common constants
FDFS_GROUP_NAME_MAX_LEN = 16
IP_ADDRESS_SIZE = 16
FDFS_PROTO_PKG_LEN_SIZE = 8
FDFS_PROTO_CMD_SIZE = 1
FDFS_PROTO_STATUS_SIZE = 1
FDFS_PROTO_IP_PORT_SIZE = (IP_ADDRESS_SIZE + 6)
FDFS_MAX_SERVERS_EACH_GROUP = 32
FDFS_MAX_GROUPS = 512
FDFS_MAX_TRACKERS = 16
FDFS_DOMAIN_NAME_MAX_LEN = 128
FDFS_MAX_META_NAME_LEN = 64
FDFS_MAX_META_VALUE_LEN = 256
FDFS_FILE_PREFIX_MAX_LEN = 16
FDFS_LOGIC_FILE_PATH_LEN = 10
FDFS_TRUE_FILE_PATH_LEN = 6
FDFS_FILENAME_BASE64_LENGTH = 27
FDFS_TRUNK_FILE_INFO_LEN = 16
FDFS_FILE_EXT_NAME_MAX_LEN = 6
FDFS_SPACE_SIZE_BASE_INDEX = 2 # storage space size based (MB)
FDFS_UPLOAD_BY_BUFFER = 1
FDFS_UPLOAD_BY_FILENAME = 2
FDFS_UPLOAD_BY_FILE = 3
FDFS_DOWNLOAD_TO_BUFFER = 1
FDFS_DOWNLOAD_TO_FILE = 2
FDFS_NORMAL_LOGIC_FILENAME_LENGTH = (
FDFS_LOGIC_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_FILE_EXT_NAME_MAX_LEN + 1)
FDFS_TRUNK_FILENAME_LENGTH = (
FDFS_TRUE_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_TRUNK_FILE_INFO_LEN + 1 + FDFS_FILE_EXT_NAME_MAX_LEN)
FDFS_TRUNK_LOGIC_FILENAME_LENGTH = (FDFS_TRUNK_FILENAME_LENGTH + (FDFS_LOGIC_FILE_PATH_LEN - FDFS_TRUE_FILE_PATH_LEN))
FDFS_VERSION_SIZE = 6
TRACKER_QUERY_STORAGE_FETCH_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE)
TRACKER_QUERY_STORAGE_STORE_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
# status code, order is important!
FDFS_STORAGE_STATUS_INIT = 0
FDFS_STORAGE_STATUS_WAIT_SYNC = 1
FDFS_STORAGE_STATUS_SYNCING = 2
FDFS_STORAGE_STATUS_IP_CHANGED = 3
FDFS_STORAGE_STATUS_DELETED = 4
FDFS_STORAGE_STATUS_OFFLINE = 5
FDFS_STORAGE_STATUS_ONLINE = 6
FDFS_STORAGE_STATUS_ACTIVE = 7
FDFS_STORAGE_STATUS_RECOVERY = 9
FDFS_STORAGE_STATUS_NONE = 99
class Storage_server(object):
'''Class storage server for upload.'''
def __init__(self):
self.ip_addr = None
self.port = None
self.group_name = ''
self.store_path_index = 0
# Class tracker_header
class Tracker_header(object):
'''
Class for Pack or Unpack tracker header
struct tracker_header{
char pkg_len[FDFS_PROTO_PKG_LEN_SIZE],
char cmd,
char status,
}
'''
def __init__(self):
self.fmt = '!QBB' # pkg_len[FDFS_PROTO_PKG_LEN_SIZE] + cmd + status
self.st = struct.Struct(self.fmt)
self.pkg_len = 0
self.cmd = 0
self.status = 0
def _pack(self, pkg_len=0, cmd=0, status=0):
return self.st.pack(pkg_len, cmd, status)
def _unpack(self, bytes_stream):
self.pkg_len, self.cmd, self.status = self.st.unpack(bytes_stream)
return True
def header_len(self):
return self.st.size
def send_header(self, conn):
'''Send Tracker header to server.'''
header = self._pack(self.pkg_len, self.cmd, self.status)
try:
conn._sock.sendall(header)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: %s' % (e.args,))
def recv_header(self, conn):
'''Receive response from server.
if sucess, class member (pkg_len, cmd, status) is response.
'''
try:
header = conn._sock.recv(self.header_len())
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: %s' % (e.args,))
self._unpack(header)
def fdfs_pack_metadata(meta_dict):
ret = ''
for key in meta_dict:
ret += '%s%c%s%c' % (key, FDFS_FIELD_SEPERATOR, meta_dict[key], FDFS_RECORD_SEPERATOR)
return ret[0:-1]
def fdfs_unpack_metadata(bytes_stream):
li = bytes_stream.split(FDFS_RECORD_SEPERATOR)
return dict([item.split(FDFS_FIELD_SEPERATOR) for item in li])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: fdfs_test.py
import os
import sys
import time
try:
from fdfs_client.client import *
from fdfs_client.exceptions import *
except ImportError:
import_path = os.path.abspath('../')
sys.path.append(import_path)
from fdfs_client.client import *
from fdfs_client.exceptions import *
def usage():
s = 'Usage: python fdfs_test.py {options} [{local_filename} [{remote_file_id}]]\n'
s += 'options: upfile, upbuffer, downfile, downbuffer, delete, listgroup, listserv\n'
s += ' upslavefile, upslavebuffer, upappendfile, upappendbuffer\n'
s += '\tupfile {local_filename}\n'
s += '\tupbuffer {local_filename}\n'
s += '\tdownfile {local_filename} {remote_file_id}\n'
s += '\tdownbuffer {remote_file_id}\n'
s += '\tdelete {remote_file_id}\n'
s += '\tlistgroup {group_name}\n'
s += '\tlistall \n'
s += '\tlistsrv {group_name} [storage_ip]\n'
s += '\tsetmeta {remote_file_id}\n'
s += '\tgetmeta {remote_file_id}\n'
s += '\tupslavefile {local_filename} {remote_fileid} {prefix_name}\n'
s += '\tupappendfile {local_filename}\n'
s += '\ttruncate {truncate_filesize} {remote_fileid}\n'
s += '\tmodifyfile {local_filename} {remote_fileid} {file_offset}\n'
s += '\tmodifybuffer {local_filename} {remote_fileid} {file_offset}\n'
s += 'e.g.: python fdfs_test.py upfile test'
print(s)
sys.exit(0)
if len(sys.argv) < 2:
usage()
client = Fdfs_client('client.conf')
def upfile_func():
# Upload by filename
# usage: python fdfs_test.py upfile {local_filename}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
file_size = os.stat(local_filename).st_size
# meta_buffer can be null.
meta_dict = {
'ext_name': 'py',
'file_size': str(file_size) + 'B'
}
t1 = time.time()
ret_dict = client.upload_by_filename(local_filename, meta_dict)
t2 = time.time()
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
print('[+] time consume: %fs' % (t2 - t1))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upfileex_func():
# Upload by file
# usage: python fdfs_test.py upfileex {local_filename}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
t1 = time.time()
ret_dict = client.upload_by_file(local_filename)
t2 = time.time()
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
print('[+] time consume: %fs' % (t2 - t1))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upslavefile_func():
# upload slave file
# usage: python fdfs_test.py upslavefile {local_filename} {remote_fileid} {prefix_name}
if len(sys.argv) < 5:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
prefix_name = sys.argv[4]
ret_dict = client.upload_slave_by_file(local_filename, remote_fileid, \
prefix_name)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upslavebuffer_func():
# upload slave by buffer
# usage: python fdfs_test.py upslavebuffer {local_filename} {remote_fileid} {prefix_name}
if len(sys.argv) < 5:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
prefix_name = sys.argv[4]
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.upload_slave_by_buffer(local_filename, \
remote_fileid, prefix_name)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def del_func():
# delete file
# usage: python fdfs_test.py delete {remote_fileid}
if len(sys.argv) < 3:
usage()
return None
try:
remote_file_id = sys.argv[2]
ret_tuple = client.delete_file(remote_file_id)
print('[+] %s' % ret_tuple[0])
print('[+] remote_fileid: %s' % ret_tuple[1])
print('[+] Storage IP: %s' % ret_tuple[2])
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def downfile_func():
# Download to file
# usage: python fdfs_test.py downfile {local_filename} {remote_fileid}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
ret_dict = client.download_to_file(local_filename, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def list_group_func():
# List one group info
# usage: python fdfs_test.py listgroup {group_name}
if len(sys.argv) < 3:
usage()
return None
try:
group_name = sys.argv[2]
ret = client.list_one_group(group_name)
print(ret)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def listall_func():
# List all group info
# usage: python fdfs_test.py listall
if len(sys.argv) < 2:
usage()
return None
try:
ret_dict = client.list_all_groups()
print('=' * 80)
print('Groups count:', ret_dict['Groups count'])
for li in ret_dict['Groups']:
print('-' * 80)
print(li)
print('-' * 80)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def list_server_func():
# List all servers info of group
# usage: python fdfs_test.py listsrv {group_name} [storage_ip]
if len(sys.argv) < 3:
usage()
return None
try:
group_name = sys.argv[2]
if len(sys.argv) > 3:
storage_ip = sys.argv[3]
else:
storage_ip = None
ret_dict = client.list_servers(group_name, storage_ip)
print('=' * 80)
print('Group name: %s' % ret_dict['Group name'])
print('=' * 80)
i = 1
for serv in ret_dict['Servers']:
print('Storage server %d:' % i)
print('=' * 80)
print(serv)
i += 1
print('=' * 80)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upbuffer_func():
# Upload by buffer
# usage: python fdfs_test.py upbuffer {local_filename} [remote_file_ext_name]
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
if len(sys.argv) > 3:
ext_name = sys.argv[3]
else:
ext_name = None
# meta_buffer can be null.
meta_buffer = {
'ext_name': 'gif',
'width': '150px',
'height': '80px'
}
try:
with open(local_filename, 'rb') as f:
file_buffer = f.read()
ret_dict = client.upload_by_buffer(file_buffer, ext_name, meta_buffer)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def downbuffer_func():
# Download to buffer
# usage: python fdfs_test.py downbuffer {remote_file_id}
# e.g.: 'group1/M00/00/00/wKjzhU_rLNmjo2-1AAAamGDONEA5818.py'
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
try:
ret_dict = client.download_to_buffer(remote_fileid)
print('Downloaded content:')
print(ret_dict['Content'])
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def get_meta_data_func():
# Get meta data of remote file
# usage python fdfs_test.py getmeta {remote_file_id}
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
try:
ret_dict = client.get_meta_data(remote_fileid)
print(ret_dict)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def set_meta_data_func():
# Set meta data of remote file
# usage python fdfs_test.py setmeta {remote_file_id}
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
meta_dict = {
'ext_name': 'jgp',
'width': '160px',
'hight': '80px',
}
try:
ret_dict = client.set_meta_data(remote_fileid, meta_dict)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upappendfile_func():
# Upload an appender file by filename
# usage: python fdfs_test.py upappendfile {local_filename}
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
try:
ret_dict = client.upload_appender_by_file(local_filename)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upappendbuffer_func():
# Upload an appender file by buffer
# usage: python fdfs_test.py upappendbuffer {local_filename}
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
try:
with open(local_filename, 'rb') as f:
file_buffer = f.read()
ret_dict = client.upload_appender_by_buffer(file_buffer)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def appendfile_func():
# Append a remote file
# usage: python fdfs_test.py appendfile {local_filename} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
try:
ret_dict = client.append_by_file(local_filename, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def appendbuffer_func():
# Append a remote file by buffer
# usage: python fdfs_test.py appendbuffer {local_filename} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
try:
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.append_by_buffer(filebuffer, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def truncate_func():
# Truncate file
# usage: python fdfs_test.py truncate {truncate_filesize} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
truncate_filesize = int(sys.argv[2])
remote_fileid = sys.argv[3]
try:
ret_dict = client.truncate_file(truncate_filesize, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def modifyfile_func():
# Modify file by filename
# usage: python fdfs_test.py modifyfile {local_filename} {remote_fileid} [file_offset]
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
if len(sys.argv) > 4:
file_offset = int(sys.argv[4])
else:
file_offset = 0
try:
ret_dict = client.modify_by_filename(local_filename, remote_fileid, file_offset)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def modifybuffer_func():
# Modify file by buffer
# usage: python fdfs_test.py modifybuffer {local_filename} {remote_fileid} [file_offset]
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
if len(sys.argv) > 4:
file_offset = int(sys.argv[4])
else:
file_offset = 0
try:
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.modify_by_buffer(filebuffer, remote_fileid, file_offset)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
result = {
'upfile': lambda: upfile_func(),
'upfileex': lambda: upfileex_func(),
'upbuffer': lambda: upbuffer_func(),
'delete': lambda: del_func(),
'downfile': lambda: downfile_func(),
'downbuffer': lambda: downbuffer_func(),
'listgroup': lambda: list_group_func(),
'listall': lambda: listall_func(),
'listsrv': lambda: list_server_func(),
'getmeta': lambda: get_meta_data_func(),
'setmeta': lambda: set_meta_data_func(),
'upslavefile': lambda: upslavefile_func(),
'upappendfile': lambda: upappendfile_func(),
'upappendbuffer': lambda: upappendbuffer_func(),
'appendfile': lambda: appendfile_func(),
'appendbuffer': lambda: appendbuffer_func(),
'truncate': lambda: truncate_func(),
'modifyfile': lambda: modifyfile_func(),
'modifybuffer': lambda: modifybuffer_func(),
'-h': lambda: usage(),
}[sys.argv[1].lower()]()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: storage_client.py
import os
import stat
import errno
import struct
import socket
import datetime
import platform
from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
# from test_fdfs.sendfile import *
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
from fdfs_client.utils import *
__os_sep__ = "/" if platform.system() == 'Windows' else os.sep
def tcp_send_file(conn, filename, buffer_size=1024):
'''
Send file to server, and split into multiple pkgs while sending.
arguments:
@conn: connection
@filename: string
@buffer_size: int ,send buffer size
@Return int: file size if success else raise ConnectionError.
'''
file_size = 0
with open(filename, 'rb') as f:
while 1:
try:
send_buffer = f.read(buffer_size)
send_size = len(send_buffer)
if send_size == 0:
break
tcp_send_data(conn, send_buffer)
file_size += send_size
except ConnectionError as e:
raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
except IOError as e:
raise DataError('[-] Error while reading local file(%s).' % e.args)
return file_size
def tcp_send_file_ex(conn, filename, buffer_size=4096):
'''
Send file to server. Using linux system call 'sendfile'.
arguments:
@conn: connection
@filename: string
@return long, sended size
'''
if 'linux' not in sys.platform.lower():
raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
nbytes = 0
offset = 0
sock_fd = conn.get_sock().fileno()
with open(filename, 'rb') as f:
in_fd = f.fileno()
while 1:
try:
pass
# sent = sendfile(sock_fd, in_fd, offset, buffer_size)
# if 0 == sent:
# break
# nbytes += sent
# offset += sent
except OSError as e:
if e.errno == errno.EAGAIN:
continue
raise
return nbytes
def tcp_recv_file(conn, local_filename, file_size, buffer_size=1024):
'''
Receive file from server, fragmented it while receiving and write to disk.
arguments:
@conn: connection
@local_filename: string
@file_size: int, remote file size
@buffer_size: int, receive buffer size
@Return int: file size if success else raise ConnectionError.
'''
total_file_size = 0
flush_size = 0
remain_bytes = file_size
with open(local_filename, 'wb+') as f:
while remain_bytes > 0:
try:
if remain_bytes >= buffer_size:
file_buffer, recv_size = tcp_recv_response(conn, buffer_size, buffer_size)
else:
file_buffer, recv_size = tcp_recv_response(conn, remain_bytes, buffer_size)
f.write(file_buffer)
remain_bytes -= buffer_size
total_file_size += recv_size
flush_size += recv_size
if flush_size >= 4096:
f.flush()
flush_size = 0
except ConnectionError as e:
raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
except IOError as e:
raise DataError('[-] Error: while writting local file(%s).' % e.args)
return total_file_size
class Storage_client(object):
'''
The Class Storage_client for storage server.
Note: argument host_tuple of storage server ip address, that should be a single element.
'''
def __init__(self, *kwargs):
conn_kwargs = {
'name': 'Storage Pool',
'host_tuple': (kwargs[0],),
'port': kwargs[1],
'timeout': kwargs[2]
}
self.pool = ConnectionPool(**conn_kwargs)
return None
def __del__(self):
try:
self.pool.destroy()
self.pool = None
except:
pass
def update_pool(self, old_store_serv, new_store_serv, timeout=30):
'''
Update connection pool of storage client.
We need update connection pool of storage client, while storage server is changed.
but if server not changed, we do nothing.
'''
if old_store_serv.ip_addr == new_store_serv.ip_addr:
return None
self.pool.destroy()
conn_kwargs = {
'name': 'Storage_pool',
'host_tuple': (new_store_serv.ip_addr,),
'port': new_store_serv.port,
'timeout': timeout
}
self.pool = ConnectionPool(**conn_kwargs)
return True
def _storage_do_upload_file(self, tracker_client, store_serv, file_buffer, file_size=None, upload_type=None,
meta_dict=None, cmd=None, master_filename=None, prefix_name=None, file_ext_name=None):
'''
core of upload file.
arguments:
@tracker_client: Tracker_client, it is useful connect to tracker server
@store_serv: Storage_server, it is return from query tracker server
@file_buffer: string, file name or file buffer for send
@file_size: int
@upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
FDFS_UPLOAD_BY_BUFFER
@meta_dic: dictionary, store metadata in it
@cmd: int, reference fdfs protol
@master_filename: string, useful upload slave file
@prefix_name: string
@file_ext_name: string
@Return dictionary
{
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : status,
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
}
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
master_filename_len = len(master_filename) if master_filename else 0
prefix_name_len = len(prefix_name) if prefix_name else 0
upload_slave = len(store_serv.group_name) and master_filename_len
file_ext_name = str(file_ext_name) if file_ext_name else ''
# non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
# slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
# -master_name(master_filename_len)-|
slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, FDFS_FILE_EXT_NAME_MAX_LEN, master_filename_len)
th.pkg_len = struct.calcsize(slave_fmt) if upload_slave else struct.calcsize(non_slave_fmt)
th.pkg_len += file_size
th.cmd = cmd
th.send_header(store_conn)
if upload_slave:
send_buffer = struct.pack(
slave_fmt, master_filename_len, file_size, prefix_name, file_ext_name, master_filename)
else:
send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, file_size, file_ext_name.encode())
try:
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
send_file_size = tcp_send_file(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
send_file_size = tcp_send_file_ex(store_conn, file_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
errmsg = '[-] Error: Storage response length is not match, '
errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
# recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
(group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
remote_filename = remote_name.strip(b'\x00')
if meta_dict and len(meta_dict) > 0:
status = self.storage_set_metadata(tracker_client, store_serv, remote_filename, meta_dict)
if status != 0:
# rollback
self.storage_delete_file(tracker_client, store_serv, remote_filename)
raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dic = {
'Group name': group_name.strip(b'\x00'),
'Remote file_id': group_name.strip(b'\x00') + __os_sep__.encode() + remote_filename,
'Status': 'Upload successed.',
'Local file name': file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME
or upload_type == FDFS_UPLOAD_BY_FILE
) else '',
'Uploaded size': appromix(send_file_size) if (upload_type == FDFS_UPLOAD_BY_FILENAME
or upload_type == FDFS_UPLOAD_BY_FILE
) else appromix(len(file_buffer)),
'Storage IP': store_serv.ip_addr
}
return ret_dic
def storage_upload_by_filename(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_by_file(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_by_buffer(self, tracker_client, store_serv, file_buffer, file_ext_name=None, meta_dict=None):
buffer_size = len(file_buffer)
return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, buffer_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_slave_by_filename(self, tracker_client, store_serv, filename, prefix_name, remote_filename,
meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, remote_filename,
prefix_name, file_ext_name)
def storage_upload_slave_by_file(self, tracker_client, store_serv, filename, prefix_name, remote_filename,
meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, remote_filename,
prefix_name, file_ext_name)
def storage_upload_slave_by_buffer(self, tracker_client, store_serv, filebuffer, remote_filename, meta_dict,
file_ext_name):
file_size = len(filebuffer)
return self._storage_do_upload_file(tracker_client, store_serv, filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, None, remote_filename,
file_ext_name)
def storage_upload_appender_by_filename(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_upload_appender_by_file(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_upload_appender_by_buffer(self, tracker_client, store_serv, file_buffer, meta_dict=None,
file_ext_name=None):
file_size = len(file_buffer)
return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, file_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_delete_file(self, tracker_client, store_serv, remote_filename):
'''
Delete file from storage server.
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
file_name_len = len(remote_filename)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
try:
th.send_header(store_conn)
# del_fmt: |-group_name(16)-filename(len)-|
del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: remote file %s is not exist.'
# % (store_serv.group_name + __os_sep__.encode() + remote_filename))
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
# recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
remote_filename = store_serv.group_name + __os_sep__.encode() + remote_filename
return ('Delete file successed.', remote_filename, store_serv.ip_addr)
def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, offset, download_size,
download_type, remote_filename):
'''
Core of download file from storage server.
You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or
FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
@Return dictionary
'Remote file name' : remote_filename,
'Content' : local_filename or buffer,
'Download size' : download_size,
'Storage IP' : storage_ip
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
remote_filename_len = len(remote_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
try:
th.send_header(store_conn)
# down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
send_buffer = struct.pack(down_fmt, offset, download_size, store_serv.group_name, remote_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: remote file %s is not exist.' %
# (store_serv.group_name + __os_sep__.encode() + remote_filename))
if th.status != 0:
raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
if download_type == FDFS_DOWNLOAD_TO_FILE:
total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
ret_dic = {
'Remote file_id': store_serv.group_name + __os_sep__.encode() + remote_filename,
'Content': file_buffer if download_type == FDFS_DOWNLOAD_TO_FILE else recv_buffer,
'Download size': appromix(total_recv_size),
'Storage IP': store_serv.ip_addr
}
return ret_dic
def storage_download_to_file(self, tracker_client, store_serv, local_filename, file_offset, download_bytes,
remote_filename):
return self._storage_do_download_file(tracker_client, store_serv, local_filename, file_offset, download_bytes,
FDFS_DOWNLOAD_TO_FILE, remote_filename)
def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, file_offset, download_bytes,
remote_filename):
return self._storage_do_download_file(tracker_client, store_serv, file_buffer, file_offset, download_bytes,
FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
def storage_set_metadata(self, tracker_client, store_serv, remote_filename, meta_dict,
op_flag=STORAGE_SET_METADATA_FLAG_OVERWRITE):
ret = 0
conn = self.pool.get_connection()
remote_filename_len = len(remote_filename)
meta_buffer = fdfs_pack_metadata(meta_dict)
meta_len = len(meta_buffer)
th = Tracker_header()
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
th.cmd = STORAGE_PROTO_CMD_SET_METADATA
try:
th.send_header(conn)
# meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
# -filename(remote_filename_len)-meta(meta_len)|
meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len, meta_len)
send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, op_flag, store_serv.group_name,
remote_filename, meta_buffer)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
ret = th.status
except:
raise
finally:
self.pool.release(conn)
return ret
def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
store_conn = self.pool.get_connection()
th = Tracker_header()
remote_filename_len = len(remote_file_name)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
th.cmd = STORAGE_PROTO_CMD_GET_METADATA
try:
th.send_header(store_conn)
# meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name.encode())
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: Remote file %s has no meta data.'
# % (store_serv.group_name + __os_sep__.encode() + remote_file_name))
if th.status != 0:
raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
if th.pkg_len == 0:
ret_dict = {}
meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = fdfs_unpack_metadata(meta_buffer)
return ret_dict
def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, file_size, upload_type,
appended_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
appended_filename_len = len(appended_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
try:
th.send_header(store_conn)
# append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
# -filecontent(filesize)-|
append_fmt = '!Q Q %ds' % appended_filename_len
send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, appended_filename)
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
tcp_send_file(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
tcp_send_file_ex(store_conn, file_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Append file successed.'
ret_dict['Appender file name'] = store_serv.group_name + __os_sep__.encode() + appended_filename
ret_dict['Appended size'] = appromix(file_size)
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_append_by_filename(self, tracker_client, store_serv, local_filename, appended_filename):
file_size = os.stat(local_filename).st_size
return self._storage_do_append_file(tracker_client, store_serv, local_filename, file_size,
FDFS_UPLOAD_BY_FILENAME, appended_filename)
def storage_append_by_file(self, tracker_client, store_serv, local_filename, appended_filename):
file_size = os.stat(local_filename).st_size
return self._storage_do_append_file(tracker_client, store_serv, local_filename, file_size, FDFS_UPLOAD_BY_FILE,
appended_filename)
def storage_append_by_buffer(self, tracker_client, store_serv, file_buffer, appended_filename):
file_size = len(file_buffer)
return self._storage_do_append_file(tracker_client, store_serv, file_buffer, file_size, FDFS_UPLOAD_BY_BUFFER,
appended_filename)
def _storage_do_truncate_file(self, tracker_client, store_serv, truncated_filesize, appender_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
appender_filename_len = len(appender_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
try:
th.send_header(store_conn)
# truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
# -appender_filename(len)-|
truncate_fmt = '!Q Q %ds' % appender_filename_len
send_buffer = struct.pack(truncate_fmt, appender_filename_len, truncated_filesize, appender_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Truncate successed.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_truncate_file(self, tracker_client, store_serv, truncated_filesize, appender_filename):
return self._storage_do_truncate_file(tracker_client, store_serv, truncated_filesize, appender_filename)
def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, filebuffer, offset, filesize,
appender_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
appender_filename_len = len(appender_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
try:
th.send_header(store_conn)
# modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
modify_fmt = '!Q Q Q %ds' % appender_filename_len
send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, filesize, appender_filename)
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
upload_size = tcp_send_file(store_conn, filebuffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, filebuffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
upload_size = tcp_send_file_ex(store_conn, filebuffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Modify successed.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_modify_by_filename(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_FILENAME, filename, offset,
filesize, appender_filename)
def storage_modify_by_file(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_FILE, filename, offset, filesize,
appender_filename)
def storage_modify_by_buffer(self, tracker_client, store_serv, filebuffer, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_BUFFER, filebuffer, offset,
filesize, appender_filename)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: tracker_client.py
import struct
import socket
from datetime import datetime
from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
from fdfs_client.utils import *
def parse_storage_status(status_code):
try:
ret = {
FDFS_STORAGE_STATUS_INIT: lambda: 'INIT',
FDFS_STORAGE_STATUS_WAIT_SYNC: lambda: 'WAIT_SYNC',
FDFS_STORAGE_STATUS_SYNCING: lambda: 'SYNCING',
FDFS_STORAGE_STATUS_IP_CHANGED: lambda: 'IP_CHANGED',
FDFS_STORAGE_STATUS_DELETED: lambda: 'DELETED',
FDFS_STORAGE_STATUS_OFFLINE: lambda: 'OFFLINE',
FDFS_STORAGE_STATUS_ONLINE: lambda: 'ONLINE',
FDFS_STORAGE_STATUS_ACTIVE: lambda: 'ACTIVE',
FDFS_STORAGE_STATUS_RECOVERY: lambda: 'RECOVERY'
}[status_code]()
except KeyError:
ret = 'UNKNOW'
return ret
class Storage_info(object):
def __init__(self):
self.status = 0
self.id = ''
self.ip_addr = ''
self.domain_name = ''
self.src_id = ''
self.version = ''
self.join_time = datetime.fromtimestamp(0).isoformat()
self.up_time = datetime.fromtimestamp(0).isoformat()
self.totalMB = ''
self.freeMB = ''
self.upload_prio = 0
self.store_path_count = 0
self.subdir_count_per_path = 0
self.curr_write_path = 0
self.storage_port = 23000
self.storage_http_port = 80
self.alloc_count = 0
self.current_count = 0
self.max_count = 0
self.total_upload_count = 0
self.success_upload_count = 0
self.total_append_count = 0
self.success_append_count = 0
self.total_modify_count = 0
self.success_modify_count = 0
self.total_truncate_count = 0
self.success_truncate_count = 0
self.total_setmeta_count = 0
self.success_setmeta_count = 0
self.total_del_count = 0
self.success_del_count = 0
self.total_download_count = 0
self.success_download_count = 0
self.total_getmeta_count = 0
self.success_getmeta_count = 0
self.total_create_link_count = 0
self.success_create_link_count = 0
self.total_del_link_count = 0
self.success_del_link_count = 0
self.total_upload_bytes = 0
self.success_upload_bytes = 0
self.total_append_bytes = 0
self.success_append_bytes = 0
self.total_modify_bytes = 0
self.success_modify_bytes = 0
self.total_download_bytes = 0
self.success_download_bytes = 0
self.total_sync_in_bytes = 0
self.success_sync_in_bytes = 0
self.total_sync_out_bytes = 0
self.success_sync_out_bytes = 0
self.total_file_open_count = 0
self.success_file_open_count = 0
self.total_file_read_count = 0
self.success_file_read_count = 0
self.total_file_write_count = 0
self.success_file_write_count = 0
self.last_source_sync = datetime.fromtimestamp(0).isoformat()
self.last_sync_update = datetime.fromtimestamp(0).isoformat()
self.last_synced_time = datetime.fromtimestamp(0).isoformat()
self.last_heartbeat_time = datetime.fromtimestamp(0).isoformat()
self.if_trunk_server = ''
# fmt = |-status(1)-ipaddr(16)-domain(128)-srcipaddr(16)-ver(6)-52*8-|
self.fmt = '!B 16s 16s 128s 16s 6s 10Q 4s4s4s 42Q?'
def set_info(self, bytes_stream):
(self.status, self.id, ip_addr, domain_name, self.src_id, version, join_time, up_time, totalMB, freeMB,
self.upload_prio, self.store_path_count, self.subdir_count_per_path, self.curr_write_path, self.storage_port,
self.storage_http_port, self.alloc_count, self.current_count, self.max_count, self.total_upload_count,
self.success_upload_count, self.total_append_count, self.success_append_count, self.total_modify_count,
self.success_modify_count, self.total_truncate_count, self.success_truncate_count, self.total_setmeta_count,
self.success_setmeta_count, self.total_del_count, self.success_del_count, self.total_download_count,
self.success_download_count, self.total_getmeta_count, self.success_getmeta_count,
self.total_create_link_count, self.success_create_link_count, self.total_del_link_count,
self.success_del_link_count, self.total_upload_bytes, self.success_upload_bytes, self.total_append_bytes,
self.total_append_bytes, self.total_modify_bytes, self.success_modify_bytes, self.total_download_bytes,
self.success_download_bytes, self.total_sync_in_bytes, self.success_sync_in_bytes, self.total_sync_out_bytes,
self.success_sync_out_bytes, self.total_file_open_count, self.success_file_open_count,
self.total_file_read_count, self.success_file_read_count, self.total_file_write_count,
self.success_file_write_count, last_source_sync, last_sync_update, last_synced_time, last_heartbeat_time,
self.if_trunk_server,) = struct.unpack(self.fmt, bytes_stream)
try:
self.ip_addr = ip_addr.strip(b'\x00')
self.domain_name = domain_name.strip(b'\x00')
self.version = version.strip(b'\x00')
self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
except ValueError as e:
raise ResponseError('[-] Error: disk space overrun, can not represented it.')
self.join_time = datetime.fromtimestamp(join_time).isoformat()
self.up_time = datetime.fromtimestamp(up_time).isoformat()
self.last_source_sync = datetime.fromtimestamp(last_source_sync).isoformat()
self.last_sync_update = datetime.fromtimestamp(last_sync_update).isoformat()
self.last_synced_time = datetime.fromtimestamp(last_synced_time).isoformat()
self.last_heartbeat_time = datetime.fromtimestamp(last_heartbeat_time).isoformat()
return True
def __str__(self):
'''Transform to readable string.'''
s = 'Storage information:\n'
s += '\tip_addr = %s (%s)\n' % (self.ip_addr, parse_storage_status(self.status))
s += '\thttp domain = %s\n' % self.domain_name
s += '\tversion = %s\n' % self.version
s += '\tjoin time = %s\n' % self.join_time
s += '\tup time = %s\n' % self.up_time
s += '\ttotal storage = %s\n' % self.totalMB
s += '\tfree storage = %s\n' % self.freeMB
s += '\tupload priority = %d\n' % self.upload_prio
s += '\tstore path count = %d\n' % self.store_path_count
s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
s += '\tstorage port = %d\n' % self.storage_port
s += '\tstorage HTTP port = %d\n' % self.storage_http_port
s += '\tcurrent write path = %d\n' % self.curr_write_path
s += '\tsource ip_addr = %s\n' % self.ip_addr
s += '\tif_trunk_server = %d\n' % self.if_trunk_server
s += '\ttotal upload count = %ld\n' % self.total_upload_count
s += '\tsuccess upload count = %ld\n' % self.success_upload_count
s += '\ttotal download count = %ld\n' % self.total_download_count
s += '\tsuccess download count = %ld\n' % self.success_download_count
s += '\ttotal append count = %ld\n' % self.total_append_count
s += '\tsuccess append count = %ld\n' % self.success_append_count
s += '\ttotal modify count = %ld\n' % self.total_modify_count
s += '\tsuccess modify count = %ld\n' % self.success_modify_count
s += '\ttotal truncate count = %ld\n' % self.total_truncate_count
s += '\tsuccess truncate count = %ld\n' % self.success_truncate_count
s += '\ttotal delete count = %ld\n' % self.total_del_count
s += '\tsuccess delete count = %ld\n' % self.success_del_count
s += '\ttotal set_meta count = %ld\n' % self.total_setmeta_count
s += '\tsuccess set_meta count = %ld\n' % self.success_setmeta_count
s += '\ttotal get_meta count = %ld\n' % self.total_getmeta_count
s += '\tsuccess get_meta count = %ld\n' % self.success_getmeta_count
s += '\ttotal create link count = %ld\n' % self.total_create_link_count
s += '\tsuccess create link count = %ld\n' % self.success_create_link_count
s += '\ttotal delete link count = %ld\n' % self.total_del_link_count
s += '\tsuccess delete link count = %ld\n' % self.success_del_link_count
s += '\ttotal upload bytes = %ld\n' % self.total_upload_bytes
s += '\tsuccess upload bytes = %ld\n' % self.success_upload_bytes
s += '\ttotal download bytes = %ld\n' % self.total_download_bytes
s += '\tsuccess download bytes = %ld\n' % self.success_download_bytes
s += '\ttotal append bytes = %ld\n' % self.total_append_bytes
s += '\tsuccess append bytes = %ld\n' % self.success_append_bytes
s += '\ttotal modify bytes = %ld\n' % self.total_modify_bytes
s += '\tsuccess modify bytes = %ld\n' % self.success_modify_bytes
s += '\ttotal sync_in bytes = %ld\n' % self.total_sync_in_bytes
s += '\tsuccess sync_in bytes = %ld\n' % self.success_sync_in_bytes
s += '\ttotal sync_out bytes = %ld\n' % self.total_sync_out_bytes
s += '\tsuccess sync_out bytes = %ld\n' % self.success_sync_out_bytes
s += '\ttotal file open count = %ld\n' % self.total_file_open_count
s += '\tsuccess file open count = %ld\n' % self.success_file_open_count
s += '\ttotal file read count = %ld\n' % self.total_file_read_count
s += '\tsuccess file read count = %ld\n' % self.success_file_read_count
s += '\ttotal file write count = %ld\n' % self.total_file_write_count
s += '\tsucess file write count = %ld\n' % self.success_file_write_count
s += '\tlast heartbeat time = %s\n' % self.last_heartbeat_time
s += '\tlast source update = %s\n' % self.last_source_sync
s += '\tlast sync update = %s\n' % self.last_sync_update
s += '\tlast synced time = %s\n' % self.last_synced_time
return s
def get_fmt_size(self):
return struct.calcsize(self.fmt)
class Group_info(object):
def __init__(self):
self.group_name = ''
self.totalMB = ''
self.freeMB = ''
self.trunk_freeMB = ''
self.count = 0
self.storage_port = 0
self.store_http_port = 0
self.active_count = 0
self.curr_write_server = 0
self.store_path_count = 0
self.subdir_count_per_path = 0
self.curr_trunk_file_id = 0
self.fmt = '!%ds 11Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
return None
def __str__(self):
s = 'Group information:\n'
s += '\tgroup name = %s\n' % self.group_name
s += '\ttotal disk space = %s\n' % self.totalMB
s += '\tdisk free space = %s\n' % self.freeMB
s += '\ttrunk free space = %s\n' % self.trunk_freeMB
s += '\tstorage server count = %d\n' % self.count
s += '\tstorage port = %d\n' % self.storage_port
s += '\tstorage HTTP port = %d\n' % self.store_http_port
s += '\tactive server count = %d\n' % self.active_count
s += '\tcurrent write server index = %d\n' % self.curr_write_server
s += '\tstore path count = %d\n' % self.store_path_count
s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
s += '\tcurrent trunk file id = %d\n' % self.curr_trunk_file_id
return s
def set_info(self, bytes_stream):
(group_name, totalMB, freeMB, trunk_freeMB, self.count, self.storage_port, self.store_http_port,
self.active_count, self.curr_write_server, self.store_path_count, self.subdir_count_per_path,
self.curr_trunk_file_id) = struct.unpack(self.fmt, bytes_stream)
try:
self.group_name = group_name.strip(b'\x00')
self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.trunk_freeMB = appromix(trunk_freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
except ValueError:
raise DataError('[-] Error disk space overrun, can not represented it.')
def get_fmt_size(self):
return struct.calcsize(self.fmt)
class Tracker_client(object):
'''Class Tracker client.'''
def __init__(self, pool):
self.pool = pool
def tracker_list_servers(self, group_name, storage_ip=None):
'''
List servers in a storage group
'''
conn = self.pool.get_connection()
th = Tracker_header()
ip_len = len(storage_ip) if storage_ip else 0
if ip_len >= IP_ADDRESS_SIZE:
ip_len = IP_ADDRESS_SIZE - 1
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + ip_len
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_STORAGE
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
store_ip_addr = storage_ip or ''
storage_ip_fmt = '!%ds' % ip_len
try:
th.send_header(conn)
send_buffer = struct.pack(group_fmt, group_name) + struct.pack(storage_ip_fmt, store_ip_addr)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
si = Storage_info()
si_fmt_size = si.get_fmt_size()
recv_size = len(recv_buffer)
if recv_size % si_fmt_size != 0:
errinfo = '[-] Error: response size not match, expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errinfo)
except ConnectionError:
raise
finally:
self.pool.release(conn)
num_storage = recv_size / si_fmt_size
si_list = []
i = 0
while num_storage:
si.set_info(recv_buffer[(i * si_fmt_size): ((i + 1) * si_fmt_size)])
si_list.append(si)
si = Storage_info()
num_storage -= 1
i += 1
ret_dict = {}
ret_dict['Group name'] = group_name
ret_dict['Servers'] = si_list
return ret_dict
def tracker_list_one_group(self, group_name):
conn = self.pool.get_connection()
th = Tracker_header()
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP
# group_fmt: |-group_name(16)-|
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
try:
th.send_header(conn)
send_buffer = struct.pack(group_fmt, group_name)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
group_info = Group_info()
group_info.set_info(recv_buffer)
except ConnectionError:
raise
finally:
self.pool.release(conn)
return group_info
def tracker_list_all_groups(self):
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS
try:
th.send_header(conn)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
except:
raise
finally:
self.pool.release(conn)
gi = Group_info()
gi_fmt_size = gi.get_fmt_size()
if recv_size % gi_fmt_size != 0:
errmsg = '[-] Error: Response size is mismatch, except: %d, actul: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
num_groups = recv_size / gi_fmt_size
ret_dict = {}
ret_dict['Groups count'] = num_groups
gi_list = []
i = 0
while num_groups:
gi.set_info(recv_buffer[i * gi_fmt_size: (i + 1) * gi_fmt_size])
gi_list.append(gi)
gi = Group_info()
i += 1
num_groups -= 1
ret_dict['Groups'] = gi_list
return ret_dict
def tracker_query_storage_stor_without_group(self):
'''Query storage server for upload, without group name.
Return: Storage_server object'''
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
try:
th.send_header(conn)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)|
recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group_name, ip_addr, store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group_name.strip(b'\x00')
store_serv.ip_addr = ip_addr.strip(b'\x00')
return store_serv
def tracker_query_storage_stor_with_group(self, group_name):
'''Query storage server for upload, based group name.
arguments:
@group_name: string
@Return Storage_server object
'''
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
th.send_header(conn)
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
send_buffer = struct.pack(group_fmt, group_name)
try:
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt: |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)-|
recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group, ip_addr, store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group.strip(b'\x00')
store_serv.ip_addr = ip_addr.strip(b'\x00')
return store_serv
def _tracker_do_query_storage(self, group_name, filename, cmd):
'''
core of query storage, based group name and filename.
It is useful download, delete and set meta.
arguments:
@group_name: string
@filename: string. remote file_id
@Return: Storage_server object
'''
conn = self.pool.get_connection()
th = Tracker_header()
file_name_len = len(filename)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
th.cmd = cmd
th.send_header(conn)
# query_fmt: |-group_name(16)-filename(file_name_len)-|
query_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
send_buffer = struct.pack(query_fmt, group_name, filename)
try:
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_FETCH_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt: |-group_name(16)-ip_addr(16)-port(8)-|
recv_fmt = '!%ds %ds Q' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group_name, ipaddr, store_serv.port) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group_name.strip(b'\x00')
store_serv.ip_addr = ipaddr.strip(b'\x00')
return store_serv
def tracker_query_storage_update(self, group_name, filename):
'''
Query storage server to update(delete and set_meta).
'''
return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE)
def tracker_query_storage_fetch(self, group_name, filename):
'''
Query storage server to download.
'''
return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE)
#!/usr/bin/env python
# -*- coding = utf-8 -*-
# filename: utils.py
import io
import os
import sys
import stat
import platform
import configparser
SUFFIX = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
__os_sep__ = "/" if platform.system() == 'Windows' else os.sep
def appromix(size, base=0):
'''Conver bytes stream size to human-readable format.
Keyword arguments:
size: int, bytes stream size
base: int, suffix index
Return: string
'''
multiples = 1024
if size < 0:
raise ValueError('[-] Error: number must be non-negative.')
if size < multiples:
return '{0:d}{1}'.format(size, SUFFIX[base])
for suffix in SUFFIX[base:]:
if size < multiples:
return '{0:.2f}{1}'.format(size, suffix)
size = size / float(multiples)
raise ValueError('[-] Error: number too big.')
def get_file_ext_name(filename, double_ext=True):
li = filename.split(os.extsep)
if len(li) <= 1:
return ''
else:
if li[-1].find(__os_sep__) != -1:
return ''
if double_ext:
if len(li) > 2:
if li[-2].find(__os_sep__) == -1:
return '%s.%s' % (li[-2], li[-1])
return li[-1]
class Fdfs_ConfigParser(configparser.RawConfigParser):
"""
Extends ConfigParser to allow files without sections.
This is done by wrapping read files and prepending them with a placeholder
section, which defaults to '__config__'
"""
def __init__(self, default_section=None, *args, **kwargs):
configparser.RawConfigParser.__init__(self, *args, **kwargs)
self._default_section = None
self.set_default_section(default_section or '__config__')
def get_default_section(self):
return self._default_section
def set_default_section(self, section):
self.add_section(section)
# move all values from the previous default section to the new one
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
except configparser.NoSectionError:
pass
else:
for (key, value) in default_section_items:
self.set(section, key, value)
self._default_section = section
def read(self, filenames):
if isinstance(filenames, str):
filenames = [filenames]
read_ok = []
for filename in filenames:
try:
with open(filename) as fp:
self.readfp(fp)
except IOError:
continue
else:
read_ok.append(filename)
return read_ok
def readfp(self, fp, *args, **kwargs):
stream = io.StringIO()
try:
stream.name = fp.name
except AttributeError:
pass
stream.write('[' + self._default_section + ']\n')
stream.write(fp.read())
stream.seek(0, 0)
return self._read(stream, stream.name)
def write(self, fp):
# Write the items from the default section manually and then remove them
# from the data. They'll be re-added later.
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
for (key, value) in default_section_items:
fp.write("{0} = {1}\n".format(key, value))
fp.write("\n")
except configparser.NoSectionError:
pass
configparser.RawConfigParser.write(self, fp)
self.add_section(self._default_section)
for (key, value) in default_section_items:
self.set(self._default_section, key, value)
def _read(self, fp, fpname):
"""Parse a sectioned setup file.
The sections in setup file contains a title line at the top,
indicated by a name in square brackets (`[]'), plus key/value
options lines, indicated by `name: value' format lines.
Continuations are represented by an embedded newline then
leading whitespace. Blank lines, lines beginning with a '#',
and just about everything else are ignored.
"""
cursect = None # None, or a dictionary
optname = None
lineno = 0
e = None # None, or an exception
while True:
line = fp.readline()
if not line:
break
lineno = lineno + 1
# comment or blank line?
if line.strip() == '' or line[0] in '#;':
continue
if line.split(None, 1)[0].lower() == 'rem' and line[0] in "rR":
# no leading whitespace
continue
# continuation line?
if line[0].isspace() and cursect is not None and optname:
value = line.strip()
if value:
cursect[optname] = "%s\n%s" % (cursect[optname], value)
# a section header or option header?
else:
# is it a section header?
mo = self.SECTCRE.match(line)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
cursect = self._sections[sectname]
elif sectname == DEFAULTSECT:
cursect = self._defaults
else:
cursect = self._dict()
cursect['__name__'] = sectname
self._sections[sectname] = cursect
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self.OPTCRE.match(line)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if vi in ('=', ':') and ';' in optval:
# ';' is a comment delimiter only if it follows
# a spacing character
pos = optval.find(';')
if pos != -1 and optval[pos - 1].isspace():
optval = optval[:pos]
optval = optval.strip()
# allow empty values
if optval == '""':
optval = ''
optname = self.optionxform(optname.rstrip())
if optname in cursect:
if not isinstance(cursect[optname], list):
cursect[optname] = [cursect[optname]]
cursect[optname].append(optval)
else:
cursect[optname] = optval
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
if not e:
e = ParsingError(fpname)
e.append(lineno, repr(line))
# if any parsing errors occurred, raise an exception
if e:
raise e
def split_remote_fileid(remote_file_id):
'''
Splite remote_file_id to (group_name, remote_file_name)
arguments:
@remote_file_id: string
@return tuple, (group_name, remote_file_name)
'''
index = remote_file_id.find(b'/')
if -1 == index:
return None
return (remote_file_id[0:index], remote_file_id[(index + 1):])
def fdfs_check_file(filename):
ret = True
errmsg = ''
if not os.path.isfile(filename):
ret = False
errmsg = '[-] Error: %s is not a file.' % filename
elif not stat.S_ISREG(os.stat(filename).st_mode):
ret = False
errmsg = '[-] Error: %s is not a regular file.' % filename
return (ret, errmsg)
if __name__ == '__main__':
print(get_file_ext_name('/bc.tar.gz'))
import requests, re, time, pymysql
import requests, re, time, pymysql
from bs4 import BeautifulSoup as bs
from fdfs_client.client import get_tracker_conf, Fdfs_client
from base import BaseCore
baseCore = BaseCore.BaseCore()
requests.adapters.DEFAULT_RETRIES = 3
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
tracker_conf = get_tracker_conf('./client.conf')
client = Fdfs_client(tracker_conf)
taskType = '企业年报/雪球网'
def tableUpdate(year, com_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by, create_time):
sel_sql = '''select item_id from clb_sys_attachment where item_id = %s and year = %s'''
cursor.execute(sel_sql, (item_id, year))
selects = cursor.fetchone()
if selects:
print(f'{com_name},{year}已存在')
else:
Upsql = '''insert into clb_sys_attachment(year,name,type_id,item_id,group_name,path,full_path,category,file_size,order_by,status,create_by,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'''
values = (
year, com_name, type_id, item_id, group_name, path, full_path, category, file_size, order_by, status,
create_by,
create_time)
cursor.execute(Upsql, values) # 插入
cnx.commit() # 提交
print("更新完成:{}".format(Upsql))
def getContent(social_code, com_name, code,start_time):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
}
comp = re.compile('-?[1-9]\d*')
num = 1
ip = baseCore.get_proxy()
url_1 = f'https://vip.stock.finance.sina.com.cn/corp/go.php/vCB_Bulletin/stockid/{code}/page_type/ndbg.phtml'
res_1 = requests.get(url_1, proxies=ip)
soup = bs(res_1.content, 'html.parser',from_encoding='gb2312')
# 获取年度报告列表
try:
list_all = soup.find('div', {'class': 'datelist'}).find_all('a')
except:
log.info(f'{social_code}.........年度报告列表为空')
exception = '年度报告列表为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exception)
return
# 获取年报详细信息
for href in list_all:
ip = baseCore.get_proxy()
year_url = 'https://vip.stock.finance.sina.com.cn' + href.get('href')
year_name = href.text
res_2 = requests.get(year_url, proxies=ip)
soup_2 = bs(res_2.content, 'html.parser',from_encoding='gb2312')
try:
pdf_url = soup_2.find('th', {'style': 'text-align:center'}).find('a').get('href')
except:
log.error(f'{social_code}....{year_url}....无下载链接')
exception = '无下载链接'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue
for i in range(0, 3):
try:
resp_content = requests.get(pdf_url, headers=headers, verify=False, timeout=20).content
break
except:
time.sleep(3)
continue
try:
year = comp.findall(year_name)[0]
except:
continue
name_pdf = f"{com_name}:{year}年年报.pdf".replace('*', '')
result = ''
for i in range(0, 3):
try:
result = client.upload_by_buffer(resp_content, file_ext_name='pdf')
break
except Exception as e:
log.error(f'{social_code}...年报上传服务器出错:{e}')
time.sleep(3)
continue
if result == '':
exception = '上传服务器失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
continue
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
type_id = '1'
item_id = social_code
group_name = 'group1'
path = bytes.decode(result['Remote file_id']).replace('group1', '')
full_path = bytes.decode(result['Remote file_id'])
category = 'pdf'
file_size = result['Uploaded size']
order_by = num
status = 1
create_by = 'XueLingKun'
create_time = time_now
try:
tableUpdate(year, name_pdf, type_id, item_id, group_name, path, full_path, category, file_size,
order_by, status, create_by, create_time)
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, '')
except:
exception = '数据库传输失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, year_url, exception)
def begin():
while True:
start_time = time.time()
# 获取企业信息
social_code = baseCore.redicPullData('AnnualEnterprise:gnshqy_socialCode')
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
dic_info = baseCore.getInfomation(social_code)
count = dic_info[15]
code = dic_info[3]
com_name = dic_info[4]
if code is None:
exeception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, '', exeception)
continue
getContent(social_code, com_name, code,start_time)
count += 1
runType = 'AnnualReportCount'
baseCore.updateRun(social_code, runType, count)
if __name__ == '__main__':
begin()
cursor.close()
cnx.close()
baseCore.close()
......@@ -331,6 +331,12 @@ if __name__ == '__main__':
start_time = time.time()
# 获取企业信息
social_code = baseCore.redicPullData('AnnualEnterprise:gnqy_socialCode')
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
if social_code == '':
time.sleep(20)
continue
......
"""
"""
......@@ -256,8 +256,18 @@ def SpiderByZJH(url, payload, dic_info, start_time): # dic_info 数据库中获
soup = RequestUrl(url, payload, social_code, start_time)
if soup == '':
return
# 先获取页数
# 判断查找内容是否存在
try:
is_exist = soup.find('div',class_='con').text
if is_exist == '没有查询到数据':
state = 1
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(social_code, taskType, state, takeTime, url, '')
return
except:
pass
# 先获取页数
page = soup.find('div', class_='pages').find('ul', class_='g-ul').text
total = re.findall(r'\d+', page)[0]
......
......@@ -14,9 +14,9 @@ from base.smart import smart_extractor
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
jieba.cut("必须加载jieba")
# 初始化,设置中文分词
jieba.cut("必须加载jieba")
smart =smart_extractor.SmartExtractor('cn')
baseCore = BaseCore()
log = baseCore.getLogger()
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='dbScore', charset='utf8mb4')
......@@ -154,8 +154,8 @@ def beinWork(tyc_code, social_code,start_time):
time_format = baseCore.getNowTime(1)
try:
# 开始进行智能解析
lang = baseCore.detect_language(title)
smart = smart_extractor.SmartExtractor(lang)
# lang = baseCore.detect_language(title)
# smart = smart_extractor.SmartExtractor(lang)
contentText = smart.extract_by_url(link).text
# time.sleep(3)
except Exception as e:
......
# -*- coding: utf-8 -*-
'''
记录一天能采多少公众号,建一个数据库表 更新公众号的状态
记录一天能采多少公众号,建一个数据库表 更新公众号的状态 十分钟能采50多个
'''
import requests, time, random, json, pymysql, redis
......@@ -25,16 +25,14 @@ cursor = cnx.cursor()
r = baseCore.r
urllib3.disable_warnings()
def check_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sismember(f'wx_url_{sid}',article_url)
if res == 1:
if res == True:
return True
else:
return False
def add_url(sid, article_url):
r = redis.Redis(host="114.115.236.206", port=6379, password='clbzzsn')
res = r.sadd(f'wx_url_{sid}', article_url, 3) # 注意是 保存set的方式
......@@ -43,7 +41,6 @@ def add_url(sid, article_url):
else:
return False
def get_proxy():
cnx = pymysql.connect(host="114.115.159.144", user="root", password="zzsn9988", db="clb_project", charset="utf8mb4")
with cnx.cursor() as cursor:
......@@ -67,8 +64,37 @@ def get_proxy():
proxy_list.append(proxy)
return proxy_list
#定时
def getFromSql():
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql)
results = cursor.fetchall()
result_list = [item[0] for item in results]
#放入redis
for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item)
#刷新浏览器并获得token
def flushAndGetToken(list_b):
browser_run = list_b[0]
log.info('======刷新浏览器=====')
browser_run.refresh()
cookie_list = browser_run.get_cookies()
cur_url = browser_run.current_url
token = cur_url.split('token=')[1]
log.info(f'===========当前token为:{token}============')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
return token,cookies
#采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
def get_info(sid,json_search):
def get_info(sid,json_search,origin,info_source_code):
list_all_info = []
num_caiji = 0
kaishi_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
obsClient = ObsClient(
......@@ -88,6 +114,7 @@ def get_info(sid,json_search):
url_ft = check_url(sid, url_news)
if url_ft:
log.info(f'已采过该篇文章----文章链接-----{url_news}')
return list_all_info,num_caiji
try:
res_news = requests.get(url_news, timeout=20)
......@@ -173,14 +200,13 @@ def get_info(sid,json_search):
producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'])
kafka_result = producer.send("crawlerInfo", json.dumps(dic_info, ensure_ascii=False).encode('utf8'))
kafka_time_out = kafka_result.get(timeout=10)
# add_url(sid, url_news)
add_url(sid, url_news)
break
except:
time.sleep(5)
continue
num_caiji = num_caiji + 1
list_all_info.append(dic_info)
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
dic_info2 = {
'infoSourceId': sid,
......@@ -202,77 +228,11 @@ def get_info(sid,json_search):
continue
return list_all_info,num_caiji
#定时
def getFromSql():
selectSql = "SELECT info_source_code from info_source where site_uri like '%mp.weixin.qq.com%'"
cursor.execute(selectSql)
results = cursor.fetchall()
result_list = [item[0] for item in results]
#放入redis
for item in result_list:
r.rpush('WeiXinGZH:infoSourceCode', item)
#刷新浏览器并获得token
def flushAndGetToken(list_b):
browser_run = list_b[0]
log.info('======刷新浏览器=====')
browser_run.refresh()
cookie_list = browser_run.get_cookies()
cur_url = browser_run.current_url
token = cur_url.split('token=')[1]
log.info(f'===========当前token为:{token}============')
cookies = {}
for cookie in cookie_list:
cookies[cookie['name']] = cookie['value']
return token,cookies
#采集失败的公众号 重新放入redis
def rePutIntoR(item):
r.rpush('WeiXinGZH:infoSourceCode', item)
if __name__=="__main__":
requests.DEFAULT_RETRIES = 5
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(f'开始时间为:{time_start}')
requests.adapters.DEFAULT_RETRIES = 3
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# opt.binary_location =r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
# chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
chromedriver = r'D:/chrome/chromedriver.exe'
browser1 = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
list_b = [browser1]
url = "https://mp.weixin.qq.com/"
browser1.get(url)
# 可改动
time.sleep(30)
num_b = 0
# todo:从数据库里读信息,放入redis,定时任务 每天放入数据
# getFromSql()
def job(count,key):
s = requests.session()
# 记录运行公众号的个数
count = 0
while True:
# 刷新浏览器并获取当前token和cookie
token, cookies = flushAndGetToken(list_b)
list_all_info = []
log.info('===========获取公众号============')
start_ = time.time()
# #todo:redis中数据 pop一条
......@@ -282,7 +242,7 @@ if __name__=="__main__":
getFromSql()
time.sleep(20)
log.info(f'========本次公众号已采集完毕,共采集{count}个公众号=========总耗时:{baseCore.getTimeCost(start_,time.time())}')
continue
return
sql = f"SELECT site_uri,id,site_name,info_source_code from info_source where info_source_code = '{infoSourceCode}' "
# '一带一路百人论坛'
......@@ -322,10 +282,11 @@ if __name__=="__main__":
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
continue
return
fakeid = biz + '=='
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=5&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
url_search = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid={fakeid}&type=9&query=&token={token}&lang=zh_CN&f=json&ajax=1'
# https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=0&count=5&fakeid=MzAwNDA5Njc1Mg==&type=9&query=&token=550883192&lang=zh_CN&f=json&ajax=1
try:
ip = get_proxy()[random.randint(0, 3)]
json_search = s.get(url_search, headers=headers, proxies=ip,
......@@ -336,7 +297,7 @@ if __name__=="__main__":
except Exception as e:
log.error(f'===公众号{origin}请求失败!当前时间:{baseCore.getNowTime(1)}======={e}===')
rePutIntoR(info_source_code)
continue
return
#{"base_resp": {"ret": 200003, "err_msg": "invalid session"}}
# TODO:需要判断返回值,根据返回值判断是封号还是biz错误
# {'base_resp': {'err_msg': 'freq control', 'ret': 200013}}========= 封号
......@@ -351,13 +312,16 @@ if __name__=="__main__":
# 刷新 暂时用一下方法
rePutIntoR(info_source_code)
log.info(f'======该账号被封=======')
for i in range(0,6): #600,1200,1800,2400,3000,3600
#刷新
wait_time = time.sleep(600)
log.info(f'=======等待时间{wait_time}秒=====刷新浏览器=====')
browser_run = list_b[0]
browser_run.refresh()
continue
# for i in range(0,6): #600,1200,1800,2400,3000,3600
# #刷新
# time.sleep(600)
# log.info('=======等待时间600秒=====刷新浏览器=====')
# browser_run = list_b[0]
# browser_run.refresh()
r.set(key, 50)
r.expire(key, 3600)
return
elif ret == 200002:
# 公众号链接错误 保存库里 记录错误信息及错误类型
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -373,7 +337,7 @@ if __name__=="__main__":
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_,time.time())}')
continue
return
elif ret == 200003:
# 无效的session
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -389,7 +353,7 @@ if __name__=="__main__":
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
log.info(f'公众号----{origin}----耗时{baseCore.getTimeCost(start_, time.time())}')
continue
return
else:
log.info(f'----其他情况-----{json_search}---公众号{origin}------')
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
......@@ -404,11 +368,12 @@ if __name__=="__main__":
insertSql = f"insert into WeixinGZH (site_name,site_url,info_source_code,json_error_info,error_type,create_time) values (%s,%s,%s,%s,%s,%s)"
cursor_.execute(insertSql, tuple(error))
cnx_.commit()
continue
return
list_all = json_search['app_msg_list']
try:
list_all_info,num_caiji= get_info(sid,json_search)
list_all_info,num_caiji= get_info(sid,json_search,origin,info_source_code)
print(f'----------{len(list_all_info)}------{num_caiji}-------')
time.sleep(2)
if len(list_all_info) != 0:
count += 1
......@@ -448,9 +413,71 @@ if __name__=="__main__":
log.info(f'{fakeid}、公众号:{origin}采集失败!!!!!!耗时{baseCore.getTimeCost(start_, time.time())}')
time.sleep(2)
#关闭资源
return count
if __name__=="__main__":
requests.DEFAULT_RETRIES = 5
time_start = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
log.info(f'开始时间为:{time_start}')
requests.adapters.DEFAULT_RETRIES = 3
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36',
}
opt = webdriver.ChromeOptions()
opt.add_argument(
'user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36')
opt.add_argument("--ignore-certificate-errors")
opt.add_argument("--ignore-ssl-errors")
opt.add_experimental_option("excludeSwitches", ["enable-automation"])
opt.add_experimental_option('excludeSwitches', ['enable-logging'])
opt.add_experimental_option('useAutomationExtension', False)
# opt.binary_location =r'D:\crawler\baidu_crawler\tool\Google\Chrome\Application\chrome.exe'
# chromedriver = r'C:\Users\WIN10\DataspellProjects\crawlerProjectDemo\tmpcrawler\cmd100\chromedriver.exe'
chromedriver = r'D:/chrome/chromedriver.exe'
browser1 = webdriver.Chrome(chrome_options=opt, executable_path=chromedriver)
list_b = [browser1]
url = "https://mp.weixin.qq.com/"
browser1.get(url)
# 可改动
time.sleep(30)
num_b = 0
# 记录运行公众号的个数
count = 0
# todo:从数据库里读信息,放入redis,定时任务 每天放入数据
# getFromSql()
s = requests.session()
# 设置计数器的初始值为0
key = baseCore.getNextSeq()
r.set(key, 0)
# 设置key的过期时间为10秒
r.expire(key, 3600)
while True:
time.sleep(2)
new_value = baseCore.incrSet(key)
baseCore.getttl(key)
if new_value < 50:
aa = job(count,key)
count = aa
else:
#刷新浏览器
browser_run = list_b[0]
browser_run.refresh()
time.sleep(600)
# 关闭资源
cnx.close()
cursor.close()
baseCore.close()
import json
import json
......@@ -11,18 +11,20 @@ from NewsYahoo import news
from base.BaseCore import BaseCore
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
cnx = pymysql.connect(host='114.116.44.11', user='root', password='f7s0&7qqtK', db='clb_project', charset='utf8mb4')
cursor = cnx.cursor()
taskType = '企业基本信息/雅虎财经'
baseCore = BaseCore()
log= baseCore.getLogger()
r = baseCore.r
log = baseCore.getLogger()
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
'cache-control': 'max-age=0',
#'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
# 'cookie': 'maex=%7B%22v2%22%3A%7B%7D%7D; GUC=AQEBBwFjY49jkEIa8gQo&s=AQAAABw20C7P&g=Y2JIFQ; A1=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A3=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc; A1S=d=AQABBBIpnmICEOnPTXZVmK6DESXgxq3niTMFEgEBBwGPY2OQYysNb2UB_eMBAAcIEimeYq3niTM&S=AQAAAobGawhriFKqJdu9-rSz9nc&j=WORLD; PRF=t%3D6954.T%252BTEL%252BSOLB.BR%252BSTM%252BEMR%252BGT%252BAMD%252BSYM.DE%252BPEMEX%252BSGO.PA%252BLRLCF%252BSYNH%252B001040.KS; cmp=t=1669714927&j=0&u=1---',
'sec-ch-ua': '"Chromium";v="106", "Google Chrome";v="106", "Not;A=Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': "Windows",
......@@ -33,42 +35,22 @@ headers = {
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36'
}
# 获取股票代码
# def getGpdm(name):
# start=time.time()
# gpdm=""
# try:
#
# url = f'https://query1.finance.yahoo.com/v1/finance/search?q={name}&lang=en-US&region=US&quotesCount=6&newsCount=2&listsCount=2&enableFuzzyQuery=false&quotesQueryId=tss_match_phrase_query&multiQuoteQueryId=multi_quote_single_token_query&newsQueryId=news_cie_vespa&enableCb=true&enableNavLinks=true&enableEnhancedTrivialQuery=true&enableResearchReports=true&enableCulturalAssets=true&enableLogoUrl=true&researchReportsCount=2'
# response = requests.get(url, headers=headers, verify=False,timeout=(3.05, 3))
# time.sleep(3)
# except:
# return gpdm
# if (response.status_code == 200):
# pass
# else:
# log.error(f"{name}------获取股票接口返回失败:{response.status_code}")
# return gpdm
# retJson = json.loads(response.content.decode('utf-8'))
# try:
# gpdm =retJson['quotes'][0]['symbol']
# except:
# log.error(f"{name}---获取股票代码异常")
# return gpdm
# log.info(f"获取股票代码--{name},耗时{baseCore.getTimeCost(start, time.time())}")
#
# return gpdm
# 根据股票代码 获取企业基本信息 高管信息
def getInfo(gpdm,xydm):
def getInfo(name,enname,gpdm, xydm, start):
if 'HK' in str(gpdm):
tmp_g = str(gpdm).split('.')[0]
if len(tmp_g) == 5:
gpdm_ = str(gpdm)[1:]
else:
pass
else:
gpdm_ = gpdm
if 'HK' in gpdm_:
gpdm_ = gpdm_
start = time.time()
retData={}
retData = {}
retData['base_info'] = {
'公司名称': '',
'英文名':'',
'公司名称': name,
'英文名': enname,
'信用代码': xydm,
'股票代码': gpdm,
'地址': '',
......@@ -79,12 +61,12 @@ def getInfo(gpdm,xydm):
'员工人数': '',
'公司简介': ''
}
retData['people_info']=[]
retData['people_info'] = []
# https://finance.yahoo.com/quote/VOW3.DE/profile?p=VOW3.DE
url = f'https://finance.yahoo.com/quote/{gpdm}/profile?p={gpdm}'
url = f'https://finance.yahoo.com/quote/{gpdm_}/profile?p={gpdm_}'
time.sleep(3)
for i in range(0,3):
for i in range(0, 3):
try:
response = requests.get(url, headers=headers, verify=False)
time.sleep(1)
......@@ -92,14 +74,21 @@ def getInfo(gpdm,xydm):
break
else:
log.error(f"{gpdm}---第{i}次---获取基本信息接口返回失败:{response.status_code}")
except :
except:
continue
if (response.status_code == 200):
pass
else:
log.error(f"{gpdm}------获取基本信息接口重试后依然失败失败:{response.status_code}")
return retData
exeception = '获取基本信息接口返回失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, url, exeception)
rePutIntoR('')
return [state,retData]
state = 1
soup = BeautifulSoup(response.content, 'html.parser')
page = soup.find('div', {'id': 'Col1-0-Profile-Proxy'})
try:
......@@ -135,8 +124,8 @@ def getInfo(gpdm,xydm):
except:
com_jianjie = ''
dic_com_info = {
'公司名称':'',
'英文名':'',
'公司名称': name,
'英文名': enname,
'信用代码': xydm,
'股票代码': gpdm,
'地址': com_address,
......@@ -147,8 +136,8 @@ def getInfo(gpdm,xydm):
'员工人数': com_people,
'公司简介': com_jianjie
}
retData['base_info']=dic_com_info
#高管信息
retData['base_info'] = dic_com_info
# 高管信息
retPeople = []
try:
list_people = page.find('table', {'class': 'W(100%)'}).find_all('tr')[1:]
......@@ -177,8 +166,8 @@ def getInfo(gpdm,xydm):
except:
p_year = ''
if(p_zhiwu=="N/A"):
p_zhiwu=""
if (p_zhiwu == "N/A"):
p_zhiwu = ""
if (p_money == "N/A"):
p_money = ""
if (p_xingshi == "N/A"):
......@@ -186,7 +175,7 @@ def getInfo(gpdm,xydm):
if (p_year == "N/A"):
p_year = ""
dic_main_people = {
'公司名称': '',
'公司名称': name,
'股票代码': gpdm,
'信用代码': xydm,
'姓名': p_name,
......@@ -197,41 +186,16 @@ def getInfo(gpdm,xydm):
}
retPeople.append(dic_main_people)
retData['people_info'] = retPeople
df_retData = pd.DataFrame(retPeople)
# df_a = pd.DataFrame(retData['base_info'])
df_retData.to_excel('./data/采集高管结果1.xlsx',index=False)
log.info(f"获取基本信息--{gpdm},耗时{baseCore.getTimeCost(start, time.time())}")
return retData
response.close()
return [state,retData]
# def Nongpdm(xydm,name):
# start = time.time()
# company_dict = {
# 'name': name, # 企业名称
# 'shortName': '', # 企业简称
# 'socialCreditCode': xydm, # 统一社会信用代码
# 'officialPhone': '', # 电话
# 'officialUrl': '', # 官网
# 'briefInfo': '', # 简介
# 'industry': '', # 所属行业
# 'englishName': name, # 英文名
# 'address': '', # 地址
# 'status': 0, # 状态
# }
# # print(company_dict)
# producer = KafkaProducer(bootstrap_servers=['114.115.159.144:9092'], api_version=(2, 0, 2))
# kafka_result = producer.send("regionInfo", json.dumps(company_dict, ensure_ascii=False).encode('utf8'))
# kafka_result.get(timeout=10)
# # log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
# log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
# return company_dict
#
#保存基本信息
def saveBaseInfo(info):
start = time.time()
#基本信息发送到kafka
# 保存基本信息
def saveBaseInfo(info,start):
# 基本信息发送到kafka
company_dict = {
'name': '', # 企业名称
'name': info['base_info']['公司名称'], # 企业名称
'shortName': '', # 企业简称
'socialCreditCode': info['base_info']['信用代码'], # 统一社会信用代码
'officialPhone': info['base_info']['电话'], # 电话
......@@ -249,13 +213,13 @@ def saveBaseInfo(info):
log.info(f"保存基本信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
log.info(f"保存基本信息--{company_dict['name']},耗时{baseCore.getTimeCost(start, time.time())}")
#保存高管信息
def savePeopleInfo(info):
start = time.time()
# 保存高管信息
def savePeopleInfo(info,start):
# 高管信息调用接口
list_people = info['people_info']
list_one_info = []
for i in range(0,len(list_people)):
for i in range(0, len(list_people)):
dic_json = {
"socialCreditCode": list_people[i]['信用代码'],
"name": list_people[i]['姓名'],
......@@ -269,7 +233,7 @@ def savePeopleInfo(info):
"benefitShare": '',
"currentTerm": '',
"personInfo": '',
"sort": str(i+1)
"sort": str(i + 1)
}
list_one_info.append(dic_json)
json_updata = json.dumps(list_one_info)
......@@ -277,11 +241,12 @@ def savePeopleInfo(info):
if json_updata == '[]':
pass
else:
for i in range(0,3):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive',data=json_updata,timeout=300, verify=False)
for i in range(0, 3):
response = requests.post('http://114.115.236.206:9988/datapull/sync/executive', data=json_updata,
timeout=300, verify=False)
if (response.status_code == 200):
retJson = json.loads(response.content.decode('utf-8'))
if(retJson['success'] or retJson['success']=='true'):
if (retJson['success'] or retJson['success'] == 'true'):
break
if (response.status_code == 200):
......@@ -290,123 +255,107 @@ def savePeopleInfo(info):
pass
else:
log.error("保存高管接口失败---{retJson}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(dic_json['socialCreditCode'], taskType, state, takeTime, '', exception)
return state
else:
log.error("保存高管接口失败---{response.status_code}")
exception = '保存高管接口失败'
state = 0
takeTime = baseCore.getTimeCost(start, time.time())
baseCore.recordLog(dic_json['socialCreditCode'], taskType, state, takeTime, '', exception)
return state
state = 1
log.info(f"保存高管信息--{info['base_info']['信用代码']},耗时{baseCore.getTimeCost(start, time.time())}")
return state
def rePutIntoR(item):
r.rpush('BaseInfoEnterprise:gwqy_socialCode', item)
#根据名字获取股票代码 必须是英文名字 如果提供的数据有股票代码 则跳过此步骤
#gpdm=getGpdm("Volkswagen")
#生成一个新的信用代码 如果提供的原始数据有信用代码 则不能生成新的信用代码
#xydm=baseCore.getNextXydm()
# xydm='ZZSN230710201009006'
# retData=getInfo("Volkswagen","VOW3.DE",xydm)
# saveBaseInfo(retData)
# savePeopleInfo(retData)
# print(retData)
# def getInfomation(social_code):
# sql = f"SELECT * FROM EnterpriseInfo WHERE SocialCode = '{social_code}'"
# cursor.execute(sql)
# data = cursor.fetchone()
# return data
#采集工作
# 采集工作
def beginWork():
#给定excel名单 保存股票代码
okCount=0
errorCount=0
df_all = pd.read_excel('./data/福布斯美国企业股票代码.xlsx',dtype=str,keep_default_na=False)
a = []
b = []
# df_all = pd.read_excel('../../data/23年500强企业新榜股票代码.xlsx',dtype=str, keep_default_na=False)
for i in range(len(df_all)):
# # name = df_all['中文名称'][i]
# # rank = df_all['排名'][i]
# # officialUrl = df_all['企业官网'][i]
# # industry = df_all['行业'][i]
# # englishName = df_all['英文名称'][i]
# # address = df_all['企业总部地址'][i]
#
# xydm_name = df_all_xydm['名称'][i]
# # print(xydm_name)
# for j in range(len(df_all)):
# name = df_all['中文名称'][j]
# if name == xydm_name:
# print(name,xydm_name)
# xydm = df_all_xydm['信用代码'][i]
# if i>=22:
# pass
# else:
# continue
# log.info(f"{i}----------开始")
# # country = df_all['企业所属国家'][i]
# # if country=='中国':
# # continue
# # else:
# # log.info(f"{i}----------为国外企业 继续")
name = df_all['name'][i]
ename = df_all['companyName'][i]
gpdm= df_all['code'][i]
xydm = df_all['xydm'][i]
# if gpdm == '':
# Nongpdm(xydm,name)
# continue
# # print(f'名字:{name},股票代码:{gpdm},信用代码:{xydm}')
# # #没有股票代码,就保存榜单中的数据
# if gpdm == '':
# Nongpdm(gpdm,name)
# continue
# # xydm = baseCore.getNextXydm()
# # # Nongpdm(xydm,name,officialUrl,industry,englishName,address)
# # else:
# # log.info(f"{j}----------为股票代码不为空 继续")
# # pass
# # enname = df_all['英文名称'][j]
# # if enname != '':
# # pass
# # else:
# # log.info(f"{j}----------英文名字为空 跳过")
# # continue
# # # log.info(f"{i}----------开始股票代码")
# # # gpdm = getGpdm(enname)
# # # xydm=baseCore.getNextXydm()
# #
while True:
social_code = baseCore.redicPullData('BaseInfoEnterprise:gwqy_socialCode')
if not social_code:
time.sleep(20)
continue
if social_code == 'None':
time.sleep(20)
continue
# 数据库中获取基本信息
data = baseCore.getInfomation(social_code)
name = data[1]
enname = data[5]
gpdm = data[3]
xydm = data[2]
# 获取该企业对应项目的采集次数
count = data[13]
start_time = time.time()
# 股票代码为空跳过
if gpdm is None:
log.error(f"{name}--股票代码为空 跳过")
exception = '股票代码为空'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
continue
try:
retData = getInfo(name,enname,gpdm, xydm, start_time)
# 基本信息采集成功 进行数据入库,否则不入库
if retData[0] == 1:
# 企业基本信息入库
try:
retData = getInfo(name,ename,gpdm,xydm)
saveBaseInfo(retData)
savePeopleInfo(retData)
#采集企业动态
news(i,gpdm,xydm)
okCount += 1
b.append([name,gpdm,xydm])
print(name,'.......采集成功')
saveBaseInfo(retData[1],start_time)
except:
a.append([name,gpdm,xydm])
errorCount += 1
print(name,'......采集失败')
print(f'成功{okCount}个,失败{errorCount}个')
df_a = pd.DataFrame(np.array(a))
df_a.columns = ['name','gpdm','xydm']
df_a.to_excel('./data/没有采集到.xlsx',index=False)
df_b = pd.DataFrame(np.array(b))
df_b.columns = ['name','gpdm','xydm']
df_b.to_excel('./data/采集到.xlsx',index=False)
#
# if gpdm!='':
# okCount=okCount+1
# else:
# errorCount=errorCount+1
# log.info(f"{i}-------成功{okCount}--失败-{errorCount}")
# # if gpdm == '':
# # continue
# # else:
# # pass
# # df_all['股票代码'][j]=gpdm
# # else:
# # continue
# # if (i % 10 == 0):
# # df_all.to_excel(r'./data/23年500强企业新上榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # df_all.to_excel(r'./data/23年500强企业新榜_ret22.xlsx', sheet_name='Sheet1', index=False, header=True)
# # 释放资源
log.error(f'{name}....企业基本信息Kafka操作失败')
exception = 'Kafka操作失败'
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', exception)
# 企业高管信息入库
state = savePeopleInfo(retData[1],start_time)
# 只有企业高管信息和企业基本信息都采集到,该企业才算采集成功
if state == 1:
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', '')
else:
pass
else:
pass
except Exception as e:
# 若出现尚未发现的错误,则保存错误信息以及出错位置
ee = e.__traceback__.tb_lineno
log.error(f'{name}...{xydm}...{gpdm}.....数据采集失败,原因:{ee}行 {e}')
state = 0
takeTime = baseCore.getTimeCost(start_time, time.time())
baseCore.recordLog(xydm, taskType, state, takeTime, '', f'数据采集失败,原因:{ee}行 {e}')
# 企业数据采集完成,采集次数加一
count += 1
runType = 'BaseInfoRunCount'
baseCore.updateRun(social_code,runType,count)
# 释放资源
baseCore.close()
if __name__ == '__main__':
#gpdm = getGpdm("Volkswagen")
#print(gpdm)
cnx = pymysql.connect(host='114.115.159.144', user='root', password='zzsn9988', db='caiji',charset='utf8mb4')
cursor = cnx.cursor()
beginWork()
cursor.close()
cnx.close()
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论