bad state after delete issued #119

Open
andrewgdavis opened this issue Oct 10, 2019 · 3 comments

andrewgdavis commented Oct 10, 2019

Similar issue to #13.

Basically, there was an access issue with the savepoint location (I was using S3), and after seeing in the logs that something was wrong, I ran `kubectl delete -f myFlinkApp.yaml`.

However, the jobs would not delete, and the following error message is logged in an endless loop:
{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"info","msg":"Logged Warning event: SavepointFailed: Failed to take savepoint {java.util.concurrent.CompletionException java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.lambda$triggerSavepoint$0(LegacyScheduler.java:510)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)\n\tat org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. Failure reason: Not all required tasks are currently running.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)\n\tat java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)\n\tat java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)\n\tat java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)\n\tat java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:504)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.triggerSavepoint(JobMaster.java:638)\n\tat sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)\n\tat org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)\n\t... 22 more\nCaused by: org.apache.flink.runtime.checkpoint.CheckpointException: Failed to trigger savepoint. 
Failure reason: Not all required tasks are currently running.\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepointInternal(CheckpointCoordinator.java:428)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:377)\n\tat org.apache.flink.runtime.scheduler.LegacyScheduler.triggerSavepoint(LegacyScheduler.java:503)\n\t... 28 more\n}","ts":"2019-10-10T21:16:34Z"}

It would be nice to have a pre-emptive test, or a configurable number of retries on failure.


After manually issuing `kubectl delete deployment $nameOfFlinkAppDeployment`, the flinkoperator gets into a bad state trying to manage the jobs. Issuing `kubectl delete flinkapplication $nameOfFlinkApp` then results in a failure message in the operator:
{"json":{"app_name":"davis-wordcount-operator-example","ns":"flink-operator","phase":"Deleting"},"level":"warning","msg":"Failed to reconcile resource flink-operator/davis-wordcount-operator-example: GetJobs call failed with status FAILED and message []: Get http://davis-wordcount-operator-example-0a501337.flink-operator:8081/jobs: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)","ts":"2019-10-10T21:26:47Z"}
The workaround was to delete the finalizer by hand via `kubectl edit flinkapplication $nameOfFlinkApp`.
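For anyone who hits the same state, the finalizer can also be cleared without an interactive edit. This is a minimal sketch, not something the operator documents: it assumes that clearing the whole `metadata.finalizers` list with a JSON merge patch is acceptable, and the helper names `finalizer_patch` and `remove_finalizers_cmd` are made up for illustration. The application name and namespace are the ones from the logs above. The script only prints the command so it can be reviewed before running.

```shell
#!/bin/sh
# Hypothetical helper: prints a JSON merge patch that clears metadata.finalizers.
# In a merge patch, setting a field to null removes it.
finalizer_patch() {
  printf '%s' '{"metadata":{"finalizers":null}}'
}

# Hypothetical helper: prints (does not run) the kubectl command for a given
# FlinkApplication name ($1) and namespace ($2).
remove_finalizers_cmd() {
  printf "kubectl patch flinkapplication %s -n %s --type=merge -p '%s'\n" \
    "$1" "$2" "$(finalizer_patch)"
}

# Names taken from the log lines in this issue.
remove_finalizers_cmd davis-wordcount-operator-example flink-operator
```

Note that removing the finalizer skips whatever cleanup the operator would have done, so any remaining Flink deployments or services may need to be deleted manually afterwards.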

@anandswaminathan
Contributor

@andrewgdavis

This is the default setting. We try to savepoint the job before bringing the cluster down. Please set deleteMode to None if you want deletion to go through without savepointing. See https://github.com/lyft/flinkk8soperator/blob/master/docs/crd.md
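As a rough sketch of what that could look like from the command line: the snippet below assumes the field sits at `spec.deleteMode` as described in the linked CRD doc, and that the operator picks up a change made with a merge patch on an existing resource (setting it in the manifest before applying is the safer route). The helper names `delete_mode_patch` and `delete_mode_cmd` are hypothetical, and the script only prints the command instead of running it.

```shell
#!/bin/sh
# Hypothetical helper: prints a JSON merge patch that sets spec.deleteMode to $1.
# Assumption: the field path is spec.deleteMode, per the linked CRD doc.
delete_mode_patch() {
  printf '{"spec":{"deleteMode":"%s"}}' "$1"
}

# Hypothetical helper: prints (does not run) the kubectl command for a given
# FlinkApplication name ($1), namespace ($2), and delete mode ($3).
delete_mode_cmd() {
  printf "kubectl patch flinkapplication %s -n %s --type=merge -p '%s'\n" \
    "$1" "$2" "$(delete_mode_patch "$3")"
}

# Names taken from the log lines in this issue.
delete_mode_cmd davis-wordcount-operator-example flink-operator None
```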

@andrewgdavis
Author

Savepointing was desired; it just happened to fail, which in turn put the operator into a bad state.

@anandswaminathan
Contributor

@andrewgdavis I missed the last line in your issue.

We already have it. You can configure it with the setting here: https://github.com/lyft/flinkk8soperator/blob/master/pkg/controller/config/config.go#L26
