Add support for receiving room events over federation. (#130)

* Add API for querying events by ID.

* Fix tense

* Start implementing federation ingress

* More stuff

* Hook up federation event receiving

* Handle the case where we are missing state

* Fix docstring and comments

* Fix infinite loop when printing unknownRoomError
This commit is contained in:
Mark Haines 2017-06-07 14:32:53 +01:00 committed by GitHub
parent 2d202cec07
commit 515cce1a45
9 changed files with 441 additions and 12 deletions

View File

@ -18,11 +18,14 @@ import (
"encoding/base64"
"net/http"
"os"
"strings"
"time"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus"
@ -40,7 +43,10 @@ var (
// openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\
// python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")'
//
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
kafkaURIs = strings.Split(os.Getenv("KAFKA_URIS"), ",")
roomserverURL = os.Getenv("ROOMSERVER_URL")
roomserverInputTopic = os.Getenv("TOPIC_INPUT_ROOM_EVENT")
)
func main() {
@ -57,6 +63,18 @@ func main() {
log.Panic("No TLS_FINGERPRINT environment variable found.")
}
if len(kafkaURIs) == 0 {
// the kafka default is :9092
kafkaURIs = []string{"localhost:9092"}
}
if roomserverURL == "" {
log.Panic("No ROOMSERVER_URL environment variable found.")
}
if roomserverInputTopic == "" {
log.Panic("No TOPIC_INPUT_ROOM_EVENT environment variable found. This should match the roomserver input topic.")
}
cfg := config.FederationAPI{
ServerName: serverName,
// TODO: make the validity period configurable.
@ -75,6 +93,37 @@ func main() {
}
cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}}
routing.Setup(http.DefaultServeMux, cfg)
federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
keyRing := gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{
// TODO: Use perspective key fetchers for production.
&gomatrixserverlib.DirectKeyFetcher{federation.Client},
},
KeyDatabase: &dummyKeyDatabase{},
}
queryAPI := api.NewRoomserverQueryAPIHTTP(roomserverURL, nil)
roomserverProducer, err := producers.NewRoomserverProducer(kafkaURIs, roomserverInputTopic)
if err != nil {
log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err)
}
routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}
// TODO: Implement a proper key database.
type dummyKeyDatabase struct{}
func (d *dummyKeyDatabase) FetchKeys(
requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) {
return nil, nil
}
func (d *dummyKeyDatabase) StoreKeys(
map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys,
) error {
return nil
}

View File

@ -0,0 +1,124 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"flag"
"fmt"
log "github.com/Sirupsen/logrus"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strings"
"time"
)
const usage = `Usage: %s
Create a single endpoint URL which remote matrix servers can be pointed at.
The server-server API in Dendrite is split across multiple processes
which listen on multiple ports. You cannot point a Matrix server at
any of those ports, as there will be unimplemented functionality.
In addition, all server-server API processes start with the additional
path prefix '/api', which Matrix servers will be unaware of.
This tool will proxy requests for all server-server URLs and forward
them to their respective process. It will also add the '/api' path
prefix to incoming requests.
THIS TOOL IS FOR TESTING AND NOT INTENDED FOR PRODUCTION USE.
Arguments:
`
var (
federationAPIURL = flag.String("federation-api-url", "", "The base URL of the listening 'dendrite-federation-api-server' process. E.g. 'http://localhost:4200'")
bindAddress = flag.String("bind-address", ":8448", "The listening port for the proxy.")
certFile = flag.String("tls-cert", "server.crt", "The PEM formatted X509 certificate to use for TLS")
keyFile = flag.String("tls-key", "server.key", "The PEM private key to use for TLS")
)
func makeProxy(targetURL string) (*httputil.ReverseProxy, error) {
if !strings.HasSuffix(targetURL, "/") {
targetURL += "/"
}
// Check that we can parse the URL.
_, err := url.Parse(targetURL)
if err != nil {
return nil, err
}
return &httputil.ReverseProxy{
Director: func(req *http.Request) {
// URL.Path() removes the % escaping from the path.
// The % encoding will be added back when the url is encoded
// when the request is forwarded.
// This means that we will lose any unessecary escaping from the URL.
// Pratically this means that any distinction between '%2F' and '/'
// in the URL will be lost by the time it reaches the target.
path := req.URL.Path
path = "api" + path
log.WithFields(log.Fields{
"path": path,
"url": targetURL,
"method": req.Method,
}).Print("proxying request")
newURL, err := url.Parse(targetURL + path)
if err != nil {
// We already checked that we can parse the URL
// So this shouldn't ever get hit.
panic(err)
}
// Copy the query parameters from the request.
newURL.RawQuery = req.URL.RawQuery
req.URL = newURL
},
}, nil
}
func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, usage, os.Args[0])
flag.PrintDefaults()
}
flag.Parse()
if *federationAPIURL == "" {
flag.Usage()
fmt.Fprintln(os.Stderr, "no --federation-api-url specified.")
os.Exit(1)
}
federationProxy, err := makeProxy(*federationAPIURL)
if err != nil {
panic(err)
}
http.Handle("/", federationProxy)
srv := &http.Server{
Addr: *bindAddress,
ReadTimeout: 1 * time.Minute, // how long we wait for the client to send the entire request (after connection accept)
WriteTimeout: 5 * time.Minute, // how long the proxy has to write the full response
}
fmt.Println("Proxying requests to:")
fmt.Println(" /* => ", *federationAPIURL+"/api/*")
fmt.Println("Listening on ", *bindAddress)
panic(srv.ListenAndServeTLS(*certFile, *keyFile))
}

