[Level 3] More decorator samples.
There is a good website with sample decorator code here.
regards,
Stanley Huang
'''
Usage:
@deprecated
def func(): ...
'''
class deprecated:
    """Decorator that marks a function as deprecated.

    Every call to the wrapped function builds a deprecation message,
    passes it to ``libcommon.debug`` with ``logfile='deprecated.log'``
    and then delegates to the original function.

    Usage:
        @deprecated
        def func(): ...
    """

    def __init__(self, func):
        """Store *func* and copy its name, docstring and attributes."""
        self.func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        self.__dict__.update(func.__dict__)

    def __call__(self, *args, **kwargs):
        """Log the deprecation notice and return ``func(*args, **kwargs)``."""
        msg = "Call to deprecated function %s()." % self.func.__name__
        # The warning is issued under an "ignore" filter, exactly as before.
        # catch_warnings() restores the filter list on exit even if
        # something raises, instead of deleting filters[0] by hand.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            warnings.warn(msg, category=DeprecationWarning)
        # bDebug2Console is forced to False for this one debug() call
        # (presumably so the message only reaches the log file - confirm
        # against libcommon) and is restored even if debug() raises.
        saved_flag = libcommon.bDebug2Console
        libcommon.bDebug2Console = False
        try:
            libcommon.debug(msg, logfile='deprecated.log')
        finally:
            libcommon.bDebug2Console = saved_flag
        return self.func(*args, **kwargs)
'''
Usage:
#{case 1}
libdecorator.func_state_flag_default = True/False/None
libdecorator.func_state_flag_test = True/False/None
libdecorator.func_state_flag = True/False/None
@funcState()
@funcState(func_set='test')
@funcState(func_set='test', skip_return_value=True)
def func(): ...
#{case 2}
@funcEnabled
def func(): ...
#{case 3}
@funcDisabled
def func(): ...
#{case 4}
@funcSkipped(True/False/...)
def func(): ...
'''
#global func_state_flag
#func_state_flag=None
class funcState:
    """Decorator factory that enables, disables or skips a function.

    The state is read from this module's globals when the decorator is
    applied: ``func_state_flag_<func_set>`` first, then
    ``func_state_flag``, defaulting to True when neither exists.

    * True  -> the function is wrapped and runs normally (funcEnabled)
    * False -> funcDisabled is applied to it
    * None  -> funcSkipped is applied with ``skip_return_value``

    Any other flag value raises KeyError from the dispatch table.
    """

    def __init__(self, func_set='default', skip_return_value=True):
        """Remember the flag-set name and the value returned when skipped."""
        self.func_set = func_set
        self.skip_return_value = skip_return_value

    def __call__(self, func):
        """Wrap *func* according to the current state flag."""
        # Only module globals are consulted: locals() inside this method
        # can never contain the flag names, so those lookups were dead.
        module_vars = globals()
        specific_flag = 'func_state_flag_%s' % self.func_set
        if specific_flag in module_vars:
            self.func_state_flag = module_vars[specific_flag]
        elif 'func_state_flag' in module_vars:
            self.func_state_flag = module_vars['func_state_flag']
        else:
            self.func_state_flag = True
        self.func_enabled = funcEnabled()
        self.func_disabled = funcDisabled(func.__name__)
        self.func_skipped = funcSkipped(self.skip_return_value)
        # Named state_decorator so it no longer shadows the class name.
        state_decorator = {
            True: self.func_enabled,
            False: self.func_disabled,
            None: self.func_skipped,
        }[self.func_state_flag]

        @state_decorator
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapped_f
class funcEnabled:
    """Decorator that leaves the decorated function enabled and untouched."""

    def __init__(self):
        """Take no configuration."""

    def __call__(self, func):
        """Hand *func* back exactly as it was received."""
        return func
class funcDisabled:
    """Decorator that refuses to decorate: applying it always fails.

    ``func_name`` optionally overrides the name shown in the error
    message; when it is empty the decorated function's ``__name__`` is
    used instead.
    """

    def __init__(self, func_name=None):
        """Remember the optional name to report."""
        self.func_name = func_name

    def __call__(self, func):
        """Raise AssertionError naming the disabled function."""
        func_name = self.func_name if self.func_name else func.__name__
        # Raised explicitly rather than with ``assert False``: an assert is
        # removed under ``python -O``, which would silently replace the
        # decorated function with None.
        raise AssertionError('function %s() is disabled!' % func_name)
