Skip to content

Commit

Permalink
Add support for merging chunks
Browse files Browse the repository at this point in the history
A new procedure called `merge_chunks` is introduced that can merge an
arbitrary number of chunks if the right conditions apply. Basic checks
are done to ensure that the chunks can be merged from a partitioning
perspective. Some more advanced cases that are potentially mergeable
are not supported at this time (e.g., complicated merges of chunks
with multi-dimensional partitioning).

Currently, the merge defaults to taking an AccessExclusive lock on the
merged chunks to prevent deadlocks and concurrent
modifications. Weaker locking is supported via an anonymous settings
variable, but this is mostly to prove in tests that these approaches
can lead to deadlocks.

The actual merging is done by rewriting all the data from multiple
chunks into a (temporary) merged heap using the same approach as that
implemented to support VACUUM FULL and CLUSTER. Then this new heap is
swapped into one of the original relations while the rest are
dropped. This approach is MVCC compliant and implements correct
visibility under higher isolation levels, while also doing vacuum and
leaving no garbage tuples.
  • Loading branch information
erimatnor committed Dec 16, 2024
1 parent b1d47ac commit c9760c6
Show file tree
Hide file tree
Showing 23 changed files with 2,306 additions and 27 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7433
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7433 Add support for merging chunks
4 changes: 4 additions & 0 deletions sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ CREATE OR REPLACE PROCEDURE @extschema@.convert_to_rowstore(
if_columnstore BOOLEAN = true
) AS '@MODULE_PATHNAME@', 'ts_decompress_chunk' LANGUAGE C;

CREATE OR REPLACE PROCEDURE @extschema@.merge_chunks(
variadic chunks REGCLASS[]
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_merge_chunks';

CREATE OR REPLACE FUNCTION _timescaledb_functions.recompress_chunk_segmentwise(
uncompressed_chunk REGCLASS,
if_compressed BOOLEAN = true
Expand Down
5 changes: 5 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ CREATE FUNCTION @extschema@.hypertable_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @extschema@.hypertable_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Merge chunks
CREATE PROCEDURE @extschema@.merge_chunks(
variadic chunks REGCLASS[]
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';
2 changes: 2 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ ALTER EXTENSION timescaledb DROP VIEW timescaledb_information.chunk_columnstore_
DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

-- Merge chunks
DROP PROCEDURE IF EXISTS @extschema@.merge_chunks(variadic chunks REGCLASS[]);
3 changes: 2 additions & 1 deletion src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ extern bool ts_chunk_exists_relid(Oid relid);
extern TSDLLEXPORT bool ts_chunk_exists_with_compression(int32 hypertable_id);
extern void ts_chunk_recreate_all_constraints_for_dimension(Hypertable *ht, int32 dimension_id);
extern int ts_chunk_delete_by_hypertable_id(int32 hypertable_id);
extern int ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior);
extern TSDLLEXPORT int ts_chunk_delete_by_name(const char *schema, const char *table,
DropBehavior behavior);
extern bool ts_chunk_set_name(Chunk *chunk, const char *newname);
extern bool ts_chunk_set_schema(Chunk *chunk, const char *newschema);
extern TSDLLEXPORT List *ts_chunk_get_window(int32 dimension_id, int64 point, int count,
Expand Down
9 changes: 5 additions & 4 deletions src/chunk_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti)
/*
* Create a dimensional CHECK constraint for a partitioning dimension.
*/
static Constraint *
create_dimension_check_constraint(const Dimension *dim, const DimensionSlice *slice,
const char *name)
Constraint *
ts_chunk_constraint_dimensional_create(const Dimension *dim, const DimensionSlice *slice,
const char *name)
{
Constraint *constr = NULL;
bool isvarlena;
Expand Down Expand Up @@ -489,7 +489,8 @@ ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk)

dim = ts_hyperspace_get_dimension_by_id(ht->space, slice->fd.dimension_id);
Assert(dim);
constr = create_dimension_check_constraint(dim, slice, NameStr(cc->fd.constraint_name));
constr =
ts_chunk_constraint_dimensional_create(dim, slice, NameStr(cc->fd.constraint_name));

/* In some cases, a CHECK constraint is not needed. For instance,
* if the range is -INF to +INF. */
Expand Down
9 changes: 6 additions & 3 deletions src/chunk_constraint.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ extern int ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *sli
ChunkScanCtx *ctx, MemoryContext mctx);
extern int ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice *slice,
List **list, MemoryContext mctx);
extern int ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,
ChunkConstraints *ccs,
MemoryContext mctx);
extern int TSDLLEXPORT ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,
ChunkConstraints *ccs,
MemoryContext mctx);
extern ChunkConstraint *ts_chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id,
int32 dimension_slice_id,
const char *constraint_name,
Expand All @@ -58,6 +58,9 @@ extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_constraints(ChunkCon
extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_check_constraints(
ChunkConstraints *ccs, int32 chunk_id, const char chunk_relkind, Oid hypertable_oid);
extern TSDLLEXPORT void ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs);
extern TSDLLEXPORT Constraint *ts_chunk_constraint_dimensional_create(const Dimension *dim,
const DimensionSlice *slice,
const char *name);
extern TSDLLEXPORT void ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk);
extern void ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk,
Oid constraint_oid);
Expand Down
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ CROSSMODULE_WRAPPER(chunk_create_empty_table);

