Skip to content

Commit

Permalink
fix delivery restart (#430)
Browse files Browse the repository at this point in the history
Signed-off-by: Angelo De Caro <adc@zurich.ibm.com>
  • Loading branch information
adecaro authored Nov 16, 2022
1 parent 79739ca commit 35c062a
Showing 1 changed file with 60 additions and 47 deletions.
107 changes: 60 additions & 47 deletions platform/fabric/core/generic/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ import (
var logger = flogging.MustGetLogger("fabric-sdk.delivery")

var (
ErrComm = errors.New("communication issue")
ErrComm = errors.New("communication issue")
StartGenesis = &ab.SeekPosition{
Type: &ab.SeekPosition_Oldest{
Oldest: &ab.SeekOldest{},
},
}
)

type Callback func(block *common.Block) (bool, error)
Expand Down Expand Up @@ -196,57 +201,12 @@ func (d *Delivery) connect(ctx context.Context) (DeliverStream, error) {
return nil, errors.Wrapf(err, "failed to get delivery stream")
}

start := &ab.SeekPosition{}
if d.lastBlockReceived != 0 {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("restarting from the last block received [%d]", d.lastBlockReceived)
}

start.Type = &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: d.lastBlockReceived,
},
}
} else {
lastTxID, err := d.vault.GetLastTxID()
if err != nil {
return nil, errors.WithMessagef(err, "failed getting last transaction committed/discarted from the vault")
}

if len(lastTxID) != 0 && !strings.HasPrefix(lastTxID, committer.ConfigTXPrefix) {
// Retrieve block from Fabric
ch, err := d.network.Channel(d.channel)
if err != nil {
return nil, errors.WithMessagef(err, "failed getting channeln [%s]", d.channel)
}
blockNumber, err := ch.GetBlockNumberByTxID(lastTxID)
if err != nil {
return nil, errors.WithMessagef(err, "failed getting block number for transaction [%s]", lastTxID)
}
start.Type = &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: blockNumber,
},
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("restarting from block [%d], tx [%s]", blockNumber, lastTxID)
}
} else {
start.Type = &ab.SeekPosition_Oldest{
Oldest: &ab.SeekOldest{},
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("starting from the beginning, no last transaction found")
}
}
}

blockEnvelope, err := CreateDeliverEnvelope(
d.channel,
d.network.LocalMembership().DefaultSigningIdentity(),
deliverClient.Certificate(),
hash.GetHasher(d.sp),
start,
d.GetStartPosition(),
)
if err != nil {
return nil, errors.Wrap(err, "failed to create deliver envelope")
Expand All @@ -262,6 +222,59 @@ func (d *Delivery) connect(ctx context.Context) (DeliverStream, error) {
return stream, nil
}

func (d *Delivery) GetStartPosition() *ab.SeekPosition {
if d.lastBlockReceived != 0 {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("restarting from the last block received [%d]", d.lastBlockReceived)
}

return &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: d.lastBlockReceived,
},
},
}
}

if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("no last block received set [%d], check last TxID in the vault", d.lastBlockReceived)
}

lastTxID, err := d.vault.GetLastTxID()
if err != nil {
logger.Errorf("failed getting last transaction committed/discarded from the vault [%s], restarting from genesis", err)
return StartGenesis
}

if len(lastTxID) != 0 && !strings.HasPrefix(lastTxID, committer.ConfigTXPrefix) {
// Retrieve block from Fabric
ch, err := d.network.Channel(d.channel)
if err != nil {
logger.Errorf("failed getting channel [%s], restarting from genesis: [%s]", d.channel, err)
return StartGenesis
}
blockNumber, err := ch.GetBlockNumberByTxID(lastTxID)
if err != nil {
logger.Errorf("failed getting block number for transaction [%s], restart from genesis [%s]", lastTxID, err)
return StartGenesis
}
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("restarting from block [%d], tx [%s]", blockNumber, lastTxID)
}

return &ab.SeekPosition{
Type: &ab.SeekPosition_Specified{
Specified: &ab.SeekSpecified{
Number: blockNumber,
},
},
}
}

return StartGenesis
}

func (d *Delivery) cleanup() {
if d.client != nil {
d.client.Close()
Expand Down

0 comments on commit 35c062a

Please sign in to comment.