From a8e7ffc7ab147ebced766da8e0e1ebb1d75f846a Mon Sep 17 00:00:00 2001 From: devonh Date: Fri, 18 Nov 2022 00:29:23 +0000 Subject: [PATCH] Add p2p wakeup broadcast handling to pinecone demos (#2841) Adds wakeup broadcast handling to the pinecone demos. This will reset their blacklist status and interrupt any ongoing federation queue backoffs currently in progress for this peer. The end result is that any queued events will quickly be sent to the peer if they had disconnected while attempting to send events to them. --- build/gobind-pinecone/monolith.go | 35 +++++++++++++++++++++++++ cmd/dendrite-demo-pinecone/main.go | 35 +++++++++++++++++++++++++ federationapi/api/api.go | 13 +++++++++ federationapi/internal/perform.go | 16 ++++++++++- federationapi/inthttp/client.go | 13 +++++++++ federationapi/inthttp/server.go | 5 ++++ federationapi/queue/destinationqueue.go | 31 ++++++++++++++++++---- federationapi/queue/queue.go | 16 ++++++++--- go.mod | 4 ++- go.sum | 16 +++++++++-- 10 files changed, 172 insertions(+), 12 deletions(-) diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index adb4e40a6..9100ebf0f 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -40,6 +40,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" @@ -58,6 +59,7 @@ import ( pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/matrix-org/pinecone/types" @@ -295,7 +297,12 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) + pineconeEventChannel := make(chan pineconeEvents.Event) m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + m.PineconeRouter.EnableHopLimiting() + m.PineconeRouter.EnableWakeupBroadcasts() + m.PineconeRouter.Subscribe(pineconeEventChannel) + m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) @@ -423,6 +430,34 @@ func (m *DendriteMonolith) Start() { m.logger.Fatal(err) } }() + + go func(ch <-chan pineconeEvents.Event) { + eLog := logrus.WithField("pinecone", "events") + + for event := range ch { + switch e := event.(type) { + case pineconeEvents.PeerAdded: + case pineconeEvents.PeerRemoved: + case pineconeEvents.TreeParentUpdate: + case pineconeEvents.SnakeDescUpdate: + case pineconeEvents.TreeRootAnnUpdate: + case pineconeEvents.SnakeEntryAdded: + case pineconeEvents.SnakeEntryRemoved: + case pineconeEvents.BroadcastReceived: + eLog.Info("Broadcast received from: ", e.PeerID) + + req := &api.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &api.PerformWakeupServersResponse{} + if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } + case pineconeEvents.BandwidthReport: + default: + } + } + }(pineconeEventChannel) } func (m *DendriteMonolith) Stop() { diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 6c719a1ee..421b17d56 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -37,6 +37,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" @@ -51,6 +52,7 @@ import ( pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/sirupsen/logrus" @@ -155,7 +157,12 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck + pineconeEventChannel := make(chan pineconeEvents.Event) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + pRouter.EnableHopLimiting() + pRouter.EnableWakeupBroadcasts() + pRouter.Subscribe(pineconeEventChannel) + pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) @@ -293,5 +300,33 @@ func main() { logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) }() + go func(ch <-chan pineconeEvents.Event) { + eLog := logrus.WithField("pinecone", "events") + + for event := range ch { + switch e := event.(type) { + case pineconeEvents.PeerAdded: + case pineconeEvents.PeerRemoved: + case pineconeEvents.TreeParentUpdate: + case pineconeEvents.SnakeDescUpdate: + case pineconeEvents.TreeRootAnnUpdate: + case pineconeEvents.SnakeEntryAdded: + case pineconeEvents.SnakeEntryRemoved: + case pineconeEvents.BroadcastReceived: + eLog.Info("Broadcast received from: ", e.PeerID) + + req := &api.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &api.PerformWakeupServersResponse{} + if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } + case pineconeEvents.BandwidthReport: + default: + } + } + }(pineconeEventChannel) + base.WaitForShutdown() } diff --git a/federationapi/api/api.go b/federationapi/api/api.go index f4be53b9a..50d0339e4 100644 --- a/federationapi/api/api.go +++ b/federationapi/api/api.go @@ -30,6 +30,12 @@ type FederationInternalAPI interface { request *PerformBroadcastEDURequest, response *PerformBroadcastEDUResponse, ) error + + PerformWakeupServers( + ctx context.Context, + request *PerformWakeupServersRequest, + response *PerformWakeupServersResponse, + ) error } type ClientFederationAPI interface { @@ -214,6 +220,13 @@ type PerformBroadcastEDURequest struct { type PerformBroadcastEDUResponse struct { } +type PerformWakeupServersRequest struct { + ServerNames []gomatrixserverlib.ServerName `json:"server_names"` +} + +type PerformWakeupServersResponse struct { +} + type InputPublicKeysRequest struct { Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` } diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 603e2a9c9..d86d07e03 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -670,9 +670,23 @@ func (r *FederationInternalAPI) PerformBroadcastEDU( return nil } +// PerformWakeupServers implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformWakeupServers( + ctx context.Context, + request *api.PerformWakeupServersRequest, + response *api.PerformWakeupServersResponse, +) (err error) { + r.MarkServersAlive(request.ServerNames) + return nil +} + func (r *FederationInternalAPI) MarkServersAlive(destinations []gomatrixserverlib.ServerName) { for _, srv := range destinations { - _ = r.db.RemoveServerFromBlacklist(srv) + // Check the statistics cache for the blacklist status to prevent hitting + // the database unnecessarily. + if r.queues.IsServerBlacklisted(srv) { + _ = r.db.RemoveServerFromBlacklist(srv) + } r.queues.RetryServer(srv) } } diff --git a/federationapi/inthttp/client.go b/federationapi/inthttp/client.go index 6c37a1f5a..6eefdc7cd 100644 --- a/federationapi/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -23,6 +23,7 @@ const ( FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest" FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" + FederationAPIPerformWakeupServers = "/federationapi/performWakeupServers" FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" @@ -150,6 +151,18 @@ func (h *httpFederationInternalAPI) PerformBroadcastEDU( ) } +// Handle an instruction to remove the respective servers from being blacklisted. +func (h *httpFederationInternalAPI) PerformWakeupServers( + ctx context.Context, + request *api.PerformWakeupServersRequest, + response *api.PerformWakeupServersResponse, +) error { + return httputil.CallInternalRPCAPI( + "PerformWakeupServers", h.federationAPIURL+FederationAPIPerformWakeupServers, + h.httpClient, ctx, request, response, + ) +} + type getUserDevices struct { S gomatrixserverlib.ServerName Origin gomatrixserverlib.ServerName diff --git a/federationapi/inthttp/server.go b/federationapi/inthttp/server.go index 7b3edb2a7..21a070392 100644 --- a/federationapi/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -43,6 +43,11 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { httputil.MakeInternalRPCAPI("FederationAPIPerformBroadcastEDU", intAPI.PerformBroadcastEDU), ) + internalAPIMux.Handle( + FederationAPIPerformWakeupServers, + httputil.MakeInternalRPCAPI("FederationAPIPerformWakeupServers", intAPI.PerformWakeupServers), + ) + internalAPIMux.Handle( FederationAPIPerformJoinRequestPath, httputil.MakeInternalRPCAPI( diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 63fc59c89..a4a87fe99 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -141,23 +141,44 @@ func (oq *destinationQueue) handleBackoffNotifier() { } } +// wakeQueueIfEventsPending calls wakeQueueAndNotify only if there are +// pending events or if forceWakeup is true. This prevents starting the +// queue unnecessarily. +func (oq *destinationQueue) wakeQueueIfEventsPending(forceWakeup bool) { + eventsPending := func() bool { + oq.pendingMutex.Lock() + defer oq.pendingMutex.Unlock() + return len(oq.pendingPDUs) > 0 || len(oq.pendingEDUs) > 0 + } + + // NOTE : Only wakeup and notify the queue if there are pending events + // or if forceWakeup is true. Otherwise there is no reason to start the + // queue goroutine and waste resources. + if forceWakeup || eventsPending() { + logrus.Info("Starting queue due to pending events or forceWakeup") + oq.wakeQueueAndNotify() + } +} + // wakeQueueAndNotify ensures the destination queue is running and notifies it // that there is pending work. func (oq *destinationQueue) wakeQueueAndNotify() { - // Wake up the queue if it's asleep. - oq.wakeQueueIfNeeded() + // NOTE : Send notification before waking queue to prevent a race + // where the queue was running and stops due to a timeout in between + // checking it and sending the notification. // Notify the queue that there are events ready to send. select { case oq.notify <- struct{}{}: default: } + + // Wake up the queue if it's asleep. + oq.wakeQueueIfNeeded() } // wakeQueueIfNeeded will wake up the destination queue if it is -// not already running. If it is running but it is backing off -// then we will interrupt the backoff, causing any federation -// requests to retry. +// not already running. func (oq *destinationQueue) wakeQueueIfNeeded() { // Clear the backingOff flag and update the backoff metrics if it was set. if oq.backingOff.CompareAndSwap(true, false) { diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go index 31124e0c1..75b1b36be 100644 --- a/federationapi/queue/queue.go +++ b/federationapi/queue/queue.go @@ -374,14 +374,24 @@ func (oqs *OutgoingQueues) SendEDU( return nil } +// IsServerBlacklisted returns whether or not the provided server is currently +// blacklisted. +func (oqs *OutgoingQueues) IsServerBlacklisted(srv gomatrixserverlib.ServerName) bool { + return oqs.statistics.ForServer(srv).Blacklisted() +} + // RetryServer attempts to resend events to the given server if we had given up. func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) { if oqs.disabled { return } - oqs.statistics.ForServer(srv).RemoveBlacklist() + + serverStatistics := oqs.statistics.ForServer(srv) + forceWakeup := serverStatistics.Blacklisted() + serverStatistics.RemoveBlacklist() + serverStatistics.ClearBackoff() + if queue := oqs.getQueue(srv); queue != nil { - queue.statistics.ClearBackoff() - queue.wakeQueueIfNeeded() + queue.wakeQueueIfEventsPending(forceWakeup) } } diff --git a/go.mod b/go.mod index 97dcdb293..adf319127 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 github.com/matrix-org/gomatrixserverlib v0.0.0-20221115151040-900369eadf39 - github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37 + github.com/matrix-org/pinecone v0.0.0-20221117214503-218c39e0cd6d github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.15 github.com/nats-io/nats-server/v2 v2.9.6 @@ -109,6 +109,8 @@ require ( github.com/nats-io/jwt/v2 v2.3.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/nxadm/tail v1.4.8 // indirect + github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.3.0 // indirect github.com/onsi/gomega v1.22.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect diff --git a/go.sum b/go.sum index 25331b2f6..73741136d 100644 --- a/go.sum +++ b/go.sum @@ -335,8 +335,12 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lucas-clemente/quic-go v0.28.1/go.mod h1:oGz5DKK41cJt5+773+BSO9BXDsREY4HLf7+0odGAPO0= +github.com/lucas-clemente/quic-go v0.29.2 h1:O8Mt0O6LpvEW+wfC40vZdcw0DngwYzoxq5xULZNzSI8= +github.com/lucas-clemente/quic-go v0.29.2/go.mod h1:g6/h9YMmLuU54tL1gW25uIi3VlBp3uv+sBihplIuskE= github.com/lucas-clemente/quic-go v0.30.0 h1:nwLW0h8ahVQ5EPTIM7uhl/stHqQDea15oRlYKZmw2O0= github.com/lucas-clemente/quic-go v0.30.0/go.mod h1:ssOrRsOmdxa768Wr78vnh2B8JozgLsMzG/g+0qEC7uk= +github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/marten-seemann/qtls-go1-18 v0.1.3 h1:R4H2Ks8P6pAtUagjFty2p7BVHn3XiwDAl7TTQf5h7TI= github.com/marten-seemann/qtls-go1-18 v0.1.3/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4= @@ -350,8 +354,10 @@ github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrixserverlib v0.0.0-20221115151040-900369eadf39 h1:VapUpY3oSbEGhfSpnnTKh7bz6AA5R4tVB5lwdlcA6jE= github.com/matrix-org/gomatrixserverlib v0.0.0-20221115151040-900369eadf39/go.mod h1:Mtifyr8q8htcBeugvlDnkBcNUy5LO8OzUoplAf1+mb4= -github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37 h1:CQWFrgH9TJOU2f2qCDhGwaSdAnmgSu3/f+2xcf/Fse4= -github.com/matrix-org/pinecone v0.0.0-20221103125849-37f2e9b9ba37/go.mod h1:F3GHppRuHCTDeoOmmgjZMeJdbql91+RSGGsATWfC7oc= +github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6 h1:nAT5w41Q9uWTSnpKW55/hBwP91j2IFYPDRs0jJ8TyFI= +github.com/matrix-org/pinecone v0.0.0-20221026160848-639feeff74d6/go.mod h1:K0N1ixHQxXoCyqolDqVxPM3ArrDtcMs8yegOx2Lfv9k= +github.com/matrix-org/pinecone v0.0.0-20221117214503-218c39e0cd6d h1:RRJg8TP3bYYtVyFIMv/ebJR+ZTWv7Xtci5zk1D3GD10= +github.com/matrix-org/pinecone v0.0.0-20221117214503-218c39e0cd6d/go.mod h1:F3GHppRuHCTDeoOmmgjZMeJdbql91+RSGGsATWfC7oc= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= @@ -402,6 +408,12 @@ github.com/ngrok/sqlmw v0.0.0-20220520173518-97c9c04efc79/go.mod h1:E26fwEtRNigB github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.3.0 h1:kUMoxMoQG3ogk/QWyKh3zibV7BKZ+xBpWil1cTylVqc= github.com/onsi/ginkgo/v2 v2.3.0/go.mod h1:Eew0uilEqZmIEZr8JrvYlvOM7Rr6xzTmMV8AyFNU9d0= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=