项目作者: mmadfox

项目描述 :
Simple bidirectional stream server
高级语言: Go
项目地址: git://github.com/mmadfox/go-wirenet.git
创建时间: 2020-04-30T05:25:10Z
项目社区:https://github.com/mmadfox/go-wirenet

开源协议:Apache License 2.0

下载


go-wirenet

Simple bidirectional TCP stream server. Useful for NAT traversal.

Coverage Status
Go Documentation
Go Report Card

Design

Design

Client-Server
  1. // client <-> server
  2. client1 join to the server ---------NAT------> server
  3. client2 join to the server ---------NAT------> server
  4. client3 join to the server ---------NAT------> server
  5. client4 join to the server ---------NAT------> server
  6. // call from the server
  7. call client1 from the server ---------NAT------> client1
  8. call client2 from the server ---------NAT------> client2
  9. call client3 from the server ---------NAT------> client3
  10. call client4 from the server ---------NAT------> client4
  11. // call from the client
  12. call server from the client1 ---------NAT------> server
  13. call server from the client2 ---------NAT------> server
  14. call server from the client3 --------NAT------> server
  15. call server from the client4 --------NAT------> server
Client-Hub
  1. // clients <-> hub
  2. client1 join to the hub ---------NAT------> hub
  3. client2 join to the hub ---------NAT------> hub
  4. client3 join to the hub ---------NAT------> hub
  5. client4 join to the hub ---------NAT------> hub
  6. // call from the client
  7. call client2 from the client1 ---------NAT------> client2
  8. call client1 from the client2 ---------NAT------> client1
  9. call client2 from the client3 --------NAT------> client2
  10. call client1 from the client4 --------NAT------> client1

Table of contents

Installation

  1. go get github.com/mediabuyerbot/go-wirenet

Examples

Creating connection

  1. import "github.com/mediabuyerbot/go-wirenet"
  2. // make server side
  3. wire, err := wirenet.Mount(":8989", nil)
  4. if err != nil {
  5. handleError(err)
  6. }
  7. if err := wire.Connect(); err != nil {
  8. handleError(err)
  9. }
  10. // OR make client side
  11. wire, err := wirenet.Join(":8989", nil)
  12. if err != nil {
  13. handleError(err)
  14. }
  15. // connection
  16. if err := wire.Connect(); err != nil {
  17. handleError(err)
  18. }

Stream handling

  1. import "github.com/mediabuyerbot/go-wirenet"
  2. // server side
  3. wire, err := wirenet.Mount(":8989", nil)
  4. if err != nil {
  5. handleError(err)
  6. }
  7. // or client side
  8. wire, err := wirenet.Join(":8989", nil)
  9. if err != nil {
  10. handleError(err)
  11. }
  12. backupStream := func(ctx context.Context, stream wirenet.Stream) {
  13. file, err := os.Open("/backup.log")
  14. ...
  15. // write to stream
  16. n, err := stream.ReadFrom(file)
  17. ...
  18. stream.Close()
  19. }
  20. openChromeStream := func(ctx context.Context, stream wirenet.Stream) {
  21. // read from stream
  22. n, err := stream.WriteTo(os.Stdout)
  23. }
  24. wire.Stream("backup", backupStream)
  25. wire.Stream("openChrome", openChromeStream)
  26. if err := wire.Connect(); err != nil {
  27. handleError(err)
  28. }

Stream opening

  1. // make options
  2. opts := []wirenet.Option{
  3. wirenet.WithSessionOpenHook(func(session wirenet.Session) {
  4. hub.registerSession(session)
  5. }),
  6. wirenet.WithSessionCloseHook(func(session wirenet.Session) {
  7. hub.unregisterSession(session)
  8. }),
  9. }
  10. // make client side
  11. wire, err := wirenet.Join(":8989", opts...)
  12. // OR make server side
  13. wire, err := wirenet.Mount(":8989", opts...)
  14. ...
  15. // find an open session in some repository
  16. sess := hub.findSession("sessionID")
  17. stream, err := sess.OpenStream("backup")
  18. if err != nil {
  19. handleError(err)
  20. }
  21. defer stream.Close()
  22. backup, err := os.Open("/backup.log")
  23. if err != nil {
  24. handleError(err)
  25. }
  26. defer backup.Close()
  27. // write to stream
  28. n, err := stream.ReadFrom(backup)
  29. ...

Writing to stream

  1. wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
  2. // write to stream using writer
  3. writer := stream.Writer()
  4. for {
  5. n, err := fileOne.Read(buf)
  6. if err != nil {
  7. handleError(err)
  8. break
  9. }
  10. n, err := writer.Write(buf[:n])
  11. ...
  12. }
  13. // EOF frame
  14. writer.Close()
  15. for {
  16. n, err := fileTwo.Read(buf)
  17. if err != nil {
  18. handleError(err)
  19. break
  20. }
  21. n, err := writer.Write(buf[:n])
  22. ...
  23. }
  24. // EOF frame
  25. writer.Close()
  26. ...
  27. // or write to stream (recommended)
  28. n, err := stream.ReadFrom(fileOne)
  29. ...
  30. n, err := stream.ReadFrom(fileTwo)
  31. })

