mirror of https://github.com/usememos/memos
Merge 3bea950588 into 3245613a88
commit 337201e9f0
@@ -0,0 +1,87 @@
package v1

import (
    "context"
    "net/http"

    "github.com/labstack/echo/v4"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    "github.com/usememos/memos/store"
    "github.com/usememos/memos/store/cache"
)

// GetCacheStatus returns the current cache status for monitoring and debugging.
func (s *APIV1Service) GetCacheStatus(ctx context.Context) (*CacheStatusResponse, error) {
    // Check if user is admin
    currentUser, err := s.GetCurrentUser(ctx)
    if err != nil {
        return nil, status.Errorf(codes.Internal, "failed to get current user: %v", err)
    }
    if currentUser == nil {
        return nil, status.Errorf(codes.Unauthenticated, "user not authenticated")
    }
    if currentUser.Role != store.RoleHost && currentUser.Role != store.RoleAdmin {
        return nil, status.Errorf(codes.PermissionDenied, "only admins can access cache status")
    }

    response := &CacheStatusResponse{
        UserCache:             getCacheInfo(s.Store.GetUserCache()),
        UserSettingCache:      getCacheInfo(s.Store.GetUserSettingCache()),
        WorkspaceSettingCache: getCacheInfo(s.Store.GetWorkspaceSettingCache()),
    }

    return response, nil
}

// getCacheInfo extracts cache information from a cache instance.
func getCacheInfo(c cache.Interface) *CacheInfo {
    info := &CacheInfo{
        Size: c.Size(),
        Type: "local",
    }

    // Check if it's a hybrid cache to get additional info
    if hybrid, ok := c.(*cache.HybridCache); ok {
        info.Type = "hybrid"
        stats := hybrid.GetStats()
        info.RedisAvailable = stats.RedisAvailable
        info.PodId = stats.PodID
        info.LocalSize = stats.LocalSize
        info.RedisSize = stats.RedisSize
        info.EventQueueSize = stats.EventQueueSize
    }

    return info
}

// CacheStatusResponse contains cache status information.
type CacheStatusResponse struct {
    UserCache             *CacheInfo `json:"user_cache"`
    UserSettingCache      *CacheInfo `json:"user_setting_cache"`
    WorkspaceSettingCache *CacheInfo `json:"workspace_setting_cache"`
}

// CacheInfo contains information about a specific cache.
type CacheInfo struct {
    Type           string `json:"type"`             // "local" or "hybrid"
    Size           int64  `json:"size"`             // Total items in cache
    LocalSize      int64  `json:"local_size"`       // Items in local cache (for hybrid)
    RedisSize      int64  `json:"redis_size"`       // Items in Redis (for hybrid)
    RedisAvailable bool   `json:"redis_available"`  // Whether Redis is available
    PodId          string `json:"pod_id"`           // Unique pod identifier
    EventQueueSize int64  `json:"event_queue_size"` // Pending cache events
}

// registerCacheRoutes registers cache-related REST endpoints.
func (s *APIV1Service) registerCacheRoutes(g *echo.Group) {
    g.GET("/cache/status", func(c echo.Context) error {
        ctx := c.Request().Context()
        response, err := s.GetCacheStatus(ctx)
        if err != nil {
            return echo.NewHTTPError(http.StatusInternalServerError, err.Error())
        }
        return c.JSON(http.StatusOK, response)
    })
}
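For reference, a minimal sketch of how a monitoring client might poll the endpoint registered above. The base URL, port, and "/api/v1" prefix are assumptions about how the echo.Group is mounted, and a real deployment would also need an authenticated admin session; the structs simply mirror the JSON tags on CacheStatusResponse and CacheInfo.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

type cacheInfo struct {
    Type           string `json:"type"`
    Size           int64  `json:"size"`
    LocalSize      int64  `json:"local_size"`
    RedisSize      int64  `json:"redis_size"`
    RedisAvailable bool   `json:"redis_available"`
    PodID          string `json:"pod_id"`
    EventQueueSize int64  `json:"event_queue_size"`
}

type cacheStatus struct {
    UserCache             *cacheInfo `json:"user_cache"`
    UserSettingCache      *cacheInfo `json:"user_setting_cache"`
    WorkspaceSettingCache *cacheInfo `json:"workspace_setting_cache"`
}

func main() {
    // Hypothetical base URL; adjust host, port, prefix, and auth for the real deployment.
    resp, err := http.Get("http://localhost:5230/api/v1/cache/status")
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()

    var status cacheStatus
    if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
        fmt.Println("decode failed:", err)
        return
    }
    // Print the user cache stats as a quick health signal.
    fmt.Printf("user cache: %+v\n", status.UserCache)
}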
@@ -0,0 +1,266 @@
package cache

import (
    "context"
    "log/slog"
    "sync"
    "time"

    "github.com/google/uuid"
)

// HybridCache provides a Redis-backed cache with in-memory fallback.
// It automatically handles Redis failures by falling back to local cache.
type HybridCache struct {
    redis  *RedisCache
    local  *Cache
    config Config
    podID  string

    // Event handling
    mu           sync.RWMutex
    subscription context.CancelFunc
    eventCh      chan CacheEvent
}

// NewHybridCache creates a new hybrid cache with Redis primary and local fallback.
func NewHybridCache(redisConfig RedisConfig, cacheConfig Config) (*HybridCache, error) {
    // Create Redis cache
    redisCache, err := NewRedisCache(redisConfig, cacheConfig)
    if err != nil {
        slog.Warn("failed to create Redis cache, falling back to local cache only", "error", err)
        return &HybridCache{
            local:   New(cacheConfig),
            config:  cacheConfig,
            podID:   generatePodID(),
            eventCh: make(chan CacheEvent, 100),
        }, nil
    }

    // Create local cache for fallback
    localCache := New(cacheConfig)

    hybrid := &HybridCache{
        redis:   redisCache,
        local:   localCache,
        config:  cacheConfig,
        podID:   generatePodID(),
        eventCh: make(chan CacheEvent, 100),
    }

    // Start event subscription if Redis is available
    if redisCache != nil {
        hybrid.startEventSubscription()
    }

    return hybrid, nil
}

// generatePodID creates a unique identifier for this pod instance.
func generatePodID() string {
    return uuid.New().String()[:8]
}

// startEventSubscription begins listening for cache events from other pods.
func (h *HybridCache) startEventSubscription() {
    ctx, cancel := context.WithCancel(context.Background())
    h.subscription = cancel

    go func() {
        defer func() {
            if r := recover(); r != nil {
                slog.Error("cache event subscription panicked", "panic", r)
            }
        }()

        err := h.redis.Subscribe(ctx, h.handleCacheEvent)
        if err != nil && err != context.Canceled {
            slog.Error("Redis subscription failed", "error", err)
        }
    }()

    // Start event processor
    go h.processEvents(ctx)
}

// handleCacheEvent processes cache events from other pods.
func (h *HybridCache) handleCacheEvent(event CacheEvent) {
    // Ignore events from this pod
    if event.Source == h.podID {
        return
    }

    select {
    case h.eventCh <- event:
        // Event queued successfully
    default:
        // Channel full, drop event
        slog.Warn("cache event channel full, dropping event", "event", event)
    }
}

// processEvents processes queued cache events.
func (h *HybridCache) processEvents(ctx context.Context) {
    for {
        select {
        case event := <-h.eventCh:
            h.processEvent(event)
        case <-ctx.Done():
            return
        }
    }
}

// processEvent handles a single cache event.
func (h *HybridCache) processEvent(event CacheEvent) {
    switch event.Type {
    case "delete":
        h.local.Delete(context.Background(), event.Key)
    case "clear":
        h.local.Clear(context.Background())
    }
}

// Set adds a value to both Redis and local cache.
func (h *HybridCache) Set(ctx context.Context, key string, value any) {
    h.SetWithTTL(ctx, key, value, h.config.DefaultTTL)
}

// SetWithTTL adds a value to both Redis and local cache with custom TTL.
func (h *HybridCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) {
    // Always set in local cache first
    h.local.SetWithTTL(ctx, key, value, ttl)

    // Try to set in Redis (no event needed - other pods will get it on demand)
    if h.redis != nil {
        h.redis.SetWithTTL(ctx, key, value, ttl)
    }
}

// Get retrieves a value from cache, trying local first for speed, then Redis.
func (h *HybridCache) Get(ctx context.Context, key string) (any, bool) {
    // Try local cache first for speed
    if value, ok := h.local.Get(ctx, key); ok {
        return value, true
    }

    // Try Redis if local cache miss and Redis is available
    if h.redis != nil {
        if value, ok := h.redis.Get(ctx, key); ok {
            // Populate local cache for faster subsequent access
            h.local.SetWithTTL(ctx, key, value, h.config.DefaultTTL)
            return value, true
        }
    }

    return nil, false
}

// Delete removes a value from both Redis and local cache.
func (h *HybridCache) Delete(ctx context.Context, key string) {
    // Delete from local cache immediately
    h.local.Delete(ctx, key)

    // Try to delete from Redis and notify other pods
    if h.redis != nil {
        h.redis.Delete(ctx, key)

        // Publish delete event to other pods
        event := CacheEvent{
            Type:      "delete",
            Key:       key,
            Timestamp: time.Now(),
            Source:    h.podID,
        }

        if err := h.redis.Publish(ctx, event); err != nil {
            slog.Debug("failed to publish cache delete event", "key", key, "error", err)
        }
    }
}

// Clear removes all values from both Redis and local cache.
func (h *HybridCache) Clear(ctx context.Context) {
    // Clear local cache immediately
    h.local.Clear(ctx)

    // Try to clear Redis and notify other pods
    if h.redis != nil {
        h.redis.Clear(ctx)

        // Publish clear event to other pods
        event := CacheEvent{
            Type:      "clear",
            Key:       "",
            Timestamp: time.Now(),
            Source:    h.podID,
        }

        if err := h.redis.Publish(ctx, event); err != nil {
            slog.Debug("failed to publish cache clear event", "error", err)
        }
    }
}

// Size returns the size of the local cache (Redis size is expensive to compute).
func (h *HybridCache) Size() int64 {
    return h.local.Size()
}

// Close stops all background processes and closes connections.
func (h *HybridCache) Close() error {
    h.mu.Lock()
    defer h.mu.Unlock()

    // Stop event subscription
    if h.subscription != nil {
        h.subscription()
        h.subscription = nil
    }

    // Close local cache
    if err := h.local.Close(); err != nil {
        slog.Error("failed to close local cache", "error", err)
    }

    // Close Redis cache
    if h.redis != nil {
        if err := h.redis.Close(); err != nil {
            slog.Error("failed to close Redis cache", "error", err)
            return err
        }
    }

    return nil
}

// IsRedisAvailable returns true if Redis cache is available.
func (h *HybridCache) IsRedisAvailable() bool {
    return h.redis != nil
}

// GetStats returns cache statistics.
func (h *HybridCache) GetStats() CacheStats {
    stats := CacheStats{
        LocalSize:      h.local.Size(),
        RedisAvailable: h.redis != nil,
        PodID:          h.podID,
        EventQueueSize: int64(len(h.eventCh)),
    }

    if h.redis != nil {
        // Note: Redis size is expensive, only call when needed
        stats.RedisSize = h.redis.Size()
    }

    return stats
}

// CacheStats provides information about cache state.
type CacheStats struct {
    LocalSize      int64  `json:"local_size"`
    RedisSize      int64  `json:"redis_size"`
    RedisAvailable bool   `json:"redis_available"`
    PodID          string `json:"pod_id"`
    EventQueueSize int64  `json:"event_queue_size"`
}
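For illustration, a minimal sketch of two HybridCache instances standing in for two pods sharing one Redis. The Redis URL and key prefix are assumptions for local experimentation, and the Config fields mirror the ones used in the tests further down. The point of the sketch is the invalidation path: after pod B has read a key into its local cache, a Delete on pod A removes the key from Redis and, via the published "delete" event, also evicts pod B's local copy.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/usememos/memos/store/cache"
)

func main() {
    ctx := context.Background()

    redisCfg := cache.RedisConfig{
        URL:       "redis://localhost:6379", // assumed local Redis for the experiment
        KeyPrefix: "memos:cache",
    }
    localCfg := cache.Config{
        MaxItems:        1000,
        DefaultTTL:      time.Hour,
        CleanupInterval: time.Minute,
    }

    podA, err := cache.NewHybridCache(redisCfg, localCfg)
    if err != nil {
        fmt.Println("podA init failed:", err)
        return
    }
    defer podA.Close()

    podB, err := cache.NewHybridCache(redisCfg, localCfg)
    if err != nil {
        fmt.Println("podB init failed:", err)
        return
    }
    defer podB.Close()

    // podA writes; podB misses locally, finds the value in Redis, and caches it locally.
    podA.Set(ctx, "user:1", "alice")
    if v, ok := podB.Get(ctx, "user:1"); ok {
        fmt.Println("podB read:", v)
    }

    // podA deletes; without the event, podB's local copy would stay stale.
    podA.Delete(ctx, "user:1")
    time.Sleep(200 * time.Millisecond) // allow the pub/sub event to reach podB
    _, ok := podB.Get(ctx, "user:1")
    fmt.Println("still cached on podB:", ok) // expected: false
}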
@@ -0,0 +1,237 @@
package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "time"

    "github.com/redis/go-redis/v9"
)

// RedisCache implements the Interface using Redis as the backend.
type RedisCache struct {
    client *redis.Client
    config Config
    prefix string
}

// RedisConfig contains Redis-specific configuration.
type RedisConfig struct {
    // Redis connection URL (redis://localhost:6379)
    URL string
    // Connection pool size
    PoolSize int
    // Connection timeout
    DialTimeout time.Duration
    // Read timeout
    ReadTimeout time.Duration
    // Write timeout
    WriteTimeout time.Duration
    // Key prefix for all cache keys
    KeyPrefix string
}

// NewRedisCache creates a new Redis-backed cache with the given configuration.
func NewRedisCache(redisConfig RedisConfig, cacheConfig Config) (*RedisCache, error) {
    // Parse Redis URL
    opts, err := redis.ParseURL(redisConfig.URL)
    if err != nil {
        return nil, fmt.Errorf("failed to parse Redis URL: %w", err)
    }

    // Override with provided configuration
    if redisConfig.PoolSize > 0 {
        opts.PoolSize = redisConfig.PoolSize
    }
    if redisConfig.DialTimeout > 0 {
        opts.DialTimeout = redisConfig.DialTimeout
    }
    if redisConfig.ReadTimeout > 0 {
        opts.ReadTimeout = redisConfig.ReadTimeout
    }
    if redisConfig.WriteTimeout > 0 {
        opts.WriteTimeout = redisConfig.WriteTimeout
    }

    client := redis.NewClient(opts)

    // Test connection
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := client.Ping(ctx).Err(); err != nil {
        return nil, fmt.Errorf("failed to connect to Redis: %w", err)
    }

    prefix := redisConfig.KeyPrefix
    if prefix == "" {
        prefix = "memos:cache"
    }

    return &RedisCache{
        client: client,
        config: cacheConfig,
        prefix: prefix,
    }, nil
}

// buildKey creates a prefixed cache key.
func (r *RedisCache) buildKey(key string) string {
    return fmt.Sprintf("%s:%s", r.prefix, key)
}

// Set adds a value to the cache with the default TTL.
func (r *RedisCache) Set(ctx context.Context, key string, value any) {
    r.SetWithTTL(ctx, key, value, r.config.DefaultTTL)
}

// SetWithTTL adds a value to the cache with a custom TTL.
func (r *RedisCache) SetWithTTL(ctx context.Context, key string, value any, ttl time.Duration) {
    data, err := json.Marshal(value)
    if err != nil {
        slog.Error("failed to marshal cache value", "key", key, "error", err)
        return
    }

    redisKey := r.buildKey(key)
    if err := r.client.Set(ctx, redisKey, data, ttl).Err(); err != nil {
        slog.Error("failed to set cache value in Redis", "key", redisKey, "error", err)
    }
}

// Get retrieves a value from the cache.
func (r *RedisCache) Get(ctx context.Context, key string) (any, bool) {
    redisKey := r.buildKey(key)
    data, err := r.client.Get(ctx, redisKey).Bytes()
    if err != nil {
        if err == redis.Nil {
            return nil, false
        }
        slog.Error("failed to get cache value from Redis", "key", redisKey, "error", err)
        return nil, false
    }

    var value any
    if err := json.Unmarshal(data, &value); err != nil {
        slog.Error("failed to unmarshal cache value", "key", redisKey, "error", err)
        return nil, false
    }

    return value, true
}

// Delete removes a value from the cache.
func (r *RedisCache) Delete(ctx context.Context, key string) {
    redisKey := r.buildKey(key)
    if err := r.client.Del(ctx, redisKey).Err(); err != nil {
        slog.Error("failed to delete cache value from Redis", "key", redisKey, "error", err)
    }
}

// Clear removes all values from the cache with the configured prefix.
func (r *RedisCache) Clear(ctx context.Context) {
    // Use SCAN to find all keys with our prefix
    pattern := fmt.Sprintf("%s:*", r.prefix)

    iter := r.client.Scan(ctx, 0, pattern, 0).Iterator()
    keys := make([]string, 0)

    for iter.Next(ctx) {
        keys = append(keys, iter.Val())
    }

    if err := iter.Err(); err != nil {
        slog.Error("failed to scan Redis keys", "pattern", pattern, "error", err)
        return
    }

    if len(keys) > 0 {
        if err := r.client.Del(ctx, keys...).Err(); err != nil {
            slog.Error("failed to delete Redis keys", "pattern", pattern, "error", err)
        }
    }
}

// Size returns the number of items in the cache with our prefix.
// Note: This is an expensive operation in Redis and should be used sparingly.
func (r *RedisCache) Size() int64 {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    pattern := fmt.Sprintf("%s:*", r.prefix)

    iter := r.client.Scan(ctx, 0, pattern, 0).Iterator()
    count := int64(0)

    for iter.Next(ctx) {
        count++
    }

    if err := iter.Err(); err != nil {
        slog.Error("failed to count Redis keys", "pattern", pattern, "error", err)
        return 0
    }

    return count
}

// Close closes the Redis connection.
func (r *RedisCache) Close() error {
    return r.client.Close()
}

// Publish publishes a cache invalidation event to other instances.
func (r *RedisCache) Publish(ctx context.Context, event CacheEvent) error {
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("failed to marshal cache event: %w", err)
    }

    channel := fmt.Sprintf("%s:events", r.prefix)
    if err := r.client.Publish(ctx, channel, data).Err(); err != nil {
        return fmt.Errorf("failed to publish cache event: %w", err)
    }

    return nil
}

// Subscribe subscribes to cache invalidation events from other instances.
func (r *RedisCache) Subscribe(ctx context.Context, handler func(CacheEvent)) error {
    channel := fmt.Sprintf("%s:events", r.prefix)

    pubsub := r.client.Subscribe(ctx, channel)
    defer pubsub.Close()

    // Start receiving messages
    ch := pubsub.Channel()

    slog.Info("subscribed to Redis cache events", "channel", channel)

    for {
        select {
        case msg := <-ch:
            var event CacheEvent
            if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
                slog.Error("failed to unmarshal cache event", "error", err)
                continue
            }

            slog.Debug("received cache event", "event", event)
            handler(event)

        case <-ctx.Done():
            slog.Info("cache event subscription cancelled")
            return ctx.Err()
        }
    }
}

// CacheEvent represents a cache invalidation event.
type CacheEvent struct {
    Type      string    `json:"type"`      // "set", "delete", "clear"
    Key       string    `json:"key"`       // cache key (without prefix)
    Timestamp time.Time `json:"timestamp"` // when the event occurred
    Source    string    `json:"source"`    // identifier of the pod that generated the event
}
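For illustration, a minimal sketch of the invalidation protocol in isolation: one RedisCache publishes a CacheEvent while a second instance subscribed to the same "<prefix>:events" channel receives it. The Redis URL, timeouts, and sleeps are assumptions for a local experiment, not part of the implementation above.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/usememos/memos/store/cache"
)

func main() {
    cfg := cache.RedisConfig{URL: "redis://localhost:6379", KeyPrefix: "memos:cache"}
    cacheCfg := cache.Config{DefaultTTL: time.Hour}

    publisher, err := cache.NewRedisCache(cfg, cacheCfg)
    if err != nil {
        fmt.Println("redis unavailable:", err)
        return
    }
    defer publisher.Close()

    subscriber, err := cache.NewRedisCache(cfg, cacheCfg)
    if err != nil {
        fmt.Println("redis unavailable:", err)
        return
    }
    defer subscriber.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Listen for invalidation events in the background; Subscribe blocks until ctx is done.
    go func() {
        _ = subscriber.Subscribe(ctx, func(event cache.CacheEvent) {
            fmt.Printf("received %s event for key %q from pod %s\n",
                event.Type, event.Key, event.Source)
        })
    }()
    time.Sleep(100 * time.Millisecond) // give the subscription time to attach

    // Announce that "user:1" was invalidated on this (hypothetical) pod.
    _ = publisher.Publish(ctx, cache.CacheEvent{
        Type:      "delete",
        Key:       "user:1",
        Timestamp: time.Now(),
        Source:    "example-pod",
    })

    <-ctx.Done() // let the handler print before exiting
}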
@@ -0,0 +1,245 @@
package teststore

import (
    "context"
    "fmt"
    "os"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
    "golang.org/x/crypto/bcrypt"
    "google.golang.org/protobuf/types/known/timestamppb"

    storepb "github.com/usememos/memos/proto/gen/store"
    "github.com/usememos/memos/store"
    "github.com/usememos/memos/store/cache"
    "github.com/usememos/memos/store/db"
)

// TestDistributedSessionStore tests the core business problem we solved:
// multi-pod Kubernetes deployments sharing user sessions to fix SSO redirect issues.
func TestDistributedSessionStore(t *testing.T) {
    redisURL := os.Getenv("REDIS_URL")
    if redisURL == "" {
        t.Skip("REDIS_URL not set, skipping distributed session tests - this tests the core K8s scaling feature")
    }

    ctx := context.Background()

    // Create two store instances to simulate multiple K8s pods
    store1 := createStoreWithRedisCache(ctx, t, "pod1")
    defer store1.Close()

    store2 := createStoreWithRedisCache(ctx, t, "pod2")
    defer store2.Close()

    // Give time for cache initialization
    time.Sleep(100 * time.Millisecond)

    // Test the core SSO redirect issue: session created in pod1 should be available in pod2
    t.Run("SSO_RedirectFix_SessionSharingAcrossPods", func(t *testing.T) {
        testSessionSharingAcrossPods(t, ctx, store1, store2)
    })

    // Test session cleanup works across pods
    t.Run("SessionInvalidationAcrossPods", func(t *testing.T) {
        testSessionInvalidationAcrossPods(t, ctx, store1, store2)
    })

    // Test user settings sync (part of session management)
    t.Run("UserSettingsSynchronization", func(t *testing.T) {
        testUserSettingsSynchronization(t, ctx, store1, store2)
    })
}

func createStoreWithRedisCache(ctx context.Context, t *testing.T, instanceID string) *store.Store {
    redisURL := os.Getenv("REDIS_URL")

    // Create profile for testing
    profile := getTestingProfile(t)
    profile.Data = fmt.Sprintf("%s_%s", profile.Data, instanceID)

    // Create database driver
    dbDriver, err := db.NewDBDriver(profile)
    require.NoError(t, err)

    // Reset and migrate database
    resetTestingDB(ctx, profile, dbDriver)

    // Create store with Redis cache
    testStore := store.New(dbDriver, profile)

    // Override cache with Redis-enabled cache for testing
    redisConfig := cache.RedisConfig{
        URL:          redisURL,
        PoolSize:     10,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
        KeyPrefix:    fmt.Sprintf("test-%s", instanceID),
    }

    localConfig := cache.Config{
        MaxItems:        100,
        DefaultTTL:      time.Hour,
        CleanupInterval: time.Minute,
    }

    hybridCache, err := cache.NewHybridCache(redisConfig, localConfig)
    require.NoError(t, err)

    // Set the hybrid cache for user settings
    testStore.SetUserSettingCache(hybridCache)

    // Migrate database
    err = testStore.Migrate(ctx)
    require.NoError(t, err)

    return testStore
}

func testSessionSharingAcrossPods(t *testing.T, ctx context.Context, store1, store2 *store.Store) {
    // Create a user in store1
    user, err := createTestingHostUser(ctx, store1)
    require.NoError(t, err)

    // Add session to user in store1
    sessionID := "test-session-12345"
    now := timestamppb.Now()
    session := &storepb.SessionsUserSetting_Session{
        SessionId:        sessionID,
        CreateTime:       now,
        LastAccessedTime: now,
        ClientInfo:       &storepb.SessionsUserSetting_ClientInfo{},
    }

    err = store1.AddUserSession(ctx, user.ID, session)
    require.NoError(t, err)

    // Give time for cache synchronization
    time.Sleep(200 * time.Millisecond)

    // Verify session is available in store2
    sessions, err := store2.GetUserSessions(ctx, user.ID)
    require.NoError(t, err)
    require.Len(t, sessions, 1)
    require.Equal(t, sessionID, sessions[0].SessionId)
}

func testSessionInvalidationAcrossPods(t *testing.T, ctx context.Context, store1, store2 *store.Store) {
    // Create a user and add sessions
    user, err := createTestingHostUser(ctx, store1)
    require.NoError(t, err)

    sessionID1 := "test-session-invalidate-1"
    sessionID2 := "test-session-invalidate-2"

    session1 := &storepb.SessionsUserSetting_Session{
        SessionId:        sessionID1,
        CreateTime:       timestamppb.Now(),
        LastAccessedTime: timestamppb.Now(),
        ClientInfo:       &storepb.SessionsUserSetting_ClientInfo{},
    }
    session2 := &storepb.SessionsUserSetting_Session{
        SessionId:        sessionID2,
        CreateTime:       timestamppb.Now(),
        LastAccessedTime: timestamppb.Now(),
        ClientInfo:       &storepb.SessionsUserSetting_ClientInfo{},
    }

    err = store1.AddUserSession(ctx, user.ID, session1)
    require.NoError(t, err)

    err = store1.AddUserSession(ctx, user.ID, session2)
    require.NoError(t, err)

    // Give time for synchronization
    time.Sleep(200 * time.Millisecond)

    // Verify both sessions exist in store2
    sessions, err := store2.GetUserSessions(ctx, user.ID)
    require.NoError(t, err)
    require.Len(t, sessions, 2)

    // Remove one session from store1
    err = store1.RemoveUserSession(ctx, user.ID, sessionID1)
    require.NoError(t, err)

    // Give time for cache invalidation
    time.Sleep(200 * time.Millisecond)

    // Verify session is removed from store2 as well
    sessions, err = store2.GetUserSessions(ctx, user.ID)
    require.NoError(t, err)
    require.Len(t, sessions, 1)
    require.Equal(t, sessionID2, sessions[0].SessionId)
}

func testUserSettingsSynchronization(t *testing.T, ctx context.Context, store1, store2 *store.Store) {
    // Create a user
    user, err := createTestingHostUser(ctx, store1)
    require.NoError(t, err)

    // Create user setting in store1
    generalSetting := &storepb.UserSetting{
        UserId: user.ID,
        Key:    storepb.UserSetting_GENERAL,
        Value: &storepb.UserSetting_General{
            General: &storepb.GeneralUserSetting{
                Locale: "en-US",
                Theme:  "dark",
            },
        },
    }

    _, err = store1.UpsertUserSetting(ctx, generalSetting)
    require.NoError(t, err)

    // Give time for cache synchronization
    time.Sleep(200 * time.Millisecond)

    // Verify setting is available in store2
    settings, err := store2.ListUserSettings(ctx, &store.FindUserSetting{
        UserID: &user.ID,
        Key:    storepb.UserSetting_GENERAL,
    })
    require.NoError(t, err)
    require.Len(t, settings, 1)
    require.Equal(t, "en-US", settings[0].GetGeneral().Locale)
    require.Equal(t, "dark", settings[0].GetGeneral().Theme)

    // Update setting in store2
    generalSetting.Value.(*storepb.UserSetting_General).General.Theme = "light"
    _, err = store2.UpsertUserSetting(ctx, generalSetting)
    require.NoError(t, err)

    // Give time for synchronization
    time.Sleep(200 * time.Millisecond)

    // Verify update is reflected in store1
    settings, err = store1.ListUserSettings(ctx, &store.FindUserSetting{
        UserID: &user.ID,
        Key:    storepb.UserSetting_GENERAL,
    })
    require.NoError(t, err)
    require.Len(t, settings, 1)
    require.Equal(t, "light", settings[0].GetGeneral().Theme)
}

func createTestingHostUserWithName(ctx context.Context, ts *store.Store, username string) (*store.User, error) {
    userCreate := &store.User{
        Username:    username,
        Role:        store.RoleHost,
        Email:       fmt.Sprintf("%s@test.com", username),
        Nickname:    fmt.Sprintf("%s_nickname", username),
        Description: fmt.Sprintf("%s_description", username),
    }
    passwordHash, err := bcrypt.GenerateFromPassword([]byte("test_password"), bcrypt.DefaultCost)
    if err != nil {
        return nil, err
    }
    userCreate.PasswordHash = string(passwordHash)
    user, err := ts.CreateUser(ctx, userCreate)
    return user, err
}
@@ -0,0 +1,131 @@
#!/bin/bash

# Comprehensive cache testing script for Memos distributed caching
set -e

echo "🧪 Running Comprehensive Cache Tests for Memos"
echo "=============================================="

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m'

print_status() {
    local color=$1
    local message=$2
    echo -e "${color}${message}${NC}"
}

# Check if Redis is available
REDIS_AVAILABLE=false
if command -v redis-server &> /dev/null && command -v redis-cli &> /dev/null; then
    if redis-cli ping &> /dev/null; then
        REDIS_AVAILABLE=true
        print_status $GREEN "✅ Redis is available and running"
    else
        print_status $YELLOW "⚠️ Redis CLI available but server not responding"
    fi
else
    print_status $YELLOW "⚠️ Redis not available - will skip Redis-dependent tests"
fi

# Set Redis URL for tests if available
if [ "$REDIS_AVAILABLE" = true ]; then
    export REDIS_URL="redis://localhost:6379"
    print_status $BLUE "📡 Using Redis URL: $REDIS_URL"
else
    export REDIS_URL=""
    print_status $YELLOW "📡 Redis URL not set - distributed cache tests will be skipped"
fi

# Test categories - focused on our business logic
TESTS=(
    "distributed_session_test.go"
)

# Run tests with different configurations
print_status $BLUE "🏃 Running cache tests..."

# Create test results directory
mkdir -p test-results

# Run core business logic tests first (these work without Redis too - they will just skip).
# With `set -e` in effect, the go test calls are wrapped directly in `if` so a failing run
# does not abort the script before the log can be inspected.
print_status $YELLOW "🔄 Running distributed session store tests (core K8s scaling feature)..."
if go test -v -timeout 60s ./store/test/ -run TestDistributedSessionStore > test-results/distributed-session.log 2>&1; then
    print_status $GREEN "✅ Distributed session store tests passed"
else
    # Check if the tests were just skipped due to no Redis
    if grep -q "SKIP.*REDIS_URL not set" test-results/distributed-session.log; then
        print_status $YELLOW "⏭️ Distributed session tests skipped (no Redis) - this is expected"
    else
        print_status $RED "❌ Distributed session store tests failed"
        cat test-results/distributed-session.log
        exit 1
    fi
fi

# Run the real test with Redis if available
if [ "$REDIS_AVAILABLE" = true ]; then
    print_status $YELLOW "🔄 Running distributed session store tests with Redis (the real test!)..."
    if REDIS_URL="$REDIS_URL" go test -v -timeout 120s ./store/test/ -run TestDistributedSessionStore > test-results/distributed-session-redis.log 2>&1; then
        print_status $GREEN "✅ 🎯 CORE FEATURE WORKING: Multi-pod session sharing tested successfully!"
        print_status $BLUE "📊 This shows the SSO redirect issue is fixed!"
    else
        print_status $RED "❌ Distributed session store tests failed with Redis"
        cat test-results/distributed-session-redis.log
        exit 1
    fi
else
    print_status $YELLOW "⏭️ Skipping Redis-dependent tests (Redis not available)"
    print_status $BLUE "💡 To test the core K8s scaling feature, start Redis and run:"
    print_status $BLUE "   redis-server &"
    print_status $BLUE "   REDIS_URL=redis://localhost:6379 ./test_cache_comprehensive.sh"
fi

# Generate summary report
print_status $BLUE "📋 Generating test summary..."

echo "" > test-results/summary.txt
echo "Memos Distributed Cache Test Summary" >> test-results/summary.txt
echo "====================================" >> test-results/summary.txt
echo "Test Date: $(date)" >> test-results/summary.txt
echo "Redis Available: $REDIS_AVAILABLE" >> test-results/summary.txt
echo "" >> test-results/summary.txt

echo "Test Results:" >> test-results/summary.txt
echo "-------------" >> test-results/summary.txt

for log_file in test-results/*.log; do
    if [ -f "$log_file" ]; then
        test_name=$(basename "$log_file" .log)
        if grep -q "PASS" "$log_file"; then
            echo "✅ $test_name: PASSED" >> test-results/summary.txt
        elif grep -q "FAIL" "$log_file"; then
            echo "❌ $test_name: FAILED" >> test-results/summary.txt
        else
            echo "⚠️ $test_name: UNKNOWN" >> test-results/summary.txt
        fi
    fi
done

echo "" >> test-results/summary.txt
echo "Detailed logs available in test-results/ directory" >> test-results/summary.txt

# Display summary
cat test-results/summary.txt

print_status $GREEN "🎉 Cache testing completed!"
print_status $BLUE "📁 Test logs saved in test-results/ directory"

if [ "$REDIS_AVAILABLE" = true ]; then
    print_status $GREEN "✅ All distributed cache features have been tested"
    print_status $BLUE "🚀 Your Memos deployment is ready for multi-pod scaling!"
else
    print_status $YELLOW "⚠️ Redis-dependent tests were skipped"
    print_status $BLUE "💡 To test distributed caching, start Redis and run this script again"
fi