提交 335b0090 作者: 薛凌堃

fdfs

上级 0893c367
# __init__.py
__version__ = '2.2.0'
VERSION = tuple(map(int, __version__.split('.')))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: client.py
'''
Client module for Fastdfs 3.08
author: scott yuan scottzer8@gmail.com
date: 2012-06-21
'''
import os
import sys
from fdfs_client.utils import *
from fdfs_client.tracker_client import *
from fdfs_client.storage_client import *
from fdfs_client.exceptions import *
def get_tracker_conf(conf_path='client.conf'):
cf = Fdfs_ConfigParser()
tracker = {}
try:
cf.read(conf_path)
timeout = cf.getint('__config__', 'connect_timeout')
tracker_list = cf.get('__config__', 'tracker_server')
if isinstance(tracker_list, str):
tracker_list = [tracker_list]
tracker_ip_list = []
for tr in tracker_list:
tracker_ip, tracker_port = tr.split(':')
tracker_ip_list.append(tracker_ip)
tracker['host_tuple'] = tuple(tracker_ip_list)
tracker['port'] = int(tracker_port)
tracker['timeout'] = timeout
tracker['name'] = 'Tracker Pool'
except:
raise
return tracker
class Fdfs_client(object):
'''
Class Fdfs_client implemented Fastdfs client protol ver 3.08.
It's useful upload, download, delete file to or from fdfs server, etc. It's uses
connection pool to manage connection to server.
'''
def __init__(self, trackers, poolclass=ConnectionPool):
self.trackers = trackers
self.tracker_pool = poolclass(**self.trackers)
self.timeout = self.trackers['timeout']
return None
def __del__(self):
try:
self.pool.destroy()
self.pool = None
except:
pass
def upload_by_filename(self, filename, meta_dict=None):
'''
Upload a file to Storage server.
arguments:
@filename: string, name of file that will be uploaded
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} meta_dict can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : local_file_name,
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_filename(tc, store_serv, filename, meta_dict)
def upload_by_file(self, filename, meta_dict=None):
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_file(tc, store_serv, filename, meta_dict)
def upload_by_buffer(self, filebuffer, file_ext_name=None, meta_dict=None):
'''
Upload a buffer to Storage server.
arguments:
@filebuffer: string, buffer
@file_ext_name: string, file extend name
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_by_buffer(tc, store_serv, filebuffer, file_ext_name, meta_dict)
def upload_slave_by_filename(self, filename, remote_file_id, prefix_name, meta_dict=None):
'''
Upload slave file to Storage server.
arguments:
@filename: string, local file name
@remote_file_id: string, remote file id
@prefix_name: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading slave)')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
if not prefix_name:
raise DataError('[-] Error: prefix_name can not be null.')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_with_group(group_name)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
try:
ret_dict = store.storage_upload_slave_by_filename(tc, store_serv, filename, prefix_name, remote_filename,
meta_dict=None)
except:
raise
ret_dict['Status'] = 'Upload slave file successed.'
return ret_dict
def upload_slave_by_file(self, filename, remote_file_id, prefix_name, meta_dict=None):
'''
Upload slave file to Storage server.
arguments:
@filename: string, local file name
@remote_file_id: string, remote file id
@prefix_name: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(uploading slave)')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
if not prefix_name:
raise DataError('[-] Error: prefix_name can not be null.')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_with_group(group_name)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
try:
ret_dict = store.storage_upload_slave_by_file(tc, store_serv, filename, prefix_name, remote_filename,
meta_dict=None)
except:
raise
ret_dict['Status'] = 'Upload slave file successed.'
return ret_dict
def upload_slave_by_buffer(self, filebuffer, remote_file_id, meta_dict=None, file_ext_name=None):
'''
Upload slave file by buffer
arguments:
@filebuffer: string
@remote_file_id: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
}
@return dictionary {
'Status' : 'Upload slave successed.',
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Remote file id' : remote_file_id,
'Storage IP' : storage_ip
}
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(uploading slave)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_slave_by_buffer(tc, store_serv, filebuffer, remote_filename, meta_dict,
file_ext_name)
def upload_appender_by_filename(self, local_filename, meta_dict=None):
'''
Upload an appender file by filename.
arguments:
@local_filename: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} Notice: it can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(uploading appender)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_filename(tc, store_serv, local_filename, meta_dict)
def upload_appender_by_file(self, local_filename, meta_dict=None):
'''
Upload an appender file by file.
arguments:
@local_filename: string
@meta_dict: dictionary e.g.:{
'ext_name' : 'jpg',
'file_size' : '10240B',
'width' : '160px',
'hight' : '80px'
} Notice: it can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(uploading appender)')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_file(tc, store_serv, local_filename, meta_dict)
def upload_appender_by_buffer(self, filebuffer, file_ext_name=None, meta_dict=None):
'''
Upload a buffer to Storage server.
arguments:
@filebuffer: string
@file_ext_name: string, can be null
@meta_dict: dictionary, can be null
@return dict {
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : 'Upload successed.',
'Local file name' : '',
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
} if success else None
'''
if not filebuffer:
raise DataError('[-] Error: argument filebuffer can not be null.')
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_stor_without_group()
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_upload_appender_by_buffer(tc, store_serv, filebuffer, meta_dict, file_ext_name)
def delete_file(self, remote_file_id):
'''
Delete a file from Storage server.
arguments:
@remote_file_id: string, file_id of file that is on storage server
@return tuple ('Delete file successed.', remote_file_id, storage_ip)
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in delete file)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_delete_file(tc, store_serv, remote_filename)
def download_to_file(self, local_filename, remote_file_id, offset=0, down_bytes=0):
'''
Download a file from Storage server.
arguments:
@local_filename: string, local name of file
@remote_file_id: string, file_id of file that is on storage server
@offset: long
@downbytes: long
@return dict {
'Remote file_id' : remote_file_id,
'Content' : local_filename,
'Download size' : downloaded_size,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in download file)')
group_name, remote_filename = tmp
if not offset:
file_offset = int(offset)
if not down_bytes:
download_bytes = int(down_bytes)
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_fetch(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_download_to_file(tc, store_serv, local_filename, file_offset, download_bytes,
remote_filename)
def download_to_buffer(self, remote_file_id, offset=0, down_bytes=0):
'''
Download a file from Storage server and store in buffer.
arguments:
@remote_file_id: string, file_id of file that is on storage server
@offset: long
@down_bytes: long
@return dict {
'Remote file_id' : remote_file_id,
'Content' : file_buffer,
'Download size' : downloaded_size,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in download file)')
group_name, remote_filename = tmp
if not offset:
file_offset = int(offset)
if not down_bytes:
download_bytes = int(down_bytes)
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_fetch(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
file_buffer = None
return store.storage_download_to_buffer(tc, store_serv, file_buffer, file_offset, download_bytes,
remote_filename)
def list_one_group(self, group_name):
'''
List one group information.
arguments:
@group_name: string, group name will be list
@return Group_info, instance
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_one_group(group_name)
def list_servers(self, group_name, storage_ip=None):
'''
List all storage servers information in a group
arguments:
@group_name: string
@return dictionary {
'Group name' : group_name,
'Servers' : server list,
}
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_servers(group_name, storage_ip)
def list_all_groups(self):
'''
List all group information.
@return dictionary {
'Groups count' : group_count,
'Groups' : list of groups
}
'''
tc = Tracker_client(self.tracker_pool)
return tc.tracker_list_all_groups()
def get_meta_data(self, remote_file_id):
'''
Get meta data of remote file.
arguments:
@remote_fileid: string, remote file id
@return dictionary, meta data
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in get meta data)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_get_metadata(tc, store_serv, remote_filename)
def set_meta_data(self, remote_file_id, meta_dict, op_flag=STORAGE_SET_METADATA_FLAG_OVERWRITE):
'''
Set meta data of remote file.
arguments:
@remote_file_id: string
@meta_dict: dictionary
@op_flag: char, 'O' for overwrite, 'M' for merge
@return dictionary {
'Status' : status,
'Storage IP' : storage_ip
}
'''
tmp = split_remote_fileid(remote_file_id)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(in set meta data)')
group_name, remote_filename = tmp
tc = Tracker_client(self.tracker_pool)
try:
store_serv = tc.tracker_query_storage_update(group_name, remote_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
status = store.storage_set_metadata(tc, store_serv, remote_filename, meta_dict)
except (ConnectionError, ResponseError, DataError):
raise
# if status == 2:
# raise DataError('[-] Error: remote file %s is not exist.' % remote_file_id)
if status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
ret_dict = {}
ret_dict['Status'] = 'Set meta data success.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def append_by_filename(self, local_filename, remote_fileid):
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(append)')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_filename(tc, store_serv, local_filename, appended_filename)
def append_by_file(self, local_filename, remote_fileid):
isfile, errmsg = fdfs_check_file(local_filename)
if not isfile:
raise DataError(errmsg + '(append)')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_file(tc, store_serv, local_filename, appended_filename)
def append_by_buffer(self, file_buffer, remote_fileid):
if not file_buffer:
raise DataError('[-] Error: file_buffer can not be null.')
tmp = split_remote_fileid(remote_fileid)
if not tmp:
raise DataError('[-] Error: remote_file_id is invalid.(append)')
group_name, appended_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appended_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_append_by_buffer(tc, store_serv, file_buffer, appended_filename)
def truncate_file(self, truncated_filesize, appender_fileid):
'''
Truncate file in Storage server.
arguments:
@truncated_filesize: long
@appender_fileid: remote_fileid
@return: dictionary {
'Status' : 'Truncate successed.',
'Storage IP' : storage_ip
}
'''
trunc_filesize = int(truncated_filesize)
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: appender_fileid is invalid.(truncate)')
group_name, appender_filename = tmp
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_truncate_file(tc, store_serv, trunc_filesize, appender_filename)
def modify_by_filename(self, filename, appender_fileid, offset=0):
'''
Modify a file in Storage server by file.
arguments:
@filename: string, local file name
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(modify)')
filesize = os.stat(filename).st_size
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_filename(tc, store_serv, filename, file_offset, filesize, appender_filename)
def modify_by_file(self, filename, appender_fileid, offset=0):
'''
Modify a file in Storage server by file.
arguments:
@filename: string, local file name
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
isfile, errmsg = fdfs_check_file(filename)
if not isfile:
raise DataError(errmsg + '(modify)')
filesize = os.stat(filename).st_size
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_file(tc, store_serv, filename, file_offset, filesize, appender_filename)
def modify_by_buffer(self, filebuffer, appender_fileid, offset=0):
'''
Modify a file in Storage server by buffer.
arguments:
@filebuffer: string, file buffer
@offset: long, file offset
@appender_fileid: string, remote file id
@return: dictionary {
'Status' : 'Modify successed.',
'Storage IP' : storage_ip
}
'''
if not filebuffer:
raise DataError('[-] Error: filebuffer can not be null.(modify)')
filesize = len(filebuffer)
tmp = split_remote_fileid(appender_fileid)
if not tmp:
raise DataError('[-] Error: remote_fileid is invalid.(modify)')
group_name, appender_filename = tmp
if not offset:
file_offset = int(offset)
else:
file_offset = 0
tc = Tracker_client(self.tracker_pool)
store_serv = tc.tracker_query_storage_update(group_name, appender_filename)
store = Storage_client(store_serv.ip_addr, store_serv.port, self.timeout)
return store.storage_modify_by_buffer(tc, store_serv, filebuffer, file_offset, filesize, appender_filename)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: connection.py
import socket
import os
import sys
import time
import random
from itertools import chain
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# start class Connection
class Connection(object):
'''Manage TCP comunication to and from Fastdfs Server.'''
def __init__(self, **conn_kwargs):
self.pid = os.getpid()
self.host_tuple = conn_kwargs['host_tuple']
self.remote_port = conn_kwargs['port']
self.remote_addr = None
self.timeout = conn_kwargs['timeout']
self._sock = None
def __del__(self):
try:
self.disconnect()
except:
pass
def connect(self):
'''Connect to fdfs server.'''
if self._sock:
return
try:
sock = self._connect()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = sock
# print '[+] Create a connection success.'
# print '\tLocal address is %s:%s.' % self._sock.getsockname()
# print '\tRemote address is %s:%s' % (self.remote_addr, self.remote_port)
def _connect(self):
'''Create TCP socket. The host is random one of host_tuple.'''
self.remote_addr = random.choice(self.host_tuple)
# print '[+] Connecting... remote: %s:%s' % (self.remote_addr, self.remote_port)
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# sock.settimeout(self.timeout)
sock = socket.create_connection((self.remote_addr, self.remote_port), self.timeout)
return sock
def disconnect(self):
'''Disconnect from fdfs server.'''
if self._sock is None:
return
try:
self._sock.close()
except socket.error as e:
raise ConnectionError(self._errormessage(e))
self._sock = None
def get_sock(self):
return self._sock
def _errormessage(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message" '''
if len(exception.args) == 1:
return "[-] Error: connect to %s:%s. %s." % (self.remote_addr, self.remote_port, exception.args[0])
else:
return "[-] Error: %s connect to %s:%s. %s." % \
(exception.args[0], self.remote_addr, self.remote_port, exception.args[1])
# end class Connection
# start ConnectionPool
class ConnectionPool(object):
'''Generic Connection Pool'''
def __init__(self, name='', conn_class=Connection,
max_conn=None, **conn_kwargs):
self.pool_name = name
self.pid = os.getpid()
self.conn_class = conn_class
self.max_conn = max_conn or 2 ** 31
self.conn_kwargs = conn_kwargs
self._conns_created = 0
self._conns_available = []
self._conns_inuse = set()
# print '[+] Create a connection pool success, name: %s.' % self.pool_name
def _check_pid(self):
if self.pid != os.getpid():
self.destroy()
self.__init__(self.conn_class, self.max_conn, **self.conn_kwargs)
def make_conn(self):
'''Create a new connection.'''
if self._conns_created >= self.max_conn:
raise ConnectionError('[-] Error: Too many connections.')
num_try = 10
while True:
try:
if num_try <= 0:
sys.exit()
conn_instance = self.conn_class(**self.conn_kwargs)
conn_instance.connect()
self._conns_created += 1
break
except ConnectionError as e:
print(e)
num_try -= 1
conn_instance = None
return conn_instance
def get_connection(self):
'''Get a connection from pool.'''
self._check_pid()
try:
conn = self._conns_available.pop()
# print '[+] Get a connection from pool %s.' % self.pool_name
# print '\tLocal address is %s:%s.' % conn._sock.getsockname()
# print '\tRemote address is %s:%s' % (conn.remote_addr, conn.remote_port)
except IndexError:
conn = self.make_conn()
self._conns_inuse.add(conn)
return conn
def remove(self, conn):
'''Remove connection from pool.'''
if conn in self._conns_inuse:
self._conns_inuse.remove(conn)
self._conns_created -= 1
if conn in self._conns_available:
self._conns_available.remove(conn)
self._conns_created -= 1
def destroy(self):
'''Disconnect all connections in the pool.'''
all_conns = chain(self._conns_inuse, self._conns_available)
for conn in all_conns:
conn.disconnect()
# print '[-] Destroy connection pool %s.' % self.pool_name
def release(self, conn):
'''Release the connection back to the pool.'''
self._check_pid()
if conn.pid == self.pid:
self._conns_inuse.remove(conn)
self._conns_available.append(conn)
# print '[-] Release connection back to pool %s.' % self.pool_name
# end ConnectionPool class
def tcp_recv_response(conn, bytes_size, buffer_size=4096):
'''Receive response from server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_size: int, will be received byte_stream size
@buffer_size: int, receive buffer size
@Return: tuple,(response, received_size)
'''
recv_buff = []
total_size = 0
try:
while bytes_size > 0:
resp = conn._sock.recv(buffer_size)
recv_buff.append(resp)
total_size += len(resp)
bytes_size -= len(resp)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: (%s)' % e.args)
return (b''.join(recv_buff), total_size)
def tcp_send_data(conn, bytes_stream):
'''Send buffer to server.
It is not include tracker header.
arguments:
@conn: connection
@bytes_stream: trasmit buffer
@Return bool
'''
try:
conn._sock.sendall(bytes_stream)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: (%s)' % e.args)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: exceptions.py
'''Core exceptions raised by fdfs client'''
class FDFSError(Exception):
pass
class ConnectionError(FDFSError):
pass
class ResponseError(FDFSError):
pass
class InvaildResponse(FDFSError):
pass
class DataError(FDFSError):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: fdfs_protol.py
import struct
import socket
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
# define FDFS protol constans
TRACKER_PROTO_CMD_STORAGE_JOIN = 81
FDFS_PROTO_CMD_QUIT = 82
TRACKER_PROTO_CMD_STORAGE_BEAT = 83 # storage heart beat
TRACKER_PROTO_CMD_STORAGE_REPORT_DISK_USAGE = 84 # report disk usage
TRACKER_PROTO_CMD_STORAGE_REPLICA_CHG = 85 # repl new storage servers
TRACKER_PROTO_CMD_STORAGE_SYNC_SRC_REQ = 86 # src storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_REQ = 87 # dest storage require sync
TRACKER_PROTO_CMD_STORAGE_SYNC_NOTIFY = 88 # sync done notify
TRACKER_PROTO_CMD_STORAGE_SYNC_REPORT = 89 # report src last synced time as dest server
TRACKER_PROTO_CMD_STORAGE_SYNC_DEST_QUERY = 79 # dest storage query sync src storage server
TRACKER_PROTO_CMD_STORAGE_REPORT_IP_CHANGED = 78 # storage server report it's ip changed
TRACKER_PROTO_CMD_STORAGE_CHANGELOG_REQ = 77 # storage server request storage server's changelog
TRACKER_PROTO_CMD_STORAGE_REPORT_STATUS = 76 # report specified storage server status
TRACKER_PROTO_CMD_STORAGE_PARAMETER_REQ = 75 # storage server request parameters
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FREE = 74 # storage report trunk free space
TRACKER_PROTO_CMD_STORAGE_REPORT_TRUNK_FID = 73 # storage report current trunk file id
TRACKER_PROTO_CMD_STORAGE_FETCH_TRUNK_FID = 72 # storage get current trunk file id
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_START = 61 # start of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_SYS_FILES_END = 62 # end of tracker get system data files
TRACKER_PROTO_CMD_TRACKER_GET_ONE_SYS_FILE = 63 # tracker get a system data file
TRACKER_PROTO_CMD_TRACKER_GET_STATUS = 64 # tracker get status of other tracker
TRACKER_PROTO_CMD_TRACKER_PING_LEADER = 65 # tracker ping leader
TRACKER_PROTO_CMD_TRACKER_NOTIFY_NEXT_LEADER = 66 # notify next leader to other trackers
TRACKER_PROTO_CMD_TRACKER_COMMIT_NEXT_LEADER = 67 # commit next leader to other trackers
TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP = 90
TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS = 91
TRACKER_PROTO_CMD_SERVER_LIST_STORAGE = 92
TRACKER_PROTO_CMD_SERVER_DELETE_STORAGE = 93
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE = 101
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE = 102
TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE = 103
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE = 104
TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ALL = 105
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ALL = 106
TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ALL = 107
TRACKER_PROTO_CMD_RESP = 100
FDFS_PROTO_CMD_ACTIVE_TEST = 111 # active test, tracker and storage both support since V1.28
STORAGE_PROTO_CMD_REPORT_CLIENT_IP = 9 # ip as tracker client
STORAGE_PROTO_CMD_UPLOAD_FILE = 11
STORAGE_PROTO_CMD_DELETE_FILE = 12
STORAGE_PROTO_CMD_SET_METADATA = 13
STORAGE_PROTO_CMD_DOWNLOAD_FILE = 14
STORAGE_PROTO_CMD_GET_METADATA = 15
STORAGE_PROTO_CMD_SYNC_CREATE_FILE = 16
STORAGE_PROTO_CMD_SYNC_DELETE_FILE = 17
STORAGE_PROTO_CMD_SYNC_UPDATE_FILE = 18
STORAGE_PROTO_CMD_SYNC_CREATE_LINK = 19
STORAGE_PROTO_CMD_CREATE_LINK = 20
STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE = 21
STORAGE_PROTO_CMD_QUERY_FILE_INFO = 22
STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE = 23 # create appender file
STORAGE_PROTO_CMD_APPEND_FILE = 24 # append file
STORAGE_PROTO_CMD_SYNC_APPEND_FILE = 25
STORAGE_PROTO_CMD_FETCH_ONE_PATH_BINLOG = 26 # fetch binlog of one store path
STORAGE_PROTO_CMD_RESP = TRACKER_PROTO_CMD_RESP
STORAGE_PROTO_CMD_UPLOAD_MASTER_FILE = STORAGE_PROTO_CMD_UPLOAD_FILE
STORAGE_PROTO_CMD_TRUNK_ALLOC_SPACE = 27 # since V3.00
STORAGE_PROTO_CMD_TRUNK_ALLOC_CONFIRM = 28 # since V3.00
STORAGE_PROTO_CMD_TRUNK_FREE_SPACE = 29 # since V3.00
STORAGE_PROTO_CMD_TRUNK_SYNC_BINLOG = 30 # since V3.00
STORAGE_PROTO_CMD_TRUNK_GET_BINLOG_SIZE = 31 # since V3.07
STORAGE_PROTO_CMD_TRUNK_DELETE_BINLOG_MARKS = 32 # since V3.07
STORAGE_PROTO_CMD_TRUNK_TRUNCATE_BINLOG_FILE = 33 # since V3.07
STORAGE_PROTO_CMD_MODIFY_FILE = 34 # since V3.08
STORAGE_PROTO_CMD_SYNC_MODIFY_FILE = 35 # since V3.08
STORAGE_PROTO_CMD_TRUNCATE_FILE = 36 # since V3.08
STORAGE_PROTO_CMD_SYNC_TRUNCATE_FILE = 37 # since V3.08
# for overwrite all old metadata
STORAGE_SET_METADATA_FLAG_OVERWRITE = 'O'
STORAGE_SET_METADATA_FLAG_OVERWRITE_STR = "O"
# for replace, insert when the meta item not exist, otherwise update it
STORAGE_SET_METADATA_FLAG_MERGE = 'M'
STORAGE_SET_METADATA_FLAG_MERGE_STR = "M"
FDFS_RECORD_SEPERATOR = '\x01'
FDFS_FIELD_SEPERATOR = '\x02'
# common constants
FDFS_GROUP_NAME_MAX_LEN = 16
IP_ADDRESS_SIZE = 16
FDFS_PROTO_PKG_LEN_SIZE = 8
FDFS_PROTO_CMD_SIZE = 1
FDFS_PROTO_STATUS_SIZE = 1
FDFS_PROTO_IP_PORT_SIZE = (IP_ADDRESS_SIZE + 6)
FDFS_MAX_SERVERS_EACH_GROUP = 32
FDFS_MAX_GROUPS = 512
FDFS_MAX_TRACKERS = 16
FDFS_DOMAIN_NAME_MAX_LEN = 128
FDFS_MAX_META_NAME_LEN = 64
FDFS_MAX_META_VALUE_LEN = 256
FDFS_FILE_PREFIX_MAX_LEN = 16
FDFS_LOGIC_FILE_PATH_LEN = 10
FDFS_TRUE_FILE_PATH_LEN = 6
FDFS_FILENAME_BASE64_LENGTH = 27
FDFS_TRUNK_FILE_INFO_LEN = 16
FDFS_FILE_EXT_NAME_MAX_LEN = 6
FDFS_SPACE_SIZE_BASE_INDEX = 2 # storage space size based (MB)
FDFS_UPLOAD_BY_BUFFER = 1
FDFS_UPLOAD_BY_FILENAME = 2
FDFS_UPLOAD_BY_FILE = 3
FDFS_DOWNLOAD_TO_BUFFER = 1
FDFS_DOWNLOAD_TO_FILE = 2
FDFS_NORMAL_LOGIC_FILENAME_LENGTH = (
FDFS_LOGIC_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_FILE_EXT_NAME_MAX_LEN + 1)
FDFS_TRUNK_FILENAME_LENGTH = (
FDFS_TRUE_FILE_PATH_LEN + FDFS_FILENAME_BASE64_LENGTH + FDFS_TRUNK_FILE_INFO_LEN + 1 + FDFS_FILE_EXT_NAME_MAX_LEN)
FDFS_TRUNK_LOGIC_FILENAME_LENGTH = (FDFS_TRUNK_FILENAME_LENGTH + (FDFS_LOGIC_FILE_PATH_LEN - FDFS_TRUE_FILE_PATH_LEN))
FDFS_VERSION_SIZE = 6
TRACKER_QUERY_STORAGE_FETCH_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE)
TRACKER_QUERY_STORAGE_STORE_BODY_LEN = (FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
# status code, order is important!
FDFS_STORAGE_STATUS_INIT = 0
FDFS_STORAGE_STATUS_WAIT_SYNC = 1
FDFS_STORAGE_STATUS_SYNCING = 2
FDFS_STORAGE_STATUS_IP_CHANGED = 3
FDFS_STORAGE_STATUS_DELETED = 4
FDFS_STORAGE_STATUS_OFFLINE = 5
FDFS_STORAGE_STATUS_ONLINE = 6
FDFS_STORAGE_STATUS_ACTIVE = 7
FDFS_STORAGE_STATUS_RECOVERY = 9
FDFS_STORAGE_STATUS_NONE = 99
class Storage_server(object):
'''Class storage server for upload.'''
def __init__(self):
self.ip_addr = None
self.port = None
self.group_name = ''
self.store_path_index = 0
# Class tracker_header
class Tracker_header(object):
'''
Class for Pack or Unpack tracker header
struct tracker_header{
char pkg_len[FDFS_PROTO_PKG_LEN_SIZE],
char cmd,
char status,
}
'''
def __init__(self):
self.fmt = '!QBB' # pkg_len[FDFS_PROTO_PKG_LEN_SIZE] + cmd + status
self.st = struct.Struct(self.fmt)
self.pkg_len = 0
self.cmd = 0
self.status = 0
def _pack(self, pkg_len=0, cmd=0, status=0):
return self.st.pack(pkg_len, cmd, status)
def _unpack(self, bytes_stream):
self.pkg_len, self.cmd, self.status = self.st.unpack(bytes_stream)
return True
def header_len(self):
return self.st.size
def send_header(self, conn):
'''Send Tracker header to server.'''
header = self._pack(self.pkg_len, self.cmd, self.status)
try:
conn._sock.sendall(header)
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while writting to socket: %s' % (e.args,))
def recv_header(self, conn):
'''Receive response from server.
if sucess, class member (pkg_len, cmd, status) is response.
'''
try:
header = conn._sock.recv(self.header_len())
except (socket.error, socket.timeout) as e:
raise ConnectionError('[-] Error: while reading from socket: %s' % (e.args,))
self._unpack(header)
def fdfs_pack_metadata(meta_dict):
ret = ''
for key in meta_dict:
ret += '%s%c%s%c' % (key, FDFS_FIELD_SEPERATOR, meta_dict[key], FDFS_RECORD_SEPERATOR)
return ret[0:-1]
def fdfs_unpack_metadata(bytes_stream):
li = bytes_stream.split(FDFS_RECORD_SEPERATOR)
return dict([item.split(FDFS_FIELD_SEPERATOR) for item in li])
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: fdfs_test.py
import os
import sys
import time
try:
from fdfs_client.client import *
from fdfs_client.exceptions import *
except ImportError:
import_path = os.path.abspath('../')
sys.path.append(import_path)
from fdfs_client.client import *
from fdfs_client.exceptions import *
def usage():
s = 'Usage: python fdfs_test.py {options} [{local_filename} [{remote_file_id}]]\n'
s += 'options: upfile, upbuffer, downfile, downbuffer, delete, listgroup, listserv\n'
s += ' upslavefile, upslavebuffer, upappendfile, upappendbuffer\n'
s += '\tupfile {local_filename}\n'
s += '\tupbuffer {local_filename}\n'
s += '\tdownfile {local_filename} {remote_file_id}\n'
s += '\tdownbuffer {remote_file_id}\n'
s += '\tdelete {remote_file_id}\n'
s += '\tlistgroup {group_name}\n'
s += '\tlistall \n'
s += '\tlistsrv {group_name} [storage_ip]\n'
s += '\tsetmeta {remote_file_id}\n'
s += '\tgetmeta {remote_file_id}\n'
s += '\tupslavefile {local_filename} {remote_fileid} {prefix_name}\n'
s += '\tupappendfile {local_filename}\n'
s += '\ttruncate {truncate_filesize} {remote_fileid}\n'
s += '\tmodifyfile {local_filename} {remote_fileid} {file_offset}\n'
s += '\tmodifybuffer {local_filename} {remote_fileid} {file_offset}\n'
s += 'e.g.: python fdfs_test.py upfile test'
print(s)
sys.exit(0)
if len(sys.argv) < 2:
usage()
client = Fdfs_client('client.conf')
def upfile_func():
# Upload by filename
# usage: python fdfs_test.py upfile {local_filename}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
file_size = os.stat(local_filename).st_size
# meta_buffer can be null.
meta_dict = {
'ext_name': 'py',
'file_size': str(file_size) + 'B'
}
t1 = time.time()
ret_dict = client.upload_by_filename(local_filename, meta_dict)
t2 = time.time()
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
print('[+] time consume: %fs' % (t2 - t1))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upfileex_func():
# Upload by file
# usage: python fdfs_test.py upfileex {local_filename}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
t1 = time.time()
ret_dict = client.upload_by_file(local_filename)
t2 = time.time()
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
print('[+] time consume: %fs' % (t2 - t1))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upslavefile_func():
# upload slave file
# usage: python fdfs_test.py upslavefile {local_filename} {remote_fileid} {prefix_name}
if len(sys.argv) < 5:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
prefix_name = sys.argv[4]
ret_dict = client.upload_slave_by_file(local_filename, remote_fileid, \
prefix_name)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upslavebuffer_func():
# upload slave by buffer
# usage: python fdfs_test.py upslavebuffer {local_filename} {remote_fileid} {prefix_name}
if len(sys.argv) < 5:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
prefix_name = sys.argv[4]
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.upload_slave_by_buffer(local_filename, \
remote_fileid, prefix_name)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def del_func():
# delete file
# usage: python fdfs_test.py delete {remote_fileid}
if len(sys.argv) < 3:
usage()
return None
try:
remote_file_id = sys.argv[2]
ret_tuple = client.delete_file(remote_file_id)
print('[+] %s' % ret_tuple[0])
print('[+] remote_fileid: %s' % ret_tuple[1])
print('[+] Storage IP: %s' % ret_tuple[2])
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def downfile_func():
# Download to file
# usage: python fdfs_test.py downfile {local_filename} {remote_fileid}
if len(sys.argv) < 3:
usage()
return None
try:
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
ret_dict = client.download_to_file(local_filename, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def list_group_func():
# List one group info
# usage: python fdfs_test.py listgroup {group_name}
if len(sys.argv) < 3:
usage()
return None
try:
group_name = sys.argv[2]
ret = client.list_one_group(group_name)
print(ret)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def listall_func():
# List all group info
# usage: python fdfs_test.py listall
if len(sys.argv) < 2:
usage()
return None
try:
ret_dict = client.list_all_groups()
print('=' * 80)
print('Groups count:', ret_dict['Groups count'])
for li in ret_dict['Groups']:
print('-' * 80)
print(li)
print('-' * 80)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def list_server_func():
# List all servers info of group
# usage: python fdfs_test.py listsrv {group_name} [storage_ip]
if len(sys.argv) < 3:
usage()
return None
try:
group_name = sys.argv[2]
if len(sys.argv) > 3:
storage_ip = sys.argv[3]
else:
storage_ip = None
ret_dict = client.list_servers(group_name, storage_ip)
print('=' * 80)
print('Group name: %s' % ret_dict['Group name'])
print('=' * 80)
i = 1
for serv in ret_dict['Servers']:
print('Storage server %d:' % i)
print('=' * 80)
print(serv)
i += 1
print('=' * 80)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upbuffer_func():
# Upload by buffer
# usage: python fdfs_test.py upbuffer {local_filename} [remote_file_ext_name]
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
if len(sys.argv) > 3:
ext_name = sys.argv[3]
else:
ext_name = None
# meta_buffer can be null.
meta_buffer = {
'ext_name': 'gif',
'width': '150px',
'height': '80px'
}
try:
with open(local_filename, 'rb') as f:
file_buffer = f.read()
ret_dict = client.upload_by_buffer(file_buffer, ext_name, meta_buffer)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def downbuffer_func():
# Download to buffer
# usage: python fdfs_test.py downbuffer {remote_file_id}
# e.g.: 'group1/M00/00/00/wKjzhU_rLNmjo2-1AAAamGDONEA5818.py'
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
try:
ret_dict = client.download_to_buffer(remote_fileid)
print('Downloaded content:')
print(ret_dict['Content'])
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def get_meta_data_func():
# Get meta data of remote file
# usage python fdfs_test.py getmeta {remote_file_id}
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
try:
ret_dict = client.get_meta_data(remote_fileid)
print(ret_dict)
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def set_meta_data_func():
# Set meta data of remote file
# usage python fdfs_test.py setmeta {remote_file_id}
if len(sys.argv) < 3:
usage()
return None
remote_fileid = sys.argv[2]
meta_dict = {
'ext_name': 'jgp',
'width': '160px',
'hight': '80px',
}
try:
ret_dict = client.set_meta_data(remote_fileid, meta_dict)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upappendfile_func():
# Upload an appender file by filename
# usage: python fdfs_test.py upappendfile {local_filename}
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
try:
ret_dict = client.upload_appender_by_file(local_filename)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def upappendbuffer_func():
# Upload an appender file by buffer
# usage: python fdfs_test.py upappendbuffer {local_filename}
if len(sys.argv) < 3:
usage()
return None
local_filename = sys.argv[2]
try:
with open(local_filename, 'rb') as f:
file_buffer = f.read()
ret_dict = client.upload_appender_by_buffer(file_buffer)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def appendfile_func():
# Append a remote file
# usage: python fdfs_test.py appendfile {local_filename} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
try:
ret_dict = client.append_by_file(local_filename, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def appendbuffer_func():
# Append a remote file by buffer
# usage: python fdfs_test.py appendbuffer {local_filename} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
try:
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.append_by_buffer(filebuffer, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def truncate_func():
# Truncate file
# usage: python fdfs_test.py truncate {truncate_filesize} {remote_file_id}
if len(sys.argv) < 4:
usage()
return None
truncate_filesize = int(sys.argv[2])
remote_fileid = sys.argv[3]
try:
ret_dict = client.truncate_file(truncate_filesize, remote_fileid)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def modifyfile_func():
# Modify file by filename
# usage: python fdfs_test.py modifyfile {local_filename} {remote_fileid} [file_offset]
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
if len(sys.argv) > 4:
file_offset = int(sys.argv[4])
else:
file_offset = 0
try:
ret_dict = client.modify_by_filename(local_filename, remote_fileid, file_offset)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
def modifybuffer_func():
# Modify file by buffer
# usage: python fdfs_test.py modifybuffer {local_filename} {remote_fileid} [file_offset]
if len(sys.argv) < 4:
usage()
return None
local_filename = sys.argv[2]
remote_fileid = sys.argv[3]
if len(sys.argv) > 4:
file_offset = int(sys.argv[4])
else:
file_offset = 0
try:
with open(local_filename, 'rb') as f:
filebuffer = f.read()
ret_dict = client.modify_by_buffer(filebuffer, remote_fileid, file_offset)
for key in ret_dict:
print('[+] %s : %s' % (key, ret_dict[key]))
except (ConnectionError, ResponseError, DataError) as e:
print(e)
result = {
'upfile': lambda: upfile_func(),
'upfileex': lambda: upfileex_func(),
'upbuffer': lambda: upbuffer_func(),
'delete': lambda: del_func(),
'downfile': lambda: downfile_func(),
'downbuffer': lambda: downbuffer_func(),
'listgroup': lambda: list_group_func(),
'listall': lambda: listall_func(),
'listsrv': lambda: list_server_func(),
'getmeta': lambda: get_meta_data_func(),
'setmeta': lambda: set_meta_data_func(),
'upslavefile': lambda: upslavefile_func(),
'upappendfile': lambda: upappendfile_func(),
'upappendbuffer': lambda: upappendbuffer_func(),
'appendfile': lambda: appendfile_func(),
'appendbuffer': lambda: appendbuffer_func(),
'truncate': lambda: truncate_func(),
'modifyfile': lambda: modifyfile_func(),
'modifybuffer': lambda: modifybuffer_func(),
'-h': lambda: usage(),
}[sys.argv[1].lower()]()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: storage_client.py
import os
import stat
import errno
import struct
import socket
import datetime
import platform
from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
# from test_fdfs.sendfile import *
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
from fdfs_client.utils import *
__os_sep__ = "/" if platform.system() == 'Windows' else os.sep
def tcp_send_file(conn, filename, buffer_size=1024):
'''
Send file to server, and split into multiple pkgs while sending.
arguments:
@conn: connection
@filename: string
@buffer_size: int ,send buffer size
@Return int: file size if success else raise ConnectionError.
'''
file_size = 0
with open(filename, 'rb') as f:
while 1:
try:
send_buffer = f.read(buffer_size)
send_size = len(send_buffer)
if send_size == 0:
break
tcp_send_data(conn, send_buffer)
file_size += send_size
except ConnectionError as e:
raise ConnectionError('[-] Error while uploading file(%s).' % e.args)
except IOError as e:
raise DataError('[-] Error while reading local file(%s).' % e.args)
return file_size
def tcp_send_file_ex(conn, filename, buffer_size=4096):
'''
Send file to server. Using linux system call 'sendfile'.
arguments:
@conn: connection
@filename: string
@return long, sended size
'''
if 'linux' not in sys.platform.lower():
raise DataError('[-] Error: \'sendfile\' system call only available on linux.')
nbytes = 0
offset = 0
sock_fd = conn.get_sock().fileno()
with open(filename, 'rb') as f:
in_fd = f.fileno()
while 1:
try:
pass
# sent = sendfile(sock_fd, in_fd, offset, buffer_size)
# if 0 == sent:
# break
# nbytes += sent
# offset += sent
except OSError as e:
if e.errno == errno.EAGAIN:
continue
raise
return nbytes
def tcp_recv_file(conn, local_filename, file_size, buffer_size=1024):
'''
Receive file from server, fragmented it while receiving and write to disk.
arguments:
@conn: connection
@local_filename: string
@file_size: int, remote file size
@buffer_size: int, receive buffer size
@Return int: file size if success else raise ConnectionError.
'''
total_file_size = 0
flush_size = 0
remain_bytes = file_size
with open(local_filename, 'wb+') as f:
while remain_bytes > 0:
try:
if remain_bytes >= buffer_size:
file_buffer, recv_size = tcp_recv_response(conn, buffer_size, buffer_size)
else:
file_buffer, recv_size = tcp_recv_response(conn, remain_bytes, buffer_size)
f.write(file_buffer)
remain_bytes -= buffer_size
total_file_size += recv_size
flush_size += recv_size
if flush_size >= 4096:
f.flush()
flush_size = 0
except ConnectionError as e:
raise ConnectionError('[-] Error: while downloading file(%s).' % e.args)
except IOError as e:
raise DataError('[-] Error: while writting local file(%s).' % e.args)
return total_file_size
class Storage_client(object):
'''
The Class Storage_client for storage server.
Note: argument host_tuple of storage server ip address, that should be a single element.
'''
def __init__(self, *kwargs):
conn_kwargs = {
'name': 'Storage Pool',
'host_tuple': (kwargs[0],),
'port': kwargs[1],
'timeout': kwargs[2]
}
self.pool = ConnectionPool(**conn_kwargs)
return None
def __del__(self):
try:
self.pool.destroy()
self.pool = None
except:
pass
def update_pool(self, old_store_serv, new_store_serv, timeout=30):
'''
Update connection pool of storage client.
We need update connection pool of storage client, while storage server is changed.
but if server not changed, we do nothing.
'''
if old_store_serv.ip_addr == new_store_serv.ip_addr:
return None
self.pool.destroy()
conn_kwargs = {
'name': 'Storage_pool',
'host_tuple': (new_store_serv.ip_addr,),
'port': new_store_serv.port,
'timeout': timeout
}
self.pool = ConnectionPool(**conn_kwargs)
return True
def _storage_do_upload_file(self, tracker_client, store_serv, file_buffer, file_size=None, upload_type=None,
meta_dict=None, cmd=None, master_filename=None, prefix_name=None, file_ext_name=None):
'''
core of upload file.
arguments:
@tracker_client: Tracker_client, it is useful connect to tracker server
@store_serv: Storage_server, it is return from query tracker server
@file_buffer: string, file name or file buffer for send
@file_size: int
@upload_type: int, optional: FDFS_UPLOAD_BY_FILE, FDFS_UPLOAD_BY_FILENAME,
FDFS_UPLOAD_BY_BUFFER
@meta_dic: dictionary, store metadata in it
@cmd: int, reference fdfs protol
@master_filename: string, useful upload slave file
@prefix_name: string
@file_ext_name: string
@Return dictionary
{
'Group name' : group_name,
'Remote file_id' : remote_file_id,
'Status' : status,
'Local file name' : local_filename,
'Uploaded size' : upload_size,
'Storage IP' : storage_ip
}
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
master_filename_len = len(master_filename) if master_filename else 0
prefix_name_len = len(prefix_name) if prefix_name else 0
upload_slave = len(store_serv.group_name) and master_filename_len
file_ext_name = str(file_ext_name) if file_ext_name else ''
# non_slave_fmt |-store_path_index(1)-file_size(8)-file_ext_name(6)-|
non_slave_fmt = '!B Q %ds' % FDFS_FILE_EXT_NAME_MAX_LEN
# slave_fmt |-master_len(8)-file_size(8)-prefix_name(16)-file_ext_name(6)
# -master_name(master_filename_len)-|
slave_fmt = '!Q Q %ds %ds %ds' % (FDFS_FILE_PREFIX_MAX_LEN, FDFS_FILE_EXT_NAME_MAX_LEN, master_filename_len)
th.pkg_len = struct.calcsize(slave_fmt) if upload_slave else struct.calcsize(non_slave_fmt)
th.pkg_len += file_size
th.cmd = cmd
th.send_header(store_conn)
if upload_slave:
send_buffer = struct.pack(
slave_fmt, master_filename_len, file_size, prefix_name, file_ext_name, master_filename)
else:
send_buffer = struct.pack(non_slave_fmt, store_serv.store_path_index, file_size, file_ext_name.encode())
try:
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
send_file_size = tcp_send_file(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
send_file_size = tcp_send_file_ex(store_conn, file_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
if recv_size <= FDFS_GROUP_NAME_MAX_LEN:
errmsg = '[-] Error: Storage response length is not match, '
errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
# recv_fmt: |-group_name(16)-remote_file_name(recv_size - 16)-|
recv_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, th.pkg_len - FDFS_GROUP_NAME_MAX_LEN)
(group_name, remote_name) = struct.unpack(recv_fmt, recv_buffer)
remote_filename = remote_name.strip(b'\x00')
if meta_dict and len(meta_dict) > 0:
status = self.storage_set_metadata(tracker_client, store_serv, remote_filename, meta_dict)
if status != 0:
# rollback
self.storage_delete_file(tracker_client, store_serv, remote_filename)
raise DataError('[-] Error: %d, %s' % (status, os.strerror(status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dic = {
'Group name': group_name.strip(b'\x00'),
'Remote file_id': group_name.strip(b'\x00') + __os_sep__.encode() + remote_filename,
'Status': 'Upload successed.',
'Local file name': file_buffer if (upload_type == FDFS_UPLOAD_BY_FILENAME
or upload_type == FDFS_UPLOAD_BY_FILE
) else '',
'Uploaded size': appromix(send_file_size) if (upload_type == FDFS_UPLOAD_BY_FILENAME
or upload_type == FDFS_UPLOAD_BY_FILE
) else appromix(len(file_buffer)),
'Storage IP': store_serv.ip_addr
}
return ret_dic
def storage_upload_by_filename(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_by_file(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_by_buffer(self, tracker_client, store_serv, file_buffer, file_ext_name=None, meta_dict=None):
buffer_size = len(file_buffer)
return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, buffer_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_FILE, None, None, file_ext_name)
def storage_upload_slave_by_filename(self, tracker_client, store_serv, filename, prefix_name, remote_filename,
meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, remote_filename,
prefix_name, file_ext_name)
def storage_upload_slave_by_file(self, tracker_client, store_serv, filename, prefix_name, remote_filename,
meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, remote_filename,
prefix_name, file_ext_name)
def storage_upload_slave_by_buffer(self, tracker_client, store_serv, filebuffer, remote_filename, meta_dict,
file_ext_name):
file_size = len(filebuffer)
return self._storage_do_upload_file(tracker_client, store_serv, filebuffer, file_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_SLAVE_FILE, None, remote_filename,
file_ext_name)
def storage_upload_appender_by_filename(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILENAME,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_upload_appender_by_file(self, tracker_client, store_serv, filename, meta_dict=None):
file_size = os.stat(filename).st_size
file_ext_name = get_file_ext_name(filename)
return self._storage_do_upload_file(tracker_client, store_serv, filename, file_size, FDFS_UPLOAD_BY_FILE,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_upload_appender_by_buffer(self, tracker_client, store_serv, file_buffer, meta_dict=None,
file_ext_name=None):
file_size = len(file_buffer)
return self._storage_do_upload_file(tracker_client, store_serv, file_buffer, file_size, FDFS_UPLOAD_BY_BUFFER,
meta_dict, STORAGE_PROTO_CMD_UPLOAD_APPENDER_FILE, None, None,
file_ext_name)
def storage_delete_file(self, tracker_client, store_serv, remote_filename):
'''
Delete file from storage server.
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_DELETE_FILE
file_name_len = len(remote_filename)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
try:
th.send_header(store_conn)
# del_fmt: |-group_name(16)-filename(len)-|
del_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
send_buffer = struct.pack(del_fmt, store_serv.group_name, remote_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: remote file %s is not exist.'
# % (store_serv.group_name + __os_sep__.encode() + remote_filename))
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
# recv_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
remote_filename = store_serv.group_name + __os_sep__.encode() + remote_filename
return ('Delete file successed.', remote_filename, store_serv.ip_addr)
def _storage_do_download_file(self, tracker_client, store_serv, file_buffer, offset, download_size,
download_type, remote_filename):
'''
Core of download file from storage server.
You can choice download type, optional FDFS_DOWNLOAD_TO_FILE or
FDFS_DOWNLOAD_TO_BUFFER. And you can choice file offset.
@Return dictionary
'Remote file name' : remote_filename,
'Content' : local_filename or buffer,
'Download size' : download_size,
'Storage IP' : storage_ip
'''
store_conn = self.pool.get_connection()
th = Tracker_header()
remote_filename_len = len(remote_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
th.cmd = STORAGE_PROTO_CMD_DOWNLOAD_FILE
try:
th.send_header(store_conn)
# down_fmt: |-offset(8)-download_bytes(8)-group_name(16)-remote_filename(len)-|
down_fmt = '!Q Q %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
send_buffer = struct.pack(down_fmt, offset, download_size, store_serv.group_name, remote_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: remote file %s is not exist.' %
# (store_serv.group_name + __os_sep__.encode() + remote_filename))
if th.status != 0:
raise DataError('Error: %d %s' % (th.status, os.strerror(th.status)))
if download_type == FDFS_DOWNLOAD_TO_FILE:
total_recv_size = tcp_recv_file(store_conn, file_buffer, th.pkg_len)
elif download_type == FDFS_DOWNLOAD_TO_BUFFER:
recv_buffer, total_recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
ret_dic = {
'Remote file_id': store_serv.group_name + __os_sep__.encode() + remote_filename,
'Content': file_buffer if download_type == FDFS_DOWNLOAD_TO_FILE else recv_buffer,
'Download size': appromix(total_recv_size),
'Storage IP': store_serv.ip_addr
}
return ret_dic
def storage_download_to_file(self, tracker_client, store_serv, local_filename, file_offset, download_bytes,
remote_filename):
return self._storage_do_download_file(tracker_client, store_serv, local_filename, file_offset, download_bytes,
FDFS_DOWNLOAD_TO_FILE, remote_filename)
def storage_download_to_buffer(self, tracker_client, store_serv, file_buffer, file_offset, download_bytes,
remote_filename):
return self._storage_do_download_file(tracker_client, store_serv, file_buffer, file_offset, download_bytes,
FDFS_DOWNLOAD_TO_BUFFER, remote_filename)
def storage_set_metadata(self, tracker_client, store_serv, remote_filename, meta_dict,
op_flag=STORAGE_SET_METADATA_FLAG_OVERWRITE):
ret = 0
conn = self.pool.get_connection()
remote_filename_len = len(remote_filename)
meta_buffer = fdfs_pack_metadata(meta_dict)
meta_len = len(meta_buffer)
th = Tracker_header()
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + 1 + FDFS_GROUP_NAME_MAX_LEN + remote_filename_len + meta_len
th.cmd = STORAGE_PROTO_CMD_SET_METADATA
try:
th.send_header(conn)
# meta_fmt: |-filename_len(8)-meta_len(8)-op_flag(1)-group_name(16)
# -filename(remote_filename_len)-meta(meta_len)|
meta_fmt = '!Q Q c %ds %ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len, meta_len)
send_buffer = struct.pack(meta_fmt, remote_filename_len, meta_len, op_flag, store_serv.group_name,
remote_filename, meta_buffer)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
ret = th.status
except:
raise
finally:
self.pool.release(conn)
return ret
def storage_get_metadata(self, tracker_client, store_serv, remote_file_name):
store_conn = self.pool.get_connection()
th = Tracker_header()
remote_filename_len = len(remote_file_name)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + remote_filename_len
th.cmd = STORAGE_PROTO_CMD_GET_METADATA
try:
th.send_header(store_conn)
# meta_fmt: |-group_name(16)-filename(remote_filename_len)-|
meta_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, remote_filename_len)
send_buffer = struct.pack(meta_fmt, store_serv.group_name, remote_file_name.encode())
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
# if th.status == 2:
# raise DataError('[-] Error: Remote file %s has no meta data.'
# % (store_serv.group_name + __os_sep__.encode() + remote_file_name))
if th.status != 0:
raise DataError('[-] Error:%d, %s' % (th.status, os.strerror(th.status)))
if th.pkg_len == 0:
ret_dict = {}
meta_buffer, recv_size = tcp_recv_response(store_conn, th.pkg_len)
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = fdfs_unpack_metadata(meta_buffer)
return ret_dict
def _storage_do_append_file(self, tracker_client, store_serv, file_buffer, file_size, upload_type,
appended_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
appended_filename_len = len(appended_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appended_filename_len + file_size
th.cmd = STORAGE_PROTO_CMD_APPEND_FILE
try:
th.send_header(store_conn)
# append_fmt: |-appended_filename_len(8)-file_size(8)-appended_filename(len)
# -filecontent(filesize)-|
append_fmt = '!Q Q %ds' % appended_filename_len
send_buffer = struct.pack(append_fmt, appended_filename_len, file_size, appended_filename)
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
tcp_send_file(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, file_buffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
tcp_send_file_ex(store_conn, file_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Append file successed.'
ret_dict['Appender file name'] = store_serv.group_name + __os_sep__.encode() + appended_filename
ret_dict['Appended size'] = appromix(file_size)
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_append_by_filename(self, tracker_client, store_serv, local_filename, appended_filename):
file_size = os.stat(local_filename).st_size
return self._storage_do_append_file(tracker_client, store_serv, local_filename, file_size,
FDFS_UPLOAD_BY_FILENAME, appended_filename)
def storage_append_by_file(self, tracker_client, store_serv, local_filename, appended_filename):
file_size = os.stat(local_filename).st_size
return self._storage_do_append_file(tracker_client, store_serv, local_filename, file_size, FDFS_UPLOAD_BY_FILE,
appended_filename)
def storage_append_by_buffer(self, tracker_client, store_serv, file_buffer, appended_filename):
file_size = len(file_buffer)
return self._storage_do_append_file(tracker_client, store_serv, file_buffer, file_size, FDFS_UPLOAD_BY_BUFFER,
appended_filename)
def _storage_do_truncate_file(self, tracker_client, store_serv, truncated_filesize, appender_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_TRUNCATE_FILE
appender_filename_len = len(appender_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 2 + appender_filename_len
try:
th.send_header(store_conn)
# truncate_fmt:|-appender_filename_len(8)-truncate_filesize(8)
# -appender_filename(len)-|
truncate_fmt = '!Q Q %ds' % appender_filename_len
send_buffer = struct.pack(truncate_fmt, appender_filename_len, truncated_filesize, appender_filename)
tcp_send_data(store_conn, send_buffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Truncate successed.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_truncate_file(self, tracker_client, store_serv, truncated_filesize, appender_filename):
return self._storage_do_truncate_file(tracker_client, store_serv, truncated_filesize, appender_filename)
def _storage_do_modify_file(self, tracker_client, store_serv, upload_type, filebuffer, offset, filesize,
appender_filename):
store_conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = STORAGE_PROTO_CMD_MODIFY_FILE
appender_filename_len = len(appender_filename)
th.pkg_len = FDFS_PROTO_PKG_LEN_SIZE * 3 + appender_filename_len + filesize
try:
th.send_header(store_conn)
# modify_fmt: |-filename_len(8)-offset(8)-filesize(8)-filename(len)-|
modify_fmt = '!Q Q Q %ds' % appender_filename_len
send_buffer = struct.pack(modify_fmt, appender_filename_len, offset, filesize, appender_filename)
tcp_send_data(store_conn, send_buffer)
if upload_type == FDFS_UPLOAD_BY_FILENAME:
upload_size = tcp_send_file(store_conn, filebuffer)
elif upload_type == FDFS_UPLOAD_BY_BUFFER:
tcp_send_data(store_conn, filebuffer)
elif upload_type == FDFS_UPLOAD_BY_FILE:
upload_size = tcp_send_file_ex(store_conn, filebuffer)
th.recv_header(store_conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
except:
raise
finally:
self.pool.release(store_conn)
ret_dict = {}
ret_dict['Status'] = 'Modify successed.'
ret_dict['Storage IP'] = store_serv.ip_addr
return ret_dict
def storage_modify_by_filename(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_FILENAME, filename, offset,
filesize, appender_filename)
def storage_modify_by_file(self, tracker_client, store_serv, filename, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_FILE, filename, offset, filesize,
appender_filename)
def storage_modify_by_buffer(self, tracker_client, store_serv, filebuffer, offset, filesize, appender_filename):
return self._storage_do_modify_file(tracker_client, store_serv, FDFS_UPLOAD_BY_BUFFER, filebuffer, offset,
filesize, appender_filename)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# filename: tracker_client.py
import struct
import socket
from datetime import datetime
from fdfs_client.fdfs_protol import *
from fdfs_client.connection import *
from fdfs_client.exceptions import (
FDFSError,
ConnectionError,
ResponseError,
InvaildResponse,
DataError
)
from fdfs_client.utils import *
def parse_storage_status(status_code):
try:
ret = {
FDFS_STORAGE_STATUS_INIT: lambda: 'INIT',
FDFS_STORAGE_STATUS_WAIT_SYNC: lambda: 'WAIT_SYNC',
FDFS_STORAGE_STATUS_SYNCING: lambda: 'SYNCING',
FDFS_STORAGE_STATUS_IP_CHANGED: lambda: 'IP_CHANGED',
FDFS_STORAGE_STATUS_DELETED: lambda: 'DELETED',
FDFS_STORAGE_STATUS_OFFLINE: lambda: 'OFFLINE',
FDFS_STORAGE_STATUS_ONLINE: lambda: 'ONLINE',
FDFS_STORAGE_STATUS_ACTIVE: lambda: 'ACTIVE',
FDFS_STORAGE_STATUS_RECOVERY: lambda: 'RECOVERY'
}[status_code]()
except KeyError:
ret = 'UNKNOW'
return ret
class Storage_info(object):
def __init__(self):
self.status = 0
self.id = ''
self.ip_addr = ''
self.domain_name = ''
self.src_id = ''
self.version = ''
self.join_time = datetime.fromtimestamp(0).isoformat()
self.up_time = datetime.fromtimestamp(0).isoformat()
self.totalMB = ''
self.freeMB = ''
self.upload_prio = 0
self.store_path_count = 0
self.subdir_count_per_path = 0
self.curr_write_path = 0
self.storage_port = 23000
self.storage_http_port = 80
self.alloc_count = 0
self.current_count = 0
self.max_count = 0
self.total_upload_count = 0
self.success_upload_count = 0
self.total_append_count = 0
self.success_append_count = 0
self.total_modify_count = 0
self.success_modify_count = 0
self.total_truncate_count = 0
self.success_truncate_count = 0
self.total_setmeta_count = 0
self.success_setmeta_count = 0
self.total_del_count = 0
self.success_del_count = 0
self.total_download_count = 0
self.success_download_count = 0
self.total_getmeta_count = 0
self.success_getmeta_count = 0
self.total_create_link_count = 0
self.success_create_link_count = 0
self.total_del_link_count = 0
self.success_del_link_count = 0
self.total_upload_bytes = 0
self.success_upload_bytes = 0
self.total_append_bytes = 0
self.success_append_bytes = 0
self.total_modify_bytes = 0
self.success_modify_bytes = 0
self.total_download_bytes = 0
self.success_download_bytes = 0
self.total_sync_in_bytes = 0
self.success_sync_in_bytes = 0
self.total_sync_out_bytes = 0
self.success_sync_out_bytes = 0
self.total_file_open_count = 0
self.success_file_open_count = 0
self.total_file_read_count = 0
self.success_file_read_count = 0
self.total_file_write_count = 0
self.success_file_write_count = 0
self.last_source_sync = datetime.fromtimestamp(0).isoformat()
self.last_sync_update = datetime.fromtimestamp(0).isoformat()
self.last_synced_time = datetime.fromtimestamp(0).isoformat()
self.last_heartbeat_time = datetime.fromtimestamp(0).isoformat()
self.if_trunk_server = ''
# fmt = |-status(1)-ipaddr(16)-domain(128)-srcipaddr(16)-ver(6)-52*8-|
self.fmt = '!B 16s 16s 128s 16s 6s 10Q 4s4s4s 42Q?'
def set_info(self, bytes_stream):
(self.status, self.id, ip_addr, domain_name, self.src_id, version, join_time, up_time, totalMB, freeMB,
self.upload_prio, self.store_path_count, self.subdir_count_per_path, self.curr_write_path, self.storage_port,
self.storage_http_port, self.alloc_count, self.current_count, self.max_count, self.total_upload_count,
self.success_upload_count, self.total_append_count, self.success_append_count, self.total_modify_count,
self.success_modify_count, self.total_truncate_count, self.success_truncate_count, self.total_setmeta_count,
self.success_setmeta_count, self.total_del_count, self.success_del_count, self.total_download_count,
self.success_download_count, self.total_getmeta_count, self.success_getmeta_count,
self.total_create_link_count, self.success_create_link_count, self.total_del_link_count,
self.success_del_link_count, self.total_upload_bytes, self.success_upload_bytes, self.total_append_bytes,
self.total_append_bytes, self.total_modify_bytes, self.success_modify_bytes, self.total_download_bytes,
self.success_download_bytes, self.total_sync_in_bytes, self.success_sync_in_bytes, self.total_sync_out_bytes,
self.success_sync_out_bytes, self.total_file_open_count, self.success_file_open_count,
self.total_file_read_count, self.success_file_read_count, self.total_file_write_count,
self.success_file_write_count, last_source_sync, last_sync_update, last_synced_time, last_heartbeat_time,
self.if_trunk_server,) = struct.unpack(self.fmt, bytes_stream)
try:
self.ip_addr = ip_addr.strip(b'\x00')
self.domain_name = domain_name.strip(b'\x00')
self.version = version.strip(b'\x00')
self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
except ValueError as e:
raise ResponseError('[-] Error: disk space overrun, can not represented it.')
self.join_time = datetime.fromtimestamp(join_time).isoformat()
self.up_time = datetime.fromtimestamp(up_time).isoformat()
self.last_source_sync = datetime.fromtimestamp(last_source_sync).isoformat()
self.last_sync_update = datetime.fromtimestamp(last_sync_update).isoformat()
self.last_synced_time = datetime.fromtimestamp(last_synced_time).isoformat()
self.last_heartbeat_time = datetime.fromtimestamp(last_heartbeat_time).isoformat()
return True
def __str__(self):
'''Transform to readable string.'''
s = 'Storage information:\n'
s += '\tip_addr = %s (%s)\n' % (self.ip_addr, parse_storage_status(self.status))
s += '\thttp domain = %s\n' % self.domain_name
s += '\tversion = %s\n' % self.version
s += '\tjoin time = %s\n' % self.join_time
s += '\tup time = %s\n' % self.up_time
s += '\ttotal storage = %s\n' % self.totalMB
s += '\tfree storage = %s\n' % self.freeMB
s += '\tupload priority = %d\n' % self.upload_prio
s += '\tstore path count = %d\n' % self.store_path_count
s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
s += '\tstorage port = %d\n' % self.storage_port
s += '\tstorage HTTP port = %d\n' % self.storage_http_port
s += '\tcurrent write path = %d\n' % self.curr_write_path
s += '\tsource ip_addr = %s\n' % self.ip_addr
s += '\tif_trunk_server = %d\n' % self.if_trunk_server
s += '\ttotal upload count = %ld\n' % self.total_upload_count
s += '\tsuccess upload count = %ld\n' % self.success_upload_count
s += '\ttotal download count = %ld\n' % self.total_download_count
s += '\tsuccess download count = %ld\n' % self.success_download_count
s += '\ttotal append count = %ld\n' % self.total_append_count
s += '\tsuccess append count = %ld\n' % self.success_append_count
s += '\ttotal modify count = %ld\n' % self.total_modify_count
s += '\tsuccess modify count = %ld\n' % self.success_modify_count
s += '\ttotal truncate count = %ld\n' % self.total_truncate_count
s += '\tsuccess truncate count = %ld\n' % self.success_truncate_count
s += '\ttotal delete count = %ld\n' % self.total_del_count
s += '\tsuccess delete count = %ld\n' % self.success_del_count
s += '\ttotal set_meta count = %ld\n' % self.total_setmeta_count
s += '\tsuccess set_meta count = %ld\n' % self.success_setmeta_count
s += '\ttotal get_meta count = %ld\n' % self.total_getmeta_count
s += '\tsuccess get_meta count = %ld\n' % self.success_getmeta_count
s += '\ttotal create link count = %ld\n' % self.total_create_link_count
s += '\tsuccess create link count = %ld\n' % self.success_create_link_count
s += '\ttotal delete link count = %ld\n' % self.total_del_link_count
s += '\tsuccess delete link count = %ld\n' % self.success_del_link_count
s += '\ttotal upload bytes = %ld\n' % self.total_upload_bytes
s += '\tsuccess upload bytes = %ld\n' % self.success_upload_bytes
s += '\ttotal download bytes = %ld\n' % self.total_download_bytes
s += '\tsuccess download bytes = %ld\n' % self.success_download_bytes
s += '\ttotal append bytes = %ld\n' % self.total_append_bytes
s += '\tsuccess append bytes = %ld\n' % self.success_append_bytes
s += '\ttotal modify bytes = %ld\n' % self.total_modify_bytes
s += '\tsuccess modify bytes = %ld\n' % self.success_modify_bytes
s += '\ttotal sync_in bytes = %ld\n' % self.total_sync_in_bytes
s += '\tsuccess sync_in bytes = %ld\n' % self.success_sync_in_bytes
s += '\ttotal sync_out bytes = %ld\n' % self.total_sync_out_bytes
s += '\tsuccess sync_out bytes = %ld\n' % self.success_sync_out_bytes
s += '\ttotal file open count = %ld\n' % self.total_file_open_count
s += '\tsuccess file open count = %ld\n' % self.success_file_open_count
s += '\ttotal file read count = %ld\n' % self.total_file_read_count
s += '\tsuccess file read count = %ld\n' % self.success_file_read_count
s += '\ttotal file write count = %ld\n' % self.total_file_write_count
s += '\tsucess file write count = %ld\n' % self.success_file_write_count
s += '\tlast heartbeat time = %s\n' % self.last_heartbeat_time
s += '\tlast source update = %s\n' % self.last_source_sync
s += '\tlast sync update = %s\n' % self.last_sync_update
s += '\tlast synced time = %s\n' % self.last_synced_time
return s
def get_fmt_size(self):
return struct.calcsize(self.fmt)
class Group_info(object):
def __init__(self):
self.group_name = ''
self.totalMB = ''
self.freeMB = ''
self.trunk_freeMB = ''
self.count = 0
self.storage_port = 0
self.store_http_port = 0
self.active_count = 0
self.curr_write_server = 0
self.store_path_count = 0
self.subdir_count_per_path = 0
self.curr_trunk_file_id = 0
self.fmt = '!%ds 11Q' % (FDFS_GROUP_NAME_MAX_LEN + 1)
return None
def __str__(self):
s = 'Group information:\n'
s += '\tgroup name = %s\n' % self.group_name
s += '\ttotal disk space = %s\n' % self.totalMB
s += '\tdisk free space = %s\n' % self.freeMB
s += '\ttrunk free space = %s\n' % self.trunk_freeMB
s += '\tstorage server count = %d\n' % self.count
s += '\tstorage port = %d\n' % self.storage_port
s += '\tstorage HTTP port = %d\n' % self.store_http_port
s += '\tactive server count = %d\n' % self.active_count
s += '\tcurrent write server index = %d\n' % self.curr_write_server
s += '\tstore path count = %d\n' % self.store_path_count
s += '\tsubdir count per path = %d\n' % self.subdir_count_per_path
s += '\tcurrent trunk file id = %d\n' % self.curr_trunk_file_id
return s
def set_info(self, bytes_stream):
(group_name, totalMB, freeMB, trunk_freeMB, self.count, self.storage_port, self.store_http_port,
self.active_count, self.curr_write_server, self.store_path_count, self.subdir_count_per_path,
self.curr_trunk_file_id) = struct.unpack(self.fmt, bytes_stream)
try:
self.group_name = group_name.strip(b'\x00')
self.freeMB = appromix(freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.totalMB = appromix(totalMB, FDFS_SPACE_SIZE_BASE_INDEX)
self.trunk_freeMB = appromix(trunk_freeMB, FDFS_SPACE_SIZE_BASE_INDEX)
except ValueError:
raise DataError('[-] Error disk space overrun, can not represented it.')
def get_fmt_size(self):
return struct.calcsize(self.fmt)
class Tracker_client(object):
'''Class Tracker client.'''
def __init__(self, pool):
self.pool = pool
def tracker_list_servers(self, group_name, storage_ip=None):
'''
List servers in a storage group
'''
conn = self.pool.get_connection()
th = Tracker_header()
ip_len = len(storage_ip) if storage_ip else 0
if ip_len >= IP_ADDRESS_SIZE:
ip_len = IP_ADDRESS_SIZE - 1
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + ip_len
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_STORAGE
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
store_ip_addr = storage_ip or ''
storage_ip_fmt = '!%ds' % ip_len
try:
th.send_header(conn)
send_buffer = struct.pack(group_fmt, group_name) + struct.pack(storage_ip_fmt, store_ip_addr)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
si = Storage_info()
si_fmt_size = si.get_fmt_size()
recv_size = len(recv_buffer)
if recv_size % si_fmt_size != 0:
errinfo = '[-] Error: response size not match, expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errinfo)
except ConnectionError:
raise
finally:
self.pool.release(conn)
num_storage = recv_size / si_fmt_size
si_list = []
i = 0
while num_storage:
si.set_info(recv_buffer[(i * si_fmt_size): ((i + 1) * si_fmt_size)])
si_list.append(si)
si = Storage_info()
num_storage -= 1
i += 1
ret_dict = {}
ret_dict['Group name'] = group_name
ret_dict['Servers'] = si_list
return ret_dict
def tracker_list_one_group(self, group_name):
conn = self.pool.get_connection()
th = Tracker_header()
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ONE_GROUP
# group_fmt: |-group_name(16)-|
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
try:
th.send_header(conn)
send_buffer = struct.pack(group_fmt, group_name)
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
group_info = Group_info()
group_info.set_info(recv_buffer)
except ConnectionError:
raise
finally:
self.pool.release(conn)
return group_info
def tracker_list_all_groups(self):
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVER_LIST_ALL_GROUPS
try:
th.send_header(conn)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
except:
raise
finally:
self.pool.release(conn)
gi = Group_info()
gi_fmt_size = gi.get_fmt_size()
if recv_size % gi_fmt_size != 0:
errmsg = '[-] Error: Response size is mismatch, except: %d, actul: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
num_groups = recv_size / gi_fmt_size
ret_dict = {}
ret_dict['Groups count'] = num_groups
gi_list = []
i = 0
while num_groups:
gi.set_info(recv_buffer[i * gi_fmt_size: (i + 1) * gi_fmt_size])
gi_list.append(gi)
gi = Group_info()
i += 1
num_groups -= 1
ret_dict['Groups'] = gi_list
return ret_dict
def tracker_query_storage_stor_without_group(self):
'''Query storage server for upload, without group name.
Return: Storage_server object'''
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
try:
th.send_header(conn)
th.recv_header(conn)
if th.status != 0:
raise DataError('[-] Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)|
recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group_name, ip_addr, store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group_name.strip(b'\x00')
store_serv.ip_addr = ip_addr.strip(b'\x00')
return store_serv
def tracker_query_storage_stor_with_group(self, group_name):
'''Query storage server for upload, based group name.
arguments:
@group_name: string
@Return Storage_server object
'''
conn = self.pool.get_connection()
th = Tracker_header()
th.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITH_GROUP_ONE
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN
th.send_header(conn)
group_fmt = '!%ds' % FDFS_GROUP_NAME_MAX_LEN
send_buffer = struct.pack(group_fmt, group_name)
try:
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_STORE_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (TRACKER_QUERY_STORAGE_STORE_BODY_LEN, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt: |-group_name(16)-ipaddr(16-1)-port(8)-store_path_index(1)-|
recv_fmt = '!%ds %ds Q B' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group, ip_addr, store_serv.port, store_serv.store_path_index) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group.strip(b'\x00')
store_serv.ip_addr = ip_addr.strip(b'\x00')
return store_serv
def _tracker_do_query_storage(self, group_name, filename, cmd):
'''
core of query storage, based group name and filename.
It is useful download, delete and set meta.
arguments:
@group_name: string
@filename: string. remote file_id
@Return: Storage_server object
'''
conn = self.pool.get_connection()
th = Tracker_header()
file_name_len = len(filename)
th.pkg_len = FDFS_GROUP_NAME_MAX_LEN + file_name_len
th.cmd = cmd
th.send_header(conn)
# query_fmt: |-group_name(16)-filename(file_name_len)-|
query_fmt = '!%ds %ds' % (FDFS_GROUP_NAME_MAX_LEN, file_name_len)
send_buffer = struct.pack(query_fmt, group_name, filename)
try:
tcp_send_data(conn, send_buffer)
th.recv_header(conn)
if th.status != 0:
raise DataError('Error: %d, %s' % (th.status, os.strerror(th.status)))
recv_buffer, recv_size = tcp_recv_response(conn, th.pkg_len)
if recv_size != TRACKER_QUERY_STORAGE_FETCH_BODY_LEN:
errmsg = '[-] Error: Tracker response length is invaild, '
errmsg += 'expect: %d, actual: %d' % (th.pkg_len, recv_size)
raise ResponseError(errmsg)
except ConnectionError:
raise
finally:
self.pool.release(conn)
# recv_fmt: |-group_name(16)-ip_addr(16)-port(8)-|
recv_fmt = '!%ds %ds Q' % (FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1)
store_serv = Storage_server()
(group_name, ipaddr, store_serv.port) = struct.unpack(recv_fmt, recv_buffer)
store_serv.group_name = group_name.strip(b'\x00')
store_serv.ip_addr = ipaddr.strip(b'\x00')
return store_serv
def tracker_query_storage_update(self, group_name, filename):
'''
Query storage server to update(delete and set_meta).
'''
return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_UPDATE)
def tracker_query_storage_fetch(self, group_name, filename):
'''
Query storage server to download.
'''
return self._tracker_do_query_storage(group_name, filename, TRACKER_PROTO_CMD_SERVICE_QUERY_FETCH_ONE)
#!/usr/bin/env python
# -*- coding = utf-8 -*-
# filename: utils.py
import io
import os
import sys
import stat
import platform
import configparser
SUFFIX = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
__os_sep__ = "/" if platform.system() == 'Windows' else os.sep
def appromix(size, base=0):
'''Conver bytes stream size to human-readable format.
Keyword arguments:
size: int, bytes stream size
base: int, suffix index
Return: string
'''
multiples = 1024
if size < 0:
raise ValueError('[-] Error: number must be non-negative.')
if size < multiples:
return '{0:d}{1}'.format(size, SUFFIX[base])
for suffix in SUFFIX[base:]:
if size < multiples:
return '{0:.2f}{1}'.format(size, suffix)
size = size / float(multiples)
raise ValueError('[-] Error: number too big.')
def get_file_ext_name(filename, double_ext=True):
li = filename.split(os.extsep)
if len(li) <= 1:
return ''
else:
if li[-1].find(__os_sep__) != -1:
return ''
if double_ext:
if len(li) > 2:
if li[-2].find(__os_sep__) == -1:
return '%s.%s' % (li[-2], li[-1])
return li[-1]
class Fdfs_ConfigParser(configparser.RawConfigParser):
"""
Extends ConfigParser to allow files without sections.
This is done by wrapping read files and prepending them with a placeholder
section, which defaults to '__config__'
"""
def __init__(self, default_section=None, *args, **kwargs):
configparser.RawConfigParser.__init__(self, *args, **kwargs)
self._default_section = None
self.set_default_section(default_section or '__config__')
def get_default_section(self):
return self._default_section
def set_default_section(self, section):
self.add_section(section)
# move all values from the previous default section to the new one
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
except configparser.NoSectionError:
pass
else:
for (key, value) in default_section_items:
self.set(section, key, value)
self._default_section = section
def read(self, filenames):
if isinstance(filenames, str):
filenames = [filenames]
read_ok = []
for filename in filenames:
try:
with open(filename) as fp:
self.readfp(fp)
except IOError:
continue
else:
read_ok.append(filename)
return read_ok
def readfp(self, fp, *args, **kwargs):
stream = io.StringIO()
try:
stream.name = fp.name
except AttributeError:
pass
stream.write('[' + self._default_section + ']\n')
stream.write(fp.read())
stream.seek(0, 0)
return self._read(stream, stream.name)
def write(self, fp):
# Write the items from the default section manually and then remove them
# from the data. They'll be re-added later.
try:
default_section_items = self.items(self._default_section)
self.remove_section(self._default_section)
for (key, value) in default_section_items:
fp.write("{0} = {1}\n".format(key, value))
fp.write("\n")
except configparser.NoSectionError:
pass
configparser.RawConfigParser.write(self, fp)
self.add_section(self._default_section)
for (key, value) in default_section_items:
self.set(self._default_section, key, value)
def _read(self, fp, fpname):
"""Parse a sectioned setup file.
The sections in setup file contains a title line at the top,
indicated by a name in square brackets (`[]'), plus key/value
options lines, indicated by `name: value' format lines.
Continuations are represented by an embedded newline then
leading whitespace. Blank lines, lines beginning with a '#',
and just about everything else are ignored.
"""
cursect = None # None, or a dictionary
optname = None
lineno = 0
e = None # None, or an exception
while True:
line = fp.readline()
if not line:
break
lineno = lineno + 1
# comment or blank line?
if line.strip() == '' or line[0] in '#;':
continue
if line.split(None, 1)[0].lower() == 'rem' and line[0] in "rR":
# no leading whitespace
continue
# continuation line?
if line[0].isspace() and cursect is not None and optname:
value = line.strip()
if value:
cursect[optname] = "%s\n%s" % (cursect[optname], value)
# a section header or option header?
else:
# is it a section header?
mo = self.SECTCRE.match(line)
if mo:
sectname = mo.group('header')
if sectname in self._sections:
cursect = self._sections[sectname]
elif sectname == DEFAULTSECT:
cursect = self._defaults
else:
cursect = self._dict()
cursect['__name__'] = sectname
self._sections[sectname] = cursect
# So sections can't start with a continuation line
optname = None
# no section header in the file?
elif cursect is None:
raise MissingSectionHeaderError(fpname, lineno, line)
# an option line?
else:
mo = self.OPTCRE.match(line)
if mo:
optname, vi, optval = mo.group('option', 'vi', 'value')
if vi in ('=', ':') and ';' in optval:
# ';' is a comment delimiter only if it follows
# a spacing character
pos = optval.find(';')
if pos != -1 and optval[pos - 1].isspace():
optval = optval[:pos]
optval = optval.strip()
# allow empty values
if optval == '""':
optval = ''
optname = self.optionxform(optname.rstrip())
if optname in cursect:
if not isinstance(cursect[optname], list):
cursect[optname] = [cursect[optname]]
cursect[optname].append(optval)
else:
cursect[optname] = optval
else:
# a non-fatal parsing error occurred. set up the
# exception but keep going. the exception will be
# raised at the end of the file and will contain a
# list of all bogus lines
if not e:
e = ParsingError(fpname)
e.append(lineno, repr(line))
# if any parsing errors occurred, raise an exception
if e:
raise e
def split_remote_fileid(remote_file_id):
'''
Splite remote_file_id to (group_name, remote_file_name)
arguments:
@remote_file_id: string
@return tuple, (group_name, remote_file_name)
'''
index = remote_file_id.find(b'/')
if -1 == index:
return None
return (remote_file_id[0:index], remote_file_id[(index + 1):])
def fdfs_check_file(filename):
ret = True
errmsg = ''
if not os.path.isfile(filename):
ret = False
errmsg = '[-] Error: %s is not a file.' % filename
elif not stat.S_ISREG(os.stat(filename).st_mode):
ret = False
errmsg = '[-] Error: %s is not a regular file.' % filename
return (ret, errmsg)
if __name__ == '__main__':
print(get_file_ext_name('/bc.tar.gz'))
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论