mirror of https://github.com/mautrix/go.git
Merge remote-tracking branch 'recht/synccontext'
commit
0a302c753d
15
client.go
15
client.go
|
@ -174,8 +174,15 @@ func (cli *Client) SyncWithContext(ctx context.Context) error {
|
|||
// We will keep syncing until the syncing state changes. Either because
|
||||
// Sync is called or StopSync is called.
|
||||
syncingID := cli.incrementSyncingID()
|
||||
nextBatch := cli.Store.LoadNextBatch(ctx, cli.UserID)
|
||||
filterID := cli.Store.LoadFilterID(ctx, cli.UserID)
|
||||
nextBatch, err := cli.Store.LoadNextBatch(ctx, cli.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filterID, err := cli.Store.LoadFilterID(ctx, cli.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if filterID == "" {
|
||||
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID)
|
||||
resFilter, err := cli.CreateFilter(ctx, filterJSON)
|
||||
|
@ -183,7 +190,9 @@ func (cli *Client) SyncWithContext(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
filterID = resFilter.FilterID
|
||||
cli.Store.SaveFilterID(ctx, cli.UserID, filterID)
|
||||
if err := cli.Store.SaveFilterID(ctx, cli.UserID, filterID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
lastSuccessfulSync := time.Now().Add(-cli.StreamSyncMinAge - 1*time.Hour)
|
||||
for {
|
||||
|
|
|
@ -88,22 +88,27 @@ func (store *SQLCryptoStore) GetNextBatch(ctx context.Context) (string, error) {
|
|||
|
||||
var _ mautrix.SyncStore = (*SQLCryptoStore)(nil)
|
||||
|
||||
func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) {}
|
||||
func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) string { return "" }
|
||||
|
||||
func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) {
|
||||
err := store.PutNextBatch(ctx, nextBatchToken)
|
||||
if err != nil {
|
||||
// TODO handle error
|
||||
}
|
||||
func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) error {
|
||||
return nil
|
||||
}
|
||||
func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) string {
|
||||
func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) error {
|
||||
err := store.PutNextBatch(ctx, nextBatchToken)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to store batch: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) (string, error) {
|
||||
nb, err := store.GetNextBatch(ctx)
|
||||
if err != nil {
|
||||
// TODO handle error
|
||||
return "", fmt.Errorf("unable to load batch: %w", err)
|
||||
}
|
||||
return nb
|
||||
return nb, nil
|
||||
}
|
||||
|
||||
func (store *SQLCryptoStore) FindDeviceID() (deviceID id.DeviceID) {
|
||||
|
|
45
syncstore.go
45
syncstore.go
|
@ -3,6 +3,7 @@ package mautrix
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"maunium.net/go/mautrix/id"
|
||||
)
|
||||
|
@ -16,10 +17,10 @@ var _ SyncStore = (*AccountDataStore)(nil)
|
|||
// provided "MemorySyncStore" which just keeps data around in-memory which is lost on
|
||||
// restarts.
|
||||
type SyncStore interface {
|
||||
SaveFilterID(ctx context.Context, userID id.UserID, filterID string)
|
||||
LoadFilterID(ctx context.Context, userID id.UserID) string
|
||||
SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string)
|
||||
LoadNextBatch(ctx context.Context, userID id.UserID) string
|
||||
SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error
|
||||
LoadFilterID(ctx context.Context, userID id.UserID) (string, error)
|
||||
SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error
|
||||
LoadNextBatch(ctx context.Context, userID id.UserID) (string, error)
|
||||
}
|
||||
|
||||
// Deprecated: renamed to SyncStore
|
||||
|
@ -36,23 +37,25 @@ type MemorySyncStore struct {
|
|||
}
|
||||
|
||||
// SaveFilterID to memory.
|
||||
func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) {
|
||||
func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error {
|
||||
s.Filters[userID] = filterID
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadFilterID from memory.
|
||||
func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) string {
|
||||
return s.Filters[userID]
|
||||
func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) {
|
||||
return s.Filters[userID], nil
|
||||
}
|
||||
|
||||
// SaveNextBatch to memory.
|
||||
func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) {
|
||||
func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error {
|
||||
s.NextBatch[userID] = nextBatchToken
|
||||
return nil
|
||||
}
|
||||
|
||||
// LoadNextBatch from memory.
|
||||
func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) string {
|
||||
return s.NextBatch[userID]
|
||||
func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) {
|
||||
return s.NextBatch[userID], nil
|
||||
}
|
||||
|
||||
// NewMemorySyncStore constructs a new MemorySyncStore.
|
||||
|
@ -76,25 +79,26 @@ type accountData struct {
|
|||
NextBatch string `json:"next_batch"`
|
||||
}
|
||||
|
||||
func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) {
|
||||
func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error {
|
||||
if userID.String() != s.client.UserID.String() {
|
||||
panic("AccountDataStore must only be used with a single account")
|
||||
}
|
||||
s.FilterID = filterID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) string {
|
||||
func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) {
|
||||
if userID.String() != s.client.UserID.String() {
|
||||
panic("AccountDataStore must only be used with a single account")
|
||||
}
|
||||
return s.FilterID
|
||||
return s.FilterID, nil
|
||||
}
|
||||
|
||||
func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) {
|
||||
func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error {
|
||||
if userID.String() != s.client.UserID.String() {
|
||||
panic("AccountDataStore must only be used with a single account")
|
||||
} else if nextBatchToken == s.nextBatch {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
data := accountData{
|
||||
|
@ -103,7 +107,7 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID,
|
|||
|
||||
err := s.client.SetAccountData(ctx, s.EventType, data)
|
||||
if err != nil {
|
||||
s.client.Log.Warn().Err(err).Msg("Failed to save next batch token to account data")
|
||||
return fmt.Errorf("failed to save next batch token to account data: %w", err)
|
||||
} else {
|
||||
s.client.Log.Debug().
|
||||
Str("old_token", s.nextBatch).
|
||||
|
@ -111,9 +115,10 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID,
|
|||
Msg("Saved next batch token")
|
||||
s.nextBatch = nextBatchToken
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) string {
|
||||
func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) {
|
||||
if userID.String() != s.client.UserID.String() {
|
||||
panic("AccountDataStore must only be used with a single account")
|
||||
}
|
||||
|
@ -124,15 +129,15 @@ func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID)
|
|||
if err != nil {
|
||||
if errors.Is(err, MNotFound) {
|
||||
s.client.Log.Debug().Msg("No next batch token found in account data")
|
||||
return "", nil
|
||||
} else {
|
||||
s.client.Log.Warn().Err(err).Msg("Failed to load next batch token from account data")
|
||||
return "", fmt.Errorf("failed to load next batch token from account data: %w", err)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
s.nextBatch = data.NextBatch
|
||||
s.client.Log.Debug().Str("next_batch", data.NextBatch).Msg("Loaded next batch token from account data")
|
||||
|
||||
return s.nextBatch
|
||||
return s.nextBatch, nil
|
||||
}
|
||||
|
||||
// NewAccountDataStore returns a new AccountDataStore, which stores
|
||||
|
|
Loading…
Reference in New Issue