Skip to content
8 changes: 8 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ jobs:
run: dotnet test --no-restore --verbosity normal
- name: Run BootAndSync mission
run: dotnet run --project src/App/App.fsproj --configuration Release -- mission BootAndSync --image stellar/stellar-core:stable --kubeconfig $KUBECONFIG --namespace default --ingress-class nginx --ingress-internal-domain local --ingress-external-host localhost --uneven-sched
# Exercise the pod-exec HTTP transport (--core-http-via-pod-exec) with no
# resolvable ingress host: the ingress host falls back to <nonce>.local,
# which does not resolve from the runner. If any core HTTP access leaked
# back to the ingress, this run would fail with a name-resolution error, so
# a green run proves all access goes through pod exec. This mirrors an
# out-of-cluster runner (e.g. Namespace/nsc k3s). See issue #399.
- name: Run BootAndSync mission via pod-exec
run: dotnet run --project src/App/App.fsproj --configuration Release -- mission BootAndSync --image stellar/stellar-core:stable --kubeconfig $KUBECONFIG --namespace default --ingress-class nginx --ingress-internal-domain local --core-http-via-pod-exec --uneven-sched
- uses: actions/upload-artifact@v4
with:
name: destination
Expand Down
8 changes: 8 additions & 0 deletions src/App/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type MissionOptions
ingressInternalDomain: string,
ingressExternalHost: string option,
ingressExternalPort: int,
coreHttpViaPodExec: bool,
exportToPrometheus: bool,
probeTimeout: int,
missions: string seq,
Expand Down Expand Up @@ -178,6 +179,12 @@ type MissionOptions
Default = 80)>]
member self.IngressExternalPort = ingressExternalPort

[<Option("core-http-via-pod-exec",
HelpText = "Reach stellar-core's admin HTTP endpoint by exec'ing curl inside each pod (via the Kubernetes API) instead of through the ingress. Use when the ingress hostname is not reachable from where SSC runs, e.g. a runner outside a non-SDF k3s cluster.",
Required = false,
Default = false)>]
member self.CoreHttpViaPodExec = coreHttpViaPodExec

[<Option("export-to-prometheus", HelpText = "Whether to export core metrics to prometheus")>]
member self.ExportToPrometheus : bool = exportToPrometheus

Expand Down Expand Up @@ -779,6 +786,7 @@ let main argv =
ingressInternalDomain = mission.IngressInternalDomain
ingressExternalHost = mission.IngressExternalHost
ingressExternalPort = mission.IngressExternalPort
coreHttpViaPodExec = mission.CoreHttpViaPodExec
exportToPrometheus = mission.ExportToPrometheus
probeTimeout = mission.ProbeTimeout
coreResources = SmallTestResources
Expand Down
1 change: 1 addition & 0 deletions src/FSLibrary.Tests/Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ let ctx : MissionContext =
ingressInternalDomain = "local"
ingressExternalHost = None
ingressExternalPort = 80
coreHttpViaPodExec = false
exportToPrometheus = false
probeTimeout = 10
coreResources = SmallTestResources
Expand Down
196 changes: 111 additions & 85 deletions src/FSLibrary/StellarCoreHTTP.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@
module StellarCoreHTTP

open FSharp.Data
open k8s
open k8s.Models
open Microsoft.Rest.Serialization
open StellarCoreSet
open PollRetry
open Logging
open StellarMissionContext
open StellarNetworkCfg
open StellarCorePeer
open System.IO
open System.Threading
open StellarDotnetSdk.Transactions
open StellarDotnetSdk.Responses.Results
Expand Down Expand Up @@ -388,9 +392,92 @@ type Peer with
self.PodName.StringName
path

member self.fetch(path: string) : string =
let url = self.URL path
Http.RequestString(url, headers = self.Headers)
// Reach stellar-core's admin HTTP endpoint by exec'ing `curl` inside the
// pod (via the Kubernetes API) rather than over the ingress. This is used
// when --core-http-via-pod-exec is set, for environments where the ingress
// hostname is not reachable from where SSC runs (e.g. a runner outside a
// non-SDF k3s cluster). Failures are surfaced as WebException so the
// existing WebExceptionRetry wrappers retry transient errors (e.g. core
// still booting) just as they do for ingress fetches.
// See https://github.com/stellar/supercluster/issues/399.
member self.fetchViaPodExec (path: string) (query: (string * string) list) : string =
let kube = self.networkCfg.missionContext.kube
let ns = self.networkCfg.NamespaceProperty
let name = self.PodName

let encode (s: string) : string = System.Uri.EscapeDataString s

let queryString =
match query with
| [] -> ""
| _ ->
query
|> List.map (fun (k, v) -> sprintf "%s=%s" (encode k) (encode v))
|> String.concat "&"
|> sprintf "?%s"

let url =
sprintf "http://localhost:%d/%s%s" StellarCoreCfg.CfgVal.httpPort path queryString

try
let muxedStream =
kube
.MuxedStreamNamespacedPodExecAsync(name = name.StringName,
``namespace`` = ns,
command = [| "curl"; "-sf"; url |],
container = "stellar-core-run",
tty = false,
cancellationToken = CancellationToken())
.GetAwaiter()
.GetResult()

let stdOut =
muxedStream.GetStream(
System.Nullable<ChannelIndex>(ChannelIndex.StdOut),
System.Nullable<ChannelIndex>()
)

let error =
muxedStream.GetStream(
System.Nullable<ChannelIndex>(ChannelIndex.Error),
System.Nullable<ChannelIndex>()
)

let outReader = new StreamReader(stdOut)
let errReader = new StreamReader(error)
muxedStream.Start()
let outStr = outReader.ReadToEndAsync().GetAwaiter().GetResult()
let errStr = errReader.ReadToEndAsync().GetAwaiter().GetResult()
let returnMessage = SafeJsonConvert.DeserializeObject<V1Status>(errStr)
Kubernetes.GetExitCodeOrThrow(returnMessage) |> ignore
outStr
with
| :? System.Net.WebException -> reraise ()
| e -> raise (System.Net.WebException(sprintf "pod-exec fetch of /%s failed: %s" path e.Message))

// Single point through which all stellar-core admin HTTP GETs flow, so the
// transport (ingress vs. in-pod curl) is chosen in one place.
member self.httpGet (path: string) (query: (string * string) list) : string =
if self.networkCfg.missionContext.coreHttpViaPodExec then
self.fetchViaPodExec path query
else
Http.RequestString(url = self.URL path, httpMethod = "GET", headers = self.Headers, query = query)

member self.fetch(path: string) : string = self.httpGet path []

// For endpoints that must return a (JSON) body. While core is still booting,
// its admin HTTP server can answer with an empty body (and curl exits 0);
// surface that as a WebException so the JSON-parsing callers' retry loops
// wait for core rather than feeding an empty string to a parser. Endpoints
// that legitimately return an empty body on success (e.g. upgrades) keep
// using fetch/httpGet directly.
member self.fetchNonEmpty(path: string) : string =
let resp = self.fetch path

if System.String.IsNullOrEmpty resp then
raise (System.Net.WebException(sprintf "core returned an empty response for /%s, not ready yet" path))

resp

member self.GetState() = self.GetInfo().State

Expand All @@ -399,15 +486,15 @@ type Peer with
if i.Status.Length = 0 then i.State else i.Status.[0]

member self.GetMetrics() : Metrics.Metrics =
WebExceptionRetry DefaultRetry (fun _ -> Metrics.Parse(self.fetch "metrics").Metrics)
WebExceptionRetry DefaultRetry (fun _ -> Metrics.Parse(self.fetchNonEmpty "metrics").Metrics)

member self.GetRawMetrics() = WebExceptionRetry DefaultRetry (fun _ -> self.fetch "metrics")
member self.GetRawMetrics() = WebExceptionRetry DefaultRetry (fun _ -> self.fetchNonEmpty "metrics")

member self.GetInfo() : Info.Info =
WebExceptionRetry
DefaultRetry
(fun _ ->
let resp = self.fetch "info"
let resp = self.fetchNonEmpty "info"
let parsed = Info.Parse(resp)

// stellar-core can respond with {"error":"Core is booting, try again later"}
Expand All @@ -416,7 +503,7 @@ type Peer with
| None -> raise (System.Net.WebException("Core is not ready, info property missing")))

member self.GetSorobanInfo() : SorobanInfo.Root =
WebExceptionRetry DefaultRetry (fun _ -> SorobanInfo.Parse(self.fetch "sorobaninfo"))
WebExceptionRetry DefaultRetry (fun _ -> SorobanInfo.Parse(self.fetchNonEmpty "sorobaninfo"))

member self.GetLedgerNum() : int = self.GetInfo().Ledger.Num

Expand Down Expand Up @@ -486,15 +573,7 @@ type Peer with

member self.SetUpgrades(upgrades: UpgradeParameters) =
let res =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(
httpMethod = "GET",
url = self.URL "upgrades",
headers = self.Headers,
query = upgrades.ToQuery
))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "upgrades" upgrades.ToQuery)

if res.ToLower().Contains("exception") then
raise (PeerRejectedUpgradesException res)
Expand Down Expand Up @@ -684,68 +763,37 @@ type Peer with
raise (ProtocolVersionNotUpgradedException(currentProtocolVersion, lastestProtocolVersion))

member self.ClearMetrics() =
WebExceptionRetry
DefaultRetry
(fun _ -> Http.RequestString(httpMethod = "GET", headers = self.Headers, url = self.URL "clearmetrics"))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "clearmetrics" [])
|> ignore

member self.ToggleOverlayOnlyMode() =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(httpMethod = "GET", headers = self.Headers, url = self.URL "toggleoverlayonlymode"))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "toggleoverlayonlymode" [])

