diff --git a/conn.go b/conn.go index d7d9aafb..cc4d2bb2 100644 --- a/conn.go +++ b/conn.go @@ -213,6 +213,10 @@ type SpannerConn interface { // was executed on the connection, or an error if the connection has not executed a read/write transaction // that committed successfully. CommitResponse() (commitResponse *spanner.CommitResponse, err error) + // ReadOnlyTransactionTimestamp returns the read timestamp chosen by Cloud Spanner + // for the current read-only transaction, or an error if there is no active + // read-only transaction. + ReadOnlyTransactionTimestamp() (readTimestamp time.Time, err error) // UnderlyingClient returns the underlying Spanner client for the database. // The client cannot be used to access the current transaction or batch on @@ -314,6 +318,13 @@ func (c *conn) CommitResponse() (commitResponse *spanner.CommitResponse, err err return resp, nil } +func (c *conn) ReadOnlyTransactionTimestamp() (time.Time, error) { + if !c.inTransaction() { + return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "no active transaction")) + } + return c.tx.ReadOnlyTransactionTimestamp() +} + func (c *conn) clearCommitResponse() { _ = propertyCommitResponse.SetValue(c.state, nil, connectionstate.ContextUser) _ = propertyCommitTimestamp.SetValue(c.state, nil, connectionstate.ContextUser) diff --git a/testutil/inmem_spanner_server.go b/testutil/inmem_spanner_server.go index 02477b24..a70c6f32 100644 --- a/testutil/inmem_spanner_server.go +++ b/testutil/inmem_spanner_server.go @@ -670,8 +670,10 @@ func (s *inMemSpannerServer) generateTransactionName(session string) string { func (s *inMemSpannerServer) beginTransaction(session *spannerpb.Session, options *spannerpb.TransactionOptions) *spannerpb.Transaction { id := s.generateTransactionName(session.Name) res := &spannerpb.Transaction{ - Id: []byte(id), - ReadTimestamp: getCurrentTimestamp(), + Id: []byte(id), + } + if options != nil && options.GetReadOnly() != nil { + res.ReadTimestamp = getCurrentTimestamp() } s.mu.Lock() s.transactions[id] = res @@ -988,6 +990,11 @@ func (s *inMemSpannerServer) executeStreamingSQL(req *spannerpb.ExecuteSqlReques } s.mu.Lock() resWithTx := statementResult.getResultSetWithTransactionSet(req.GetTransaction(), id) + if resWithTx.ResultSet != nil && resWithTx.ResultSet.Metadata != nil && resWithTx.ResultSet.Metadata.Transaction != nil { + if tx, ok := s.transactions[string(id)]; ok { + resWithTx.ResultSet.Metadata.Transaction.ReadTimestamp = tx.ReadTimestamp + } + } isPartitionedDml := s.partitionedDmlTransactions[string(id)] s.mu.Unlock() switch statementResult.Type { diff --git a/transaction.go b/transaction.go index c01698f3..001d90b0 100644 --- a/transaction.go +++ b/transaction.go @@ -58,6 +58,7 @@ type contextTransaction interface { IsInBatch() bool BufferWrite(ms []*spanner.Mutation) error + ReadOnlyTransactionTimestamp() (time.Time, error) } type rowIterator interface { @@ -168,6 +169,13 @@ func (d *delegatingTransaction) Rollback() error { return d.contextTransaction.Rollback() } +func (d *delegatingTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) { + if err := d.ensureActivated(); err != nil { + return time.Time{}, err + } + return d.contextTransaction.ReadOnlyTransactionTimestamp() +} + func (d *delegatingTransaction) resetForRetry(ctx context.Context) error { if d.contextTransaction == nil { return status.Error(codes.FailedPrecondition, "a transaction can only be reset after it has been activated") @@ -371,6 +379,16 @@ func (tx *readOnlyTransaction) BufferWrite([]*spanner.Mutation) error { return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write")) } +func (tx *readOnlyTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) { + if tx.roTx != nil { + return tx.roTx.Timestamp() + } + if tx.boTx != nil { + return tx.boTx.Timestamp() + } + return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "underlying read-only transaction is not initialized")) +} + // ErrAbortedDueToConcurrentModification is returned by a read/write transaction // that was aborted by Cloud Spanner, and where the internal retry attempt // failed because it detected that the results during the retry were different @@ -810,6 +828,10 @@ func (tx *readWriteTransaction) BufferWrite(ms []*spanner.Mutation) error { return tx.rwTx.BufferWrite(ms) } +func (tx *readWriteTransaction) ReadOnlyTransactionTimestamp() (time.Time, error) { + return time.Time{}, spanner.ToSpannerError(status.Error(codes.FailedPrecondition, "cannot retrieve read timestamp on a read-write transaction")) +} + // errorsEqualForRetry returns true if the two errors should be considered equal // when retrying a transaction. This comparison will return true if: // - The errors are the same instances diff --git a/transaction_test.go b/transaction_test.go index 1592f6ba..1295d070 100644 --- a/transaction_test.go +++ b/transaction_test.go @@ -461,3 +461,71 @@ func BenchmarkReadWriteTransaction(b *testing.B) { }) } } + +func TestReadOnlyTransactionTimestamp(t *testing.T) { + t.Parallel() + + db, _, teardown := setupTestDBConnection(t) + defer teardown() + ctx := context.Background() + + conn, err := db.Conn(ctx) + if err != nil { + t.Fatal(err) + } + defer silentClose(conn) + + err = conn.Raw(func(driverConn any) error { + spannerConn := driverConn.(SpannerConn) + _, err := spannerConn.ReadOnlyTransactionTimestamp() + return err + }) + if err == nil { + t.Error("expected error retrieving read timestamp when no transaction is active") + } else if g, w := status.Code(err), codes.FailedPrecondition; g != w { + t.Errorf("error code mismatch\n Got: %v\nWant: %v", g, w) + } + + if _, err := conn.ExecContext(ctx, "begin transaction read only"); err != nil { + t.Fatal(err) + } + + rows, err := conn.QueryContext(ctx, testutil.SelectFooFromBar) + if err != nil { + t.Fatal(err) + } + if rows.Next() { + var foo int64 + _ = rows.Scan(&foo) + } + _ = rows.Close() + + var readTimestamp time.Time + err = conn.Raw(func(driverConn any) error { + spannerConn := driverConn.(SpannerConn) + var err error + readTimestamp, err = spannerConn.ReadOnlyTransactionTimestamp() + return err + }) + if err != nil { + t.Fatalf("unexpected error retrieving read timestamp: %v", err) + } + if readTimestamp.IsZero() { + t.Error("expected non-zero read timestamp") + } + + if _, err := conn.ExecContext(ctx, "rollback"); err != nil { + t.Fatal(err) + } + + err = conn.Raw(func(driverConn any) error { + spannerConn := driverConn.(SpannerConn) + _, err := spannerConn.ReadOnlyTransactionTimestamp() + return err + }) + if err == nil { + t.Error("expected error retrieving read timestamp after transaction is closed") + } else if g, w := status.Code(err), codes.FailedPrecondition; g != w { + t.Errorf("error code mismatch\n Got: %v\nWant: %v", g, w) + } +}