AG

Angelo Gladding
lahacker.net

dlv5vbq7lzlthol5 4b942a3185b37d00

South Pasadena, California, United States currently feels like 47.15°F

Home CodecanopyFiles

canopy_bot.py

"""
canopy job queue

`JobQueue` spawns a pool of workers (default `20`) that consume from a
`gevent.queue.PriorityQueue`. JSON-encoded job inputs are routed through a
list at key `queue` in the environment's Redis database
(`KVDB=./path/to/redis.sock`).

The following example enqueues a request to a resource at Alice's site.

    > canopy.enqueue(canopy.post, "https://alice.example.org")

The idea is to have all tasks piped through this queue, no matter how
trivial. Web responses should be instantaneous. Background tasks can be
trivially restarted, reprioritized, canceled, etc.

"""

# TODO make use of priority levels
# TODO log back to kv

from gevent import monkey
monkey.patch_all()

import importlib  # noqa
import json  # noqa
import sys  # noqa
import time  # noqa

import gevent.queue  # noqa
import pendulum  # noqa
import term  # noqa
import web  # noqa

import canopy  # noqa


__all__ = ["work", "handle", "JobQueue"]


# CLI application object; `@main.register()` wires `JobQueue` in as a command.
main = term.application("canopy-bot", "Canopy bot")
# In-process priority queue shared by the dispatcher (`JobQueue.run`) and the
# `work` greenlets; items are `(priority, job_identifier)` tuples.
queue = gevent.queue.PriorityQueue()
# Number of worker greenlets spawned by `JobQueue.run`.
worker_count = 20


def schedule(browser):
    """
    Check all schedules every minute and enqueue any upcoming jobs.

    Waits for the top of each minute, then for every known host selects the
    scheduled jobs (joined with their signatures) and enqueues any whose
    minute spec matches: `*` runs every minute, `*/N` runs every N minutes.

    `browser` is the shared web browser handle installed on the canopy
    transaction context for jobs that need it.

    """
    # TODO support for hours, days of month, months, days of week
    hosts = ["lahacker.net"]  # TODO iterate over all trees
    while True:
        while True:  # busy-wait (1s granularity) for the start of a new minute
            now = pendulum.now()
            if now.second == 0:
                break
            time.sleep(1)
        for host in hosts:
            canopy.contextualize(host)
            canopy.tx.browser = browser
            jobs = canopy.tx.db.select("job_schedules AS sch",
                                       join="""job_signatures AS sig ON
                                               sch.job_signature_id =
                                                 sig.rowid""")
            for job in jobs:
                minute = job["minute"]
                # NOTE: `int(minute[2:])` (not `minute[2]`) so multi-digit
                # intervals such as "*/15" are honored instead of being
                # silently truncated to their first digit.
                run = minute == "*" or (minute.startswith("*/") and
                                        now.minute % int(minute[2:]) == 0)
                if run:
                    # `job_callable` avoids shadowing the builtin `callable`
                    job_callable = getattr(
                        importlib.import_module(job["module"]), job["object"])
                    canopy.enqueue(job_callable)
        time.sleep(1)  # skip past second 0 so a minute never fires twice


def work(browser):
    """Worker loop: pull job identifiers off the shared queue, forever."""
    while True:
        _priority, job_identifier = queue.get()
        handle(job_identifier, browser)


# TODO handle exceptions; retries=n
# TODO use JSON in place of pickling and normalize URIs into plain strings
def handle(job_identifier, browser):
    """
    Run a single queued job and record its result in the host's database.

    `job_identifier` is a string of the form ``host:job_run_id``. The job's
    signature (module, object, args, kwargs) is looked up, the target
    callable is imported and invoked, and the `job_runs` row is updated with
    start/finish timestamps, a status (0 = success, 1 = error), the output,
    and derived queue-wait and run durations.

    `browser` is the shared web browser handle installed on the transaction
    context for jobs that need it.

    """
    host, _, job_run_id = job_identifier.partition(":")
    canopy.contextualize(host)
    canopy.tx.browser = browser
    job = canopy.tx.db.select("job_runs AS jr", what="j.rowid, *",
                              join="""job_signatures AS j
                                      ON j.rowid = jr.job_signature_id""",
                              where="jr.id = ?", vals=[job_run_id])[0]
    _module = job["module"]
    _object = job["object"]
    _args = json.loads(job["args"])
    _kwargs = json.loads(job["kwargs"])
    print(f"{host}/{_module}:{_object}",
          *(_args + list(f"{k}={v}" for k, v in _kwargs.items())), sep="\n  ")
    sys.stdout.flush()
    canopy.tx.db.update("job_runs",
                        what="started = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')",
                        where="id = ?", vals=[job_run_id])
    # `target` avoids shadowing the builtin `callable`
    target = getattr(importlib.import_module(_module), _object)
    # TODO handle exceptions; retries=n
    try:
        output = target(*_args, **_kwargs)
    except Exception as err:  # job boundary: record failure, keep worker alive
        status = 1
        # Python 3 exceptions have no `.message`; the original attribute
        # access raised AttributeError here and masked the real error.
        output = str(err)
    else:
        status = 0
    canopy.tx.db.update("job_runs",
                        what="""finished = STRFTIME('%Y-%m-%d %H:%M:%f',
                                                    'NOW'),
                                status = ?, output = ?""",
                        where="id = ?", vals=[status, output, job_run_id])
    run = canopy.tx.db.select("job_runs", where="id = ?", vals=[job_run_id])[0]
    # presumably the db layer returns these timestamps as datetimes, so the
    # subtractions yield timedeltas — TODO confirm against the db adapter
    st = run["started"] - run["created"]
    start_time = f"{st.seconds}.{st.microseconds}"
    rt = run["finished"] - run["started"]
    run_time = f"{rt.seconds}.{rt.microseconds}"
    canopy.tx.db.update("job_runs", what="start_time = ?, run_time = ?",
                        where="id = ?", vals=[start_time, run_time,
                                              job_run_id])


@main.register()
class JobQueue:
    """Run the canopy job queue: scheduler, worker pool and dispatcher."""

    def run(self, stdin, log):
        """
        Spawn the scheduler and `worker_count` workers, then dispatch jobs.

        Blocks popping job identifiers from the `jobqueue` list in the
        global key-value store and feeds each into the in-process priority
        queue consumed by the `work` greenlets.

        """
        browser = web.browser()
        # TODO capture supervisor's kill signal and make sure to quit browser
        gevent.spawn(schedule, browser)
        for _ in range(worker_count):
            gevent.spawn(work, browser)
        # blocks for the life of the process; reached `browser.quit()` only
        # if `keep_popping()` ever ends
        for job in canopy.global_kv["jobqueue"].keep_popping():
            queue.put((1, job))  # TODO utilize priorities!
        browser.quit()


# Run the term application CLI when executed directly (e.g. by a supervisor).
if __name__ == "__main__":
    main()