class funcSkipped:
    """Decorator that skips the function and returns a fixed value instead."""

    def __init__(self, return_value=True):
        """Remember the value handed back in place of a real call."""
        self.return_value = return_value

    def __call__(self, func):
        """Return a stand-in for *func* that never invokes it."""
        def wrapped_f(*args, **kwargs):
            # Arguments are accepted and ignored; *func* is never called.
            return self.return_value

        # Copy name, docstring and attributes of *func* onto the stand-in.
        return functools.wraps(func)(wrapped_f)
'''
Usage:
#{case 1}
libdecorator.bIgnoreYN = True/False
self.bIgnoreYN = True/False
@yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!')
def test():
print 'test()'
## return True/'yes' for @libdecorator.yn()
return True/'yes'
#{case 2}
class ctest:
def __init__(self):
self.bIgnoreYN = False
pass
@yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!')
def test(self):
print 'ctest.test()'
## return True/'yes' for @libdecorator.yn()
return True/'yes'
'''
class yn:
    """Decorator that asks a yes/no question after the wrapped function runs.

    Parameters:
        prompt           -- question text shown before ``[yes/no]``.
        continue_flag    -- the answer ('yes' or 'no') that means "continue";
                            it is shown in upper case in the prompt.
        exit_program     -- when true, any other answer raises SystemExit(1).
        return_type      -- 'value' returns the wrapped function's result,
                            'yn' returns the answer text, 'tf' returns
                            whether the answer equals continue_flag.
        continue_message -- printed when the user chooses to continue.
        exit_message     -- printed when the user chooses not to continue.

    The question is skipped, and the wrapped function's result returned,
    when the first positional argument has a true ``bIgnoreYN`` attribute
    or, failing that, when this module's global ``bIgnoreYN`` is true.
    """

    def __init__(self, prompt, continue_flag='yes', exit_program=True, return_type='value', continue_message=None, exit_message=None):
        self.prompt = prompt
        self.continue_flag = continue_flag
        self.exit_program = exit_program
        self.return_type = return_type
        self.continue_message = continue_message
        self.exit_message = exit_message

    def _isIgnored(self, args):
        """Return the effective ignore flag for one call."""
        if args and hasattr(args[0], 'bIgnoreYN'):
            flag = args[0].bIgnoreYN
        else:
            # locals() of the wrapper can never hold the flag, so only the
            # module globals are consulted.
            flag = globals().get('bIgnoreYN', False)
        # Anything that is not a plain boolean counts as "do not ignore".
        if flag not in (True, False):
            flag = False
        return flag

    def _readAnswer(self, yes, no):
        """Prompt once and return the typed text ('' on end-of-file)."""
        try:
            read_line = raw_input  # Python 2
        except NameError:
            read_line = input  # Python 3
        # NOTE(review): BreakHandler(-1) presumably blocks Ctrl-C while the
        # prompt is shown - confirm against libinterrupt.
        bh = libinterrupt.BreakHandler(-1)
        bh.enable()
        try:
            return read_line('%s [%s/%s]: ' % (self.prompt, yes, no))
        except EOFError:
            # Previously the answer became None and crashed on .lower();
            # an empty answer is rejected and the question is asked again.
            return ''
        finally:
            bh.disable()

    def __call__(self, func):
        """Wrap *func* so the question is asked after it returns."""
        yes = 'YES' if self.continue_flag.lower() == 'yes' else 'yes'
        no = 'NO' if self.continue_flag.lower() == 'no' else 'no'

        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            sReturn = func(*args, **kwargs)
            if self._isIgnored(args):
                return sReturn

            continue_answer = self.continue_flag.lower()
            while True:
                answer = self._readAnswer(yes, no).lower()
                if answer in ('yes', 'no'):
                    break
                print('Only accept %s/%s!' % (yes, no))

            if answer == continue_answer:
                if self.continue_message:
                    print(self.continue_message)
            else:
                if self.exit_message:
                    print(self.exit_message)
                if self.exit_program:
                    raise SystemExit(1)

            return_type = self.return_type.lower()
            if return_type == 'yn':
                return answer
            if return_type == 'tf':
                # Both sides are lower-cased so continue_flag='YES' works.
                return answer == continue_answer
            if return_type == 'value':
                return sReturn
            # Explicit raise: an assert would vanish under ``python -O``.
            raise AssertionError('Error return type(%s)!' % self.return_type)
        return wrapped_f
