afero/gcsfs/file.go

313 lines
6.7 KiB
Go

// Copyright © 2021 Vasily Ovchinnikov <vasily@remerge.io>.
//
// The code in this file is derived from afero fork github.com/Zatte/afero by Mikael Rapp
// licensed under Apache License 2.0.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gcsfs
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"syscall"
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)
// GcsFs is the Afero version adapted for GCS
type GcsFile struct {
openFlags int
fhOffset int64 //File handle specific offset
closed bool
ReadDirIt stiface.ObjectIterator
resource *gcsFileResource
}
func NewGcsFile(
ctx context.Context,
fs *Fs,
obj stiface.ObjectHandle,
openFlags int,
// Unused: there is no use to the file mode in GCloud just yet - but we keep it here, just in case we need it
fileMode os.FileMode,
name string,
) *GcsFile {
return &GcsFile{
openFlags: openFlags,
fhOffset: 0,
closed: false,
ReadDirIt: nil,
resource: &gcsFileResource{
ctx: ctx,
fs: fs,
obj: obj,
name: name,
fileMode: fileMode,
currentGcsSize: 0,
offset: 0,
reader: nil,
writer: nil,
},
}
}
func NewGcsFileFromOldFH(
openFlags int,
fileMode os.FileMode,
oldFile *gcsFileResource,
) *GcsFile {
res := &GcsFile{
openFlags: openFlags,
fhOffset: 0,
closed: false,
ReadDirIt: nil,
resource: oldFile,
}
res.resource.fileMode = fileMode
return res
}
func (o *GcsFile) Close() error {
if o.closed {
// the afero spec expects the call to Close on a closed file to return an error
return ErrFileClosed
}
o.closed = true
return o.resource.Close()
}
func (o *GcsFile) Seek(newOffset int64, whence int) (int64, error) {
if o.closed {
return 0, ErrFileClosed
}
//Since this is an expensive operation; let's make sure we need it
if (whence == 0 && newOffset == o.fhOffset) || (whence == 1 && newOffset == 0) {
return o.fhOffset, nil
}
log.Printf("WARNING: Seek behavior triggered, highly inefficent. Offset before seek is at %d\n", o.fhOffset)
//Fore the reader/writers to be reopened (at correct offset)
err := o.Sync()
if err != nil {
return 0, err
}
stat, err := o.Stat()
if err != nil {
return 0, nil
}
switch whence {
case 0:
o.fhOffset = newOffset
case 1:
o.fhOffset += newOffset
case 2:
o.fhOffset = stat.Size() + newOffset
}
return o.fhOffset, nil
}
func (o *GcsFile) Read(p []byte) (n int, err error) {
return o.ReadAt(p, o.fhOffset)
}
func (o *GcsFile) ReadAt(p []byte, off int64) (n int, err error) {
if o.closed {
return 0, ErrFileClosed
}
read, err := o.resource.ReadAt(p, off)
o.fhOffset += int64(read)
return read, err
}
func (o *GcsFile) Write(p []byte) (n int, err error) {
return o.WriteAt(p, o.fhOffset)
}
func (o *GcsFile) WriteAt(b []byte, off int64) (n int, err error) {
if o.closed {
return 0, ErrFileClosed
}
if o.openFlags&os.O_RDONLY != 0 {
return 0, fmt.Errorf("file is opend as read only")
}
_, err = o.resource.obj.Attrs(o.resource.ctx)
if err != nil {
if err == storage.ErrObjectNotExist {
if o.openFlags&os.O_CREATE == 0 {
return 0, ErrFileNotFound
}
} else {
return 0, fmt.Errorf("error getting file attributes: %v", err)
}
}
written, err := o.resource.WriteAt(b, off)
o.fhOffset += int64(written)
return written, err
}
func (o *GcsFile) Chmod(mode os.FileMode) error {
return errors.New("method Chmod is not implemented in GCS")
}
func (o *GcsFile) Name() string {
return filepath.FromSlash(o.resource.name)
}
func (o *GcsFile) readdirImpl(count int) ([]*FileInfo, error) {
err := o.Sync()
if err != nil {
return nil, err
}
var ownInfo os.FileInfo
ownInfo, err = o.Stat()
if err != nil {
return nil, err
}
if !ownInfo.IsDir() {
return nil, syscall.ENOTDIR
}
path := o.resource.fs.ensureTrailingSeparator(o.resource.name)
if o.ReadDirIt == nil {
//log.Printf("Querying path : %s\n", path)
bucketName, bucketPath := o.resource.fs.splitName(path)
o.ReadDirIt = o.resource.fs.client.Bucket(bucketName).Objects(
o.resource.ctx, &storage.Query{Delimiter: o.resource.fs.separator, Prefix: bucketPath, Versions: false})
}
var res []*FileInfo
for {
object, err := o.ReadDirIt.Next()
if err == iterator.Done {
// reset the iterator
o.ReadDirIt = nil
if len(res) > 0 || count <= 0 {
return res, nil
}
return res, io.EOF
}
if err != nil {
return res, err
}
tmp := newFileInfoFromAttrs(object, o.resource.fileMode)
if tmp.Name() == "" {
// neither object.Name, not object.Prefix were present - so let's skip this unknown thing
continue
}
if object.Name == "" && object.Prefix == "" {
continue
}
if tmp.Name() == ownInfo.Name() {
// Hmmm
continue
}
res = append(res, tmp)
// This would interrupt the iteration, once we reach the count.
// But it would then have files coming before folders - that's not what we want to have exactly,
// since it makes the results unpredictable. Hence, we iterate all the objects and then do
// the cut-off in a higher level method
//if count > 0 && len(res) >= count {
// break
//}
}
//return res, nil
}
func (o *GcsFile) Readdir(count int) ([]os.FileInfo, error) {
fi, err := o.readdirImpl(count)
if len(fi) > 0 {
sort.Sort(ByName(fi))
}
if count > 0 {
fi = fi[:count]
}
var res []os.FileInfo
for _, f := range fi {
res = append(res, f)
}
return res, err
}
func (o *GcsFile) Readdirnames(n int) ([]string, error) {
fi, err := o.Readdir(n)
if err != nil && err != io.EOF {
return nil, err
}
names := make([]string, len(fi))
for i, f := range fi {
names[i] = f.Name()
}
return names, err
}
func (o *GcsFile) Stat() (os.FileInfo, error) {
err := o.Sync()
if err != nil {
return nil, err
}
return newFileInfo(o.resource.name, o.resource.fs, o.resource.fileMode)
}
func (o *GcsFile) Sync() error {
return o.resource.maybeCloseIo()
}
func (o *GcsFile) Truncate(wantedSize int64) error {
if o.closed {
return ErrFileClosed
}
if o.openFlags == os.O_RDONLY {
return fmt.Errorf("file was opened as read only")
}
return o.resource.Truncate(wantedSize)
}
func (o *GcsFile) WriteString(s string) (ret int, err error) {
return o.Write([]byte(s))
}