Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic sharding #224

Merged
merged 63 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
e4f52df
fix: update routing periodically
marcus-pousette Dec 4, 2023
2230c75
feat: replace emitSelf property with publish event
marcus-pousette Dec 4, 2023
ee590f9
fix: refactor remote block retreival
marcus-pousette Dec 4, 2023
97c682c
fix: add 'from' property in GetOptions
marcus-pousette Dec 4, 2023
9e449e3
fix: correctly handle trim cache reset on log entry removal
marcus-pousette Dec 4, 2023
fd82525
fix: add event object to interface
marcus-pousette Dec 4, 2023
6f8f23a
fix: make signing function append signatures instead of replacing
marcus-pousette Dec 4, 2023
8c080bf
fix: replace emitSelf property with PublishEvent
marcus-pousette Dec 4, 2023
3d2f100
fix: add timestamp on RequestContext
marcus-pousette Dec 4, 2023
87931ca
feat: get store size function
marcus-pousette Dec 4, 2023
0cae4ea
fix: drop remove blocks
marcus-pousette Dec 4, 2023
97ad692
feat!: dynamic sharding and block scopes
marcus-pousette Dec 5, 2023
a5170af
fix: increase timeout
marcus-pousette Dec 5, 2023
77dc3c7
fix: rm redundant emit self property
marcus-pousette Dec 5, 2023
bda2378
feat: add peer id option
marcus-pousette Dec 5, 2023
0c460c3
fix!: make sure pubsub data events a propagating between clients with…
marcus-pousette Dec 5, 2023
fba99b6
fix: update tests to work with the RemoteBlocks abstraction
marcus-pousette Dec 5, 2023
a6a48e9
fix: wait for peer check
marcus-pousette Dec 5, 2023
5c57f9b
fix: rm comment
marcus-pousette Dec 5, 2023
62cee57
fix: add extra delay
marcus-pousette Dec 5, 2023
241d009
fix: disable route updates for redundance message checks
marcus-pousette Dec 5, 2023
6b107e4
fix: remove event listeners first on close
marcus-pousette Dec 5, 2023
110517f
fix: dont load log on updateRole when closed
marcus-pousette Dec 5, 2023
e5d19a3
feat: add countAll method for routes
marcus-pousette Dec 5, 2023
e744096
fix: wait for routes
marcus-pousette Dec 5, 2023
cafe432
fix: allow update role while closed
marcus-pousette Dec 5, 2023
087e38b
fix: prevent route loss on commit on target route
marcus-pousette Dec 5, 2023
3e6976b
fix: collect uniqueAcks by message id
marcus-pousette Dec 5, 2023
daa9a13
fix: test add delay
marcus-pousette Dec 5, 2023
e7ac7c4
fix: set replication factor
marcus-pousette Dec 5, 2023
ab1f8ce
fix: correctly handle ack cache cb
marcus-pousette Dec 5, 2023
1c6cd7a
fix: wait for setup
marcus-pousette Dec 13, 2023
9b366c0
feat!: refactor delivery modes
marcus-pousette Dec 13, 2023
380468e
fix: correctly handle role updates on pruning
marcus-pousette Dec 13, 2023
fef11e9
fix: remove strict option
marcus-pousette Dec 13, 2023
7fe0fef
fix: improve update role tests
marcus-pousette Dec 13, 2023
366bcd9
fix: wait for replicator
marcus-pousette Dec 13, 2023
59bf648
fix: remove strict option
marcus-pousette Dec 13, 2023
4a698f2
fix: update dial test
marcus-pousette Dec 13, 2023
d4cf164
fix: increase seek timeout
marcus-pousette Dec 13, 2023
07c80ee
fix: set seekTimeout
marcus-pousette Dec 13, 2023
410be43
fix: test add delay
marcus-pousette Dec 13, 2023
be01253
fix: update test
marcus-pousette Dec 13, 2023
a7dfe2b
fix: reduce timeout
marcus-pousette Dec 13, 2023
0888f53
fix: don't process messages if closed
marcus-pousette Dec 13, 2023
035d47c
fix: clear healtcheck on reconnect
marcus-pousette Dec 13, 2023
250cbb2
fix: test add delay
marcus-pousette Dec 14, 2023
398105e
fix: wait until timeout for relayed ACKs
marcus-pousette Dec 14, 2023
6275062
fix: dont process messages if not started
marcus-pousette Dec 14, 2023
12a203a
fix: add timeout
marcus-pousette Dec 14, 2023
8a2b69e
fix: force messages to be provessed slowly to ensure topology
marcus-pousette Dec 14, 2023
bb61810
fix: replicator factor 1
marcus-pousette Dec 14, 2023
5407016
fix: waitFor timeout
marcus-pousette Dec 14, 2023
498537a
fix: improve assertion
marcus-pousette Dec 15, 2023
018b344
fix: refactor
marcus-pousette Dec 15, 2023
7c22c83
fix: improve assertion
marcus-pousette Dec 15, 2023
3bf4fec
fix: correctly ignore already seen messages
marcus-pousette Dec 15, 2023
6cbceb4
fix: improve assertions
marcus-pousette Dec 15, 2023
115bc33
fix: balance tests
marcus-pousette Dec 15, 2023
ab72362
fix: improve assertion
marcus-pousette Dec 15, 2023
eae57b5
fix: improve assertion
marcus-pousette Dec 15, 2023
b389cd3
fix: increase timeout
marcus-pousette Dec 15, 2023
4f3df49
fix: rm distribute override
marcus-pousette Dec 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ jobs:
matrix:
node-version: [18.x]
test_cmd:
- yarn playwright install --with-deps && yarn test:node --roots ./packages/clients --w 2 && yarn test:browser
- yarn test:node --roots ./packages/programs ./docs ./packages/log --w 2
- yarn playwright install --with-deps && yarn test:node --roots ./docs ./packages/log ./packages/clients --w 2 && yarn test:browser
- yarn test:node --roots ./packages/programs --w 2
- yarn test:node --roots ./packages/transport ./packages/utils --w 2

