Skip to content

Commit 559945f

Browse files
committed
fix: support RM delete undo log request
1 parent 174d2f8 commit 559945f

File tree

7 files changed

+678
-1
lines changed

7 files changed

+678
-1
lines changed

pkg/protocol/codec/codec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func Init() {
139139
// RM
140140
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMRequestCodec{})
141141
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMResponseCodec{})
142+
GetCodecManager().RegisterCodec(CodecTypeSeata, &UndoLogDeleteRequestCodec{})
142143

143144
// TM
144145
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMRequestCodec{})
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2. 0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache. org/licenses/LICENSE-2. 0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package codec
19+
20+
import (
21+
"seata.apache.org/seata-go/pkg/protocol/branch"
22+
"seata.apache.org/seata-go/pkg/protocol/message"
23+
"seata.apache.org/seata-go/pkg/util/bytes"
24+
)
25+
26+
type UndoLogDeleteRequestCodec struct{}
27+
28+
func (u *UndoLogDeleteRequestCodec) Decode(in []byte) interface{} {
29+
data := message.UndoLogDeleteRequest{}
30+
buf := bytes.NewByteBuffer(in)
31+
32+
data.ResourceId = bytes.ReadString16Length(buf)
33+
saveDays, _ := buf.ReadUint16()
34+
data.SaveDays = int16(saveDays)
35+
data.BranchType = branch.BranchType(bytes.ReadByte(buf))
36+
37+
return data
38+
}
39+
40+
func (u *UndoLogDeleteRequestCodec) Encode(in interface{}) []byte {
41+
data, _ := in.(message.UndoLogDeleteRequest)
42+
buf := bytes.NewByteBuffer([]byte{})
43+
44+
bytes.WriteString16Length(data.ResourceId, buf)
45+
buf.WriteUint16(uint16(data.SaveDays))
46+
buf.WriteByte(byte(data.BranchType))
47+
48+
return buf.Bytes()
49+
}
50+
51+
func (u *UndoLogDeleteRequestCodec) GetMessageType() message.MessageType {
52+
return message.MessageTypeRmDeleteUndolog
53+
}
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package codec
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"seata.apache.org/seata-go/pkg/protocol/branch"
25+
"seata.apache.org/seata-go/pkg/protocol/message"
26+
)
27+
28+
func TestUndoLogDeleteRequestCodec_Encode(t *testing.T) {
29+
tests := []struct {
30+
name string
31+
req message.UndoLogDeleteRequest
32+
}{
33+
{
34+
name: "normal case with resource id",
35+
req: message.UndoLogDeleteRequest{
36+
ResourceId: "jdbc:mysql://localhost:3306/seata",
37+
SaveDays: 7,
38+
BranchType: branch.BranchTypeAT,
39+
},
40+
},
41+
{
42+
name: "empty resource id",
43+
req: message.UndoLogDeleteRequest{
44+
ResourceId: "",
45+
SaveDays: 30,
46+
BranchType: branch.BranchTypeTCC,
47+
},
48+
},
49+
{
50+
name: "max save days",
51+
req: message.UndoLogDeleteRequest{
52+
ResourceId: "test-resource",
53+
SaveDays: 32767, // max int16
54+
BranchType: branch.BranchTypeXA,
55+
},
56+
},
57+
{
58+
name: "min save days",
59+
req: message.UndoLogDeleteRequest{
60+
ResourceId: "test-resource",
61+
SaveDays: -32768, // min int16
62+
BranchType: branch.BranchTypeAT,
63+
},
64+
},
65+
{
66+
name: "zero save days",
67+
req: message.UndoLogDeleteRequest{
68+
ResourceId: "test-resource",
69+
SaveDays: 0,
70+
BranchType: branch.BranchTypeAT,
71+
},
72+
},
73+
}
74+
75+
codec := &UndoLogDeleteRequestCodec{}
76+
77+
for _, tt := range tests {
78+
t.Run(tt.name, func(t *testing.T) {
79+
encoded := codec.Encode(tt.req)
80+
assert.NotNil(t, encoded)
81+
assert.Greater(t, len(encoded), 0, "encoded data should not be empty")
82+
})
83+
}
84+
}
85+
86+
func TestUndoLogDeleteRequestCodec_Decode(t *testing.T) {
87+
codec := &UndoLogDeleteRequestCodec{}
88+
89+
tests := []struct {
90+
name string
91+
req message.UndoLogDeleteRequest
92+
validate func(t *testing.T, decoded message.UndoLogDeleteRequest)
93+
}{
94+
{
95+
name: "decode normal request",
96+
req: message.UndoLogDeleteRequest{
97+
ResourceId: "jdbc:mysql://localhost:3306/seata",
98+
SaveDays: 7,
99+
BranchType: branch.BranchTypeAT,
100+
},
101+
validate: func(t *testing.T, decoded message.UndoLogDeleteRequest) {
102+
assert.Equal(t, "jdbc:mysql://localhost:3306/seata", decoded.ResourceId)
103+
assert.Equal(t, int16(7), decoded.SaveDays)
104+
assert.Equal(t, branch.BranchTypeAT, decoded.BranchType)
105+
},
106+
},
107+
{
108+
name: "decode request with TCC branch type",
109+
req: message.UndoLogDeleteRequest{
110+
ResourceId: "tcc-resource",
111+
SaveDays: 15,
112+
BranchType: branch.BranchTypeTCC,
113+
},
114+
validate: func(t *testing.T, decoded message.UndoLogDeleteRequest) {
115+
assert.Equal(t, "tcc-resource", decoded.ResourceId)
116+
assert.Equal(t, int16(15), decoded.SaveDays)
117+
assert.Equal(t, branch.BranchTypeTCC, decoded.BranchType)
118+
},
119+
},
120+
{
121+
name: "decode request with XA branch type",
122+
req: message.UndoLogDeleteRequest{
123+
ResourceId: "xa-resource",
124+
SaveDays: 30,
125+
BranchType: branch.BranchTypeXA,
126+
},
127+
validate: func(t *testing.T, decoded message.UndoLogDeleteRequest) {
128+
assert.Equal(t, "xa-resource", decoded.ResourceId)
129+
assert.Equal(t, int16(30), decoded.SaveDays)
130+
assert.Equal(t, branch.BranchTypeXA, decoded.BranchType)
131+
},
132+
},
133+
}
134+
135+
for _, tt := range tests {
136+
t.Run(tt.name, func(t *testing.T) {
137+
// Encode first
138+
encoded := codec.Encode(tt.req)
139+
assert.NotNil(t, encoded)
140+
141+
// Then decode
142+
decoded := codec.Decode(encoded)
143+
assert.NotNil(t, decoded)
144+
145+
decodedReq, ok := decoded.(message.UndoLogDeleteRequest)
146+
assert.True(t, ok, "decoded result should be UndoLogDeleteRequest type")
147+
148+
// Validate
149+
tt.validate(t, decodedReq)
150+
})
151+
}
152+
}
153+
154+
func TestUndoLogDeleteRequestCodec_EncodeDecode(t *testing.T) {
155+
codec := &UndoLogDeleteRequestCodec{}
156+
157+
tests := []struct {
158+
name string
159+
req message.UndoLogDeleteRequest
160+
}{
161+
{
162+
name: "round trip - normal case",
163+
req: message.UndoLogDeleteRequest{
164+
ResourceId: "jdbc:mysql://127.0.0.1:3306/test_db",
165+
SaveDays: 10,
166+
BranchType: branch.BranchTypeAT,
167+
},
168+
},
169+
{
170+
name: "round trip - empty resource id",
171+
req: message.UndoLogDeleteRequest{
172+
ResourceId: "",
173+
SaveDays: 5,
174+
BranchType: branch.BranchTypeTCC,
175+
},
176+
},
177+
{
178+
name: "round trip - negative save days",
179+
req: message.UndoLogDeleteRequest{
180+
ResourceId: "test-resource",
181+
SaveDays: -1,
182+
BranchType: branch.BranchTypeXA,
183+
},
184+
},
185+
}
186+
187+
for _, tt := range tests {
188+
t.Run(tt.name, func(t *testing.T) {
189+
// Encode
190+
encoded := codec.Encode(tt.req)
191+
assert.NotNil(t, encoded)
192+
193+
// Decode
194+
decoded := codec.Decode(encoded)
195+
assert.NotNil(t, decoded)
196+
197+
decodedReq, ok := decoded.(message.UndoLogDeleteRequest)
198+
assert.True(t, ok)
199+
200+
// Verify all fields match
201+
assert.Equal(t, tt.req.ResourceId, decodedReq.ResourceId, "ResourceId should match")
202+
assert.Equal(t, tt.req.SaveDays, decodedReq.SaveDays, "SaveDays should match")
203+
assert.Equal(t, tt.req.BranchType, decodedReq.BranchType, "BranchType should match")
204+
})
205+
}
206+
}
207+
208+
func TestUndoLogDeleteRequestCodec_GetMessageType(t *testing.T) {
209+
codec := &UndoLogDeleteRequestCodec{}
210+
msgType := codec.GetMessageType()
211+
212+
assert.Equal(t, message.MessageTypeRmDeleteUndolog, msgType,
213+
"message type should be MessageTypeRmDeleteUndolog (111)")
214+
}
215+
216+
func TestUndoLogDeleteRequestCodec_Integration(t *testing.T) {
217+
// Test integration with codec manager
218+
Init() // Initialize all codecs
219+
220+
cm := GetCodecManager()
221+
codec := cm.GetCodec(CodecTypeSeata, message.MessageTypeRmDeleteUndolog)
222+
223+
assert.NotNil(t, codec, "UndoLogDeleteRequest codec should be registered")
224+
225+
// Test encoding through codec manager
226+
req := message.UndoLogDeleteRequest{
227+
ResourceId: "integration-test-resource",
228+
SaveDays: 20,
229+
BranchType: branch.BranchTypeAT,
230+
}
231+
232+
encoded := cm.Encode(CodecTypeSeata, req)
233+
assert.NotNil(t, encoded)
234+
assert.Greater(t, len(encoded), 2, "encoded data should include type code (2 bytes) + body")
235+
236+
// Test decoding through codec manager
237+
decoded := cm.Decode(CodecTypeSeata, encoded)
238+
assert.NotNil(t, decoded)
239+
240+
decodedReq, ok := decoded.(message.UndoLogDeleteRequest)
241+
assert.True(t, ok)
242+
assert.Equal(t, req.ResourceId, decodedReq.ResourceId)
243+
assert.Equal(t, req.SaveDays, decodedReq.SaveDays)
244+
assert.Equal(t, req.BranchType, decodedReq.BranchType)
245+
}
246+
247+
func BenchmarkUndoLogDeleteRequestCodec_Encode(b *testing.B) {
248+
codec := &UndoLogDeleteRequestCodec{}
249+
req := message.UndoLogDeleteRequest{
250+
ResourceId: "jdbc:mysql://localhost:3306/seata",
251+
SaveDays: 7,
252+
BranchType: branch.BranchTypeAT,
253+
}
254+
255+
b.ResetTimer()
256+
for i := 0; i < b.N; i++ {
257+
_ = codec.Encode(req)
258+
}
259+
}
260+
261+
func BenchmarkUndoLogDeleteRequestCodec_Decode(b *testing.B) {
262+
codec := &UndoLogDeleteRequestCodec{}
263+
req := message.UndoLogDeleteRequest{
264+
ResourceId: "jdbc:mysql://localhost:3306/seata",
265+
SaveDays: 7,
266+
BranchType: branch.BranchTypeAT,
267+
}
268+
encoded := codec.Encode(req)
269+
270+
b.ResetTimer()
271+
for i := 0; i < b.N; i++ {
272+
_ = codec.Decode(encoded)
273+
}
274+
}