member self.GetTestAcc(accName: string) : TestAcc.Root =
// NB: work around buggy JSON parser upstream, see
// https://github.com/fsharp/FSharp.Data/pull/1262
let s =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(
httpMethod = "GET",
url = self.URL("testacc"),
headers = self.Headers,
query = [ ("name", accName) ]
))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "testacc" [ ("name", accName) ])

TestAcc.Parse(if s.Trim().StartsWith("null") then "{}" else s)

member self.StartSurveyCollecting(nonce: int) =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(
httpMethod = "GET",
url = self.URL("startsurveycollecting"),
headers = self.Headers,
query = [ ("nonce", nonce.ToString()) ]
))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "startsurveycollecting" [ ("nonce", nonce.ToString()) ])

member self.StopSurveyCollecting() =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(httpMethod = "GET", url = self.URL("stopsurveycollecting"), headers = self.Headers))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "stopsurveycollecting" [])

member self.SurveyTopologyTimeSliced (node: string) (inboundPeersIndex: int) (outboundPeersIndex: int) =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(
httpMethod = "GET",
url = self.URL("surveytopologytimesliced"),
headers = self.Headers,
query =
[ ("node", node)
("inboundpeerindex", inboundPeersIndex.ToString())
("outboundpeerindex", outboundPeersIndex.ToString()) ]
))

member self.GetSurveyResult() =
WebExceptionRetry
DefaultRetry
(fun _ -> Http.RequestString(httpMethod = "GET", url = self.URL("getsurveyresult"), headers = self.Headers))
self.httpGet
"surveytopologytimesliced"
[ ("node", node)
("inboundpeerindex", inboundPeersIndex.ToString())
("outboundpeerindex", outboundPeersIndex.ToString()) ])

member self.GetSurveyResult() = WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "getsurveyresult" [])

member self.GetTestAccBalance(accName: string) : int64 =
RetryUntilSome
Expand All @@ -758,22 +806,12 @@ type Peer with
(fun _ -> LogWarn "Waiting for account %s to exist, to read seqnum" accName)

member self.GenerateLoad(loadGen: LoadGen) : string =
WebExceptionRetry
DefaultRetry
(fun _ ->
Http.RequestString(
httpMethod = "GET",
headers = self.Headers,
url = self.URL "generateload",
query = loadGen.ToQuery
))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "generateload" loadGen.ToQuery)

member self.StopLoadGen() : string = self.GenerateLoad { LoadGen.GetDefault() with mode = StopRun }

member self.ManualClose() =
WebExceptionRetry
DefaultRetry
(fun _ -> Http.RequestString(httpMethod = "GET", headers = self.Headers, url = self.URL "manualclose"))
WebExceptionRetry DefaultRetry (fun _ -> self.httpGet "manualclose" [])
|> ignore

member self.SubmitSignedTransaction(tx: Transaction) : Tx.Root =
Expand All @@ -783,20 +821,8 @@ type Peer with
WebExceptionRetry
DefaultRetry
(fun _ ->
LogDebug
"Submitting transaction: %s"
((self.URL "tx") + "?blob=" + (System.Uri.EscapeDataString b64))

let response =
Http.RequestStream(
(self.URL "tx"),
httpMethod = "GET",
query = [ "blob", b64 ],
headers = [ "Host", self.networkCfg.IngressInternalHostName ]
)

use reader = new System.IO.StreamReader(response.ResponseStream)
reader.ReadToEnd())
LogDebug "Submitting transaction with blob: %s" b64
self.httpGet "tx" [ ("blob", b64) ])

LogDebug "Transaction response: %s" s
let res = Tx.Parse(s)
Expand Down
10 changes: 7 additions & 3 deletions src/FSLibrary/StellarDataDump.fs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,14 @@ type StellarFormation with
Kubernetes.GetExitCodeOrThrow(returnMessage) |> ignore
with x -> ()

// Best-effort: a failed metrics scrape must not fail an otherwise-successful
// mission (the impact reported in #399). The transport used to reach core's
// admin HTTP endpoint -- ingress or in-pod `curl` -- is selected by
// GetRawMetrics based on the --core-http-via-pod-exec flag.
member self.DumpPeerMetrics(p: Peer) =
let destination = self.NetworkCfg.missionContext.destination
let name = p.PodName
destination.WriteString(sprintf "%s.metrics.json" name.StringName) (p.GetRawMetrics())
try
self.Destination.WriteString(sprintf "%s.metrics.json" p.PodName.StringName) (p.GetRawMetrics())
with x -> LogWarn "Failed to dump metrics of peer %s: %s" p.PodName.StringName x.Message

member self.DumpPeerData(p: Peer) =
self.DumpPeerLogs p
Expand Down
1 change: 1 addition & 0 deletions src/FSLibrary/StellarMissionContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type MissionContext =
ingressInternalDomain: string
ingressExternalHost: string option
ingressExternalPort: int
coreHttpViaPodExec: bool
exportToPrometheus: bool
probeTimeout: int
coreResources: CoreResources
Expand Down
Loading