| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import datetime
- import logging
- import threading
- import time
- from .runner import Runner
- from .utils.db import Mongo
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class Producer(threading.Thread):
    """Thread that repeatedly enqueues the ``_id`` of every known user.

    The set of user ids is read from ``Mongo.db.user`` when the producer is
    constructed and is topped up after every pass with users whose
    ``createdAt`` is later than the previous load.
    """

    # Seconds to wait between two passes over the user set.
    POLL_INTERVAL_SECONDS = 180

    def __init__(self, queue):
        """Store *queue* and perform the initial full user load.

        Args:
            queue: Object exposing ``put``; it receives one user ``_id`` per
                known user on every pass of :meth:`run`.
        """
        super().__init__()
        self.queue = queue
        self.load_users(need_initialize=True)

    def load_users(self, need_initialize=False) -> None:
        """Load user ids from Mongo into ``self.users``.

        Args:
            need_initialize: When true, replace ``self.users`` with the ids
                of every document in the ``user`` collection. Otherwise only
                add users whose ``createdAt`` is greater than
                ``self.updated_at``.
        """
        # Take the watermark *before* querying. Stamping it after the query
        # permanently skipped any user created while the query was in
        # flight, because the next ``$gt`` filter started after them.
        # NOTE(review): utcnow() is a naive UTC datetime; presumably
        # ``createdAt`` is stored in UTC as well -- confirm.
        loaded_at = datetime.datetime.utcnow()

        if need_initialize:
            self.users = {doc['_id'] for doc in Mongo.db.user.find({})}
        else:
            recent = Mongo.db.user.find(
                {'createdAt': {'$gt': self.updated_at}}
            )
            # The earlier watermark can make consecutive windows overlap;
            # drop ids we already hold so they are not reported twice.
            new_users = {doc['_id'] for doc in recent} - self.users
            if new_users:
                logger.info('Loaded users: %s', new_users)
                self.users.update(new_users)

        # Only advance the watermark once the query has succeeded.
        self.updated_at = loaded_at

    def run(self):
        """Enqueue every known user id, refresh the user set, pause, repeat.

        The loop never returns. An exception raised during a pass is logged
        and the next pass starts after the usual pause.
        """
        while True:
            try:
                for user_id in self.users:
                    self.queue.put(user_id)
                self.load_users()
                logger.info('Got all users')
            except Exception:
                logger.exception('Producer Error')
            time.sleep(self.POLL_INTERVAL_SECONDS)
class Consumer(threading.Thread):
    """Thread that takes user ids off a queue and runs a ``Runner`` for each."""

    def __init__(self, queue):
        """Remember *queue*, the source of user ids (must expose ``get``)."""
        self.queue = queue
        super().__init__()

    def run(self):
        """Process queued user ids one at a time, forever.

        Each id is taken from the queue, wrapped in a ``Runner`` and run.
        Any exception raised along the way is logged and the loop carries on.
        """
        while True:
            try:
                user_id = self.queue.get()
                job = Runner(user_id)
                logger.info('Got User [%s], starting Runner!', user_id)
                job.run()
            except Exception:
                logger.exception('Consumer Error')
            # NOTE(review): the pause is assumed to follow every iteration,
            # successful or not, as in Producer.run -- confirm.
            time.sleep(5)
|