Not a member of GistPad yet?
Sign Up,
it unlocks many cool features!
- import logging
- import signal
- from common.middleware.middleware_rabbitmq import RabbitMQTopicExchange
- from common.protocol.internal import InternalSerializer
- from common.worker.worker import Worker
- from config import Config
- USD_CURRENCY = "usd"
- NO_USD_CURRENCY = "nousd"
- PERIOD_1 = "period1"
- PERIOD_2 = "period2"
- class DateFilter(Worker):
- def __init__(self, config: Config):
- super().__init__()
- self.config = config
- self._input_exchange = RabbitMQTopicExchange(
- host=config.rabbitmq_host,
- exchange_name=config.input_exchange,
- route_keys=[config.input_routing_key],
- )
- self._output_exchange = RabbitMQTopicExchange(
- host=config.rabbitmq_host,
- exchange_name=config.output_exchange,
- route_keys=[],
- )
- signal.signal(signal.SIGTERM, self._handle_sigterm)
- def _get_currency_period_classification(self, transaction) -> str | None:
- transaction_timestamp = transaction.timestamp
- transaction_currency_type = (
- USD_CURRENCY
- if transaction.currency.lower() == USD_CURRENCY
- else NO_USD_CURRENCY
- )
- if self.config.date_from_1 <= transaction_timestamp <= self.config.date_to_1:
- return f"{transaction_currency_type}.{PERIOD_1}"
- if self.config.date_from_2 <= transaction_timestamp <= self.config.date_to_2:
- return f"{transaction_currency_type}.{PERIOD_2}"
- return None
- def handle_message(self, message: bytes, ack: callable, nack: callable) -> None:
- try:
- transaction = InternalSerializer.deserialize(message)
- routing_key = self._get_currency_period_classification(transaction)
- if routing_key is None:
- return
- self._output_exchange.send(
- routing_key=routing_key,
- body=message,
- )
- ack()
- except Exception as e:
- logging.error("Error handling message: %s", e)
- nack()
- raise e
- def run(self) -> None:
- logging.info("Running DateFilter")
- self._input_exchange.start_consuming(self.handle_message)
- def shutdown(self) -> None:
- logging.info("Stopping DateFilter")
- self._input_exchange.stop_consuming()
- self._input_exchange.close()
- self._output_exchange.close()
- def _handle_sigterm(self, _signum: int, _frame: object) -> None:
- logging.info("Received SIGTERM, shutting down DateFilter")
- self.shutdown()
- def main():
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s [DateFilter] %(levelname)s %(message)s",
- )
- config = Config()
- date_filter = DateFilter(config)
- try:
- date_filter.run()
- except Exception as e:
- logging.error("Error running DateFilter: %s", e)
- finally:
- date_filter.shutdown()
- if __name__ == "__main__":
- main()
RAW Paste Data
Copied
