[Level 3] More decorator samples.
There is a good website to demo decorator sample codes. here
regards,
Stanley Huang
''' Usage: @deprecated def func(): ... ''' class deprecated: def __init__(self, func): self.func = func self.__name__ = self.func.__name__ self.__doc__ = self.func.__doc__ self.__dict__.update(self.func.__dict__) pass """This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted when the function is used.""" def __call__(self, *args, **kwargs): msg = "Call to deprecated function %s()." % self.func.__name__ ## Suppressing Warnings warnings.simplefilter("ignore") warnings.warn(msg, category=DeprecationWarning) ## Delete ignore filter del warnings.filters[0] (tmpDebug2Console, libcommon.bDebug2Console) = (libcommon.bDebug2Console, False) libcommon.debug(msg, logfile='deprecated.log') libcommon.bDebug2Console = tmpDebug2Console return self.func(*args, **kwargs) ''' Usage: #{case 1} libdecorator.func_state_flag_default = True/False/None libdecorator.func_state_flag_test = True/False/None libdecorator.func_state_flag = True/False/None @funcState() @funcState(func_set='test') @funcState(func_set='test', skip_return_value=True) def func(): ... #{case 2} @funcEnabled def func(): ... #{case 3} @funcDisabled def func(): ... #{case 4} @funcSkipped(True/False/...) def func(): ... ''' #global func_state_flag #func_state_flag=None class funcState: def __init__(self, func_set='default', skip_return_value=True): self.func_set = func_set self.skip_return_value = skip_return_value def __call__(self, func): if 'func_state_flag_%s' % self.func_set in locals(): self.func_state_flag = locals()['func_state_flag_%s' % self.func_set] elif 'func_state_flag_%s' % self.func_set in globals(): self.func_state_flag = globals()['func_state_flag_%s' % self.func_set] elif 'func_state_flag' in locals(): self.func_state_flag = locals()['func_state_flag'] elif 'func_state_flag' in globals(): self.func_state_flag = globals()['func_state_flag'] else: self.func_state_flag = True self.func_enabled = funcEnabled() self.func_disabled = funcDisabled(func.__name__) self.func_skipped = funcSkipped(self.skip_return_value) funcState = {True: self.func_enabled, False: self.func_disabled, None: self.func_skipped }[self.func_state_flag] @funcState @functools.wraps(func) def wrapped_f(*args, **kwargs): return func(*args, **kwargs) # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f class funcEnabled: def __init__(self): pass "This decorator enables the provided function, and does nothing." def __call__(self, func): return func class funcDisabled: def __init__(self, func_name=None): self.func_name = func_name pass "This decorator disables the provided function, and does nothing." def __call__(self, func): func_name = self.func_name if self.func_name else func.__name__ assert False, 'function %s() is disabled!' % func_name class funcSkipped: def __init__(self, return_value=True): self.return_value = return_value pass "This decorator skip the provided function, and return the specific value." def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kwargs): return self.return_value # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: #{case 1} libdecorator.bIgnoreYN = True/False self.bIgnoreYN = True/False @yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!') def test(): print 'test()' ## return True/'yes' for @libdecorator.yn() return True/'yes' #{case 2} class ctest: def __init__(self): self.bIgnoreYN = False pass @yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!') def test(self): print 'ctest.test()' ## return True/'yes' for @libdecorator.yn() return True/'yes' ''' class yn: def __init__(self, prompt, continue_flag='yes', exit_program=True, return_type='value', continue_message=None, exit_message=None): self.prompt = prompt self.continue_flag = continue_flag self.exit_program = exit_program self.return_type = return_type self.continue_message = continue_message self.exit_message = exit_message def __call__(self, func): yes = 'YES' if self.continue_flag.lower() == 'yes' else 'yes' no = 'NO' if self.continue_flag.lower() == 'no' else 'no' @functools.wraps(func) def wrapped_f(*args, **kwargs): sReturn = func(*args, **kwargs) ## check attribute of instance if args and isinstance(args[0], object) and hasattr(args[0], 'bIgnoreYN'): o = args[0] bIgnoreYN = o.bIgnoreYN else: ## check global/local variable if 'bIgnoreYN' in locals(): bIgnoreYN = locals()['bIgnoreYN'] elif 'bIgnoreYN' in globals(): bIgnoreYN = globals()['bIgnoreYN'] else: bIgnoreYN = False ## filter ignore flag and set default to False if bIgnoreYN not in (True, False): bIgnoreYN = False while not bIgnoreYN: bh = libinterrupt.BreakHandler(-1) bh.enable() try: yn = raw_input('%s [%s/%s]: ' % (self.prompt, yes, no)) except EOFError: yn = None bh.disable() if yn.lower() in ('yes', 'no'): if yn.lower() == self.continue_flag.lower(): if self.continue_message: print self.continue_message else: if self.exit_message: print self.exit_message if self.exit_program: exit(1) break else: print 'Only accept %s/%s!' % (yes, no) if bIgnoreYN: return sReturn else: if self.return_type.lower() == 'yn': sReturn = yn.lower() elif self.return_type.lower() == 'tf': sReturn = True if yn.lower() == self.continue_flag else False elif self.return_type.lower() == 'value': pass else: assert False, 'Error return type(%s)!' % self.return_type return sReturn # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: @checkUnixPassword() @checkUnixPassword('success_message', 'fail_message', 'admin') def test(): ... ''' class checkUnixPassword: def __init__(self, success_message=None, fail_message=None, user=None): self.success_message = success_message self.fail_message = fail_message self.user = user if user else libresourcemanager.getAccountName() def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kwargs): passwd = getpass.getpass("Please enter %s's password: " % self.user).rstrip() ## get password field from shadow file sp = libcommon.shadowPassword() sp.setPasswordField(self.user) ## get password from crypt algorithm up = libcommon.unixPasswordEncryptor(passwd, sp.getPasswordEncryptType(), sp.getPasswordSalt()) if not sp.getPasswordField(self.user) == up.getPasswordField(): if self.fail_message: print self.fail_message exit(1) else: if self.success_message: print self.success_message return func(*args, **kwargs) # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: @checkCLIPassword() @checkCLIPassword('success_message', 'fail_message') def test(): ... ''' class checkCLIPassword: def __init__(self, success_message=None, fail_message=None): self.success_message = success_message self.fail_message = fail_message self.ENABLE_PASSWORD_FLAG_FILE = '/tmp/enable_password.done' def getEncyptCLIPasswd(self): cmd = 'head -1 %s' % self.ENABLE_PASSWORD_FLAG_FILE return os.popen(cmd).read().strip() def encryptPasswd(self, plain_text): return hashlib.sha1(plain_text).hexdigest() def isValidCLIPasswd(self, plain_text): return self.getEncyptCLIPasswd() == self.encryptPasswd(plain_text) def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kwargs): passwd = getpass.getpass("Please enter cli password: ").rstrip() if not self.isValidCLIPasswd(passwd): if self.fail_message: print self.fail_message exit(1) else: if self.success_message: print self.success_message return func(*args, **kwargs) # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' libdecorator.pek2c_enable_flag = True/False Usage: @pek2c() @pek2c(before_after='before/after', message='message') @pek2c(before_after='before/after', message='message', confirm_message='confirm_message') def test(): ... ''' class pek2c: def __init__(self, before_after='after', message='continue', confirm_message=''): self.before_after = before_after self.confirm_message = confirm_message key = 'Enter "%s"' % self.confirm_message if self.confirm_message else 'Press Enter key' self.message = '%s to %s... ' % (key, message) def runPek2c(self, message=''): if 'pek2c_enable_flag' in locals(): pek2c_enable_flag = locals()['pek2c_enable_flag'] elif 'pek2c_enable_flag' in globals(): pek2c_enable_flag = globals()['pek2c_enable_flag'] else: pek2c_enable_flag = True if pek2c_enable_flag: while True: bh = libinterrupt.BreakHandler(-1) bh.enable() try: x = raw_input(message) except EOFError: x = None bh.disable() if self.confirm_message and x != self.confirm_message: continue else: break def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kwargs): if self.before_after == 'before': self.runPek2c(self.message) sReturn = func(*args, **kwargs) if self.before_after == 'after' : self.runPek2c(self.message) return sReturn # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: @dumpArgs def test(a, b, c): ... ''' class dumpArgs: def __init__(self, func): self.func = func "This decorator dumps out the arguments passed to a function before calling it" self.argnames = func.func_code.co_varnames[:func.func_code.co_argcount] self.fname = func.func_name self.__name__ = func.__name__ self.__doc__ = func.__doc__ self.__dict__.update(func.__dict__) def __call__(self, *args, **kwargs): print self.fname, ":", ', '.join( '%s=%r' % entry for entry in zip(self.argnames,args) + kwargs.items()) return self.func(*args, **kwargs) ''' Usage: @retry() @retry(libdecorator.RetryException, nRetry=3, nDelay=3, nBackOff=2) def test(a, b, c): raise retryException('retry') ''' class RetryException(Exception): def __init__(self, message=''): self.__message = message def __str__(self): return self.__message pass class retry: def __init__(self, ExceptionToCheck=RetryException, nRetry=3, nDelay=1, nBackOff=1): self.ExceptionToCheck = ExceptionToCheck self.nRetry = nRetry self.nDelay = nDelay self.nBackOff = nBackOff def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kwargs): for i in range(1, self.nRetry+1): try: sReturn = func(*args, **kwargs) return sReturn except self.ExceptionToCheck, e: if i < self.nRetry: print '%s %s time but fail, wait %s sec to continue' % ('Run' if i == 1 else 'Retry', i, self.nDelay) time.sleep(self.nDelay) self.nDelay = self.nDelay * self.nBackOff if i == self.nRetry: raise e except Exception, e: raise e else: break # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: from threading import Lock my_lock = Lock() @synchronized(my_lock) def test(a, b, c): ... ''' class synchronized: def __init__(self, lock): self.lock = lock def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kw): self.lock.acquire() try: return func(*args, **kw) finally: self.lock.release() # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' File lock class for fileLockSynchronized decorator. ''' class FileLock(object): """ A file locking mechanism that has context-manager support so you can use it in a with statement. This should be relatively cross compatible as it doesn't rely on msvcrt or fcntl for the locking. """ def __init__(self, lockfile, timeout=10, delay=.05, max_lifetime=60): """ Prepare the file locker. Specify the file to lock and optionally the maximum timeout and the delay between each attempt to lock. and the maximum life time of the lock. """ self.is_locked = False self.LOCK_FOLDER = "/tmp/LOCKS" if not os.path.exists(self.LOCK_FOLDER): os.makedirs(self.LOCK_FOLDER) self.lockfile = "%s/%s" % (self.LOCK_FOLDER, lockfile) self.timeout = timeout self.delay = delay self.max_lifetime = max_lifetime self.purgeUnusedLockFile() def purgeUnusedLockFile(self): if os.path.exists(self.lockfile): content = open(self.lockfile, 'r').readlines() if content: content_array = content[0].rstrip().split(',') pid = content_array[0] print pid if len(content_array) > 1: expired_timestamp = content_array[1] else: expired_timestamp = 0 if int(os.popen('ls -al /proc/%s/fd/* 2>/dev/null | grep -c " -> %s$"' %(pid, self.lockfile)).read()) > 0 \ and int(expired_timestamp) > int(libcommon.TimeUtils().getUTCTimestamp()): return False else: print int(expired_timestamp) print int(libcommon.TimeUtils().getUTCTimestamp()) print int(expired_timestamp) > int(libcommon.TimeUtils().getUTCTimestamp()) print 'purge unused lock file (%s)' % self.lockfile os.unlink(self.lockfile) return True else: return False def acquire(self): """ Acquire the lock, if possible. If the lock is in use, it check again every `wait` seconds. It does this until it either gets the lock or exceeds `timeout` number of seconds, in which case it throws an exception. """ start_time = time.time() while True: try: self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR) os.write(self.fd, '%s,%s' % (str(os.getpid()), str(int(libcommon.TimeUtils().getUTCTimestamp())+self.max_lifetime))) break; except OSError as e: if e.errno != errno.EEXIST: raise if (time.time() - start_time) >= self.timeout: raise FileLockException("Timeout occured.") time.sleep(self.delay) self.is_locked = True def release(self): """ Get rid of the lock by deleting the lockfile. When working in a `with` statement, this gets automatically called at the end. """ if self.is_locked: os.close(self.fd) os.unlink(self.lockfile) self.is_locked = False def __enter__(self): """ Activated when used in the with statement. Should automatically acquire a lock to be used in the with block. """ if not self.is_locked: self.acquire() return self def __exit__(self, type, value, traceback): """ Activated at the end of the with statement. It automatically releases the lock if it isn't locked. """ if self.is_locked: self.release() def __del__(self): """ Make sure that the FileLock instance doesn't leave a lockfile lying around. """ self.release() ''' Usage: @fileLockSynchronized('my_lock1') @fileLockSynchronized('my_lock2', 10, 1, 30) def test(a, b, c): ... ''' class fileLockSynchronized: def __init__(self, lockfile, timeout=10, delay=1, max_lifetime=60): self.lockfile = lockfile self.timeout = timeout self.delay = delay self.max_lifetime = max_lifetime def __call__(self, func): @functools.wraps(func) def wrapped_f(*args, **kw): FL = FileLock(self.lockfile, self.timeout, self.delay, self.max_lifetime) try: FL.acquire() return func(*args, **kw) finally: FL.release() # wrapped_f.__name__ = func.__name__ # wrapped_f.__doc__ = func.__doc__ # wrapped_f.__dict__.update(func.__dict__) return wrapped_f ''' Usage: @saveCurrentWorkingDirectory def func(): ... ''' class saveCurrentWorkingDirectory: def __init__(self, func): self.func = func self.__name__ = self.func.__name__ self.__doc__ = self.func.__doc__ self.__dict__.update(self.func.__dict__) pass def __call__(self, *args, **kwargs): cwd = os.getcwd() sReturn = self.func(*args, **kwargs) os.chdir(cwd) return sReturn ''' Usage: @singleton class cls: ... ''' ''' def singleton(cls): instances = {} def getInstance(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) return instances[cls] return getInstance ''' class singleton: def __init__(self, cls): self.instances = {} self.cls = cls pass def __call__(self, *args, **kwargs): if self.cls not in self.instances: self.instances[self.cls] = self.cls(*args, **kwargs) return self.instances[self.cls]
Wish this helps.
regards,
Stanley Huang
Comments
Post a Comment