CROSSMODULE_WRAPPER(recompress_chunk_segmentwise);
CROSSMODULE_WRAPPER(get_compressed_chunk_index_for_recompression);
CROSSMODULE_WRAPPER(merge_chunks);

/* hypercore */
CROSSMODULE_WRAPPER(is_compressed_tid);
Expand Down Expand Up @@ -407,6 +408,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.recompress_chunk_segmentwise = error_no_default_fn_pg_community,
.get_compressed_chunk_index_for_recompression = error_no_default_fn_pg_community,
.preprocess_query_tsl = preprocess_query_tsl_default_fn_community,
.merge_chunks = error_no_default_fn_pg_community,
};

TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ typedef struct CrossModuleFunctions
PGFunction recompress_chunk_segmentwise;
PGFunction get_compressed_chunk_index_for_recompression;
void (*preprocess_query_tsl)(Query *parse, int *cursor_opts);
PGFunction merge_chunks;
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
3 changes: 2 additions & 1 deletion src/dimension.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid,
extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot);
extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice);
extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs,
int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs,
DimensionType type, Index n);
extern TSDLLEXPORT Dimension *ts_hyperspace_get_mutable_dimension(Hyperspace *hs,
Expand Down
12 changes: 7 additions & 5 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str
int limit, const ScanTupLock *tuplock);
extern DimensionVec *ts_dimension_slice_collision_scan_limit(int32 dimension_id, int64 range_start,
int64 range_end, int limit);
extern bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice,
const ScanTupLock *tuplock);
extern TSDLLEXPORT bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice,
const ScanTupLock *tuplock);
extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id,
const ScanTupLock *tuplock,
MemoryContext mctx,
Expand All @@ -70,18 +70,20 @@ extern DimensionVec *ts_dimension_slice_scan_by_dimension_before_point(int32 dim
ScanDirection scandir,
MemoryContext mctx);
extern int ts_dimension_slice_delete_by_dimension_id(int32 dimension_id, bool delete_constraints);
extern int ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraints);
extern TSDLLEXPORT int ts_dimension_slice_delete_by_id(int32 dimension_slice_id,
bool delete_constraints);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_create(int dimension_id, int64 range_start,
int64 range_end);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_copy(const DimensionSlice *original);
extern TSDLLEXPORT bool ts_dimension_slices_collide(const DimensionSlice *slice1,
const DimensionSlice *slice2);
extern bool ts_dimension_slices_equal(const DimensionSlice *slice1, const DimensionSlice *slice2);
extern TSDLLEXPORT bool ts_dimension_slices_equal(const DimensionSlice *slice1,
const DimensionSlice *slice2);
extern bool ts_dimension_slice_cut(DimensionSlice *to_cut, const DimensionSlice *other,
int64 coord);
extern void ts_dimension_slice_free(DimensionSlice *slice);
extern int ts_dimension_slice_insert_multi(DimensionSlice **slice, Size num_slices);
extern void ts_dimension_slice_insert(DimensionSlice *slice);
extern TSDLLEXPORT void ts_dimension_slice_insert(DimensionSlice *slice);
extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSlice *right);
extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);

Expand Down
4 changes: 2 additions & 2 deletions src/hypercube.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ typedef struct Hypercube
(sizeof(Hypercube) + (sizeof(DimensionSlice *) * (num_dimensions)))

extern TSDLLEXPORT Hypercube *ts_hypercube_alloc(int16 num_dimensions);
extern void ts_hypercube_free(Hypercube *hc);
extern TSDLLEXPORT void ts_hypercube_free(Hypercube *hc);

extern TSDLLEXPORT DimensionSlice *
ts_hypercube_add_slice_from_range(Hypercube *hc, int32 dimension_id, int64 start, int64 end);
Expand All @@ -41,6 +41,6 @@ extern Hypercube *ts_hypercube_calculate_from_point(const Hyperspace *hs, const
extern bool ts_hypercubes_collide(const Hypercube *cube1, const Hypercube *cube2);
extern TSDLLEXPORT const DimensionSlice *ts_hypercube_get_slice_by_dimension_id(const Hypercube *hc,
int32 dimension_id);
extern Hypercube *ts_hypercube_copy(const Hypercube *hc);
extern TSDLLEXPORT Hypercube *ts_hypercube_copy(const Hypercube *hc);
extern bool ts_hypercube_equal(const Hypercube *hc1, const Hypercube *hc2);
extern void ts_hypercube_slice_sort(Hypercube *hc);
Loading

0 comments on commit c9760c6

Please sign in to comment.