|
6 | 6 | "time" |
7 | 7 |
|
8 | 8 | "google.golang.org/grpc/codes" |
| 9 | + "k8s.io/utils/clock" |
9 | 10 |
|
10 | 11 | "github.com/flyteorg/flyte/datacatalog/pkg/common" |
11 | 12 | "github.com/flyteorg/flyte/datacatalog/pkg/errors" |
@@ -51,6 +52,7 @@ type artifactManager struct { |
51 | 52 | repo repositories.RepositoryInterface |
52 | 53 | artifactStore ArtifactDataStore |
53 | 54 | systemMetrics artifactMetrics |
| 55 | + clock clock.Clock |
54 | 56 | } |
55 | 57 |
|
56 | 58 | // Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location. |
@@ -111,6 +113,12 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal |
111 | 113 | return nil, err |
112 | 114 | } |
113 | 115 |
|
| 116 | + // Set expiration |
| 117 | + if request.GetArtifact().GetTtl() != nil { |
| 118 | + expiration := m.clock.Now().Add(request.GetArtifact().GetTtl().AsDuration()) |
| 119 | + artifactModel.ExpiresAt = &expiration |
| 120 | + } |
| 121 | + |
114 | 122 | err = m.repo.ArtifactRepo().Create(ctx, artifactModel) |
115 | 123 | if err != nil { |
116 | 124 | if errors.IsAlreadyExistsError(err) { |
@@ -184,7 +192,7 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal |
184 | 192 | logger.Debugf(ctx, "Get artifact by id %v", key) |
185 | 193 | artifactKey := transformers.ToArtifactKey(datasetID, key) |
186 | 194 | var err error |
187 | | - artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey) |
| 195 | + artifactModel, err = m.repo.ArtifactRepo().GetAndFilterExpired(ctx, artifactKey) |
188 | 196 |
|
189 | 197 | if err != nil { |
190 | 198 | if errors.IsDoesNotExistError(err) { |
@@ -215,6 +223,11 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal |
215 | 223 | artifactModel = tag.Artifact |
216 | 224 | } |
217 | 225 |
|
| 226 | + // If the artifact is expired consider this tag expired too |
| 227 | + if artifactModel.ExpiresAt != nil && artifactModel.ExpiresAt.Before(m.clock.Now()) { |
| 228 | + return models.Artifact{}, errors.NewDataCatalogErrorf(codes.NotFound, "entry not found") |
| 229 | + } |
| 230 | + |
218 | 231 | if len(artifactModel.ArtifactData) == 0 { |
219 | 232 | return models.Artifact{}, errors.NewDataCatalogErrorf(codes.Internal, "artifact [%+v] with key %v does not have artifact data associated", artifactModel, key) |
220 | 233 | } |
@@ -273,7 +286,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo |
273 | 286 | } |
274 | 287 |
|
275 | 288 | // Perform the list with the dataset and listInput filters |
276 | | - artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput) |
| 289 | + artifactModels, err := m.repo.ArtifactRepo().ListAndFilterExpired(ctx, dataset.DatasetKey, listInput) |
277 | 290 | if err != nil { |
278 | 291 | logger.Errorf(ctx, "Unable to list Artifacts err: %v", err) |
279 | 292 | m.systemMetrics.listFailureCounter.Inc(ctx) |
@@ -379,6 +392,15 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal |
379 | 392 |
|
380 | 393 | // update artifact in DB, also replaces/upserts associated artifact data |
381 | 394 | artifactModel.ArtifactData = artifactDataModels |
| 395 | + |
| 396 | + // Reset TTL on the artifact since all the data is fresh. |
| 397 | + if request.GetTtl() != nil { |
| 398 | + expiration := m.clock.Now().Add(request.GetTtl().AsDuration()) |
| 399 | + artifactModel.ExpiresAt = &expiration |
| 400 | + } else { |
| 401 | + artifactModel.ExpiresAt = nil |
| 402 | + } |
| 403 | + |
382 | 404 | logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel) |
383 | 405 |
|
384 | 406 | err = m.repo.ArtifactRepo().Update(ctx, artifactModel) |
@@ -416,7 +438,7 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal |
416 | 438 | }, nil |
417 | 439 | } |
418 | 440 |
|
419 | | -func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager { |
| 441 | +func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope, clock clock.Clock) interfaces.ArtifactManager { |
420 | 442 | artifactMetrics := artifactMetrics{ |
421 | 443 | scope: artifactScope, |
422 | 444 | createResponseTime: labeled.NewStopWatch("create_duration", "The duration of the create artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric), |
@@ -446,5 +468,6 @@ func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.Da |
446 | 468 | repo: repo, |
447 | 469 | artifactStore: NewArtifactDataStore(store, storagePrefix), |
448 | 470 | systemMetrics: artifactMetrics, |
| 471 | + clock: clock, |
449 | 472 | } |
450 | 473 | } |
0 commit comments