Guest

DateFilter

May 12th, 2026
4
0
Never
Not a member of GistPad yet? Sign Up, it unlocks many cool features!
Python 14.51 KB | Software | 0 0
  1. import logging
  2. import signal
  3.  
  4. from common.middleware.middleware_rabbitmq import RabbitMQTopicExchange
  5. from common.protocol.internal import InternalSerializer
  6. from common.worker.worker import Worker
  7. from config import Config
  8.  
  9. USD_CURRENCY = "usd"
  10. NO_USD_CURRENCY = "nousd"
  11. PERIOD_1 = "period1"
  12. PERIOD_2 = "period2"
  13.  
  14.  
  15. class DateFilter(Worker):
  16. def __init__(self, config: Config):
  17. super().__init__()
  18. self.config = config
  19.  
  20. self._input_exchange = RabbitMQTopicExchange(
  21. host=config.rabbitmq_host,
  22. exchange_name=config.input_exchange,
  23. route_keys=[config.input_routing_key],
  24. )
  25. self._output_exchange = RabbitMQTopicExchange(
  26. host=config.rabbitmq_host,
  27. exchange_name=config.output_exchange,
  28. route_keys=[],
  29. )
  30.  
  31. signal.signal(signal.SIGTERM, self._handle_sigterm)
  32.  
  33. def _get_currency_period_classification(self, transaction) -> str | None:
  34. transaction_timestamp = transaction.timestamp
  35. transaction_currency_type = (
  36. USD_CURRENCY
  37. if transaction.currency.lower() == USD_CURRENCY
  38. else NO_USD_CURRENCY
  39. )
  40.  
  41. if self.config.date_from_1 <= transaction_timestamp <= self.config.date_to_1:
  42. return f"{transaction_currency_type}.{PERIOD_1}"
  43. if self.config.date_from_2 <= transaction_timestamp <= self.config.date_to_2:
  44. return f"{transaction_currency_type}.{PERIOD_2}"
  45. return None
  46.  
  47. def handle_message(self, message: bytes, ack: callable, nack: callable) -> None:
  48. try:
  49. transaction = InternalSerializer.deserialize(message)
  50. routing_key = self._get_currency_period_classification(transaction)
  51. if routing_key is None:
  52. return
  53. self._output_exchange.send(
  54. routing_key=routing_key,
  55. body=message,
  56. )
  57. ack()
  58. except Exception as e:
  59. logging.error("Error handling message: %s", e)
  60. nack()
  61. raise e
  62.  
  63. def run(self) -> None:
  64. logging.info("Running DateFilter")
  65. self._input_exchange.start_consuming(self.handle_message)
  66.  
  67. def shutdown(self) -> None:
  68. logging.info("Stopping DateFilter")
  69. self._input_exchange.stop_consuming()
  70. self._input_exchange.close()
  71. self._output_exchange.close()
  72.  
  73. def _handle_sigterm(self, _signum: int, _frame: object) -> None:
  74. logging.info("Received SIGTERM, shutting down DateFilter")
  75. self.shutdown()
  76.  
  77.  
  78. def main():
  79. logging.basicConfig(
  80. level=logging.INFO,
  81. format="%(asctime)s [DateFilter] %(levelname)s %(message)s",
  82. )
  83. config = Config()
  84.  
  85. date_filter = DateFilter(config)
  86.  
  87. try:
  88. date_filter.run()
  89. except Exception as e:
  90. logging.error("Error running DateFilter: %s", e)
  91. finally:
  92. date_filter.shutdown()
  93.  
  94.  
  95. if __name__ == "__main__":
  96. main()
RAW Paste Data Copied