Reading from stream

  1. wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
  2. // reading from stream using reader
  3. reader := stream.Reader()
  4. buf := make([]byte, wirenet.BufSize)
  5. n, err := reader.Read(buf)
  6. // EOF frame
  7. reader.Close()
  8. ...
  9. // or read from stream (recommended)
  10. n, err := stream.WriteTo(file)
  11. ...
  12. })

Using authentication

server

  1. tokenValidator := func(streamName string, id wirenet.Identification, token wirenet.Token) error {
  2. if streamName == "public" {
  3. return nil
  4. }
  5. return validate(token)
  6. }
  7. wire, err := wirenet.Mount(":8989", wirenet.WithTokenValidator(tokenValidator))
  8. go func() {
  9. if err := wire.Connect(); err != nil {
  10. handleError(err)
  11. }
  12. }()
  13. <-terminate()
  14. wire.Close()

client

  1. token := wirenet.Token("token")
  2. identification := wirenet.Identification("uuid")
  3. wire, err := wirenet.Join(":8989",
  4. wirenet.WithIdentification(identification, token),
  5. )
  6. if err := wire.Connect(); err != nil {
  7. handleError(err)
  8. }

Using SSL/TLS certs

server

  1. // make keys
  2. // ./certs/server.key
  3. // ./certs/server.pem
  4. tlsConf, err := wirenet.LoadCertificates("server", "./certs")
  5. if err != nil {
  6. handleError(err)
  7. }
  8. wire, err := wirenet.Mount(":8989", wirenet.WithTLS(tlsConf))
  9. go func() {
  10. if err := wire.Connect(); err != nil {
  11. handleError(err)
  12. }
  13. }()
  14. <-terminate()
  15. wire.Close()

client

  1. // make keys
  2. // ./certs/client.key
  3. // ./certs/client.pem
  4. tlsConf, err := wirenet.LoadCertificates("client", "./certs")
  5. if err != nil {
  6. handleError(err)
  7. }
  8. wire, err := wirenet.Join(":8989", wirenet.WithTLS(tlsConf))
  9. if err := wire.Connect(); err != nil {
  10. handleError(err)
  11. }

Shutdown

  1. timeout := 120*time.Second
  2. wire, err := wirenet.Mount(":8989",
  3. // Waiting time for completion of all streams
  4. wirenet.WithSessionCloseTimeout(timeout),
  5. )
  6. go func() {
  7. if err := wire.Connect(); err != nil {
  8. handleError(err)
  9. }
  10. }()
  11. <-terminate()
  12. wire.Close()

KeepAlive

  1. // server side
  2. wire, err := wirenet.Mount(":8989",
  3. wirenet.WithKeepAlive(true),
  4. wirenet.WithKeepAliveInterval(30 * time.Second),
  5. )
  6. // OR client side
  7. wire, err := wirenet.Join(":8989",
  8. wirenet.WithKeepAlive(true),
  9. wirenet.WithKeepAliveInterval(30 * time.Second),
  10. )
  11. go func() {
  12. if err := wire.Connect(); err != nil {
  13. handleError(err)
  14. }
  15. }()
  16. <-terminate()
  17. wire.Close()

Hub mode

Attention! The name of the stream for each client must be unique!

hub

  1. hub, err := wirenet.Hub(":8989")
  2. hub.Connect()

client1

  1. client1, err := wirenet.Join(":8989")
  2. client1.Stream("client1:readBalance", func(ctx context.Context, s wirenet.Stream) {})
  3. go func() {
  4. client1.Connect()
  5. }()
  6. ...
  7. sess, err := client1.Session("uuid")
  8. stream, err := sess.OpenStream("client2:readBalance")
  9. <-terminate()
  10. client1.Close()

client2

  1. client2, err := wirenet.Join(":8989")
  2. client2.Stream("client2:readBalance", func(ctx context.Context, s wirenet.Stream) {})
  3. go func() {
  4. client2.Connect()
  5. }()
  6. ...
  7. sess, err := client2.Session("uuid")
  8. stream, err := sess.OpenStream("client1:readBalance")
  9. <-terminate()
  10. client2.Close()

Options

  1. wirenet.WithConnectHook(hook func(io.Closer)) Option
  2. wirenet.WithSessionOpenHook(hook wirenet.SessionHook) Option
  3. wirenet.WithSessionCloseHook(hook wirenet.SessionHook) Option
  4. wirenet.WithIdentification(id wirenet.Identification, token wirenet.Token) Option
  5. wirenet.WithTokenValidator(v wirenet.TokenValidator) Option // server side
  6. wirenet.WithTLS(conf *tls.Config) Option
  7. wirenet.WithRetryWait(min, max time.Duration) Option
  8. wirenet.WithRetryMax(n int) Option
  9. wirenet.WithReadWriteTimeouts(read, write time.Duration) Option
  10. wirenet.WithSessionCloseTimeout(dur time.Duration) Option