Add option to cosume connect events rather than polling to restart deployments

This commit is contained in:
jillianwilson
2021-10-06 14:50:47 -03:00
parent b35c668959
commit a5f4a7a0c1
733 changed files with 86908 additions and 24010 deletions

201
vendor/github.com/suborbital/grav/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -0,0 +1,84 @@
package local
import (
"encoding/json"
"fmt"
"time"
"github.com/schollz/peerdiscovery"
"github.com/suborbital/grav/grav"
"github.com/suborbital/vektor/vlog"
)
// Discovery is a grav Discovery plugin using local network multicast
type Discovery struct {
opts *grav.DiscoveryOpts
log *vlog.Logger
discoveryFunc grav.DiscoveryFunc
}
// payload is a discovery payload
type payload struct {
UUID string `json:"uuid"`
Port string `json:"port"`
Path string `json:"path"`
}
// New creates a new local discovery plugin
func New() *Discovery {
g := &Discovery{}
return g
}
// Start starts discovery
func (d *Discovery) Start(opts *grav.DiscoveryOpts, discoveryFunc grav.DiscoveryFunc) error {
d.opts = opts
d.log = opts.Logger
d.discoveryFunc = discoveryFunc
d.log.Info("[discovery-local] starting discovery, advertising endpoint", opts.TransportPort, opts.TransportURI)
payloadFunc := func() []byte {
payload := payload{
UUID: d.opts.NodeUUID,
Port: opts.TransportPort,
Path: opts.TransportURI,
}
payloadBytes, _ := json.Marshal(payload)
return payloadBytes
}
notifyFunc := func(peer peerdiscovery.Discovered) {
d.log.Debug("[discovery-local] potential peer found:", peer.Address)
payload := payload{}
if err := json.Unmarshal(peer.Payload, &payload); err != nil {
d.log.Debug("[discovery-local] peer did not offer correct payload, discarding")
return
}
endpoint := fmt.Sprintf("%s:%s%s", peer.Address, payload.Port, payload.Path)
// send the discovery to Grav. Grav is responsible for ensuring uniqueness of the connections.
d.discoveryFunc(endpoint, payload.UUID)
}
_, err := peerdiscovery.Discover(peerdiscovery.Settings{
Limit: -1,
PayloadFunc: payloadFunc,
Delay: 10 * time.Second,
TimeLimit: -1,
Notify: notifyFunc,
AllowSelf: true,
})
return err
}
// UseDiscoveryFunc sets the function to be used when a new peer is discovered
func (d *Discovery) UseDiscoveryFunc(dFunc func(endpoint string, uuid string)) {
d.discoveryFunc = dFunc
}

83
vendor/github.com/suborbital/grav/grav/bus.go generated vendored Normal file
View File

@@ -0,0 +1,83 @@
package grav
const (
defaultBusChanSize = 256
)
// messageBus is responsible for emitting messages among the connected pods
// and managing the failure cases for those pods
type messageBus struct {
busChan MsgChan
pool *connectionPool
buffer *MsgBuffer
}
// newMessageBus creates a new messageBus
func newMessageBus() *messageBus {
b := &messageBus{
busChan: make(chan Message, defaultBusChanSize),
pool: newConnectionPool(),
buffer: NewMsgBuffer(defaultBufferSize),
}
b.start()
return b
}
// addPod adds a pod to the connection pool
func (b *messageBus) addPod(pod *Pod) {
b.pool.insert(pod)
}
func (b *messageBus) start() {
go func() {
// continually take new messages and for each,
// grab the next active connection from the ring and then
// start traversing around the ring to emit the message to
// each connection until landing back at the beginning of the
// ring, and repeat forever when each new message arrives
for msg := range b.busChan {
for {
// make sure the next pod is ready for messages
if err := b.pool.prepareNext(b.buffer); err == nil {
break
}
}
startingConn := b.pool.next()
b.traverse(msg, startingConn)
b.buffer.Push(msg)
}
}()
}
func (b *messageBus) traverse(msg Message, start *podConnection) {
startID := start.ID
conn := start
for {
// send the message to the pod
conn.send(msg)
// run checks on the next podConnection to see if
// anything needs to be done (including potentially deleting it)
next := b.pool.peek()
if err := b.pool.prepareNext(b.buffer); err != nil {
if startID == next.ID {
startID = next.next.ID
}
}
// now advance the ring
conn = b.pool.next()
if startID == conn.ID {
// if we have arrived back at the starting point on the ring
// we have done our job and are ready for the next message
break
}
}
}

21
vendor/github.com/suborbital/grav/grav/discovery.go generated vendored Normal file
View File

@@ -0,0 +1,21 @@
package grav
import "github.com/suborbital/vektor/vlog"
// DiscoveryFunc is a function that allows a plugin to report a newly discovered node
type DiscoveryFunc func(endpoint string, uuid string)
// Discovery represents a discovery plugin
type Discovery interface {
// Start is called to start the Discovery plugin
Start(*DiscoveryOpts, DiscoveryFunc) error
}
// DiscoveryOpts is a set of options for transports
type DiscoveryOpts struct {
NodeUUID string
TransportPort string
TransportURI string
Logger *vlog.Logger
Custom interface{}
}

71
vendor/github.com/suborbital/grav/grav/filter.go generated vendored Normal file
View File

@@ -0,0 +1,71 @@
package grav
import (
"sync"
)
// messageFilter is a series of maps that associate things about a message (its UUID, type, etc) with a boolean value to say if
// it should be allowed or denied. For each of the maps, if an entry is included, the value of the boolean is respected (true = allow, false = deny)
// Maps are either inclusive (meaning that a missing entry defaults to allow), or exclusive (meaning that a missing entry defaults to deny)
// This can be configured per map by modifiying the UUIDInclusive, TypeInclusive (etc) fields.
type messageFilter struct {
UUIDMap map[string]bool
UUIDInclusive bool
TypeMap map[string]bool
TypeInclusive bool
lock sync.RWMutex
}
func newMessageFilter() *messageFilter {
mf := &messageFilter{
UUIDMap: map[string]bool{},
UUIDInclusive: true,
TypeMap: map[string]bool{},
TypeInclusive: true,
lock: sync.RWMutex{},
}
return mf
}
func (mf *messageFilter) allow(msg Message) bool {
mf.lock.RLock()
defer mf.lock.RUnlock()
// for each map, deny the message if:
// - a filter entry exists and it's value is false
// - a filter entry doesn't exist and its inclusive rule is false
allowType, typeExists := mf.TypeMap[msg.Type()]
if typeExists && !allowType {
return false
} else if !typeExists && !mf.TypeInclusive {
return false
}
allowUUID, uuidExists := mf.UUIDMap[msg.UUID()]
if uuidExists && !allowUUID {
return false
} else if !uuidExists && !mf.UUIDInclusive {
return false
}
return true
}
// FilterUUID likely should not be used in normal cases, it adds a message UUID to the pod's filter.
func (mf *messageFilter) FilterUUID(uuid string, allow bool) {
mf.lock.Lock()
defer mf.lock.Unlock()
mf.UUIDMap[uuid] = allow
}
func (mf *messageFilter) FilterType(msgType string, allow bool) {
mf.lock.Lock()
defer mf.lock.Unlock()
mf.TypeMap[msgType] = allow
}

