"""Worker that routes transactions to an output exchange by currency and date period."""

import logging
import signal
from collections.abc import Callable

from common.middleware.middleware_rabbitmq import RabbitMQTopicExchange
from common.protocol.internal import InternalSerializer
from common.worker.worker import Worker
from config import Config

# Currency segment of the output routing key.
USD_CURRENCY = "usd"
NO_USD_CURRENCY = "nousd"

# Period segment of the output routing key.
PERIOD_1 = "period1"
PERIOD_2 = "period2"


class DateFilter(Worker):
    """Forward transactions that fall inside one of two configured date windows.

    Each incoming message is deserialized, classified as
    ``<usd|nousd>.<period1|period2>`` and re-published unchanged to the output
    exchange under that routing key. Transactions outside both windows are
    dropped.
    """

    def __init__(self, config: Config):
        """Create the input/output exchanges and install the SIGTERM handler.

        Args:
            config: Supplies the RabbitMQ host, the exchange names, the input
                routing key and the ``date_from_*`` / ``date_to_*`` bounds.
        """
        super().__init__()
        self.config = config
        # Guards shutdown() against running twice (SIGTERM handler + main's finally).
        self._shutdown_done = False
        self._input_exchange = RabbitMQTopicExchange(
            host=config.rabbitmq_host,
            exchange_name=config.input_exchange,
            route_keys=[config.input_routing_key],
        )
        self._output_exchange = RabbitMQTopicExchange(
            host=config.rabbitmq_host,
            exchange_name=config.output_exchange,
            route_keys=[],
        )
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _get_currency_period_classification(self, transaction) -> str | None:
        """Return the output routing key for *transaction*, or ``None`` to drop it.

        The key is ``"<currency>.<period>"`` where currency is ``"usd"`` when
        ``transaction.currency`` equals ``"usd"`` case-insensitively and
        ``"nousd"`` otherwise. Both date windows are inclusive at both ends;
        period 1 is checked first, so it wins if the windows overlap.
        """
        transaction_timestamp = transaction.timestamp
        transaction_currency_type = (
            USD_CURRENCY
            if transaction.currency.lower() == USD_CURRENCY
            else NO_USD_CURRENCY
        )

        if self.config.date_from_1 <= transaction_timestamp <= self.config.date_to_1:
            return f"{transaction_currency_type}.{PERIOD_1}"
        if self.config.date_from_2 <= transaction_timestamp <= self.config.date_to_2:
            return f"{transaction_currency_type}.{PERIOD_2}"
        return None

    def handle_message(
        self,
        message: bytes,
        ack: Callable[[], None],
        nack: Callable[[], None],
    ) -> None:
        """Classify one message and forward it, then acknowledge it.

        Args:
            message: Serialized transaction; forwarded byte-for-byte.
            ack: Called with no arguments once the message has been handled.
            nack: Called with no arguments when handling fails.

        Raises:
            Exception: Any error from deserialization, classification, sending
                or acknowledging is logged, nack'ed and re-raised.
        """
        try:
            transaction = InternalSerializer.deserialize(message)
            routing_key = self._get_currency_period_classification(transaction)
            if routing_key is not None:
                self._output_exchange.send(
                    routing_key=routing_key,
                    body=message,
                )
            # Filtered-out messages were handled successfully too: ack them so
            # they are not left unsettled (previously this path returned
            # without calling ack or nack).
            # NOTE(review): assumes the middleware uses manual acknowledgements
            # driven by these callbacks -- confirm against RabbitMQTopicExchange.
            ack()
        except Exception as e:
            logging.error("Error handling message: %s", e)
            nack()
            # Bare raise keeps the original traceback intact.
            raise

    def run(self) -> None:
        """Start consuming from the input exchange with ``handle_message``."""
        logging.info("Running DateFilter")
        self._input_exchange.start_consuming(self.handle_message)

    def shutdown(self) -> None:
        """Stop consuming and close both exchanges; safe to call more than once."""
        # getattr keeps this safe even if __init__ did not run to completion.
        if getattr(self, "_shutdown_done", False):
            return
        self._shutdown_done = True
        logging.info("Stopping DateFilter")
        self._input_exchange.stop_consuming()
        self._input_exchange.close()
        self._output_exchange.close()

    def _handle_sigterm(self, _signum: int, _frame: object) -> None:
        """Signal handler: log the SIGTERM and shut the worker down."""
        logging.info("Received SIGTERM, shutting down DateFilter")
        self.shutdown()


def main():
    """Configure logging, build the filter from ``Config`` and run it until stopped."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [DateFilter] %(levelname)s %(message)s",
    )
    config = Config()
    date_filter = DateFilter(config)
    try:
        date_filter.run()
    except Exception as e:
        logging.error("Error running DateFilter: %s", e)
    finally:
        # No-op if the SIGTERM handler already shut the worker down.
        date_filter.shutdown()


if __name__ == "__main__":
    main()