'''
Usage:
@checkUnixPassword()
@checkUnixPassword('success_message', 'fail_message', 'admin')
def test(): ...
'''
class checkUnixPassword:
    """Decorator that asks for a user's password before running a function.

    The typed password is turned into a password field with
    ``libcommon.unixPasswordEncryptor`` and compared with the field that
    ``libcommon.shadowPassword`` reports for the user. On a mismatch the
    optional failure message is printed and ``exit(1)`` is called; on a
    match the optional success message is printed and the function runs.
    """

    def __init__(self, success_message=None, fail_message=None, user=None):
        """Store the messages; default *user* to the current account name."""
        self.success_message = success_message
        self.fail_message = fail_message
        if user:
            self.user = user
        else:
            self.user = libresourcemanager.getAccountName()

    def __call__(self, func):
        """Return *func* guarded by the password prompt."""
        def wrapped_f(*args, **kwargs):
            typed = getpass.getpass("Please enter %s's password: " % self.user).rstrip()
            # Stored password field for the user.
            shadow = libcommon.shadowPassword()
            shadow.setPasswordField(self.user)
            # Field computed from the typed password with the stored
            # encryption type and salt.
            encryptor = libcommon.unixPasswordEncryptor(
                typed, shadow.getPasswordEncryptType(), shadow.getPasswordSalt())
            if shadow.getPasswordField(self.user) == encryptor.getPasswordField():
                if self.success_message:
                    print(self.success_message)
                return func(*args, **kwargs)
            if self.fail_message:
                print(self.fail_message)
            exit(1)

        return functools.wraps(func)(wrapped_f)
'''
Usage:
@checkCLIPassword()
@checkCLIPassword('success_message', 'fail_message')
def test(): ...
'''
class checkCLIPassword:
    """Decorator that asks for the CLI password before running a function.

    The SHA-1 hex digest of the typed password is compared with the first
    line of ``ENABLE_PASSWORD_FLAG_FILE``. On a mismatch the optional
    failure message is printed and SystemExit(1) is raised; on a match
    the optional success message is printed and the function runs.
    """

    def __init__(self, success_message=None, fail_message=None):
        """Store the optional messages and the digest file location."""
        self.success_message = success_message
        self.fail_message = fail_message
        self.ENABLE_PASSWORD_FLAG_FILE = '/tmp/enable_password.done'

    def getEncyptCLIPasswd(self):
        """Return the stripped first line of the flag file, or ''.

        The file is read directly instead of through ``head -1`` in a
        shell pipe that was never closed. An unreadable or missing file
        yields '' as the shell version did.
        """
        try:
            with open(self.ENABLE_PASSWORD_FLAG_FILE, 'r') as flag_file:
                return flag_file.readline().strip()
        except (IOError, OSError):
            return ''

    def encryptPasswd(self, plain_text):
        """Return the SHA-1 hex digest of *plain_text*."""
        # hashlib needs bytes; text is encoded as UTF-8 first.
        if not isinstance(plain_text, bytes):
            plain_text = plain_text.encode('utf-8')
        return hashlib.sha1(plain_text).hexdigest()

    def isValidCLIPasswd(self, plain_text):
        """Tell whether *plain_text* hashes to the stored digest."""
        return self.getEncyptCLIPasswd() == self.encryptPasswd(plain_text)

    def __call__(self, func):
        """Return *func* guarded by the password prompt."""
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            passwd = getpass.getpass("Please enter cli password: ").rstrip()
            if not self.isValidCLIPasswd(passwd):
                if self.fail_message:
                    print(self.fail_message)
                raise SystemExit(1)
            if self.success_message:
                print(self.success_message)
            return func(*args, **kwargs)
        return wrapped_f
