mirror of https://github.com/k3s-io/k3s.git
334 lines
9.3 KiB
Go
334 lines
9.3 KiB
Go
package containerd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
containerd "github.com/containerd/containerd/v2/client"
|
|
"github.com/fsnotify/fsnotify"
|
|
"github.com/k3s-io/k3s/pkg/agent/cri"
|
|
"github.com/k3s-io/k3s/pkg/daemons/config"
|
|
pkgerrors "github.com/pkg/errors"
|
|
"github.com/rancher/wharfie/pkg/tarfile"
|
|
"github.com/sirupsen/logrus"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/util/workqueue"
|
|
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
|
|
)
|
|
|
|
type fileInfo struct {
|
|
Size int64 `json:"size"`
|
|
ModTime metav1.Time `json:"modTime"`
|
|
seen bool // field is not serialized, and can be used to track if a file has been seen since the last restart
|
|
}
|
|
|
|
type watchqueue struct {
|
|
cfg *config.Node
|
|
watcher *fsnotify.Watcher
|
|
filesCache map[string]*fileInfo
|
|
workqueue workqueue.TypedDelayingInterface[string]
|
|
}
|
|
|
|
func createWatcher(path string) (*fsnotify.Watcher, error) {
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := watcher.Add(path); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return watcher, nil
|
|
}
|
|
|
|
func mustCreateWatcher(path string) *fsnotify.Watcher {
|
|
watcher, err := createWatcher(path)
|
|
if err != nil {
|
|
panic("Failed to create image import watcher:" + err.Error())
|
|
}
|
|
return watcher
|
|
}
|
|
|
|
func isFileSupported(path string) bool {
|
|
for _, ext := range append(tarfile.SupportedExtensions, ".txt") {
|
|
if strings.HasSuffix(path, ext) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// runWorkerForImages connects to containerd and calls processNextEventForImages to process items from the workqueue.
|
|
// This blocks until the workqueue is shut down.
|
|
func (w *watchqueue) runWorkerForImages(ctx context.Context) {
|
|
// create the connections to not create every time when processing a event
|
|
client, err := Client(w.cfg.Containerd.Address)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to create containerd client: %v", err)
|
|
w.watcher.Close()
|
|
return
|
|
}
|
|
|
|
defer client.Close()
|
|
|
|
criConn, err := cri.Connection(ctx, w.cfg.Containerd.Address)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to create CRI connection: %v", err)
|
|
w.watcher.Close()
|
|
return
|
|
}
|
|
|
|
defer criConn.Close()
|
|
|
|
imageClient := runtimeapi.NewImageServiceClient(criConn)
|
|
|
|
for w.processNextEventForImages(ctx, client, imageClient) {
|
|
}
|
|
}
|
|
|
|
// processNextEventForImages retrieves a single event from the workqueue and processes it.
|
|
// It returns a boolean that is true if the workqueue is still open and this function should be called again.
|
|
func (w *watchqueue) processNextEventForImages(ctx context.Context, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) bool {
|
|
key, shutdown := w.workqueue.Get()
|
|
|
|
if shutdown {
|
|
return false
|
|
}
|
|
|
|
if err := w.processImageEvent(ctx, key, client, imageClient); err != nil {
|
|
logrus.Errorf("Failed to process image event: %v", err)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// processImageEvent processes a single item from the workqueue.
|
|
func (w *watchqueue) processImageEvent(ctx context.Context, key string, client *containerd.Client, imageClient runtimeapi.ImageServiceClient) error {
|
|
defer w.workqueue.Done(key)
|
|
|
|
// Watch is rooted at the parent dir of the images dir, but we only need to handle things within the images dir
|
|
if !strings.HasPrefix(key, w.cfg.Images) {
|
|
return nil
|
|
}
|
|
|
|
file, err := os.Stat(key)
|
|
|
|
// if the file does not exists, we assume that the event was RENAMED or REMOVED
|
|
if os.IsNotExist(err) {
|
|
// if the whole images dir was removed, reset the fileinfo cache
|
|
if key == w.cfg.Images {
|
|
w.filesCache = make(map[string]*fileInfo)
|
|
defer w.syncCache()
|
|
return nil
|
|
}
|
|
|
|
if !isFileSupported(key) {
|
|
return nil
|
|
}
|
|
|
|
delete(w.filesCache, key)
|
|
defer w.syncCache()
|
|
return nil
|
|
} else if err != nil {
|
|
return pkgerrors.Wrapf(err, "failed to get fileinfo for image event %s", key)
|
|
}
|
|
|
|
if file.IsDir() {
|
|
// Add to watch and list+enqueue directory contents, as notify is not recursive
|
|
if err := w.watcher.Add(key); err != nil {
|
|
return pkgerrors.Wrapf(err, "failed to add watch of %s", key)
|
|
}
|
|
|
|
fileInfos, err := os.ReadDir(key)
|
|
if err != nil {
|
|
return pkgerrors.Wrapf(err, "unable to list contents of %s", key)
|
|
}
|
|
|
|
for _, fileInfo := range fileInfos {
|
|
w.workqueue.Add(filepath.Join(key, fileInfo.Name()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if !isFileSupported(key) {
|
|
return nil
|
|
}
|
|
|
|
if lastFileState := w.filesCache[key]; lastFileState == nil || (file.Size() != lastFileState.Size && file.ModTime().After(lastFileState.ModTime.Time)) {
|
|
start := time.Now()
|
|
if err := preloadFile(ctx, w.cfg, client, imageClient, key); err != nil {
|
|
return pkgerrors.Wrapf(err, "failed to import %s", key)
|
|
}
|
|
logrus.Infof("Imported images from %s in %s", key, time.Since(start))
|
|
w.filesCache[key] = &fileInfo{Size: file.Size(), ModTime: metav1.NewTime(file.ModTime()), seen: true}
|
|
defer w.syncCache()
|
|
} else if lastFileState != nil && !lastFileState.seen {
|
|
lastFileState.seen = true
|
|
// no need to sync as the field is not serialized
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// pruneCache removes entries for all files that have not been seen since the last restart,
|
|
// and syncs the cache to disk. This is done to ensure that the cache file does not grow without
|
|
// bounds by continuing to track files that do not exist.
|
|
func (w *watchqueue) pruneCache() {
|
|
for path, fileState := range w.filesCache {
|
|
if !fileState.seen {
|
|
delete(w.filesCache, path)
|
|
}
|
|
}
|
|
w.syncCache()
|
|
}
|
|
|
|
// syncCache writes the fileinfo cache to disk.
|
|
// if the cache file does not exist, this is a no-op. The file must be manually
|
|
// created by the user in order for the cache to be persisted across restarts.
|
|
func (w *watchqueue) syncCache() {
|
|
filePath := filepath.Join(w.cfg.Images, ".cache.json")
|
|
f, err := os.OpenFile(filePath, os.O_WRONLY|os.O_TRUNC, 0644)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
logrus.Errorf("Failed to truncate image import fileinfo cache: %v", err)
|
|
}
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
b, err := json.Marshal(&w.filesCache)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to marshal image import fileinfo cache: %v", err)
|
|
return
|
|
}
|
|
|
|
if _, err := f.Write(b); err != nil {
|
|
logrus.Errorf("Failed to write image import fileinfo cache: %v", err)
|
|
}
|
|
}
|
|
|
|
// loadCaache reads the fileinfo cache from disk.
|
|
// It is not an error if this file exists or is empty.
|
|
func (w *watchqueue) loadCache() {
|
|
filePath := filepath.Join(w.cfg.Images, ".cache.json")
|
|
f, err := os.OpenFile(filePath, os.O_RDONLY, 0664)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
logrus.Errorf("Failed to open image import fileinfo cache: %v", err)
|
|
}
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
b, err := io.ReadAll(f)
|
|
if err != nil {
|
|
logrus.Errorf("Failed to read image import fileinfo cache: %v", err)
|
|
return
|
|
}
|
|
|
|
// 0 byte file is fine, but don't try to load it - allows users to simply
|
|
// touch the file to enable it for use.
|
|
if len(b) == 0 {
|
|
return
|
|
}
|
|
|
|
if err := json.Unmarshal(b, &w.filesCache); err != nil {
|
|
logrus.Errorf("Failed to unmarshal image import fileinfo cache: %v", err)
|
|
}
|
|
}
|
|
|
|
// importAndWatchImages starts the image watcher and workqueue.
|
|
// This function block until the workqueue is empty, indicating that all images
|
|
// that currently exist have been imported.
|
|
func importAndWatchImages(ctx context.Context, cfg *config.Node) error {
|
|
w, err := watchImages(ctx, cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Add images dir to workqueue; if it exists and contains images
|
|
// they will be recursively listed and enqueued.
|
|
w.workqueue.Add(cfg.Images)
|
|
|
|
// wait for the workqueue to empty before returning
|
|
for w.workqueue.Len() > 0 {
|
|
logrus.Debugf("Waiting for initial import of images from %s", cfg.Images)
|
|
time.Sleep(time.Second * 2)
|
|
}
|
|
|
|
// prune unseen entries from last run once all existing files have been processed
|
|
w.pruneCache()
|
|
|
|
return nil
|
|
}
|
|
|
|
// watchImages starts a watcher on the parent of the images dir, and a workqueue to process events
|
|
// from the watch stream.
|
|
func watchImages(ctx context.Context, cfg *config.Node) (*watchqueue, error) {
|
|
// watch the directory above the images dir, as it may not exist yet when the watch is started.
|
|
watcher, err := createWatcher(filepath.Dir(cfg.Images))
|
|
if err != nil {
|
|
return nil, pkgerrors.Wrapf(err, "failed to create image import watcher for %s", filepath.Dir(cfg.Images))
|
|
}
|
|
|
|
w := &watchqueue{
|
|
cfg: cfg,
|
|
watcher: watcher,
|
|
filesCache: make(map[string]*fileInfo),
|
|
workqueue: workqueue.TypedNewDelayingQueue[string](),
|
|
}
|
|
logrus.Debugf("Image import watcher created")
|
|
|
|
w.loadCache()
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
w.watcher.Close()
|
|
}()
|
|
|
|
go w.runWorkerForImages(ctx)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case event, ok := <-w.watcher.Events:
|
|
if !ok {
|
|
logrus.Info("Image import watcher event channel closed; retrying in 5 seconds")
|
|
select {
|
|
case <-time.After(time.Second * 5):
|
|
w.watcher = mustCreateWatcher(filepath.Dir(cfg.Images))
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
// only enqueue event if it is for a path within the images dir - not the parent dir that we are watching
|
|
if strings.HasPrefix(event.Name, cfg.Images) {
|
|
w.workqueue.AddAfter(event.Name, time.Second*2)
|
|
}
|
|
|
|
case err, ok := <-w.watcher.Errors:
|
|
if !ok {
|
|
logrus.Info("Image import watcher error channel closed; retrying in 5 seconds")
|
|
select {
|
|
case <-time.After(time.Second * 5):
|
|
w.watcher = mustCreateWatcher(filepath.Dir(cfg.Images))
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
logrus.Errorf("Image import watcher received an error: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return w, nil
|
|
}
|