[Level 3] More decorator samples.

There is a good website to demo decorator sample codes. here

'''
Usage:
@deprecated
def func(): ...
'''
class deprecated:
    def __init__(self, func):
        self.func = func
        self.__name__ = self.func.__name__
        self.__doc__ = self.func.__doc__
        self.__dict__.update(self.func.__dict__)
        pass
    
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used."""
    def __call__(self, *args, **kwargs):
        msg = "Call to deprecated function %s()." % self.func.__name__
        ## Suppressing Warnings
        warnings.simplefilter("ignore")
        warnings.warn(msg, category=DeprecationWarning)
        ## Delete ignore filter 
        del warnings.filters[0]
        (tmpDebug2Console, libcommon.bDebug2Console) = (libcommon.bDebug2Console, False)
        libcommon.debug(msg, logfile='deprecated.log')
        libcommon.bDebug2Console = tmpDebug2Console
        return self.func(*args, **kwargs)

'''
Usage:
#{case 1}
libdecorator.func_state_flag_default = True/False/None
libdecorator.func_state_flag_test = True/False/None
libdecorator.func_state_flag = True/False/None
@funcState()
@funcState(func_set='test')
@funcState(func_set='test', skip_return_value=True)
def func(): ...

#{case 2}
@funcEnabled
def func(): ...

#{case 3}
@funcDisabled
def func(): ...

#{case 4}
@funcSkipped(True/False/...)
def func(): ...
'''
#global func_state_flag
#func_state_flag=None
class funcState:
    def __init__(self, func_set='default', skip_return_value=True):
        self.func_set = func_set
        self.skip_return_value = skip_return_value
        
    def __call__(self, func):            
        if 'func_state_flag_%s' % self.func_set in locals():
            self.func_state_flag = locals()['func_state_flag_%s' % self.func_set]
        elif 'func_state_flag_%s' % self.func_set in globals():
            self.func_state_flag = globals()['func_state_flag_%s' % self.func_set]
        elif 'func_state_flag' in locals():
            self.func_state_flag = locals()['func_state_flag']
        elif 'func_state_flag' in globals():
            self.func_state_flag = globals()['func_state_flag']
        else:
            self.func_state_flag = True
        
        self.func_enabled = funcEnabled()
        self.func_disabled = funcDisabled(func.__name__)
        self.func_skipped = funcSkipped(self.skip_return_value)

        funcState = {True: self.func_enabled,
                     False: self.func_disabled,
                     None: self.func_skipped
                     }[self.func_state_flag]

        @funcState
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            return func(*args, **kwargs)
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

class funcEnabled:
    def __init__(self):
        pass
    
    "This decorator enables the provided function, and does nothing."
    def __call__(self, func):
        return func

class funcDisabled:
    def __init__(self, func_name=None):
        self.func_name = func_name
        pass
    
    "This decorator disables the provided function, and does nothing."
    def __call__(self, func):
        func_name = self.func_name if self.func_name else func.__name__
        assert False, 'function %s() is disabled!' % func_name

class funcSkipped:
    def __init__(self, return_value=True):
        self.return_value = return_value
        pass
    
    "This decorator skip the provided function, and return the specific value."
    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            return self.return_value
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
Usage:
#{case 1}
libdecorator.bIgnoreYN = True/False
self.bIgnoreYN = True/False
@yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!')
def test():
  print 'test()'
  ## return True/'yes' for @libdecorator.yn()
  return True/'yes'

#{case 2}
class ctest:
  def __init__(self):
    self.bIgnoreYN = False
    pass

  @yn('Are these correct?', continue_flag='yes', exit_program=True, return_type=(yn/tf/value), continue_message='continue message!', exit_message='exit message!')
  def test(self):
    print 'ctest.test()'
    ## return True/'yes' for @libdecorator.yn()
    return True/'yes'
'''
class yn:
    def __init__(self, prompt, continue_flag='yes', exit_program=True, return_type='value', continue_message=None, exit_message=None):
        self.prompt = prompt
        self.continue_flag = continue_flag
        self.exit_program = exit_program
        self.return_type = return_type
        self.continue_message = continue_message
        self.exit_message = exit_message
        
    def __call__(self, func):
        yes = 'YES' if self.continue_flag.lower() == 'yes' else 'yes'
        no = 'NO' if self.continue_flag.lower() == 'no' else 'no'
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            sReturn = func(*args, **kwargs)

            ## check attribute of instance
            if args and isinstance(args[0], object) and hasattr(args[0], 'bIgnoreYN'):
                o = args[0]
                bIgnoreYN = o.bIgnoreYN
            else:
                ## check global/local variable
                if 'bIgnoreYN' in locals():
                    bIgnoreYN = locals()['bIgnoreYN']
                elif 'bIgnoreYN' in globals():
                    bIgnoreYN = globals()['bIgnoreYN']
                else:
                    bIgnoreYN = False
            ## filter ignore flag and set default to False
            if bIgnoreYN not in (True, False):
                bIgnoreYN = False
            
            while not bIgnoreYN:
                bh = libinterrupt.BreakHandler(-1)
                bh.enable()
                try:
                    yn = raw_input('%s [%s/%s]: ' % (self.prompt, yes, no))
                except EOFError:
                    yn = None
                bh.disable()
                    
                if yn.lower() in ('yes', 'no'):
                    if yn.lower() == self.continue_flag.lower():
                        if self.continue_message: print self.continue_message
                    else:
                        if self.exit_message: print self.exit_message
                        if self.exit_program: exit(1)
                    break
                else:
                    print 'Only accept %s/%s!' % (yes, no)
            
            if bIgnoreYN:
                return sReturn
            else:
                if self.return_type.lower() == 'yn':
                    sReturn = yn.lower()
                elif self.return_type.lower() == 'tf':
                    sReturn = True if yn.lower() == self.continue_flag else False
                elif self.return_type.lower() == 'value':
                    pass
                else:
                    assert False, 'Error return type(%s)!' % self.return_type
                return sReturn
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f


'''
Usage:
@checkUnixPassword()
@checkUnixPassword('success_message', 'fail_message', 'admin')
def test(): ...
'''
class checkUnixPassword:
    def __init__(self, success_message=None, fail_message=None, user=None):
        self.success_message = success_message
        self.fail_message = fail_message
        self.user = user if user else libresourcemanager.getAccountName()

    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            passwd = getpass.getpass("Please enter %s's password: " % self.user).rstrip()
        
            ## get password field from shadow file
            sp = libcommon.shadowPassword()
            sp.setPasswordField(self.user)
        
            ## get password from crypt algorithm
            up = libcommon.unixPasswordEncryptor(passwd, sp.getPasswordEncryptType(), sp.getPasswordSalt())        
            if not sp.getPasswordField(self.user) == up.getPasswordField():
                if self.fail_message: print self.fail_message
                exit(1)
            else:
                if self.success_message: print self.success_message
                return func(*args, **kwargs)
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
Usage:
@checkCLIPassword()
@checkCLIPassword('success_message', 'fail_message')
def test(): ...
'''
class checkCLIPassword:
    def __init__(self, success_message=None, fail_message=None):
        self.success_message = success_message
        self.fail_message = fail_message
        self.ENABLE_PASSWORD_FLAG_FILE = '/tmp/enable_password.done'

    def getEncyptCLIPasswd(self):
        cmd = 'head -1 %s' % self.ENABLE_PASSWORD_FLAG_FILE
        return os.popen(cmd).read().strip()

    def encryptPasswd(self, plain_text):
        return  hashlib.sha1(plain_text).hexdigest()

    def isValidCLIPasswd(self, plain_text):
        return self.getEncyptCLIPasswd() == self.encryptPasswd(plain_text)

    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            passwd = getpass.getpass("Please enter cli password: ").rstrip()
            
            if not self.isValidCLIPasswd(passwd):
                if self.fail_message: print self.fail_message
                exit(1)
            else:
                if self.success_message: print self.success_message
                return func(*args, **kwargs)
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
libdecorator.pek2c_enable_flag = True/False
Usage:
@pek2c() 
@pek2c(before_after='before/after', message='message') 
@pek2c(before_after='before/after', message='message', confirm_message='confirm_message') 
def test(): ...
'''
class pek2c:
    def __init__(self, before_after='after', message='continue', confirm_message=''):
        self.before_after = before_after
        self.confirm_message = confirm_message
        key = 'Enter "%s"' % self.confirm_message if self.confirm_message else 'Press Enter key'
        self.message = '%s to %s... ' % (key, message)
    
    def runPek2c(self, message=''):
        if 'pek2c_enable_flag' in locals():
            pek2c_enable_flag = locals()['pek2c_enable_flag']
        elif 'pek2c_enable_flag' in globals():
            pek2c_enable_flag = globals()['pek2c_enable_flag']
        else:
            pek2c_enable_flag = True
        
        if pek2c_enable_flag:
            while True:
                bh = libinterrupt.BreakHandler(-1)
                bh.enable()
                try:
                    x = raw_input(message)
                except EOFError:
                    x = None
                bh.disable()
                
                if self.confirm_message and x != self.confirm_message:
                    continue
                else:
                    break
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            if self.before_after == 'before': self.runPek2c(self.message)
            sReturn = func(*args, **kwargs)
            if self.before_after == 'after' : self.runPek2c(self.message)
            return sReturn
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
Usage:
@dumpArgs  
def test(a, b, c): ...
'''
class dumpArgs:
    def __init__(self, func):
        self.func = func
        "This decorator dumps out the arguments passed to a function before calling it"
        self.argnames = func.func_code.co_varnames[:func.func_code.co_argcount]
        self.fname = func.func_name
        self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        self.__dict__.update(func.__dict__)
        
    def __call__(self, *args, **kwargs):
        print self.fname, ":", ', '.join(
            '%s=%r' % entry
            for entry in zip(self.argnames,args) + kwargs.items())
        return self.func(*args, **kwargs)

'''
Usage:
@retry()
@retry(libdecorator.RetryException, nRetry=3, nDelay=3, nBackOff=2)
def test(a, b, c):
    raise retryException('retry')
'''
class RetryException(Exception):
    def __init__(self, message=''):
        self.__message = message
    
    def __str__(self):
        return self.__message
    pass

class retry:
    def __init__(self, ExceptionToCheck=RetryException, nRetry=3, nDelay=1, nBackOff=1):
        self.ExceptionToCheck = ExceptionToCheck
        self.nRetry = nRetry
        self.nDelay = nDelay
        self.nBackOff = nBackOff
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kwargs):
            for i in range(1, self.nRetry+1):
                try:
                    sReturn = func(*args, **kwargs)
                    return sReturn
                except self.ExceptionToCheck, e:
                    if i < self.nRetry:
                        print '%s %s time but fail, wait %s sec to continue' % ('Run' if i == 1 else 'Retry', i, self.nDelay)
                        time.sleep(self.nDelay)
                        self.nDelay = self.nDelay * self.nBackOff
                    if i == self.nRetry:
                        raise e
                except Exception, e:
                    raise e
                else:
                    break
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
Usage:
from threading import Lock
my_lock = Lock()
@synchronized(my_lock)
def test(a, b, c): ...
'''
class synchronized:
    def __init__(self, lock):
        self.lock = lock
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kw):
            self.lock.acquire()
            try:
                return func(*args, **kw)
            finally:
                self.lock.release()
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
File lock class for fileLockSynchronized decorator.
'''
class FileLock(object):
    """ A file locking mechanism that has context-manager support so 
        you can use it in a with statement. This should be relatively cross
        compatible as it doesn't rely on msvcrt or fcntl for the locking.
    """
 
    def __init__(self, lockfile, timeout=10, delay=.05, max_lifetime=60):
        """ Prepare the file locker. Specify the file to lock and optionally
            the maximum timeout and the delay between each attempt to lock.
            and the maximum life time of the lock.
        """
        self.is_locked = False
        self.LOCK_FOLDER = "/tmp/LOCKS"
        if not os.path.exists(self.LOCK_FOLDER):
            os.makedirs(self.LOCK_FOLDER)
        self.lockfile = "%s/%s" % (self.LOCK_FOLDER, lockfile)
        self.timeout = timeout
        self.delay = delay
        self.max_lifetime = max_lifetime
        self.purgeUnusedLockFile()
 
    def purgeUnusedLockFile(self):
        if os.path.exists(self.lockfile):
            content = open(self.lockfile, 'r').readlines()
            if content:
                content_array = content[0].rstrip().split(',')
                pid = content_array[0]
                print pid
                if len(content_array) > 1:
                    expired_timestamp = content_array[1]
                else:
                    expired_timestamp = 0
                if int(os.popen('ls -al /proc/%s/fd/* 2>/dev/null | grep -c " -> %s$"' %(pid, self.lockfile)).read()) > 0 \
                and int(expired_timestamp) > int(libcommon.TimeUtils().getUTCTimestamp()):
                    return False
                else:
                    print int(expired_timestamp)
                    print int(libcommon.TimeUtils().getUTCTimestamp())
                    print int(expired_timestamp) > int(libcommon.TimeUtils().getUTCTimestamp())
            print 'purge unused lock file (%s)' % self.lockfile
            os.unlink(self.lockfile)
            return True
        else:
            return False    
 
    def acquire(self):
        """ Acquire the lock, if possible. If the lock is in use, it check again
            every `wait` seconds. It does this until it either gets the lock or
            exceeds `timeout` number of seconds, in which case it throws 
            an exception.
        """
        start_time = time.time()
        while True:
            try:
                self.fd = os.open(self.lockfile, os.O_CREAT|os.O_EXCL|os.O_RDWR)
                os.write(self.fd, '%s,%s' % (str(os.getpid()), str(int(libcommon.TimeUtils().getUTCTimestamp())+self.max_lifetime)))
                break;
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise 
                if (time.time() - start_time) >= self.timeout:
                    raise FileLockException("Timeout occured.")
                time.sleep(self.delay)
        self.is_locked = True
 
 
    def release(self):
        """ Get rid of the lock by deleting the lockfile. 
            When working in a `with` statement, this gets automatically 
            called at the end.
        """
        if self.is_locked:
            os.close(self.fd)
            os.unlink(self.lockfile)
            self.is_locked = False
 
 
    def __enter__(self):
        """ Activated when used in the with statement. 
            Should automatically acquire a lock to be used in the with block.
        """
        if not self.is_locked:
            self.acquire()
        return self
 
 
    def __exit__(self, type, value, traceback):
        """ Activated at the end of the with statement.
            It automatically releases the lock if it isn't locked.
        """
        if self.is_locked:
            self.release()
 
 
    def __del__(self):
        """ Make sure that the FileLock instance doesn't leave a lockfile
            lying around.
        """
        self.release()

'''
Usage:
@fileLockSynchronized('my_lock1')
@fileLockSynchronized('my_lock2', 10, 1, 30)
def test(a, b, c): ...
'''
class fileLockSynchronized:
    def __init__(self, lockfile, timeout=10, delay=1, max_lifetime=60):
        self.lockfile = lockfile
        self.timeout = timeout
        self.delay = delay
        self.max_lifetime = max_lifetime
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapped_f(*args, **kw):
            FL = FileLock(self.lockfile, self.timeout, self.delay, self.max_lifetime)
            try:
                FL.acquire()
                return func(*args, **kw)
            finally:
                FL.release()
#        wrapped_f.__name__ = func.__name__
#        wrapped_f.__doc__ = func.__doc__
#        wrapped_f.__dict__.update(func.__dict__)
        return wrapped_f

'''
Usage:
@saveCurrentWorkingDirectory
def func(): ...
'''
class saveCurrentWorkingDirectory:
    def __init__(self, func):
        self.func = func
        self.__name__ = self.func.__name__
        self.__doc__ = self.func.__doc__
        self.__dict__.update(self.func.__dict__)
        pass
    
    def __call__(self, *args, **kwargs):
        cwd = os.getcwd()
        sReturn = self.func(*args, **kwargs)
        os.chdir(cwd)
        return sReturn

'''
Usage:
@singleton
class cls: ...
'''

'''
def singleton(cls):
    instances = {}
    def getInstance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return getInstance
'''
class singleton:
    def __init__(self, cls):
        self.instances = {}
        self.cls = cls
        pass
    
    def __call__(self, *args, **kwargs):
        if self.cls not in self.instances:
            self.instances[self.cls] = self.cls(*args, **kwargs)
        return self.instances[self.cls]

Wish this helps.  
regards,
Stanley Huang

Comments

Popular posts from this blog

[Level 1] Reversion of git

[Level 2] iif in Python