Adding a custom filter to Airflow logs

Recently I tried to add a custom logging filter to our Airflow deployment in Kubernetes. Since we show our logs to our customers, we cannot let our secrets and other Kubernetes config appear in them.

On pod failure we get a dump of the entire pod config in the logs, and that is what we want to remove today.

Random pod configs that also expose our secrets at random intervals

Note: We run Airflow 2.5.0 with remote logging enabled and our logs are sent to S3, so whatever filter we write also has to be added to the S3 handler. If you are not using remote logging, you do not need that extra step.

Logging filters

In short, a logging filter takes a log record and, based on the conditions we supply, tells the logger whether or not that record should be logged.

def filter(self, record):
    if "word" in record.getMessage():
        return False
    return True

The above function ensures that any log record whose message contains the word “word” is dropped by the logger.
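To try this outside Airflow, here is a minimal sketch that uses only the standard library. The names `DropWordFilter`, `ListHandler` and the logger name `demo` are made up for this illustration and are not part of Airflow.

```python
import logging


class DropWordFilter(logging.Filter):
    """Drop any record whose formatted message contains "word"."""

    def filter(self, record):
        return "word" not in record.getMessage()


class ListHandler(logging.Handler):
    """Hypothetical helper: collects emitted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


handler = ListHandler()
handler.addFilter(DropWordFilter())  # the handler consults the filter before emit()

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo records away from the root logger
logger.addHandler(handler)

logger.info("this line contains the word and is dropped")
logger.info("this line is kept")

print(handler.messages)  # prints ['this line is kept']
```

Returning False from `filter` only discards the record for the handler (or logger) the filter is attached to, which is why the filter later has to be attached to the right Airflow handler.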

Now let us write a filter class that omits any log that comes from taskinstance.py or standard_task_runner.py.

import logging


class CustomFilter(logging.Filter):
    # filter out all log records that were emitted from the files listed below
    def filter(self, record):
        # record.filename is the file name (without directories) of the calling module
        files_to_filter = ["taskinstance.py", "standard_task_runner.py"]
        return record.filename not in files_to_filter
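A quick way to check a filter like this without running Airflow is to build log records by hand and pass them to `filter`. The sketch below repeats the class so it runs on its own; the `make_record` helper and both paths are hypothetical and only serve the demonstration.

```python
import logging


class CustomFilter(logging.Filter):
    def filter(self, record):
        files_to_filter = ["taskinstance.py", "standard_task_runner.py"]
        return record.filename not in files_to_filter


def make_record(pathname):
    """Hypothetical helper: build a record as if it was logged from pathname."""
    # LogRecord derives record.filename from the last component of pathname.
    return logging.LogRecord(
        name="demo",
        level=logging.INFO,
        pathname=pathname,
        lineno=1,
        msg="hello",
        args=None,
        exc_info=None,
    )


custom_filter = CustomFilter()

# Both paths are invented for the example.
from_taskinstance = custom_filter.filter(make_record("/opt/airflow/models/taskinstance.py"))
from_dag_file = custom_filter.filter(make_record("/opt/airflow/dags/my_dag.py"))

print(from_taskinstance, from_dag_file)  # prints False True
```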

The attributes available to you on record include:

  1. args
  2. levelname
  3. levelno
  4. pathname
  5. exc_info
  6. exc_text
  7. funcName
  8. created
  9. threadName
  10. processName
  11. msg (the raw message; call getMessage() to get it with args applied)

So you can filter out logs using any of the above record properties.
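As an illustration of filtering on properties other than the file name, the following sketch combines `levelno` with the formatted message. The class name `RecordPropertyFilter`, the marker strings and the `make_record` helper are assumptions made up for this example; replace the markers with whatever actually appears in your pod dumps.

```python
import logging

# Hypothetical markers; adjust them to the text your pod config dumps contain.
SENSITIVE_MARKERS = ("secret_key_ref", "env_from")


class RecordPropertyFilter(logging.Filter):
    """Drop DEBUG records and any record whose message contains a marker."""

    def filter(self, record):
        if record.levelno < logging.INFO:
            return False
        message = record.getMessage()  # msg with args already applied
        return not any(marker in message for marker in SENSITIVE_MARKERS)


def make_record(level, msg, *args):
    """Hypothetical helper: build a record by hand for testing the filter."""
    return logging.LogRecord("demo", level, "demo.py", 1, msg, args, None)


property_filter = RecordPropertyFilter()

debug_result = property_filter.filter(make_record(logging.DEBUG, "starting"))
dump_result = property_filter.filter(
    make_record(logging.ERROR, "pod spec: %s", "secret_key_ref: db-password")
)
normal_result = property_filter.filter(make_record(logging.INFO, "task finished"))

print(debug_result, dump_result, normal_result)  # prints False False True
```

Checking `getMessage()` rather than `msg` matters here because the sensitive text often arrives through the format arguments and not the format string itself.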

Let us add the above CustomFilter class to Airflow

from copy import deepcopy

import airflow.logging_config
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

# Register the filter under a name that handlers can refer to.
LOGGING_CONFIG["filters"]["custom_filter"] = {
    "()": CustomFilter,
}

# Since I only want it for tasks I am editing only the task handler.
# If you need to add it to other aspects of Airflow, feel free to do so.
if "filters" in LOGGING_CONFIG["handlers"]["task"]:
    LOGGING_CONFIG["handlers"]["task"]["filters"].append("custom_filter")
else:
    LOGGING_CONFIG["handlers"]["task"]["filters"] = ["custom_filter"]

airflow.logging_config.dictConfig(LOGGING_CONFIG)
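The same wiring can be tried with the standard library alone, which makes it easier to see what the "()" key and the handler's "filters" list do. This is only a sketch under assumptions: the config below is a small stand-in, not Airflow's DEFAULT_LOGGING_CONFIG, and `DropWordFilter`, the `demo.task` logger and the in-memory stream are invented for the example.

```python
import io
import logging
from logging.config import dictConfig


class DropWordFilter(logging.Filter):
    def filter(self, record):
        return "word" not in record.getMessage()


buffer = io.StringIO()  # stands in for the real log destination

demo_config = {
    "version": 1,
    "disable_existing_loggers": False,
    # "()" tells dictConfig to call this factory to build the filter object.
    "filters": {"custom_filter": {"()": DropWordFilter}},
    "handlers": {
        "task": {
            "class": "logging.StreamHandler",
            "stream": buffer,
            # Handlers refer to filters by the name registered above.
            "filters": ["custom_filter"],
        },
    },
    "loggers": {
        "demo.task": {"handlers": ["task"], "level": "INFO", "propagate": False},
    },
}

dictConfig(demo_config)

log = logging.getLogger("demo.task")
log.info("this contains the word")
log.info("kept")

print(repr(buffer.getvalue()))  # prints 'kept\n'
```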

I had some issues deploying this, mainly ModuleNotFoundError: No module named 'config'. The simplest way I could fix it was to replace the actual local settings file with my custom one while building the Docker image.

So my Dockerfile has the following lines:

FROM apache/airflow:2.5.0-python3.10
COPY ./config/airflow_local_settings.py /home/airflow/.local/lib/python3.10/site-packages/airflow/config_templates

And you can see the changes I made to the airflow_local_settings.py file here: https://gist.github.com/PandaWhoCodes/52ab5ffb93d881ee90113f4eb0e23b5d/revisions

About Me


Hey, I am Thomas Ashish Cherian. What I cannot create, I do not understand.
