diff --git a/README.md b/README.md index a31626f..6c5cd4b 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@

go version -version
+version
version version version @@ -13,6 +13,16 @@ Fast message broker implemented with Golang programming language.
Using no external libraries, just internal Golang libraries. +## Guide +- [Install Stallion](#how-to-use) +- [Setup Stallion Server](#create-server-in-golang) +- [Using Docker](#create-a-server-with-docker) +- [Stallion Go SDK](#creating-clients) + - [Subscribe](#subscribe-on-a-topic) + - [Publish](#publish-over-a-topic) + - [Unsubscribe](#unsubscribe-from-a-topic) +- [Auth](#creating-a-server-with-auth) + ## How to use? Get package: ```shell @@ -76,3 +86,25 @@ client.Publish("topic", []byte("Hello")) client.Unsubscribe("topic") ``` +## Creating a server with Auth +You can create a Stallion server with username and password for Auth. +```go +package main + +import "github.com/official-stallion/stallion" + +func main() { + if err := stallion.NewServer(":9090", "root", "Pa$$word"); err != nil { + panic(err) + } +} +``` + +Now you can connect with username and password set in url. +```go +client, err := stallion.NewClient("st://root:Pa$$word@localhost:9090") +if err != nil { + panic(err) +} +``` + diff --git a/client.go b/client.go index b3af43d..83dab22 100644 --- a/client.go +++ b/client.go @@ -32,7 +32,10 @@ func NewClient(uri string) (Client, error) { return nil, fmt.Errorf("failed to connect to server: %w", err) } - client := internal.NewClient(conn) + client, err := internal.NewClient(conn, url.auth) + if err != nil { + return nil, fmt.Errorf("failed to connect to server: %w", err) + } return client, nil } diff --git a/example/client/auth-client/main.go b/example/client/auth-client/main.go new file mode 100644 index 0000000..97fbc0a --- /dev/null +++ b/example/client/auth-client/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "fmt" + "time" + + "github.com/official-stallion/stallion" +) + +func main() { + client, err := stallion.NewClient("st://root:Pa$$word@localhost:9090") + if err != nil { + panic(err) + } + + client.Subscribe("topic", func(data []byte) { + fmt.Println(string(data)) + }) + + client.Publish("topic", []byte("Hello")) + + time.Sleep(3 * time.Second) +} diff --git a/example/server/auth-server/main.go b/example/server/auth-server/main.go new file mode 100644 index 0000000..95e142f --- /dev/null +++ b/example/server/auth-server/main.go @@ -0,0 +1,9 @@ +package main + +import "github.com/official-stallion/stallion" + +func main() { + if err := stallion.NewServer(":9090", "root", "Pa$$word"); err != nil { + panic(err) + } +} diff --git a/example/server/main.go b/example/server/simple-server/main.go similarity index 100% rename from example/server/main.go rename to example/server/simple-server/main.go diff --git a/internal/client.go b/internal/client.go index 1e673df..c825a6c 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1,6 +1,7 @@ package internal import ( + "fmt" "net" "time" ) @@ -28,7 +29,7 @@ type client struct { } // NewClient creates a new client handler. -func NewClient(conn net.Conn) *client { +func NewClient(conn net.Conn, auth string) (*client, error) { c := &client{ topics: make(map[string]MessageHandler), communicateChannel: make(chan message), @@ -38,13 +39,18 @@ func NewClient(conn net.Conn) *client { }, } + // send the ping message + if err := c.ping([]byte(auth)); err != nil { + return nil, fmt.Errorf("failed to create client: %w", err) + } + // starting data reader go c.readDataFromServer() // start listening on channels go c.listen() - return c + return c, nil } // readDataFromServer gets all data from server. @@ -98,6 +104,38 @@ func (c *client) close() { _ = c.network.connection.Close() } +// send a ping message to stallion server. +func (c *client) ping(data []byte) error { + // sending ping data as a message + if err := c.network.send(encodeMessage(newMessage(PingMessage, "", data))); err != nil { + return fmt.Errorf("failed to ping server: %w", err) + } + + // creating a buffer + var buffer = make([]byte, 2048) + + // read data from network + tmp, er := c.network.get(buffer) + if er != nil { + return fmt.Errorf("server failed to pong: %w", er) + } + + // check for response + response, err := decodeMessage(tmp) + if err != nil { + return fmt.Errorf("decode message failed") + } + + switch response.Type { + case PongMessage: + return nil + case Imposter: + return fmt.Errorf("unauthorized user") + default: + return fmt.Errorf("connection failed") + } +} + // Publish will send a message to broker server. func (c *client) Publish(topic string, data []byte) error { err := c.network.send(encodeMessage(newMessage(Text, topic, data))) diff --git a/internal/enum.go b/internal/enum.go index 91a38cd..a77e62a 100644 --- a/internal/enum.go +++ b/internal/enum.go @@ -5,4 +5,7 @@ const ( Text int = iota + 1 // normal message Subscribe // subscribe message Unsubscribe // unsubscribe message + PingMessage // ping message + PongMessage // pong message + Imposter // unauthorized user message ) diff --git a/internal/server.go b/internal/server.go index 47db721..7d66584 100644 --- a/internal/server.go +++ b/internal/server.go @@ -7,13 +7,18 @@ import ( // server is our broker service. type server struct { + user string + pass string + prefix int broker *broker } // NewServer returns a new broker server. -func NewServer() *server { +func NewServer(user string, pass string) *server { s := &server{ + user: user, + pass: pass, prefix: 101, } @@ -33,6 +38,8 @@ func NewServer() *server { func (s *server) Handle(conn net.Conn) { w := newWorker( s.prefix, + s.user, + s.pass, conn, make(chan message), s.broker.receiveChannel, diff --git a/internal/types.go b/internal/types.go index 9c3d772..7dfaf08 100644 --- a/internal/types.go +++ b/internal/types.go @@ -3,20 +3,20 @@ package internal // MessageHandler is a handler for messages that come from subscribing. type MessageHandler func([]byte) -// WorkerChannel is worker channel with its id. +// workerChannel is worker channel with its id. type workerChannel struct { id int channel chan message } -// SubscribeChannel is for subscribe data channel. +// subscribeChannel is for subscribe data channel. type subscribeChannel struct { id int topic string channel chan message } -// UnsubscribeChannel is for unsubscribe data channel. +// unsubscribeChannel is for unsubscribe data channel. type unsubscribeChannel struct { id int topic string diff --git a/internal/worker.go b/internal/worker.go index 00d1e93..08a7569 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -1,7 +1,9 @@ package internal import ( + "fmt" "net" + "strings" "time" ) @@ -11,6 +13,12 @@ type worker struct { // each worker has its unique id id int + // authentication fields + // user of stallion client + user string + // pass of stallion client + pass string + // for network socket handling network network @@ -33,6 +41,8 @@ type worker struct { // newWorker generates a new worker. func newWorker( id int, + user string, + pass string, conn net.Conn, sen, rec chan message, sub chan subscribeChannel, @@ -40,7 +50,9 @@ func newWorker( ter chan int, ) *worker { return &worker{ - id: id, + id: id, + user: user, + pass: pass, network: network{ connection: conn, }, @@ -57,6 +69,15 @@ func (w *worker) start() { // closing channel after we are done defer close(w.sendChannel) + // check the ping pong connection + if err := w.pong(); err != nil { + logError("failed to pong client", err) + + w.terminateChannel <- w.id + + return + } + // start for input data go w.arrival() @@ -68,6 +89,43 @@ func (w *worker) start() { } } +// get client ping message. +func (w *worker) pong() error { + // creating a buffer + var buffer = make([]byte, 2048) + + // read data from network + tmp, er := w.network.get(buffer) + if er != nil { + return fmt.Errorf("client failed to ping: %w", er) + } + + // get user request + request, err := decodeMessage(tmp) + if err != nil { + return fmt.Errorf("decode message failed") + } + + data := strings.Split(string(request.Data), ":") + + // check auth + if w.user == data[0] && w.pass == data[1] { + // send pong response + if e := w.network.send(encodeMessage(newMessage(PongMessage, "", nil))); e != nil { + return fmt.Errorf("failed to pong client: %w", e) + } + + return nil + } + + // return sabotage message + if e := w.network.send(encodeMessage(newMessage(Imposter, "", nil))); e != nil { + return fmt.Errorf("failed to pong client: %w", e) + } + + return fmt.Errorf("un-auth client") +} + // transfer will send a data byte through handler. func (w *worker) transfer(data message) { err := w.network.send(encodeMessage(data)) diff --git a/server.go b/server.go index cca0d6e..1b0e7a2 100644 --- a/server.go +++ b/server.go @@ -13,9 +13,24 @@ type Server interface { } // NewServer creates a new broker server on given port. -func NewServer(port string) error { +func NewServer(port string, auth ...string) error { + // get authentication options + var ( + user string + pass string + ) + + // setting the authentication options + if len(auth) > 1 { + user = auth[0] + pass = auth[1] + } else { + user = " " + pass = " " + } + // creating a new server - serve := internal.NewServer() + serve := internal.NewServer(user, pass) // listen over a port listener, err := net.Listen("tcp", port) diff --git a/url.go b/url.go index a4341d9..dff2648 100644 --- a/url.go +++ b/url.go @@ -11,6 +11,7 @@ import ( // - port type url struct { address string + auth string } // urlUnpack @@ -27,12 +28,34 @@ func urlUnpack(inputUrl string) (*url, error) { return nil, fmt.Errorf("not using stallion protocol (st://...)") } - // exporting the host:port pair. - if len(strings.Split(protocolSplit[1], ":")) < 2 { - return nil, fmt.Errorf("server ip or port is not given") + var ( + address string + auth string + ) + + // exporting the user:pass@host:port with @ + if len(strings.Split(protocolSplit[1], "@")) < 2 { + auth = " : " + + // exporting the host:port pair + if len(strings.Split(protocolSplit[1], ":")) < 2 { + return nil, fmt.Errorf("server ip or port is not given") + } + + address = protocolSplit[1] + } else { + authAndAddress := strings.Split(protocolSplit[1], "@") + + if len(strings.Split(authAndAddress[0], ":")) < 2 { + return nil, fmt.Errorf("auth user or pass is not given") + } + + auth = authAndAddress[0] + address = authAndAddress[1] } return &url{ - address: protocolSplit[1], + address: address, + auth: auth, }, nil }