Hyperledger Fabric链码数据模型的探讨(1)之转账和交易
编写过一些链码的人可能会觉得是在操作一个简单的key-value数据库, 就是GetState和PutState去操作键值对,而对复杂些的一对多,多对多等实体关系和数据模型不知怎么设计。我们先从官方的例子入手一起探讨下。
1.简单转账例子
/fabric-samples/chaincode/chaincode_example02/go/chaincode_example02.go
假设链码调用peer chaincode invoke … -c ‘{“Args”:[“invoke”,“a”,“b”,“10”]}’ 转账逻辑简单, 就是把用户a上的余额加10, b账户上钱减10, 最后重新putState保存两个用户状态即可。
而现实当中, 我们关系户头上剩下多少钱, 也关心消费和收入的每笔流水账。所有流水账的总和应该等于账户余额,后面我们考虑如何记账本上记录流水账。
// Transaction makes payment of X units from A to B
func (t *SimpleChaincode) invoke(stub shim.ChaincodeStubInterface, args []string) pb.Response {
var A, B string // Entities
var Aval, Bval int // Asset holdings
var X int // Transaction value
var err error
if len(args) != 3 {
return shim.Error("Incorrect number of arguments. Expecting 3")
}
A = args[0]
B = args[1]
// Get the state from the ledger
// TODO: will be nice to have a GetAllState call to ledger
Avalbytes, err := stub.GetState(A)
if err != nil {
return shim.Error("Failed to get state")
}
if Avalbytes == nil {
return shim.Error("Entity not found")
}
Aval, _ = strconv.Atoi(string(Avalbytes))
Bvalbytes, err := stub.GetState(B)
if err != nil {
return shim.Error("Failed to get state")
}
if Bvalbytes == nil {
return shim.Error("Entity not found")
}
Bval, _ = strconv.Atoi(string(Bvalbytes))
// Perform the execution
X, err = strconv.Atoi(args[2])
if err != nil {
return shim.Error("Invalid transaction amount, expecting a integer value")
}
Aval = Aval - X
Bval = Bval + X
fmt.Printf("Aval = %d, Bval = %d\n", Aval, Bval)
// Write the state back to the ledger
err = stub.PutState(A, []byte(strconv.Itoa(Aval)))
if err != nil {
return shim.Error(err.Error())
}
err = stub.PutState(B, []byte(strconv.Itoa(Bval)))
if err != nil {
return shim.Error(err.Error())
}
return shim.Success(nil)
}
2.Marbles简单资产转移
Marbles是个弹珠游戏, 模拟多个用户可创建和转移弹珠, 具体可以参考说明 https://github.com/IBM-Blockchain/marbles/blob/master/README-cn.md 链码位置fabric-samples/chaincode/marbles02/go/marbles_chaincode.go
弹珠数据结构, name作为key值,拥有颜色,大小和拥有者等属性。
type marble struct {
ObjectType string `json:"docType"` //docType is used to distinguish the various types of objects in state database
Name string `json:"name"` //the fieldtags are needed to keep case from bouncing around
Color string `json:"color"`
Size int `json:"size"`
Owner string `json:"owner"`
}
初始化新增三个弹珠
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble1","blue","35","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble2","red","50","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["initMarble","marble3","blue","70","tom"]}'
// peer chaincode invoke -C myc1 -n marbles -c '{"Args":["transferMarble","marble2","jerry"]}'
把第二颗弹珠marble2的拥有者从tom修改为jerry,即tom把第二颗珠子给了jerry. 代码较为简单, 用marble2名字找到珠子后修改拥有人属性即可。
// ===========================================================
// transfer a marble by setting a new owner name on the marble
// ===========================================================
func (t *SimpleChaincode) transferMarble(stub shim.ChaincodeStubInterface, args []string) pb.Response {
// 0 1
// "name", "bob"
// ===========================================================
func (t *SimpleChaincode) transferMarble(stub shim.ChaincodeStubInterface, args []string) pb.Response {
// 0 1
// "name", "bob"
if len(args) < 2 {
return shim.Error("Incorrect number of arguments. Expecting 2")
}
marbleName := args[0]
newOwner := strings.ToLower(args[1])
fmt.Println("- start transferMarble ", marbleName, newOwner)
marbleAsBytes, err := stub.GetState(marbleName)
if err != nil {
return shim.Error("Failed to get marble:" + err.Error())
} else if marbleAsBytes == nil {
return shim.Error("Marble does not exist")
}
marbleToTransfer := marble{}
err = json.Unmarshal(marbleAsBytes, &marbleToTransfer) //unmarshal it aka JSON.parse()
if err != nil {
return shim.Error(err.Error())
}
marbleToTransfer.Owner = newOwner //change the owner
marbleJSONasBytes, _ := json.Marshal(marbleToTransfer)
err = stub.PutState(marbleName, marbleJSONasBytes) //rewrite the marble
if err != nil {
return shim.Error(err.Error())
}
fmt.Println("- end transferMarble (success)")
return shim.Success(nil)
}
相比第一个例子转账加强了些些, Marble是资产, 对应有拥有人。如果人还拥有其它资产例如汽车, 就有点一对多的味道了吧。我们继续看例子。
3.High-through高吞吐交易例子
参考fabric-samples/high-throughput/chaincode/high-throughput.go
(1)update使用一个组合key聚合, 名字~操作符+或-~数值~交易ID, value为0, 即主要的信息都是保存在组合键中, 组合键也方便用于类似模糊部分查询, 该例是使用name作为部分查询条件。 这里保存的实际每条交易就是上面我们说没记录的流水了。
/**
* Updates the ledger to include a new delta for a particular variable. If this is the first time
* this variable is being added to the ledger, then its initial value is assumed to be 0. The arguments
* to give in the args array are as follows:
* - args[0] -> name of the variable
* - args[1] -> new delta (float)
* - args[2] -> operation (currently supported are addition "+" and subtraction "-")
*
* @param APIstub The chaincode shim
* @param args The arguments array for the update invocation
*
* @return A response structure indicating success or failure with a message
*/
func (s *SmartContract) update(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
// Check we have a valid number of args
if len(args) != 3 {
return shim.Error("Incorrect number of arguments, expecting 3")
}
// Extract the args
name := args[0]
op := args[2]
_, err := strconv.ParseFloat(args[1], 64)
if err != nil {
return shim.Error("Provided value was not a number")
}
// Make sure a valid operator is provided
if op != "+" && op != "-" {
return shim.Error(fmt.Sprintf("Operator %s is unrecognized", op))
}
// Retrieve info needed for the update procedure
txid := APIstub.GetTxID()
compositeIndexName := "varName~op~value~txID"
// Create the composite key that will allow us to query for all deltas on a particular variable
compositeKey, compositeErr := APIstub.CreateCompositeKey(compositeIndexName, []string{name, op, args[1], txid})
if compositeErr != nil {
return shim.Error(fmt.Sprintf("Could not create a composite key for %s: %s", name, compositeErr.Error()))
}
// Save the composite key index
compositePutErr := APIstub.PutState(compositeKey, []byte{0x00})
if compositePutErr != nil {
return shim.Error(fmt.Sprintf("Could not put operation for %s in the ledger: %s", name, compositePutErr.Error()))
}
return shim.Success([]byte(fmt.Sprintf("Successfully added %s%s to %s", op, args[1], name)))
}
(2)get则是按照name作为部分查询条件, 查询到对应name归属的所有交易流水了, 最后可以计算出交易流水的总和, 就是账户的余额。
/**
* Retrieves the aggregate value of a variable in the ledger. Gets all delta rows for the variable
* and computes the final value from all deltas. The args array for the invocation must contain the
* following argument:
* - args[0] -> The name of the variable to get the value of
*
* @param APIstub The chaincode shim
* @param args The arguments array for the get invocation
*
* @return A response structure indicating success or failure with a message
*/
func (s *SmartContract) get(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
// Check we have a valid number of args
if len(args) != 1 {
return shim.Error("Incorrect number of arguments, expecting 1")
}
name := args[0]
// Get all deltas for the variable
deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
if deltaErr != nil {
return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
}
defer deltaResultsIterator.Close()
// Check the variable existed
if !deltaResultsIterator.HasNext() {
return shim.Error(fmt.Sprintf("No variable by the name %s exists", name))
}
// Iterate through result set and compute final value
var finalVal float64
var i int
for i = 0; deltaResultsIterator.HasNext(); i++ {
// Get the next row
responseRange, nextErr := deltaResultsIterator.Next()
if nextErr != nil {
return shim.Error(nextErr.Error())
}
// Split the composite key into its component parts
_, keyParts, splitKeyErr := APIstub.SplitCompositeKey(responseRange.Key)
if splitKeyErr != nil {
return shim.Error(splitKeyErr.Error())
}
// Retrieve the delta value and operation
operation := keyParts[1]
valueStr := keyParts[2]
// Convert the value string and perform the operation
value, convErr := strconv.ParseFloat(valueStr, 64)
if convErr != nil {
return shim.Error(convErr.Error())
}
switch operation {
case "+":
finalVal += value
case "-":
finalVal -= value
default:
return shim.Error(fmt.Sprintf("Unrecognized operation %s", operation))
}
}
return shim.Success([]byte(strconv.FormatFloat(finalVal, 'f', -1, 64)))
}
(3)prueFast也是使用name作为查询条件查出某个用户所有的交易流水, 但累加流水记录的时候计算一条删除一条, 最后计算出余额后, 作为新的一条总流水调用update添加到该账户上, 注意这个操作是不安全的, 这个迭代过程因为删除流水如果中途出错会导致流水丢失。
/**
* Prunes a variable by deleting all of its delta rows while computing the final value. Once all rows
* have been processed and deleted, a single new row is added which defines a delta containing the final
* computed value of the variable. This function is NOT safe as any failures or errors during pruning
* will result in an undefined final value for the variable and loss of data. Use pruneSafe if data
* integrity is important. The args array contains the following argument:
* - args[0] -> The name of the variable to prune
*
* @param APIstub The chaincode shim
* @param args The args array for the pruneFast invocation
*
* @return A response structure indicating success or failure with a message
*/
func (s *SmartContract) pruneFast(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
// Check we have a valid number of ars
if len(args) != 1 {
return shim.Error("Incorrect number of arguments, expecting 1")
}
// Retrieve the name of the variable to prune
name := args[0]
// Get all delta rows for the variable
deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
if deltaErr != nil {
return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
}
defer deltaResultsIterator.Close()
// Check the variable existed
if !deltaResultsIterator.HasNext() {
return shim.Error(fmt.Sprintf("No variable by the name %s exists", name))
}
// Iterate through result set computing final value while iterating and deleting each key
var finalVal float64
var i int
for i = 0; deltaResultsIterator.HasNext(); i++ {
// Get the next row
responseRange, nextErr := deltaResultsIterator.Next()
if nextErr != nil {
return shim.Error(nextErr.Error())
}
// Split the key into its composite parts
_, keyParts, splitKeyErr := APIstub.SplitCompositeKey(responseRange.Key)
if splitKeyErr != nil {
return shim.Error(splitKeyErr.Error())
}
// Retrieve the operation and value
operation := keyParts[1]
valueStr := keyParts[2]
// Convert the value to a float
value, convErr := strconv.ParseFloat(valueStr, 64)
if convErr != nil {
return shim.Error(convErr.Error())
}
// Delete the row from the ledger
deltaRowDelErr := APIstub.DelState(responseRange.Key)
if deltaRowDelErr != nil {
return shim.Error(fmt.Sprintf("Could not delete delta row: %s", deltaRowDelErr.Error()))
}
// Add the value of the deleted row to the final aggregate
switch operation {
case "+":
finalVal += value
case "-":
finalVal -= value
default:
return shim.Error(fmt.Sprintf("Unrecognized operation %s", operation))
}
}
// Update the ledger with the final value and return
updateResp := s.update(APIstub, []string{name, strconv.FormatFloat(finalVal, 'f', -1, 64), "+"})
if updateResp.Status == OK {
return shim.Success([]byte(fmt.Sprintf("Successfully pruned variable %s, final value is %f, %d rows pruned", args[0], finalVal, i)))
}
return shim.Error(fmt.Sprintf("Failed to prune variable: all rows deleted but could not update value to %f, variable no longer exists in ledger", finalVal))
}
(4)prueSafe则是安全版本, 首先会调用get方法获取对用name的总账, 先保存在key值为name_PRUE_BACKUP的值中, 接着删除所有的流水记录, 之后把总账作为新的流水update插入, 最后删除备份的总账。
/**
* This function performs the same function as pruneFast except it provides data backups in case the
* prune fails. The final aggregate value is computed before any deletion occurs and is backed up
* to a new row. This back-up row is deleted only after the new aggregate delta has been successfully
* written to the ledger. The args array contains the following argument:
* args[0] -> The name of the variable to prune
*
* @param APIstub The chaincode shim
* @param args The arguments array for the pruneSafe invocation
*
* @result A response structure indicating success or failure with a message
*/
func (s *SmartContract) pruneSafe(APIstub shim.ChaincodeStubInterface, args []string) sc.Response {
// Verify there are a correct number of arguments
if len(args) != 1 {
return shim.Error("Incorrect number of arguments, expecting 1 (the name of the variable to prune)")
}
// Get the var name
name := args[0]
// Get the var's value and process it
getResp := s.get(APIstub, args)
if getResp.Status == ERROR {
return shim.Error(fmt.Sprintf("Could not retrieve the value of %s before pruning, pruning aborted: %s", name, getResp.Message))
}
valueStr := string(getResp.Payload)
val, convErr := strconv.ParseFloat(valueStr, 64)
if convErr != nil {
return shim.Error(fmt.Sprintf("Could not convert the value of %s to a number before pruning, pruning aborted: %s", name, convErr.Error()))
}
// Store the var's value temporarily
backupPutErr := APIstub.PutState(fmt.Sprintf("%s_PRUNE_BACKUP", name), []byte(valueStr))
if backupPutErr != nil {
return shim.Error(fmt.Sprintf("Could not backup the value of %s before pruning, pruning aborted: %s", name, backupPutErr.Error()))
}
// Get all deltas for the variable
deltaResultsIterator, deltaErr := APIstub.GetStateByPartialCompositeKey("varName~op~value~txID", []string{name})
if deltaErr != nil {
return shim.Error(fmt.Sprintf("Could not retrieve value for %s: %s", name, deltaErr.Error()))
}
defer deltaResultsIterator.Close()
// Delete each row
var i int
for i = 0; deltaResultsIterator.HasNext(); i++ {
responseRange, nextErr := deltaResultsIterator.Next()
if nextErr != nil {
return shim.Error(fmt.Sprintf("Could not retrieve next row for pruning: %s", nextErr.Error()))
}
deltaRowDelErr := APIstub.DelState(responseRange.Key)
if deltaRowDelErr != nil {
return shim.Error(fmt.Sprintf("Could not delete delta row: %s", deltaRowDelErr.Error()))
}
}
// Insert new row for the final value
updateResp := s.update(APIstub, []string{name, valueStr, "+"})
if updateResp.Status == ERROR {
return shim.Error(fmt.Sprintf("Could not insert the final value of the variable after pruning, variable backup is stored in %s_PRUNE_BACKUP: %s", name, updateResp.Message))
}
// Delete the backup value
delErr := APIstub.DelState(fmt.Sprintf("%s_PRUNE_BACKUP", name))
if delErr != nil {
return shim.Error(fmt.Sprintf("Could not delete backup value %s_PRUNE_BACKUP, this does not affect the ledger but should be removed manually", name))
}
return shim.Success([]byte(fmt.Sprintf("Successfully pruned variable %s, final value is %f, %d rows pruned", name, val, i)))
}
这个操作可以认为的类似人工保证数据库事务一样的操作, 是有点绕, 不过当然不是严格的ACID, 只是如果迭代删除流水的时候出错了, 有个备份总账, 后面还能恢复,保证总余额没丢。
可能大家也有不少问题, 为什么用组合键, 用couchdb的index似乎也可以记录流水, key就用交易ID, 能不能像传统数据库那样设计, 一个账户表, 保存名字和余额, 另外一个账户交易流水表, 保存用户的每个交易流水。
带着这些疑问, 我们下次再扯.
- 原文作者:Zealot
- 原文链接:https://www.51discuss.com/posts/chaincode-model-disscuss-1/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。