if: github.event_name == 'push'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,51 +258,86 @@ describe("index", () => {
await client1.services.pubsub.waitFor(client2.peerId);
});

it("subscribe/unsubscribe", async () => {
await client2.services.pubsub.subscribe("topic");
let msg1 = false;
let msg1b = false;
let msg2 = false;
let msg2b = false;

await client1.services.pubsub.addEventListener("data", () => {
msg1 = true;
describe("publish", () => {
it("multiple hosts", async () => {
await client2.services.pubsub.subscribe("topic");
let msg1 = false;
let msg1b = false;
let msg2 = false;
let msg2b = false;

await client1.services.pubsub.addEventListener("data", () => {
msg1 = true;
});

await client1b.services.pubsub.addEventListener("data", () => {
msg1b = true;
});

await client2.services.pubsub.addEventListener("data", () => {
msg2 = true;
});

await client2b.services.pubsub.addEventListener("data", () => {
msg2b = true;
});

await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))!.length
).toEqual(1)
);
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await waitForResolved(() => expect(msg2).toBeTrue());
expect(msg2b).toBeFalse();
await client2b.services.pubsub.subscribe("topic");
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await waitForResolved(() => expect(msg2b).toBeTrue());

expect(msg1).toBeFalse();
expect(msg1b).toBeFalse();

await client2.services.pubsub.unsubscribe("topic");

msg2 = false;
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await delay(3000);
expect(msg2).toBeFalse();
});

await client1b.services.pubsub.addEventListener("data", () => {
msg1b = true;
});
it("same host", async () => {
let msg1data = false;
let msg1publish = false;
let msg2data = false;
let msg2publish = false;

await client2.services.pubsub.addEventListener("data", () => {
msg2 = true;
});
await client1.services.pubsub.addEventListener("data", () => {
msg1data = true;
});

await client2b.services.pubsub.addEventListener("data", () => {
msg2b = true;
});
await client1.services.pubsub.addEventListener("publish", () => {
msg1publish = true;
});

await client1.services.pubsub.requestSubscribers("topic");
await waitForResolved(async () =>
expect(
(await client1.services.pubsub.getSubscribers("topic"))!.length
).toEqual(1)
);
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await waitForResolved(() => expect(msg2).toBeTrue());
expect(msg2b).toBeFalse();
await client2b.services.pubsub.subscribe("topic");
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await waitForResolved(() => expect(msg2b).toBeTrue());

expect(msg1).toBeFalse();
expect(msg1b).toBeFalse();

await client2.services.pubsub.unsubscribe("topic");

msg2 = false;
await client1.services.pubsub.publish(data, { topics: ["topic"] });
await delay(3000);
expect(msg2).toBeFalse();
await client1b.services.pubsub.addEventListener("data", () => {
msg2data = true;
});

await client1b.services.pubsub.addEventListener("publish", () => {
msg2publish = true;
});

await client1.services.pubsub.subscribe("topic");
await client1b.services.pubsub.subscribe("topic");
await client1.services.pubsub.publish(data, { topics: ["topic"] });

expect(msg1data).toBeFalse();
expect(msg1publish).toBeTrue();
await waitForResolved(() => expect(msg2data).toBeTrue());

expect(msg2publish).toBeTrue(); // TODO expected?
});
});