'''
libdecorator.pek2c_enable_flag = True/False
Usage:
@pek2c()
@pek2c(before_after='before/after', message='message')
@pek2c(before_after='before/after', message='message', confirm_message='confirm_message')
def test(): ...
'''
class pek2c:
    """Decorator that pauses for confirmation before or after a function.

    Parameters:
        before_after    -- 'before' or 'after': when the pause happens
                           relative to the wrapped call. Any other value
                           means no pause.
        message         -- what the user is continuing to; it is embedded
                           in the prompt text.
        confirm_message -- when non-empty, the exact text the user must
                           type; otherwise any input line is accepted.

    The pause is skipped entirely when this module's global
    ``pek2c_enable_flag`` exists and is false.
    """

    def __init__(self, before_after='after', message='continue', confirm_message=''):
        """Store the settings and build the prompt text."""
        self.before_after = before_after
        self.confirm_message = confirm_message
        if self.confirm_message:
            key = 'Enter "%s"' % self.confirm_message
        else:
            key = 'Press Enter key'
        self.message = '%s to %s... ' % (key, message)

    def runPek2c(self, message=''):
        """Show *message* and wait until the user confirms."""
        # Only module globals are consulted: locals() here can never
        # contain the flag, so that lookup was dead.
        if not globals().get('pek2c_enable_flag', True):
            return
        try:
            read_line = raw_input  # Python 2
        except NameError:
            read_line = input  # Python 3
        while True:
            # NOTE(review): BreakHandler(-1) presumably blocks Ctrl-C while
            # waiting - confirm against libinterrupt.
            bh = libinterrupt.BreakHandler(-1)
            bh.enable()
            try:
                x = read_line(message)
            except EOFError:
                x = None
            finally:
                bh.disable()
            # With a confirm text set, keep asking until it is typed.
            if self.confirm_message and x != self.confirm_message:
                continue
            break

    def __call__(self, func):
        """Return *func* wrapped with the configured pause."""
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            if self.before_after == 'before':
                self.runPek2c(self.message)
            sReturn = func(*args, **kwargs)
            if self.before_after == 'after':
                self.runPek2c(self.message)
            return sReturn
        return wrapped_f
'''
Usage:
@dumpArgs
def test(a, b, c): ...
'''
class dumpArgs:
    """Decorator that prints a function's arguments before calling it.

    Output has the form ``name : a=1, b=2, key='v'``: positional values
    paired with the parameter names, followed by the keyword arguments.

    Usage:
        @dumpArgs
        def test(a, b, c): ...
    """

    def __init__(self, func):
        """Record *func*, its parameter names and its metadata."""
        self.func = func
        # __code__ / __name__ exist on Python 2.6+ and Python 3, unlike the
        # Python-2-only func_code / func_name spellings.
        code = func.__code__
        self.argnames = code.co_varnames[:code.co_argcount]
        self.fname = func.__name__
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        self.__dict__.update(func.__dict__)

    def __call__(self, *args, **kwargs):
        """Print the call's arguments, then return ``func(*args, **kwargs)``."""
        # zip() and items() are not lists on Python 3, so both are
        # materialised before being concatenated.
        entries = list(zip(self.argnames, args)) + list(kwargs.items())
        dumped = ', '.join('%s=%r' % entry for entry in entries)
        print('%s : %s' % (self.fname, dumped))
        return self.func(*args, **kwargs)
'''
Usage:
@retry()
@retry(libdecorator.RetryException, nRetry=3, nDelay=3, nBackOff=2)
def test(a, b, c):
raise libdecorator.RetryException('retry')
'''
class RetryException(Exception):
    """Default exception type that makes ``retry`` try the call again."""

    def __init__(self, message=''):
        # Initialise the base class so ``args`` carries the message.
        Exception.__init__(self, message)
        self.__message = message

    def __str__(self):
        return self.__message