72
vendor/github.com/suborbital/grav/grav/grav.go generated vendored Normal file
View File

@@ -0,0 +1,72 @@
package grav
import (
"github.com/pkg/errors"
"github.com/google/uuid"
"github.com/suborbital/vektor/vlog"
)
// ErrTransportNotConfigured represent package-level vars
var (
ErrTransportNotConfigured = errors.New("transport plugin not configured")
)
// Grav represents a Grav message bus instance
type Grav struct {
NodeUUID string
bus *messageBus
logger *vlog.Logger
hub *hub
}
// New creates a new Grav with the provided options
func New(opts ...OptionsModifier) *Grav {
nodeUUID := uuid.New().String()
options := newOptionsWithModifiers(opts...)
g := &Grav{
NodeUUID: nodeUUID,
bus: newMessageBus(),
logger: options.Logger,
}
// the hub handles coordinating the transport and discovery plugins
g.hub = initHub(nodeUUID, options, options.Transport, options.Discovery, g.Connect)
return g
}
// Connect creates a new connection (pod) to the bus
func (g *Grav) Connect() *Pod {
opts := &podOpts{WantsReplay: false}
return g.connectWithOpts(opts)
}
// ConnectWithReplay creates a new connection (pod) to the bus
// and replays recent messages when the pod sets its onFunc
func (g *Grav) ConnectWithReplay() *Pod {
opts := &podOpts{WantsReplay: true}
return g.connectWithOpts(opts)
}
// ConnectEndpoint uses the configured transport to connect the bus to an external endpoint
func (g *Grav) ConnectEndpoint(endpoint string) error {
return g.hub.connectEndpoint(endpoint, "")
}
// ConnectBridgeTopic connects the Grav instance to a particular topic on the connected bridge
func (g *Grav) ConnectBridgeTopic(topic string) error {
return g.hub.connectBridgeTopic(topic)
}
func (g *Grav) connectWithOpts(opts *podOpts) *Pod {
pod := newPod(g.bus.busChan, opts)
g.bus.addPod(pod)
return pod
}

290
vendor/github.com/suborbital/grav/grav/hub.go generated vendored Normal file
View File

