Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ type SpannerConn interface {
// was executed on the connection, or an error if the connection has not executed a read/write transaction
// that committed successfully.
CommitResponse() (commitResponse *spanner.CommitResponse, err error)
// ReadOnlyTransactionTimestamp returns the read timestamp chosen by Cloud Spanner
// for the current read-only transaction, or an error if there is no active
// read-only transaction.
ReadOnlyTransactionTimestamp() (readTimestamp time.Time, err error)

// UnderlyingClient returns the underlying Spanner client for the database.
// The client cannot be used to access the current transaction or batch on
Expand Down Expand Up @@ -314,6 +318,13 @@ func (c *conn) CommitResponse() (commitResponse *spanner.CommitResponse, err err
return resp, nil
}

func (c *conn) ReadOnlyTransactionTimestamp() (time.Time, error) {
if !c.inTransaction() {
return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "no active transaction"))
}
return c.tx.ReadOnlyTransactionTimestamp()
}

func (c *conn) clearCommitResponse() {
_ = propertyCommitResponse.SetValue(c.state, nil, connectionstate.ContextUser)
_ = propertyCommitTimestamp.SetValue(c.state, nil, connectionstate.ContextUser)
Expand Down
11 changes: 9 additions & 2 deletions testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,10 @@ func (s *inMemSpannerServer) generateTransactionName(session string) string {
func (s *inMemSpannerServer) beginTransaction(session *spannerpb.Session, options *spannerpb.TransactionOptions) *spannerpb.Transaction {
id := s.generateTransactionName(session.Name)
res := &spannerpb.Transaction{
Id: []byte(id),
ReadTimestamp: getCurrentTimestamp(),
Id: []byte(id),
}
if options != nil && options.GetReadOnly() != nil {
res.ReadTimestamp = getCurrentTimestamp()
}
s.mu.Lock()
s.transactions[id] = res
Expand Down Expand Up @@ -988,6 +990,11 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques
}
s.mu.Lock()
resWithTx := statementResult.getResultSetWithTransactionSet(req.GetTransaction(), id)
if resWithTx.ResultSet != nil && resWithTx.ResultSet.Metadata != nil && resWithTx.ResultSet.Metadata.Transaction != nil {
if tx, ok := s.transactions[string(id)]; ok {
resWithTx.ResultSet.Metadata.Transaction.ReadTimestamp = tx.ReadTimestamp
}
}
isPartitionedDml := s.partitionedDmlTransactions[string(id)]
s.mu.Unlock()
switch statementResult.Type {
Expand Down
22 changes: 22 additions & 0 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type contextTransaction interface {
IsInBatch() bool

BufferWrite(ms []*spanner.Mutation) error
ReadOnlyTransactionTimestamp() (time.Time, error)
}

type rowIterator interface {
Expand Down Expand Up @@ -168,6 +169,13 @@ func (d *delegatingTransaction) Rollback() error {
return d.contextTransaction.Rollback()
}

func (d *delegatingTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) {
if err := d.ensureActivated(); err != nil {
return time.Time{}, err
}
return d.contextTransaction.ReadOnlyTransactionTimestamp()
}
Comment on lines +172 to +177

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling d.ensureActivated() here is inefficient because it forces the activation/creation of the underlying Spanner transaction even if no query has been executed yet. Since a transaction cannot have a read timestamp before any query is run, we can avoid this overhead by checking if d.contextTransaction is nil and returning the appropriate error directly based on the transaction's read-only/read-write configuration.

func (d *delegatingTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) {
	if d.contextTransaction == nil {
		if !propertyTransactionReadOnly.GetValueOrDefault(d.conn.state) {
			return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot retrieve read timestamp on a read-write transaction"))
		}
		return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "underlying read-only transaction is not initialized"))
	}
	return d.contextTransaction.ReadOnlyTransactionTimestamp()
}


func (d *delegatingTransaction) resetForRetry(ctx context.Context) error {
if d.contextTransaction == nil {
return status.Error(codes.FailedPrecondition, "a transaction can only be reset after it has been activated")
Expand Down Expand Up @@ -371,6 +379,16 @@ func (tx *readOnlyTransaction) BufferWrite([]*spanner.Mutation) error {
return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write"))
}

func (tx *readOnlyTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) {
if tx.roTx != nil {
return tx.roTx.Timestamp()
}
if tx.boTx != nil {
return tx.boTx.Timestamp()
}
return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "underlying read-only transaction is not initialized"))
}
Comment on lines +382 to +390

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The check if tx.boTx != nil is redundant and represents dead code. When tx.boTx (the batch read-only transaction) is non-nil, tx.roTx is also always initialized to &bo.ReadOnlyTransaction during activation. Therefore, tx.roTx will never be nil in that case, and calling tx.roTx.Timestamp() is sufficient and equivalent to calling tx.boTx.Timestamp().

func (tx *readOnlyTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) {
	if tx.roTx != nil {
		return tx.roTx.Timestamp()
	}
	return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "underlying read-only transaction is not initialized"))
}


