From 1a0599134dcb97ec4b0eea2b3500c0e58d874934 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Mon, 17 Oct 2016 17:23:19 -0400 Subject: [PATCH] bigquery: add promise support (#1701) --- README.md | 10 +++ package.json | 2 +- src/dataset.js | 136 ++++++++++++++++++++++++---- src/index.js | 194 +++++++++++++++++++++++++++------------- src/job.js | 99 ++++++++++++++++++-- src/table.js | 145 ++++++++++++++++++++++++------ system-test/bigquery.js | 65 +++++++++++--- test/dataset.js | 106 +++++++++++++++++++--- test/index.js | 31 ++++++- test/job.js | 96 +++++++++++++++++++- test/table.js | 47 +++++++--- 11 files changed, 767 insertions(+), 164 deletions(-) diff --git a/README.md b/README.md index 0291cc97..63c7f2fe 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,16 @@ job.getQueryResults(function(err, rows) {}); // Or get the same results as a readable stream. job.getQueryResults().on('data', function(row) {}); + +// Promises are also supported by omitting callbacks. +job.getQueryResults().then(function(data) { + var rows = data[0]; +}); + +// It's also possible to integrate with third-party Promise libraries. +var bigquery = require('@google-cloud/bigquery')({ + promise: require('bluebird') +}); ``` diff --git a/package.json b/package.json index ed726b34..183cb9c2 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "bigquery" ], "dependencies": { - "@google-cloud/common": "^0.6.0", + "@google-cloud/common": "^0.7.0", "arrify": "^1.0.0", "duplexify": "^3.2.0", "extend": "^3.0.0", diff --git a/src/dataset.js b/src/dataset.js index c94a3931..80775ebb 100644 --- a/src/dataset.js +++ b/src/dataset.js @@ -57,6 +57,14 @@ function Dataset(bigQuery, id) { * // The dataset was created successfully. * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.create().then(function(data) { + * var dataset = data[0]; + * var apiResponse = data[1]; + * }); */ create: true, @@ -70,6 +78,13 @@ function Dataset(bigQuery, id) { * * @example * dataset.exists(function(err, exists) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.exists().then(function(data) { + * var exists = data[0]; + * }); */ exists: true, @@ -91,6 +106,14 @@ function Dataset(bigQuery, id) { * // `dataset.metadata` has been populated. * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.get().then(function(data) { + * var dataset = data[0]; + * var apiResponse = data[1]; + * }); */ get: true, @@ -107,6 +130,14 @@ function Dataset(bigQuery, id) { * * @example * dataset.getMetadata(function(err, metadata, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.getMetadata().then(function(data) { + * var metadata = data[0]; + * var apiResponse = data[1]; + * }); */ getMetadata: true, @@ -127,6 +158,13 @@ function Dataset(bigQuery, id) { * }; * * dataset.setMetadata(metadata, function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.setMetadata(metadata).then(function(data) { + * var apiResponse = data[0]; + * }); */ setMetadata: true }; @@ -144,6 +182,28 @@ function Dataset(bigQuery, id) { util.inherits(Dataset, common.ServiceObject); +/** + * Run a query scoped to your dataset as a readable object stream. + * + * See {module:bigquery#createQueryStream} for full documentation of this + * method. + */ +Dataset.prototype.createQueryStream = function(options) { + if (is.string(options)) { + options = { + query: options + }; + } + + options = extend(true, {}, options, { + defaultDataset: { + datasetId: this.id + } + }); + + return this.bigQuery.createQueryStream(options); +}; + /** * Create a table given a tableId or configuration object. * @@ -172,6 +232,14 @@ util.inherits(Dataset, common.ServiceObject); * }; * * dataset.createTable(tableId, options, function(err, table, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.createTable(tableId, options).then(function(data) { + * var table = data[0]; + * var apiResponse = data[1]; + * }); */ Dataset.prototype.createTable = function(id, options, callback) { var self = this; @@ -248,6 +316,13 @@ Dataset.prototype.createTable = function(id, options, callback) { * // Delete the dataset and any tables it contains. * //- * dataset.delete({ force: true }, function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * dataset.delete().then(function(data) { + * var apiResponse = data[0]; + * }); */ Dataset.prototype.delete = function(options, callback) { if (!callback) { @@ -290,23 +365,11 @@ Dataset.prototype.delete = function(options, callback) { * }); * * //- - * // Get the tables as a readable object stream. `table` is a Table object - * //- - * dataset.getTables() - * .on('error', console.error) - * .on('data', function(table) {}) - * .on('end', function() { - * // All tables have been retrieved - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. + * // If the callback is omitted, we'll return a Promise. * //- - * dataset.getTables() - * .on('data', function(table) { - * this.end(); - * }); + * dataset.getTables().then(function(data) { + * var tables = data[0]; + * }); */ Dataset.prototype.getTables = function(query, callback) { var that = this; @@ -344,6 +407,33 @@ Dataset.prototype.getTables = function(query, callback) { }); }; +/** + * List all or some of the {module:bigquery/table} objects in your project as a + * readable object stream. + * + * @param {object=} query - Configuration object. See + * {module:bigquery/dataset#getTables} for a complete list of options. + * @return {stream} + * + * @example + * dataset.getTablesStream() + * .on('error', console.error) + * .on('data', function(table) {}) + * .on('end', function() { + * // All tables have been retrieved + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * dataset.getTablesStream() + * .on('data', function(table) { + * this.end(); + * }); + */ +Dataset.prototype.getTablesStream = common.paginator.streamify('getTables'); + /** * Run a query scoped to your dataset. * @@ -380,9 +470,17 @@ Dataset.prototype.table = function(id) { /*! Developer Documentation * - * These methods can be used with either a callback or as a readable object - * stream. `streamRouter` is used to add this dual behavior. + * These methods can be auto-paginated. + */ +common.paginator.extend(Dataset, ['getTables']); + +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. */ -common.streamRouter.extend(Dataset, ['getTables']); +common.util.promisifyAll(Dataset, { + exclude: ['table'] +}); module.exports = Dataset; diff --git a/src/index.js b/src/index.js index 38bd11d8..b885af03 100644 --- a/src/index.js +++ b/src/index.js @@ -90,6 +90,14 @@ util.inherits(BigQuery, common.Service); * * @example * bigquery.createDataset('my-dataset', function(err, dataset, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * bigquery.createDataset('my-dataset').then(function(data) { + * var dataset = data[0]; + * var apiResponse = data[1]; + * }); */ BigQuery.prototype.createDataset = function(id, options, callback) { var that = this; @@ -120,6 +128,36 @@ BigQuery.prototype.createDataset = function(id, options, callback) { }); }; +/** + * Run a query scoped to your project as a readable object stream. + * + * @param {object=} query - Configuration object. See + * {module:bigquery#query} for a complete list of options. + * @return {stream} + * + * @example + * var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 100'; + * + * bigquery.createQueryStream(query) + * .on('error', console.error) + * .on('data', function(row) { + * // row is a result from your query. + * }) + * .on('end', function() { + * // All rows retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bigquery.createQueryStream(query) + * .on('data', function(row) { + * this.end(); + * }); + */ +BigQuery.prototype.createQueryStream = common.paginator.streamify('query'); + /** * Create a reference to a dataset. * @@ -146,7 +184,7 @@ BigQuery.prototype.dataset = function(id) { * @param {number} query.maxResults - Maximum number of results to return. * @param {string} query.pageToken - Token returned from a previous call, to * request the next page of results. - * @param {function=} callback - The callback function. + * @param {function} callback - The callback function. * @param {?error} callback.err - An error returned while making this request * @param {module:bigquery/dataset[]} callback.datasets - The list of datasets * in your project. @@ -175,25 +213,9 @@ BigQuery.prototype.dataset = function(id) { * }, callback); * * //- - * // Get the datasets from your project as a readable object stream. + * // If the callback is omitted, we'll return a Promise. * //- - * bigquery.getDatasets() - * .on('error', console.error) - * .on('data', function(dataset) { - * // dataset is a Dataset object. - * }) - * .on('end', function() { - * // All datasets retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * bigquery.getDatasets() - * .on('data', function(dataset) { - * this.end(); - * }); + * bigquery.getDatasets().then(function(datasets) {}); */ BigQuery.prototype.getDatasets = function(query, callback) { var that = this; @@ -232,6 +254,36 @@ BigQuery.prototype.getDatasets = function(query, callback) { }); }; +/** + * List all or some of the {module:bigquery/dataset} objects in your project as + * a readable object stream. + * + * @param {object=} query - Configuration object. See + * {module:bigquery#getDatasets} for a complete list of options. + * @return {stream} + * + * @example + * bigquery.getDatasetsStream() + * .on('error', console.error) + * .on('data', function(dataset) { + * // dataset is a Dataset object. + * }) + * .on('end', function() { + * // All datasets retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bigquery.getDatasetsStream() + * .on('data', function(dataset) { + * this.end(); + * }); + */ +BigQuery.prototype.getDatasetsStream = + common.paginator.streamify('getDatasets'); + /** * Get all of the jobs from your project. * @@ -251,7 +303,7 @@ BigQuery.prototype.getDatasets = function(query, callback) { * "minimal", to not include the job configuration. * @param {string=} options.stateFilter - Filter for job state. Acceptable * values are "done", "pending", and "running". - * @param {function=} callback - The callback function. + * @param {function} callback - The callback function. * @param {?error} callback.err - An error returned while making this request * @param {module:bigquery/job[]} callback.jobs - The list of jobs in your * project. @@ -280,25 +332,11 @@ BigQuery.prototype.getDatasets = function(query, callback) { * }, callback); * * //- - * // Get the jobs from your project as a readable object stream. + * // If the callback is omitted, we'll return a Promise. * //- - * bigquery.getJobs() - * .on('error', console.error) - * .on('data', function(job) { - * // job is a Job object. - * }) - * .on('end', function() { - * // All jobs retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * bigquery.getJobs() - * .on('data', function(job) { - * this.end(); - * }); + * bigquery.getJobs().then(function(data) { + * var jobs = data[0]; + * }); */ BigQuery.prototype.getJobs = function(options, callback) { var that = this; @@ -337,6 +375,35 @@ BigQuery.prototype.getJobs = function(options, callback) { }); }; +/** + * List all or some of the {module:bigquery/job} objects in your project as a + * readable object stream. + * + * @param {object=} query - Configuration object. See + * {module:bigquery#getJobs} for a complete list of options. + * @return {stream} + * + * @example + * bigquery.getJobsStream() + * .on('error', console.error) + * .on('data', function(job) { + * // job is a Job object. + * }) + * .on('end', function() { + * // All jobs retrieved. + * }); + * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- + * bigquery.getJobsStream() + * .on('data', function(job) { + * this.end(); + * }); + */ +BigQuery.prototype.getJobsStream = common.paginator.streamify('getJobs'); + /** * Create a reference to an existing job. * @@ -353,8 +420,6 @@ BigQuery.prototype.job = function(id) { /** * Run a query scoped to your project. * - * This method also runs as a readable stream if you do not provide a callback. - * * @resource [Jobs: query API Documentation]{@link https://cloud.google.com/bigquery/docs/reference/v2/jobs/query} * * @param {string|object} options - A string SQL query or configuration object. @@ -370,7 +435,7 @@ BigQuery.prototype.job = function(id) { * complete, in milliseconds, before returning. Default is to return * immediately. If the timeout passes before the job completes, the request * will fail with a `TIMEOUT` error. - * @param {function=} callback - The callback function. + * @param {function} callback - The callback function. * @param {?error} callback.err - An error returned while making this request * @param {array} callback.rows - The list of results from your query. * @param {object} callback.apiResponse - The full API response. @@ -400,26 +465,11 @@ BigQuery.prototype.job = function(id) { * }, callback); * * //- - * // You can also use the `query` method as a readable object stream by - * // omitting the callback. - * //- - * bigquery.query(query) - * .on('error', console.error) - * .on('data', function(row) { - * // row is a result from your query. - * }) - * .on('end', function() { - * // All rows retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. + * // If the callback is omitted, we'll return a Promise. * //- - * bigquery.query(query) - * .on('data', function(row) { - * this.end(); - * }); + * bigquery.query(query).then(function(data) { + * var rows = data[0]; + * }); */ BigQuery.prototype.query = function(options, callback) { var self = this; @@ -535,6 +585,16 @@ BigQuery.prototype.query = function(options, callback) { * job.getQueryResults(function(err, rows, apiResponse) {}); * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * bigquery.startQuery(query).then(function(data) { + * var job = data[0]; + * var apiResponse = data[1]; + * + * return job.getQueryResults(); + * }); */ BigQuery.prototype.startQuery = function(options, callback) { var that = this; @@ -590,10 +650,18 @@ BigQuery.prototype.startQuery = function(options, callback) { /*! Developer Documentation * - * These methods can be used with either a callback or as a readable object - * stream. `streamRouter` is used to add this dual behavior. + * These methods can be auto-paginated. + */ +common.paginator.extend(BigQuery, ['getDatasets', 'getJobs', 'query']); + +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. */ -common.streamRouter.extend(BigQuery, ['getDatasets', 'getJobs', 'query']); +common.util.promisifyAll(BigQuery, { + exclude: ['dataset', 'job'] +}); BigQuery.Dataset = Dataset; BigQuery.Job = Job; diff --git a/src/job.js b/src/job.js index 3d8e5875..6376d8d3 100644 --- a/src/job.js +++ b/src/job.js @@ -87,6 +87,13 @@ function Job(bigQuery, id) { * * @example * job.exists(function(err, exists) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * job.exists().then(function(data) { + * var exists = data[0]; + * }); */ exists: true, @@ -99,6 +106,14 @@ function Job(bigQuery, id) { * // `job.metadata` has been populated. * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * job.get().then(function(data) { + * var job = data[0]; + * var apiResponse = data[1]; + * }); */ get: true, @@ -117,6 +132,14 @@ function Job(bigQuery, id) { * @example * var job = bigquery.job('id'); * job.getMetadata(function(err, metadata, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * job.getMetadata().then(function(data) { + * var metadata = data[0]; + * var apiResponse = data[1]; + * }); */ getMetadata: true }; @@ -168,6 +191,13 @@ modelo.inherits(Job, common.ServiceObject, events.EventEmitter); * job.on('error', function(err) {}); * job.on('complete', function(metadata) {}); * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * job.cancel().then(function(data) { + * var apiResponse = data[0]; + * }); */ Job.prototype.cancel = function(callback) { callback = callback || common.util.noop; @@ -240,27 +270,71 @@ Job.prototype.cancel = function(callback) { * }, callback); * * //- - * // Consume the results from the query as a readable object stream. + * // If the callback is omitted, we'll return a Promise. * //- + * job.getQueryResults().then(function(data) { + * var rows = data[0]; + * }); + */ +Job.prototype.getQueryResults = function(options, callback) { + if (is.fn(options)) { + callback = options; + options = {}; + } + + options = options || {}; + options.job = this; + this.bigQuery.query(options, callback); +}; + +/** + * Get the results of a job as a readable object stream. + * + * @param {object=} options - Configuration object. See + * {module:bigquery/job#getQueryResults} for a complete list of options. + * @return {stream} + * + * @example * var through2 = require('through2'); * var fs = require('fs'); * - * job.getQueryResults() + * job.getQueryResultsStream() * .pipe(through2.obj(function (row, enc, next) { * this.push(JSON.stringify(row) + '\n'); * next(); * })) * .pipe(fs.createWriteStream('./test/testdata/testfile.json')); */ -Job.prototype.getQueryResults = function(options, callback) { - if (is.fn(options)) { - callback = options; - options = {}; - } - +Job.prototype.getQueryResultsStream = function(options) { options = options || {}; options.job = this; - return this.bigQuery.query(options, callback); + + return this.bigQuery.createQueryStream(options); +}; + +/** + * Convenience method that wraps the `complete` and `error` events in a + * Promise. + * + * @return {promise} + * + * @example + * job.promise().then(function(metadata) { + * // The job is complete. + * }, function(err) { + * // An error occurred during the job. + * }); + */ +Job.prototype.promise = function() { + var self = this; + + return new self.Promise(function(resolve, reject) { + self + .on('error', reject) + .on('complete', function(metadata) { + resolve([metadata]); + }); + }); }; /** @@ -329,4 +403,11 @@ Job.prototype.startPolling_ = function() { }); }; +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. + */ +common.util.promisifyAll(Job); + module.exports = Job; diff --git a/src/table.js b/src/table.js index ecbc1c17..a05a62ae 100644 --- a/src/table.js +++ b/src/table.js @@ -74,6 +74,14 @@ function Table(dataset, id) { * // The table was created successfully. * } * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.create().then(function(data) { + * var table = data[0]; + * var apiResponse = data[1]; + * }); */ create: true, @@ -89,6 +97,13 @@ function Table(dataset, id) { * * @example * table.delete(function(err, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.delete().then(function(data) { + * var apiResponse = data[0]; + * }); */ delete: true, @@ -102,6 +117,13 @@ function Table(dataset, id) { * * @example * table.exists(function(err, exists) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.exists().then(function(data) { + * var exists = data[0]; + * }); */ exists: true, @@ -121,6 +143,14 @@ function Table(dataset, id) { * table.get(function(err, table, apiResponse) { * // `table.metadata` has been populated. * }); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.get().then(function(data) { + * var table = data[0]; + * var apiResponse = data[1]; + * }); */ get: true, @@ -137,6 +167,14 @@ function Table(dataset, id) { * * @example * table.getMetadata(function(err, metadata, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.getMetadata().then(function(data) { + * var metadata = data[0]; + * var apiResponse = data[1]; + * }); */ getMetadata: true }; @@ -319,6 +357,14 @@ Table.mergeSchemaWithRows_ = function(schema, rows) { * }; * * table.copy(yourTable, metadata, function(err, job, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.copy(yourTable, metadata).then(function(data) { + * var job = data[0]; + * var apiResponse = data[1]; + * }); */ Table.prototype.copy = function(destination, metadata, callback) { var self = this; @@ -366,6 +412,16 @@ Table.prototype.copy = function(destination, metadata, callback) { }); }; +/** + * Run a query scoped to your dataset as a readable object stream. + * + * See {module:bigquery#createQueryStream} for full documentation of this + * method. + */ +Table.prototype.createQueryStream = function(query) { + return this.dataset.createQueryStream(query); +}; + /** * Create a readable stream of the rows of data in your table. This method is * simply a wrapper around {module:bigquery/table#getRows}. @@ -375,18 +431,23 @@ Table.prototype.copy = function(destination, metadata, callback) { * @return {ReadableStream} * * @example - * var through2 = require('through2'); - * var fs = require('fs'); + * table.createReadStream(options) + * .on('error', console.error) + * .on('data', function(row) {}) + * .on('end', function() { + * // All rows have been retrieved. + * }); * + * //- + * // If you anticipate many results, you can end a stream early to prevent + * // unnecessary processing and API requests. + * //- * table.createReadStream() - * .pipe(through2.obj(function(row, enc, next) { - * this.push(JSON.stringify(row) + '\n'); - * })) - * .pipe(fs.createWriteStream('./test/testdata/testfile.json')); + * .on('data', function(row) { + * this.end(); + * }); */ -Table.prototype.createReadStream = function() { - return this.getRows(); -}; +Table.prototype.createReadStream = common.paginator.streamify('getRows'); /** * Load data into your table from a readable stream of JSON, CSV, or @@ -545,6 +606,14 @@ Table.prototype.createWriteStream = function(metadata) { * gcs.bucket('institutions').file('2014.json'), * gcs.bucket('institutions-copy').file('2014.json') * ], options, function(err, job, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.export(exportedFile, options).then(function(data) { + * var job = data[0]; + * var apiResponse = data[1]; + * }); */ Table.prototype.export = function(destination, options, callback) { var self = this; @@ -655,23 +724,11 @@ Table.prototype.export = function(destination, options, callback) { * }, callback); * * //- - * // Get the rows as a readable object stream. + * // If the callback is omitted, we'll return a Promise. * //- - * table.getRows(options) - * .on('error', console.error) - * .on('data', function(row) {}) - * .on('end', function() { - * // All rows have been retrieved. - * }); - * - * //- - * // If you anticipate many results, you can end a stream early to prevent - * // unnecessary processing and API requests. - * //- - * table.getRows() - * .on('data', function(row) { - * this.end(); - * }); + * table.getRows().then(function(data) { + * var rows = data[0]; +}); */ Table.prototype.getRows = function(options, callback) { var self = this; @@ -790,6 +847,14 @@ Table.prototype.getRows = function(options, callback) { * gcs.bucket('institutions').file('2011.csv'), * gcs.bucket('institutions').file('2012.csv') * ], function(err, job, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.import(data).then(function(data) { + * var job = data[0]; + * var apiResponse = data[1]; + * }); */ Table.prototype.import = function(source, metadata, callback) { var self = this; @@ -964,6 +1029,14 @@ Table.prototype.import = function(source, metadata, callback) { * // See https://developers.google.com/bigquery/troubleshooting-errors for * // recommendations on handling errors. * } + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.insert(rows).then(function(data) { + * var insertErrors = data[0]; + * var apiResponse = data[1]; + * }); */ Table.prototype.insert = function(rows, options, callback) { if (is.fn(options)) { @@ -1017,7 +1090,7 @@ Table.prototype.insert = function(rows, options, callback) { * See {module:bigquery#query} for full documentation of this method. */ Table.prototype.query = function(query, callback) { - return this.dataset.query(query, callback); + this.dataset.query(query, callback); }; /** @@ -1048,6 +1121,14 @@ Table.prototype.query = function(query, callback) { * }; * * table.setMetadata(metadata, function(err, metadata, apiResponse) {}); + * + * //- + * // If the callback is omitted, we'll return a Promise. + * //- + * table.setMetadata(metadata).then(function(data) { + * var metadata = data[0]; + * var apiResponse = data[1]; + * }); */ Table.prototype.setMetadata = function(metadata, callback) { var self = this; @@ -1079,9 +1160,15 @@ Table.prototype.setMetadata = function(metadata, callback) { /*! Developer Documentation * - * These methods can be used with either a callback or as a readable object - * stream. `streamRouter` is used to add this dual behavior. + * These methods can be auto-paginated. + */ +common.paginator.extend(Table, ['getRows']); + +/*! Developer Documentation + * + * All async methods (except for streams) will return a Promise in the event + * that a callback is omitted. */ -common.streamRouter.extend(Table, ['getRows']); +common.util.promisifyAll(Table); module.exports = Table; diff --git a/system-test/bigquery.js b/system-test/bigquery.js index 5d5769fd..f2e1a126 100644 --- a/system-test/bigquery.js +++ b/system-test/bigquery.js @@ -146,10 +146,53 @@ describe('BigQuery', function() { }); }); + it('should return a promise', function() { + return bigquery.getDatasets().then(function(data) { + var datasets = data[0]; + + assert(datasets.length > 0); + assert(datasets[0] instanceof Dataset); + }); + }); + + it('should allow limiting API calls via promises', function() { + var maxApiCalls = 1; + var numRequestsMade = 0; + + var bigquery = require('../')(env); + + bigquery.interceptors.push({ + request: function(reqOpts) { + numRequestsMade++; + return reqOpts; + } + }); + + return bigquery.getDatasets({ + maxApiCalls: maxApiCalls + }).then(function() { + assert.strictEqual(numRequestsMade, maxApiCalls); + }); + }); + + it('should allow for manual pagination in promise mode', function() { + return bigquery.getDatasets({ + autoPaginate: false + }).then(function(data) { + var datasets = data[0]; + var nextQuery = data[1]; + var apiResponse = data[2]; + + assert(datasets[0] instanceof Dataset); + assert.strictEqual(nextQuery, null); + assert(apiResponse); + }); + }); + it('should list datasets as a stream', function(done) { var datasetEmitted = false; - bigquery.getDatasets() + bigquery.getDatasetsStream() .on('error', done) .on('data', function(dataset) { datasetEmitted = dataset instanceof Dataset; @@ -180,7 +223,7 @@ describe('BigQuery', function() { var rowsEmitted = []; - job.getQueryResults() + job.getQueryResultsStream() .on('error', done) .on('data', function(row) { rowsEmitted.push(row); @@ -196,7 +239,7 @@ describe('BigQuery', function() { it('should query as a stream', function(done) { var rowsEmitted = 0; - bigquery.query(query) + bigquery.createQueryStream(query) .on('data', function(row) { rowsEmitted++; assert.equal(typeof row.url, 'string'); @@ -239,7 +282,7 @@ describe('BigQuery', function() { it('should list jobs as a stream', function(done) { var jobEmitted = false; - bigquery.getJobs() + bigquery.getJobsStream() .on('error', done) .on('data', function(job) { jobEmitted = job instanceof Job; @@ -294,7 +337,7 @@ describe('BigQuery', function() { it('should get tables as a stream', function(done) { var tableEmitted = false; - dataset.getTables() + dataset.getTablesStream() .on('error', done) .on('data', function(table) { tableEmitted = table instanceof Table; @@ -373,7 +416,7 @@ describe('BigQuery', function() { }); it('should get the rows in a table via stream', function(done) { - table.getRows() + table.createReadStream() .on('error', done) .on('data', function() {}) .on('end', done); @@ -455,13 +498,13 @@ describe('BigQuery', function() { } function query(callback) { + var query = { + query: 'SELECT * FROM ' + table.id + ' WHERE id = ' + data.id, + useLegacySql: false + }; var row; - table - .query({ - query: 'SELECT * FROM ' + table.id + ' WHERE id = ' + data.id, - useLegacySql: false - }) + table.createQueryStream(query) .on('error', callback) .once('data', function(row_) { row = row_; }) .on('end', function() { diff --git a/test/dataset.js b/test/dataset.js index e7b37ac1..878d8790 100644 --- a/test/dataset.js +++ b/test/dataset.js @@ -25,8 +25,20 @@ var proxyquire = require('proxyquire'); var ServiceObject = require('@google-cloud/common').ServiceObject; var util = require('@google-cloud/common').util; +var promisified = false; +var fakeUtil = extend({}, util, { + promisifyAll: function(Class, options) { + if (Class.name !== 'Dataset') { + return; + } + + promisified = true; + assert.deepEqual(options.exclude, ['table']); + } +}); + var extended = false; -var fakeStreamRouter = { +var fakePaginator = { extend: function(Class, methods) { if (Class.name !== 'Dataset') { return; @@ -36,6 +48,9 @@ var fakeStreamRouter = { assert.equal(Class.name, 'Dataset'); assert.deepEqual(methods, ['getTables']); extended = true; + }, + streamify: function(methodName) { + return methodName; } }; @@ -59,8 +74,9 @@ describe('BigQuery/Dataset', function() { before(function() { Dataset = proxyquire('../src/dataset.js', { '@google-cloud/common': { - streamRouter: fakeStreamRouter, - ServiceObject: FakeServiceObject + paginator: fakePaginator, + ServiceObject: FakeServiceObject, + util: fakeUtil } }); Table = require('../src/table.js'); @@ -72,7 +88,15 @@ describe('BigQuery/Dataset', function() { describe('instantiation', function() { it('should extend the correct methods', function() { - assert(extended); // See `fakeStreamRouter.extend` + assert(extended); // See `fakePaginator.extend` + }); + + it('should streamify the correct methods', function() { + assert.strictEqual(ds.getTablesStream, 'getTables'); + }); + + it('should promisify all the things', function() { + assert(promisified); }); it('should inherit from ServiceObject', function(done) { @@ -103,6 +127,70 @@ describe('BigQuery/Dataset', function() { }); }); + describe('createQueryStream', function() { + var options = { + a: 'b', + c: 'd' + }; + + it('should call through to bigQuery', function(done) { + ds.bigQuery.createQueryStream = function() { + done(); + }; + + ds.createQueryStream(); + }); + + it('should return the result of the call to bq.query', function(done) { + ds.bigQuery.createQueryStream = function() { + return { + done: done + }; + }; + + ds.createQueryStream().done(); + }); + + it('should accept a string', function(done) { + var query = 'SELECT * FROM allthedata'; + + ds.bigQuery.createQueryStream = function(opts) { + assert.equal(opts.query, query); + done(); + }; + + ds.createQueryStream(query); + }); + + it('should pass along options', function(done) { + ds.bigQuery.createQueryStream = function(opts) { + assert.equal(opts.a, options.a); + assert.equal(opts.c, options.c); + done(); + }; + + ds.createQueryStream(options); + }); + + it('should extend options with defaultDataset', function(done) { + ds.bigQuery.createQueryStream = function(opts) { + assert.deepEqual(opts.defaultDataset, { datasetId: ds.id }); + done(); + }; + + ds.createQueryStream(options); + }); + + it('should not modify original options object', function(done) { + ds.bigQuery.createQueryStream = function() { + assert.deepEqual(options, { a: 'b', c: 'd' }); + done(); + }; + + ds.createQueryStream(); + }); + }); + describe('createTable', function() { var SCHEMA_OBJECT = { fields: [ @@ -438,16 +526,6 @@ describe('BigQuery/Dataset', function() { ds.query(); }); - it('should return the result of the call to bq.query', function(done) { - ds.bigQuery.query = function() { - return { - done: done - }; - }; - - ds.query().done(); - }); - it('should accept a string', function(done) { var query = 'SELECT * FROM allthedata'; diff --git a/test/index.js b/test/index.js index bd91873e..9cf6b118 100644 --- a/test/index.js +++ b/test/index.js @@ -26,7 +26,17 @@ var Service = require('@google-cloud/common').Service; var Table = require('../src/table.js'); var util = require('@google-cloud/common').util; -var fakeUtil = extend({}, util); +var promisified = false; +var fakeUtil = extend({}, util, { + promisifyAll: function(Class, options) { + if (Class.name !== 'BigQuery') { + return; + } + + promisified = true; + assert.deepEqual(options.exclude, ['dataset', 'job']); + } +}); function FakeTable(a, b) { Table.call(this, a, b); @@ -40,7 +50,7 @@ FakeTable.mergeSchemaWithRows_ = function() { }; var extended = false; -var fakeStreamRouter = { +var fakePaginator = { extend: function(Class, methods) { if (Class.name !== 'BigQuery') { return; @@ -50,6 +60,9 @@ var fakeStreamRouter = { assert.equal(Class.name, 'BigQuery'); assert.deepEqual(methods, ['getDatasets', 'getJobs', 'query']); extended = true; + }, + streamify: function(methodName) { + return methodName; } }; @@ -72,7 +85,7 @@ describe('BigQuery', function() { './table.js': FakeTable, '@google-cloud/common': { Service: FakeService, - streamRouter: fakeStreamRouter, + paginator: fakePaginator, util: fakeUtil } }); @@ -84,7 +97,17 @@ describe('BigQuery', function() { describe('instantiation', function() { it('should extend the correct methods', function() { - assert(extended); // See `fakeStreamRouter.extend` + assert(extended); // See `fakePaginator.extend` + }); + + it('should streamify the correct methods', function() { + assert.strictEqual(bq.getDatasetsStream, 'getDatasets'); + assert.strictEqual(bq.getJobsStream, 'getJobs'); + assert.strictEqual(bq.createQueryStream, 'query'); + }); + + it('should promisify all the things', function() { + assert(promisified); }); it('should normalize the arguments', function() { diff --git a/test/job.js b/test/job.js index 24b83b4c..f2da94da 100644 --- a/test/job.js +++ b/test/job.js @@ -31,7 +31,15 @@ function FakeServiceObject() { nodeutil.inherits(FakeServiceObject, ServiceObject); -var utilOverrides = {}; +var promisified = false; +var utilOverrides = { + promisifyAll: function(Class) { + if (Class.name === 'Job') { + promisified = true; + } + } +}; + var fakeUtil = Object.keys(util).reduce(function(fakeUtil, methodName) { fakeUtil[methodName] = function() { var method = utilOverrides[methodName] || util[methodName]; @@ -42,7 +50,8 @@ var fakeUtil = Object.keys(util).reduce(function(fakeUtil, methodName) { describe('BigQuery/Job', function() { var BIGQUERY = { - projectId: 'my-project' + projectId: 'my-project', + Promise: Promise }; var JOB_ID = 'job_XYrk_3z'; var Job; @@ -63,6 +72,10 @@ describe('BigQuery/Job', function() { }); describe('initialization', function() { + it('should promisify all the things', function() { + assert(promisified); + }); + it('should assign this.bigQuery', function() { assert.deepEqual(job.bigQuery, BIGQUERY); }); @@ -214,15 +227,90 @@ describe('BigQuery/Job', function() { job.getQueryResults(); }); + }); + + describe('getQueryResultsStream', function() { + var options = { + a: 'b', + c: 'd' + }; + var callback = util.noop; + + it('should accept an options object', function(done) { + job.bigQuery.createQueryStream = function(opts) { + assert.deepEqual(opts, options); + done(); + }; + + job.getQueryResultsStream(options, callback); + }); + + it('should accept no arguments', function(done) { + job.bigQuery.createQueryStream = function(opts, cb) { + assert.deepEqual(opts, { job: job }); + assert.equal(cb, undefined); + done(); + }; + + job.getQueryResultsStream(); + }); + + it('should assign job to the options object', function(done) { + job.bigQuery.createQueryStream = function(opts) { + assert.deepEqual(opts.job, job); + done(); + }; + + job.getQueryResultsStream(); + }); it('should return the result of the call to bq.query', function(done) { - job.bigQuery.query = function() { + job.bigQuery.createQueryStream = function() { return { done: done }; }; - job.getQueryResults().done(); + job.getQueryResultsStream().done(); + }); + }); + + describe('promise', function() { + beforeEach(function() { + job.startPolling_ = util.noop; + }); + + it('should return an instance of the localized Promise', function() { + var FakePromise = job.Promise = function() {}; + var promise = job.promise(); + + assert(promise instanceof FakePromise); + }); + + it('should reject the promise if an error occurs', function() { + var error = new Error('err'); + + setImmediate(function() { + job.emit('error', error); + }); + + return job.promise().then(function() { + throw new Error('Promise should have been rejected.'); + }, function(err) { + assert.strictEqual(err, error); + }); + }); + + it('should resolve the promise on complete', function() { + var metadata = {}; + + setImmediate(function() { + job.emit('complete', metadata); + }); + + return job.promise().then(function(data) { + assert.deepEqual(data, [metadata]); + }); }); }); diff --git a/test/table.js b/test/table.js index cf820048..e192063e 100644 --- a/test/table.js +++ b/test/table.js @@ -27,6 +27,7 @@ var stream = require('stream'); var ServiceObject = require('@google-cloud/common').ServiceObject; var util = require('@google-cloud/common').util; +var promisified = false; var makeWritableStreamOverride; var isCustomTypeOverride; var fakeUtil = extend({}, util, { @@ -36,11 +37,16 @@ var fakeUtil = extend({}, util, { makeWritableStream: function() { var args = arguments; (makeWritableStreamOverride || util.makeWritableStream).apply(null, args); + }, + promisifyAll: function(Class) { + if (Class.name === 'Table') { + promisified = true; + } } }); var extended = false; -var fakeStreamRouter = { +var fakePaginator = { extend: function(Class, methods) { if (Class.name !== 'Table') { return; @@ -50,6 +56,9 @@ var fakeStreamRouter = { assert.equal(Class.name, 'Table'); assert.deepEqual(methods, ['getRows']); extended = true; + }, + streamify: function(methodName) { + return methodName; } }; @@ -99,7 +108,7 @@ describe('BigQuery/Table', function() { Table = proxyquire('../src/table.js', { '@google-cloud/common': { ServiceObject: FakeServiceObject, - streamRouter: fakeStreamRouter, + paginator: fakePaginator, util: fakeUtil } }); @@ -130,7 +139,15 @@ describe('BigQuery/Table', function() { describe('instantiation', function() { it('should extend the correct methods', function() { - assert(extended); // See `fakeStreamRouter.extend` + assert(extended); // See `fakePaginator.extend` + }); + + it('should streamify the correct methods', function() { + assert.strictEqual(table.createReadStream, 'getRows'); + }); + + it('should promisify all the things', function() { + assert(promisified); }); it('should inherit from ServiceObject', function(done) { @@ -440,16 +457,26 @@ describe('BigQuery/Table', function() { }); }); - describe('createReadStream', function() { - it('should return table.getRows()', function() { - var uniqueReturnValue = 'abc123'; + describe('createQueryStream', function() { + it('should call datasetInstance.createQueryStream()', function(done) { + table.dataset.createQueryStream = function(a) { + assert.equal(a, 'a'); + done(); + }; - table.getRows = function() { - assert.equal(arguments.length, 0); - return uniqueReturnValue; + table.createQueryStream('a'); + }); + + it('should return whatever dataset.createQueryStream returns', function() { + var fakeValue = 123; + + table.dataset.createQueryStream = function() { + return fakeValue; }; - assert.equal(table.createReadStream(), uniqueReturnValue); + var val = table.createQueryStream(); + + assert.strictEqual(val, fakeValue); }); });