From 23cd7877a14bca5315467591cd47a7d51aec22ce Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 28 Jun 2023 20:29:49 +0200 Subject: [PATCH] Add `MXIDMapping` for pseudoID rooms (#3112) Add `MXIDMapping` on membership events when creating/joining rooms. --- clientapi/routing/membership.go | 18 +-- clientapi/routing/profile.go | 20 +-- clientapi/routing/redaction.go | 4 +- clientapi/routing/sendevent.go | 53 +++++-- clientapi/routing/server_notices.go | 4 +- federationapi/consumers/roomserver.go | 15 +- federationapi/internal/perform.go | 22 ++- federationapi/routing/join.go | 12 ++ go.mod | 4 +- go.sum | 4 +- roomserver/api/api.go | 7 +- roomserver/api/query.go | 2 + roomserver/internal/alias.go | 11 +- roomserver/internal/api.go | 50 ++++++- roomserver/internal/input/input.go | 2 +- roomserver/internal/input/input_events.go | 19 ++- .../internal/perform/perform_backfill.go | 2 +- .../internal/perform/perform_create_room.go | 89 +++++++++--- roomserver/internal/perform/perform_invite.go | 17 +++ roomserver/internal/perform/perform_join.go | 39 +++++- roomserver/internal/perform/perform_leave.go | 9 +- roomserver/internal/query/query.go | 3 + roomserver/roomserver_test.go | 10 +- roomserver/storage/interface.go | 5 +- roomserver/storage/shared/storage.go | 104 +++++++------- .../storage/sqlite3/user_room_keys_table.go | 5 +- roomserver/types/headered_event.go | 5 + syncapi/consumers/roomserver.go | 72 +++++++--- syncapi/routing/search_test.go | 1 + .../postgres/current_room_state_table.go | 4 +- syncapi/storage/postgres/invites_table.go | 2 +- syncapi/storage/postgres/memberships_table.go | 2 +- .../postgres/output_room_events_table.go | 2 +- .../sqlite3/current_room_state_table.go | 4 +- syncapi/storage/sqlite3/invites_table.go | 2 +- syncapi/storage/sqlite3/memberships_table.go | 2 +- .../sqlite3/output_room_events_table.go | 2 +- syncapi/storage/storage_test.go | 1 + .../storage/tables/current_room_state_test.go | 8 +- syncapi/storage/tables/memberships_test.go | 2 + syncapi/streams/stream_pdu.go | 131 +++++++++++++++++- 41 files changed, 593 insertions(+), 177 deletions(-) diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index bafc37b67..60b120b9c 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -22,10 +22,6 @@ import ( "time" "github.com/getsentry/sentry-go" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/gomatrixserverlib/fclient" - "github.com/matrix-org/gomatrixserverlib/spec" - appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -36,6 +32,9 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" + "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" ) @@ -433,11 +432,6 @@ func buildMembershipEvent( return nil, err } - identity, err := cfg.Matrix.SigningIdentityFor(device.UserDomain()) - if err != nil { - return nil, err - } - userID, err := spec.NewUserID(device.UserID, true) if err != nil { return nil, err @@ -459,6 +453,12 @@ func buildMembershipEvent( if err != nil { return nil, err } + + identity, err := rsAPI.SigningIdentityFor(ctx, *validRoomID, *userID) + if err != nil { + return nil, err + } + return buildMembershipEventDirect(ctx, targetSenderID, reason, profile.DisplayName, profile.AvatarURL, senderID, device.UserDomain(), membership, roomID, isDirect, identity.KeyID, identity.PrivateKey, evTime, rsAPI) } diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index c89ece41f..35da15e0e 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -145,7 +145,7 @@ func SetAvatarURL( } } - response, err := updateProfile(req.Context(), rsAPI, device, profile, userID, cfg, evTime) + response, err := updateProfile(req.Context(), rsAPI, device, profile, userID, evTime) if err != nil { return response } @@ -234,7 +234,7 @@ func SetDisplayName( } } - response, err := updateProfile(req.Context(), rsAPI, device, profile, userID, cfg, evTime) + response, err := updateProfile(req.Context(), rsAPI, device, profile, userID, evTime) if err != nil { return response } @@ -248,7 +248,7 @@ func SetDisplayName( func updateProfile( ctx context.Context, rsAPI api.ClientRoomserverAPI, device *userapi.Device, profile *authtypes.Profile, - userID string, cfg *config.ClientAPI, evTime time.Time, + userID string, evTime time.Time, ) (util.JSONResponse, error) { var res api.QueryRoomsForUserResponse err := rsAPI.QueryRoomsForUser(ctx, &api.QueryRoomsForUserRequest{ @@ -273,7 +273,7 @@ func updateProfile( } events, err := buildMembershipEvents( - ctx, device, res.RoomIDs, *profile, userID, cfg, evTime, rsAPI, + ctx, res.RoomIDs, *profile, userID, evTime, rsAPI, ) switch e := err.(type) { case nil: @@ -344,9 +344,8 @@ func getProfile( func buildMembershipEvents( ctx context.Context, - device *userapi.Device, roomIDs []string, - newProfile authtypes.Profile, userID string, cfg *config.ClientAPI, + newProfile authtypes.Profile, userID string, evTime time.Time, rsAPI api.ClientRoomserverAPI, ) ([]*types.HeaderedEvent, error) { evs := []*types.HeaderedEvent{} @@ -383,12 +382,17 @@ func buildMembershipEvents( return nil, err } - identity, err := cfg.Matrix.SigningIdentityFor(device.UserDomain()) + user, err := spec.NewUserID(userID, true) if err != nil { return nil, err } - event, err := eventutil.QueryAndBuildEvent(ctx, &proto, identity, evTime, rsAPI, nil) + identity, err := rsAPI.SigningIdentityFor(ctx, *validRoomID, *user) + if err != nil { + return nil, err + } + + event, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, evTime, rsAPI, nil) if err != nil { return nil, err } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index 42f029395..1b9a5a818 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -150,7 +150,7 @@ func SendRedaction( } } - identity, err := cfg.Matrix.SigningIdentityFor(device.UserDomain()) + identity, err := rsAPI.SigningIdentityFor(req.Context(), *validRoomID, *deviceUserID) if err != nil { return util.JSONResponse{ Code: http.StatusInternalServerError, @@ -159,7 +159,7 @@ func SendRedaction( } var queryRes roomserverAPI.QueryLatestEventsAndStateResponse - e, err := eventutil.QueryAndBuildEvent(req.Context(), &proto, identity, time.Now(), rsAPI, &queryRes) + e, err := eventutil.QueryAndBuildEvent(req.Context(), &proto, &identity, time.Now(), rsAPI, &queryRes) if errors.Is(err, eventutil.ErrRoomNoExists{}) { return util.JSONResponse{ Code: http.StatusNotFound, diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index d51a570de..41a3793ae 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -23,12 +23,6 @@ import ( "sync" "time" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/gomatrixserverlib/spec" - "github.com/matrix-org/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/internal/transactions" @@ -36,6 +30,11 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/matrix-org/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) // http://matrix.org/docs/spec/client_server/r0.2.0.html#put-matrix-client-r0-rooms-roomid-send-eventtype-txnid @@ -68,6 +67,8 @@ var sendEventDuration = prometheus.NewHistogramVec( // /rooms/{roomID}/send/{eventType} // /rooms/{roomID}/send/{eventType}/{txnID} // /rooms/{roomID}/state/{eventType}/{stateKey} +// +// nolint: gocyclo func SendEvent( req *http.Request, device *userapi.Device, @@ -121,6 +122,17 @@ func SendEvent( delete(r, "join_authorised_via_users_server") } + // for power level events we need to replace the userID with the pseudoID + if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs && eventType == spec.MRoomPowerLevels { + err = updatePowerLevels(req, r, roomID, rsAPI) + if err != nil { + return util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{Err: err.Error()}, + } + } + } + evTime, err := httputil.ParseTSParam(req) if err != nil { return util.JSONResponse{ @@ -129,7 +141,7 @@ func SendEvent( } } - e, resErr := generateSendEvent(req.Context(), r, device, roomID, eventType, stateKey, cfg, rsAPI, evTime) + e, resErr := generateSendEvent(req.Context(), r, device, roomID, eventType, stateKey, rsAPI, evTime) if resErr != nil { return *resErr } @@ -225,6 +237,28 @@ func SendEvent( return res } +func updatePowerLevels(req *http.Request, r map[string]interface{}, roomID string, rsAPI api.ClientRoomserverAPI) error { + userMap := r["users"].(map[string]interface{}) + validRoomID, err := spec.NewRoomID(roomID) + if err != nil { + return err + } + for user, level := range userMap { + uID, err := spec.NewUserID(user, true) + if err != nil { + continue // we're modifying the map in place, so we're going to have invalid userIDs after the first iteration + } + senderID, err := rsAPI.QuerySenderIDForUser(req.Context(), *validRoomID, *uID) + if err != nil { + return err + } + userMap[string(senderID)] = level + delete(userMap, user) + } + r["users"] = userMap + return nil +} + // stateEqual compares the new and the existing state event content. If they are equal, returns a *util.JSONResponse // with the existing event_id, making this an idempotent request. func stateEqual(ctx context.Context, rsAPI api.ClientRoomserverAPI, eventType, stateKey, roomID string, newContent map[string]interface{}) *util.JSONResponse { @@ -261,7 +295,6 @@ func generateSendEvent( r map[string]interface{}, device *userapi.Device, roomID, eventType string, stateKey *string, - cfg *config.ClientAPI, rsAPI api.ClientRoomserverAPI, evTime time.Time, ) (gomatrixserverlib.PDU, *util.JSONResponse) { @@ -304,7 +337,7 @@ func generateSendEvent( } } - identity, err := cfg.Matrix.SigningIdentityFor(device.UserDomain()) + identity, err := rsAPI.SigningIdentityFor(ctx, *validRoomID, *fullUserID) if err != nil { return nil, &util.JSONResponse{ Code: http.StatusInternalServerError, @@ -313,7 +346,7 @@ func generateSendEvent( } var queryRes api.QueryLatestEventsAndStateResponse - e, err := eventutil.QueryAndBuildEvent(ctx, &proto, identity, evTime, rsAPI, &queryRes) + e, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, evTime, rsAPI, &queryRes) switch specificErr := err.(type) { case nil: case eventutil.ErrRoomNoExists: diff --git a/clientapi/routing/server_notices.go b/clientapi/routing/server_notices.go index 7006ced46..66258a68a 100644 --- a/clientapi/routing/server_notices.go +++ b/clientapi/routing/server_notices.go @@ -221,7 +221,7 @@ func SendServerNotice( "body": r.Content.Body, "msgtype": r.Content.MsgType, } - e, resErr := generateSendEvent(ctx, request, senderDevice, roomID, "m.room.message", nil, cfgClient, rsAPI, time.Now()) + e, resErr := generateSendEvent(ctx, request, senderDevice, roomID, "m.room.message", nil, rsAPI, time.Now()) if resErr != nil { logrus.Errorf("failed to send message: %+v", resErr) return *resErr @@ -350,7 +350,7 @@ func getSenderDevice( if len(deviceRes.Devices) > 0 { // If there were changes to the profile, create a new membership event if displayNameChanged || avatarChanged { - _, err = updateProfile(ctx, rsAPI, &deviceRes.Devices[0], profile, accRes.Account.UserID, cfg, time.Now()) + _, err = updateProfile(ctx, rsAPI, &deviceRes.Devices[0], profile, accRes.Account.UserID, time.Now()) if err != nil { return nil, err } diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go index c6ad3f748..6dd2fd345 100644 --- a/federationapi/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -192,7 +192,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent, rew evs[i] = addsStateEvents[i].PDU } - addsJoinedHosts, err := JoinedHostsFromEvents(evs) + addsJoinedHosts, err := JoinedHostsFromEvents(s.ctx, evs, s.rsAPI) if err != nil { return err } @@ -345,7 +345,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( return nil, err } - combinedAddsJoinedHosts, err := JoinedHostsFromEvents(combinedAddsEvents) + combinedAddsJoinedHosts, err := JoinedHostsFromEvents(s.ctx, combinedAddsEvents, s.rsAPI) if err != nil { return nil, err } @@ -394,7 +394,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent( // JoinedHostsFromEvents turns a list of state events into a list of joined hosts. // This errors if one of the events was invalid. // It should be impossible for an invalid event to get this far in the pipeline. -func JoinedHostsFromEvents(evs []gomatrixserverlib.PDU) ([]types.JoinedHost, error) { +func JoinedHostsFromEvents(ctx context.Context, evs []gomatrixserverlib.PDU, rsAPI api.FederationRoomserverAPI) ([]types.JoinedHost, error) { var joinedHosts []types.JoinedHost for _, ev := range evs { if ev.Type() != "m.room.member" || ev.StateKey() == nil { @@ -407,12 +407,17 @@ func JoinedHostsFromEvents(evs []gomatrixserverlib.PDU) ([]types.JoinedHost, err if membership != spec.Join { continue } - _, serverName, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) + validRoomID, err := spec.NewRoomID(ev.RoomID()) if err != nil { return nil, err } + userID, err := rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(*ev.StateKey())) + if err != nil { + return nil, err + } + joinedHosts = append(joinedHosts, types.JoinedHost{ - MemberEventID: ev.EventID(), ServerName: serverName, + MemberEventID: ev.EventID(), ServerName: userID.Domain(), }) } return joinedHosts, nil diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go index 7f61dba41..515b3377d 100644 --- a/federationapi/internal/perform.go +++ b/federationapi/internal/perform.go @@ -2,6 +2,7 @@ package internal import ( "context" + "crypto/ed25519" "encoding/json" "errors" "fmt" @@ -170,13 +171,24 @@ func (r *FederationInternalAPI) performJoinUsingServer( UserIDQuerier: func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return r.rsAPI.QueryUserIDForSender(ctx, roomID, senderID) }, - SenderIDCreator: func(ctx context.Context, userID spec.UserID, roomID spec.RoomID) (spec.SenderID, error) { + GetOrCreateSenderID: func(ctx context.Context, userID spec.UserID, roomID spec.RoomID, roomVersion string) (spec.SenderID, ed25519.PrivateKey, error) { + // assign a roomNID, otherwise we can't create a private key for the user + _, nidErr := r.rsAPI.AssignRoomNID(ctx, roomID, gomatrixserverlib.RoomVersion(roomVersion)) + if nidErr != nil { + return "", nil, nidErr + } key, keyErr := r.rsAPI.GetOrCreateUserRoomPrivateKey(ctx, userID, roomID) if keyErr != nil { - return "", keyErr + return "", nil, keyErr } - - return spec.SenderID(spec.Base64Bytes(key).Encode()), nil + return spec.SenderIDFromPseudoIDKey(key), key, nil + }, + StoreSenderIDFromPublicID: func(ctx context.Context, senderID spec.SenderID, userIDRaw string, roomID spec.RoomID) error { + storeUserID, userErr := spec.NewUserID(userIDRaw, true) + if userErr != nil { + return userErr + } + return r.rsAPI.StoreUserRoomPublicKey(ctx, senderID, *storeUserID, roomID) }, } response, joinErr := gomatrixserverlib.PerformJoin(ctx, r, joinInput) @@ -200,7 +212,7 @@ func (r *FederationInternalAPI) performJoinUsingServer( // joining a room, waiting for 200 OK then changing device keys and have those keys not be sent // to other servers (this was a cause of a flakey sytest "Local device key changes get to remote servers") // The events are trusted now as we performed auth checks above. - joinedHosts, err := consumers.JoinedHostsFromEvents(response.StateSnapshot.GetStateEvents().TrustedEvents(response.JoinEvent.Version(), false)) + joinedHosts, err := consumers.JoinedHostsFromEvents(ctx, response.StateSnapshot.GetStateEvents().TrustedEvents(response.JoinEvent.Version(), false), r.rsAPI) if err != nil { return fmt.Errorf("JoinedHostsFromEvents: failed to get joined hosts: %s", err) } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 7aa50f65a..bfa1ba8b8 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -15,6 +15,7 @@ package routing import ( + "context" "fmt" "net/http" "sort" @@ -107,6 +108,10 @@ func MakeJoin( } } + if senderID == "" { + senderID = spec.SenderID(userID.String()) + } + input := gomatrixserverlib.HandleMakeJoinInput{ Context: httpReq.Context(), UserID: userID, @@ -218,6 +223,13 @@ func SendJoin( UserIDQuerier: func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return rsAPI.QueryUserIDForSender(httpReq.Context(), roomID, senderID) }, + StoreSenderIDFromPublicID: func(ctx context.Context, senderID spec.SenderID, userIDRaw string, roomID spec.RoomID) error { + userID, userErr := spec.NewUserID(userIDRaw, true) + if userErr != nil { + return userErr + } + return rsAPI.StoreUserRoomPublicKey(ctx, senderID, *userID, roomID) + }, } response, joinErr := gomatrixserverlib.HandleSendJoin(input) switch e := joinErr.(type) { diff --git a/go.mod b/go.mod index 930db3958..f43760e31 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20230614140620-4dea2171c8f1 + github.com/matrix-org/gomatrixserverlib v0.0.0-20230628151943-f6e3c7f7b093 github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.16 @@ -43,6 +43,7 @@ require ( github.com/yggdrasil-network/yggdrasil-go v0.4.6 go.uber.org/atomic v1.10.0 golang.org/x/crypto v0.10.0 + golang.org/x/exp v0.0.0-20221205204356-47842c84f3db golang.org/x/image v0.5.0 golang.org/x/mobile v0.0.0-20221020085226-b36e6246172e golang.org/x/sync v0.1.0 @@ -124,7 +125,6 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect go.etcd.io/bbolt v1.3.6 // indirect - golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.10.0 // indirect golang.org/x/sys v0.9.0 // indirect diff --git a/go.sum b/go.sum index cf6993938..e261f551f 100644 --- a/go.sum +++ b/go.sum @@ -323,8 +323,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230614140620-4dea2171c8f1 h1:k75Fy0iQVbDjvddip/x898+BdyopBNAfL1BMNx0awA0= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230614140620-4dea2171c8f1/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230628151943-f6e3c7f7b093 h1:FHd3SYhU2ZxZhkssZ/7ms5+M2j+g94lYp8ztvA1E6tA= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230628151943-f6e3c7f7b093/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a h1:awrPDf9LEFySxTLKYBMCiObelNx/cBuv/wzllvCCH3A= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a/go.mod h1:HchJX9oKMXaT2xYFs0Ha/6Zs06mxLU8k6F1ODnrGkeQ= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= diff --git a/roomserver/api/api.go b/roomserver/api/api.go index e2dd5dd73..ab56529c5 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" @@ -73,6 +74,7 @@ type RoomserverInternalAPI interface { type UserRoomPrivateKeyCreator interface { // GetOrCreateUserRoomPrivateKey gets the user room key for the specified user. If no key exists yet, a new one is created. GetOrCreateUserRoomPrivateKey(ctx context.Context, userID spec.UserID, roomID spec.RoomID) (ed25519.PrivateKey, error) + StoreUserRoomPublicKey(ctx context.Context, senderID spec.SenderID, userID spec.UserID, roomID spec.RoomID) error } type InputRoomEventsAPI interface { @@ -184,6 +186,7 @@ type ClientRoomserverAPI interface { QueryBulkStateContentAPI QueryEventsAPI QuerySenderIDAPI + UserRoomPrivateKeyCreator QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error QueryMembershipsForRoom(ctx context.Context, req *QueryMembershipsForRoomRequest, res *QueryMembershipsForRoomResponse) error QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error @@ -213,6 +216,7 @@ type ClientRoomserverAPI interface { PerformForget(ctx context.Context, req *PerformForgetRequest, resp *PerformForgetResponse) error SetRoomAlias(ctx context.Context, req *SetRoomAliasRequest, res *SetRoomAliasResponse) error RemoveRoomAlias(ctx context.Context, req *RemoveRoomAliasRequest, res *RemoveRoomAliasResponse) error + SigningIdentityFor(ctx context.Context, roomID spec.RoomID, senderID spec.UserID) (fclient.SigningIdentity, error) } type UserRoomserverAPI interface { @@ -232,7 +236,8 @@ type FederationRoomserverAPI interface { QueryBulkStateContentAPI QuerySenderIDAPI UserRoomPrivateKeyCreator - + AssignRoomNID(ctx context.Context, roomID spec.RoomID, roomVersion gomatrixserverlib.RoomVersion) (roomNID types.RoomNID, err error) + SigningIdentityFor(ctx context.Context, roomID spec.RoomID, senderID spec.UserID) (fclient.SigningIdentity, error) // QueryServerBannedFromRoom returns whether a server is banned from a room by server ACLs. QueryServerBannedFromRoom(ctx context.Context, req *QueryServerBannedFromRoomRequest, res *QueryServerBannedFromRoomResponse) error QueryMembershipForUser(ctx context.Context, req *QueryMembershipForUserRequest, res *QueryMembershipForUserResponse) error diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 684a5b0e3..b6140afd5 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -174,6 +174,8 @@ type QueryServerJoinedToRoomResponse struct { RoomExists bool `json:"room_exists"` // True if we still believe that the server is participating in the room IsInRoom bool `json:"is_in_room"` + // The roomversion if joined to room + RoomVersion gomatrixserverlib.RoomVersion } // QueryServerAllowedToSeeEventRequest is a request to QueryServerAllowedToSeeEvent diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index e6fb73383..b04a56fe8 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -115,6 +115,7 @@ func (r *RoomserverInternalAPI) GetAliasesForRoomID( // nolint:gocyclo // RemoveRoomAlias implements alias.RoomserverInternalAPI +// nolint: gocyclo func (r *RoomserverInternalAPI) RemoveRoomAlias( ctx context.Context, request *api.RemoveRoomAliasRequest, @@ -188,9 +189,11 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - senderDomain := sender.Domain() - - identity, err := r.Cfg.Global.SigningIdentityFor(senderDomain) + validRoomID, err := spec.NewRoomID(roomID) + if err != nil { + return err + } + identity, err := r.SigningIdentityFor(ctx, *validRoomID, *sender) if err != nil { return err } @@ -216,7 +219,7 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - newEvent, err := eventutil.BuildEvent(ctx, proto, identity, time.Now(), &eventsNeeded, stateRes) + newEvent, err := eventutil.BuildEvent(ctx, proto, &identity, time.Now(), &eventsNeeded, stateRes) if err != nil { return err } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 7943ae5c0..2e12671ff 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -6,6 +6,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" "github.com/nats-io/nats.go" @@ -110,11 +111,6 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio r.fsAPI = fsAPI r.KeyRing = keyRing - identity, err := r.Cfg.Global.SigningIdentityFor(r.ServerName) - if err != nil { - logrus.Panic(err) - } - r.Inputer = &input.Inputer{ Cfg: &r.Cfg.RoomServer, ProcessContext: r.ProcessContext, @@ -125,7 +121,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio NATSClient: r.NATSClient, Durable: nats.Durable(r.Durable), ServerName: r.ServerName, - SigningIdentity: identity, + SigningIdentity: r.SigningIdentityFor, FSAPI: fsAPI, KeyRing: keyRing, ACLs: r.ServerACLs, @@ -292,3 +288,45 @@ func (r *RoomserverInternalAPI) GetOrCreateUserRoomPrivateKey(ctx context.Contex } return key, nil } + +func (r *RoomserverInternalAPI) StoreUserRoomPublicKey(ctx context.Context, senderID spec.SenderID, userID spec.UserID, roomID spec.RoomID) error { + pubKeyBytes, err := senderID.RawBytes() + if err != nil { + return err + } + _, err = r.DB.InsertUserRoomPublicKey(ctx, userID, roomID, ed25519.PublicKey(pubKeyBytes)) + return err +} + +func (r *RoomserverInternalAPI) SigningIdentityFor(ctx context.Context, roomID spec.RoomID, senderID spec.UserID) (fclient.SigningIdentity, error) { + roomVersion, ok := r.Cache.GetRoomVersion(roomID.String()) + if !ok { + roomInfo, err := r.DB.RoomInfo(ctx, roomID.String()) + if err != nil { + return fclient.SigningIdentity{}, err + } + if roomInfo != nil { + roomVersion = roomInfo.RoomVersion + } + } + if roomVersion == gomatrixserverlib.RoomVersionPseudoIDs { + privKey, err := r.GetOrCreateUserRoomPrivateKey(ctx, senderID, roomID) + if err != nil { + return fclient.SigningIdentity{}, err + } + return fclient.SigningIdentity{ + PrivateKey: privKey, + KeyID: "ed25519:1", + ServerName: "self", + }, nil + } + identity, err := r.Cfg.Global.SigningIdentityFor(senderID.Domain()) + if err != nil { + return fclient.SigningIdentity{}, err + } + return *identity, err +} + +func (r *RoomserverInternalAPI) AssignRoomNID(ctx context.Context, roomID spec.RoomID, roomVersion gomatrixserverlib.RoomVersion) (roomNID types.RoomNID, err error) { + return r.DB.AssignRoomNID(ctx, roomID, roomVersion) +} diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 3db2d0a67..dea8f8c87 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -81,7 +81,7 @@ type Inputer struct { JetStream nats.JetStreamContext Durable nats.SubOpt ServerName spec.ServerName - SigningIdentity *fclient.SigningIdentity + SigningIdentity func(ctx context.Context, roomID spec.RoomID, senderID spec.UserID) (fclient.SigningIdentity, error) FSAPI fedapi.RoomserverFederationAPI KeyRing gomatrixserverlib.JSONVerifier ACLs *acls.ServerACLs diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index aa05d9594..db3c95502 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -406,7 +406,7 @@ func (r *Inputer) processRoomEvent( ) if !isRejected && !isCreateEvent { resolver := state.NewStateResolution(r.DB, roomInfo, r.Queryer) - redactionEvent, redactedEvent, err = r.DB.MaybeRedactEvent(ctx, roomInfo, eventNID, event, &resolver) + redactionEvent, redactedEvent, err = r.DB.MaybeRedactEvent(ctx, roomInfo, eventNID, event, &resolver, r.Queryer) if err != nil { return err } @@ -895,7 +895,22 @@ func (r *Inputer) kickGuests(ctx context.Context, event gomatrixserverlib.PDU, r return err } - event, err := eventutil.BuildEvent(ctx, fledglingEvent, r.SigningIdentity, time.Now(), &eventsNeeded, latestRes) + validRoomID, err := spec.NewRoomID(event.RoomID()) + if err != nil { + return err + } + + userID, err := spec.NewUserID(stateKey, true) + if err != nil { + return err + } + + signingIdentity, err := r.SigningIdentity(ctx, *validRoomID, *userID) + if err != nil { + return err + } + + event, err := eventutil.BuildEvent(ctx, fledglingEvent, &signingIdentity, time.Now(), &eventsNeeded, latestRes) if err != nil { return err } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 3fdc8e4d0..33200e819 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -647,7 +647,7 @@ func persistEvents(ctx context.Context, db storage.Database, querier api.QuerySe resolver := state.NewStateResolution(db, roomInfo, querier) - _, redactedEvent, err := db.MaybeRedactEvent(ctx, roomInfo, eventNID, ev, &resolver) + _, redactedEvent, err := db.MaybeRedactEvent(ctx, roomInfo, eventNID, ev, &resolver, querier) if err != nil { logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event") continue diff --git a/roomserver/internal/perform/perform_create_room.go b/roomserver/internal/perform/perform_create_room.go index dcaf8dca6..8c9656453 100644 --- a/roomserver/internal/perform/perform_create_room.go +++ b/roomserver/internal/perform/perform_create_room.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" + "github.com/sirupsen/logrus" ) const ( @@ -64,6 +65,16 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo } } } + + _, err = c.DB.AssignRoomNID(ctx, roomID, createRequest.RoomVersion) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("failed to assign roomNID") + return "", &util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + var senderID spec.SenderID if createRequest.RoomVersion == gomatrixserverlib.RoomVersionPseudoIDs { // create user room key if needed @@ -75,7 +86,7 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo JSON: spec.InternalServerError{}, } } - senderID = spec.SenderID(spec.Base64Bytes(key.Public().(ed25519.PublicKey)).Encode()) + senderID = spec.SenderIDFromPseudoIDKey(key) } else { senderID = spec.SenderID(userID.String()) } @@ -138,13 +149,59 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo membershipEvent := gomatrixserverlib.FledglingEvent{ Type: spec.MRoomMember, StateKey: string(senderID), - Content: gomatrixserverlib.MemberContent{ - Membership: spec.Join, - DisplayName: createRequest.UserDisplayName, - AvatarURL: createRequest.UserAvatarURL, - }, } + memberContent := gomatrixserverlib.MemberContent{ + Membership: spec.Join, + DisplayName: createRequest.UserDisplayName, + AvatarURL: createRequest.UserAvatarURL, + } + + // get the signing identity + identity, err := c.Cfg.Matrix.SigningIdentityFor(userID.Domain()) // we MUST use the server signing mxid_mapping + if err != nil { + logrus.WithError(err).WithField("domain", userID.Domain()).Error("unable to find signing identity for domain") + return "", &util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + + // If we are creating a room with pseudo IDs, create and sign the MXIDMapping + if createRequest.RoomVersion == gomatrixserverlib.RoomVersionPseudoIDs { + var pseudoIDKey ed25519.PrivateKey + pseudoIDKey, err = c.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, userID, roomID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("GetOrCreateUserRoomPrivateKey failed") + return "", &util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + + mapping := &gomatrixserverlib.MXIDMapping{ + UserRoomKey: spec.SenderIDFromPseudoIDKey(pseudoIDKey), + UserID: userID.String(), + } + + // Sign the mapping with the server identity + if err = mapping.Sign(identity.ServerName, identity.KeyID, identity.PrivateKey); err != nil { + return "", &util.JSONResponse{ + Code: http.StatusInternalServerError, + JSON: spec.InternalServerError{}, + } + } + memberContent.MXIDMapping = mapping + + // sign all events with the pseudo ID key + identity = &fclient.SigningIdentity{ + ServerName: "self", + KeyID: "ed25519:1", + PrivateKey: pseudoIDKey, + } + } + membershipEvent.Content = memberContent + var nameEvent *gomatrixserverlib.FledglingEvent var topicEvent *gomatrixserverlib.FledglingEvent var guestAccessEvent *gomatrixserverlib.FledglingEvent @@ -322,7 +379,7 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo JSON: spec.InternalServerError{}, } } - ev, err = builder.Build(createRequest.EventTime, userID.Domain(), createRequest.KeyID, createRequest.PrivateKey) + ev, err = builder.Build(createRequest.EventTime, identity.ServerName, identity.KeyID, identity.PrivateKey) if err != nil { util.GetLogger(ctx).WithError(err).Error("buildEvent failed") return "", &util.JSONResponse{ @@ -363,17 +420,8 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo }) } - // first send the `m.room.create` event, so we have a roomNID - if err = api.SendInputRoomEvents(ctx, c.RSAPI, userID.Domain(), inputs[:1], false); err != nil { - util.GetLogger(ctx).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed") - return "", &util.JSONResponse{ - Code: http.StatusInternalServerError, - JSON: spec.InternalServerError{}, - } - } - - // send the remaining events - if err = api.SendInputRoomEvents(ctx, c.RSAPI, userID.Domain(), inputs[1:], false); err != nil { + // send the events to the roomserver + if err = api.SendInputRoomEvents(ctx, c.RSAPI, userID.Domain(), inputs, false); err != nil { util.GetLogger(ctx).WithError(err).Error("roomserverAPI.SendInputRoomEvents failed") return "", &util.JSONResponse{ Code: http.StatusInternalServerError, @@ -483,11 +531,6 @@ func (c *Creator) PerformCreateRoom(ctx context.Context, userID spec.UserID, roo } // Build the invite event. - identity := &fclient.SigningIdentity{ - ServerName: userID.Domain(), - KeyID: createRequest.KeyID, - PrivateKey: createRequest.PrivateKey, - } inviteEvent, err = eventutil.QueryAndBuildEvent(ctx, &proto, identity, createRequest.EventTime, c.RSAPI, nil) if err != nil { diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index babd5f812..f19a508a3 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -153,6 +153,23 @@ func (r *Inviter) PerformInvite( } isTargetLocal := r.Cfg.Matrix.IsLocalServerName(invitedUser.Domain()) + // If we're inviting a local user, we can generate the needed pseudoID key here. (if needed) + if isTargetLocal { + var roomVersion gomatrixserverlib.RoomVersion + roomVersion, err = r.DB.GetRoomVersion(ctx, event.RoomID()) + if err != nil { + return err + } + + switch roomVersion { + case gomatrixserverlib.RoomVersionPseudoIDs: + _, err = r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *invitedUser, *validRoomID) + if err != nil { + return err + } + } + } + invitedSenderID, err := r.RSAPI.QuerySenderIDForUser(ctx, *validRoomID, *invitedUser) if err != nil { return fmt.Errorf("failed looking up senderID for invited user") diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index 5867ee6e0..c14554640 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -16,6 +16,7 @@ package perform import ( "context" + "crypto/ed25519" "database/sql" "errors" "fmt" @@ -24,6 +25,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/fclient" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" "github.com/sirupsen/logrus" @@ -202,14 +204,15 @@ func (r *Joiner) performJoinRoomByID( senderID, err = r.Queryer.QuerySenderIDForUser(ctx, *roomID, *userID) if err == nil { checkInvitePending = true - } else { + } + if senderID == "" { // create user room key if needed key, keyErr := r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID) if keyErr != nil { util.GetLogger(ctx).WithError(keyErr).Error("GetOrCreateUserRoomPrivateKey failed") return "", "", fmt.Errorf("GetOrCreateUserRoomPrivateKey failed: %w", keyErr) } - senderID = spec.SenderID(spec.Base64Bytes(key).Encode()) + senderID = spec.SenderIDFromPseudoIDKey(key) } default: checkInvitePending = true @@ -283,11 +286,39 @@ func (r *Joiner) performJoinRoomByID( // but everyone has since left. I suspect it does the wrong thing. var buildRes rsAPI.QueryLatestEventsAndStateResponse - identity, err := r.Cfg.Matrix.SigningIdentityFor(userDomain) + identity, err := r.RSAPI.SigningIdentityFor(ctx, *roomID, *userID) if err != nil { return "", "", fmt.Errorf("error joining local room: %q", err) } + // at this point we know we have an existing room + if inRoomRes.RoomVersion == gomatrixserverlib.RoomVersionPseudoIDs { + var pseudoIDKey ed25519.PrivateKey + pseudoIDKey, err = r.RSAPI.GetOrCreateUserRoomPrivateKey(ctx, *userID, *roomID) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("GetOrCreateUserRoomPrivateKey failed") + return "", "", err + } + + mapping := &gomatrixserverlib.MXIDMapping{ + UserRoomKey: spec.SenderIDFromPseudoIDKey(pseudoIDKey), + UserID: userID.String(), + } + + // Sign the mapping with the server identity + if err = mapping.Sign(identity.ServerName, identity.KeyID, identity.PrivateKey); err != nil { + return "", "", err + } + req.Content["mxid_mapping"] = mapping + + // sign the event with the pseudo ID key + identity = fclient.SigningIdentity{ + ServerName: "self", + KeyID: "ed25519:1", + PrivateKey: pseudoIDKey, + } + } + senderIDString := string(senderID) // Prepare the template for the join event. @@ -317,7 +348,7 @@ func (r *Joiner) performJoinRoomByID( if err = proto.SetContent(req.Content); err != nil { return "", "", fmt.Errorf("eb.SetContent: %w", err) } - event, err := eventutil.QueryAndBuildEvent(ctx, &proto, identity, time.Now(), r.RSAPI, &buildRes) + event, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, time.Now(), r.RSAPI, &buildRes) switch err.(type) { case nil: diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index e1ddb9b50..a20896cf7 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -177,12 +177,17 @@ func (r *Leaver) performLeaveRoomByID( // TODO: Check what happens if the room exists on the server // but everyone has since left. I suspect it does the wrong thing. + validRoomID, err := spec.NewRoomID(req.RoomID) + if err != nil { + return nil, err + } + var buildRes rsAPI.QueryLatestEventsAndStateResponse - identity, err := r.Cfg.Matrix.SigningIdentityFor(req.Leaver.Domain()) + identity, err := r.RSAPI.SigningIdentityFor(ctx, *validRoomID, req.Leaver) if err != nil { return nil, fmt.Errorf("SigningIdentityFor: %w", err) } - event, err := eventutil.QueryAndBuildEvent(ctx, &proto, identity, time.Now(), r.RSAPI, &buildRes) + event, err := eventutil.QueryAndBuildEvent(ctx, &proto, &identity, time.Now(), r.RSAPI, &buildRes) if err != nil { return nil, fmt.Errorf("eventutil.QueryAndBuildEvent: %w", err) } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 19fd456b5..918619e5e 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -478,6 +478,9 @@ func (r *Queryer) QueryServerJoinedToRoom( if err != nil { return fmt.Errorf("r.DB.RoomInfo: %w", err) } + if info != nil { + response.RoomVersion = info.RoomVersion + } if info == nil || info.IsStub() { return nil } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 077957fa1..76b21ad23 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -35,6 +35,14 @@ import ( "github.com/matrix-org/dendrite/test/testrig" ) +type FakeQuerier struct { + api.QuerySenderIDAPI +} + +func (f *FakeQuerier) QueryUserIDForSender(ctx context.Context, roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { + return spec.NewUserID(string(senderID), true) +} + func TestUsers(t *testing.T) { test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) { cfg, processCtx, close := testrig.CreateConfig(t, dbType) @@ -566,7 +574,7 @@ func TestRedaction(t *testing.T) { err = updater.Commit() assert.NoError(t, err) - _, redactedEvent, err := db.MaybeRedactEvent(ctx, roomInfo, eventNID, ev.PDU, &plResolver) + _, redactedEvent, err := db.MaybeRedactEvent(ctx, roomInfo, eventNID, ev.PDU, &plResolver, &FakeQuerier{}) assert.NoError(t, err) if redactedEvent != nil { assert.Equal(t, ev.Redacts(), redactedEvent.EventID()) diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 7156c11cc..e9b4609ec 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -18,6 +18,7 @@ import ( "context" "crypto/ed25519" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" @@ -190,7 +191,7 @@ type Database interface { GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) MaybeRedactEvent( - ctx context.Context, roomInfo *types.RoomInfo, eventNID types.EventNID, event gomatrixserverlib.PDU, plResolver state.PowerLevelResolver, + ctx context.Context, roomInfo *types.RoomInfo, eventNID types.EventNID, event gomatrixserverlib.PDU, plResolver state.PowerLevelResolver, querier api.QuerySenderIDAPI, ) (gomatrixserverlib.PDU, gomatrixserverlib.PDU, error) } @@ -251,7 +252,7 @@ type EventDatabase interface { // MaybeRedactEvent returns the redaction event and the redacted event if this call resulted in a redaction, else an error // (nil if there was nothing to do) MaybeRedactEvent( - ctx context.Context, roomInfo *types.RoomInfo, eventNID types.EventNID, event gomatrixserverlib.PDU, plResolver state.PowerLevelResolver, + ctx context.Context, roomInfo *types.RoomInfo, eventNID types.EventNID, event gomatrixserverlib.PDU, plResolver state.PowerLevelResolver, querier api.QuerySenderIDAPI, ) (gomatrixserverlib.PDU, gomatrixserverlib.PDU, error) StoreEvent(ctx context.Context, event gomatrixserverlib.PDU, roomInfo *types.RoomInfo, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, authEventNIDs []types.EventNID, isRejected bool) (types.EventNID, types.StateAtEvent, error) } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 61a3520a4..fc3ace6a6 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -10,6 +10,7 @@ import ( "sort" "github.com/matrix-org/dendrite/internal/eventutil" + "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib/spec" "github.com/matrix-org/util" @@ -991,6 +992,7 @@ func extractRoomVersionFromCreateEvent(event gomatrixserverlib.PDU) ( // Returns the redaction event and the redacted event if this call resulted in a redaction. func (d *EventDatabase) MaybeRedactEvent( ctx context.Context, roomInfo *types.RoomInfo, eventNID types.EventNID, event gomatrixserverlib.PDU, plResolver state.PowerLevelResolver, + querier api.QuerySenderIDAPI, ) (gomatrixserverlib.PDU, gomatrixserverlib.PDU, error) { var ( redactionEvent, redactedEvent *types.Event @@ -1030,15 +1032,18 @@ func (d *EventDatabase) MaybeRedactEvent( return nil } - // TODO: Don't hack senderID into userID here (pseudoIDs) + var validRoomID *spec.RoomID + validRoomID, err = spec.NewRoomID(redactedEvent.RoomID()) + if err != nil { + return err + } sender1Domain := "" - sender1, err1 := spec.NewUserID(string(redactedEvent.SenderID()), true) + sender1, err1 := querier.QueryUserIDForSender(ctx, *validRoomID, redactedEvent.SenderID()) if err1 == nil { sender1Domain = string(sender1.Domain()) } - // TODO: Don't hack senderID into userID here (pseudoIDs) sender2Domain := "" - sender2, err2 := spec.NewUserID(string(redactionEvent.SenderID()), true) + sender2, err2 := querier.QueryUserIDForSender(ctx, *validRoomID, redactionEvent.SenderID()) if err2 == nil { sender2Domain = string(sender2.Domain()) } @@ -1698,6 +1703,7 @@ func (d *Database) InsertUserRoomPublicKey(ctx context.Context, userID spec.User // SelectUserRoomPrivateKey queries the users room private key. // If no key exists, returns no key and no error. Otherwise returns // the key and a database error, if any. +// TODO: Cache this? func (d *Database) SelectUserRoomPrivateKey(ctx context.Context, userID spec.UserID, roomID spec.RoomID) (key ed25519.PrivateKey, err error) { uID := userID.String() stateKeyNIDMap, sErr := d.eventStateKeyNIDs(ctx, nil, []string{uID}) @@ -1756,58 +1762,54 @@ func (d *Database) SelectUserRoomPublicKey(ctx context.Context, userID spec.User // SelectUserIDsForPublicKeys returns a map from roomID -> map from senderKey -> userID func (d *Database) SelectUserIDsForPublicKeys(ctx context.Context, publicKeys map[spec.RoomID][]ed25519.PublicKey) (result map[spec.RoomID]map[string]string, err error) { result = make(map[spec.RoomID]map[string]string, len(publicKeys)) - err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - // map all roomIDs to roomNIDs - query := make(map[types.RoomNID][]ed25519.PublicKey) - rooms := make(map[types.RoomNID]spec.RoomID) - for roomID, keys := range publicKeys { - roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID.String()) - if !ok { - roomInfo, rErr := d.roomInfo(ctx, txn, roomID.String()) - if rErr != nil { - return rErr - } - if roomInfo == nil { - logrus.Warnf("missing room info for %s, there will be missing users in the response", roomID.String()) - continue - } - roomNID = roomInfo.RoomNID + // map all roomIDs to roomNIDs + query := make(map[types.RoomNID][]ed25519.PublicKey) + rooms := make(map[types.RoomNID]spec.RoomID) + for roomID, keys := range publicKeys { + roomNID, ok := d.Cache.GetRoomServerRoomNID(roomID.String()) + if !ok { + roomInfo, rErr := d.roomInfo(ctx, nil, roomID.String()) + if rErr != nil { + return nil, rErr } - - query[roomNID] = keys - rooms[roomNID] = roomID - } - - // get the user room key pars - userRoomKeyPairMap, sErr := d.UserRoomKeyTable.BulkSelectUserNIDs(ctx, txn, query) - if sErr != nil { - return sErr - } - nids := make([]types.EventStateKeyNID, 0, len(userRoomKeyPairMap)) - for _, nid := range userRoomKeyPairMap { - nids = append(nids, nid.EventStateKeyNID) - } - // get the userIDs - nidMap, seErr := d.EventStateKeys(ctx, nids) - if seErr != nil { - return seErr - } - - // build the result map (roomID -> map publicKey -> userID) - for publicKey, userRoomKeyPair := range userRoomKeyPairMap { - userID := nidMap[userRoomKeyPair.EventStateKeyNID] - roomID := rooms[userRoomKeyPair.RoomNID] - resMap, exists := result[roomID] - if !exists { - resMap = map[string]string{} + if roomInfo == nil { + logrus.Warnf("missing room info for %s, there will be missing users in the response", roomID.String()) + continue } - resMap[publicKey] = userID - result[roomID] = resMap + roomNID = roomInfo.RoomNID } - return nil - }) + query[roomNID] = keys + rooms[roomNID] = roomID + } + + // get the user room key pars + userRoomKeyPairMap, sErr := d.UserRoomKeyTable.BulkSelectUserNIDs(ctx, nil, query) + if sErr != nil { + return nil, sErr + } + nids := make([]types.EventStateKeyNID, 0, len(userRoomKeyPairMap)) + for _, nid := range userRoomKeyPairMap { + nids = append(nids, nid.EventStateKeyNID) + } + // get the userIDs + nidMap, seErr := d.EventStateKeys(ctx, nids) + if seErr != nil { + return nil, seErr + } + + // build the result map (roomID -> map publicKey -> userID) + for publicKey, userRoomKeyPair := range userRoomKeyPairMap { + userID := nidMap[userRoomKeyPair.EventStateKeyNID] + roomID := rooms[userRoomKeyPair.RoomNID] + resMap, exists := result[roomID] + if !exists { + resMap = map[string]string{} + } + resMap[publicKey] = userID + result[roomID] = resMap + } return result, err } diff --git a/roomserver/storage/sqlite3/user_room_keys_table.go b/roomserver/storage/sqlite3/user_room_keys_table.go index d58b8ac3f..5d6ddc9a8 100644 --- a/roomserver/storage/sqlite3/user_room_keys_table.go +++ b/roomserver/storage/sqlite3/user_room_keys_table.go @@ -57,6 +57,7 @@ const selectUserRoomPublicKeySQL = `SELECT pseudo_id_pub_key FROM roomserver_use const selectUserNIDsSQL = `SELECT user_nid, room_nid, pseudo_id_pub_key FROM roomserver_user_room_keys WHERE room_nid IN ($1) AND pseudo_id_pub_key IN ($2)` type userRoomKeysStatements struct { + db *sql.DB insertUserRoomPrivateKeyStmt *sql.Stmt insertUserRoomPublicKeyStmt *sql.Stmt selectUserRoomKeyStmt *sql.Stmt @@ -70,7 +71,7 @@ func CreateUserRoomKeysTable(db *sql.DB) error { } func PrepareUserRoomKeysTable(db *sql.DB) (tables.UserRoomKeys, error) { - s := &userRoomKeysStatements{} + s := &userRoomKeysStatements{db: db} return s, sqlutil.StatementList{ {&s.insertUserRoomPrivateKeyStmt, insertUserRoomKeySQL}, {&s.insertUserRoomPublicKeyStmt, insertUserRoomPublicKeySQL}, @@ -137,7 +138,7 @@ func (s *userRoomKeysStatements) BulkSelectUserNIDs(ctx context.Context, txn *sq selectSQL := strings.Replace(selectUserNIDsSQL, "($2)", sqlutil.QueryVariadicOffset(len(senders), len(senderKeys)), 1) selectSQL = strings.Replace(selectSQL, "($1)", sqlutil.QueryVariadic(len(senderKeys)), 1) // replace $1 with the roomNIDs - selectStmt, err := txn.Prepare(selectSQL) + selectStmt, err := s.db.Prepare(selectSQL) if err != nil { return nil, err } diff --git a/roomserver/types/headered_event.go b/roomserver/types/headered_event.go index 52d006bd9..783999822 100644 --- a/roomserver/types/headered_event.go +++ b/roomserver/types/headered_event.go @@ -18,6 +18,7 @@ import ( "unsafe" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/gomatrixserverlib/spec" ) // HeaderedEvent is an Event which serialises to the headered form, which includes @@ -25,6 +26,10 @@ import ( type HeaderedEvent struct { gomatrixserverlib.PDU Visibility gomatrixserverlib.HistoryVisibility + // TODO: Remove this. This is a temporary workaround to store the userID in the syncAPI. + // It really should be the userKey instead. + UserID spec.UserID + StateKeyResolved *string } func (h *HeaderedEvent) CacheCost() int { diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 90f9ff67d..e6b5ddbb0 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -256,16 +256,19 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } } - pduPos, err := s.db.WriteEvent( - ctx, - ev, - addsStateEvents, - msg.AddsStateEventIDs, - msg.RemovesStateEventIDs, - msg.TransactionID, - false, - msg.HistoryVisibility, - ) + validRoomID, err := spec.NewRoomID(ev.RoomID()) + if err != nil { + return err + } + + userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID()) + if err != nil { + return err + } + + ev.UserID = *userID + + pduPos, err := s.db.WriteEvent(ctx, ev, addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, msg.TransactionID, false, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -315,16 +318,19 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent( // hack but until we have some better strategy for dealing with // old events in the sync API, this should at least prevent us // from confusing clients into thinking they've joined/left rooms. - pduPos, err := s.db.WriteEvent( - ctx, - ev, - []*rstypes.HeaderedEvent{}, - []string{}, // adds no state - []string{}, // removes no state - nil, // no transaction - ev.StateKey() != nil, // exclude from sync?, - msg.HistoryVisibility, - ) + + validRoomID, err := spec.NewRoomID(ev.RoomID()) + if err != nil { + return err + } + + userID, err := s.rsAPI.QueryUserIDForSender(ctx, *validRoomID, ev.SenderID()) + if err != nil { + return err + } + ev.UserID = *userID + + pduPos, err := s.db.WriteEvent(ctx, ev, []*rstypes.HeaderedEvent{}, []string{}, []string{}, nil, ev.StateKey() != nil, msg.HistoryVisibility) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -420,6 +426,8 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( return } + msg.Event.UserID = *userID + pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) if err != nil { sentry.CaptureException(err) @@ -537,6 +545,7 @@ func (s *OutputRoomEventConsumer) onPurgeRoom( } func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) (*rstypes.HeaderedEvent, error) { + event.StateKeyResolved = event.StateKey() if event.StateKey() == nil { return event, nil } @@ -556,6 +565,29 @@ func (s *OutputRoomEventConsumer) updateStateEvent(event *rstypes.HeaderedEvent) return event, err } + validRoomID, err := spec.NewRoomID(event.RoomID()) + if err != nil { + return event, err + } + + if event.StateKey() != nil { + if *event.StateKey() != "" { + var sku *spec.UserID + sku, err = s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, spec.SenderID(stateKey)) + if err == nil && sku != nil { + sKey := sku.String() + event.StateKeyResolved = &sKey + } + } + } + + userID, err := s.rsAPI.QueryUserIDForSender(s.ctx, *validRoomID, event.SenderID()) + if err != nil { + return event, err + } + + event.UserID = *userID + if prevEvent == nil || prevEvent.EventID() == event.EventID() { return event, nil } diff --git a/syncapi/routing/search_test.go b/syncapi/routing/search_test.go index f6d7fb4eb..905a9a1ac 100644 --- a/syncapi/routing/search_test.go +++ b/syncapi/routing/search_test.go @@ -230,6 +230,7 @@ func TestSearch(t *testing.T) { stateEvents = append(stateEvents, x) stateEventIDs = append(stateEventIDs, x.EventID()) } + x.StateKeyResolved = x.StateKey() sp, err = db.WriteEvent(processCtx.Context(), x, stateEvents, stateEventIDs, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared) assert.NoError(t, err) if x.Type() != "m.room.message" { diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index bfe5e9bdd..112fa9d4a 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -343,9 +343,9 @@ func (s *currentRoomStateStatements) UpsertRoomState( event.RoomID(), event.EventID(), event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, - *event.StateKey(), + *event.StateKeyResolved, headeredJSON, membership, addedAt, diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 267209bba..7b8d2d733 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -101,7 +101,7 @@ func (s *inviteEventsStatements) InsertInviteEvent( ctx, inviteEvent.RoomID(), inviteEvent.EventID(), - *inviteEvent.StateKey(), + inviteEvent.UserID.String(), headeredJSON, ).Scan(&streamPos) return diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 3905f9abb..09b47432b 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -109,7 +109,7 @@ func (s *membershipsStatements) UpsertMembership( _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext( ctx, event.RoomID(), - *event.StateKey(), + event.StateKeyResolved, membership, event.EventID(), streamPos, diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index e068afab1..b58cf59f0 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -407,7 +407,7 @@ func (s *outputRoomEventsStatements) InsertEvent( event.EventID(), headeredJSON, event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, pq.StringArray(addState), pq.StringArray(removeState), diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index e432e483b..3bd19b367 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -342,9 +342,9 @@ func (s *currentRoomStateStatements) UpsertRoomState( event.RoomID(), event.EventID(), event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, - *event.StateKey(), + *event.StateKeyResolved, headeredJSON, membership, addedAt, diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go index 347523cf7..7e0d895f1 100644 --- a/syncapi/storage/sqlite3/invites_table.go +++ b/syncapi/storage/sqlite3/invites_table.go @@ -108,7 +108,7 @@ func (s *inviteEventsStatements) InsertInviteEvent( streamPos, inviteEvent.RoomID(), inviteEvent.EventID(), - *inviteEvent.StateKey(), + inviteEvent.UserID.String(), headeredJSON, ) return diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index c09fa1510..a9e880d2a 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -112,7 +112,7 @@ func (s *membershipsStatements) UpsertMembership( _, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext( ctx, event.RoomID(), - *event.StateKey(), + event.StateKeyResolved, membership, event.EventID(), streamPos, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 5a47aec44..06c65419a 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -348,7 +348,7 @@ func (s *outputRoomEventsStatements) InsertEvent( event.EventID(), headeredJSON, event.Type(), - event.SenderID(), + event.UserID.String(), containsURL, string(addStateJSON), string(removeStateJSON), diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go index f56e44a30..f57b0d618 100644 --- a/syncapi/storage/storage_test.go +++ b/syncapi/storage/storage_test.go @@ -43,6 +43,7 @@ func MustWriteEvents(t *testing.T, db storage.Database, events []*rstypes.Header var addStateEventIDs []string var removeStateEventIDs []string if ev.StateKey() != nil { + ev.StateKeyResolved = ev.StateKey() addStateEvents = append(addStateEvents, ev) addStateEventIDs = append(addStateEventIDs, ev.EventID()) } diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go index 7d4ec812c..2df111a26 100644 --- a/syncapi/storage/tables/current_room_state_test.go +++ b/syncapi/storage/tables/current_room_state_test.go @@ -54,7 +54,13 @@ func TestCurrentRoomStateTable(t *testing.T) { events := room.CurrentState() err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error { for i, ev := range events { - err := tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i)) + ev.StateKeyResolved = ev.StateKey() + userID, err := spec.NewUserID(string(ev.SenderID()), true) + if err != nil { + return err + } + ev.UserID = *userID + err = tab.UpsertRoomState(ctx, txn, ev, nil, types.StreamPosition(i)) if err != nil { return fmt.Errorf("failed to UpsertRoomState: %w", err) } diff --git a/syncapi/storage/tables/memberships_test.go b/syncapi/storage/tables/memberships_test.go index 4afa2ac5b..a421a9772 100644 --- a/syncapi/storage/tables/memberships_test.go +++ b/syncapi/storage/tables/memberships_test.go @@ -80,6 +80,7 @@ func TestMembershipsTable(t *testing.T) { defer cancel() for _, ev := range userEvents { + ev.StateKeyResolved = ev.StateKey() if err := table.UpsertMembership(ctx, nil, ev, types.StreamPosition(ev.Depth()), 1); err != nil { t.Fatalf("failed to upsert membership: %s", err) } @@ -134,6 +135,7 @@ func testUpsert(t *testing.T, ctx context.Context, table tables.Memberships, mem ev := room.CreateAndInsert(t, user, spec.MRoomMember, map[string]interface{}{ "membership": spec.Join, }, test.WithStateKey(user.ID)) + ev.StateKeyResolved = ev.StateKey() // Insert the same event again, but with different positions, which should get updated if err = table.UpsertMembership(ctx, nil, ev, 2, 2); err != nil { t.Fatalf("failed to upsert membership: %s", err) diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 7939dd8fa..1a4e5351d 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -3,6 +3,7 @@ package streams import ( "context" "database/sql" + "encoding/json" "fmt" "time" @@ -15,6 +16,8 @@ import ( "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib/spec" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/gomatrixserverlib" @@ -346,13 +349,40 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // Now that we've filtered the timeline, work out which state events are still // left. Anything that appears in the filtered timeline will be removed from the // "state" section and kept in "timeline". + + // update the powerlevel event for timeline events + for i, ev := range events { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + var newEvent gomatrixserverlib.PDU + newEvent, err = p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return r.From, err + } + events[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering( gomatrixserverlib.ToPDUs(removeDuplicates(delta.StateEvents, events)), gomatrixserverlib.TopologicalOrderByAuthEvents, ) delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents)) for i := range sEvents { - delta.StateEvents[i] = sEvents[i].(*rstypes.HeaderedEvent) + ev := sEvents[i] + delta.StateEvents[i] = ev.(*rstypes.HeaderedEvent) + // update the powerlevel event for state events + if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") { + var newEvent gomatrixserverlib.PDU + newEvent, err = p.updatePowerLevelEvent(ctx, ev.(*rstypes.HeaderedEvent)) + if err != nil { + return r.From, err + } + delta.StateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } } if len(delta.StateEvents) > 0 { @@ -421,6 +451,75 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) { + pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev) + if err != nil { + return nil, err + } + newPls := make(map[string]int64) + var userID *spec.UserID + for user, level := range pls.Users { + validRoomID, _ := spec.NewRoomID(ev.RoomID()) + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + newPls[userID.String()] = level + } + var newPlBytes, newEv []byte + newPlBytes, err = json.Marshal(newPls) + if err != nil { + return nil, err + } + newEv, err = sjson.SetRawBytes(ev.JSON(), "content.users", newPlBytes) + if err != nil { + return nil, err + } + + // do the same for prev content + prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content") + if !prevContent.Exists() { + var evNew gomatrixserverlib.PDU + evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false) + if err != nil { + return nil, err + } + + return evNew, err + } + pls = gomatrixserverlib.PowerLevelContent{} + err = json.Unmarshal([]byte(prevContent.Raw), &pls) + if err != nil { + return nil, err + } + + newPls = make(map[string]int64) + for user, level := range pls.Users { + validRoomID, _ := spec.NewRoomID(ev.RoomID()) + userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user)) + if err != nil { + return nil, err + } + newPls[userID.String()] = level + } + newPlBytes, err = json.Marshal(newPls) + if err != nil { + return nil, err + } + newEv, err = sjson.SetRawBytes(newEv, "unsigned.prev_content.users", newPlBytes) + if err != nil { + return nil, err + } + + var evNew gomatrixserverlib.PDU + evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false) + if err != nil { + return nil, err + } + + return evNew, err +} + // applyHistoryVisibilityFilter gets the current room state and supplies it to ApplyHistoryVisibilityFilter, to make // sure we always return the required events in the timeline. func applyHistoryVisibilityFilter( @@ -470,6 +569,7 @@ func applyHistoryVisibilityFilter( return events, nil } +// nolint: gocyclo func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, snapshot storage.DatabaseTransaction, @@ -563,6 +663,35 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( prevBatch.Decrement() } + // Update powerlevel events for timeline events + for i, ev := range events { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + newEvent, err := p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return nil, err + } + events[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + // Update powerlevel events for state events + for i, ev := range stateEvents { + if ev.Version() != gomatrixserverlib.RoomVersionPseudoIDs { + continue + } + if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") { + continue + } + newEvent, err := p.updatePowerLevelEvent(ctx, ev) + if err != nil { + return nil, err + } + stateEvents[i] = &rstypes.HeaderedEvent{PDU: newEvent} + } + jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) { return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)