mcp server apache airflow

Local 2025-09-01 00:09:49 0

Provides a standardized way for MCP clients to interact with Apache Airflow's REST API, supporting operations like DAG management and monitoring Airflow system health.



A Model Context Protocol (MCP) server implementation for Apache Airflow, enabling seamless integration with MCP clients. This project provides a standardized way to interact with Apache Airflow through the Model Context Protocol.


About

This project implements a Model Context Protocol server that wraps Apache Airflow's REST API, allowing MCP clients to interact with Airflow in a standardized way. It uses the official Apache Airflow client library to ensure compatibility and maintainability.

Feature Implementation Status

Feature | API Path | Status
DAG Management | |
List DAGs | /api/v1/dags |
Get DAG Details | /api/v1/dags/{dag_id} |
Pause DAG | /api/v1/dags/{dag_id} |
Unpause DAG | /api/v1/dags/{dag_id} |
Update DAG | /api/v1/dags/{dag_id} |
Delete DAG | /api/v1/dags/{dag_id} |
Get DAG Source | /api/v1/dagSources/{file_token} |
Patch Multiple DAGs | /api/v1/dags |
Reparse DAG File | /api/v1/dagSources/{file_token}/reparse |
DAG Runs | |
List DAG Runs | /api/v1/dags/{dag_id}/dagRuns |
Create DAG Run | /api/v1/dags/{dag_id}/dagRuns |
Get DAG Run Details | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
Update DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
Delete DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
Get DAG Runs Batch | /api/v1/dags/~/dagRuns/list |
Clear DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear |
Set DAG Run Note | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/setNote |
Get Upstream Dataset Events | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents |
Tasks | |
List DAG Tasks | /api/v1/dags/{dag_id}/tasks |
Get Task Details | /api/v1/dags/{dag_id}/tasks/{task_id} |
Get Task Instance | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} |
List Task Instances | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances |
Update Task Instance | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} |
Clear Task Instances | /api/v1/dags/{dag_id}/clearTaskInstances |
Set Task Instances State | /api/v1/dags/{dag_id}/updateTaskInstancesState |
Variables | |
List Variables | /api/v1/variables |
Create Variable | /api/v1/variables |
Get Variable | /api/v1/variables/{variable_key} |
Update Variable | /api/v1/variables/{variable_key} |
Delete Variable | /api/v1/variables/{variable_key} |
Connections | |
List Connections | /api/v1/connections |
Create Connection | /api/v1/connections |
Get Connection | /api/v1/connections/{connection_id} |
Update Connection | /api/v1/connections/{connection_id} |
Delete Connection | /api/v1/connections/{connection_id} |
Test Connection | /api/v1/connections/test |
Pools | |
List Pools | /api/v1/pools |
Create Pool | /api/v1/pools |
Get Pool | /api/v1/pools/{pool_name} |
Update Pool | /api/v1/pools/{pool_name} |
Delete Pool | /api/v1/pools/{pool_name} |
XComs | |
List XComs | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries |
Get XCom Entry | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} |
Datasets | |
List Datasets | /api/v1/datasets |
Get Dataset | /api/v1/datasets/{uri} |
Get Dataset Events | /api/v1/datasetEvents |
Create Dataset Event | /api/v1/datasetEvents |
Get DAG Dataset Queued Event | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} |
Get DAG Dataset Queued Events | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents |
Delete DAG Dataset Queued Event | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} |
Delete DAG Dataset Queued Events | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents |
Get Dataset Queued Events | /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents |
Delete Dataset Queued Events | /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents |
Monitoring | |
Get Health | /api/v1/health |
DAG Stats | |
Get DAG Stats | /api/v1/dags/statistics |
Config | |
Get Config | /api/v1/config |
Plugins | |
Get Plugins | /api/v1/plugins |
Providers | |
List Providers | /api/v1/providers |
Event Logs | |
List Event Logs | /api/v1/eventLogs |
Get Event Log | /api/v1/eventLogs/{event_log_id} |
System | |
Get Import Errors | /api/v1/importErrors |
Get Import Error Details | /api/v1/importErrors/{import_error_id} |
Get Health Status | /api/v1/health |
Get Version | /api/v1/version |
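The API paths listed for each feature are templates: placeholders such as {dag_id} or {uri} have to be filled in, and the list endpoints accept optional query filters. The following is a minimal Python sketch, using only the standard library, of how a client could build such URLs. The helper build_url is hypothetical and is not part of this project or of apache-airflow-client. It assumes that path parameters (notably dataset URIs) must be percent-encoded, that list-valued filters are sent as repeated query keys, and that booleans are sent as lowercase "true"/"false".

```python
from typing import Any, Mapping, Optional
from urllib.parse import quote, urlencode


def build_url(
    host: str,
    template: str,
    path_params: Mapping[str, Any],
    query: Optional[Mapping[str, Any]] = None,
) -> str:
    """Fill an /api/v1 path template and append optional query filters (hypothetical helper)."""
    # safe="" also encodes "/", so a dataset URI such as "s3://bucket/key"
    # stays inside a single path segment instead of adding new segments.
    encoded = {name: quote(str(value), safe="") for name, value in path_params.items()}
    url = host.rstrip("/") + template.format(**encoded)

    params = {}
    for key, value in (query or {}).items():
        if value is None:
            continue  # unset filters are simply omitted
        # Assumption: the API expects lowercase "true"/"false" for booleans.
        params[key] = str(value).lower() if isinstance(value, bool) else value
    if params:
        # doseq=True expands lists into repeated keys: tags=etl&tags=daily
        url += "?" + urlencode(params, doseq=True)
    return url
```

For example, build_url("https://airflow.example.com", "/api/v1/datasets/{uri}", {"uri": "s3://bucket/key"}) returns a URL ending in /api/v1/datasets/s3%3A%2F%2Fbucket%2Fkey.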

Setup

Dependencies

This project depends on the official Apache Airflow client library (apache-airflow-client). It will be automatically installed when you install this package.

Environment Variables

Set the following environment variables:

AIRFLOW_HOST=<your-airflow-host>
AIRFLOW_USERNAME=<your-airflow-username>
AIRFLOW_PASSWORD=<your-airflow-password>
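A username and password suggest HTTP Basic authentication against the REST API, which on the Airflow side typically requires a basic-auth API backend (for example airflow.api.auth.backend.basic_auth) to be enabled. Assuming that, the sketch below shows how the three variables could be read and turned into a request header. load_settings and AirflowSettings are hypothetical names for illustration, not this project's code.

```python
import base64
import os
from typing import Mapping, NamedTuple

REQUIRED = ("AIRFLOW_HOST", "AIRFLOW_USERNAME", "AIRFLOW_PASSWORD")


class AirflowSettings(NamedTuple):
    host: str
    auth_header: str


def load_settings(env: Mapping[str, str] = os.environ) -> AirflowSettings:
    """Read the Airflow variables and build a Basic auth header (hypothetical helper)."""
    # Report every missing or empty variable at once instead of failing on the first.
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # Assumption: HTTP Basic auth, i.e. base64("username:password").
    credentials = f"{env['AIRFLOW_USERNAME']}:{env['AIRFLOW_PASSWORD']}"
    token = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
    return AirflowSettings(host=env["AIRFLOW_HOST"].rstrip("/"), auth_header=f"Basic {token}")
```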

Usage with Claude Desktop

Add to your claude_desktop_config.json:

{
  "mcpServers": {
    "mcp-server-apache-airflow": {
      "command": "uvx",
      "args": ["mcp-server-apache-airflow"],
      "env": {
        "AIRFLOW_HOST": "https://your-airflow-host",
        "AIRFLOW_USERNAME": "your-username",
        "AIRFLOW_PASSWORD": "your-password"
      }
    }
  }
}
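If claude_desktop_config.json already lists other servers, the Airflow entry has to be merged into the existing mcpServers object rather than replacing the file. A minimal Python sketch of that merge follows; add_airflow_server is a hypothetical helper that produces the same entry as the uvx example.

```python
import copy
from typing import Any, Dict

SERVER_NAME = "mcp-server-apache-airflow"


def add_airflow_server(config: Dict[str, Any], host: str, username: str, password: str) -> Dict[str, Any]:
    """Return a copy of a Claude Desktop config with the Airflow entry added (hypothetical helper)."""
    merged = copy.deepcopy(config)  # leave the caller's dict untouched
    servers = merged.setdefault("mcpServers", {})  # create the section if absent
    # An existing entry with the same name is replaced; other servers are kept.
    servers[SERVER_NAME] = {
        "command": "uvx",
        "args": [SERVER_NAME],
        "env": {
            "AIRFLOW_HOST": host,
            "AIRFLOW_USERNAME": username,
            "AIRFLOW_PASSWORD": password,
        },
    }
    return merged
```

To apply it, load the file with json.load, pass the result through the helper, and write it back with json.dump.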

Alternative configuration using uv:

{
  "mcpServers": {
    "mcp-server-apache-airflow": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/mcp-server-apache-airflow",
        "run",
        "mcp-server-apache-airflow"
      ],
      "env": {
        "AIRFLOW_HOST": "https://your-airflow-host",
        "AIRFLOW_USERNAME": "your-username",
        "AIRFLOW_PASSWORD": "your-password"
      }
    }
  }
}

Replace /path/to/mcp-server-apache-airflow with the actual path where you've cloned the repository.

Selecting the API groups

You can select the API groups you want to use by setting the --apis flag.

uv run mcp-server-apache-airflow --apis "dag,dagrun"

The default is to use all APIs.

Allowed values are:

  • config
  • connections
  • dag
  • dagrun
  • dagstats
  • dataset
  • eventlog
  • importerror
  • monitoring
  • plugin
  • pool
  • provider
  • taskinstance
  • variable
  • xcom
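A sketch of how a comma-separated --apis value could be validated against the allowed groups, falling back to all groups when the flag is omitted. parse_apis is a hypothetical helper; the server's own parsing may differ (for example in how it treats whitespace or unknown names).

```python
from typing import FrozenSet, Optional

# The allowed values listed above.
ALLOWED_APIS: FrozenSet[str] = frozenset({
    "config", "connections", "dag", "dagrun", "dagstats", "dataset", "eventlog",
    "importerror", "monitoring", "plugin", "pool", "provider", "taskinstance",
    "variable", "xcom",
})


def parse_apis(value: Optional[str]) -> FrozenSet[str]:
    """Turn a value like "dag,dagrun" into a validated set of API groups (hypothetical helper)."""
    if value is None or not value.strip():
        return ALLOWED_APIS  # default: every API group is enabled
    selected = frozenset(part.strip() for part in value.split(",") if part.strip())
    if not selected:
        raise ValueError("--apis must name at least one API group")
    unknown = selected - ALLOWED_APIS
    if unknown:
        raise ValueError("unknown API groups: " + ", ".join(sorted(unknown)))
    return selected
```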

Manual Execution

You can also run the server manually:

make run

make run accepts the following options:

  • --port: Port to listen on for SSE (default: 8000)
  • --transport: Transport type (stdio/sse, default: stdio)
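These options behave like ordinary command-line flags. The argparse sketch below declares them with the defaults listed above. It is illustrative only: how make run forwards options and which CLI library the project actually uses are not documented here, and build_parser is a hypothetical name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the documented options (hypothetical sketch, not the project's CLI)."""
    parser = argparse.ArgumentParser(prog="mcp-server-apache-airflow")
    parser.add_argument(
        "--transport",
        choices=("stdio", "sse"),  # argparse rejects any other value
        default="stdio",
        help="Transport type (default: stdio)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to listen on for SSE (default: 8000)",
    )
    return parser
```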

Alternatively, you can run the SSE server directly; it accepts the same parameters:

make run-sse

Installing via Smithery

To install Apache Airflow MCP Server for Claude Desktop automatically via Smithery:

npx -y @smithery/cli install @yangkyeongmo/mcp-server-apache-airflow --client claude

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License

Tools

[
  {
    "description": "Lists all DAGs in the Airflow instance",
    "inputSchema": {
      "properties": {
        "dag_id_pattern": {
          "description": "If set, only return DAGs with dag_ids matching this pattern",
          "type": "string"
        },
        "limit": {
          "description": "The number of items to return (default: 100)",
          "minimum": 1,
          "type": "integer"
        },
        "offset": {
          "description": "The number of items to skip before starting to collect the result set",
          "minimum": 0,
          "type": "integer"
        },
        "only_active": {
          "description": "Only return active DAGs (default: true)",
          "type": "boolean"
        },
        "order_by": {
          "description": "The name of the field to order the results by. Prefix with - to reverse sort order",
          "type": "string"
        },
        "paused": {
          "description": "Return only paused (true) or unpaused (false) DAGs. If absent, returns both",
          "type": "boolean"
        },
        "tags": {
          "description": "List of tags to filter results",
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      },
      "type": "object"
    },
    "name": "list_dags"
  },
  {
    "description": "Get details of a specific DAG",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to retrieve",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "get_dag"
  },
  {
    "description": "Pause a DAG",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to pause",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "pause_dag"
  },
  {
    "description": "Unpause a DAG",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to unpause",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "unpause_dag"
  },
  {
    "description": "Trigger a DAG run",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to trigger",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "trigger_dag"
  },
  {
    "description": "Get DAG runs for a specific DAG",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to retrieve DAG runs for",
          "type": "string"
        },
        "end_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "end_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "execution_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "execution_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "limit": {
          "description": "The number of items to return (default: 100)",
          "minimum": 1,
          "type": "integer"
        },
        "offset": {
          "description": "The number of items to skip before starting to collect the result set",
          "minimum": 0,
          "type": "integer"
        },
        "order_by": {
          "description": "The name of the field to order the results by. Prefix with - to reverse sort order",
          "type": "string"
        },
        "start_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "start_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "state": {
          "description": "The value can be repeated to retrieve multiple matching values (OR condition)",
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        "updated_at_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "updated_at_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "get_dag_runs"
  },
  {
    "description": "Get tasks for a specific DAG",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG to retrieve tasks for",
          "type": "string"
        }
      },
      "required": [
        "dag_id"
      ],
      "type": "object"
    },
    "name": "get_dag_tasks"
  },
  {
    "description": "Get details of a specific task instance",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG",
          "type": "string"
        },
        "dag_run_id": {
          "description": "The ID of the DAG run",
          "type": "string"
        },
        "task_id": {
          "description": "The ID of the task",
          "type": "string"
        }
      },
      "required": [
        "dag_id",
        "task_id",
        "dag_run_id"
      ],
      "type": "object"
    },
    "name": "get_task_instance"
  },
  {
    "description": "List all task instances for a specific DAG run",
    "inputSchema": {
      "properties": {
        "dag_id": {
          "description": "The ID of the DAG",
          "type": "string"
        },
        "dag_run_id": {
          "description": "The ID of the DAG run",
          "type": "string"
        },
        "duration_gte": {
          "description": "Returns objects greater than or equal to the specified values",
          "type": "number"
        },
        "duration_lte": {
          "description": "Returns objects less than or equal to the specified values",
          "type": "number"
        },
        "end_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "end_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "execution_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "execution_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "limit": {
          "description": "The number of items to return (default: 100)",
          "minimum": 1,
          "type": "integer"
        },
        "offset": {
          "description": "The number of items to skip before starting to collect the result set",
          "minimum": 0,
          "type": "integer"
        },
        "pool": {
          "description": "The value can be repeated to retrieve multiple matching values (OR condition)",
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        "queue": {
          "description": "The value can be repeated to retrieve multiple matching values (OR condition)",
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        "start_date_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "start_date_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "state": {
          "description": "States of the task instance. The value can be repeated to retrieve multiple matching values (OR condition)",
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        "updated_at_gte": {
          "description": "Returns objects greater than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        },
        "updated_at_lte": {
          "description": "Returns objects less than or equal to the specified date",
          "format": "date-time",
          "type": "string"
        }
      },
      "required": [
        "dag_id",
        "dag_run_id"
      ],
      "type": "object"
    },
    "name": "list_task_instances"
  },
  {
    "description": "Get details of a specific import error",
    "inputSchema": {
      "properties": {
        "import_error_id": {
          "description": "The ID of the import error to retrieve",
          "type": "integer"
        }
      },
      "required": [
        "import_error_id"
      ],
      "type": "object"
    },
    "name": "get_import_error"
  },
  {
    "description": "List all import errors",
    "inputSchema": {
      "properties": {
        "limit": {
          "description": "The number of items to return (default: 100)",
          "minimum": 1,
          "type": "integer"
        },
        "offset": {
          "description": "The number of items to skip before starting to collect the result set",
          "minimum": 0,
          "type": "integer"
        },
        "order_by": {
          "description": "The name of the field to order the results by. Prefix with - to reverse sort order",
          "type": "string"
        }
      },
      "type": "object"
    },
    "name": "list_import_errors"
  },
  {
    "description": "Get the health status of the Airflow instance",
    "inputSchema": {
      "properties": {},
      "type": "object"
    },
    "name": "get_health"
  },
  {
    "description": "Get the version information of the Airflow instance",
    "inputSchema": {
      "properties": {},
      "type": "object"
    },
    "name": "get_version"
  }
]
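MCP clients invoke these tools with a JSON-RPC 2.0 tools/call request whose params carry the tool name and an arguments object matching the tool's inputSchema. The sketch below builds such a request and checks only the schema's required list; it does not check types, minimum, or format, which a full validator such as the jsonschema package would. make_tool_call is a hypothetical helper, and the schema excerpt is copied from the get_task_instance entry in the list (descriptions omitted).

```python
import json
from typing import Any, Dict

# Excerpt of the get_task_instance inputSchema (descriptions omitted).
GET_TASK_INSTANCE_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "dag_id": {"type": "string"},
        "dag_run_id": {"type": "string"},
        "task_id": {"type": "string"},
    },
    "required": ["dag_id", "task_id", "dag_run_id"],
}


def make_tool_call(request_id: int, name: str, arguments: Dict[str, Any], schema: Dict[str, Any]) -> str:
    """Build an MCP tools/call request after checking required arguments (hypothetical helper)."""
    missing = [key for key in schema.get("required", []) if key not in arguments]
    if missing:
        raise ValueError(f"{name}: missing required arguments: {', '.join(missing)}")
    request = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    return json.dumps(request)
```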