package containerd

import (
	"context"
	"encoding/json"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	containerd "github.com/containerd/containerd/v2/client"
	"github.com/fsnotify/fsnotify"
	"github.com/k3s-io/k3s/pkg/agent/cri"
	"github.com/k3s-io/k3s/pkg/daemons/config"
	pkgerrors "github.com/pkg/errors"
	"github.com/rancher/wharfie/pkg/tarfile"
	"github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

type fileInfo struct {
	Size    int64       `json:"size"`
	ModTime metav1.Time `json:"modTime"`
	seen    bool // field is not serialized, and can be used to track if a file has been seen since the last restart
}

type watchqueue struct {
	cfg        *config.Node
	watcher    *fsnotify.Watcher
	filesCache map[string]*fileInfo
	workqueue  workqueue.TypedDelayingInterface[string]
}

func createWatcher(path string) (*fsnotify.Watcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	if err := watcher.Add(path); err != nil {
		return nil, err
	}
	return watcher, nil
}

func mustCreateWatcher(path string) *fsnotify.Watcher {
	watcher, err := createWatcher(path)
	if err != nil {
		panic("Failed to create image import watcher: " + err.Error())
	}
	return watcher
}

func isFileSupported(path string) bool {
	for _, ext := range append(tarfile.SupportedExtensions, ".txt") {
		if strings.HasSuffix(path, ext) {
			return true
		}
	}
	return false
}

// runWorkerForImages connects to containerd and calls processNextEventForImages to process items from the workqueue.
// This blocks until the workqueue is shut down.
func (w *watchqueue) runWorkerForImages(ctx context.Context) {
	// create the connections once, so that they are not re-created every time an event is processed
	client, err := Client(w.cfg.Containerd.Address)
	if err != nil {
		logrus.Errorf("Failed to create containerd client: %v", err)
		w.watcher.Close()
		return
	}
	defer client.Close()

	criConn, err := cri.Connection(ctx, w.cfg.Containerd.Address)
	if err != nil {
		logrus.Errorf("Failed to create CRI connection: %v", err)
		w.watcher.Close()
		return
	}
	defer criConn.Close()

	imageClient := runtimeapi.NewImageServiceClient(criConn)

	for w.processNextEventForImages(ctx, client, imageClient) {
	}
}

// processNextEventForImages retrieves a single event from the workqueue and processes it.
// It returns a boolean that is true if the workqueue is still open and this function should be called again.
func (w *watchqueue) processNextEventForImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) bool {
	key, shutdown := w.workqueue.Get()
	if shutdown {
		return false
	}

	if err := w.processImageEvent(ctx, key, client, imageClient); err != nil {
		logrus.Errorf("Failed to process image event: %v", err)
	}

	return true
}

// processImageEvent processes a single item from the workqueue.
func (w *watchqueue) processImageEvent(ctx context.Context, key string, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error {
	defer w.workqueue.Done(key)

	// Watch is rooted at the parent dir of the images dir, but we only need to handle things within the images dir
	if !strings.HasPrefix(key, w.cfg.Images) {
		return nil
	}

	file, err := os.Stat(key)
	// if the file does not exist, we assume that the event was RENAMED or REMOVED
	if os.IsNotExist(err) {
		// if the whole images dir was removed, reset the fileinfo cache
		if key == w.cfg.Images {
			w.filesCache = make(map[string]*fileInfo)
			defer w.syncCache()
			return nil
		}

		if !isFileSupported(key) {
			return nil
		}

		delete(w.filesCache, key)
		defer w.syncCache()
		return nil
	} else if err != nil {
		return pkgerrors.Wrapf(err, "failed to get fileinfo for image event %s", key)
	}

	if file.IsDir() {
		// Add to watch and list+enqueue directory contents, as notify is not recursive
		if err := w.watcher.Add(key); err != nil {
			return pkgerrors.Wrapf(err, "failed to add watch of %s", key)
		}
		fileInfos, err := os.ReadDir(key)
		if err != nil {
			return pkgerrors.Wrapf(err, "unable to list contents of %s", key)
		}
		for _, fileInfo := range fileInfos {
			w.workqueue.Add(filepath.Join(key, fileInfo.Name()))
		}
		return nil
	}

	if !isFileSupported(key) {
		return nil
	}

	if lastFileState := w.filesCache[key]; lastFileState == nil || (file.Size() != lastFileState.Size && file.ModTime().After(lastFileState.ModTime.Time)) {
		start := time.Now()
		if err := preloadFile(ctx, w.cfg, client, imageClient, key); err != nil {
			return pkgerrors.Wrapf(err, "failed to import %s", key)
		}
		logrus.Infof("Imported images from %s in %s", key, time.Since(start))

		w.filesCache[key] = &fileInfo{Size: file.Size(), ModTime: metav1.NewTime(file.ModTime()), seen: true}
		defer w.syncCache()
	} else if lastFileState != nil && !lastFileState.seen {
		lastFileState.seen = true
		// no need to sync as the field is not serialized
	}

	return nil
}

// pruneCache removes entries for all files that have not been seen since the last restart,
// and syncs the cache to disk. This is done to ensure that the cache file does not grow without
// bounds by continuing to track files that no longer exist.
func (w *watchqueue) pruneCache() {
	for path, fileState := range w.filesCache {
		if !fileState.seen {
			delete(w.filesCache, path)
		}
	}
	w.syncCache()
}

// syncCache writes the fileinfo cache to disk.
// If the cache file does not exist, this is a no-op. The file must be manually
// created by the user in order for the cache to be persisted across restarts.
func (w *watchqueue) syncCache() {
	filePath := filepath.Join(w.cfg.Images, ".cache.json")
	f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		if !os.IsNotExist(err) {
			logrus.Errorf("Failed to truncate image import fileinfo cache: %v", err)
		}
		return
	}
	defer f.Close()

	b, err := json.Marshal(&w.filesCache)
	if err != nil {
		logrus.Errorf("Failed to marshal image import fileinfo cache: %v", err)
		return
	}
	if _, err := f.Write(b); err != nil {
		logrus.Errorf("Failed to write image import fileinfo cache: %v", err)
	}
}
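// For illustration: given the fileInfo struct above, the .cache.json file written
// by syncCache and read by loadCache is a JSON object keyed by file path. The path
// and values below are hypothetical examples, not output from a real run:
//
//	{
//	  "/var/lib/rancher/k3s/agent/images/airgap-images.tar.gz": {
//	    "size": 157286400,
//	    "modTime": "2024-01-01T00:00:00Z"
//	  }
//	}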
// loadCache reads the fileinfo cache from disk.
// It is not an error if this file does not exist or is empty.
func (w *watchqueue) loadCache() {
	filePath := filepath.Join(w.cfg.Images, ".cache.json")
	f, err := os.OpenFile(filePath, os.O_RDONLY, 0664)
	if err != nil {
		if !os.IsNotExist(err) {
			logrus.Errorf("Failed to open image import fileinfo cache: %v", err)
		}
		return
	}
	defer f.Close()

	b, err := io.ReadAll(f)
	if err != nil {
		logrus.Errorf("Failed to read image import fileinfo cache: %v", err)
		return
	}

	// 0 byte file is fine, but don't try to load it - allows users to simply
	// touch the file to enable it for use.
	if len(b) == 0 {
		return
	}

	if err := json.Unmarshal(b, &w.filesCache); err != nil {
		logrus.Errorf("Failed to unmarshal image import fileinfo cache: %v", err)
	}
}

// importAndWatchImages starts the image watcher and workqueue.
// This function blocks until the workqueue is empty, indicating that all images
// that currently exist have been imported.
func importAndWatchImages(ctx context.Context, cfg *config.Node) error {
	w, err := watchImages(ctx, cfg)
	if err != nil {
		return err
	}

	// Add images dir to workqueue; if it exists and contains images
	// they will be recursively listed and enqueued.
	w.workqueue.Add(cfg.Images)

	// wait for the workqueue to empty before returning
	for w.workqueue.Len() > 0 {
		logrus.Debugf("Waiting for initial import of images from %s", cfg.Images)
		time.Sleep(time.Second * 2)
	}

	// prune unseen entries from the last run once all existing files have been processed
	w.pruneCache()

	return nil
}

// watchImages starts a watcher on the parent of the images dir, and a workqueue to process events
// from the watch stream.
func watchImages(ctx context.Context, cfg *config.Node) (*watchqueue, error) {
	// watch the directory above the images dir, as it may not exist yet when the watch is started.
	watcher, err := createWatcher(filepath.Dir(cfg.Images))
	if err != nil {
		return nil, pkgerrors.Wrapf(err, "failed to create image import watcher for %s", filepath.Dir(cfg.Images))
	}

	w := &watchqueue{
		cfg:        cfg,
		watcher:    watcher,
		filesCache: make(map[string]*fileInfo),
		workqueue:  workqueue.NewTypedDelayingQueue[string](),
	}
	logrus.Debugf("Image import watcher created")

	w.loadCache()

	go func() {
		<-ctx.Done()
		w.watcher.Close()
	}()

	go w.runWorkerForImages(ctx)

	go func() {
		for {
			select {
			case event, ok := <-w.watcher.Events:
				if !ok {
					logrus.Info("Image import watcher event channel closed; retrying in 5 seconds")
					select {
					case <-time.After(time.Second * 5):
						w.watcher = mustCreateWatcher(filepath.Dir(cfg.Images))
					case <-ctx.Done():
						return
					}
				}
				// only enqueue event if it is for a path within the images dir - not the parent dir that we are watching
				if strings.HasPrefix(event.Name, cfg.Images) {
					w.workqueue.AddAfter(event.Name, time.Second*2)
				}
			case err, ok := <-w.watcher.Errors:
				if !ok {
					logrus.Info("Image import watcher error channel closed; retrying in 5 seconds")
					select {
					case <-time.After(time.Second * 5):
						w.watcher = mustCreateWatcher(filepath.Dir(cfg.Images))
					case <-ctx.Done():
						return
					}
				}
				logrus.Errorf("Image import watcher received an error: %v", err)
			}
		}
	}()

	return w, nil
}