fix(sse): stream initial response and refresh tokens

pull/5947/head
boojack 2 weeks ago
parent 88ac3ec31e
commit 21303e879d

@ -68,6 +68,15 @@ func handleSSE(c *echo.Context, hub *SSEHub, authenticator *auth.Authenticator)
slog.Debug("SSE client connected", "userID", userID)
// Send an initial comment so clients and dev proxies observe the stream
// immediately instead of waiting for the first heartbeat or data event.
if _, err := fmt.Fprint(w, ": connected\n\n"); err != nil {
return nil
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
for {
select {
case <-ctx.Done():

@ -1,6 +1,7 @@
package test
import (
"bufio"
"context"
"io"
"net/http"
@ -78,6 +79,27 @@ func TestSSEHandler_Authentication(t *testing.T) {
require.Equal(t, http.StatusUnauthorized, rec.Code)
})
t.Run("valid token streams initial comment", func(t *testing.T) {
server := httptest.NewServer(e)
defer server.Close()
reqCtx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, server.URL+"/api/v1/sse", nil)
require.NoError(t, err)
req.Header.Set("Authorization", "Bearer "+token)
resp, err := server.Client().Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type"))
line, err := bufio.NewReader(resp.Body).ReadString('\n')
require.NoError(t, err)
require.Equal(t, ": connected\n", line)
})
t.Run("hub close disconnects stream", func(t *testing.T) {
server := httptest.NewServer(e)
defer server.Close()

@ -121,7 +121,7 @@ async function refreshAndGetAccessToken(): Promise<string> {
return token;
}
async function getRequestToken(): Promise<string | null> {
export async function getRequestToken(): Promise<string | null> {
let token = getAccessToken();
if (!token) {
if (!hasStoredToken()) return null;

@ -1,6 +1,6 @@
import { useQueryClient } from "@tanstack/react-query";
import { useCallback, useEffect, useRef, useSyncExternalStore } from "react";
import { getAccessToken } from "@/auth-state";
import { getRequestToken, refreshAccessToken } from "@/connect";
import { useAuth } from "@/contexts/AuthContext";
import { memoKeys } from "@/hooks/useMemoQueries";
import { userKeys } from "@/hooks/useUserQueries";
@ -89,7 +89,7 @@ export function useLiveMemoRefresh() {
return;
}
const token = getAccessToken();
let token = await getRequestToken();
if (!token) {
setSSEStatus("disconnected");
// Not logged in; do not retry. Effect will re-run when currentUser is set.
@ -101,13 +101,16 @@ export function useLiveMemoRefresh() {
abortControllerRef.current = abortController;
try {
const response = await fetch("/api/v1/sse", {
headers: {
Authorization: `Bearer ${token}`,
},
signal: abortController.signal,
credentials: "include",
});
let response = await fetchSSEStream(token, abortController.signal);
if (response.status === 401) {
await refreshAccessToken();
token = await getRequestToken();
if (!token) {
throw new Error("SSE connection failed: missing token after refresh");
}
response = await fetchSSEStream(token, abortController.signal);
}
if (!response.ok || !response.body) {
throw new Error(`SSE connection failed: ${response.status}`);
@ -196,6 +199,17 @@ export function useLiveMemoRefresh() {
// Event handling
// ---------------------------------------------------------------------------
function fetchSSEStream(token: string, signal: AbortSignal): Promise<Response> {
return fetch("/api/v1/sse", {
headers: {
Accept: "text/event-stream",
Authorization: `Bearer ${token}`,
},
signal,
credentials: "include",
});
}
interface SSEChangeEvent {
type: (typeof SSE_EVENT_TYPES)[keyof typeof SSE_EVENT_TYPES];
name: string;

Loading…
Cancel
Save