Angelo Gladding

dlv5vbq7lzlthol5 4b942a3185b37d00

Home CodecanopyFiles

canopy job queue

`JobQueue` spawns a pool of worker greenlets (default `20`) that consume
jobs from a `gevent.queue.PriorityQueue`. Pickled job inputs are routed
through a list at key `jobqueue` in the environment's Redis database.

The following example enqueues a request to a resource at Alice's site.

    > canopy.enqueue(, "")

The idea is to have all tasks piped through this queue, no matter how
trivial. Web responses should be instantaneous. Background tasks can be
trivially restarted, reprioritized, canceled, etc.


# TODO make use of priority levels
# TODO log back to kv

from gevent import monkey

# import inspect
import pickle  # noqa
import sys  # noqa
import time  # noqa

import gevent.queue  # noqa
import term  # noqa
import web  # noqa

import canopy  # noqa

# Names exported by a star-import of this module.
__all__ = ["work", "handle", "JobQueue"]

# NOTE(review): `monkey` is imported from gevent at the top of the file but no
# `monkey.patch_all()` call is visible here -- confirm patching happens before
# the objects below are created.

# Application object built by `term.application`; presumably the command-line
# entry point for the "canopy-bot" program -- TODO confirm against `term`.
main = term.application("canopy-bot", "Canopy bot")
# In-process priority queue: `JobQueue.run` puts `(priority, job)` items on it
# and each `work` greenlet takes them off.
queue = gevent.queue.PriorityQueue()
# Number of `work` greenlets spawned by `JobQueue.run`.
worker_count = 20

def work(browser):
    """Consume jobs from the module-level queue forever.

    Each queue item is a ``(priority, job)`` pair; the priority is only
    used by the queue for ordering and is ignored here.  Every job is
    passed to `handle` together with the shared `browser`.
    """
    while True:
        _priority, payload = queue.get()
        handle(payload, browser)

def handle(job, browser):
    """Run a single dequeued job and record its progress in the job store.

    Args:
        job: Pickled ``(host, function, args, kwargs)`` tuple.  ``args``
            must be a tuple because it is concatenated with one below.
        browser: Browser object exposed to the job as ``canopy.tx.browser``.

    Exceptions raised by ``function`` propagate to the caller; in that case
    the job id is left in the ``active`` list and no end time is recorded.
    """
    # TODO handle exceptions; offer retries=n
    # NOTE: unpickling can execute arbitrary code, so the queue feeding this
    # function must only ever be written to by trusted producers.
    host, function, args, kwargs = pickle.loads(job)

    # NOTE(review): the opening of this `print` call was lost from the file
    # (only the argument tail and `sep` survived, leaving a syntax error).
    # The header below is a reconstruction -- confirm against upstream.
    label = getattr(function, "__qualname__", repr(function))
    print(f"{host} {label}",
          *(args + tuple(f"{k}={v}" for k, v in kwargs.items())), sep="\n  ")

    # NOTE(review): the assignment targets of the next two statements were
    # also lost; only the right-hand sides survived.  `canopy.tx.host.identity`
    # and `canopy.tx.host.name` are best guesses -- confirm against upstream
    # before relying on them.
    canopy.tx.host.identity = canopy.global_kv["hosts"][host]
    canopy.tx.host.name = host
    canopy.tx.db = canopy.util.get_db()
    canopy.tx.kv = canopy.util.get_kv()
    canopy.tx.browser = browser

    # Allocate a job id from the running counter and store initial metadata.
    job_id = web.nbencode(canopy.tx.kv["jobs", "count"].incr())
    metadata = {"function": function, "args": args, "kwargs": kwargs,
                "start": time.time()}
    canopy.tx.kv["jobs"][job_id] = pickle.dumps(metadata)

    # The id sits in the "active" list only while the job function runs.
    canopy.tx.kv["jobs", "active"].append(job_id)
    function(*args, **kwargs)
    canopy.tx.kv["jobs", "active"].remove(job_id)

    # Re-store the metadata with the end time and mark the job completed.
    metadata["end"] = time.time()
    canopy.tx.kv["jobs"][job_id] = pickle.dumps(metadata)
    canopy.tx.kv["jobs", "completed"].append(job_id)

class JobQueue:
    """Start the worker pool and feed it jobs popped from the global store."""

    def run(self, stdin, log):
        """Spawn the workers, then forward every incoming job to them.

        One browser is created and shared by all ``worker_count`` workers.
        Each job yielded by the ``"jobqueue"`` entry of ``canopy.global_kv``
        is put on the in-process queue with priority ``1``.  ``stdin`` and
        ``log`` are accepted but not used in this method.
        """
        shared_browser = web.browser()
        # TODO capture supervisor's kill signal and make sure to quit browser
        for _worker in range(worker_count):
            gevent.spawn(work, shared_browser)
        incoming_jobs = canopy.global_kv["jobqueue"].keep_popping()
        for pickled_job in incoming_jobs:
            queue.put((1, pickled_job))

if __name__ == "__main__":