From 6aaaf7a62da982c928d5c8e6b9d1793cbff08262 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 10:52:05 +0330 Subject: [PATCH 01/17] feat: ping pong --- internal/client.go | 5 +++++ internal/server.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/internal/client.go b/internal/client.go index 1e673df..ceecb9a 100644 --- a/internal/client.go +++ b/internal/client.go @@ -98,6 +98,11 @@ func (c *client) close() { _ = c.network.connection.Close() } +// send a ping message to stallion server. +func (c *client) ping() error { + return nil +} + // Publish will send a message to broker server. func (c *client) Publish(topic string, data []byte) error { err := c.network.send(encodeMessage(newMessage(Text, topic, data))) diff --git a/internal/server.go b/internal/server.go index 47db721..79ab9d3 100644 --- a/internal/server.go +++ b/internal/server.go @@ -29,6 +29,11 @@ func NewServer() *server { return s } +// get client ping message. +func (s *server) pong() error { + return nil +} + // Handle will handle the clients. func (s *server) Handle(conn net.Conn) { w := newWorker( From 193454b7b3b6b5428af46de6b301e6fd875ad138 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:20:43 +0330 Subject: [PATCH 02/17] add: ping message --- internal/types.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/types.go b/internal/types.go index 9c3d772..54dcca5 100644 --- a/internal/types.go +++ b/internal/types.go @@ -3,21 +3,28 @@ package internal // MessageHandler is a handler for messages that come from subscribing. type MessageHandler func([]byte) -// WorkerChannel is worker channel with its id. +// workerChannel is worker channel with its id. type workerChannel struct { id int channel chan message } -// SubscribeChannel is for subscribe data channel. +// subscribeChannel is for subscribe data channel. type subscribeChannel struct { id int topic string channel chan message } -// UnsubscribeChannel is for unsubscribe data channel. +// unsubscribeChannel is for unsubscribe data channel. type unsubscribeChannel struct { id int topic string } + +// pingMessage is the first message that is being sent +// to stallion server. +type pingMessage struct { + username string + password string +} From b9cd601c2fd87ee6aa5edfeb4cce8716aa84d519 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:28:32 +0330 Subject: [PATCH 03/17] implement: ping method --- internal/client.go | 29 ++++++++++++++++++++++++++++- internal/enum.go | 3 +++ internal/types.go | 7 ------- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/internal/client.go b/internal/client.go index ceecb9a..c198849 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1,6 +1,7 @@ package internal import ( + "fmt" "net" "time" ) @@ -100,7 +101,33 @@ func (c *client) close() { // send a ping message to stallion server. func (c *client) ping() error { - return nil + // creating ping data + data := []byte("username:password") + + // sending ping data as a message + if err := c.network.send(encodeMessage(newMessage(PingMessage, "", data))); err != nil { + return fmt.Errorf("failed to ping server: %w", err) + } + + // creating a buffer + var buffer = make([]byte, 2048) + + // read data from network + tmp, er := c.network.get(buffer) + if er != nil { + return fmt.Errorf("server failed to pong: %w", er) + } + + // check for response + response, _ := decodeMessage(tmp) + switch response.Type { + case PongMessage: + return nil + case Imposter: + return fmt.Errorf("unauthorized user") + default: + return fmt.Errorf("connection failed") + } } // Publish will send a message to broker server. diff --git a/internal/enum.go b/internal/enum.go index 91a38cd..a77e62a 100644 --- a/internal/enum.go +++ b/internal/enum.go @@ -5,4 +5,7 @@ const ( Text int = iota + 1 // normal message Subscribe // subscribe message Unsubscribe // unsubscribe message + PingMessage // ping message + PongMessage // pong message + Imposter // unauthorized user message ) diff --git a/internal/types.go b/internal/types.go index 54dcca5..7dfaf08 100644 --- a/internal/types.go +++ b/internal/types.go @@ -21,10 +21,3 @@ type unsubscribeChannel struct { id int topic string } - -// pingMessage is the first message that is being sent -// to stallion server. -type pingMessage struct { - username string - password string -} From 1aa09960f6ff02de0eb6ac1c67ec738ab37bb852 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:29:32 +0330 Subject: [PATCH 04/17] update: client ping --- internal/client.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/client.go b/internal/client.go index c198849..ea4df0d 100644 --- a/internal/client.go +++ b/internal/client.go @@ -100,10 +100,7 @@ func (c *client) close() { } // send a ping message to stallion server. -func (c *client) ping() error { - // creating ping data - data := []byte("username:password") - +func (c *client) ping(data []byte) error { // sending ping data as a message if err := c.network.send(encodeMessage(newMessage(PingMessage, "", data))); err != nil { return fmt.Errorf("failed to ping server: %w", err) From 6353a060c831affe183c6643038a93488344878c Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:34:34 +0330 Subject: [PATCH 05/17] implement: pong method --- internal/server.go | 5 ----- internal/worker.go | 25 +++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/server.go b/internal/server.go index 79ab9d3..47db721 100644 --- a/internal/server.go +++ b/internal/server.go @@ -29,11 +29,6 @@ func NewServer() *server { return s } -// get client ping message. -func (s *server) pong() error { - return nil -} - // Handle will handle the clients. func (s *server) Handle(conn net.Conn) { w := newWorker( diff --git a/internal/worker.go b/internal/worker.go index 00d1e93..a80019a 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -1,6 +1,7 @@ package internal import ( + "fmt" "net" "time" ) @@ -68,6 +69,30 @@ func (w *worker) start() { } } +// get client ping message. +func (w *worker) pong() error { + // creating a buffer + var buffer = make([]byte, 2048) + + // read data from network + tmp, er := w.network.get(buffer) + if er != nil { + return fmt.Errorf("client failed to ping: %w", er) + } + + // get user request + _, _ = decodeMessage(tmp) + + // todo: check username and password + + // send pong response + if err := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); err != nil { + return fmt.Errorf("failed to pong client: %w", err) + } + + return nil +} + // transfer will send a data byte through handler. func (w *worker) transfer(data message) { err := w.network.send(encodeMessage(data)) From 20b0ea7559ecce8e4d75eff7357d6d004dd3eb40 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:40:37 +0330 Subject: [PATCH 06/17] update: pong message --- internal/worker.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/internal/worker.go b/internal/worker.go index a80019a..5758397 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -3,6 +3,7 @@ package internal import ( "fmt" "net" + "strings" "time" ) @@ -12,6 +13,12 @@ type worker struct { // each worker has its unique id id int + // authentication fields + // user of stallion client + user string + // pass of stallion client + pass string + // for network socket handling network network @@ -34,6 +41,8 @@ type worker struct { // newWorker generates a new worker. func newWorker( id int, + user string, + pass string, conn net.Conn, sen, rec chan message, sub chan subscribeChannel, @@ -41,7 +50,9 @@ func newWorker( ter chan int, ) *worker { return &worker{ - id: id, + id: id, + user: user, + pass: pass, network: network{ connection: conn, }, @@ -81,12 +92,19 @@ func (w *worker) pong() error { } // get user request - _, _ = decodeMessage(tmp) - - // todo: check username and password + request, _ := decodeMessage(tmp) + data := strings.Split(string(request.Data), ":") + + // check auth + if w.user == data[0] && w.pass == data[1] { + // send pong response + if err := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); err != nil { + return fmt.Errorf("failed to pong client: %w", err) + } + } - // send pong response - if err := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); err != nil { + // return sabotage message + if err := w.network.send(encodeMessage(newMessage(Imposter, "", nil))); err != nil { return fmt.Errorf("failed to pong client: %w", err) } From 0068f805f8d3bb548684d9f557492e11d66cca1d Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:44:12 +0330 Subject: [PATCH 07/17] add: server user and pass --- internal/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/server.go b/internal/server.go index 47db721..6e5d375 100644 --- a/internal/server.go +++ b/internal/server.go @@ -33,6 +33,8 @@ func NewServer() *server { func (s *server) Handle(conn net.Conn) { w := newWorker( s.prefix, + "", + "", conn, make(chan message), s.broker.receiveChannel, From e4c4c7e88898a6dc26d40334dc667ecf700900ab Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:47:23 +0330 Subject: [PATCH 08/17] add: server auth options in new server method --- internal/server.go | 11 ++++++++--- server.go | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/internal/server.go b/internal/server.go index 6e5d375..7d66584 100644 --- a/internal/server.go +++ b/internal/server.go @@ -7,13 +7,18 @@ import ( // server is our broker service. type server struct { + user string + pass string + prefix int broker *broker } // NewServer returns a new broker server. -func NewServer() *server { +func NewServer(user string, pass string) *server { s := &server{ + user: user, + pass: pass, prefix: 101, } @@ -33,8 +38,8 @@ func NewServer() *server { func (s *server) Handle(conn net.Conn) { w := newWorker( s.prefix, - "", - "", + s.user, + s.pass, conn, make(chan message), s.broker.receiveChannel, diff --git a/server.go b/server.go index cca0d6e..0bf4d08 100644 --- a/server.go +++ b/server.go @@ -13,9 +13,21 @@ type Server interface { } // NewServer creates a new broker server on given port. -func NewServer(port string) error { +func NewServer(port string, auth ...string) error { + // get authentication options + var ( + user string + pass string + ) + + // setting the authentication options + if len(auth) > 1 { + user = auth[0] + pass = auth[1] + } + // creating a new server - serve := internal.NewServer() + serve := internal.NewServer(user, pass) // listen over a port listener, err := net.Listen("tcp", port) From b52d47d4280bfe42f68330accd09f4e544362577 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:50:11 +0330 Subject: [PATCH 09/17] update: worker start with pong --- internal/worker.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/worker.go b/internal/worker.go index 5758397..afc7fdc 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -69,6 +69,13 @@ func (w *worker) start() { // closing channel after we are done defer close(w.sendChannel) + // check the ping pong connection + if err := w.pong(); err != nil { + w.terminateChannel <- w.id + + return + } + // start for input data go w.arrival() From 1a10205a9707d9cdb98b5b5b00d30780583d014e Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:51:59 +0330 Subject: [PATCH 10/17] update: client with ping method --- internal/client.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/client.go b/internal/client.go index ea4df0d..a04c226 100644 --- a/internal/client.go +++ b/internal/client.go @@ -29,7 +29,7 @@ type client struct { } // NewClient creates a new client handler. -func NewClient(conn net.Conn) *client { +func NewClient(conn net.Conn) (*client, error) { c := &client{ topics: make(map[string]MessageHandler), communicateChannel: make(chan message), @@ -39,13 +39,18 @@ func NewClient(conn net.Conn) *client { }, } + // send the ping message + if err := c.ping([]byte("")); err != nil { + return nil, fmt.Errorf("failed to create client: %w", err) + } + // starting data reader go c.readDataFromServer() // start listening on channels go c.listen() - return c + return c, nil } // readDataFromServer gets all data from server. From 53d27227aa78967d1e150f4fc6db8dd0d66cd556 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:54:07 +0330 Subject: [PATCH 11/17] add: client error handling --- client.go | 5 ++++- internal/client.go | 4 ++-- url.go | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index b3af43d..83dab22 100644 --- a/client.go +++ b/client.go @@ -32,7 +32,10 @@ func NewClient(uri string) (Client, error) { return nil, fmt.Errorf("failed to connect to server: %w", err) } - client := internal.NewClient(conn) + client, err := internal.NewClient(conn, url.auth) + if err != nil { + return nil, fmt.Errorf("failed to connect to server: %w", err) + } return client, nil } diff --git a/internal/client.go b/internal/client.go index a04c226..1a64125 100644 --- a/internal/client.go +++ b/internal/client.go @@ -29,7 +29,7 @@ type client struct { } // NewClient creates a new client handler. -func NewClient(conn net.Conn) (*client, error) { +func NewClient(conn net.Conn, auth string) (*client, error) { c := &client{ topics: make(map[string]MessageHandler), communicateChannel: make(chan message), @@ -40,7 +40,7 @@ func NewClient(conn net.Conn) (*client, error) { } // send the ping message - if err := c.ping([]byte("")); err != nil { + if err := c.ping([]byte(auth)); err != nil { return nil, fmt.Errorf("failed to create client: %w", err) } diff --git a/url.go b/url.go index a4341d9..68a46f1 100644 --- a/url.go +++ b/url.go @@ -11,6 +11,7 @@ import ( // - port type url struct { address string + auth string } // urlUnpack From c2d8f156819564bf1becb38bffc4d2ea8c70d8e2 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 13:59:56 +0330 Subject: [PATCH 12/17] update: client url unpacking method --- server.go | 3 +++ url.go | 30 ++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index 0bf4d08..1b0e7a2 100644 --- a/server.go +++ b/server.go @@ -24,6 +24,9 @@ func NewServer(port string, auth ...string) error { if len(auth) > 1 { user = auth[0] pass = auth[1] + } else { + user = " " + pass = " " } // creating a new server diff --git a/url.go b/url.go index 68a46f1..dff2648 100644 --- a/url.go +++ b/url.go @@ -28,12 +28,34 @@ func urlUnpack(inputUrl string) (*url, error) { return nil, fmt.Errorf("not using stallion protocol (st://...)") } - // exporting the host:port pair. - if len(strings.Split(protocolSplit[1], ":")) < 2 { - return nil, fmt.Errorf("server ip or port is not given") + var ( + address string + auth string + ) + + // exporting the user:pass@host:port with @ + if len(strings.Split(protocolSplit[1], "@")) < 2 { + auth = " : " + + // exporting the host:port pair + if len(strings.Split(protocolSplit[1], ":")) < 2 { + return nil, fmt.Errorf("server ip or port is not given") + } + + address = protocolSplit[1] + } else { + authAndAddress := strings.Split(protocolSplit[1], "@") + + if len(strings.Split(authAndAddress[0], ":")) < 2 { + return nil, fmt.Errorf("auth user or pass is not given") + } + + auth = authAndAddress[0] + address = authAndAddress[1] } return &url{ - address: protocolSplit[1], + address: address, + auth: auth, }, nil } From af467f82d1e3a90076ccf25829cca87d394fe637 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 14:01:43 +0330 Subject: [PATCH 13/17] new: auth server example --- example/server/auth-server/main.go | 9 +++++++++ example/server/{ => simple-server}/main.go | 0 2 files changed, 9 insertions(+) create mode 100644 example/server/auth-server/main.go rename example/server/{ => simple-server}/main.go (100%) diff --git a/example/server/auth-server/main.go b/example/server/auth-server/main.go new file mode 100644 index 0000000..95e142f --- /dev/null +++ b/example/server/auth-server/main.go @@ -0,0 +1,9 @@ +package main + +import "github.com/official-stallion/stallion" + +func main() { + if err := stallion.NewServer(":9090", "root", "Pa$$word"); err != nil { + panic(err) + } +} diff --git a/example/server/main.go b/example/server/simple-server/main.go similarity index 100% rename from example/server/main.go rename to example/server/simple-server/main.go From b3cf7343c3df0dcdefc5c1d357ca4e3ca019a27f Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 14:02:32 +0330 Subject: [PATCH 14/17] update: client auth example --- example/client/auth-client/main.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 example/client/auth-client/main.go diff --git a/example/client/auth-client/main.go b/example/client/auth-client/main.go new file mode 100644 index 0000000..97fbc0a --- /dev/null +++ b/example/client/auth-client/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" + "time" + + "github.com/official-stallion/stallion" +) + +func main() { + client, err := stallion.NewClient("st://root:Pa$$word@localhost:9090") + if err != nil { + panic(err) + } + + client.Subscribe("topic", func(data []byte) { + fmt.Println(string(data)) + }) + + client.Publish("topic", []byte("Hello")) + + time.Sleep(3 * time.Second) +} From 99390c21eec902201aba21330c2750f57cdab04b Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 14:25:02 +0330 Subject: [PATCH 15/17] fix: ping pong bug --- internal/client.go | 6 +++++- internal/worker.go | 20 ++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/internal/client.go b/internal/client.go index 1a64125..c825a6c 100644 --- a/internal/client.go +++ b/internal/client.go @@ -121,7 +121,11 @@ func (c *client) ping(data []byte) error { } // check for response - response, _ := decodeMessage(tmp) + response, err := decodeMessage(tmp) + if err != nil { + return fmt.Errorf("decode message failed") + } + switch response.Type { case PongMessage: return nil diff --git a/internal/worker.go b/internal/worker.go index afc7fdc..08a7569 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -71,6 +71,8 @@ func (w *worker) start() { // check the ping pong connection if err := w.pong(); err != nil { + logError("failed to pong client", err) + w.terminateChannel <- w.id return @@ -99,23 +101,29 @@ func (w *worker) pong() error { } // get user request - request, _ := decodeMessage(tmp) + request, err := decodeMessage(tmp) + if err != nil { + return fmt.Errorf("decode message failed") + } + data := strings.Split(string(request.Data), ":") // check auth if w.user == data[0] && w.pass == data[1] { // send pong response - if err := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); err != nil { - return fmt.Errorf("failed to pong client: %w", err) + if e := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); e != nil { + return fmt.Errorf("failed to pong client: %w", e) } + + return nil } // return sabotage message - if err := w.network.send(encodeMessage(newMessage(Imposter, "", nil))); err != nil { - return fmt.Errorf("failed to pong client: %w", err) + if e := w.network.send(encodeMessage(newMessage(Imposter, "", nil))); e != nil { + return fmt.Errorf("failed to pong client: %w", e) } - return nil + return fmt.Errorf("un-auth client") } // transfer will send a data byte through handler. From e655d64b18da13be1290ccd5d46438aab9eb5b45 Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 14:31:33 +0330 Subject: [PATCH 16/17] update: readme --- README.md | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a31626f..2a84584 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@

