Skip to content

Can't stop with a savepoint? #1383

@zhaoyangjun0328

Description

@zhaoyangjun0328

Describe the bug(Please use English)
A clear and concise description of what the bug is.

Environment :

  • Flink version : 1.15.0
  • Flink CDC version: 2.2.1
  • Database and version: oracle 11g

To Reproduce
Steps to reproduce the behavior:

  1. Thes test data :
    flink stop {jobId}
  2. The test code :
    SET parallelism.default = 4;
    CREATE TABLE input(
    L_DATE DECIMAL(38, 0)
    ...
    ,PRIMARY KEY(L_DATE) NOT ENFORCED
    ) WITH (
    'connector' = 'oracle-cdc'
    ,'hostname' = ''
    ,'port' = '
    '
    ,'username' = 'flinkuser'
    ,'password' = '123456'
    ,'database-name' = 'orcl'
    ,'schema-name' = 'FLINKUSER'
    ,'table-name' = ''
    ,'scan.startup.mode' = 'initial'
    ,'debezium.log.mining.strategy' = 'online_catalog'
    ,'debezium.log.mining.continuous.mine' = 'true'
    ,'debezium.database.tablename.case.insensitive' = 'false'
    );
    CREATE TABLE output(
    L_DATE BIGINT
    ...
    ) WITH (
    'connector' = 'kafka'
    ,'topic' = '
    '
    ,'properties.bootstrap.servers' = ''
    ,'properties.group.id' = 'tmp_001'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'format' = 'debezium-json'
    );

SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 1000;
INSERT INTO output
SELECT CAST(L_DATE AS BIGINT)
...
FROM input ;
5. The error :
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
Suspending job "eb5ad9271f8a9824238845ff19ae171d" with a CANONICAL savepoint.


The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "eb5ad9271f8a9824238845ff19ae171d".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:588)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1093)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:586)
... 6 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:545)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2070)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: input[1] -> Calc[2] (1/1)#0 Failure reason: Task has failed.
at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
... 1 more
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Could not perform checkpoint 7 for operator Source: input[1] -> Calc[2] (1/1)#0.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 15 more
Caused by: java.lang.Exception: Could not perform checkpoint 7 for operator Source: input[1] -> Calc[2] (1/1)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
... 12 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 7 for operator Source: input[1] -> Calc[2] (1/1)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
... 13 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Call snapshotState() on closed source, checkpoint failed.
at com.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java:309)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
... 24 more

Additional Description
If applicable, add screenshots to help explain your problem.
OracleSorce And PgSource happen error, But MysqlSource is OK.

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions