Sandbox - Docs - Twitter - YouTube
Memphis is a next-generation alternative to traditional message brokers.
A simple, robust, and durable cloud-native message broker wrapped with
an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases.
Memphis enables the building of modern queue-based applications that require
large volumes of streamed and enriched data, modern protocols, zero ops, rapid development,
extreme cost reduction, and a significantly lower amount of dev time for data-oriented developers and data engineers.
$ gradle install memphis-dev
import com.memphis.Memphis;
First, we need to create a connection to the broker using MemphisConnection
.
ClientOptions opts = new ClientOptions.Builder()
.host("<memphis-host>")
.username("<application-username>")
.token("<broker-token>")
.port("<port>") // defaults to 6666
.reconnect(true) // defaults to true
.maxReconnect(3) // defaults to 3
.reconnectInterval(1500) // defaults to 1500 ms
.timeOut(1500) // defaults to 1500 ms
.keyFile("<key-client.pem>") // key_file, for TLS connection
.certFile("<cert-client.pem>") // cert_file, for TLS connection
.caFile("<rootCA.pem>") // ca_file, for TLS connection
.build();
MemphisConnection memphisConnection = MemphisConnection(opts);
Once connected, the entire functionalities offered by Memphis are available.
To disconnect from Memphis, call close()
on the memphis object.
memphisConnection.close();
If a station already exists nothing happens, the new configuration will not be applied
Station station = memphisConnection.createStation(
"<station-name>",
"<schema-name>",
Retention.MAX_MESSAGE_AGE_SECONDS, // MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES. Defaults to MAX_MESSAGE_AGE_SECONDS
604800, // defaults to 604800
Storage.DISK, // Storage.DISK/Storage.MEMORY. Defaults to DISK
1, // defaults to 1
120000, // defaults to 2 minutes
true, // defaults to true
true, // defaults to true
false // defaults to false
).get();
Memphis currently supports the following types of retention:
RetentionTypes.MAX_MESSAGE_AGE_SECONDS
Means that every message persists for the value set in retention value field (in seconds)
RetentionTypes.MESSAGES
Means that after max amount of saved messages (set in retention value), the oldest messages will be deleted
RetentionTypes.BYTES
Means that after max amount of saved bytes (set in retention value), the oldest messages will be deleted
The retention values
are directly related to the retention types
mentioned above, where the values vary according to the type of retention chosen.
All retention values are of type Integer
but with different representations as follows:
RetentionTypes.MAX_MESSAGE_AGE_SECONDS
is represented in seconds, RetentionTypes.MESSAGES
in a number of messages and finally RetentionTypes.BYTES
in a number of bytes.
After these limits are reached oldest messages will be deleted.
Memphis currently supports the following types of messages storage:
StorageTypes.DISK
Means that messages persist on disk
StorageTypes.MEMORY
Means that messages persist on the main memory
Destroying a station will remove all its resources (producers/consumers)
station.destroy().get()
memphisConnection.attachSchema("<schema-name>", "<station-name>").get();
memphisConnection.detachSchema("<station-name>").get();
The most common client operations are produce
to send messages and consume
to
receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull based and consume all the messages in a station unless you are using a consumers group, in this case messages are spread across all members in this group.
Memphis messages are payload agnostic. Payloads are byte[]
.
In order to stop getting messages, you have to call consumer.destroy()
. Destroy will terminate regardless
of whether there are messages in flight for the client.
ProducerOptions pOpts = new ProducerOptions.Builder()
.stationName("<station-name>")
.producerName("<producer-name>")
.build();
MemphisProducer producer = memphisConnection.createProducer(pOpts);
producer.produce(byte[] message);
This method will add the message to an internal queue and return. If the queue is full, this method will block until the queue has been drained some.
producer.produceNonblocking(byte[] message);
producer.stop();
The asynchronous consumer executes a provided function on each batch of messages received in a background thread.
ConsumerOptions opts = new ConsumerOptions.Builder()
.consumerName("test-runner")
.stationName("example-station")
.build();
MemphisAsyncConsumer consumer = memphisConnection.createAsyncConsumer(
opts,
messages -> {
for(MemphisMessage msg : messages) {
System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
msg.ack();
}
});
);
consumer.start();
consumer.stop();
The synchronous consumer checks for messages when its fetch()
method is called.
The call blocks until messages are available or the wait timeout has been exceeded.
ConsumerOptions opts = new ConsumerOptions.Builder()
.consumerName("test-runner")
.stationName("example-station")
.build();
MemphisSyncConsumer consumer = connection.createSyncConsumer(opts);
var messages = consumer.fetch();
for(var msg : messages) {
msg.ack();
}
memphisConnection.isConnected();