View File

@ -16,21 +16,35 @@ package routing
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/federationapi/readers"
"github.com/matrix-org/dendrite/federationapi/writers"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
"net/http"
"time"
)
const (
pathPrefixV2Keys = "/_matrix/key/v2"
pathPrefixV2Keys = "/_matrix/key/v2"
pathPrefixV1Federation = "/_matrix/federation/v1"
)
// Setup registers HTTP handlers with the given ServeMux.
func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
func Setup(
servMux *http.ServeMux,
cfg config.FederationAPI,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) {
apiMux := mux.NewRouter()
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
v1fedmux := apiMux.PathPrefix(pathPrefixV1Federation).Subrouter()
localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse {
return readers.LocalKeys(req, cfg)
@ -43,6 +57,17 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
v2keysmux.Handle("/server/{keyID}", localKeys)
v2keysmux.Handle("/server/", localKeys)
v1fedmux.Handle("/send/{txnID}/", makeAPI("send",
func(req *http.Request) util.JSONResponse {
vars := mux.Vars(req)
return writers.Send(
req, gomatrixserverlib.TransactionID(vars["txnID"]),
time.Now(),
cfg, query, producer, keys, federation,
)
},
))
servMux.Handle("/metrics", prometheus.Handler())
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))
}

View File

@ -0,0 +1,211 @@
package writers
import (
"encoding/json"
"fmt"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"net/http"
"time"
)
// Send implements /_matrix/federation/v1/send/{txnID}
func Send(
req *http.Request,
txnID gomatrixserverlib.TransactionID,
now time.Time,
cfg config.FederationAPI,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys)
if request == nil {
return errResp
}
t := txnReq{
query: query,
producer: producer,
keys: keys,
federation: federation,
}
if err := json.Unmarshal(request.Content(), &t); err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.ServerName
resp, err := t.processTransaction()
if err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{
Code: 200,
JSON: resp,
}
}
type txnReq struct {
gomatrixserverlib.Transaction
query api.RoomserverQueryAPI
producer *producers.RoomserverProducer
keys gomatrixserverlib.KeyRing
federation *gomatrixserverlib.FederationClient
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Check the event signatures
if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil {
return nil, err
}
// Process the events.
results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs {
err := t.processEvent(e)
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
//
// However if the event is due to a temporary failure in our server
// such as a database being unavailable then we should bail, and
// hope that the sender will retry when we are feeling better.
//
// It is uncertain what we should do if an event fails because
// we failed to fetch more information from the sending server.
// For example if a request to /state fails.
// If we skip the event then we risk missing the event until we
// receive another event referencing it.
// If we bail and stop processing then we risk wedging incoming
// transactions from that server forever.
switch err.(type) {
case unknownRoomError:
case *gomatrixserverlib.NotAllowed:
default:
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
return nil, err
}
results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()}
} else {
results[e.EventID()] = gomatrixserverlib.PDUResult{}
}
}
// TODO: Process the EDUs.
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
type unknownRoomError struct {
roomID string
}
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
refs := e.PrevEvents()
prevEventIDs := make([]string, len(refs))
for i := range refs {
prevEventIDs[i] = refs[i].EventID
}
// Fetch the state needed to authenticate the event.
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
stateReq := api.QueryStateAfterEventsRequest{
RoomID: e.RoomID(),
PrevEventIDs: prevEventIDs,
StateToFetch: needed.Tuples(),
}
var stateResp api.QueryStateAfterEventsResponse
if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil {
return err
}
if !stateResp.RoomExists {
// TODO: When synapse receives a message for a room it is not in it
// asks the remote server for the state of the room so that it can
// check if the remote server knows of a join "m.room.member" event
// that this server is unaware of.
// However generally speaking we should reject events for rooms we
// aren't a member of.
return unknownRoomError{e.RoomID()}
}
if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e)
}
// Check that the event is allowed by the state at the event.
if err := checkAllowedByState(e, stateResp.StateEvents); err != nil {
return err
}
// TODO: Check that the roomserver has a copy of all of the auth_events.
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver
if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil {
return err
}
return nil
}
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateEvents {
authUsingState.AddEvent(&stateEvents[i])
}
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
// Synapse will use /state_ids if possible since ususally the state
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// TODO: Attempt to fetch the state using /state_ids and /events
state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID())
if err != nil {
return err
}
// Check that the returned state is valid.
if err := state.Check(t.keys); err != nil {
return err
}
// Check that the event is allowed by the state.
if err := checkAllowedByState(e, state.StateEvents); err != nil {
return err
}
// pass the event along with the state to the roomserver
if err := t.producer.SendEventWithState(state, e); err != nil {
return err
}
return nil
}

