Skip to content

Commit

Permalink
Revert "pageserver: revert flush backpressure (#8550) (#10135)" (#10270)
Browse files Browse the repository at this point in the history
This reverts commit f3ecd5d.

It is
[suspected](https://neondb.slack.com/archives/C033RQ5SPDH/p1735907405716759)
to have caused significant read amplification in the [ingest
benchmark](https://neonprod.grafana.net/d/de3mupf4g68e8e/perf-test3a-ingest-benchmark?orgId=1&from=now-30d&to=now&timezone=utc&var-new_project_endpoint_id=ep-solitary-sun-w22bmut6&var-large_tenant_endpoint_id=ep-holy-bread-w203krzs)
(specifically during index creation).

We will revisit an intermediate improvement here to unblock [upload
parallelism](#10096) before
properly addressing [compaction
backpressure](#8390).
  • Loading branch information
erikgrinaker authored Jan 3, 2025
1 parent b33299d commit 1393cc6
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 13 deletions.
25 changes: 24 additions & 1 deletion pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -445,6 +445,15 @@ pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});

static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});

static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
Expand Down Expand Up @@ -2577,6 +2586,7 @@ pub(crate) struct TimelineMetrics {
shard_id: String,
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
Expand Down Expand Up @@ -2622,6 +2632,9 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
Expand Down Expand Up @@ -2767,6 +2780,7 @@ impl TimelineMetrics {
shard_id,
timeline_id,
flush_time_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
Expand Down Expand Up @@ -2816,6 +2830,14 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}

pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}

pub(crate) fn shutdown(&self) {
let was_shutdown = self
.shutdown
Expand All @@ -2833,6 +2855,7 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
Expand Down
38 changes: 30 additions & 8 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};

use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::upload_queue::NotInitialized;
use super::GcError;
use super::{
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, MaybeOffloaded,
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
};

#[cfg(test)]
Expand Down Expand Up @@ -3836,6 +3840,24 @@ impl Timeline {
// release lock on 'layers'
};

// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);

// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
Expand Down
1 change: 1 addition & 0 deletions test_runner/fixtures/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def counter(name: str) -> str:
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
Expand Down
13 changes: 9 additions & 4 deletions test_runner/regress/test_branching.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError


# Test branch creation
Expand Down Expand Up @@ -177,8 +176,11 @@ def start_creating_timeline():

env.neon_cli.mappings_map_branch(initial_branch, env.initial_tenant, env.initial_timeline)

with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
env.endpoints.create_start(
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
)
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
finally:
env.pageserver.stop(immediate=True)

Expand Down Expand Up @@ -219,7 +221,10 @@ def start_creating_timeline():

branch_id = TimelineId.generate()

with pytest.raises(RetryError, match="too many 503 error responses"):
with pytest.raises(
PageserverApiException,
match="Cannot branch off the timeline that's not present in pageserver",
):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
Expand Down
48 changes: 48 additions & 0 deletions test_runner/regress/test_remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,54 @@ def create_in_background():
create_thread.join()


def test_paused_upload_stalls_checkpoint(
neon_env_builder: NeonEnvBuilder,
):
"""
This test checks that checkpoints block on uploads to remote storage.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

env = neon_env_builder.init_start(
initial_tenant_conf={
# Set a small compaction threshold
"compaction_threshold": "3",
# Disable GC
"gc_period": "0s",
# disable PITR
"pitr_interval": "0s",
}
)

env.pageserver.allowed_errors.append(
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

client = env.pageserver.http_client()
layers_at_creation = client.layer_map_info(tenant_id, timeline_id)
deltas_at_creation = len(layers_at_creation.delta_layers())
assert (
deltas_at_creation == 1
), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle"

# Make new layer uploads get stuck.
# Note that timeline creation waits for the initial layers to reach remote storage.
# So at this point, the `layers_at_creation` are in remote storage.
client.configure_failpoints(("before-upload-layer-pausable", "pause"))

with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
# Build two tables with some data inside
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

with pytest.raises(ReadTimeout):
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
client.configure_failpoints(("before-upload-layer-pausable", "off"))


def wait_upload_queue_empty(
client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
Expand Down

1 comment on commit 1393cc6

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7377 tests run: 7013 passed, 1 failed, 363 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"

Code coverage* (full report)

  • functions: 31.2% (8407 of 26945 functions)
  • lines: 47.9% (66724 of 139172 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
1393cc6 at 2025-01-03T18:10:40.426Z :recycle:

Please sign in to comment.