Skip to content
1 change: 1 addition & 0 deletions ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ const (
TableOptionDelayKeyWrite
TableOptionRowFormat
TableOptionStatsPersistent
TableOptionShardRowID
TableOptionPackKeys
)

Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
// DDLOwnerKey is the ddl owner path that is saved to etcd, and it's exported for testing.
DDLOwnerKey = "/tidb/ddl/fg/owner"
ddlPrompt = "ddl"

shardRowIDBitsMax = 15
)

var (
Expand Down
59 changes: 57 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,24 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) {
tbInfo.Charset = op.StrValue
case ast.TableOptionCollate:
tbInfo.Collate = op.StrValue
case ast.TableOptionShardRowID:
if !hasAutoIncrementColumn(tbInfo) {
tbInfo.ShardRowIDBits = op.UintValue
if tbInfo.ShardRowIDBits > shardRowIDBitsMax {
tbInfo.ShardRowIDBits = shardRowIDBitsMax
}
}
}
}
}

func hasAutoIncrementColumn(tbInfo *model.TableInfo) bool {
for _, col := range tbInfo.Columns {
if mysql.HasAutoIncrementFlag(col.Flag) {
return true
}
}
return false
}

func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
Expand Down Expand Up @@ -888,9 +904,17 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte
err = ErrUnsupportedModifyPrimaryKey.GenByArgs("drop")
case ast.AlterTableOption:
for _, opt := range spec.Options {
if opt.Tp == ast.TableOptionAutoIncrement {
switch opt.Tp {
case ast.TableOptionShardRowID:
if opt.UintValue > shardRowIDBitsMax {
opt.UintValue = shardRowIDBitsMax
}
err = d.ShardRowID(ctx, ident, opt.UintValue)
case ast.TableOptionAutoIncrement:
err = d.RebaseAutoID(ctx, ident, int64(opt.UintValue))
break
}
if err != nil {
return errors.Trace(err)
}
}
default:
Expand Down Expand Up @@ -932,6 +956,37 @@ func (d *ddl) RebaseAutoID(ctx context.Context, ident ast.Ident, newBase int64)
return errors.Trace(err)
}

// ShardRowID shards the implicit row ID by adding shard value to the row ID's first few bits.
func (d *ddl) ShardRowID(ctx context.Context, tableIdent ast.Ident, uVal uint64) error {
job, err := d.createJobForTable(tableIdent)
if err != nil {
return errors.Trace(err)
}
job.Type = model.ActionShardRowID
job.Args = []interface{}{uVal}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) createJobForTable(tableIdent ast.Ident) (*model.Job, error) {
is := d.GetInformationSchema()
schema, ok := is.SchemaByName(tableIdent.Schema)
if !ok {
return nil, infoschema.ErrDatabaseNotExists.GenByArgs(tableIdent.Schema)
}
t, err := is.TableByName(tableIdent.Schema, tableIdent.Name)
if err != nil {
return nil, infoschema.ErrTableNotExists.GenByArgs(tableIdent.Schema, tableIdent.Name)
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
BinlogInfo: &model.HistoryInfo{},
}
return job, nil
}

