-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
libp2p + HTTP an exploratory refactor #102
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,16 +15,23 @@ import ( | |
"github.com/ipld/go-ipld-prime" | ||
"github.com/ipld/go-ipld-prime/codec/dagjson" | ||
"github.com/ipld/go-ipld-prime/datamodel" | ||
"github.com/ipld/go-ipld-prime/fluent" | ||
"github.com/ipld/go-ipld-prime/linking" | ||
cidlink "github.com/ipld/go-ipld-prime/linking/cid" | ||
"github.com/ipld/go-ipld-prime/node/basicnode" | ||
"github.com/ipld/go-ipld-prime/storage/memstore" | ||
"github.com/ipld/go-ipld-prime/traversal" | ||
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" | ||
"github.com/ipni/go-libipni/announce" | ||
"github.com/ipni/go-libipni/announce/message" | ||
"github.com/ipni/go-libipni/dagsync/httpsync" | ||
"github.com/libp2p/go-libp2p" | ||
"github.com/libp2p/go-libp2p/core/crypto" | ||
"github.com/libp2p/go-libp2p/core/peer" | ||
"github.com/libp2p/go-libp2p/core/protocol" | ||
libp2phttp "github.com/libp2p/go-libp2p/p2p/http" | ||
"github.com/multiformats/go-multiaddr" | ||
"github.com/multiformats/go-multicodec" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
|
@@ -96,6 +103,129 @@ func TestNewPublisherForListener(t *testing.T) { | |
} | ||
} | ||
|
||
func TestPublisherWithLibp2pHTTP(t *testing.T) { | ||
ctx := context.Background() | ||
req := require.New(t) | ||
|
||
publisherStore := &correctedMemStore{&memstore.Store{ | ||
Bag: make(map[string][]byte), | ||
}} | ||
publisherLsys := cidlink.DefaultLinkSystem() | ||
publisherLsys.TrustedStorage = true | ||
publisherLsys.SetReadStorage(publisherStore) | ||
publisherLsys.SetWriteStorage(publisherStore) | ||
|
||
privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader) | ||
req.NoError(err) | ||
|
||
publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey) | ||
req.NoError(err) | ||
|
||
publisherStreamHost, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) | ||
req.NoError(err) | ||
|
||
publisherHost, err := libp2phttp.New( | ||
libp2phttp.StreamHost(publisherStreamHost), | ||
libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}), | ||
) | ||
req.NoError(err) | ||
|
||
go publisherHost.Serve() | ||
defer publisherHost.Close() | ||
|
||
protoID := protocol.ID("/ipni-sync/1") | ||
|
||
serverStreamMa := publisherHost.Addrs()[0] | ||
serverHTTPMa := publisherHost.Addrs()[1] | ||
req.Contains(serverHTTPMa.String(), "/http") | ||
|
||
publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is where we attach our request handler. Note that we are mounting the "/ipni-sync/1" protocol at In this case we also want out HTTP handler to not even know about the prefix, so we use the stdlib |
||
|
||
link, err := publisherLsys.Store( | ||
ipld.LinkContext{Ctx: ctx}, | ||
cidlink.LinkPrototype{ | ||
Prefix: cid.Prefix{ | ||
Version: 1, | ||
Codec: uint64(multicodec.DagJson), | ||
MhType: uint64(multicodec.Sha2_256), | ||
MhLength: -1, | ||
}, | ||
}, | ||
fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { | ||
na.AssembleEntry("fish").AssignString("lobster") | ||
na.AssembleEntry("fish1").AssignString("lobster1") | ||
na.AssembleEntry("fish2").AssignString("lobster2") | ||
na.AssembleEntry("fish0").AssignString("lobster0") | ||
})) | ||
req.NoError(err) | ||
publisher.SetRoot(link.(cidlink.Link).Cid) | ||
|
||
testCases := []struct { | ||
name string | ||
newClient func(t *testing.T) *http.Client | ||
}{ | ||
{"HTTP transport", func(t *testing.T) *http.Client { | ||
clientHost, err := libp2phttp.New() | ||
req.NoError(err) | ||
|
||
c, err := clientHost.NamespacedClient(nil, protoID, peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This creates an http.Client that automatically knows about the prefix for a given protocol ID. |
||
req.NoError(err) | ||
return &c | ||
}}, | ||
{"libp2p stream transport", func(t *testing.T) *http.Client { | ||
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs) | ||
req.NoError(err) | ||
clientHost, err := libp2phttp.New() | ||
req.NoError(err) | ||
|
||
c, err := clientHost.NamespacedClient(clientStreamHost, protoID, peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same thing as above but it is going over a libp2p stream. Note: I think I'll change the |
||
req.NoError(err) | ||
|
||
wk, err := clientHost.GetAndStorePeerProtoMap(c.Transport, publisherStreamHost.ID()) | ||
req.NoError(err) | ||
// Assert we see the ipni protocol in the well known map | ||
req.Contains(wk, protoID) | ||
|
||
return &c | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing special here. Just plumbing to set up the test. |
||
clientStore := &correctedMemStore{&memstore.Store{ | ||
Bag: make(map[string][]byte), | ||
}} | ||
clientLsys := cidlink.DefaultLinkSystem() | ||
clientLsys.TrustedStorage = true | ||
clientLsys.SetReadStorage(clientStore) | ||
clientLsys.SetWriteStorage(clientStore) | ||
clientSync := httpsync.NewSync(clientLsys, tc.newClient(t), nil) | ||
|
||
clientSyncer, err := clientSync.NewSyncerWithoutAddrs(publisher.ID()) | ||
req.NoError(err) | ||
|
||
headCid, err := clientSyncer.GetHead(ctx) | ||
req.NoError(err) | ||
|
||
req.Equal(link.(cidlink.Link).Cid, headCid) | ||
|
||
clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint) | ||
require.NoError(t, err) | ||
|
||
// Assert that data is loadable from the link system. | ||
wantLink := cidlink.Link{Cid: headCid} | ||
node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any) | ||
require.NoError(t, err) | ||
|
||
// Assert synced node link matches the computed link, i.e. is spec-compliant. | ||
gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node) | ||
require.NoError(t, err) | ||
require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String()) | ||
}) | ||
} | ||
} | ||
|
||
func mapKeys(t *testing.T, n ipld.Node) []string { | ||
var keys []string | ||
require.Equal(t, n.Kind(), datamodel.Kind_Map) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,15 @@ func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Sync | |
}, nil | ||
} | ||
|
||
func (s *Sync) NewSyncerWithoutAddrs(peerID peer.ID) (*Syncer, error) { | ||
return &Syncer{ | ||
peerID: peerID, | ||
rootURL: url.URL{Path: "/"}, | ||
urls: nil, | ||
sync: s, | ||
}, nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A I think instead of Will it be necessary to close each There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
👍 . I thought about this change, but wanted to make a less invasive change to demo. I like it though.
Happens automatically :) (just like the stock http.Client does it) |
||
|
||
func (s *Sync) Close() { | ||
s.client.CloseIdleConnections() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the "HTTP Host". It's like the libp2p "stream host" (aka core host.Host), but it uses HTTP semantics instead of stream semantics.
You can pass in options on creation like a stream host to do HTTP over libp2p streams, and multiaddrs to create listeners on.