Skip to content

Commit 677f049

Browse files
committed
Support remote arrays for fragment_list consolidation
1 parent 1326ed4 commit 677f049

File tree

9 files changed

+335
-134
lines changed

9 files changed

+335
-134
lines changed

test/src/unit-capi-consolidation.cc

Lines changed: 50 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232

3333
#include <test/support/tdb_catch.h>
3434
#include "test/support/src/helpers.h"
35+
#include "test/support/src/serialization_wrappers.h"
3536
#include "test/support/src/vfs_helpers.h"
3637
#include "tiledb/common/stdx_string.h"
3738
#include "tiledb/platform/platform.h"
@@ -72,6 +73,8 @@ struct ConsolidationFx {
7273
tiledb_encryption_type_t encryption_type_ = TILEDB_NO_ENCRYPTION;
7374
const char* encryption_key_ = nullptr;
7475

76+
bool serialize_ = false;
77+
7578
// Constructors/destructors
7679
ConsolidationFx();
7780

@@ -5113,6 +5116,13 @@ TEST_CASE_METHOD(
51135116
REQUIRE(rc == TILEDB_OK);
51145117
REQUIRE(error == nullptr);
51155118

5119+
if (serialize_) {
5120+
std::vector<std::string> frag_uris_deserialized;
5121+
tiledb_array_consolidation_request_wrapper(
5122+
ctx_, tiledb_serialization_type_t(0), nullptr, &frag_uris_deserialized);
5123+
REQUIRE(frag_uris_deserialized.empty());
5124+
}
5125+
51165126
// Consolidate
51175127
rc = tiledb_array_consolidate(ctx_, dense_vector_uri_.c_str(), config);
51185128
CHECK(rc == TILEDB_OK);
@@ -7158,7 +7168,11 @@ TEST_CASE_METHOD(
71587168
TEST_CASE_METHOD(
71597169
ConsolidationFx,
71607170
"C API: Test consolidation, dense split fragments",
7161-
"[capi][consolidation][dense][split-fragments][non-rest]") {
7171+
"[capi][consolidation][dense][split-fragments][rest]") {
7172+
#ifdef TILEDB_SERIALIZATION
7173+
serialize_ = GENERATE(true, false);
7174+
#endif
7175+
71627176
remove_dense_array();
71637177
create_dense_array();
71647178
write_dense_subarray(1, 2, 1, 2);
@@ -7198,6 +7212,23 @@ TEST_CASE_METHOD(
71987212

71997213
// Consolidate
72007214
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};
7215+
7216+
if (serialize_) {
7217+
std::vector<std::string> frag_uris;
7218+
frag_uris.reserve(2);
7219+
for (uint64_t i = 0; i < 2; i++) {
7220+
frag_uris.emplace_back(uris[i]);
7221+
}
7222+
7223+
std::vector<std::string> frag_uris_deserialized;
7224+
tiledb_array_consolidation_request_wrapper(
7225+
ctx_,
7226+
tiledb_serialization_type_t(0),
7227+
&frag_uris,
7228+
&frag_uris_deserialized);
7229+
REQUIRE(frag_uris == frag_uris_deserialized);
7230+
}
7231+
72017232
rc = tiledb_array_consolidate_fragments(
72027233
ctx_, dense_array_uri_.c_str(), uris, 2, cfg);
72037234
CHECK(rc == TILEDB_OK);
@@ -7234,7 +7265,7 @@ TEST_CASE_METHOD(
72347265
TEST_CASE_METHOD(
72357266
ConsolidationFx,
72367267
"C API: Test consolidation, sparse split fragments",
7237-
"[capi][consolidation][sparse][split-fragments][non-rest]") {
7268+
"[capi][consolidation][sparse][split-fragments][rest]") {
72387269
remove_sparse_array();
72397270
create_sparse_array();
72407271
write_sparse_row(0);
@@ -7274,6 +7305,23 @@ TEST_CASE_METHOD(
72747305

72757306
// Consolidate
72767307
const char* uris[2] = {strrchr(uri, '/') + 1, strrchr(uri2, '/') + 1};
7308+
7309+
if (serialize_) {
7310+
std::vector<std::string> frag_uris;
7311+
frag_uris.reserve(2);
7312+
for (uint64_t i = 0; i < 2; i++) {
7313+
frag_uris.emplace_back(uris[i]);
7314+
}
7315+
7316+
std::vector<std::string> frag_uris_deserialized;
7317+
tiledb_array_consolidation_request_wrapper(
7318+
ctx_,
7319+
tiledb_serialization_type_t(0),
7320+
&frag_uris,
7321+
&frag_uris_deserialized);
7322+
REQUIRE(frag_uris == frag_uris_deserialized);
7323+
}
7324+
72777325
rc = tiledb_array_consolidate_fragments(
72787326
ctx_, sparse_array_uri_.c_str(), uris, 2, cfg);
72797327
CHECK(rc == TILEDB_OK);

test/support/src/serialization_wrappers.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
*/
3333

3434
#include "test/support/src/helpers.h"
35+
#include "tiledb/api/c_api/buffer/buffer_api_internal.h"
36+
#include "tiledb/api/c_api/context/context_api_internal.h"
3537
#include "tiledb/sm/c_api/tiledb.h"
3638
#include "tiledb/sm/c_api/tiledb_serialization.h"
3739
#include "tiledb/sm/c_api/tiledb_struct_def.h"
40+
#include "tiledb/sm/serialization/consolidation.h"
3841
#include "tiledb/sm/serialization/query.h"
3942

4043
#ifdef TILEDB_SERIALIZATION
@@ -218,3 +221,28 @@ void tiledb_subarray_serialize(
218221
*subarray = deserialized_subarray;
219222
#endif
220223
}
224+
225+
// Test helper: round-trips an array consolidation request through the
// serialization layer.
//
// ctx               - context whose config (ctx->config()) is serialized with
//                     the request.
// serialize_type    - C API serialization type; cast to
//                     tiledb::sm::SerializationType for both directions.
// fragment_uris_in  - fragment URI list to serialize; forwarded as-is to the
//                     serializer (callers in the tests also pass nullptr).
// fragment_uris_out - receives the deserialized fragment URI list. It is only
//                     assigned when the deserialized request carries a list;
//                     otherwise it is left untouched.
void tiledb_array_consolidation_request_wrapper(
226+
tiledb_ctx_t* ctx,
227+
tiledb_serialization_type_t serialize_type,
228+
const std::vector<std::string>* fragment_uris_in,
229+
std::vector<std::string>* fragment_uris_out) {
230+
// Serialize the request into a temporary buffer handle, then deserialize it
// back from that same buffer.
231+
auto buffer = tiledb_buffer_handle_t::make_handle();
232+
serialization::array_consolidation_request_serialize(
233+
ctx->config(),
234+
fragment_uris_in,
235+
static_cast<tiledb::sm::SerializationType>(serialize_type),
236+
&(buffer->buffer()));
237+
238+
auto [config, fragment_uris_deser] =
239+
serialization::array_consolidation_request_deserialize(
240+
static_cast<tiledb::sm::SerializationType>(serialize_type),
241+
buffer->buffer());
242+
243+
// NOTE(review): the handle is released only on this path; if either
// serialization call above throws, break_handle is not reached - confirm
// whether that matters for the tests.
tiledb_buffer_handle_t::break_handle(buffer);
244+
245+
// The deserialized config is not returned to the caller; only the URI list is.
if (fragment_uris_deser.has_value()) {
246+
*fragment_uris_out = fragment_uris_deser.value();
247+
}
248+
}

test/support/src/serialization_wrappers.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,11 @@ int tiledb_fragment_info_serialize(
141141
*/
142142
void tiledb_subarray_serialize(
143143
tiledb_ctx_t* ctx, tiledb_array_t* array, tiledb_subarray_t** subarray);
144+
145+
void tiledb_array_consolidation_request_wrapper(
146+
tiledb_ctx_t* ctx,
147+
tiledb_serialization_type_t serialize_type,
148+
const std::vector<std::string>* fragment_uris_in,
149+
std::vector<std::string>* fragment_uris_out);
150+
144151
#endif // TILEDB_TEST_SERIALIZATION_WRAPPERS_H

tiledb/sm/c_api/tiledb.cc

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2664,13 +2664,31 @@ int32_t tiledb_array_create_with_key(
26642664

26652665
int32_t tiledb_array_consolidate(
26662666
tiledb_ctx_t* ctx, const char* array_uri, tiledb_config_t* config) {
2667+
// Validate input arguments
2668+
api::ensure_context_is_valid(ctx);
26672669
api::ensure_config_is_valid_if_present(config);
2670+
2671+
auto uri = tiledb::sm::URI(array_uri);
2672+
if (uri.is_invalid()) {
2673+
throw api::CAPIStatusException(
2674+
"Failed to consolidate fragments; Invalid input array uri");
2675+
}
2676+
2677+
auto input_config = (config == nullptr) ? ctx->config() : config->config();
2678+
if (uri.is_tiledb() &&
2679+
tiledb::sm::Consolidator::mode_from_config(input_config) ==
2680+
tiledb::sm::ConsolidationMode::FRAGMENT) {
2681+
throw api::CAPIStatusException(
2682+
"Please use tiledb_array_consolidate_fragments API for consolidating "
2683+
"fragments on remote arrays.");
2684+
}
2685+
26682686
tiledb::sm::Consolidator::array_consolidate(
26692687
array_uri,
26702688
tiledb::sm::EncryptionType::NO_ENCRYPTION,
26712689
nullptr,
26722690
0,
2673-
(config == nullptr) ? ctx->config() : config->config(),
2691+
input_config,
26742692
ctx->storage_manager());
26752693
return TILEDB_OK;
26762694
}
@@ -2682,7 +2700,15 @@ int32_t tiledb_array_consolidate_with_key(
26822700
const void* encryption_key,
26832701
uint32_t key_length,
26842702
tiledb_config_t* config) {
2685-
// Sanity checks
2703+
// Validate input arguments
2704+
api::ensure_context_is_valid(ctx);
2705+
api::ensure_config_is_valid_if_present(config);
2706+
2707+
auto uri = tiledb::sm::URI(array_uri);
2708+
if (uri.is_invalid()) {
2709+
throw api::CAPIStatusException(
2710+
"Failed to consolidate fragments; Invalid input array uri");
2711+
}
26862712

26872713
tiledb::sm::Consolidator::array_consolidate(
26882714
array_uri,
@@ -2701,7 +2727,33 @@ int32_t tiledb_array_consolidate_fragments(
27012727
const char** fragment_uris,
27022728
const uint64_t num_fragments,
27032729
tiledb_config_t* config) {
2704-
// Sanity checks
2730+
// Validate input arguments
2731+
api::ensure_context_is_valid(ctx);
2732+
api::ensure_config_is_valid_if_present(config);
2733+
2734+
if (fragment_uris == nullptr) {
2735+
throw api::CAPIStatusException(
2736+
"Failed to consolidate fragments; Invalid input fragment list");
2737+
}
2738+
2739+
auto uri = tiledb::sm::URI(array_uri);
2740+
if (uri.is_invalid()) {
2741+
throw api::CAPIStatusException(
2742+
"Failed to consolidate fragments; Invalid input array uri");
2743+
}
2744+
2745+
if (num_fragments < 1) {
2746+
throw api::CAPIStatusException(
2747+
"Failed to consolidate fragments; Invalid input number of fragments");
2748+
}
2749+
2750+
for (size_t i = 0; i < num_fragments; i++) {
2751+
if (tiledb::sm::URI(fragment_uris[i]).is_invalid()) {
2752+
throw api::CAPIStatusException(
2753+
"Failed to consolidate fragments; Invalid uri(s) in input fragment "
2754+
"list");
2755+
}
2756+
}
27052757

27062758
// Convert the list of fragments to a vector
27072759
std::vector<std::string> uris;

tiledb/sm/consolidator/consolidator.cc

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -146,21 +146,21 @@ void Consolidator::array_consolidate(
146146
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
147147
}
148148

149-
// Check if array exists
150-
ObjectType obj_type;
151-
throw_if_not_ok(
152-
object_type(storage_manager->resources(), array_uri, &obj_type));
153-
154-
if (obj_type != ObjectType::ARRAY) {
155-
throw ConsolidatorException(
156-
"Cannot consolidate array; Array does not exist");
157-
}
158-
159149
if (array_uri.is_tiledb()) {
160150
throw_if_not_ok(
161151
storage_manager->resources().rest_client()->post_consolidation_to_rest(
162152
array_uri, config));
163153
} else {
154+
// Check if array exists
155+
ObjectType obj_type;
156+
throw_if_not_ok(
157+
object_type(storage_manager->resources(), array_uri, &obj_type));
158+
159+
if (obj_type != ObjectType::ARRAY) {
160+
throw ConsolidatorException(
161+
"Cannot consolidate array; Array does not exist");
162+
}
163+
164164
// Get encryption key from config
165165
std::string encryption_key_from_cfg;
166166
if (!encryption_key) {
@@ -210,50 +210,58 @@ void Consolidator::fragments_consolidate(
210210
throw ConsolidatorException("Cannot consolidate array; Invalid URI");
211211
}
212212

213-
// Check if array exists
214-
ObjectType obj_type;
215-
throw_if_not_ok(
216-
object_type(storage_manager->resources(), array_uri, &obj_type));
213+
if (array_uri.is_tiledb()) {
214+
throw_if_not_ok(
215+
storage_manager->resources().rest_client()->post_consolidation_to_rest(
216+
array_uri, config, &fragment_uris));
217+
} else {
218+
// Check if array exists
219+
ObjectType obj_type;
220+
throw_if_not_ok(
221+
object_type(storage_manager->resources(), array_uri, &obj_type));
217222

218-
if (obj_type != ObjectType::ARRAY) {
219-
throw ConsolidatorException(
220-
"Cannot consolidate array; Array does not exist");
221-
}
223+
if (obj_type != ObjectType::ARRAY) {
224+
throw ConsolidatorException(
225+
"Cannot consolidate array; Array does not exist");
226+
}
222227

223-
// Get encryption key from config
224-
std::string encryption_key_from_cfg;
225-
if (!encryption_key) {
226-
bool found = false;
227-
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
228-
assert(found);
229-
}
228+
// Get encryption key from config
229+
std::string encryption_key_from_cfg;
230+
if (!encryption_key) {
231+
bool found = false;
232+
encryption_key_from_cfg = config.get("sm.encryption_key", &found);
233+
assert(found);
234+
}
235+
236+
if (!encryption_key_from_cfg.empty()) {
237+
encryption_key = encryption_key_from_cfg.c_str();
238+
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
239+
std::string encryption_type_from_cfg;
240+
bool found = false;
241+
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
242+
assert(found);
243+
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
244+
throw_if_not_ok(st);
245+
encryption_type = et.value();
230246

231-
if (!encryption_key_from_cfg.empty()) {
232-
encryption_key = encryption_key_from_cfg.c_str();
233-
key_length = static_cast<uint32_t>(encryption_key_from_cfg.size());
234-
std::string encryption_type_from_cfg;
235-
bool found = false;
236-
encryption_type_from_cfg = config.get("sm.encryption_type", &found);
237-
assert(found);
238-
auto [st, et] = encryption_type_enum(encryption_type_from_cfg);
239-
throw_if_not_ok(st);
240-
encryption_type = et.value();
241-
242-
if (!EncryptionKey::is_valid_key_length(
243-
encryption_type,
244-
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
245-
encryption_key = nullptr;
246-
key_length = 0;
247+
if (!EncryptionKey::is_valid_key_length(
248+
encryption_type,
249+
static_cast<uint32_t>(encryption_key_from_cfg.size()))) {
250+
encryption_key = nullptr;
251+
key_length = 0;
252+
}
247253
}
248-
}
249254

250-
// Consolidate
251-
auto consolidator = Consolidator::create(
252-
ConsolidationMode::FRAGMENT, config, storage_manager);
253-
auto fragment_consolidator =
254-
dynamic_cast<FragmentConsolidator*>(consolidator.get());
255-
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
256-
array_name, encryption_type, encryption_key, key_length, fragment_uris));
255+
// Consolidate
256+
auto fragment_consolidator =
257+
make_shared<FragmentConsolidator>(HERE(), config, storage_manager);
258+
throw_if_not_ok(fragment_consolidator->consolidate_fragments(
259+
array_name,
260+
encryption_type,
261+
encryption_key,
262+
key_length,
263+
fragment_uris));
264+
}
257265
}
258266

259267
void Consolidator::write_consolidated_commits_file(

tiledb/sm/rest/rest_client.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1570,10 +1570,12 @@ Status RestClient::ensure_json_null_delimited_string(Buffer* buffer) {
15701570
}
15711571

15721572
Status RestClient::post_consolidation_to_rest(
1573-
const URI& uri, const Config& config) {
1573+
const URI& uri,
1574+
const Config& config,
1575+
const std::vector<std::string>* fragment_uris) {
15741576
Buffer buff;
1575-
RETURN_NOT_OK(serialization::array_consolidation_request_serialize(
1576-
config, serialization_type_, &buff));
1577+
serialization::array_consolidation_request_serialize(
1578+
config, fragment_uris, serialization_type_, &buff);
15771579
// Wrap in a list
15781580
BufferList serialized;
15791581
RETURN_NOT_OK(serialized.add_buffer(std::move(buff)));
@@ -1817,7 +1819,8 @@ void RestClient::delete_group_from_rest(const URI&, bool) {
18171819
throw RestClientDisabledException();
18181820
}
18191821

1820-
Status RestClient::post_consolidation_to_rest(const URI&, const Config&) {
1822+
Status RestClient::post_consolidation_to_rest(
1823+
const URI&, const Config&, const std::vector<std::string>*) {
18211824
throw RestClientDisabledException();
18221825
}
18231826

0 commit comments

Comments
 (0)