diff --git a/dozer-sink-aerospike/src/aerospike.rs b/dozer-sink-aerospike/src/aerospike.rs index 1a98592817..5deb836801 100644 --- a/dozer-sink-aerospike/src/aerospike.rs +++ b/dozer-sink-aerospike/src/aerospike.rs @@ -255,19 +255,23 @@ impl Client { }) } + pub(crate) fn config(&self) -> &as_config { + unsafe { &(*self.inner.as_ptr()).config } + } + pub(crate) unsafe fn write_batch( &self, batch: *mut as_batch_records, + policy: Option<*const as_policy_batch>, ) -> Result<(), AerospikeError> { debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size); let started = Instant::now(); - let policy = self.inner.as_ref().config.policies.batch; as_try(|err| { aerospike_batch_write( self.inner.as_ptr(), err, - &policy as *const as_policy_batch, + policy.unwrap_or(std::ptr::null()), batch, ) })?; @@ -1210,7 +1214,16 @@ impl<'a> WriteBatch<'a> { } pub(crate) fn execute(mut self) -> Result<(), AerospikeError> { - unsafe { self.client.write_batch(self.inner.take().unwrap().as_ptr()) } + let config = self.client.config(); + let mut policy = config.policies.batch; + policy.base.max_retries = 2; + policy.base.sleep_between_retries = 1000; + unsafe { + self.client.write_batch( + self.inner.take().unwrap().as_ptr(), + Some((&policy) as *const as_policy_batch), + ) + } } }