diff --git a/pkg/connector/chatsync.go b/pkg/connector/chatsync.go index fb86efb..0fd4f5d 100644 --- a/pkg/connector/chatsync.go +++ b/pkg/connector/chatsync.go @@ -55,12 +55,10 @@ func (l *LinkedInClient) deleteURN(ctx context.Context, urn linkedingo.URN) { } } -func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation) { +// handleConversations processes a page of conversations. Returns true if create limit was reached. +func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linkedingo.Conversation, created, updated *int) bool { log := zerolog.Ctx(ctx) - updatedBefore := time.Now() - var updated, created int - for _, conv := range convs { if slices.Contains(conv.Categories, "SPAM") { l.deleteConversation(ctx, conv) @@ -98,10 +96,6 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked Read: conv.Read, } - if conv.LastActivityAt.Before(updatedBefore) { - updatedBefore = conv.LastActivityAt.Time - } - portalKey := l.makePortalKey(conv) portal, err := l.main.Bridge.GetPortalByKey(ctx, portalKey) if err != nil { @@ -114,13 +108,17 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked return c.Str("update", "sync") }, PortalKey: portalKey, - CreatePortal: l.main.Config.Sync.CreateLimit == 0 || created <= l.main.Config.Sync.CreateLimit, + CreatePortal: true, } if portal == nil || portal.MXID == "" { - created++ + if l.main.Config.Sync.CreateLimit > 0 && *created >= l.main.Config.Sync.CreateLimit { + log.Info().Int("created", *created).Msg("Create limit reached") + return true + } + *created++ } - updated++ + *updated++ var latestMessageTS time.Time for _, msg := range conv.Messages.Elements { @@ -154,48 +152,53 @@ func (l *LinkedInClient) handleConversations(ctx context.Context, convs []linked }) } - if l.main.Config.Sync.UpdateLimit > 0 && updated >= l.main.Config.Sync.UpdateLimit { + if l.main.Config.Sync.UpdateLimit > 0 && *updated >= l.main.Config.Sync.UpdateLimit { log.Info().Msg("Update limit reached") - return + return true } } + return false } func (l *LinkedInClient) syncConversations(ctx context.Context) { log := zerolog.Ctx(ctx).With().Str("action", "sync_conversations").Logger() log.Info().Msg("starting conversation sync") - lastUsedUpdatedBefore := time.Time{} - updatedBefore := time.Now() - for { + var nextCursor string + var created, updated int + for page := 1; ; page++ { if ctx.Err() != nil { log.Info().Msg("sync canceled") return } - log := log.With(). - Time("updated_before", updatedBefore). - Time("last_used_updated_before", lastUsedUpdatedBefore). - Logger() - - if lastUsedUpdatedBefore.Equal(updatedBefore) { - log.Info().Msg("no more conversations found") - return - } - lastUsedUpdatedBefore = updatedBefore - + log := log.With().Int("page", page).Int("created", created).Int("updated", updated).Logger() log.Info().Msg("fetching conversations") - conversations, err := l.client.GetConversationsUpdatedBefore(ctx, updatedBefore) + var conversations *linkedingo.CollectionResponse[linkedingo.ConversationCursorMetadata, linkedingo.Conversation] + var err error + if nextCursor == "" { + conversations, err = l.client.GetConversationsUpdatedBefore(ctx, time.Now()) + } else { + conversations, err = l.client.GetConversationsWithCursor(ctx, nextCursor) + } if err != nil { log.Err(err).Msg("failed to fetch conversations") return - } else if conversations == nil { - log.Warn().Msg("no conversations found") + } else if conversations == nil || len(conversations.Elements) == 0 { + log.Info().Msg("no more conversations found") return } - l.handleConversations(ctx, conversations.Elements) + if l.handleConversations(ctx, conversations.Elements, &created, &updated) { + return + } + + nextCursor = conversations.Metadata.NextCursor + if nextCursor == "" { + log.Info().Msg("no more pages (no nextCursor)") + return + } } } func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) { @@ -207,7 +210,8 @@ func (l *LinkedInClient) getConversationsBySyncToken(ctx context.Context) { if convs == nil { return } - l.handleConversations(ctx, convs.Elements) + var created, updated int + _ = l.handleConversations(ctx, convs.Elements, &created, &updated) for _, item := range convs.Metadata.DeletedURNs { l.deleteURN(ctx, item.Conversation.EntityURN) } diff --git a/pkg/connector/handlelinkedin.go b/pkg/connector/handlelinkedin.go index 326d554..65b9099 100644 --- a/pkg/connector/handlelinkedin.go +++ b/pkg/connector/handlelinkedin.go @@ -89,7 +89,8 @@ func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *l func (l *LinkedInClient) onRealtimeConversations(ctx context.Context, conv *linkedingo.Conversation) { if conv != nil { convs := []linkedingo.Conversation{*conv} - l.handleConversations(ctx, convs) + var created, updated int + _ = l.handleConversations(ctx, convs, &created, &updated) } else { l.getConversationsBySyncToken(ctx) } diff --git a/pkg/linkedingo/conversations.go b/pkg/linkedingo/conversations.go index 641c45e..16802c0 100644 --- a/pkg/linkedingo/conversations.go +++ b/pkg/linkedingo/conversations.go @@ -180,6 +180,25 @@ func (c *Client) GetConversationsUpdatedBefore(ctx context.Context, updatedBefor return response.Data.MessengerConversationsByCategoryQuery, nil } +func (c *Client) GetConversationsWithCursor(ctx context.Context, nextCursor string) (*CollectionResponse[ConversationCursorMetadata, Conversation], error) { + zerolog.Ctx(ctx).Info(). + Str("next_cursor", nextCursor). + Msg("Getting conversations with cursor") + var response GraphQlResponse + _, err := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL). + WithGraphQLQuery(graphQLQueryIDMessengerConversationsWithCursor, map[string]string{ + "mailboxUrn": url.QueryEscape(c.userEntityURN.WithPrefix("urn", "li", "fsd_profile").String()), + "nextCursor": nextCursor, + "count": "20", + "query": "(predicateUnions:List((conversationCategoryPredicate:(category:PRIMARY_INBOX))))", + }). + Do(ctx, &response) + if err != nil { + return nil, err + } + return response.Data.MessengerConversationsByCategoryQuery, nil +} + func (c *Client) GetConversationsBySyncToken(ctx context.Context) (*CollectionResponse[ConversationSyncMetadata, Conversation], error) { zerolog.Ctx(ctx).Info().Msg("Getting conversations") req := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL)