Skip to content

Commit aadc5b8

Browse files
committed
fix: cancel blocking command during migration finalization
1 parent addf049 commit aadc5b8

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

src/server/cluster/cluster_family.cc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,10 +990,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
990990
auto new_config = is_incoming ? ClusterConfig::Current()->CloneWithChanges(slots, {})
991991
: ClusterConfig::Current()->CloneWithChanges({}, slots);
992992

993+
auto blocking_filter = [&new_config](ArgSlice keys) {
994+
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
995+
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
996+
};
993997
// we don't need to use DispatchTracker here because for IncomingMigration we don't have
994998
// connections that should be tracked, and for Outgoing migration we do it under Pause
995999
server_family_->service().proactor_pool().AwaitFiberOnAll(
996-
[&new_config](util::ProactorBase*) { ClusterConfig::SetCurrent(new_config); });
1000+
[this, &new_config, &blocking_filter](util::ProactorBase*) {
1001+
server_family_->CancelBlockingOnThread(blocking_filter);
1002+
ClusterConfig::SetCurrent(new_config);
1003+
});
9971004
DCHECK(ClusterConfig::Current() != nullptr);
9981005
VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID()
9991006
<< " : " << node_id;

tests/dragonfly/cluster_test.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,3 +3172,47 @@ async def test_readonly_replication(
31723172

31733173
# This behavior can be changed in the future
31743174
assert await r1_node.client.execute_command("GET Y") == None
3175+
3176+
3177+
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
3178+
async def test_cluster_sharded_pub_sub_migration(df_factory: DflyInstanceFactory):
3179+
# A pending blocking command (BLPOP here) should be canceled with a MOVED error during migration finalization
3180+
instances = [df_factory.create(port=next(next_port)) for i in range(2)]
3181+
df_factory.start_all(instances)
3182+
3183+
c_nodes = [instance.client() for instance in instances]
3184+
3185+
nodes = [(await create_node_info(instance)) for instance in instances]
3186+
nodes[0].slots = [(0, 16383)]
3187+
nodes[1].slots = []
3188+
3189+
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
3190+
3191+
logging.debug("Start blpop task")
3192+
blpop_task = asyncio.create_task(c_nodes[0].blpop("list", 0))
3193+
3194+
await asyncio.sleep(0.5)
3195+
3196+
assert not blpop_task.done()
3197+
3198+
nodes[0].migrations.append(
3199+
MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id)
3200+
)
3201+
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
3202+
3203+
await wait_for_status(nodes[0].client, nodes[1].id, "FINISHED")
3204+
3205+
with pytest.raises(aioredis.ResponseError) as e_info:
3206+
await blpop_task
3207+
assert "MOVED" in str(e_info.value)
3208+
3209+
assert await c_nodes[1].type("list") == "none"
3210+
3211+
nodes[0].migrations = []
3212+
nodes[0].slots = []
3213+
nodes[1].slots = [(0, 16383)]
3214+
3215+
logging.debug("remove finished migrations")
3216+
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
3217+
3218+
assert await c_nodes[1].type("list") == "none"

0 commit comments

Comments
 (0)