-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' of github.com:hyperledger/fabric into Add_role_…
…type
- Loading branch information
Showing
506 changed files
with
1,561 additions
and
266 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
*/ | ||
|
||
package obcpbft | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/hyperledger/fabric/consensus/obcpbft/custodian" | ||
) | ||
|
||
// complaintHandler represents a receiver of complaints | ||
type complaintHandler interface { | ||
// Complain is called by the complainer to signal that custody | ||
// or complaint timeout was reached. The second argument | ||
// specifies whether the timeout was a Custody timeout (false) | ||
// or Complaint timeout (true). | ||
Complain(string, *Request, bool) | ||
} | ||
|
||
// complainer provideds lifecycle management to ensure that Requests | ||
// are not censored by a primary. Initially Requests are put into the | ||
// custody of the complainer using Custody(). When the Request has | ||
// been processed successfully by the network, Success() signals to | ||
// the complainer to drop custody of the Request. If no success is | ||
// signaled before the custody timeout expires, the complainer will | ||
// signal this fact to the registered handler. Likewise, Complaint() | ||
// will register a Request for complaining. Requests stay | ||
// indefinitely in Custody and will repeatedly signal timeouts until | ||
// they are removed using Success(); Requests registered via | ||
// Complaint() only signal a timeout once. | ||
type complainer struct { | ||
custody *custodian.Custodian | ||
complaints *custodian.Custodian | ||
|
||
h complaintHandler | ||
} | ||
|
||
// newComplainer creates a new complainer. | ||
func newComplainer(h complaintHandler, custodyTimeout time.Duration, complaintTimeout time.Duration) *complainer { | ||
c := &complainer{} | ||
c.custody = custodian.New(custodyTimeout, c.custodyTimeout) | ||
c.complaints = custodian.New(complaintTimeout, c.complaintTimeout) | ||
c.h = h | ||
return c | ||
} | ||
|
||
// Stop cleans up all outstanding goroutines of the complainer. | ||
// Typically only used in tests. | ||
func (c *complainer) Stop() { | ||
c.custody.Stop() | ||
c.complaints.Stop() | ||
} | ||
|
||
// Custody adds a Request into custody of the complainer. When the | ||
// custody timeout expires, the complaintHandler will be invoked with | ||
// the bool argument set to false. The Request stays in custody until | ||
// Success() is called. | ||
func (c *complainer) Custody(req *Request) string { | ||
hash := hashReq(req) | ||
c.custody.Register(hash, req) | ||
return hash | ||
} | ||
|
||
// custodyTimeout is the callback from the Custodian for requests that | ||
// are in custody. | ||
func (c *complainer) custodyTimeout(hash string, req_ interface{}) { | ||
req := req_.(*Request) | ||
c.custody.Register(hash, req) | ||
c.h.Complain(hash, req, false) | ||
} | ||
|
||
// Complaint adds a Request to the complaint queue of the complainer. | ||
// When the complaint timeout expires, the complaintHandler will be | ||
// invoked with the bool argument set to true. The Request is removed | ||
// from the complaint queue once the timeout expires. | ||
func (c *complainer) Complaint(req *Request) string { | ||
hash := hashReq(req) | ||
c.complaints.Register(hash, req) | ||
return hash | ||
} | ||
|
||
// complaintTimeout is the callback from the Custodian for requests | ||
// that are in the complaint queue. | ||
func (c *complainer) complaintTimeout(hash string, req_ interface{}) { | ||
req := req_.(*Request) | ||
c.h.Complain(hash, req, true) | ||
} | ||
|
||
// Success removes a Request from both custody and complaint queues. | ||
func (c *complainer) Success(req *Request) { | ||
hash := hashReq(req) | ||
c.SuccessHash(hash) | ||
} | ||
|
||
// SuccessHash is like Success, but takes directly the hash of the | ||
// Request, as returned by hashReq. | ||
func (c *complainer) SuccessHash(hash string) { | ||
c.custody.Remove(hash) | ||
c.complaints.Remove(hash) | ||
} | ||
|
||
// Restart resets custody and complaint queues without calling into | ||
// the complaintHandler. The complaint queue is drained completely. | ||
// The custody queue timeouts are reset. Restart returns all requests | ||
// that are maintained in custody. | ||
func (c *complainer) Restart() map[string]*Request { | ||
c.complaints.RemoveAll() | ||
custody := c.custody.RemoveAll() | ||
reqs := make(map[string]*Request) | ||
for _, pair := range custody { | ||
c.custody.Register(pair.ID, pair.Data) | ||
reqs[pair.ID] = pair.Data.(*Request) | ||
} | ||
return reqs | ||
} |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
*/ | ||
|
||
package custodian | ||
|
||
import ( | ||
"sync" | ||
"time" | ||
) | ||
|
||
type custody struct { | ||
id string | ||
data interface{} | ||
deadline time.Time | ||
canceled bool | ||
} | ||
|
||
// Custodian provides a timeout service for objects. The timeout is | ||
// the same for all enqueued objects. Order is retained. | ||
type Custodian struct { | ||
lock sync.Mutex | ||
timeout time.Duration | ||
timer *time.Timer | ||
notifyCb CustodyNotify | ||
stopCh chan struct{} | ||
requests map[string]*custody | ||
seq []*custody | ||
} | ||
|
||
// CustodyPair is a tuple of enqueued id and data object | ||
type CustodyPair struct { | ||
ID string | ||
Data interface{} | ||
} | ||
|
||
// CustodyNotify is the callback function type as called by the | ||
// Custodian when the timeout expires | ||
type CustodyNotify func(id string, data interface{}) | ||
|
||
// New creates a new Custodian. Timeout specifies the timeout of the | ||
// Custodian, notifyCb specifies the callback function that is called | ||
// by the Custodian when a timeout expires. | ||
func New(timeout time.Duration, notifyCb CustodyNotify) *Custodian { | ||
c := &Custodian{ | ||
timeout: timeout, | ||
notifyCb: notifyCb, | ||
requests: make(map[string]*custody), | ||
} | ||
c.timer = time.NewTimer(time.Hour) | ||
c.timer.Stop() | ||
c.stopCh = make(chan struct{}) | ||
go c.notifyRoutine() | ||
return c | ||
} | ||
|
||
// Stop closes down all Custodian activity. Only used for tests. | ||
func (c *Custodian) Stop() { | ||
close(c.stopCh) | ||
} | ||
|
||
// Register enqueues a new object to the custodian. The data object | ||
// is referred to by id. | ||
func (c *Custodian) Register(id string, data interface{}) { | ||
obj := &custody{ | ||
id: id, | ||
data: data, | ||
deadline: time.Now().Add(c.timeout), | ||
} | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
c.requests[obj.id] = obj | ||
c.seq = append(c.seq, obj) | ||
if len(c.seq) == 1 { | ||
c.resetTimer() | ||
} | ||
} | ||
|
||
// Remove removes an object from custody. No callback will be invoked | ||
// on this object anymore. | ||
func (c *Custodian) Remove(id string) bool { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
obj, ok := c.requests[id] | ||
if ok { | ||
delete(c.requests, id) | ||
obj.canceled = true | ||
obj.data = nil | ||
} | ||
return ok | ||
} | ||
|
||
// Elements returns all objects that are currently under custody. | ||
func (c *Custodian) Elements() []CustodyPair { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
return c.syncElements() | ||
} | ||
|
||
func (c *Custodian) syncElements() []CustodyPair { | ||
var m []CustodyPair | ||
for _, obj := range c.seq { | ||
if obj.canceled { | ||
continue | ||
} | ||
m = append(m, CustodyPair{obj.id, obj.data}) | ||
} | ||
return m | ||
} | ||
|
||
// RemoveAll deletes all objects from custody. No callbacks will be | ||
// invoked. RemoveAll returns all objects that have been under | ||
// custody. | ||
func (c *Custodian) RemoveAll() []CustodyPair { | ||
c.lock.Lock() | ||
defer c.lock.Unlock() | ||
c.timer.Stop() | ||
m := c.syncElements() | ||
c.requests = make(map[string]*custody) | ||
c.seq = nil | ||
return m | ||
} | ||
|
||
// resetTimer must be called with lock held | ||
func (c *Custodian) resetTimer() { | ||
if len(c.seq) == 0 { | ||
c.timer.Stop() | ||
return | ||
} | ||
next := c.seq[0] | ||
diff := next.deadline.Sub(time.Now()) | ||
if diff < 0 { | ||
diff = 0 | ||
} | ||
c.timer.Reset(diff) | ||
} | ||
|
||
func (c *Custodian) notifyRoutine() { | ||
for { | ||
select { | ||
case <-c.timer.C: | ||
break | ||
case <-c.stopCh: | ||
c.stopCh = nil | ||
return | ||
} | ||
c.lock.Lock() | ||
var expired []CustodyPair | ||
for _, obj := range c.seq { | ||
if obj.deadline.After(time.Now()) { | ||
break | ||
} | ||
if !obj.canceled { | ||
expired = append(expired, CustodyPair{obj.id, obj.data}) | ||
} | ||
delete(c.requests, obj.id) | ||
c.seq = c.seq[1:] | ||
} | ||
c.resetTimer() | ||
c.lock.Unlock() | ||
|
||
for _, data := range expired { | ||
c.notifyCb(data.ID, data.Data) | ||
} | ||
} | ||
} |
Oops, something went wrong.