@@ -0,0 +1,290 @@
package grav
import (
"sync"
"github.com/pkg/errors"
"github.com/suborbital/vektor/vlog"
)
// hub is responsible for coordinating the transport and discovery plugins
type hub struct {
nodeUUID string
transport Transport
discovery Discovery
log *vlog.Logger
pod *Pod
connectFunc func() *Pod
connections map[string]Connection
topicConnections map[string]TopicConnection
lock sync.RWMutex
}
func initHub(nodeUUID string, options *Options, tspt Transport, dscv Discovery, connectFunc func() *Pod) *hub {
h := &hub{
nodeUUID: nodeUUID,
transport: tspt,
discovery: dscv,
log: options.Logger,
pod: connectFunc(),
connectFunc: connectFunc,
connections: map[string]Connection{},
topicConnections: map[string]TopicConnection{},
lock: sync.RWMutex{},
}
// start transport, then discovery if each have been configured (can have transport but no discovery)
if h.transport != nil {
transportOpts := &TransportOpts{
NodeUUID: nodeUUID,
Port: options.Port,
URI: options.URI,
Logger: options.Logger,
}
// setup messages to be sent to all active connections
h.pod.On(h.outgoingMessageHandler())
go func() {
if err := h.transport.Setup(transportOpts, h.handleIncomingConnection, h.findConnection); err != nil {
options.Logger.Error(errors.Wrap(err, "failed to Setup transport"))
}
}()
if h.discovery != nil {
discoveryOpts := &DiscoveryOpts{
NodeUUID: nodeUUID,
TransportPort: transportOpts.Port,
TransportURI: transportOpts.URI,
Logger: options.Logger,
}
go func() {
if err := h.discovery.Start(discoveryOpts, h.discoveryHandler()); err != nil {
options.Logger.Error(errors.Wrap(err, "failed to Start discovery"))
}
}()
}
}
return h
}
func (h *hub) discoveryHandler() func(endpoint string, uuid string) {
return func(endpoint string, uuid string) {
if uuid == h.nodeUUID {
h.log.Debug("discovered self, discarding")
return
}
// connectEndpoint does this check as well, but it's better to do it here as well
// as it reduces the number of extraneous outgoing handshakes that get attempted.
if existing, exists := h.findConnection(uuid); exists {
if !existing.CanReplace() {
h.log.Debug("encountered duplicate connection, discarding")
return
}
}
if err := h.connectEndpoint(endpoint, uuid); err != nil {
h.log.Error(errors.Wrap(err, "failed to connectEndpoint for discovered peer"))
}
}
}
// connectEndpoint creates a new outgoing connection
func (h *hub) connectEndpoint(endpoint, uuid string) error {
if h.transport == nil {
return ErrTransportNotConfigured
}
if h.transport.Type() == TransportTypeBridge {
return ErrBridgeOnlyTransport
}
h.log.Debug("connecting to endpoint", endpoint)
conn, err := h.transport.CreateConnection(endpoint)
if err != nil {
return errors.Wrap(err, "failed to transport.CreateConnection")
}
h.setupOutgoingConnection(conn, uuid)
return nil
}
// connectBridgeTopic creates a new outgoing connection
func (h *hub) connectBridgeTopic(topic string) error {
if h.transport == nil {
return ErrTransportNotConfigured
}
if h.transport.Type() != TransportTypeBridge {
return ErrNotBridgeTransport
}
h.log.Debug("connecting to topic", topic)
conn, err := h.transport.ConnectBridgeTopic(topic)
if err != nil {
return errors.Wrap(err, "failed to transport.CreateConnection")
}
h.addTopicConnection(conn, topic)
return nil
}
func (h *hub) setupOutgoingConnection(connection Connection, uuid string) {
handshake := &TransportHandshake{h.nodeUUID}
ack, err := connection.DoOutgoingHandshake(handshake)
if err != nil {
h.log.Error(errors.Wrap(err, "failed to connection.DoOutgoingHandshake"))
connection.Close()
return
}
if uuid == "" {
if ack.UUID == "" {
h.log.ErrorString("connection handshake returned empty UUID, terminating connection")
connection.Close()
return
}
uuid = ack.UUID
} else if ack.UUID != uuid {
h.log.ErrorString("connection handshake Ack did not match Discovery Ack, terminating connection")
connection.Close()
return
}
h.setupNewConnection(connection, uuid)
}
func (h *hub) handleIncomingConnection(connection Connection) {
ack := &TransportHandshakeAck{h.nodeUUID}
handshake, err := connection.DoIncomingHandshake(ack)
if err != nil {
h.log.Error(errors.Wrap(err, "failed to connection.DoIncomingHandshake"))
connection.Close()
return
}
if handshake.UUID == "" {
h.log.ErrorString("connection handshake returned empty UUID, terminating connection")
connection.Close()
return
}
h.setupNewConnection(connection, handshake.UUID)
}
func (h *hub) setupNewConnection(connection Connection, uuid string) {
// if an existing connection is found, check if it can be replaced and do so if possible
if existing, exists := h.findConnection(uuid); exists {
if !existing.CanReplace() {
connection.Close()
h.log.Debug("encountered duplicate connection, discarding")
} else {
existing.Close()
h.replaceConnection(connection, uuid)
}
} else {
h.addConnection(connection, uuid)
}
}
func (h *hub) outgoingMessageHandler() MsgFunc {
return func(msg Message) error {
// read-lock while dispatching all of the goroutines to prevent concurrent read/write
h.lock.RLock()
defer h.lock.RUnlock()
for u := range h.connections {
uuid := u
conn := h.connections[uuid]
go func() {
h.log.Debug("sending message", msg.UUID(), "to", uuid)
if err := conn.Send(msg); err != nil {
if errors.Is(err, ErrConnectionClosed) {
h.log.Debug("attempted to send on closed connection, will remove")
} else {
h.log.Warn("error sending to connection", uuid, ":", err.Error())
}
h.removeConnection(uuid)
}
}()
}
return nil
}
}
func (h *hub) incomingMessageHandler(uuid string) ReceiveFunc {
return func(msg Message) {
h.log.Debug("received message ", msg.UUID(), "from node", uuid)
h.pod.Send(msg)
}
}
func (h *hub) addConnection(connection Connection, uuid string) {
h.lock.Lock()
defer h.lock.Unlock()
h.log.Debug("adding connection for", uuid)
connection.Start(h.incomingMessageHandler(uuid))
h.connections[uuid] = connection
}
func (h *hub) addTopicConnection(connection TopicConnection, topic string) {
h.lock.Lock()
defer h.lock.Unlock()
h.log.Debug("adding bridge connection for", topic)
connection.Start(h.connectFunc())
h.topicConnections[topic] = connection
}
func (h *hub) replaceConnection(newConnection Connection, uuid string) {
h.lock.Lock()
defer h.lock.Unlock()
h.log.Debug("replacing connection for", uuid)
delete(h.connections, uuid)
newConnection.Start(h.incomingMessageHandler(uuid))
h.connections[uuid] = newConnection
}
func (h *hub) removeConnection(uuid string) {
h.lock.Lock()
defer h.lock.Unlock()
h.log.Debug("removing connection for", uuid)
delete(h.connections, uuid)
}
func (h *hub) findConnection(uuid string) (Connection, bool) {
h.lock.RLock()
defer h.lock.RUnlock()
conn, exists := h.connections[uuid]
return conn, exists
}

170
vendor/github.com/suborbital/grav/grav/message.go generated vendored Normal file
View File

@@ -0,0 +1,170 @@
package grav
import (
"encoding/json"
"io/ioutil"
"net/http"
"time"
"github.com/google/uuid"
)
// MsgTypeDefault and other represent message consts
const (
MsgTypeDefault string = "grav.default"
msgTypePodFeedback string = "grav.feedback"
)
// MsgFunc is a callback function that accepts a message and returns an error
type MsgFunc func(Message) error
// MsgChan is a channel that accepts a message
type MsgChan chan Message
// Message represents a message
type Message interface {
// Unique ID for this message
UUID() string
// ID of the parent event or request, such as HTTP request
ParentID() string
// The UUID of the message being replied to, if any
ReplyTo() string
// Allow setting a message UUID that this message is a response to
SetReplyTo(string)
// Type of message (application-specific)
Type() string
// Time the message was sent
Timestamp() time.Time
// Raw data of message
Data() []byte
// Unmarshal the message's data into a struct
UnmarshalData(interface{}) error
// Marshal the message itself to encoded bytes (JSON or otherwise)
Marshal() ([]byte, error)
// Unmarshal encoded Message into object
Unmarshal([]byte) error
}
// NewMsg creates a new Message with the built-in `_message` type
func NewMsg(msgType string, data []byte) Message {
return new(msgType, "", data)
}
// NewMsgWithParentID returns a new message with the provided parent ID
func NewMsgWithParentID(msgType, parentID string, data []byte) Message {
return new(msgType, parentID, data)
}
// NewMsgReplyTo creates a new message in response to a previous message
func NewMsgReplyTo(ticket MsgReceipt, msgType string, data []byte) Message {
m := new(msgType, "", data)
m.SetReplyTo(ticket.UUID)
return m
}
// MsgFromBytes returns a default _message that has been unmarshalled from bytes.
// Should only be used if the default _message type is being used.
func MsgFromBytes(bytes []byte) (Message, error) {
m := &_message{}
if err := m.Unmarshal(bytes); err != nil {
return nil, err
}
return m, nil
}
// MsgFromRequest extracts an encoded Message from an HTTP request
func MsgFromRequest(r *http.Request) (Message, error) {
defer r.Body.Close()
bytes, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
return MsgFromBytes(bytes)
}
func new(msgType, parentID string, data []byte) Message {
uuid := uuid.New()
m := &_message{
Meta: _meta{
UUID: uuid.String(),
ParentID: parentID,
ReplyTo: "",
MsgType: msgType,
Timestamp: time.Now(),
},
Payload: _payload{
Data: data,
},
}
return m
}
// _message is a basic built-in implementation of Message
// most applications should define their own data structure
// that implements the interface
type _message struct {
Meta _meta `json:"meta"`
Payload _payload `json:"payload"`
}
type _meta struct {
UUID string `json:"uuid"`
ParentID string `json:"parent_id"`
ReplyTo string `json:"response_to"`
MsgType string `json:"msg_type"`
Timestamp time.Time `json:"timestamp"`
}
type _payload struct {
Data []byte `json:"data"`
}
func (m *_message) UUID() string {
return m.Meta.UUID
}
func (m *_message) ParentID() string {
return m.Meta.ParentID
}
func (m *_message) ReplyTo() string {
return m.Meta.ReplyTo
}
func (m *_message) SetReplyTo(uuid string) {
m.Meta.ReplyTo = uuid
}
func (m *_message) Type() string {
return m.Meta.MsgType
}
func (m *_message) Timestamp() time.Time {
return m.Meta.Timestamp
}
func (m *_message) Data() []byte {
return m.Payload.Data
}
func (m *_message) UnmarshalData(target interface{}) error {
return json.Unmarshal(m.Payload.Data, target)
}
func (m *_message) Marshal() ([]byte, error) {
bytes, err := json.Marshal(m)
if err != nil {
return nil, err
}
return bytes, nil
}
func (m *_message) Unmarshal(bytes []byte) error {
return json.Unmarshal(bytes, m)
}

