In this post I'd like to share a Python script that queries how much storage a bucket is using. Since most readers are probably not yet familiar with this, the article is offered as a reference; I hope you find it useful. Let's take a look.
At the moment the script only supports Ceph's S3-compatible gateway (RGW): it sends a signed HEAD request to the bucket and reads the usage that RGW reports back in the X-RGW-Object-Count and X-RGW-Bytes-Used response headers. The values you need to fill in are marked in the comments below.
# -*- coding: utf-8 -*-
import requests
import json
import hmac
from email.utils import formatdate
from hashlib import sha1 as sha

py3k = False
try:
    # Python 2
    from urlparse import urlparse
    from base64 import encodestring
except ImportError:
    # Python 3
    py3k = True
    from urllib.parse import urlparse
    from base64 import encodebytes as encodestring
class AuthBase(object):
    """Base class that all auth implementations derive from"""
    def __call__(self, r):
        raise NotImplementedError('Auth hooks must be callable.')
class S3Auth(AuthBase):
    """Attaches AWS Authentication to the given Request object."""
    service_base_url = 's3.amazonaws.com'
    # List of query string arguments of interest
    special_params = [
        'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment',
        'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads',
        'uploadId', 'response-content-type', 'response-content-language',
        'response-expires', 'response-cache-control', 'delete', 'lifecycle',
        'response-content-disposition', 'response-content-encoding'
    ]

    def __init__(self, access_key, secret_key, service_url=None):
        if service_url:
            self.service_base_url = service_url
        self.access_key = str(access_key)
        self.secret_key = str(secret_key)
        self.au = ""
    def __call__(self, r):
        # Create the Date header if it has not been set yet.
        if 'date' not in r.headers and 'x-amz-date' not in r.headers:
            r.headers['date'] = formatdate(
                timeval=None,
                localtime=False,
                usegmt=True)
        signature = self.get_signature(r)
        if py3k:
            signature = signature.decode('utf-8')
        r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature)
        self.au = r.headers
        # print(self.au)
        return r
    def get_signature(self, r):
        canonical_string = self.get_canonical_string(
            r.url, r.headers, r.method)
        if py3k:
            key = self.secret_key.encode('utf-8')
            msg = canonical_string.encode('utf-8')
        else:
            key = self.secret_key
            msg = canonical_string
        h = hmac.new(key, msg, digestmod=sha)
        return encodestring(h.digest()).strip()
    def get_canonical_string(self, url, headers, method):
        parsedurl = urlparse(url)
        objectkey = parsedurl.path[1:]
        query_args = sorted(parsedurl.query.split('&'))

        # For virtual-hosted-style URLs the bucket name is the part of the
        # host in front of the service base URL.
        bucket = parsedurl.netloc[:-len(self.service_base_url)]
        if len(bucket) > 1:
            # remove the trailing dot
            bucket = bucket[:-1]

        interesting_headers = {
            'content-md5': '',
            'content-type': '',
            'date': ''}
        for key in headers:
            lk = key.lower()
            try:
                lk = lk.decode('utf-8')
            except (UnicodeDecodeError, AttributeError):
                # already a unicode string (Python 3)
                pass
            if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')):
                interesting_headers[lk] = headers[key].strip()

        # If x-amz-date is used it supersedes the date header.
        if 'x-amz-date' in interesting_headers:
            interesting_headers['date'] = ''

        buf = '%s\n' % method
        for key in sorted(interesting_headers.keys()):
            val = interesting_headers[key]
            if key.startswith('x-amz-'):
                buf += '%s:%s\n' % (key, val)
            else:
                buf += '%s\n' % val

        # append the bucket if it exists
        if bucket != '':
            buf += '/%s' % bucket
        # add the object key; even if it doesn't exist, add the slash
        buf += '/%s' % objectkey

        # handle special query string arguments
        params_found = False
        for q in query_args:
            k = q.split('=')[0]
            if k in self.special_params:
                if params_found:
                    buf += '&%s' % q
                else:
                    buf += '?%s' % q
                params_found = True
        return buf
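# For reference, the canonical string this class signs for a plain
# HEAD http://mybucket.s3.ceph.work/ request (no extra headers, no query
# string) looks like the following, where "mybucket" and the date are
# illustrative values:
#
#   HEAD\n
#   \n                                  <- content-md5 (empty)
#   \n                                  <- content-type (empty)
#   Tue, 01 Jan 2019 00:00:00 GMT\n
#   /mybucket/
#
# The string is HMAC-SHA1 signed with the secret key, base64 encoded and sent
# as "Authorization: AWS <access_key>:<signature>" (AWS signature version 2).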
class S3Admin():
    def __init__(self):
        self.access_key = ''            # fill in your access_key
        self.secret_key = ''            # fill in your secret_key
        self.endpoint = 's3.ceph.work'  # fill in your endpoint

    def get_bucket_usage(self, bucketname):
        # url = 'http://%s/%s/' % (self.endpoint, bucketname)  # path style
        url = 'http://%s.%s/' % (bucketname, self.endpoint)  # virtual hosted style
        r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint))
        print(r.headers)
        return r.headers
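# A note on the two addressing styles shown above (assumption: for the
# virtual-hosted style, the gateway's rgw_dns_name must be set so that
# <bucket>.s3.ceph.work actually resolves to RGW):
#   path style:            http://s3.ceph.work/<bucket>/
#   virtual-hosted style:  http://<bucket>.s3.ceph.work/
# Either form can be signed by the S3Auth class: with virtual-hosted URLs the
# bucket name is taken from the host, with path-style URLs it is part of the
# object key, and both end up as "/<bucket>/" in the canonical string.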
s3client = S3Admin()
bucket_name = 'xxx'  # replace with the name of the bucket you want to query
result = s3client.get_bucket_usage(bucket_name)
print('objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'], result['X-RGW-Bytes-Used']))
# Note: objects_num is the number of objects in the bucket, and total_Bytes_Used
# is the space the bucket currently uses, in bytes.
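If boto3 is already part of your toolchain, the same HEAD query can be written much more compactly. The sketch below is only an illustration under a few assumptions: the endpoint, credentials and bucket name are the same placeholders used above, and your RGW accepts AWS signature v2 (newer releases also accept the default SigV4, in which case the Config line can be dropped). The usage values are the same X-RGW-Object-Count / X-RGW-Bytes-Used headers, just read through botocore's raw response metadata.

import boto3
from botocore.client import Config

s3 = boto3.client(
    's3',
    endpoint_url='http://s3.ceph.work',        # fill in your endpoint
    aws_access_key_id='',                      # fill in access_key
    aws_secret_access_key='',                  # fill in secret_key
    config=Config(signature_version='s3'),     # signature v2, matching the script above
)

resp = s3.head_bucket(Bucket='xxx')            # replace with the bucket name
# Raw response headers live under ResponseMetadata; normalise the key case
# before looking up the RGW usage headers.
headers = {k.lower(): v for k, v in resp['ResponseMetadata']['HTTPHeaders'].items()}
print('objects_num= %s , total_Bytes_Used= %s' % (
    headers.get('x-rgw-object-count'), headers.get('x-rgw-bytes-used')))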
That is everything in this article on querying bucket usage with a Python script. Thanks for reading! Hopefully you now have a good picture of how it works; if you would like to learn more, feel free to follow the 天达云 industry news channel.