diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java index 9098e1f98..bac574a43 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java @@ -257,6 +257,12 @@ public Optional copyLogSegmentData(final RemoteLogSegmentMetadat maybeEncryptionKey, customMetadataBuilder); } catch (final Exception e) { + try { + // best effort on removing orphan files + tryDeleteSegmentObjects(remoteLogSegmentMetadata); + } catch (final Exception ignored) { + // ignore all exceptions + } throw new RemoteStorageException(e); } @@ -342,7 +348,7 @@ private TransformChunkEnumeration transformation( return transformEnum; } - private SegmentIndexesV1 uploadIndexes( + SegmentIndexesV1 uploadIndexes( final RemoteLogSegmentMetadata remoteLogSegmentMetadata, final LogSegmentData segmentData, final DataKeyAndAAD maybeEncryptionKey, @@ -661,10 +667,7 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment final long startedMs = time.milliseconds(); try { - final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) - .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) - .collect(Collectors.toSet()); - deleter.delete(keys); + tryDeleteSegmentObjects(remoteLogSegmentMetadata); } catch (final Exception e) { metrics.recordSegmentDeleteError(remoteLogSegmentMetadata.remoteLogSegmentId() .topicIdPartition().topicPartition()); @@ -678,6 +681,15 @@ public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegment log.info("Deleting log segment data for completed successfully {}", remoteLogSegmentMetadata); } + private void tryDeleteSegmentObjects( + final RemoteLogSegmentMetadata remoteLogSegmentMetadata + ) throws StorageBackendException { + final Set keys = Arrays.stream(ObjectKeyFactory.Suffix.values()) + .map(s -> objectKeyFactory.key(remoteLogSegmentMetadata, s)) + .collect(Collectors.toSet()); + deleter.delete(keys); + } + @Override public void close() { metrics.close(); diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java index 9ef568e6f..7c75183f0 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/RemoteStorageManagerTest.java @@ -18,15 +18,19 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.stream.Stream; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; @@ -36,6 +40,7 @@ import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -45,8 +50,12 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; class RemoteStorageManagerTest { @@ -221,6 +230,85 @@ void fetchSegmentNonInterruptionExceptionWhenGettingSegment( .hasRootCauseInstanceOf(exceptionClass); } + @Test + void deleteObjectsWhenUploadFails( + @TempDir final Path partitionDir + ) throws IOException, StorageBackendException, RemoteStorageException { + // given a sample local segment to be uploaded + final var segmentPath = Files.createFile(partitionDir.resolve("0000.log")); + final var segmentContent = "test"; + Files.writeString(segmentPath, segmentContent); + final var indexPath = Files.createFile(partitionDir.resolve("0000.index")); + final var timeIndexPath = Files.createFile(partitionDir.resolve("0000.timeindex")); + final var producerSnapshotPath = Files.createFile(partitionDir.resolve("0000.snapshot")); + final var logSegmentData = new LogSegmentData( + segmentPath, + indexPath, + timeIndexPath, + Optional.empty(), + producerSnapshotPath, + ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)) + ); + + final var remoteLogSegmentMetadata = new RemoteLogSegmentMetadata( + REMOTE_SEGMENT_ID, 0, 1L, + 0, 0, 0, segmentContent.length(), Map.of(0, 0L)); + + final var remotePartitionPath = targetDir.resolve(TOPIC_ID_PARTITION.topic() + "-" + TOPIC_ID) + .resolve(String.valueOf(TOPIC_ID_PARTITION.partition())); + + final var config = Map.of( + "chunk.size", "1", + "storage.backend.class", "io.aiven.kafka.tieredstorage.storage.filesystem.FileSystemStorage", + "storage.root", targetDir.toString() + ); + rsm.configure(config); + rsm = spy(rsm); + + // when first upload fails + doThrow(IOException.class).when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadSegmentLog(any(), any(), anyBoolean(), any(), any()); + + // when second upload fails + doThrow(IOException.class).when(rsm).uploadIndexes(any(), any(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadIndexes(any(), any(), any(), any()); + + // when third upload fails + doThrow(IOException.class).when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + assertThatThrownBy(() -> rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)) + .isInstanceOf(RemoteStorageException.class) + .hasRootCauseInstanceOf(IOException.class); + + // then no files stored in remote + assertThat(remotePartitionPath).doesNotExist(); + + // fallback to real method + doCallRealMethod().when(rsm).uploadManifest(any(), any(), any(), anyBoolean(), any(), any()); + + // when all good + rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData); + assertThat(Files.list(remotePartitionPath)).hasSize(3); + } + static Stream provideNonInterruptionExceptions() { return Stream.of( arguments(null, Exception.class),