pkg/protocol/message/request_message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func (req GlobalRollbackRequest) GetTypeCode() MessageType {
131131

132132
type UndoLogDeleteRequest struct {
133133
ResourceId string
134-
SaveDays MessageType
134+
SaveDays int16
135135
BranchType model2.BranchType
136136
}
137137

pkg/remoting/processor/client/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ func RegisterProcessor() {
2323
initOnResponse()
2424
initBranchCommit()
2525
initBranchRollback()
26+
initDeleteUndoLog()
2627
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package client
19+
20+
import (
21+
"context"
22+
23+
"seata.apache.org/seata-go/pkg/protocol/message"
24+
"seata.apache.org/seata-go/pkg/remoting/getty"
25+
"seata.apache.org/seata-go/pkg/util/log"
26+
)
27+
28+
func initDeleteUndoLog() {
29+
rmDeleteUndoLogProcessor := &rmDeleteUndoLogProcessor{}
30+
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageTypeRmDeleteUndolog, rmDeleteUndoLogProcessor)
31+
}
32+
33+
type rmDeleteUndoLogProcessor struct{}
34+
35+
func (r *rmDeleteUndoLogProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
36+
req := rpcMessage.Body.(message.UndoLogDeleteRequest)
37+
38+
log.Infof("Received undo log delete request: resourceId=%s, saveDays=%d, branchType=%v",
39+
req.ResourceId, req.SaveDays, req.BranchType)
40+
41+
// TODO: Implement actual undo log deletion logic here
42+
// This should delete undo log records older than saveDays for the specified resourceId and branchType
43+
44+
// Note: The TC server sends this message as a one-way notification, no response is required
45+
return nil
46+
}

0 commit comments

Comments
 (0)