producer.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. import datetime
  2. import logging
  3. import threading
  4. import time
  5. from .runner import Runner
  6. from .utils.db import Mongo
  7. logger = logging.getLogger(__name__)
  8. class Producer(threading.Thread):
  9. def __init__(self, queue):
  10. self.queue = queue
  11. self.load_users(need_initialize=True)
  12. super(Producer, self).__init__()
  13. def load_users(self, need_initialize=False) -> None:
  14. if need_initialize:
  15. self.users = {i['_id'] for i in Mongo.db.user.find({})}
  16. self.updated_at = datetime.datetime.utcnow()
  17. else:
  18. new_users = {i['_id'] for i in Mongo.db.user.find({
  19. 'createdAt': {
  20. '$gt': self.updated_at
  21. }
  22. })}
  23. if new_users:
  24. logger.info('Loaded users: %s', new_users)
  25. self.users.update(new_users)
  26. self.updated_at = datetime.datetime.utcnow()
  27. def run(self):
  28. while True:
  29. try:
  30. for i in self.users:
  31. self.queue.put(i)
  32. self.load_users()
  33. logger.info('Got all users')
  34. except Exception:
  35. logger.exception('Producer Error')
  36. time.sleep(180)
  37. class Consumer(threading.Thread):
  38. def __init__(self, queue):
  39. self.queue = queue
  40. super(Consumer, self).__init__()
  41. def run(self):
  42. while True:
  43. try:
  44. _id = self.queue.get()
  45. runner = Runner(_id)
  46. logger.info('Got User [%s], starting Runner!', _id)
  47. runner.run()
  48. except Exception:
  49. logger.exception('Consumer Error')
  50. time.sleep(5)