dendrite/userapi/consumers/devicelistupdate.go

92 lines
2.9 KiB
Go

// Copyright 2024 New Vector Ltd.
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
// Please see LICENSE files in the repository root for full details.
package consumers
import (
"context"
"encoding/json"
"time"
"github.com/element-hq/dendrite/userapi/internal"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/element-hq/dendrite/setup/config"
"github.com/element-hq/dendrite/setup/jetstream"
"github.com/element-hq/dendrite/setup/process"
)
// DeviceListUpdateConsumer consumes device list updates that came in over federation.
type DeviceListUpdateConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
updater *internal.DeviceListUpdater
isLocalServerName func(spec.ServerName) bool
}
// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
func NewDeviceListUpdateConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
updater *internal.DeviceListUpdater,
) *DeviceListUpdateConsumer {
return &DeviceListUpdateConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
updater: updater,
isLocalServerName: cfg.Matrix.IsLocalServerName,
}
}
// Start consuming from key servers
func (t *DeviceListUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, 1,
t.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
var m gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
return true
}
origin := spec.ServerName(msg.Header.Get("origin"))
if _, serverName, err := gomatrixserverlib.SplitID('@', m.UserID); err != nil {
return true
} else if t.isLocalServerName(serverName) {
return true
} else if serverName != origin {
return true
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := t.updater.Update(timeoutCtx, m)
if err != nil {
logrus.WithFields(logrus.Fields{
"user_id": m.UserID,
"device_id": m.DeviceID,
"stream_id": m.StreamID,
"prev_id": m.PrevID,
}).WithError(err).Errorf("Failed to update device list")
return false
}
return true
}