Angelo Gladding
lahacker.net

dlv5vbq7lzlthol5 4b942a3185b37d00

angelo@lahacker.net

South Pasadena, California, United States currently feels like 68.15°F

Home / Code / canopy / Files

canopy_bot.py

Download raw file

"""
canopy job queue

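`JobQueue` spawns a pool of workers that consume jobs from a
`gevent.queue.PriorityQueue`. Job identifiers (`<host>:<job_run_id>`) are
routed through a list at key `jobqueue` in the environment's Redis database
(e.g. `KVDB=./path/to/redis.sock`); each job's arguments are stored JSON
encoded in the host's database.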

    >>> # enqueues a job to fetch a resource at Alice's site root
    >>> canopy.enqueue(canopy.get, "https://alice.example.org")
    >>> # sets a schedule to enqueue the same job every three minutes
    >>> canopy.enqueue(canopy.get, "https://alice.example.org",
    ...                _schedule="*/3 * * * *")

The idea is to pipe all outgoing tasks through this queue, no matter how
trivial, so that web responses stay instantaneous and background tasks can
easily be restarted, reprioritized, canceled, etc.

"""

# TODO log back to kv

from gevent import monkey
monkey.patch_all()

from importlib import import_module  # noqa
import json  # noqa
import sys  # noqa
import time  # noqa
import traceback  # noqa

import gevent.queue  # noqa
import pendulum  # noqa
import term  # noqa
import web  # noqa

import canopy  # noqa
from canopy.util import update_inbox, IMAPClient  # noqa


# number of concurrent worker greenlets draining `queue`
worker_count = 20
# in-process queue of (priority, job identifier) tuples fed by JobQueue.run
queue = gevent.queue.PriorityQueue()
# command-line application; JobQueue registers itself on it below
main = term.application("canopy-bot", "Canopy bot")


def idle_imap(host):
    """
    connect to email provider's IMAP server and IDLE waiting for new mail

    Reads the IMAP credentials from the host's `providers:email` KV entry,
    selects INBOX read-only and issues an IDLE command (RFC 2177). Every
    response line ending in `EXISTS` triggers `update_inbox()`. Returns when
    the server sends BYE or the connection yields an empty line; callers are
    expected to reconnect.

    """
    tx = canopy.contextualize(host)
    provider = tx.kv["providers:email"]
    client = IMAPClient(provider["imap_host"], provider["imap_username"],
                        provider["imap_password"]).imap
    try:
        client.select("INBOX", readonly=True)
        # imaplib's `_new_tag()` returns bytes; interpolating it into an
        # f-string would send the literal text "b'TAG1'" as the command tag
        client.send(client._new_tag() + b" IDLE\r\n")
        while True:
            line = client.readline().strip()
            if not line or line.startswith(b"* BYE "):
                break  # connection timed out or was closed by the server
            if line.endswith(b"EXISTS"):
                update_inbox()
    finally:
        # RFC 2177: IDLE must be ended with DONE before any further command
        # (CLOSE/LOGOUT below); best effort, the socket may already be gone
        try:
            client.send(b"DONE\r\n")
        except OSError:
            pass
        try:
            client.close()
        except Exception:  # best-effort cleanup; logout() still runs
            pass
        client.logout()


def _parse_step(field):
    """
    return N for a cron `*/N` step field, or None for any other field value

    Literal values, lists and ranges are not interpreted (yet) and so act as
    wildcards. Raises `ValueError` for a malformed or non-positive step.

    """
    if not isinstance(field, str) or not field.startswith("*/"):
        return None
    step = int(field[2:])  # `*/15` -> 15: the whole number, not one digit
    if step <= 0:
        raise ValueError(f"invalid cron step: {field!r}")
    return step


def _is_due(job, now):
    """
    return whether a `job_schedules` row is due at the minute `now`

    Only `*/N` step fields are honoured: a minute step fires every N minutes,
    an hour step at minute 0 of every Nth hour, and a month step at 00:00 of
    every day in every Nth month. Any other field value matches.

    """
    minute = _parse_step(job["minute"])
    hour = _parse_step(job["hour"])
    month = _parse_step(job["month"])
    if minute is not None and now.minute % minute:
        return False
    if hour is not None and (now.hour % hour or now.minute):
        return False
    if month is not None and (now.month % month or now.hour or now.minute):
        return False
    return True


def schedule_jobs(browser):
    """
    wait for the start of the next minute and enqueue scheduled jobs due then

    Meant to be called in a loop. `browser` is accepted for interface parity
    with the workers but is not used here. A schedule row with a malformed
    step field is reported on stderr and skipped.

    """
    # TODO support for days of month, days of week
    # Sleep straight to the next minute boundary: polling once a second for
    # `second == 0` drifts and can step over second 0, skipping a minute.
    now = pendulum.now()
    time.sleep(60 - now.second - now.microsecond / 1_000_000)
    # evaluate against the boundary itself rather than re-reading the clock,
    # in case the sleep wakes up a hair early
    now = now.start_of("minute").add(minutes=1)
    hosts = ["lahacker.net"]  # TODO iterate over all trees
    for host in hosts:
        tx = canopy.contextualize(host)
        jobs = tx.db.select("job_schedules AS sch",
                            join="""job_signatures AS sig ON
                                    sch.job_signature_id = sig.rowid""")
        for job in jobs:
            try:
                due = _is_due(job, now)
            except ValueError as err:
                print(f"skipping schedule for {job['module']}:"
                      f"{job['object']}: {err}", file=sys.stderr, flush=True)
                continue
            if due:
                # NOTE(review): only the callable is enqueued; any args/kwargs
                # stored with the signature are not passed -- confirm intent
                canopy.enqueue(getattr(import_module(job["module"]),
                                       job["object"]))


def handle_job(job_identifier, browser):
    """
    handle a freshly dequeued job

    `job_identifier` has the form `<host>:<job_run_id>`. The run and its
    signature (module, object, JSON encoded args/kwargs) are loaded from the
    host's database, the callable is imported and invoked, and the run row
    is updated with start/finish timestamps, status (0 returned, 1 raised),
    output, and the created->started and started->finished durations in
    seconds.

    """
    # TODO handle retries
    host, _, job_run_id = job_identifier.partition(":")
    tx = canopy.contextualize(host)
    tx.browser = browser
    job = tx.db.select("job_runs AS r", what="s.rowid, *",
                       join="""job_signatures AS s
                               ON s.rowid = r.job_signature_id""",
                       where="r.id = ?", vals=[job_run_id])[0]
    _module = job["module"]
    _object = job["object"]
    _args = json.loads(job["args"])
    _kwargs = json.loads(job["kwargs"])
    print(f"{host}/{_module}:{_object}",
          *(_args + [f"{k}={v}" for k, v in _kwargs.items()]),
          sep="\n  ", flush=True)
    tx.db.update("job_runs", where="id = ?", vals=[job_run_id],
                 what="started = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')")
    status = 0
    try:
        output = getattr(import_module(_module), _object)(*_args, **_kwargs)
    except Exception as err:
        status = 1
        output = str(err)
        traceback.print_exc()  # the stored output keeps only the message
    tx.db.update("job_runs", vals=[status, output, job_run_id],
                 what="""finished = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW'),
                         status = ?, output = ?""", where="id = ?")
    run = tx.db.select("job_runs", where="id = ?", vals=[job_run_id])[0]
    waited = run["started"] - run["created"]
    ran = run["finished"] - run["started"]
    # f"{d.seconds}.{d.microseconds}" rendered 5000us as ".5000" (0.5s) and
    # dropped whole days; total_seconds() gives the exact duration
    tx.db.update("job_runs", what="start_time = ?, run_time = ?",
                 where="id = ?", vals=[f"{waited.total_seconds():.6f}",
                                       f"{ran.total_seconds():.6f}",
                                       job_run_id])


def run_imap_idler(host, retry_delay=30):
    """
    keep an IMAP IDLE session for `host` running forever

    `idle_imap` returns whenever the server ends the session, so it is simply
    called again. If it raises (network or authentication failure), the
    traceback is printed and a reconnect is attempted after `retry_delay`
    seconds, instead of the greenlet dying and idling stopping for good.

    """
    while True:
        try:
            idle_imap(host)
        except Exception:
            traceback.print_exc()
            time.sleep(retry_delay)  # back off instead of hammering server


def run_scheduler(browser, retry_delay=1):
    """
    run `schedule_jobs` forever

    A failing pass (database error, unimportable job, ...) has its traceback
    printed, and scheduling resumes instead of the scheduler greenlet dying.

    """
    while True:
        try:
            schedule_jobs(browser)
        except Exception:
            traceback.print_exc()
            # don't spin if the same failure recurs immediately
            time.sleep(retry_delay)


def run_worker(browser):
    """
    take job identifiers off the priority queue and handle them forever

    An exception escaping `handle_job` (e.g. a run id with no matching row)
    is reported instead of killing the worker, which would otherwise
    permanently shrink the pool.

    """
    while True:
        _priority, job = queue.get()
        try:
            handle_job(job, browser)
        except Exception:
            print(f"failed to handle job {job!r}", file=sys.stderr,
                  flush=True)
            traceback.print_exc()


@main.register()
class JobQueue:

    """manage the job queue"""

    def run(self, stdin, log):
        """
        spawn the IMAP idlers, scheduler and workers, then feed the queue

        Job identifiers popped from the `jobqueue` KV list are put on the
        in-process priority queue at priority 1. The shared browser is quit
        however the feeding loop ends; a KeyboardInterrupt ends it quietly.

        """
        # TODO capture supervisor's kill signal and make sure to quit browser
        for host in ("lahacker.net",):  # TODO all hosts w/ IMAP enabled
            gevent.spawn(run_imap_idler, host)
        browser = web.browser()
        gevent.spawn(run_scheduler, browser)
        for _ in range(worker_count):
            gevent.spawn(run_worker, browser)
        try:
            for job in canopy.global_kv["jobqueue"].keep_popping():
                queue.put((1, job))  # TODO utilize priority levels
        except KeyboardInterrupt:
            pass
        finally:
            # previously only quit on KeyboardInterrupt, leaking the browser
            # process when the loop ended any other way
            browser.quit()


# run the `main` command-line application only when executed as a script
if __name__ == "__main__":
    main()