20
20
21
21
import pytest
22
22
23
- from airflow .models import DAG , RenderedTaskInstanceFields
23
+ from airflow .models import DAG , RenderedTaskInstanceFields , Variable
24
24
from airflow .operators .bash import BashOperator
25
25
from airflow .serialization .serialized_objects import SerializedDAG
26
26
from airflow .utils import timezone
@@ -61,6 +61,15 @@ def task2(dag):
61
61
)
62
62
63
63
64
+ @pytest .fixture ()
65
+ def task_secret (dag ):
66
+ return BashOperator (
67
+ task_id = 'task_secret' ,
68
+ bash_command = 'echo {{ var.value.my_secret }} && echo {{ var.value.spam }}' ,
69
+ dag = dag ,
70
+ )
71
+
72
+
64
73
@pytest .fixture (scope = "module" , autouse = True )
65
74
def init_blank_db ():
66
75
"""Make sure there are no runs before we test anything.
@@ -73,15 +82,15 @@ def init_blank_db():
73
82
74
83
75
84
@pytest .fixture (autouse = True )
76
- def reset_db (dag , task1 , task2 ):
85
+ def reset_db (dag , task1 , task2 , task_secret ):
77
86
yield
78
87
clear_db_dags ()
79
88
clear_db_runs ()
80
89
clear_rendered_ti_fields ()
81
90
82
91
83
92
@pytest .fixture ()
84
- def create_dag_run (dag , task1 , task2 ):
93
+ def create_dag_run (dag , task1 , task2 , task_secret ):
85
94
def _create_dag_run (* , execution_date , session ):
86
95
dag_run = dag .create_dagrun (
87
96
state = DagRunState .RUNNING ,
@@ -94,6 +103,8 @@ def _create_dag_run(*, execution_date, session):
94
103
ti1 .state = TaskInstanceState .SUCCESS
95
104
ti2 = dag_run .get_task_instance (task2 .task_id , session = session )
96
105
ti2 .state = TaskInstanceState .SCHEDULED
106
+ ti3 = dag_run .get_task_instance (task_secret .task_id , session = session )
107
+ ti3 .state = TaskInstanceState .QUEUED
97
108
session .flush ()
98
109
return dag_run
99
110
@@ -168,3 +179,29 @@ def test_user_defined_filter_and_macros_raise_error(admin_client, create_dag_run
168
179
# MarkupSafe changed the exception detail from 'no filter named' to
169
180
# 'No filter named' in 2.0 (I think), so we normalize for comparison.
170
181
assert "originalerror: no filter named 'hello'" in resp_html .lower ()
182
+
183
+
184
+ @pytest .mark .usefixtures ("patch_app" )
185
+ def test_rendered_template_secret (admin_client , create_dag_run , task_secret ):
186
+ """Test that the Rendered View masks values retrieved from secret variables."""
187
+ Variable .set ("my_secret" , "foo" )
188
+ Variable .set ("spam" , "egg" )
189
+
190
+ assert task_secret .bash_command == 'echo {{ var.value.my_secret }} && echo {{ var.value.spam }}'
191
+
192
+ with create_session () as session :
193
+ dag_run = create_dag_run (execution_date = DEFAULT_DATE , session = session )
194
+ ti = dag_run .get_task_instance (task_secret .task_id , session = session )
195
+ assert ti is not None , "task instance not found"
196
+ ti .refresh_from_task (task_secret )
197
+ assert ti .state == TaskInstanceState .QUEUED
198
+
199
+ date = quote_plus (str (DEFAULT_DATE ))
200
+ url = f'rendered-templates?task_id=task_secret&dag_id=testdag&execution_date={ date } '
201
+
202
+ resp = admin_client .get (url , follow_redirects = True )
203
+ check_content_in_response (
204
+ 'echo</span> *** <span class="o">&&</span> <span class="nb">echo</span> egg' , resp
205
+ )
206
+ ti .refresh_from_task (task_secret )
207
+ assert ti .state == TaskInstanceState .QUEUED
0 commit comments