import logging
import signal
from common.middleware.middleware_rabbitmq import RabbitMQTopicExchange
from common.protocol.internal import InternalSerializer
from common.worker.worker import Worker
from config import Config
# Currency segment of the output routing key. USD_CURRENCY is also the
# lowercase value a transaction's currency is compared against.
USD_CURRENCY = "usd"
NO_USD_CURRENCY = "nousd"
# Period segment of the output routing key; the full key has the form
# "<currency>.<period>", e.g. "usd.period1".
PERIOD_1 = "period1"
PERIOD_2 = "period2"
class DateFilter(Worker):
    """Route transactions to the output exchange by currency and date period.

    Each consumed message is deserialized, classified as USD / non-USD and
    matched against the two date ranges held in the configuration. Messages
    that fall inside a range are republished unchanged under the routing key
    ``"<currency>.<period>"``; messages outside both ranges are dropped.
    """

    def __init__(self, config: Config):
        """Create the input/output exchanges and register the SIGTERM handler.

        Args:
            config: Supplies ``rabbitmq_host``, ``input_exchange``,
                ``input_routing_key``, ``output_exchange`` and the
                ``date_from_1``/``date_to_1``/``date_from_2``/``date_to_2``
                period bounds.
        """
        super().__init__()
        self.config = config
        # shutdown() can be reached both from the SIGTERM handler and from
        # the caller's own cleanup; this flag makes the second call a no-op.
        self._shutdown_completed = False
        self._input_exchange = RabbitMQTopicExchange(
            host=config.rabbitmq_host,
            exchange_name=config.input_exchange,
            route_keys=[config.input_routing_key],
        )
        self._output_exchange = RabbitMQTopicExchange(
            host=config.rabbitmq_host,
            exchange_name=config.output_exchange,
            route_keys=[],
        )
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    def _get_currency_period_classification(self, transaction) -> str | None:
        """Return the routing key for ``transaction`` or ``None`` if filtered.

        Args:
            transaction: Object exposing ``timestamp`` and ``currency``.
                NOTE(review): ``timestamp`` must be comparable with the
                configured period bounds -- confirm both use the same type.

        Returns:
            ``"<usd|nousd>.<period1|period2>"`` when the timestamp lies inside
            one of the configured (inclusive) ranges, otherwise ``None``.
            Period 1 takes precedence if the ranges overlap.
        """
        transaction_timestamp = transaction.timestamp
        transaction_currency_type = (
            USD_CURRENCY
            if transaction.currency.lower() == USD_CURRENCY
            else NO_USD_CURRENCY
        )
        if self.config.date_from_1 <= transaction_timestamp <= self.config.date_to_1:
            return f"{transaction_currency_type}.{PERIOD_1}"
        if self.config.date_from_2 <= transaction_timestamp <= self.config.date_to_2:
            return f"{transaction_currency_type}.{PERIOD_2}"
        return None

    def handle_message(self, message: bytes, ack: callable, nack: callable) -> None:
        """Classify one message and forward it when it matches a period.

        Args:
            message: Serialized transaction as received from the input exchange.
            ack: Callback that acknowledges the delivery.
            nack: Callback that negatively acknowledges the delivery.

        Raises:
            Exception: Any error raised while processing is logged, the
                message is nack'ed, and the error is re-raised to the caller.
        """
        try:
            transaction = InternalSerializer.deserialize(message)
            routing_key = self._get_currency_period_classification(transaction)
            if routing_key is not None:
                self._output_exchange.send(
                    routing_key=routing_key,
                    body=message,
                )
            # Acknowledge filtered-out messages too: they were handled
            # successfully (by being dropped) and must not stay unacked.
            ack()
        except Exception as e:
            logging.exception("Error handling message: %s", e)
            nack()
            raise

    def run(self) -> None:
        """Start consuming from the input exchange with ``handle_message``."""
        logging.info("Running DateFilter")
        self._input_exchange.start_consuming(self.handle_message)

    def shutdown(self) -> None:
        """Stop consuming and close both exchanges; safe to call repeatedly."""
        if self._shutdown_completed:
            return
        # Set the flag first so a re-entrant call (e.g. a signal arriving
        # during cleanup) does not attempt to close the exchanges again.
        self._shutdown_completed = True
        logging.info("Stopping DateFilter")
        self._input_exchange.stop_consuming()
        self._input_exchange.close()
        self._output_exchange.close()

    def _handle_sigterm(self, _signum: int, _frame: object) -> None:
        """Signal handler for SIGTERM: log the event and shut down."""
        logging.info("Received SIGTERM, shutting down DateFilter")
        self.shutdown()
def main():
    """Configure logging, build a DateFilter from Config and run it.

    Errors escaping ``run()`` are logged rather than re-raised, and
    ``shutdown()`` is invoked in every case before returning.
    """
    logging.basicConfig(
        format="%(asctime)s [DateFilter] %(levelname)s %(message)s",
        level=logging.INFO,
    )
    worker = DateFilter(Config())
    try:
        worker.run()
    except Exception as error:
        logging.error("Error running DateFilter: %s", error)
    finally:
        # Release the exchanges whether run() returned or failed.
        worker.shutdown()
# Start the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()