Add basic runtime tracing (#2996)

This allows us in almost all places to use regions to further trace down
long running tasks.
Also removes an unused function.
This commit is contained in:
Till 2023-03-13 16:45:14 +01:00 committed by GitHub
parent 689b5ee72f
commit 232aef016c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 190 additions and 155 deletions

View File

@ -25,10 +25,10 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/config"
) )
@ -50,8 +50,8 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
request *api.RoomAliasExistsRequest, request *api.RoomAliasExistsRequest,
response *api.RoomAliasExistsResponse, response *api.RoomAliasExistsResponse,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias") trace, ctx := internal.StartRegion(ctx, "ApplicationServiceRoomAlias")
defer span.Finish() defer trace.EndRegion()
// Determine which application service should handle this request // Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices { for _, appservice := range a.Cfg.Derived.ApplicationServices {
@ -117,8 +117,8 @@ func (a *AppServiceQueryAPI) UserIDExists(
request *api.UserIDExistsRequest, request *api.UserIDExistsRequest,
response *api.UserIDExistsResponse, response *api.UserIDExistsResponse,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID") trace, ctx := internal.StartRegion(ctx, "ApplicationServiceUserID")
defer span.Finish() defer trace.EndRegion()
// Determine which application service should handle this request // Determine which application service should handle this request
for _, appservice := range a.Cfg.Derived.ApplicationServices { for _, appservice := range a.Cfg.Derived.ApplicationServices {

View File

@ -25,8 +25,6 @@ import (
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -34,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal"
userapi "github.com/matrix-org/dendrite/userapi/api" userapi "github.com/matrix-org/dendrite/userapi/api"
) )
@ -186,9 +185,9 @@ func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse
} }
} }
span := opentracing.StartSpan(metricsName) trace, ctx := internal.StartTask(req.Context(), metricsName)
defer span.Finish() defer trace.EndTask()
req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) req = req.WithContext(ctx)
h.ServeHTTP(nextWriter, req) h.ServeHTTP(nextWriter, req)
} }
@ -200,9 +199,9 @@ func MakeExternalAPI(metricsName string, f func(*http.Request) util.JSONResponse
// This is used to serve HTML alongside JSON error messages // This is used to serve HTML alongside JSON error messages
func MakeHTMLAPI(metricsName string, enableMetrics bool, f func(http.ResponseWriter, *http.Request)) http.Handler { func MakeHTMLAPI(metricsName string, enableMetrics bool, f func(http.ResponseWriter, *http.Request)) http.Handler {
withSpan := func(w http.ResponseWriter, req *http.Request) { withSpan := func(w http.ResponseWriter, req *http.Request) {
span := opentracing.StartSpan(metricsName) trace, ctx := internal.StartTask(req.Context(), metricsName)
defer span.Finish() defer trace.EndTask()
req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span)) req = req.WithContext(ctx)
f(w, req) f(w, req)
} }
@ -223,57 +222,6 @@ func MakeHTMLAPI(metricsName string, enableMetrics bool, f func(http.ResponseWri
) )
} }
// MakeInternalAPI turns a util.JSONRequestHandler function into an http.Handler.
// This is used for APIs that are internal to dendrite.
// If we are passed a tracing context in the request headers then we use that
// as the parent of any tracing spans we create.
func MakeInternalAPI(metricsName string, enableMetrics bool, f func(*http.Request) util.JSONResponse) http.Handler {
h := util.MakeJSONAPI(util.NewJSONRequestHandler(f))
withSpan := func(w http.ResponseWriter, req *http.Request) {
carrier := opentracing.HTTPHeadersCarrier(req.Header)
tracer := opentracing.GlobalTracer()
clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
var span opentracing.Span
if err == nil {
// Default to a span without RPC context.
span = tracer.StartSpan(metricsName)
} else {
// Set the RPC context.
span = tracer.StartSpan(metricsName, ext.RPCServerOption(clientContext))
}
defer span.Finish()
req = req.WithContext(opentracing.ContextWithSpan(req.Context(), span))
h.ServeHTTP(w, req)
}
if !enableMetrics {
return http.HandlerFunc(withSpan)
}
return promhttp.InstrumentHandlerCounter(
promauto.NewCounterVec(
prometheus.CounterOpts{
Name: metricsName + "_requests_total",
Help: "Total number of internal API calls",
Namespace: "dendrite",
},
[]string{"code"},
),
promhttp.InstrumentHandlerResponseSize(
promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dendrite",
Name: metricsName + "_response_size_bytes",
Help: "A histogram of response sizes for requests.",
Buckets: []float64{200, 500, 900, 1500, 5000, 15000, 50000, 100000},
},
[]string{},
),
http.HandlerFunc(withSpan),
),
)
}
// WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics // WrapHandlerInBasicAuth adds basic auth to a handler. Only used for /metrics
func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc { func WrapHandlerInBasicAuth(h http.Handler, b BasicAuth) http.HandlerFunc {
if b.Username == "" || b.Password == "" { if b.Username == "" || b.Password == "" {

View File

@ -10,8 +10,6 @@ import (
"time" "time"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/opentracing/opentracing-go"
) )
type httpClient struct { type httpClient struct {
@ -34,8 +32,8 @@ func NewHTTPClient(disableTLSValidation bool) Client {
} }
func (h *httpClient) Notify(ctx context.Context, url string, req *NotifyRequest, resp *NotifyResponse) error { func (h *httpClient) Notify(ctx context.Context, url string, req *NotifyRequest, resp *NotifyResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Notify") trace, ctx := internal.StartRegion(ctx, "Notify")
defer span.Finish() defer trace.EndRegion()
body, err := json.Marshal(req) body, err := json.Marshal(req)
if err != nil { if err != nil {

64
internal/tracing.go Normal file
View File

@ -0,0 +1,64 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"runtime/trace"
"github.com/opentracing/opentracing-go"
)
type Trace struct {
span opentracing.Span
region *trace.Region
task *trace.Task
}
func StartTask(inCtx context.Context, name string) (Trace, context.Context) {
ctx, task := trace.NewTask(inCtx, name)
span, ctx := opentracing.StartSpanFromContext(ctx, name)
return Trace{
span: span,
task: task,
}, ctx
}
func StartRegion(inCtx context.Context, name string) (Trace, context.Context) {
region := trace.StartRegion(inCtx, name)
span, ctx := opentracing.StartSpanFromContext(inCtx, name)
return Trace{
span: span,
region: region,
}, ctx
}
func (t Trace) EndRegion() {
t.span.Finish()
if t.region != nil {
t.region.End()
}
}
func (t Trace) EndTask() {
t.span.Finish()
if t.task != nil {
t.task.End()
}
}
func (t Trace) SetTag(key string, value any) {
t.span.SetTag(key, value)
}

25
internal/tracing_test.go Normal file
View File

@ -0,0 +1,25 @@
package internal
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestTracing(t *testing.T) {
inCtx := context.Background()
task, ctx := StartTask(inCtx, "testing")
assert.NotNil(t, ctx)
assert.NotNil(t, task)
assert.NotEqual(t, inCtx, ctx)
task.SetTag("key", "value")
region, ctx2 := StartRegion(ctx, "testing")
assert.NotNil(t, ctx)
assert.NotNil(t, region)
assert.NotEqual(t, ctx, ctx2)
defer task.EndTask()
defer region.EndRegion()
}

View File

@ -28,7 +28,6 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -85,10 +84,10 @@ func (r *Inputer) processRoomEvent(
default: default:
} }
span, ctx := opentracing.StartSpanFromContext(ctx, "processRoomEvent") trace, ctx := internal.StartRegion(ctx, "processRoomEvent")
span.SetTag("room_id", input.Event.RoomID()) trace.SetTag("room_id", input.Event.RoomID())
span.SetTag("event_id", input.Event.EventID()) trace.SetTag("event_id", input.Event.EventID())
defer span.Finish() defer trace.EndRegion()
// Measure how long it takes to process this event. // Measure how long it takes to process this event.
started := time.Now() started := time.Now()
@ -608,8 +607,8 @@ func (r *Inputer) fetchAuthEvents(
known map[string]*types.Event, known map[string]*types.Event,
servers []gomatrixserverlib.ServerName, servers []gomatrixserverlib.ServerName,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "fetchAuthEvents") trace, ctx := internal.StartRegion(ctx, "fetchAuthEvents")
defer span.Finish() defer trace.EndRegion()
unknown := map[string]struct{}{} unknown := map[string]struct{}{}
authEventIDs := event.AuthEventIDs() authEventIDs := event.AuthEventIDs()
@ -753,8 +752,8 @@ func (r *Inputer) calculateAndSetState(
event *gomatrixserverlib.Event, event *gomatrixserverlib.Event,
isRejected bool, isRejected bool,
) error { ) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "calculateAndSetState") trace, ctx := internal.StartRegion(ctx, "calculateAndSetState")
defer span.Finish() defer trace.EndRegion()
var succeeded bool var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo) updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)

View File

@ -23,9 +23,9 @@ import (
"github.com/getsentry/sentry-go" "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
@ -59,8 +59,8 @@ func (r *Inputer) updateLatestEvents(
rewritesState bool, rewritesState bool,
historyVisibility gomatrixserverlib.HistoryVisibility, historyVisibility gomatrixserverlib.HistoryVisibility,
) (err error) { ) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateLatestEvents") trace, ctx := internal.StartRegion(ctx, "updateLatestEvents")
defer span.Finish() defer trace.EndRegion()
var succeeded bool var succeeded bool
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo) updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
@ -209,8 +209,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
} }
func (u *latestEventsUpdater) latestState() error { func (u *latestEventsUpdater) latestState() error {
span, ctx := opentracing.StartSpanFromContext(u.ctx, "processEventWithMissingState") trace, ctx := internal.StartRegion(u.ctx, "processEventWithMissingState")
defer span.Finish() defer trace.EndRegion()
var err error var err error
roomState := state.NewStateResolution(u.updater, u.roomInfo) roomState := state.NewStateResolution(u.updater, u.roomInfo)
@ -329,8 +329,8 @@ func (u *latestEventsUpdater) calculateLatest(
newEvent *gomatrixserverlib.Event, newEvent *gomatrixserverlib.Event,
newStateAndRef types.StateAtEventAndReference, newStateAndRef types.StateAtEventAndReference,
) (bool, error) { ) (bool, error) {
span, _ := opentracing.StartSpanFromContext(u.ctx, "calculateLatest") trace, _ := internal.StartRegion(u.ctx, "calculateLatest")
defer span.Finish() defer trace.EndRegion()
// First of all, get a list of all of the events in our current // First of all, get a list of all of the events in our current
// set of forward extremities. // set of forward extremities.

View File

@ -18,13 +18,14 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/opentracing/opentracing-go"
) )
// updateMembership updates the current membership and the invites for each // updateMembership updates the current membership and the invites for each
@ -36,8 +37,8 @@ func (r *Inputer) updateMemberships(
updater *shared.RoomUpdater, updater *shared.RoomUpdater,
removed, added []types.StateEntry, removed, added []types.StateEntry,
) ([]api.OutputEvent, error) { ) ([]api.OutputEvent, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "updateMemberships") trace, ctx := internal.StartRegion(ctx, "updateMemberships")
defer span.Finish() defer trace.EndRegion()
changes := membershipChanges(removed, added) changes := membershipChanges(removed, added)
var eventNIDs []types.EventNID var eventNIDs []types.EventNID

View File

@ -7,16 +7,16 @@ import (
"sync" "sync"
"time" "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
fedapi "github.com/matrix-org/dendrite/federationapi/api" fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
) )
type parsedRespState struct { type parsedRespState struct {
@ -62,8 +62,8 @@ type missingStateReq struct {
func (t *missingStateReq) processEventWithMissingState( func (t *missingStateReq) processEventWithMissingState(
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
) (*parsedRespState, error) { ) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "processEventWithMissingState") trace, ctx := internal.StartRegion(ctx, "processEventWithMissingState")
defer span.Finish() defer trace.EndRegion()
// We are missing the previous events for this events. // We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the // This means that there is a gap in our view of the history of the
@ -241,8 +241,8 @@ func (t *missingStateReq) processEventWithMissingState(
} }
func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) { func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupResolvedStateBeforeEvent") trace, ctx := internal.StartRegion(ctx, "lookupResolvedStateBeforeEvent")
defer span.Finish() defer trace.EndRegion()
type respState struct { type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver. // A snapshot is considered trustworthy if it came from our own roomserver.
@ -319,8 +319,8 @@ func (t *missingStateReq) lookupResolvedStateBeforeEvent(ctx context.Context, e
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix. // added into the mix.
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) { func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEvent") trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEvent")
defer span.Finish() defer trace.EndRegion()
// try doing all this locally before we resort to querying federation // try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(ctx, eventID) respState := t.lookupStateAfterEventLocally(ctx, eventID)
@ -376,8 +376,8 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.Event) *gomatrixs
} }
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, eventID string) *parsedRespState { func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, eventID string) *parsedRespState {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateAfterEventLocally") trace, ctx := internal.StartRegion(ctx, "lookupStateAfterEventLocally")
defer span.Finish() defer trace.EndRegion()
var res parsedRespState var res parsedRespState
roomState := state.NewStateResolution(t.db, t.roomInfo) roomState := state.NewStateResolution(t.db, t.roomInfo)
@ -449,16 +449,16 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, even
// the server supports. // the server supports.
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) ( func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
*parsedRespState, error) { *parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupStateBeforeEvent") trace, ctx := internal.StartRegion(ctx, "lookupStateBeforeEvent")
defer span.Finish() defer trace.EndRegion()
// Attempt to fetch the missing state using /state_ids and /events // Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion) return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
} }
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) { func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "resolveStatesAndCheck") trace, ctx := internal.StartRegion(ctx, "resolveStatesAndCheck")
defer span.Finish() defer trace.EndRegion()
var authEventList []*gomatrixserverlib.Event var authEventList []*gomatrixserverlib.Event
var stateEventList []*gomatrixserverlib.Event var stateEventList []*gomatrixserverlib.Event
@ -503,8 +503,8 @@ retryAllowedState:
// get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject, // get missing events for `e`. If `isGapFilled`=true then `newEvents` contains all the events to inject,
// without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events // without `e`. If `isGapFilled=false` then `newEvents` contains the response to /get_missing_events
func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) { func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (newEvents []*gomatrixserverlib.Event, isGapFilled, prevStateKnown bool, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "getMissingEvents") trace, ctx := internal.StartRegion(ctx, "getMissingEvents")
defer span.Finish() defer trace.EndRegion()
logger := t.log.WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) logger := t.log.WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID) latest, _, _, err := t.db.LatestEventIDs(ctx, t.roomInfo.RoomNID)
@ -633,8 +633,8 @@ func (t *missingStateReq) isPrevStateKnown(ctx context.Context, e *gomatrixserve
func (t *missingStateReq) lookupMissingStateViaState( func (t *missingStateReq) lookupMissingStateViaState(
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
) (respState *parsedRespState, err error) { ) (respState *parsedRespState, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaState") trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaState")
defer span.Finish() defer trace.EndRegion()
state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion) state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion)
if err != nil { if err != nil {
@ -665,8 +665,8 @@ func (t *missingStateReq) lookupMissingStateViaState(
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*parsedRespState, error) { *parsedRespState, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaStateIDs") trace, ctx := internal.StartRegion(ctx, "lookupMissingStateViaStateIDs")
defer span.Finish() defer trace.EndRegion()
t.log.Infof("lookupMissingStateViaStateIDs %s", eventID) t.log.Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event // fetch the state event IDs at the time of the event
@ -839,8 +839,8 @@ func (t *missingStateReq) createRespStateFromStateIDs(
} }
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) { func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupEvent") trace, ctx := internal.StartRegion(ctx, "lookupEvent")
defer span.Finish() defer trace.EndRegion()
if localFirst { if localFirst {
// fetch from the roomserver // fetch from the roomserver

View File

@ -25,9 +25,9 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util" "github.com/matrix-org/util"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
) )
@ -106,8 +106,8 @@ func (p *StateResolution) Resolve(ctx context.Context, eventID string) (*gomatri
func (v *StateResolution) LoadStateAtSnapshot( func (v *StateResolution) LoadStateAtSnapshot(
ctx context.Context, stateNID types.StateSnapshotNID, ctx context.Context, stateNID types.StateSnapshotNID,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshot") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshot")
defer span.Finish() defer trace.EndRegion()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID}) stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil { if err != nil {
@ -147,8 +147,8 @@ func (v *StateResolution) LoadStateAtSnapshot(
func (v *StateResolution) LoadStateAtEvent( func (v *StateResolution) LoadStateAtEvent(
ctx context.Context, eventID string, ctx context.Context, eventID string,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtEvent") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEvent")
defer span.Finish() defer trace.EndRegion()
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
if err != nil { if err != nil {
@ -169,8 +169,8 @@ func (v *StateResolution) LoadStateAtEvent(
func (v *StateResolution) LoadMembershipAtEvent( func (v *StateResolution) LoadMembershipAtEvent(
ctx context.Context, eventIDs []string, stateKeyNID types.EventStateKeyNID, ctx context.Context, eventIDs []string, stateKeyNID types.EventStateKeyNID,
) (map[string][]types.StateEntry, error) { ) (map[string][]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadMembershipAtEvent") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadMembershipAtEvent")
defer span.Finish() defer trace.EndRegion()
// Get a mapping from snapshotNID -> eventIDs // Get a mapping from snapshotNID -> eventIDs
snapshotNIDMap, err := v.db.BulkSelectSnapshotsFromEventIDs(ctx, eventIDs) snapshotNIDMap, err := v.db.BulkSelectSnapshotsFromEventIDs(ctx, eventIDs)
@ -238,8 +238,8 @@ func (v *StateResolution) LoadMembershipAtEvent(
func (v *StateResolution) LoadStateAtEventForHistoryVisibility( func (v *StateResolution) LoadStateAtEventForHistoryVisibility(
ctx context.Context, eventID string, ctx context.Context, eventID string,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtEvent") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtEventForHistoryVisibility")
defer span.Finish() defer trace.EndRegion()
snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID) snapshotNID, err := v.db.SnapshotNIDFromEventID(ctx, eventID)
if err != nil { if err != nil {
@ -263,8 +263,8 @@ func (v *StateResolution) LoadStateAtEventForHistoryVisibility(
func (v *StateResolution) LoadCombinedStateAfterEvents( func (v *StateResolution) LoadCombinedStateAfterEvents(
ctx context.Context, prevStates []types.StateAtEvent, ctx context.Context, prevStates []types.StateAtEvent,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadCombinedStateAfterEvents") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadCombinedStateAfterEvents")
defer span.Finish() defer trace.EndRegion()
stateNIDs := make([]types.StateSnapshotNID, len(prevStates)) stateNIDs := make([]types.StateSnapshotNID, len(prevStates))
for i, state := range prevStates { for i, state := range prevStates {
@ -338,8 +338,8 @@ func (v *StateResolution) LoadCombinedStateAfterEvents(
func (v *StateResolution) DifferenceBetweeenStateSnapshots( func (v *StateResolution) DifferenceBetweeenStateSnapshots(
ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID, ctx context.Context, oldStateNID, newStateNID types.StateSnapshotNID,
) (removed, added []types.StateEntry, err error) { ) (removed, added []types.StateEntry, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.DifferenceBetweeenStateSnapshots") trace, ctx := internal.StartRegion(ctx, "StateResolution.DifferenceBetweeenStateSnapshots")
defer span.Finish() defer trace.EndRegion()
if oldStateNID == newStateNID { if oldStateNID == newStateNID {
// If the snapshot NIDs are the same then nothing has changed // If the snapshot NIDs are the same then nothing has changed
@ -402,8 +402,8 @@ func (v *StateResolution) LoadStateAtSnapshotForStringTuples(
stateNID types.StateSnapshotNID, stateNID types.StateSnapshotNID,
stateKeyTuples []gomatrixserverlib.StateKeyTuple, stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAtSnapshotForStringTuples")
defer span.Finish() defer trace.EndRegion()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples) numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil { if err != nil {
@ -419,8 +419,8 @@ func (v *StateResolution) stringTuplesToNumericTuples(
ctx context.Context, ctx context.Context,
stringTuples []gomatrixserverlib.StateKeyTuple, stringTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateKeyTuple, error) { ) ([]types.StateKeyTuple, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.stringTuplesToNumericTuples") trace, ctx := internal.StartRegion(ctx, "StateResolution.stringTuplesToNumericTuples")
defer span.Finish() defer trace.EndRegion()
eventTypes := make([]string, len(stringTuples)) eventTypes := make([]string, len(stringTuples))
stateKeys := make([]string, len(stringTuples)) stateKeys := make([]string, len(stringTuples))
@ -464,8 +464,8 @@ func (v *StateResolution) loadStateAtSnapshotForNumericTuples(
stateNID types.StateSnapshotNID, stateNID types.StateSnapshotNID,
stateKeyTuples []types.StateKeyTuple, stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples") trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAtSnapshotForNumericTuples")
defer span.Finish() defer trace.EndRegion()
stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID}) stateBlockNIDLists, err := v.db.StateBlockNIDs(ctx, []types.StateSnapshotNID{stateNID})
if err != nil { if err != nil {
@ -515,8 +515,8 @@ func (v *StateResolution) LoadStateAfterEventsForStringTuples(
prevStates []types.StateAtEvent, prevStates []types.StateAtEvent,
stateKeyTuples []gomatrixserverlib.StateKeyTuple, stateKeyTuples []gomatrixserverlib.StateKeyTuple,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.LoadStateAfterEventsForStringTuples") trace, ctx := internal.StartRegion(ctx, "StateResolution.LoadStateAfterEventsForStringTuples")
defer span.Finish() defer trace.EndRegion()
numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples) numericTuples, err := v.stringTuplesToNumericTuples(ctx, stateKeyTuples)
if err != nil { if err != nil {
@ -530,8 +530,8 @@ func (v *StateResolution) loadStateAfterEventsForNumericTuples(
prevStates []types.StateAtEvent, prevStates []types.StateAtEvent,
stateKeyTuples []types.StateKeyTuple, stateKeyTuples []types.StateKeyTuple,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateAfterEventsForNumericTuples") trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateAfterEventsForNumericTuples")
defer span.Finish() defer trace.EndRegion()
if len(prevStates) == 1 { if len(prevStates) == 1 {
// Fast path for a single event. // Fast path for a single event.
@ -705,8 +705,8 @@ func (v *StateResolution) CalculateAndStoreStateBeforeEvent(
event *gomatrixserverlib.Event, event *gomatrixserverlib.Event,
isRejected bool, isRejected bool,
) (types.StateSnapshotNID, error) { ) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent") trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateBeforeEvent")
defer span.Finish() defer trace.EndRegion()
// Load the state at the prev events. // Load the state at the prev events.
prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs()) prevStates, err := v.db.StateAtEventIDs(ctx, event.PrevEventIDs())
@ -724,8 +724,8 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
ctx context.Context, ctx context.Context,
prevStates []types.StateAtEvent, prevStates []types.StateAtEvent,
) (types.StateSnapshotNID, error) { ) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.CalculateAndStoreStateAfterEvents") trace, ctx := internal.StartRegion(ctx, "StateResolution.CalculateAndStoreStateAfterEvents")
defer span.Finish() defer trace.EndRegion()
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)} metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
@ -799,8 +799,8 @@ func (v *StateResolution) calculateAndStoreStateAfterManyEvents(
prevStates []types.StateAtEvent, prevStates []types.StateAtEvent,
metrics calculateStateMetrics, metrics calculateStateMetrics,
) (types.StateSnapshotNID, error) { ) (types.StateSnapshotNID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents") trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateAndStoreStateAfterManyEvents")
defer span.Finish() defer trace.EndRegion()
state, algorithm, conflictLength, err := state, algorithm, conflictLength, err :=
v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates) v.calculateStateAfterManyEvents(ctx, v.roomInfo.RoomVersion, prevStates)
@ -820,8 +820,8 @@ func (v *StateResolution) calculateStateAfterManyEvents(
ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, ctx context.Context, roomVersion gomatrixserverlib.RoomVersion,
prevStates []types.StateAtEvent, prevStates []types.StateAtEvent,
) (state []types.StateEntry, algorithm string, conflictLength int, err error) { ) (state []types.StateEntry, algorithm string, conflictLength int, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.calculateStateAfterManyEvents") trace, ctx := internal.StartRegion(ctx, "StateResolution.calculateStateAfterManyEvents")
defer span.Finish() defer trace.EndRegion()
var combined []types.StateEntry var combined []types.StateEntry
// Conflict resolution. // Conflict resolution.
@ -875,8 +875,8 @@ func (v *StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion, ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflicts") trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflicts")
defer span.Finish() defer trace.EndRegion()
stateResAlgo, err := version.StateResAlgorithm() stateResAlgo, err := version.StateResAlgorithm()
if err != nil { if err != nil {
@ -902,8 +902,8 @@ func (v *StateResolution) resolveConflictsV1(
ctx context.Context, ctx context.Context,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV1") trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV1")
defer span.Finish() defer trace.EndRegion()
// Load the conflicted events // Load the conflicted events
conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted) conflictedEvents, eventIDMap, err := v.loadStateEvents(ctx, conflicted)
@ -967,8 +967,8 @@ func (v *StateResolution) resolveConflictsV2(
ctx context.Context, ctx context.Context,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.resolveConflictsV2") trace, ctx := internal.StartRegion(ctx, "StateResolution.resolveConflictsV2")
defer span.Finish() defer trace.EndRegion()
estimate := len(conflicted) + len(notConflicted) estimate := len(conflicted) + len(notConflicted)
eventIDMap := make(map[string]types.StateEntry, estimate) eventIDMap := make(map[string]types.StateEntry, estimate)
@ -1000,8 +1000,8 @@ func (v *StateResolution) resolveConflictsV2(
// For each conflicted event, let's try and get the needed auth events. // For each conflicted event, let's try and get the needed auth events.
if err = func() error { if err = func() error {
span, sctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadAuthEvents") loadAuthEventsTrace, sctx := internal.StartRegion(ctx, "StateResolution.loadAuthEvents")
defer span.Finish() defer loadAuthEventsTrace.EndRegion()
loader := authEventLoader{ loader := authEventLoader{
v: v, v: v,
@ -1045,8 +1045,8 @@ func (v *StateResolution) resolveConflictsV2(
// Resolve the conflicts. // Resolve the conflicts.
resolvedEvents := func() []*gomatrixserverlib.Event { resolvedEvents := func() []*gomatrixserverlib.Event {
span, _ := opentracing.StartSpanFromContext(ctx, "gomatrixserverlib.ResolveStateConflictsV2") resolvedTrace, _ := internal.StartRegion(ctx, "StateResolution.ResolveStateConflictsV2")
defer span.Finish() defer resolvedTrace.EndRegion()
return gomatrixserverlib.ResolveStateConflictsV2( return gomatrixserverlib.ResolveStateConflictsV2(
conflictedEvents, conflictedEvents,
@ -1118,8 +1118,8 @@ func (v *StateResolution) stateKeyTuplesNeeded(stateKeyNIDMap map[string]types.E
func (v *StateResolution) loadStateEvents( func (v *StateResolution) loadStateEvents(
ctx context.Context, entries []types.StateEntry, ctx context.Context, entries []types.StateEntry,
) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) { ) ([]*gomatrixserverlib.Event, map[string]types.StateEntry, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "StateResolution.loadStateEvents") trace, ctx := internal.StartRegion(ctx, "StateResolution.loadStateEvents")
defer span.Finish() defer trace.EndRegion()
result := make([]*gomatrixserverlib.Event, 0, len(entries)) result := make([]*gomatrixserverlib.Event, 0, len(entries))
eventEntries := make([]types.StateEntry, 0, len(entries)) eventEntries := make([]types.StateEntry, 0, len(entries))