Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 36 additions & 32 deletions pkg/connector/chatsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ func (l *LinkedInClient) deleteURN(ctx context.Context, urn linkedingo.URN) {
}
}

func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation) {
// handleConversations processes a page of conversations. Returns true if create limit was reached.
func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation, created, updated *int) bool {
log := zerolog.Ctx(ctx)

updatedBefore := time.Now()
var updated, created int

for _, conv := range convs {
if slices.Contains(conv.Categories, "SPAM") {
l.deleteConversation(ctx, conv)
Expand Down Expand Up @@ -98,10 +96,6 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked
Read: conv.Read,
}

if conv.LastActivityAt.Before(updatedBefore) {
updatedBefore = conv.LastActivityAt.Time
}

portalKey := l.makePortalKey(conv)
portal, err := l.main.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
Expand All @@ -114,13 +108,17 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked
return c.Str("update", "sync")
},
PortalKey: portalKey,
CreatePortal: l.main.Config.Sync.CreateLimit == 0 || created <= l.main.Config.Sync.CreateLimit,
CreatePortal: true,
}

if portal == nil || portal.MXID == "" {
created++
if l.main.Config.Sync.CreateLimit > 0 && *created >= l.main.Config.Sync.CreateLimit {
log.Info().Int("created", *created).Msg("Create limit reached")
return true
}
*created++
}
updated++
*updated++

var latestMessageTS time.Time
for _, msg := range conv.Messages.Elements {
Expand Down Expand Up @@ -154,48 +152,53 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked
})
}

if l.main.Config.Sync.UpdateLimit > 0 && updated >= l.main.Config.Sync.UpdateLimit {
if l.main.Config.Sync.UpdateLimit > 0 && *updated >= l.main.Config.Sync.UpdateLimit {
log.Info().Msg("Update limit reached")
return
return true
}
}
return false
}

func (l *LinkedInClient) syncConversations(ctx context.Context) {
log := zerolog.Ctx(ctx).With().Str("action", "sync_conversations").Logger()
log.Info().Msg("starting conversation sync")

lastUsedUpdatedBefore := time.Time{}
updatedBefore := time.Now()
for {
var nextCursor string
var created, updated int
for page := 1; ; page++ {
if ctx.Err() != nil {
log.Info().Msg("sync canceled")
return
}

log := log.With().
Time("updated_before", updatedBefore).
Time("last_used_updated_before", lastUsedUpdatedBefore).
Logger()

if lastUsedUpdatedBefore.Equal(updatedBefore) {
log.Info().Msg("no more conversations found")
return
}
lastUsedUpdatedBefore = updatedBefore

log := log.With().Int("page", page).Int("created", created).Int("updated", updated).Logger()
log.Info().Msg("fetching conversations")

conversations, err := l.client.GetConversationsUpdatedBefore(ctx, updatedBefore)
var conversations *linkedingo.CollectionResponse[linkedingo.ConversationCursorMetadata, linkedingo.Conversation]
var err error
if nextCursor == "" {
conversations, err = l.client.GetConversationsUpdatedBefore(ctx, time.Now())
} else {
conversations, err = l.client.GetConversationsWithCursor(ctx, nextCursor)
}
if err != nil {
log.Err(err).Msg("failed to fetch conversations")
return
} else if conversations == nil {
log.Warn().Msg("no conversations found")
} else if conversations == nil || len(conversations.Elements) == 0 {
log.Info().Msg("no more conversations found")
return
}

l.handleConversations(ctx, conversations.Elements)
if l.handleConversations(ctx, conversations.Elements, &created, &updated) {
return
}

nextCursor = conversations.Metadata.NextCursor
if nextCursor == "" {
log.Info().Msg("no more pages (no nextCursor)")
return
}
}
}
func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) {
Expand All @@ -207,7 +210,8 @@ func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) {
if convs == nil {
return
}
l.handleConversations(ctx, convs.Elements)
var created, updated int
_ = l.handleConversations(ctx, convs.Elements, &created, &updated)
for _, item := range convs.Metadata.DeletedURNs {
l.deleteURN(ctx, item.Conversation.EntityURN)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/connector/handlelinkedin.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *l
func (l *LinkedInClient) onRealtimeConversations(ctx context.Context, conv *linkedingo.Conversation) {
if conv != nil {
convs := []linkedingo.Conversation{*conv}
l.handleConversations(ctx, convs)
var created, updated int
_ = l.handleConversations(ctx, convs, &created, &updated)
} else {
l.getConversationsBySyncToken(ctx)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/linkedingo/conversations.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ func (c *Client) GetConversationsUpdatedBefore(ctx context.Context, updatedBefor
return response.Data.MessengerConversationsByCategoryQuery, nil
}

func (c *Client) GetConversationsWithCursor(ctx context.Context, nextCursor string) (*CollectionResponse[ConversationCursorMetadata, Conversation], error) {
zerolog.Ctx(ctx).Info().
Str("next_cursor", nextCursor).
Msg("Getting conversations with cursor")
var response GraphQlResponse
_, err := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL).
WithGraphQLQuery(graphQLQueryIDMessengerConversationsWithCursor, map[string]string{
"mailboxUrn": url.QueryEscape(c.userEntityURN.WithPrefix("urn", "li", "fsd_profile").String()),
"nextCursor": nextCursor,
"count": "20",
"query": "(predicateUnions:List((conversationCategoryPredicate:(category:PRIMARY_INBOX))))",
}).
Do(ctx, &response)
if err != nil {
return nil, err
}
return response.Data.MessengerConversationsByCategoryQuery, nil
}

func (c *Client) GetConversationsBySyncToken(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) {
zerolog.Ctx(ctx).Info().Msg("Getting conversations")
req := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL)
Expand Down