Skip to content

Commit

Permalink
feat(external connector): #38
Browse files Browse the repository at this point in the history
* feat(http-connector): update workspace

* feat(http-connector): proof of concept extract tables

* feat(http-connector): proof of concept extract relations

* feat(http-connector): url param

* feat(http-connector): better http

* feat(http-connector): add pull

* feat(http-connector): add venom test

* feat(http-connector): add http pinger

* feat(http-connector): fix venom test

* feat(http-connector): add schema param

* feat(pull): filter body

* feat(pull): filter body

* feat(http): add content-type

* feat(http): wip code review push

* feat(http): wip fix close resource and lint

* feat(http): fix data push content-type

* feat(http): push add mode and disableContraints

* feat(http): wip remove debug data

* fix(httpconnector): log pull body

* fix(httpconnector): wait for request completion

* fix(httpconnector): venom test impact

* fix(httpconnector): venom test impact

* docs(httpconnector): changelog and readme

* fix(httpconnector): add passing of configured pks

* feat(httpconnector): possibility to select columns

* feat(httpconnector): possibility to select columns
  • Loading branch information
adrienaury authored Sep 20, 2021
1 parent 4a30808 commit 61e4af1
Show file tree
Hide file tree
Showing 25 changed files with 851 additions and 94 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [1.7.0]

- `Added` new datasource type with string connection `http://...` LINO can pull/push data to an HTTP Endpoint API

## [1.6.0]

- `Added` option to change ingress-descriptor filename
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Currently supported vendors are :
* oracle
* oracle-raw (for full TNS support `oracle-raw://user:pwd@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=dbhost.example.com)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=orclpdb1)))`)
* db2 (alpha feature) : the DB2 driver is currently in development, contact us for a compilation of a LINO binary with DB2 support with your target os/arch
* http : use an HTTP endpoint to push and pull data (for databases with no native driver supported by golang)

### dataconnector.yml

Expand Down
1 change: 1 addition & 0 deletions cmd/lino/dep_dataconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ func dataPingerFactory() map[string]domain.DataPingerFactory {
"godror": infra.NewSQLDataPingerFactory(),
"godror-raw": infra.NewSQLDataPingerFactory(),
"db2": infra.NewSQLDataPingerFactory(),
"http": infra.NewHTTPDataPingerFactory(),
}
}
1 change: 1 addition & 0 deletions cmd/lino/dep_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func pullDataSourceFactory() map[string]domain.DataSourceFactory {
"godror": infra.NewOracleDataSourceFactory(),
"godror-raw": infra.NewOracleDataSourceFactory(),
"db2": infra.NewDb2DataSourceFactory(),
"http": infra.NewHTTPDataSourceFactory(),
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/lino/dep_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func pushDataDestinationFactory() map[string]domain.DataDestinationFactory {
"godror": infra.NewOracleDataDestinationFactory(),
"godror-raw": infra.NewOracleDataDestinationFactory(),
"db2": infra.NewDb2DataDestinationFactory(),
"http": infra.NewHTTPDataDestinationFactory(),
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/lino/dep_relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ func relationExtractorFactory() map[string]domain.ExtractorFactory {
"godror": infra.NewOracleExtractorFactory(),
"godror-raw": infra.NewOracleExtractorFactory(),
"db2": infra.NewDb2ExtractorFactory(),
"http": infra.NewHTTPExtractorFactory(),
}
}
1 change: 1 addition & 0 deletions cmd/lino/dep_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ func tableExtractorFactory() map[string]domain.ExtractorFactory {
"godror": infra.NewOracleExtractorFactory(),
"godror-raw": infra.NewOracleExtractorFactory(),
"db2": infra.NewDb2ExtractorFactory(),
"http": infra.NewHTTPExtractorFactory(),
}
}
22 changes: 15 additions & 7 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
context: .devcontainer
args:
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
https_proxy: ${https_proxy}
no_proxy: ${no_proxy}
volumes:
- ./:/workspace
Expand All @@ -47,7 +47,7 @@ services:
environment:
POSTGRES_PASSWORD: sakila
expose:
- 5432
- 5432