class retry:
    """Decorator that re-runs a function while it raises a given exception.

    Parameters:
        ExceptionToCheck -- exception class (or tuple) that triggers a retry;
                            any other exception propagates immediately.
        nRetry           -- total number of attempts. With a value below 1
                            the function is never called and None is
                            returned.
        nDelay           -- seconds to sleep after the first failure.
        nBackOff         -- factor applied to the delay after each failure.

    When the last attempt also fails, the exception is re-raised.
    """

    def __init__(self, ExceptionToCheck=RetryException, nRetry=3, nDelay=1, nBackOff=1):
        self.ExceptionToCheck = ExceptionToCheck
        self.nRetry = nRetry
        self.nDelay = nDelay
        self.nBackOff = nBackOff

    def __call__(self, func):
        """Return *func* wrapped with the retry loop."""
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            # The delay is tracked per call. Updating self.nDelay made the
            # back-off carry over into every later call.
            delay = self.nDelay
            for attempt in range(1, self.nRetry + 1):
                try:
                    return func(*args, **kwargs)
                except self.ExceptionToCheck:
                    if attempt >= self.nRetry:
                        # Bare raise keeps the original traceback.
                        raise
                    print('%s %s time but fail, wait %s sec to continue'
                          % ('Run' if attempt == 1 else 'Retry', attempt, delay))
                    time.sleep(delay)
                    delay = delay * self.nBackOff
        return wrapped_f
'''
Usage:
from threading import Lock
my_lock = Lock()
@synchronized(my_lock)
def test(a, b, c): ...
'''
class synchronized:
    """Decorator that runs the function while holding the given lock."""

    def __init__(self, lock):
        """Keep the lock object; it must offer acquire() and release()."""
        self.lock = lock

    def __call__(self, func):
        """Return *func* wrapped so each call happens under the lock."""
        def wrapped_f(*args, **kw):
            self.lock.acquire()
            try:
                outcome = func(*args, **kw)
            finally:
                # Released whether the call returned or raised.
                self.lock.release()
            return outcome

        return functools.wraps(func)(wrapped_f)
'''
File lock class for fileLockSynchronized decorator.
'''
class FileLock(object):
    """File-based lock with context-manager support.

    The lock is a file created with O_CREAT | O_EXCL under ``LOCK_FOLDER``.
    It holds ``<pid>,<expiry timestamp>`` so that a later instance can
    detect and purge a lock whose owner no longer has it open or whose
    lifetime has passed.
    """

    def __init__(self, lockfile, timeout=10, delay=.05, max_lifetime=60):
        """Prepare the lock.

        lockfile     -- file name of the lock inside ``LOCK_FOLDER``.
        timeout      -- seconds ``acquire`` keeps trying before giving up.
        delay        -- seconds between two attempts.
        max_lifetime -- seconds after which a held lock counts as expired.
        """
        self.is_locked = False
        self.LOCK_FOLDER = "/tmp/LOCKS"
        try:
            os.makedirs(self.LOCK_FOLDER)
        except OSError:
            # An existing folder (possibly just created by another process)
            # is fine; anything else is a real error.
            if not os.path.isdir(self.LOCK_FOLDER):
                raise
        self.lockfile = "%s/%s" % (self.LOCK_FOLDER, lockfile)
        self.timeout = timeout
        self.delay = delay
        self.max_lifetime = max_lifetime
        self.purgeUnusedLockFile()

    def _isHeldOpenBy(self, pid):
        """Tell whether process *pid* has the lock file open.

        Inspects /proc/<pid>/fd directly, so the pid text read from the
        lock file is never passed to a shell.
        """
        if not pid.isdigit():
            return False
        fd_dir = '/proc/%s/fd' % pid
        try:
            entries = os.listdir(fd_dir)
        except OSError:
            return False
        for entry in entries:
            try:
                if os.readlink(os.path.join(fd_dir, entry)) == self.lockfile:
                    return True
            except OSError:
                # The descriptor was closed while we were scanning.
                continue
        return False

    def purgeUnusedLockFile(self):
        """Delete a stale lock file; return True only if one was removed.

        A lock is kept when its recorded owner still has the file open and
        its expiry timestamp lies in the future. A missing or empty lock
        file is left alone.
        """
        if not os.path.exists(self.lockfile):
            return False
        with open(self.lockfile, 'r') as lock_file:
            content = lock_file.readlines()
        if not content:
            return False
        fields = content[0].rstrip().split(',')
        pid = fields[0]
        try:
            expired_timestamp = int(fields[1]) if len(fields) > 1 else 0
        except ValueError:
            # Unreadable expiry: treat the lock as already expired.
            expired_timestamp = 0
        if self._isHeldOpenBy(pid) \
                and expired_timestamp > int(libcommon.TimeUtils().getUTCTimestamp()):
            return False
        print('purge unused lock file (%s)' % self.lockfile)
        os.unlink(self.lockfile)
        return True

    def acquire(self):
        """Acquire the lock, retrying every ``delay`` seconds.

        Raises FileLockException once ``timeout`` seconds have passed
        without success; other OSErrors propagate unchanged.
        NOTE(review): FileLockException is not defined in the visible
        source - confirm it exists elsewhere in the module.
        """
        start_time = time.time()
        while True:
            try:
                self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                expiry = int(libcommon.TimeUtils().getUTCTimestamp()) + self.max_lifetime
                # os.write needs bytes on Python 3.
                os.write(self.fd, ('%s,%s' % (os.getpid(), expiry)).encode('ascii'))
                break
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise
                if (time.time() - start_time) >= self.timeout:
                    raise FileLockException("Timeout occured.")
                time.sleep(self.delay)
        self.is_locked = True

    def release(self):
        """Close and delete the lock file if this instance holds the lock."""
        if self.is_locked:
            os.close(self.fd)
            os.unlink(self.lockfile)
            self.is_locked = False

    def __enter__(self):
        """Acquire the lock on entering a ``with`` block."""
        if not self.is_locked:
            self.acquire()
        return self

    def __exit__(self, type, value, traceback):
        """Release the lock, if held, on leaving a ``with`` block."""
        if self.is_locked:
            self.release()

    def __del__(self):
        """Make sure no lock file is left behind by this instance."""
        self.release()
