# MCP Server for Apache Airflow
Provides a standardized way for MCP clients to interact with Apache Airflow's REST API, supporting operations like DAG management and monitoring Airflow system health.
A Model Context Protocol (MCP) server implementation for Apache Airflow, enabling seamless integration with MCP clients. This project provides a standardized way to interact with Apache Airflow through the Model Context Protocol.
This project implements a Model Context Protocol server that wraps Apache Airflow's REST API, allowing MCP clients to interact with Airflow in a standardized way. It uses the official Apache Airflow client library to ensure compatibility and maintainability.
| Feature | API Path | Status |
|---|---|---|
| **DAG Management** | | |
| List DAGs | /api/v1/dags | ✅ |
| Get DAG Details | /api/v1/dags/{dag_id} | ✅ |
| Pause DAG | /api/v1/dags/{dag_id} | ✅ |
| Unpause DAG | /api/v1/dags/{dag_id} | ✅ |
| Update DAG | /api/v1/dags/{dag_id} | ✅ |
| Delete DAG | /api/v1/dags/{dag_id} | ✅ |
| Get DAG Source | /api/v1/dagSources/{file_token} | ✅ |
| Patch Multiple DAGs | /api/v1/dags | ✅ |
| Reparse DAG File | /api/v1/dagSources/{file_token}/reparse | ✅ |
| **DAG Runs** | | |
| List DAG Runs | /api/v1/dags/{dag_id}/dagRuns | ✅ |
| Create DAG Run | /api/v1/dags/{dag_id}/dagRuns | ✅ |
| Get DAG Run Details | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} | ✅ |
| Update DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} | ✅ |
| Delete DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} | ✅ |
| Get DAG Runs Batch | /api/v1/dags/~/dagRuns/list | ✅ |
| Clear DAG Run | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear | ✅ |
| Set DAG Run Note | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/setNote | ✅ |
| Get Upstream Dataset Events | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents | ✅ |
| **Tasks** | | |
| List DAG Tasks | /api/v1/dags/{dag_id}/tasks | ✅ |
| Get Task Details | /api/v1/dags/{dag_id}/tasks/{task_id} | ✅ |
| Get Task Instance | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | ✅ |
| List Task Instances | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances | ✅ |
| Update Task Instance | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | ✅ |
| Clear Task Instances | /api/v1/dags/{dag_id}/clearTaskInstances | ✅ |
| Set Task Instances State | /api/v1/dags/{dag_id}/updateTaskInstancesState | ✅ |
| **Variables** | | |
| List Variables | /api/v1/variables | ✅ |
| Create Variable | /api/v1/variables | ✅ |
| Get Variable | /api/v1/variables/{variable_key} | ✅ |
| Update Variable | /api/v1/variables/{variable_key} | ✅ |
| Delete Variable | /api/v1/variables/{variable_key} | ✅ |
| **Connections** | | |
| List Connections | /api/v1/connections | ✅ |
| Create Connection | /api/v1/connections | ✅ |
| Get Connection | /api/v1/connections/{connection_id} | ✅ |
| Update Connection | /api/v1/connections/{connection_id} | ✅ |
| Delete Connection | /api/v1/connections/{connection_id} | ✅ |
| Test Connection | /api/v1/connections/test | ✅ |
| **Pools** | | |
| List Pools | /api/v1/pools | ✅ |
| Create Pool | /api/v1/pools | ✅ |
| Get Pool | /api/v1/pools/{pool_name} | ✅ |
| Update Pool | /api/v1/pools/{pool_name} | ✅ |
| Delete Pool | /api/v1/pools/{pool_name} | ✅ |
| **XComs** | | |
| List XComs | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | ✅ |
| Get XCom Entry | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | ✅ |
| **Datasets** | | |
| List Datasets | /api/v1/datasets | ✅ |
| Get Dataset | /api/v1/datasets/{uri} | ✅ |
| Get Dataset Events | /api/v1/datasetEvents | ✅ |
| Create Dataset Event | /api/v1/datasetEvents | ✅ |
| Get DAG Dataset Queued Event | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} | ✅ |
| Get DAG Dataset Queued Events | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents | ✅ |
| Delete DAG Dataset Queued Event | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} | ✅ |
| Delete DAG Dataset Queued Events | /api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents | ✅ |
| Get Dataset Queued Events | /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents | ✅ |
| Delete Dataset Queued Events | /api/v1/datasets/{uri}/dagRuns/queued/datasetEvents | ✅ |
| **Monitoring** | | |
| Get Health | /api/v1/health | ✅ |
| **DAG Stats** | | |
| Get DAG Stats | /api/v1/dags/statistics | ✅ |
| **Config** | | |
| Get Config | /api/v1/config | ✅ |
| **Plugins** | | |
| Get Plugins | /api/v1/plugins | ✅ |
| **Providers** | | |
| List Providers | /api/v1/providers | ✅ |
| **Event Logs** | | |
| List Event Logs | /api/v1/eventLogs | ✅ |
| Get Event Log | /api/v1/eventLogs/{event_log_id} | ✅ |
| **System** | | |
| Get Import Errors | /api/v1/importErrors | ✅ |
| Get Import Error Details | /api/v1/importErrors/{import_error_id} | ✅ |
| Get Health Status | /api/v1/health | ✅ |
| Get Version | /api/v1/version | ✅ |
This project depends on the official Apache Airflow client library (`apache-airflow-client`). It will be automatically installed when you install this package.
Set the following environment variables:
AIRFLOW_HOST=<your-airflow-host>
AIRFLOW_USERNAME=<your-airflow-username>
AIRFLOW_PASSWORD=<your-airflow-password>
Add to your `claude_desktop_config.json`:
{
"mcpServers": {
"mcp-server-apache-airflow": {
"command": "uvx",
"args": ["mcp-server-apache-airflow"],
"env": {
"AIRFLOW_HOST": "https://your-airflow-host",
"AIRFLOW_USERNAME": "your-username",
"AIRFLOW_PASSWORD": "your-password"
}
}
}
}
Alternative configuration using `uv`:
{
"mcpServers": {
"mcp-server-apache-airflow": {
"command": "uv",
"args": [
"--directory",
"/path/to/mcp-server-apache-airflow",
"run",
"mcp-server-apache-airflow"
],
"env": {
"AIRFLOW_HOST": "https://your-airflow-host",
"AIRFLOW_USERNAME": "your-username",
"AIRFLOW_PASSWORD": "your-password"
}
}
}
}
Replace `/path/to/mcp-server-apache-airflow` with the actual path where you've cloned the repository.
You can select the API groups you want to use by setting the `--apis` flag.
uv run mcp-server-apache-airflow --apis "dag,dagrun"
The default is to use all APIs.
Allowed values are:
You can also run the server manually:
make run
`make run` accepts the following options:
Options:
- `--port`: Port to listen on for SSE (default: 8000)
- `--transport`: Transport type (stdio/sse, default: stdio)
Or, you could run the SSE server directly, which accepts the same parameters:
make run-sse
To install Apache Airflow MCP Server for Claude Desktop automatically via Smithery:
npx -y @smithery/cli install @yangkyeongmo/mcp-server-apache-airflow --client claude
Contributions are welcome! Please feel free to submit a Pull Request.
[
{
"description": "Lists all DAGs in the Airflow instance",
"inputSchema": {
"properties": {
"dag_id_pattern": {
"description": "If set, only return DAGs with dag_ids matching this pattern",
"type": "string"
},
"limit": {
"description": "The numbers of items to return (default: 100)",
"minimum": 1,
"type": "integer"
},
"offset": {
"description": "The number of items to skip before starting to collect the result set",
"minimum": 0,
"type": "integer"
},
"only_active": {
"description": "Only filter active DAGs (default: true)",
"type": "boolean"
},
"order_by": {
"description": "The name of the field to order the results by. Prefix with - to reverse sort order",
"type": "string"
},
"paused": {
"description": "Only filter paused/unpaused DAGs. If absent, returns both",
"type": "boolean"
},
"tags": {
"description": "List of tags to filter results",
"items": {
"type": "string"
},
"type": "array"
}
},
"type": "object"
},
"name": "list_dags"
},
{
"description": "Get details of a specific DAG",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to retrieve",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "get_dag"
},
{
"description": "Pause a DAG",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to pause",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "pause_dag"
},
{
"description": "Unpause a DAG",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to unpause",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "unpause_dag"
},
{
"description": "Trigger a DAG run",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to trigger",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "trigger_dag"
},
{
"description": "Get DAG runs for a specific DAG",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to retrieve DAG runs for",
"type": "string"
},
"end_date_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"end_date_lte": {
"description": "Returns objects less than or equal to the specified date",
"format": "date-time",
"type": "string"
},
"execution_date_gte": {
"description": "Returns objects greater or equal to the specified date",
"format": "date-time",
"type": "string"
},
"execution_date_lte": {
"description": "Returns objects less than or equal to the specified date",
"format": "date-time",
"type": "string"
},
"limit": {
"description": "The numbers of items to return (default: 100)",
"minimum": 1,
"type": "integer"
},
"offset": {
"description": "The number of items to skip before starting to collect the result set",
"minimum": 0,
"type": "integer"
},
"order_by": {
"description": "The name of the field to order the results by. Prefix with - to reverse sort order",
"type": "string"
},
"start_date_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"start_date_lte": {
"description": "Returns objects less or equal the specified date",
"format": "date-time",
"type": "string"
},
"state": {
"description": "The value can be repeated to retrieve multiple matching values (OR condition)",
"items": {
"type": "string"
},
"type": "array"
},
"updated_at_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"updated_at_lte": {
"description": "Returns objects less or equal the specified date",
"format": "date-time",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "get_dag_runs"
},
{
"description": "Get tasks for a specific DAG",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG to retrieve tasks for",
"type": "string"
}
},
"required": [
"dag_id"
],
"type": "object"
},
"name": "get_dag_tasks"
},
{
"description": "Get details of a specific task instance",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG",
"type": "string"
},
"dag_run_id": {
"description": "The ID of the DAG run",
"type": "string"
},
"task_id": {
"description": "The ID of the task",
"type": "string"
}
},
"required": [
"dag_id",
"task_id",
"dag_run_id"
],
"type": "object"
},
"name": "get_task_instance"
},
{
"description": "List all task instances for a specific DAG run",
"inputSchema": {
"properties": {
"dag_id": {
"description": "The ID of the DAG",
"type": "string"
},
"dag_run_id": {
"description": "The ID of the DAG run",
"type": "string"
},
"duration_gte": {
"description": "Returns objects greater than or equal to the specified values",
"type": "number"
},
"duration_lte": {
"description": "Returns objects less than or equal to the specified values",
"type": "number"
},
"end_date_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"end_date_lte": {
"description": "Returns objects less than or equal to the specified date",
"format": "date-time",
"type": "string"
},
"execution_date_gte": {
"description": "Returns objects greater or equal to the specified date",
"format": "date-time",
"type": "string"
},
"execution_date_lte": {
"description": "Returns objects less than or equal to the specified date",
"format": "date-time",
"type": "string"
},
"limit": {
"description": "The numbers of items to return (default: 100)",
"minimum": 1,
"type": "integer"
},
"offset": {
"description": "The number of items to skip before starting to collect the result set",
"minimum": 0,
"type": "integer"
},
"pool": {
"description": "The value can be repeated to retrieve multiple matching values (OR condition)",
"items": {
"type": "string"
},
"type": "array"
},
"queue": {
"description": "The value can be repeated to retrieve multiple matching values (OR condition)",
"items": {
"type": "string"
},
"type": "array"
},
"start_date_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"start_date_lte": {
"description": "Returns objects less or equal the specified date",
"format": "date-time",
"type": "string"
},
"state": {
"description": "States of the task instance. The value can be repeated to retrieve multiple matching values (OR condition)",
"items": {
"type": "string"
},
"type": "array"
},
"updated_at_gte": {
"description": "Returns objects greater or equal the specified date",
"format": "date-time",
"type": "string"
},
"updated_at_lte": {
"description": "Returns objects less or equal the specified date",
"format": "date-time",
"type": "string"
}
},
"required": [
"dag_id",
"dag_run_id"
],
"type": "object"
},
"name": "list_task_instances"
},
{
"description": "Get details of a specific import error",
"inputSchema": {
"properties": {
"import_error_id": {
"description": "The ID of the import error to retrieve",
"type": "integer"
}
},
"required": [
"import_error_id"
],
"type": "object"
},
"name": "get_import_error"
},
{
"description": "List all import errors",
"inputSchema": {
"properties": {
"limit": {
"description": "The numbers of items to return (default: 100)",
"minimum": 1,
"type": "integer"
},
"offset": {
"description": "The number of items to skip before starting to collect the result set",
"minimum": 0,
"type": "integer"
},
"order_by": {
"description": "The name of the field to order the results by. Prefix with - to reverse sort order",
"type": "string"
}
},
"type": "object"
},
"name": "list_import_errors"
},
{
"description": "Get the health status of the Airflow instance",
"inputSchema": {
"properties": {},
"type": "object"
},
"name": "get_health"
},
{
"description": "Get the version information of the Airflow instance",
"inputSchema": {
"properties": {},
"type": "object"
},
"name": "get_version"
}
]