90
vendor/github.com/suborbital/grav/grav/msgbuffer.go generated vendored Normal file
View File

@@ -0,0 +1,90 @@
package grav
import (
"sync"
)
const (
defaultBufferSize = 128
)
// MsgBuffer is a buffer of messages with a particular size limit.
// Oldest messages are automatically evicted as new ones are added
// past said limit. Push() and Iter() are thread-safe.
type MsgBuffer struct {
msgs map[string]Message
order []string
limit int
startIndex int
lock sync.RWMutex
}
func NewMsgBuffer(limit int) *MsgBuffer {
m := &MsgBuffer{
msgs: map[string]Message{},
order: []string{},
limit: limit,
startIndex: 0,
lock: sync.RWMutex{},
}
return m
}
// Push pushes a new message onto the end of the buffer and evicts the oldest, if needed (based on limit)
func (m *MsgBuffer) Push(msg Message) {
m.lock.Lock()
defer m.lock.Unlock()
m.msgs[msg.UUID()] = msg
lastIndex := len(m.order) - 1
if len(m.order) == m.limit {
delete(m.msgs, m.order[m.startIndex]) // delete the current "first"
m.order[m.startIndex] = msg.UUID()
if m.startIndex == lastIndex {
m.startIndex = 0
} else {
m.startIndex++
}
} else {
m.order = append(m.order, msg.UUID())
}
}
// Iter calls msgFunc once per message in the buffer
func (m *MsgBuffer) Iter(msgFunc MsgFunc) {
m.lock.RLock()
defer m.lock.RUnlock()
if len(m.order) == 0 {
return
}
index := m.startIndex
lastIndex := len(m.order) - 1
more := true
for more {
uuid := m.order[index]
msg := m.msgs[uuid]
msgFunc(msg)
newIndex := index
if newIndex == lastIndex {
newIndex = 0
} else {
newIndex++
}
if newIndex == m.startIndex {
more = false
}
index = newIndex
}
}

72
vendor/github.com/suborbital/grav/grav/options.go generated vendored Normal file
View File

@@ -0,0 +1,72 @@
package grav
import "github.com/suborbital/vektor/vlog"
// Options represent Grav options
type Options struct {
Logger *vlog.Logger
Transport Transport
Discovery Discovery
Port string
URI string
}
// OptionsModifier is function that modifies an option
type OptionsModifier func(*Options)
func newOptionsWithModifiers(mods ...OptionsModifier) *Options {
opts := defaultOptions()
for _, m := range mods {
m(opts)
}
return opts
}
// UseLogger allows a custom logger to be used
func UseLogger(logger *vlog.Logger) OptionsModifier {
return func(o *Options) {
o.Logger = logger
}
}
// UseTransport sets the transport plugin to be used.
func UseTransport(transport Transport) OptionsModifier {
return func(o *Options) {
o.Transport = transport
}
}
// UseEndpoint sets the endpoint settings for the instance to broadcast for discovery
// Pass empty strings for either if you would like to keep the defaults (8080 and /meta/message)
func UseEndpoint(port, uri string) OptionsModifier {
return func(o *Options) {
if port != "" {
o.Port = port
}
if uri != "" {
o.URI = uri
}
}
}
// UseDiscovery sets the discovery plugin to be used
func UseDiscovery(discovery Discovery) OptionsModifier {
return func(o *Options) {
o.Discovery = discovery
}
}
func defaultOptions() *Options {
o := &Options{
Logger: vlog.Default(),
Port: "8080",
URI: "/meta/message",
Transport: nil,
Discovery: nil,
}
return o
}

277
vendor/github.com/suborbital/grav/grav/pod.go generated vendored Normal file
View File

