1
0
mirror of https://github.com/matrix-org/dendrite.git synced 2024-06-14 04:56:41 +00:00
matrix-org.dendrite/src/github.com/matrix-org/dendrite/roomserver/input/consumer.go
Mark Haines 41c6a3737e A kafkaesque room event consumer for the roomserver. (#1)
* A kafkaesque room event consumer for the roomserver.

Implement the main input loop for the roomserver.

It will receive events from a kafkaesque event source and track
where it is in the stream.

It currently does nothing with the events it consumes.
2017-02-03 13:52:32 +00:00

104 lines
3.6 KiB
Go

// Package input contains the code that writes
package input
import (
"github.com/matrix-org/dendrite/roomserver/types"
sarama "gopkg.in/Shopify/sarama.v1"
)
// A ConsumerDatabase has the storage APIs needed by the consumer.
type ConsumerDatabase interface {
// PartitionOffsets returns the offsets the consumer has reached for each partition.
PartitionOffsets(topic string) ([]types.PartitionOffset, error)
// SetPartitionOffset records where the consumer has reached for a partition.
SetPartitionOffset(topic string, partition int32, offset int64) error
}
// An ErrorLogger handles the errors encountered by the consumer.
type ErrorLogger interface {
OnError(message *sarama.ConsumerMessage, err error)
}
// A Consumer consumes a kafkaesque stream of room events.
// The room events are supplied as api.InputRoomEvent structs serialised as JSON.
// The events should be valid matrix events.
// The events needed to authenticate the event should already be stored on the roomserver.
// The events needed to construct the state at the event should already be stored on the roomserver.
// If the event is not valid then it will be discarded and an error will be logged.
type Consumer struct {
// A kafkaesque stream consumer providing the APIs for talking to the event source.
// The interface is taken from a client library for Apache Kafka.
// But any equivalent event streaming protocol could be made to implement the same interface.
Consumer sarama.Consumer
// The database used to store the room events.
DB ConsumerDatabase
// The kafkaesque topic to consume room events from.
// This is the name used in kafka to identify the stream to consume events from.
RoomEventTopic string
// The ErrorLogger for this consumer.
// If left as nil then the consumer will panic when it encounters an error
ErrorLogger ErrorLogger
}
// Start starts the consumer consuming.
// Starts up a goroutine for each partition in the kafka stream.
// Returns nil once all the goroutines are started.
// Returns an error if it can't start consuming for any of the partitions.
func (c *Consumer) Start() error {
offsets := map[int32]int64{}
partitions, err := c.Consumer.Partitions(c.RoomEventTopic)
if err != nil {
return err
}
for _, partition := range partitions {
// Default all the offsets to the beginning of the stream.
offsets[partition] = sarama.OffsetOldest
}
storedOffsets, err := c.DB.PartitionOffsets(c.RoomEventTopic)
if err != nil {
return err
}
for _, offset := range storedOffsets {
// We've already processed events from this partition so advance the offset to where we got to.
offsets[offset.Partition] = offset.Offset
}
var partitionConsumers []sarama.PartitionConsumer
for partition, offset := range offsets {
pc, err := c.Consumer.ConsumePartition(c.RoomEventTopic, partition, offset)
if err != nil {
for _, p := range partitionConsumers {
p.Close()
}
return err
}
partitionConsumers = append(partitionConsumers, pc)
}
for _, pc := range partitionConsumers {
go c.consumePartition(pc)
}
return nil
}
// consumePartition consumes the room events for a single partition of the kafkaesque stream.
func (c *Consumer) consumePartition(pc sarama.PartitionConsumer) {
defer pc.Close()
for message := range pc.Messages() {
// TODO: Do stuff with message.
if err := c.DB.SetPartitionOffset(c.RoomEventTopic, message.Partition, message.Offset); err != nil {
c.logError(message, err)
}
}
}
// logError is a convenience method for logging errors.
func (c *Consumer) logError(message *sarama.ConsumerMessage, err error) {
if c.ErrorLogger == nil {
panic(err)
}
c.ErrorLogger.OnError(message, err)
}