View File

@ -28,7 +28,7 @@ type RoomEventDatabase interface {
StoreEvent(event gomatrixserverlib.Event, authEventNIDs []types.EventNID) (types.RoomNID, types.StateAtEvent, error)
// Lookup the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
// or if the event IDs aren't in the database.
// Returns a types.MissingEventError if the event IDs aren't in the database.
StateEntriesForEventIDs(eventIDs []string) ([]types.StateEntry, error)
// Set the state at an event.
SetState(eventNID types.EventNID, stateNID types.StateSnapshotNID) error

View File

@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs)
if err != nil {
// TODO: Check if the error was because we are missing events from the
// database or are missing state at events from the database.
return err
switch err.(type) {
case types.MissingEventError:
return nil
default:
return err
}
}
response.PrevEventsExist = true

View File

@ -32,7 +32,7 @@ type RoomStateDatabase interface {
AddState(roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error)
// Lookup the state of a room at each event for a list of string event IDs.
// Returns an error if there is an error talking to the database
// or if the room state for the event IDs aren't in the database
// Returns a types.MissingEventError if the room state for the event IDs aren't in the database
StateAtEventIDs(eventIDs []string) ([]types.StateAtEvent, error)
// Lookup the numeric IDs for a list of string event types.
// Returns a map from string event type to numeric ID for the event type.

View File

@ -166,6 +166,8 @@ func (s *eventStatements) selectEvent(eventID string) (types.EventNID, types.Sta
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.StateEntry, error) {
rows, err := s.bulkSelectStateEventByIDStmt.Query(pq.StringArray(eventIDs))
if err != nil {
@ -194,11 +196,16 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
// If this turns out to be impossible and we do need the debug information here, it would be better
// to do it as a separate query rather than slowing down/complicating the common case.
return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs))
return nil, types.MissingEventError(
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
}
return results, err
}
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types.StateAtEvent, error) {
rows, err := s.bulkSelectStateAtEventByIDStmt.Query(pq.StringArray(eventIDs))
if err != nil {
@ -218,11 +225,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types
return nil, err
}
if result.BeforeStateSnapshotNID == 0 {
return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID)
return nil, types.MissingEventError(
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
)
}
}
if i != len(eventIDs) {
return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs))
return nil, types.MissingEventError(
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
}
return results, err
}

View File

@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface {
// Rollback the transaction.
Rollback() error
}
// A MissingEventError is an error that happened because the roomserver was
// missing requested events from its database.
type MissingEventError string
func (e MissingEventError) Error() string { return string(e) }