Skip to content

Commit

Permalink
CASSGO-22: Refactor Query and Batch to be immutable
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasz-antoniak committed Dec 12, 2024
1 parent 37030fb commit 2d6d9a1
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 210 deletions.
111 changes: 86 additions & 25 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func TestCAS(t *testing.T) {
insertBatch := session.Batch(LoggedBatch)
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 2c3af400-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 3e4ad2f1-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
if err := session.ExecuteBatch(insertBatch); err != nil {
if _, err := session.ExecuteBatch(insertBatch); err != nil {
t.Fatal("insert:", err)
}

Expand Down Expand Up @@ -616,7 +616,7 @@ func TestBatch(t *testing.T) {
batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
}

if err := session.ExecuteBatch(batch); err != nil {
if _, err := session.ExecuteBatch(batch); err != nil {
t.Fatal("execute batch:", err)
}

Expand Down Expand Up @@ -652,7 +652,7 @@ func TestUnpreparedBatch(t *testing.T) {
batch.Query(`UPDATE batch_unprepared SET c = c + 1 WHERE id = 1`)
}

if err := session.ExecuteBatch(batch); err != nil {
if _, err := session.ExecuteBatch(batch); err != nil {
t.Fatal("execute batch:", err)
}

Expand Down Expand Up @@ -688,7 +688,7 @@ func TestBatchLimit(t *testing.T) {
for i := 0; i < 65537; i++ {
batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
}
if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
if _, err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
}

Expand Down Expand Up @@ -740,7 +740,7 @@ func TestTooManyQueryArgs(t *testing.T) {

batch := session.Batch(UnloggedBatch)
batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
err = session.ExecuteBatch(batch)
_, err = session.ExecuteBatch(batch)

if err == nil {
t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an error")
Expand Down Expand Up @@ -772,7 +772,7 @@ func TestNotEnoughQueryArgs(t *testing.T) {

batch := session.Batch(UnloggedBatch)
batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
err = session.ExecuteBatch(batch)
_, err = session.ExecuteBatch(batch)

if err == nil {
t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an error")
Expand Down Expand Up @@ -1110,8 +1110,9 @@ func Test_RetryPolicyIdempotence(t *testing.T) {
q.RetryPolicy(&MyRetryPolicy{})
q.Consistency(All)

_ = q.Exec()
require.Equal(t, tc.expectedNumberOfTries, q.Attempts())
iter := q.Iter()
_ = iter.Close()
require.Equal(t, tc.expectedNumberOfTries, iter.Attempts())
})
}
}
Expand Down Expand Up @@ -1395,7 +1396,7 @@ func TestBatchQueryInfo(t *testing.T) {
batch := session.Batch(LoggedBatch)
batch.Bind("INSERT INTO batch_query_info (id, cluster, value) VALUES (?, ?,?)", write)

if err := session.ExecuteBatch(batch); err != nil {
if _, err := session.ExecuteBatch(batch); err != nil {
t.Fatalf("batch insert into batch_query_info failed, err '%v'", err)
}

Expand Down Expand Up @@ -1481,15 +1482,15 @@ func TestPrepare_MissingSchemaPrepare(t *testing.T) {
defer s.Close()

insertQry := s.Query("INSERT INTO invalidschemaprep (val) VALUES (?)", 5)
if err := conn.executeQuery(ctx, insertQry).err; err == nil {
if err := conn.executeQuery(ctx, insertQry, nil).err; err == nil {
t.Fatal("expected error, but got nil.")
}

if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
t.Fatal("create table:", err)
}

if err := conn.executeQuery(ctx, insertQry).err; err != nil {
if err := conn.executeQuery(ctx, insertQry, nil).err; err != nil {
t.Fatal(err) // unconfigured columnfamily
}
}
Expand All @@ -1503,7 +1504,7 @@ func TestPrepare_ReprepareStatement(t *testing.T) {

stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
query := session.Query(stmt, "bar")
if err := conn.executeQuery(ctx, query).Close(); err != nil {
if err := conn.executeQuery(ctx, query, nil).Close(); err != nil {
t.Fatalf("Failed to execute query for reprepare statement: %v", err)
}
}
Expand Down Expand Up @@ -1867,14 +1868,15 @@ func TestQueryStats(t *testing.T) {
session := createSession(t)
defer session.Close()
qry := session.Query("SELECT * FROM system.peers")
if err := qry.Exec(); err != nil {
iter := qry.Iter()
if err := iter.Close(); err != nil {
t.Fatalf("query failed. %v", err)
} else {
if qry.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if qry.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
}
Expand Down Expand Up @@ -1908,14 +1910,14 @@ func TestBatchStats(t *testing.T) {
b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)

if err := session.ExecuteBatch(b); err != nil {
if iter, err := session.ExecuteBatch(b); err != nil {
t.Fatalf("query failed. %v", err)
} else {
if b.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if b.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
}
Expand Down Expand Up @@ -1965,7 +1967,7 @@ func TestBatchObserve(t *testing.T) {
batch.Query(fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i), i)
}

if err := session.ExecuteBatch(batch); err != nil {
if _, err := session.ExecuteBatch(batch); err != nil {
t.Fatal("execute batch:", err)
}
if observedBatch == nil {
Expand Down Expand Up @@ -2876,6 +2878,65 @@ func TestManualQueryPaging(t *testing.T) {
}
}

func TestQueryImmutability(t *testing.T) {
const rowsToInsert = 5

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE gocql_test.testAutomaticPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

for i := 0; i < rowsToInsert; i++ {
err := session.Query("INSERT INTO testAutomaticPaging(id, count) VALUES(?, ?)", i, i*i).Exec()
if err != nil {
t.Fatal(err)
}
}

query := session.Query("SELECT id, count FROM testAutomaticPaging").PageSize(2)
var id, count, fetched1, fetched2 int

iter1 := query.Iter()
iter2 := query.Iter()
scanner1 := iter1.Scanner()
scanner2 := iter2.Scanner()
for scanner1.Next() {
err := scanner1.Scan(&id, &count)
if err != nil {
t.Fatalf(err.Error())
}
if fetched1%2 == 0 {
// move two iterators at different pace, to verify that one does not impact the other
if !scanner2.Next() {
t.Fatalf("unexpected end of pagination after %d entries", fetched2)
} else {
fetched2++
}
}
if count != (id * id) {
t.Fatalf("got wrong value from iteration: got %d expected %d", count, id*id)
}
require.True(t, query.pageState == nil, "initial page state was not set")
require.True(t, iter1.PageState() != nil, "page state is handled by the iterator")

fetched1++
}

if err := iter1.Close(); err != nil {
t.Fatal(err)
}
if err := iter2.Close(); err != nil {
t.Fatal(err)
}

if fetched1 != rowsToInsert {
t.Fatalf("expected to fetch %d rows got %d", rowsToInsert, fetched1)
}
require.Equal(t, math.Ceil(rowsToInsert/2.0), float64(fetched2))
}

func TestLexicalUUIDType(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down Expand Up @@ -3291,14 +3352,14 @@ func TestUnsetColBatch(t *testing.T) {
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, UnsetValue, "")
b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 2, 2, UnsetValue)

if err := session.ExecuteBatch(b); err != nil {
if iter, err := session.ExecuteBatch(b); err != nil {
t.Fatalf("query failed. %v", err)
} else {
if b.Attempts() < 1 {
if iter.Attempts() < 1 {
t.Fatal("expected at least 1 attempt, but got 0")
}
if b.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
if iter.Latency() <= 0 {
t.Fatalf("expected latency to be greater than 0, but got %v instead.", iter.Latency())
}
}
var id, mInt, count int
Expand Down
Loading

0 comments on commit 2d6d9a1

Please sign in to comment.