ExecuteBatchCAS not applied, MapExecuteBatchCAS is applied #1746

Open
danthegoodman1 opened this issue Apr 7, 2024 · 20 comments

Comments

@danthegoodman1

Using the latest version of Scylla on Docker, I have the following code:

	b := tx.session.NewBatch(gocql.UnloggedBatch)

	for key, val := range tx.pendingWrites {
		// Write rowLock and data to primary key
		lock := rowLock{
			PrimaryLockKey: tx.primaryLockKey,
			StartTs:        tx.readTime.UnixNano(),
			TimeoutTs:      tx.readTime.Add(time.Second * 5).UnixNano(),
		}

		encodedLock, err := lock.Encode()
		if err != nil {
			return fmt.Errorf("error in rowLock.Encode: %w", err)
		}

		// Insert the lock
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			Args: []any{key, []byte(encodedLock)},
		})

		// Insert the data record
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Args: []any{key, tx.readTime.UnixNano(), val},
		})
	}

	// The non-map version always returned applied: false
	applied, _, err := tx.session.MapExecuteBatchCAS(b, make(map[string]interface{}))
	if err != nil {
		return fmt.Errorf("error in MapExecuteBatchCAS: %w", err)
	}

	if !applied {
		return fmt.Errorf("%w: prewrite not applied (conflict)", &TxnAborted{})
	}

When using ExecuteBatchCAS(b), applied was always false, but when using MapExecuteBatchCAS(b, make(map[string]interface{})) (despite not actually binding to anything), applied was true.

Why is this? My understanding is that the only difference between the two is how the result is bound.

@testisnullus

@danthegoodman1 Hello, I am investigating this issue. So far, I can confirm that the ExecuteBatchCAS() and MapExecuteBatchCAS() functions work correctly.

MapExecuteBatchCAS() takes a map that is meant to hold the values returned by the operation if the batch is not applied. Typically you pass a map (even an empty one, such as make(map[string]interface{})), which is then filled with the column values of the row that prevented the batch from being applied.

Also, if the current data satisfies the condition (IF NOT EXISTS in this particular case, meaning the rows do not exist yet), MapExecuteBatchCAS() returns applied = true and the writes are performed. If the condition is not met, you get applied = false, indicating that the batch was not applied.
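The all-or-nothing behavior of a conditional batch can be sketched with a toy in-memory model. This is not gocql and not how Cassandra or Scylla execute lightweight transactions; the names rowKey, insertIfNotExists and applyConditionalBatch are hypothetical. It assumes only that the batch applies when every IF NOT EXISTS condition holds and otherwise writes nothing.

```go
package main

import "fmt"

// rowKey identifies a row by partition key and clustering columns,
// mirroring PRIMARY KEY (key, col, ts) of the testkv table in this thread.
type rowKey struct {
	key string
	col string
	ts  int64
}

// insertIfNotExists is one hypothetical "INSERT ... IF NOT EXISTS" statement.
type insertIfNotExists struct {
	row rowKey
	val []byte
}

// applyConditionalBatch (toy model): if no target row exists, every insert is
// written and applied is true; if any row already exists, nothing is written,
// applied is false and the conflicting rows are returned.
func applyConditionalBatch(table map[rowKey][]byte, batch []insertIfNotExists) (applied bool, conflicts []rowKey) {
	for _, stmt := range batch {
		if _, exists := table[stmt.row]; exists {
			conflicts = append(conflicts, stmt.row)
		}
	}
	if len(conflicts) > 0 {
		return false, conflicts
	}
	for _, stmt := range batch {
		table[stmt.row] = stmt.val
	}
	return true, nil
}

func main() {
	table := map[rowKey][]byte{}
	batch := []insertIfNotExists{
		{row: rowKey{"key1", "l", 0}, val: []byte("encodedLock")},
		{row: rowKey{"key1", "d", 42}, val: []byte("value1")},
	}

	applied, _ := applyConditionalBatch(table, batch)
	fmt.Println("first run applied:", applied) // true: the table was empty

	applied, conflicts := applyConditionalBatch(table, batch)
	fmt.Println("second run applied:", applied, "conflicts:", len(conflicts)) // false, 2
}
```

Running the same batch twice shows both outcomes: the first run applies, the second finds both rows and applies nothing.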

@danthegoodman1
Author

danthegoodman1 commented Apr 30, 2024

@testisnullus I'm not sure you've understood the issue, or rather I was likely not clear enough. The issue is that ExecuteBatchCAS is returning applied false, when it should be true (there were no issues).

@testisnullus

@danthegoodman1 I am testing on Cassandra 4.1.4, and ExecuteBatchCAS() works without any issues for me. I executed it with data that is not yet in the table, so that the IF NOT EXISTS condition in the CQL query is satisfied:

		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("INSERT INTO \"%s\" (name, study_date, sleep_time_hours) VALUES (?, ?, ?) IF NOT EXISTS", tx.table),
			Args: []any{key, studyDate, sleepTimeHours32},
		})

		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("UPDATE \"%s\" SET sleep_time_hours=? WHERE name=? AND study_date=?", tx.table),
			Args: []any{sleepTimeHours32, key, "2023-04-30"},
		})

I called ExecuteBatchCAS() the same way:

	applied, _, err := tx.session.ExecuteBatchCAS(b)
	if err != nil {
		return fmt.Errorf("error in ExecuteBatchCAS: %w", err)
	}

If I print the applied variable here, it is true.

Could you please provide more information about your issues with the ExecuteBatchCAS() function? Do you have any error messages related to this operation?

@danthegoodman1
Author

No errors, I noticed purely that it would never apply (checking the DB shows that it indeed did not apply), but if I simply swapped it with the Map- version it would apply.

@danthegoodman1
Author

Here are the keyspace and table I used:

func createKeyspace(s *gocql.Session) error {
	if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}").Exec(); err != nil {
		return fmt.Errorf("error creating keyspace: %w", err)
	}

	return nil
}

func createTable(s *gocql.Session) error {
	if err := s.Query("create table if not exists testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return nil
}

@testisnullus

@danthegoodman1 Hello, I've tested the ExecuteBatchCAS() with your keyspace and table examples.

The table looks like this after batch operation:

ra@cqlsh> SELECT * FROM test.testkv;

 key  | col | ts                  | val
------+-----+---------------------+--------------------------
 key1 |   l |                   0 | 0x656e636f6465644c6f636b
 key1 |   d | 1714566727267344355 |           0x76616c756531

(2 rows)

I can share the full code snippet with you also:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

type Tx struct {
	session       *gocql.Session
	pendingWrites map[string][]byte
	readTime      time.Time
	table         string
}

type TxnAborted struct{}

func (e *TxnAborted) Error() string {
	return "Transaction aborted due to write conflict"
}

func createKeyspace(s *gocql.Session) error {
	if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}").Exec(); err != nil {
		return fmt.Errorf("error creating keyspace: %w", err)
	}

	return nil
}

func createTable(s *gocql.Session) error {
	if err := s.Query("create table if not exists testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return nil
}

func (tx *Tx) ExecuteBatchTransaction() error {
	b := tx.session.NewBatch(gocql.UnloggedBatch)

	for key, val := range tx.pendingWrites {
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			Args: []any{key, []byte("encodedLock")},
		})

		// Insert the data record
		b.Entries = append(b.Entries, gocql.BatchEntry{
			Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Args: []any{key, tx.readTime.UnixNano(), val},
		})
	}

	applied, _, err := tx.session.ExecuteBatchCAS(b)
	if err != nil {
		return fmt.Errorf("error in ExecuteBatchCAS: %w", err)
	}

	if !applied {
		return fmt.Errorf("%w: prewrite not applied (conflict)", &TxnAborted{})
	}

	fmt.Println(applied)

	return nil
}

func main() {
	// connect to the cluster
	cluster := gocql.NewCluster("host1", "host2", "host3")
	cluster.Consistency = gocql.Quorum
	cluster.ProtoVersion = 4
	cluster.ConnectTimeout = time.Second * 10
	cluster.Keyspace = "test"
	cluster.Authenticator = gocql.PasswordAuthenticator{Username: "name", Password: "pass", AllowedAuthenticators: []string{"com.instaclustr.cassandra.auth.InstaclustrPasswordAuthenticator"}}
	session, err := cluster.CreateSession()
	if err != nil {
		log.Println(err)
		return
	}
	defer session.Close()

	err = createKeyspace(session)
	if err != nil {
		fmt.Println("Keyspace failed:", err)
	}

	err = createTable(session)
	if err != nil {
		fmt.Println("Table failed:", err)
	}

	tx := Tx{
		session:       session,
		pendingWrites: map[string][]byte{"key1": []byte("value1")},
		readTime:      time.Now(),
		table:         "testkv",
	}

	err = tx.ExecuteBatchTransaction()
	if err != nil {
		fmt.Println("Transaction failed:", err)
	}
}

The applied variable is also true in this case.

@testisnullus

testisnullus commented May 1, 2024

Just tested on the latest ScyllaDB image (scylladb/scylla:latest), and it also works well without any issues.

Which versions of gocql and Scylla are you using now?

@danthegoodman1
Author

Scylla is still the latest; gocql was v1.6.0.

@danthegoodman1
Author

I was also quite confused, considering I changed nothing in the query and simply swapped ExecuteBatchCAS for MapExecuteBatchCAS, with no other changes.

@testisnullus

@danthegoodman1 I cannot reproduce the issue for now :(
Is the issue blocking you, or can you use the MapExecuteBatchCAS() function for your operations in the meantime? I will keep tracking this issue and will test ExecuteBatchCAS() more closely to identify the root cause.

@danthegoodman1
Author

danthegoodman1 commented May 7, 2024 via email

@theoribeiro

Interestingly, I'm seeing the same issue. Stranger still, even though ExecuteBatchCAS() reports that the batch wasn't applied (returning false, with no error), the batch is actually applied.
MapExecuteBatchCAS() works as expected!

@worryg0d

Hello. I guess I successfully reproduced it. I used the code provided by @testisnullus and adjusted it a bit to run on Scylla 6.0.

I get applied = false when executing the batch with ExecuteBatchCAS() against an empty table, even though the data is applied:

cqlsh> SELECT * from test.testkv ;

 key  | col | ts                  | val
------+-----+---------------------+--------------------------
 key1 |   l |                   0 | 0x656e636f6465644c6f636b
 key1 |   d | 1737625396336730945 |           0x76616c756531

Also, I can't reproduce it on the latest Cassandra. The driver behaves the way it should and returns applied = true when the data is actually applied and false otherwise.

My Scylla setup:

docker run --name scylla-latest --hostname some-scylla -p 9042:9042 -d scylladb/scylla --smp 1

My Cassandra setup:

docker run --name cassandra-latest -p 9043:9042 -d cassandra

The code I used to reproduce it:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
)

type Tx struct {
	session       *gocql.Session
	pendingWrites map[string][]byte
	readTime      time.Time
	table         string
}

type TxnAborted struct{}

func (e *TxnAborted) Error() string {
	return "Transaction aborted due to write conflict"
}

func createKeyspace(s *gocql.Session) error {
	//if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1}").Exec(); err != nil {
	//	return fmt.Errorf("error creating keyspace: %w", err)
	//}

	// Scylla 6.0 specific code. LWT is not yet supported with tablets
	if err := s.Query("CREATE KEYSPACE if not exists test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor' : 1} AND TABLETS = {'enabled': false}").Exec(); err != nil {
		return fmt.Errorf("error creating keyspace: %w", err)
	}

	return nil
}

func createTable(s *gocql.Session) error {
	// Table here won't be created because the keyspace is not specified...
	//if err := s.Query("create table if not exists testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
	//	return fmt.Errorf("error creating table: %w", err)
	//}

	if err := s.Query("create table if not exists test.testkv (key text, col text, ts bigint, val blob, primary key (key, col, ts)) with clustering order by (col desc, ts desc)").Exec(); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return nil
}

func (tx *Tx) ExecuteBatchTransaction() error {
	b := tx.session.NewBatch(gocql.UnloggedBatch)

	for key, val := range tx.pendingWrites {
		b.Entries = append(b.Entries, gocql.BatchEntry{
			//Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			// Do not wrap the table name in \" quotes: tx.table includes the keyspace ("test.testkv").
			Stmt: fmt.Sprintf("insert into %s (key, ts, col, val) values (?, 0, 'l', ?) if not exists", tx.table),
			Args: []any{key, []byte("encodedLock")},
		})

		// Insert the data record
		b.Entries = append(b.Entries, gocql.BatchEntry{
			//Stmt: fmt.Sprintf("insert into \"%s\" (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Stmt: fmt.Sprintf("insert into %s (key, ts, col, val) values (?, ?, 'd', ?) if not exists", tx.table),
			Args: []any{key, tx.readTime.UnixNano(), val},
		})
	}

	//m := make(map[string]any)
	applied, _, err := tx.session.ExecuteBatchCAS(b)
	if err != nil {
		return fmt.Errorf("error in ExecuteBatchCAS: %w", err)
	}

	fmt.Println("APPLIED?", applied)
	//fmt.Println("MapExecuteBatchCAS:", m)

	if !applied {
		return fmt.Errorf("%w: prewrite not applied (conflict)", &TxnAborted{})
	}

	fmt.Println(applied)

	return nil
}

func main() {
	// connect to the cluster
	cluster := gocql.NewCluster("localhost:9042") // scylla
	//cluster := gocql.NewCluster("localhost:9043") // cassandra
	cluster.Consistency = gocql.Quorum
	cluster.ProtoVersion = 4
	cluster.ConnectTimeout = time.Second * 10
	// Keyspace is being created when session is already initialized...
	//cluster.Keyspace = "test"
	session, err := cluster.CreateSession()
	if err != nil {
		log.Println(err)
		return
	}
	defer session.Close()

	// keyspace "test" is being created here
	err = createKeyspace(session)
	if err != nil {
		fmt.Println("Keyspace failed:", err)
	}

	// table "testkv" in ks "test"
	err = createTable(session)
	if err != nil {
		fmt.Println("Table failed:", err)
	}

	tx := Tx{
		session:       session,
		pendingWrites: map[string][]byte{"key1": []byte("value1")},
		readTime:      time.Now(),
		// It won't work because keyspace is not specified...
		// Transaction failed: error in ExecuteBatchCAS: No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename
		//table: "testkv",
		table: "test.testkv",
	}

	err = tx.ExecuteBatchTransaction()
	if err != nil {
		fmt.Println("Transaction failed:", err)
	}
}

I'm not a Scylla expert, so I can't say whether this is a driver issue. However, at least there is a way to reproduce it.
It is also reproducible on the scylladb/gocql fork.

@worryg0d

worryg0d commented Jan 23, 2025

OK, now I see the difference. When an LWT is applied, Cassandra returns a single [applied] column, but Scylla also returns the table's columns, so for our table the result has 1 column on Cassandra and 5 on Scylla. The driver checks whether there are enough destination variables to scan the result and, if there are not, the scan fails with an error. However, ExecuteBatchCAS() ignores iter.err and just returns nil.

If we adjust the code above to pass a destination for each of the extra columns, the driver handles the applied value properly:

var (
	keyRead string
	tsRead  int64
	colRead string
	valRead string
)
applied, _, err := tx.session.ExecuteBatchCAS(b, &keyRead, &colRead, &tsRead, &valRead)
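The column-count check can be modeled in a few lines. scanRow is a hypothetical stand-in, not gocql's Iter.Scan, and the row values are placeholders (nil stands for NULL); what Scylla actually puts in the extra columns is not modeled. It only illustrates why scanning a 5-column result into &applied alone fails and leaves applied false, while one destination per column works.

```go
package main

import (
	"fmt"
	"reflect"
)

// scanRow is a toy positional scanner: it requires exactly one destination
// per result column and returns an error before writing anything otherwise.
func scanRow(columns []string, row []any, dest ...any) error {
	if len(dest) != len(columns) {
		return fmt.Errorf("have %d destinations, result has %d columns", len(dest), len(columns))
	}
	for i, d := range dest {
		target := reflect.ValueOf(d)
		if target.Kind() != reflect.Ptr || target.IsNil() {
			return fmt.Errorf("destination %d is not a non-nil pointer", i)
		}
		v := reflect.ValueOf(row[i])
		if !v.IsValid() { // NULL: store the zero value
			target.Elem().Set(reflect.Zero(target.Elem().Type()))
			continue
		}
		if !v.Type().AssignableTo(target.Elem().Type()) {
			return fmt.Errorf("cannot scan column %q (%s) into %T", columns[i], v.Type(), d)
		}
		target.Elem().Set(v)
	}
	return nil
}

func main() {
	// Per this thread: on success Cassandra returns only [applied],
	// while Scylla also returns the table's columns.
	cassandraCols := []string{"[applied]"}
	scyllaCols := []string{"[applied]", "key", "col", "ts", "val"}
	scyllaRow := []any{true, nil, nil, nil, nil}

	var applied bool
	err := scanRow(cassandraCols, []any{true}, &applied)
	fmt.Println(err, applied) // <nil> true

	applied = false
	err = scanRow(scyllaCols, scyllaRow, &applied)
	fmt.Println(err != nil, applied) // true false: the scan fails, applied stays false

	var key, col, val string
	var ts int64
	err = scanRow(scyllaCols, scyllaRow, &applied, &key, &col, &ts, &val)
	fmt.Println(err, applied) // <nil> true
}
```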

@joao-r-reis I think it would be a good idea to make ExecuteBatchCAS() return iter.err instead of nil, as MapExecuteBatchCAS() already does, because errors from scanning the row are currently not surfaced here:

func (s *Session) ExecuteBatchCAS(batch *Batch, dest ...interface{}) (applied bool, iter *Iter, err error) {
	iter = s.executeBatch(batch)
	if err := iter.checkErrAndNotFound(); err != nil {
		iter.Close()
		return false, nil, err
	}

	if len(iter.Columns()) > 1 {
		dest = append([]interface{}{&applied}, dest...)
		iter.Scan(dest...)
	} else {
		iter.Scan(&applied)
	}

	return applied, iter, nil // nil is returned here, but it should be iter.err
}
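The effect of the proposed change can be modeled with a hypothetical fakeIter that, like *gocql.Iter, reports a failed scan by returning false and recording the error. fakeIter, scanCAS, casBuggy and casFixed are illustrative names, not driver code; they only mirror the shape of the ExecuteBatchCAS() function quoted above.

```go
package main

import "fmt"

// fakeIter is a hypothetical stand-in for *gocql.Iter with a single result row.
type fakeIter struct {
	columns []string
	applied bool // value of the [applied] column
	err     error
}

// Scan fails unless there is one destination per column; for brevity it only
// fills a leading *bool and ignores the values of the other columns.
func (it *fakeIter) Scan(dest ...any) bool {
	if it.err != nil {
		return false
	}
	if len(dest) != len(it.columns) {
		it.err = fmt.Errorf("have %d destinations, result has %d columns", len(dest), len(it.columns))
		return false
	}
	if p, ok := dest[0].(*bool); ok {
		*p = it.applied
	}
	return true
}

// scanCAS mirrors the quoted branch: with more than one column it prepends
// &applied to the caller's destinations; otherwise it scans only &applied.
func scanCAS(iter *fakeIter, dest []any) (applied bool) {
	if len(iter.columns) > 1 {
		iter.Scan(append([]any{&applied}, dest...)...)
	} else {
		iter.Scan(&applied)
	}
	return applied
}

// casBuggy returns nil like the current code, hiding any scan error.
func casBuggy(iter *fakeIter, dest ...any) (bool, error) {
	return scanCAS(iter, dest), nil
}

// casFixed returns iter.err, as proposed.
func casFixed(iter *fakeIter, dest ...any) (bool, error) {
	applied := scanCAS(iter, dest)
	return applied, iter.err
}

func main() {
	scyllaCols := []string{"[applied]", "key", "col", "ts", "val"}

	applied, err := casBuggy(&fakeIter{columns: scyllaCols, applied: true})
	fmt.Println(applied, err) // false <nil>: the batch ran, but the failure is hidden

	applied, err = casFixed(&fakeIter{columns: scyllaCols, applied: true})
	fmt.Println(applied, err) // false, plus the scan error
}
```

casFixed reads iter.err in a separate statement after scanning, because Go does not specify whether a plain field read in a return statement is evaluated before or after a function call in the same statement.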

@joao-r-reis
Contributor

Yeah that's a good idea, we should create a JIRA for this

@worryg0d

And here it is: CASSGO-47

@danthegoodman1
Author

danthegoodman1 commented Jan 23, 2025

So basically you just have to add the columns to scan against? That'd be a good comment to add, though I'm sure the error would make that clear then.

@worryg0d

worryg0d commented Jan 23, 2025

Yep. The current gocql codebase handles it properly when you pass destinations for the additional columns, as I showed in the comment above. That is only needed for Scylla: with Cassandra it works without any additional destinations, because on success Cassandra returns only the [applied] column, unlike Scylla.
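Why the map-based variant is unaffected can be sketched as follows. mapScanCAS is a hypothetical toy, not gocql's MapExecuteBatchCAS or MapScan: it stores every returned column under its name and then removes [applied], so it never needs to know in advance how many columns the server sent.

```go
package main

import "fmt"

// mapScanCAS (toy model) copies each column of a single result row into dest
// by name, then extracts and removes the [applied] entry.
func mapScanCAS(columns []string, row []any, dest map[string]any) (bool, error) {
	if len(columns) != len(row) {
		return false, fmt.Errorf("malformed row: %d columns, %d values", len(columns), len(row))
	}
	for i, name := range columns {
		dest[name] = row[i]
	}
	applied, ok := dest["[applied]"].(bool)
	if !ok {
		return false, fmt.Errorf("result has no boolean [applied] column")
	}
	delete(dest, "[applied]")
	return applied, nil
}

func main() {
	cassandra := map[string]any{}
	applied, err := mapScanCAS([]string{"[applied]"}, []any{true}, cassandra)
	fmt.Println(applied, err, len(cassandra)) // true <nil> 0

	scylla := map[string]any{}
	applied, err = mapScanCAS(
		[]string{"[applied]", "key", "col", "ts", "val"},
		[]any{true, nil, nil, nil, nil}, // placeholder values; nil stands for NULL
		scylla,
	)
	fmt.Println(applied, err, len(scylla)) // true <nil> 4
}
```

The same call shape works for both result layouts, which matches what was observed in this thread with MapExecuteBatchCAS().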

@danthegoodman1
Author

Oh, I just edited my comment without seeing your update. I assume that with the open PR it'd throw an error about not having enough columns so you'd know if that was the issue (it was being masked by the nil return).

@worryg0d

it'd throw an error about not having enough columns so you'd know if that was the issue

Yes, exactly
