diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go index de9f5aaf1..ad6f1dfc9 100644 --- a/appservice/appservice_test.go +++ b/appservice/appservice_test.go @@ -12,6 +12,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/appservice/api" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" @@ -123,8 +124,9 @@ func TestAppserviceInternalAPI(t *testing.T) { }, } + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) // Create required internal APIs - rsAPI := roomserver.NewInternalAPI(base) + rsAPI := roomserver.NewInternalAPI(base, caches) usrAPI := userapi.NewInternalAPI(base, rsAPI, nil) asAPI := appservice.NewInternalAPI(base, usrAPI, rsAPI) diff --git a/build/dendritejs-pinecone/main.go b/build/dendritejs-pinecone/main.go index 44e52286f..96f034bdf 100644 --- a/build/dendritejs-pinecone/main.go +++ b/build/dendritejs-pinecone/main.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" @@ -192,7 +193,8 @@ func startup() { base, userAPI, rsAPI, ) rsAPI.SetAppserviceAPI(asQuery) - fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) + fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, caches, keyRing, true) rsAPI.SetFederationAPI(fedSenderAPI, keyRing) monolith := setup.Monolith{ @@ -208,15 +210,15 @@ func startup() { //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), } - monolith.AddAllPublicRoutes(base) + monolith.AddAllPublicRoutes(base, caches) httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() - httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) - httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client) + httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media) p2pRouter := pSessions.Protocol("matrix").HTTP().Mux() - p2pRouter.Handle(httputil.PublicFederationPathPrefix, base.PublicFederationAPIMux) - p2pRouter.Handle(httputil.PublicMediaPathPrefix, base.PublicMediaAPIMux) + p2pRouter.Handle(httputil.PublicFederationPathPrefix, base.Routers.Federation) + p2pRouter.Handle(httputil.PublicMediaPathPrefix, base.Routers.Media) // Expose the matrix APIs via fetch - for local traffic go func() { diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 32af611ae..8faad1d02 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -19,6 +19,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" @@ -158,10 +159,11 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, base.Caches, keyRing, true, + base, federation, rsAPI, caches, keyRing, true, ) userAPI := userapi.NewInternalAPI(base, rsAPI, federation) @@ -187,17 +189,17 @@ func (m *DendriteMonolith) Start() { ygg, fsAPI, federation, ), } - monolith.AddAllPublicRoutes(base) + monolith.AddAllPublicRoutes(base, caches) httpRouter := mux.NewRouter() - httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) - httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux) - httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux) + httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client) + httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media) + httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.Routers.DendriteAdmin) + httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.Routers.SynapseAdmin) yggRouter := mux.NewRouter() - yggRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) - yggRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + yggRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.Routers.Federation) + yggRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media) // Build both ends of a HTTP multiplex. m.httpServer = &http.Server{ diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go index 300d3a88a..46e2d3037 100644 --- a/clientapi/admin_test.go +++ b/clientapi/admin_test.go @@ -8,6 +8,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -37,7 +38,8 @@ func TestAdminResetPassword(t *testing.T) { SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, }) - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) // Needed for changing the password/login userAPI := userapi.NewInternalAPI(base, rsAPI, nil) // We mostly need the userAPI for this test, so nil for other APIs/caches etc. @@ -71,7 +73,7 @@ func TestAdminResetPassword(t *testing.T) { "password": password, })) rec := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(rec, req) + base.Routers.Client.ServeHTTP(rec, req) if rec.Code != http.StatusOK { t.Fatalf("failed to login: %s", rec.Body.String()) } @@ -124,7 +126,7 @@ func TestAdminResetPassword(t *testing.T) { } rec := httptest.NewRecorder() - base.DendriteAdminMux.ServeHTTP(rec, req) + base.Routers.DendriteAdmin.ServeHTTP(rec, req) t.Logf("%s", rec.Body.String()) if tc.wantOK && rec.Code != http.StatusOK { t.Fatalf("expected http status %d, got %d: %s", http.StatusOK, rec.Code, rec.Body.String()) @@ -148,15 +150,16 @@ func TestPurgeRoom(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) defer baseClose() fedClient := base.CreateFederationClient() - rsAPI := roomserver.NewInternalAPI(base) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, userAPI, rsAPI) - federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true) + syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches) + federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room @@ -193,7 +196,7 @@ func TestPurgeRoom(t *testing.T) { "password": password, })) rec := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(rec, req) + base.Routers.Client.ServeHTTP(rec, req) if rec.Code != http.StatusOK { t.Fatalf("failed to login: %s", rec.Body.String()) } @@ -218,7 +221,7 @@ func TestPurgeRoom(t *testing.T) { req.Header.Set("Authorization", "Bearer "+accessTokens[aliceAdmin]) rec := httptest.NewRecorder() - base.DendriteAdminMux.ServeHTTP(rec, req) + base.Routers.DendriteAdmin.ServeHTTP(rec, req) t.Logf("%s", rec.Body.String()) if tc.wantOK && rec.Code != http.StatusOK { t.Fatalf("expected http status %d, got %d: %s", http.StatusOK, rec.Code, rec.Body.String()) diff --git a/clientapi/routing/joinroom_test.go b/clientapi/routing/joinroom_test.go index 1450ef4bd..de8f9538d 100644 --- a/clientapi/routing/joinroom_test.go +++ b/clientapi/routing/joinroom_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/appservice" @@ -27,7 +28,8 @@ func TestJoinRoomByIDOrAlias(t *testing.T) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc diff --git a/clientapi/routing/login_test.go b/clientapi/routing/login_test.go index b72db9d8b..fd3d8cba9 100644 --- a/clientapi/routing/login_test.go +++ b/clientapi/routing/login_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" @@ -36,7 +37,8 @@ func TestLogin(t *testing.T) { SigningIdentity: gomatrixserverlib.SigningIdentity{ServerName: "vh1"}, }) - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) // Needed for /login userAPI := userapi.NewInternalAPI(base, rsAPI, nil) @@ -114,7 +116,7 @@ func TestLogin(t *testing.T) { "password": password, })) rec := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(rec, req) + base.Routers.Client.ServeHTTP(rec, req) if tc.wantOK && rec.Code != http.StatusOK { t.Fatalf("failed to login: %s", rec.Body.String()) } diff --git a/clientapi/routing/register_test.go b/clientapi/routing/register_test.go index 651e3d3d6..c06b0ae12 100644 --- a/clientapi/routing/register_test.go +++ b/clientapi/routing/register_test.go @@ -30,6 +30,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" @@ -407,7 +408,8 @@ func Test_register(t *testing.T) { base, baseClose := testrig.CreateBaseDendrite(t, dbType) defer baseClose() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) for _, tc := range testCases { @@ -578,7 +580,8 @@ func TestRegisterUserWithDisplayName(t *testing.T) { defer baseClose() base.Cfg.Global.ServerName = "server" - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) deviceName, deviceID := "deviceName", "deviceID" expectedDisplayName := "DisplayName" @@ -616,8 +619,8 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) { base.Cfg.Global.ServerName = "server" sharedSecret := "dendritetest" base.Cfg.ClientAPI.RegistrationSharedSecret = sharedSecret - - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) expectedDisplayName := "rabbit" diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 028d02e97..4ef2ac92e 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -63,10 +63,10 @@ func Setup( extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, natsClient *nats.Conn, ) { - publicAPIMux := base.PublicClientAPIMux - wkMux := base.PublicWellKnownAPIMux - synapseAdminRouter := base.SynapseAdminMux - dendriteAdminRouter := base.DendriteAdminMux + publicAPIMux := base.Routers.Client + wkMux := base.Routers.WellKnown + synapseAdminRouter := base.Routers.SynapseAdmin + dendriteAdminRouter := base.Routers.DendriteAdmin if base.EnableMetrics { prometheus.MustRegister(amtRegUsers, sendEventDuration) diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index ea8e985c7..5781d6571 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -37,6 +37,7 @@ import ( "github.com/matrix-org/dendrite/federationapi" federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/relayapi" relayAPI "github.com/matrix-org/dendrite/relayapi/api" @@ -134,17 +135,17 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsComponent := roomserver.NewInternalAPI(p.BaseDendrite) - rsAPI := rsComponent + caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics) + rsAPI := roomserver.NewInternalAPI(p.BaseDendrite, caches) fsAPI := federationapi.NewInternalAPI( - p.BaseDendrite, federation, rsAPI, p.BaseDendrite.Caches, keyRing, true, + p.BaseDendrite, federation, rsAPI, caches, keyRing, true, ) userAPI := userapi.NewInternalAPI(p.BaseDendrite, rsAPI, federation) asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI) - rsComponent.SetFederationAPI(fsAPI, keyRing) + rsAPI.SetFederationAPI(fsAPI, keyRing) userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation) roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation) @@ -161,7 +162,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi Config: &p.BaseDendrite.Cfg.FederationAPI, UserAPI: userAPI, } - relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying) + relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying, caches) logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled()) p.dendrite = setup.Monolith{ @@ -178,7 +179,7 @@ func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelayi ExtPublicRoomsProvider: roomProvider, ExtUserDirectoryProvider: userProvider, } - p.dendrite.AddAllPublicRoutes(p.BaseDendrite) + p.dendrite.AddAllPublicRoutes(p.BaseDendrite, caches) p.setupHttpServers(userProvider, enableWebsockets) } @@ -247,10 +248,10 @@ func (p *P2PMonolith) Addr() string { func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, enableWebsockets bool) { p.httpMux = mux.NewRouter().SkipClean(true).UseEncodedPath() - p.httpMux.PathPrefix(httputil.PublicClientPathPrefix).Handler(p.BaseDendrite.PublicClientAPIMux) - p.httpMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux) - p.httpMux.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(p.BaseDendrite.DendriteAdminMux) - p.httpMux.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(p.BaseDendrite.SynapseAdminMux) + p.httpMux.PathPrefix(httputil.PublicClientPathPrefix).Handler(p.BaseDendrite.Routers.Client) + p.httpMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.Routers.Media) + p.httpMux.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(p.BaseDendrite.Routers.DendriteAdmin) + p.httpMux.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(p.BaseDendrite.Routers.SynapseAdmin) if enableWebsockets { wsUpgrader := websocket.Upgrader{ @@ -283,8 +284,8 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, p.pineconeMux = mux.NewRouter().SkipClean(true).UseEncodedPath() p.pineconeMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) - p.pineconeMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(p.BaseDendrite.PublicFederationAPIMux) - p.pineconeMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux) + p.pineconeMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(p.BaseDendrite.Routers.Federation) + p.pineconeMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.Routers.Media) pHTTP := p.Sessions.Protocol(SessionProtocol).HTTP() pHTTP.Mux().Handle(users.PublicURL, p.pineconeMux) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index d759c6a73..62184719a 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -27,6 +27,7 @@ import ( "path/filepath" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/gomatrixserverlib" "github.com/gorilla/mux" @@ -156,16 +157,15 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsAPI := roomserver.NewInternalAPI( - base, - ) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, federation) asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, base.Caches, keyRing, true, + base, federation, rsAPI, caches, keyRing, true, ) rsAPI.SetFederationAPI(fsAPI, keyRing) @@ -184,21 +184,21 @@ func main() { ygg, fsAPI, federation, ), } - monolith.AddAllPublicRoutes(base) - if err := mscs.Enable(base, &monolith); err != nil { + monolith.AddAllPublicRoutes(base, caches) + if err := mscs.Enable(base, &monolith, caches); err != nil { logrus.WithError(err).Fatalf("Failed to enable MSCs") } httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() - httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) - httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) - httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux) - httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux) + httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.Routers.Client) + httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media) + httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.Routers.DendriteAdmin) + httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.Routers.SynapseAdmin) embed.Embed(httpRouter, *instancePort, "Yggdrasil Demo") yggRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() - yggRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) - yggRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + yggRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.Routers.Federation) + yggRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.Routers.Media) // Build both ends of a HTTP multiplex. httpServer := &http.Server{ diff --git a/cmd/dendrite/main.go b/cmd/dendrite/main.go index 472b8be18..29290eb98 100644 --- a/cmd/dendrite/main.go +++ b/cmd/dendrite/main.go @@ -17,6 +17,7 @@ package main import ( "flag" + "github.com/matrix-org/dendrite/internal/caching" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/appservice" @@ -72,10 +73,11 @@ func main() { federation := base.CreateFederationClient() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.EnableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) fsAPI := federationapi.NewInternalAPI( - base, federation, rsAPI, base.Caches, nil, false, + base, federation, rsAPI, caches, nil, false, ) keyRing := fsAPI.KeyRing() @@ -104,10 +106,10 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, } - monolith.AddAllPublicRoutes(base) + monolith.AddAllPublicRoutes(base, caches) if len(base.Cfg.MSCs.MSCs) > 0 { - if err := mscs.Enable(base, &monolith); err != nil { + if err := mscs.Enable(base, &monolith, caches); err != nil { logrus.WithError(err).Fatalf("Failed to enable MSCs") } } diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go index a9cc80cb7..099daaa9a 100644 --- a/cmd/resolve-state/main.go +++ b/cmd/resolve-state/main.go @@ -55,7 +55,7 @@ func main() { fmt.Println("Fetching", len(snapshotNIDs), "snapshot NIDs") roomserverDB, err := storage.Open( - base, &cfg.RoomServer.Database, + base.ProcessContext.Context(), base.ConnectionManager, &cfg.RoomServer.Database, caching.NewRistrettoCache(128*1024*1024, time.Hour, true), ) if err != nil { diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index ec482659a..8a4237bae 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -94,7 +94,7 @@ func NewInternalAPI( ) api.FederationInternalAPI { cfg := &base.Cfg.FederationAPI - federationDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.IsLocalServerName) + federationDB, err := storage.NewDatabase(base.ProcessContext.Context(), base.ConnectionManager, &cfg.Database, caches, base.Cfg.Global.IsLocalServerName) if err != nil { logrus.WithError(err).Panic("failed to connect to federation sender db") } diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index 57d4b9644..8aea96a73 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" @@ -163,6 +164,7 @@ func TestFederationAPIJoinThenKeyUpdate(t *testing.T) { func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) base.Cfg.FederationAPI.PreferDirectFetch = true base.Cfg.FederationAPI.KeyPerspectives = nil defer close() @@ -212,7 +214,7 @@ func testFederationAPIJoinThenKeyUpdate(t *testing.T, dbType test.DBType) { }, }, } - fsapi := federationapi.NewInternalAPI(base, fc, rsapi, base.Caches, nil, false) + fsapi := federationapi.NewInternalAPI(base, fc, rsapi, caches, nil, false) var resp api.PerformJoinResponse fsapi.PerformJoin(context.Background(), &api.PerformJoinRequest{ @@ -278,7 +280,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. federationapi.AddPublicRoutes(b, nil, nil, keyRing, nil, &internal.FederationInternalAPI{}, nil) - baseURL, cancel := test.ListenAndServe(t, b.PublicFederationAPIMux, true) + baseURL, cancel := test.ListenAndServe(t, b.Routers.Federation, true) defer cancel() serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://")) diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go index bccfb3428..c3fb10561 100644 --- a/federationapi/queue/queue_test.go +++ b/federationapi/queue/queue_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/test/testrig" "go.uber.org/atomic" "gotest.tools/v3/poll" @@ -34,17 +36,17 @@ import ( "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" ) func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *process.ProcessContext, func()) { if realDatabase { // Real Database/s b, baseClose := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(b.Cfg.Global.Cache.EstimatedMaxSize, b.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) connStr, dbClose := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewDatabase(b, &config.DatabaseOptions{ + db, err := storage.NewDatabase(b.ProcessContext.Context(), b.ConnectionManager, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }, b.Caches, b.Cfg.Global.IsLocalServerName) + }, caches, b.Cfg.Global.IsLocalServerName) if err != nil { t.Fatalf("NewDatabase returned %s", err) } @@ -55,10 +57,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase } else { // Fake Database db := test.NewInMemoryFederationDatabase() - b := struct { - ProcessContext *process.ProcessContext - }{ProcessContext: process.NewProcessContext()} - return db, b.ProcessContext, func() {} + return db, process.NewProcessContext(), func() {} } } diff --git a/federationapi/routing/profile_test.go b/federationapi/routing/profile_test.go index 3b9d576bf..df494a743 100644 --- a/federationapi/routing/profile_test.go +++ b/federationapi/routing/profile_test.go @@ -50,7 +50,7 @@ func TestHandleQueryProfile(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() - base.PublicFederationAPIMux = fedMux + base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false fedClient := fakeFedClient{} diff --git a/federationapi/routing/query_test.go b/federationapi/routing/query_test.go index d839a16b8..69cf7047d 100644 --- a/federationapi/routing/query_test.go +++ b/federationapi/routing/query_test.go @@ -50,7 +50,7 @@ func TestHandleQueryDirectory(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() - base.PublicFederationAPIMux = fedMux + base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false fedClient := fakeFedClient{} diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 324740ddc..c86d18d2f 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -65,9 +65,9 @@ func Setup( servers federationAPI.ServersInRoomProvider, producer *producers.SyncAPIProducer, ) { - fedMux := base.PublicFederationAPIMux - keyMux := base.PublicKeyAPIMux - wkMux := base.PublicWellKnownAPIMux + fedMux := base.Routers.Federation + keyMux := base.Routers.Keys + wkMux := base.Routers.WellKnown cfg := &base.Cfg.FederationAPI if base.EnableMetrics { diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index eed4e7e69..53e1399ab 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -48,7 +48,7 @@ func TestHandleSend(t *testing.T) { defer close() fedMux := mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath() - base.PublicFederationAPIMux = fedMux + base.Routers.Federation = fedMux base.Cfg.FederationAPI.Matrix.SigningIdentity.ServerName = testOrigin base.Cfg.FederationAPI.Matrix.Metrics.Enabled = false fedapi := fedAPI.NewInternalAPI(base, nil, nil, nil, nil, true) diff --git a/federationapi/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index b81f128e7..468567cf0 100644 --- a/federationapi/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -16,6 +16,7 @@ package postgres import ( + "context" "database/sql" "fmt" @@ -23,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -36,10 +36,10 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (*Database, error) { +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (*Database, error) { var d Database var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { return nil, err } blacklist, err := NewPostgresBlacklistTable(d.db) @@ -95,7 +95,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, Version: "federationsender: drop federationsender_rooms", Up: deltas.UpRemoveRoomsTable, }) - err = m.Up(base.Context()) + err = m.Up(ctx) if err != nil { return nil, err } diff --git a/federationapi/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 1e7e41a2c..c64c9a4f0 100644 --- a/federationapi/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -15,13 +15,13 @@ package sqlite3 import ( + "context" "database/sql" "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,10 +34,10 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (*Database, error) { +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (*Database, error) { var d Database var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { return nil, err } blacklist, err := NewSQLiteBlacklistTable(d.db) @@ -93,7 +93,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, Version: "federationsender: drop federationsender_rooms", Up: deltas.UpRemoveRoomsTable, }) - err = m.Up(base.Context()) + err = m.Up(ctx) if err != nil { return nil, err } diff --git a/federationapi/storage/storage.go b/federationapi/storage/storage.go index 142e281ea..4eb9d2c98 100644 --- a/federationapi/storage/storage.go +++ b/federationapi/storage/storage.go @@ -18,23 +18,24 @@ package storage import ( + "context" "fmt" "github.com/matrix-org/dendrite/federationapi/storage/postgres" "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) // NewDatabase opens a new database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (Database, error) { +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties, cache, isLocalServerName) + return sqlite3.NewDatabase(ctx, conMan, dbProperties, cache, isLocalServerName) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties, cache, isLocalServerName) + return postgres.NewDatabase(ctx, conMan, dbProperties, cache, isLocalServerName) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/federationapi/storage/storage_test.go b/federationapi/storage/storage_test.go index 1d2a13e81..a02b71b60 100644 --- a/federationapi/storage/storage_test.go +++ b/federationapi/storage/storage_test.go @@ -7,26 +7,28 @@ import ( "time" "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/stretchr/testify/assert" ) func mustCreateFederationDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { - b, baseClose := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, false) connStr, dbClose := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewDatabase(b, &config.DatabaseOptions{ + ctx := context.Background() + cm := sqlutil.NewConnectionManager() + db, err := storage.NewDatabase(ctx, cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), - }, b.Caches, func(server gomatrixserverlib.ServerName) bool { return server == "localhost" }) + }, caches, func(server gomatrixserverlib.ServerName) bool { return server == "localhost" }) if err != nil { t.Fatalf("NewDatabase returned %s", err) } return db, func() { dbClose() - baseClose() } } diff --git a/federationapi/storage/storage_wasm.go b/federationapi/storage/storage_wasm.go index 84d5a3a4c..d1652d712 100644 --- a/federationapi/storage/storage_wasm.go +++ b/federationapi/storage/storage_wasm.go @@ -15,20 +15,21 @@ package storage import ( + "context" "fmt" "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) // NewDatabase opens a new database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.FederationCache, serverName gomatrixserverlib.ServerName) (Database, error) { +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties, cache, serverName) + return sqlite3.NewDatabase(ctx, conMan, dbProperties, cache, isLocalServerName) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/internal/caching/impl_ristretto.go b/internal/caching/impl_ristretto.go index 106b9c99f..7663ddcb9 100644 --- a/internal/caching/impl_ristretto.go +++ b/internal/caching/impl_ristretto.go @@ -46,6 +46,11 @@ const ( eventStateKeyNIDCache ) +const ( + DisableMetrics = false + EnableMetrics = true +) + func NewRistrettoCache(maxCost config.DataUnit, maxAge time.Duration, enablePrometheus bool) *Caches { cache, err := ristretto.NewCache(&ristretto.Config{ NumCounters: int64((maxCost / 1024) * 10), // 10 counters per 1KB data, affects bloom filter size diff --git a/internal/fulltext/bleve.go b/internal/fulltext/bleve.go index 7187861dd..08cef8a97 100644 --- a/internal/fulltext/bleve.go +++ b/internal/fulltext/bleve.go @@ -18,10 +18,10 @@ package fulltext import ( + "context" "strings" "github.com/blevesearch/bleve/v2" - // side effect imports to allow all possible languages _ "github.com/blevesearch/bleve/v2/analysis/lang/ar" _ "github.com/blevesearch/bleve/v2/analysis/lang/cjk" @@ -55,6 +55,13 @@ type Search struct { FulltextIndex bleve.Index } +type Indexer interface { + Index(elements ...IndexElement) error + Delete(eventID string) error + Search(term string, roomIDs, keys []string, limit, from int, orderByStreamPos bool) (*bleve.SearchResult, error) + Close() error +} + // IndexElement describes the layout of an element to index type IndexElement struct { EventID string @@ -77,12 +84,18 @@ func (i *IndexElement) SetContentType(v string) { } // New opens a new/existing fulltext index -func New(cfg config.Fulltext) (fts *Search, err error) { +func New(ctx context.Context, cfg config.Fulltext) (fts *Search, err error) { fts = &Search{} fts.FulltextIndex, err = openIndex(cfg) if err != nil { return nil, err } + go func() { + // Wait for the context (should be from process.ProcessContext) to be + // done, indicating that Dendrite is shutting down. + <-ctx.Done() + _ = fts.Close() + }() return fts, nil } diff --git a/internal/fulltext/bleve_test.go b/internal/fulltext/bleve_test.go index d16397a45..6ce691454 100644 --- a/internal/fulltext/bleve_test.go +++ b/internal/fulltext/bleve_test.go @@ -18,6 +18,7 @@ import ( "reflect" "testing" + "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -25,7 +26,7 @@ import ( "github.com/matrix-org/dendrite/setup/config" ) -func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search { +func mustOpenIndex(t *testing.T, tempDir string) (*fulltext.Search, *process.ProcessContext) { t.Helper() cfg := config.Fulltext{ Enabled: true, @@ -36,11 +37,12 @@ func mustOpenIndex(t *testing.T, tempDir string) *fulltext.Search { cfg.IndexPath = config.Path(tempDir) cfg.InMemory = false } - fts, err := fulltext.New(cfg) + ctx := process.NewProcessContext() + fts, err := fulltext.New(ctx.Context(), cfg) if err != nil { t.Fatal("failed to open fulltext index:", err) } - return fts + return fts, ctx } func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) (eventIDs, roomIDs []string) { @@ -93,19 +95,17 @@ func mustAddTestData(t *testing.T, fts *fulltext.Search, firstStreamPos int64) ( func TestOpen(t *testing.T) { dataDir := t.TempDir() - fts := mustOpenIndex(t, dataDir) - if err := fts.Close(); err != nil { - t.Fatal("unable to close fulltext index", err) - } + _, ctx := mustOpenIndex(t, dataDir) + ctx.ShutdownDendrite() // open existing index - fts = mustOpenIndex(t, dataDir) - defer fts.Close() + _, ctx = mustOpenIndex(t, dataDir) + ctx.ShutdownDendrite() } func TestIndex(t *testing.T) { - fts := mustOpenIndex(t, "") - defer fts.Close() + fts, ctx := mustOpenIndex(t, "") + defer ctx.ShutdownDendrite() // add some data var streamPos int64 = 1 @@ -128,8 +128,8 @@ func TestIndex(t *testing.T) { } func TestDelete(t *testing.T) { - fts := mustOpenIndex(t, "") - defer fts.Close() + fts, ctx := mustOpenIndex(t, "") + defer ctx.ShutdownDendrite() eventIDs, roomIDs := mustAddTestData(t, fts, 0) res1, err := fts.Search("lorem", roomIDs[:1], nil, 50, 0, false) if err != nil { @@ -224,7 +224,8 @@ func TestSearch(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - f := mustOpenIndex(t, "") + f, ctx := mustOpenIndex(t, "") + defer ctx.ShutdownDendrite() eventIDs, roomIDs := mustAddTestData(t, f, 0) var searchRooms []string for _, x := range tt.args.roomIndex { diff --git a/internal/fulltext/bleve_wasm.go b/internal/fulltext/bleve_wasm.go index a69a8926e..0053ed8c2 100644 --- a/internal/fulltext/bleve_wasm.go +++ b/internal/fulltext/bleve_wasm.go @@ -15,8 +15,9 @@ package fulltext import ( - "github.com/matrix-org/dendrite/setup/config" "time" + + "github.com/matrix-org/dendrite/setup/config" ) type Search struct{} @@ -28,6 +29,13 @@ type IndexElement struct { StreamPosition int64 } +type Indexer interface { + Index(elements ...IndexElement) error + Delete(eventID string) error + Search(term string, roomIDs, keys []string, limit, from int, orderByStreamPos bool) (SearchResult, error) + Close() error +} + type SearchResult struct { Status interface{} `json:"status"` Request *interface{} `json:"request"` @@ -48,7 +56,7 @@ func (f *Search) Close() error { return nil } -func (f *Search) Index(e IndexElement) error { +func (f *Search) Index(e ...IndexElement) error { return nil } diff --git a/internal/httputil/routing.go b/internal/httputil/routing.go index 0bd3655ec..c733c8ce7 100644 --- a/internal/httputil/routing.go +++ b/internal/httputil/routing.go @@ -15,7 +15,10 @@ package httputil import ( + "net/http" "net/url" + + "github.com/gorilla/mux" ) // URLDecodeMapValues is a function that iterates through each of the items in a @@ -33,3 +36,52 @@ func URLDecodeMapValues(vmap map[string]string) (map[string]string, error) { return decoded, nil } + +type Routers struct { + Client *mux.Router + Federation *mux.Router + Keys *mux.Router + Media *mux.Router + WellKnown *mux.Router + Static *mux.Router + DendriteAdmin *mux.Router + SynapseAdmin *mux.Router +} + +func NewRouters() Routers { + r := Routers{ + Client: mux.NewRouter().SkipClean(true).PathPrefix(PublicClientPathPrefix).Subrouter().UseEncodedPath(), + Federation: mux.NewRouter().SkipClean(true).PathPrefix(PublicFederationPathPrefix).Subrouter().UseEncodedPath(), + Keys: mux.NewRouter().SkipClean(true).PathPrefix(PublicKeyPathPrefix).Subrouter().UseEncodedPath(), + Media: mux.NewRouter().SkipClean(true).PathPrefix(PublicMediaPathPrefix).Subrouter().UseEncodedPath(), + WellKnown: mux.NewRouter().SkipClean(true).PathPrefix(PublicWellKnownPrefix).Subrouter().UseEncodedPath(), + Static: mux.NewRouter().SkipClean(true).PathPrefix(PublicStaticPath).Subrouter().UseEncodedPath(), + DendriteAdmin: mux.NewRouter().SkipClean(true).PathPrefix(DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), + SynapseAdmin: mux.NewRouter().SkipClean(true).PathPrefix(SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), + } + r.configureHTTPErrors() + return r +} + +var NotAllowedHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusMethodNotAllowed) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}`)) // nolint:misspell +})) + +var NotFoundCORSHandler = WrapHandlerInCORS(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}`)) // nolint:misspell +})) + +func (r *Routers) configureHTTPErrors() { + for _, router := range []*mux.Router{ + r.Client, r.Federation, r.Keys, + r.Media, r.WellKnown, r.Static, + r.DendriteAdmin, r.SynapseAdmin, + } { + router.NotFoundHandler = NotFoundCORSHandler + router.MethodNotAllowedHandler = NotAllowedHandler + } +} diff --git a/internal/httputil/routing_test.go b/internal/httputil/routing_test.go new file mode 100644 index 000000000..21e2bf48a --- /dev/null +++ b/internal/httputil/routing_test.go @@ -0,0 +1,38 @@ +package httputil + +import ( + "net/http" + "net/http/httptest" + "path/filepath" + "testing" +) + +func TestRoutersError(t *testing.T) { + r := NewRouters() + + // not found test + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, filepath.Join(PublicFederationPathPrefix, "doesnotexist"), nil) + r.Federation.ServeHTTP(rec, req) + if rec.Code != http.StatusNotFound { + t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String()) + } + if ct := rec.Header().Get("Content-Type"); ct != "application/json" { + t.Fatalf("unexpected content-type: %s", ct) + } + + // not allowed test + r.DendriteAdmin. + Handle("/test", http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {})). + Methods(http.MethodPost) + + rec = httptest.NewRecorder() + req = httptest.NewRequest(http.MethodGet, filepath.Join(DendriteAdminPathPrefix, "test"), nil) + r.DendriteAdmin.ServeHTTP(rec, req) + if rec.Code != http.StatusMethodNotAllowed { + t.Fatalf("unexpected status code: %d - %s", rec.Code, rec.Body.String()) + } + if ct := rec.Header().Get("Content-Type"); ct != "application/json" { + t.Fatalf("unexpected content-type: %s", ct) + } +} diff --git a/internal/sqlutil/connection_manager.go b/internal/sqlutil/connection_manager.go new file mode 100644 index 000000000..cefd9f808 --- /dev/null +++ b/internal/sqlutil/connection_manager.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlutil + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/setup/config" +) + +type Connections struct { + db *sql.DB + writer Writer +} + +func NewConnectionManager() Connections { + return Connections{} +} + +func (c *Connections) Connection(dbProperties *config.DatabaseOptions) (*sql.DB, Writer, error) { + writer := NewDummyWriter() + if dbProperties.ConnectionString.IsSQLite() { + writer = NewExclusiveWriter() + } + if dbProperties.ConnectionString != "" || c.db == nil { + var err error + // Open a new database connection using the supplied config. + c.db, err = Open(dbProperties, writer) + if err != nil { + return nil, nil, err + } + c.writer = writer + return c.db, c.writer, nil + } + if c.db != nil && c.writer != nil { + // Ignore the supplied config and return the global pool and + // writer. + return c.db, c.writer, nil + } + return nil, nil, fmt.Errorf("no database connections configured") +} diff --git a/internal/sqlutil/connection_manager_test.go b/internal/sqlutil/connection_manager_test.go new file mode 100644 index 000000000..610629d5e --- /dev/null +++ b/internal/sqlutil/connection_manager_test.go @@ -0,0 +1,56 @@ +package sqlutil_test + +import ( + "reflect" + "testing" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/test" +) + +func TestConnectionManager(t *testing.T) { + test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { + conStr, close := test.PrepareDBConnectionString(t, dbType) + t.Cleanup(close) + cm := sqlutil.NewConnectionManager() + + dbProps := &config.DatabaseOptions{ConnectionString: config.DataSource(string(conStr))} + db, writer, err := cm.Connection(dbProps) + if err != nil { + t.Fatal(err) + } + + switch dbType { + case test.DBTypeSQLite: + _, ok := writer.(*sqlutil.ExclusiveWriter) + if !ok { + t.Fatalf("expected exclusive writer") + } + case test.DBTypePostgres: + _, ok := writer.(*sqlutil.DummyWriter) + if !ok { + t.Fatalf("expected dummy writer") + } + } + + // test global db pool + dbGlobal, writerGlobal, err := cm.Connection(&config.DatabaseOptions{}) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(db, dbGlobal) { + t.Fatalf("expected database connection to be reused") + } + if !reflect.DeepEqual(writer, writerGlobal) { + t.Fatalf("expected database writer to be reused") + } + + // test invalid connection string configured + cm = sqlutil.NewConnectionManager() + _, _, err = cm.Connection(&config.DatabaseOptions{ConnectionString: "http://"}) + if err == nil { + t.Fatal("expected an error but got none") + } + }) +} diff --git a/mediaapi/mediaapi.go b/mediaapi/mediaapi.go index 4792c996d..42e0ea88f 100644 --- a/mediaapi/mediaapi.go +++ b/mediaapi/mediaapi.go @@ -32,12 +32,12 @@ func AddPublicRoutes( cfg := &base.Cfg.MediaAPI rateCfg := &base.Cfg.ClientAPI.RateLimiting - mediaDB, err := storage.NewMediaAPIDatasource(base, &cfg.Database) + mediaDB, err := storage.NewMediaAPIDatasource(base.ConnectionManager, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to media db") } routing.Setup( - base.PublicMediaAPIMux, cfg, rateCfg, mediaDB, userAPI, client, + base.Routers.Media, cfg, rateCfg, mediaDB, userAPI, client, ) } diff --git a/mediaapi/routing/upload_test.go b/mediaapi/routing/upload_test.go index 420d0eba9..d4fb45d1b 100644 --- a/mediaapi/routing/upload_test.go +++ b/mediaapi/routing/upload_test.go @@ -9,6 +9,7 @@ import ( "strings" "testing" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/fileutils" "github.com/matrix-org/dendrite/mediaapi/storage" "github.com/matrix-org/dendrite/mediaapi/types" @@ -49,8 +50,8 @@ func Test_uploadRequest_doUpload(t *testing.T) { // create testdata folder and remove when done _ = os.Mkdir(testdataPath, os.ModePerm) defer fileutils.RemoveDir(types.Path(testdataPath), nil) - - db, err := storage.NewMediaAPIDatasource(nil, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewMediaAPIDatasource(cm, &config.DatabaseOptions{ ConnectionString: "file::memory:?cache=shared", MaxOpenConnections: 100, MaxIdleConnections: 2, diff --git a/mediaapi/storage/postgres/mediaapi.go b/mediaapi/storage/postgres/mediaapi.go index 30ec64f84..5b6687743 100644 --- a/mediaapi/storage/postgres/mediaapi.go +++ b/mediaapi/storage/postgres/mediaapi.go @@ -20,13 +20,12 @@ import ( _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/shared" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a postgres database. -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) +func NewDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } diff --git a/mediaapi/storage/sqlite3/mediaapi.go b/mediaapi/storage/sqlite3/mediaapi.go index c0ab10e9f..4d484f326 100644 --- a/mediaapi/storage/sqlite3/mediaapi.go +++ b/mediaapi/storage/sqlite3/mediaapi.go @@ -19,13 +19,12 @@ import ( // Import the postgres database driver. "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/shared" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a SQLIte database. -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.Database, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) +func NewDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.Database, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } diff --git a/mediaapi/storage/storage.go b/mediaapi/storage/storage.go index f673ae7e6..8e67af9f9 100644 --- a/mediaapi/storage/storage.go +++ b/mediaapi/storage/storage.go @@ -20,19 +20,19 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/postgres" "github.com/matrix-org/dendrite/mediaapi/storage/sqlite3" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewMediaAPIDatasource opens a database connection. -func NewMediaAPIDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { +func NewMediaAPIDatasource(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties) + return sqlite3.NewDatabase(conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties) + return postgres.NewDatabase(conMan, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/mediaapi/storage/storage_test.go b/mediaapi/storage/storage_test.go index 81f0a5d24..11febd275 100644 --- a/mediaapi/storage/storage_test.go +++ b/mediaapi/storage/storage_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage" "github.com/matrix-org/dendrite/mediaapi/types" "github.com/matrix-org/dendrite/setup/config" @@ -13,7 +14,8 @@ import ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewMediaAPIDatasource(nil, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewMediaAPIDatasource(cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/mediaapi/storage/storage_wasm.go b/mediaapi/storage/storage_wasm.go index 41e4a28c0..47ee3792c 100644 --- a/mediaapi/storage/storage_wasm.go +++ b/mediaapi/storage/storage_wasm.go @@ -17,16 +17,16 @@ package storage import ( "fmt" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/mediaapi/storage/sqlite3" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // Open opens a postgres database. -func NewMediaAPIDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { +func NewMediaAPIDatasource(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties) + return sqlite3.NewDatabase(conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go index 200a1814a..925fc031d 100644 --- a/relayapi/relayapi.go +++ b/relayapi/relayapi.go @@ -16,6 +16,7 @@ package relayapi import ( "github.com/matrix-org/dendrite/federationapi/producers" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/relayapi/internal" "github.com/matrix-org/dendrite/relayapi/routing" @@ -41,7 +42,7 @@ func AddPublicRoutes( } routing.Setup( - base.PublicFederationAPIMux, + base.Routers.Federation, fedCfg, relay, keyRing, @@ -55,10 +56,10 @@ func NewRelayInternalAPI( keyRing *gomatrixserverlib.KeyRing, producer *producers.SyncAPIProducer, relayingEnabled bool, + caches caching.FederationCache, ) api.RelayInternalAPI { cfg := &base.Cfg.RelayAPI - - relayDB, err := storage.NewDatabase(base, &cfg.Database, base.Caches, base.Cfg.Global.IsLocalServerName) + relayDB, err := storage.NewDatabase(base.ConnectionManager, &cfg.Database, caches, base.Cfg.Global.IsLocalServerName) if err != nil { logrus.WithError(err).Panic("failed to connect to relay db") } diff --git a/relayapi/relayapi_test.go b/relayapi/relayapi_test.go index f1b3262aa..e81203098 100644 --- a/relayapi/relayapi_test.go +++ b/relayapi/relayapi_test.go @@ -24,6 +24,7 @@ import ( "github.com/gorilla/mux" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/relayapi" "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/test/testrig" @@ -34,9 +35,10 @@ import ( func TestCreateNewRelayInternalAPI(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) defer close() - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, caches) assert.NotNil(t, relayAPI) }) } @@ -52,7 +54,7 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) { defer close() assert.Panics(t, func() { - relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) + relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, nil) }) }) } @@ -107,8 +109,9 @@ func TestCreateRelayPublicRoutes(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true) + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true, caches) assert.NotNil(t, relayAPI) serverKeyAPI := &signing.YggdrasilKeys{} @@ -144,7 +147,7 @@ func TestCreateRelayPublicRoutes(t *testing.T) { for _, tc := range testCases { w := httptest.NewRecorder() - base.PublicFederationAPIMux.ServeHTTP(w, tc.req) + base.Routers.Federation.ServeHTTP(w, tc.req) if w.Code != tc.wantCode { t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) } @@ -156,8 +159,9 @@ func TestDisableRelayPublicRoutes(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) - relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false) + relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false, caches) assert.NotNil(t, relayAPI) serverKeyAPI := &signing.YggdrasilKeys{} @@ -183,7 +187,7 @@ func TestDisableRelayPublicRoutes(t *testing.T) { for _, tc := range testCases { w := httptest.NewRecorder() - base.PublicFederationAPIMux.ServeHTTP(w, tc.req) + base.Routers.Federation.ServeHTTP(w, tc.req) if w.Code != tc.wantCode { t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) } diff --git a/relayapi/storage/postgres/storage.go b/relayapi/storage/postgres/storage.go index 1042beba7..78bbaf1c2 100644 --- a/relayapi/storage/postgres/storage.go +++ b/relayapi/storage/postgres/storage.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/relayapi/storage/shared" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,14 +33,14 @@ type Database struct { // NewDatabase opens a new database func NewDatabase( - base *base.BaseDendrite, + conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool, ) (*Database, error) { var d Database var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { return nil, err } queue, err := NewPostgresRelayQueueTable(d.db) diff --git a/relayapi/storage/sqlite3/storage.go b/relayapi/storage/sqlite3/storage.go index 3ed4ab046..da2cf9f7f 100644 --- a/relayapi/storage/sqlite3/storage.go +++ b/relayapi/storage/sqlite3/storage.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/relayapi/storage/shared" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) @@ -34,14 +33,14 @@ type Database struct { // NewDatabase opens a new database func NewDatabase( - base *base.BaseDendrite, + conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool, ) (*Database, error) { var d Database var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { return nil, err } queue, err := NewSQLiteRelayQueueTable(d.db) diff --git a/relayapi/storage/storage.go b/relayapi/storage/storage.go index 16ecbcfb7..17d9e6c75 100644 --- a/relayapi/storage/storage.go +++ b/relayapi/storage/storage.go @@ -21,25 +21,25 @@ import ( "fmt" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/relayapi/storage/postgres" "github.com/matrix-org/dendrite/relayapi/storage/sqlite3" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" ) // NewDatabase opens a new database func NewDatabase( - base *base.BaseDendrite, + conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.FederationCache, isLocalServerName func(gomatrixserverlib.ServerName) bool, ) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties, cache, isLocalServerName) + return sqlite3.NewDatabase(conMan, dbProperties, cache, isLocalServerName) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties, cache, isLocalServerName) + return postgres.NewDatabase(conMan, dbProperties, cache, isLocalServerName) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/relayapi/storage/storage_wasm.go b/relayapi/storage/storage_wasm.go new file mode 100644 index 000000000..92847e1ba --- /dev/null +++ b/relayapi/storage/storage_wasm.go @@ -0,0 +1,42 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "fmt" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/relayapi/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" +) + +// NewDatabase opens a new database +func NewDatabase( + conMan sqlutil.Connections, + dbProperties *config.DatabaseOptions, + cache caching.FederationCache, + isLocalServerName func(gomatrixserverlib.ServerName) bool, +) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(conMan, dbProperties, cache, isLocalServerName) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index c43b9d049..2e987d681 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -60,7 +60,7 @@ type RoomserverInternalAPI struct { func NewRoomserverAPI( base *base.BaseDendrite, roomserverDB storage.Database, - js nats.JetStreamContext, nc *nats.Conn, + js nats.JetStreamContext, nc *nats.Conn, caches caching.RoomServerCaches, ) *RoomserverInternalAPI { var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { @@ -78,7 +78,7 @@ func NewRoomserverAPI( DB: roomserverDB, Base: base, Cfg: &base.Cfg.RoomServer, - Cache: base.Caches, + Cache: caches, ServerName: base.Cfg.Global.ServerName, PerspectiveServerNames: perspectiveServerNames, InputRoomEventTopic: base.Cfg.Global.JetStream.Prefixed(jetstream.InputRoomEvent), @@ -89,7 +89,7 @@ func NewRoomserverAPI( ServerACLs: serverACLs, Queryer: &query.Queryer{ DB: roomserverDB, - Cache: base.Caches, + Cache: caches, IsLocalServerName: base.Cfg.Global.IsLocalServerName, ServerACLs: serverACLs, }, diff --git a/roomserver/internal/helpers/helpers_test.go b/roomserver/internal/helpers/helpers_test.go index c056e704c..03a8bf575 100644 --- a/roomserver/internal/helpers/helpers_test.go +++ b/roomserver/internal/helpers/helpers_test.go @@ -3,26 +3,28 @@ package helpers import ( "context" "testing" + "time" + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" "github.com/stretchr/testify/assert" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/dendrite/setup/base" - - "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" - "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/test" ) -func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) { - base, close := testrig.CreateBaseDendrite(t, dbType) - db, err := storage.Open(base, &base.Cfg.RoomServer.Database, base.Caches) +func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { + conStr, close := test.PrepareDBConnectionString(t, dbType) + caches := caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics) + cm := sqlutil.NewConnectionManager() + db, err := storage.Open(context.Background(), cm, &config.DatabaseOptions{ConnectionString: config.DataSource(conStr)}, caches) if err != nil { t.Fatalf("failed to create Database: %v", err) } - return base, db, close + return db, close } func TestIsInvitePendingWithoutNID(t *testing.T) { @@ -32,7 +34,7 @@ func TestIsInvitePendingWithoutNID(t *testing.T) { room := test.NewRoom(t, alice, test.RoomPreset(test.PresetPublicChat)) _ = bob test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - _, db, close := mustCreateDatabase(t, dbType) + db, close := mustCreateDatabase(t, dbType) defer close() // store all events diff --git a/roomserver/internal/input/input_test.go b/roomserver/internal/input/input_test.go index 4708560ac..555ec9c6d 100644 --- a/roomserver/internal/input/input_test.go +++ b/roomserver/internal/input/input_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" @@ -48,14 +49,15 @@ func TestSingleTransactionOnInput(t *testing.T) { Kind: api.KindOutlier, // don't panic if we generate an output event Event: event.Headered(gomatrixserverlib.RoomVersionV6), } + cm := sqlutil.NewConnectionManager() db, err := storage.Open( - nil, + context.Background(), cm, &config.DatabaseOptions{ ConnectionString: "", MaxOpenConnections: 1, MaxIdleConnections: 1, }, - caching.NewRistrettoCache(8*1024*1024, time.Hour, false), + caching.NewRistrettoCache(8*1024*1024, time.Hour, caching.DisableMetrics), ) if err != nil { t.Logf("PostgreSQL not available (%s), skipping", err) diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 5a8d8b570..1c55423ef 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -15,6 +15,7 @@ package roomserver import ( + "github.com/matrix-org/dendrite/internal/caching" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/roomserver/api" @@ -26,10 +27,11 @@ import ( // NewInternalAPI returns a concrete implementation of the internal API. func NewInternalAPI( base *base.BaseDendrite, + caches caching.RoomServerCaches, ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - roomserverDB, err := storage.Open(base, &cfg.Database, base.Caches) + roomserverDB, err := storage.Open(base.ProcessContext.Context(), base.ConnectionManager, &cfg.Database, caches) if err != nil { logrus.WithError(err).Panicf("failed to connect to room server db") } @@ -37,6 +39,6 @@ func NewInternalAPI( js, nc := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) return internal.NewRoomserverAPI( - base, roomserverDB, js, nc, + base, roomserverDB, js, nc, caches, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index a3ca5909e..1b0b3155d 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/stretchr/testify/assert" "github.com/matrix-org/dendrite/roomserver/state" @@ -32,7 +33,8 @@ import ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (*base.BaseDendrite, storage.Database, func()) { t.Helper() base, close := testrig.CreateBaseDendrite(t, dbType) - db, err := storage.Open(base, &base.Cfg.RoomServer.Database, base.Caches) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + db, err := storage.Open(base.ProcessContext.Context(), base.ConnectionManager, &base.Cfg.RoomServer.Database, caches) if err != nil { t.Fatalf("failed to create Database: %v", err) } @@ -43,7 +45,8 @@ func TestUsers(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, close := testrig.CreateBaseDendrite(t, dbType) defer close() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) // SetFederationAPI starts the room event input consumer rsAPI.SetFederationAPI(nil, nil) @@ -181,7 +184,8 @@ func Test_QueryLeftUsers(t *testing.T) { base, _, close := mustCreateDatabase(t, dbType) defer close() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) // SetFederationAPI starts the room event input consumer rsAPI.SetFederationAPI(nil, nil) // Create the room @@ -232,12 +236,13 @@ func TestPurgeRoom(t *testing.T) { defer jetstream.DeleteAllStreams(jsCtx, &base.Cfg.Global.JetStream) fedClient := base.CreateFederationClient() - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) userAPI := userapi.NewInternalAPI(base, rsAPI, nil) // this starts the JetStream consumers - syncapi.AddPublicRoutes(base, userAPI, rsAPI) - federationapi.NewInternalAPI(base, fedClient, rsAPI, base.Caches, nil, true) + syncapi.AddPublicRoutes(base, userAPI, rsAPI, caches) + federationapi.NewInternalAPI(base, fedClient, rsAPI, caches, nil, true) rsAPI.SetFederationAPI(nil, nil) // Create the room diff --git a/roomserver/storage/postgres/storage.go b/roomserver/storage/postgres/storage.go index d98a5cf97..19cde5410 100644 --- a/roomserver/storage/postgres/storage.go +++ b/roomserver/storage/postgres/storage.go @@ -28,7 +28,6 @@ import ( "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/shared" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) @@ -38,10 +37,10 @@ type Database struct { } // Open a postgres database. -func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { +func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database var err error - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, fmt.Errorf("sqlutil.Open: %w", err) } @@ -53,7 +52,7 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c // Special case, since this migration uses several tables, so it needs to // be sure that all tables are created first. - if err = executeMigration(base.Context(), db); err != nil { + if err = executeMigration(ctx, db); err != nil { return nil, err } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 2adedd2d8..89e16fc14 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -28,7 +28,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/types" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) @@ -38,10 +37,10 @@ type Database struct { } // Open a sqlite database. -func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { +func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (*Database, error) { var d Database var err error - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, fmt.Errorf("sqlutil.Open: %w", err) } @@ -62,7 +61,7 @@ func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache c // Special case, since this migration uses several tables, so it needs to // be sure that all tables are created first. - if err = executeMigration(base.Context(), db); err != nil { + if err = executeMigration(ctx, db); err != nil { return nil, err } diff --git a/roomserver/storage/storage.go b/roomserver/storage/storage.go index 8a87b7d7c..2b3b3bd85 100644 --- a/roomserver/storage/storage.go +++ b/roomserver/storage/storage.go @@ -18,22 +18,23 @@ package storage import ( + "context" "fmt" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/postgres" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // Open opens a database connection. -func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { +func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.Open(base, dbProperties, cache) + return sqlite3.Open(ctx, conMan, dbProperties, cache) case dbProperties.ConnectionString.IsPostgres(): - return postgres.Open(base, dbProperties, cache) + return postgres.Open(ctx, conMan, dbProperties, cache) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/roomserver/storage/storage_wasm.go b/roomserver/storage/storage_wasm.go index df5a56ac3..817f9304c 100644 --- a/roomserver/storage/storage_wasm.go +++ b/roomserver/storage/storage_wasm.go @@ -15,19 +15,20 @@ package storage import ( + "context" "fmt" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // NewPublicRoomsServerDatabase opens a database connection. -func Open(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { +func Open(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, cache caching.RoomServerCaches) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.Open(base, dbProperties, cache) + return sqlite3.Open(ctx, conMan, dbProperties, cache) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/setup/base/base.go b/setup/base/base.go index dfe48ff3c..8c9b06d0e 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -17,7 +17,6 @@ package base import ( "bytes" "context" - "database/sql" "embed" "encoding/json" "errors" @@ -36,15 +35,13 @@ import ( "github.com/getsentry/sentry-go" sentryhttp "github.com/getsentry/sentry-go/http" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/atomic" "github.com/matrix-org/dendrite/internal" - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/fulltext" "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/gorilla/mux" @@ -53,7 +50,6 @@ import ( "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" ) @@ -67,24 +63,14 @@ var staticContent embed.FS // Must be closed when shutting down. type BaseDendrite struct { *process.ProcessContext - tracerCloser io.Closer - PublicClientAPIMux *mux.Router - PublicFederationAPIMux *mux.Router - PublicKeyAPIMux *mux.Router - PublicMediaAPIMux *mux.Router - PublicWellKnownAPIMux *mux.Router - PublicStaticMux *mux.Router - DendriteAdminMux *mux.Router - SynapseAdminMux *mux.Router - NATS *jetstream.NATSInstance - Cfg *config.Dendrite - Caches *caching.Caches - DNSCache *gomatrixserverlib.DNSCache - Database *sql.DB - DatabaseWriter sqlutil.Writer - EnableMetrics bool - Fulltext *fulltext.Search - startupLock sync.Mutex + tracerCloser io.Closer + Routers httputil.Routers + NATS *jetstream.NATSInstance + Cfg *config.Dendrite + DNSCache *gomatrixserverlib.DNSCache + ConnectionManager sqlutil.Connections + EnableMetrics bool + startupLock sync.Mutex } const HTTPServerTimeout = time.Minute * 5 @@ -130,14 +116,6 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base logrus.WithError(err).Panicf("failed to start opentracing") } - var fts *fulltext.Search - if cfg.SyncAPI.Fulltext.Enabled { - fts, err = fulltext.New(cfg.SyncAPI.Fulltext) - if err != nil { - logrus.WithError(err).Panicf("failed to create full text") - } - } - if cfg.Global.Sentry.Enabled { logrus.Info("Setting up Sentry for debugging...") err = sentry.Init(sentry.ClientOptions{ @@ -169,14 +147,13 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base // If we're in monolith mode, we'll set up a global pool of database // connections. A component is welcome to use this pool if they don't // have a separate database config of their own. - var db *sql.DB - var writer sqlutil.Writer + cm := sqlutil.NewConnectionManager() if cfg.Global.DatabaseOptions.ConnectionString != "" { if cfg.Global.DatabaseOptions.ConnectionString.IsSQLite() { logrus.Panic("Using a global database connection pool is not supported with SQLite databases") } - writer = sqlutil.NewDummyWriter() - if db, err = sqlutil.Open(&cfg.Global.DatabaseOptions, writer); err != nil { + _, _, err := cm.Connection(&cfg.Global.DatabaseOptions) + if err != nil { logrus.WithError(err).Panic("Failed to set up global database connections") } logrus.Debug("Using global database connection pool") @@ -195,24 +172,14 @@ func NewBaseDendrite(cfg *config.Dendrite, options ...BaseDendriteOptions) *Base // directory traversal attack e.g /../../../etc/passwd return &BaseDendrite{ - ProcessContext: process.NewProcessContext(), - tracerCloser: closer, - Cfg: cfg, - Caches: caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, enableMetrics), - DNSCache: dnsCache, - PublicClientAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicClientPathPrefix).Subrouter().UseEncodedPath(), - PublicFederationAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicFederationPathPrefix).Subrouter().UseEncodedPath(), - PublicKeyAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicKeyPathPrefix).Subrouter().UseEncodedPath(), - PublicMediaAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicMediaPathPrefix).Subrouter().UseEncodedPath(), - PublicWellKnownAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicWellKnownPrefix).Subrouter().UseEncodedPath(), - PublicStaticMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.PublicStaticPath).Subrouter().UseEncodedPath(), - DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(), - SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(), - NATS: &jetstream.NATSInstance{}, - Database: db, // set if monolith with global connection pool only - DatabaseWriter: writer, // set if monolith with global connection pool only - EnableMetrics: enableMetrics, - Fulltext: fts, + ProcessContext: process.NewProcessContext(), + tracerCloser: closer, + Cfg: cfg, + DNSCache: dnsCache, + Routers: httputil.NewRouters(), + NATS: &jetstream.NATSInstance{}, + ConnectionManager: cm, + EnableMetrics: enableMetrics, } } @@ -223,34 +190,6 @@ func (b *BaseDendrite) Close() error { return b.tracerCloser.Close() } -// DatabaseConnection assists in setting up a database connection. It accepts -// the database properties and a new writer for the given component. If we're -// running in monolith mode with a global connection pool configured then we -// will return that connection, along with the global writer, effectively -// ignoring the options provided. Otherwise we'll open a new database connection -// using the supplied options and writer. Note that it's possible for the pointer -// receiver to be nil here – that's deliberate as some of the unit tests don't -// have a BaseDendrite and just want a connection with the supplied config -// without any pooling stuff. -func (b *BaseDendrite) DatabaseConnection(dbProperties *config.DatabaseOptions, writer sqlutil.Writer) (*sql.DB, sqlutil.Writer, error) { - if dbProperties.ConnectionString != "" || b == nil { - // Open a new database connection using the supplied config. - db, err := sqlutil.Open(dbProperties, writer) - return db, writer, err - } - if b.Database != nil && b.DatabaseWriter != nil { - // Ignore the supplied config and return the global pool and - // writer. - return b.Database, b.DatabaseWriter, nil - } - return nil, nil, fmt.Errorf("no database connections configured") -} - -// PushGatewayHTTPClient returns a new client for interacting with (external) Push Gateways. -func (b *BaseDendrite) PushGatewayHTTPClient() pushgateway.Client { - return pushgateway.NewHTTPClient(b.Cfg.UserAPI.PushGatewayDisableTLSValidation) -} - // CreateClient creates a new client (normally used for media fetch requests). // Should only be called once per component. func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client { @@ -295,40 +234,11 @@ func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationCli return client } -func (b *BaseDendrite) configureHTTPErrors() { - notAllowedHandler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusMethodNotAllowed) - _, _ = w.Write([]byte(fmt.Sprintf("405 %s not allowed on this endpoint", r.Method))) - } - - clientNotFoundHandler := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNotFound) - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}`)) // nolint:misspell - } - - notFoundCORSHandler := httputil.WrapHandlerInCORS(http.NotFoundHandler()) - notAllowedCORSHandler := httputil.WrapHandlerInCORS(http.HandlerFunc(notAllowedHandler)) - - for _, router := range []*mux.Router{ - b.PublicMediaAPIMux, b.DendriteAdminMux, - b.SynapseAdminMux, b.PublicWellKnownAPIMux, - b.PublicStaticMux, - } { - router.NotFoundHandler = notFoundCORSHandler - router.MethodNotAllowedHandler = notAllowedCORSHandler - } - - // Special case so that we don't upset clients on the CS API. - b.PublicClientAPIMux.NotFoundHandler = http.HandlerFunc(clientNotFoundHandler) - b.PublicClientAPIMux.MethodNotAllowedHandler = http.HandlerFunc(clientNotFoundHandler) -} - func (b *BaseDendrite) ConfigureAdminEndpoints() { - b.DendriteAdminMux.HandleFunc("/monitor/up", func(w http.ResponseWriter, r *http.Request) { + b.Routers.DendriteAdmin.HandleFunc("/monitor/up", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) - b.DendriteAdminMux.HandleFunc("/monitor/health", func(w http.ResponseWriter, r *http.Request) { + b.Routers.DendriteAdmin.HandleFunc("/monitor/health", func(w http.ResponseWriter, r *http.Request) { if isDegraded, reasons := b.ProcessContext.IsDegraded(); isDegraded { w.WriteHeader(503) _ = json.NewEncoder(w).Encode(struct { @@ -363,8 +273,6 @@ func (b *BaseDendrite) SetupAndServeHTTP( }, } - b.configureHTTPErrors() - //Redirect for Landing Page externalRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, httputil.PublicStaticPath, http.StatusFound) @@ -385,39 +293,42 @@ func (b *BaseDendrite) SetupAndServeHTTP( logrus.WithError(err).Fatal("failed to execute landing page template") } - b.PublicStaticMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + b.Routers.Static.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write(landingPage.Bytes()) }) var clientHandler http.Handler - clientHandler = b.PublicClientAPIMux + clientHandler = b.Routers.Client if b.Cfg.Global.Sentry.Enabled { sentryHandler := sentryhttp.New(sentryhttp.Options{ Repanic: true, }) - clientHandler = sentryHandler.Handle(b.PublicClientAPIMux) + clientHandler = sentryHandler.Handle(b.Routers.Client) } var federationHandler http.Handler - federationHandler = b.PublicFederationAPIMux + federationHandler = b.Routers.Federation if b.Cfg.Global.Sentry.Enabled { sentryHandler := sentryhttp.New(sentryhttp.Options{ Repanic: true, }) - federationHandler = sentryHandler.Handle(b.PublicFederationAPIMux) + federationHandler = sentryHandler.Handle(b.Routers.Federation) } - externalRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(b.DendriteAdminMux) + externalRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(b.Routers.DendriteAdmin) externalRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(clientHandler) if !b.Cfg.Global.DisableFederation { - externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.PublicKeyAPIMux) + externalRouter.PathPrefix(httputil.PublicKeyPathPrefix).Handler(b.Routers.Keys) externalRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(federationHandler) } - externalRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(b.SynapseAdminMux) - externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.PublicMediaAPIMux) - externalRouter.PathPrefix(httputil.PublicWellKnownPrefix).Handler(b.PublicWellKnownAPIMux) - externalRouter.PathPrefix(httputil.PublicStaticPath).Handler(b.PublicStaticMux) + externalRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(b.Routers.SynapseAdmin) + externalRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(b.Routers.Media) + externalRouter.PathPrefix(httputil.PublicWellKnownPrefix).Handler(b.Routers.WellKnown) + externalRouter.PathPrefix(httputil.PublicStaticPath).Handler(b.Routers.Static) b.startupLock.Unlock() + externalRouter.NotFoundHandler = httputil.NotFoundCORSHandler + externalRouter.MethodNotAllowedHandler = httputil.NotAllowedHandler + if externalHTTPAddr.Enabled() { go func() { var externalShutdown atomic.Bool // RegisterOnShutdown can be called more than once @@ -493,12 +404,6 @@ func (b *BaseDendrite) WaitForShutdown() { logrus.Warnf("failed to flush all Sentry events!") } } - if b.Fulltext != nil { - err := b.Fulltext.Close() - if err != nil { - logrus.Warnf("failed to close full text search!") - } - } logrus.Warnf("Dendrite is exiting now") } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 01fec9ad6..48683789b 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -20,6 +20,8 @@ import ( type NATSInstance struct { *natsserver.Server + nc *natsclient.Conn + js natsclient.JetStreamContext } var natsLock sync.Mutex @@ -69,11 +71,18 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS if !s.ReadyForConnections(time.Second * 10) { logrus.Fatalln("NATS did not start in time") } + // reuse existing connections + if s.nc != nil { + return s.js, s.nc + } nc, err := natsclient.Connect("", natsclient.InProcessServer(s)) if err != nil { logrus.Fatalln("Failed to create NATS client") } - return setupNATS(process, cfg, nc) + js, _ := setupNATS(process, cfg, nc) + s.js = js + s.nc = nc + return js, nc } func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStreamContext, *natsclient.Conn) { diff --git a/setup/monolith.go b/setup/monolith.go index d8c652234..174eba680 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/federationapi" federationAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/relayapi" @@ -52,7 +53,7 @@ type Monolith struct { } // AddAllPublicRoutes attaches all public paths to the given router -func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) { +func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, caches *caching.Caches) { userDirectoryProvider := m.ExtUserDirectoryProvider if userDirectoryProvider == nil { userDirectoryProvider = m.UserAPI @@ -66,7 +67,7 @@ func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite) { base, m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI, nil, ) mediaapi.AddPublicRoutes(base, m.UserAPI, m.Client) - syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI) + syncapi.AddPublicRoutes(base, m.UserAPI, m.RoomserverAPI, caches) if m.RelayAPI != nil { relayapi.AddPublicRoutes(base, m.KeyRing, m.RelayAPI) diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index 4bb6a5eee..7c1e0fc61 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -102,7 +102,7 @@ func Enable( base *base.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, ) error { - db, err := NewDatabase(base, &base.Cfg.MSCs.Database) + db, err := NewDatabase(base.ConnectionManager, &base.Cfg.MSCs.Database) if err != nil { return fmt.Errorf("cannot enable MSC2836: %w", err) } @@ -125,11 +125,11 @@ func Enable( } }) - base.PublicClientAPIMux.Handle("/unstable/event_relationships", + base.Routers.Client.Handle("/unstable/event_relationships", httputil.MakeAuthAPI("eventRelationships", userAPI, eventRelationshipHandler(db, rsAPI, fsAPI)), ).Methods(http.MethodPost, http.MethodOptions) - base.PublicFederationAPIMux.Handle("/unstable/event_relationships", httputil.MakeExternalAPI( + base.Routers.Federation.Handle("/unstable/event_relationships", httputil.MakeExternalAPI( "msc2836_event_relationships", func(req *http.Request) util.JSONResponse { fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( req, time.Now(), base.Cfg.Global.ServerName, base.Cfg.Global.IsLocalServerName, keyRing, diff --git a/setup/mscs/msc2836/msc2836_test.go b/setup/mscs/msc2836/msc2836_test.go index f12fbbfcb..24e96f931 100644 --- a/setup/mscs/msc2836/msc2836_test.go +++ b/setup/mscs/msc2836/msc2836_test.go @@ -19,6 +19,7 @@ import ( "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/internal/sqlutil" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" @@ -554,10 +555,11 @@ func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserve cfg.Global.ServerName = "localhost" cfg.MSCs.Database.ConnectionString = "file:msc2836_test.db" cfg.MSCs.MSCs = []string{"msc2836"} + cm := sqlutil.NewConnectionManager() base := &base.BaseDendrite{ - Cfg: cfg, - PublicClientAPIMux: mux.NewRouter().PathPrefix(httputil.PublicClientPathPrefix).Subrouter(), - PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(), + Cfg: cfg, + Routers: httputil.NewRouters(), + ConnectionManager: cm, } err := msc2836.Enable(base, rsAPI, nil, userAPI, nil) @@ -567,7 +569,7 @@ func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserve for _, ev := range events { hooks.Run(hooks.KindNewEventPersisted, ev) } - return base.PublicClientAPIMux + return base.Routers.Client } type fledglingEvent struct { diff --git a/setup/mscs/msc2836/storage.go b/setup/mscs/msc2836/storage.go index 827e82f70..1cf7e8785 100644 --- a/setup/mscs/msc2836/storage.go +++ b/setup/mscs/msc2836/storage.go @@ -8,7 +8,6 @@ import ( "encoding/json" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -59,17 +58,17 @@ type DB struct { } // NewDatabase loads the database for msc2836 -func NewDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { +func NewDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { if dbOpts.ConnectionString.IsPostgres() { - return newPostgresDatabase(base, dbOpts) + return newPostgresDatabase(conMan, dbOpts) } - return newSQLiteDatabase(base, dbOpts) + return newSQLiteDatabase(conMan, dbOpts) } -func newPostgresDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { +func newPostgresDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { d := DB{} var err error - if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewDummyWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil { return nil, err } _, err = d.db.Exec(` @@ -144,10 +143,10 @@ func newPostgresDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions return &d, err } -func newSQLiteDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) { +func newSQLiteDatabase(conMan sqlutil.Connections, dbOpts *config.DatabaseOptions) (Database, error) { d := DB{} var err error - if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewExclusiveWriter()); err != nil { + if d.db, d.writer, err = conMan.Connection(dbOpts); err != nil { return nil, err } _, err = d.db.Exec(` diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go index 56c063598..b4b93ff39 100644 --- a/setup/mscs/msc2946/msc2946.go +++ b/setup/mscs/msc2946/msc2946.go @@ -58,8 +58,8 @@ func Enable( fsAPI fs.FederationInternalAPI, keyRing gomatrixserverlib.JSONVerifier, cache caching.SpaceSummaryRoomsCache, ) error { clientAPI := httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(rsAPI, fsAPI, cache, base.Cfg.Global.ServerName), httputil.WithAllowGuests()) - base.PublicClientAPIMux.Handle("/v1/rooms/{roomID}/hierarchy", clientAPI).Methods(http.MethodGet, http.MethodOptions) - base.PublicClientAPIMux.Handle("/unstable/org.matrix.msc2946/rooms/{roomID}/hierarchy", clientAPI).Methods(http.MethodGet, http.MethodOptions) + base.Routers.Client.Handle("/v1/rooms/{roomID}/hierarchy", clientAPI).Methods(http.MethodGet, http.MethodOptions) + base.Routers.Client.Handle("/unstable/org.matrix.msc2946/rooms/{roomID}/hierarchy", clientAPI).Methods(http.MethodGet, http.MethodOptions) fedAPI := httputil.MakeExternalAPI( "msc2946_fed_spaces", func(req *http.Request) util.JSONResponse { @@ -78,8 +78,8 @@ func Enable( return federatedSpacesHandler(req.Context(), fedReq, roomID, cache, rsAPI, fsAPI, base.Cfg.Global.ServerName) }, ) - base.PublicFederationAPIMux.Handle("/unstable/org.matrix.msc2946/hierarchy/{roomID}", fedAPI).Methods(http.MethodGet) - base.PublicFederationAPIMux.Handle("/v1/hierarchy/{roomID}", fedAPI).Methods(http.MethodGet) + base.Routers.Federation.Handle("/unstable/org.matrix.msc2946/hierarchy/{roomID}", fedAPI).Methods(http.MethodGet) + base.Routers.Federation.Handle("/v1/hierarchy/{roomID}", fedAPI).Methods(http.MethodGet) return nil } diff --git a/setup/mscs/mscs.go b/setup/mscs/mscs.go index 35b7bba3b..b58c800b0 100644 --- a/setup/mscs/mscs.go +++ b/setup/mscs/mscs.go @@ -19,6 +19,7 @@ import ( "context" "fmt" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/mscs/msc2836" @@ -27,22 +28,22 @@ import ( ) // Enable MSCs - returns an error on unknown MSCs -func Enable(base *base.BaseDendrite, monolith *setup.Monolith) error { +func Enable(base *base.BaseDendrite, monolith *setup.Monolith, caches *caching.Caches) error { for _, msc := range base.Cfg.MSCs.MSCs { util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC") - if err := EnableMSC(base, monolith, msc); err != nil { + if err := EnableMSC(base, monolith, msc, caches); err != nil { return err } } return nil } -func EnableMSC(base *base.BaseDendrite, monolith *setup.Monolith, msc string) error { +func EnableMSC(base *base.BaseDendrite, monolith *setup.Monolith, msc string, caches *caching.Caches) error { switch msc { case "msc2836": return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing) case "msc2946": - return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationAPI, monolith.KeyRing, base.Caches) + return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationAPI, monolith.KeyRing, caches) case "msc2444": // enabled inside federationapi case "msc2753": // enabled inside clientapi default: diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 735f6718c..43dc0f517 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -50,7 +50,7 @@ type OutputClientDataConsumer struct { stream streams.StreamProvider notifier *notifier.Notifier serverName gomatrixserverlib.ServerName - fts *fulltext.Search + fts fulltext.Indexer cfg *config.SyncAPI } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a8d4d2b2c..21f6104d6 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -51,7 +51,7 @@ type OutputRoomEventConsumer struct { pduStream streams.StreamProvider inviteStream streams.StreamProvider notifier *notifier.Notifier - fts *fulltext.Search + fts fulltext.Indexer } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go index 4cc1a6a85..b1283247b 100644 --- a/syncapi/routing/routing.go +++ b/syncapi/routing/routing.go @@ -43,7 +43,7 @@ func Setup( rsAPI api.SyncRoomserverAPI, cfg *config.SyncAPI, lazyLoadCache caching.LazyLoadCache, - fts *fulltext.Search, + fts fulltext.Indexer, ) { v1unstablemux := csMux.PathPrefix("/{apiversion:(?:v1|unstable)}/").Subrouter() v3mux := csMux.PathPrefix("/{apiversion:(?:r0|v3)}/").Subrouter() diff --git a/syncapi/routing/search.go b/syncapi/routing/search.go index 081ec6cb1..13625b9cb 100644 --- a/syncapi/routing/search.go +++ b/syncapi/routing/search.go @@ -37,7 +37,7 @@ import ( ) // nolint:gocyclo -func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts *fulltext.Search, from *string) util.JSONResponse { +func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts fulltext.Indexer, from *string) util.JSONResponse { start := time.Now() var ( searchReq SearchRequest diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 850d24a07..9f9de28d9 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -16,12 +16,12 @@ package postgres import ( + "context" "database/sql" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/syncapi/storage/shared" @@ -36,10 +36,10 @@ type SyncServerDatasource struct { } // NewDatabase creates a new sync server database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(ctx context.Context, cm sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil { + if d.db, d.writer, err = cm.Connection(dbProperties); err != nil { return nil, err } accountData, err := NewPostgresAccountDataTable(d.db) @@ -111,7 +111,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created. }, ) - err = m.Up(base.Context()) + err = m.Up(ctx) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 510546909..3f1ca355e 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -20,7 +20,6 @@ import ( "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/shared" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas" @@ -37,13 +36,14 @@ type SyncServerDatasource struct { // NewDatabase creates a new sync server database // nolint: gocyclo -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) { var d SyncServerDatasource var err error - if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil { + + if d.db, d.writer, err = conMan.Connection(dbProperties); err != nil { return nil, err } - if err = d.prepare(base.Context()); err != nil { + if err = d.prepare(ctx); err != nil { return nil, err } return &d, nil diff --git a/syncapi/storage/storage.go b/syncapi/storage/storage.go index 5b20c6cc2..8714ec5e2 100644 --- a/syncapi/storage/storage.go +++ b/syncapi/storage/storage.go @@ -18,21 +18,22 @@ package storage import ( + "context" "fmt" - "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/postgres" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewSyncServerDatasource opens a database connection. -func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties) + return sqlite3.NewDatabase(ctx, conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties) + return postgres.NewDatabase(ctx, conMan, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index 05d498bc2..9f0064907 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -9,27 +9,27 @@ import ( "reflect" "testing" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/gomatrixserverlib" "github.com/stretchr/testify/assert" ) var ctx = context.Background() -func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) { +func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) { connStr, close := test.PrepareDBConnectionString(t, dbType) - base, closeBase := testrig.CreateBaseDendrite(t, dbType) - db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewSyncServerDatasource(context.Background(), cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { t.Fatalf("NewSyncServerDatasource returned %s", err) } - return db, close, closeBase + return db, close } func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) { @@ -55,9 +55,8 @@ func TestWriteEvents(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { alice := test.NewUser(t) r := test.NewRoom(t, alice) - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() MustWriteEvents(t, db, r.Events()) }) } @@ -76,9 +75,8 @@ func WithSnapshot(t *testing.T, db storage.Database, f func(snapshot storage.Dat // These tests assert basic functionality of RecentEvents for PDUs func TestRecentEventsPDU(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() alice := test.NewUser(t) // dummy room to make sure SQL queries are filtering on room ID MustWriteEvents(t, db, test.NewRoom(t, alice).Events()) @@ -191,9 +189,8 @@ func TestRecentEventsPDU(t *testing.T) { // The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token func TestGetEventsInRangeWithTopologyToken(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() alice := test.NewUser(t) r := test.NewRoom(t, alice) for i := 0; i < 10; i++ { @@ -276,9 +273,8 @@ func TestStreamToTopologicalPosition(t *testing.T) { } test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() txn, err := db.NewDatabaseTransaction(ctx) if err != nil { @@ -514,9 +510,8 @@ func TestSendToDeviceBehaviour(t *testing.T) { bob := test.NewUser(t) deviceID := "one" test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() // At this point there should be no messages. We haven't sent anything // yet. @@ -899,9 +894,8 @@ func TestRoomSummary(t *testing.T) { } test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { - db, close, closeBase := MustCreateDatabase(t, dbType) + db, close := MustCreateDatabase(t, dbType) defer close() - defer closeBase() for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -939,11 +933,8 @@ func TestRecentEvents(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { filter := gomatrixserverlib.DefaultRoomEventFilter() - db, close, closeBase := MustCreateDatabase(t, dbType) - t.Cleanup(func() { - close() - closeBase() - }) + db, close := MustCreateDatabase(t, dbType) + t.Cleanup(close) MustWriteEvents(t, db, room1.Events()) MustWriteEvents(t, db, room2.Events()) diff --git a/syncapi/storage/storage_wasm.go b/syncapi/storage/storage_wasm.go index c15444743..db0b173bb 100644 --- a/syncapi/storage/storage_wasm.go +++ b/syncapi/storage/storage_wasm.go @@ -15,18 +15,19 @@ package storage import ( + "context" "fmt" - "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/storage/sqlite3" ) // NewPublicRoomsServerDatabase opens a database connection. -func NewSyncServerDatasource(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (Database, error) { +func NewSyncServerDatasource(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(base, dbProperties) + return sqlite3.NewDatabase(ctx, conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 153f7af5d..e0cc8462e 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -17,6 +17,7 @@ package syncapi import ( "context" + "github.com/matrix-org/dendrite/internal/fulltext" "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/internal/caching" @@ -41,24 +42,34 @@ func AddPublicRoutes( base *base.BaseDendrite, userAPI userapi.SyncUserAPI, rsAPI api.SyncRoomserverAPI, + caches caching.LazyLoadCache, ) { cfg := &base.Cfg.SyncAPI js, natsClient := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) - syncDB, err := storage.NewSyncServerDatasource(base, &cfg.Database) + syncDB, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &cfg.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to sync db") } eduCache := caching.NewTypingCache() notifier := notifier.NewNotifier() - streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, base.Caches, notifier) + streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, eduCache, caches, notifier) notifier.SetCurrentPosition(streams.Latest(context.Background())) if err = notifier.Load(context.Background(), syncDB); err != nil { logrus.WithError(err).Panicf("failed to load notifier ") } + var fts *fulltext.Search + if cfg.Fulltext.Enabled { + fts, err = fulltext.New(base.ProcessContext.Context(), cfg.Fulltext) + if err != nil { + logrus.WithError(err).Panicf("failed to create full text") + } + base.ProcessContext.ComponentStarted() + } + federationPresenceProducer := &producers.FederationAPIPresenceProducer{ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent), JetStream: js, @@ -86,7 +97,7 @@ func AddPublicRoutes( roomConsumer := consumers.NewOutputRoomEventConsumer( base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider, - streams.InviteStreamProvider, rsAPI, base.Fulltext, + streams.InviteStreamProvider, rsAPI, fts, ) if err = roomConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start room server consumer") @@ -94,7 +105,7 @@ func AddPublicRoutes( clientConsumer := consumers.NewOutputClientDataConsumer( base.ProcessContext, cfg, js, natsClient, syncDB, notifier, - streams.AccountDataStreamProvider, base.Fulltext, + streams.AccountDataStreamProvider, fts, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") @@ -129,7 +140,7 @@ func AddPublicRoutes( } routing.Setup( - base.PublicClientAPIMux, requestPool, syncDB, userAPI, - rsAPI, cfg, base.Caches, base.Fulltext, + base.Routers.Client, requestPool, syncDB, userAPI, + rsAPI, cfg, caches, fts, ) } diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 1cb82ce11..13a078659 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/tidwall/gjson" @@ -114,12 +115,13 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { } base, close := testrig.CreateBaseDendrite(t, dbType) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) defer close() jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) msgs := toNATSMsgs(t, base, room.Events()...) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) testrig.MustPublishMsgs(t, jsctx, msgs...) testCases := []struct { @@ -162,7 +164,7 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { for _, tc := range testCases { w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, tc.req) + base.Routers.Client.ServeHTTP(w, tc.req) if w.Code != tc.wantCode { t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode) } @@ -218,12 +220,13 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { // m.room.history_visibility msgs := toNATSMsgs(t, base, room.Events()...) sinceTokens := make([]string, len(msgs)) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{rooms: []*test.Room{room}}, caches) for i, msg := range msgs { testrig.MustPublishMsgs(t, jsctx, msg) time.Sleep(100 * time.Millisecond) w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, "timeout": "0", }))) @@ -253,7 +256,7 @@ func testSyncAPICreateRoomSyncEarly(t *testing.T, dbType test.DBType) { t.Logf("waited for events to be consumed; syncing with %v", sinceTokens) for i, since := range sinceTokens { w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, "timeout": "0", "since": since, @@ -302,9 +305,10 @@ func testSyncAPIUpdatePresenceImmediately(t *testing.T, dbType test.DBType) { jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, "timeout": "0", "set_presence": "online", @@ -417,10 +421,10 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) // Use the actual internal roomserver API - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) rsAPI.SetFederationAPI(nil, nil) - - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -444,7 +448,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { // There is only one event, we expect only to be able to see this, if the room is world_readable w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ "access_token": bobDev.AccessToken, "dir": "b", "filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility @@ -484,7 +488,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { // Verify the messages after/before invite are visible or not w = httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{ "access_token": bobDev.AccessToken, "dir": "b", }))) @@ -717,10 +721,11 @@ func TestGetMembership(t *testing.T) { defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) // Use an actual roomserver for this - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, caches) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -748,7 +753,7 @@ func TestGetMembership(t *testing.T) { } w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, tc.request(t, room)) + base.Routers.Client.ServeHTTP(w, tc.request(t, room)) if w.Code != 200 && tc.wantOK { t.Logf("%s", w.Body.String()) t.Fatalf("got HTTP %d want %d", w.Code, 200) @@ -786,8 +791,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) - - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, &syncRoomserverAPI{}, caches) producer := producers.SyncAPIProducer{ TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), @@ -885,7 +890,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { // Execute a /sync request, recording the response w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": alice.AccessToken, "since": tc.since, }))) @@ -1003,10 +1008,11 @@ func testContext(t *testing.T, dbType test.DBType) { defer baseClose() // Use an actual roomserver for this - rsAPI := roomserver.NewInternalAPI(base) + caches := caching.NewRistrettoCache(base.Cfg.Global.Cache.EstimatedMaxSize, base.Cfg.Global.Cache.MaxAge, caching.DisableMetrics) + rsAPI := roomserver.NewInternalAPI(base, caches) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, caches) room := test.NewRoom(t, user) @@ -1049,7 +1055,7 @@ func testContext(t *testing.T, dbType test.DBType) { params[k] = v } } - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params))) + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params))) if tc.wantError && w.Code == 200 { t.Fatalf("Expected an error, but got none") @@ -1139,7 +1145,7 @@ func TestUpdateRelations(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { base, shutdownBase := testrig.CreateBaseDendrite(t, dbType) t.Cleanup(shutdownBase) - db, err := storage.NewSyncServerDatasource(base, &base.Cfg.SyncAPI.Database) + db, err := storage.NewSyncServerDatasource(base.Context(), base.ConnectionManager, &base.Cfg.SyncAPI.Database) if err != nil { t.Fatal(err) } @@ -1178,7 +1184,7 @@ func syncUntil(t *testing.T, go func() { for { w := httptest.NewRecorder() - base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + base.Routers.Client.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ "access_token": accessToken, "timeout": "1000", }))) diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go index bc5ae652d..a54706dbb 100644 --- a/userapi/consumers/roomserver_test.go +++ b/userapi/consumers/roomserver_test.go @@ -7,22 +7,22 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" "github.com/stretchr/testify/assert" "github.com/matrix-org/dendrite/internal/pushrules" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi/storage" userAPITypes "github.com/matrix-org/dendrite/userapi/types" ) func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.UserDatabase, func()) { - base, baseclose := testrig.CreateBaseDendrite(t, dbType) t.Helper() connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewUserDatabase(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewUserDatabase(context.Background(), cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, "", 4, 0, 0, "") if err != nil { @@ -30,7 +30,6 @@ func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.UserDatabase, } return db, func() { close() - baseclose() } } diff --git a/userapi/internal/device_list_update_test.go b/userapi/internal/device_list_update_test.go index 868fc9be8..6ccc4b5fb 100644 --- a/userapi/internal/device_list_update_test.go +++ b/userapi/internal/device_list_update_test.go @@ -27,13 +27,13 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" roomserver "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage" ) @@ -363,9 +363,9 @@ func TestDebounce(t *testing.T) { func mustCreateKeyserverDB(t *testing.T, dbType test.DBType) (storage.KeyDatabase, func()) { t.Helper() - base, _, _ := testrig.Base(nil) connStr, clearDB := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewKeyDatabase(base, &config.DatabaseOptions{ConnectionString: config.DataSource(connStr)}) + cm := sqlutil.NewConnectionManager() + db, err := storage.NewKeyDatabase(cm, &config.DatabaseOptions{ConnectionString: config.DataSource(connStr)}) if err != nil { t.Fatal(err) } diff --git a/userapi/internal/key_api_test.go b/userapi/internal/key_api_test.go index fc7e7e0df..ec315c738 100644 --- a/userapi/internal/key_api_test.go +++ b/userapi/internal/key_api_test.go @@ -5,9 +5,9 @@ import ( "reflect" "testing" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/internal" "github.com/matrix-org/dendrite/userapi/storage" @@ -16,15 +16,14 @@ import ( func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.KeyDatabase, func()) { t.Helper() connStr, close := test.PrepareDBConnectionString(t, dbType) - base, _, _ := testrig.Base(nil) - db, err := storage.NewKeyDatabase(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewKeyDatabase(cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { t.Fatalf("failed to create new user db: %v", err) } return db, func() { - base.Close() close() } } diff --git a/userapi/storage/postgres/storage.go b/userapi/storage/postgres/storage.go index 673d123ba..7bfae7b20 100644 --- a/userapi/storage/postgres/storage.go +++ b/userapi/storage/postgres/storage.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/postgres/deltas" "github.com/matrix-org/dendrite/userapi/storage/shared" @@ -33,8 +32,8 @@ import ( ) // NewDatabase creates a new accounts and profiles database -func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) +func NewDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } @@ -51,7 +50,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, return deltas.UpServerNames(ctx, txn, serverName) }, }) - if err = m.Up(base.Context()); err != nil { + if err = m.Up(ctx); err != nil { return nil, err } @@ -111,7 +110,7 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, return deltas.UpServerNamesPopulate(ctx, txn, serverName) }, }) - if err = m.Up(base.Context()); err != nil { + if err = m.Up(ctx); err != nil { return nil, err } @@ -137,8 +136,8 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, }, nil } -func NewKeyDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()) +func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } diff --git a/userapi/storage/sqlite3/storage.go b/userapi/storage/sqlite3/storage.go index 0f3eeed1b..3742eebad 100644 --- a/userapi/storage/sqlite3/storage.go +++ b/userapi/storage/sqlite3/storage.go @@ -23,7 +23,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/shared" @@ -31,8 +30,8 @@ import ( ) // NewUserDatabase creates a new accounts and profiles database -func NewUserDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) +func NewUserDatabase(ctx context.Context, conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, openIDTokenLifetimeMS int64, loginTokenLifetime time.Duration, serverNoticesLocalpart string) (*shared.Database, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } @@ -49,7 +48,7 @@ func NewUserDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptio return deltas.UpServerNames(ctx, txn, serverName) }, }) - if err = m.Up(base.Context()); err != nil { + if err = m.Up(ctx); err != nil { return nil, err } @@ -109,7 +108,7 @@ func NewUserDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptio return deltas.UpServerNamesPopulate(ctx, txn, serverName) }, }) - if err = m.Up(base.Context()); err != nil { + if err = m.Up(ctx); err != nil { return nil, err } @@ -135,8 +134,8 @@ func NewUserDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptio }, nil } -func NewKeyDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { - db, writer, err := base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()) +func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (*shared.KeyDatabase, error) { + db, writer, err := conMan.Connection(dbProperties) if err != nil { return nil, err } diff --git a/userapi/storage/storage.go b/userapi/storage/storage.go index 0329fb46a..6981765f9 100644 --- a/userapi/storage/storage.go +++ b/userapi/storage/storage.go @@ -18,12 +18,13 @@ package storage import ( + "context" "fmt" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/postgres" "github.com/matrix-org/dendrite/userapi/storage/sqlite3" @@ -32,7 +33,8 @@ import ( // NewUserDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) // and sets postgres connection parameters func NewUserDatabase( - base *base.BaseDendrite, + ctx context.Context, + conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, @@ -42,9 +44,9 @@ func NewUserDatabase( ) (UserDatabase, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewUserDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return sqlite3.NewUserDatabase(ctx, conMan, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return postgres.NewDatabase(ctx, conMan, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) default: return nil, fmt.Errorf("unexpected database type") } @@ -52,12 +54,12 @@ func NewUserDatabase( // NewKeyDatabase opens a new Postgres or Sqlite database (base on dataSourceName) scheme) // and sets postgres connection parameters. -func NewKeyDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions) (KeyDatabase, error) { +func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (KeyDatabase, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewKeyDatabase(base, dbProperties) + return sqlite3.NewKeyDatabase(conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewKeyDatabase(base, dbProperties) + return postgres.NewKeyDatabase(conMan, dbProperties) default: return nil, fmt.Errorf("unexpected database type") } diff --git a/userapi/storage/storage_test.go b/userapi/storage/storage_test.go index f52e7e17d..c6369ef93 100644 --- a/userapi/storage/storage_test.go +++ b/userapi/storage/storage_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -33,9 +34,9 @@ var ( ) func mustCreateUserDatabase(t *testing.T, dbType test.DBType) (storage.UserDatabase, func()) { - base, baseclose := testrig.CreateBaseDendrite(t, dbType) connStr, close := test.PrepareDBConnectionString(t, dbType) - db, err := storage.NewUserDatabase(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewUserDatabase(context.Background(), cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, "localhost", bcrypt.MinCost, openIDLifetimeMS, loginTokenLifetime, "_server") if err != nil { @@ -43,7 +44,6 @@ func mustCreateUserDatabase(t *testing.T, dbType test.DBType) (storage.UserDatab } return db, func() { close() - baseclose() } } @@ -577,7 +577,7 @@ func Test_Notification(t *testing.T) { func mustCreateKeyDatabase(t *testing.T, dbType test.DBType) (storage.KeyDatabase, func()) { base, close := testrig.CreateBaseDendrite(t, dbType) - db, err := storage.NewKeyDatabase(base, &base.Cfg.KeyServer.Database) + db, err := storage.NewKeyDatabase(base.ConnectionManager, &base.Cfg.KeyServer.Database) if err != nil { t.Fatalf("failed to create new database: %v", err) } diff --git a/userapi/storage/storage_wasm.go b/userapi/storage/storage_wasm.go index 163e3e173..19e5f23c6 100644 --- a/userapi/storage/storage_wasm.go +++ b/userapi/storage/storage_wasm.go @@ -15,17 +15,19 @@ package storage import ( + "context" "fmt" "time" - "github.com/matrix-org/dendrite/setup/base" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi/storage/sqlite3" "github.com/matrix-org/gomatrixserverlib" ) -func NewUserAPIDatabase( - base *base.BaseDendrite, +func NewUserDatabase( + ctx context.Context, + conMan sqlutil.Connections, dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName, bcryptCost int, @@ -35,7 +37,20 @@ func NewUserAPIDatabase( ) (UserDatabase, error) { switch { case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewUserDatabase(base, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + return sqlite3.NewUserDatabase(ctx, conMan, dbProperties, serverName, bcryptCost, openIDTokenLifetimeMS, loginTokenLifetime, serverNoticesLocalpart) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") + default: + return nil, fmt.Errorf("unexpected database type") + } +} + +// NewKeyDatabase opens a new Postgres or Sqlite database (base on dataSourceName) scheme) +// and sets postgres connection parameters. +func NewKeyDatabase(conMan sqlutil.Connections, dbProperties *config.DatabaseOptions) (KeyDatabase, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewKeyDatabase(conMan, dbProperties) case dbProperties.ConnectionString.IsPostgres(): return nil, fmt.Errorf("can't use Postgres implementation") default: diff --git a/userapi/userapi.go b/userapi/userapi.go index 826bd7213..3ada8020c 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -18,6 +18,7 @@ import ( "time" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/sirupsen/logrus" rsapi "github.com/matrix-org/dendrite/roomserver/api" @@ -42,10 +43,11 @@ func NewInternalAPI( js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream) appServices := base.Cfg.Derived.ApplicationServices - pgClient := base.PushGatewayHTTPClient() + pgClient := pushgateway.NewHTTPClient(cfg.PushGatewayDisableTLSValidation) db, err := storage.NewUserDatabase( - base, + base.ProcessContext.Context(), + base.ConnectionManager, &cfg.AccountDatabase, cfg.Matrix.ServerName, cfg.BCryptCost, @@ -57,7 +59,7 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to accounts db") } - keyDB, err := storage.NewKeyDatabase(base, &base.Cfg.KeyServer.Database) + keyDB, err := storage.NewKeyDatabase(base.ConnectionManager, &base.Cfg.KeyServer.Database) if err != nil { logrus.WithError(err).Panicf("failed to connect to key db") } diff --git a/userapi/userapi_test.go b/userapi/userapi_test.go index 01e491cb6..c2d4e5a27 100644 --- a/userapi/userapi_test.go +++ b/userapi/userapi_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/userapi/producers" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -73,14 +74,16 @@ func MustMakeInternalAPI(t *testing.T, opts apiTestOpts, dbType test.DBType, pub if opts.serverName != "" { sName = gomatrixserverlib.ServerName(opts.serverName) } - accountDB, err := storage.NewUserDatabase(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + ctx := context.Background() + accountDB, err := storage.NewUserDatabase(ctx, cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, sName, bcrypt.MinCost, config.DefaultOpenIDTokenLifetimeMS, opts.loginTokenLifetime, "") if err != nil { t.Fatalf("failed to create account DB: %s", err) } - keyDB, err := storage.NewKeyDatabase(base, &config.DatabaseOptions{ + keyDB, err := storage.NewKeyDatabase(base.ConnectionManager, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }) if err != nil { diff --git a/userapi/util/notify_test.go b/userapi/util/notify_test.go index 421852d3f..c899e3a7c 100644 --- a/userapi/util/notify_test.go +++ b/userapi/util/notify_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "golang.org/x/crypto/bcrypt" @@ -15,7 +16,6 @@ import ( "github.com/matrix-org/dendrite/internal/pushgateway" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/dendrite/test/testrig" "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage" userUtil "github.com/matrix-org/dendrite/userapi/util" @@ -77,9 +77,8 @@ func TestNotifyUserCountsAsync(t *testing.T) { // Create DB and Dendrite base connStr, close := test.PrepareDBConnectionString(t, dbType) defer close() - base, _, _ := testrig.Base(nil) - defer base.Close() - db, err := storage.NewUserDatabase(base, &config.DatabaseOptions{ + cm := sqlutil.NewConnectionManager() + db, err := storage.NewUserDatabase(ctx, cm, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, "test", bcrypt.MinCost, 0, 0, "") if err != nil { diff --git a/userapi/util/phonehomestats_test.go b/userapi/util/phonehomestats_test.go index 5f626b5bc..4e931a1b7 100644 --- a/userapi/util/phonehomestats_test.go +++ b/userapi/util/phonehomestats_test.go @@ -21,7 +21,7 @@ func TestCollect(t *testing.T) { b, _, _ := testrig.Base(nil) connStr, closeDB := test.PrepareDBConnectionString(t, dbType) defer closeDB() - db, err := storage.NewUserDatabase(b, &config.DatabaseOptions{ + db, err := storage.NewUserDatabase(b.Context(), b.ConnectionManager, &config.DatabaseOptions{ ConnectionString: config.DataSource(connStr), }, "localhost", bcrypt.MinCost, 1000, 1000, "") if err != nil {