From 865fff5f03d06f649736c7fcca134086dfccf7ab Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 28 Feb 2024 20:59:34 +0100 Subject: [PATCH] Make usage of relays optional, avoid DB roundtrips (#3337) This should avoid 2 additional DB roundtrips if we don't want to use relays. So instead of possibly doing roughly 20k trips to the DB, we are now "only" doing ~6600. --------- Co-authored-by: devonh --- federationapi/federationapi.go | 5 +---- federationapi/internal/federationclient_test.go | 10 +++++----- federationapi/internal/perform_test.go | 10 +++++----- federationapi/queue/queue_test.go | 2 +- federationapi/statistics/statistics.go | 11 +++++++++++ federationapi/statistics/statistics_test.go | 4 ++-- setup/config/config_federationapi.go | 7 +++++++ 7 files changed, 32 insertions(+), 17 deletions(-) diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index efbfa3315..d3730c7ca 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -113,10 +113,7 @@ func NewInternalAPI( _ = federationDB.RemoveAllServersFromBlacklist() } - stats := statistics.NewStatistics( - federationDB, - cfg.FederationMaxRetries+1, - cfg.P2PFederationRetriesUntilAssumedOffline+1) + stats := statistics.NewStatistics(federationDB, cfg.FederationMaxRetries+1, cfg.P2PFederationRetriesUntilAssumedOffline+1, cfg.EnableRelays) js, nats := natsInstance.Prepare(processContext, &cfg.Matrix.JetStream) diff --git a/federationapi/internal/federationclient_test.go b/federationapi/internal/federationclient_test.go index fe8d84ffb..47efb11da 100644 --- a/federationapi/internal/federationclient_test.go +++ b/federationapi/internal/federationclient_test.go @@ -61,7 +61,7 @@ func TestFederationClientQueryKeys(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -92,7 +92,7 @@ func TestFederationClientQueryKeysBlacklisted(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -122,7 +122,7 @@ func TestFederationClientQueryKeysFailure(t *testing.T) { }, } fedClient := &testFedClient{shouldFail: true} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -152,7 +152,7 @@ func TestFederationClientClaimKeys(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -183,7 +183,7 @@ func TestFederationClientClaimKeysBlacklisted(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, diff --git a/federationapi/internal/perform_test.go b/federationapi/internal/perform_test.go index 2795a018a..82f9b9db1 100644 --- a/federationapi/internal/perform_test.go +++ b/federationapi/internal/perform_test.go @@ -66,7 +66,7 @@ func TestPerformWakeupServers(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -112,7 +112,7 @@ func TestQueryRelayServers(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -153,7 +153,7 @@ func TestRemoveRelayServers(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -193,7 +193,7 @@ func TestPerformDirectoryLookup(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, @@ -232,7 +232,7 @@ func TestPerformDirectoryLookupRelaying(t *testing.T) { }, } fedClient := &testFedClient{} - stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := statistics.NewStatistics(testDB, FailuresUntilBlacklist, FailuresUntilAssumedOffline, true) queues := queue.NewOutgoingQueues( testDB, process.NewProcessContext(), false, diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go index 73d3b0598..6da863427 100644 --- a/federationapi/queue/queue_test.go +++ b/federationapi/queue/queue_test.go @@ -117,7 +117,7 @@ func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32 txRelayCount: *atomic.NewUint32(0), } - stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline) + stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline, false) signingInfo := []*fclient.SigningIdentity{ { KeyID: "ed21019:auto", diff --git a/federationapi/statistics/statistics.go b/federationapi/statistics/statistics.go index e5fc4b940..e133fc9c9 100644 --- a/federationapi/statistics/statistics.go +++ b/federationapi/statistics/statistics.go @@ -34,12 +34,15 @@ type Statistics struct { // mark the destination as offline. At this point we should attempt // to send messages to the user's async relay servers if we know them. FailuresUntilAssumedOffline uint32 + + enableRelays bool } func NewStatistics( db storage.Database, failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, + enableRelays bool, ) Statistics { return Statistics{ DB: db, @@ -47,6 +50,7 @@ func NewStatistics( FailuresUntilAssumedOffline: failuresUntilAssumedOffline, backoffTimers: make(map[spec.ServerName]*time.Timer), servers: make(map[spec.ServerName]*ServerStatistics), + enableRelays: enableRelays, } } @@ -73,6 +77,13 @@ func (s *Statistics) ForServer(serverName spec.ServerName) *ServerStatistics { } else { server.blacklisted.Store(blacklisted) } + + // Don't bother hitting the database 2 additional times + // if we don't want to use relays. + if !s.enableRelays { + return server + } + assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName) if err != nil { logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName) diff --git a/federationapi/statistics/statistics_test.go b/federationapi/statistics/statistics_test.go index a930bc3b0..4376a9050 100644 --- a/federationapi/statistics/statistics_test.go +++ b/federationapi/statistics/statistics_test.go @@ -16,7 +16,7 @@ const ( ) func TestBackoff(t *testing.T) { - stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) server := ServerStatistics{ statistics: &stats, serverName: "test.com", @@ -106,7 +106,7 @@ func TestBackoff(t *testing.T) { } func TestRelayServersListing(t *testing.T) { - stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline) + stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline, false) server := ServerStatistics{statistics: &stats} server.AddRelayServers([]spec.ServerName{"relay1", "relay1", "relay2"}) relayServers := server.KnownRelayServers() diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go index a72eee369..073c46e03 100644 --- a/setup/config/config_federationapi.go +++ b/setup/config/config_federationapi.go @@ -18,6 +18,13 @@ type FederationAPI struct { // The default value is 16 if not specified, which is circa 18 hours. FederationMaxRetries uint32 `yaml:"send_max_retries"` + // P2P Feature: Whether relaying to specific nodes should be enabled. + // Defaults to false. + // Note: Enabling relays introduces a huge startup delay, if you are not using + // relays and have many servers to re-hydrate on start. Only enable this + // if you are using relays! + EnableRelays bool `yaml:"enable_relays"` + // P2P Feature: How many consecutive failures that we should tolerate when // sending federation requests to a specific server until we should assume they // are offline. If we assume they are offline then we will attempt to send