1
1
package com .provectus .kafka .ui .service .acl ;
2
2
3
+ import static org .apache .kafka .common .acl .AclOperation .ALL ;
4
+ import static org .apache .kafka .common .acl .AclOperation .CREATE ;
5
+ import static org .apache .kafka .common .acl .AclOperation .DESCRIBE ;
6
+ import static org .apache .kafka .common .acl .AclOperation .IDEMPOTENT_WRITE ;
7
+ import static org .apache .kafka .common .acl .AclOperation .READ ;
8
+ import static org .apache .kafka .common .acl .AclOperation .WRITE ;
9
+ import static org .apache .kafka .common .acl .AclPermissionType .ALLOW ;
10
+ import static org .apache .kafka .common .resource .PatternType .LITERAL ;
11
+ import static org .apache .kafka .common .resource .PatternType .PREFIXED ;
12
+ import static org .apache .kafka .common .resource .ResourceType .CLUSTER ;
13
+ import static org .apache .kafka .common .resource .ResourceType .GROUP ;
14
+ import static org .apache .kafka .common .resource .ResourceType .TOPIC ;
15
+ import static org .apache .kafka .common .resource .ResourceType .TRANSACTIONAL_ID ;
16
+
3
17
import com .google .common .collect .Sets ;
18
+ import com .provectus .kafka .ui .model .CreateConsumerAclDTO ;
19
+ import com .provectus .kafka .ui .model .CreateProducerAclDTO ;
20
+ import com .provectus .kafka .ui .model .CreateStreamAppAclDTO ;
4
21
import com .provectus .kafka .ui .model .KafkaCluster ;
5
22
import com .provectus .kafka .ui .service .AdminClientService ;
23
+ import com .provectus .kafka .ui .service .ReactiveAdminClient ;
24
+ import java .util .ArrayList ;
25
+ import java .util .Collection ;
6
26
import java .util .Comparator ;
7
27
import java .util .List ;
28
+ import java .util .Optional ;
8
29
import java .util .Set ;
30
+ import javax .annotation .Nullable ;
9
31
import lombok .RequiredArgsConstructor ;
10
32
import lombok .extern .slf4j .Slf4j ;
33
+ import org .apache .kafka .common .acl .AccessControlEntry ;
11
34
import org .apache .kafka .common .acl .AclBinding ;
35
+ import org .apache .kafka .common .acl .AclOperation ;
36
+ import org .apache .kafka .common .resource .Resource ;
37
+ import org .apache .kafka .common .resource .ResourcePattern ;
12
38
import org .apache .kafka .common .resource .ResourcePatternFilter ;
39
+ import org .apache .kafka .common .resource .ResourceType ;
13
40
import org .springframework .stereotype .Service ;
41
+ import org .springframework .util .CollectionUtils ;
14
42
import reactor .core .publisher .Flux ;
15
43
import reactor .core .publisher .Mono ;
16
44
@@ -22,11 +50,14 @@ public class AclsService {
22
50
private final AdminClientService adminClientService ;
23
51
24
52
public Mono <Void > createAcl (KafkaCluster cluster , AclBinding aclBinding ) {
25
- var aclString = AclCsv .createAclString (aclBinding );
26
- log .info ("CREATING ACL: [{}]" , aclString );
27
53
return adminClientService .get (cluster )
28
- .flatMap (ac -> ac .createAcls (List .of (aclBinding )))
29
- .doOnSuccess (v -> log .info ("ACL CREATED: [{}]" , aclString ));
54
+ .flatMap (ac -> createAclsWithLogging (ac , List .of (aclBinding )));
55
+ }
56
+
57
+ private Mono <Void > createAclsWithLogging (ReactiveAdminClient ac , Collection <AclBinding > bindings ) {
58
+ bindings .forEach (b -> log .info ("CREATING ACL: [{}]" , AclCsv .createAclString (b )));
59
+ return ac .createAcls (bindings )
60
+ .doOnSuccess (v -> bindings .forEach (b -> log .info ("ACL CREATED: [{}]" , AclCsv .createAclString (b ))));
30
61
}
31
62
32
63
public Mono <Void > deleteAcl (KafkaCluster cluster , AclBinding aclBinding ) {
@@ -92,4 +123,150 @@ private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set
92
123
}
93
124
}
94
125
126
+ // creates allow binding for resources by prefix or specific names list
127
+ private List <AclBinding > createAllowBindings (ResourceType resourceType ,
128
+ List <AclOperation > opsToAllow ,
129
+ String principal ,
130
+ String host ,
131
+ @ Nullable String resourcePrefix ,
132
+ @ Nullable Collection <String > resourceNames ) {
133
+ List <AclBinding > bindings = new ArrayList <>();
134
+ if (resourcePrefix != null ) {
135
+ for (var op : opsToAllow ) {
136
+ bindings .add (
137
+ new AclBinding (
138
+ new ResourcePattern (resourceType , resourcePrefix , PREFIXED ),
139
+ new AccessControlEntry (principal , host , op , ALLOW )));
140
+ }
141
+ }
142
+ if (!CollectionUtils .isEmpty (resourceNames )) {
143
+ resourceNames .stream ()
144
+ .distinct ()
145
+ .forEach (resource ->
146
+ opsToAllow .forEach (op ->
147
+ bindings .add (
148
+ new AclBinding (
149
+ new ResourcePattern (resourceType , resource , LITERAL ),
150
+ new AccessControlEntry (principal , host , op , ALLOW )))));
151
+ }
152
+ return bindings ;
153
+ }
154
+
155
+ public Mono <Void > createConsumerAcl (KafkaCluster cluster , CreateConsumerAclDTO request ) {
156
+ return adminClientService .get (cluster )
157
+ .flatMap (ac -> createAclsWithLogging (ac , createConsumerBindings (request )))
158
+ .then ();
159
+ }
160
+
161
+ //Read, Describe on topics, Read on consumerGroups
162
+ private List <AclBinding > createConsumerBindings (CreateConsumerAclDTO request ) {
163
+ List <AclBinding > bindings = new ArrayList <>();
164
+ bindings .addAll (
165
+ createAllowBindings (TOPIC ,
166
+ List .of (READ , DESCRIBE ),
167
+ request .getPrincipal (),
168
+ request .getHost (),
169
+ request .getTopicsPrefix (),
170
+ request .getTopics ()));
171
+
172
+ bindings .addAll (
173
+ createAllowBindings (
174
+ GROUP ,
175
+ List .of (READ ),
176
+ request .getPrincipal (),
177
+ request .getHost (),
178
+ request .getConsumerGroupsPrefix (),
179
+ request .getConsumerGroups ()));
180
+ return bindings ;
181
+ }
182
+
183
+ public Mono <Void > createProducerAcl (KafkaCluster cluster , CreateProducerAclDTO request ) {
184
+ return adminClientService .get (cluster )
185
+ .flatMap (ac -> createAclsWithLogging (ac , createProducerBindings (request )))
186
+ .then ();
187
+ }
188
+
189
+ //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
190
+ //IDEMPOTENT_WRITE on cluster if idempotent is enabled
191
+ private List <AclBinding > createProducerBindings (CreateProducerAclDTO request ) {
192
+ List <AclBinding > bindings = new ArrayList <>();
193
+ bindings .addAll (
194
+ createAllowBindings (
195
+ TOPIC ,
196
+ List .of (WRITE , DESCRIBE , CREATE ),
197
+ request .getPrincipal (),
198
+ request .getHost (),
199
+ request .getTopicsPrefix (),
200
+ request .getTopics ()));
201
+
202
+ bindings .addAll (
203
+ createAllowBindings (
204
+ TRANSACTIONAL_ID ,
205
+ List .of (WRITE , DESCRIBE ),
206
+ request .getPrincipal (),
207
+ request .getHost (),
208
+ request .getTransactionsIdPrefix (),
209
+ Optional .ofNullable (request .getTransactionalId ()).map (List ::of ).orElse (null )));
210
+
211
+ if (Boolean .TRUE .equals (request .getIdempotent ())) {
212
+ bindings .addAll (
213
+ createAllowBindings (
214
+ CLUSTER ,
215
+ List .of (IDEMPOTENT_WRITE ),
216
+ request .getPrincipal (),
217
+ request .getHost (),
218
+ null ,
219
+ List .of (Resource .CLUSTER_NAME ))); // cluster name is a const string in ACL api
220
+ }
221
+ return bindings ;
222
+ }
223
+
224
+ public Mono <Void > createStreamAppAcl (KafkaCluster cluster , CreateStreamAppAclDTO request ) {
225
+ return adminClientService .get (cluster )
226
+ .flatMap (ac -> createAclsWithLogging (ac , createStreamAppBindings (request )))
227
+ .then ();
228
+ }
229
+
230
+ // Read on input topics, Write on output topics
231
+ // ALL on applicationId-prefixed Groups and Topics
232
+ private List <AclBinding > createStreamAppBindings (CreateStreamAppAclDTO request ) {
233
+ List <AclBinding > bindings = new ArrayList <>();
234
+ bindings .addAll (
235
+ createAllowBindings (
236
+ TOPIC ,
237
+ List .of (READ ),
238
+ request .getPrincipal (),
239
+ request .getHost (),
240
+ null ,
241
+ request .getInputTopics ()));
242
+
243
+ bindings .addAll (
244
+ createAllowBindings (
245
+ TOPIC ,
246
+ List .of (WRITE ),
247
+ request .getPrincipal (),
248
+ request .getHost (),
249
+ null ,
250
+ request .getOutputTopics ()));
251
+
252
+ bindings .addAll (
253
+ createAllowBindings (
254
+ GROUP ,
255
+ List .of (ALL ),
256
+ request .getPrincipal (),
257
+ request .getHost (),
258
+ request .getApplicationId (),
259
+ null ));
260
+
261
+ bindings .addAll (
262
+ createAllowBindings (
263
+ TOPIC ,
264
+ List .of (ALL ),
265
+ request .getPrincipal (),
266
+ request .getHost (),
267
+ request .getApplicationId (),
268
+ null ));
269
+ return bindings ;
270
+ }
271
+
95
272
}
0 commit comments