Skip to content

Commit 7b62b55

Browse files
authored
refactor: optimize write performance for multiple replicas (#244)
* fix: opendal is not available (#241)
* feat: improve s3 api error message
* fix: fix data loading bug
* refactor: optimize write performance for multiple replicas
* feat: load command adds watch option
1 parent d569c0e commit 7b62b55

File tree

26 files changed

+508
-102
lines changed

26 files changed

+508
-102
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -121,3 +121,4 @@ bigdecimal = "0.4.8"
121121
bitvec = "1.0.1"
122122
config = "0.13.4"
123123
tempfile = "3.21.0"
124+
md-5 = "0.10.6"

curvine-cli/src/cmds/load.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use crate::cmds::LoadStatusCommand;
1516
use crate::util::*;
1617
use clap::Parser;
1718
use curvine_client::rpc::JobMasterClient;
@@ -24,6 +25,10 @@ pub struct LoadCommand {
2425

2526
#[arg(long, default_value = "${CURVINE_CONF_FILE}")]
2627
conf: String,
28+
29+
/// Watch load job status after submission
30+
#[arg(long, short = 'w')]
31+
watch: bool,
2732
}
2833

2934
impl LoadCommand {
@@ -39,6 +44,18 @@ impl LoadCommand {
3944
let command = LoadJobCommand::builder(&self.path).build();
4045
let rep = handle_rpc_result(client.submit_load_job(command)).await;
4146
println!("{}", rep);
47+
48+
if self.watch {
49+
let status_command = LoadStatusCommand::new(
50+
rep.job_id.clone(),
51+
false,
52+
"1s".to_string(),
53+
self.conf.clone(),
54+
);
55+
56+
status_command.execute(client).await?;
57+
}
58+
4259
Ok(())
4360
}
4461
}

curvine-cli/src/cmds/load_status.rs

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,14 @@ pub struct LoadStatusCommand {
3434
}
3535

3636
impl LoadStatusCommand {
37+
pub fn new(job_id: String, verbose: bool, watch: String, conf: String) -> Self {
38+
Self {
39+
job_id,
40+
verbose,
41+
watch: Some(watch),
42+
conf,
43+
}
44+
}
3745
pub async fn execute(&self, client: JobMasterClient) -> CommonResult<()> {
3846
println!("\n Checking status for {}", self.job_id);
3947

curvine-client/src/block/block_writer.rs

Lines changed: 69 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ use crate::block::{BlockWriterLocal, BlockWriterRemote};
1717
use crate::file::FsContext;
1818
use curvine_common::state::{BlockLocation, CommitBlock, LocatedBlock, WorkerAddress};
1919
use curvine_common::FsResult;
20+
use futures::future::try_join_all;
2021
use orpc::err_box;
2122
use orpc::runtime::{RpcRuntime, Runtime};
2223
use orpc::sys::DataSlice;
@@ -125,51 +126,93 @@ impl BlockWriter {
125126

126127
pub async fn write(&mut self, chunk: DataSlice) -> FsResult<()> {
127128
let chunk = chunk.freeze();
128-
for writer in &mut self.inners {
129-
if let Err(e) = writer.write(chunk.clone()).await {
130-
self.fs_context.add_failed_worker(writer.worker_address());
131-
return Err(e);
132-
}
129+
let mut futures = Vec::with_capacity(self.inners.len());
130+
for writer in self.inners.iter_mut() {
131+
let chunk_clone = chunk.clone();
132+
let task = async move {
133+
writer
134+
.write(chunk_clone)
135+
.await
136+
.map_err(|e| (writer.worker_address().clone(), e))
137+
};
138+
futures.push(task);
139+
}
140+
141+
if let Err((worker_addr, e)) = try_join_all(futures).await {
142+
self.fs_context.add_failed_worker(&worker_addr);
143+
return Err(e);
133144
}
134145
Ok(())
135146
}
136147

137-
pub fn blocking_write(&mut self, rt: &Runtime, buf: DataSlice) -> FsResult<()> {
138-
for writer in &mut self.inners {
139-
if let Err(e) = writer.blocking_write(rt, buf.clone()) {
140-
self.fs_context.add_failed_worker(writer.worker_address());
141-
return Err(e);
148+
pub fn blocking_write(&mut self, rt: &Runtime, chunk: DataSlice) -> FsResult<()> {
149+
if self.inners.len() == 1 {
150+
if let Err(e) = self.inners[0].blocking_write(rt, chunk) {
151+
self.fs_context
152+
.add_failed_worker(self.inners[0].worker_address());
153+
Err(e)
154+
} else {
155+
Ok(())
142156
}
157+
} else {
158+
rt.block_on(self.write(chunk))?;
159+
Ok(())
143160
}
144-
Ok(())
145161
}
146162

147163
pub async fn flush(&mut self) -> FsResult<()> {
148-
for writer in &mut self.inners {
149-
if let Err(e) = writer.flush().await {
150-
self.fs_context.add_failed_worker(writer.worker_address());
151-
return Err(e);
152-
}
164+
let mut futures = Vec::with_capacity(self.inners.len());
165+
for writer in self.inners.iter_mut() {
166+
let task = async move {
167+
writer
168+
.flush()
169+
.await
170+
.map_err(|e| (writer.worker_address().clone(), e))
171+
};
172+
futures.push(task);
173+
}
174+
175+
if let Err((worker_addr, e)) = try_join_all(futures).await {
176+
self.fs_context.add_failed_worker(&worker_addr);
177+
return Err(e);
153178
}
154179
Ok(())
155180
}
156181

157182
pub async fn complete(&mut self) -> FsResult<CommitBlock> {
158-
for writer in &mut self.inners {
159-
if let Err(e) = writer.complete().await {
160-
self.fs_context.add_failed_worker(writer.worker_address());
161-
return Err(e);
162-
}
183+
let mut futures = Vec::with_capacity(self.inners.len());
184+
for writer in self.inners.iter_mut() {
185+
let task = async move {
186+
writer
187+
.complete()
188+
.await
189+
.map_err(|e| (writer.worker_address().clone(), e))
190+
};
191+
futures.push(task);
192+
}
193+
194+
if let Err((worker_addr, e)) = try_join_all(futures).await {
195+
self.fs_context.add_failed_worker(&worker_addr);
196+
return Err(e);
163197
}
164198
Ok(self.to_commit_block())
165199
}
166200

167201
pub async fn cancel(&mut self) -> FsResult<()> {
168-
for writer in &mut self.inners {
169-
if let Err(e) = writer.cancel().await {
170-
self.fs_context.add_failed_worker(writer.worker_address());
171-
return Err(e);
172-
}
202+
let mut futures = Vec::with_capacity(self.inners.len());
203+
for writer in self.inners.iter_mut() {
204+
let task = async move {
205+
writer
206+
.cancel()
207+
.await
208+
.map_err(|e| (writer.worker_address().clone(), e))
209+
};
210+
futures.push(task);
211+
}
212+
213+
if let Err((worker_addr, e)) = try_join_all(futures).await {
214+
self.fs_context.add_failed_worker(&worker_addr);
215+
return Err(e);
173216
}
174217
Ok(())
175218
}

curvine-client/src/unified/macros.rs

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -194,13 +194,8 @@ macro_rules! impl_filesystem_for_enum {
194194
::curvine_common::fs::FileSystem<
195195
$crate::unified::UnifiedWriter,
196196
$crate::unified::UnifiedReader,
197-
::curvine_common::conf::UfsConf,
198197
> for $enum_name
199198
{
200-
fn conf(&self) -> &::curvine_common::conf::UfsConf {
201-
match_fs_variants!(self, conf)
202-
}
203-
204199
async fn mkdir(
205200
&self,
206201
path: &::curvine_common::fs::Path,

curvine-client/src/unified/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,6 @@ use crate::file::{FsReader, FsWriter};
1616
use crate::impl_filesystem_for_enum;
1717
use crate::*;
1818
use crate::{impl_reader_for_enum, impl_writer_for_enum};
19-
use curvine_common::conf::UfsConf;
2019
use curvine_common::fs::Path;
2120
use curvine_common::state::MountInfo;
2221
use curvine_common::FsResult;
@@ -78,15 +77,15 @@ impl UfsFileSystem {
7877
match path.scheme() {
7978
#[cfg(feature = "s3")]
8079
Some(S3_SCHEME) => {
81-
let fs = S3FileSystem::new(UfsConf::with_map(conf))?;
80+
let fs = S3FileSystem::new(conf)?;
8281
Ok(UfsFileSystem::S3(fs))
8382
}
8483

8584
#[cfg(feature = "opendal")]
8685
Some(scheme)
8786
if ["s3", "oss", "cos", "gcs", "azure", "azblob", "hdfs"].contains(&scheme) =>
8887
{
89-
let fs = OpendalFileSystem::new(path, UfsConf::with_map(conf))?;
88+
let fs = OpendalFileSystem::new(path, conf)?;
9089
Ok(UfsFileSystem::OpenDAL(fs))
9190
}
9291

curvine-client/src/unified/unified_filesystem.rs

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,10 @@ impl UnifiedFileSystem {
6969
Ok(fs)
7070
}
7171

72+
pub fn conf(&self) -> &ClusterConf {
73+
self.cv.conf()
74+
}
75+
7276
pub fn cv(&self) -> CurvineFileSystem {
7377
self.cv.clone()
7478
}
@@ -206,11 +210,7 @@ impl UnifiedFileSystem {
206210
}
207211
}
208212

209-
impl FileSystem<UnifiedWriter, UnifiedReader, ClusterConf> for UnifiedFileSystem {
210-
fn conf(&self) -> &ClusterConf {
211-
self.cv.conf()
212-
}
213-
213+
impl FileSystem<UnifiedWriter, UnifiedReader> for UnifiedFileSystem {
214214
async fn mkdir(&self, path: &Path, create_parent: bool) -> FsResult<bool> {
215215
match self.get_mount(path).await? {
216216
None => self.cv.mkdir(path, create_parent).await,

curvine-common/src/conf/job_conf.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -53,11 +53,11 @@ pub struct JobConf {
5353
}
5454

5555
impl JobConf {
56-
pub const DEFAULT_JOB_LIFE_TTL: &'static str = "6h";
56+
pub const DEFAULT_JOB_LIFE_TTL: &'static str = "24h";
5757
pub const DEFAULT_JOB_CLEANUP_TTL_STR: &'static str = "10m";
5858
pub const DEFAULT_JOB_MAX_FILES: usize = 100000;
5959
pub const DEFAULT_TASK_TIMEOUT: &'static str = "1h";
60-
pub const DEFAULT_TASK_REPORT_INTERVAL: &'static str = "5s";
60+
pub const DEFAULT_TASK_REPORT_INTERVAL: &'static str = "10s";
6161
pub const DEFAULT_WORKER_MAX_CONCURRENT_TASKS: usize = 1000;
6262

6363
pub fn init(&mut self) -> FsResult<()> {

curvine-common/src/fs/filesystem.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,7 @@ use crate::FsResult;
2020
use prost::bytes::BytesMut;
2121
use std::future::Future;
2222

23-
pub trait FileSystem<Writer, Reader, Conf> {
24-
fn conf(&self) -> &Conf;
25-
23+
pub trait FileSystem<Writer, Reader> {
2624
fn mkdir(&self, path: &Path, create_parent: bool) -> impl Future<Output = FsResult<bool>>;
2725

2826
fn create(&self, path: &Path, overwrite: bool) -> impl Future<Output = FsResult<Writer>>;

0 commit comments

Comments
 (0)