Open
Labels: enhancement, help wanted
Description
We have rack awareness enabled on our cluster. If I use kafkactl to increase the replication factor of a topic, does the new replica assignment maintain rack awareness?
We used kafkactl last week, and one of our topics now appears to be no longer rack-safe. I'm checking whether that topic is the one we used kafkactl on.
I browsed through the kafkactl code and couldn't tell whether broker racks are taken into account when increasing the replication factor.
kafkactl/internal/topic/topic-operation.go
Lines 501 to 545 in 105b481
func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int, targetReplicationFactor int16) ([]int32, error) {
	replicas := currentReplicas
	for len(replicas) > int(targetReplicationFactor) {
		sort.Slice(replicas, func(i, j int) bool {
			brokerI := replicas[i]
			brokerJ := replicas[j]
			return brokerReplicaCount[brokerI] < brokerReplicaCount[brokerJ] || (brokerReplicaCount[brokerI] == brokerReplicaCount[brokerJ] && brokerI < brokerJ)
		})
		lastReplica := replicas[len(replicas)-1]
		replicas = replicas[:len(replicas)-1]
		brokerReplicaCount[lastReplica]--
	}
	var unusedBrokerIds []int32
	if len(replicas) < int(targetReplicationFactor) {
		for brokerID := range brokerReplicaCount {
			if !util.ContainsInt32(replicas, brokerID) {
				unusedBrokerIds = append(unusedBrokerIds, brokerID)
			}
		}
		if len(unusedBrokerIds) < (int(targetReplicationFactor) - len(replicas)) {
			return nil, errors.New("not enough brokers")
		}
	}
	for len(replicas) < int(targetReplicationFactor) {
		sort.Slice(unusedBrokerIds, func(i, j int) bool {
			brokerI := unusedBrokerIds[i]
			brokerJ := unusedBrokerIds[j]
			return brokerReplicaCount[brokerI] < brokerReplicaCount[brokerJ] || (brokerReplicaCount[brokerI] == brokerReplicaCount[brokerJ] && brokerI > brokerJ)
		})
		replicas = append(replicas, unusedBrokerIds[0])
		brokerReplicaCount[unusedBrokerIds[0]]++
		unusedBrokerIds = unusedBrokerIds[1:]
	}
	return replicas, nil
}
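As quoted, the function only receives the current replicas, a per-broker replica count, and the target replication factor, so it has no rack information to work with. To illustrate what I was expecting, here is a rough sketch of a rack-aware version of the "add replicas" step. It is not kafkactl code: `addReplicasRackAware`, `distinctRacks`, and the `brokerRack` map (broker ID to rack name) are hypothetical names, and I'm assuming the rack of each broker can be looked up from cluster metadata. The idea is to prefer brokers on the rack that holds the fewest replicas of the partition, and only then fall back to the existing replica-count ordering.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// distinctRacks counts how many different racks a replica set spans.
// Hypothetical helper; brokerRack maps broker ID to rack name (assumed input).
func distinctRacks(replicas []int32, brokerRack map[int32]string) int {
	seen := map[string]bool{}
	for _, id := range replicas {
		seen[brokerRack[id]] = true
	}
	return len(seen)
}

// addReplicasRackAware grows a replica set to target replicas.
// Hypothetical sketch, not kafkactl's implementation.
func addReplicasRackAware(current []int32, brokerReplicaCount map[int32]int,
	brokerRack map[int32]string, target int) ([]int32, error) {

	// Copy so the caller's slice is left untouched.
	replicas := append([]int32(nil), current...)

	rackUse := map[string]int{} // replicas of this partition per rack
	used := map[int32]bool{}
	for _, id := range replicas {
		used[id] = true
		rackUse[brokerRack[id]]++
	}

	var candidates []int32
	for id := range brokerReplicaCount {
		if !used[id] {
			candidates = append(candidates, id)
		}
	}
	if len(candidates) < target-len(replicas) {
		return nil, errors.New("not enough brokers")
	}

	for len(replicas) < target {
		sort.Slice(candidates, func(i, j int) bool {
			a, b := candidates[i], candidates[j]
			// 1. Prefer the rack holding the fewest replicas of this partition.
			if rackUse[brokerRack[a]] != rackUse[brokerRack[b]] {
				return rackUse[brokerRack[a]] < rackUse[brokerRack[b]]
			}
			// 2. Then prefer the broker with the fewest replicas overall.
			if brokerReplicaCount[a] != brokerReplicaCount[b] {
				return brokerReplicaCount[a] < brokerReplicaCount[b]
			}
			// 3. Lowest broker ID as a final tie-break (arbitrary choice).
			return a < b
		})
		next := candidates[0]
		replicas = append(replicas, next)
		rackUse[brokerRack[next]]++
		brokerReplicaCount[next]++
		candidates = candidates[1:]
	}
	return replicas, nil
}

func main() {
	// Made-up cluster: brokers 1,2 on rack "a"; 3,4 on rack "b"; 5 on rack "c".
	racks := map[int32]string{1: "a", 2: "a", 3: "b", 4: "b", 5: "c"}
	counts := map[int32]int{1: 2, 2: 0, 3: 2, 4: 0, 5: 3}

	replicas, err := addReplicasRackAware([]int32{1, 3}, counts, racks, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(replicas, distinctRacks(replicas, racks)) // prints "[1 3 5] 3"
}
```

With the made-up cluster above, ordering by replica count alone would pick broker 2 or 4 (both have a count of 0), which puts two replicas on the same rack. The rack-first ordering picks broker 5 on the unused rack "c" instead, even though it already carries more replicas.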
Thanks!
jrodriguez-tivo