diff --git a/Makefile b/Makefile index 5677ac4f..69f03650 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: duckdb install-duckdb clean-duckdb lintcheck check-regression-duckdb clean-regression .depend +.PHONY: duckdb install-duckdb clean-duckdb clean-all lintcheck check-regression-duckdb clean-regression MODULE_big = pg_duckdb EXTENSION = pg_duckdb @@ -22,7 +22,13 @@ SRCS = src/scan/heap_reader.cpp \ src/pgduckdb_planner.cpp \ src/pgduckdb_ruleutils.cpp \ src/pgduckdb_types.cpp \ - src/pgduckdb.cpp + src/pgduckdb.cpp \ + src/catalog/pgduckdb_storage.cpp \ + src/catalog/pgduckdb_schema.cpp \ + src/catalog/pgduckdb_table.cpp \ + src/catalog/pgduckdb_transaction.cpp \ + src/catalog/pgduckdb_transaction_manager.cpp \ + src/catalog/pgduckdb_catalog.cpp OBJS = $(subst .cpp,.o, $(SRCS)) @@ -47,7 +53,7 @@ endif override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -Ithird_party/duckdb/third_party/re2 override PG_CXXFLAGS += -std=c++17 -Wno-sign-compare ${DUCKDB_BUILD_CXX_FLAGS} -SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -lstdc++ -llz4 +SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -Lthird_party/duckdb/build/$(DUCKDB_BUILD_TYPE)/src -L$(PG_LIB) -lduckdb -lstdc++ -llz4 COMPILE.cc.bc = $(CXX) -Wno-ignored-attributes -Wno-register $(BITCODE_CXXFLAGS) $(CXXFLAGS) $(PG_CPPFLAGS) $(PG_CXXFLAGS) -I$(INCLUDEDIR_SERVER) -emit-llvm -c @@ -63,7 +69,7 @@ ifeq ($(UNAME_S),Linux) DUCKDB_LIB = libduckdb.so endif -all: duckdb $(OBJS) .depend +all: duckdb $(OBJS) include Makefile.global @@ -108,18 +114,12 @@ clean-duckdb: install: install-duckdb -clean: clean-regression clean-duckdb +clean-all: clean clean-regression clean-duckdb lintcheck: clang-tidy $(SRCS) -- -I$(INCLUDEDIR) -I$(INCLUDEDIR_SERVER) -Iinclude $(CPPFLAGS) -std=c++17 ruff check -.depend: - $(RM) -f .depend - $(foreach SRC,$(SRCS),$(CXX) $(CPPFLAGS) -I$(INCLUDEDIR) -I$(INCLUDEDIR_SERVER) -MM -MT $(SRC:.cpp=.o) $(SRC) >> .depend;) - format: git clang-format origin/main ruff format - -include .depend diff --git a/Makefile.global b/Makefile.global index 116c59a5..211dab8f 100644 --- a/Makefile.global +++ b/Makefile.global @@ -7,3 +7,50 @@ INCLUDEDIR_SERVER := ${shell $(PG_CONFIG) --includedir-server} USE_PGXS = 1 include $(PGXS) + +# All the below stuff is vendored in from Postgres its Makefile.global. It's +# normally only enabled when --enabled-depend is provided to Postgres its +# ./configure script. This enables it even if that is not the case, so that +# running "make clean" is pretty much never necessary anymore to re-trigger +# builds of C/C++ files when their headers are modified. +ifneq ($(autodepend), yes) + +ifndef COMPILE.c +COMPILE.c = $(CC) $(CFLAGS) $(CPPFLAGS) -c +endif + +ifndef COMPILE.cc +COMPILE.cc = $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c +endif + +DEPDIR = .deps + +ifeq ($(GCC), yes) + +# GCC allows us to create object and dependency file in one invocation. +%.o : %.c + @if test ! -d $(DEPDIR); then mkdir -p $(DEPDIR); fi + $(COMPILE.c) -o $@ $< -MMD -MP -MF $(DEPDIR)/$(*F).Po + +%.o : %.cpp + @if test ! -d $(DEPDIR); then mkdir -p $(DEPDIR); fi + $(COMPILE.cc) -o $@ $< -MMD -MP -MF $(DEPDIR)/$(*F).Po + +endif # GCC + +# Include all the dependency files generated for the current +# directory. Note that make would complain if include was called with +# no arguments. +Po_files := $(wildcard $(DEPDIR)/*.Po) +ifneq (,$(Po_files)) +include $(Po_files) +endif + +# hook for clean-up +clean distclean: clean-deps + +.PHONY: clean-deps +clean-deps: + @rm -rf $(DEPDIR) + +endif # autodepend diff --git a/README.md b/README.md index 73561ee3..094c849b 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,14 @@ pg_duckdb is a Postgres extension that embeds DuckDB's columnar-vectorized analy pg_duckdb was developed in collaboration with our partners, [Hydra](https://hydra.so) and [MotherDuck](https://motherduck.com). +## Goals + +* Broad support for Postgres types and functions within analytical context +* Query external datasets on S3, GCP and Azure (Parquet, Iceberg and Delta Lake) +* Join native Postgres and analytical data +* Direct access to data stored in MotherDuck +* Support installation of DuckDB extensions + ## Installation Pre-built binaries and additional installation options are coming soon. @@ -38,7 +46,7 @@ CREATE EXTENSION pg_duckdb; - `SELECT n FROM read_parquet('s3://bucket/file.parquet') AS (n int)` - `SELECT n FROM read_csv('s3://bucket/file.csv') AS (n int)` - You can pass globs and arrays to these functions, just like in DuckDB -- Enable the DuckDB Iceberg extension using `SELECT duckdb.enable_extension('iceberg')` and read Iceberg files with `iceberg_scan`. +- Enable the DuckDB Iceberg extension using `SELECT duckdb.install_extension('iceberg')` and read Iceberg files with `iceberg_scan`. - Write a query — or an entire table — to parquet in object storage. - `COPY (SELECT foo, bar FROM baz) TO 's3://...'` - `COPY table TO 's3://...'` @@ -96,7 +104,7 @@ Please see the [project roadmap][roadmap] for upcoming planned tasks and feature ### Connect with MotherDuck -pg_duckdb integration with MotherDuck will enable hybrid execution with Differential Storage. +pg_duckdb integration with MotherDuck will enable dual execution with Differential Storage. * Zero-copy snapshots and forks * Time travel diff --git a/include/pgduckdb/catalog/pgduckdb_catalog.hpp b/include/pgduckdb/catalog/pgduckdb_catalog.hpp new file mode 100644 index 00000000..5a7827af --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_catalog.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include "duckdb/storage/storage_extension.hpp" +#include "duckdb/catalog/catalog.hpp" +#include "pgduckdb/catalog/pgduckdb_schema.hpp" + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "utils/snapshot.h" +} + +namespace duckdb { + +class PostgresCatalog : public Catalog { +public: + PostgresCatalog(AttachedDatabase &db, const string &connection_string, AccessMode access_mode); + +public: + static unique_ptr Attach(StorageExtensionInfo *storage_info, ClientContext &context, AttachedDatabase &db, + const string &name, AttachInfo &info, AccessMode access_mode); + +public: + string path; + AccessMode access_mode; + +public: + // -- Catalog API -- + void Initialize(bool load_builtin) override; + string GetCatalogType() override; + optional_ptr CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override; + optional_ptr GetSchema(CatalogTransaction transaction, const string &schema_name, + OnEntryNotFound if_not_found, + QueryErrorContext error_context = QueryErrorContext()) override; + void ScanSchemas(ClientContext &context, std::function callback) override; + unique_ptr PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, + unique_ptr plan) override; + unique_ptr PlanInsert(ClientContext &context, LogicalInsert &op, + unique_ptr plan) override; + unique_ptr PlanDelete(ClientContext &context, LogicalDelete &op, + unique_ptr plan) override; + unique_ptr PlanUpdate(ClientContext &context, LogicalUpdate &op, + unique_ptr plan) override; + unique_ptr BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) override; + DatabaseSize GetDatabaseSize(ClientContext &context) override; + bool InMemory() override; + string GetDBPath() override; + void DropSchema(ClientContext &context, DropInfo &info) override; + +private: + case_insensitive_map_t> schemas; +}; + +} // namespace duckdb diff --git a/include/pgduckdb/catalog/pgduckdb_schema.hpp b/include/pgduckdb/catalog/pgduckdb_schema.hpp new file mode 100644 index 00000000..2538024a --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_schema.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" +#include "pgduckdb/catalog/pgduckdb_table.hpp" + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "utils/snapshot.h" +#include "nodes/pathnodes.h" +} + +namespace duckdb { + +class PostgresSchema : public SchemaCatalogEntry { +public: + PostgresSchema(Catalog &catalog, CreateSchemaInfo &info, Snapshot snapshot); + +public: + // -- Schema API -- + void Scan(ClientContext &context, CatalogType type, const std::function &callback) override; + void Scan(CatalogType type, const std::function &callback) override; + optional_ptr CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, + TableCatalogEntry &table) override; + optional_ptr CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override; + optional_ptr CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override; + optional_ptr CreateView(CatalogTransaction transaction, CreateViewInfo &info) override; + optional_ptr CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) override; + optional_ptr CreateTableFunction(CatalogTransaction transaction, + CreateTableFunctionInfo &info) override; + optional_ptr CreateCopyFunction(CatalogTransaction transaction, + CreateCopyFunctionInfo &info) override; + optional_ptr CreatePragmaFunction(CatalogTransaction transaction, + CreatePragmaFunctionInfo &info) override; + optional_ptr CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) override; + optional_ptr CreateType(CatalogTransaction transaction, CreateTypeInfo &info) override; + optional_ptr GetEntry(CatalogTransaction transaction, CatalogType type, const string &name) override; + void DropEntry(ClientContext &context, DropInfo &info) override; + void Alter(CatalogTransaction transaction, AlterInfo &info) override; + +public: + Snapshot snapshot; + Catalog &catalog; +}; + +} // namespace duckdb diff --git a/include/pgduckdb/catalog/pgduckdb_storage.hpp b/include/pgduckdb/catalog/pgduckdb_storage.hpp new file mode 100644 index 00000000..c1c08a2c --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_storage.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "duckdb/storage/storage_extension.hpp" +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "utils/snapshot.h" +#include "nodes/pathnodes.h" +} + +namespace duckdb { + +class PostgresStorageExtensionInfo : public StorageExtensionInfo { +public: + PostgresStorageExtensionInfo(Snapshot snapshot) : snapshot(snapshot) { + } + +public: + Snapshot snapshot; +}; + +class PostgresStorageExtension : public StorageExtension { +public: + PostgresStorageExtension(Snapshot snapshot); +}; + +} // namespace duckdb diff --git a/include/pgduckdb/catalog/pgduckdb_table.hpp b/include/pgduckdb/catalog/pgduckdb_table.hpp new file mode 100644 index 00000000..70966383 --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_table.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp" +#include "duckdb/storage/table_storage_info.hpp" + +extern "C" { +#include "postgres.h" +#include "utils/snapshot.h" +#include "postgres.h" +#include "catalog/namespace.h" +#include "catalog/pg_class.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "utils/builtins.h" +#include "utils/regproc.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "access/htup_details.h" +} + +namespace duckdb { + +class PostgresTable : public TableCatalogEntry { +public: + virtual ~PostgresTable() { + } + +public: + static bool PopulateColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot); + +protected: + PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality, + Snapshot snapshot); + +protected: + Cardinality cardinality; + Snapshot snapshot; +}; + +class PostgresHeapTable : public PostgresTable { +public: + PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality, + Snapshot snapshot, Oid oid); + +public: + // -- Table API -- + unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; + TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; + TableStorageInfo GetStorageInfo(ClientContext &context) override; + +private: + Oid oid; +}; + +class PostgresIndexTable : public PostgresTable { +public: + PostgresIndexTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, Cardinality cardinality, + Snapshot snapshot, Path *path, PlannerInfo *planner_info); + +public: + // -- Table API -- + unique_ptr GetStatistics(ClientContext &context, column_t column_id) override; + TableFunction GetScanFunction(ClientContext &context, unique_ptr &bind_data) override; + TableStorageInfo GetStorageInfo(ClientContext &context) override; + +private: + Path *path; + PlannerInfo *planner_info; +}; + +} // namespace duckdb diff --git a/include/pgduckdb/catalog/pgduckdb_transaction.hpp b/include/pgduckdb/catalog/pgduckdb_transaction.hpp new file mode 100644 index 00000000..c22362a0 --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_transaction.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "duckdb/transaction/transaction.hpp" +#include "pgduckdb/catalog/pgduckdb_table.hpp" +#include "pgduckdb/catalog/pgduckdb_schema.hpp" + +namespace duckdb { + +class PostgresCatalog; + +class SchemaItems { +public: + SchemaItems(unique_ptr &&schema, const string &name) : name(name), schema(std::move(schema)) { + } + +public: + optional_ptr GetTable(const string &name, PlannerInfo *planner_info); + +public: + string name; + unique_ptr schema; + case_insensitive_map_t> tables; +}; + +class PostgresTransaction : public Transaction { +public: + PostgresTransaction(TransactionManager &manager, ClientContext &context, PostgresCatalog &catalog, + Snapshot snapshot); + ~PostgresTransaction() override; + +public: + optional_ptr GetCatalogEntry(CatalogType type, const string &schema, const string &name); + +private: + optional_ptr GetSchema(const string &name); + +private: + case_insensitive_map_t schemas; + PostgresCatalog &catalog; + Snapshot snapshot; +}; + +} // namespace duckdb diff --git a/include/pgduckdb/catalog/pgduckdb_transaction_manager.hpp b/include/pgduckdb/catalog/pgduckdb_transaction_manager.hpp new file mode 100644 index 00000000..a9c9cd48 --- /dev/null +++ b/include/pgduckdb/catalog/pgduckdb_transaction_manager.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include "duckdb/transaction/transaction_manager.hpp" +#include "pgduckdb/catalog/pgduckdb_catalog.hpp" +#include "pgduckdb/catalog/pgduckdb_transaction.hpp" +#include "duckdb/common/reference_map.hpp" + +namespace duckdb { + +class PostgresTransactionManager : public TransactionManager { +public: + PostgresTransactionManager(AttachedDatabase &db_p, PostgresCatalog &catalog, Snapshot snapshot); + + Transaction &StartTransaction(ClientContext &context) override; + ErrorData CommitTransaction(ClientContext &context, Transaction &transaction) override; + void RollbackTransaction(Transaction &transaction) override; + + void Checkpoint(ClientContext &context, bool force = false) override; + +private: + PostgresCatalog &catalog; + Snapshot snapshot; + mutex transaction_lock; + reference_map_t> transactions; +}; + +} // namespace duckdb diff --git a/include/pgduckdb/pgduckdb_planner.hpp b/include/pgduckdb/pgduckdb_planner.hpp index b95fb65b..1784a46f 100644 --- a/include/pgduckdb/pgduckdb_planner.hpp +++ b/include/pgduckdb/pgduckdb_planner.hpp @@ -1,10 +1,16 @@ #pragma once +#include "duckdb.hpp" + extern "C" { #include "postgres.h" #include "optimizer/planner.h" } +#include "pgduckdb/pgduckdb_duckdb.hpp" + extern bool duckdb_explain_analyze; PlannedStmt *DuckdbPlanNode(Query *parse, int cursor_options, ParamListInfo bound_params); +std::tuple, duckdb::unique_ptr> +DuckdbPrepare(const Query *query, ParamListInfo bound_params); diff --git a/include/pgduckdb/pgduckdb_types.hpp b/include/pgduckdb/pgduckdb_types.hpp index 2f0d00a3..d444bef3 100644 --- a/include/pgduckdb/pgduckdb_types.hpp +++ b/include/pgduckdb/pgduckdb_types.hpp @@ -19,6 +19,7 @@ constexpr int64_t PGDUCKDB_DUCK_TIMESTAMP_OFFSET = INT64CONST(10957) * USECS_PER duckdb::LogicalType ConvertPostgresToDuckColumnType(Form_pg_attribute &attribute); Oid GetPostgresDuckDBType(duckdb::LogicalType type); +duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type); void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset); bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col); void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr scan_global_state, diff --git a/include/pgduckdb/scan/postgres_scan.hpp b/include/pgduckdb/scan/postgres_scan.hpp index 93cb7427..df563fcf 100644 --- a/include/pgduckdb/scan/postgres_scan.hpp +++ b/include/pgduckdb/scan/postgres_scan.hpp @@ -23,8 +23,8 @@ class PostgresScanGlobalState { TupleDesc m_tuple_desc; std::mutex m_lock; // Lock for one replacement scan bool m_count_tuples_only; - duckdb::map m_columns; - duckdb::map m_projections; + duckdb::map m_read_columns_ids; + duckdb::map m_output_columns_ids; duckdb::TableFilterSet *m_filters = nullptr; std::atomic m_total_row_count; }; @@ -39,14 +39,13 @@ class PostgresScanLocalState { bool m_exhausted_scan; }; -struct PostgresReplacementScanDataClientContextState : public duckdb::ClientContextState { +struct PostgresContextState : public duckdb::ClientContextState { public: - PostgresReplacementScanDataClientContextState(List *rtables, PlannerInfo *query_planner_info, List *needed_columns, - const char *query_string) + PostgresContextState(List *rtables, PlannerInfo *query_planner_info, List *needed_columns, const char *query_string) : m_rtables(rtables), m_query_planner_info(query_planner_info), m_needed_columns(needed_columns), m_query_string(query_string) { } - ~PostgresReplacementScanDataClientContextState() override {}; + ~PostgresContextState() override {}; public: List *m_rtables; diff --git a/sql/pg_duckdb--0.0.1.sql b/sql/pg_duckdb--0.0.1.sql index 97c6a277..202e74b7 100644 --- a/sql/pg_duckdb--0.0.1.sql +++ b/sql/pg_duckdb--0.0.1.sql @@ -142,6 +142,9 @@ CREATE TABLE extensions ( CREATE OR REPLACE FUNCTION install_extension(extension_name TEXT) RETURNS bool LANGUAGE C AS 'MODULE_PATHNAME', 'install_extension'; +CREATE OR REPLACE FUNCTION raw_query(query TEXT) RETURNS void + LANGUAGE C AS 'MODULE_PATHNAME', 'pgduckdb_raw_query'; + DO $$ BEGIN RAISE WARNING 'To actually execute queries using DuckDB you need to run "SET duckdb.execution TO true;"'; diff --git a/src/catalog/pgduckdb_catalog.cpp b/src/catalog/pgduckdb_catalog.cpp new file mode 100644 index 00000000..49b0e516 --- /dev/null +++ b/src/catalog/pgduckdb_catalog.cpp @@ -0,0 +1,113 @@ +#include "pgduckdb/catalog/pgduckdb_catalog.hpp" +#include "duckdb/parser/parsed_data/attach_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "pgduckdb/catalog/pgduckdb_storage.hpp" +#include "pgduckdb/catalog/pgduckdb_transaction.hpp" + +extern "C" { +#include "postgres.h" +#include "utils/fmgroids.h" +#include "fmgr.h" +#include "catalog/pg_namespace.h" +#include "utils/syscache.h" +#include "utils/builtins.h" +#include "utils/rel.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "catalog/indexing.h" +#include "access/genam.h" +#include "access/xact.h" +} + +namespace duckdb { + +PostgresCatalog::PostgresCatalog(AttachedDatabase &db, const string &connection_string, AccessMode access_mode) + : Catalog(db), path(connection_string), access_mode(access_mode) { +} + +unique_ptr +PostgresCatalog::Attach(StorageExtensionInfo *storage_info_p, ClientContext &context, AttachedDatabase &db, + const string &name, AttachInfo &info, AccessMode access_mode) { + string connection_string = info.path; + return make_uniq(db, connection_string, access_mode); +} + +// ------------------ Catalog API --------------------- + +void +PostgresCatalog::Initialize(bool load_builtin) { + return; +} + +string +PostgresCatalog::GetCatalogType() { + return "pgduckdb"; +} + +optional_ptr +PostgresCatalog::CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) { + throw NotImplementedException("CreateSchema not supported yet"); +} + +optional_ptr +PostgresCatalog::GetSchema(CatalogTransaction transaction, const string &schema_name, OnEntryNotFound if_not_found, + QueryErrorContext error_context) { + auto &pg_transaction = transaction.transaction->Cast(); + auto res = pg_transaction.GetCatalogEntry(CatalogType::SCHEMA_ENTRY, schema_name, ""); + D_ASSERT(res); + D_ASSERT(res->type == CatalogType::SCHEMA_ENTRY); + return (SchemaCatalogEntry *)res.get(); +} + +void +PostgresCatalog::ScanSchemas(ClientContext &context, std::function callback) { + return; +} + +unique_ptr +PostgresCatalog::PlanCreateTableAs(ClientContext &context, LogicalCreateTable &op, unique_ptr plan) { + throw NotImplementedException("PlanCreateTableAs not supported yet"); +} + +unique_ptr +PostgresCatalog::PlanInsert(ClientContext &context, LogicalInsert &op, unique_ptr plan) { + throw NotImplementedException("PlanInsert not supported yet"); +} + +unique_ptr +PostgresCatalog::PlanDelete(ClientContext &context, LogicalDelete &op, unique_ptr plan) { + throw NotImplementedException("PlanDelete not supported yet"); +} + +unique_ptr +PostgresCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op, unique_ptr plan) { + throw NotImplementedException("PlanUpdate not supported yet"); +} + +unique_ptr +PostgresCatalog::BindCreateIndex(Binder &binder, CreateStatement &stmt, TableCatalogEntry &table, + unique_ptr plan) { + throw NotImplementedException("BindCreateIndex not supported yet"); +} + +DatabaseSize +PostgresCatalog::GetDatabaseSize(ClientContext &context) { + throw NotImplementedException("GetDatabaseSize not supported yet"); +} + +bool +PostgresCatalog::InMemory() { + return false; +} + +string +PostgresCatalog::GetDBPath() { + return path; +} + +void +PostgresCatalog::DropSchema(ClientContext &context, DropInfo &info) { + throw NotImplementedException("DropSchema not supported yet"); +} + +} // namespace duckdb diff --git a/src/catalog/pgduckdb_schema.cpp b/src/catalog/pgduckdb_schema.cpp new file mode 100644 index 00000000..960e8108 --- /dev/null +++ b/src/catalog/pgduckdb_schema.cpp @@ -0,0 +1,108 @@ +#include "pgduckdb/catalog/pgduckdb_schema.hpp" +#include "pgduckdb/catalog/pgduckdb_table.hpp" +#include "pgduckdb/catalog/pgduckdb_transaction.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/namespace.h" +#include "catalog/pg_class.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "utils/builtins.h" +#include "utils/regproc.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "access/htup_details.h" +#include "executor/nodeIndexscan.h" +#include "nodes/pathnodes.h" +#include "nodes/execnodes.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "parser/parsetree.h" +#include "utils/rel.h" +} + +namespace duckdb { + +PostgresSchema::PostgresSchema(Catalog &catalog, CreateSchemaInfo &info, Snapshot snapshot) + : SchemaCatalogEntry(catalog, info), snapshot(snapshot), catalog(catalog) { +} + +void +PostgresSchema::Scan(ClientContext &context, CatalogType type, const std::function &callback) { + return; +} + +void +PostgresSchema::Scan(CatalogType type, const std::function &callback) { + throw NotImplementedException("Scan(no context) not supported yet"); +} + +optional_ptr +PostgresSchema::CreateIndex(CatalogTransaction transaction, CreateIndexInfo &info, TableCatalogEntry &table) { + throw NotImplementedException("CreateIndex not supported yet"); +} + +optional_ptr +PostgresSchema::CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) { + throw NotImplementedException("CreateFunction not supported yet"); +} + +optional_ptr +PostgresSchema::CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) { + throw NotImplementedException("CreateTable not supported yet"); +} + +optional_ptr +PostgresSchema::CreateView(CatalogTransaction transaction, CreateViewInfo &info) { + throw NotImplementedException("CreateView not supported yet"); +} + +optional_ptr +PostgresSchema::CreateSequence(CatalogTransaction transaction, CreateSequenceInfo &info) { + throw NotImplementedException("CreateSequence not supported yet"); +} + +optional_ptr +PostgresSchema::CreateTableFunction(CatalogTransaction transaction, CreateTableFunctionInfo &info) { + throw NotImplementedException("CreateTableFunction not supported yet"); +} + +optional_ptr +PostgresSchema::CreateCopyFunction(CatalogTransaction transaction, CreateCopyFunctionInfo &info) { + throw NotImplementedException("CreateCopyFunction not supported yet"); +} + +optional_ptr +PostgresSchema::CreatePragmaFunction(CatalogTransaction transaction, CreatePragmaFunctionInfo &info) { + throw NotImplementedException("CreatePragmaFunction not supported yet"); +} + +optional_ptr +PostgresSchema::CreateCollation(CatalogTransaction transaction, CreateCollationInfo &info) { + throw NotImplementedException("CreateCollation not supported yet"); +} + +optional_ptr +PostgresSchema::CreateType(CatalogTransaction transaction, CreateTypeInfo &info) { + throw NotImplementedException("CreateType not supported yet"); +} + +optional_ptr +PostgresSchema::GetEntry(CatalogTransaction transaction, CatalogType type, const string &entry_name) { + auto &pg_transaction = transaction.transaction->Cast(); + return pg_transaction.GetCatalogEntry(type, name, entry_name); +} + +void +PostgresSchema::DropEntry(ClientContext &context, DropInfo &info) { + throw NotImplementedException("DropEntry not supported yet"); +} + +void +PostgresSchema::Alter(CatalogTransaction transaction, AlterInfo &info) { + throw NotImplementedException("Alter not supported yet"); +} + +} // namespace duckdb diff --git a/src/catalog/pgduckdb_storage.cpp b/src/catalog/pgduckdb_storage.cpp new file mode 100644 index 00000000..7b684470 --- /dev/null +++ b/src/catalog/pgduckdb_storage.cpp @@ -0,0 +1,21 @@ +#include "pgduckdb/catalog/pgduckdb_storage.hpp" +#include "pgduckdb/catalog/pgduckdb_catalog.hpp" +#include "pgduckdb/catalog/pgduckdb_transaction_manager.hpp" + +namespace duckdb { + +static unique_ptr +CreateTransactionManager(StorageExtensionInfo *storage_info, AttachedDatabase &db, Catalog &catalog) { + auto &pg_storage_info = *(reinterpret_cast(storage_info)); + auto snapshot = pg_storage_info.snapshot; + + return make_uniq(db, catalog.Cast(), snapshot); +} + +PostgresStorageExtension::PostgresStorageExtension(Snapshot snapshot) { + attach = PostgresCatalog::Attach; + create_transaction_manager = CreateTransactionManager; + storage_info = make_uniq(snapshot); +} + +} // namespace duckdb diff --git a/src/catalog/pgduckdb_table.cpp b/src/catalog/pgduckdb_table.cpp new file mode 100644 index 00000000..15be3008 --- /dev/null +++ b/src/catalog/pgduckdb_table.cpp @@ -0,0 +1,111 @@ +#include "pgduckdb/pgduckdb_types.hpp" +#include "pgduckdb/catalog/pgduckdb_schema.hpp" +#include "pgduckdb/catalog/pgduckdb_table.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" +#include "pgduckdb/scan/postgres_seq_scan.hpp" +#include "pgduckdb/scan/postgres_scan.hpp" +#include "pgduckdb/scan/postgres_index_scan.hpp" + +extern "C" { +#include "postgres.h" +#include "access/tableam.h" +#include "access/heapam.h" +#include "storage/bufmgr.h" +#include "catalog/namespace.h" +#include "catalog/pg_class.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "utils/builtins.h" +#include "utils/regproc.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "access/htup_details.h" +#include "parser/parsetree.h" +} + +namespace duckdb { + +PostgresTable::PostgresTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, + Cardinality cardinality, Snapshot snapshot) + : TableCatalogEntry(catalog, schema, info), cardinality(cardinality), snapshot(snapshot) { +} + +bool +PostgresTable::PopulateColumns(CreateTableInfo &info, Oid relid, Snapshot snapshot) { + auto rel = RelationIdGetRelation(relid); + auto tupleDesc = RelationGetDescr(rel); + + if (!tupleDesc) { + elog(ERROR, "Failed to get tuple descriptor for relation with OID %u", relid); + RelationClose(rel); + return false; + } + + for (int i = 0; i < tupleDesc->natts; i++) { + Form_pg_attribute attr = &tupleDesc->attrs[i]; + auto col_name = duckdb::string(NameStr(attr->attname)); + auto duck_type = pgduckdb::ConvertPostgresToDuckColumnType(attr); + info.columns.AddColumn(duckdb::ColumnDefinition(col_name, duck_type)); + /* Log column name and type */ + elog(DEBUG3, "-- (DuckDB/PostgresHeapBind) Column name: %s, Type: %s --", col_name.c_str(), + duck_type.ToString().c_str()); + } + + RelationClose(rel); + return true; +} + +//===--------------------------------------------------------------------===// +// PostgresHeapTable +//===--------------------------------------------------------------------===// + +PostgresHeapTable::PostgresHeapTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, + Cardinality cardinality, Snapshot snapshot, Oid oid) + : PostgresTable(catalog, schema, info, cardinality, snapshot), oid(oid) { +} + +unique_ptr +PostgresHeapTable::GetStatistics(ClientContext &context, column_t column_id) { + throw duckdb::NotImplementedException("GetStatistics not supported yet"); +} + +TableFunction +PostgresHeapTable::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { + bind_data = duckdb::make_uniq(cardinality, oid, snapshot); + return pgduckdb::PostgresSeqScanFunction(); +} + +TableStorageInfo +PostgresHeapTable::GetStorageInfo(ClientContext &context) { + throw duckdb::NotImplementedException("GetStorageInfo not supported yet"); +} + +//===--------------------------------------------------------------------===// +// PostgresIndexTable +//===--------------------------------------------------------------------===// + +PostgresIndexTable::PostgresIndexTable(Catalog &catalog, SchemaCatalogEntry &schema, CreateTableInfo &info, + Cardinality cardinality, Snapshot snapshot, Path *path, + PlannerInfo *planner_info) + : PostgresTable(catalog, schema, info, cardinality, snapshot), path(path), planner_info(planner_info) { +} + +unique_ptr +PostgresIndexTable::GetStatistics(ClientContext &context, column_t column_id) { + throw duckdb::NotImplementedException("GetStatistics not supported yet"); +} + +TableFunction +PostgresIndexTable::GetScanFunction(ClientContext &context, unique_ptr &bind_data) { + RangeTblEntry *rte = planner_rt_fetch(path->parent->relid, planner_info); + bind_data = duckdb::make_uniq(cardinality, path, planner_info, rte->relid, + snapshot); + return pgduckdb::PostgresIndexScanFunction(); +} + +TableStorageInfo +PostgresIndexTable::GetStorageInfo(ClientContext &context) { + throw duckdb::NotImplementedException("GetStorageInfo not supported yet"); +} + +} // namespace duckdb diff --git a/src/catalog/pgduckdb_transaction.cpp b/src/catalog/pgduckdb_transaction.cpp new file mode 100644 index 00000000..67258ac4 --- /dev/null +++ b/src/catalog/pgduckdb_transaction.cpp @@ -0,0 +1,179 @@ +#include "pgduckdb/catalog/pgduckdb_catalog.hpp" +#include "pgduckdb/catalog/pgduckdb_transaction.hpp" +#include "pgduckdb/catalog/pgduckdb_table.hpp" +#include "pgduckdb/scan/postgres_scan.hpp" +#include "duckdb/parser/parsed_data/create_table_info.hpp" +#include "duckdb/parser/parsed_data/create_schema_info.hpp" +#include "duckdb/catalog/catalog.hpp" + +extern "C" { +#include "postgres.h" +#include "catalog/namespace.h" +#include "catalog/pg_class.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "utils/builtins.h" +#include "utils/regproc.h" +#include "utils/snapmgr.h" +#include "utils/syscache.h" +#include "access/htup_details.h" +#include "executor/nodeIndexscan.h" +#include "nodes/pathnodes.h" +#include "nodes/execnodes.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "parser/parsetree.h" +#include "utils/rel.h" +} + +namespace duckdb { + +PostgresTransaction::PostgresTransaction(TransactionManager &manager, ClientContext &context, PostgresCatalog &catalog, + Snapshot snapshot) + : Transaction(manager, context), catalog(catalog), snapshot(snapshot) { +} + +PostgresTransaction::~PostgresTransaction() { +} + +static RelOptInfo * +FindMatchingRelEntry(Oid relid, PlannerInfo *planner_info) { + int i = 1; + RelOptInfo *node = nullptr; + for (; i < planner_info->simple_rel_array_size; i++) { + if (planner_info->simple_rte_array[i]->rtekind == RTE_SUBQUERY && planner_info->simple_rel_array[i]) { + node = FindMatchingRelEntry(relid, planner_info->simple_rel_array[i]->subroot); + if (node) { + return node; + } + } else if (planner_info->simple_rte_array[i]->rtekind == RTE_RELATION) { + if (relid == planner_info->simple_rte_array[i]->relid) { + return planner_info->simple_rel_array[i]; + } + }; + } + return nullptr; +} + +static bool +IsIndexScan(const Path *nodePath) { + if (nodePath == nullptr) { + return false; + } + + if (nodePath->pathtype == T_IndexScan || nodePath->pathtype == T_IndexOnlyScan) { + return true; + } + + return false; +} + +optional_ptr +SchemaItems::GetTable(const string &entry_name, PlannerInfo *planner_info) { + auto it = tables.find(entry_name); + if (it != tables.end()) { + return it->second.get(); + } + auto snapshot = schema->snapshot; + auto &catalog = schema->catalog; + + List *name_list = NIL; + name_list = lappend(name_list, makeString(pstrdup(name.c_str()))); + name_list = lappend(name_list, makeString(pstrdup(entry_name.c_str()))); + + RangeVar *table_range_var = makeRangeVarFromNameList(name_list); + Oid rel_oid = RangeVarGetRelid(table_range_var, AccessShareLock, true); + if (rel_oid == InvalidOid) { + // Table could not be found + return nullptr; + } + + // Check if the Relation is a VIEW + auto tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(rel_oid)); + if (!HeapTupleIsValid(tuple)) { + elog(ERROR, "Cache lookup failed for relation %u", rel_oid); + } + + auto relForm = (Form_pg_class)GETSTRUCT(tuple); + + // Check if the relation is a view + if (relForm->relkind == RELKIND_VIEW) { + ReleaseSysCache(tuple); + // Let the replacement scan handle this, the ReplacementScan replaces the view with its view_definition, which + // will get bound again and hit a PostgresIndexTable / PostgresHeapTable. + return nullptr; + } + ReleaseSysCache(tuple); + + Path *node_path = nullptr; + + if (planner_info) { + auto node = FindMatchingRelEntry(rel_oid, planner_info); + if (node) { + node_path = get_cheapest_fractional_path(node, 0.0); + } + } + + unique_ptr table; + CreateTableInfo info; + info.table = entry_name; + Cardinality cardinality = node_path ? node_path->rows : 1; + if (IsIndexScan(node_path)) { + RangeTblEntry *rte = planner_rt_fetch(node_path->parent->relid, planner_info); + rel_oid = rte->relid; + if (!PostgresTable::PopulateColumns(info, rel_oid, snapshot)) { + return nullptr; + } + table = make_uniq(catalog, *schema, info, cardinality, snapshot, node_path, planner_info); + } else { + if (!PostgresTable::PopulateColumns(info, rel_oid, snapshot)) { + return nullptr; + } + table = make_uniq(catalog, *schema, info, cardinality, snapshot, rel_oid); + } + + tables[entry_name] = std::move(table); + return tables[entry_name].get(); +} + +optional_ptr +PostgresTransaction::GetSchema(const string &name) { + auto it = schemas.find(name); + if (it != schemas.end()) { + return it->second.schema.get(); + } + + CreateSchemaInfo create_schema; + create_schema.schema = name; + auto pg_schema = make_uniq(catalog, create_schema, snapshot); + schemas.emplace(std::make_pair(name, SchemaItems(std::move(pg_schema), name))); + return schemas.at(name).schema.get(); +} + +optional_ptr +PostgresTransaction::GetCatalogEntry(CatalogType type, const string &schema, const string &name) { + auto scan_data = context.lock()->registered_state->Get("postgres_state"); + if (!scan_data) { + throw InternalException("Could not find 'postgres_state' in 'PostgresTransaction::GetCatalogEntry'"); + } + auto planner_info = scan_data->m_query_planner_info; + switch (type) { + case CatalogType::TABLE_ENTRY: { + auto it = schemas.find(schema); + if (it == schemas.end()) { + return nullptr; + } + auto &schema_entry = it->second; + return schema_entry.GetTable(name, planner_info); + } + case CatalogType::SCHEMA_ENTRY: { + return GetSchema(schema); + } + default: + return nullptr; + } +} + +} // namespace duckdb + +// namespace duckdb diff --git a/src/catalog/pgduckdb_transaction_manager.cpp b/src/catalog/pgduckdb_transaction_manager.cpp new file mode 100644 index 00000000..b2343bd5 --- /dev/null +++ b/src/catalog/pgduckdb_transaction_manager.cpp @@ -0,0 +1,37 @@ +#include "pgduckdb/catalog/pgduckdb_transaction_manager.hpp" +#include "duckdb/main/attached_database.hpp" + +namespace duckdb { + +PostgresTransactionManager::PostgresTransactionManager(AttachedDatabase &db_p, PostgresCatalog &catalog, + Snapshot snapshot) + : TransactionManager(db_p), catalog(catalog), snapshot(snapshot) { +} + +Transaction & +PostgresTransactionManager::StartTransaction(ClientContext &context) { + auto transaction = make_uniq(*this, context, catalog, snapshot); + auto &result = *transaction; + lock_guard l(transaction_lock); + transactions[result] = std::move(transaction); + return result; +} + +ErrorData +PostgresTransactionManager::CommitTransaction(ClientContext &context, Transaction &transaction) { + lock_guard l(transaction_lock); + transactions.erase(transaction); + return ErrorData(); +} + +void +PostgresTransactionManager::RollbackTransaction(Transaction &transaction) { + return; +} + +void +PostgresTransactionManager::Checkpoint(ClientContext &context, bool force) { + return; +} + +} // namespace duckdb diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index 26f6659a..4b7ee3bb 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -1,6 +1,9 @@ #include "duckdb.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" #include "duckdb/main/extension_util.hpp" +#include "duckdb/main/client_data.hpp" +#include "duckdb/catalog/catalog_search_path.hpp" +#include "duckdb/main/extension_install_info.hpp" #include "pgduckdb/pgduckdb_options.hpp" #include "pgduckdb/pgduckdb_duckdb.hpp" @@ -8,6 +11,7 @@ #include "pgduckdb/scan/postgres_index_scan.hpp" #include "pgduckdb/scan/postgres_seq_scan.hpp" #include "pgduckdb/pgduckdb_utils.hpp" +#include "pgduckdb/catalog/pgduckdb_storage.hpp" #include @@ -71,12 +75,19 @@ DuckDBManager::DuckDBManager() { duckdb::DBConfig config; config.SetOptionByName("extension_directory", GetExtensionDirectory()); + // Transforms VIEWs into their view definition config.replacement_scans.emplace_back(pgduckdb::PostgresReplacementScan); + database = duckdb::make_uniq(nullptr, &config); + duckdb::DBConfig::GetConfig(*database->instance).storage_extensions["pgduckdb"] = + duckdb::make_uniq(GetActiveSnapshot()); + duckdb::ExtensionInstallInfo extension_install_info; + database->instance->SetExtensionLoaded("pgduckdb", extension_install_info); auto connection = duckdb::make_uniq(*database); auto &context = *connection->context; + context.Query("ATTACH DATABASE 'pgduckdb' (TYPE pgduckdb)", false); LoadFunctions(context); LoadSecrets(context); LoadExtensions(context); @@ -126,7 +137,11 @@ DuckDBManager::LoadSecrets(duckdb::ClientContext &context) { appendStringInfo(secret_key, ", USE_SSL 'FALSE'"); } appendStringInfo(secret_key, ");"); - context.Query(secret_key->data, false); + auto res = context.Query(secret_key->data, false); + if (res->HasError()) { + elog(ERROR, "(PGDuckDB/LoadSecrets) secret `%s` could not be loaded with DuckDB", secret.id.c_str()); + } + pfree(secret_key->data); secret_id++; } @@ -154,9 +169,14 @@ DuckdbCreateConnection(List *rtables, PlannerInfo *planner_info, List *needed_co auto &db = DuckDBManager::Get().GetDatabase(); /* Add DuckDB replacement scan to read PG data */ auto con = duckdb::make_uniq(db); - con->context->registered_state->Insert( - "postgres_scan", duckdb::make_shared_ptr(rtables, planner_info, - needed_columns, query)); + auto &context = *con->context; + + context.registered_state->Insert( + "postgres_state", duckdb::make_shared_ptr(rtables, planner_info, needed_columns, query)); + auto res = context.Query("set search_path='pgduckdb.main'", false); + if (res->HasError()) { + elog(WARNING, "(DuckDB) %s", res->GetError().c_str()); + } return con; } diff --git a/src/pgduckdb_filter.cpp b/src/pgduckdb_filter.cpp index 200e3b42..081271d2 100644 --- a/src/pgduckdb_filter.cpp +++ b/src/pgduckdb_filter.cpp @@ -5,7 +5,8 @@ extern "C" { #include "postgres.h" #include "catalog/pg_type.h" #include "utils/builtins.h" - +#include "utils/date.h" +#include "utils/timestamp.h" } #include "pgduckdb/pgduckdb_filter.hpp" @@ -16,8 +17,8 @@ namespace pgduckdb { template bool -TemplatedFilterOperation(Datum &value, const duckdb::Value &constant) { - return OP::Operation((T)value, constant.GetValueUnsafe()); +TemplatedFilterOperation(T value, const duckdb::Value &constant) { + return OP::Operation(value, constant.GetValueUnsafe()); } template @@ -45,26 +46,26 @@ static bool FilterOperationSwitch(Datum &value, duckdb::Value &constant, Oid type_oid) { switch (type_oid) { case BOOLOID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetBool(value), constant); case CHAROID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetChar(value), constant); case INT2OID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetInt16(value), constant); case INT4OID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetInt32(value), constant); case INT8OID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetInt64(value), constant); case FLOAT4OID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetFloat4(value), constant); case FLOAT8OID: - return TemplatedFilterOperation(value, constant); + return TemplatedFilterOperation(DatumGetFloat8(value), constant); case DATEOID: { - Datum date_datum = static_cast(value + pgduckdb::PGDUCKDB_DUCK_DATE_OFFSET); - return TemplatedFilterOperation(date_datum, constant); + int32_t date = DatumGetDateADT(value) + pgduckdb::PGDUCKDB_DUCK_DATE_OFFSET; + return TemplatedFilterOperation(date, constant); } case TIMESTAMPOID: { - Datum timestamp_datum = static_cast(value + pgduckdb::PGDUCKDB_DUCK_TIMESTAMP_OFFSET); - return TemplatedFilterOperation(timestamp_datum, constant); + int64_t timestamp = DatumGetTimestamp(value) + pgduckdb::PGDUCKDB_DUCK_TIMESTAMP_OFFSET; + return TemplatedFilterOperation(timestamp, constant); } case TEXTOID: case VARCHAROID: diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index 6c7c6d70..486c58af 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -170,6 +170,17 @@ extern "C" { void DuckdbExplainOneQueryHook(Query *query, int cursorOptions, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv) { + /* + * It might seem sensible to store this data in the custom_private + * field of the CustomScan node, but that's not a trivial change to make. + * Storing this in a global variable works fine, as long as we only use + * this variable during planning when we're actually executing an explain + * QUERY (this can be checked by checking the commandTag of the + * ActivePortal). This even works when plans would normally be cached, + * because EXPLAIN always execute this hook whenever they are executed. + * EXPLAIN queries are also always re-planned (see + * standard_ExplainOneQuery). + */ duckdb_explain_analyze = es->analyze; prev_explain_one_query_hook(query, cursorOptions, into, es, queryString, params, queryEnv); } diff --git a/src/pgduckdb_node.cpp b/src/pgduckdb_node.cpp index daf267ed..ef4585d2 100644 --- a/src/pgduckdb_node.cpp +++ b/src/pgduckdb_node.cpp @@ -4,10 +4,15 @@ extern "C" { #include "postgres.h" #include "miscadmin.h" +#include "tcop/pquery.h" +#include "nodes/params.h" +#include "utils/ruleutils.h" } #include "pgduckdb/pgduckdb_node.hpp" #include "pgduckdb/pgduckdb_types.hpp" +#include "pgduckdb/pgduckdb_duckdb.hpp" +#include "pgduckdb/pgduckdb_planner.hpp" /* global variables */ CustomScanMethods duckdb_scan_scan_methods; @@ -17,6 +22,8 @@ static CustomExecMethods duckdb_scan_exec_methods; typedef struct DuckdbScanState { CustomScanState css; /* must be first field */ + const Query *query; + ParamListInfo params; duckdb::Connection *duckdb_connection; duckdb::PreparedStatement *prepared_statement; bool is_executed; @@ -46,10 +53,8 @@ static Node * Duckdb_CreateCustomScanState(CustomScan *cscan) { DuckdbScanState *duckdb_scan_state = (DuckdbScanState *)newNode(sizeof(DuckdbScanState), T_CustomScanState); CustomScanState *custom_scan_state = &duckdb_scan_state->css; - duckdb_scan_state->duckdb_connection = (duckdb::Connection *)linitial(cscan->custom_private); - duckdb_scan_state->prepared_statement = (duckdb::PreparedStatement *)lsecond(cscan->custom_private); - duckdb_scan_state->is_executed = false; - duckdb_scan_state->fetch_next = true; + + duckdb_scan_state->query = (const Query *)linitial(cscan->custom_private); custom_scan_state->methods = &duckdb_scan_exec_methods; return (Node *)custom_scan_state; } @@ -57,6 +62,19 @@ Duckdb_CreateCustomScanState(CustomScan *cscan) { void Duckdb_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int eflags) { DuckdbScanState *duckdb_scan_state = (DuckdbScanState *)cscanstate; + auto prepare_result = DuckdbPrepare(duckdb_scan_state->query, estate->es_param_list_info); + auto prepared_query = std::move(std::get<0>(prepare_result)); + auto duckdb_connection = std::move(std::get<1>(prepare_result)); + + if (prepared_query->HasError()) { + elog(ERROR, "DuckDB re-planning failed %s", prepared_query->GetError().c_str()); + } + + duckdb_scan_state->duckdb_connection = duckdb_connection.release(); + duckdb_scan_state->prepared_statement = prepared_query.release(); + duckdb_scan_state->params = estate->es_param_list_info; + duckdb_scan_state->is_executed = false; + duckdb_scan_state->fetch_next = true; duckdb_scan_state->css.ss.ps.ps_ResultTupleDesc = duckdb_scan_state->css.ss.ss_ScanTupleSlot->tts_tupleDescriptor; HOLD_CANCEL_INTERRUPTS(); } @@ -66,8 +84,33 @@ ExecuteQuery(DuckdbScanState *state) { auto &prepared = *state->prepared_statement; auto &query_results = state->query_results; auto &connection = state->duckdb_connection; + auto pg_params = state->params; + const auto num_params = pg_params ? pg_params->numParams : 0; + duckdb::vector duckdb_params; + for (int i = 0; i < num_params; i++) { + ParamExternData *pg_param; + ParamExternData tmp_workspace; + + /* give hook a chance in case parameter is dynamic */ + if (pg_params->paramFetch != NULL) + pg_param = pg_params->paramFetch(pg_params, i + 1, false, &tmp_workspace); + else + pg_param = &pg_params->params[i]; + + if (pg_param->isnull) { + duckdb_params.push_back(duckdb::Value()); + } else { + if (!OidIsValid(pg_param->ptype)) { + elog(ERROR, "parameter with invalid type during execution"); + } + duckdb_params.push_back(pgduckdb::ConvertPostgresParameterToDuckValue(pg_param->value, pg_param->ptype)); + } + } - auto pending = prepared.PendingQuery(); + auto pending = prepared.PendingQuery(duckdb_params, true); + if (pending->HasError()) { + elog(ERROR, "DuckDB execute returned an error: %s", pending->GetError().c_str()); + } duckdb::PendingExecutionResult execution_result; do { execution_result = pending->ExecuteTask(); @@ -99,7 +142,8 @@ Duckdb_ExecCustomScan(CustomScanState *node) { TupleTableSlot *slot = duckdb_scan_state->css.ss.ss_ScanTupleSlot; MemoryContext old_context; - if (!duckdb_scan_state->is_executed) { + bool already_executed = duckdb_scan_state->is_executed; + if (!already_executed) { ExecuteQuery(duckdb_scan_state); } @@ -138,7 +182,7 @@ Duckdb_ExecCustomScan(CustomScanState *node) { duckdb_scan_state->current_row++; if (duckdb_scan_state->current_row >= duckdb_scan_state->current_data_chunk->size()) { - delete duckdb_scan_state->current_data_chunk.release(); + duckdb_scan_state->current_data_chunk.reset(); duckdb_scan_state->fetch_next = true; } @@ -160,14 +204,14 @@ Duckdb_ReScanCustomScan(CustomScanState *node) { void Duckdb_ExplainCustomScan(CustomScanState *node, List *ancestors, ExplainState *es) { DuckdbScanState *duckdb_scan_state = (DuckdbScanState *)node; - auto res = duckdb_scan_state->prepared_statement->Execute(); - std::string explain_output = "\n\n"; - auto chunk = res->Fetch(); + ExecuteQuery(duckdb_scan_state); + auto chunk = duckdb_scan_state->query_results->Fetch(); if (!chunk || chunk->size() == 0) { return; } /* Is it safe to hardcode this as result of DuckDB explain? */ auto value = chunk->GetValue(1, 0); + std::string explain_output = "\n\n"; explain_output += value.GetValue(); explain_output += "\n"; ExplainPropertyText("DuckDB Execution Plan", explain_output.c_str(), es); diff --git a/src/pgduckdb_options.cpp b/src/pgduckdb_options.cpp index 2c703726..d2f7e5ac 100644 --- a/src/pgduckdb_options.cpp +++ b/src/pgduckdb_options.cpp @@ -9,6 +9,7 @@ extern "C" { #include "access/xact.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/snapmgr.h" @@ -164,4 +165,19 @@ install_extension(PG_FUNCTION_ARGS) { PG_RETURN_BOOL(result); } +PG_FUNCTION_INFO_V1(pgduckdb_raw_query); +Datum +pgduckdb_raw_query(PG_FUNCTION_ARGS) { + const char *query = text_to_cstring(PG_GETARG_TEXT_PP(0)); + auto db = pgduckdb::DuckDBManager::Get().GetDatabase(); + auto connection = duckdb::make_uniq(db); + auto &context = *connection->context; + auto result = context.Query(query, false); + if (result->HasError()) { + elog(ERROR, "(PGDuckDB/DuckdbInstallExtension) %s", result->GetError().c_str()); + } + elog(NOTICE, "result: %s", result->ToString().c_str()); + PG_RETURN_BOOL(true); +} + } // extern "C" diff --git a/src/pgduckdb_planner.cpp b/src/pgduckdb_planner.cpp index 05cadb64..d01d9798 100644 --- a/src/pgduckdb_planner.cpp +++ b/src/pgduckdb_planner.cpp @@ -4,9 +4,12 @@ extern "C" { #include "postgres.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" +#include "nodes/nodes.h" +#include "nodes/params.h" #include "optimizer/optimizer.h" #include "tcop/pquery.h" #include "utils/syscache.h" +#include "utils/guc.h" #include "pgduckdb/vendor/pg_ruleutils.h" } @@ -53,21 +56,55 @@ PlanQuery(Query *parse, ParamListInfo bound_params) { ); } -static Plan * -CreatePlan(Query *query, const char *query_string, ParamListInfo bound_params) { +std::tuple, duckdb::unique_ptr> +DuckdbPrepare(const Query *query, ParamListInfo bound_params) { + + /* + * Copy the query, so the original one is not modified by the + * subquery_planner call that PlanQuery does. + */ + Query *copied_query = (Query *)copyObjectImpl(query); + /* + Temporarily clear search_path so that the query will contain only fully qualified tables. + If we don't do this tables are only fully-qualified if they are not part of the current search_path. + NOTE: This still doesn't fully qualify tables in pg_catalog or temporary tables, for that we'd need to modify + pgduckdb_pg_get_querydef + */ + + auto save_nestlevel = NewGUCNestLevel(); + SetConfigOption("search_path", "", PGC_USERSET, PGC_S_SESSION); + const char *query_string = pgduckdb_pg_get_querydef(copied_query, false); + AtEOXact_GUC(false, save_nestlevel); - List *rtables = query->rtable; + if (ActivePortal && ActivePortal->commandTag == CMDTAG_EXPLAIN) { + if (duckdb_explain_analyze) { + query_string = psprintf("EXPLAIN ANALYZE %s", query_string); + } else { + query_string = psprintf("EXPLAIN %s", query_string); + } + } + + elog(DEBUG2, "(PGDuckDB/DuckdbPrepare) Preparing: %s", query_string); + List *rtables = copied_query->rtable; /* Extract required vars for table */ int flags = PVC_RECURSE_AGGREGATES | PVC_RECURSE_WINDOWFUNCS | PVC_RECURSE_PLACEHOLDERS; - List *vars = list_concat(pull_var_clause((Node *)query->targetList, flags), - pull_var_clause((Node *)query->jointree->quals, flags)); - - PlannerInfo *query_planner_info = PlanQuery(query, bound_params); + List *vars = list_concat(pull_var_clause((Node *)copied_query->targetList, flags), + pull_var_clause((Node *)copied_query->jointree->quals, flags)); + PlannerInfo *query_planner_info = PlanQuery(copied_query, bound_params); auto duckdb_connection = pgduckdb::DuckdbCreateConnection(rtables, query_planner_info, vars, query_string); auto context = duckdb_connection->context; - auto prepared_query = context->Prepare(query_string); + return {std::move(prepared_query), std::move(duckdb_connection)}; +} + +static Plan * +CreatePlan(Query *query, ParamListInfo bound_params) { + /* + * Prepare the query, se we can get the returned types and column names. + */ + auto prepare_result = DuckdbPrepare(query, bound_params); + auto prepared_query = std::move(std::get<0>(prepare_result)); if (prepared_query->HasError()) { elog(WARNING, "(PGDuckDB/CreatePlan) Prepared query returned an error: '%s", @@ -103,12 +140,12 @@ CreatePlan(Query *query, const char *query_string, ParamListInfo bound_params) { duckdb_node->custom_scan_tlist = lappend(duckdb_node->custom_scan_tlist, - makeTargetEntry((Expr *)var, i + 1, (char *)prepared_query->GetNames()[i].c_str(), false)); + makeTargetEntry((Expr *)var, i + 1, (char *)pstrdup(prepared_query->GetNames()[i].c_str()), false)); ReleaseSysCache(tp); } - duckdb_node->custom_private = list_make2(duckdb_connection.release(), prepared_query.release()); + duckdb_node->custom_private = list_make1(query); duckdb_node->methods = &duckdb_scan_scan_methods; return (Plan *)duckdb_node; @@ -116,17 +153,8 @@ CreatePlan(Query *query, const char *query_string, ParamListInfo bound_params) { PlannedStmt * DuckdbPlanNode(Query *parse, int cursor_options, ParamListInfo bound_params) { - const char *query_string = pgduckdb_pg_get_querydef(parse, false); - - if (ActivePortal && ActivePortal->commandTag == CMDTAG_EXPLAIN) { - if (duckdb_explain_analyze) { - query_string = psprintf("EXPLAIN ANALYZE %s", query_string); - } else { - query_string = psprintf("EXPLAIN %s", query_string); - } - } /* We need to check can we DuckDB create plan */ - Plan *duckdb_plan = (Plan *)castNode(CustomScan, CreatePlan(parse, query_string, bound_params)); + Plan *duckdb_plan = (Plan *)castNode(CustomScan, CreatePlan(parse, bound_params)); if (!duckdb_plan) { return nullptr; diff --git a/src/pgduckdb_types.cpp b/src/pgduckdb_types.cpp index 7a8f4048..f9d89dfb 100644 --- a/src/pgduckdb_types.cpp +++ b/src/pgduckdb_types.cpp @@ -9,12 +9,15 @@ extern "C" { #include "miscadmin.h" #include "catalog/pg_type.h" #include "executor/tuptable.h" +#include "utils/builtins.h" #include "utils/numeric.h" #include "utils/uuid.h" #include "utils/array.h" #include "fmgr.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "utils/date.h" +#include "utils/timestamp.h" } #include "pgduckdb/pgduckdb.h" @@ -707,6 +710,47 @@ ConvertDecimal(const NumericVar &numeric) { return (NumericIsNegative(numeric) ? -base_res : base_res); } +/* + * Convert a Postgres Datum to a DuckDB Value. This is meant to be used to + * covert query parameters in a prepared statement to its DuckDB equivalent. + * Passing it a Datum that is stored on disk results in undefined behavior, + * because this fuction makes no effert to detoast the Datum. + */ +duckdb::Value +ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type) { + switch (postgres_type) { + case BOOLOID: + return duckdb::Value::BOOLEAN(DatumGetBool(value)); + case INT2OID: + return duckdb::Value::SMALLINT(DatumGetInt16(value)); + case INT4OID: + return duckdb::Value::INTEGER(DatumGetInt32(value)); + case INT8OID: + return duckdb::Value::BIGINT(DatumGetInt64(value)); + case BPCHAROID: + case TEXTOID: + case JSONOID: + case VARCHAROID: { + // FIXME: TextDatumGetCstring allocates so it needs a + // guard, but it's a macro not a function, so our current gaurd + // template does not handle it. + return duckdb::Value(TextDatumGetCString(value)); + } + case DATEOID: + return duckdb::Value::DATE(duckdb::date_t(DatumGetDateADT(value) + PGDUCKDB_DUCK_DATE_OFFSET)); + case TIMESTAMPOID: + return duckdb::Value::TIMESTAMP(duckdb::timestamp_t(DatumGetTimestamp(value) + PGDUCKDB_DUCK_TIMESTAMP_OFFSET)); + case FLOAT4OID: { + return duckdb::Value::FLOAT(DatumGetFloat4(value)); + } + case FLOAT8OID: { + return duckdb::Value::DOUBLE(DatumGetFloat8(value)); + } + default: + elog(ERROR, "Could not convert Postgres parameter of type: %d to DuckDB type", postgres_type); + } +} + void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset) { auto &type = result.GetType(); @@ -982,12 +1026,18 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptrm_columns.size()); - bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * scan_global_state->m_columns.size()); + Datum *values = (Datum *)duckdb_malloc(sizeof(Datum) * scan_global_state->m_read_columns_ids.size()); + bool *nulls = (bool *)duckdb_malloc(sizeof(bool) * scan_global_state->m_read_columns_ids.size()); bool valid_tuple = true; - for (auto const &[columnIdx, valueIdx] : scan_global_state->m_columns) { + /* First we are fetching all required columns ordered by column id + * and than we need to write this tuple into output vector. Output column id list + * could be out of order so we need to match column values from ordered list. + */ + + /* Read heap tuple with all required columns. */ + for (auto const &[columnIdx, valueIdx] : scan_global_state->m_read_columns_ids) { values[valueIdx] = HeapTupleFetchNextColumnDatum(scan_global_state->m_tuple_desc, tuple, heap_tuple_read_state, columnIdx + 1, &nulls[valueIdx]); if (scan_global_state->m_filters && @@ -1002,36 +1052,34 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptrm_projections.size(); idx++) { + /* Write tuple columns in output vector. */ + for (idx_t idx = 0; valid_tuple && idx < scan_global_state->m_output_columns_ids.size(); idx++) { auto &result = output.data[idx]; if (nulls[idx]) { auto &array_mask = duckdb::FlatVector::Validity(result); array_mask.SetInvalid(scan_local_state->m_output_vector_size); } else { - idx_t projectionColumnIdx = scan_global_state->m_columns[scan_global_state->m_projections[idx]]; - if (scan_global_state->m_tuple_desc->attrs[scan_global_state->m_projections[idx]].attlen == -1) { + idx_t output_column_idx = + scan_global_state->m_read_columns_ids[scan_global_state->m_output_columns_ids[idx]]; + if (scan_global_state->m_tuple_desc->attrs[scan_global_state->m_output_columns_ids[idx]].attlen == -1) { bool should_free = false; - values[projectionColumnIdx] = - DetoastPostgresDatum(reinterpret_cast(values[projectionColumnIdx]), &should_free); - ConvertPostgresToDuckValue(values[projectionColumnIdx], result, scan_local_state->m_output_vector_size); + values[output_column_idx] = + DetoastPostgresDatum(reinterpret_cast(values[output_column_idx]), &should_free); + ConvertPostgresToDuckValue(values[output_column_idx], result, scan_local_state->m_output_vector_size); if (should_free) { - duckdb_free(reinterpret_cast(values[projectionColumnIdx])); + duckdb_free(reinterpret_cast(values[output_column_idx])); } } else { - ConvertPostgresToDuckValue(values[projectionColumnIdx], result, scan_local_state->m_output_vector_size); + ConvertPostgresToDuckValue(values[output_column_idx], result, scan_local_state->m_output_vector_size); } } } if (valid_tuple) { scan_local_state->m_output_vector_size++; + scan_global_state->m_total_row_count++; } - output.SetCardinality(scan_local_state->m_output_vector_size); - output.Verify(); - - scan_global_state->m_total_row_count++; - duckdb_free(values); duckdb_free(nulls); } diff --git a/src/scan/heap_reader.cpp b/src/scan/heap_reader.cpp index af2cc1f0..a25817d4 100644 --- a/src/scan/heap_reader.cpp +++ b/src/scan/heap_reader.cpp @@ -139,6 +139,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) { /* We have collected STANDARD_VECTOR_SIZE */ if (m_local_state->m_output_vector_size == STANDARD_VECTOR_SIZE) { output.SetCardinality(m_local_state->m_output_vector_size); + output.Verify(); m_local_state->m_output_vector_size = 0; return true; } @@ -147,6 +148,7 @@ HeapReader::ReadPageTuples(duckdb::DataChunk &output) { /* Next assigned block number is InvalidBlockNumber so we check did we write any tuples in output vector */ if (m_local_state->m_output_vector_size) { output.SetCardinality(m_local_state->m_output_vector_size); + output.Verify(); m_local_state->m_output_vector_size = 0; } diff --git a/src/scan/postgres_index_scan.cpp b/src/scan/postgres_index_scan.cpp index f5f5d8ad..729a319c 100644 --- a/src/scan/postgres_index_scan.cpp +++ b/src/scan/postgres_index_scan.cpp @@ -196,6 +196,7 @@ PostgresIndexScanFunction::PostgresIndexScanFunc(duckdb::ClientContext &context, } output.SetCardinality(local_state.m_local_state->m_output_vector_size); + output.Verify(); } duckdb::unique_ptr diff --git a/src/scan/postgres_scan.cpp b/src/scan/postgres_scan.cpp index 29ed4eaa..4c6873df 100644 --- a/src/scan/postgres_scan.cpp +++ b/src/scan/postgres_scan.cpp @@ -26,6 +26,7 @@ extern "C" { #include "pgduckdb/scan/postgres_scan.hpp" #include "pgduckdb/pgduckdb_types.hpp" +#include "pgduckdb/pgduckdb_utils.hpp" namespace pgduckdb { @@ -37,18 +38,24 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) return; } - /* We need ordered columns ids for tuple fetch */ + /* We need ordered columns ids for reading tuple. */ for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) { - m_columns[input.column_ids[i]] = i; + m_read_columns_ids[input.column_ids[i]] = i; } + /* We need to check do we consider projection_ids or column_ids list to be used + * for writing to output vector. Projection ids list will be used when + * columns that are used for query filtering are not used afterwards; otherwise + * column ids list will be used and all read tuple columns need to passed + * to upper layers of query execution. + */ if (input.CanRemoveFilterColumns()) { for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { - m_projections[i] = input.column_ids[input.projection_ids[i]]; + m_output_columns_ids[i] = input.column_ids[input.projection_ids[i]]; } } else { - for (duckdb::idx_t i = 0; i < input.projection_ids.size(); i++) { - m_projections[i] = input.column_ids[i]; + for (duckdb::idx_t i = 0; i < input.column_ids.size(); i++) { + m_output_columns_ids[i] = input.column_ids[i]; } } @@ -56,85 +63,42 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) } static Oid -FindMatchingRelation(const duckdb::string &to_find) { -#if PG_VERSION_NUM >= 160000 - RangeVar *table_range_var = makeRangeVarFromNameList(stringToQualifiedNameList(to_find.c_str(), NULL)); -#else - RangeVar *table_range_var = makeRangeVarFromNameList(stringToQualifiedNameList(to_find.c_str())); -#endif - Oid rel_oid = RangeVarGetRelid(table_range_var, AccessShareLock, true); - if (rel_oid != InvalidOid) { - return rel_oid; +FindMatchingRelation(const duckdb::string &schema, const duckdb::string &table) { + List *name_list = NIL; + if (!schema.empty()) { + name_list = lappend(name_list, makeString(pstrdup(schema.c_str()))); } - return InvalidOid; -} - -static duckdb::vector> -CreateFunctionSeqScanArguments(uint64 cardinality, Oid relid, Snapshot snapshot) { - duckdb::vector> children; + name_list = lappend(name_list, makeString(pstrdup(table.c_str()))); - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("cardinality"), - duckdb::make_uniq(duckdb::Value::UBIGINT(cardinality)))); - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("relid"), - duckdb::make_uniq(duckdb::Value::UINTEGER(relid)))); - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("snapshot"), - duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(snapshot))))); - - return children; -} - -static duckdb::vector> -CreateFunctionIndexScanArguments(uint64_t cardinality, Path *path, PlannerInfo *plannerInfo, Snapshot snapshot) { - duckdb::vector> children; - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("cardinality"), - duckdb::make_uniq(duckdb::Value::UBIGINT(cardinality)))); - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("path"), - duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(path))))); - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("planner_info"), - duckdb::make_uniq( - duckdb::Value::POINTER(duckdb::CastPointerToValue(plannerInfo))))); - - children.push_back(duckdb::make_uniq( - duckdb::ExpressionType::COMPARE_EQUAL, duckdb::make_uniq("snapshot"), - duckdb::make_uniq(duckdb::Value::POINTER(duckdb::CastPointerToValue(snapshot))))); - - return children; + RangeVar *table_range_var = makeRangeVarFromNameList(name_list); + Oid relOid = RangeVarGetRelid(table_range_var, AccessShareLock, true); + if (relOid != InvalidOid) { + return relOid; + } + return InvalidOid; } duckdb::unique_ptr ReplaceView(Oid view) { auto oid = ObjectIdGetDatum(view); - Datum view_def = DirectFunctionCall1(pg_get_viewdef, oid); - auto view_definiton = text_to_cstring(DatumGetTextP(view_def)); + Datum viewdef = PostgresFunctionGuard( + [](PGFunction func, Datum arg) { return DirectFunctionCall1(func, arg); }, pg_get_viewdef, oid); + auto view_definition = text_to_cstring(DatumGetTextP(viewdef)); - if (!view_definiton) { - elog(WARNING, "(PGDuckDB/ReplaceView) Could not retrieve view definition for Relation with relid: %u", view); - return nullptr; + if (!view_definition) { + throw duckdb::InvalidInputException("Could not retrieve view definition for Relation with relid: %u", view); } duckdb::Parser parser; - parser.ParseQuery(view_definiton); + parser.ParseQuery(view_definition); auto statements = std::move(parser.statements); if (statements.size() != 1) { - elog(WARNING, "(PGDuckDB/ReplaceView) View definition contained more than 1 statement!"); - return nullptr; + throw duckdb::InvalidInputException("View definition contained more than 1 statement!"); } if (statements[0]->type != duckdb::StatementType::SELECT_STATEMENT) { - elog(WARNING, "(PGDuckDB/ReplaceView) View definition (%s) did not contain a SELECT statement!", - view_definiton); - return nullptr; + throw duckdb::InvalidInputException("View definition (%s) did not contain a SELECT statement!", + view_definition); } auto select = duckdb::unique_ptr_cast(std::move(statements[0])); @@ -142,31 +106,13 @@ ReplaceView(Oid view) { return std::move(subquery); } -static RelOptInfo * -FindMatchingRelEntry(Oid relid, PlannerInfo *planner_info) { - int i = 1; - RelOptInfo *node = nullptr; - for (; i < planner_info->simple_rel_array_size; i++) { - if (planner_info->simple_rte_array[i]->rtekind == RTE_SUBQUERY && planner_info->simple_rel_array[i]) { - node = FindMatchingRelEntry(relid, planner_info->simple_rel_array[i]->subroot); - if (node) { - return node; - } - } else if (planner_info->simple_rte_array[i]->rtekind == RTE_RELATION) { - if (relid == planner_info->simple_rte_array[i]->relid) { - return planner_info->simple_rel_array[i]; - } - }; - } - return nullptr; -} - duckdb::unique_ptr PostgresReplacementScan(duckdb::ClientContext &context, duckdb::ReplacementScanInput &input, duckdb::optional_ptr data) { - auto table_name = duckdb::ReplacementScan::GetFullPath(input); - auto scan_data = context.registered_state->Get("postgres_scan"); + auto &schema_name = input.schema_name; + auto &table_name = input.table_name; + auto scan_data = context.registered_state->Get("postgres_state"); if (!scan_data) { /* There is no scan data provided by postgres so we cannot access any * of postgres tables. This is the case for queries that are not @@ -176,15 +122,13 @@ PostgresReplacementScan(duckdb::ClientContext &context, duckdb::ReplacementScanI return nullptr; } - /* Check name against query table list and verify that it is heap table */ - auto relid = FindMatchingRelation(table_name); + auto relid = FindMatchingRelation(schema_name, table_name); if (relid == InvalidOid) { return nullptr; } - // Check if the Relation is a VIEW - auto tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); + auto tuple = PostgresFunctionGuard(SearchSysCache1, RELOID, ObjectIdGetDatum(relid)); if (!HeapTupleIsValid(tuple)) { elog(WARNING, "(PGDuckDB/PostgresReplacementScan) Cache lookup failed for relation %u", relid); return nullptr; @@ -192,43 +136,12 @@ PostgresReplacementScan(duckdb::ClientContext &context, duckdb::ReplacementScanI auto relForm = (Form_pg_class)GETSTRUCT(tuple); - // Check if the relation is a view - if (relForm->relkind == RELKIND_VIEW) { - ReleaseSysCache(tuple); - return ReplaceView(relid); - } - ReleaseSysCache(tuple); - - RelOptInfo *node = nullptr; - Path *node_path = nullptr; - - if (scan_data->m_query_planner_info) { - node = FindMatchingRelEntry(relid, scan_data->m_query_planner_info); - if (node) { - node_path = get_cheapest_fractional_path(node, 0.0); - } - } - - /* SELECT query will have nodePath so we can return cardinality estimate of scan */ - Cardinality nodeCardinality = node_path ? node_path->rows : 1; - - if ((node_path != nullptr && (node_path->pathtype == T_IndexScan || node_path->pathtype == T_IndexOnlyScan))) { - auto children = CreateFunctionIndexScanArguments(nodeCardinality, node_path, scan_data->m_query_planner_info, - GetActiveSnapshot()); - auto table_function = duckdb::make_uniq(); - table_function->function = - duckdb::make_uniq("postgres_index_scan", std::move(children)); - table_function->alias = table_name; - return std::move(table_function); - } else { - auto children = CreateFunctionSeqScanArguments(nodeCardinality, relid, GetActiveSnapshot()); - auto table_function = duckdb::make_uniq(); - table_function->function = - duckdb::make_uniq("postgres_seq_scan", std::move(children)); - table_function->alias = table_name; - return std::move(table_function); + if (relForm->relkind != RELKIND_VIEW) { + PostgresFunctionGuard(ReleaseSysCache, tuple); + return nullptr; } - return nullptr; + PostgresFunctionGuard(ReleaseSysCache, tuple); + return ReplaceView(relid); } } // namespace pgduckdb diff --git a/src/scan/postgres_seq_scan.cpp b/src/scan/postgres_seq_scan.cpp index f05859c8..0026839e 100644 --- a/src/scan/postgres_seq_scan.cpp +++ b/src/scan/postgres_seq_scan.cpp @@ -2,6 +2,7 @@ #include "pgduckdb/scan/postgres_seq_scan.hpp" #include "pgduckdb/pgduckdb_types.hpp" +#include namespace pgduckdb { @@ -14,7 +15,8 @@ PostgresSeqScanGlobalState::PostgresSeqScanGlobalState(Relation relation, duckdb m_heap_reader_global_state(duckdb::make_shared_ptr(relation)), m_relation(relation) { m_global_state->InitGlobalState(input); m_global_state->m_tuple_desc = RelationGetDescr(m_relation); - elog(DEBUG2, "(PGDuckDB/PostgresSeqScanGlobalState) Running %lu threads -- ", MaxThreads()); + elog(DEBUG2, "-- (DuckDB/PostgresReplacementScanGlobalState) Running %" PRIu64 " threads -- ", + (uint64_t)MaxThreads()); } PostgresSeqScanGlobalState::~PostgresSeqScanGlobalState() { diff --git a/test/pycheck/explain_test.py b/test/pycheck/explain_test.py index a1411610..3569ba11 100644 --- a/test/pycheck/explain_test.py +++ b/test/pycheck/explain_test.py @@ -2,7 +2,7 @@ def test_explain(cur: Cursor): - cur.sql("CREATE TABLE test_table (id int primary key, name text)") + cur.sql("CREATE TABLE test_table (id int, name text)") result = cur.sql("EXPLAIN SELECT count(*) FROM test_table") plan = "\n".join(result) assert "UNGROUPED_AGGREGATE" in plan @@ -13,3 +13,17 @@ def test_explain(cur: Cursor): assert "Query Profiling Information" in plan assert "UNGROUPED_AGGREGATE" in plan assert "Total Time:" in plan + + result = cur.sql("EXPLAIN SELECT count(*) FROM test_table where id = %s", (1,)) + plan = "\n".join(result) + assert "UNGROUPED_AGGREGATE" in plan + assert "id=1 AND id IS NOT NULL" in plan + assert "Total Time:" not in plan + + result = cur.sql( + "EXPLAIN ANALYZE SELECT count(*) FROM test_table where id = %s", (1,) + ) + plan = "\n".join(result) + assert "UNGROUPED_AGGREGATE" in plan + assert "id=1 AND id IS NOT NULL" in plan + assert "Total Time:" in plan diff --git a/test/pycheck/prepared_test.py b/test/pycheck/prepared_test.py new file mode 100644 index 00000000..ecf31582 --- /dev/null +++ b/test/pycheck/prepared_test.py @@ -0,0 +1,95 @@ +from .utils import Cursor + +import datetime +import psycopg.types.json + + +def test_prepared(cur: Cursor): + cur.sql("CREATE TABLE test_table (id int)") + + # Try prepared query without parameters + q1 = "SELECT count(*) FROM test_table" + assert cur.sql(q1, prepare=True) == 0 + assert cur.sql(q1) == 0 + assert cur.sql(q1) == 0 + + cur.sql("INSERT INTO test_table VALUES (1), (2), (3)") + assert cur.sql(q1) == 3 + + # The following tests a prepared query that has parameters. + # There are two ways in which prepared queries that have parameters can be + # executed: + # 1. With a custom plan, where the query is prepared with the exact values + # 2. With a generic plan, where the query is planned without the values and + # the values get only substituted at execution time + # + # The below tests both of these cases, by setting the plan_cache_mode. + q2 = "SELECT count(*) FROM test_table where id = %s" + cur.sql("SET plan_cache_mode = 'force_custom_plan'") + assert cur.sql(q2, (1,), prepare=True) == 1 + assert cur.sql(q2, (1,)) == 1 + assert cur.sql(q2, (1,)) == 1 + assert cur.sql(q2, (3,)) == 1 + assert cur.sql(q2, (4,)) == 0 + + cur.sql("SET plan_cache_mode = 'force_generic_plan'") + assert cur.sql(q2, (1,)) == 1 # creates generic plan + assert cur.sql(q2, (1,)) == 1 + assert cur.sql(q2, (3,)) == 1 + assert cur.sql(q2, (4,)) == 0 + + +def test_extended(cur: Cursor): + cur.sql(""" + CREATE TABLE t( + bool BOOLEAN, + i2 SMALLINT, + i4 INT, + i8 BIGINT, + fl4 REAL, + fl8 DOUBLE PRECISION, + t1 TEXT, + t2 VARCHAR, + t3 BPCHAR, + d DATE, + ts TIMESTAMP, + json_obj JSON); + """) + + row = ( + True, + 2, + 4, + 8, + 4.0, + 8.0, + "t1", + "t2", + "t3", + datetime.date(2024, 5, 4), + datetime.datetime(2020, 1, 1, 1, 2, 3), + psycopg.types.json.Json({"a": 1}), + ) + cur.sql( + "INSERT INTO t VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", row + ) + + assert (True,) * len(row) == cur.sql( + """ + SELECT + bool = %s, + i2 = %s, + i4 = %s, + i8 = %s, + fl4 = %s, + fl8 = %s, + t1 = %s, + t2 = %s, + t3 = %s, + d = %s, + ts = %s, + json_obj::text = %s::text + FROM t; + """, + row, + ) diff --git a/test/pycheck/utils.py b/test/pycheck/utils.py index 89b864a1..33fbb7b0 100644 --- a/test/pycheck/utils.py +++ b/test/pycheck/utils.py @@ -237,8 +237,8 @@ def __init__(self, cursor: psycopg.Cursor): def __getattr__(self, name): return getattr(self.cursor, name) - def sql(self, query, params=None) -> Any: - self.execute(query, params) + def sql(self, query, params=None, **kwargs) -> Any: + self.execute(query, params, **kwargs) try: return simplify_query_results(self.fetchall()) except psycopg.ProgrammingError as e: @@ -256,11 +256,11 @@ def __init__(self, cursor: psycopg.AsyncCursor): def __getattr__(self, name): return getattr(self.cursor, name) - def sql(self, query, params=None): - return asyncio.ensure_future(self.sql_coroutine(query, params)) + def sql(self, query, params=None, **kwargs): + return asyncio.ensure_future(self.sql_coroutine(query, params, **kwargs)) - async def sql_coroutine(self, query, params=None) -> Any: - await self.execute(query, params) + async def sql_coroutine(self, query, params=None, **kwargs) -> Any: + await self.execute(query, params, **kwargs) try: return simplify_query_results(await self.fetchall()) except psycopg.ProgrammingError as e: diff --git a/test/regression/expected/basic.out b/test/regression/expected/basic.out index fc407602..ed181bea 100644 --- a/test/regression/expected/basic.out +++ b/test/regression/expected/basic.out @@ -16,6 +16,28 @@ SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; 9 | 100 (4 rows) +select COUNT(*) from t \bind \g + count +------- + 1000 +(1 row) + +select a, COUNT(*) from t WHERE a > $1 GROUP BY a ORDER BY a \bind 5 \g + a | count +---+------- + 6 | 100 + 7 | 100 + 8 | 100 + 9 | 100 +(4 rows) + +\bind 7 \g + a | count +---+------- + 8 | 100 + 9 | 100 +(2 rows) + SET duckdb.max_threads_per_query to 4; SELECT COUNT(*) FROM t; count diff --git a/test/regression/expected/execution_error.out b/test/regression/expected/execution_error.out index d43a87c2..7ee50ddc 100644 --- a/test/regression/expected/execution_error.out +++ b/test/regression/expected/execution_error.out @@ -5,5 +5,5 @@ insert into int_as_varchar SELECT * from ( ) t(a); select a::INTEGER from int_as_varchar; ERROR: (PGDuckDB/ExecuteQuery) Conversion Error: Could not convert string 'abc' to INT32 -LINE 1: SELECT (a)::integer AS a FROM int_as_varchar +LINE 1: SELECT (a)::integer AS a FROM public.int_as_varch... ^ diff --git a/test/regression/expected/query_filter.out b/test/regression/expected/query_filter.out new file mode 100644 index 00000000..b6335685 --- /dev/null +++ b/test/regression/expected/query_filter.out @@ -0,0 +1,95 @@ +CREATE TABLE query_filter_int(a INT); +INSERT INTO query_filter_int SELECT g FROM generate_series(1,100) g; +SELECT COUNT(*) FROM query_filter_int WHERE a <= 50; + count +------- + 50 +(1 row) + +DROP TABLE query_filter_int; +CREATE TABLE query_filter_float(a FLOAT8); +INSERT INTO query_filter_float VALUES (0.9), (1.0), (1.1); +SELECT COUNT(*) FROM query_filter_float WHERE a < 1.0; + count +------- + 1 +(1 row) + +SELECT COUNT(*) FROM query_filter_float WHERE a <= 1.0; + count +------- + 2 +(1 row) + +SELECT COUNT(*) FROM query_filter_float WHERE a < 1.1; + count +------- + 2 +(1 row) + +DROP TABLE query_filter_float; +CREATE TABLE query_filter_varchar(a VARCHAR); +INSERT INTO query_filter_varchar VALUES ('t1'), ('t2'), ('t1'); +SELECT COUNT(*)FROM query_filter_varchar WHERE a = 't1'; + count +------- + 2 +(1 row) + +SELECT COUNT(a) FROM query_filter_varchar WHERE a = 't1'; + count +------- + 2 +(1 row) + +SELECT a, COUNT(*) FROM query_filter_varchar WHERE a = 't1' GROUP BY a; + a | count +----+------- + t1 | 2 +(1 row) + +DROP TABLE query_filter_varchar; +CREATE TABLE query_filter_output_column(a INT, b VARCHAR, c FLOAT8); +INSERT INTO query_filter_output_column VALUES (1, 't1', 1.0), (2, 't1', 2.0), (2, 't2', 1.0); +-- Projection ids list will be used (column `a`is not needed after scan) +SELECT b FROM query_filter_output_column WHERE a = 2; + b +---- + t1 + t2 +(2 rows) + +-- Column ids list used because both of fetched column are used after scan +SELECT a, b FROM query_filter_output_column WHERE b = 't1'; + a | b +---+---- + 1 | t1 + 2 | t1 +(2 rows) + +-- Column ids list used because both of fetched column are used after scan. +-- Swapped order of table columns. +SELECT b, a FROM query_filter_output_column WHERE b = 't1'; + b | a +----+--- + t1 | 1 + t1 | 2 +(2 rows) + +-- Projection ids list will be used (column `b`is not needed after scan) +SELECT a, c FROM query_filter_output_column WHERE b = 't1'; + a | c +---+--- + 1 | 1 + 2 | 2 +(2 rows) + +-- All columns in tuple unordered +SELECT c, a, b FROM query_filter_output_column WHERE a = 2; + c | a | b +---+---+---- + 2 | 2 | t1 + 1 | 2 | t2 +(2 rows) + +DROP TABLE query_filter_output_column; diff --git a/test/regression/expected/views.out b/test/regression/expected/views.out index 8c87c5cf..d57dbf00 100644 --- a/test/regression/expected/views.out +++ b/test/regression/expected/views.out @@ -27,10 +27,6 @@ create table "s.t" as select 42; create view vw1 as select * from s.t; create view vw2 as select * from "s.t"; select * from vw1, vw2; -WARNING: (PGDuckDB/CreatePlan) Prepared query returned an error: 'Binder Error: Referenced table "t" not found! -Candidate tables: "s.t" -LINE 1: ...?column?", vw2."?column?" FROM (SELECT t."?column?" FROM s.t) vw1, (SELECT "s.... - ^ ?column? | ?column? ----------+---------- 21 | 42 diff --git a/test/regression/schedule b/test/regression/schedule index 8489a20f..360ad831 100644 --- a/test/regression/schedule +++ b/test/regression/schedule @@ -12,3 +12,4 @@ test: duckdb_only_functions test: cte test: create_table_as test: standard_conforming_strings +test: query_filter diff --git a/test/regression/sql/basic.sql b/test/regression/sql/basic.sql index d32d3f0a..b9b3f5e9 100644 --- a/test/regression/sql/basic.sql +++ b/test/regression/sql/basic.sql @@ -6,6 +6,9 @@ SET client_min_messages to 'DEBUG1'; SELECT COUNT(*) FROM t; SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; +select COUNT(*) from t \bind \g +select a, COUNT(*) from t WHERE a > $1 GROUP BY a ORDER BY a \bind 5 \g +\bind 7 \g SET duckdb.max_threads_per_query to 4; diff --git a/test/regression/sql/query_filter.sql b/test/regression/sql/query_filter.sql new file mode 100644 index 00000000..5c099165 --- /dev/null +++ b/test/regression/sql/query_filter.sql @@ -0,0 +1,33 @@ +CREATE TABLE query_filter_int(a INT); +INSERT INTO query_filter_int SELECT g FROM generate_series(1,100) g; +SELECT COUNT(*) FROM query_filter_int WHERE a <= 50; +DROP TABLE query_filter_int; + +CREATE TABLE query_filter_float(a FLOAT8); +INSERT INTO query_filter_float VALUES (0.9), (1.0), (1.1); +SELECT COUNT(*) FROM query_filter_float WHERE a < 1.0; +SELECT COUNT(*) FROM query_filter_float WHERE a <= 1.0; +SELECT COUNT(*) FROM query_filter_float WHERE a < 1.1; +DROP TABLE query_filter_float; + +CREATE TABLE query_filter_varchar(a VARCHAR); +INSERT INTO query_filter_varchar VALUES ('t1'), ('t2'), ('t1'); +SELECT COUNT(*)FROM query_filter_varchar WHERE a = 't1'; +SELECT COUNT(a) FROM query_filter_varchar WHERE a = 't1'; +SELECT a, COUNT(*) FROM query_filter_varchar WHERE a = 't1' GROUP BY a; +DROP TABLE query_filter_varchar; + +CREATE TABLE query_filter_output_column(a INT, b VARCHAR, c FLOAT8); +INSERT INTO query_filter_output_column VALUES (1, 't1', 1.0), (2, 't1', 2.0), (2, 't2', 1.0); +-- Projection ids list will be used (column `a`is not needed after scan) +SELECT b FROM query_filter_output_column WHERE a = 2; +-- Column ids list used because both of fetched column are used after scan +SELECT a, b FROM query_filter_output_column WHERE b = 't1'; +-- Column ids list used because both of fetched column are used after scan. +-- Swapped order of table columns. +SELECT b, a FROM query_filter_output_column WHERE b = 't1'; +-- Projection ids list will be used (column `b`is not needed after scan) +SELECT a, c FROM query_filter_output_column WHERE b = 't1'; +-- All columns in tuple unordered +SELECT c, a, b FROM query_filter_output_column WHERE a = 2; +DROP TABLE query_filter_output_column;