@@ -500,7 +500,7 @@ func (s *schemaVersionSyncer) syncJobSchemaVer(ctx context.Context) {
500
500
}
501
501
502
502
func (s * schemaVersionSyncer ) handleJobSchemaVerKV (kv * mvccpb.KeyValue , tp mvccpb.Event_EventType ) {
503
- jobID , tidbID , schemaVer , valid := decodeJobVersionEvent (kv , s .jobNodeVerPrefix )
503
+ jobID , tidbID , schemaVer , valid := decodeJobVersionEvent (kv , tp , s .jobNodeVerPrefix )
504
504
if ! valid {
505
505
logutil .DDLLogger ().Error ("invalid job version kv" , zap .Stringer ("kv" , kv ), zap .Stringer ("type" , tp ))
506
506
return
@@ -543,7 +543,7 @@ func (s *schemaVersionSyncer) jobSchemaVerMatchOrSet(jobID int64, matchFn func(m
543
543
return item
544
544
}
545
545
546
- func decodeJobVersionEvent (kv * mvccpb.KeyValue , prefix string ) (jobID int64 , tidbID string , schemaVer int64 , valid bool ) {
546
+ func decodeJobVersionEvent (kv * mvccpb.KeyValue , tp mvccpb. Event_EventType , prefix string ) (jobID int64 , tidbID string , schemaVer int64 , valid bool ) {
547
547
left := strings .TrimPrefix (string (kv .Key ), prefix )
548
548
parts := strings .Split (left , "/" )
549
549
if len (parts ) != 2 {
@@ -553,9 +553,12 @@ func decodeJobVersionEvent(kv *mvccpb.KeyValue, prefix string) (jobID int64, tid
553
553
if err != nil {
554
554
return 0 , "" , 0 , false
555
555
}
556
- schemaVer , err = strconv .ParseInt (string (kv .Value ), 10 , 64 )
557
- if err != nil {
558
- return 0 , "" , 0 , false
556
+ // there is Value in DELETE event, so we need to check it.
557
+ if tp == mvccpb .PUT {
558
+ schemaVer , err = strconv .ParseInt (string (kv .Value ), 10 , 64 )
559
+ if err != nil {
560
+ return 0 , "" , 0 , false
561
+ }
559
562
}
560
563
return jobID , parts [1 ], schemaVer , true
561
564
}
0 commit comments