// ErrAbortedDueToConcurrentModification is returned by a read/write transaction
// that was aborted by Cloud Spanner, and where the internal retry attempt
// failed because it detected that the results during the retry were different
Expand Down Expand Up @@ -810,6 +828,10 @@ func (tx *readWriteTransaction) BufferWrite(ms []*spanner.Mutation) error {
return tx.rwTx.BufferWrite(ms)
}

func (tx *readWriteTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) {
return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot retrieve read timestamp on a read-write transaction"))
}

// errorsEqualForRetry returns true if the two errors should be considered equal
// when retrying a transaction. This comparison will return true if:
// - The errors are the same instances
Expand Down
68 changes: 68 additions & 0 deletions transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,71 @@ func BenchmarkReadWriteTransaction(b *testing.B) {
})
}
}

func TestReadOnlyTransactionTimestamp(t *testing.T) {
t.Parallel()

db, _, teardown := setupTestDBConnection(t)
defer teardown()
ctx := context.Background()

conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
defer silentClose(conn)

err = conn.Raw(func(driverConn any) error {
spannerConn := driverConn.(SpannerConn)
_, err := spannerConn.ReadOnlyTransactionTimestamp()
return err
})
if err == nil {
t.Error("expected error retrieving read timestamp when no transaction is active")
} else if g, w := status.Code(err), codes.FailedPrecondition; g != w {
t.Errorf("error code mismatch\n Got: %v\nWant: %v", g, w)
}

if _, err := conn.ExecContext(ctx, "begin transaction read only"); err != nil {
t.Fatal(err)
}

rows, err := conn.QueryContext(ctx, testutil.SelectFooFromBar)
if err != nil {
t.Fatal(err)
}
if rows.Next() {
var foo int64
_ = rows.Scan(&foo)
}
_ = rows.Close()

var readTimestamp time.Time
err = conn.Raw(func(driverConn any) error {
spannerConn := driverConn.(SpannerConn)
var err error
readTimestamp, err = spannerConn.ReadOnlyTransactionTimestamp()
return err
})
if err != nil {
t.Fatalf("unexpected error retrieving read timestamp: %v", err)
}
if readTimestamp.IsZero() {
t.Error("expected non-zero read timestamp")
}

if _, err := conn.ExecContext(ctx, "rollback"); err != nil {
t.Fatal(err)
}

err = conn.Raw(func(driverConn any) error {
spannerConn := driverConn.(SpannerConn)
_, err := spannerConn.ReadOnlyTransactionTimestamp()
return err
})
if err == nil {
t.Error("expected error retrieving read timestamp after transaction is closed")
} else if g, w := status.Code(err), codes.FailedPrecondition; g != w {
t.Errorf("error code mismatch\n Got: %v\nWant: %v", g, w)
}
}
Loading