Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Changelog for NeoFS Node
- Store in metabase associated object ID in bytes instead of Base58 (#3971)
- Optimized local RANGE request execution (#3967)
- GET now supports payload ranges (#3991)
- Optimized EC GET request execution (#3996)

### Removed
- `policer.max_workers` configuration (#3920)
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func initObjectService(c *cfg) {
putSvc: sPut,
keys: keyStorage,
}
server := objectService.New(objSvc, mNumber, c.cfgObject.pool.search, fsChain, storage, c.metaService, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor)
server := objectService.New(objSvc, mNumber, c.cfgObject.pool.search, fsChain, storage, c.metaService, c.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor, c.log)
os.server = server

svcDesc := protoobject.ObjectService_ServiceDesc
Expand Down
41 changes: 37 additions & 4 deletions internal/object/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,23 @@ func GetParentNonPayloadFieldBounds(buf []byte) (iprotobuf.FieldBounds, iprotobu
return idf, sigf, hdrf, nil
}

splitf, err := iprotobuf.GetLENFieldBounds(buf[rootHdrf.ValueFrom:rootHdrf.To], protoobject.FieldHeaderSplit)
return getParentNonPayloadFieldBounds(buf, rootHdrf.ValueFrom, rootHdrf.To)
}

// GetParentNonPayloadFieldBoundsHeader is GetParentNonPayloadFieldBounds
// analogue accepting header.
func GetParentNonPayloadFieldBoundsHeader(buf []byte) (iprotobuf.FieldBounds, iprotobuf.FieldBounds, iprotobuf.FieldBounds, error) {
if len(buf) == 0 {
return iprotobuf.FieldBounds{}, iprotobuf.FieldBounds{}, iprotobuf.FieldBounds{}, errEmptyData
}

return getParentNonPayloadFieldBounds(buf, 0, len(buf))
}

func getParentNonPayloadFieldBounds(buf []byte, hdrFrom, hdrTo int) (iprotobuf.FieldBounds, iprotobuf.FieldBounds, iprotobuf.FieldBounds, error) {
var idf, sigf, hdrf iprotobuf.FieldBounds

splitf, err := iprotobuf.GetLENFieldBounds(buf[hdrFrom:hdrTo], protoobject.FieldHeaderSplit)
if err != nil {
return idf, sigf, hdrf, err
}
Expand All @@ -205,8 +221,9 @@ func GetParentNonPayloadFieldBounds(buf []byte) (iprotobuf.FieldBounds, iprotobu
return idf, sigf, hdrf, nil
}

buf = buf[:rootHdrf.ValueFrom+splitf.To]
off := rootHdrf.ValueFrom + splitf.ValueFrom
buf = buf[:hdrFrom+splitf.To]
off := hdrFrom + splitf.ValueFrom

var prevNum protowire.Number
loop:
for {
Expand Down Expand Up @@ -270,7 +287,7 @@ func GetPayloadLengthAndFieldOffset(buf []byte) (uint64, int, error) {

var pldLen uint64
if !hf.IsMissing() {
pldLen, err = iprotobuf.GetUint64Field(buf[hf.ValueFrom:hf.To], protoobject.FieldHeaderPayloadLength)
pldLen, err = GetPayloadLengthHeader(buf[hf.ValueFrom:hf.To])
if err != nil {
return 0, 0, fmt.Errorf("seek payload length field in header: %w", err)
}
Expand All @@ -289,3 +306,19 @@ func GetPayloadLengthAndFieldOffset(buf []byte) (uint64, int, error) {

return pldLen, off + tagLn, nil
}

// GetPayloadLengthHeader reads payload length header. If field is missing, no
// error is returned.
//
// Message should have ascending field order, otherwise error returns.
func GetPayloadLengthHeader(buf []byte) (uint64, error) {
return iprotobuf.GetUint64Field(buf, protoobject.FieldHeaderPayloadLength)
}

// GetTypeHeader reads object type header. If field is missing, no error is
// returned.
//
// Message should have ascending field order, otherwise error returns.
func GetTypeHeader(buf []byte) (object.Type, error) {
return iprotobuf.GetEnumField[object.Type](buf, protoobject.FieldHeaderObjectType)
}
6 changes: 5 additions & 1 deletion internal/protobuf/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neofs-sdk-go/checksum"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -21,7 +22,10 @@ import (

// Fixed message lengths.
const (
ObjectIDLength = 1 + 1 + oid.Size
ObjectIDLength = 1 + 1 + oid.Size
ContainerIDLength = 1 + 1 + cid.Size
ObjectAddressLength = 1 + 1 + ObjectIDLength +
1 + 1 + ContainerIDLength
)

// Message length limits.
Expand Down
26 changes: 26 additions & 0 deletions internal/protobuf/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package protobuf_test

import (
"testing"

iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func TestMessageLengths(t *testing.T) {
for _, tc := range []struct {
name string
msg interface{ MarshaledSize() int }
cnst int
}{
{name: "object ID", msg: oidtest.ID().ProtoMessage(), cnst: iprotobuf.ObjectIDLength},
{name: "container ID", msg: cidtest.ID().ProtoMessage(), cnst: iprotobuf.ContainerIDLength},
{name: "object address", msg: oidtest.Address().ProtoMessage(), cnst: iprotobuf.ObjectAddressLength},
} {
t.Run(tc.name, func(t *testing.T) {
require.EqualValues(t, tc.msg.MarshaledSize(), tc.cnst)
})
}
}
20 changes: 20 additions & 0 deletions internal/protobuf/seekers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,23 @@ func GetUint64Field(buf []byte, num protowire.Number) (uint64, error) {
u, _, err := ParseUint64Field(buf[off+tagLn:], num, typ)
return u, err
}

// GetEnumField seeks enum field in buf by number and parses it. If field is
// missing, no error is returned.
//
// Message should have ascending field order, otherwise error returns.
//
// If there is an error, its text contains num.
func GetEnumField[T ~int32](buf []byte, num protowire.Number) (T, error) {
off, tagLn, typ, err := SeekFieldByNumber(buf, num)
if err != nil {
return 0, err
}

if off < 0 {
return 0, nil
}

u, _, err := ParseEnumField[T](buf[off+tagLn:], num, typ)
return u, err
}
3 changes: 3 additions & 0 deletions internal/protobuf/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const (
TagBytes4 = 34
TagBytes5 = 42
TagBytes6 = 50
TagBytes7 = 58
TagBytes8 = 66
TagBytes9 = 74
)

// One-byte tags for VARINT fields.
Expand Down
3 changes: 3 additions & 0 deletions internal/protobuf/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func TestTags(t *testing.T) {
{tag: iprotobuf.TagBytes4, num: 4},
{tag: iprotobuf.TagBytes5, num: 5},
{tag: iprotobuf.TagBytes6, num: 6},
{tag: iprotobuf.TagBytes7, num: 7},
{tag: iprotobuf.TagBytes8, num: 8},
{tag: iprotobuf.TagBytes9, num: 9},
} {
require.EqualValues(t, protowire.EncodeTag(protowire.Number(tc.num), protowire.BytesType), tc.tag)

Expand Down
15 changes: 14 additions & 1 deletion pkg/core/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"errors"
"io"

"github.com/nspcc-dev/neofs-sdk-go/client"
Expand Down Expand Up @@ -29,12 +30,24 @@ type Client interface {
AnnounceIntermediateTrust(ctx context.Context, epoch uint64, trust reputationSDK.PeerToPeerTrust, prm client.PrmAnnounceIntermediateTrust) error
}

// ErrSkipConnection is returned to skip connection.
var ErrSkipConnection = errors.New("connection skipped")

// ErrAllConnectionsSkipped allows to check whether
// [MultiAddressClient.ForAnyGRPCConn] error is returns because all connections
// are unavailable or skipped.
var ErrAllConnectionsSkipped = errors.New("all connections skipped")

// MultiAddressClient is an interface of the
// Client that supports multihost work.
type MultiAddressClient interface {
Client

// ForAnyGRPCConn executes op over gRPC connections to given multi-address
// endpoint-by-endpoint until success.
ForAnyGRPCConn(context.Context, func(context.Context, *grpc.ClientConn) error) error
//
// If next endpoint is unavailable or f returns [ErrSkipConnection] for it,
// ForAnyGRPCConn continues. If this happens on all endpoints, ForAnyGRPCConn
// returns [ErrAllConnectionsSkipped].
ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error
}
49 changes: 41 additions & 8 deletions pkg/network/cache/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"iter"
Expand Down Expand Up @@ -279,20 +280,26 @@ func (x *connections) all(f func(ma string, c *client.Client) bool) {
}

func (x *connections) forAny(ctx context.Context, f func(context.Context, *client.Client) error) error {
var firstErr error
var firstUnavailableErr error
for ma, c := range x.all {
err := f(ctx, c)
if err == nil {
return nil
}
if !isTempError(err) {
if errors.Is(err, clientcore.ErrSkipConnection) {
continue
}
if !isUnavailableError(err) {
return newEndpointError(ma, err)
}
if firstErr == nil {
firstErr = newEndpointError(ma, err)
if firstUnavailableErr == nil {
firstUnavailableErr = newEndpointError(ma, err)
}
}
return newMultiEndpointError(x.nodeID, firstErr)
if firstUnavailableErr == nil {
return clientcore.ErrAllConnectionsSkipped
}
return newMultiEndpointError(x.nodeID, firstUnavailableErr)
}

func (x *connections) ForAnyGRPCConn(ctx context.Context, f func(context.Context, *grpc.ClientConn) error) error {
Expand All @@ -318,7 +325,7 @@ func (x *connections) ReplicateObject(ctx context.Context, id oid.ID, src io.Rea
if err == nil {
return sig, nil
}
if !isTempError(err) {
if !isUnavailableError(err) {
return nil, newEndpointError(ma, err)
}
if _, errSeek := src.Seek(0, io.SeekStart); errSeek != nil {
Expand Down Expand Up @@ -401,7 +408,7 @@ func (x *connections) SearchObjects(ctx context.Context, cnr cid.ID, fs object.S
})
}

func isTempError(err error) bool {
func isUnavailableError(err error) bool {
st, ok := status.FromError(err)
return ok && st.Code() == codes.Unavailable
}
Expand All @@ -410,6 +417,32 @@ func newEndpointError(addr string, err error) error {
return fmt.Errorf("%s: %w", addr, err)
}

type multiEndpointError struct {
nodeID string
first error
}

// Error implements built-in [error].
func (x multiEndpointError) Error() string {
return fmt.Sprintf("all %s endpoints failed, first error: %s", x.nodeID, x.first)
}

// Unwrap implements interface for [errors] package.
func (x multiEndpointError) Unwrap() error {
return x.first
}

// Is implements interface for [errors.Is].
func (x multiEndpointError) Is(target error) bool {
if errors.Is(target, clientcore.ErrAllConnectionsSkipped) {
return true
}
return errors.Is(x.first, target)
}

func newMultiEndpointError(nodeID string, first error) error {
return fmt.Errorf("all %s endpoints failed, first error: %w", nodeID, first)
return multiEndpointError{
nodeID: nodeID,
first: first,
}
}
18 changes: 18 additions & 0 deletions pkg/network/cache/clients_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cache

import (
"errors"
"testing"

coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/stretchr/testify/require"
)

func TestMultiEndpointError(t *testing.T) {
firstErr := errors.New("test err")

err := newMultiEndpointError("NODE_X", firstErr)
require.EqualError(t, err, "all NODE_X endpoints failed, first error: test err")
require.ErrorIs(t, err, firstErr)
require.ErrorIs(t, err, coreclient.ErrAllConnectionsSkipped)
}
Loading
Loading