Skip to content

Commit c92c8a7

Browse files
committed
merge: pre-release-v3.8.4
1 parent 07badb1 commit c92c8a7

File tree

17 files changed

+1211
-14
lines changed

17 files changed

+1211
-14
lines changed

.github/workflows/update-version-file-on-release.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,24 +96,24 @@ jobs:
9696
repo,
9797
per_page: 100
9898
});
99-
99+
100100
release = releases.data.find(r => r.draft && r.tag_name === tagName);
101101
if (!release) {
102102
throw new Error(`No release found with tag ${tagName}`);
103103
}
104104
}
105-
105+
106106
await github.rest.repos.updateRelease({
107107
owner,
108108
repo,
109109
release_id: release.id,
110110
draft: false,
111111
prerelease: release.prerelease
112112
});
113-
113+
114114
const status = release.draft ? "was draft" : "was already published";
115115
core.info(`Release ${tagName} ensured to be published (${status}).`);
116-
116+
117117
} catch (error) {
118118
core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`);
119119
}

config/openim-rpc-msg.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
rpc:
22
# The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
3-
registerIP:
3+
registerIP:
44
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
55
listenIP: 0.0.0.0
66
# autoSetPorts indicates whether to automatically set the ports

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ require (
2727
require github.com/google/uuid v1.6.0
2828

2929
require (
30+
github.com/IBM/sarama v1.43.0
3031
github.com/fatih/color v1.14.1
3132
github.com/gin-contrib/gzip v1.0.1
3233
github.com/go-redis/redis v6.15.9+incompatible
@@ -54,7 +55,6 @@ require (
5455
cloud.google.com/go/iam v1.1.7 // indirect
5556
cloud.google.com/go/longrunning v0.5.5 // indirect
5657
cloud.google.com/go/storage v1.40.0 // indirect
57-
github.com/IBM/sarama v1.43.0 // indirect
5858
github.com/MicahParks/keyfunc v1.9.0 // indirect
5959
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
6060
github.com/aws/aws-sdk-go-v2 v1.32.5 // indirect

internal/api/router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/gin-contrib/gzip"
99
"github.com/gin-gonic/gin"
1010
"github.com/gin-gonic/gin/binding"
11-
"github.com/go-playground/validator/v10"
1211
"github.com/openimsdk/open-im-server/v3/internal/api/jssdk"
1312
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
1413
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -290,6 +289,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
290289
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
291290
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
292291
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
292+
conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser)
293293
}
294294

295295
{

internal/rpc/conversation/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re
2727
conversationIDs = nil
2828
}
2929
return &conversation.GetFullOwnerConversationIDsResp{
30-
Version: idHash,
30+
Version: uint64(vl.Version),
3131
VersionID: vl.ID.Hex(),
3232
Equal: req.IdHash == idHash,
3333
ConversationIDs: conversationIDs,

internal/rpc/group/sync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou
3434
userIDs = nil
3535
}
3636
return &pbgroup.GetFullGroupMemberUserIDsResp{
37-
Version: idHash,
37+
Version: uint64(vl.Version),
3838
VersionID: vl.ID.Hex(),
3939
Equal: req.IdHash == idHash,
4040
UserIDs: userIDs,
@@ -58,7 +58,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF
5858
groupIDs = nil
5959
}
6060
return &pbgroup.GetFullJoinGroupIDsResp{
61-
Version: idHash,
61+
Version: uint64(vl.Version),
6262
VersionID: vl.ID.Hex(),
6363
Equal: req.IdHash == idHash,
6464
GroupIDs: groupIDs,

internal/rpc/relation/sync.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.G
5656
userIDs = nil
5757
}
5858
return &relation.GetFullFriendUserIDsResp{
59-
Version: idHash,
59+
Version: uint64(vl.Version),
6060
VersionID: vl.ID.Hex(),
6161
Equal: req.IdHash == idHash,
6262
UserIDs: userIDs,

pkg/common/storage/cache/redis/token.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/openimsdk/tools/errs"
1414
"github.com/openimsdk/tools/log"
1515
"github.com/openimsdk/tools/utils/datautil"
16-
"github.com/redis/go-redis/v9"
1716
)
1817

1918
type tokenCache struct {
@@ -220,7 +219,6 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p
220219
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
221220
return errs.Wrap(err)
222221
}
223-
224222
if c.localCache != nil {
225223
c.removeLocalTokenCache(ctx, key)
226224
}

pkg/common/storage/kafka/config.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright © 2024 OpenIM open source community. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafka
16+
17+
type TLSConfig struct {
18+
EnableTLS bool `yaml:"enableTLS"`
19+
CACrt string `yaml:"caCrt"`
20+
ClientCrt string `yaml:"clientCrt"`
21+
ClientKey string `yaml:"clientKey"`
22+
ClientKeyPwd string `yaml:"clientKeyPwd"`
23+
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
24+
}
25+
26+
type Config struct {
27+
Username string `yaml:"username"`
28+
Password string `yaml:"password"`
29+
ProducerAck string `yaml:"producerAck"`
30+
CompressType string `yaml:"compressType"`
31+
Addr []string `yaml:"addr"`
32+
TLS TLSConfig `yaml:"tls"`
33+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright © 2023 OpenIM. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package kafka
16+
17+
import (
18+
"context"
19+
"errors"
20+
21+
"github.com/IBM/sarama"
22+
"github.com/openimsdk/tools/log"
23+
)
24+
25+
type MConsumerGroup struct {
26+
sarama.ConsumerGroup
27+
groupID string
28+
topics []string
29+
}
30+
31+
func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
32+
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
33+
if err != nil {
34+
return nil, err
35+
}
36+
group, err := NewConsumerGroup(config, conf.Addr, groupID)
37+
if err != nil {
38+
return nil, err
39+
}
40+
return &MConsumerGroup{
41+
ConsumerGroup: group,
42+
groupID: groupID,
43+
topics: topics,
44+
}, nil
45+
}
46+
47+
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
48+
return GetContextWithMQHeader(cMsg.Headers)
49+
}
50+
51+
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
52+
for {
53+
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
54+
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
55+
return
56+
}
57+
if errors.Is(err, context.Canceled) {
58+
return
59+
}
60+
if err != nil {
61+
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
62+
}
63+
}
64+
}
65+
66+
func (mc *MConsumerGroup) Close() error {
67+
return mc.ConsumerGroup.Close()
68+
}

0 commit comments

Comments
 (0)