dest:
image: frantiseks/postgres-sakila
Expand All @@ -56,7 +56,7 @@ services:
POSTGRES_PASSWORD: sakila
expose:
- 5432

db2:
image: ibmcom/db2
restart: always
Expand All @@ -78,7 +78,7 @@ services:
no_proxy: ${no_proxy}
environment:
- PASSWORD=sakila
- CGO_ENABLED=1
- CGO_ENABLED=1
command: http
expose:
- 8000
Expand All @@ -89,10 +89,18 @@ services:
image: dpage/pgadmin4
restart: always
environment:
PGADMIN_DEFAULT_EMAIL: user@domain.com
PGADMIN_DEFAULT_PASSWORD: SuperSecret
PGADMIN_DEFAULT_EMAIL: user@domain.com
PGADMIN_DEFAULT_PASSWORD: SuperSecret
ports:
- 8080:80
- 8080:80

httpmock:
image: mockserver/mockserver
environment:
#MOCKSERVER_PROPERTY_FILE: /config/mockserver.properties
MOCKSERVER_INITIALIZATION_JSON_PATH: /config/default.json
volumes:
- testdata:/config

volumes:
testdata:
9 changes: 7 additions & 2 deletions internal/app/pull/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,17 @@ func (c epToStepListConverter) getTable(name string) pull.Table {
table, ok := c.tmap[name]
if !ok {
log.Warn().Msg(fmt.Sprintf("missing table %v in tables.yaml", name))
return pull.NewTable(name, []string{})
return pull.NewTable(name, []string{}, nil)
}

log.Trace().Msg(fmt.Sprintf("building table %v", table))

return pull.NewTable(table.Name, table.Keys)
columns := []pull.Column{}
for _, col := range table.Columns {
columns = append(columns, pull.NewColumn(col.Name))
}

return pull.NewTable(table.Name, table.Keys, pull.NewColumnList(columns))
}

func (c epToStepListConverter) getRelation(name string) (pull.Relation, error) {
Expand Down
10 changes: 10 additions & 0 deletions internal/app/urlbuilder/urlbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ func init() {
Override: "go_ibm_db",
}
dburl.Register(db2Scheme)

httpScheme := dburl.Scheme{
Driver: "http",
Generator: dburl.GenFromURL("http://localhost:8080"),
Proto: dburl.ProtoAny,
Opaque: false,
Aliases: []string{"https"},
Override: "",
}
dburl.Register(httpScheme)
}

