The website of a web programmer, footie fan, rubbish photographer, and jack of literally some trades.

t @jessemcnelis yeah, not a fan of this, but like I said, conforming to community expectations is more important than my personal preferences

t @jessemcnelis but the best choice is to follow the community conventions, even if we don't like them; in golang that's \t, in python spaces

t @jessemcnelis spaces get you alignment (readability) w/o invisible rules; adjustable indent w/ \t is a false promise, or inferior, in practice

t @jessemcnelis in python code, there are few C-style for's; nearly everything is iteration. go's range could do this with no new semantics

t @jacobian think of rpython as a pythonic C flavor with neat compile-time optimizations; pypy is written in that, plus a jit for user python code

t OH: "@rays: Well, `fix` is a strong word."

t Are there bandages that work with capacitive touch screens?

b commit #217 to johnny-cache

May 7, 2012, 5:30 p.m.

Fixed NameError introduced in 4b35eb1009d7.

johnny/utils.py

t Interesting #GSOC project, #pypi to #debian repos conversion: http://t.co/MPECaSME

g commit 198fec2f to arachne

April 23, 2012, 5:46 p.m.

remove some unnecessary debug logging

  • @@ -237,17 +237,12 @@ def wrapped(*a, **kw):
             ignore_errors = kw.pop('ignore_errors', True)
             is_json = kw.pop('json', False)
    -        logger.debug(" >> get %s" % (a[0][:100]))
             response = func(*a, **kw)
             # pre-load the content with a read
             content = response.content
    -        logger.debug(" << get %s" % (a[0][:100]))
             # parse json
             if response.headers['content-type'].split(';')[0] in json_types or is_json:
    -            t0 = time()
                 response.json = json.loads(response.content) if response.content else {}
    -            td = time() - t0
    -            logger.debug(" jj %s in %0.2f" % (naturalsize(len(response.content)), td))
             # if an error occured and we waned to raise an exception, do it;  we
             # can still take the response off of this error
             if response.status_code > 400 and not ignore_errors:
    

g commit 558819f1 to arachne

April 23, 2012, 6:07 a.m.

fix a serious performance issue in requests, which i will blog about because it's cost me an entire weekend of my life

  • @@ -35,6 +35,59 @@
         'text/json',
     )
    +from requests import models
    +
    +import zlib
    +
    +def decompress(content, mode='gzip'):
    +    """Decode a string.  A copy of requests' stream_decompress."""
    +    if mode not in ("gzip", "deflate"):
    +        raise ValueError("decompress mode must be gzip or deflate")
    +    zlib_mode = 16 + zlib.MAX_WBITS if mode == 'gzip' else -zlib.MAX_WBITS
    +    return zlib.decompress(content, zlib_mode)
    +
    +def untransfer(content, resp):
    +    if "gzip" in resp.headers.get("content-encoding", ""):
    +        content = decompress(content, "gzip")
    +    elif "deflate" in resp.headers.get("content-encoding", ""):
    +        content = decompress(content, "deflate")
    +    return content
    +
    +class Response(models.Response):
    +    def __init__(self):
    +        super(Response, self).__init__()
    +        # some versions set to False, some to None;  we require False
    +        self._content = False
    +
    +    def iter_content(self, chunk_size=1, decode_unicode=False):
    +        """Request's default response object will read 1 (one) byte at a time
    +        from the raw response.  This normally doesn't make a huge lot of
    +        difference for a gevent application, but it does if some C library
    +        (like ujson, or lxml) has the GIL and will not allow any other python
    +        code while gevent's hub goes loco.  This behavior absolutely kills our
    +        spider's performance, so here we read the entire response in, decode,
    +        and return.  Requests was also calling str.join a needless amount."""
    +        content = untransfer(self.raw.read(), self)
    +        return content
    +
    +    @property
    +    def content(self):
    +        if self._content is False:
    +            if self._content_consumed:
    +                raise RuntimeError("The content for this response was already consumed.")
    +            try:
    +                if self.status_code is 0:
    +                    self._content = None
    +                else:
    +                    self._content = self.iter_content()
    +            except AttributeError:
    +                self._content = None
    +        self._content_consumed = True
    +        return self._content
    +
    +if models.Response != Response:
    +    models.Response = Response
    +
     class HttpError(Exception):
         def __init__(self, response):
             self.message = "%d encountered getting \"%s\"" % (response.status_code, response.url)
    @@ -186,6 +239,8 @@ def wrapped(*a, **kw):
             logger.debug(" >> get %s" % (a[0][:100]))
             response = func(*a, **kw)
    +        # pre-load the content with a read
    +        content = response.content
             logger.debug(" << get %s" % (a[0][:100]))
             # parse json
             if response.headers['content-type'].split(';')[0] in json_types or is_json:
    
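The fix above replaces requests' byte-at-a-time `iter_content` with a single `raw.read()` followed by decompression. A minimal standard-library sketch of the same idea, assuming an in-memory stream stands in for `response.raw` and a plain dict for the response headers; `CountingStream`, `read_byte_by_byte` and `read_whole` are hypothetical names used only for illustration:

```python
import gzip
import io
import zlib


def decompress(content, mode="gzip"):
    """Decode a gzip or raw-deflate body (mirrors the helper in the diff)."""
    if mode not in ("gzip", "deflate"):
        raise ValueError("decompress mode must be gzip or deflate")
    # 16 + MAX_WBITS expects a gzip header and trailer;
    # -MAX_WBITS expects a raw deflate stream with no header at all.
    wbits = 16 + zlib.MAX_WBITS if mode == "gzip" else -zlib.MAX_WBITS
    return zlib.decompress(content, wbits)


def untransfer(content, headers):
    """Undo the Content-Encoding, if any (headers is a plain dict here)."""
    encoding = headers.get("content-encoding", "")
    if "gzip" in encoding:
        return decompress(content, "gzip")
    if "deflate" in encoding:
        return decompress(content, "deflate")
    return content


class CountingStream(object):
    """Hypothetical stand-in for response.raw that counts read() calls."""

    def __init__(self, data):
        self._buf = io.BytesIO(data)
        self.calls = 0

    def read(self, n=-1):
        self.calls += 1
        return self._buf.read(n)


def read_byte_by_byte(raw):
    # What a chunk_size=1 loop does: one read() per byte, then a join.
    chunks = []
    while True:
        chunk = raw.read(1)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def read_whole(raw, headers):
    # The approach taken in the commit: one read(), then decode.
    return untransfer(raw.read(), headers)


body = b'{"hello": "world"}' * 100
compressed = gzip.compress(body)

slow = CountingStream(compressed)
fast = CountingStream(compressed)
assert decompress(read_byte_by_byte(slow)) == body
assert read_whole(fast, {"content-encoding": "gzip"}) == body
print(slow.calls == len(compressed) + 1)  # True: one call per byte, plus EOF
print(fast.calls)  # 1
```

Both paths produce the same bytes; the difference is the number of Python-level calls, which is what hurts when a C extension holds the GIL while gevent's hub is trying to schedule other greenlets.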

t @stephaniebeth Avignon was papal home for over a century! Chateauneuf du pape (prestigious wine), means "new chateau of the pope"

t AMQP.get is a complete waste of time (literally). Set your phasers to "consume". #tmyk #psa

g commit fb9d9e78 to arachne

April 21, 2012, 10:37 p.m.

push some debug logging for the workerserver

  • @@ -19,6 +19,8 @@
     from arachne.conf import merge, settings, require
     from arachne.utils import encode, decode
    +from humanize.filesize import naturalsize
    +
     # OAuth v1.0a support from requests-oauth
     from oauth_hook import OAuthHook
    @@ -182,10 +184,15 @@ def wrapped(*a, **kw):
             ignore_errors = kw.pop('ignore_errors', True)
             is_json = kw.pop('json', False)
    +        logger.debug(" >> get %s" % (a[0][:100]))
             response = func(*a, **kw)
    +        logger.debug(" << get %s" % (a[0][:100]))
             # parse json
             if response.headers['content-type'].split(';')[0] in json_types or is_json:
    +            t0 = time()
                 response.json = json.loads(response.content) if response.content else {}
    +            td = time() - t0
    +            logger.debug(" jj %s in %0.2f" % (naturalsize(len(response.content)), td))
             # if an error occured and we waned to raise an exception, do it;  we
             # can still take the response off of this error
             if response.status_code > 400 and not ignore_errors:
    
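The debug lines in this commit build their message with `%` before calling `logger.debug`, so the string is formatted even when DEBUG is switched off. A small standard-library sketch of the difference, passing the arguments to the logger instead; the logger name and the `Expensive` class are made up for illustration:

```python
import logging


class Expensive(object):
    """Counts how often it gets rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "<expensive>"


logger = logging.getLogger("arachne.example")  # hypothetical logger name
logger.setLevel(logging.INFO)  # DEBUG messages are disabled

obj = Expensive()
logger.debug(" >> get %s" % obj)  # eager: formatted, then thrown away
logger.debug(" >> get %s", obj)   # lazy: logging skips formatting entirely

# For arguments that are costly even to compute (e.g. a naturalsize() call),
# guard the whole statement so the work is skipped too.
if logger.isEnabledFor(logging.DEBUG):
    logger.debug(" jj %s", obj)

print(obj.renders)  # 1: only the eager call rendered the object
```

The arguments themselves (such as the `a[0][:100]` slice) are still evaluated in the lazy form; only the string formatting is deferred.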

g commit f546808f to arachne

April 21, 2012, 10:28 p.m.

add support for consume/cancel and a basic channel consumer which plays well with gevent (much, much better than polling with get)

  • @@ -4,6 +4,7 @@
     """AMQP adapters for the scheduler."""
     from functools import wraps
    +import logging
     from gevent import queue, sleep, getcurrent
     from time import time
    @@ -18,6 +19,8 @@
         "poolsize": 5,
     }
    +logger = logging.getLogger(__name__)
    +
     def autoreconnect(func):
         @wraps(func)
         def wrapper(self, *a, **kw):
    @@ -40,7 +43,10 @@ def new_connection(self):
             con = AmqpClient(**c)
             return con
    -class Amqp(object):
    +class AmqpPool(object):
    +    """A pooled Amqp client.  Multiple connections are made and passed out
    +    on demand, so they cannot be used by two different greenlets at once.  It
    +    might be better to use a single amqp connection with a Consumer."""
         def __init__(self, **kw):
             config = merge(defaults, settings.like("amqp"), kw)
             required = ("port", "username", "password", "host", "vhost", "exchange", "queue")
    @@ -69,10 +75,21 @@ def poll(self, *a, **kw):
             with self.pool.connection() as client:
                 return client.poll(*a, **kw)
    -class AmqpClient(object):
    +    def consume(self, *a, **kw):
    +        with self.pool.connection() as client:
    +            return client.consume(*a, **kw)
    +
    +    def cancel(self, *a, **kw):
    +        with self.pool.connection() as client:
    +            return client.cancel(*a, **kw)
    +
    +class Amqp(object):
         def __init__(self, **kw):
    -        self.__dict__.update(kw)
    -        self.config = kw
    +        config = merge(defaults, settings.like("amqp"), kw)
    +        required = ("port", "username", "password", "host", "vhost", "exchange", "queue")
    +        require(self, config, required)
    +        self.__dict__.update(config)
    +        self.config = config
             self.reconnect()
         def reconnect(self):
    @@ -121,11 +138,43 @@ def poll(self, queue=None, timeout=None, every=0.1):
                 m = self.get(queue)
             return m
    -# FIXME: a joinable queue?
    -class Queue(queue.Queue):
    -
    -    def fill(self, client, queue=None):
    +    def consume(self, callback, queue=None, no_ack=True):
    +        """Start consuming messages from a channel.  Returns the channel.
    +        Use AmqpClient.cancel() to cancel this consuming."""
    +        self.tag = self.channel.basic_consume(queue or self.queue, callback=callback, no_ack=no_ack)
    +        return self.channel
    +
    +    def cancel(self, tag=None):
    +        """Cancel consuming."""
    +        self.channel.basic_cancel(tag or self.tag)
    +
    +class Consumer(object):
    +    """A queue consumer.  This queue will consume a channel and fill up a local
    +    synchronized queue which can then be polled by many greenlets.  The consume
    +    should be much lower impact than issuing a storm of failing gets."""
    +    def __init__(self, client=None, size=100):
    +        self.greenlets = []
    +        self.messages = queue.Queue(int(size))
    +        self.client = client if client else Amqp()
    +
    +    def start(self):
    +        channel = self.client.consume(callback=self.fill)
    +        while 1:
    +            try:
    +                channel.wait()
    +            except Exception, e:
    +                self.client.cancel()
    +                logger.error("Error occured while waiting on channel: %s" % e)
    +                self.client.reconnect()
    +                channel = self.client.consume(callback=self.fill)
    +        logger.error("leaving impossible-to-leave loop")
    +
    +    def stop(self):
    +        logger.debug("Stopping consumer")
    +        self.client.cancel()
    +
    +    def fill(self, message):
             """Fill a local gevent-synced queue with items from a client."""
    -        # FIXME: write this
    +        self.messages.put(message)
    
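The `Consumer` above turns AMQP push delivery into a bounded local queue that many greenlets can pull from. The same pattern sketched with the standard library, assuming threads in place of gevent greenlets and a plain loop in place of the broker invoking the `fill` callback; the `None` stop sentinel and the `worker` function are assumptions, not part of arachne:

```python
import queue
import threading


class Consumer(object):
    """Buffers pushed messages in a bounded local queue for many workers."""

    def __init__(self, size=100):
        self.messages = queue.Queue(int(size))

    def fill(self, message):
        # Called once per delivery; blocks while the buffer is full, which
        # slows the delivery loop down instead of dropping messages.
        self.messages.put(message)


def worker(consumer, results, lock):
    while True:
        message = consumer.messages.get()
        if message is None:  # hypothetical stop sentinel
            break
        with lock:
            results.append(message)


consumer = Consumer(size=10)
results = []
lock = threading.Lock()
workers = [threading.Thread(target=worker, args=(consumer, results, lock))
           for _ in range(4)]
for w in workers:
    w.start()

for i in range(50):
    consumer.fill(i)  # stands in for channel.wait() invoking the callback
for _ in workers:
    consumer.messages.put(None)  # one sentinel per worker
for w in workers:
    w.join()

print(sorted(results) == list(range(50)))  # True
```

Workers block on a local `get()` instead of each issuing their own round trip to the broker, which is the "storm of failing gets" the docstring wants to avoid.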

t @Imposeren not sure it's an easy fix; the cache key is based on the query. similar issue: orm queries using datetime.now() never cache
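A simplified illustration of why such queries never hit: if the cache key is derived from the SQL text and its parameters, a parameter that changes on every call produces a new key every time. The hashing scheme and the `cached_query` helper are hypothetical, not johnny-cache's actual implementation:

```python
import hashlib
from datetime import datetime


def cache_key(sql, params):
    """Hypothetical key: a hash of the SQL text and its bound parameters."""
    raw = "%s|%r" % (sql, params)
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


cache = {}


def cached_query(sql, params, run):
    """Return (result, hit), memoizing on the query-derived key."""
    key = cache_key(sql, params)
    if key in cache:
        return cache[key], True
    result = run(sql, params)
    cache[key] = result
    return result, False


def run(sql, params):
    return ["row"]  # stands in for a real database call


sql = "SELECT * FROM posts WHERE published <= %s"
day = datetime(2012, 4, 20)
print(cached_query(sql, (day,), run)[1])  # False: first time this key is seen
print(cached_query(sql, (day,), run)[1])  # True: identical SQL and params

# Two "now" values a second apart, as datetime.now() would give on
# successive requests: each one is a brand-new key, so neither hits.
print(cached_query(sql, (datetime(2012, 4, 20, 18, 0, 0),), run)[1])  # False
print(cached_query(sql, (datetime(2012, 4, 20, 18, 0, 1),), run)[1])  # False
```

Rounding the timestamp (to the day, say) before it reaches the query is one way to make such keys repeat, at the cost of slightly stale results.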

t @Imposeren afraid so. Order by RANDOM() is pretty scary, though...

g commit 778bca66 to arachne

April 20, 2012, 6:01 p.m.

guard around header cache a bit differently

  • @@ -205,19 +205,22 @@ def wrapper(*a, **kw):
                 return func(*a, **kw)
             url = requests_url(*a, **kw)
    -        ch = header_cache.get(url)
    -        if "expires" in ch and ch["expires"] > utcnow():
    -            raise CacheHit("Expires in the future.")
    -        kw.setdefault("headers", {}).update(ch)
    +        if settings.enable_header_cache:
    +            ch = header_cache.get(url)
    +            if ch:
    +                if "expires" in ch and ch["expires"] > utcnow():
    +                    raise CacheHit("Expires in the future.")
    +                kw.setdefault("headers", {}).update(ch)
             response = func(*a, **kw)
             if response.status_code == 304:
                 raise CacheHit("304 status code.")
             # set cache control headers if available
    -        ch = cache_headers(response.headers)
    -        if ch:
    -            header_cache.set(url, cache_headers(response.headers))
    +        if settings.enable_header_cache:
    +            ch = cache_headers(response.headers)
    +            if ch:
    +                header_cache.set(url, cache_headers(response.headers))
             return response
         return wrapper
    @@ -254,19 +257,20 @@ def cache_headers(headers):
     def disable_header_cache():
         """Utility function to disable the header cache."""
         global header_cache
    +    settings.enable_header_cache = False
         header_cache = DummyHeaderCache()
     def enable_header_cache(**kw):
         """Utility function to enable the header cache with arguments."""
         global header_cache
    +    settings.enable_header_cache = True
         header_cache = HeaderCache(**kw)
     class HeaderCache(object):
         """Keeps a cache of url headers."""
         def __init__(self, **kw):
             # use specialized cache if available, else default cache location
    -        self.config = merge(settings.like("header_cache"), kw)
    -        self.config = merge(settings.like("memcached"), self.config)
    +        self.config = merge(settings.like("memcached"), settings.like("header_cache"), kw)
             self.client = memcached.Memcached(**self.config)
         def get(self, url):
    @@ -288,6 +292,7 @@ def set(self, url, header): return
     # -- static modifications --
     header_cache = HeaderCache() if settings.enable_header_cache else DummyHeaderCache()
    +
     get = wrapget(requests.get)
     post = wrapget(requests.post)
     head = wrapget(requests.head)
    
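The wrapper above sends cached validators with each request and treats a future `expires` or a 304 as a cache hit, with both halves now skipped when `settings.enable_header_cache` is off. A self-contained sketch of that flow, assuming an in-memory dict in place of memcached, a `fetch(url, headers)` callable in place of `requests.get`, and a hypothetical `cache_headers` that copies `ETag`/`Last-Modified` into `If-None-Match`/`If-Modified-Since` (the real helper's body isn't shown in the diff):

```python
from datetime import datetime, timedelta, timezone


class CacheHit(Exception):
    pass


class DictHeaderCache(object):
    """In-memory stand-in for the memcached-backed HeaderCache."""

    def __init__(self):
        self._store = {}

    def get(self, url):
        return self._store.get(url, {})

    def set(self, url, headers):
        self._store[url] = headers


def cache_headers(response_headers):
    # Hypothetical: keep the validators needed for a conditional GET, plus an
    # already-parsed, timezone-aware "expires" datetime if one is supplied.
    out = {}
    if "etag" in response_headers:
        out["If-None-Match"] = response_headers["etag"]
    if "last-modified" in response_headers:
        out["If-Modified-Since"] = response_headers["last-modified"]
    if "expires" in response_headers:
        out["expires"] = response_headers["expires"]
    return out


def cached_get(fetch, url, cache, enabled=True):
    headers = {}
    if enabled:
        ch = cache.get(url)
        if ch:
            expires = ch.get("expires")
            if expires is not None and expires > datetime.now(timezone.utc):
                raise CacheHit("Expires in the future.")
            # Send the validators, but not our private "expires" entry.
            headers.update((k, v) for k, v in ch.items() if k != "expires")
    status, response_headers = fetch(url, headers)
    if status == 304:
        raise CacheHit("304 status code.")
    if enabled:
        ch = cache_headers(response_headers)
        if ch:
            cache.set(url, ch)
    return status, response_headers


def fake_fetch(url, headers):
    # Pretends to be a server that answers 304 once it sees our ETag.
    if headers.get("If-None-Match") == '"v1"':
        return 304, {}
    return 200, {"etag": '"v1"'}


cache = DictHeaderCache()
print(cached_get(fake_fetch, "http://example.com/", cache)[0])  # 200
try:
    cached_get(fake_fetch, "http://example.com/", cache)
except CacheHit as e:
    print("cache hit: %s" % e)  # cache hit: 304 status code.

# A stored expiry in the future short-circuits before any request is made.
cache.set("http://example.com/fresh",
          {"expires": datetime.now(timezone.utc) + timedelta(hours=1)})
try:
    cached_get(fake_fetch, "http://example.com/fresh", cache)
except CacheHit as e:
    print("cache hit: %s" % e)  # cache hit: Expires in the future.
```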

t A fast event loop won't help resource-starved green threads. Proper scheduling is still really important, and non-trivial.

g commit 2c03968d to arachne

April 19, 2012, 3:16 p.m.

apply qos to the queue channel

  • @@ -13,8 +13,9 @@
     defaults = {
         "port": 5672,
    -    "prefetch_count": 50,
    +    "prefetch_count": 20,
         "queue_size": 100,
    +    "poolsize": 5,
     }
     def autoreconnect(func):
    @@ -83,6 +84,7 @@ def reconnect(self):
             )
             qa = dict(durable=False, auto_delete=False)
             self.channel = self.connection.channel()
    +        self.channel.basic_qos(0, self.prefetch_count, False)
             self.channel.queue_declare(queue=self.queue,exclusive=False, **qa)
             self.channel.exchange_declare(self.exchange, type="fanout", **qa)
             self.channel.queue_bind(queue=self.queue, exchange=self.exchange)
    
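`basic_qos(0, self.prefetch_count, False)` sets no byte limit (`prefetch_size=0`) and caps how many unacknowledged messages the broker will push ahead at `prefetch_count`. A toy in-memory model of that window, for intuition only (`ToyChannel` is not a real client). Note that the AMQP 0-9-1 specification says the prefetch count is ignored for consumers started with no-ack, which is the default of the `consume` method in commit f546808f:

```python
from collections import deque


class ToyChannel(object):
    """Toy model of basic.qos prefetch_count; not a real AMQP client."""

    def __init__(self, messages, prefetch_count=0):
        self.pending = deque(messages)
        self.prefetch_count = prefetch_count  # 0 means "no limit"
        self.unacked = set()

    def deliver(self):
        """Push as many messages as the prefetch window allows."""
        delivered = []
        while self.pending and (self.prefetch_count == 0
                                or len(self.unacked) < self.prefetch_count):
            message = self.pending.popleft()
            self.unacked.add(message)
            delivered.append(message)
        return delivered

    def ack(self, message):
        self.unacked.discard(message)


channel = ToyChannel(range(100), prefetch_count=20)
first = channel.deliver()
print(len(first))  # 20: the window is full until something is acked
for message in first[:5]:
    channel.ack(message)
print(len(channel.deliver()))  # 5: five acks opened five slots

unlimited = ToyChannel(range(100))
print(len(unlimited.deliver()))  # 100
```

A smaller window (20 instead of 50) means fewer messages sit in one consumer's buffer while other consumers are idle.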

g commit a5477ba3 to arachne

April 19, 2012, 2:52 p.m.

add timing utils to arachne.utils

  • @@ -5,8 +5,11 @@
     import re
     import inspect
    +import time
     import zlib
     import ujson as json
    +import logging
    +from functools import wraps
     from collections import defaultdict
     class Registry(defaultdict):
    @@ -108,3 +111,43 @@ def __iter__(self):
         def __len__(self):
             return len(self.items)
    +def timer(f, threshold=0.5):
    +    """Simple timing of a whole function.  Does not take into consideration time
    +    this greenlet has spent sleeping."""
    +    logger = logging.getLogger("%s.%s" % (f.__module__, f.__name__))
    +    @wraps(f)
    +    def wrapper(*a, **kw):
    +        t0 = time.time()
    +        r = f(*a,**kw)
    +        td = time.time() - t0
    +        if td > threshold:
    +            logger.info("took %0.2fs (threshold: %0.2f)" % (td, threshold))
    +        return r
    +    return wrapper
    +
    +class Stopwatch(object):
    +    """A timer that allows you to make named ticks and can print a simple
    +    breakdown of the time between ticks after it's stopped."""
    +    def __init__(self, name='Stopwatch'):
    +        self.name = name
    +        self.start = time.time()
    +        self.ticks = []
    +
    +    def tick(self, name):
    +        self.ticks.append((name, time.time()))
    +
    +    def stop(self):
    +        self.stop = time.time()
    +
    +    def summary(self):
    +        """Return a summary of timing information."""
    +        self.stop()
    +        total = self.stop - self.start
    +        s = "%s duration: %0.2f\n" % (self.name, total)
    +        prev = ("start", self.start)
    +        for tick in self.ticks:
    +            s += ("   %s => %s" % (prev[0], tick[0])).ljust(30) + "... %0.2fs\n" % (tick[1] - prev[1])
    +            prev = tick
    +        s += ("   %s => end" % (tick[0])).ljust(30) + "... %0.2fs" % (self.stop - tick[1])
    +        return s
    +
    
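As committed, `Stopwatch.stop()` assigns to `self.stop`, replacing the method with a float, so a second `summary()` call fails with `TypeError`; `summary()` also raises `UnboundLocalError` when no ticks were recorded, and `@timer` can only use the default threshold. A revised sketch, assuming Python 3 and swapping `time.time()` for `time.monotonic()` (which is unaffected by wall-clock adjustments); `parse` is a made-up function for the usage example:

```python
import logging
import time
from functools import wraps


def timer(threshold=0.5):
    """Log calls to the decorated function that take longer than `threshold` s."""
    def decorator(f):
        logger = logging.getLogger("%s.%s" % (f.__module__, f.__name__))

        @wraps(f)
        def wrapper(*a, **kw):
            t0 = time.monotonic()
            try:
                return f(*a, **kw)
            finally:
                td = time.monotonic() - t0
                if td > threshold:
                    logger.info("took %0.2fs (threshold: %0.2f)", td, threshold)
        return wrapper
    return decorator


class Stopwatch(object):
    """Named ticks plus a breakdown of the time between them."""

    def __init__(self, name="Stopwatch"):
        self.name = name
        self.start = time.monotonic()
        self.end = None  # kept separate from the stop() method
        self.ticks = []

    def tick(self, name):
        self.ticks.append((name, time.monotonic()))

    def stop(self):
        if self.end is None:
            self.end = time.monotonic()

    def summary(self):
        """Return a summary of timing information (stops the watch)."""
        self.stop()
        lines = ["%s duration: %0.2f" % (self.name, self.end - self.start)]
        prev_name, prev_t = "start", self.start
        for name, t in self.ticks:
            lines.append(("   %s => %s" % (prev_name, name)).ljust(30)
                         + "... %0.2fs" % (t - prev_t))
            prev_name, prev_t = name, t
        lines.append(("   %s => end" % prev_name).ljust(30)
                     + "... %0.2fs" % (self.end - prev_t))
        return "\n".join(lines)


@timer(threshold=1.0)
def parse(n):
    return sum(range(n))


sw = Stopwatch("crawl")
parse(1000)
sw.tick("parse")
print(sw.summary())
print(sw.summary() == sw.summary())  # True: safe to call repeatedly
```

Like the original, this measures wall time for the whole call, so time the greenlet spent waiting on other greenlets is included.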

g commit d6c90c73 to arachne

April 19, 2012, 2:16 p.m.

pool changes, make argument aliasing safer against error

  • @@ -8,6 +8,7 @@
     from time import time
     from arachne.conf import settings, merge, require
    +from arachne.utils import ConnectionPool
     from kombu.transport.amqplib import Connection, amqp
     defaults = {
    @@ -27,6 +28,17 @@ def wrapper(self, *a, **kw):
             return ret
         return wrapper
    +class AmqpConnectionPool(ConnectionPool):
    +    def __init__(self, config, maxsize=10):
    +        maxsize = int(config.get("poolsize", maxsize))
    +        super(AmqpConnectionPool, self).__init__(maxsize)
    +        self.config = config
    +
    +    def new_connection(self):
    +        c = self.config
    +        con = AmqpClient(**c)
    +        return con
    +
     class Amqp(object):
         def __init__(self, **kw):
             config = merge(defaults, settings.like("amqp"), kw)
    @@ -34,31 +46,27 @@ def __init__(self, **kw):
             require(self, config, required)
             self.__dict__.update(config)
             self.config = config
    -        self.pool = {}
    -
    -    def client(self):
    -        current = getcurrent()
    -        if current in self.pool:
    -            return self.pool[current]
    -        c = self.config
    -        client = AmqpClient(**c)
    -        self.pool[current] = client
    -        return client
    +        self.pool = AmqpConnectionPool(config)
         def reconnect(self):
    -        return self.client().reconnect()
    +        with self.pool.connection() as client:
    +            return client.reconnect()
         def status(self, **kw):
    -        return self.client().status(**kw)
    +        with self.pool.connection() as client:
    +            return client.status(**kw)
         def publish(self, *a, **kw):
    -        return self.client().publish(*a, **kw)
    +        with self.pool.connection() as client:
    +            return client.publish(*a, **kw)
         def get(self, *a, **kw):
    -        return self.client().get(*a, **kw)
    +        with self.pool.connection() as client:
    +            return client.get(*a, **kw)
         def poll(self, *a, **kw):
    -        return self.client().poll(*a, **kw)
    +        with self.pool.connection() as client:
    +            return client.poll(*a, **kw)
     class AmqpClient(object):
         def __init__(self, **kw):
    
  • @@ -20,7 +20,7 @@ def __init__(self, config, maxsize=10):
             super(MysqlConnectionPool, self).__init__(maxsize)
             self.config = config
    -    def connection(self):
    +    def new_connection(self):
             c = self.config
             con = umysql.Connection()
             con.connect(c['host'], c['port'], c['username'], c['password'], c['database'])
    @@ -35,13 +35,10 @@ def __init__(self, **kw):
         def query(self, sql, args=None):
             """Return the results for a query."""
    -        c = self.pool.get()
    -        try:
    +        with self.pool.connection() as c:
                 if args:
                     return c.query(sql, args)
                 return c.query(sql)
    -        finally:
    -            self.pool.put(c)
         def getone(self, sql, args=None):
             return self.query(sql, args)[0]
    
  • @@ -40,7 +40,7 @@ def argument_alias(method, args):
         if not hasattr(plugin, 'aliases'):
             return args
         for k,v in plugin.aliases.iteritems():
    -        args[k] = args[v]
    +        args[k] = args.get(v, None)
         return args
     def interval(seconds, **kw):
    
  • @@ -44,6 +44,7 @@ def decode(data):
         return json.loads(zlib.decompress(data))
     from Queue import Queue
    +import contextlib
     class ConnectionPool(object):
         """A simple connection pool which uses a queue to limit how many
    @@ -60,7 +61,7 @@ def get(self):
                 return pool.get()
             self.size += 1
             try:
    -            con = self.connection()
    +            con = self.new_connection()
             except:
                 self.size -= 1
                 raise
    @@ -69,7 +70,15 @@ def get(self):
         def put(self, con):
             self.pool.put(con)
    -    def connection(self, *a, **kw):
    +    @contextlib.contextmanager
    +    def connection(self):
    +        con = self.get()
    +        try:
    +            yield con
    +        finally:
    +            self.put(con)
    +
    +    def new_connection(self, *a, **kw):
             raise NotImplementedError
    
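The pool now hands out connections through a `contextlib.contextmanager`, so callers can't forget to `put()` one back. A Python 3 sketch of the same design, assuming `queue.Queue` plus a `threading.Lock` around the size counter (the committed version relies on gevent's cooperative scheduling instead); `FakeConnection` and `FakePool` are hypothetical stand-ins for the MySQL and AMQP subclasses:

```python
import contextlib
import queue
import threading


class ConnectionPool(object):
    """Caps how many connections to one resource exist at once.
    Subclasses override new_connection()."""

    def __init__(self, maxsize=10):
        self.maxsize = maxsize
        self.pool = queue.Queue()
        self.size = 0
        self._lock = threading.Lock()  # guards self.size across threads

    def get(self):
        with self._lock:
            create = self.size < self.maxsize and self.pool.empty()
            if create:
                self.size += 1
        if not create:
            return self.pool.get()  # blocks until a connection is returned
        try:
            return self.new_connection()
        except Exception:
            with self._lock:
                self.size -= 1
            raise

    def put(self, con):
        self.pool.put(con)

    @contextlib.contextmanager
    def connection(self):
        con = self.get()
        try:
            yield con
        finally:
            self.put(con)

    def new_connection(self):
        raise NotImplementedError


class FakeConnection(object):
    """Hypothetical connection used only for this example."""
    created = 0

    def __init__(self):
        FakeConnection.created += 1


class FakePool(ConnectionPool):
    def new_connection(self):
        return FakeConnection()


pool = FakePool(maxsize=2)
with pool.connection() as a:
    with pool.connection() as b:
        print(a is not b)  # True: two connections checked out at once
with pool.connection() as c:
    print(c is a or c is b)  # True: reused instead of creating a third
print(FakeConnection.created)  # 2
```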

g commit 68cd62ab to arachne

April 18, 2012, 2:06 p.m.

implement a simple connection pool and use that to limit the number of concurrent mysql connections

  • @@ -7,35 +7,41 @@
     import umysql
     import gevent
    +from arachne.utils import ConnectionPool
     from arachne.conf import settings, merge, require
     defaults = {
         "port": 3306,
     }
    -class Mysql(object):
    -    def __init__(self, **kw):
    -        config = merge(defaults, settings.like("mysql"), kw)
    -        require(self, config, ("host", "password", "username", "database"))
    +class MysqlConnectionPool(ConnectionPool):
    +    def __init__(self, config, maxsize=10):
    +        maxsize = int(config.get("poolsize", maxsize))
    +        super(MysqlConnectionPool, self).__init__(maxsize)
             self.config = config
    -        self.pool = {}
    -    def client(self):
    -        current = gevent.getcurrent()
    -        if current in self.pool:
    -            return self.pool[current]
    +    def connection(self):
             c = self.config
             con = umysql.Connection()
             con.connect(c['host'], c['port'], c['username'], c['password'], c['database'])
    -        self.pool[current] = con
             return con
    +class Mysql(object):
    +    def __init__(self, **kw):
    +        config = merge(defaults, settings.like("mysql"), kw)
    +        require(self, config, ("host", "password", "username", "database"))
    +        self.config = config
    +        self.pool = MysqlConnectionPool(config)
    +
         def query(self, sql, args=None):
             """Return the results for a query."""
    -        c = self.client()
    -        if args:
    -            return c.query(sql, args)
    -        return c.query(sql)
    +        c = self.pool.get()
    +        try:
    +            if args:
    +                return c.query(sql, args)
    +            return c.query(sql)
    +        finally:
    +            self.pool.put(c)
         def getone(self, sql, args=None):
             return self.query(sql, args)[0]
    
  • @@ -43,6 +43,36 @@ def decode(data):
         """Decode data coming out of storage."""
         return json.loads(zlib.decompress(data))
    +from Queue import Queue
    +
    +class ConnectionPool(object):
    +    """A simple connection pool which uses a queue to limit how many
    +    connections to a single resource are made.  Override the `connection`
    +    method to make new connections to your resource."""
    +    def __init__(self, maxsize=10):
    +        self.maxsize = maxsize
    +        self.pool = Queue()
    +        self.size = 0
    +
    +    def get(self):
    +        pool = self.pool
    +        if self.size >= self.maxsize or pool.qsize():
    +            return pool.get()
    +        self.size += 1
    +        try:
    +            con = self.connection()
    +        except:
    +            self.size -= 1
    +            raise
    +        return con
    +
    +    def put(self, con):
    +        self.pool.put(con)
    +
    +    def connection(self, *a, **kw):
    +        raise NotImplementedError
    +
    +
     from heapq import heappush, heappop, heapify, heapreplace
     class Heap(object):
    

t Re-reading Tanenbaum & van Steen's Distributed Systems; embarrassing how many old wheels we in the web community proudly show off as new discoveries

g commit 10728993 to arachne

April 18, 2012, 12:15 a.m.

fix queue.publish to publish on exchanges, not queues

  • @@ -87,8 +87,8 @@ def status(self, cached=True):
             return dict(name=status[0], messages=status[1], consumers=status[2])
         @autoreconnect
    -    def publish(self, message, queue=None):
    -        self.channel.basic_publish(amqp.Message(message), queue or self.queue)
    +    def publish(self, message, exchange=None):
    +        self.channel.basic_publish(amqp.Message(message), exchange or self.exchange)
         @autoreconnect
         def get(self, queue=None):
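
In amqplib the signature is `basic_publish(msg, exchange='', routing_key='', ...)`, so the old code was publishing to an exchange named after the queue. A toy in-memory model of fanout routing (not a real client; the method names only echo the AMQP ones) shows why messages go to the exchange and how every bound queue receives its own copy:

```python
class ToyBroker(object):
    """In-memory model of fanout exchanges; not a real AMQP client."""

    def __init__(self):
        self.exchanges = {}  # exchange name -> set of bound queue names
        self.queues = {}     # queue name -> list of messages

    def exchange_declare(self, exchange):
        self.exchanges.setdefault(exchange, set())

    def queue_declare(self, queue):
        self.queues.setdefault(queue, [])

    def queue_bind(self, queue, exchange):
        self.exchanges[exchange].add(queue)

    def basic_publish(self, message, exchange):
        # A real broker closes the channel with a NOT_FOUND error when the
        # exchange doesn't exist; the toy raises KeyError instead.
        if exchange not in self.exchanges:
            raise KeyError("NOT_FOUND - no exchange %r" % exchange)
        # Fanout: every bound queue gets its own copy.
        for queue in self.exchanges[exchange]:
            self.queues[queue].append(message)


broker = ToyBroker()
broker.exchange_declare("crawl")
for name in ("worker-1", "worker-2"):
    broker.queue_declare(name)
    broker.queue_bind(name, "crawl")

broker.basic_publish("http://example.com/", "crawl")
print(broker.queues)
# {'worker-1': ['http://example.com/'], 'worker-2': ['http://example.com/']}

try:
    broker.basic_publish("http://example.com/", "worker-1")  # the old bug
except KeyError as e:
    print(e)
```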