go version -version
+version
version version version @@ -76,3 +76,25 @@ client.Publish("topic", []byte("Hello")) client.Unsubscribe("topic") ``` +## Creating a server with Auth +You can create a Stallion server with username and password for Auth. +```go +package main + +import "github.com/official-stallion/stallion" + +func main() { + if err := stallion.NewServer(":9090", "root", "Pa$$word"); err != nil { + panic(err) + } +} +``` + +Now you can connect with username and password set in url. +```go +client, err := stallion.NewClient("st://root:Pa$$word@localhost:9090") +if err != nil { + panic(err) +} +``` + From f3dd6d9283b662692732bad90ab6f3c29b6ce3bb Mon Sep 17 00:00:00 2001 From: amirhnajafiz Date: Tue, 11 Oct 2022 14:35:12 +0330 Subject: [PATCH 17/17] update: readme --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 2a84584..6c5cd4b 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,16 @@ Fast message broker implemented with Golang programming language.
Using no external libraries, just internal Golang libraries. +## Guide +- [Install Stallion](#how-to-use) +- [Setup Stallion Server](#create-server-in-golang) +- [Using Docker](#create-a-server-with-docker) +- [Stallion Go SDK](#creating-clients) + - [Subscribe](#subscribe-on-a-topic) + - [Publish](#publish-over-a-topic) + - [Unsubscribe](#unsubscribe-from-a-topic) +- [Auth](#creating-a-server-with-auth) + ## How to use? Get package: ```shell