From 7229ce6427e8ffb3c944aa8cada2309b87ccf563 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 27 Dec 2017 13:49:27 +0800 Subject: [PATCH 01/11] *: shard implicit row ID --- ast/ddl.go | 1 + ddl/ddl_api.go | 53 ++++++++++++++++++++++++++++++++++++++++-- ddl/ddl_worker.go | 2 ++ ddl/table.go | 23 ++++++++++++++++++ executor/show.go | 4 ++++ model/ddl.go | 3 +++ model/model.go | 3 +++ parser/misc.go | 1 + parser/parser.y | 5 ++++ table/tables/tables.go | 9 ++++++- 10 files changed, 101 insertions(+), 3 deletions(-) diff --git a/ast/ddl.go b/ast/ddl.go index e4e59871acdf1..4f365556bdf9f 100644 --- a/ast/ddl.go +++ b/ast/ddl.go @@ -636,6 +636,7 @@ const ( TableOptionDelayKeyWrite TableOptionRowFormat TableOptionStatsPersistent + TableOptionShardRowID ) // RowFormat types diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 7a3a307a85294..efdbc80f8ad77 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -828,8 +828,22 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) { tbInfo.Charset = op.StrValue case ast.TableOptionCollate: tbInfo.Collate = op.StrValue + case ast.TableOptionShardRowID: + if !hasAutoIncrementColumn(tbInfo) { + //tbInfo.ShardRowID = op.UintValue != 0 + tbInfo.ShardRowID = true + } + } + } +} + +func hasAutoIncrementColumn(tbInfo *model.TableInfo) bool { + for _, col := range tbInfo.Columns { + if mysql.HasAutoIncrementFlag(col.Flag) { + return true } } + return false } func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) { @@ -888,9 +902,14 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte err = ErrUnsupportedModifyPrimaryKey.GenByArgs("drop") case ast.AlterTableOption: for _, opt := range spec.Options { - if opt.Tp == ast.TableOptionAutoIncrement { + switch opt.Tp { + case ast.TableOptionShardRowID: + err = d.ShardRowID(ctx, ident, opt.UintValue) + case ast.TableOptionAutoIncrement: err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue)) - break + } + if err != nil { + return errors.Trace(err) } } default: @@ -932,6 +951,36 @@ func (d *ddl) RebaseAutoID(ctx context.Context, ident ast.Ident, newBase int64) return errors.Trace(err) } +func (d *ddl) ShardRowID(ctx context.Context, ident ast.Ident, uVal uint64) error { + job, err := d.createJobByIdent(ident) + if err != nil { + return errors.Trace(err) + } + job.Type = model.ActionShardRowID + job.Args = []interface{}{uVal} + err = d.doDDLJob(ctx, job) + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +func (d *ddl) createJobByIdent(ident ast.Ident) (*model.Job, error) { + is := d.GetInformationSchema() + schema, ok := is.SchemaByName(ident.Schema) + if !ok { + return nil, infoschema.ErrDatabaseNotExists.GenByArgs(ident.Schema) + } + t, err := is.TableByName(ident.Schema, ident.Name) + if err != nil { + return nil, errors.Trace(infoschema.ErrTableNotExists.GenByArgs(ident.Schema, ident.Name)) + } + job := &model.Job{ + SchemaID: schema.ID, + TableID: t.Meta().ID, + BinlogInfo: &model.HistoryInfo{}, + } + return job, nil +} + func checkColumnConstraint(constraints []*ast.ColumnOption) error { for _, constraint := range constraints { switch constraint.Tp { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index dc3fb3da5f9bf..d36b31e7d714f 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -305,6 +305,8 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) { ver, err = d.onRenameTable(t, job) case model.ActionSetDefaultValue: ver, err = d.onSetDefaultValue(t, job) + case model.ActionShardRowID: + ver, err = d.onShardRowID(t, job) default: // Invalid job, cancel it. job.State = model.JobStateCancelled diff --git a/ddl/table.go b/ddl/table.go index 37a492725fcc9..eeb81dd748a67 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -259,6 +259,29 @@ func (d *ddl) onRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, nil } +func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) { + var shardRowIDOn uint64 + err := job.DecodeArgs(&shardRowIDOn) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + tblInfo, err := getTableInfo(t, job, job.SchemaID) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + tblInfo.ShardRowID = shardRowIDOn != 0 + job.State = model.JobStateDone + job.BinlogInfo.AddTableInfo(ver, tblInfo) + ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + return ver, nil +} + func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) { var oldSchemaID int64 var tableName model.CIStr diff --git a/executor/show.go b/executor/show.go index bf62ca463657e..4e5e61180c429 100644 --- a/executor/show.go +++ b/executor/show.go @@ -623,6 +623,10 @@ func (e *ShowExec) fetchShowCreateTable() error { } } + if tb.Meta().ShardRowID { + buf.WriteString(" SHARD_ROW_ID=1") + } + if len(tb.Meta().Comment) > 0 { buf.WriteString(fmt.Sprintf(" COMMENT='%s'", format.OutputFormat(tb.Meta().Comment))) } diff --git a/model/ddl.go b/model/ddl.go index 3880aeaafb8df..564d9a06f3fe5 100644 --- a/model/ddl.go +++ b/model/ddl.go @@ -43,6 +43,7 @@ const ( ActionRebaseAutoID ActionRenameTable ActionSetDefaultValue + ActionShardRowID ) func (action ActionType) String() string { @@ -77,6 +78,8 @@ func (action ActionType) String() string { return "rename table" case ActionSetDefaultValue: return "set default value" + case ActionShardRowID: + return "shard row ID" default: return "none" } diff --git a/model/model.go b/model/model.go index e72c47ad55d53..9d01347543b3c 100644 --- a/model/model.go +++ b/model/model.go @@ -113,6 +113,9 @@ type TableInfo struct { // TODO: Remove it. // Now it only uses for compatibility with the old version that already uses this field. OldSchemaID int64 `json:"old_schema_id,omitempty"` + + // ShardRowID specify if the implicit row ID is sharded. + ShardRowID bool } // GetDBID returns the schema ID that is used to create an allocator. diff --git a/parser/misc.go b/parser/misc.go index 1d982b123751b..c6e9a419d0ff5 100644 --- a/parser/misc.go +++ b/parser/misc.go @@ -375,6 +375,7 @@ var tokenMap = map[string]int{ "QUARTER": quarter, "QUERY": query, "QUICK": quick, + "SHARD_ROW_ID": shardRowID, "RANGE": rangeKwd, "READ": read, "REAL": realType, diff --git a/parser/parser.y b/parser/parser.y index e15d0802d3adf..39a35102a6890 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -181,6 +181,7 @@ import ( precisionType "PRECISION" primary "PRIMARY" procedure "PROCEDURE" + shardRowID "SHARD_ROW_ID" rangeKwd "RANGE" read "READ" realType "REAL" @@ -5272,6 +5273,10 @@ TableOption: { $$ = &ast.TableOption{Tp: ast.TableOptionStatsPersistent} } +| "SHARD_ROW_ID" EqOpt LengthNum + { + $$ = &ast.TableOption{Tp: ast.TableOptionShardRowID, UintValue: $3.(uint64)} + } StatsPersistentVal: "DEFAULT" diff --git a/table/tables/tables.go b/table/tables/tables.go index a2013e1c23b3d..aca3dbce6985e 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -698,7 +698,14 @@ func GetColDefaultValue(ctx context.Context, col *table.Column, defaultVals []ty // AllocAutoID implements table.Table AllocAutoID interface. func (t *Table) AllocAutoID(ctx context.Context) (int64, error) { - return t.Allocator(ctx).Alloc(t.ID) + rowID, err := t.Allocator(ctx).Alloc(t.ID) + if err != nil { + return 0, errors.Trace(err) + } + if t.meta.ShardRowID { + rowID |= (rowID & 0x7fff) << 48 + } + return rowID, nil } // Allocator implements table.Table Allocator interface. From 2a6a84cbf613176bfe85366922f1fffe93a850fb Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Fri, 29 Dec 2017 11:22:40 +0800 Subject: [PATCH 02/11] *: change option to shard row ID bits. --- ddl/ddl_api.go | 9 +++++++-- ddl/table.go | 6 +++--- executor/show.go | 4 ++-- model/model.go | 4 ++-- parser/misc.go | 2 +- parser/parser.y | 4 ++-- table/tables/tables.go | 4 ++-- 7 files changed, 19 insertions(+), 14 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index efdbc80f8ad77..c26138c5df04c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -830,8 +830,10 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) { tbInfo.Collate = op.StrValue case ast.TableOptionShardRowID: if !hasAutoIncrementColumn(tbInfo) { - //tbInfo.ShardRowID = op.UintValue != 0 - tbInfo.ShardRowID = true + tbInfo.ShardRowIDBits = op.UintValue + if tbInfo.ShardRowIDBits > 15 { + tbInfo.ShardRowIDBits = 15 + } } } } @@ -904,6 +906,9 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte for _, opt := range spec.Options { switch opt.Tp { case ast.TableOptionShardRowID: + if opt.UintValue > 15 { + opt.UintValue = 15 + } err = d.ShardRowID(ctx, ident, opt.UintValue) case ast.TableOptionAutoIncrement: err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue)) diff --git a/ddl/table.go b/ddl/table.go index eeb81dd748a67..245104ea10e15 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -260,8 +260,8 @@ func (d *ddl) onRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, _ error) } func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) { - var shardRowIDOn uint64 - err := job.DecodeArgs(&shardRowIDOn) + var shardRowIDBits uint64 + err := job.DecodeArgs(&shardRowIDBits) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -271,7 +271,7 @@ func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - tblInfo.ShardRowID = shardRowIDOn != 0 + tblInfo.ShardRowIDBits = shardRowIDBits job.State = model.JobStateDone job.BinlogInfo.AddTableInfo(ver, tblInfo) ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State) diff --git a/executor/show.go b/executor/show.go index 4e5e61180c429..094b480be07d2 100644 --- a/executor/show.go +++ b/executor/show.go @@ -623,8 +623,8 @@ func (e *ShowExec) fetchShowCreateTable() error { } } - if tb.Meta().ShardRowID { - buf.WriteString(" SHARD_ROW_ID=1") + if tb.Meta().ShardRowIDBits > 0 { + buf.WriteString(fmt.Sprintf(" SHARD_ROW_ID_BITS=%d", tb.Meta().ShardRowIDBits)) } if len(tb.Meta().Comment) > 0 { diff --git a/model/model.go b/model/model.go index 9d01347543b3c..e46a0f411d11b 100644 --- a/model/model.go +++ b/model/model.go @@ -114,8 +114,8 @@ type TableInfo struct { // Now it only uses for compatibility with the old version that already uses this field. OldSchemaID int64 `json:"old_schema_id,omitempty"` - // ShardRowID specify if the implicit row ID is sharded. - ShardRowID bool + // ShardRowIDBits specify if the implicit row ID is sharded. + ShardRowIDBits uint64 } // GetDBID returns the schema ID that is used to create an allocator. diff --git a/parser/misc.go b/parser/misc.go index c6e9a419d0ff5..df7b4a92dd24a 100644 --- a/parser/misc.go +++ b/parser/misc.go @@ -375,7 +375,7 @@ var tokenMap = map[string]int{ "QUARTER": quarter, "QUERY": query, "QUICK": quick, - "SHARD_ROW_ID": shardRowID, + "SHARD_ROW_ID_BITS": shardRowIDBits, "RANGE": rangeKwd, "READ": read, "REAL": realType, diff --git a/parser/parser.y b/parser/parser.y index 39a35102a6890..fcc26bd053ae2 100644 --- a/parser/parser.y +++ b/parser/parser.y @@ -181,7 +181,7 @@ import ( precisionType "PRECISION" primary "PRIMARY" procedure "PROCEDURE" - shardRowID "SHARD_ROW_ID" + shardRowIDBits "SHARD_ROW_ID_BITS" rangeKwd "RANGE" read "READ" realType "REAL" @@ -5273,7 +5273,7 @@ TableOption: { $$ = &ast.TableOption{Tp: ast.TableOptionStatsPersistent} } -| "SHARD_ROW_ID" EqOpt LengthNum +| "SHARD_ROW_ID_BITS" EqOpt LengthNum { $$ = &ast.TableOption{Tp: ast.TableOptionShardRowID, UintValue: $3.(uint64)} } diff --git a/table/tables/tables.go b/table/tables/tables.go index aca3dbce6985e..cb2624a71115d 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -702,8 +702,8 @@ func (t *Table) AllocAutoID(ctx context.Context) (int64, error) { if err != nil { return 0, errors.Trace(err) } - if t.meta.ShardRowID { - rowID |= (rowID & 0x7fff) << 48 + if t.meta.ShardRowIDBits > 0 { + rowID |= (rowID & (1< Date: Sat, 30 Dec 2017 23:17:12 +0800 Subject: [PATCH 03/11] *: use the same shard value for a transaction. --- sessionctx/variable/session.go | 1 + table/tables/tables.go | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 18b8a2122da5d..a0bad0fd7efbe 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -87,6 +87,7 @@ type TransactionContext struct { Histroy interface{} SchemaVersion int64 StartTS uint64 + Shard *int64 TableDeltaMap map[int64]TableDelta } diff --git a/table/tables/tables.go b/table/tables/tables.go index cb2624a71115d..be7d63a972678 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -19,6 +19,7 @@ package tables import ( "math" + "math/rand" "strings" "github.com/juju/errors" @@ -703,7 +704,12 @@ func (t *Table) AllocAutoID(ctx context.Context) (int64, error) { return 0, errors.Trace(err) } if t.meta.ShardRowIDBits > 0 { - rowID |= (rowID & (1< Date: Tue, 2 Jan 2018 14:45:31 +0800 Subject: [PATCH 04/11] *: increase auto ID step. --- meta/autoid/autoid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 637b22807b1d3..6d724044ed67d 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -27,7 +27,7 @@ import ( ) // Test needs to change it, so it's a variable. -var step = int64(5000) +var step = int64(100000) var errInvalidTableID = terror.ClassAutoid.New(codeInvalidTableID, "invalid TableID") From 09be7513d2d958007d8951c916cfc38f944f149a Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Tue, 2 Jan 2018 15:49:46 +0800 Subject: [PATCH 05/11] *: use hash instead of rand to avoid global lock and add test --- ddl/ddl_db_test.go | 5 +++++ executor/ddl_test.go | 27 +++++++++++++++++++++++++++ executor/executor_test.go | 6 ++++++ meta/autoid/autoid.go | 2 +- parser/parser_test.go | 2 ++ table/tables/tables.go | 12 ++++++++++-- 6 files changed, 51 insertions(+), 3 deletions(-) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index cdc9cea42a144..a718aaf6f22c8 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" tmysql "github.com/pingcap/tidb/mysql" @@ -66,6 +67,7 @@ type testDBSuite struct { tk *testkit.TestKit s tidb.Session lease time.Duration + autoIDStep int64 } func (s *testDBSuite) SetUpSuite(c *C) { @@ -77,6 +79,8 @@ func (s *testDBSuite) SetUpSuite(c *C) { tidb.SetSchemaLease(s.lease) tidb.SetStatsLease(0) s.schemaName = "test_db" + s.autoIDStep = autoid.GetStep() + autoid.SetStep(5000) s.store, err = tikv.NewMockTikvStore() c.Assert(err, IsNil) @@ -99,6 +103,7 @@ func (s *testDBSuite) TearDownSuite(c *C) { s.dom.Close() s.store.Close() testleak.AfterTest(c)() + autoid.SetStep(s.autoIDStep) } func (s *testDBSuite) testErrorCode(c *C, sql string, errCode int) { diff --git a/executor/ddl_test.go b/executor/ddl_test.go index bc0a364b0cc72..d44732d6cf9c9 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testkit" goctx "golang.org/x/net/context" @@ -334,3 +335,29 @@ func (s *testSuite) TestTooLargeIdentifierLength(c *C) { _, err = tk.Exec(fmt.Sprintf("create index %s on t(c)", indexName2)) c.Assert(err.Error(), Equals, fmt.Sprintf("[ddl:1059]Identifier name '%s' is too long", indexName2)) } + +func (s *testSuite) TestShardRowIDBits(c *C) { + tk := testkit.NewTestKit(c, s.store) + + tk.MustExec("use test") + tk.MustExec("create table t (a int) shard_row_id_bits = 15") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("insert t values (%d)", i)) + } + tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + var hasShardedID bool + var count int + c.Assert(tk.Se.NewTxn(), IsNil) + err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) { + c.Assert(h, GreaterEqual, int64(0)) + if (h >> 56) > 0 { + hasShardedID = true + } + count++ + return true, nil + }) + c.Assert(err, IsNil) + c.Assert(count, Equals, 100) + c.Assert(hasShardedID, IsTrue) +} diff --git a/executor/executor_test.go b/executor/executor_test.go index de03594fffea1..61b80c3f2b216 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/parser" @@ -64,11 +65,15 @@ type testSuite struct { mvccStore *mocktikv.MvccStore store kv.Storage *parser.Parser + + autoIDStep int64 } var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in executor test") func (s *testSuite) SetUpSuite(c *C) { + s.autoIDStep = autoid.GetStep() + autoid.SetStep(5000) s.Parser = parser.New() flag.Lookup("mockTikv") useMockTikv := *mockTikv @@ -95,6 +100,7 @@ func (s *testSuite) SetUpSuite(c *C) { func (s *testSuite) TearDownSuite(c *C) { s.store.Close() + autoid.SetStep(s.autoIDStep) } func (s *testSuite) SetUpTest(c *C) { diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 6d724044ed67d..e36186091c407 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -27,7 +27,7 @@ import ( ) // Test needs to change it, so it's a variable. -var step = int64(100000) +var step = int64(30000) var errInvalidTableID = terror.ClassAutoid.New(codeInvalidTableID, "invalid TableID") diff --git a/parser/parser_test.go b/parser/parser_test.go index 553e806286742..c2321c82b9235 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -1472,6 +1472,7 @@ func (s *testParserSuite) TestDDL(c *C) { {"create table t (a timestamp default now() on update now)", false}, {"create table t (a timestamp default now() on update now())", true}, {"CREATE TABLE t (c TEXT) default CHARACTER SET utf8, default COLLATE utf8_general_ci;", true}, + {"CREATE TABLE t (c TEXT) shard_row_id_bits = 1;", true}, // Create table with ON UPDATE CURRENT_TIMESTAMP(6), specify fraction part. {"CREATE TABLE IF NOT EXISTS `general_log` (`event_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),`user_host` mediumtext NOT NULL,`thread_id` bigint(20) unsigned NOT NULL,`server_id` int(10) unsigned NOT NULL,`command_type` varchar(64) NOT NULL,`argument` mediumblob NOT NULL) ENGINE=CSV DEFAULT CHARSET=utf8 COMMENT='General log'", true}, @@ -1523,6 +1524,7 @@ func (s *testParserSuite) TestDDL(c *C) { {"ALTER TABLE t ENGINE = '', COMMENT='', default COLLATE = utf8_general_ci", true}, {"ALTER TABLE t ENGINE = '', ADD COLUMN a SMALLINT", true}, {"ALTER TABLE t default COLLATE = utf8_general_ci, ENGINE = '', ADD COLUMN a SMALLINT", true}, + {"ALTER TABLE t shard_row_id_bits = 1", true}, // For create index statement {"CREATE INDEX idx ON t (a)", true}, diff --git a/table/tables/tables.go b/table/tables/tables.go index be7d63a972678..0dee8d30d671d 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -18,8 +18,8 @@ package tables import ( + "encoding/binary" "math" - "math/rand" "strings" "github.com/juju/errors" @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" + "github.com/spaolacci/murmur3" ) // Table implements table.Table interface. @@ -706,7 +707,7 @@ func (t *Table) AllocAutoID(ctx context.Context) (int64, error) { if t.meta.ShardRowIDBits > 0 { txnCtx := ctx.GetSessionVars().TxnCtx if txnCtx.Shard == nil { - shard := (rand.Int63() & (1< Date: Tue, 2 Jan 2018 18:27:00 +0800 Subject: [PATCH 06/11] *: add comments --- ddl/ddl_api.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index c26138c5df04c..bd21accb3a46c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -956,6 +956,7 @@ func (d *ddl) RebaseAutoID(ctx context.Context, ident ast.Ident, newBase int64) return errors.Trace(err) } +// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits. func (d *ddl) ShardRowID(ctx context.Context, ident ast.Ident, uVal uint64) error { job, err := d.createJobByIdent(ident) if err != nil { From 32b42ac47c765d735e7cb6ac2956427323833530 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Tue, 2 Jan 2018 19:15:26 +0800 Subject: [PATCH 07/11] *: address comment --- ddl/ddl.go | 2 ++ ddl/ddl_api.go | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 8d0454a73f473..5ccb40e7e0cc3 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -48,6 +48,8 @@ const ( // DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing. DDLOwnerKey = "/tidb/ddl/fg/owner" ddlPrompt = "ddl" + + shardRowIDBitsMax = 15 ) var ( diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index bd21accb3a46c..ba8f6579afb97 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -831,8 +831,8 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) { case ast.TableOptionShardRowID: if !hasAutoIncrementColumn(tbInfo) { tbInfo.ShardRowIDBits = op.UintValue - if tbInfo.ShardRowIDBits > 15 { - tbInfo.ShardRowIDBits = 15 + if tbInfo.ShardRowIDBits > shardRowIDBitsMax { + tbInfo.ShardRowIDBits = shardRowIDBitsMax } } } @@ -906,8 +906,8 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte for _, opt := range spec.Options { switch opt.Tp { case ast.TableOptionShardRowID: - if opt.UintValue > 15 { - opt.UintValue = 15 + if opt.UintValue > shardRowIDBitsMax { + opt.UintValue = shardRowIDBitsMax } err = d.ShardRowID(ctx, ident, opt.UintValue) case ast.TableOptionAutoIncrement: @@ -977,7 +977,7 @@ func (d *ddl) createJobByIdent(ident ast.Ident) (*model.Job, error) { } t, err := is.TableByName(ident.Schema, ident.Name) if err != nil { - return nil, errors.Trace(infoschema.ErrTableNotExists.GenByArgs(ident.Schema, ident.Name)) + return nil, infoschema.ErrTableNotExists.GenByArgs(ident.Schema, ident.Name) } job := &model.Job{ SchemaID: schema.ID, From a290d98bec4f837a9278441dd6478b56c42ad140 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 3 Jan 2018 19:27:03 +0800 Subject: [PATCH 08/11] *: add special comment for ddl binlog. --- sessionctx/binloginfo/binloginfo.go | 18 ++++++++++++++++++ sessionctx/binloginfo/binloginfo_test.go | 5 +++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 28568650c4858..5c79e2545621c 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -14,6 +14,8 @@ package binloginfo import ( + "regexp" + "strings" "sync" "time" @@ -98,6 +100,7 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery if client == nil { return } + ddlQuery = addSpecialComment(ddlQuery) info := &BinlogInfo{ Data: &binlog.Binlog{ Tp: binlog.BinlogType_Prewrite, @@ -108,3 +111,18 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery } txn.SetOption(kv.BinlogInfo, info) } + +const specialPrefix = `/*!90000` + +func addSpecialComment(ddlQuery string) string { + if strings.Contains(ddlQuery, specialPrefix) { + return ddlQuery + } + upperQuery := strings.ToUpper(ddlQuery) + reg, _ := regexp.Compile(`SHARD_ROW_ID_BITS\s*=\s*\d+`) + loc := reg.FindStringIndex(upperQuery) + if len(loc) < 2 { + return ddlQuery + } + return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:] +} diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index 09c75c4321efe..eb0809eef09d7 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -117,11 +117,12 @@ func (s *testBinlogSuite) TestBinlog(c *C) { tk := s.tk pump := s.pump tk.MustExec("drop table if exists local_binlog") - ddlQuery := "create table local_binlog (id int primary key, name varchar(10))" + ddlQuery := "create table local_binlog (id int primary key, name varchar(10)) shard_row_id_bits=1" + binlogDDLQuery := "create table local_binlog (id int primary key, name varchar(10)) /*! 90000 shard_row_id_bits=1 */" tk.MustExec(ddlQuery) var matched bool // got matched pre DDL and commit DDL for i := 0; i < 10; i++ { - preDDL, commitDDL := getLatestDDLBinlog(c, pump, ddlQuery) + preDDL, commitDDL := getLatestDDLBinlog(c, pump, binlogDDLQuery) if preDDL != nil && commitDDL != nil { if preDDL.DdlJobId == commitDDL.DdlJobId { c.Assert(commitDDL.StartTs, Equals, preDDL.StartTs) From 7e4df19c1fb0d825ddd7ecf6b6d5d7afba339046 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 3 Jan 2018 19:33:07 +0800 Subject: [PATCH 09/11] *: address comment and fix ci --- executor/ddl_test.go | 3 ++- sessionctx/binloginfo/binloginfo.go | 2 +- sessionctx/binloginfo/binloginfo_test.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/ddl_test.go b/executor/ddl_test.go index d44732d6cf9c9..fb7f9647f3248 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -351,7 +351,8 @@ func (s *testSuite) TestShardRowIDBits(c *C) { c.Assert(tk.Se.NewTxn(), IsNil) err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) { c.Assert(h, GreaterEqual, int64(0)) - if (h >> 56) > 0 { + first8bits := h >> 56 + if first8bits > 0 { hasShardedID = true } count++ diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index 5c79e2545621c..ec20be43cfa82 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -112,7 +112,7 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery txn.SetOption(kv.BinlogInfo, info) } -const specialPrefix = `/*!90000` +const specialPrefix = `/*!90000 ` func addSpecialComment(ddlQuery string) string { if strings.Contains(ddlQuery, specialPrefix) { diff --git a/sessionctx/binloginfo/binloginfo_test.go b/sessionctx/binloginfo/binloginfo_test.go index eb0809eef09d7..715a79d9f2f6e 100644 --- a/sessionctx/binloginfo/binloginfo_test.go +++ b/sessionctx/binloginfo/binloginfo_test.go @@ -118,7 +118,7 @@ func (s *testBinlogSuite) TestBinlog(c *C) { pump := s.pump tk.MustExec("drop table if exists local_binlog") ddlQuery := "create table local_binlog (id int primary key, name varchar(10)) shard_row_id_bits=1" - binlogDDLQuery := "create table local_binlog (id int primary key, name varchar(10)) /*! 90000 shard_row_id_bits=1 */" + binlogDDLQuery := "create table local_binlog (id int primary key, name varchar(10)) /*!90000 shard_row_id_bits=1 */" tk.MustExec(ddlQuery) var matched bool // got matched pre DDL and commit DDL for i := 0; i < 10; i++ { From 0a42ecf58cc59923b52dee2a32b802bb42634af3 Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 3 Jan 2018 19:49:18 +0800 Subject: [PATCH 10/11] *: check error --- sessionctx/binloginfo/binloginfo.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sessionctx/binloginfo/binloginfo.go b/sessionctx/binloginfo/binloginfo.go index ec20be43cfa82..099e6f30dc54b 100644 --- a/sessionctx/binloginfo/binloginfo.go +++ b/sessionctx/binloginfo/binloginfo.go @@ -119,7 +119,8 @@ func addSpecialComment(ddlQuery string) string { return ddlQuery } upperQuery := strings.ToUpper(ddlQuery) - reg, _ := regexp.Compile(`SHARD_ROW_ID_BITS\s*=\s*\d+`) + reg, err := regexp.Compile(`SHARD_ROW_ID_BITS\s*=\s*\d+`) + terror.Log(err) loc := reg.FindStringIndex(upperQuery) if len(loc) < 2 { return ddlQuery From 3976397abe2326e3d7d96778eb872b6d1bebce9d Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 3 Jan 2018 23:29:09 +0800 Subject: [PATCH 11/11] *: address comment --- ddl/ddl_api.go | 14 +++++++------- executor/show.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index ba8f6579afb97..46b2482c22755 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -957,8 +957,8 @@ func (d *ddl) RebaseAutoID(ctx context.Context, ident ast.Ident, newBase int64) } // ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits. -func (d *ddl) ShardRowID(ctx context.Context, ident ast.Ident, uVal uint64) error { - job, err := d.createJobByIdent(ident) +func (d *ddl) ShardRowID(ctx context.Context, tableIdent ast.Ident, uVal uint64) error { + job, err := d.createJobForTable(tableIdent) if err != nil { return errors.Trace(err) } @@ -969,15 +969,15 @@ func (d *ddl) ShardRowID(ctx context.Context, ident ast.Ident, uVal uint64) erro return errors.Trace(err) } -func (d *ddl) createJobByIdent(ident ast.Ident) (*model.Job, error) { +func (d *ddl) createJobForTable(tableIdent ast.Ident) (*model.Job, error) { is := d.GetInformationSchema() - schema, ok := is.SchemaByName(ident.Schema) + schema, ok := is.SchemaByName(tableIdent.Schema) if !ok { - return nil, infoschema.ErrDatabaseNotExists.GenByArgs(ident.Schema) + return nil, infoschema.ErrDatabaseNotExists.GenByArgs(tableIdent.Schema) } - t, err := is.TableByName(ident.Schema, ident.Name) + t, err := is.TableByName(tableIdent.Schema, tableIdent.Name) if err != nil { - return nil, infoschema.ErrTableNotExists.GenByArgs(ident.Schema, ident.Name) + return nil, infoschema.ErrTableNotExists.GenByArgs(tableIdent.Schema, tableIdent.Name) } job := &model.Job{ SchemaID: schema.ID, diff --git a/executor/show.go b/executor/show.go index 094b480be07d2..7536d4abad700 100644 --- a/executor/show.go +++ b/executor/show.go @@ -624,7 +624,7 @@ func (e *ShowExec) fetchShowCreateTable() error { } if tb.Meta().ShardRowIDBits > 0 { - buf.WriteString(fmt.Sprintf(" SHARD_ROW_ID_BITS=%d", tb.Meta().ShardRowIDBits)) + buf.WriteString(fmt.Sprintf("/*!90000 SHARD_ROW_ID_BITS=%d */", tb.Meta().ShardRowIDBits)) } if len(tb.Meta().Comment) > 0 {