it("getSubscribers", async () => {
Expand Down Expand Up @@ -339,10 +374,5 @@ describe("index", () => {
await waitForResolved(() => expect(receivedMessages).toHaveLength(1));
expect(receivedMessages[0]).toBeInstanceOf(GetSubscribers);
});

it("emitSelf", () => {
expect(host1.services.pubsub.emitSelf).toBeFalse();
expect(client1.services.pubsub.emitSelf).toBeFalse();
});
});
});
13 changes: 7 additions & 6 deletions packages/clients/peerbit-proxy/interface/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ export class PeerbitProxyClient implements ProgramClient {
> = new Map();
this._services = {
pubsub: {
emitSelf: false, // might be changed to true on connect()
addEventListener: async (type, lister, options) => {
pubsubEventEmitter.addEventListener(type, lister, options);

let subscription = eventListenerSubscribeCounter.get(type);
if (!subscription) {
const emitMessageId = randomBytes(32);
Expand Down Expand Up @@ -344,6 +342,13 @@ export class PeerbitProxyClient implements ProgramClient {
await this.request<memory.MemoryMessage<memory.api.RESP_Open>>(
new memory.MemoryMessage(new memory.api.REQ_Open({ level }))
);
},
size: async () => {
return (
await this.request<memory.MemoryMessage<memory.api.RESP_Size>>(
new memory.MemoryMessage(new memory.api.REQ_Size({ level }))
)
).message.size;
}
};
};
Expand All @@ -367,10 +372,6 @@ export class PeerbitProxyClient implements ProgramClient {
new network.REQ_GetMultiaddrs()
)
).multiaddr;

this.services.pubsub.emitSelf = (
await this.request<pubsub.RESP_EmitSelf>(new pubsub.REQ_EmitSelf())
).value;
}

getMultiaddrs(): Multiaddr[] {
Expand Down
110 changes: 70 additions & 40 deletions packages/clients/peerbit-proxy/interface/src/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { PeerId } from "@libp2p/interface/peer-id";
import { Multiaddr } from "@multiformats/multiaddr";
import { Blocks } from "@peerbit/blocks-interface";
import { Keychain, Ed25519Keypair } from "@peerbit/crypto";
import { DataEvent, PubSub } from "@peerbit/pubsub-interface";
import { DataEvent, PubSub, PublishEvent } from "@peerbit/pubsub-interface";
import { ProgramClient } from "@peerbit/program";
import * as blocks from "./blocks.js";
import * as keychain from "./keychain.js";
Expand All @@ -14,7 +14,7 @@ import { Message } from "./message.js";
import * as network from "./network.js";
import * as pubsub from "./pubsub.js";
import * as connection from "./connection.js";

import { CustomEvent } from "@libp2p/interface/events";
import { serialize, deserialize } from "@dao-xyz/borsh";

const levelKey = (level: string[]) => JSON.stringify(level);
Expand Down Expand Up @@ -43,6 +43,20 @@ export class PeerbitProxyHost implements ProgramClient {
this._pubsubTopicSubscriptions = new Map();
this._memoryIterator = new Map();

const dispatchFunction = this.hostClient.services.pubsub.dispatchEvent.bind(
this.hostClient.services.pubsub
);

// Override pubsub dispatchEvent so that data that is published from one client
// appears in other clients as incoming data messages.
// this allows multiple clients to subscribe to share the same host and
// also have same databases open
this.hostClient.services.pubsub.dispatchEvent = (evt: CustomEvent<any>) => {
if (evt.type === "publish") {
dispatchFunction(new CustomEvent("data", { detail: evt.detail }));
}
return dispatchFunction(evt);
};
this.messages.start();
}

Expand Down Expand Up @@ -308,43 +322,54 @@ export class PeerbitProxyHost implements ProgramClient {
}
let subscription = map.get(message.type);
if (!subscription) {
subscription = {
counter: 1,
fn: async (e) => {
// TODO what if many clients whants the same data, dedup serialization invokations?
if (e.detail instanceof DataEvent) {
const subscriptions = this._pubsubTopicSubscriptions.get(
from.id
);
let found = false;
if (subscriptions) {
for (const topic of e.detail.data.topics) {
found = subscriptions.has(topic);
if (found) {
break;
}
}
}
const cb = async (e: CustomEvent<any>) => {
// TODO what if many clients whants the same data, dedup serialization invokations?
if (
e.detail instanceof PublishEvent &&
e.detail.client === from.id &&
message.type === "data"
) {
// ignore 'publish' events routed to 'data' events if the dispatcher is the same as the receiver
return;
}

if (!found) {
// Ignore this message, since the client is not subscribing to any of the topics
return;
if (
e.detail instanceof DataEvent ||
e.detail instanceof PublishEvent
) {
const subscriptions = this._pubsubTopicSubscriptions.get(from.id);
let found = false;
if (subscriptions) {
for (const topic of e.detail.data.topics) {
found = subscriptions.has(topic);
if (found) {
break;
}
}
}

const request = new pubsub.RESP_EmitEvent(
message.type,
serialize(e.detail)
);
request.messageId = message.emitMessageId; // Same message id so that receiver can subscribe to all events emitted from this listener
await this.messages.send(serialize(request), from.id);
if (!found) {
// Ignore this message, since the client is not subscribing to any of the topics
return;
}
}

const request = new pubsub.RESP_EmitEvent(
message.type,
serialize(e.detail)
);
request.messageId = message.emitMessageId; // Same message id so that receiver can subscribe to all events emitted from this listener
await this.messages.send(serialize(request), from.id);
};
subscription = {
counter: 1,
fn: cb
};

await this.services.pubsub.addEventListener(
message.type,
subscription.fn
);

map.set(message.type, subscription);
} else {
subscription.counter += 1;
Expand Down Expand Up @@ -378,11 +403,22 @@ export class PeerbitProxyHost implements ProgramClient {
message.type,
message.data
);
const dispatched =
await this.services.pubsub.dispatchEvent(customEvent);

/*
if (message.type === 'publish') {
await this.services.pubsub.dispatchEvent(
pubsub.createCustomEventFromType(
'data',
message.data
)
)
}
*/
await this.respond(
message,
new pubsub.RESP_DispatchEvent(
await this.services.pubsub.dispatchEvent(customEvent)
),
new pubsub.RESP_DispatchEvent(dispatched),
from
);
} else if (message instanceof pubsub.REQ_GetSubscribers) {
Expand All @@ -398,9 +434,9 @@ export class PeerbitProxyHost implements ProgramClient {
message,
new pubsub.RESP_Publish(
await this.services.pubsub.publish(message.data, {
strict: message.strict,
to: message.to!,
topics: message.topics!
topics: message.topics!,
client: from.id
})
),
from
Expand Down Expand Up @@ -435,12 +471,6 @@ export class PeerbitProxyHost implements ProgramClient {
),
from
);
} else if (message instanceof pubsub.REQ_EmitSelf) {
await this.respond(
message,
new pubsub.RESP_EmitSelf(this.services.pubsub.emitSelf),
from
);
} else {
throw new Error("Unknown message type: " + message.constructor.name);
}
Expand Down
Loading
Loading