canopy_bot.py

"""
canopy job queue

`JobQueue` spawns a pool of workers (default `20`) that consume jobs
from a `gevent.queue.PriorityQueue`. Pickled job inputs arrive through
a list at key `jobqueue` in the environment's Redis database
(`KVDB=./path/to/redis.sock`).

The following example enqueues a request to a resource at Alice's site.

    > canopy.enqueue(canopy.post, "https://alice.example.org")

The idea is to pipe all tasks through this queue, no matter how
trivial, so that web responses stay instantaneous while background
tasks can be restarted, reprioritized, canceled, etc.
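
The producer side is not defined in this module. A minimal sketch,
assuming `canopy.enqueue` pickles the job tuple in the order `handle`
unpacks it and appends it to the Redis-backed `jobqueue` list:

    def enqueue(function, *args, **kwargs):
        job = pickle.dumps((tx.host.name, function, args, kwargs))
        global_kv["jobqueue"].append(job)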

"""

# TODO make use of priority levels
# TODO log back to kv

from gevent import monkey
monkey.patch_all()

# import inspect
import pickle  # noqa
import sys  # noqa
import time  # noqa

import gevent.queue  # noqa
import term  # noqa
import web  # noqa

import canopy  # noqa


__all__ = ["work", "handle", "JobQueue"]


main = term.application("canopy-bot", "Canopy bot")
queue = gevent.queue.PriorityQueue()
worker_count = 20


def work(browser):
    """Worker loop: take the next job off the queue and handle it."""
    while True:
        priority, job = queue.get()  # priority is currently unused
        handle(job, browser)


def handle(job, browser):
    """Execute a single pickled job, recording its metadata in the kv store."""
    # TODO handle exceptions; offer retries=n
    host, function, args, kwargs = pickle.loads(job)
    print(f"{host}/{function.__module__}:{function.__name__}",
          *(args + tuple(f"{k}={v}" for k, v in kwargs.items())), sep="\n  ")

    # bind the transaction context to the job's host
    canopy.tx.host.identity = canopy.global_kv["hosts"][host]
    canopy.tx.host.name = host
    canopy.tx.db = canopy.util.get_db()
    canopy.tx.kv = canopy.util.get_kv()
    canopy.tx.browser = browser

    # record the job under a fresh, monotonically increasing ID
    job_id = web.nbencode(canopy.tx.kv["jobs", "count"].incr())
    metadata = {"function": function, "args": args, "kwargs": kwargs,
                "start": time.time()}
    canopy.tx.kv["jobs"][job_id] = pickle.dumps(metadata)

    # run the job, making sure it leaves the active list even on failure
    canopy.tx.kv["jobs", "active"].append(job_id)
    try:
        function(*args, **kwargs)
    finally:
        sys.stdout.flush()
        canopy.tx.kv["jobs", "active"].remove(job_id)

    metadata["end"] = time.time()
    canopy.tx.kv["jobs"][job_id] = pickle.dumps(metadata)
    canopy.tx.kv["jobs", "completed"].append(job_id)


@main.register()
class JobQueue:

    """Spawn `worker_count` workers sharing one browser, then feed them jobs."""

    def run(self, stdin, log):
        browser = web.browser()
        # TODO capture supervisor's kill signal and make sure to quit browser
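        # A minimal sketch for the TODO above, assuming the supervisor
        # stops this process with SIGTERM: quit the browser, then exit.
        import signal  # stdlib

        def shutdown(signum, frame):
            browser.quit()
            raise SystemExit(0)

        signal.signal(signal.SIGTERM, shutdown)
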
        for _ in range(worker_count):
            gevent.spawn(work, browser)
        # feed jobs from the Redis-backed list into the in-process
        # queue; everything currently enters at priority 1
        for job in canopy.global_kv["jobqueue"].keep_popping():
            queue.put((1, job))
        browser.quit()


if __name__ == "__main__":
    main()