Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions io/gocloud/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"os"
"slices"
"strconv"
"time"

"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/utils"
Expand All @@ -39,19 +40,8 @@ import (
"gocloud.dev/blob/s3blob"
)

var unsupportedS3Props = []string{
io.S3ConnectTimeout,
}

// ParseAWSConfig parses S3 properties and returns a configuration.
func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config, error) {
// If any unsupported properties are set, return an error.
for k := range props {
if slices.Contains(unsupportedS3Props, k) {
return nil, fmt.Errorf("unsupported S3 property %q", k)
}
}

// Remote S3 request signing is not implemented yet.
if v, ok := props[io.S3RemoteSigningEnabled]; ok {
if enabled, err := strconv.ParseBool(v); err == nil && enabled {
Expand Down Expand Up @@ -79,17 +69,37 @@ func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config,
props[io.S3AccessKeyID], props[io.S3SecretAccessKey], props[io.S3SessionToken])))
}

var httpClient *awshttp.BuildableClient

if proxy, ok := props[io.S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
httpClient = awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
)
}

if connectTimeout, ok := props[io.S3ConnectTimeout]; ok {
timeout, err := time.ParseDuration(connectTimeout)
if err != nil {
return nil, fmt.Errorf("invalid s3 connect timeout %q: %w", connectTimeout, err)
}

if httpClient == nil {
httpClient = awshttp.NewBuildableClient()
}
httpClient = httpClient.WithDialerOptions(func(d *net.Dialer) {
d.Timeout = timeout
})
}

if httpClient != nil {
opts = append(opts, config.WithHTTPClient(httpClient))
}

awscfg := new(aws.Config)
Expand Down
19 changes: 17 additions & 2 deletions io/gocloud/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ package gocloud
import (
"context"
"testing"
"time"

"github.com/apache/iceberg-go/io"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -80,11 +82,24 @@ func TestParseAWSConfigRemoteSigningEnabled(t *testing.T) {
})
}

func TestParseAWSConfigUnsupportedProperty(t *testing.T) {
func TestParseAWSConfigConnectTimeout(t *testing.T) {
t.Parallel()

cfg, err := ParseAWSConfig(context.Background(), map[string]string{
io.S3ConnectTimeout: "5s",
})
require.NoError(t, err)

client, ok := cfg.HTTPClient.(*awshttp.BuildableClient)
require.True(t, ok)
assert.Equal(t, 5*time.Second, client.GetDialer().Timeout)
}

func TestParseAWSConfigInvalidConnectTimeout(t *testing.T) {
t.Parallel()

_, err := ParseAWSConfig(context.Background(), map[string]string{
io.S3ConnectTimeout: "5000",
})
require.ErrorContains(t, err, "unsupported S3 property")
require.ErrorContains(t, err, "invalid s3 connect timeout")
}
Loading