func BuildURL(dc *dataconnector.DataConnector, out io.Writer) *dburl.URL {
Expand Down
57 changes: 57 additions & 0 deletions internal/infra/dataconnector/pinger_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (C) 2021 CGI France
//
// This file is part of LINO.
//
// LINO is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// LINO is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with LINO. If not, see <http://www.gnu.org/licenses/>.

package dataconnector

import (
"context"
"net/http"

"github.com/cgi-fr/lino/pkg/dataconnector"
)

type HTTPDataPingerFactory struct{}

// NewHTTPDataPinger creates a new HTTP pinger.
func NewHTTPDataPingerFactory() *HTTPDataPingerFactory {
return &HTTPDataPingerFactory{}
}

func (pdpf HTTPDataPingerFactory) New(url string) dataconnector.DataPinger {
return NewHTTPDataPinger(url)
}

func NewHTTPDataPinger(url string) HTTPDataPinger {
return HTTPDataPinger{url}
}

type HTTPDataPinger struct {
url string
}

func (pdp HTTPDataPinger) Ping() *dataconnector.Error {
req, err := http.NewRequestWithContext(context.Background(), http.MethodHead, pdp.url, nil)
if err != nil {
return &dataconnector.Error{Description: err.Error()}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return &dataconnector.Error{Description: err.Error()}
}
resp.Body.Close()
return nil
}
116 changes: 116 additions & 0 deletions internal/infra/pull/http_datasource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (C) 2021 CGI France
//
// This file is part of LINO.
//
// LINO is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// LINO is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with LINO. If not, see <http://www.gnu.org/licenses/>.

package pull

import (
"context"
"encoding/json"
"io"
"net/http"
"strings"

"github.com/cgi-fr/lino/pkg/pull"
"github.com/rs/zerolog/log"
)

// HTTPDataSourceFactory exposes methods to create new HTTP pullers.
type HTTPDataSourceFactory struct{}

// NewHTTPDataSourceFactory creates a new HTTP datasource factory.
func NewHTTPDataSourceFactory() *HTTPDataSourceFactory {
return &HTTPDataSourceFactory{}
}

// New return a HTTP puller
func (e *HTTPDataSourceFactory) New(url string, schema string) pull.DataSource {
return &HTTPDataSource{
url: url,
schema: schema,
}
}

// HTTPDataSource to read in the pull process.
type HTTPDataSource struct {
url string
schema string
result io.ReadCloser
}

// Open a connection to the HTTP DB
func (ds *HTTPDataSource) Open() *pull.Error {
return nil
}

// RowReader iterate over rows in table with filter
func (ds *HTTPDataSource) RowReader(source pull.Table, filter pull.Filter) (pull.RowReader, *pull.Error) {
b, err := json.Marshal(struct {
Values pull.Row `json:"values"`
Limit uint `json:"limit"`
Where string `json:"where"`
}{
Values: filter.Values(),
Limit: filter.Limit(),
Where: filter.Where(),
})
if err != nil {
return nil, &pull.Error{Description: err.Error()}
}
reqbody := strings.NewReader(string(b))

url := ds.url + "/data/" + source.Name()
if len(ds.schema) > 0 {
url = url + "?schema=" + ds.schema
}

log.Debug().RawJSON("body", b).Str("url", url).Msg("External connector request")

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, reqbody)
if err != nil {
return nil, &pull.Error{Description: err.Error()}
}
req.Header.Add("Content-Type", "application/json")

if pcols := source.Columns(); pcols != nil && pcols.Len() > 0 {
pcolsList := []string{}
for idx := uint(0); idx < pcols.Len(); idx++ {
pcolsList = append(pcolsList, pcols.Column(idx).Name())
}
b, err = json.Marshal(pcolsList)
if err != nil {
return nil, &pull.Error{Description: err.Error()}
}
pcolsJSON := string(b)
req.Header.Add("Select-Columns", pcolsJSON)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, &pull.Error{Description: err.Error()}
}
ds.result = resp.Body

return NewJSONRowReader(resp.Body), nil
}

// Close a connection to the HTTP DB
func (ds *HTTPDataSource) Close() *pull.Error {
if ds.result != nil {
ds.result.Close()
}
return nil
}
15 changes: 14 additions & 1 deletion internal/infra/pull/sql_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,20 @@ func (ds *SQLDataSource) tableName(source pull.Table) string {
// RowReader iterate over rows in table with filter
func (ds *SQLDataSource) RowReader(source pull.Table, filter pull.Filter) (pull.RowReader, *pull.Error) {
sql := &strings.Builder{}
sql.Write([]byte("SELECT * FROM "))
sql.Write([]byte("SELECT "))

if pcols := source.Columns(); pcols != nil && pcols.Len() > 0 {
for idx := uint(0); idx < pcols.Len(); idx++ {
if idx > 0 {
sql.Write([]byte(","))
}
sql.Write([]byte(pcols.Column(idx).Name()))
}
} else {
sql.Write([]byte("*"))
}

sql.Write([]byte(" FROM "))
sql.Write([]byte(ds.tableName(source)))
sql.Write([]byte(" WHERE "))

Expand Down
Loading

0 comments on commit 61e4af1

Please sign in to comment.