@@ -0,0 +1,277 @@
package grav
import (
"errors"
"sync"
"sync/atomic"
)
const (
// defaultPodChanSize is the default size of the channels used for pod - bus communication
defaultPodChanSize = 128
)
// podFeedbackMsgReplay and others are the messages sent via feedback channel when the pod needs to communicate its state to the bus
var (
podFeedbackMsgReplay = NewMsg(msgTypePodFeedback, []byte{})
podFeedbackMsgSuccess = NewMsg(msgTypePodFeedback, []byte{})
podFeedbackMsgDisconnect = NewMsg(msgTypePodFeedback, []byte{})
)
/**
┌─────────────────────┐
│ │
──messageChan─────▶─────────────────────▶─────On────▶
┌────────┐ │ │ ┌───────────────┐
│ Bus │ │ Pod │ │ Pod Owner │
└────────┘ │ │ └───────────────┘
◀───BusChan------─◀─────────────────────◀────Send────
│ │
└─────────────────────┘
Created with Monodraw
**/
// Pod is a connection to Grav
// Pods are bi-directional. Messages can be sent to them from the bus, and they can be used to send messages
// to the bus. Pods are meant to be extremely lightweight with no persistence they are meant to quickly
// and immediately route a message between its owner and the Bus. The Bus is responsible for any "smarts".
// Messages coming from the bus are filtered using the pod's messageFilter, which is configurable by the caller.
type Pod struct {
onFunc MsgFunc // the onFunc is called whenever a message is recieved
onFuncLock sync.RWMutex
messageChan MsgChan // messageChan is used to recieve messages coming from the bus
feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status
busChan MsgChan // busChan is used to emit messages to the bus
*messageFilter // the embedded messageFilter controls which messages reach the onFunc
opts *podOpts
dead *atomic.Value
}
type podOpts struct {
WantsReplay bool
replayOnce sync.Once
}
// newPod creates a new Pod
func newPod(busChan MsgChan, opts *podOpts) *Pod {
p := &Pod{
onFuncLock: sync.RWMutex{},
messageChan: make(chan Message, defaultPodChanSize),
feedbackChan: make(chan Message, defaultPodChanSize),
busChan: busChan,
messageFilter: newMessageFilter(),
opts: opts,
dead: &atomic.Value{},
}
// do some "delayed setup"
p.opts.replayOnce = sync.Once{}
p.dead.Store(false)
p.start()
return p
}
// Send emits a message to be routed to the bus
// If the returned ticket is nil, it means the pod was unable to send
// It is safe to call methods on a nil ticket, they will error with ErrNoTicket
// This means error checking can be done on a chained call such as err := p.Send(msg).Wait(...)
func (p *Pod) Send(msg Message) *MsgReceipt {
// check to see if the pod has died (aka disconnected)
if p.dead.Load().(bool) == true {
return nil
}
p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod
p.busChan <- msg
t := &MsgReceipt{
UUID: msg.UUID(),
pod: p,
}
return t
}
// ReplyTo sends a response to a message. The reply message's ticket is returned.
func (p *Pod) ReplyTo(inReplyTo Message, msg Message) *MsgReceipt {
msg.SetReplyTo(inReplyTo.UUID())
return p.Send(msg)
}
// On sets the function to be called whenever this pod recieves a message from the bus. If nil is passed, the pod will ignore all messages.
// Calling On multiple times causes the function to be overwritten. To recieve using two different functions, create two pods.
// Errors returned from the onFunc are interpreted as problems handling messages. Too many errors will result in the pod being disconnected.
// Failed messages will be replayed when messages begin to succeed. Returning an error is inadvisable unless there is a real problem handling messages.
func (p *Pod) On(onFunc MsgFunc) {
p.onFuncLock.Lock()
defer p.onFuncLock.Unlock()
p.setOnFunc(onFunc)
}
// OnType sets the function to be called whenever this pod recieves a message and sets the pod's filter to only receive certain message types.
// The same rules as `On` about error handling apply to OnType.
func (p *Pod) OnType(msgType string, onFunc MsgFunc) {
p.onFuncLock.Lock()
defer p.onFuncLock.Unlock()
p.setOnFunc(onFunc)
p.FilterType(msgType, true)
p.TypeInclusive = false // only allow the listed types
}
// Disconnect indicates to the bus that this pod is no longer needed and should be disconnected.
// Sending will immediately become unavailable, and the pod will soon stop recieving messages.
func (p *Pod) Disconnect() {
// stop future messages from being sent and then indicate to the bus that disconnection is desired
// The bus will close the busChan, which will cause the onFunc listener to quit.
p.dead.Store(true)
p.feedbackChan <- podFeedbackMsgDisconnect
}
// ErrMsgNotWanted is used by WaitOn to determine if the current message is what's being waited on
var ErrMsgNotWanted = errors.New("message not wanted")
// ErrWaitTimeout is returned if a timeout is exceeded
var ErrWaitTimeout = errors.New("waited past timeout")
// WaitOn takes a function to be called whenever this pod recieves a message and blocks until that function returns
// something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message.
// When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitOn will return and set
// the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller.
// WaitOn will block forever if the desired message is never found. Use WaitUntil if a timeout is desired.
func (p *Pod) WaitOn(onFunc MsgFunc) error {
return p.WaitUntil(nil, onFunc)
}
// WaitUntil takes a function to be called whenever this pod recieves a message and blocks until that function returns
// something other than ErrMsgNotWanted. WaitOn should be used if there is a need to wait for a particular message.
// When the onFunc returns something other than ErrMsgNotWanted (such as nil or a different error), WaitUntil will return and set
// the onFunc to nil. If an error other than ErrMsgNotWanted is returned from the onFunc, it will be propogated to the caller.
// A timeout can be provided. If the timeout is non-nil and greater than 0, ErrWaitTimeout is returned if the time is exceeded.
func (p *Pod) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error {
p.onFuncLock.Lock()
errChan := make(chan error)
p.setOnFunc(func(msg Message) error {
if err := onFunc(msg); err != nil {
if err == ErrMsgNotWanted {
return nil // don't do anything
}
errChan <- err
} else {
errChan <- nil
}
return nil
})
p.onFuncLock.Unlock() // can't stay locked here or the onFunc will never be called
var onFuncErr error
if timeout == nil {
timeout = Timeout(-1)
}
select {
case err := <-errChan:
onFuncErr = err
case <-timeout():
onFuncErr = ErrWaitTimeout
}
p.onFuncLock.Lock()
defer p.onFuncLock.Unlock()
p.setOnFunc(nil)
return onFuncErr
}
// waitOnReply waits on a reply message to arrive at the pod and then calls onFunc with that message.
// If the onFunc produces an error, it will be propogated to the caller.
// If a non-nil timeout greater than 0 is passed, the function will return ErrWaitTimeout if the timeout elapses.
func (p *Pod) waitOnReply(ticket *MsgReceipt, timeout TimeoutFunc, onFunc MsgFunc) error {
var reply Message
if err := p.WaitUntil(timeout, func(msg Message) error {
if msg.ReplyTo() != ticket.UUID {
return ErrMsgNotWanted
}
reply = msg
return nil
}); err != nil {
return err
}
return onFunc(reply)
}
// setOnFunc sets the OnFunc. THIS DOES NOT LOCK. THE CALLER MUST LOCK.
func (p *Pod) setOnFunc(on MsgFunc) {
// reset the message filter when the onFunc is changed
p.messageFilter = newMessageFilter()
p.onFunc = on
// request replay from the bus if needed
if on != nil {
p.opts.replayOnce.Do(func() {
if p.opts.WantsReplay {
p.feedbackChan <- podFeedbackMsgReplay
}
})
}
}
// busChans returns the messageChan and feedbackChan to be used by the bus
func (p *Pod) busChans() (MsgChan, MsgChan) {
return p.messageChan, p.feedbackChan
}
func (p *Pod) start() {
go func() {
// this loop ends when the bus closes the messageChan
for {
msg, ok := <-p.messageChan
if !ok {
break
}
go func() {
p.onFuncLock.RLock() // in case the onFunc gets replaced
defer p.onFuncLock.RUnlock()
if p.onFunc == nil {
return
}
if p.allow(msg) {
if err := p.onFunc(msg); err != nil {
// if the onFunc failed, send it back to the bus to be re-sent later
p.feedbackChan <- msg
} else {
// if it was successful, a success message on the channel lets the conn know all is well
p.feedbackChan <- podFeedbackMsgSuccess
}
}
}()
}
// if we've gotten this far, it means the pod has been killed and should not be allowed to send
p.dead.Store(true)
}()
}

