Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ src/main/resources/static/assets/
### Local scratchpad ###
tmp/
bin/
dogfood-output/

### Build artifacts (root level) ###
BOOT-INF/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class RetrievalAugmentationConfig {
private static final int OVERLAP_DEF = 150;
private static final int CITE_DEF = 3;
private static final double MMR_LAMBDA_DEF = 0.5d;
private static final Duration RERANK_TIMEOUT_DEF = Duration.ofSeconds(12);
private static final Duration RERANK_TIMEOUT_DEF = Duration.ofSeconds(30);
private static final int MIN_POSITIVE = 1;
private static final int MIN_NON_NEG = 0;
private static final double MMR_MIN = 0.0d;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private static List<float[]> embedSingleBatch(

List<float[]> batchEmbeddings;
try {
batchEmbeddings = embeddingClient.embed(textBatch);
batchEmbeddings = embeddingClient.embed(textBatch, LlmGatewayTier.BATCH);
} catch (EmbeddingServiceUnavailableException embeddingFailure) {
String firstBatchUrl = extractDocumentUrl(documentBatch.getFirst(), batchStartIndex);
String lastBatchUrl = extractDocumentUrl(documentBatch.getLast(), batchEndIndex - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@ public interface EmbeddingClient {
* Produces one dense embedding vector per input text, preserving input order.
*
* @param texts input texts
* @param requestTier gateway capacity tier for this embedding request
* @return embedding vectors in the same order as {@code texts}
*/
List<float[]> embed(List<String> texts);
List<float[]> embed(List<String> texts, LlmGatewayTier requestTier);

/**
* Produces a dense embedding vector for a single text.
*
* @param text input text
* @param requestTier gateway capacity tier for this embedding request
* @return embedding vector
*/
default float[] embed(String text) {
default float[] embed(String text, LlmGatewayTier requestTier) {
Objects.requireNonNull(requestTier, "requestTier");
String safeText = Objects.requireNonNullElse(text, "");
List<float[]> vectors = embed(List.of(safeText));
if (vectors.isEmpty()) {
List<float[]> embeddingVectors = embed(List.of(safeText), requestTier);
if (embeddingVectors.isEmpty()) {
throw new EmbeddingServiceUnavailableException("Embedding response was empty");
}
return vectors.get(0);
return embeddingVectors.get(0);
}

/**
Expand All @@ -44,7 +47,7 @@ default float[] embed(String text) {
* Issues a minimal embedding request so the provider keeps its model resident.
*
* <p>Implementations must call their provider-specific request path directly instead
* of delegating to {@link #embed(List)}. The RAG pipeline logging aspect advises
* of delegating to {@link #embed(List, LlmGatewayTier)}. The RAG pipeline logging aspect advises
* public {@code embed} executions, so routing scheduled probes around that method
* keeps "STEP 1" pipeline logs scoped to real requests.</p>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public SearchOutcome searchOutcome(String query, int topK, RetrievalConstraint r
return new SearchOutcome(List.of(), List.of());
}

float[] denseVector = queryEncoding.embeddingClient().embed(query);
float[] denseVector = queryEncoding.embeddingClient().embed(query, LlmGatewayTier.LIVE);
LexicalSparseVectorEncoder.SparseVector sparseVector =
queryEncoding.sparseVectorEncoder().encode(query);
Optional<Filter> retrievalFilter = queryEncoding.constraintBuilder().buildFilter(retrievalConstraint);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.williamcallahan.javachat.service;

/**
* Defines the request tiers understood by the LLM gateway.
*
* <p>Live user-facing work uses {@link #LIVE}. Background ingestion and scheduled
* embedding probes use {@link #BATCH} so live requests keep reserved capacity.</p>
*/
public enum LlmGatewayTier {
/** User-facing request tier with production reserved capacity. */
LIVE("production-z"),

/** Background request tier for ingestion, backfills, and scheduled probes. */
BATCH("batch");

/** HTTP header used by the gateway to classify request capacity. */
public static final String REQUEST_TIER_HEADER = "X-Tier";

private final String requestHeader;

LlmGatewayTier(String requestHeader) {
this.requestHeader = requestHeader;
}

/**
* Returns the gateway header payload for this request tier.
*
* @return header payload sent in {@link #REQUEST_TIER_HEADER}
*/
public String requestHeader() {
return requestHeader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public LocalEmbeddingClient(
}

@Override
public List<float[]> embed(List<String> texts) {
public List<float[]> embed(List<String> texts, LlmGatewayTier requestTier) {
Objects.requireNonNull(requestTier, "requestTier");
if (texts == null || texts.isEmpty()) {
return List.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class OpenAIStreamingService {
private static final Logger log = LoggerFactory.getLogger(OpenAIStreamingService.class);

private static final int COMPLETE_REQUEST_TIMEOUT_SECONDS = 30;
private static final String LLM_GATEWAY_TIER_LIVE = "production-z";
private static final String STREAM_STATUS_CODE_PROVIDER_FALLBACK =
SseConstants.STATUS_CODE_STREAM_PROVIDER_FALLBACK;
private static final String STREAM_STAGE_STREAM = SseConstants.STATUS_STAGE_STREAM;
Expand Down Expand Up @@ -180,6 +179,25 @@ public Mono<StreamingResult> streamResponse(StructuredPrompt structuredPrompt, d
* @return completion text from the first successful provider attempt
*/
public Mono<String> complete(String prompt, double temperature) {
return complete(prompt, temperature, null);
}

/**
* Sends a non-streaming completion request with an explicit output budget.
*
* @param prompt completion prompt
* @param temperature response temperature
* @param maximumOutputTokens maximum output tokens needed by this caller
* @return completion text from the first successful provider attempt
*/
public Mono<String> complete(String prompt, double temperature, int maximumOutputTokens) {
if (maximumOutputTokens <= 0) {
return Mono.error(new IllegalArgumentException("maximumOutputTokens must be positive"));
}
return complete(prompt, temperature, Integer.valueOf(maximumOutputTokens));
}

private Mono<String> complete(String prompt, double temperature, Integer maximumOutputTokens) {
return Mono.<String>defer(() -> {
List<OpenAiProviderCandidate> availableProviders =
providerRoutingService.selectAvailableProviderCandidates(clientPrimary, clientSecondary);
Expand All @@ -196,7 +214,7 @@ public Mono<String> complete(String prompt, double temperature) {
RateLimitService.ApiProvider activeProvider = providerCandidate.provider();

ResponseCreateParams requestParameters =
requestFactory.buildCompletionRequest(prompt, temperature, activeProvider);
buildCompletionRequest(prompt, temperature, activeProvider, maximumOutputTokens);
try {
log.info("[LLM] Complete started (providerId={})", activeProvider.ordinal());
RequestOptions requestOptions = RequestOptions.builder()
Expand Down Expand Up @@ -237,6 +255,17 @@ public Mono<String> complete(String prompt, double temperature) {
.subscribeOn(Schedulers.boundedElastic());
}

private ResponseCreateParams buildCompletionRequest(
String prompt,
double temperature,
RateLimitService.ApiProvider activeProvider,
Integer maximumOutputTokens) {
if (maximumOutputTokens == null) {
return requestFactory.buildCompletionRequest(prompt, temperature, activeProvider);
}
return requestFactory.buildCompletionRequest(prompt, temperature, activeProvider, maximumOutputTokens);
}

/**
* Returns whether a streaming failure is likely recoverable with a retry.
*
Expand Down Expand Up @@ -338,15 +367,17 @@ private OpenAIClient createClient(String apiKey, String baseUrl) {
return OpenAIOkHttpClient.builder()
.apiKey(apiKey)
.baseUrl(OpenAiSdkUrlNormalizer.normalize(baseUrl))
.putHeader("X-Tier", resolvedLlmGatewayTier())
.putHeader(LlmGatewayTier.REQUEST_TIER_HEADER, resolvedLlmGatewayTier())
// Disable SDK-level retries: Reactor timeout and onErrorResume handle failures.
// Retries cause InterruptedException when Reactor cancels a sleeping retry.
.maxRetries(0)
.build();
}

private String resolvedLlmGatewayTier() {
return llmGatewayTier == null || llmGatewayTier.isBlank() ? LLM_GATEWAY_TIER_LIVE : llmGatewayTier.trim();
return llmGatewayTier == null || llmGatewayTier.isBlank()
? LlmGatewayTier.LIVE.requestHeader()
: llmGatewayTier.trim();
}

private void closeClientSafely(OpenAIClient client, String clientName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public class OpenAiCompatibleEmbeddingClient implements EmbeddingClient, AutoClo
private static final int HTTP_TOO_MANY_REQUESTS = 429;
private static final int HTTP_INTERNAL_SERVER_ERROR = 500;

private final OpenAIClient client;
private final OpenAIClient liveEmbeddingClient;
private final OpenAIClient batchEmbeddingClient;
private final String modelName;
private final int dimensionsHint;
private final boolean closeBatchEmbeddingClient;

/**
* Creates an OpenAI-compatible embedding client backed by a remote REST API endpoint.
Expand All @@ -56,61 +58,76 @@ public class OpenAiCompatibleEmbeddingClient implements EmbeddingClient, AutoClo
public static OpenAiCompatibleEmbeddingClient create(
String baseUrl, String apiKey, String modelName, int dimensionsHint) {
validateDimensions(dimensionsHint);
OpenAIClient client = OpenAIOkHttpClient.builder()
.apiKey(requireConfiguredApiKey(apiKey))
.baseUrl(normalizeSdkBaseUrl(baseUrl))
// Embedding traffic is ingestion/backfill-dominated, so it is classed
// as the LLM gateway's "batch" tier. The current sf7-direct endpoint
// ignores the header; it becomes load-bearing if this client is ever
// pointed at the gateway queue (api.llm-gateway.iocloudhost.net).
.putHeader("X-Tier", "batch")
.build();
return new OpenAiCompatibleEmbeddingClient(client, requireConfiguredModel(modelName), dimensionsHint);
String configuredApiKey = requireConfiguredApiKey(apiKey);
String normalizedBaseUrl = normalizeSdkBaseUrl(baseUrl);
OpenAIClient liveEmbeddingClient = createTieredClient(configuredApiKey, normalizedBaseUrl, LlmGatewayTier.LIVE);
OpenAIClient batchEmbeddingClient =
createTieredClient(configuredApiKey, normalizedBaseUrl, LlmGatewayTier.BATCH);
return new OpenAiCompatibleEmbeddingClient(
liveEmbeddingClient, batchEmbeddingClient, requireConfiguredModel(modelName), dimensionsHint);
}

static OpenAiCompatibleEmbeddingClient create(OpenAIClient client, String modelName, int dimensionsHint) {
validateDimensions(dimensionsHint);
OpenAIClient embeddingClient = Objects.requireNonNull(client, "client");
return new OpenAiCompatibleEmbeddingClient(
Objects.requireNonNull(client, "client"), requireConfiguredModel(modelName), dimensionsHint);
embeddingClient, embeddingClient, requireConfiguredModel(modelName), dimensionsHint, false);
}

OpenAiCompatibleEmbeddingClient(OpenAIClient client, String modelName, int dimensionsHint) {
this.client = client;
OpenAiCompatibleEmbeddingClient(
OpenAIClient liveEmbeddingClient, OpenAIClient batchEmbeddingClient, String modelName, int dimensionsHint) {
this(liveEmbeddingClient, batchEmbeddingClient, modelName, dimensionsHint, true);
}

private OpenAiCompatibleEmbeddingClient(
OpenAIClient liveEmbeddingClient,
OpenAIClient batchEmbeddingClient,
String modelName,
int dimensionsHint,
boolean closeBatchEmbeddingClient) {
this.liveEmbeddingClient = Objects.requireNonNull(liveEmbeddingClient, "liveEmbeddingClient");
this.batchEmbeddingClient = Objects.requireNonNull(batchEmbeddingClient, "batchEmbeddingClient");
this.modelName = modelName;
this.dimensionsHint = dimensionsHint;
this.closeBatchEmbeddingClient = closeBatchEmbeddingClient;
}

@Override
public List<float[]> embed(List<String> texts) {
public List<float[]> embed(List<String> texts, LlmGatewayTier requestTier) {
Objects.requireNonNull(requestTier, "requestTier");
if (texts == null || texts.isEmpty()) {
return List.of();
}
return createEmbeddings(texts);
return createEmbeddings(texts, requestTier);
}

@Override
public void warmUp() {
createEmbeddings(List.of(EMBEDDING_WARM_UP_PROBE_TEXT));
createEmbeddings(List.of(EMBEDDING_WARM_UP_PROBE_TEXT), LlmGatewayTier.BATCH);
}

private List<float[]> createEmbeddings(List<String> texts) {
private List<float[]> createEmbeddings(List<String> texts, LlmGatewayTier requestTier) {
EmbeddingCreateParams.Builder embeddingRequestBuilder =
EmbeddingCreateParams.builder().model(modelName).inputOfArrayOfStrings(texts);
if (supportsDimensionOverride(modelName)) {
embeddingRequestBuilder.dimensions((long) dimensionsHint);
}
EmbeddingCreateParams params = embeddingRequestBuilder.build();
EmbeddingCreateParams embeddingRequest = embeddingRequestBuilder.build();
RequestOptions requestOptions =
RequestOptions.builder().timeout(embeddingTimeout()).build();
return executeWithRetry(params, requestOptions, texts.size());
return executeWithRetry(clientFor(requestTier), embeddingRequest, requestOptions, texts.size());
}

private List<float[]> executeWithRetry(
EmbeddingCreateParams params, RequestOptions requestOptions, int expectedCount) {
OpenAIClient requestClient,
EmbeddingCreateParams embeddingRequest,
RequestOptions requestOptions,
int expectedCount) {
long retryBackoffMillis = INITIAL_RETRY_BACKOFF_MILLIS;
for (int attemptNumber = 1; attemptNumber <= MAX_EMBED_ATTEMPTS; attemptNumber++) {
try {
CreateEmbeddingResponse embeddingResponse = client.embeddings().create(params, requestOptions);
CreateEmbeddingResponse embeddingResponse =
requestClient.embeddings().create(embeddingRequest, requestOptions);
return parseResponse(embeddingResponse, expectedCount);
} catch (OpenAIServiceException exception) {
retryBackoffMillis = handleServiceError(exception, attemptNumber, retryBackoffMillis);
Expand Down Expand Up @@ -372,7 +389,10 @@ private static String sanitizeMessage(String message) {
*/
@Override
public void close() {
client.close();
liveEmbeddingClient.close();
if (closeBatchEmbeddingClient) {
batchEmbeddingClient.close();
}
}

private static String requireConfiguredApiKey(String apiKey) {
Expand All @@ -393,6 +413,21 @@ private static String normalizeSdkBaseUrl(String baseUrl) {
return OpenAiSdkUrlNormalizer.normalize(baseUrl);
}

private static OpenAIClient createTieredClient(String apiKey, String baseUrl, LlmGatewayTier requestTier) {
return OpenAIOkHttpClient.builder()
.apiKey(apiKey)
.baseUrl(baseUrl)
.putHeader(LlmGatewayTier.REQUEST_TIER_HEADER, requestTier.requestHeader())
.build();
}

private OpenAIClient clientFor(LlmGatewayTier requestTier) {
return switch (requestTier) {
case LIVE -> liveEmbeddingClient;
case BATCH -> batchEmbeddingClient;
};
}

private static void validateDimensions(int dimensionsHint) {
if (dimensionsHint <= 0) {
throw new IllegalArgumentException("Embedding dimensions must be positive");
Expand Down
Loading