From 15511de8aba11c5be1e666837049fb6156243111 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 8 Apr 2025 16:30:06 +0300 Subject: [PATCH 1/2] fix: cancel blocking command during migration finalization --- src/server/cluster/cluster_family.cc | 9 +++++- tests/dragonfly/cluster_test.py | 44 ++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index e897760df694..cc3866f5210c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -1006,10 +1006,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, auto new_config = is_incoming ? ClusterConfig::Current()->CloneWithChanges(slots, {}) : ClusterConfig::Current()->CloneWithChanges({}, slots); + auto blocking_filter = [&new_config](ArgSlice keys) { + bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); }); + return moved ? OpStatus::KEY_MOVED : OpStatus::OK; + }; // we don't need to use DispatchTracker here because for IncomingMingration we don't have // connectionas that should be tracked and for Outgoing migration we do it under Pause server_family_->service().proactor_pool().AwaitFiberOnAll( - [&new_config](util::ProactorBase*) { ClusterConfig::SetCurrent(new_config); }); + [this, &new_config, &blocking_filter](util::ProactorBase*) { + server_family_->CancelBlockingOnThread(blocking_filter); + ClusterConfig::SetCurrent(new_config); + }); DCHECK(ClusterConfig::Current() != nullptr); VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID() << " : " << node_id; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 684496d6fa62..324c207afd22 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -3172,3 +3172,47 @@ async def test_readonly_replication( # This behavior can be changed in the future assert await r1_node.client.execute_command("GET Y") == None + + +@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) +async def test_cluster_sharded_pub_sub_migration(df_factory: DflyInstanceFactory): + # blocking commands should be canceled during migration finalization + instances = [df_factory.create(port=next(next_port)) for i in range(2)] + df_factory.start_all(instances) + + c_nodes = [instance.client() for instance in instances] + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes]) + + logging.debug("Start blpop task") + blpop_task = asyncio.create_task(c_nodes[0].blpop("list", 0)) + + await asyncio.sleep(0.5) + + assert not blpop_task.done() + + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id) + ) + await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes]) + + await wait_for_status(nodes[0].client, nodes[1].id, "FINISHED") + + with pytest.raises(aioredis.ResponseError) as e_info: + await blpop_task + assert "MOVED" in str(e_info.value) + + assert await c_nodes[1].type("list") == "none" + + nodes[0].migrations = [] + nodes[0].slots = [] + nodes[1].slots = [(0, 16383)] + + logging.debug("remove finished migrations") + await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes]) + + assert await c_nodes[1].type("list") == "none" From 7baf9cd634291aca5318ef3a9e4f766165228fea Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 10 Apr 2025 13:24:58 +0300 Subject: [PATCH 2/2] fix: rename test --- tests/dragonfly/cluster_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 324c207afd22..ac2f2a7e3491 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -3175,7 +3175,7 @@ async def test_readonly_replication( @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) -async def test_cluster_sharded_pub_sub_migration(df_factory: DflyInstanceFactory): +async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: DflyInstanceFactory): # blocking commands should be canceled during migration finalization instances = [df_factory.create(port=next(next_port)) for i in range(2)] df_factory.start_all(instances)