260
vendor/github.com/suborbital/grav/grav/pool.go generated vendored Normal file
View File

@@ -0,0 +1,260 @@
package grav
import (
"errors"
"sync"
"sync/atomic"
)
const (
highWaterMark = 64
)
var (
errFailedMessage = errors.New("pod reports failed message")
errFailedMessageMax = errors.New("pod reports max number of failed messages, will terminate connection")
)
// connectionPool is a ring of connections to pods
// which will be iterated over constantly in order to send
// incoming messages to them
type connectionPool struct {
current *podConnection
maxID int64
lock sync.Mutex
}
func newConnectionPool() *connectionPool {
p := &connectionPool{
current: nil,
maxID: 0,
lock: sync.Mutex{},
}
return p
}
// insert inserts a new connection into the ring
func (c *connectionPool) insert(pod *Pod) {
c.lock.Lock()
defer c.lock.Unlock()
c.maxID++
id := c.maxID
conn := newPodConnection(id, pod)
// if there's nothing in the ring, create a "ring of one"
if c.current == nil {
conn.next = conn
c.current = conn
} else {
c.current.insertAfter(conn)
}
}
// peek returns a peek at the next connection in the ring wihout advancing the ring's current location
func (c *connectionPool) peek() *podConnection {
c.lock.Lock()
defer c.lock.Unlock()
return c.current.next
}
// next returns the next connection in the ring
func (c *connectionPool) next() *podConnection {
c.lock.Lock()
defer c.lock.Unlock()
c.current = c.current.next
return c.current
}
// prepareNext ensures that the next pod connection in the ring is ready to recieve
// new messages by checking its status, deleting it if unhealthy or disconnected, replaying the message
// buffer if needed, or flushing failed messages back onto its channel if needeed.
func (c *connectionPool) prepareNext(buffer *MsgBuffer) error {
// peek gives us the next conn without advancing the ring
// this makes it easy to delete the next conn if it's unhealthy
next := c.peek()
// check the state of the next connection
status := next.checkStatus()
if status.Error != nil {
// if the connection has an issue, handle it
if status.Error == errFailedMessageMax {
c.deleteNext()
return errors.New("removing next podConnection")
}
} else if status.WantsDisconnect {
// if the pod has requested disconnection, grant its wish
c.deleteNext()
return errors.New("next pod requested disconnection, removing podConnection")
} else if status.WantsReplay {
// if the pod has indicated that it wants a replay of recent messages, do so
c.replayNext(buffer)
}
if status.HadSuccess {
// if the most recent status check indicates there was a success,
// then tell the connection to flush any failed messages
// this is a no-op if there are no failed messages queued
next.flushFailed()
}
return nil
}
// replayNext replays the current message buffer into the next connection
func (c *connectionPool) replayNext(buffer *MsgBuffer) {
next := c.peek()
// iterate over the buffer and send each message to the pod
buffer.Iter(func(msg Message) error {
next.send(msg)
return nil
})
}
// deleteNext deletes the next connection in the ring
// this is useful after having checkError'd the next conn
// and seeing that it's unhealthy
func (c *connectionPool) deleteNext() {
c.lock.Lock()
defer c.lock.Unlock()
next := c.current.next
// indicate the conn is dead so future attempts to send are abandonded
next.dead.Store(true)
// close the messageChan so the pod can know it's been cut off
close(next.messageChan)
if next == c.current {
// if there's only one thing in the ring, empty the ring
c.current = nil
} else {
// cut out `next` and link `current` to `next-next`
c.current.next = next.next
}
}
// podConnection is a connection to a pod via its messageChan
// podConnection is also a circular linked list/ring of connections
// that is meant to be iterated around and inserted into/removed from
// forever as the bus sends events to the registered pods
type podConnection struct {
ID int64
next *podConnection
messageChan MsgChan
feedbackChan MsgChan
failed []Message
dead *atomic.Value
}
// connStatus is used to communicate the status of a podConnection back to the bus
type connStatus struct {
HadSuccess bool
WantsReplay bool
WantsDisconnect bool
Error error
}
func newPodConnection(id int64, pod *Pod) *podConnection {
msgChan, feedbackChan := pod.busChans()
p := &podConnection{
ID: id,
messageChan: msgChan,
feedbackChan: feedbackChan,
failed: []Message{},
dead: &atomic.Value{},
next: nil,
}
p.dead.Store(false)
return p
}
// send asynchronously writes a message to a connection's messageChan
// ordering to the messageChan if it becomes full is not guaranteed, this
// is sacrificed to ensure that the bus does not block because of a delinquient pod
func (p *podConnection) send(msg Message) {
go func() {
// if the conn is dead, abandon the attempt
if p.dead.Load().(bool) == true {
return
}
p.messageChan <- msg
}()
}
// checkStatus checks the pod's feedback for any information or failed messages and drains the failures into the failed Message buffer
func (p *podConnection) checkStatus() *connStatus {
status := &connStatus{
HadSuccess: false,
WantsReplay: false,
WantsDisconnect: false,
Error: nil,
}
done := false
for !done {
select {
case feedbackMsg := <-p.feedbackChan:
if feedbackMsg == podFeedbackMsgSuccess {
status.HadSuccess = true
} else if feedbackMsg == podFeedbackMsgReplay {
status.WantsReplay = true
} else if feedbackMsg == podFeedbackMsgDisconnect {
status.WantsDisconnect = true
} else {
p.failed = append(p.failed, feedbackMsg)
status.Error = errFailedMessage
}
default:
done = true
}
}
if len(p.failed) >= highWaterMark {
status.Error = errFailedMessageMax
}
return status
}
// flushFailed takes all of the failed messages in the failed queue
// and pushes them back out onto the pod's channel
func (p *podConnection) flushFailed() {
for i := range p.failed {
failedMsg := p.failed[i]
p.send(failedMsg)
}
if len(p.failed) > 0 {
p.failed = []Message{}
}
}
// insertAfter inserts a new connection into the ring
func (p *podConnection) insertAfter(conn *podConnection) {
next := p
if p.next != nil {
next = p.next
}
p.next = conn
conn.next = next
}

44
vendor/github.com/suborbital/grav/grav/receipt.go generated vendored Normal file
View File