'''
Usage:
@fileLockSynchronized('my_lock1')
@fileLockSynchronized('my_lock2', 10, 1, 30)
def test(a, b, c): ...
'''
class fileLockSynchronized:
    """Decorator that runs the function while holding a FileLock.

    The four settings are passed unchanged to ``FileLock`` each time the
    wrapped function is called.
    """

    def __init__(self, lockfile, timeout=10, delay=1, max_lifetime=60):
        """Store the FileLock settings for later calls."""
        self.lockfile = lockfile
        self.timeout = timeout
        self.delay = delay
        self.max_lifetime = max_lifetime

    def __call__(self, func):
        """Return *func* wrapped so each call happens under a new FileLock."""
        def wrapped_f(*args, **kw):
            file_lock = FileLock(self.lockfile, self.timeout, self.delay, self.max_lifetime)
            try:
                file_lock.acquire()
                outcome = func(*args, **kw)
            finally:
                # release() is also reached when acquire() itself fails.
                file_lock.release()
            return outcome

        return functools.wraps(func)(wrapped_f)
'''
Usage:
@saveCurrentWorkingDirectory
def func(): ...
'''
class saveCurrentWorkingDirectory:
    """Decorator that restores the working directory after the call.

    Usage:
        @saveCurrentWorkingDirectory
        def func(): ...
    """

    def __init__(self, func):
        """Store *func* and copy its name, docstring and attributes."""
        self.func = func
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        self.__dict__.update(func.__dict__)

    def __call__(self, *args, **kwargs):
        """Call the function, then change back to the starting directory."""
        cwd = os.getcwd()
        try:
            return self.func(*args, **kwargs)
        finally:
            # Restored even when the function raises; previously an
            # exception left the process in whatever directory it reached.
            os.chdir(cwd)
'''
Usage:
@singleton
class cls: ...
'''
'''
def singleton(cls):
instances = {}
def getInstance(*args, **kwargs):
if cls not in instances:
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return getInstance
'''
class singleton:
    """Class decorator that creates the decorated class only once.

    The first call builds the instance with the arguments given; every
    later call returns that same instance and ignores its arguments.
    """

    def __init__(self, cls):
        """Remember the decorated class and start with no instance."""
        self.instances = {}
        self.cls = cls

    def __call__(self, *args, **kwargs):
        """Return the single instance, creating it on first use."""
        registry = self.instances
        if self.cls in registry:
            return registry[self.cls]
        registry[self.cls] = self.cls(*args, **kwargs)
        return registry[self.cls]
Hope this helps.
regards,
Stanley Huang
Comments
Post a Comment