26
26
27
27
namespace doris {
28
28
29
+ enum class TxnOpParamType : int {
30
+ ILLEGAL,
31
+ WITH_TXN_ID,
32
+ WITH_LABEL,
33
+ };
34
+
29
35
CloudStreamLoadExecutor::CloudStreamLoadExecutor (ExecEnv* exec_env)
30
36
: StreamLoadExecutor(exec_env) {}
31
37
@@ -42,13 +48,48 @@ Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
42
48
}
43
49
44
50
Status CloudStreamLoadExecutor::operate_txn_2pc (StreamLoadContext* ctx) {
45
- VLOG_DEBUG << " operate_txn_2pc, op: " << ctx->txn_operation ;
51
+ std::stringstream ss;
52
+ ss << " db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label
53
+ << " txn_2pc_op=" << ctx->txn_operation ;
54
+ std::string op_info = ss.str ();
55
+ VLOG_DEBUG << " operate_txn_2pc " << op_info;
56
+ TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
57
+ : !ctx->label .empty () ? TxnOpParamType::WITH_LABEL
58
+ : TxnOpParamType::ILLEGAL;
59
+
60
+ Status st = Status::InternalError<false >(" impossible branch reached, " + op_info);
61
+
46
62
if (ctx->txn_operation .compare (" commit" ) == 0 ) {
47
- return _exec_env->storage_engine ().to_cloud ().meta_mgr ().commit_txn (*ctx, true );
63
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
64
+ VLOG_DEBUG << " 2pc commit stream load txn directly: " << op_info;
65
+ st = _exec_env->storage_engine ().to_cloud ().meta_mgr ().commit_txn (*ctx, true );
66
+ } else if (topt == TxnOpParamType::WITH_LABEL) {
67
+ VLOG_DEBUG << " 2pc commit stream load txn with FE support: " << op_info;
68
+ st = StreamLoadExecutor::operate_txn_2pc (ctx);
69
+ } else {
70
+ st = Status::InternalError<false >(
71
+ " failed to 2pc commit txn, with TxnOpParamType::illegal input, " + op_info);
72
+ }
73
+ } else if (ctx->txn_operation .compare (" abort" ) == 0 ) {
74
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
75
+ LOG (INFO) << " 2pc abort stream load txn directly: " << op_info;
76
+ st = _exec_env->storage_engine ().to_cloud ().meta_mgr ().abort_txn (*ctx);
77
+ WARN_IF_ERROR (st, " failed to rollback txn " + op_info);
78
+ } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send to FE to abort
79
+ VLOG_DEBUG << " 2pc abort stream load txn with FE support: " << op_info;
80
+ StreamLoadExecutor::rollback_txn (ctx);
81
+ st = Status::OK ();
82
+ } else {
83
+ st = Status::InternalError<false >(" failed abort txn, with illegal input, " + op_info);
84
+ }
48
85
} else {
49
- // 2pc abort
50
- return _exec_env->storage_engine ().to_cloud ().meta_mgr ().abort_txn (*ctx);
86
+ std::string msg =
87
+ " failed to operate_txn_2pc, unrecognized operation: " + ctx->txn_operation ;
88
+ LOG (WARNING) << msg << " " << op_info;
89
+ st = Status::InternalError<false >(msg + " " + op_info);
51
90
}
91
+ WARN_IF_ERROR (st, " failed to operate_txn_2pc " + op_info)
92
+ return st;
52
93
}
53
94
54
95
Status CloudStreamLoadExecutor::commit_txn (StreamLoadContext* ctx) {
@@ -85,8 +126,24 @@ Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
85
126
}
86
127
87
128
void CloudStreamLoadExecutor::rollback_txn (StreamLoadContext* ctx) {
88
- WARN_IF_ERROR (_exec_env->storage_engine ().to_cloud ().meta_mgr ().abort_txn (*ctx),
89
- " Failed to rollback txn" );
129
+ std::stringstream ss;
130
+ ss << " db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label ;
131
+ std::string op_info = ss.str ();
132
+ LOG (INFO) << " rollback stream laod txn " << op_info;
133
+ TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
134
+ : !ctx->label .empty () ? TxnOpParamType::WITH_LABEL
135
+ : TxnOpParamType::ILLEGAL;
136
+
137
+ if (topt == TxnOpParamType::WITH_TXN_ID) {
138
+ VLOG_DEBUG << " abort stream load txn directly: " << op_info;
139
+ WARN_IF_ERROR (_exec_env->storage_engine ().to_cloud ().meta_mgr ().abort_txn (*ctx),
140
+ " failed to rollback txn " + op_info);
141
+ } else { // maybe a label send to FE to abort
142
+ // does not care about the return status
143
+ // ctx->db_id > 0 && !ctx->label.empty()
144
+ VLOG_DEBUG << " abort stream load txn with FE support: " << op_info;
145
+ StreamLoadExecutor::rollback_txn (ctx);
146
+ }
90
147
}
91
148
92
149
} // namespace doris
0 commit comments