@@ -0,0 +1,44 @@
package grav
import "github.com/pkg/errors"
// ErrNoReceipt is returned when a method is called on a nil ticket
var ErrNoReceipt = errors.New("message receipt is nil")
// MsgReceipt represents a "ticket" that references a message that was sent with the hopes of getting a response
// The embedded pod is a pointer to the pod that sent the original message, and therefore any ticket methods used
// will replace the OnFunc of the pod.
type MsgReceipt struct {
UUID string
pod *Pod
}
// WaitOn will block until a response to the message is recieved and passes it to the provided onFunc.
// onFunc errors are propogated to the caller.
func (m *MsgReceipt) WaitOn(onFunc MsgFunc) error {
return m.WaitUntil(nil, onFunc)
}
// WaitUntil will block until a response to the message is recieved and passes it to the provided onFunc.
// ErrWaitTimeout is returned if the timeout elapses, onFunc errors are propogated to the caller.
func (m *MsgReceipt) WaitUntil(timeout TimeoutFunc, onFunc MsgFunc) error {
if m == nil {
return ErrNoReceipt
}
return m.pod.waitOnReply(m, timeout, onFunc)
}
// OnReply will set the pod's OnFunc to the provided MsgFunc and set it to run asynchronously when a reply is received
// onFunc errors are discarded.
func (m *MsgReceipt) OnReply(mfn MsgFunc) error {
if m == nil {
return ErrNoReceipt
}
go func() {
m.pod.waitOnReply(m, nil, mfn)
}()
return nil
}

28
vendor/github.com/suborbital/grav/grav/timeout.go generated vendored Normal file
View File

@@ -0,0 +1,28 @@
package grav
import "time"
// TimeoutFunc is a function that takes a value (a number of seconds) and returns a channel that fires after that given amount of time
type TimeoutFunc func() chan time.Time
// Timeout returns a function that returns a channel that fires after the provided number of seconds have elapsed
// if the value passed is less than or equal to 0, the timeout will never fire
func Timeout(seconds int) TimeoutFunc {
return func() chan time.Time {
tChan := make(chan time.Time)
if seconds > 0 {
go func() {
duration := time.Second * time.Duration(seconds)
tChan <- <-time.After(duration)
}()
}
return tChan
}
}
// TO is a shorthand for Timeout
func TO(seconds int) TimeoutFunc {
return Timeout(seconds)
}

92
vendor/github.com/suborbital/grav/grav/transport.go generated vendored Normal file
View File

@@ -0,0 +1,92 @@
package grav
import (
"github.com/pkg/errors"
"github.com/suborbital/vektor/vlog"
)
// TransportMsgTypeHandshake and others represent internal Transport message types used for handshakes and metadata transfer
const (
TransportMsgTypeHandshake = 1
TransportMsgTypeUser = 2
)
// ErrConnectionClosed and others are transport and connection related errors
var (
ErrConnectionClosed = errors.New("connection was closed")
ErrNodeUUIDMismatch = errors.New("handshake UUID did not match node UUID")
ErrNotBridgeTransport = errors.New("transport is not a bridge")
ErrBridgeOnlyTransport = errors.New("transport only supports bridge connection")
)
var (
TransportTypeMesh = TransportType("transport.mesh")
TransportTypeBridge = TransportType("transport.bridge")
)
type (
// ReceiveFunc is a function that allows passing along a received message
ReceiveFunc func(msg Message)
// ConnectFunc is a function that provides a new Connection
ConnectFunc func(Connection)
// FindFunc allows a Transport to query Grav for an active connection for the given UUID
FindFunc func(uuid string) (Connection, bool)
// TransportType defines the type of Transport (mesh or bridge)
TransportType string
)
// TransportOpts is a set of options for transports
type TransportOpts struct {
NodeUUID string
Port string
URI string
Logger *vlog.Logger
Custom interface{}
}
// Transport represents a Grav transport plugin
type Transport interface {
// Type returns the transport's type (mesh or bridge)
Type() TransportType
// Setup is a transport-specific function that allows bootstrapping
// Setup can block forever if needed; for example if a webserver is bring run
Setup(opts *TransportOpts, connFunc ConnectFunc, findFunc FindFunc) error
// CreateConnection connects to an endpoint and returns the Connection
CreateConnection(endpoint string) (Connection, error)
// ConnectBridgeTopic connects to a topic and returns a TopicConnection
ConnectBridgeTopic(topic string) (TopicConnection, error)
}
// Connection represents a connection to another node
type Connection interface {
// Called when the connection handshake is complete and the connection can actively start exchanging messages
Start(recvFunc ReceiveFunc)
// Send a message from the local instance to the connected node
Send(msg Message) error
// CanReplace returns true if the connection can be replaced (i.e. is not a persistent connection like a websocket)
CanReplace() bool
// Initiate a handshake for an outgoing connection and return the remote Ack
DoOutgoingHandshake(handshake *TransportHandshake) (*TransportHandshakeAck, error)
// Wait for an incoming handshake and return the provided Ack to the remote connection
DoIncomingHandshake(handshakeAck *TransportHandshakeAck) (*TransportHandshake, error)
// Close requests that the Connection close itself
Close()
}
// TopicConnection is a connection to something via a bridge such as a topic
type TopicConnection interface {
// Called when the connection can actively start exchanging messages
Start(pod *Pod)
// Close requests that the Connection close itself
Close()
}
// TransportHandshake represents a handshake sent to a node that you're trying to connect to
type TransportHandshake struct {
UUID string `json:"uuid"`
}
// TransportHandshakeAck represents a handshake response
type TransportHandshakeAck struct {
UUID string `json:"uuid"`
}

View File

@@ -0,0 +1,5 @@
# Grav Transport: Websocket
This is a streaming transport plugin for Grav that uses standard websockets.
Handler functions are made available for http.Server. Connections are managed by the `Transport` object.

View File

