[three]Bean

Getting the docstring of a function (without the name)

Sep 13, 2011 | categories: python, oops

I was using argparse and [console_scripts] entry points in a project at work, and I wanted each of the 12 console scripts to have a --help option that would display (among other things) the __doc__ of its main entry function. That way I wouldn't have to rewrite those docs over and over when I added a new script.
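
For context, the scripts are wired up through setuptools entry points, with a stanza roughly like this in setup.py (the package and script names here are made up for illustration):

from setuptools import setup

setup(
    name='mypackage',
    packages=['mypackage'],
    entry_points={
        'console_scripts': [
            # one line per console script; each points at an entry function
            'frob-widgets = mypackage.commands:some_command',
            # ...and eleven more like it
        ],
    },
)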

I came up with this little guy:

#!/usr/bin/env python
""" The module level docstring """

import inspect


def f():
    """ The function level docstring """
    print __my_doc__()


def g():
    """ Another function level docstring """
    print __my_doc__()


def __my_doc__():
    """ Print the docstring of the calling function """
    return globals()[inspect.stack()[1][0].f_code.co_name].__doc__


if __name__ == '__main__':
    f()
    g()
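
Running that script prints each function's own docstring (stray spaces and all):

 The function level docstring 
 Another function level docstring 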

You could use it to do something like this:

import argparse

def some_command():
    """ This command will knock your socks off! """
    parser = argparse.ArgumentParser(description=__my_doc__())
    args = parser.parse_args()
    raise NotImplementedError("TODO -- gotta write this command still...")

Relying on globals() really sucks. If anyone can think of a better way to do it, let me know!

EDIT -- 09/15/2011

Thanks to some inspiration from @heewa, I landed on this solution, which is much more portable.

import inspect

def __my_doc__(n=1):
    """ Print the docstring of the calling function.
    
    Because this function doesn't rely on :py:func:`globals()`, it is more
    portable.  It can now live in its own module and be invoked from elsewhere.
    """

    frame = inspect.stack()[n][0]
    return frame.f_globals[frame.f_code.co_name].__doc__
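
As a quick illustration of that portability, here's what it looks like with the helper in a module of its own (the module and file names here are just for the example):

# docutil.py -- hypothetical home for the helper above
import inspect

def __my_doc__(n=1):
    """ Return the docstring of the calling function. """
    frame = inspect.stack()[n][0]
    return frame.f_globals[frame.f_code.co_name].__doc__

# elsewhere.py
from docutil import __my_doc__

def some_command():
    """ This command will knock your socks off! """
    print __my_doc__()

some_command()  # prints: This command will knock your socks off!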

Automatically converting integer timestamps to python datetime in reflected SQLAlchemy models

Sep 01, 2011 | categories: python, sqlalchemy, slurm

What a title...

I'm working on slurchemy and I have a legacy database with tons of tables (many are dynamically created by another app). Each table has a few '*_time' fields that are stored as Integers. A quick google showed me how to reflect SQLAlchemy models from the preexisting database, but getting the '*_time' columns to come through in python as datetime objects (and not as `long`s) was a real nuisance.

I first tried to use the event framework like so:

from sqlalchemy import Table
from sqlalchemy import types
from sqlalchemy import event

def listen_for_reflect(table, column_info):
    if 'time' in column_info['name']:
        column_info['type'] = types.DateTime()

event.listen(Table, 'column_reflect', listen_for_reflect)

This worked insofar as the Table's column type really was changed to a sqlalchemy.types.DateTime in every case I wanted it to be. But once a sqlalchemy.orm.mapper was applied, my changes weren't reflected, so to speak.

I banged my head against the sqlalchemy codebase and couldn't make anything really elegant happen. Here's what I settled on:

import datetime
import time

from sqlalchemy import MetaData, Table
from sqlalchemy.orm import scoped_session, class_mapper, mapper

def add_datetime_properties(cls):
    """ For every property of a class that contains '_time', add a
    corresponding '_datetime' property that converts to and from seconds 
    since the epoch.

    Author:  Ralph Bean <ralph.bean@gmail.com>

    Use like::
        >>> DBSession = scoped_session(maker)
        >>> DBSession.configure(bind=engine)
        >>> metadata = MetaData(engine.url)
        >>> table = Table('thing_table', metadata, autoload=True)

        >>> class Thing(object):
        ...     pass
        >>> mapper(Thing, table)

        >>> add_datetime_properties(Thing)

        >>> t = DBSession.query(Thing).first()
        >>> print t.create_time
        1314900554
        >>> print t.create_datetime
        2011-09-01 14:09:14
    """

    for prop in class_mapper(cls).iterate_properties:
        if '_time' not in prop.key:
            continue  # Fugheddaboudit

        key = prop.key

        # Bind ``key`` as a default argument so each property closes over its
        # own column name (otherwise every property would end up using the
        # last key the loop saw).
        def getx(self, key=key):
            return datetime.datetime.fromtimestamp(
                float(getattr(self, key)))

        def setx(self, x, key=key):
            setattr(self, key, time.mktime(x.timetuple()))

        datetime_key = key.replace('_time', '_datetime')

        setattr(cls, datetime_key, property(getx, setx))

And it worked!


Using repoze.who.plugins.ldap in a TurboGears 2.1 app

Jul 19, 2011 | categories: python, turbogears, ldap

Often, you will need to authenticate against ldap in your webapp. Here's how to make that happen in a freshly quickstarted TurboGears 2.1 app.

Setting up your environment

mkvirtualenv --no-site-packages repoze-ldap-app
pip install tg.devtools
paster quickstart   # call the app repoze-ldap-app, yes to mako and auth
cd repoze-ldap-app
python setup.py develop
pip install genshi  # This is a workaround.
paster setup-app development.ini
paster serve development.ini  # To test if the basic app works.

Point your browser at http://localhost:8080 just to make sure everything is cool.

Setting up repoze.who.plugins.ldap

Add the following line to the install_requires list in setup.py:

    "repoze.who.plugins.ldap",

Run python setup.py develop to install the newly listed repoze plugin.

Add the following four lines to development.ini; they reference an as-yet-unwritten secondary configuration file. Place them just above the sqlalchemy.url = ... lines:

# Repoze.who stuff
who.config_file = %(here)s/who.ini
who.log_level = INFO
who.log_stream = stdout

Create a new file who.ini with the following contents:

# This file is adapted from:
# http://threebean.org/blog/2011/07/19/using-repoze-who-plugins-ldap-in-a-turbogears-2-1-app/
# which has been adapted from:
# http://static.repoze.org/whodocs/#middleware-configuration-via-config-file
# which has been adapted from:
# http://code.gustavonarea.net/repoze.who.plugins.ldap/Using.html

[plugin:friendlyform]
use = repoze.who.plugins.friendlyform:FriendlyFormPlugin
login_form_url = /login
login_handler_path = /login_handler
logout_handler_path = /logout_handler
rememberer_name = auth_tkt
post_login_url = /post_login
post_logout_url = /post_logout

[plugin:auth_tkt]
use = repoze.who.plugins.auth_tkt:make_plugin
secret = omg_this_is_so_secret_lololololol_2938485#butts

[plugin:ldap_auth]
# Here I use my own ldap_auth, since by default ldap allows affirmative
# authentication with *no password specified*.  That is lame; I override it.
use = repozeldapapp.lib.auth:ReconnectingAuthenticatorPlugin

# This is the URI of wherever you want to connect to.  I work at RIT.
ldap_connection = ldap://ldap.rit.edu

# This is the base of the 'distinguished names' (DNs) of persons in your
# particular LDAP instance.  It will vary from server to server.
base_dn = ou=People,dc=rit,dc=edu

[plugin:ldap_attributes]
# I also do some overriding for more security in how I get attributes for
# users.
use = repozeldapapp.lib.auth:ReconnectingLDAPAttrsPlugin
ldap_connection = ldap://ldap.rit.edu

[general]
request_classifier = repoze.who.classifiers:default_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider

[mdproviders]
plugins =
    ldap_attributes

[identifiers]
plugins =
    friendlyform;browser
    auth_tkt

[authenticators]
plugins =
    ldap_auth

[challengers]
plugins =
    friendlyform;browser

Create another new file repozeldapapp/lib/auth.py with the following contents:

from repoze.who.plugins.ldap import (
    LDAPAttributesPlugin, LDAPAuthenticatorPlugin
)
import ldap


class URISaver(object):
    """ Saves the ldap_connection str given to repoze authn and authz """
    def __init__(self, *args, **kw):
        self.uri = kw['ldap_connection']
        super(URISaver, self).__init__(*args, **kw)


class ReconnectingLDAPAttrsPlugin(LDAPAttributesPlugin, URISaver):
    """ Gets attributes from LDAP.  Refreshes connection if stale. """

    def add_metadata(self, environ, identity):
        """ Add ldap attributes to the `identity` entry. """

        try:
            return super(ReconnectingLDAPAttrsPlugin, self).add_metadata(
                environ, identity)
        except Exception, e:
            print "FAILED TO CONNECT TO LDAP 1 : " + str(e)
            print "Retrying..."
            self.ldap_connection = ldap.initialize(self.uri)
            return super(ReconnectingLDAPAttrsPlugin, self).add_metadata(
                environ, identity)


class ReconnectingAuthenticatorPlugin(LDAPAuthenticatorPlugin, URISaver):
    """ Authenticates against LDAP.

    - Refreshes connection if stale.
    - Denies anonymously-authenticated users

    """

    def authenticate(self, environ, identity):
        """ Extending the repoze.who.plugins.ldap plugin to make it much
        more secure. """

        res = None

        try:
            # This is unbelievable.  Without this, ldap will
            #   let you bind anonymously
            if not identity.get('password', None):
                return None
            try:
                dn = self._get_dn(environ, identity)
            except (KeyError, TypeError, ValueError):
                return None

            res = super(ReconnectingAuthenticatorPlugin, self).authenticate(
                environ, identity)

            # Sanity check here (for the same reason as the above check)
            if "dn:%s" % dn != self.ldap_connection.whoami_s():
                return None

        except ldap.LDAPError, e:
            print "FAILED TO CONNECT TO LDAP 2 : " + str(e)
            print "Retrying..."
            self.ldap_connection = ldap.initialize(self.uri)

        return res

Finally, make two changes to repozeldapapp/config/middleware.py.

At the top of the file, add:

from repoze.who.config import make_middleware_with_config

Then, inside the make_app(...) function, add the following just below the comment line about wrapping your base TurboGears 2 application, like so:

    # Wrap your base TurboGears 2 application with custom middleware here
    app = make_middleware_with_config(
        app, global_conf,
        app_conf['who.config_file'],
        app_conf['who.log_stream'],
        app_conf['who.log_level'])
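
For reference, make_app in the quickstarted app ends up looking roughly like this (names such as make_base_app come from the quickstart template; your generated file may differ a bit, and only the make_middleware_with_config lines are new):

from repoze.who.config import make_middleware_with_config

def make_app(global_conf, full_stack=True, **app_conf):
    # make_base_app is created by the quickstart template from base_config
    app = make_base_app(global_conf, full_stack=full_stack, **app_conf)

    # Wrap your base TurboGears 2 application with custom middleware here
    app = make_middleware_with_config(
        app, global_conf,
        app_conf['who.config_file'],
        app_conf['who.log_stream'],
        app_conf['who.log_level'])

    return app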

Give it a test

Restart the paster server and reload http://localhost:8080. Try logging in as a user in your ldap instance and you should be all gravy.


Cached function calls with expiration in python with shelve and decorator

Jun 08, 2011 | categories: python

Caching with decorators is nice. Sometimes you don't want something super heavyweight, just a little something you rolled on your own.

import datetime
import time
import random

# scached is defined below -- either paste that code above this point or
# import it from wherever you keep it.

@scached(cache_file='shelvecache.db', expiry=datetime.timedelta(seconds=5))
def f1(foo, bar='baz'):
    """ Example of using the cache decorator """

    print " ** starting in f1.. sleepy time"
    time.sleep(5)
    # The result of all my hard work
    result = random.random()
    print " ** woke up with", result
    return result

if __name__ == '__main__':
    print f1('hai') # slow
    print f1('hai') # fast
    print f1(foo='hai') # fast

    print "okay.. sleeping on the outside"
    time.sleep(5)

    print f1('hai') # slow again
    print f1('hai') # fast again

Here's the code that provides the @scached decorator.

import datetime
import decorator
import shelve
from hashlib import md5

def scached(cache_file, expiry):
    """ Decorator setup """

    def scached_closure(func, *args, **kw):
        """ The actual decorator """
        key = md5(':'.join([func.__name__, str(args), str(kw)])).hexdigest()
        d = shelve.open(cache_file)

        # Expire old data if we have to
        if key in d:
            if d[key]['expires_on'] < datetime.datetime.now():
                del d[key]

        # Get new data if we have to
        if key not in d:
            data = func(*args, **kw)
            d[key] = {
                'expires_on' : datetime.datetime.now() + expiry,
                'data': data,
            }

        # Return what we got
        result = d[key]['data']
        d.close()

        return result

    return decorator.decorator(scached_closure)

For extra cool points, combine the above with my post on shelve and context managers.
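
If you don't feel like clicking through, the gist of that combination is just guaranteeing the shelf gets closed even when the wrapped function raises. Here's a minimal sketch, using contextlib.closing as a stand-in for the context manager described in that post:

import contextlib
import datetime
import shelve
from hashlib import md5

import decorator

def scached(cache_file, expiry):
    """ Decorator setup """

    def scached_closure(func, *args, **kw):
        """ The actual decorator """
        key = md5(':'.join([func.__name__, str(args), str(kw)])).hexdigest()

        # closing() guarantees d.close() runs, even if func blows up
        with contextlib.closing(shelve.open(cache_file)) as d:
            # Expire old data if we have to
            if key in d and d[key]['expires_on'] < datetime.datetime.now():
                del d[key]

            # Get new data if we have to
            if key not in d:
                d[key] = {
                    'expires_on': datetime.datetime.now() + expiry,
                    'data': func(*args, **kw),
                }

            return d[key]['data']

    return decorator.decorator(scached_closure)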


Switching virtualenvs with a python context manager

Jun 06, 2011 | categories: pip, python, virtualenv, pypi

EDIT: I released this. You can find it on pypi.

---

Got it! I spent the last day working on a control script for moksha to replace my mistake of choosing fabric.

With this little nugget, you can do cool in-python context switching of virtualenvs with VirtualenvContext like this:

#!/usr/bin/python

from blogcopypaste import VirtualenvContext

try:
    import kitchen
except ImportError as e:
    print "kitchen is definitely not installed in system-python"

with VirtualenvContext("my-venv"):
    import kitchen
    print "But it *is* installed in my virtualenv"

try:
    import kitchen
except ImportError as e:
    print "But once I exit that block, I lose my powers again..."

kitchen could be any non-standard-library python package you choose. (Although kitchen itself is pretty cool).

I learned a ton about ihooks and the python __import__ built-in... PEP 302 was an eye-opener.

Here's the code that makes that fancy VirtualenvContext happen:

""" Virtualenv context management! """

import os
import sys
import ihooks
import warnings
import imp

def _silent_load_source(name, filename, file=None):
    """ Helper function.  Overrides a import hook.  Suppresses warnings. """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        return imp.load_source(name, filename, file)

class VenvModuleLoader(ihooks.ModuleLoader):
    """ Overridden ModuleLoader.

    Checks for a virtualenv first and remembers imports.
    """

    remembered = []

    def __init__(self, venv, verbose=0):
        self.venv = venv
        ihooks.ModuleLoader.__init__(self, verbose=verbose)
        self.hooks.load_source = _silent_load_source

    def default_path(self):
        workon = os.getenv("WORKON_HOME", None)
        venv_location = "/".join([
            workon, self.venv, 'lib/python2.7/site-packages'])
        full = lambda i : "/".join([venv_location, i])
        venv_path = [venv_location] + [
            full(item) for item in os.listdir(venv_location)
            if os.path.isdir(full(item))]
        return venv_path + sys.path

    def load_module(self, name, stuff):
        """ Overloaded just to remember what we load """
        self.remembered.append(name)
        return ihooks.ModuleLoader.load_module(self, name, stuff)

class VirtualenvContext(object):
    """ Context manager for entering a virtualenv """

    def __init__(self, venv_name):
        self.venv = venv_name
        self.loader = VenvModuleLoader(venv=self.venv)
        self.importer = ihooks.ModuleImporter(loader=self.loader)

    def __enter__(self):
        # Install our custom importer
        self.importer.install()

        # Pretend like our executable is really somewhere else
        self.old_exe = sys.executable
        workon = os.getenv("WORKON_HOME", None)
        sys.executable = "/".join([workon, self.venv, 'bin/python'])

    def __exit__(self, exc_type, exc_value, traceback):
        # Uninstall our custom importer
        self.importer.uninstall()

        # Reset our executable
        sys.executable = self.old_exe

        # Unload anything loaded while inside the context
        for name in self.importer.loader.remembered:
            if name not in sys.modules:
                continue
            del sys.modules[name]
        self.importer.loader.remembered = []
        sys.path_importer_cache.clear()

Fun fact: you can combine this with the install_distributions function in my previous post to do:

with VirtualenvContext('some-environment'):
    install_distributions(['Markdown'])
