"""Producer/consumer threads that feed user ids from Mongo to ``Runner``."""
import datetime
import logging
import threading
import time

from .runner import Runner
from .utils.db import Mongo

logger = logging.getLogger(__name__)


class Producer(threading.Thread):
    """Thread that periodically puts every known user id on a shared queue.

    The set of known ids is loaded from ``Mongo.db.user`` once at construction
    time and then extended after every pass with users whose ``createdAt`` is
    newer than the last checkpoint.
    """

    # Seconds to wait between two full passes over the known users.
    POLL_INTERVAL_SECONDS = 180

    def __init__(self, queue):
        """Store *queue* and perform the initial (blocking) user load.

        Args:
            queue: Object exposing ``put(item)``; receives one user id per
                known user on every pass.
        """
        super().__init__()
        self.queue = queue
        self.load_users(need_initialize=True)

    def load_users(self, need_initialize=False) -> None:
        """Refresh ``self.users`` from the ``user`` collection.

        Args:
            need_initialize: When true, replace ``self.users`` with the ids of
                all users. Otherwise only users with ``createdAt`` greater than
                ``self.updated_at`` are fetched and merged into the set.
        """
        # Take the checkpoint *before* querying. Stamping it after the query
        # would permanently skip any user created between the query and the
        # stamp. The small overlap this creates is harmless because ids are
        # stored in a set.
        checkpoint = datetime.datetime.utcnow()

        if need_initialize:
            query = {}
        else:
            query = {'createdAt': {'$gt': self.updated_at}}
        found = {doc['_id'] for doc in Mongo.db.user.find(query)}

        if need_initialize:
            self.users = found
        elif found:
            logger.info('Loaded users: %s', found)
            self.users.update(found)

        # Only reached when the query succeeded, so a failed load is retried
        # from the previous checkpoint on the next pass.
        self.updated_at = checkpoint

    def run(self):
        """Enqueue all known users, pick up new ones, sleep, and repeat forever."""
        while True:
            try:
                for user_id in self.users:
                    self.queue.put(user_id)
                self.load_users()
                logger.info('Got all users')
            except Exception:
                # Keep the thread alive; the next pass retries.
                logger.exception('Producer Error')
            time.sleep(self.POLL_INTERVAL_SECONDS)


class Consumer(threading.Thread):
    """Thread that takes user ids off the queue and runs a ``Runner`` for each."""

    # Seconds to wait between two consecutive queue items.
    PAUSE_SECONDS = 5

    def __init__(self, queue):
        """Store *queue*.

        Args:
            queue: Object exposing ``get()``; each item is passed to ``Runner``.
        """
        super().__init__()
        self.queue = queue

    def run(self):
        """Fetch one id, run its ``Runner``, sleep, and repeat forever."""
        while True:
            try:
                _id = self.queue.get()
                runner = Runner(_id)
                logger.info('Got User [%s], starting Runner!', _id)
                runner.run()
            except Exception:
                # A failing runner must not kill the consumer thread.
                logger.exception('Consumer Error')
            time.sleep(self.PAUSE_SECONDS)