@@ -0,0 +1,261 @@
package websocket
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/suborbital/grav/grav"
"github.com/suborbital/vektor/vlog"
)
var upgrader = websocket.Upgrader{}
// Transport is a transport that connects Grav nodes via standard websockets
type Transport struct {
opts *grav.TransportOpts
log *vlog.Logger
connectionFunc func(grav.Connection)
}
// Conn implements transport.Connection and represents a websocket connection
type Conn struct {
nodeUUID string
log *vlog.Logger
conn *websocket.Conn
cLock sync.Mutex
recvFunc grav.ReceiveFunc
}
// New creates a new websocket transport
func New() *Transport {
t := &Transport{}
return t
}
// Type returns the transport's type
func (t *Transport) Type() grav.TransportType {
return grav.TransportTypeMesh
}
// Setup sets up the transport
func (t *Transport) Setup(opts *grav.TransportOpts, connFunc grav.ConnectFunc, findFunc grav.FindFunc) error {
// independent serving is not yet implemented, use the HTTP handler
t.opts = opts
t.log = opts.Logger
t.connectionFunc = connFunc
return nil
}
// CreateConnection adds a websocket endpoint to emit messages to
func (t *Transport) CreateConnection(endpoint string) (grav.Connection, error) {
if !strings.HasPrefix(endpoint, "ws") {
endpoint = fmt.Sprintf("ws://%s", endpoint)
}
endpointURL, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
c, _, err := websocket.DefaultDialer.Dial(endpointURL.String(), nil)
if err != nil {
return nil, errors.Wrapf(err, "[transport-websocket] failed to Dial endpoint")
}
conn := &Conn{
log: t.log,
conn: c,
cLock: sync.Mutex{},
}
return conn, nil
}
// ConnectBridgeTopic connects to a topic if the transport is a bridge
func (t *Transport) ConnectBridgeTopic(topic string) (grav.TopicConnection, error) {
return nil, grav.ErrNotBridgeTransport
}
// HTTPHandlerFunc returns an http.HandlerFunc for incoming connections
func (t *Transport) HTTPHandlerFunc() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if t.connectionFunc == nil {
t.log.ErrorString("[transport-websocket] incoming connection received, but no connFunc configured")
w.WriteHeader(http.StatusInternalServerError)
return
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.log.Error(errors.Wrap(err, "[transport-websocket] failed to upgrade connection"))
return
}
t.log.Debug("[transport-websocket] upgraded connection:", r.URL.String())
conn := &Conn{
conn: c,
log: t.log,
}
t.connectionFunc(conn)
}
}
// Start begins the receiving of messages
func (c *Conn) Start(recvFunc grav.ReceiveFunc) {
c.recvFunc = recvFunc
c.conn.SetCloseHandler(func(code int, text string) error {
c.log.Warn(fmt.Sprintf("[transport-websocket] connection closing with code: %d", code))
return nil
})
go func() {
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
c.log.Error(errors.Wrap(err, "[transport-websocket] failed to ReadMessage, terminating connection"))
break
}
c.log.Debug("[transport-websocket] recieved message via", c.nodeUUID)
msg, err := grav.MsgFromBytes(message)
if err != nil {
c.log.Error(errors.Wrap(err, "[transport-websocket] failed to MsgFromBytes"))
continue
}
// send to the Grav instance
c.recvFunc(msg)
}
}()
}
// Send sends a message to the connection
func (c *Conn) Send(msg grav.Message) error {
msgBytes, err := msg.Marshal()
if err != nil {
// not exactly sure what to do here (we don't want this going into the dead letter queue)
c.log.Error(errors.Wrap(err, "[transport-websocket] failed to Marshal message"))
return nil
}
c.log.Debug("[transport-websocket] sending message to connection", c.nodeUUID)
if err := c.WriteMessage(grav.TransportMsgTypeUser, msgBytes); err != nil {
if errors.Is(err, websocket.ErrCloseSent) {
return grav.ErrConnectionClosed
}
return errors.Wrap(err, "[transport-websocket] failed to WriteMessage")
}
c.log.Debug("[transport-websocket] sent message to connection", c.nodeUUID)
return nil
}
// CanReplace returns true if the connection can be replaced
func (c *Conn) CanReplace() bool {
return false
}
// DoOutgoingHandshake performs a connection handshake and returns the UUID of the node that we're connected to
// so that it can be validated against the UUID that was provided in discovery (or if none was provided)
func (c *Conn) DoOutgoingHandshake(handshake *grav.TransportHandshake) (*grav.TransportHandshakeAck, error) {
handshakeJSON, err := json.Marshal(handshake)
if err != nil {
return nil, errors.Wrap(err, "failed to Marshal handshake JSON")
}
c.log.Debug("[transport-websocket] sending handshake")
if err := c.WriteMessage(grav.TransportMsgTypeHandshake, handshakeJSON); err != nil {
return nil, errors.Wrap(err, "failed to WriteMessage handshake")
}
mt, message, err := c.conn.ReadMessage()
if err != nil {
return nil, errors.Wrap(err, "failed to ReadMessage for handshake ack, terminating connection")
}
if mt != grav.TransportMsgTypeHandshake {
return nil, errors.New("first message recieved was not handshake ack")
}
c.log.Debug("[transport-websocket] recieved handshake ack")
ack := grav.TransportHandshakeAck{}
if err := json.Unmarshal(message, &ack); err != nil {
return nil, errors.Wrap(err, "failed to Unmarshal handshake ack")
}
c.nodeUUID = ack.UUID
return &ack, nil
}
// DoIncomingHandshake performs a connection handshake and returns the UUID of the node that we're connected to
// so that it can be validated against the UUID that was provided in discovery (or if none was provided)
func (c *Conn) DoIncomingHandshake(handshakeAck *grav.TransportHandshakeAck) (*grav.TransportHandshake, error) {
mt, message, err := c.conn.ReadMessage()
if err != nil {
return nil, errors.Wrap(err, "failed to ReadMessage for handshake, terminating connection")
}
if mt != grav.TransportMsgTypeHandshake {
return nil, errors.New("first message recieved was not handshake")
}
c.log.Debug("[transport-websocket] recieved handshake")
handshake := grav.TransportHandshake{}
if err := json.Unmarshal(message, &handshake); err != nil {
return nil, errors.Wrap(err, "failed to Unmarshal handshake")
}
ackJSON, err := json.Marshal(handshakeAck)
if err != nil {
return nil, errors.Wrap(err, "failed to Marshal handshake JSON")
}
c.log.Debug("[transport-websocket] sending handshake ack")
if err := c.WriteMessage(grav.TransportMsgTypeHandshake, ackJSON); err != nil {
return nil, errors.Wrap(err, "failed to WriteMessage handshake ack")
}
c.log.Debug("[transport-websocket] sent handshake ack")
c.nodeUUID = handshake.UUID
return &handshake, nil
}
// Close closes the underlying connection
func (c *Conn) Close() {
c.log.Debug("[transport-websocket] connection for", c.nodeUUID, "is closing")
c.conn.Close()
}
// WriteMessage is a concurrent-safe wrapper around the websocket WriteMessage
func (c *Conn) WriteMessage(messageType int, data []byte) error {
c.cLock.Lock()
defer c.cLock.Unlock()
return c.conn.WriteMessage(messageType, data)
}