forked from mirror/afero
308 lines
6.5 KiB
Go
308 lines
6.5 KiB
Go
// Copyright © 2021 Vasily Ovchinnikov <vasily@remerge.io>.
|
|
//
|
|
// The code in this file is derived from afero fork github.com/Zatte/afero by Mikael Rapp
|
|
// licensed under Apache License 2.0.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package gcsfs
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"syscall"
|
|
|
|
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
|
|
|
|
"cloud.google.com/go/storage"
|
|
|
|
"google.golang.org/api/iterator"
|
|
)
|
|
|
|
// GcsFs is the Afero version adapted for GCS
|
|
type GcsFile struct {
|
|
openFlags int
|
|
fhOffset int64 //File handle specific offset
|
|
closed bool
|
|
ReadDirIt stiface.ObjectIterator
|
|
resource *gcsFileResource
|
|
}
|
|
|
|
func NewGcsFile(
|
|
ctx context.Context,
|
|
fs *GcsFs,
|
|
obj stiface.ObjectHandle,
|
|
openFlags int,
|
|
// Unused: there is no use to the file mode in GCloud just yet - but we keep it here, just in case we need it
|
|
fileMode os.FileMode,
|
|
name string,
|
|
) *GcsFile {
|
|
return &GcsFile{
|
|
openFlags: openFlags,
|
|
fhOffset: 0,
|
|
closed: false,
|
|
ReadDirIt: nil,
|
|
resource: &gcsFileResource{
|
|
ctx: ctx,
|
|
fs: fs,
|
|
|
|
obj: obj,
|
|
name: name,
|
|
fileMode: fileMode,
|
|
|
|
currentGcsSize: 0,
|
|
|
|
offset: 0,
|
|
reader: nil,
|
|
writer: nil,
|
|
},
|
|
}
|
|
}
|
|
|
|
func NewGcsFileFromOldFH(
|
|
openFlags int,
|
|
fileMode os.FileMode,
|
|
oldFile *gcsFileResource,
|
|
) *GcsFile {
|
|
res := &GcsFile{
|
|
openFlags: openFlags,
|
|
fhOffset: 0,
|
|
closed: false,
|
|
ReadDirIt: nil,
|
|
|
|
resource: oldFile,
|
|
}
|
|
res.resource.fileMode = fileMode
|
|
|
|
return res
|
|
}
|
|
|
|
func (o *GcsFile) Close() error {
|
|
if o.closed {
|
|
// the afero spec expects the call to Close on a closed file to return an error
|
|
return ErrFileClosed
|
|
}
|
|
o.closed = true
|
|
return o.resource.Close()
|
|
}
|
|
|
|
func (o *GcsFile) Seek(newOffset int64, whence int) (int64, error) {
|
|
if o.closed {
|
|
return 0, ErrFileClosed
|
|
}
|
|
|
|
//Since this is an expensive operation; let's make sure we need it
|
|
if (whence == 0 && newOffset == o.fhOffset) || (whence == 1 && newOffset == 0) {
|
|
return o.fhOffset, nil
|
|
}
|
|
log.Printf("WARNING: Seek beavhior triggered, highly inefficent. Offset before seek is at %d\n", o.fhOffset)
|
|
|
|
//Fore the reader/writers to be reopened (at correct offset)
|
|
err := o.Sync()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
stat, err := o.Stat()
|
|
if err != nil {
|
|
return 0, nil
|
|
}
|
|
|
|
switch whence {
|
|
case 0:
|
|
o.fhOffset = newOffset
|
|
case 1:
|
|
o.fhOffset += newOffset
|
|
case 2:
|
|
o.fhOffset = stat.Size() + newOffset
|
|
}
|
|
return o.fhOffset, nil
|
|
}
|
|
|
|
func (o *GcsFile) Read(p []byte) (n int, err error) {
|
|
return o.ReadAt(p, o.fhOffset)
|
|
}
|
|
|
|
func (o *GcsFile) ReadAt(p []byte, off int64) (n int, err error) {
|
|
if o.closed {
|
|
return 0, ErrFileClosed
|
|
}
|
|
|
|
read, err := o.resource.ReadAt(p, off)
|
|
o.fhOffset += int64(read)
|
|
return read, err
|
|
}
|
|
|
|
func (o *GcsFile) Write(p []byte) (n int, err error) {
|
|
return o.WriteAt(p, o.fhOffset)
|
|
}
|
|
|
|
func (o *GcsFile) WriteAt(b []byte, off int64) (n int, err error) {
|
|
if o.closed {
|
|
return 0, ErrFileClosed
|
|
}
|
|
|
|
if o.openFlags&os.O_RDONLY != 0 {
|
|
return 0, fmt.Errorf("file is opend as read only")
|
|
}
|
|
|
|
_, err = o.resource.obj.Attrs(o.resource.ctx)
|
|
if err != nil {
|
|
if err == storage.ErrObjectNotExist {
|
|
if o.openFlags&os.O_CREATE == 0 {
|
|
return 0, ErrFileNotFound
|
|
}
|
|
} else {
|
|
return 0, fmt.Errorf("error getting file attributes: %v", err)
|
|
}
|
|
}
|
|
|
|
written, err := o.resource.WriteAt(b, off)
|
|
o.fhOffset += int64(written)
|
|
return written, err
|
|
}
|
|
|
|
func (o *GcsFile) Name() string {
|
|
return filepath.FromSlash(o.resource.name)
|
|
}
|
|
|
|
func (o *GcsFile) readdirImpl(count int) ([]*FileInfo, error) {
|
|
err := o.Sync()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var ownInfo os.FileInfo
|
|
ownInfo, err = o.Stat()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !ownInfo.IsDir() {
|
|
return nil, syscall.ENOTDIR
|
|
}
|
|
|
|
path := o.resource.fs.ensureTrailingSeparator(o.resource.name)
|
|
if o.ReadDirIt == nil {
|
|
//log.Printf("Querying path : %s\n", path)
|
|
bucketName, bucketPath := o.resource.fs.splitName(path)
|
|
|
|
o.ReadDirIt = o.resource.fs.client.Bucket(bucketName).Objects(
|
|
o.resource.ctx, &storage.Query{Delimiter: o.resource.fs.separator, Prefix: bucketPath, Versions: false})
|
|
}
|
|
var res []*FileInfo
|
|
for {
|
|
object, err := o.ReadDirIt.Next()
|
|
if err == iterator.Done {
|
|
// reset the iterator
|
|
o.ReadDirIt = nil
|
|
|
|
if len(res) > 0 || count <= 0 {
|
|
return res, nil
|
|
}
|
|
|
|
return res, io.EOF
|
|
}
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
tmp := newFileInfoFromAttrs(object, o.resource.fileMode)
|
|
|
|
if tmp.Name() == "" {
|
|
// neither object.Name, not object.Prefix were present - so let's skip this unknown thing
|
|
continue
|
|
}
|
|
|
|
if object.Name == "" && object.Prefix == "" {
|
|
continue
|
|
}
|
|
|
|
if tmp.Name() == ownInfo.Name() {
|
|
// Hmmm
|
|
continue
|
|
}
|
|
|
|
res = append(res, tmp)
|
|
|
|
// This would interrupt the iteration, once we reach the count.
|
|
// But it would then have files coming before folders - that's not what we want to have exactly,
|
|
// since it makes the results unpredictable. Hence, we iterate all the objects and then do
|
|
// the cut-off in a higher level method
|
|
//if count > 0 && len(res) >= count {
|
|
// break
|
|
//}
|
|
}
|
|
//return res, nil
|
|
}
|
|
|
|
func (o *GcsFile) Readdir(count int) ([]os.FileInfo, error) {
|
|
fi, err := o.readdirImpl(count)
|
|
if len(fi) > 0 {
|
|
sort.Sort(ByName(fi))
|
|
}
|
|
|
|
if count > 0 {
|
|
fi = fi[:count]
|
|
}
|
|
|
|
var res []os.FileInfo
|
|
for _, f := range fi {
|
|
res = append(res, f)
|
|
}
|
|
return res, err
|
|
}
|
|
|
|
func (o *GcsFile) Readdirnames(n int) ([]string, error) {
|
|
fi, err := o.Readdir(n)
|
|
if err != nil && err != io.EOF {
|
|
return nil, err
|
|
}
|
|
names := make([]string, len(fi))
|
|
|
|
for i, f := range fi {
|
|
names[i] = f.Name()
|
|
}
|
|
return names, err
|
|
}
|
|
|
|
func (o *GcsFile) Stat() (os.FileInfo, error) {
|
|
err := o.Sync()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newFileInfo(o.resource.name, o.resource.fs, o.resource.fileMode)
|
|
}
|
|
|
|
func (o *GcsFile) Sync() error {
|
|
return o.resource.maybeCloseIo()
|
|
}
|
|
|
|
func (o *GcsFile) Truncate(wantedSize int64) error {
|
|
if o.closed {
|
|
return ErrFileClosed
|
|
}
|
|
if o.openFlags == os.O_RDONLY {
|
|
return fmt.Errorf("file was opened as read only")
|
|
}
|
|
return o.resource.Truncate(wantedSize)
|
|
}
|
|
|
|
func (o *GcsFile) WriteString(s string) (ret int, err error) {
|
|
return o.Write([]byte(s))
|
|
}
|