func checkColumnConstraint(constraints []*ast.ColumnOption) error {
for _, constraint := range constraints {
switch constraint.Tp {
Expand Down
5 changes: 5 additions & 0 deletions ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
tmysql "github.com/pingcap/tidb/mysql"
Expand Down Expand Up @@ -66,6 +67,7 @@ type testDBSuite struct {
tk *testkit.TestKit
s tidb.Session
lease time.Duration
autoIDStep int64
}

func (s *testDBSuite) SetUpSuite(c *C) {
Expand All @@ -77,6 +79,8 @@ func (s *testDBSuite) SetUpSuite(c *C) {
tidb.SetSchemaLease(s.lease)
tidb.SetStatsLease(0)
s.schemaName = "test_db"
s.autoIDStep = autoid.GetStep()
autoid.SetStep(5000)

s.store, err = tikv.NewMockTikvStore()
c.Assert(err, IsNil)
Expand All @@ -99,6 +103,7 @@ func (s *testDBSuite) TearDownSuite(c *C) {
s.dom.Close()
s.store.Close()
testleak.AfterTest(c)()
autoid.SetStep(s.autoIDStep)
}

func (s *testDBSuite) testErrorCode(c *C, sql string, errCode int) {
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64, err error) {
ver, err = d.onRenameTable(t, job)
case model.ActionSetDefaultValue:
ver, err = d.onSetDefaultValue(t, job)
case model.ActionShardRowID:
ver, err = d.onShardRowID(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
23 changes: 23 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,29 @@ func (d *ddl) onRebaseAutoID(t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, nil
}

func (d *ddl) onShardRowID(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var shardRowIDBits uint64
err := job.DecodeArgs(&shardRowIDBits)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo, err := getTableInfo(t, job, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
tblInfo.ShardRowIDBits = shardRowIDBits
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
ver, err = updateTableInfo(t, job, tblInfo, tblInfo.State)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return ver, nil
}

func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var oldSchemaID int64
var tableName model.CIStr
Expand Down
28 changes: 28 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
goctx "golang.org/x/net/context"
Expand Down Expand Up @@ -334,3 +335,30 @@ func (s *testSuite) TestTooLargeIdentifierLength(c *C) {
_, err = tk.Exec(fmt.Sprintf("create index %s on t(c)", indexName2))
c.Assert(err.Error(), Equals, fmt.Sprintf("[ddl:1059]Identifier name '%s' is too long", indexName2))
}

func (s *testSuite) TestShardRowIDBits(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test")
tk.MustExec("create table t (a int) shard_row_id_bits = 15")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert t values (%d)", i))
}
tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
var hasShardedID bool
var count int
c.Assert(tk.Se.NewTxn(), IsNil)
err = tbl.IterRecords(tk.Se, tbl.FirstKey(), nil, func(h int64, rec []types.Datum, cols []*table.Column) (more bool, err error) {
c.Assert(h, GreaterEqual, int64(0))
first8bits := h >> 56
if first8bits > 0 {
hasShardedID = true
}
count++
return true, nil
})
c.Assert(err, IsNil)
c.Assert(count, Equals, 100)
c.Assert(hasShardedID, IsTrue)
}
6 changes: 6 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/parser"
Expand Down Expand Up @@ -65,11 +66,15 @@ type testSuite struct {
mvccStore *mocktikv.MvccStore
store kv.Storage
*parser.Parser

autoIDStep int64
}

var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in executor test")

func (s *testSuite) SetUpSuite(c *C) {
s.autoIDStep = autoid.GetStep()
autoid.SetStep(5000)
s.Parser = parser.New()
flag.Lookup("mockTikv")
useMockTikv := *mockTikv
Expand All @@ -96,6 +101,7 @@ func (s *testSuite) SetUpSuite(c *C) {

func (s *testSuite) TearDownSuite(c *C) {
s.store.Close()
autoid.SetStep(s.autoIDStep)
}

func (s *testSuite) SetUpTest(c *C) {
Expand Down
4 changes: 4 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,10 @@ func (e *ShowExec) fetchShowCreateTable() error {
}
}

if tb.Meta().ShardRowIDBits > 0 {
buf.WriteString(fmt.Sprintf("/*!90000 SHARD_ROW_ID_BITS=%d */", tb.Meta().ShardRowIDBits))
}

if len(tb.Meta().Comment) > 0 {
buf.WriteString(fmt.Sprintf(" COMMENT='%s'", format.OutputFormat(tb.Meta().Comment)))
}
Expand Down
2 changes: 1 addition & 1 deletion meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// Test needs to change it, so it's a variable.
var step = int64(5000)
var step = int64(30000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a metrics for auto-id allocation latter.


var errInvalidTableID = terror.ClassAutoid.New(codeInvalidTableID, "invalid TableID")

Expand Down
3 changes: 3 additions & 0 deletions model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
ActionRebaseAutoID
ActionRenameTable
ActionSetDefaultValue
ActionShardRowID
)

func (action ActionType) String() string {
Expand Down Expand Up @@ -78,6 +79,8 @@ func (action ActionType) String() string {
return "rename table"
case ActionSetDefaultValue:
return "set default value"
case ActionShardRowID:
return "shard row ID"
default:
return "none"
}
Expand Down
3 changes: 3 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type TableInfo struct {
// TODO: Remove it.
// Now it only uses for compatibility with the old version that already uses this field.
OldSchemaID int64 `json:"old_schema_id,omitempty"`

// ShardRowIDBits specify if the implicit row ID is sharded.
ShardRowIDBits uint64
}

// GetDBID returns the schema ID that is used to create an allocator.
Expand Down
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ var tokenMap = map[string]int{
"QUARTER": quarter,
"QUERY": query,
"QUICK": quick,
"SHARD_ROW_ID_BITS": shardRowIDBits,
"RANGE": rangeKwd,
"READ": read,
"REAL": realType,
Expand Down
6 changes: 5 additions & 1 deletion parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ import (
precisionType "PRECISION"
primary "PRIMARY"
procedure "PROCEDURE"
shardRowIDBits "SHARD_ROW_ID_BITS"
rangeKwd "RANGE"
read "READ"
realType "REAL"
Expand Down Expand Up @@ -5273,13 +5274,16 @@ TableOption:
{
$$ = &ast.TableOption{Tp: ast.TableOptionStatsPersistent}
}
| "SHARD_ROW_ID_BITS" EqOpt LengthNum
{
$$ = &ast.TableOption{Tp: ast.TableOptionShardRowID, UintValue: $3.(uint64)}
}
| "PACK_KEYS" EqOpt StatsPersistentVal
{
// Parse it but will ignore it.
$$ = &ast.TableOption{Tp: ast.TableOptionPackKeys}
}


StatsPersistentVal:
"DEFAULT"
{}
Expand Down
2 changes: 2 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ func (s *testParserSuite) TestDDL(c *C) {
{"create table t (a timestamp default now() on update now)", false},
{"create table t (a timestamp default now() on update now())", true},
{"CREATE TABLE t (c TEXT) default CHARACTER SET utf8, default COLLATE utf8_general_ci;", true},
{"CREATE TABLE t (c TEXT) shard_row_id_bits = 1;", true},
// Create table with ON UPDATE CURRENT_TIMESTAMP(6), specify fraction part.
{"CREATE TABLE IF NOT EXISTS `general_log` (`event_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),`user_host` mediumtext NOT NULL,`thread_id` bigint(20) unsigned NOT NULL,`server_id` int(10) unsigned NOT NULL,`command_type` varchar(64) NOT NULL,`argument` mediumblob NOT NULL) ENGINE=CSV DEFAULT CHARSET=utf8 COMMENT='General log'", true},

Expand Down Expand Up @@ -1526,6 +1527,7 @@ func (s *testParserSuite) TestDDL(c *C) {
{"ALTER TABLE t ENGINE = '', COMMENT='', default COLLATE = utf8_general_ci", true},
{"ALTER TABLE t ENGINE = '', ADD COLUMN a SMALLINT", true},
{"ALTER TABLE t default COLLATE = utf8_general_ci, ENGINE = '', ADD COLUMN a SMALLINT", true},
{"ALTER TABLE t shard_row_id_bits = 1", true},

// For create index statement
{"CREATE INDEX idx ON t (a)", true},
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package binloginfo

import (
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -98,6 +100,7 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery
if client == nil {
return
}
ddlQuery = addSpecialComment(ddlQuery)
info := &BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
Expand All @@ -108,3 +111,19 @@ func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery
}
txn.SetOption(kv.BinlogInfo, info)
}

const specialPrefix = `/*!90000 `

func addSpecialComment(ddlQuery string) string {
if strings.Contains(ddlQuery, specialPrefix) {
return ddlQuery
}
upperQuery := strings.ToUpper(ddlQuery)
reg, err := regexp.Compile(`SHARD_ROW_ID_BITS\s*=\s*\d+`)
terror.Log(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If err isn't nil, reg may be nil.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just added to pass errcheck, it never returns nil

loc := reg.FindStringIndex(upperQuery)
if len(loc) < 2 {
return ddlQuery
}
return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:]
}
5 changes: 3 additions & 2 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ func (s *testBinlogSuite) TestBinlog(c *C) {
tk := s.tk
pump := s.pump
tk.MustExec("drop table if exists local_binlog")
ddlQuery := "create table local_binlog (id int primary key, name varchar(10))"
ddlQuery := "create table local_binlog (id int primary key, name varchar(10)) shard_row_id_bits=1"
binlogDDLQuery := "create table local_binlog (id int primary key, name varchar(10)) /*!90000 shard_row_id_bits=1 */"
tk.MustExec(ddlQuery)
var matched bool // got matched pre DDL and commit DDL
for i := 0; i < 10; i++ {
preDDL, commitDDL := getLatestDDLBinlog(c, pump, ddlQuery)
preDDL, commitDDL := getLatestDDLBinlog(c, pump, binlogDDLQuery)
if preDDL != nil && commitDDL != nil {
if preDDL.DdlJobId == commitDDL.DdlJobId {
c.Assert(commitDDL.StartTs, Equals, preDDL.StartTs)
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type TransactionContext struct {
Histroy interface{}
SchemaVersion int64
StartTS uint64
Shard *int64
TableDeltaMap map[int64]TableDelta
}

Expand Down
Loading